Merge pull request #22066 from taosdata/fix/TD-25215-main
fix/TD-25215: ttlMgrDeleteTtl should ignore ttl 0 tables for main
This commit is contained in:
commit
c2b3c2db52
|
@ -38,6 +38,8 @@ typedef struct STtlManger {
|
|||
SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime}
|
||||
SHashObj* pDirtyUids; // dirty tuid
|
||||
TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl>
|
||||
|
||||
char* logPrefix;
|
||||
} STtlManger;
|
||||
|
||||
typedef struct {
|
||||
|
@ -77,9 +79,10 @@ typedef struct {
|
|||
typedef struct {
|
||||
tb_uid_t uid;
|
||||
TXN* pTxn;
|
||||
int64_t ttlDays;
|
||||
} STtlDelTtlCtx;
|
||||
|
||||
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
|
||||
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix);
|
||||
void ttlMgrClose(STtlManger* pTtlMgr);
|
||||
int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta);
|
||||
|
||||
|
|
|
@ -130,7 +130,9 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
|||
}
|
||||
|
||||
// open pTtlMgr ("ttlv1.idx")
|
||||
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0);
|
||||
char logPrefix[128] = {0};
|
||||
sprintf(logPrefix, "vgId:%d", TD_VID(pVnode));
|
||||
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
|
|
|
@ -974,7 +974,15 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
|
|||
}
|
||||
|
||||
static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
|
||||
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
|
||||
|
||||
STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
|
||||
if (pME->type == TSDB_CHILD_TABLE) {
|
||||
ctx.ttlDays = pME->ctbEntry.ttlDays;
|
||||
} else {
|
||||
ctx.ttlDays = pME->ntbEntry.ttlDays;
|
||||
}
|
||||
|
||||
return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
|
||||
}
|
||||
|
||||
|
@ -1973,7 +1981,6 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
|
|||
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
|
||||
|
||||
STtlUpdTtlCtx ctx = {.uid = pME->uid};
|
||||
|
||||
if (pME->type == TSDB_CHILD_TABLE) {
|
||||
ctx.ttlDays = pME->ctbEntry.ttlDays;
|
||||
ctx.changeTimeMs = pME->ctbEntry.btime;
|
||||
|
|
|
@ -39,8 +39,8 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr);
|
|||
const char *ttlTbname = "ttl.idx";
|
||||
const char *ttlV1Tbname = "ttlv1.idx";
|
||||
|
||||
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
||||
int ret = TSDB_CODE_SUCCESS;
|
||||
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix) {
|
||||
int ret = TSDB_CODE_SUCCESS;
|
||||
int64_t startNs = taosGetTimestampNs();
|
||||
|
||||
*ppTtlMgr = NULL;
|
||||
|
@ -48,9 +48,17 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
|||
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
|
||||
if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
char *logBuffer = (char *)tdbOsCalloc(1, strlen(logPrefix) + 1);
|
||||
if (logBuffer == NULL) {
|
||||
tdbOsFree(pTtlMgr);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(logBuffer, logPrefix);
|
||||
pTtlMgr->logPrefix = logBuffer;
|
||||
|
||||
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
|
||||
if (ret < 0) {
|
||||
metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno));
|
||||
metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(terrno));
|
||||
tdbOsFree(pTtlMgr);
|
||||
return ret;
|
||||
}
|
||||
|
@ -62,14 +70,14 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
|||
|
||||
ret = ttlMgrFillCache(pTtlMgr);
|
||||
if (ret < 0) {
|
||||
metaError("failed to fill hash since %s", tstrerror(terrno));
|
||||
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
ttlMgrCleanup(pTtlMgr);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t endNs = taosGetTimestampNs();
|
||||
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
|
||||
endNs - startNs);
|
||||
metaInfo("%s, ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
|
||||
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
|
||||
|
||||
*ppTtlMgr = pTtlMgr;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -91,37 +99,37 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
|
|||
|
||||
if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS;
|
||||
|
||||
metaInfo("ttl mgr start upgrade");
|
||||
metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix);
|
||||
|
||||
int64_t startNs = taosGetTimestampNs();
|
||||
|
||||
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
|
||||
if (ret < 0) {
|
||||
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
|
||||
metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
|
||||
if (ret < 0) {
|
||||
metaError("failed to convert ttl index since %s", tstrerror(terrno));
|
||||
metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
|
||||
if (ret < 0) {
|
||||
metaError("failed to drop old ttl index since %s", tstrerror(terrno));
|
||||
metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
ret = ttlMgrFillCache(pTtlMgr);
|
||||
if (ret < 0) {
|
||||
metaError("failed to fill hash since %s", tstrerror(terrno));
|
||||
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
int64_t endNs = taosGetTimestampNs();
|
||||
metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
|
||||
endNs - startNs);
|
||||
metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
|
||||
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
|
||||
_out:
|
||||
tdbTbClose(pTtlMgr->pOldTtlIdx);
|
||||
pTtlMgr->pOldTtlIdx = NULL;
|
||||
|
@ -130,11 +138,12 @@ _out:
|
|||
}
|
||||
|
||||
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
|
||||
taosMemoryFree(pTtlMgr->logPrefix);
|
||||
taosHashCleanup(pTtlMgr->pTtlCache);
|
||||
taosHashCleanup(pTtlMgr->pDirtyUids);
|
||||
tdbTbClose(pTtlMgr->pTtlIdx);
|
||||
taosThreadRwlockDestroy(&pTtlMgr->lock);
|
||||
tdbOsFree(pTtlMgr);
|
||||
taosMemoryFree(pTtlMgr);
|
||||
}
|
||||
|
||||
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
|
||||
|
@ -250,13 +259,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
|||
|
||||
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||
if (ret < 0) {
|
||||
metaError("ttlMgr insert failed to update ttl cache since %s", tstrerror(terrno));
|
||||
metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||
if (ret < 0) {
|
||||
metaError("ttlMgr insert failed to update ttl dirty uids since %s", tstrerror(terrno));
|
||||
metaError("%s, ttlMgr insert failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
|
@ -264,20 +273,21 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
|||
_out:
|
||||
ttlMgrULock(pTtlMgr);
|
||||
|
||||
metaDebug("ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, updCtx->uid,
|
||||
updCtx->changeTimeMs, updCtx->ttlDays);
|
||||
metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
|
||||
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
||||
if (delCtx->ttlDays == 0) return 0;
|
||||
ttlMgrWLock(pTtlMgr);
|
||||
|
||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL};
|
||||
|
||||
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||
if (ret < 0) {
|
||||
metaError("ttlMgr del failed to update ttl dirty uids since %s", tstrerror(terrno));
|
||||
metaError("%s, ttlMgr del failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
|
@ -285,7 +295,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
|||
_out:
|
||||
ttlMgrULock(pTtlMgr);
|
||||
|
||||
metaDebug("ttl mgr delete ttl, uid: %" PRId64, delCtx->uid);
|
||||
metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -293,6 +303,8 @@ _out:
|
|||
int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
|
||||
ttlMgrWLock(pTtlMgr);
|
||||
|
||||
int ret = 0;
|
||||
|
||||
STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
|
||||
if (oldData == NULL) {
|
||||
goto _out;
|
||||
|
@ -301,17 +313,17 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
|
|||
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays, .changeTimeMs = pUpdCtimeCtx->changeTimeMs};
|
||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
||||
|
||||
int ret =
|
||||
taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||
ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||
if (ret < 0) {
|
||||
metaError("ttlMgr update ctime failed to update ttl cache since %s", tstrerror(terrno));
|
||||
metaError("%s, ttlMgr update ctime failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
|
||||
sizeof(dirtryEntry));
|
||||
if (ret < 0) {
|
||||
metaError("ttlMgr update ctime failed to update ttl dirty uids since %s", tstrerror(terrno));
|
||||
metaError("%s, ttlMgr update ctime failed to update ttl dirty uids since %s", pTtlMgr->logPrefix,
|
||||
tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
|
@ -319,7 +331,8 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
|
|||
_out:
|
||||
ttlMgrULock(pTtlMgr);
|
||||
|
||||
metaDebug("ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pUpdCtimeCtx->uid, pUpdCtimeCtx->changeTimeMs);
|
||||
metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
|
||||
pUpdCtimeCtx->changeTimeMs);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -366,7 +379,7 @@ _out:
|
|||
int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
||||
ttlMgrWLock(pTtlMgr);
|
||||
|
||||
metaInfo("ttl mgr flush start.");
|
||||
metaInfo("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
|
||||
|
||||
int ret = -1;
|
||||
|
||||
|
@ -377,9 +390,9 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
|||
|
||||
STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
||||
if (cacheEntry == NULL) {
|
||||
metaError("ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", tstrerror(terrno), *pUid,
|
||||
pEntry->type);
|
||||
goto _out;
|
||||
metaError("%s, ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix,
|
||||
tstrerror(terrno), *pUid, pEntry->type);
|
||||
continue;
|
||||
}
|
||||
|
||||
STtlIdxKeyV1 ttlKey;
|
||||
|
@ -389,27 +402,29 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
|||
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), &cacheEntry->ttlDays, sizeof(cacheEntry->ttlDays),
|
||||
pTxn);
|
||||
if (ret < 0) {
|
||||
metaError("ttlMgr flush failed to flush ttl cache upsert since %s", tstrerror(terrno));
|
||||
metaError("%s, ttlMgr flush failed to flush ttl cache upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
} else if (pEntry->type == ENTRY_TYPE_DEL) {
|
||||
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
|
||||
if (ret < 0) {
|
||||
metaError("ttlMgr flush failed to flush ttl cache del since %s", tstrerror(terrno));
|
||||
metaError("%s, ttlMgr flush failed to flush ttl cache del since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
||||
if (ret < 0) {
|
||||
metaError("ttlMgr flush failed to delete ttl cache since %s", tstrerror(terrno));
|
||||
metaError("%s, ttlMgr flush failed to delete ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
} else {
|
||||
metaError("ttlMgr flush failed to flush ttl cache, unknown type: %d", pEntry->type);
|
||||
metaError("%s, ttlMgr flush failed to flush ttl cache, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
|
||||
goto _out;
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter);
|
||||
void *pIterTmp = pIter;
|
||||
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIterTmp);
|
||||
taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t));
|
||||
}
|
||||
|
||||
taosHashClear(pTtlMgr->pDirtyUids);
|
||||
|
@ -418,7 +433,7 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
|||
_out:
|
||||
ttlMgrULock(pTtlMgr);
|
||||
|
||||
metaInfo("ttl mgr flush end.");
|
||||
metaInfo("%s, ttl mgr flush end.", pTtlMgr->logPrefix);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -426,7 +441,7 @@ _out:
|
|||
static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
|
||||
int32_t ret = 0;
|
||||
|
||||
metaTrace("ttlMgr rlock %p", &pTtlMgr->lock);
|
||||
metaTrace("%s, ttlMgr rlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
|
||||
|
||||
ret = taosThreadRwlockRdlock(&pTtlMgr->lock);
|
||||
|
||||
|
@ -436,7 +451,7 @@ static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
|
|||
static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
|
||||
int32_t ret = 0;
|
||||
|
||||
metaTrace("ttlMgr wlock %p", &pTtlMgr->lock);
|
||||
metaTrace("%s, ttlMgr wlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
|
||||
|
||||
ret = taosThreadRwlockWrlock(&pTtlMgr->lock);
|
||||
|
||||
|
@ -446,7 +461,7 @@ static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
|
|||
static int32_t ttlMgrULock(STtlManger *pTtlMgr) {
|
||||
int32_t ret = 0;
|
||||
|
||||
metaTrace("ttlMgr ulock %p", &pTtlMgr->lock);
|
||||
metaTrace("%s, ttlMgr ulock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
|
||||
|
||||
ret = taosThreadRwlockUnlock(&pTtlMgr->lock);
|
||||
|
||||
|
|
Loading…
Reference in New Issue