more concurrency fix

This commit is contained in:
Hongze Cheng 2022-07-21 13:02:48 +00:00
parent bce29321c6
commit a086019de7
1 changed files with 69 additions and 64 deletions

View File

@ -15,94 +15,99 @@
#include "tsdb.h" #include "tsdb.h"
static int32_t tsdbDoRetentionImpl(STsdb *pTsdb, int64_t now, int8_t try, int8_t *canDo) { static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
int32_t code = 0; for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
#if 0 SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
STsdbFSState *pState; int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
if (try) {
pState = pTsdb->pFS->cState;
*canDo = 0;
} else {
pState = pTsdb->pFS->nState;
}
for (int32_t iSet = 0; iSet < taosArrayGetSize(pState->aDFileSet); iSet++) {
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pState->aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pDFileSet->fid, &pTsdb->keepCfg, now);
SDiskID did; SDiskID did;
// check if (expLevel == pSet->diskId.level) continue;
if (expLevel == pDFileSet->diskId.id) continue;
// delete or move
if (expLevel < 0) { if (expLevel < 0) {
if (try) { return true;
*canDo = 1; } else {
} else { if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
tsdbFSStateDeleteDFileSet(pState, pDFileSet->fid); return false;
iSet--; }
}
if (did.level == pSet->diskId.level) continue;
return true;
}
}
return false;
}
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
int32_t code = 0;
if (!tsdbShouldDoRetention(pTsdb, now)) {
return code;
}
// do retention
STsdbFS fs;
code = tsdbFSCopy(pTsdb, &fs);
if (code) goto _err;
for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
SDiskID did;
if (expLevel < 0) {
taosMemoryFree(pSet->pHeadF);
taosMemoryFree(pSet->pDataF);
taosMemoryFree(pSet->pLastF);
taosMemoryFree(pSet->pSmaF);
taosArrayRemove(fs.aDFileSet, iSet);
iSet--;
} else { } else {
// alloc
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
code = terrno; code = terrno;
goto _exit; goto _exit;
} }
if (did.level == pDFileSet->diskId.level) continue; if (did.level == pSet->diskId.level) continue;
if (try) { // copy file to new disk (todo)
*canDo = 1; SDFileSet fSet = *pSet;
} else { fSet.diskId = did;
// copy the file to new disk
SDFileSet nDFileSet = *pDFileSet; code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
nDFileSet.diskId = did; if (code) goto _err;
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); code = tsdbFSUpsertFSet(&fs, &fSet);
if (code) goto _err;
code = tsdbDFileSetCopy(pTsdb, pDFileSet, &nDFileSet);
if (code) goto _exit;
code = tsdbFSUpsertFSet(pState, &nDFileSet);
if (code) goto _exit;
}
} }
/* code */
} }
#endif // do change fs
_exit: code = tsdbFSCommit1(pTsdb, &fs);
return code;
}
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
int32_t code = 0;
#if 0
int8_t canDo;
// try
tsdbDoRetentionImpl(pTsdb, now, 1, &canDo);
if (!canDo) goto _exit;
// begin
code = tsdbFSBegin(pTsdb->pFS);
if (code) goto _err; if (code) goto _err;
// do retention taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbDoRetentionImpl(pTsdb, now, 0, NULL);
if (code) goto _err;
// commit code = tsdbFSCommit2(pTsdb, &fs);
code = tsdbFSCommit(pTsdb->pFS); if (code) {
if (code) goto _err; taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err;
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbFSDestroy(&fs);
_exit: _exit:
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbFSRollback(pTsdb->pFS); ASSERT(0);
#endif // tsdbFSRollback(pTsdb->pFS);
return code; return code;
} }