diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index ccc6439b5d..625cca7087 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -475,7 +475,7 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } -extern void tsdbAlterMaxCompactTasks(); +extern void tsdbAlterNumCompactThreads(); static int32_t dmAlterMaxCompactTask(const char *value) { int32_t max_compact_tasks; char *endptr = NULL; @@ -489,7 +489,7 @@ static int32_t dmAlterMaxCompactTask(const char *value) { dInfo("alter max compact tasks from %d to %d", tsNumOfCompactThreads, max_compact_tasks); tsNumOfCompactThreads = max_compact_tasks; #ifdef TD_ENTERPRISE - tsdbAlterMaxCompactTasks(); + (void)tsdbAlterNumCompactThreads(); #endif } diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 29248e360a..47890e9b4b 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -1083,9 +1083,6 @@ void tsdbRemoveFile(const char *path); } \ } while (0) -int32_t tsdbInit(); -void tsdbCleanUp(); - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 882ea155e4..b1a5ca4709 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -55,18 +55,32 @@ typedef enum { EVA_PRIORITY_LOW, } EVAPriority; +typedef enum { + EVA_TASK_COMMIT = 1, + EVA_TASK_MERGE, + EVA_TASK_COMPACT, + EVA_TASK_RETENTION, +} EVATaskT; + +#define COMMIT_TASK_ASYNC 1 +#define MERGE_TASK_ASYNC 2 +#define COMPACT_TASK_ASYNC 3 +#define RETENTION_TASK_ASYNC 4 + int32_t vnodeAsyncOpen(); void vnodeAsyncClose(); int32_t vnodeAChannelInit(int64_t async, SVAChannelID* channelID); int32_t vnodeAChannelDestroy(SVAChannelID* channelID, bool waitRunning); int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg, SVATaskID* taskID); -int32_t vnodeAsync2(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), +int32_t vnodeAsyncC(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg, SVATaskID* taskID); void vnodeAWait(SVATaskID* taskID); int32_t vnodeACancel(SVATaskID* taskID); int32_t vnodeAsyncSetWorkers(int64_t async, int32_t numWorkers); +const char* vnodeGetATaskName(EVATaskT task); + // vnodeBufPool.c typedef struct SVBufPoolNode SVBufPoolNode; struct SVBufPoolNode { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index ef4b233f94..4f1ecf81dd 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -479,8 +479,7 @@ struct SVnode { SVBufPool* onRecycle; // commit variables - SVAChannelID commitChannel; - SVATaskID commitTask; + SVATaskID commitTask; SMeta* pMeta; SSma* pSma; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index e3c75760c8..5822463f9e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -574,7 +574,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) { // begin tasks on file set for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); - tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset); + tsdbBeginTaskOnFileSet(tsdb, info->fid, EVA_TASK_COMMIT, &fset); if (fset) { code = tsdbTFileSetInitCopy(tsdb, fset, &info->fset); if (code) { @@ -712,7 +712,7 @@ int32_t tsdbCommitCommit(STsdb *tsdb) { for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); if (info->fset) { - tsdbFinishTaskOnFileSet(tsdb, info->fid); + tsdbFinishTaskOnFileSet(tsdb, info->fid, EVA_TASK_COMMIT); } } @@ -743,7 +743,7 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) { for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) { SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i); if (info->fset) { - tsdbFinishTaskOnFileSet(pTsdb, info->fid); + tsdbFinishTaskOnFileSet(pTsdb, info->fid, EVA_TASK_COMMIT); } } (void)taosThreadMutexUnlock(&pTsdb->mutex); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index f024dd7210..fdc2ac4800 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -770,8 +770,8 @@ extern void tsdbStopAllCompTask(STsdb *tsdb); int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) { STFileSystem *fs = pTsdb->pFS; - SArray *channelArray = taosArrayInit(0, sizeof(SVAChannelID)); - if (channelArray == NULL) { + SArray *asyncTasks = taosArrayInit(0, sizeof(SVATaskID)); + if (asyncTasks == NULL) { return terrno; } @@ -783,30 +783,31 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) { // collect channel STFileSet *fset; TARRAY2_FOREACH(fs->fSetArr, fset) { - if (fset->channelOpened) { - if (taosArrayPush(channelArray, &fset->channel) == NULL) { - taosArrayDestroy(channelArray); - (void)taosThreadMutexUnlock(&pTsdb->mutex); - return terrno; - } - fset->channel = (SVAChannelID){0}; - fset->mergeScheduled = false; - tsdbFSSetBlockCommit(fset, false); - fset->channelOpened = false; + if (taosArrayPush(asyncTasks, &fset->mergeTask) == NULL // + || taosArrayPush(asyncTasks, &fset->compactTask) == NULL // + || taosArrayPush(asyncTasks, &fset->retentionTask) == NULL) { + taosArrayDestroy(asyncTasks); + (void)taosThreadMutexUnlock(&pTsdb->mutex); + return terrno; } + fset->mergeScheduled = false; + tsdbFSSetBlockCommit(fset, false); } (void)taosThreadMutexUnlock(&pTsdb->mutex); // destroy all channels - for (int32_t i = 0; i < taosArrayGetSize(channelArray); i++) { - SVAChannelID *channel = taosArrayGet(channelArray, i); - int32_t code = vnodeAChannelDestroy(channel, true); - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + for (int32_t k = 0; k < 2; k++) { + for (int32_t i = 0; i < taosArrayGetSize(asyncTasks); i++) { + SVATaskID *task = taosArrayGet(asyncTasks, i); + if (k == 0) { + (void)vnodeACancel(task); + } else { + (void)vnodeAWait(task); + } } } - taosArrayDestroy(channelArray); + taosArrayDestroy(asyncTasks); #ifdef TD_ENTERPRISE tsdbStopAllCompTask(pTsdb); @@ -934,9 +935,6 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { // bool skipMerge = false; int32_t numFile = TARRAY2_SIZE(lvl->fobjArr); if (numFile >= sttTrigger && (!fset->mergeScheduled)) { - code = tsdbTFileSetOpenChannel(fset); - TSDB_CHECK_CODE(code, lino, _exit); - SMergeArg *arg = taosMemoryMalloc(sizeof(*arg)); if (arg == NULL) { code = terrno; @@ -946,7 +944,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { arg->tsdb = fs->tsdb; arg->fid = fset->fid; - code = vnodeAsync2(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, NULL); + code = vnodeAsync(MERGE_TASK_ASYNC, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, &fset->mergeTask); TSDB_CHECK_CODE(code, lino, _exit); fset->mergeScheduled = true; } @@ -1202,42 +1200,61 @@ _out: void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { tsdbTFileSetRangeArrayDestroy(fsrArr); } -void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset) { +void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset) { + // Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock int16_t sttTrigger = tsdb->pVnode->config.sttTrigger; tsdbFSGetFSet(tsdb->pFS, fid, fset); - if (sttTrigger == 1 && (*fset)) { - for (;;) { - if ((*fset)->taskRunning) { - (*fset)->numWaitTask++; - - (void)taosThreadCondWait(&(*fset)->beginTask, &tsdb->mutex); - - tsdbFSGetFSet(tsdb->pFS, fid, fset); - - (*fset)->numWaitTask--; - } else { - (*fset)->taskRunning = true; - break; - } - } - tsdbInfo("vgId:%d begin task on file set:%d", TD_VID(tsdb->pVnode), fid); + if (*fset == NULL) { + return; } + + struct STFileSetCond *cond = NULL; + if (sttTrigger == 1 || task == EVA_TASK_COMMIT) { + cond = &(*fset)->conds[0]; + } else { + cond = &(*fset)->conds[1]; + } + + while (1) { + if (cond->running) { + cond->numWait++; + (void)taosThreadCondWait(&cond->cond, &tsdb->mutex); + cond->numWait--; + } else { + cond->running = true; + break; + } + } + + tsdbInfo("vgId:%d begin %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid); + return; } -void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) { +void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task) { + // Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock int16_t sttTrigger = tsdb->pVnode->config.sttTrigger; - if (sttTrigger == 1) { - STFileSet *fset = NULL; - tsdbFSGetFSet(tsdb->pFS, fid, &fset); - if (fset != NULL && fset->taskRunning) { - fset->taskRunning = false; - if (fset->numWaitTask > 0) { - (void)taosThreadCondSignal(&fset->beginTask); - } - tsdbInfo("vgId:%d finish task on file set:%d", TD_VID(tsdb->pVnode), fid); - } + + STFileSet *fset = NULL; + tsdbFSGetFSet(tsdb->pFS, fid, &fset); + if (fset == NULL) { + return; } + + struct STFileSetCond *cond = NULL; + if (sttTrigger == 1 || task == EVA_TASK_COMMIT) { + cond = &fset->conds[0]; + } else { + cond = &fset->conds[1]; + } + + cond->running = false; + if (cond->numWait > 0) { + (void)taosThreadCondSignal(&cond->cond); + } + + tsdbInfo("vgId:%d finish %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid); + return; } struct SFileSetReader { diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index 119015636b..9694edcdd9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -14,6 +14,7 @@ */ #include "tsdbFSet2.h" +#include "vnd.h" #ifndef _TSDB_FILE_SYSTEM_H #define _TSDB_FILE_SYSTEM_H @@ -61,8 +62,8 @@ int32_t tsdbFSEditAbort(STFileSystem *fs); // other void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid); -void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset); -void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid); +void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset); +void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task); // utils int32_t save_fs(const TFileSetArray *arr, const char *fname); void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype); diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.c b/source/dnode/vnode/src/tsdb/tsdbFSet2.c index a0ae58ac96..68914300e4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.c @@ -480,16 +480,18 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) { fset[0]->maxVerValid = VERSION_MAX; TARRAY2_INIT(fset[0]->lvlArr); - // background task queue - (void)taosThreadCondInit(&(*fset)->beginTask, NULL); - (*fset)->taskRunning = false; - (*fset)->numWaitTask = 0; - // block commit variables (void)taosThreadCondInit(&fset[0]->canCommit, NULL); (*fset)->numWaitCommit = 0; (*fset)->blockCommit = false; + for (int32_t i = 0; i < sizeof((*fset)->conds) / sizeof((*fset)->conds[0]); ++i) { + struct STFileSetCond *cond = &(*fset)->conds[i]; + cond->running = false; + cond->numWait = 0; + (void)taosThreadCondInit(&cond->cond, NULL); + } + return 0; } @@ -648,8 +650,10 @@ void tsdbTFileSetClear(STFileSet **fset) { TARRAY2_DESTROY((*fset)->lvlArr, tsdbSttLvlClear); - (void)taosThreadCondDestroy(&(*fset)->beginTask); (void)taosThreadCondDestroy(&(*fset)->canCommit); + for (int32_t i = 0; i < sizeof((*fset)->conds) / sizeof((*fset)->conds[0]); ++i) { + (void)taosThreadCondDestroy(&(*fset)->conds[i].cond); + } taosMemoryFreeClear(*fset); } } @@ -703,14 +707,3 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset) { } return TARRAY2_SIZE(fset->lvlArr) == 0; } - -int32_t tsdbTFileSetOpenChannel(STFileSet *fset) { - int32_t code; - if (!fset->channelOpened) { - if ((code = vnodeAChannelInit(2, &fset->channel))) { - return code; - } - fset->channelOpened = true; - } - return 0; -} diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.h b/source/dnode/vnode/src/tsdb/tsdbFSet2.h index 24ae59e300..83ef32e5e5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.h @@ -68,8 +68,6 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset); // stt int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl); void tsdbSttLvlClear(SSttLvl **lvl); -// open channel -int32_t tsdbTFileSetOpenChannel(STFileSet *fset); struct STFileOp { tsdb_fop_t optype; @@ -83,26 +81,30 @@ struct SSttLvl { TFileObjArray fobjArr[1]; }; +struct STFileSetCond { + bool running; + int32_t numWait; + TdThreadCond cond; +}; + struct STFileSet { int32_t fid; int64_t maxVerValid; STFileObj *farr[TSDB_FTYPE_MAX]; // file array TSttLvlArray lvlArr[1]; // level array - // background task - bool channelOpened; - SVAChannelID channel; - bool mergeScheduled; - - // sttTrigger = 1 - TdThreadCond beginTask; - bool taskRunning; - int32_t numWaitTask; + bool mergeScheduled; + SVATaskID mergeTask; + SVATaskID compactTask; + SVATaskID retentionTask; // block commit variables TdThreadCond canCommit; int32_t numWaitCommit; bool blockCommit; + + // conditions + struct STFileSetCond conds[2]; }; struct STFileSetRange { diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index 61a82d828e..39d8a57692 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -462,21 +462,29 @@ _exit: static int32_t tsdbMergeGetFSet(SMerger *merger) { STFileSet *fset; + int32_t code; + STsdb *tsdb = merger->tsdb; (void)taosThreadMutexLock(&merger->tsdb->mutex); - tsdbFSGetFSet(merger->tsdb->pFS, merger->fid, &fset); - if (fset == NULL) { + + if (tsdb->bgTaskDisabled) { (void)taosThreadMutexUnlock(&merger->tsdb->mutex); return 0; } - fset->mergeScheduled = false; + tsdbBeginTaskOnFileSet(tsdb, merger->fid, EVA_TASK_MERGE, &fset); + if (NULL == fset) { + (void)taosThreadMutexUnlock(&merger->tsdb->mutex); + return 0; + } - int32_t code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset); + code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset); if (code) { (void)taosThreadMutexUnlock(&merger->tsdb->mutex); return code; } + + fset->mergeScheduled = false; (void)taosThreadMutexUnlock(&merger->tsdb->mutex); return 0; } @@ -493,10 +501,13 @@ int32_t tsdbMerge(void *arg) { .sttTrigger = tsdb->pVnode->config.sttTrigger, }}; - if (merger->sttTrigger <= 1) return 0; + if (merger->sttTrigger <= 1) { + return 0; + } // copy snapshot - TAOS_CHECK_GOTO(tsdbMergeGetFSet(merger), &lino, _exit); + code = tsdbMergeGetFSet(merger); + TSDB_CHECK_CODE(code, lino, _exit); if (merger->fset == NULL) { return 0; @@ -509,12 +520,19 @@ int32_t tsdbMerge(void *arg) { TSDB_CHECK_CODE(code, lino, _exit); _exit: + if (merger->fset) { + (void)taosThreadMutexLock(&tsdb->mutex); + tsdbFinishTaskOnFileSet(tsdb, mergeArg->fid, EVA_TASK_MERGE); + (void)taosThreadMutexUnlock(&tsdb->mutex); + } + if (code) { tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); tsdbFatal("vgId:%d, failed to merge stt files since %s. code:%d", TD_VID(tsdb->pVnode), terrstr(), code); taosMsleep(100); exit(EXIT_FAILURE); } + tsdbTFileSetClear(&merger->fset); taosMemoryFree(arg); return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index b2e4621878..c1f8f45d7e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -18,22 +18,6 @@ extern int32_t tsdbOpenCompMonitor(STsdb *tsdb); extern void tsdbCloseCompMonitor(STsdb *tsdb); -extern int32_t tsdbInitCompact(); -extern void tsdbCleanupCompact(); - -int32_t tsdbInit() { -#ifdef TD_ENTERPRISE - return tsdbInitCompact(); -#endif - return 0; -} - -void tsdbCleanUp() { -#ifdef TD_ENTERPRISE - tsdbCleanupCompact(); -#endif - return; -} void tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) { STsdbKeepCfg *pKeepCfg = &pTsdb->keepCfg; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 0cd83e3f8c..fcce36b121 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -325,11 +325,21 @@ static int32_t tsdbRetention(void *arg) { // begin task (void)taosThreadMutexLock(&pTsdb->mutex); - tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, &fset); + + // check if background task is disabled + if (pTsdb->bgTaskDisabled) { + tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(pTsdb->pVnode)); + (void)taosThreadMutexUnlock(&pTsdb->mutex); + return 0; + } + + // set flag and copy + tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION, &fset); if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) { (void)taosThreadMutexUnlock(&pTsdb->mutex); TSDB_CHECK_CODE(code, lino, _exit); } + (void)taosThreadMutexUnlock(&pTsdb->mutex); // do retention @@ -346,7 +356,7 @@ static int32_t tsdbRetention(void *arg) { _exit: if (rtner.fset) { (void)taosThreadMutexLock(&pTsdb->mutex); - tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid); + tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION); (void)taosThreadMutexUnlock(&pTsdb->mutex); } @@ -364,26 +374,29 @@ static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate) int32_t code = 0; int32_t lino = 0; + // check if background task is disabled + if (tsdb->bgTaskDisabled) { + tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(tsdb->pVnode)); + return 0; + } + STFileSet *fset; + TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { + SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); + if (arg == NULL) { + TAOS_CHECK_GOTO(terrno, &lino, _exit); + } - if (!tsdb->bgTaskDisabled) { - TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { - TAOS_CHECK_GOTO(tsdbTFileSetOpenChannel(fset), &lino, _exit); + arg->tsdb = tsdb; + arg->now = now; + arg->fid = fset->fid; + arg->s3Migrate = s3Migrate; - SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); - if (arg == NULL) { - TAOS_CHECK_GOTO(terrno, &lino, _exit); - } - - arg->tsdb = tsdb; - arg->now = now; - arg->fid = fset->fid; - arg->s3Migrate = s3Migrate; - - if ((code = vnodeAsync2(&fset->channel, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, NULL))) { - taosMemoryFree(arg); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, + &fset->retentionTask); + if (code) { + taosMemoryFree(arg); + TSDB_CHECK_CODE(code, lino, _exit); } } diff --git a/source/dnode/vnode/src/vnd/vnodeAsync.c b/source/dnode/vnode/src/vnd/vnodeAsync.c index 7b2412e7ae..a829585f52 100644 --- a/source/dnode/vnode/src/vnd/vnodeAsync.c +++ b/source/dnode/vnode/src/vnd/vnodeAsync.c @@ -493,10 +493,10 @@ int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void .async = async, .id = 0, }; - return vnodeAsync2(&channelID, priority, execute, complete, arg, taskID); + return vnodeAsyncC(&channelID, priority, execute, complete, arg, taskID); } -int32_t vnodeAsync2(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *), +int32_t vnodeAsyncC(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *), void *arg, SVATaskID *taskID) { if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL || channelID->id < 0) { @@ -827,4 +827,19 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) { channelID->async = 0; channelID->id = 0; return 0; +} + +const char *vnodeGetATaskName(EVATaskT taskType) { + switch (taskType) { + case EVA_TASK_COMMIT: + return "vnode-commit"; + case EVA_TASK_MERGE: + return "vnode-merge"; + case EVA_TASK_COMPACT: + return "vnode-compact"; + case EVA_TASK_RETENTION: + return "vnode-retention"; + default: + return "unknown"; + } } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 0c44048fbf..7fd3f9a8b6 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -389,8 +389,7 @@ int vnodeAsyncCommit(SVnode *pVnode) { TSDB_CHECK_CODE(code, lino, _exit); // schedule the task - code = vnodeAsync2(&pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, - &pVnode->commitTask); + code = vnodeAsync(COMMIT_TASK_ASYNC, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, &pVnode->commitTask); TSDB_CHECK_CODE(code, lino, _exit); _exit: diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index a91c46db70..ff537ef4a7 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -27,7 +27,6 @@ int vnodeInit(StopDnodeFp stopDnodeFp) { TAOS_CHECK_RETURN(vnodeAsyncOpen()); TAOS_CHECK_RETURN(walInit(stopDnodeFp)); - TAOS_CHECK_RETURN(tsdbInit()); monInitVnode(); @@ -36,7 +35,6 @@ int vnodeInit(StopDnodeFp stopDnodeFp) { void vnodeCleanup() { if (atomic_val_compare_exchange_32(&VINIT, 1, 0) == 0) return; - tsdbCleanUp(); vnodeAsyncClose(); walCleanUp(); smaCleanUp(); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 945b4cbeae..6de5298728 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -438,11 +438,6 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC (void)taosThreadMutexInit(&pVnode->mutex, NULL); (void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL); - if (vnodeAChannelInit(1, &pVnode->commitChannel) != 0) { - vError("vgId:%d, failed to init commit channel", TD_VID(pVnode)); - goto _err; - } - int8_t rollback = vnodeShouldRollback(pVnode); // open buffer pool @@ -558,10 +553,6 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } void vnodeClose(SVnode *pVnode) { if (pVnode) { vnodeAWait(&pVnode->commitTask); - if (vnodeAChannelDestroy(&pVnode->commitChannel, true) != 0) { - vError("vgId:%d, failed to destroy commit channel", TD_VID(pVnode)); - } - vnodeSyncClose(pVnode); vnodeQueryClose(pVnode); tqClose(pVnode->pTq); diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 0c11083367..9e3c6861c0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -597,13 +597,11 @@ extern void tsdbEnableBgTask(STsdb *pTsdb); static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) { TAOS_CHECK_RETURN(tsdbDisableAndCancelAllBgTask(pVnode->pTsdb)); TAOS_CHECK_RETURN(vnodeSyncCommit(pVnode)); - TAOS_CHECK_RETURN(vnodeAChannelDestroy(&pVnode->commitChannel, true)); return 0; } static int32_t vnodeEnableBgTask(SVnode *pVnode) { tsdbEnableBgTask(pVnode->pTsdb); - TAOS_CHECK_RETURN(vnodeAChannelInit(1, &pVnode->commitChannel)); return 0; }