feat: code optimization for data migration

This commit is contained in:
Cary Xu 2022-09-27 17:43:33 +08:00
parent 3d92b6c6f9
commit c426fa8bb1
2 changed files with 82 additions and 151 deletions

View File

@ -767,7 +767,6 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
if (++nLoops > 1000) { if (++nLoops > 1000) {
nLoops = 0; nLoops = 0;
sched_yield(); sched_yield();
// printf("%s:%d wait retention to finish\n", __func__, __LINE__);
} }
} }
if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) {
@ -780,7 +779,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
} else { } else {
goto _wait_retention_end; goto _wait_retention_end;
} }
atomic_store_8(&pTsdb->trimHdl.commitInWait, 0); atomic_val_compare_exchange_8(&pTsdb->trimHdl.commitInWait, 1, 0);
} }
code = tsdbFSCopy(pTsdb, &pCommitter->fs); code = tsdbFSCopy(pTsdb, &pCommitter->fs);

View File

@ -17,20 +17,12 @@
enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 }; enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 };
#if 1
#define MIGRATE_MIN_FSIZE (1048576 << 9) // 512 MB
#define MIGRATE_MAX_SPEED (1048576 << 5) // 32 MB #define MIGRATE_MAX_SPEED (1048576 << 5) // 32 MB
#define MIGRATE_MIN_COST (5) // second #define MIGRATE_MIN_COST (5) // second
#else
#define MIGRATE_MIN_FSIZE (1048576 << 5) // 32 MB
#define MIGRATE_MAX_SPEED (1048576 << 2) // 4 MB
#define MIGRATE_MIN_COST (5) // second
#endif
static bool tsdbShouldDoMigrate(STsdb *pTsdb); static bool tsdbShouldDoMigrate(STsdb *pTsdb);
static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now); static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now);
static int32_t tsdbProcessExpire(STsdb *pTsdb, int64_t now, int32_t retention); static int32_t tsdbProcessRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention, int8_t type);
static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention);
static bool tsdbShouldDoMigrate(STsdb *pTsdb) { static bool tsdbShouldDoMigrate(STsdb *pTsdb) {
if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) { if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) {
@ -74,110 +66,31 @@ static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
return retention; return retention;
} }
/**
 * @brief Mark expired data file sets and commit the resulting file-system state.
 *
 * Does nothing unless the RETENTION_EXPIRED bit is set in `retention`. Otherwise it
 * works on a copy of the current file system (tsdbFSCopy), flags every leading file
 * set whose tsdbFidLevel() is negative with SET_DFSET_EXPIRED, waits until
 * trimHdl.minCommitFid is above the largest flagged fid, publishes that fid in
 * trimHdl.maxRetentFid, and finally merges/commits the change while holding
 * pTsdb->rwLock for writing.
 *
 * @param pTsdb     tsdb instance whose file system is processed
 * @param now       timestamp handed to tsdbFidLevel() when classifying each file set
 * @param retention bit mask; only RETENTION_EXPIRED is examined here
 * @return int32_t  0 on success (including the "nothing to do" cases), otherwise the
 *                  non-zero code returned by the failing tsdbFS* helper
 */
