Merge pull request #22632 from taosdata/fix/ttl
fix/TD-26032: ttl cache entry record the original information for deletion
This commit is contained in:
commit
701ac939c1
|
@ -26,7 +26,7 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
typedef enum DirtyEntryType {
|
||||
ENTRY_TYPE_DEL = 1,
|
||||
ENTRY_TYPE_DELETE = 1,
|
||||
ENTRY_TYPE_UPSERT = 2,
|
||||
} DirtyEntryType;
|
||||
|
||||
|
@ -44,6 +44,8 @@ typedef struct STtlManger {
|
|||
typedef struct {
|
||||
int64_t ttlDays;
|
||||
int64_t changeTimeMs;
|
||||
int64_t ttlDaysDirty;
|
||||
int64_t changeTimeMsDirty;
|
||||
} STtlCacheEntry;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -209,7 +209,8 @@ static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void
|
|||
int64_t ttlDays = *(int64_t *)pVal;
|
||||
int64_t changeTimeMs = ttlKey->deleteTimeMs - ttlDays * tsTtlUnit * 1000;
|
||||
|
||||
STtlCacheEntry data = {.ttlDays = ttlDays, .changeTimeMs = changeTimeMs};
|
||||
STtlCacheEntry data = {
|
||||
.ttlDays = ttlDays, .changeTimeMs = changeTimeMs, .ttlDaysDirty = ttlDays, .changeTimeMsDirty = changeTimeMs};
|
||||
|
||||
return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
|
||||
}
|
||||
|
@ -257,34 +258,37 @@ static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const voi
|
|||
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
|
||||
SMeta *meta = pMeta;
|
||||
|
||||
metaInfo("ttlMgr convert ttl start.");
|
||||
metaInfo("ttlMgr convert start.");
|
||||
|
||||
SConvertData cvData = {.pNewTtlIdx = pNewTtlIdx, .pMeta = meta};
|
||||
|
||||
int ret = tdbTbTraversal(pOldTtlIdx, &cvData, ttlMgrConvertOneEntry);
|
||||
if (ret < 0) {
|
||||
metaError("failed to convert ttl since %s", tstrerror(terrno));
|
||||
metaError("failed to convert since %s", tstrerror(terrno));
|
||||
}
|
||||
|
||||
metaInfo("ttlMgr convert ttl end.");
|
||||
metaInfo("ttlMgr convert end.");
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
||||
if (updCtx->ttlDays == 0) return 0;
|
||||
|
||||
STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays, .changeTimeMs = updCtx->changeTimeMs};
|
||||
STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays,
|
||||
.changeTimeMs = updCtx->changeTimeMs,
|
||||
.ttlDaysDirty = updCtx->ttlDays,
|
||||
.changeTimeMsDirty = updCtx->changeTimeMs};
|
||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
||||
|
||||
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||
if (ret < 0) {
|
||||
metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
metaError("%s, ttlMgr insert failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||
if (ret < 0) {
|
||||
metaError("%s, ttlMgr insert failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
metaError("%s, ttlMgr insert failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
|
@ -304,11 +308,11 @@ _out:
|
|||
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
||||
if (delCtx->ttlDays == 0) return 0;
|
||||
|
||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL};
|
||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DELETE};
|
||||
|
||||
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||
if (ret < 0) {
|
||||
metaError("%s, ttlMgr del failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
metaError("%s, ttlMgr del failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
|
@ -332,19 +336,22 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
|
|||
goto _out;
|
||||
}
|
||||
|
||||
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays, .changeTimeMs = pUpdCtimeCtx->changeTimeMs};
|
||||
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays,
|
||||
.changeTimeMs = oldData->changeTimeMs,
|
||||
.ttlDaysDirty = oldData->ttlDays,
|
||||
.changeTimeMsDirty = pUpdCtimeCtx->changeTimeMs};
|
||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
||||
|
||||
ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||
if (ret < 0) {
|
||||
metaError("%s, ttlMgr update ctime failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
metaError("%s, ttlMgr update ctime failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
|
||||
sizeof(dirtryEntry));
|
||||
if (ret < 0) {
|
||||
metaError("%s, ttlMgr update ctime failed to update ttl dirty uids since %s", pTtlMgr->logPrefix,
|
||||
metaError("%s, ttlMgr update ctime failed to update dirty uids since %s", pTtlMgr->logPrefix,
|
||||
tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
@ -396,27 +403,35 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
|||
STtlIdxKeyV1 ttlKey;
|
||||
ttlMgrBuildKey(&ttlKey, cacheEntry->ttlDays, cacheEntry->changeTimeMs, *pUid);
|
||||
|
||||
STtlIdxKeyV1 ttlKeyDirty;
|
||||
ttlMgrBuildKey(&ttlKeyDirty, cacheEntry->ttlDaysDirty, cacheEntry->changeTimeMsDirty, *pUid);
|
||||
|
||||
if (pEntry->type == ENTRY_TYPE_UPSERT) {
|
||||
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), &cacheEntry->ttlDays, sizeof(cacheEntry->ttlDays),
|
||||
pTxn);
|
||||
// delete old key & upsert new key
|
||||
tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); // maybe first insert, ignore error
|
||||
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKeyDirty, sizeof(ttlKeyDirty), &cacheEntry->ttlDaysDirty,
|
||||
sizeof(cacheEntry->ttlDaysDirty), pTxn);
|
||||
if (ret < 0) {
|
||||
metaError("%s, ttlMgr flush failed to flush ttl cache upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
metaError("%s, ttlMgr flush failed to upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
} else if (pEntry->type == ENTRY_TYPE_DEL) {
|
||||
|
||||
cacheEntry->ttlDays = cacheEntry->ttlDaysDirty;
|
||||
cacheEntry->changeTimeMs = cacheEntry->changeTimeMsDirty;
|
||||
} else if (pEntry->type == ENTRY_TYPE_DELETE) {
|
||||
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
|
||||
if (ret < 0) {
|
||||
metaError("%s, ttlMgr flush failed to flush ttl cache del since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
|
||||
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
||||
if (ret < 0) {
|
||||
metaError("%s, ttlMgr flush failed to delete ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
metaError("%s, ttlMgr flush failed to remove cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||
goto _out;
|
||||
}
|
||||
} else {
|
||||
metaError("%s, ttlMgr flush failed to flush ttl cache, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
|
||||
metaError("%s, ttlMgr flush failed, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
|
||||
goto _out;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue