enh: seperate tsdb async tasks to different thread pools

This commit is contained in:
Hongze Cheng 2024-12-19 17:48:11 +08:00
parent 18fb01264d
commit 288dc33f97
17 changed files with 188 additions and 149 deletions

View File

@ -475,7 +475,7 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0; return 0;
} }
extern void tsdbAlterMaxCompactTasks(); extern void tsdbAlterNumCompactThreads();
static int32_t dmAlterMaxCompactTask(const char *value) { static int32_t dmAlterMaxCompactTask(const char *value) {
int32_t max_compact_tasks; int32_t max_compact_tasks;
char *endptr = NULL; char *endptr = NULL;
@ -489,7 +489,7 @@ static int32_t dmAlterMaxCompactTask(const char *value) {
dInfo("alter max compact tasks from %d to %d", tsNumOfCompactThreads, max_compact_tasks); dInfo("alter max compact tasks from %d to %d", tsNumOfCompactThreads, max_compact_tasks);
tsNumOfCompactThreads = max_compact_tasks; tsNumOfCompactThreads = max_compact_tasks;
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
tsdbAlterMaxCompactTasks(); (void)tsdbAlterNumCompactThreads();
#endif #endif
} }

View File

@ -1083,9 +1083,6 @@ void tsdbRemoveFile(const char *path);
} \ } \
} while (0) } while (0)
int32_t tsdbInit();
void tsdbCleanUp();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -55,18 +55,32 @@ typedef enum {
EVA_PRIORITY_LOW, EVA_PRIORITY_LOW,
} EVAPriority; } EVAPriority;
typedef enum {
EVA_TASK_COMMIT = 1,
EVA_TASK_MERGE,
EVA_TASK_COMPACT,
EVA_TASK_RETENTION,
} EVATaskT;
#define COMMIT_TASK_ASYNC 1
#define MERGE_TASK_ASYNC 2
#define COMPACT_TASK_ASYNC 3
#define RETENTION_TASK_ASYNC 4
int32_t vnodeAsyncOpen(); int32_t vnodeAsyncOpen();
void vnodeAsyncClose(); void vnodeAsyncClose();
int32_t vnodeAChannelInit(int64_t async, SVAChannelID* channelID); int32_t vnodeAChannelInit(int64_t async, SVAChannelID* channelID);
int32_t vnodeAChannelDestroy(SVAChannelID* channelID, bool waitRunning); int32_t vnodeAChannelDestroy(SVAChannelID* channelID, bool waitRunning);
int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg, int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), void* arg,
SVATaskID* taskID); SVATaskID* taskID);
int32_t vnodeAsync2(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*), int32_t vnodeAsyncC(SVAChannelID* channelID, EVAPriority priority, int32_t (*execute)(void*), void (*complete)(void*),
void* arg, SVATaskID* taskID); void* arg, SVATaskID* taskID);
void vnodeAWait(SVATaskID* taskID); void vnodeAWait(SVATaskID* taskID);
int32_t vnodeACancel(SVATaskID* taskID); int32_t vnodeACancel(SVATaskID* taskID);
int32_t vnodeAsyncSetWorkers(int64_t async, int32_t numWorkers); int32_t vnodeAsyncSetWorkers(int64_t async, int32_t numWorkers);
const char* vnodeGetATaskName(EVATaskT task);
// vnodeBufPool.c // vnodeBufPool.c
typedef struct SVBufPoolNode SVBufPoolNode; typedef struct SVBufPoolNode SVBufPoolNode;
struct SVBufPoolNode { struct SVBufPoolNode {

View File

@ -479,7 +479,6 @@ struct SVnode {
SVBufPool* onRecycle; SVBufPool* onRecycle;
// commit variables // commit variables
SVAChannelID commitChannel;
SVATaskID commitTask; SVATaskID commitTask;
SMeta* pMeta; SMeta* pMeta;

View File

@ -574,7 +574,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
// begin tasks on file set // begin tasks on file set
for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset); tsdbBeginTaskOnFileSet(tsdb, info->fid, EVA_TASK_COMMIT, &fset);
if (fset) { if (fset) {
code = tsdbTFileSetInitCopy(tsdb, fset, &info->fset); code = tsdbTFileSetInitCopy(tsdb, fset, &info->fset);
if (code) { if (code) {
@ -712,7 +712,7 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
if (info->fset) { if (info->fset) {
tsdbFinishTaskOnFileSet(tsdb, info->fid); tsdbFinishTaskOnFileSet(tsdb, info->fid, EVA_TASK_COMMIT);
} }
} }
@ -743,7 +743,7 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) { for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i); SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
if (info->fset) { if (info->fset) {
tsdbFinishTaskOnFileSet(pTsdb, info->fid); tsdbFinishTaskOnFileSet(pTsdb, info->fid, EVA_TASK_COMMIT);
} }
} }
(void)taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);

