Merge pull request #23440 from taosdata/feat/TD-26684

feat: concurrency on fileset
This commit is contained in:
Hongze Cheng 2023-10-31 00:52:47 -05:00 committed by GitHub
commit 987f6faf58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 543 additions and 444 deletions

View File

@ -309,7 +309,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STs
void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
// tsdbMerge.c ============================================================================================== // tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(void *arg); int32_t tsdbSchedMerge(STsdb *tsdb, int32_t fid);
// tsdbDiskData ============================================================================================== // tsdbDiskData ==============================================================================================
int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder); int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder);
@ -371,7 +371,7 @@ struct STsdb {
char *path; char *path;
SVnode *pVnode; SVnode *pVnode;
STsdbKeepCfg keepCfg; STsdbKeepCfg keepCfg;
TdThreadRwlock rwLock; TdThreadMutex mutex;
SMemTable *mem; SMemTable *mem;
SMemTable *imem; SMemTable *imem;
STsdbFS fs; // old STsdbFS fs; // old
@ -668,8 +668,8 @@ struct SDelFWriter {
}; };
#include "tarray2.h" #include "tarray2.h"
//#include "tsdbFS2.h" // #include "tsdbFS2.h"
// struct STFileSet; // struct STFileSet;
typedef struct STFileSet STFileSet; typedef struct STFileSet STFileSet;
typedef TARRAY2(STFileSet *) TFileSetArray; typedef TARRAY2(STFileSet *) TFileSetArray;
@ -677,9 +677,9 @@ typedef struct STSnapRange STSnapRange;
typedef TARRAY2(STSnapRange *) TSnapRangeArray; // disjoint snap ranges typedef TARRAY2(STSnapRange *) TSnapRangeArray; // disjoint snap ranges
// util // util
int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap); void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap);
SHashObj *tsdbGetSnapRangeHash(TSnapRangeArray *pRanges); SHashObj *tsdbGetSnapRangeHash(TSnapRangeArray *pRanges);
// snap partition list // snap partition list
@ -873,8 +873,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
void tMergeTreePinSttBlock(SMergeTree* pMTree); void tMergeTreePinSttBlock(SMergeTree *pMTree);
void tMergeTreeUnpinSttBlock(SMergeTree* pMTree); void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree);

View File

@ -31,8 +31,6 @@ SSmaMgmt smaMgmt = {
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now);
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static void tdUidStoreDestory(STbUidStore *pStore); static void tdUidStoreDestory(STbUidStore *pStore);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);

View File

