From cdb1d8296ddd322d8f0de91656ed9a153017d71f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 27 Oct 2023 17:48:19 +0800 Subject: [PATCH 1/2] feat: concurrency on fileset --- source/dnode/vnode/src/inc/tsdb.h | 18 +- source/dnode/vnode/src/sma/smaRollup.c | 2 - source/dnode/vnode/src/tsdb/tsdbCommit.c | 20 +- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 25 +- source/dnode/vnode/src/tsdb/tsdbFS2.c | 319 ++++++++++---------- source/dnode/vnode/src/tsdb/tsdbFS2.h | 50 +-- source/dnode/vnode/src/tsdb/tsdbFSet2.c | 33 +- source/dnode/vnode/src/tsdb/tsdbFSet2.h | 44 ++- source/dnode/vnode/src/tsdb/tsdbMerge.c | 117 ++++--- source/dnode/vnode/src/tsdb/tsdbOpen.c | 9 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 66 ++-- source/dnode/vnode/src/tsdb/tsdbRetention.c | 264 +++++++++------- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 18 +- 13 files changed, 542 insertions(+), 443 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 79112babc3..1efcd9417d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -309,7 +309,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STs void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); // tsdbMerge.c ============================================================================================== -int32_t tsdbMerge(void *arg); +int32_t tsdbSchedMerge(STsdb *tsdb, int32_t fid); // tsdbDiskData ============================================================================================== int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder); @@ -371,7 +371,7 @@ struct STsdb { char *path; SVnode *pVnode; STsdbKeepCfg keepCfg; - TdThreadRwlock rwLock; + TdThreadMutex mutex; SMemTable *mem; SMemTable *imem; STsdbFS fs; // old @@ -668,8 +668,8 @@ struct SDelFWriter { }; #include "tarray2.h" -//#include "tsdbFS2.h" -// struct STFileSet; +// #include "tsdbFS2.h" +// struct STFileSet; 
typedef struct STFileSet STFileSet; typedef TARRAY2(STFileSet *) TFileSetArray; @@ -677,9 +677,9 @@ typedef struct STSnapRange STSnapRange; typedef TARRAY2(STSnapRange *) TSnapRangeArray; // disjoint snap ranges // util -int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); -int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); -void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap); +int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); +int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); +void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap); SHashObj *tsdbGetSnapRangeHash(TSnapRangeArray *pRanges); // snap partition list @@ -873,8 +873,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); -void tMergeTreePinSttBlock(SMergeTree* pMTree); -void tMergeTreeUnpinSttBlock(SMergeTree* pMTree); +void tMergeTreePinSttBlock(SMergeTree *pMTree); +void tMergeTreeUnpinSttBlock(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8da2fff5a6..14c5baa402 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -31,8 +31,6 @@ SSmaMgmt smaMgmt = { typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; -extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now); - static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static void tdUidStoreDestory(STbUidStore *pStore); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index b4c2c0a979..55ae25aee4 100644 --- 
a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -131,7 +131,7 @@ int32_t tsdbBegin(STsdb *pTsdb) { TSDB_CHECK_CODE(code, lino, _exit); // lock - if ((code = taosThreadRwlockWrlock(&pTsdb->rwLock))) { + if ((code = taosThreadMutexLock(&pTsdb->mutex))) { code = TAOS_SYSTEM_ERROR(code); TSDB_CHECK_CODE(code, lino, _exit); } @@ -139,7 +139,7 @@ int32_t tsdbBegin(STsdb *pTsdb) { pTsdb->mem = pMemTable; // unlock - if ((code = taosThreadRwlockUnlock(&pTsdb->rwLock))) { + if ((code = taosThreadMutexUnlock(&pTsdb->mutex))) { code = TAOS_SYSTEM_ERROR(code); TSDB_CHECK_CODE(code, lino, _exit); } @@ -152,11 +152,11 @@ _exit: } int32_t tsdbPrepareCommit(STsdb *pTsdb) { - taosThreadRwlockWrlock(&pTsdb->rwLock); + taosThreadMutexLock(&pTsdb->mutex); ASSERT(pTsdb->imem == NULL); pTsdb->imem = pTsdb->mem; pTsdb->mem = NULL; - taosThreadRwlockUnlock(&pTsdb->rwLock); + taosThreadMutexUnlock(&pTsdb->mutex); return 0; } @@ -171,9 +171,9 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) { // check if (pMemTable->nRow == 0 && pMemTable->nDel == 0) { - taosThreadRwlockWrlock(&pTsdb->rwLock); + taosThreadMutexLock(&pTsdb->mutex); pTsdb->imem = NULL; - taosThreadRwlockUnlock(&pTsdb->rwLock); + taosThreadMutexUnlock(&pTsdb->mutex); tsdbUnrefMemTable(pMemTable, NULL, true); goto _exit; @@ -501,6 +501,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { int32_t lino = 0; STsdb *pTsdb = pCommitter->pTsdb; SDFileSet *pRSet = NULL; + // memory pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); @@ -798,6 +799,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { int32_t lino = 0; STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; + // commit file data start code = tsdbCommitFileDataStart(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); @@ 
-1650,18 +1652,18 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) { SMemTable *pMemTable = pTsdb->imem; // lock - taosThreadRwlockWrlock(&pTsdb->rwLock); + taosThreadMutexLock(&pTsdb->mutex); code = tsdbFSCommit(pTsdb); if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); + taosThreadMutexUnlock(&pTsdb->mutex); TSDB_CHECK_CODE(code, lino, _exit); } pTsdb->imem = NULL; // unlock - taosThreadRwlockUnlock(&pTsdb->rwLock); + taosThreadMutexUnlock(&pTsdb->mutex); if (pMemTable) { tsdbUnrefMemTable(pMemTable, NULL, true); } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 6dc492f420..79ecdab15c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -367,7 +367,12 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { int32_t lino = 0; STsdb *tsdb = committer->tsdb; - committer->ctx->fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision); + int32_t fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision); + + // check if can commit + tsdbFSCheckCommit(tsdb, fid); + + committer->ctx->fid = fid; committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now); tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey, &committer->ctx->maxKey); @@ -549,11 +554,11 @@ _exit: } int32_t tsdbPreCommit(STsdb *tsdb) { - taosThreadRwlockWrlock(&tsdb->rwLock); + taosThreadMutexLock(&tsdb->mutex); ASSERT(tsdb->imem == NULL); tsdb->imem = tsdb->mem; tsdb->mem = NULL; - taosThreadRwlockUnlock(&tsdb->rwLock); + taosThreadMutexUnlock(&tsdb->mutex); return 0; } @@ -568,15 +573,13 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) { int64_t nDel = imem->nDel; if (nRow == 0 && nDel == 0) { - taosThreadRwlockWrlock(&tsdb->rwLock); + taosThreadMutexLock(&tsdb->mutex); tsdb->imem = NULL; - taosThreadRwlockUnlock(&tsdb->rwLock); + 
taosThreadMutexUnlock(&tsdb->mutex); tsdbUnrefMemTable(imem, NULL, true); } else { SCommitter2 committer[1]; - tsdbFSCheckCommit(tsdb->pFS); - code = tsdbOpenCommitter(tsdb, info, committer); TSDB_CHECK_CODE(code, lino, _exit); @@ -605,14 +608,14 @@ int32_t tsdbCommitCommit(STsdb *tsdb) { if (tsdb->imem == NULL) goto _exit; SMemTable *pMemTable = tsdb->imem; - taosThreadRwlockWrlock(&tsdb->rwLock); + taosThreadMutexLock(&tsdb->mutex); code = tsdbFSEditCommit(tsdb->pFS); if (code) { - taosThreadRwlockUnlock(&tsdb->rwLock); + taosThreadMutexUnlock(&tsdb->mutex); TSDB_CHECK_CODE(code, lino, _exit); } tsdb->imem = NULL; - taosThreadRwlockUnlock(&tsdb->rwLock); + taosThreadMutexUnlock(&tsdb->mutex); tsdbUnrefMemTable(pMemTable, NULL, true); _exit: @@ -640,4 +643,4 @@ _exit: tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 93a16b5502..38d221d978 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -55,25 +55,11 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) { TARRAY2_INIT(fs[0]->fSetArr); TARRAY2_INIT(fs[0]->fSetArrTmp); - // background task queue - taosThreadMutexInit(fs[0]->mutex, NULL); - fs[0]->bgTaskQueue->next = fs[0]->bgTaskQueue; - fs[0]->bgTaskQueue->prev = fs[0]->bgTaskQueue; - - taosThreadMutexInit(&fs[0]->commitMutex, NULL); - taosThreadCondInit(&fs[0]->canCommit, NULL); - fs[0]->blockCommit = false; - return 0; } static int32_t destroy_fs(STFileSystem **fs) { if (fs[0] == NULL) return 0; - taosThreadMutexDestroy(&fs[0]->commitMutex); - taosThreadCondDestroy(&fs[0]->canCommit); - taosThreadMutexDestroy(fs[0]->mutex); - - ASSERT(fs[0]->bgTaskNum == 0); TARRAY2_DESTROY(fs[0]->fSetArr, NULL); TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL); @@ -264,10 +250,11 @@ static int32_t apply_commit(STFileSystem *fs) { if (fset1 && fset2) { if (fset1->fid < 
fset2->fid) { // delete fset1 - TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove); + tsdbTFileSetRemove(fset1); + i1++; } else if (fset1->fid > fset2->fid) { // create new file set with fid of fset2->fid - code = tsdbTFileSetInitDup(fs->tsdb, fset2, &fset1); + code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1); if (code) return code; code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn); if (code) return code; @@ -282,10 +269,11 @@ static int32_t apply_commit(STFileSystem *fs) { } } else if (fset1) { // delete fset1 - TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove); + tsdbTFileSetRemove(fset1); + i1++; } else { // create new file set with fid of fset2->fid - code = tsdbTFileSetInitDup(fs->tsdb, fset2, &fset1); + code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1); if (code) return code; code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn); if (code) return code; @@ -512,7 +500,8 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) { TARRAY2_FOREACH(lvl->fobjArr, fobj) { code = tsdbFSDoScanAndFixFile(fs, fobj); if (code) { - fset->maxVerValid = (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1; + fset->maxVerValid = + (fobj->f->minVer <= fobj->f->maxVer) ? 
TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1; corrupt = true; } } @@ -592,7 +581,7 @@ static int32_t tsdbFSDupState(STFileSystem *fs) { const STFileSet *fset1; TARRAY2_FOREACH(src, fset1) { STFileSet *fset2; - code = tsdbTFileSetInitDup(fs->tsdb, fset1, &fset2); + code = tsdbTFileSetInitCopy(fs->tsdb, fset1, &fset2); if (code) return code; code = TARRAY2_APPEND(dst, fset2); if (code) return code; @@ -665,12 +654,6 @@ static int32_t close_file_system(STFileSystem *fs) { return 0; } -static int32_t apply_edit(STFileSystem *pFS) { - int32_t code = 0; - ASSERTS(0, "TODO: Not implemented yet"); - return code; -} - static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) { if (pSet1->fid < pSet2->fid) { return -1; @@ -710,10 +693,23 @@ static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) { TSDB_CHECK_CODE(code, lino, _exit); } - // remove empty file set + // remove empty stt level and empty file set int32_t i = 0; while (i < TARRAY2_SIZE(fsetArray)) { fset = TARRAY2_GET(fsetArray, i); + + SSttLvl *lvl; + int32_t j = 0; + while (j < TARRAY2_SIZE(fset->lvlArr)) { + lvl = TARRAY2_GET(fset->lvlArr, j); + + if (TARRAY2_SIZE(lvl->fobjArr) == 0) { + TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear); + } else { + j++; + } + } + if (tsdbTFileSetIsEmpty(fset)) { TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear); } else { @@ -753,13 +749,13 @@ _exit: static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) { task->numWait++; - taosThreadCondWait(task->done, fs->mutex); + taosThreadCondWait(task->done, &fs->tsdb->mutex); task->numWait--; if (task->numWait == 0) { taosThreadCondDestroy(task->done); - if (task->free) { - task->free(task->arg); + if (task->destroy) { + task->destroy(task->arg); } taosMemoryFree(task); } @@ -770,8 +766,8 @@ static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) { taosThreadCondBroadcast(task->done); } else { taosThreadCondDestroy(task->done); - if (task->free) { - 
task->free(task->arg); + if (task->destroy) { + task->destroy(task->arg); } taosMemoryFree(task); } @@ -780,23 +776,16 @@ static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) { int32_t tsdbCloseFS(STFileSystem **fs) { if (fs[0] == NULL) return 0; - taosThreadMutexLock(fs[0]->mutex); - fs[0]->stop = true; - - if (fs[0]->bgTaskRunning) { - tsdbDoWaitBgTask(fs[0], fs[0]->bgTaskRunning); - } - taosThreadMutexUnlock(fs[0]->mutex); - + tsdbFSDisableBgTask(fs[0]); close_file_system(fs[0]); destroy_fs(fs); return 0; } int64_t tsdbFSAllocEid(STFileSystem *fs) { - taosThreadRwlockRdlock(&fs->tsdb->rwLock); + taosThreadMutexLock(&fs->tsdb->mutex); int64_t cid = ++fs->neid; - taosThreadRwlockUnlock(&fs->tsdb->rwLock); + taosThreadMutexUnlock(&fs->tsdb->mutex); return cid; } @@ -837,27 +826,34 @@ _exit: return code; } -static int32_t tsdbFSSetBlockCommit(STFileSystem *fs, bool block) { - taosThreadMutexLock(&fs->commitMutex); +static int32_t tsdbFSSetBlockCommit(STFileSet *fset, bool block) { if (block) { - fs->blockCommit = true; + fset->blockCommit = true; } else { - fs->blockCommit = false; - taosThreadCondSignal(&fs->canCommit); + fset->blockCommit = false; + if (fset->numWaitCommit > 0) { + taosThreadCondSignal(&fset->canCommit); + } } - taosThreadMutexUnlock(&fs->commitMutex); return 0; } -int32_t tsdbFSCheckCommit(STFileSystem *fs) { - taosThreadMutexLock(&fs->commitMutex); - while (fs->blockCommit) { - taosThreadCondWait(&fs->canCommit, &fs->commitMutex); +int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) { + taosThreadMutexLock(&tsdb->mutex); + STFileSet *fset; + tsdbFSGetFSet(tsdb->pFS, fid, &fset); + if (fset) { + while (fset->blockCommit) { + fset->numWaitCommit++; + taosThreadCondWait(&fset->canCommit, &tsdb->mutex); + fset->numWaitCommit--; + } } - taosThreadMutexUnlock(&fs->commitMutex); + taosThreadMutexUnlock(&tsdb->mutex); return 0; } +// IMPORTANT: the caller must hold fs->tsdb->mutex int32_t tsdbFSEditCommit(STFileSystem *fs) { int32_t code = 
0; int32_t lino = 0; @@ -867,36 +863,57 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) { TSDB_CHECK_CODE(code, lino, _exit); // schedule merge - if (fs->tsdb->pVnode->config.sttTrigger > 1) { + int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger; + if (sttTrigger > 1) { STFileSet *fset; - int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger; - bool schedMerge = false; - bool blockCommit = false; - TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) { - if (TARRAY2_SIZE(fset->lvlArr) == 0) continue; + if (TARRAY2_SIZE(fset->lvlArr) == 0) { + tsdbFSSetBlockCommit(fset, false); + continue; + } SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr); - if (lvl->level != 0) continue; + if (lvl->level != 0) { + tsdbFSSetBlockCommit(fset, false); + continue; + } int32_t numFile = TARRAY2_SIZE(lvl->fobjArr); if (numFile >= sttTrigger) { - schedMerge = true; + // launch merge + code = tsdbSchedMerge(fs->tsdb, fset->fid); + TSDB_CHECK_CODE(code, lino, _exit); } if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) { - blockCommit = true; + tsdbFSSetBlockCommit(fset, true); + } else { + tsdbFSSetBlockCommit(fset, false); } + } + } - if (schedMerge && blockCommit) break; + // clear empty level and fset + int32_t i = 0; + while (i < TARRAY2_SIZE(fs->fSetArr)) { + STFileSet *fset = TARRAY2_GET(fs->fSetArr, i); + + int32_t j = 0; + while (j < TARRAY2_SIZE(fset->lvlArr)) { + SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, j); + + if (TARRAY2_SIZE(lvl->fobjArr) == 0) { + TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear); + } else { + j++; + } } - if (schedMerge) { - code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, NULL, fs->tsdb, NULL); - TSDB_CHECK_CODE(code, lino, _exit); + if (tsdbTFileSetIsEmpty(fset) && fset->bgTaskRunning == NULL) { + TARRAY2_REMOVE(fs->fSetArr, i, tsdbTFileSetClear); + } else { + i++; } - - tsdbFSSetBlockCommit(fs, blockCommit); } _exit: @@ -933,15 +950,15 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) { TARRAY2_INIT(fsetArr[0]); - 
taosThreadRwlockRdlock(&fs->tsdb->rwLock); + taosThreadMutexLock(&fs->tsdb->mutex); TARRAY2_FOREACH(fs->fSetArr, fset) { - code = tsdbTFileSetInitDup(fs->tsdb, fset, &fset1); + code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1); if (code) break; code = TARRAY2_APPEND(fsetArr[0], fset1); if (code) break; } - taosThreadRwlockUnlock(&fs->tsdb->rwLock); + taosThreadMutexUnlock(&fs->tsdb->mutex); if (code) { TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear); @@ -961,9 +978,9 @@ int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) { } int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) { - taosThreadRwlockRdlock(&fs->tsdb->rwLock); + taosThreadMutexLock(&fs->tsdb->mutex); int32_t code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr); - taosThreadRwlockUnlock(&fs->tsdb->rwLock); + taosThreadMutexUnlock(&fs->tsdb->mutex); return code; } @@ -1017,7 +1034,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pRange } } - taosThreadRwlockRdlock(&fs->tsdb->rwLock); + taosThreadMutexLock(&fs->tsdb->mutex); TARRAY2_FOREACH(fs->fSetArr, fset) { int64_t ever = VERSION_MAX; if (pHash) { @@ -1034,7 +1051,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pRange code = TARRAY2_APPEND(fsetArr[0], fset1); if (code) break; } - taosThreadRwlockUnlock(&fs->tsdb->rwLock); + taosThreadMutexUnlock(&fs->tsdb->mutex); _out: if (code) { @@ -1089,7 +1106,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev } } - taosThreadRwlockRdlock(&fs->tsdb->rwLock); + taosThreadMutexLock(&fs->tsdb->mutex); TARRAY2_FOREACH(fs->fSetArr, fset) { int64_t sver1 = sver; int64_t ever1 = ever; @@ -1118,7 +1135,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev fsr1 = NULL; } - taosThreadRwlockUnlock(&fs->tsdb->rwLock); + taosThreadMutexUnlock(&fs->tsdb->mutex); if (code) { tsdbTSnapRangeClear(&fsr1); @@ -1137,59 +1154,69 @@ _out: const char *gFSBgTaskName[] = {NULL, 
"MERGE", "RETENTION", "COMPACT"}; static int32_t tsdbFSRunBgTask(void *arg) { - STFileSystem *fs = (STFileSystem *)arg; + STFSBgTask *task = (STFSBgTask *)arg; + STFileSystem *fs = task->fs; + STFileSet *fset; - ASSERT(fs->bgTaskRunning != NULL); + tsdbFSGetFSet(fs, task->fid, &fset); - fs->bgTaskRunning->launchTime = taosGetTimestampMs(); - fs->bgTaskRunning->run(fs->bgTaskRunning->arg); - fs->bgTaskRunning->finishTime = taosGetTimestampMs(); + ASSERT(fset != NULL && fset->bgTaskRunning == task); + + task->launchTime = taosGetTimestampMs(); + task->run(task->arg); + task->finishTime = taosGetTimestampMs(); tsdbDebug("vgId:%d bg task:%s task id:%" PRId64 " finished, schedule time:%" PRId64 " launch time:%" PRId64 " finish time:%" PRId64, - TD_VID(fs->tsdb->pVnode), gFSBgTaskName[fs->bgTaskRunning->type], fs->bgTaskRunning->taskid, - fs->bgTaskRunning->scheduleTime, fs->bgTaskRunning->launchTime, fs->bgTaskRunning->finishTime); + TD_VID(fs->tsdb->pVnode), gFSBgTaskName[task->type], task->taskid, task->scheduleTime, task->launchTime, + task->finishTime); - taosThreadMutexLock(fs->mutex); + taosThreadMutexLock(&fs->tsdb->mutex); // free last - tsdbDoDoneBgTask(fs, fs->bgTaskRunning); - fs->bgTaskRunning = NULL; + tsdbDoDoneBgTask(fs, task); + fset->bgTaskRunning = NULL; // schedule next - if (fs->bgTaskNum > 0) { + if (fset->bgTaskNum > 0) { if (fs->stop) { - while (fs->bgTaskNum > 0) { - STFSBgTask *task = fs->bgTaskQueue->next; - task->prev->next = task->next; - task->next->prev = task->prev; - fs->bgTaskNum--; - tsdbDoDoneBgTask(fs, task); + while (fset->bgTaskNum > 0) { + STFSBgTask *nextTask = fset->bgTaskQueue->next; + nextTask->prev->next = nextTask->next; + nextTask->next->prev = nextTask->prev; + fset->bgTaskNum--; + tsdbDoDoneBgTask(fs, nextTask); } } else { // pop task from head - fs->bgTaskRunning = fs->bgTaskQueue->next; - fs->bgTaskRunning->prev->next = fs->bgTaskRunning->next; - fs->bgTaskRunning->next->prev = fs->bgTaskRunning->prev; - fs->bgTaskNum--; 
- vnodeScheduleTaskEx(1, tsdbFSRunBgTask, arg); + fset->bgTaskRunning = fset->bgTaskQueue->next; + fset->bgTaskRunning->prev->next = fset->bgTaskRunning->next; + fset->bgTaskRunning->next->prev = fset->bgTaskRunning->prev; + fset->bgTaskNum--; + vnodeScheduleTaskEx(1, tsdbFSRunBgTask, fset->bgTaskRunning); } } - taosThreadMutexUnlock(fs->mutex); + taosThreadMutexUnlock(&fs->tsdb->mutex); return 0; } -static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), - void (*destroy)(void *), void *arg, int64_t *taskid) { +// IMPORTANT: the caller must hold the fs->tsdb->mutex +int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *), + void (*destroy)(void *), void *arg, int64_t *taskid) { if (fs->stop) { if (destroy) { destroy(arg); } - return 0; // TODO: use a better error code + return 0; } - for (STFSBgTask *task = fs->bgTaskQueue->next; task != fs->bgTaskQueue; task = task->next) { + STFileSet *fset; + tsdbFSGetFSet(fs, fid, &fset); + + ASSERT(fset != NULL); + + for (STFSBgTask *task = fset->bgTaskQueue->next; task != fset->bgTaskQueue; task = task->next) { if (task->type == type) { if (destroy) { destroy(arg); @@ -1203,22 +1230,24 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int if (task == NULL) return TSDB_CODE_OUT_OF_MEMORY; taosThreadCondInit(task->done, NULL); + task->fs = fs; + task->fid = fid; task->type = type; task->run = run; - task->free = destroy; + task->destroy = destroy; task->arg = arg; task->scheduleTime = taosGetTimestampMs(); task->taskid = ++fs->taskid; - if (fs->bgTaskRunning == NULL && fs->bgTaskNum == 0) { + if (fset->bgTaskRunning == NULL && fset->bgTaskNum == 0) { // launch task directly - fs->bgTaskRunning = task; - vnodeScheduleTaskEx(1, tsdbFSRunBgTask, fs); + fset->bgTaskRunning = task; + vnodeScheduleTaskEx(1, tsdbFSRunBgTask, task); } else { // add to the queue tail - fs->bgTaskNum++; - task->next = fs->bgTaskQueue; - 
task->prev = fs->bgTaskQueue->prev; + fset->bgTaskNum++; + task->next = fset->bgTaskQueue; + task->prev = fset->bgTaskQueue->prev; task->prev->next = task; task->next->prev = task; } @@ -1227,68 +1256,30 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int return 0; } -int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg, - int64_t *taskid) { - taosThreadMutexLock(fs->mutex); - int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, free, arg, taskid); - taosThreadMutexUnlock(fs->mutex); - return code; -} +int32_t tsdbFSDisableBgTask(STFileSystem *fs) { + taosThreadMutexLock(&fs->tsdb->mutex); + for (;;) { + fs->stop = true; + bool done = true; -int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid) { - STFSBgTask *task = NULL; - - taosThreadMutexLock(fs->mutex); - - if (fs->bgTaskRunning && fs->bgTaskRunning->taskid == taskid) { - task = fs->bgTaskRunning; - } else { - for (STFSBgTask *taskt = fs->bgTaskQueue->next; taskt != fs->bgTaskQueue; taskt = taskt->next) { - if (taskt->taskid == taskid) { - task = taskt; + STFileSet *fset; + TARRAY2_FOREACH(fs->fSetArr, fset) { + if (fset->bgTaskRunning) { + tsdbDoWaitBgTask(fs, fset->bgTaskRunning); + done = false; break; } } - } - if (task) { - tsdbDoWaitBgTask(fs, task); + if (done) break; } - - taosThreadMutexUnlock(fs->mutex); + taosThreadMutexUnlock(&fs->tsdb->mutex); return 0; } -int32_t tsdbFSWaitAllBgTask(STFileSystem *fs) { - taosThreadMutexLock(fs->mutex); - - while (fs->bgTaskRunning) { - taosThreadCondWait(fs->bgTaskRunning->done, fs->mutex); - } - - taosThreadMutexUnlock(fs->mutex); - return 0; -} - -static int32_t tsdbFSDoDisableBgTask(STFileSystem *fs) { - fs->stop = true; - - if (fs->bgTaskRunning) { - tsdbDoWaitBgTask(fs, fs->bgTaskRunning); - } - return 0; -} - -int32_t tsdbFSDisableBgTask(STFileSystem *fs) { - taosThreadMutexLock(fs->mutex); - int32_t code = tsdbFSDoDisableBgTask(fs); - 
taosThreadMutexUnlock(fs->mutex); - return code; -} - int32_t tsdbFSEnableBgTask(STFileSystem *fs) { - taosThreadMutexLock(fs->mutex); + taosThreadMutexLock(&fs->tsdb->mutex); fs->stop = false; - taosThreadMutexUnlock(fs->mutex); + taosThreadMutexUnlock(&fs->tsdb->mutex); return 0; -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index 31b98e5656..a3a8e2f575 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -22,22 +22,11 @@ extern "C" { #endif -/* Exposed Handle */ -typedef struct STFileSystem STFileSystem; -typedef struct STFSBgTask STFSBgTask; -// typedef TARRAY2(STFileSet *) TFileSetArray; - typedef enum { TSDB_FEDIT_COMMIT = 1, // TSDB_FEDIT_MERGE } EFEditT; -typedef enum { - TSDB_BG_TASK_MERGER = 1, - TSDB_BG_TASK_RETENTION, - TSDB_BG_TASK_COMPACT, -} EFSBgTaskT; - typedef enum { TSDB_FCURRENT = 1, TSDB_FCURRENT_C, // for commit @@ -67,37 +56,17 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e int32_t tsdbFSEditCommit(STFileSystem *fs); int32_t tsdbFSEditAbort(STFileSystem *fs); // background task -int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg, - int64_t *taskid); -int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid); -int32_t tsdbFSWaitAllBgTask(STFileSystem *fs); +int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *), + void (*destroy)(void *), void *arg, int64_t *taskid); int32_t tsdbFSDisableBgTask(STFileSystem *fs); int32_t tsdbFSEnableBgTask(STFileSystem *fs); // other int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); -int32_t tsdbFSCheckCommit(STFileSystem *fs); +int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid); // utils int32_t save_fs(const TFileSetArray *arr, const char *fname); int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype); 
-struct STFSBgTask { - EFSBgTaskT type; - int32_t (*run)(void *arg); - void (*free)(void *arg); - void *arg; - - TdThreadCond done[1]; - int32_t numWait; - - int64_t taskid; - int64_t scheduleTime; - int64_t launchTime; - int64_t finishTime; - - struct STFSBgTask *prev; - struct STFSBgTask *next; -}; - /* Exposed Structs */ struct STFileSystem { STsdb *tsdb; @@ -109,17 +78,8 @@ struct STFileSystem { TFileSetArray fSetArrTmp[1]; // background task queue - TdThreadMutex mutex[1]; - bool stop; - int64_t taskid; - int32_t bgTaskNum; - STFSBgTask bgTaskQueue[1]; - STFSBgTask *bgTaskRunning; - - // block commit variables - TdThreadMutex commitMutex; - TdThreadCond canCommit; - bool blockCommit; + bool stop; + int64_t taskid; }; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.c b/source/dnode/vnode/src/tsdb/tsdbFSet2.c index 620fcb3a47..642d555366 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.c @@ -342,11 +342,6 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) { int32_t idx = TARRAY2_SEARCH_IDX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ); ASSERT(idx >= 0); TARRAY2_REMOVE(lvl->fobjArr, idx, tsdbSttLvlClearFObj); - - if (TARRAY2_SIZE(lvl->fobjArr) == 0) { - // TODO: remove the stt level if no file exists anymore - // TARRAY2_REMOVE(&fset->lvlArr, lvl - fset->lvlArr.data, tsdbSttLvlClear); - } } else { ASSERT(tsdbIsSameTFile(&op->of, fset->farr[op->of.type]->f)); tsdbTFileObjUnref(fset->farr[op->of.type]); @@ -454,10 +449,22 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) { fset[0]->fid = fid; fset[0]->maxVerValid = VERSION_MAX; TARRAY2_INIT(fset[0]->lvlArr); + + // background task queue + fset[0]->bgTaskNum = 0; + fset[0]->bgTaskQueue->next = fset[0]->bgTaskQueue; + fset[0]->bgTaskQueue->prev = fset[0]->bgTaskQueue; + fset[0]->bgTaskRunning = NULL; + + // block commit variables + taosThreadCondInit(&fset[0]->canCommit, NULL); + fset[0]->numWaitCommit = 0; + 
fset[0]->blockCommit = false; + return 0; } -int32_t tsdbTFileSetInitDup(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) { +int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) { int32_t code = tsdbTFileSetInit(fset1->fid, fset); if (code) return code; @@ -588,21 +595,23 @@ int32_t tsdbTFileSetClear(STFileSet **fset) { TARRAY2_DESTROY(fset[0]->lvlArr, tsdbSttLvlClear); + taosThreadCondDestroy(&fset[0]->canCommit); taosMemoryFree(fset[0]); fset[0] = NULL; return 0; } -int32_t tsdbTFileSetRemove(STFileSet **fset) { +int32_t tsdbTFileSetRemove(STFileSet *fset) { + if (fset == NULL) return 0; + for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { - if (fset[0]->farr[ftype] == NULL) continue; - tsdbTFileObjRemove(fset[0]->farr[ftype]); + if (fset->farr[ftype] == NULL) continue; + tsdbTFileObjRemove(fset->farr[ftype]); } - TARRAY2_DESTROY(fset[0]->lvlArr, tsdbSttLvlRemove); - taosMemoryFree(fset[0]); - fset[0] = NULL; + TARRAY2_DESTROY(fset->lvlArr, tsdbSttLvlRemove); + return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.h b/source/dnode/vnode/src/tsdb/tsdbFSet2.h index ea0f99f68e..34f174ade7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.h @@ -28,6 +28,8 @@ typedef struct SSttLvl SSttLvl; typedef TARRAY2(STFileObj *) TFileObjArray; typedef TARRAY2(SSttLvl *) TSttLvlArray; typedef TARRAY2(STFileOp) TFileOpArray; +typedef struct STFileSystem STFileSystem; +typedef struct STFSBgTask STFSBgTask; typedef enum { TSDB_FOP_NONE = 0, @@ -41,10 +43,10 @@ typedef enum { // init/clear int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset); -int32_t tsdbTFileSetInitDup(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset); +int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset); int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset); int32_t tsdbTFileSetClear(STFileSet **fset); -int32_t 
tsdbTFileSetRemove(STFileSet **fset); +int32_t tsdbTFileSetRemove(STFileSet *fset); int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset, TFileOpArray *fopArr); @@ -58,6 +60,7 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset); // cmpr int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2); // edit +int32_t tsdbSttLvlClear(SSttLvl **lvl); int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op); int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *fset); // max commit id @@ -70,6 +73,33 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset); int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl); int32_t tsdbSttLvlClear(SSttLvl **lvl); +typedef enum { + TSDB_BG_TASK_MERGER = 1, + TSDB_BG_TASK_RETENTION, + TSDB_BG_TASK_COMPACT, +} EFSBgTaskT; + +struct STFSBgTask { + STFileSystem *fs; + int32_t fid; + + EFSBgTaskT type; + int32_t (*run)(void *arg); + void (*destroy)(void *arg); + void *arg; + + TdThreadCond done[1]; + int32_t numWait; + + int64_t taskid; + int64_t scheduleTime; + int64_t launchTime; + int64_t finishTime; + + struct STFSBgTask *prev; + struct STFSBgTask *next; +}; + struct STFileOp { tsdb_fop_t optype; int32_t fid; @@ -87,6 +117,16 @@ struct STFileSet { int64_t maxVerValid; STFileObj *farr[TSDB_FTYPE_MAX]; // file array TSttLvlArray lvlArr[1]; // level array + + // background task queue + int32_t bgTaskNum; + STFSBgTask bgTaskQueue[1]; + STFSBgTask *bgTaskRunning; + + // block commit variables + TdThreadCond canCommit; + int32_t numWaitCommit; + bool blockCommit; }; struct STSnapRange { diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index e659cedba3..0c20a342d3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -15,11 +15,17 @@ #include "tsdbMerge.h" -#define TSDB_MAX_LEVEL 6 // means max level is 7 +#define 
TSDB_MAX_LEVEL 2 // means max level is 3 typedef struct { - STsdb *tsdb; - TFileSetArray *fsetArr; + STsdb *tsdb; + int32_t fid; +} SMergeArg; + +typedef struct { + STsdb *tsdb; + int32_t fid; + STFileSet *fset; int32_t sttTrigger; int32_t maxRow; @@ -313,7 +319,6 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) { if (merger->ctx->fset->farr[ftype]) { config.files[ftype].exist = true; config.files[ftype].file = merger->ctx->fset->farr[ftype]->f[0]; - } else { config.files[ftype].exist = false; } @@ -397,13 +402,13 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) { code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE); TSDB_CHECK_CODE(code, lino, _exit); - taosThreadRwlockWrlock(&merger->tsdb->rwLock); + taosThreadMutexLock(&merger->tsdb->mutex); code = tsdbFSEditCommit(merger->tsdb->pFS); if (code) { - taosThreadRwlockUnlock(&merger->tsdb->rwLock); + taosThreadMutexUnlock(&merger->tsdb->mutex); TSDB_CHECK_CODE(code, lino, _exit); } - taosThreadRwlockUnlock(&merger->tsdb->rwLock); + taosThreadMutexUnlock(&merger->tsdb->mutex); _exit: if (code) { @@ -478,30 +483,21 @@ _exit: } static int32_t tsdbDoMerge(SMerger *merger) { - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; + int32_t lino = 0; + SSttLvl *lvl = TARRAY2_FIRST(merger->fset->lvlArr); - STFileSet *fset; - TARRAY2_FOREACH(merger->fsetArr, fset) { - if (TARRAY2_SIZE(fset->lvlArr) == 0) continue; + if (TARRAY2_SIZE(merger->fset->lvlArr) == 0) return 0; + if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < merger->sttTrigger) return 0; - SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr); + code = tsdbMergerOpen(merger); + TSDB_CHECK_CODE(code, lino, _exit); - if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < merger->sttTrigger) continue; + code = tsdbMergeFileSet(merger, merger->fset); + TSDB_CHECK_CODE(code, lino, _exit); - if (!merger->ctx->opened) { - code = tsdbMergerOpen(merger); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbMergeFileSet(merger, 
fset); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (merger->ctx->opened) { - code = tsdbMergerClose(merger); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbMergerClose(merger); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -512,36 +508,73 @@ _exit: return code; } -int32_t tsdbMerge(void *arg) { - int32_t code = 0; - int32_t lino = 0; - STsdb *tsdb = (STsdb *)arg; +static int32_t tsdbMergeGetFSet(SMerger *merger) { + STFileSet *fset; - SMerger merger[1] = {{ - .tsdb = tsdb, - .sttTrigger = tsdb->pVnode->config.sttTrigger, - }}; - - if (merger->sttTrigger <= 1) { + taosThreadMutexLock(&merger->tsdb->mutex); + tsdbFSGetFSet(merger->tsdb->pFS, merger->fid, &fset); + if (fset == NULL) { + taosThreadMutexUnlock(&merger->tsdb->mutex); return 0; } - code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr); + int32_t code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset); + if (code) { + taosThreadMutexUnlock(&merger->tsdb->mutex); + return code; + } + taosThreadMutexUnlock(&merger->tsdb->mutex); + return 0; +} + +static int32_t tsdbMerge(void *arg) { + int32_t code = 0; + int32_t lino = 0; + SMergeArg *mergeArg = (SMergeArg *)arg; + STsdb *tsdb = mergeArg->tsdb; + + SMerger merger[1] = {{ + .tsdb = tsdb, + .fid = mergeArg->fid, + .sttTrigger = tsdb->pVnode->config.sttTrigger, + }}; + + if (merger->sttTrigger <= 1) return 0; + + // copy snapshot + code = tsdbMergeGetFSet(merger); TSDB_CHECK_CODE(code, lino, _exit); + if (merger->fset == NULL) return 0; + + // do merge + tsdbDebug("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid); code = tsdbDoMerge(merger); + tsdbDebug("vgId:%d merge done, fid:%d", TD_VID(tsdb->pVnode), mergeArg->fid); TSDB_CHECK_CODE(code, lino, _exit); - tsdbFSDestroyCopySnapshot(&merger->fsetArr); - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); tsdbFatal("vgId:%d, failed to merge stt files since %s. 
code:%d", TD_VID(tsdb->pVnode), terrstr(), code); taosMsleep(100); exit(EXIT_FAILURE); - } else if (merger->ctx->opened) { - tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__); } + tsdbTFileSetClear(&merger->fset); return code; } + +int32_t tsdbSchedMerge(STsdb *tsdb, int32_t fid) { + SMergeArg *arg = taosMemoryMalloc(sizeof(*arg)); + if (arg == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + arg->tsdb = tsdb; + arg->fid = fid; + + int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, fid, TSDB_BG_TASK_MERGER, tsdbMerge, taosMemoryFree, arg, NULL); + if (code) taosMemoryFree(arg); + + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 6dd66c7a40..c32b2eedd7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -53,7 +53,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee snprintf(pTsdb->path, TD_PATH_MAX, "%s%s%s", pVnode->path, TD_DIRSEP, dir); // taosRealPath(pTsdb->path, NULL, slen); pTsdb->pVnode = pVnode; - taosThreadRwlockInit(&pTsdb->rwLock, NULL); + taosThreadMutexInit(&pTsdb->mutex, NULL); if (!pKeepCfg) { tsdbSetKeepCfg(pTsdb, &pVnode->config.tsdbCfg); } else { @@ -99,15 +99,14 @@ int tsdbClose(STsdb **pTsdb) { tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d, keepTimeOffset:%d", TD_VID(pdb->pVnode), pdb->path, pdb->keepCfg.days, pdb->keepCfg.keep0, pdb->keepCfg.keep1, pdb->keepCfg.keep2, pdb->keepCfg.keepTimeOffset); - taosThreadRwlockWrlock(&(*pTsdb)->rwLock); + taosThreadMutexLock(&(*pTsdb)->mutex); tsdbMemTableDestroy((*pTsdb)->mem, true); (*pTsdb)->mem = NULL; - taosThreadRwlockUnlock(&(*pTsdb)->rwLock); - - taosThreadRwlockDestroy(&(*pTsdb)->rwLock); + taosThreadMutexUnlock(&(*pTsdb)->mutex); tsdbCloseFS(&(*pTsdb)->pFS); tsdbCloseCache(*pTsdb); + taosThreadMutexDestroy(&(*pTsdb)->mutex); taosMemoryFreeClear(*pTsdb); } return 0; diff --git 
a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 65cebf0ca0..41e0bd373e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1105,8 +1105,9 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p (pVerRange->maxVer < pBlock->record.maxVer && pVerRange->maxVer >= pBlock->record.minVer); } -static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, - int32_t* nextIndex, int32_t order, SBrinRecord* pRecord) { +static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, + STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order, + SBrinRecord* pRecord) { bool asc = ASCENDING_TRAVERSE(order); if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) { return false; @@ -1119,7 +1120,8 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo int32_t step = asc ? 
1 : -1; // *nextIndex = pBlockInfo->tbBlockIdx + step; // *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); - STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); + STableDataBlockIdx* pTableDataBlockIdx = + taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); memcpy(pRecord, &p->record, sizeof(SBrinRecord)); @@ -1145,7 +1147,8 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock return -1; } -static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, int32_t step) { +static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, + int32_t step) { if (index < 0 || index >= pBlockIter->numOfBlocks) { return -1; } @@ -1153,12 +1156,13 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index); pBlockIter->index += step; - if (index != pBlockIter->index) { + if (index != pBlockIter->index) { if (index > pBlockIter->index) { for (int32_t i = index - 1; i >= pBlockIter->index; --i) { SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i); - STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); + STableBlockScanInfo* pBlockScanInfo = + getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx); pTableDataBlockIdx->globalIndex = i + 1; @@ -1168,13 +1172,13 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte for (int32_t i = index + 1; i <= pBlockIter->index; 
++i) { SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i); - STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); + STableBlockScanInfo* pBlockScanInfo = + getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx); pTableDataBlockIdx->globalIndex = i - 1; taosArraySet(pBlockIter->blockList, i - 1, pBlockInfo); } - } taosArraySet(pBlockIter->blockList, pBlockIter->index, &fblock); @@ -1286,7 +1290,8 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* int32_t neighborIndex = 0; SBrinRecord rec = {0}; - bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, pReader->info.order, &rec); + bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, + pReader->info.order, &rec); // overlap with neighbor if (hasNeighbor) { @@ -1420,9 +1425,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc } } -static void doPinSttBlock(SLastBlockReader* pLastBlockReader) { - tMergeTreePinSttBlock(&pLastBlockReader->mergeTree); -} +static void doPinSttBlock(SLastBlockReader* pLastBlockReader) { tMergeTreePinSttBlock(&pLastBlockReader->mergeTree); } static void doUnpinSttBlock(SLastBlockReader* pLastBlockReader) { tMergeTreeUnpinSttBlock(&pLastBlockReader->mergeTree); @@ -1568,7 +1571,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == tsLast) { TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); + int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1618,7 +1621,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, 
STableBlockScanInfo* if (minKey == tsLast) { TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); + int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1826,8 +1829,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; - TSDBKEY k = TSDBROW_KEY(pRow); - TSDBKEY ik = TSDBROW_KEY(piRow); + TSDBKEY k = TSDBROW_KEY(pRow); + TSDBKEY ik = TSDBROW_KEY(piRow); STSchema* pSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { @@ -2219,8 +2222,9 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; TSDBROW *pRow = NULL, *piRow = NULL; - int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : - (ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN); + int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) + ? pBlockData->aTSKEY[pDumpInfo->rowIndex] + : (ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN); if (pBlockScanInfo->iter.hasVal) { pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); } @@ -2257,7 +2261,8 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock *loadNeighbor = false; SBrinRecord rec = {0}; - bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pBlockScanInfo, &nextIndex, pReader->info.order, &rec); + bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pBlockScanInfo, &nextIndex, + pReader->info.order, &rec); if (!hasNeighbor) { // do nothing return code; } @@ -2268,7 +2273,7 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock // 1. 
find the next neighbor block in the scan block list STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex); - int32_t neighborIndex = tableDataBlockIdx->globalIndex; + int32_t neighborIndex = tableDataBlockIdx->globalIndex; // 2. remove it from the scan block list setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step); @@ -2704,7 +2709,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { - bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); + bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN; if (!bHasDataInLastBlock || ((asc && pBlockInfo->record.lastKey < tsLast) || (!asc && pBlockInfo->record.firstKey > tsLast))) { @@ -3479,7 +3484,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, // start to merge duplicated rows STSchema* pTSchema = NULL; - if (current.type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory + if (current.type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(¤t), pReader, uid); if (pTSchema == NULL) { return terrno; @@ -3525,8 +3530,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p SRow** pTSRow) { SRowMerger* pMerger = &pReader->status.merger; - TSDBKEY k = TSDBROW_KEY(pRow); - TSDBKEY ik = TSDBROW_KEY(piRow); + TSDBKEY k = TSDBROW_KEY(pRow); + TSDBKEY ik = TSDBROW_KEY(piRow); STSchema* pSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { @@ -4907,12 +4912,12 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs SVersionRange* pRange = &pReader->info.verRange; // lock - 
taosThreadRwlockRdlock(&pTsdb->rwLock); + taosThreadMutexLock(&pTsdb->mutex); // alloc STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); if (pSnap == NULL) { - taosThreadRwlockUnlock(&pTsdb->rwLock); + taosThreadMutexUnlock(&pTsdb->mutex); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -4922,7 +4927,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pMem = pTsdb->mem; pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); if (pSnap->pNode == NULL) { - taosThreadRwlockUnlock(&pTsdb->rwLock); + taosThreadMutexUnlock(&pTsdb->mutex); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -4937,7 +4942,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pIMem = pTsdb->imem; pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode)); if (pSnap->pINode == NULL) { - taosThreadRwlockUnlock(&pTsdb->rwLock); + taosThreadMutexUnlock(&pTsdb->mutex); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -4952,7 +4957,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs code = tsdbFSCreateRefSnapshotWithoutLock(pTsdb->pFS, &pSnap->pfSetArray); // unlock - taosThreadRwlockUnlock(&pTsdb->rwLock); + taosThreadMutexUnlock(&pTsdb->mutex); if (code == TSDB_CODE_SUCCESS) { tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); @@ -5005,4 +5010,5 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { pReader->status.fileIter.pLastBlockReader->mergeTree.idStr = pReader->idStr; } -void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } +void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ +} diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index f2665dcf26..6c41b46c73 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ 
-25,11 +25,6 @@ typedef struct { TFileSetArray *fsetArr; TFileOpArray fopArr[1]; - - struct { - int32_t fsetArrIdx; - STFileSet *fset; - } ctx[1]; } SRTNer; static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) { @@ -227,8 +222,8 @@ _exit: typedef struct { STsdb *tsdb; - int32_t sync; int64_t now; + int32_t fid; } SRtnArg; static int32_t tsdbDoRetentionBegin(SRtnArg *arg, SRTNer *rtner) { @@ -263,15 +258,15 @@ static int32_t tsdbDoRetentionEnd(SRTNer *rtner) { code = tsdbFSEditBegin(rtner->tsdb->pFS, rtner->fopArr, TSDB_FEDIT_MERGE); TSDB_CHECK_CODE(code, lino, _exit); - taosThreadRwlockWrlock(&rtner->tsdb->rwLock); + taosThreadMutexLock(&rtner->tsdb->mutex); code = tsdbFSEditCommit(rtner->tsdb->pFS); if (code) { - taosThreadRwlockUnlock(&rtner->tsdb->rwLock); + taosThreadMutexUnlock(&rtner->tsdb->mutex); TSDB_CHECK_CODE(code, lino, _exit); } - taosThreadRwlockUnlock(&rtner->tsdb->rwLock); + taosThreadMutexUnlock(&rtner->tsdb->mutex); TARRAY2_DESTROY(rtner->fopArr, NULL); @@ -285,95 +280,83 @@ _exit: return code; } -static int32_t tsdbDoRetention2(void *arg) { - int32_t code = 0; - int32_t lino = 0; - SRTNer rtner[1] = {0}; +static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) { + int32_t code = 0; + int32_t lino = 0; + STFileObj *fobj = NULL; + int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now); - code = tsdbDoRetentionBegin(arg, rtner); - TSDB_CHECK_CODE(code, lino, _exit); + if (expLevel < 0) { // remove the fileset + for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) { + if (fobj == NULL) continue; - for (rtner->ctx->fsetArrIdx = 0; rtner->ctx->fsetArrIdx < TARRAY2_SIZE(rtner->fsetArr); rtner->ctx->fsetArrIdx++) { - rtner->ctx->fset = TARRAY2_GET(rtner->fsetArr, rtner->ctx->fsetArrIdx); - - STFileObj *fobj; - int32_t expLevel = tsdbFidLevel(rtner->ctx->fset->fid, &rtner->tsdb->keepCfg, rtner->now); - - if (expLevel < 0) { // remove the file set - for 
(int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) { - if (fobj == NULL) continue; - - int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); - if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) { - code = tsdbRemoveFileObjectS3(rtner, fobj); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbDoRemoveFileObject(rtner, fobj); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - SSttLvl *lvl; - TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) { - TARRAY2_FOREACH(lvl->fobjArr, fobj) { - code = tsdbDoRemoveFileObject(rtner, fobj); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - } else if (expLevel == 0) { - continue; - } else { - SDiskID did; - - if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) { - code = terrno; + int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); + if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) { + code = tsdbRemoveFileObjectS3(rtner, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbDoRemoveFileObject(rtner, fobj); TSDB_CHECK_CODE(code, lino, _exit); } - tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did); + } - // data - for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) { - if (fobj == NULL) continue; + SSttLvl *lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + code = tsdbDoRemoveFileObject(rtner, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } else if (expLevel == 0) { // only migrate to upper level + return 0; + } else { // migrate + SDiskID did; + if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did); + + // data + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = fset->farr[ftype], 1); ++ftype) { + if 
(fobj == NULL) continue; + + if (fobj->f->did.level == did.level) continue; + + int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); + if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) { + code = tsdbMigrateDataFileS3(rtner, fobj, &did); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + if (tsS3Enabled) { + int64_t fsize = 0; + if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(terrno); + tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), __func__, + fobj->fname, tstrerror(code)); + TSDB_CHECK_CODE(code, lino, _exit); + } + s3EvictCache(fobj->fname, fsize * 2); + } + + code = tsdbDoMigrateFileObj(rtner, fobj, &did); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + // stt + SSttLvl *lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + TARRAY2_FOREACH(lvl->fobjArr, fobj) { if (fobj->f->did.level == did.level) continue; - int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); - if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) { - code = tsdbMigrateDataFileS3(rtner, fobj, &did); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - if (tsS3Enabled) { - int64_t fsize = 0; - if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) { - code = TAOS_SYSTEM_ERROR(terrno); - tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), __func__, - fobj->fname, tstrerror(code)); - TSDB_CHECK_CODE(code, lino, _exit); - } - s3EvictCache(fobj->fname, fsize * 2); - } - - code = tsdbDoMigrateFileObj(rtner, fobj, &did); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - // stt - SSttLvl *lvl; - TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) { - TARRAY2_FOREACH(lvl->fobjArr, fobj) { - if (fobj->f->did.level == did.level) continue; - - code = tsdbDoMigrateFileObj(rtner, fobj, &did); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbDoMigrateFileObj(rtner, fobj, &did); + TSDB_CHECK_CODE(code, lino, 
_exit); } } } - code = tsdbDoRetentionEnd(rtner); - TSDB_CHECK_CODE(code, lino, _exit); - _exit: if (code) { if (TARRAY2_DATA(rtner->fopArr)) { @@ -389,30 +372,105 @@ _exit: return code; } -static void tsdbFreeRtnArg(void *arg) { - SRtnArg *rArg = (SRtnArg *)arg; - if (rArg->sync) { - tsem_post(&rArg->tsdb->pVnode->canCommit); +static int32_t tsdbDoRetentionSync(void *arg) { + int32_t code = 0; + int32_t lino = 0; + SRTNer rtner[1] = {0}; + + code = tsdbDoRetentionBegin(arg, rtner); + TSDB_CHECK_CODE(code, lino, _exit); + + STFileSet *fset; + TARRAY2_FOREACH(rtner->fsetArr, fset) { + code = tsdbDoRetentionOnFileSet(rtner, fset); + TSDB_CHECK_CODE(code, lino, _exit); } - taosMemoryFree(arg); + + code = tsdbDoRetentionEnd(rtner); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); + } + tsem_post(&((SRtnArg *)arg)->tsdb->pVnode->canCommit); + return code; } -int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) { - SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); - if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY; - arg->tsdb = tsdb; - arg->sync = sync; - arg->now = now; +static int32_t tsdbDoRetentionAsync(void *arg) { + int32_t code = 0; + int32_t lino = 0; + SRTNer rtner[1] = {0}; - if (sync) { - tsem_wait(&tsdb->pVnode->canCommit); + code = tsdbDoRetentionBegin(arg, rtner); + TSDB_CHECK_CODE(code, lino, _exit); + + STFileSet *fset; + TARRAY2_FOREACH(rtner->fsetArr, fset) { + if (fset->fid != ((SRtnArg *)arg)->fid) continue; + + code = tsdbDoRetentionOnFileSet(rtner, fset); + TSDB_CHECK_CODE(code, lino, _exit); } - int64_t taskid; - int32_t code = - tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, tsdbFreeRtnArg, arg, &taskid); + code = tsdbDoRetentionEnd(rtner); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: if (code) { - tsdbFreeRtnArg(arg); + TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code); } return code; } + +static void tsdbFreeRtnArg(void *arg) { 
taosMemoryFree(arg); } + +int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) { + int32_t code = 0; + + if (sync) { // sync retention + SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); + if (arg == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + arg->tsdb = tsdb; + arg->now = now; + arg->fid = INT32_MAX; + + tsem_wait(&tsdb->pVnode->canCommit); + code = vnodeScheduleTask(tsdbDoRetentionSync, arg); + if (code) { + tsem_post(&tsdb->pVnode->canCommit); + taosMemoryFree(arg); + return code; + } + } else { // async retention + taosThreadMutexLock(&tsdb->mutex); + + STFileSet *fset; + TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { + SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); + if (arg == NULL) { + taosThreadMutexUnlock(&tsdb->mutex); + return TSDB_CODE_OUT_OF_MEMORY; + } + + arg->tsdb = tsdb; + arg->now = now; + arg->fid = fset->fid; + + code = tsdbFSScheduleBgTask(tsdb->pFS, fset->fid, TSDB_BG_TASK_RETENTION, tsdbDoRetentionAsync, tsdbFreeRtnArg, + arg, NULL); + if (code) { + tsdbFreeRtnArg(arg); + taosThreadMutexUnlock(&tsdb->mutex); + return code; + } + } + + taosThreadMutexUnlock(&tsdb->mutex); + } + + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 3b4827a6be..df7154b775 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -38,8 +38,8 @@ struct STsdbSnapReader { struct { int32_t fsrArrIdx; STSnapRange* fsr; - bool isDataDone; - bool isTombDone; + bool isDataDone; + bool isTombDone; } ctx[1]; // reader @@ -1095,17 +1095,17 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) { code = tsdbFSEditAbort(writer[0]->tsdb->pFS); TSDB_CHECK_CODE(code, lino, _exit); } else { - taosThreadRwlockWrlock(&writer[0]->tsdb->rwLock); + taosThreadMutexLock(&writer[0]->tsdb->mutex); code = tsdbFSEditCommit(writer[0]->tsdb->pFS); if (code) { - taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock); + 
taosThreadMutexUnlock(&writer[0]->tsdb->mutex); TSDB_CHECK_CODE(code, lino, _exit); } writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL; - taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock); + taosThreadMutexUnlock(&writer[0]->tsdb->mutex); } tsdbFSEnableBgTask(tsdb->pFS); @@ -1236,7 +1236,7 @@ static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP if (fset->farr[ftype] == NULL) continue; typ = tsdbFTypeToSRangeTyp(ftype); ASSERT(typ < TSDB_SNAP_RANGE_TYP_MAX); - STFile* f = fset->farr[ftype]->f; + STFile* f = fset->farr[ftype]->f; if (f->maxVer > fset->maxVerValid) { corrupt = true; tsdbError("skip incomplete data file: fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 @@ -1255,7 +1255,7 @@ static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP TARRAY2_FOREACH(fset->lvlArr, lvl) { STFileObj* fobj; TARRAY2_FOREACH(lvl->fobjArr, fobj) { - STFile* f = fobj->f; + STFile* f = fobj->f; if (f->maxVer > fset->maxVerValid) { corrupt = true; tsdbError("skip incomplete stt file.fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 @@ -1299,7 +1299,7 @@ static STsdbSnapPartList* tsdbGetSnapPartList(STFileSystem* fs) { } int32_t code = 0; - taosThreadRwlockRdlock(&fs->tsdb->rwLock); + taosThreadMutexLock(&fs->tsdb->mutex); STFileSet* fset; TARRAY2_FOREACH(fs->fSetArr, fset) { STsdbSnapPartition* pItem = NULL; @@ -1311,7 +1311,7 @@ static STsdbSnapPartList* tsdbGetSnapPartList(STFileSystem* fs) { code = TARRAY2_SORT_INSERT(pList, pItem, tsdbSnapPartCmprFn); ASSERT(code == 0); } - taosThreadRwlockUnlock(&fs->tsdb->rwLock); + taosThreadMutexUnlock(&fs->tsdb->mutex); if (code) { TARRAY2_DESTROY(pList, tsdbSnapPartitionClear); From a7f3041ff3e28179d7916a103e45967608fad1db Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 27 Oct 2023 17:59:27 +0800 Subject: [PATCH 2/2] more fix --- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index ee3abf7559..cc77474e79 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -191,7 +191,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid pMemTable->nDel++; pMemTable->minVer = TMIN(pMemTable->minVer, version); - pMemTable->maxVer = TMIN(pMemTable->maxVer, version); + pMemTable->maxVer = TMAX(pMemTable->maxVer, version); /* if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);