From 8050d7320245c48a9e5fca256a11f5e2170177d8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 15 Nov 2023 13:41:00 +0800 Subject: [PATCH] first version --- source/dnode/vnode/CMakeLists.txt | 1 + source/dnode/vnode/src/inc/tsdb.h | 39 +++++++++++++------------ source/dnode/vnode/src/inc/vnodeInt.h | 26 ++++++++--------- source/dnode/vnode/src/tsdb/tsdbFS2.c | 25 ++++++++++++++++ source/dnode/vnode/src/tsdb/tsdbFS2.h | 1 + source/dnode/vnode/src/tsdb/tsdbFSet2.h | 3 +- source/dnode/vnode/src/tsdb/tsdbOpen.c | 12 ++++++++ source/dnode/vnode/src/vnd/vnodeSvr.c | 33 ++++++++++++--------- 8 files changed, 93 insertions(+), 47 deletions(-) diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index dc43da7fe7..e8a82868e9 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -90,6 +90,7 @@ IF (TD_VNODE_PLUGINS) vnode PRIVATE ${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompact.c + ${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompactMonitor.c ${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/vnodeCompact.c ) ENDIF () diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ec69ae5ca7..2831a2e5e7 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -74,6 +74,7 @@ typedef struct SDiskDataBuilder SDiskDataBuilder; typedef struct SBlkInfo SBlkInfo; typedef struct STsdbDataIter2 STsdbDataIter2; typedef struct STsdbFilterInfo STsdbFilterInfo; +typedef struct STFileSystem STFileSystem; #define TSDBROW_ROW_FMT ((int8_t)0x0) #define TSDBROW_COL_FMT ((int8_t)0x1) @@ -368,24 +369,26 @@ typedef struct { } SCacheFlushState; struct STsdb { - char *path; - SVnode *pVnode; - STsdbKeepCfg keepCfg; - TdThreadMutex mutex; - SMemTable *mem; - SMemTable *imem; - STsdbFS fs; // old - SLRUCache *lruCache; - SCacheFlushState flushState; - TdThreadMutex lruMutex; - SLRUCache *biCache; - TdThreadMutex biMutex; - SLRUCache *bCache; - TdThreadMutex bMutex; - SLRUCache *pgCache; - TdThreadMutex pgMutex; - struct STFileSystem *pFS; // new - SRocksCache rCache; + char *path; + SVnode *pVnode; + STsdbKeepCfg keepCfg; + TdThreadMutex mutex; + SMemTable *mem; + SMemTable *imem; + STsdbFS fs; // old + SLRUCache *lruCache; + SCacheFlushState flushState; + TdThreadMutex lruMutex; + SLRUCache *biCache; + TdThreadMutex biMutex; + SLRUCache *bCache; + TdThreadMutex bMutex; + SLRUCache *pgCache; + TdThreadMutex pgMutex; + STFileSystem *pFS; // new + SRocksCache rCache; + // compact monitor + struct SCompMonitor *pCompMonitor; }; struct TSDBKEY { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index df1720d4a7..1b39473340 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -209,7 +209,7 @@ int32_t tsdbBegin(STsdb* pTsdb); // int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); -int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync); +int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync); // int32_t tsdbFinishCommit(STsdb* pTsdb); // int32_t tsdbRollbackCommit(STsdb* pTsdb); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); @@ -494,18 +494,18 @@ struct SSma { void* pRSmaEnv; }; -#define SMA_CFG(s) (&(s)->pVnode->config) -#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg) -#define SMA_RETENTION(s) ((SRetention*)&(s)->pVnode->config.tsdbCfg.retentions) -#define SMA_LOCKED(s) ((s)->locked) -#define SMA_META(s) ((s)->pVnode->pMeta) -#define SMA_VID(s) TD_VID((s)->pVnode) -#define SMA_TFS(s) ((s)->pVnode->pTfs) -#define SMA_TSMA_ENV(s) ((s)->pTSmaEnv) -#define SMA_RSMA_ENV(s) ((s)->pRSmaEnv) -#define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb) -#define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L0]) -#define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L1]) +#define SMA_CFG(s) (&(s)->pVnode->config) +#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg) +#define SMA_RETENTION(s) ((SRetention*)&(s)->pVnode->config.tsdbCfg.retentions) +#define SMA_LOCKED(s) ((s)->locked) +#define SMA_META(s) ((s)->pVnode->pMeta) +#define SMA_VID(s) TD_VID((s)->pVnode) +#define SMA_TFS(s) ((s)->pVnode->pTfs) +#define SMA_TSMA_ENV(s) ((s)->pTSmaEnv) +#define SMA_RSMA_ENV(s) ((s)->pRSmaEnv) +#define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb) +#define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L0]) +#define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L1]) #define SMA_RSMA_GET_TSDB(pVnode, level) ((level == 0) ? pVnode->pTsdb : pVnode->pSma->pRSmaTsdb[level - 1]) // sma diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 02ef75ae86..bd73b63c91 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -1257,6 +1257,31 @@ int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int return 0; } +/* Try stop the bg task. If task is `stoped`, return true, else `false` + * The caller must hold the fs->tsdb->mutex + */ +bool tsdbFSStopBgTask(STFileSystem *fs, int32_t fid, int64_t taskId) { + STFileSet *fset = NULL; + tsdbFSGetFSet(fs, fid, &fset); + ASSERT(fset); + + if (fset->bgTaskRunning->taskid == taskId) { + return false; + } else { + for (STFSBgTask *task = fset->bgTaskQueue->next; task != fset->bgTaskQueue; task = task->next) { + if (task->taskid == taskId) { + // stop and remove current task + task->next->prev = task->prev; + task->prev->next = task->next; + fset->bgTaskNum--; + tsdbDoDoneBgTask(fs, task); + return true; + } + } + return false; + } +} + int32_t tsdbFSDisableBgTask(STFileSystem *fs) { taosThreadMutexLock(&fs->tsdb->mutex); for (;;) { diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index a3a8e2f575..2202150c07 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -58,6 +58,7 @@ int32_t tsdbFSEditAbort(STFileSystem *fs); // background task int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *), void (*destroy)(void *), void *arg, int64_t *taskid); +bool tsdbFSStopBgTask(STFileSystem *fs, int32_t fid, int64_t taskId); int32_t tsdbFSDisableBgTask(STFileSystem *fs); int32_t tsdbFSEnableBgTask(STFileSystem *fs); // other diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.h b/source/dnode/vnode/src/tsdb/tsdbFSet2.h index 34f174ade7..98011ab527 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.h @@ -28,8 +28,7 @@ typedef struct SSttLvl SSttLvl; typedef TARRAY2(STFileObj *) TFileObjArray; typedef TARRAY2(SSttLvl *) TSttLvlArray; typedef TARRAY2(STFileOp) TFileOpArray; -typedef struct STFileSystem STFileSystem; -typedef struct STFSBgTask STFSBgTask; +typedef struct STFSBgTask STFSBgTask; typedef enum { TSDB_FOP_NONE = 0, diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index a1f864814f..ea3d285880 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -16,6 +16,9 @@ #include "tsdb.h" #include "tsdbFS2.h" +extern int32_t tsdbOpenCompMonitor(STsdb *tsdb); +extern int32_t tsdbCloseCompMonitor(STsdb *tsdb); + int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) { STsdbKeepCfg *pKeepCfg = &pTsdb->keepCfg; pKeepCfg->precision = pCfg->precision; @@ -81,6 +84,12 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee goto _err; } +#ifdef TD_ENTERPRISE + if (tsdbOpenCompMonitor(pTsdb) < 0) { + goto _err; + } +#endif + tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d, keepTimeoffset:%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days, pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2, pTsdb->keepCfg.keepTimeOffset); @@ -108,6 +117,9 @@ int tsdbClose(STsdb **pTsdb) { tsdbCloseFS(&(*pTsdb)->pFS); tsdbCloseCache(*pTsdb); +#ifdef TD_ENTERPRISE + tsdbCloseCompMonitor(*pTsdb); +#endif taosThreadMutexDestroy(&(*pTsdb)->mutex); taosMemoryFreeClear(*pTsdb); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 899efc8e70..ecf9953194 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -14,22 +14,22 @@ */ #include "audit.h" +#include "cos.h" #include "tencode.h" #include "tmsg.h" #include "tstrbuild.h" #include "vnd.h" -#include "cos.h" #include "vnode.h" #include "vnodeInt.h" static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, - SRpcMsg *pOriginRpc); +static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, + SRpcMsg *pOriginRpc); static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); -static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, - SRpcMsg *pOriginRpc); +static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, + SRpcMsg *pOriginRpc); static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -880,8 +880,8 @@ _err: return -1; } -static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, - SRpcMsg *pOriginRpc) { +static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, + SRpcMsg *pOriginRpc) { SDecoder decoder = {0}; SEncoder encoder = {0}; int32_t rcode = 0; @@ -931,8 +931,8 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, goto _exit; } - if(tsEnableAudit && tsEnableAuditCreateTable){ - char* str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); + if (tsEnableAudit && tsEnableAuditCreateTable) { + char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); if (str == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; rcode = -1; @@ -986,17 +986,17 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen); tEncodeSVCreateTbBatchRsp(&encoder, &rsp); - if(tsEnableAudit && tsEnableAuditCreateTable){ + if (tsEnableAudit && tsEnableAuditCreateTable) { int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId; SName name = {0}; tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB); SStringBuilder sb = {0}; - for(int32_t i = 0; i < tbNames->size; i++){ - char** key = (char**)taosArrayGet(tbNames, i); + for (int32_t i = 0; i < tbNames->size; i++) { + char **key = (char **)taosArrayGet(tbNames, i); taosStringBuilderAppendStringLen(&sb, *key, strlen(*key)); - if(i < tbNames->size - 1){ + if (i < tbNames->size - 1) { taosStringBuilderAppendChar(&sb, ','); } taosMemoryFreeClear(*key); @@ -1148,7 +1148,7 @@ _exit: } static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, - SRpcMsg *pOriginRpc) { + SRpcMsg *pOriginRpc) { SVDropTbBatchReq req = {0}; SVDropTbBatchRsp rsp = {0}; SDecoder decoder = {0}; @@ -2035,6 +2035,11 @@ static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pR return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp); } +static int32_t vnodeProcessStopCompactReq(SVnode *pVnode) { + // TODO + return 0; +} + static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { syncCheckMember(pVnode->sync);