feat: db auto compact

This commit is contained in:
kailixu 2024-12-05 10:33:51 +08:00
parent f4fc423f34
commit a3dc026c0d
2 changed files with 26 additions and 19 deletions

View File

@ -942,57 +942,64 @@ static int32_t mndCompactDispatch(SRpcMsg *pReq) {
int32_t code = 0; int32_t code = 0;
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int64_t now = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
int64_t nowSec = now / 60000; int64_t curMin = curMs / 60000LL;
void *pIter = NULL; void *pIter = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) { while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
if ((pDb->cfg.compactInterval == 0) || (pDb->cfg.compactStartTime >= pDb->cfg.compactEndTime)) { if ((pDb->cfg.compactInterval == 0) || (pDb->cfg.compactStartTime >= pDb->cfg.compactEndTime)) {
mDebug("db:%p,%s, compact interval is %dm or start time:%dm >= end time:%dm, skip", pDb, pDb->name,
pDb->cfg.compactInterval, pDb->cfg.compactStartTime, pDb->cfg.compactEndTime);
sdbRelease(pSdb, pDb); sdbRelease(pSdb, pDb);
continue; continue;
} }
if (pDb->compactStartTime == nowSec) { int64_t remainder = ((curMin + (int64_t)pDb->cfg.compactTimeOffset * 60LL) % pDb->cfg.compactInterval);
assert(0); if (remainder != 0) {
mDebug("db:%p,%s, current time:%" PRIi64 "m is not divisible by compact interval:%dm, offset:%" PRIi8
"h, remainder:%" PRIi64 "m, skip",
pDb, pDb->name, curMin, pDb->cfg.compactInterval, pDb->cfg.compactTimeOffset, remainder);
sdbRelease(pSdb, pDb); sdbRelease(pSdb, pDb);
continue; continue;
} }
if (((nowSec + pDb->cfg.compactTimeOffset) % pDb->cfg.compactInterval) != 0) { if ((pDb->compactStartTime / 60000LL) == curMin) {
mDebug("db:%p:%s, compact has already been dispatched at %" PRIi64 "m(%" PRIi64 "ms), skip", pDb, pDb->name,
curMin, pDb->compactStartTime);
sdbRelease(pSdb, pDb); sdbRelease(pSdb, pDb);
continue; continue;
} }
STimeWindow tw = {.skey = convertTimePrecision(now - (int64_t)pDb->cfg.compactStartTime * 60000, STimeWindow tw = {.skey = convertTimePrecision(curMs + (int64_t)pDb->cfg.compactStartTime * 60000LL,
TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision), TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision),
.ekey = convertTimePrecision(now - (int64_t)pDb->cfg.compactEndTime * 60000, .ekey = convertTimePrecision(curMs + (int64_t)pDb->cfg.compactEndTime * 60000LL,
TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)}; TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)};
if ((code = mndCompactDb(pMnode, pReq, pDb, tw, NULL)) == 0) { if ((code = mndCompactDb(pMnode, pReq, pDb, tw, NULL)) == 0) {
pDb->compactStartTime = nowSec; mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64
mInfo("db:%s, dispatch auto compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%dm, end:%dm, offset:%" PRIi8 "h",
"], interval:%d, start:%d, end:%d, offset:%" PRIi8, pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, pDb->cfg.compactStartTime,
pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, pDb->cfg.compactStartTime, pDb->cfg.compactEndTime, pDb->cfg.compactEndTime, pDb->cfg.compactTimeOffset);
pDb->cfg.compactTimeOffset);
} else { } else {
mWarn("db:%s, failed to dispatch auto compact with range:[%" PRIi64 ",%" PRIi64 mWarn("db:%p,%s, failed to dispatch compact with range:[%" PRIi64 ",%" PRIi64
"], interval:%d, start:%d, end:%d, offset:%" PRIi8 ", since %s", "], interval:%dm, start:%dm, end:%dm, offset:%" PRIi8 "h, since %s",
pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, pDb->cfg.compactStartTime, pDb->cfg.compactEndTime, pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, pDb->cfg.compactStartTime,
pDb->cfg.compactTimeOffset, tstrerror(code)); pDb->cfg.compactEndTime, pDb->cfg.compactTimeOffset, tstrerror(code));
} }
TAOS_UNUSED(mndCompactDispatchAudit(pMnode, pReq, pDb, &tw)); TAOS_UNUSED(mndCompactDispatchAudit(pMnode, pReq, pDb, &tw));
sdbRelease(pSdb, pDb); sdbRelease(pSdb, pDb);
} }
return 0; return 0;
} }
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) { static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
#ifdef TD_ENTERPRISE
mTrace("start to process compact timer"); mTrace("start to process compact timer");
mndCompactPullup(pReq->info.node); mndCompactPullup(pReq->info.node);
TAOS_UNUSED(mndCompactDispatch(pReq)); TAOS_UNUSED(mndCompactDispatch(pReq));
#endif
return 0; return 0;
} }