@ -131,7 +131,7 @@ int32_t tsdbBegin(STsdb *pTsdb) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// lock // lock
if ((code = taosThreadRwlockWrlock(&pTsdb->rwLock))) { if ((code = taosThreadMutexLock(&pTsdb->mutex))) {
code = TAOS_SYSTEM_ERROR(code); code = TAOS_SYSTEM_ERROR(code);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -139,7 +139,7 @@ int32_t tsdbBegin(STsdb *pTsdb) {
pTsdb->mem = pMemTable; pTsdb->mem = pMemTable;
// unlock // unlock
if ((code = taosThreadRwlockUnlock(&pTsdb->rwLock))) { if ((code = taosThreadMutexUnlock(&pTsdb->mutex))) {
code = TAOS_SYSTEM_ERROR(code); code = TAOS_SYSTEM_ERROR(code);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -152,11 +152,11 @@ _exit:
} }
int32_t tsdbPrepareCommit(STsdb *pTsdb) { int32_t tsdbPrepareCommit(STsdb *pTsdb) {
taosThreadRwlockWrlock(&pTsdb->rwLock); taosThreadMutexLock(&pTsdb->mutex);
ASSERT(pTsdb->imem == NULL); ASSERT(pTsdb->imem == NULL);
pTsdb->imem = pTsdb->mem; pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL; pTsdb->mem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadMutexUnlock(&pTsdb->mutex);
return 0; return 0;
} }
@ -171,9 +171,9 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) {
// check // check
if (pMemTable->nRow == 0 && pMemTable->nDel == 0) { if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
taosThreadRwlockWrlock(&pTsdb->rwLock); taosThreadMutexLock(&pTsdb->mutex);
pTsdb->imem = NULL; pTsdb->imem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadMutexUnlock(&pTsdb->mutex);
tsdbUnrefMemTable(pMemTable, NULL, true); tsdbUnrefMemTable(pMemTable, NULL, true);
goto _exit; goto _exit;
@ -501,6 +501,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
int32_t lino = 0; int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
SDFileSet *pRSet = NULL; SDFileSet *pRSet = NULL;
// memory // memory
pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
@ -798,6 +799,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
int32_t lino = 0; int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem; SMemTable *pMemTable = pTsdb->imem;
// commit file data start // commit file data start
code = tsdbCommitFileDataStart(pCommitter); code = tsdbCommitFileDataStart(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -1650,18 +1652,18 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) {
SMemTable *pMemTable = pTsdb->imem; SMemTable *pMemTable = pTsdb->imem;
// lock // lock
taosThreadRwlockWrlock(&pTsdb->rwLock); taosThreadMutexLock(&pTsdb->mutex);
code = tsdbFSCommit(pTsdb); code = tsdbFSCommit(pTsdb);
if (code) { if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadMutexUnlock(&pTsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
pTsdb->imem = NULL; pTsdb->imem = NULL;
// unlock // unlock
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadMutexUnlock(&pTsdb->mutex);
if (pMemTable) { if (pMemTable) {
tsdbUnrefMemTable(pMemTable, NULL, true); tsdbUnrefMemTable(pMemTable, NULL, true);
} }

View File

@ -367,7 +367,12 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
int32_t lino = 0; int32_t lino = 0;
STsdb *tsdb = committer->tsdb; STsdb *tsdb = committer->tsdb;
committer->ctx->fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision); int32_t fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision);
// check if can commit
tsdbFSCheckCommit(tsdb, fid);
committer->ctx->fid = fid;
committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now); committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now);
tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey, tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
&committer->ctx->maxKey); &committer->ctx->maxKey);
@ -549,11 +554,11 @@ _exit:
} }
int32_t tsdbPreCommit(STsdb *tsdb) { int32_t tsdbPreCommit(STsdb *tsdb) {
taosThreadRwlockWrlock(&tsdb->rwLock); taosThreadMutexLock(&tsdb->mutex);
ASSERT(tsdb->imem == NULL); ASSERT(tsdb->imem == NULL);
tsdb->imem = tsdb->mem; tsdb->imem = tsdb->mem;
tsdb->mem = NULL; tsdb->mem = NULL;
taosThreadRwlockUnlock(&tsdb->rwLock); taosThreadMutexUnlock(&tsdb->mutex);
return 0; return 0;
} }
@ -568,15 +573,13 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
int64_t nDel = imem->nDel; int64_t nDel = imem->nDel;
if (nRow == 0 && nDel == 0) { if (nRow == 0 && nDel == 0) {
taosThreadRwlockWrlock(&tsdb->rwLock); taosThreadMutexLock(&tsdb->mutex);
tsdb->imem = NULL; tsdb->imem = NULL;
taosThreadRwlockUnlock(&tsdb->rwLock); taosThreadMutexUnlock(&tsdb->mutex);
tsdbUnrefMemTable(imem, NULL, true); tsdbUnrefMemTable(imem, NULL, true);
} else { } else {
SCommitter2 committer[1]; SCommitter2 committer[1];
tsdbFSCheckCommit(tsdb->pFS);
code = tsdbOpenCommitter(tsdb, info, committer); code = tsdbOpenCommitter(tsdb, info, committer);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -605,14 +608,14 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
if (tsdb->imem == NULL) goto _exit; if (tsdb->imem == NULL) goto _exit;
SMemTable *pMemTable = tsdb->imem; SMemTable *pMemTable = tsdb->imem;
taosThreadRwlockWrlock(&tsdb->rwLock); taosThreadMutexLock(&tsdb->mutex);
code = tsdbFSEditCommit(tsdb->pFS); code = tsdbFSEditCommit(tsdb->pFS);
if (code) { if (code) {
taosThreadRwlockUnlock(&tsdb->rwLock); taosThreadMutexUnlock(&tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tsdb->imem = NULL; tsdb->imem = NULL;
taosThreadRwlockUnlock(&tsdb->rwLock); taosThreadMutexUnlock(&tsdb->mutex);
tsdbUnrefMemTable(pMemTable, NULL, true); tsdbUnrefMemTable(pMemTable, NULL, true);
_exit: _exit:
@ -640,4 +643,4 @@ _exit:
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
} }
return code; return code;
} }

View File

@ -55,25 +55,11 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
TARRAY2_INIT(fs[0]->fSetArr); TARRAY2_INIT(fs[0]->fSetArr);
TARRAY2_INIT(fs[0]->fSetArrTmp); TARRAY2_INIT(fs[0]->fSetArrTmp);
// background task queue
taosThreadMutexInit(fs[0]->mutex, NULL);
fs[0]->bgTaskQueue->next = fs[0]->bgTaskQueue;
fs[0]->bgTaskQueue->prev = fs[0]->bgTaskQueue;
taosThreadMutexInit(&fs[0]->commitMutex, NULL);
taosThreadCondInit(&fs[0]->canCommit, NULL);
fs[0]->blockCommit = false;
return 0; return 0;
} }
static int32_t destroy_fs(STFileSystem **fs) { static int32_t destroy_fs(STFileSystem **fs) {
if (fs[0] == NULL) return 0; if (fs[0] == NULL) return 0;
taosThreadMutexDestroy(&fs[0]->commitMutex);
taosThreadCondDestroy(&fs[0]->canCommit);
taosThreadMutexDestroy(fs[0]->mutex);
ASSERT(fs[0]->bgTaskNum == 0);
TARRAY2_DESTROY(fs[0]->fSetArr, NULL); TARRAY2_DESTROY(fs[0]->fSetArr, NULL);
TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL); TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL);
@ -264,10 +250,11 @@ static int32_t apply_commit(STFileSystem *fs) {
if (fset1 && fset2) { if (fset1 && fset2) {
if (fset1->fid < fset2->fid) { if (fset1->fid < fset2->fid) {
// delete fset1 // delete fset1
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove); tsdbTFileSetRemove(fset1);
i1++;
} else if (fset1->fid > fset2->fid) { } else if (fset1->fid > fset2->fid) {
// create new file set with fid of fset2->fid // create new file set with fid of fset2->fid
code = tsdbTFileSetInitDup(fs->tsdb, fset2, &fset1); code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
if (code) return code; if (code) return code;
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn); code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
if (code) return code; if (code) return code;
@ -282,10 +269,11 @@ static int32_t apply_commit(STFileSystem *fs) {
} }
} else if (fset1) { } else if (fset1) {
// delete fset1 // delete fset1
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove); tsdbTFileSetRemove(fset1);
i1++;
} else { } else {
// create new file set with fid of fset2->fid // create new file set with fid of fset2->fid
code = tsdbTFileSetInitDup(fs->tsdb, fset2, &fset1); code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
if (code) return code; if (code) return code;
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn); code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
if (code) return code; if (code) return code;
@ -512,7 +500,8 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) { TARRAY2_FOREACH(lvl->fobjArr, fobj) {
code = tsdbFSDoScanAndFixFile(fs, fobj); code = tsdbFSDoScanAndFixFile(fs, fobj);
if (code) { if (code) {
fset->maxVerValid = (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1; fset->maxVerValid =
(fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
corrupt = true; corrupt = true;
} }
} }
@ -592,7 +581,7 @@ static int32_t tsdbFSDupState(STFileSystem *fs) {
const STFileSet *fset1; const STFileSet *fset1;
TARRAY2_FOREACH(src, fset1) { TARRAY2_FOREACH(src, fset1) {
STFileSet *fset2; STFileSet *fset2;
code = tsdbTFileSetInitDup(fs->tsdb, fset1, &fset2); code = tsdbTFileSetInitCopy(fs->tsdb, fset1, &fset2);
if (code) return code; if (code) return code;
code = TARRAY2_APPEND(dst, fset2); code = TARRAY2_APPEND(dst, fset2);
if (code) return code; if (code) return code;
@ -665,12 +654,6 @@ static int32_t close_file_system(STFileSystem *fs) {
return 0; return 0;
} }
static int32_t apply_edit(STFileSystem *pFS) {
int32_t code = 0;
ASSERTS(0, "TODO: Not implemented yet");
return code;
}
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) { static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
if (pSet1->fid < pSet2->fid) { if (pSet1->fid < pSet2->fid) {
return -1; return -1;
@ -710,10 +693,23 @@ static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
// remove empty file set // remove empty empty stt level and empty file set
int32_t i = 0; int32_t i = 0;
while (i < TARRAY2_SIZE(fsetArray)) { while (i < TARRAY2_SIZE(fsetArray)) {
fset = TARRAY2_GET(fsetArray, i); fset = TARRAY2_GET(fsetArray, i);
SSttLvl *lvl;
int32_t j = 0;
while (j < TARRAY2_SIZE(fset->lvlArr)) {
lvl = TARRAY2_GET(fset->lvlArr, j);
if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
} else {
j++;
}
}
if (tsdbTFileSetIsEmpty(fset)) { if (tsdbTFileSetIsEmpty(fset)) {
TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear); TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear);
} else { } else {
@ -753,13 +749,13 @@ _exit:
static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) { static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) {
task->numWait++; task->numWait++;
taosThreadCondWait(task->done, fs->mutex); taosThreadCondWait(task->done, &fs->tsdb->mutex);
task->numWait--; task->numWait--;
if (task->numWait == 0) { if (task->numWait == 0) {
taosThreadCondDestroy(task->done); taosThreadCondDestroy(task->done);
if (task->free) { if (task->destroy) {
task->free(task->arg); task->destroy(task->arg);
} }
taosMemoryFree(task); taosMemoryFree(task);
} }
@ -770,8 +766,8 @@ static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) {
taosThreadCondBroadcast(task->done); taosThreadCondBroadcast(task->done);
} else { } else {
taosThreadCondDestroy(task->done); taosThreadCondDestroy(task->done);
if (task->free) { if (task->destroy) {
task->free(task->arg); task->destroy(task->arg);
} }
taosMemoryFree(task); taosMemoryFree(task);
} }
@ -780,23 +776,16 @@ static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) {
int32_t tsdbCloseFS(STFileSystem **fs) { int32_t tsdbCloseFS(STFileSystem **fs) {
if (fs[0] == NULL) return 0; if (fs[0] == NULL) return 0;
taosThreadMutexLock(fs[0]->mutex); tsdbFSDisableBgTask(fs[0]);
fs[0]->stop = true;
if (fs[0]->bgTaskRunning) {
tsdbDoWaitBgTask(fs[0], fs[0]->bgTaskRunning);
}
taosThreadMutexUnlock(fs[0]->mutex);
close_file_system(fs[0]); close_file_system(fs[0]);
destroy_fs(fs); destroy_fs(fs);
return 0; return 0;
} }
int64_t tsdbFSAllocEid(STFileSystem *fs) { int64_t tsdbFSAllocEid(STFileSystem *fs) {
taosThreadRwlockRdlock(&fs->tsdb->rwLock); taosThreadMutexLock(&fs->tsdb->mutex);
int64_t cid = ++fs->neid; int64_t cid = ++fs->neid;
taosThreadRwlockUnlock(&fs->tsdb->rwLock); taosThreadMutexUnlock(&fs->tsdb->mutex);
return cid; return cid;
} }
@ -837,27 +826,34 @@ _exit:
return code; return code;
} }
static int32_t tsdbFSSetBlockCommit(STFileSystem *fs, bool block) { static int32_t tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
taosThreadMutexLock(&fs->commitMutex);
if (block) { if (block) {
fs->blockCommit = true; fset->blockCommit = true;
} else { } else {
fs->blockCommit = false; fset->blockCommit = false;
taosThreadCondSignal(&fs->canCommit); if (fset->numWaitCommit > 0) {
taosThreadCondSignal(&fset->canCommit);
}
} }
taosThreadMutexUnlock(&fs->commitMutex);
return 0; return 0;
} }
int32_t tsdbFSCheckCommit(STFileSystem *fs) { int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
taosThreadMutexLock(&fs->commitMutex); taosThreadMutexLock(&tsdb->mutex);
while (fs->blockCommit) { STFileSet *fset;
taosThreadCondWait(&fs->canCommit, &fs->commitMutex); tsdbFSGetFSet(tsdb->pFS, fid, &fset);
if (fset) {
while (fset->blockCommit) {
fset->numWaitCommit++;
taosThreadCondWait(&fset->canCommit, &tsdb->mutex);
fset->numWaitCommit--;
}
} }
taosThreadMutexUnlock(&fs->commitMutex); taosThreadMutexUnlock(&tsdb->mutex);
return 0; return 0;
} }
// IMPORTANT: the caller must hold fs->tsdb->mutex
int32_t tsdbFSEditCommit(STFileSystem *fs) { int32_t tsdbFSEditCommit(STFileSystem *fs) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -867,36 +863,57 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// schedule merge // schedule merge
if (fs->tsdb->pVnode->config.sttTrigger > 1) { int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
if (sttTrigger > 1) {
STFileSet *fset; STFileSet *fset;
int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
bool schedMerge = false;
bool blockCommit = false;
TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) { TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
if (TARRAY2_SIZE(fset->lvlArr) == 0) continue; if (TARRAY2_SIZE(fset->lvlArr) == 0) {
tsdbFSSetBlockCommit(fset, false);
continue;
}
SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr); SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
if (lvl->level != 0) continue; if (lvl->level != 0) {
tsdbFSSetBlockCommit(fset, false);
continue;
}
int32_t numFile = TARRAY2_SIZE(lvl->fobjArr); int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
if (numFile >= sttTrigger) { if (numFile >= sttTrigger) {
schedMerge = true; // launch merge
code = tsdbSchedMerge(fs->tsdb, fset->fid);
TSDB_CHECK_CODE(code, lino, _exit);
} }
if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) { if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
blockCommit = true; tsdbFSSetBlockCommit(fset, true);
} else {
tsdbFSSetBlockCommit(fset, false);
} }
}
}
if (schedMerge && blockCommit) break; // clear empty level and fset
int32_t i = 0;
while (i < TARRAY2_SIZE(fs->fSetArr)) {
STFileSet *fset = TARRAY2_GET(fs->fSetArr, i);
int32_t j = 0;
while (j < TARRAY2_SIZE(fset->lvlArr)) {
SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, j);
if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
} else {
j++;
}
} }
if (schedMerge) { if (tsdbTFileSetIsEmpty(fset) && fset->bgTaskRunning == NULL) {
code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, NULL, fs->tsdb, NULL); TARRAY2_REMOVE(fs->fSetArr, i, tsdbTFileSetClear);
TSDB_CHECK_CODE(code, lino, _exit); } else {
i++;
} }
tsdbFSSetBlockCommit(fs, blockCommit);
} }
_exit: _exit:
@ -933,15 +950,15 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
TARRAY2_INIT(fsetArr[0]); TARRAY2_INIT(fsetArr[0]);
taosThreadRwlockRdlock(&fs->tsdb->rwLock); taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
code = tsdbTFileSetInitDup(fs->tsdb, fset, &fset1); code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1);
if (code) break; if (code) break;
code = TARRAY2_APPEND(fsetArr[0], fset1); code = TARRAY2_APPEND(fsetArr[0], fset1);
if (code) break; if (code) break;
} }
taosThreadRwlockUnlock(&fs->tsdb->rwLock); taosThreadMutexUnlock(&fs->tsdb->mutex);
if (code) { if (code) {
TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear); TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
@ -961,9 +978,9 @@ int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
} }
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) { int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
taosThreadRwlockRdlock(&fs->tsdb->rwLock); taosThreadMutexLock(&fs->tsdb->mutex);
int32_t code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr); int32_t code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr);
taosThreadRwlockUnlock(&fs->tsdb->rwLock); taosThreadMutexUnlock(&fs->tsdb->mutex);
return code; return code;
} }
@ -1017,7 +1034,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pRange
} }
} }
taosThreadRwlockRdlock(&fs->tsdb->rwLock); taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
int64_t ever = VERSION_MAX; int64_t ever = VERSION_MAX;
if (pHash) { if (pHash) {
@ -1034,7 +1051,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pRange
code = TARRAY2_APPEND(fsetArr[0], fset1); code = TARRAY2_APPEND(fsetArr[0], fset1);
if (code) break; if (code) break;
} }
taosThreadRwlockUnlock(&fs->tsdb->rwLock); taosThreadMutexUnlock(&fs->tsdb->mutex);
_out: _out:
if (code) { if (code) {
@ -1089,7 +1106,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
} }
} }
taosThreadRwlockRdlock(&fs->tsdb->rwLock); taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
int64_t sver1 = sver; int64_t sver1 = sver;
int64_t ever1 = ever; int64_t ever1 = ever;
@ -1118,7 +1135,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
fsr1 = NULL; fsr1 = NULL;
} }
taosThreadRwlockUnlock(&fs->tsdb->rwLock); taosThreadMutexUnlock(&fs->tsdb->mutex);
if (code) { if (code) {
tsdbTSnapRangeClear(&fsr1); tsdbTSnapRangeClear(&fsr1);
@ -1137,59 +1154,69 @@ _out:
const char *gFSBgTaskName[] = {NULL, "MERGE", "RETENTION", "COMPACT"}; const char *gFSBgTaskName[] = {NULL, "MERGE", "RETENTION", "COMPACT"};
static int32_t tsdbFSRunBgTask(void *arg) { static int32_t tsdbFSRunBgTask(void *arg) {
STFileSystem *fs = (STFileSystem *)arg; STFSBgTask *task = (STFSBgTask *)arg;
STFileSystem *fs = task->fs;
STFileSet *fset;
ASSERT(fs->bgTaskRunning != NULL); tsdbFSGetFSet(fs, task->fid, &fset);
fs->bgTaskRunning->launchTime = taosGetTimestampMs(); ASSERT(fset != NULL && fset->bgTaskRunning == task);
fs->bgTaskRunning->run(fs->bgTaskRunning->arg);
fs->bgTaskRunning->finishTime = taosGetTimestampMs(); task->launchTime = taosGetTimestampMs();
task->run(task->arg);
task->finishTime = taosGetTimestampMs();
tsdbDebug("vgId:%d bg task:%s task id:%" PRId64 " finished, schedule time:%" PRId64 " launch time:%" PRId64 tsdbDebug("vgId:%d bg task:%s task id:%" PRId64 " finished, schedule time:%" PRId64 " launch time:%" PRId64
" finish time:%" PRId64, " finish time:%" PRId64,
TD_VID(fs->tsdb->pVnode), gFSBgTaskName[fs->bgTaskRunning->type], fs->bgTaskRunning->taskid, TD_VID(fs->tsdb->pVnode), gFSBgTaskName[task->type], task->taskid, task->scheduleTime, task->launchTime,
fs->bgTaskRunning->scheduleTime, fs->bgTaskRunning->launchTime, fs->bgTaskRunning->finishTime); task->finishTime);
taosThreadMutexLock(fs->mutex); taosThreadMutexLock(&fs->tsdb->mutex);
// free last // free last
tsdbDoDoneBgTask(fs, fs->bgTaskRunning); tsdbDoDoneBgTask(fs, task);
fs->bgTaskRunning = NULL; fset->bgTaskRunning = NULL;
// schedule next // schedule next
if (fs->bgTaskNum > 0) { if (fset->bgTaskNum > 0) {
if (fs->stop) { if (fs->stop) {
while (fs->bgTaskNum > 0) { while (fset->bgTaskNum > 0) {
STFSBgTask *task = fs->bgTaskQueue->next; STFSBgTask *nextTask = fset->bgTaskQueue->next;
task->prev->next = task->next; nextTask->prev->next = nextTask->next;
task->next->prev = task->prev; nextTask->next->prev = nextTask->prev;
fs->bgTaskNum--; fset->bgTaskNum--;
tsdbDoDoneBgTask(fs, task); tsdbDoDoneBgTask(fs, nextTask);
} }
} else { } else {
// pop task from head // pop task from head
fs->bgTaskRunning = fs->bgTaskQueue->next; fset->bgTaskRunning = fset->bgTaskQueue->next;
fs->bgTaskRunning->prev->next = fs->bgTaskRunning->next; fset->bgTaskRunning->prev->next = fset->bgTaskRunning->next;
fs->bgTaskRunning->next->prev = fs->bgTaskRunning->prev; fset->bgTaskRunning->next->prev = fset->bgTaskRunning->prev;
fs->bgTaskNum--; fset->bgTaskNum--;
vnodeScheduleTaskEx(1, tsdbFSRunBgTask, arg); vnodeScheduleTaskEx(1, tsdbFSRunBgTask, fset->bgTaskRunning);
} }
} }
taosThreadMutexUnlock(fs->mutex); taosThreadMutexUnlock(&fs->tsdb->mutex);
return 0; return 0;
} }
static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), // IMPORTANT: the caller must hold the fs->tsdb->mutex
void (*destroy)(void *), void *arg, int64_t *taskid) { int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *),
void (*destroy)(void *), void *arg, int64_t *taskid) {
if (fs->stop) { if (fs->stop) {
if (destroy) { if (destroy) {
destroy(arg); destroy(arg);
} }
return 0; // TODO: use a better error code return 0;
} }
for (STFSBgTask *task = fs->bgTaskQueue->next; task != fs->bgTaskQueue; task = task->next) { STFileSet *fset;
tsdbFSGetFSet(fs, fid, &fset);
ASSERT(fset != NULL);
for (STFSBgTask *task = fset->bgTaskQueue->next; task != fset->bgTaskQueue; task = task->next) {
if (task->type == type) { if (task->type == type) {
if (destroy) { if (destroy) {
destroy(arg); destroy(arg);
@ -1203,22 +1230,24 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int
if (task == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (task == NULL) return TSDB_CODE_OUT_OF_MEMORY;
taosThreadCondInit(task->done, NULL); taosThreadCondInit(task->done, NULL);
task->fs = fs;
task->fid = fid;
task->type = type; task->type = type;
task->run = run; task->run = run;
task->free = destroy; task->destroy = destroy;
task->arg = arg; task->arg = arg;
task->scheduleTime = taosGetTimestampMs(); task->scheduleTime = taosGetTimestampMs();
task->taskid = ++fs->taskid; task->taskid = ++fs->taskid;
if (fs->bgTaskRunning == NULL && fs->bgTaskNum == 0) { if (fset->bgTaskRunning == NULL && fset->bgTaskNum == 0) {
// launch task directly // launch task directly
fs->bgTaskRunning = task; fset->bgTaskRunning = task;
vnodeScheduleTaskEx(1, tsdbFSRunBgTask, fs); vnodeScheduleTaskEx(1, tsdbFSRunBgTask, task);
} else { } else {
// add to the queue tail // add to the queue tail
fs->bgTaskNum++; fset->bgTaskNum++;
task->next = fs->bgTaskQueue; task->next = fset->bgTaskQueue;
task->prev = fs->bgTaskQueue->prev; task->prev = fset->bgTaskQueue->prev;
task->prev->next = task; task->prev->next = task;
task->next->prev = task; task->next->prev = task;
} }
@ -1227,68 +1256,30 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int
return 0; return 0;
} }
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg, int32_t tsdbFSDisableBgTask(STFileSystem *fs) {
int64_t *taskid) { taosThreadMutexLock(&fs->tsdb->mutex);
taosThreadMutexLock(fs->mutex); for (;;) {
int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, free, arg, taskid); fs->stop = true;
taosThreadMutexUnlock(fs->mutex); bool done = true;
return code;
}
int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid) { STFileSet *fset;
STFSBgTask *task = NULL; TARRAY2_FOREACH(fs->fSetArr, fset) {
if (fset->bgTaskRunning) {
taosThreadMutexLock(fs->mutex); tsdbDoWaitBgTask(fs, fset->bgTaskRunning);
done = false;
if (fs->bgTaskRunning && fs->bgTaskRunning->taskid == taskid) {
task = fs->bgTaskRunning;
} else {
for (STFSBgTask *taskt = fs->bgTaskQueue->next; taskt != fs->bgTaskQueue; taskt = taskt->next) {
if (taskt->taskid == taskid) {
task = taskt;
break; break;
} }
} }
}
if (task) { if (done) break;
tsdbDoWaitBgTask(fs, task);
} }
taosThreadMutexUnlock(&fs->tsdb->mutex);
taosThreadMutexUnlock(fs->mutex);
return 0; return 0;
} }
int32_t tsdbFSWaitAllBgTask(STFileSystem *fs) {
taosThreadMutexLock(fs->mutex);
while (fs->bgTaskRunning) {
taosThreadCondWait(fs->bgTaskRunning->done, fs->mutex);
}
taosThreadMutexUnlock(fs->mutex);
return 0;
}
static int32_t tsdbFSDoDisableBgTask(STFileSystem *fs) {
fs->stop = true;
if (fs->bgTaskRunning) {
tsdbDoWaitBgTask(fs, fs->bgTaskRunning);
}
return 0;
}
int32_t tsdbFSDisableBgTask(STFileSystem *fs) {
taosThreadMutexLock(fs->mutex);
int32_t code = tsdbFSDoDisableBgTask(fs);
taosThreadMutexUnlock(fs->mutex);
return code;
}
int32_t tsdbFSEnableBgTask(STFileSystem *fs) { int32_t tsdbFSEnableBgTask(STFileSystem *fs) {
taosThreadMutexLock(fs->mutex); taosThreadMutexLock(&fs->tsdb->mutex);
fs->stop = false; fs->stop = false;
taosThreadMutexUnlock(fs->mutex); taosThreadMutexUnlock(&fs->tsdb->mutex);
return 0; return 0;
} }

View File

@ -22,22 +22,11 @@
extern "C" { extern "C" {
#endif #endif
/* Exposed Handle */
typedef struct STFileSystem STFileSystem;
typedef struct STFSBgTask STFSBgTask;
// typedef TARRAY2(STFileSet *) TFileSetArray;
typedef enum { typedef enum {
TSDB_FEDIT_COMMIT = 1, // TSDB_FEDIT_COMMIT = 1, //
TSDB_FEDIT_MERGE TSDB_FEDIT_MERGE
} EFEditT; } EFEditT;
typedef enum {
TSDB_BG_TASK_MERGER = 1,
TSDB_BG_TASK_RETENTION,
TSDB_BG_TASK_COMPACT,
} EFSBgTaskT;
typedef enum { typedef enum {
TSDB_FCURRENT = 1, TSDB_FCURRENT = 1,
TSDB_FCURRENT_C, // for commit TSDB_FCURRENT_C, // for commit
@ -67,37 +56,17 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
int32_t tsdbFSEditCommit(STFileSystem *fs); int32_t tsdbFSEditCommit(STFileSystem *fs);
int32_t tsdbFSEditAbort(STFileSystem *fs); int32_t tsdbFSEditAbort(STFileSystem *fs);
// background task // background task
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg, int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *),
int64_t *taskid); void (*destroy)(void *), void *arg, int64_t *taskid);
int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid);
int32_t tsdbFSWaitAllBgTask(STFileSystem *fs);
int32_t tsdbFSDisableBgTask(STFileSystem *fs); int32_t tsdbFSDisableBgTask(STFileSystem *fs);
int32_t tsdbFSEnableBgTask(STFileSystem *fs); int32_t tsdbFSEnableBgTask(STFileSystem *fs);
// other // other
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
int32_t tsdbFSCheckCommit(STFileSystem *fs); int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid);
// utils // utils
int32_t save_fs(const TFileSetArray *arr, const char *fname); int32_t save_fs(const TFileSetArray *arr, const char *fname);
int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype); int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
struct STFSBgTask {
EFSBgTaskT type;
int32_t (*run)(void *arg);
void (*free)(void *arg);
void *arg;
TdThreadCond done[1];
int32_t numWait;
int64_t taskid;
int64_t scheduleTime;
int64_t launchTime;
int64_t finishTime;
struct STFSBgTask *prev;
struct STFSBgTask *next;
};
/* Exposed Structs */ /* Exposed Structs */
struct STFileSystem { struct STFileSystem {
STsdb *tsdb; STsdb *tsdb;
@ -109,17 +78,8 @@ struct STFileSystem {
TFileSetArray fSetArrTmp[1]; TFileSetArray fSetArrTmp[1];
// background task queue // background task queue
TdThreadMutex mutex[1]; bool stop;
bool stop; int64_t taskid;
int64_t taskid;
int32_t bgTaskNum;
STFSBgTask bgTaskQueue[1];
STFSBgTask *bgTaskRunning;
// block commit variables
TdThreadMutex commitMutex;
TdThreadCond canCommit;
bool blockCommit;
}; };
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -342,11 +342,6 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
int32_t idx = TARRAY2_SEARCH_IDX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ); int32_t idx = TARRAY2_SEARCH_IDX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ);
ASSERT(idx >= 0); ASSERT(idx >= 0);
TARRAY2_REMOVE(lvl->fobjArr, idx, tsdbSttLvlClearFObj); TARRAY2_REMOVE(lvl->fobjArr, idx, tsdbSttLvlClearFObj);
if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
// TODO: remove the stt level if no file exists anymore
// TARRAY2_REMOVE(&fset->lvlArr, lvl - fset->lvlArr.data, tsdbSttLvlClear);
}
} else { } else {
ASSERT(tsdbIsSameTFile(&op->of, fset->farr[op->of.type]->f)); ASSERT(tsdbIsSameTFile(&op->of, fset->farr[op->of.type]->f));
tsdbTFileObjUnref(fset->farr[op->of.type]); tsdbTFileObjUnref(fset->farr[op->of.type]);
@ -454,10 +449,22 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
fset[0]->fid = fid; fset[0]->fid = fid;
fset[0]->maxVerValid = VERSION_MAX; fset[0]->maxVerValid = VERSION_MAX;
TARRAY2_INIT(fset[0]->lvlArr); TARRAY2_INIT(fset[0]->lvlArr);
// background task queue
fset[0]->bgTaskNum = 0;
fset[0]->bgTaskQueue->next = fset[0]->bgTaskQueue;
fset[0]->bgTaskQueue->prev = fset[0]->bgTaskQueue;
fset[0]->bgTaskRunning = NULL;
// block commit variables
taosThreadCondInit(&fset[0]->canCommit, NULL);
fset[0]->numWaitCommit = 0;
fset[0]->blockCommit = false;
return 0; return 0;
} }
int32_t tsdbTFileSetInitDup(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) { int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) {
int32_t code = tsdbTFileSetInit(fset1->fid, fset); int32_t code = tsdbTFileSetInit(fset1->fid, fset);
if (code) return code; if (code) return code;
@ -588,21 +595,23 @@ int32_t tsdbTFileSetClear(STFileSet **fset) {
TARRAY2_DESTROY(fset[0]->lvlArr, tsdbSttLvlClear); TARRAY2_DESTROY(fset[0]->lvlArr, tsdbSttLvlClear);
taosThreadCondDestroy(&fset[0]->canCommit);
taosMemoryFree(fset[0]); taosMemoryFree(fset[0]);
fset[0] = NULL; fset[0] = NULL;
return 0; return 0;
} }
int32_t tsdbTFileSetRemove(STFileSet **fset) { int32_t tsdbTFileSetRemove(STFileSet *fset) {
if (fset == NULL) return 0;
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
if (fset[0]->farr[ftype] == NULL) continue; if (fset->farr[ftype] == NULL) continue;
tsdbTFileObjRemove(fset[0]->farr[ftype]); tsdbTFileObjRemove(fset->farr[ftype]);
} }
TARRAY2_DESTROY(fset[0]->lvlArr, tsdbSttLvlRemove); TARRAY2_DESTROY(fset->lvlArr, tsdbSttLvlRemove);
taosMemoryFree(fset[0]);
fset[0] = NULL;
return 0; return 0;
} }