View File

@ -770,8 +770,8 @@ extern void tsdbStopAllCompTask(STsdb *tsdb);
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) { int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
STFileSystem *fs = pTsdb->pFS; STFileSystem *fs = pTsdb->pFS;
SArray *channelArray = taosArrayInit(0, sizeof(SVAChannelID)); SArray *asyncTasks = taosArrayInit(0, sizeof(SVATaskID));
if (channelArray == NULL) { if (asyncTasks == NULL) {
return terrno; return terrno;
} }
@ -783,30 +783,31 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
// collect channel // collect channel
STFileSet *fset; STFileSet *fset;
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
if (fset->channelOpened) { if (taosArrayPush(asyncTasks, &fset->mergeTask) == NULL //
if (taosArrayPush(channelArray, &fset->channel) == NULL) { || taosArrayPush(asyncTasks, &fset->compactTask) == NULL //
taosArrayDestroy(channelArray); || taosArrayPush(asyncTasks, &fset->retentionTask) == NULL) {
taosArrayDestroy(asyncTasks);
(void)taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
return terrno; return terrno;
} }
fset->channel = (SVAChannelID){0};
fset->mergeScheduled = false; fset->mergeScheduled = false;
tsdbFSSetBlockCommit(fset, false); tsdbFSSetBlockCommit(fset, false);
fset->channelOpened = false;
}
} }
(void)taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
// destroy all channels // destroy all channels
for (int32_t i = 0; i < taosArrayGetSize(channelArray); i++) { for (int32_t k = 0; k < 2; k++) {
SVAChannelID *channel = taosArrayGet(channelArray, i); for (int32_t i = 0; i < taosArrayGetSize(asyncTasks); i++) {
int32_t code = vnodeAChannelDestroy(channel, true); SVATaskID *task = taosArrayGet(asyncTasks, i);
if (code) { if (k == 0) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); (void)vnodeACancel(task);
} else {
(void)vnodeAWait(task);
} }
} }
taosArrayDestroy(channelArray); }
taosArrayDestroy(asyncTasks);
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
tsdbStopAllCompTask(pTsdb); tsdbStopAllCompTask(pTsdb);
@ -934,9 +935,6 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
// bool skipMerge = false; // bool skipMerge = false;
int32_t numFile = TARRAY2_SIZE(lvl->fobjArr); int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
if (numFile >= sttTrigger && (!fset->mergeScheduled)) { if (numFile >= sttTrigger && (!fset->mergeScheduled)) {
code = tsdbTFileSetOpenChannel(fset);
TSDB_CHECK_CODE(code, lino, _exit);
SMergeArg *arg = taosMemoryMalloc(sizeof(*arg)); SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) { if (arg == NULL) {
code = terrno; code = terrno;
@ -946,7 +944,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
arg->tsdb = fs->tsdb; arg->tsdb = fs->tsdb;
arg->fid = fset->fid; arg->fid = fset->fid;
code = vnodeAsync2(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, NULL); code = vnodeAsync(MERGE_TASK_ASYNC, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, &fset->mergeTask);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
fset->mergeScheduled = true; fset->mergeScheduled = true;
} }
@ -1202,42 +1200,61 @@ _out:
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { tsdbTFileSetRangeArrayDestroy(fsrArr); } void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { tsdbTFileSetRangeArrayDestroy(fsrArr); }
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset) { void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset) {
// Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger; int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
tsdbFSGetFSet(tsdb->pFS, fid, fset); tsdbFSGetFSet(tsdb->pFS, fid, fset);
if (sttTrigger == 1 && (*fset)) { if (*fset == NULL) {
for (;;) { return;
if ((*fset)->taskRunning) { }
(*fset)->numWaitTask++;
(void)taosThreadCondWait(&(*fset)->beginTask, &tsdb->mutex); struct STFileSetCond *cond = NULL;
if (sttTrigger == 1 || task == EVA_TASK_COMMIT) {
tsdbFSGetFSet(tsdb->pFS, fid, fset); cond = &(*fset)->conds[0];
(*fset)->numWaitTask--;
} else { } else {
(*fset)->taskRunning = true; cond = &(*fset)->conds[1];
}
while (1) {
if (cond->running) {
cond->numWait++;
(void)taosThreadCondWait(&cond->cond, &tsdb->mutex);
cond->numWait--;
} else {
cond->running = true;
break; break;
} }
} }
tsdbInfo("vgId:%d begin task on file set:%d", TD_VID(tsdb->pVnode), fid);
} tsdbInfo("vgId:%d begin %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
return;
} }
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) { void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task) {
// Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
int16_t sttTrigger = tsdb->pVnode->config.sttTrigger; int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
if (sttTrigger == 1) {
STFileSet *fset = NULL; STFileSet *fset = NULL;
tsdbFSGetFSet(tsdb->pFS, fid, &fset); tsdbFSGetFSet(tsdb->pFS, fid, &fset);
if (fset != NULL && fset->taskRunning) { if (fset == NULL) {
fset->taskRunning = false; return;
if (fset->numWaitTask > 0) {
(void)taosThreadCondSignal(&fset->beginTask);
} }
tsdbInfo("vgId:%d finish task on file set:%d", TD_VID(tsdb->pVnode), fid);
struct STFileSetCond *cond = NULL;
if (sttTrigger == 1 || task == EVA_TASK_COMMIT) {
cond = &fset->conds[0];
} else {
cond = &fset->conds[1];
} }
cond->running = false;
if (cond->numWait > 0) {
(void)taosThreadCondSignal(&cond->cond);
} }
tsdbInfo("vgId:%d finish %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
return;
} }
struct SFileSetReader { struct SFileSetReader {

View File

@ -14,6 +14,7 @@
*/ */
#include "tsdbFSet2.h" #include "tsdbFSet2.h"
#include "vnd.h"
#ifndef _TSDB_FILE_SYSTEM_H #ifndef _TSDB_FILE_SYSTEM_H
#define _TSDB_FILE_SYSTEM_H #define _TSDB_FILE_SYSTEM_H
@ -61,8 +62,8 @@ int32_t tsdbFSEditAbort(STFileSystem *fs);
// other // other
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid); void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid);
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset); void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset);
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid); void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task);
// utils // utils
int32_t save_fs(const TFileSetArray *arr, const char *fname); int32_t save_fs(const TFileSetArray *arr, const char *fname);
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype); void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);

View File

@ -480,16 +480,18 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
fset[0]->maxVerValid = VERSION_MAX; fset[0]->maxVerValid = VERSION_MAX;
TARRAY2_INIT(fset[0]->lvlArr); TARRAY2_INIT(fset[0]->lvlArr);
// background task queue
(void)taosThreadCondInit(&(*fset)->beginTask, NULL);
(*fset)->taskRunning = false;
(*fset)->numWaitTask = 0;
// block commit variables // block commit variables
(void)taosThreadCondInit(&fset[0]->canCommit, NULL); (void)taosThreadCondInit(&fset[0]->canCommit, NULL);
(*fset)->numWaitCommit = 0; (*fset)->numWaitCommit = 0;
(*fset)->blockCommit = false; (*fset)->blockCommit = false;
for (int32_t i = 0; i < sizeof((*fset)->conds) / sizeof((*fset)->conds[0]); ++i) {
struct STFileSetCond *cond = &(*fset)->conds[i];
cond->running = false;
cond->numWait = 0;
(void)taosThreadCondInit(&cond->cond, NULL);
}
return 0; return 0;
} }
@ -648,8 +650,10 @@ void tsdbTFileSetClear(STFileSet **fset) {
TARRAY2_DESTROY((*fset)->lvlArr, tsdbSttLvlClear); TARRAY2_DESTROY((*fset)->lvlArr, tsdbSttLvlClear);
(void)taosThreadCondDestroy(&(*fset)->beginTask);
(void)taosThreadCondDestroy(&(*fset)->canCommit); (void)taosThreadCondDestroy(&(*fset)->canCommit);
for (int32_t i = 0; i < sizeof((*fset)->conds) / sizeof((*fset)->conds[0]); ++i) {
(void)taosThreadCondDestroy(&(*fset)->conds[i].cond);
}
taosMemoryFreeClear(*fset); taosMemoryFreeClear(*fset);
} }
} }
@ -703,14 +707,3 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset) {
} }
return TARRAY2_SIZE(fset->lvlArr) == 0; return TARRAY2_SIZE(fset->lvlArr) == 0;
} }
int32_t tsdbTFileSetOpenChannel(STFileSet *fset) {
int32_t code;
if (!fset->channelOpened) {
if ((code = vnodeAChannelInit(2, &fset->channel))) {
return code;
}
fset->channelOpened = true;
}
return 0;
}

