diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 7fd5432a4e..8e200b67b4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -48,15 +48,23 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) { tsem_init(&fs[0]->canEdit, 0, 1); fs[0]->state = TSDB_FS_STATE_NONE; fs[0]->neid = 0; - fs[0]->mergeTaskOn = false; TARRAY2_INIT(fs[0]->fSetArr); TARRAY2_INIT(fs[0]->fSetArrTmp); + // background task queue + taosThreadMutexInit(fs[0]->mutex, NULL); + fs[0]->bgTaskQueue->next = fs[0]->bgTaskQueue; + fs[0]->bgTaskQueue->prev = fs[0]->bgTaskQueue; + return 0; } static int32_t destroy_fs(STFileSystem **fs) { if (fs[0] == NULL) return 0; + taosThreadMutexDestroy(fs[0]->mutex); + + ASSERT(fs[0]->bgTaskNum == 0); + TARRAY2_DESTROY(fs[0]->fSetArr, NULL); TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL); tsem_destroy(&fs[0]->canEdit); @@ -595,25 +603,18 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { code = commit_edit(fs); TSDB_CHECK_CODE(code, lino, _exit); - if (fs->etype == TSDB_FEDIT_MERGE) { - ASSERT(fs->mergeTaskOn); - fs->mergeTaskOn = false; - } - - // check if need to merge - if (fs->tsdb->pVnode->config.sttTrigger > 1 && fs->mergeTaskOn == false) { + // schedule merge + if (fs->tsdb->pVnode->config.sttTrigger != 1) { STFileSet *fset; TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) { if (TARRAY2_SIZE(fset->lvlArr) == 0) continue; - SSttLvl *lvl0 = TARRAY2_FIRST(fset->lvlArr); - if (lvl0->level != 0 || TARRAY2_SIZE(lvl0->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue; + SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr); + if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < fs->tsdb->pVnode->config.sttTrigger) continue; - code = vnodeScheduleTaskEx(1, tsdbMerge, fs->tsdb); + code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, fs->tsdb, NULL); TSDB_CHECK_CODE(code, lino, _exit); - fs->mergeTaskOn = true; - break; } } @@ -707,4 +708,136 @@ int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) { fsetArr[0] = NULL; } return 0; +} + +const char *gFSBgTaskName[] = {NULL, "MERGE", "RETENTION", "COMPACT"}; + +static int32_t tsdbFSRunBgTask(void *arg) { + STFileSystem *fs = (STFileSystem *)arg; + + ASSERT(fs->bgTaskRunning != NULL); + + fs->bgTaskRunning->launchTime = taosGetTimestampMs(); + fs->bgTaskRunning->run(fs->bgTaskRunning->arg); + fs->bgTaskRunning->finishTime = taosGetTimestampMs(); + + tsdbDebug("vgId:%d bg task:%s task id:%" PRId64 " finished, schedule time:%" PRId64 " launch time:%" PRId64 + " finish time:%" PRId64, + TD_VID(fs->tsdb->pVnode), gFSBgTaskName[fs->bgTaskRunning->type], fs->bgTaskRunning->taskid, + fs->bgTaskRunning->scheduleTime, fs->bgTaskRunning->launchTime, fs->bgTaskRunning->finishTime); + + taosThreadMutexLock(fs->mutex); + + // free last + if (fs->bgTaskRunning->numWait > 0) { + taosThreadCondBroadcast(fs->bgTaskRunning->done); + } else { + taosThreadCondDestroy(fs->bgTaskRunning->done); + taosMemoryFree(fs->bgTaskRunning); + } + fs->bgTaskRunning = NULL; + + // schedule next + if (fs->bgTaskNum > 0) { + // pop task from head + fs->bgTaskRunning = fs->bgTaskQueue->next; + fs->bgTaskRunning->prev->next = fs->bgTaskRunning->next; + fs->bgTaskRunning->next->prev = fs->bgTaskRunning->prev; + fs->bgTaskNum--; + + vnodeScheduleTaskEx(1, tsdbFSRunBgTask, arg); + } + + taosThreadMutexUnlock(fs->mutex); + return 0; +} + +static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, + int64_t *taskid) { + // check if same task is on + if (fs->bgTaskRunning && fs->bgTaskRunning->type == type) { + return 0; + } + + for (STFSBgTask *task = fs->bgTaskQueue->next; task != fs->bgTaskQueue; task = task->next) { + if (task->type == type) { + return 0; + } + } + + // do schedule task + STFSBgTask *task = taosMemoryCalloc(1, sizeof(STFSBgTask)); + if (task == NULL) return TSDB_CODE_OUT_OF_MEMORY; + taosThreadCondInit(task->done, NULL); + + task->type = type; + task->run = run; + task->arg = arg; + task->scheduleTime = taosGetTimestampMs(); + task->taskid = ++fs->taskid; + + if (fs->bgTaskRunning == NULL && fs->bgTaskNum == 0) { + // launch task directly + fs->bgTaskRunning = task; + vnodeScheduleTaskEx(1, tsdbFSRunBgTask, fs); + } else { + // add to the queue tail + fs->bgTaskNum++; + task->next = fs->bgTaskQueue; + task->prev = fs->bgTaskQueue->prev; + task->prev->next = task; + task->next->prev = task; + } + + if (taskid) *taskid = task->taskid; + return 0; +} + +int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid) { + taosThreadMutexLock(fs->mutex); + int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, arg, taskid); + taosThreadMutexUnlock(fs->mutex); + return code; +} + +int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid) { + STFSBgTask *task = NULL; + + taosThreadMutexLock(fs->mutex); + + if (fs->bgTaskRunning && fs->bgTaskRunning->taskid == taskid) { + task = fs->bgTaskRunning; + } else { + for (STFSBgTask *taskt = fs->bgTaskQueue->next; taskt != fs->bgTaskQueue; taskt = taskt->next) { + if (taskt->taskid == taskid) { + task = taskt; + break; + } + } + } + + if (task) { + task->numWait++; + taosThreadCondWait(task->done, fs->mutex); + task->numWait--; + + if (task->numWait == 0) { + taosThreadCondDestroy(task->done); + taosMemoryFree(task); + } + } + + taosThreadMutexUnlock(fs->mutex); + return 0; +} + +int32_t tsdbFSWaitAllBgTask(STFileSystem *fs) { + taosThreadMutexLock(fs->mutex); + + while (fs->bgTaskRunning) { + taosThreadCondWait(fs->bgTaskRunning->done, fs->mutex); + } + + taosThreadMutexUnlock(fs->mutex); + return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index 074ce7c551..36156d0662 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -24,6 +24,7 @@ extern "C" { /* Exposed Handle */ typedef struct STFileSystem STFileSystem; +typedef struct STFSBgTask STFSBgTask; typedef TARRAY2(STFileSet *) TFileSetArray; typedef enum { @@ -31,6 +32,12 @@ typedef enum { TSDB_FEDIT_MERGE } EFEditT; +typedef enum { + TSDB_BG_TASK_MERGER = 1, + TSDB_BG_TASK_RETENTION, + TSDB_BG_TASK_COMPACT, +} EFSBgTaskT; + /* Exposed APIs */ // open/close int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback); @@ -45,9 +52,30 @@ int64_t tsdbFSAllocEid(STFileSystem *fs); int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype); int32_t tsdbFSEditCommit(STFileSystem *fs); int32_t tsdbFSEditAbort(STFileSystem *fs); +// background task +int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void *arg, int64_t *taskid); +int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid); +int32_t tsdbFSWaitAllBgTask(STFileSystem *fs); // other int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); +struct STFSBgTask { + EFSBgTaskT type; + int32_t (*run)(void *arg); + void *arg; + + TdThreadCond done[1]; + int32_t numWait; + + int64_t taskid; + int64_t scheduleTime; + int64_t launchTime; + int64_t finishTime; + + struct STFSBgTask *prev; + struct STFSBgTask *next; +}; + /* Exposed Structs */ struct STFileSystem { STsdb *tsdb; @@ -55,9 +83,15 @@ struct STFileSystem { int32_t state; int64_t neid; EFEditT etype; - bool mergeTaskOn; TFileSetArray fSetArr[1]; TFileSetArray fSetArrTmp[1]; + + // background task queue + TdThreadMutex mutex[1]; + int64_t taskid; + int32_t bgTaskNum; + STFSBgTask bgTaskQueue[1]; + STFSBgTask *bgTaskRunning; }; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index a6b72e38bd..6580b89dad 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -16,105 +16,6 @@ #include "tsdb.h" #include "tsdbFS2.h" -static bool tsdbShouldDoRetentionImpl(STsdb *pTsdb, int64_t now) { - for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); - SDiskID did; - - if (expLevel == pSet->diskId.level) continue; - - if (expLevel < 0) { - return true; - } else { - if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { - return false; - } - - if (did.level == pSet->diskId.level) continue; - - return true; - } - } - - return false; -} -bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { - bool should; - taosThreadRwlockRdlock(&pTsdb->rwLock); - should = tsdbShouldDoRetentionImpl(pTsdb, now); - taosThreadRwlockUnlock(&pTsdb->rwLock); - return should; -} - -int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { - int32_t code = 0; - int32_t lino = 0; - STsdbFS fs = {0}; - - code = tsdbFSCopy(pTsdb, &fs); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); - SDiskID did; - - if (expLevel < 0) { - taosMemoryFree(pSet->pHeadF); - taosMemoryFree(pSet->pDataF); - taosMemoryFree(pSet->pSmaF); - for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { - taosMemoryFree(pSet->aSttF[iStt]); - } - taosArrayRemove(fs.aDFileSet, iSet); - iSet--; - } else { - if (expLevel == 0) continue; - if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { - code = terrno; - goto _exit; - } - - if (did.level == pSet->diskId.level) continue; - - // copy file to new disk (todo) - SDFileSet fSet = *pSet; - fSet.diskId = did; - - code = tsdbDFileSetCopy(pTsdb, pSet, &fSet); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbFSUpsertFSet(&fs, &fSet); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - // do change fs - code = tsdbFSPrepareCommit(pTsdb, &fs); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); - } - tsdbFSDestroy(&fs); - return code; -} - -static int32_t tsdbCommitRetentionImpl(STsdb *pTsdb) { return tsdbFSCommit(pTsdb); } - -int32_t tsdbCommitRetention(STsdb *pTsdb) { - taosThreadRwlockWrlock(&pTsdb->rwLock); - tsdbCommitRetentionImpl(pTsdb); - taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); - return 0; -} - -// new ============== typedef struct { STsdb *tsdb; int32_t szPage; @@ -128,19 +29,19 @@ typedef struct { int32_t fsetArrIdx; STFileSet *fset; } ctx[1]; -} SRTXer; +} SRTNer; -static int32_t tsdbDoRemoveFileObject(SRTXer *rtxer, const STFileObj *fobj) { +static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) { STFileOp op = { .optype = TSDB_FOP_REMOVE, .fid = fobj->f->fid, .of = fobj->f[0], }; - return TARRAY2_APPEND(rtxer->fopArr, op); + return TARRAY2_APPEND(rtner->fopArr, op); } -static int32_t tsdbDoCopyFile(SRTXer *rtxer, const STFileObj *from, const STFile *to) { +static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) { int32_t code = 0; int32_t lino = 0; @@ -148,7 +49,7 @@ static int32_t tsdbDoCopyFile(SRTXer *rtxer, const STFileObj *from, const STFile TdFilePtr fdFrom = NULL; TdFilePtr fdTo = NULL; - tsdbTFileName(rtxer->tsdb, to, fname); + tsdbTFileName(rtner->tsdb, to, fname); fdFrom = taosOpenFile(from->fname, TD_FILE_READ); if (fdFrom == NULL) code = terrno; @@ -158,7 +59,7 @@ static int32_t tsdbDoCopyFile(SRTXer *rtxer, const STFileObj *from, const STFile if (fdTo == NULL) code = terrno; TSDB_CHECK_CODE(code, lino, _exit); - int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtxer->szPage)); + int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtner->szPage)); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); TSDB_CHECK_CODE(code, lino, _exit); @@ -168,14 +69,14 @@ static int32_t tsdbDoCopyFile(SRTXer *rtxer, const STFileObj *from, const STFile _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code); + TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); taosCloseFile(&fdFrom); taosCloseFile(&fdTo); } return code; } -static int32_t tsdbDoMigrateFileObj(SRTXer *rtxer, const STFileObj *fobj, const SDiskID *did) { +static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) { int32_t code = 0; int32_t lino = 0; STFileOp op = {0}; @@ -187,7 +88,7 @@ static int32_t tsdbDoMigrateFileObj(SRTXer *rtxer, const STFileObj *fobj, const .of = fobj->f[0], }; - code = TARRAY2_APPEND(rtxer->fopArr, op); + code = TARRAY2_APPEND(rtner->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); // create new @@ -199,7 +100,7 @@ static int32_t tsdbDoMigrateFileObj(SRTXer *rtxer, const STFileObj *fobj, const .type = fobj->f->type, .did = did[0], .fid = fobj->f->fid, - .cid = rtxer->cid, + .cid = rtner->cid, .size = fobj->f->size, .stt[0] = { @@ -208,101 +109,105 @@ static int32_t tsdbDoMigrateFileObj(SRTXer *rtxer, const STFileObj *fobj, const }, }; - code = TARRAY2_APPEND(rtxer->fopArr, op); + code = TARRAY2_APPEND(rtner->fopArr, op); TSDB_CHECK_CODE(code, lino, _exit); // do copy the file - code = tsdbDoCopyFile(rtxer, fobj, &op.nf); + code = tsdbDoCopyFile(rtner, fobj, &op.nf); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code); + TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); } return code; } -static int32_t tsdbDoRetentionBegin(STsdb *tsdb, SRTXer *rtxer) { +typedef struct { + STsdb *tsdb; + int64_t now; +} SRtnArg; + +static int32_t tsdbDoRetentionBegin(SRtnArg *arg, SRTNer *rtner) { int32_t code = 0; int32_t lino = 0; - // TODO: wait for merge and compact task done + STsdb *tsdb = arg->tsdb; - rtxer->tsdb = tsdb; - rtxer->szPage = tsdb->pVnode->config.tsdbPageSize; - rtxer->now = taosGetTimestampMs(); - rtxer->cid = tsdbFSAllocEid(tsdb->pFS); + rtner->tsdb = tsdb; + rtner->szPage = tsdb->pVnode->config.tsdbPageSize; + rtner->now = arg->now; + rtner->cid = tsdbFSAllocEid(tsdb->pFS); - code = tsdbFSCreateCopySnapshot(tsdb->pFS, &rtxer->fsetArr); + code = tsdbFSCreateCopySnapshot(tsdb->pFS, &rtner->fsetArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code); + TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); } else { - tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtxer->tsdb->pVnode), rtxer->cid, __func__); + tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__); } return code; } -static int32_t tsdbDoRetentionEnd(SRTXer *rtxer) { +static int32_t tsdbDoRetentionEnd(SRTNer *rtner) { int32_t code = 0; int32_t lino = 0; - if (TARRAY2_SIZE(rtxer->fopArr) == 0) goto _exit; + if (TARRAY2_SIZE(rtner->fopArr) == 0) goto _exit; - code = tsdbFSEditBegin(rtxer->tsdb->pFS, rtxer->fopArr, TSDB_FEDIT_MERGE); + code = tsdbFSEditBegin(rtner->tsdb->pFS, rtner->fopArr, TSDB_FEDIT_MERGE); TSDB_CHECK_CODE(code, lino, _exit); - taosThreadRwlockWrlock(&rtxer->tsdb->rwLock); + taosThreadRwlockWrlock(&rtner->tsdb->rwLock); - code = tsdbFSEditCommit(rtxer->tsdb->pFS); + code = tsdbFSEditCommit(rtner->tsdb->pFS); if (code) { - taosThreadRwlockUnlock(&rtxer->tsdb->rwLock); + taosThreadRwlockUnlock(&rtner->tsdb->rwLock); TSDB_CHECK_CODE(code, lino, _exit); } - taosThreadRwlockUnlock(&rtxer->tsdb->rwLock); + taosThreadRwlockUnlock(&rtner->tsdb->rwLock); - TARRAY2_DESTROY(rtxer->fopArr, NULL); - tsdbFSDestroyCopySnapshot(&rtxer->fsetArr); + TARRAY2_DESTROY(rtner->fopArr, NULL); + tsdbFSDestroyCopySnapshot(&rtner->fsetArr); _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code); + TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); } else { - tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtxer->tsdb->pVnode), rtxer->cid, __func__); + tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__); } return code; } -static int32_t tsdbDoRetention2(STsdb *tsdb) { +static int32_t tsdbDoRetention2(void *arg) { int32_t code = 0; int32_t lino = 0; + SRTNer rtner[1] = {0}; - SRTXer rtxer[1] = {0}; - - code = tsdbDoRetentionBegin(tsdb, rtxer); + code = tsdbDoRetentionBegin(arg, rtner); TSDB_CHECK_CODE(code, lino, _exit); - while (rtxer->ctx->fsetArrIdx < TARRAY2_SIZE(rtxer->fsetArr)) { - rtxer->ctx->fset = TARRAY2_GET(rtxer->fsetArr, rtxer->ctx->fsetArrIdx); + while (rtner->ctx->fsetArrIdx < TARRAY2_SIZE(rtner->fsetArr)) { + rtner->ctx->fset = TARRAY2_GET(rtner->fsetArr, rtner->ctx->fsetArrIdx); STFileObj *fobj; - int32_t expLevel = tsdbFidLevel(rtxer->ctx->fset->fid, &rtxer->tsdb->keepCfg, rtxer->now); + int32_t expLevel = tsdbFidLevel(rtner->ctx->fset->fid, &rtner->tsdb->keepCfg, rtner->now); if (expLevel < 0) { // remove the file set - for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = rtxer->ctx->fset->farr[ftype], 1); ++ftype) { + for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) { if (fobj == NULL) continue; - code = tsdbDoRemoveFileObject(rtxer, fobj); + code = tsdbDoRemoveFileObject(rtner, fobj); TSDB_CHECK_CODE(code, lino, _exit); } SSttLvl *lvl; - TARRAY2_FOREACH(rtxer->ctx->fset->lvlArr, lvl) { + TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) { TARRAY2_FOREACH(lvl->fobjArr, fobj) { - code = tsdbDoRemoveFileObject(rtxer, fobj); + code = tsdbDoRemoveFileObject(rtner, fobj); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -311,39 +216,62 @@ static int32_t tsdbDoRetention2(STsdb *tsdb) { } else { SDiskID did; - if (tfsAllocDisk(rtxer->tsdb->pVnode->pTfs, expLevel, &did) < 0) { + if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) { code = terrno; TSDB_CHECK_CODE(code, lino, _exit); } // data - for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtxer->ctx->fset->farr[ftype], 1); ++ftype) { + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) { if (fobj == NULL) continue; if (fobj->f->did.level == did.level) continue; - code = tsdbDoMigrateFileObj(rtxer, fobj, &did); + code = tsdbDoMigrateFileObj(rtner, fobj, &did); TSDB_CHECK_CODE(code, lino, _exit); } // stt SSttLvl *lvl; - TARRAY2_FOREACH(rtxer->ctx->fset->lvlArr, lvl) { + TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) { TARRAY2_FOREACH(lvl->fobjArr, fobj) { if (fobj->f->did.level == did.level) continue; - code = tsdbDoMigrateFileObj(rtxer, fobj, &did); + code = tsdbDoMigrateFileObj(rtner, fobj, &did); TSDB_CHECK_CODE(code, lino, _exit); } } } } - code = tsdbDoRetentionEnd(rtxer); + code = tsdbDoRetentionEnd(rtner); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code); + TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); } + taosMemoryFree(arg); return code; } + +int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now, int64_t *taskid) { + SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); + if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + arg->tsdb = tsdb; + arg->now = now; + + int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, arg, taskid); + if (code) taosMemoryFree(arg); + + return code; +} + +int32_t tsdbSyncRetention(STsdb *tsdb, int64_t now) { + int64_t taskid; + + int32_t code = tsdbAsyncRetention(tsdb, now, &taskid); + if (code) return code; + + return tsdbFSWaitBgTask(tsdb->pFS, taskid); +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index e526d67f3c..84671197d8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -528,25 +528,25 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK *maxKey = *minKey + tsTickPerMin[precision] * minutes - 1; } -int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now) { +int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t nowSec) { int32_t aFid[3]; TSKEY key; if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) { - now = now * 1000; + nowSec = nowSec * 1000; } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) { - now = now * 1000000l; + nowSec = nowSec * 1000000l; } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) { - now = now * 1000000000l; + nowSec = nowSec * 1000000000l; } else { ASSERT(0); } - key = now - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision]; + key = nowSec - pKeepCfg->keep0 * tsTickPerMin[pKeepCfg->precision]; aFid[0] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision); - key = now - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision]; + key = nowSec - pKeepCfg->keep1 * tsTickPerMin[pKeepCfg->precision]; aFid[1] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision); - key = now - pKeepCfg->keep2 * tsTickPerMin[pKeepCfg->precision]; + key = nowSec - pKeepCfg->keep2 * tsTickPerMin[pKeepCfg->precision]; aFid[2] = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision); if (fid >= aFid[0]) { diff --git a/source/dnode/vnode/src/vnd/vnodeRetention.c b/source/dnode/vnode/src/vnd/vnodeRetention.c index 170deb4286..43b071a48f 100644 --- a/source/dnode/vnode/src/vnd/vnodeRetention.c +++ b/source/dnode/vnode/src/vnd/vnodeRetention.c @@ -15,116 +15,16 @@ #include "vnd.h" -typedef struct { - SVnode *pVnode; - int64_t now; - int64_t commitID; - SVnodeInfo info; -} SRetentionInfo; +extern int32_t tsdbSyncRetention(STsdb *tsdb, int64_t now); -extern bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now); -extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now); -extern int32_t tsdbCommitRetention(STsdb *pTsdb); - -static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) { - int32_t code = 0; - int32_t lino = 0; - - tsem_wait(&pVnode->canCommit); - - pInfo->commitID = ++pVnode->state.commitID; - - char dir[TSDB_FILENAME_LEN] = {0}; - if (pVnode->pTfs) { - snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); - } else { - snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); - } - - if (vnodeLoadInfo(dir, &pInfo->info) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); +int32_t vnodeSyncRetention(SVnode *pVnode, int64_t now) { + int32_t code; + if (pVnode->config.sttTrigger == 1) { + tsem_wait(&pVnode->canCommit); + code = tsdbSyncRetention(pVnode->pTsdb, now); tsem_post(&pVnode->canCommit); } else { - vInfo("vgId:%d %s done", TD_VID(pVnode), __func__); + code = tsdbSyncRetention(pVnode->pTsdb, now); } return code; } - -static int32_t vnodeRetentionTask(void *param) { - int32_t code = 0; - int32_t lino = 0; - - SRetentionInfo *pInfo = (SRetentionInfo *)param; - SVnode *pVnode = pInfo->pVnode; - char dir[TSDB_FILENAME_LEN] = {0}; - - if (pVnode->pTfs) { - snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); - } else { - snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); - } - - // save info - pInfo->info.state.commitID = pInfo->commitID; - - if (vnodeSaveInfo(dir, &pInfo->info) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } - - // do job - code = tsdbDoRetention(pInfo->pVnode->pTsdb, pInfo->now); - TSDB_CHECK_CODE(code, lino, _exit); - - // commit info - vnodeCommitInfo(dir); - - // commit sub-job - tsdbCommitRetention(pVnode->pTsdb); - -_exit: - if (code) { - vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code)); - } else { - vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__); - } - tsem_post(&pInfo->pVnode->canCommit); - taosMemoryFree(pInfo); - return code; -} - -int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now) { - int32_t code = 0; - int32_t lino = 0; - - if (!tsdbShouldDoRetention(pVnode->pTsdb, now)) return code; - - SRetentionInfo *pInfo = (SRetentionInfo *)taosMemoryCalloc(1, sizeof(*pInfo)); - if (pInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - pInfo->pVnode = pVnode; - pInfo->now = now; - - code = vnodePrepareRentention(pVnode, pInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - vnodeScheduleTask(vnodeRetentionTask, pInfo); - -_exit: - if (code) { - vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code)); - if (pInfo) taosMemoryFree(pInfo); - } else { - vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__); - } - return 0; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5f866dee69..7efcd0b2c6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -509,7 +509,9 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && !syncIsReadyForRead(pVnode->sync)) { + if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || + pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && + !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } @@ -565,8 +567,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); -// case TDMT_VND_TMQ_CONSUME: -// return tqProcessPollReq(pVnode->pTq, pMsg); + // case TDMT_VND_TMQ_CONSUME: + // return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TMQ_VG_WALINFO: return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: @@ -609,7 +611,9 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { pMetaRsp->precision = pVnode->config.tsdbCfg.precision; } -extern int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now); +extern int32_t vnodeAsyncRetention(SVnode *pVnode, int64_t now); +extern int32_t vnodeSyncRetention(SVnode *pVnode, int64_t now); + static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; SVTrimDbReq trimReq = {0}; @@ -622,10 +626,7 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int3 vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp); - // process - vnodeAsyncRentention(pVnode, trimReq.timestamp); - tsem_wait(&pVnode->canCommit); - tsem_post(&pVnode->canCommit); + code = vnodeSyncRetention(pVnode, trimReq.timestamp); _exit: return code; @@ -650,7 +651,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, tqUpdateTbUidList(pVnode->pTq, tbUids, false); } - vnodeAsyncRentention(pVnode, ttlReq.timestamp); + vnodeSyncRetention(pVnode, ttlReq.timestamp); end: taosArrayDestroy(tbUids);