View File

@ -28,6 +28,8 @@ typedef struct SSttLvl SSttLvl;
typedef TARRAY2(STFileObj *) TFileObjArray; typedef TARRAY2(STFileObj *) TFileObjArray;
typedef TARRAY2(SSttLvl *) TSttLvlArray; typedef TARRAY2(SSttLvl *) TSttLvlArray;
typedef TARRAY2(STFileOp) TFileOpArray; typedef TARRAY2(STFileOp) TFileOpArray;
typedef struct STFileSystem STFileSystem;
typedef struct STFSBgTask STFSBgTask;
typedef enum { typedef enum {
TSDB_FOP_NONE = 0, TSDB_FOP_NONE = 0,
@ -41,10 +43,10 @@ typedef enum {
// init/clear // init/clear
int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset); int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset);
int32_t tsdbTFileSetInitDup(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset); int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset); int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
int32_t tsdbTFileSetClear(STFileSet **fset); int32_t tsdbTFileSetClear(STFileSet **fset);
int32_t tsdbTFileSetRemove(STFileSet **fset); int32_t tsdbTFileSetRemove(STFileSet *fset);
int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset, int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset,
TFileOpArray *fopArr); TFileOpArray *fopArr);
@ -58,6 +60,7 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset);
// cmpr // cmpr
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2); int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2);
// edit // edit
int32_t tsdbSttLvlClear(SSttLvl **lvl);
int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op); int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op);
int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *fset); int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *fset);
// max commit id // max commit id
@ -70,6 +73,33 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset);
int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl); int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl);
int32_t tsdbSttLvlClear(SSttLvl **lvl); int32_t tsdbSttLvlClear(SSttLvl **lvl);
typedef enum {
TSDB_BG_TASK_MERGER = 1,
TSDB_BG_TASK_RETENTION,
TSDB_BG_TASK_COMPACT,
} EFSBgTaskT;
struct STFSBgTask {
STFileSystem *fs;
int32_t fid;
EFSBgTaskT type;
int32_t (*run)(void *arg);
void (*destroy)(void *arg);
void *arg;
TdThreadCond done[1];
int32_t numWait;
int64_t taskid;
int64_t scheduleTime;
int64_t launchTime;
int64_t finishTime;
struct STFSBgTask *prev;
struct STFSBgTask *next;
};
struct STFileOp { struct STFileOp {
tsdb_fop_t optype; tsdb_fop_t optype;
int32_t fid; int32_t fid;
@ -87,6 +117,16 @@ struct STFileSet {
int64_t maxVerValid; int64_t maxVerValid;
STFileObj *farr[TSDB_FTYPE_MAX]; // file array STFileObj *farr[TSDB_FTYPE_MAX]; // file array
TSttLvlArray lvlArr[1]; // level array TSttLvlArray lvlArr[1]; // level array
// background task queue
int32_t bgTaskNum;
STFSBgTask bgTaskQueue[1];
STFSBgTask *bgTaskRunning;
// block commit variables
TdThreadCond canCommit;
int32_t numWaitCommit;
bool blockCommit;
}; };
struct STSnapRange { struct STSnapRange {

View File

@ -191,7 +191,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable->nDel++; pMemTable->nDel++;
pMemTable->minVer = TMIN(pMemTable->minVer, version); pMemTable->minVer = TMIN(pMemTable->minVer, version);
pMemTable->maxVer = TMIN(pMemTable->maxVer, version); pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
/* /*
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey); tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);

View File

@ -15,11 +15,17 @@
#include "tsdbMerge.h" #include "tsdbMerge.h"
#define TSDB_MAX_LEVEL 6 // means max level is 7 #define TSDB_MAX_LEVEL 2 // means max level is 3
typedef struct { typedef struct {
STsdb *tsdb; STsdb *tsdb;
TFileSetArray *fsetArr; int32_t fid;
} SMergeArg;
typedef struct {
STsdb *tsdb;
int32_t fid;
STFileSet *fset;
int32_t sttTrigger; int32_t sttTrigger;
int32_t maxRow; int32_t maxRow;
@ -313,7 +319,6 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
if (merger->ctx->fset->farr[ftype]) { if (merger->ctx->fset->farr[ftype]) {
config.files[ftype].exist = true; config.files[ftype].exist = true;
config.files[ftype].file = merger->ctx->fset->farr[ftype]->f[0]; config.files[ftype].file = merger->ctx->fset->farr[ftype]->f[0];
} else { } else {
config.files[ftype].exist = false; config.files[ftype].exist = false;
} }
@ -397,13 +402,13 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE); code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
taosThreadRwlockWrlock(&merger->tsdb->rwLock); taosThreadMutexLock(&merger->tsdb->mutex);
code = tsdbFSEditCommit(merger->tsdb->pFS); code = tsdbFSEditCommit(merger->tsdb->pFS);
if (code) { if (code) {
taosThreadRwlockUnlock(&merger->tsdb->rwLock); taosThreadMutexUnlock(&merger->tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
taosThreadRwlockUnlock(&merger->tsdb->rwLock); taosThreadMutexUnlock(&merger->tsdb->mutex);
_exit: _exit:
if (code) { if (code) {
@ -478,30 +483,21 @@ _exit:
} }
static int32_t tsdbDoMerge(SMerger *merger) { static int32_t tsdbDoMerge(SMerger *merger) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SSttLvl *lvl = TARRAY2_FIRST(merger->fset->lvlArr);
STFileSet *fset; if (TARRAY2_SIZE(merger->fset->lvlArr) == 0) return 0;
TARRAY2_FOREACH(merger->fsetArr, fset) { if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < merger->sttTrigger) return 0;
if (TARRAY2_SIZE(fset->lvlArr) == 0) continue;
SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr); code = tsdbMergerOpen(merger);
TSDB_CHECK_CODE(code, lino, _exit);
if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < merger->sttTrigger) continue; code = tsdbMergeFileSet(merger, merger->fset);
TSDB_CHECK_CODE(code, lino, _exit);
if (!merger->ctx->opened) { code = tsdbMergerClose(merger);
code = tsdbMergerOpen(merger); TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbMergeFileSet(merger, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (merger->ctx->opened) {
code = tsdbMergerClose(merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit: _exit:
if (code) { if (code) {
@ -512,36 +508,73 @@ _exit:
return code; return code;
} }
int32_t tsdbMerge(void *arg) { static int32_t tsdbMergeGetFSet(SMerger *merger) {
int32_t code = 0; STFileSet *fset;
int32_t lino = 0;
STsdb *tsdb = (STsdb *)arg;
SMerger merger[1] = {{ taosThreadMutexLock(&merger->tsdb->mutex);
.tsdb = tsdb, tsdbFSGetFSet(merger->tsdb->pFS, merger->fid, &fset);
.sttTrigger = tsdb->pVnode->config.sttTrigger, if (fset == NULL) {
}}; taosThreadMutexUnlock(&merger->tsdb->mutex);
if (merger->sttTrigger <= 1) {
return 0; return 0;
} }
code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr); int32_t code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset);
if (code) {
taosThreadMutexUnlock(&merger->tsdb->mutex);
return code;
}
taosThreadMutexUnlock(&merger->tsdb->mutex);
return 0;
}
static int32_t tsdbMerge(void *arg) {
int32_t code = 0;
int32_t lino = 0;
SMergeArg *mergeArg = (SMergeArg *)arg;
STsdb *tsdb = mergeArg->tsdb;
SMerger merger[1] = {{
.tsdb = tsdb,
.fid = mergeArg->fid,
.sttTrigger = tsdb->pVnode->config.sttTrigger,
}};
if (merger->sttTrigger <= 1) return 0;
// copy snapshot
code = tsdbMergeGetFSet(merger);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (merger->fset == NULL) return 0;
// do merge
tsdbDebug("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid);
code = tsdbDoMerge(merger); code = tsdbDoMerge(merger);
tsdbDebug("vgId:%d merge done, fid:%d", TD_VID(tsdb->pVnode), mergeArg->fid);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tsdbFSDestroyCopySnapshot(&merger->fsetArr);
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
tsdbFatal("vgId:%d, failed to merge stt files since %s. code:%d", TD_VID(tsdb->pVnode), terrstr(), code); tsdbFatal("vgId:%d, failed to merge stt files since %s. code:%d", TD_VID(tsdb->pVnode), terrstr(), code);
taosMsleep(100); taosMsleep(100);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} else if (merger->ctx->opened) {
tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
} }
tsdbTFileSetClear(&merger->fset);
return code; return code;
} }
int32_t tsdbSchedMerge(STsdb *tsdb, int32_t fid) {
SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
arg->tsdb = tsdb;
arg->fid = fid;
int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, fid, TSDB_BG_TASK_MERGER, tsdbMerge, taosMemoryFree, arg, NULL);
if (code) taosMemoryFree(arg);
return code;
}

View File

@ -53,7 +53,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
snprintf(pTsdb->path, TD_PATH_MAX, "%s%s%s", pVnode->path, TD_DIRSEP, dir); snprintf(pTsdb->path, TD_PATH_MAX, "%s%s%s", pVnode->path, TD_DIRSEP, dir);
// taosRealPath(pTsdb->path, NULL, slen); // taosRealPath(pTsdb->path, NULL, slen);
pTsdb->pVnode = pVnode; pTsdb->pVnode = pVnode;
taosThreadRwlockInit(&pTsdb->rwLock, NULL); taosThreadMutexInit(&pTsdb->mutex, NULL);
if (!pKeepCfg) { if (!pKeepCfg) {
tsdbSetKeepCfg(pTsdb, &pVnode->config.tsdbCfg); tsdbSetKeepCfg(pTsdb, &pVnode->config.tsdbCfg);
} else { } else {
@ -99,15 +99,14 @@ int tsdbClose(STsdb **pTsdb) {
tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d, keepTimeOffset:%d", TD_VID(pdb->pVnode), pdb->path, tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d, keepTimeOffset:%d", TD_VID(pdb->pVnode), pdb->path,
pdb->keepCfg.days, pdb->keepCfg.keep0, pdb->keepCfg.keep1, pdb->keepCfg.keep2, pdb->keepCfg.days, pdb->keepCfg.keep0, pdb->keepCfg.keep1, pdb->keepCfg.keep2,
pdb->keepCfg.keepTimeOffset); pdb->keepCfg.keepTimeOffset);
taosThreadRwlockWrlock(&(*pTsdb)->rwLock); taosThreadMutexLock(&(*pTsdb)->mutex);
tsdbMemTableDestroy((*pTsdb)->mem, true); tsdbMemTableDestroy((*pTsdb)->mem, true);
(*pTsdb)->mem = NULL; (*pTsdb)->mem = NULL;
taosThreadRwlockUnlock(&(*pTsdb)->rwLock); taosThreadMutexUnlock(&(*pTsdb)->mutex);
taosThreadRwlockDestroy(&(*pTsdb)->rwLock);
tsdbCloseFS(&(*pTsdb)->pFS); tsdbCloseFS(&(*pTsdb)->pFS);
tsdbCloseCache(*pTsdb); tsdbCloseCache(*pTsdb);
taosThreadMutexDestroy(&(*pTsdb)->mutex);
taosMemoryFreeClear(*pTsdb); taosMemoryFreeClear(*pTsdb);
} }
return 0; return 0;

View File

@ -1105,8 +1105,9 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
(pVerRange->maxVer < pBlock->record.maxVer && pVerRange->maxVer >= pBlock->record.minVer); (pVerRange->maxVer < pBlock->record.maxVer && pVerRange->maxVer >= pBlock->record.minVer);
} }
static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
int32_t* nextIndex, int32_t order, SBrinRecord* pRecord) { STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order,
SBrinRecord* pRecord) {
bool asc = ASCENDING_TRAVERSE(order); bool asc = ASCENDING_TRAVERSE(order);
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) { if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) {
return false; return false;
@ -1119,7 +1120,8 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
// *nextIndex = pBlockInfo->tbBlockIdx + step; // *nextIndex = pBlockInfo->tbBlockIdx + step;
// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); // *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); STableDataBlockIdx* pTableDataBlockIdx =
taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
memcpy(pRecord, &p->record, sizeof(SBrinRecord)); memcpy(pRecord, &p->record, sizeof(SBrinRecord));
@ -1145,7 +1147,8 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
return -1; return -1;
} }
static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, int32_t step) { static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index,
int32_t step) {
if (index < 0 || index >= pBlockIter->numOfBlocks) { if (index < 0 || index >= pBlockIter->numOfBlocks) {
return -1; return -1;
} }
@ -1153,12 +1156,13 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index); SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
pBlockIter->index += step; pBlockIter->index += step;
if (index != pBlockIter->index) { if (index != pBlockIter->index) {
if (index > pBlockIter->index) { if (index > pBlockIter->index) {
for (int32_t i = index - 1; i >= pBlockIter->index; --i) { for (int32_t i = index - 1; i >= pBlockIter->index; --i) {
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i); SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); STableBlockScanInfo* pBlockScanInfo =
getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx); STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
pTableDataBlockIdx->globalIndex = i + 1; pTableDataBlockIdx->globalIndex = i + 1;
@ -1168,13 +1172,13 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
for (int32_t i = index + 1; i <= pBlockIter->index; ++i) { for (int32_t i = index + 1; i <= pBlockIter->index; ++i) {
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i); SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); STableBlockScanInfo* pBlockScanInfo =
getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx); STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
pTableDataBlockIdx->globalIndex = i - 1; pTableDataBlockIdx->globalIndex = i - 1;
taosArraySet(pBlockIter->blockList, i - 1, pBlockInfo); taosArraySet(pBlockIter->blockList, i - 1, pBlockInfo);
} }
} }
taosArraySet(pBlockIter->blockList, pBlockIter->index, &fblock); taosArraySet(pBlockIter->blockList, pBlockIter->index, &fblock);
@ -1286,7 +1290,8 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
int32_t neighborIndex = 0; int32_t neighborIndex = 0;
SBrinRecord rec = {0}; SBrinRecord rec = {0};
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, pReader->info.order, &rec); bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex,
pReader->info.order, &rec);
// overlap with neighbor // overlap with neighbor
if (hasNeighbor) { if (hasNeighbor) {
@ -1420,9 +1425,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
} }
} }
static void doPinSttBlock(SLastBlockReader* pLastBlockReader) { static void doPinSttBlock(SLastBlockReader* pLastBlockReader) { tMergeTreePinSttBlock(&pLastBlockReader->mergeTree); }
tMergeTreePinSttBlock(&pLastBlockReader->mergeTree);
}
static void doUnpinSttBlock(SLastBlockReader* pLastBlockReader) { static void doUnpinSttBlock(SLastBlockReader* pLastBlockReader) {
tMergeTreeUnpinSttBlock(&pLastBlockReader->mergeTree); tMergeTreeUnpinSttBlock(&pLastBlockReader->mergeTree);
@ -1568,7 +1571,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -1618,7 +1621,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -1826,8 +1829,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow); TSDBKEY ik = TSDBROW_KEY(piRow);
STSchema* pSchema = NULL; STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
@ -2219,8 +2222,9 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBROW *pRow = NULL, *piRow = NULL; TSDBROW *pRow = NULL, *piRow = NULL;
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped))
(ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN); ? pBlockData->aTSKEY[pDumpInfo->rowIndex]
: (ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN);
if (pBlockScanInfo->iter.hasVal) { if (pBlockScanInfo->iter.hasVal) {
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
} }
@ -2257,7 +2261,8 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
*loadNeighbor = false; *loadNeighbor = false;
SBrinRecord rec = {0}; SBrinRecord rec = {0};
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pBlockScanInfo, &nextIndex, pReader->info.order, &rec); bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pBlockScanInfo, &nextIndex,
pReader->info.order, &rec);
if (!hasNeighbor) { // do nothing if (!hasNeighbor) { // do nothing
return code; return code;
} }
@ -2268,7 +2273,7 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
// 1. find the next neighbor block in the scan block list // 1. find the next neighbor block in the scan block list
STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex); STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex);
int32_t neighborIndex = tableDataBlockIdx->globalIndex; int32_t neighborIndex = tableDataBlockIdx->globalIndex;
// 2. remove it from the scan block list // 2. remove it from the scan block list
setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step); setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step);
@ -2704,7 +2709,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
(ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else { } else {
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader); bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN; int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;
if (!bHasDataInLastBlock || if (!bHasDataInLastBlock ||
((asc && pBlockInfo->record.lastKey < tsLast) || (!asc && pBlockInfo->record.firstKey > tsLast))) { ((asc && pBlockInfo->record.lastKey < tsLast) || (!asc && pBlockInfo->record.firstKey > tsLast))) {
@ -3479,7 +3484,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
// start to merge duplicated rows // start to merge duplicated rows
STSchema* pTSchema = NULL; STSchema* pTSchema = NULL;
if (current.type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory if (current.type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid); pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
if (pTSchema == NULL) { if (pTSchema == NULL) {
return terrno; return terrno;
@ -3525,8 +3530,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
SRow** pTSRow) { SRow** pTSRow) {
SRowMerger* pMerger = &pReader->status.merger; SRowMerger* pMerger = &pReader->status.merger;
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow); TSDBKEY ik = TSDBROW_KEY(piRow);
STSchema* pSchema = NULL; STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
@ -4907,12 +4912,12 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
SVersionRange* pRange = &pReader->info.verRange; SVersionRange* pRange = &pReader->info.verRange;
// lock // lock
taosThreadRwlockRdlock(&pTsdb->rwLock); taosThreadMutexLock(&pTsdb->mutex);
// alloc // alloc
STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
if (pSnap == NULL) { if (pSnap == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadMutexUnlock(&pTsdb->mutex);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
@ -4922,7 +4927,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
pSnap->pMem = pTsdb->mem; pSnap->pMem = pTsdb->mem;
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
if (pSnap->pNode == NULL) { if (pSnap->pNode == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadMutexUnlock(&pTsdb->mutex);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
@ -4937,7 +4942,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
pSnap->pIMem = pTsdb->imem; pSnap->pIMem = pTsdb->imem;
pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode)); pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
if (pSnap->pINode == NULL) { if (pSnap->pINode == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadMutexUnlock(&pTsdb->mutex);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
@ -4952,7 +4957,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
code = tsdbFSCreateRefSnapshotWithoutLock(pTsdb->pFS, &pSnap->pfSetArray); code = tsdbFSCreateRefSnapshotWithoutLock(pTsdb->pFS, &pSnap->pfSetArray);
// unlock // unlock
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadMutexUnlock(&pTsdb->mutex);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
@ -5005,4 +5010,5 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
pReader->status.fileIter.pLastBlockReader->mergeTree.idStr = pReader->idStr; pReader->status.fileIter.pLastBlockReader->mergeTree.idStr = pReader->idStr;
} }
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
}

View File

@ -25,11 +25,6 @@ typedef struct {
TFileSetArray *fsetArr; TFileSetArray *fsetArr;
TFileOpArray fopArr[1]; TFileOpArray fopArr[1];
struct {
int32_t fsetArrIdx;
STFileSet *fset;
} ctx[1];
} SRTNer; } SRTNer;
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) { static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
@ -227,8 +222,8 @@ _exit:
typedef struct { typedef struct {
STsdb *tsdb; STsdb *tsdb;
int32_t sync;
int64_t now; int64_t now;
int32_t fid;
} SRtnArg; } SRtnArg;
static int32_t tsdbDoRetentionBegin(SRtnArg *arg, SRTNer *rtner) { static int32_t tsdbDoRetentionBegin(SRtnArg *arg, SRTNer *rtner) {
@ -263,15 +258,15 @@ static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
code = tsdbFSEditBegin(rtner->tsdb->pFS, rtner->fopArr, TSDB_FEDIT_MERGE); code = tsdbFSEditBegin(rtner->tsdb->pFS, rtner->fopArr, TSDB_FEDIT_MERGE);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
taosThreadRwlockWrlock(&rtner->tsdb->rwLock); taosThreadMutexLock(&rtner->tsdb->mutex);
code = tsdbFSEditCommit(rtner->tsdb->pFS); code = tsdbFSEditCommit(rtner->tsdb->pFS);
if (code) { if (code) {
taosThreadRwlockUnlock(&rtner->tsdb->rwLock); taosThreadMutexUnlock(&rtner->tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
taosThreadRwlockUnlock(&rtner->tsdb->rwLock); taosThreadMutexUnlock(&rtner->tsdb->mutex);
TARRAY2_DESTROY(rtner->fopArr, NULL); TARRAY2_DESTROY(rtner->fopArr, NULL);
@ -285,95 +280,83 @@ _exit:
return code; return code;
} }
static int32_t tsdbDoRetention2(void *arg) { static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SRTNer rtner[1] = {0}; STFileObj *fobj = NULL;
int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);
code = tsdbDoRetentionBegin(arg, rtner); if (expLevel < 0) { // remove the fileset
TSDB_CHECK_CODE(code, lino, _exit); for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
for (rtner->ctx->fsetArrIdx = 0; rtner->ctx->fsetArrIdx < TARRAY2_SIZE(rtner->fsetArr); rtner->ctx->fsetArrIdx++) { int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
rtner->ctx->fset = TARRAY2_GET(rtner->fsetArr, rtner->ctx->fsetArrIdx); if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) {
code = tsdbRemoveFileObjectS3(rtner, fobj);
STFileObj *fobj; TSDB_CHECK_CODE(code, lino, _exit);
int32_t expLevel = tsdbFidLevel(rtner->ctx->fset->fid, &rtner->tsdb->keepCfg, rtner->now); } else {
code = tsdbDoRemoveFileObject(rtner, fobj);
if (expLevel < 0) { // remove the file set
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) {
code = tsdbRemoveFileObjectS3(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
SSttLvl *lvl;
TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else if (expLevel == 0) {
continue;
} else {
SDiskID did;
if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did); }
// data SSttLvl *lvl;
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) { TARRAY2_FOREACH(fset->lvlArr, lvl) {
if (fobj == NULL) continue; TARRAY2_FOREACH(lvl->fobjArr, fobj) {
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else if (expLevel == 0) { // only migrate to upper level
return 0;
} else { // migrate
SDiskID did;
if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did);
// data
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
if (fobj->f->did.level == did.level) continue;
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) {
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
if (tsS3Enabled) {
int64_t fsize = 0;
if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(terrno);
tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), __func__,
fobj->fname, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
}
s3EvictCache(fobj->fname, fsize * 2);
}
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// stt
SSttLvl *lvl;
TARRAY2_FOREACH(fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
if (fobj->f->did.level == did.level) continue; if (fobj->f->did.level == did.level) continue;
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); code = tsdbDoMigrateFileObj(rtner, fobj, &did);
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) { TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
if (tsS3Enabled) {
int64_t fsize = 0;
if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(terrno);
tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), __func__,
fobj->fname, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
}
s3EvictCache(fobj->fname, fsize * 2);
}
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// stt
SSttLvl *lvl;
TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
if (fobj->f->did.level == did.level) continue;
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
} }
} }
code = tsdbDoRetentionEnd(rtner);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
if (TARRAY2_DATA(rtner->fopArr)) { if (TARRAY2_DATA(rtner->fopArr)) {
@ -389,30 +372,105 @@ _exit:
return code; return code;
} }
static void tsdbFreeRtnArg(void *arg) { static int32_t tsdbDoRetentionSync(void *arg) {
SRtnArg *rArg = (SRtnArg *)arg; int32_t code = 0;
if (rArg->sync) { int32_t lino = 0;
tsem_post(&rArg->tsdb->pVnode->canCommit); SRTNer rtner[1] = {0};
code = tsdbDoRetentionBegin(arg, rtner);
TSDB_CHECK_CODE(code, lino, _exit);
STFileSet *fset;
TARRAY2_FOREACH(rtner->fsetArr, fset) {
code = tsdbDoRetentionOnFileSet(rtner, fset);
TSDB_CHECK_CODE(code, lino, _exit);
} }
taosMemoryFree(arg);
code = tsdbDoRetentionEnd(rtner);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
}
tsem_post(&((SRtnArg *)arg)->tsdb->pVnode->canCommit);
return code;
} }
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) { static int32_t tsdbDoRetentionAsync(void *arg) {
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg)); int32_t code = 0;
if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY; int32_t lino = 0;
arg->tsdb = tsdb; SRTNer rtner[1] = {0};
arg->sync = sync;
arg->now = now;
if (sync) { code = tsdbDoRetentionBegin(arg, rtner);
tsem_wait(&tsdb->pVnode->canCommit); TSDB_CHECK_CODE(code, lino, _exit);
STFileSet *fset;
TARRAY2_FOREACH(rtner->fsetArr, fset) {
if (fset->fid != ((SRtnArg *)arg)->fid) continue;
code = tsdbDoRetentionOnFileSet(rtner, fset);
TSDB_CHECK_CODE(code, lino, _exit);
} }
int64_t taskid; code = tsdbDoRetentionEnd(rtner);
int32_t code = TSDB_CHECK_CODE(code, lino, _exit);
tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, tsdbFreeRtnArg, arg, &taskid);
_exit:
if (code) { if (code) {
tsdbFreeRtnArg(arg); TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
} }
return code; return code;
} }
static void tsdbFreeRtnArg(void *arg) { taosMemoryFree(arg); }
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
int32_t code = 0;
if (sync) { // sync retention
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
arg->tsdb = tsdb;
arg->now = now;
arg->fid = INT32_MAX;
tsem_wait(&tsdb->pVnode->canCommit);
code = vnodeScheduleTask(tsdbDoRetentionSync, arg);
if (code) {
tsem_post(&tsdb->pVnode->canCommit);
taosMemoryFree(arg);
return code;
}
} else { // async retention
taosThreadMutexLock(&tsdb->mutex);
STFileSet *fset;
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
taosThreadMutexUnlock(&tsdb->mutex);
return TSDB_CODE_OUT_OF_MEMORY;
}
arg->tsdb = tsdb;
arg->now = now;
arg->fid = fset->fid;
code = tsdbFSScheduleBgTask(tsdb->pFS, fset->fid, TSDB_BG_TASK_RETENTION, tsdbDoRetentionAsync, tsdbFreeRtnArg,
arg, NULL);
if (code) {
tsdbFreeRtnArg(arg);
taosThreadMutexUnlock(&tsdb->mutex);
return code;
}
}
taosThreadMutexUnlock(&tsdb->mutex);
}
return code;
}

View File

@ -38,8 +38,8 @@ struct STsdbSnapReader {
struct { struct {
int32_t fsrArrIdx; int32_t fsrArrIdx;
STSnapRange* fsr; STSnapRange* fsr;
bool isDataDone; bool isDataDone;
bool isTombDone; bool isTombDone;
} ctx[1]; } ctx[1];
// reader // reader
@ -1095,17 +1095,17 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
code = tsdbFSEditAbort(writer[0]->tsdb->pFS); code = tsdbFSEditAbort(writer[0]->tsdb->pFS);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
taosThreadRwlockWrlock(&writer[0]->tsdb->rwLock); taosThreadMutexLock(&writer[0]->tsdb->mutex);
code = tsdbFSEditCommit(writer[0]->tsdb->pFS); code = tsdbFSEditCommit(writer[0]->tsdb->pFS);
if (code) { if (code) {
taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock); taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL; writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock); taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
} }
tsdbFSEnableBgTask(tsdb->pFS); tsdbFSEnableBgTask(tsdb->pFS);
@ -1236,7 +1236,7 @@ static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP
if (fset->farr[ftype] == NULL) continue; if (fset->farr[ftype] == NULL) continue;
typ = tsdbFTypeToSRangeTyp(ftype); typ = tsdbFTypeToSRangeTyp(ftype);
ASSERT(typ < TSDB_SNAP_RANGE_TYP_MAX); ASSERT(typ < TSDB_SNAP_RANGE_TYP_MAX);
STFile* f = fset->farr[ftype]->f; STFile* f = fset->farr[ftype]->f;
if (f->maxVer > fset->maxVerValid) { if (f->maxVer > fset->maxVerValid) {
corrupt = true; corrupt = true;
tsdbError("skip incomplete data file: fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 tsdbError("skip incomplete data file: fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64
@ -1255,7 +1255,7 @@ static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP
TARRAY2_FOREACH(fset->lvlArr, lvl) { TARRAY2_FOREACH(fset->lvlArr, lvl) {
STFileObj* fobj; STFileObj* fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) { TARRAY2_FOREACH(lvl->fobjArr, fobj) {
STFile* f = fobj->f; STFile* f = fobj->f;
if (f->maxVer > fset->maxVerValid) { if (f->maxVer > fset->maxVerValid) {
corrupt = true; corrupt = true;
tsdbError("skip incomplete stt file.fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 tsdbError("skip incomplete stt file.fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64
@ -1299,7 +1299,7 @@ static STsdbSnapPartList* tsdbGetSnapPartList(STFileSystem* fs) {
} }
int32_t code = 0; int32_t code = 0;
taosThreadRwlockRdlock(&fs->tsdb->rwLock); taosThreadMutexLock(&fs->tsdb->mutex);
STFileSet* fset; STFileSet* fset;
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
STsdbSnapPartition* pItem = NULL; STsdbSnapPartition* pItem = NULL;
@ -1311,7 +1311,7 @@ static STsdbSnapPartList* tsdbGetSnapPartList(STFileSystem* fs) {
code = TARRAY2_SORT_INSERT(pList, pItem, tsdbSnapPartCmprFn); code = TARRAY2_SORT_INSERT(pList, pItem, tsdbSnapPartCmprFn);
ASSERT(code == 0); ASSERT(code == 0);
} }
taosThreadRwlockUnlock(&fs->tsdb->rwLock); taosThreadMutexUnlock(&fs->tsdb->mutex);
if (code) { if (code) {
TARRAY2_DESTROY(pList, tsdbSnapPartitionClear); TARRAY2_DESTROY(pList, tsdbSnapPartitionClear);