View File

@ -68,8 +68,6 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset);
// stt // stt
int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl); int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl);
void tsdbSttLvlClear(SSttLvl **lvl); void tsdbSttLvlClear(SSttLvl **lvl);
// open channel
int32_t tsdbTFileSetOpenChannel(STFileSet *fset);
struct STFileOp { struct STFileOp {
tsdb_fop_t optype; tsdb_fop_t optype;
@ -83,26 +81,30 @@ struct SSttLvl {
TFileObjArray fobjArr[1]; TFileObjArray fobjArr[1];
}; };
struct STFileSetCond {
bool running;
int32_t numWait;
TdThreadCond cond;
};
struct STFileSet { struct STFileSet {
int32_t fid; int32_t fid;
int64_t maxVerValid; int64_t maxVerValid;
STFileObj *farr[TSDB_FTYPE_MAX]; // file array STFileObj *farr[TSDB_FTYPE_MAX]; // file array
TSttLvlArray lvlArr[1]; // level array TSttLvlArray lvlArr[1]; // level array
// background task
bool channelOpened;
SVAChannelID channel;
bool mergeScheduled; bool mergeScheduled;
SVATaskID mergeTask;
// sttTrigger = 1 SVATaskID compactTask;
TdThreadCond beginTask; SVATaskID retentionTask;
bool taskRunning;
int32_t numWaitTask;
// block commit variables // block commit variables
TdThreadCond canCommit; TdThreadCond canCommit;
int32_t numWaitCommit; int32_t numWaitCommit;
bool blockCommit; bool blockCommit;
// conditions
struct STFileSetCond conds[2];
}; };
struct STFileSetRange { struct STFileSetRange {

View File

@ -462,21 +462,29 @@ _exit:
static int32_t tsdbMergeGetFSet(SMerger *merger) { static int32_t tsdbMergeGetFSet(SMerger *merger) {
STFileSet *fset; STFileSet *fset;
int32_t code;
STsdb *tsdb = merger->tsdb;
(void)taosThreadMutexLock(&merger->tsdb->mutex); (void)taosThreadMutexLock(&merger->tsdb->mutex);
tsdbFSGetFSet(merger->tsdb->pFS, merger->fid, &fset);
if (fset == NULL) { if (tsdb->bgTaskDisabled) {
(void)taosThreadMutexUnlock(&merger->tsdb->mutex); (void)taosThreadMutexUnlock(&merger->tsdb->mutex);
return 0; return 0;
} }
fset->mergeScheduled = false; tsdbBeginTaskOnFileSet(tsdb, merger->fid, EVA_TASK_MERGE, &fset);
if (NULL == fset) {
(void)taosThreadMutexUnlock(&merger->tsdb->mutex);
return 0;
}
int32_t code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset); code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset);
if (code) { if (code) {
(void)taosThreadMutexUnlock(&merger->tsdb->mutex); (void)taosThreadMutexUnlock(&merger->tsdb->mutex);
return code; return code;
} }
fset->mergeScheduled = false;
(void)taosThreadMutexUnlock(&merger->tsdb->mutex); (void)taosThreadMutexUnlock(&merger->tsdb->mutex);
return 0; return 0;
} }
@ -493,10 +501,13 @@ int32_t tsdbMerge(void *arg) {
.sttTrigger = tsdb->pVnode->config.sttTrigger, .sttTrigger = tsdb->pVnode->config.sttTrigger,
}}; }};
if (merger->sttTrigger <= 1) return 0; if (merger->sttTrigger <= 1) {
return 0;
}
// copy snapshot // copy snapshot
TAOS_CHECK_GOTO(tsdbMergeGetFSet(merger), &lino, _exit); code = tsdbMergeGetFSet(merger);
TSDB_CHECK_CODE(code, lino, _exit);
if (merger->fset == NULL) { if (merger->fset == NULL) {
return 0; return 0;
@ -509,12 +520,19 @@ int32_t tsdbMerge(void *arg) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (merger->fset) {
(void)taosThreadMutexLock(&tsdb->mutex);
tsdbFinishTaskOnFileSet(tsdb, mergeArg->fid, EVA_TASK_MERGE);
(void)taosThreadMutexUnlock(&tsdb->mutex);
}
if (code) { if (code) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
tsdbFatal("vgId:%d, failed to merge stt files since %s. code:%d", TD_VID(tsdb->pVnode), terrstr(), code); tsdbFatal("vgId:%d, failed to merge stt files since %s. code:%d", TD_VID(tsdb->pVnode), terrstr(), code);
taosMsleep(100); taosMsleep(100);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
tsdbTFileSetClear(&merger->fset); tsdbTFileSetClear(&merger->fset);
taosMemoryFree(arg); taosMemoryFree(arg);
return code; return code;

View File

@ -18,22 +18,6 @@
extern int32_t tsdbOpenCompMonitor(STsdb *tsdb); extern int32_t tsdbOpenCompMonitor(STsdb *tsdb);
extern void tsdbCloseCompMonitor(STsdb *tsdb); extern void tsdbCloseCompMonitor(STsdb *tsdb);
extern int32_t tsdbInitCompact();
extern void tsdbCleanupCompact();
int32_t tsdbInit() {
#ifdef TD_ENTERPRISE
return tsdbInitCompact();
#endif
return 0;
}
void tsdbCleanUp() {
#ifdef TD_ENTERPRISE
tsdbCleanupCompact();
#endif
return;
}
void tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) { void tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
STsdbKeepCfg *pKeepCfg = &pTsdb->keepCfg; STsdbKeepCfg *pKeepCfg = &pTsdb->keepCfg;

View File

@ -325,11 +325,21 @@ static int32_t tsdbRetention(void *arg) {
// begin task // begin task
(void)taosThreadMutexLock(&pTsdb->mutex); (void)taosThreadMutexLock(&pTsdb->mutex);
tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, &fset);
// check if background task is disabled
if (pTsdb->bgTaskDisabled) {
tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(pTsdb->pVnode));
(void)taosThreadMutexUnlock(&pTsdb->mutex);
return 0;
}
// set flag and copy
tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION, &fset);
if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) { if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
(void)taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
(void)taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
// do retention // do retention
@ -346,7 +356,7 @@ static int32_t tsdbRetention(void *arg) {
_exit: _exit:
if (rtner.fset) { if (rtner.fset) {
(void)taosThreadMutexLock(&pTsdb->mutex); (void)taosThreadMutexLock(&pTsdb->mutex);
tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid); tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION);
(void)taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
} }
@ -364,12 +374,14 @@ static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate)
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// check if background task is disabled
if (tsdb->bgTaskDisabled) {
tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(tsdb->pVnode));
return 0;
}
STFileSet *fset; STFileSet *fset;
if (!tsdb->bgTaskDisabled) {
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
TAOS_CHECK_GOTO(tsdbTFileSetOpenChannel(fset), &lino, _exit);
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) { if (arg == NULL) {
TAOS_CHECK_GOTO(terrno, &lino, _exit); TAOS_CHECK_GOTO(terrno, &lino, _exit);
@ -380,12 +392,13 @@ static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate)
arg->fid = fset->fid; arg->fid = fset->fid;
arg->s3Migrate = s3Migrate; arg->s3Migrate = s3Migrate;
if ((code = vnodeAsync2(&fset->channel, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, NULL))) { code = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg,
&fset->retentionTask);
if (code) {
taosMemoryFree(arg); taosMemoryFree(arg);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
}
_exit: _exit:
if (code) { if (code) {

View File

@ -493,10 +493,10 @@ int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void
.async = async, .async = async,
.id = 0, .id = 0,
}; };
return vnodeAsync2(&channelID, priority, execute, complete, arg, taskID); return vnodeAsyncC(&channelID, priority, execute, complete, arg, taskID);
} }
int32_t vnodeAsync2(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *), int32_t vnodeAsyncC(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *),
void *arg, SVATaskID *taskID) { void *arg, SVATaskID *taskID) {
if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL || if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL ||
channelID->id < 0) { channelID->id < 0) {
@ -828,3 +828,18 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
channelID->id = 0; channelID->id = 0;
return 0; return 0;
} }
const char *vnodeGetATaskName(EVATaskT taskType) {
switch (taskType) {
case EVA_TASK_COMMIT:
return "vnode-commit";
case EVA_TASK_MERGE:
return "vnode-merge";
case EVA_TASK_COMPACT:
return "vnode-compact";
case EVA_TASK_RETENTION:
return "vnode-retention";
default:
return "unknown";
}
}

