diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 1f7a059ffc..c8a9f2bfc4 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -14,6 +14,7 @@ target_sources( "src/vnd/vnodeSvr.c" "src/vnd/vnodeSync.c" "src/vnd/vnodeSnapshot.c" + "src/vnd/vnodeCompact.c" # meta "src/meta/metaOpen.c" diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 28797c5361..279d5e4e95 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -94,6 +94,10 @@ int32_t vnodeSyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode); bool vnodeShouldRollback(SVnode* pVnode); +// vnodeCompact.c +int32_t vnodeAsyncCompact(SVnode* pVnode); +int32_t vnodeSyncCompact(SVnode* pVnode); + // vnodeSync.c int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncStart(SVnode* pVnode); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e56f130c2c..12659ed0ed 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader; typedef struct SRSmaSnapWriter SRSmaSnapWriter; typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SCommitInfo SCommitInfo; +typedef struct SCompactInfo SCompactInfo; #define VNODE_META_DIR "meta" #define VNODE_TSDB_DIR "tsdb" @@ -428,6 +429,11 @@ struct SCommitInfo { SVnode* pVnode; TXN* txn; }; +struct SCompactInfo { + SVnodeInfo info; + SVnode* pVnode; + TXN* txn; +}; #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 9b6ebe2bd1..7b64776187 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -15,8 +15,8 @@ #include "tsdb.h" -typedef struct { -} SMemDIter; +// typedef struct { +// } SMemDIter; typedef struct { SArray *aBlockIdx; // SArray @@ -34,6 +34,12 @@ typedef struct { int32_t iRow; } SSttDIter; +typedef struct { + int32_t flag; + SRowInfo rowInfo; + char handle[]; +} STsdbDataIter; + typedef struct { STsdb *pTsdb; STsdbFS fs; @@ -47,6 +53,25 @@ typedef struct { #define TSDB_FLG_DEEP_COMPACT 0x1 // ITER ========================= +static int32_t tsdbDataIterOpen(STsdbDataIter *pIter) { + int32_t code = 0; + int32_t lino = 0; + // TODO +_exit: + return code; +} + +static void tsdbDataIterClose(STsdbDataIter *pIter) { + // TODO +} + +static int32_t tsdbDataIterNext(STsdbDataIter *pIter) { + int32_t code = 0; + int32_t lino = 0; + // TODO +_exit: + return code; +} // COMPACT ========================= static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { @@ -106,8 +131,6 @@ static int32_t tsdbDeepCompact(STsdbCompactor *pCompactor) { code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet); TSDB_CHECK_CODE(code, lino, _exit); - // - _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4daab074b5..bfa4dff03c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -184,7 +184,7 @@ _err: return -1; } -static void vnodePrepareCommit(SVnode *pVnode) { +void vnodePrepareCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); tsdbPrepareCommit(pVnode->pTsdb); diff --git a/source/dnode/vnode/src/vnd/vnodeCompact.c b/source/dnode/vnode/src/vnd/vnodeCompact.c new file mode 100644 index 0000000000..26c1ddd9e5 --- /dev/null +++ b/source/dnode/vnode/src/vnd/vnodeCompact.c @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnd.h" + +extern void vnodePrepareCommit(SVnode *pVnode); + +#define vnodePrepareCompact vnodePrepareCommit + +static int32_t vnodeCompactImpl(SCompactInfo *pInfo) { + int32_t code = 0; + + // TODO + + return code; +} + +static int32_t vnodeCompactTask(void *param) { + int32_t code = 0; + + SCompactInfo *pInfo = (SCompactInfo *)param; + + // compact + vnodeCompactImpl(pInfo); + + // end compact + tsem_post(&pInfo->pVnode->canCommit); + +_exit: + taosMemoryFree(pInfo); + return code; +} +int32_t vnodeAsyncCompact(SVnode *pVnode) { + int32_t code = 0; + + // prepare + vnodePrepareCompact(pVnode); + + // schedule compact task + SCompactInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo)); + if (NULL == pInfo) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + pInfo->info.config = pVnode->config; + pInfo->info.state.committed = pVnode->state.applied; + pInfo->info.state.commitTerm = pVnode->state.applyTerm; + pInfo->info.state.commitID = pVnode->state.commitID; + pInfo->pVnode = pVnode; + pInfo->txn = metaGetTxn(pVnode->pMeta); + vnodeScheduleTask(vnodeCompactTask, pInfo); + +_exit: + if (code) { + vError("vgId:%d %s failed since %s", TD_VID(pInfo->pVnode), __func__, tstrerror(code)); + } + return code; +} + +int32_t vnodeSyncCompact(SVnode *pVnode) { + vnodeAsyncCompact(pVnode); + tsem_wait(&pVnode->canCommit); + tsem_post(&pVnode->canCommit); + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6092888136..57368cfcea 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -299,6 +299,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp vnodeSyncCommit(pVnode); vnodeBegin(pVnode); goto _exit; + case TDMT_VND_COMPACT: + vnodeAsyncCompact(pVnode); + vnodeBegin(pVnode); + goto _exit; default: vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); return -1;