static int32_t tsdbProcessExpire(STsdb *pTsdb, int64_t now, int32_t retention) {
int32_t code = 0;
int32_t nLoops = 0;
// INT32_MIN doubles as the "no expired file set found" sentinel checked below
int32_t maxFid = INT32_MIN;
STsdbFS fs = {0};
STsdbFS fsLatest = {0};
// nothing to do when the caller did not request expiry processing
if (!(retention & RETENTION_EXPIRED)) {
goto _exit;
}
// work on a private copy of the file system
code = tsdbFSCopy(pTsdb, &fs);
if (code) goto _exit;
int32_t fsSize = taosArrayGetSize(fs.aDFileSet);
// flag the leading run of expired file sets and remember the largest fid among them
for (int32_t iSet = 0; iSet < fsSize; iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
// NOTE(review): `did` is never referenced in this function
SDiskID did;
if (expLevel < 0) {
SET_DFSET_EXPIRED(pSet);
if (pSet->fid > maxFid) maxFid = pSet->fid;
} else {
// stop at the first non-expired set
// NOTE(review): presumably aDFileSet is ordered by ascending fid — confirm
break;
}
}
// no file set was flagged: skip the wait and the merge entirely
if (maxFid == INT32_MIN) goto _exit;
_wait_commit_end:
// busy-wait until minCommitFid is strictly greater than maxFid,
// yielding the CPU once every 1000 iterations
while (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) {
if (++nLoops > 1000) {
nLoops = 0;
sched_yield();
// printf("%s:%d wait commit finished\n", __func__, __LINE__);
}
}
// try to flip trimHdl.state 0 -> 1; only the winner of the CAS proceeds
// NOTE(review): state appears to act as a small lock shared with the commit path — confirm
if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) {
// re-check under the flag: minCommitFid may have changed since the spin loop
if (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) {
atomic_store_8(&pTsdb->trimHdl.state, 0);
goto _wait_commit_end;
}
// publish the highest fid being retired, then release the flag
atomic_store_32(&pTsdb->trimHdl.maxRetentFid, maxFid);
atomic_store_8(&pTsdb->trimHdl.state, 0);
} else {
// state was not 0: go back and wait again
goto _wait_commit_end;
}
// NOTE(review): no goto in this function targets _merge_fs; it only marks the phase
_merge_fs:
// the merge and both commit steps run under the write lock; every failure path unlocks first
taosThreadRwlockWrlock(&pTsdb->rwLock);
// take a fresh copy of the file system now that the write lock is held
if ((code = tsdbFSCopy(pTsdb, &fsLatest))) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _exit;
}
// 1) merge tsdbFSNew and pTsdb->fs
if ((code = tsdbFSUpdDel(pTsdb, &fsLatest, &fs, maxFid))) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _exit;
}
// 2) save CURRENT
if ((code = tsdbFSCommit1(pTsdb, &fsLatest))) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _exit;
}
// 3) apply the tsdbFS to pTsdb->fs
if ((code = tsdbFSCommit2(pTsdb, &fsLatest))) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _exit;
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
_exit:
// both copies are destroyed on every path, including the early exits
tsdbFSDestroy(&fs);
tsdbFSDestroy(&fsLatest);
if (code != 0) {
tsdbError("vgId:%d, tsdb do retention(expire) failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
// any failure is logged and then asserted on
ASSERT(0);
}
return code;
}
/** /**
* @brief * @brief process retention
* *
* @param pTsdb * @param pTsdb
* @param now * @param now
* @param maxSpeed
* @param retention * @param retention
* @param type RETENTION_EXPIRED (1) or RETENTION_MIGRATE (2)
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention) { static int32_t tsdbProcessRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention, int8_t type) {
int32_t code = 0; int32_t code = 0;
int32_t nBatch = 0; int32_t nBatch = 0;
int32_t nLoops = 0; int32_t nLoops = 0;
int32_t maxFid = INT32_MIN; int32_t maxFid = 0;
int64_t fSize = 0; int64_t fSize = 0;
int64_t speed = maxSpeed > 0 ? maxSpeed : MIGRATE_MAX_SPEED;
STsdbFS fs = {0}; STsdbFS fs = {0};
STsdbFS fsLatest = {0}; STsdbFS fsLatest = {0};
if (!(retention & RETENTION_MIGRATE)) { if (!(retention & type)) {
goto _exit; goto _exit;
} }
_migrate_loop: _retention_loop:
// reset // reset
maxFid = INT32_MIN; maxFid = INT32_MIN;
fSize = 0; fSize = 0;
@ -186,32 +99,47 @@ _migrate_loop:
if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) { if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) {
atomic_store_32(&pTsdb->trimHdl.maxRetentFid, INT32_MIN); atomic_store_32(&pTsdb->trimHdl.maxRetentFid, INT32_MIN);
taosMsleep(10); taosMsleep(50);
} }
code = tsdbFSCopy(pTsdb, &fs); code = tsdbFSCopy(pTsdb, &fs);
if (code) goto _exit; if (code) goto _exit;
int32_t fsSize = taosArrayGetSize(fs.aDFileSet); int32_t fsSize = taosArrayGetSize(fs.aDFileSet);
for (int32_t iSet = 0; iSet < fsSize; ++iSet) { if (type == RETENTION_MIGRATE) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); for (int32_t iSet = 0; iSet < fsSize; ++iSet) {
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
SDiskID did; int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
SDiskID did;
if (pSet->diskId.level == expLevel) continue; if (pSet->diskId.level == expLevel) continue;
if (expLevel > 0) { if (expLevel > 0) {
ASSERT(pSet->fid > maxFid); ASSERT(pSet->fid > maxFid);
maxFid = pSet->fid; maxFid = pSet->fid;
fSize += (pSet->pDataF->size + pSet->pHeadF->size + pSet->pSmaF->size); fSize += (pSet->pDataF->size + pSet->pHeadF->size + pSet->pSmaF->size);
if (fSize / MIGRATE_MAX_SPEED > MIGRATE_MIN_COST) { if (fSize / speed > MIGRATE_MIN_COST) {
break; break;
}
for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) {
fSize += pSet->aSttF[iStt]->size;
}
if (fSize / speed > MIGRATE_MIN_COST) {
tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid);
break;
}
} }
for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { }
fSize += pSet->aSttF[iStt]->size; } else if (type == RETENTION_EXPIRED) {
} for (int32_t iSet = 0; iSet < fsSize; ++iSet) {
if (fSize / MIGRATE_MAX_SPEED > MIGRATE_MIN_COST) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid); int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
SDiskID did;
if (expLevel < 0) {
SET_DFSET_EXPIRED(pSet);
if (pSet->fid > maxFid) maxFid = pSet->fid;
} else {
break; break;
} }
} }
@ -219,7 +147,7 @@ _migrate_loop:
if (maxFid == INT32_MIN) goto _exit; if (maxFid == INT32_MIN) goto _exit;
_wait_commit_end: _commit_conflict_check:
while (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { while (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) {
if (++nLoops > 1000) { if (++nLoops > 1000) {
nLoops = 0; nLoops = 0;
@ -229,46 +157,48 @@ _wait_commit_end:
if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) {
if (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { if (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) {
atomic_store_8(&pTsdb->trimHdl.state, 0); atomic_store_8(&pTsdb->trimHdl.state, 0);
goto _wait_commit_end; goto _commit_conflict_check;
} }
atomic_store_32(&pTsdb->trimHdl.maxRetentFid, maxFid); atomic_store_32(&pTsdb->trimHdl.maxRetentFid, maxFid);
atomic_store_8(&pTsdb->trimHdl.state, 0); atomic_store_8(&pTsdb->trimHdl.state, 0);
} else { } else {
goto _wait_commit_end; goto _commit_conflict_check;
} }
// migrate // migrate
for (int32_t iSet = 0; iSet < fsSize; ++iSet) { if (type == RETENTION_MIGRATE) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); for (int32_t iSet = 0; iSet < fsSize; ++iSet) {
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
SDiskID did; int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
SDiskID did;
if (pSet->fid > maxFid) break; if (pSet->fid > maxFid) break;
tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode), tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode),
nBatch, maxFid, pSet->fid, pSet->diskId.id, pSet->diskId.level, expLevel); nBatch, maxFid, pSet->fid, pSet->diskId.id, pSet->diskId.level, expLevel);
if (expLevel < 0) { if (expLevel < 0) {
SET_DFSET_EXPIRED(pSet); SET_DFSET_EXPIRED(pSet);
} else { } else {
if (expLevel == pSet->diskId.level) continue; if (expLevel == pSet->diskId.level) continue;
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
code = terrno; code = terrno;
goto _exit; goto _exit;
}
if (did.level == pSet->diskId.level) continue;
// copy file to new disk
SDFileSet fSet = *pSet;
fSet.diskId = did;
code = tsdbDFileSetCopy(pTsdb, pSet, &fSet, maxSpeed);
if (code) goto _exit;
code = tsdbFSUpsertFSet(&fs, &fSet);
if (code) goto _exit;
} }
if (did.level == pSet->diskId.level) continue;
// copy file to new disk
SDFileSet fSet = *pSet;
fSet.diskId = did;
code = tsdbDFileSetCopy(pTsdb, pSet, &fSet, maxSpeed);
if (code) goto _exit;
code = tsdbFSUpsertFSet(&fs, &fSet);
if (code) goto _exit;
} }
} }
@ -296,14 +226,16 @@ _merge_fs:
} }
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
++nBatch; if (type == RETENTION_MIGRATE) {
goto _migrate_loop; ++nBatch;
goto _retention_loop;
}
_exit: _exit:
tsdbFSDestroy(&fs); tsdbFSDestroy(&fs);
tsdbFSDestroy(&fsLatest); tsdbFSDestroy(&fsLatest);
if (code != 0) { if (code != 0) {
tsdbError("vgId:%d, tsdb do retention(migrate) failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d, tsdb do retention %" PRIi8 " failed since %s", TD_VID(pTsdb->pVnode), type, tstrerror(code));
ASSERT(0); ASSERT(0);
} }
return code; return code;
@ -331,21 +263,21 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed) {
} }
// step 1: process expire // step 1: process expire
code = tsdbProcessExpire(pTsdb, now, retention); code = tsdbProcessRetention(pTsdb, now, maxSpeed, retention, RETENTION_EXPIRED);
if (code < 0) goto _exit; if (code < 0) goto _exit;
// step 2: process multi-tier migration // step 2: process multi-tier migration
code = tsdbProcessMigrate(pTsdb, now, maxSpeed, retention); code = tsdbProcessRetention(pTsdb, now, maxSpeed, retention, RETENTION_MIGRATE);
if (code < 0) goto _exit; if (code < 0) goto _exit;
_exit: _exit:
pTsdb->trimHdl.maxRetentFid = INT32_MIN; pTsdb->trimHdl.maxRetentFid = INT32_MIN;
if (code != 0) { if (code != 0) {
tsdbError("vgId:%d, tsdb do retention:%d failed since %s", TD_VID(pTsdb->pVnode), retention, tstrerror(code)); tsdbError("vgId:%d, tsdb do retention %d failed since %s", TD_VID(pTsdb->pVnode), retention, tstrerror(code));
ASSERT(0); ASSERT(0);
// tsdbFSRollback(pTsdb->pFS); // tsdbFSRollback(pTsdb->pFS);
} else { } else {
tsdbInfo("vgId:%d, tsdb do retention:%d succeed", TD_VID(pTsdb->pVnode), retention); tsdbInfo("vgId:%d, tsdb do retention %d succeed", TD_VID(pTsdb->pVnode), retention);
} }
return code; return code;
} }