View File

@ -389,8 +389,7 @@ int vnodeAsyncCommit(SVnode *pVnode) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// schedule the task // schedule the task
code = vnodeAsync2(&pVnode->commitChannel, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, code = vnodeAsync(COMMIT_TASK_ASYNC, EVA_PRIORITY_HIGH, vnodeCommit, vnodeCommitCancel, pInfo, &pVnode->commitTask);
&pVnode->commitTask);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:

View File

@ -27,7 +27,6 @@ int vnodeInit(StopDnodeFp stopDnodeFp) {
TAOS_CHECK_RETURN(vnodeAsyncOpen()); TAOS_CHECK_RETURN(vnodeAsyncOpen());
TAOS_CHECK_RETURN(walInit(stopDnodeFp)); TAOS_CHECK_RETURN(walInit(stopDnodeFp));
TAOS_CHECK_RETURN(tsdbInit());
monInitVnode(); monInitVnode();
@ -36,7 +35,6 @@ int vnodeInit(StopDnodeFp stopDnodeFp) {
void vnodeCleanup() { void vnodeCleanup() {
if (atomic_val_compare_exchange_32(&VINIT, 1, 0) == 0) return; if (atomic_val_compare_exchange_32(&VINIT, 1, 0) == 0) return;
tsdbCleanUp();
vnodeAsyncClose(); vnodeAsyncClose();
walCleanUp(); walCleanUp();
smaCleanUp(); smaCleanUp();

View File

@ -438,11 +438,6 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
(void)taosThreadMutexInit(&pVnode->mutex, NULL); (void)taosThreadMutexInit(&pVnode->mutex, NULL);
(void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL); (void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
if (vnodeAChannelInit(1, &pVnode->commitChannel) != 0) {
vError("vgId:%d, failed to init commit channel", TD_VID(pVnode));
goto _err;
}
int8_t rollback = vnodeShouldRollback(pVnode); int8_t rollback = vnodeShouldRollback(pVnode);
// open buffer pool // open buffer pool
@ -558,10 +553,6 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
vnodeAWait(&pVnode->commitTask); vnodeAWait(&pVnode->commitTask);
if (vnodeAChannelDestroy(&pVnode->commitChannel, true) != 0) {
vError("vgId:%d, failed to destroy commit channel", TD_VID(pVnode));
}
vnodeSyncClose(pVnode); vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode); vnodeQueryClose(pVnode);
tqClose(pVnode->pTq); tqClose(pVnode->pTq);

View File

@ -597,13 +597,11 @@ extern void tsdbEnableBgTask(STsdb *pTsdb);
static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) { static int32_t vnodeCancelAndDisableAllBgTask(SVnode *pVnode) {
TAOS_CHECK_RETURN(tsdbDisableAndCancelAllBgTask(pVnode->pTsdb)); TAOS_CHECK_RETURN(tsdbDisableAndCancelAllBgTask(pVnode->pTsdb));
TAOS_CHECK_RETURN(vnodeSyncCommit(pVnode)); TAOS_CHECK_RETURN(vnodeSyncCommit(pVnode));
TAOS_CHECK_RETURN(vnodeAChannelDestroy(&pVnode->commitChannel, true));
return 0; return 0;
} }
static int32_t vnodeEnableBgTask(SVnode *pVnode) { static int32_t vnodeEnableBgTask(SVnode *pVnode) {
tsdbEnableBgTask(pVnode->pTsdb); tsdbEnableBgTask(pVnode->pTsdb);
TAOS_CHECK_RETURN(vnodeAChannelInit(1, &pVnode->commitChannel));
return 0; return 0;
} }