enh: tidy up walEndSnapshot
This commit is contained in:
parent
2a558323be
commit
7433bad888
|
@ -284,15 +284,15 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
if (ver == -1) {
|
||||
code = -1;
|
||||
goto END;
|
||||
};
|
||||
}
|
||||
|
||||
pWal->vers.snapshotVer = ver;
|
||||
int ts = taosGetTimestampSec();
|
||||
|
||||
ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1);
|
||||
|
||||
// compatible mode for refVer
|
||||
bool hasTopic = false;
|
||||
int64_t refVer = ver;
|
||||
int64_t refVer = INT64_MAX;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pWal->pRefHash, pIter);
|
||||
|
@ -300,54 +300,40 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
SWalRef *pRef = *(SWalRef **)pIter;
|
||||
if (pRef->refVer == -1) continue;
|
||||
refVer = TMIN(refVer, pRef->refVer - 1);
|
||||
wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId);
|
||||
hasTopic = true;
|
||||
}
|
||||
// compatible mode
|
||||
if (pWal->cfg.retentionPeriod == 0 && hasTopic) {
|
||||
wInfo("vgId:%d, wal found refVer:%" PRId64 " in compatible mode, ver:%" PRId64, pWal->cfg.vgId, refVer, ver);
|
||||
ver = TMIN(ver, refVer);
|
||||
}
|
||||
|
||||
// find files safe to delete
|
||||
int deleteCnt = 0;
|
||||
int64_t newTotSize = pWal->totSize;
|
||||
SWalFileInfo tmp;
|
||||
SWalFileInfo tmp = {0};
|
||||
tmp.firstVer = ver;
|
||||
// find files safe to delete
|
||||
SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
|
||||
|
||||
if (pInfo) {
|
||||
SWalFileInfo *pLastFileInfo = taosArrayGetLast(pWal->fileInfoSet);
|
||||
wDebug("vgId:%d, wal search found file info: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer,
|
||||
pInfo->lastVer);
|
||||
if (ver >= pInfo->lastVer) {
|
||||
wDebug("vgId:%d, wal search found file info. ver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, ver,
|
||||
pInfo->firstVer, pInfo->lastVer);
|
||||
ASSERT(ver <= pInfo->lastVer);
|
||||
if (ver == pInfo->lastVer) {
|
||||
pInfo++;
|
||||
wDebug("vgId:%d, wal remove advance one file: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer,
|
||||
pInfo->lastVer);
|
||||
}
|
||||
if (pInfo <= pLastFileInfo) {
|
||||
wDebug("vgId:%d, wal end remove for first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer,
|
||||
pInfo->lastVer);
|
||||
} else {
|
||||
wDebug("vgId:%d, wal no remove", pWal->cfg.vgId);
|
||||
}
|
||||
|
||||
// iterate files, until the searched result
|
||||
// delete according to file size or close time
|
||||
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
||||
wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64
|
||||
"), new tot size %" PRId64,
|
||||
pWal->cfg.vgId, iter->firstVer, iter->fileSize, iter->closeTs, newTotSize);
|
||||
if ((pWal->cfg.retentionSize != -1 && pWal->cfg.retentionSize != 0 && newTotSize > pWal->cfg.retentionSize) ||
|
||||
((pWal->cfg.retentionPeriod == 0) || (pWal->cfg.retentionPeriod != -1 && iter->closeTs != -1 &&
|
||||
iter->closeTs + pWal->cfg.retentionPeriod < ts))) {
|
||||
// delete according to file size or close time
|
||||
wDebug("vgId:%d, check pass", pWal->cfg.vgId);
|
||||
if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) ||
|
||||
(pWal->cfg.retentionPeriod == 0 ||
|
||||
pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) {
|
||||
deleteCnt++;
|
||||
newTotSize -= iter->fileSize;
|
||||
taosArrayPush(pWal->toDeleteFiles, iter);
|
||||
}
|
||||
wDebug("vgId:%d, check not pass", pWal->cfg.vgId);
|
||||
}
|
||||
|
||||
UPDATE_META:
|
||||
// make new array, remove files
|
||||
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
|
||||
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
|
||||
|
@ -357,11 +343,12 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
||||
}
|
||||
}
|
||||
|
||||
// update meta
|
||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||
pWal->totSize = newTotSize;
|
||||
pWal->vers.verInSnapshotting = -1;
|
||||
|
||||
// save snapshot ver, commit ver
|
||||
code = walSaveMeta(pWal);
|
||||
if (code < 0) {
|
||||
goto END;
|
||||
|
@ -369,23 +356,27 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
|
||||
// delete files
|
||||
deleteCnt = taosArrayGetSize(pWal->toDeleteFiles);
|
||||
wDebug("vgId:%d, wal should delete %d files", pWal->cfg.vgId, deleteCnt);
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
char fnameStr[WAL_FILE_LEN] = {0};
|
||||
pInfo = NULL;
|
||||
|
||||
for (int i = 0; i < deleteCnt; i++) {
|
||||
pInfo = taosArrayGet(pWal->toDeleteFiles, i);
|
||||
|
||||
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
|
||||
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
|
||||
wError("vgId:%d, failed to remove log file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno));
|
||||
goto END;
|
||||
}
|
||||
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
|
||||
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
|
||||
wError("vgId:%d, failed to remove idx file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno));
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
if (pInfo != NULL) {
|
||||
wInfo("vgId:%d, wal log files recycled. count:%d, until ver:%" PRId64 ", closeTs:%" PRId64, pWal->cfg.vgId,
|
||||
deleteCnt, pInfo->lastVer, pInfo->closeTs);
|
||||
}
|
||||
taosArrayClear(pWal->toDeleteFiles);
|
||||
|
||||
END:
|
||||
|
|
Loading…
Reference in New Issue