fix: ttl cache entry record the original information for deletion
This commit is contained in:
parent
b72370f154
commit
df2b2484df
|
@ -26,7 +26,7 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef enum DirtyEntryType {
|
typedef enum DirtyEntryType {
|
||||||
ENTRY_TYPE_DEL = 1,
|
ENTRY_TYPE_DELETE = 1,
|
||||||
ENTRY_TYPE_UPSERT = 2,
|
ENTRY_TYPE_UPSERT = 2,
|
||||||
} DirtyEntryType;
|
} DirtyEntryType;
|
||||||
|
|
||||||
|
@ -44,6 +44,8 @@ typedef struct STtlManger {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t ttlDays;
|
int64_t ttlDays;
|
||||||
int64_t changeTimeMs;
|
int64_t changeTimeMs;
|
||||||
|
int64_t ttlDaysDirty;
|
||||||
|
int64_t changeTimeMsDirty;
|
||||||
} STtlCacheEntry;
|
} STtlCacheEntry;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -209,7 +209,8 @@ static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void
|
||||||
int64_t ttlDays = *(int64_t *)pVal;
|
int64_t ttlDays = *(int64_t *)pVal;
|
||||||
int64_t changeTimeMs = ttlKey->deleteTimeMs - ttlDays * tsTtlUnit * 1000;
|
int64_t changeTimeMs = ttlKey->deleteTimeMs - ttlDays * tsTtlUnit * 1000;
|
||||||
|
|
||||||
STtlCacheEntry data = {.ttlDays = ttlDays, .changeTimeMs = changeTimeMs};
|
STtlCacheEntry data = {
|
||||||
|
.ttlDays = ttlDays, .changeTimeMs = changeTimeMs, .ttlDaysDirty = ttlDays, .changeTimeMsDirty = changeTimeMs};
|
||||||
|
|
||||||
return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
|
return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
|
||||||
}
|
}
|
||||||
|
@ -257,34 +258,37 @@ static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const voi
|
||||||
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
|
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
|
||||||
SMeta *meta = pMeta;
|
SMeta *meta = pMeta;
|
||||||
|
|
||||||
metaInfo("ttlMgr convert ttl start.");
|
metaInfo("ttlMgr convert start.");
|
||||||
|
|
||||||
SConvertData cvData = {.pNewTtlIdx = pNewTtlIdx, .pMeta = meta};
|
SConvertData cvData = {.pNewTtlIdx = pNewTtlIdx, .pMeta = meta};
|
||||||
|
|
||||||
int ret = tdbTbTraversal(pOldTtlIdx, &cvData, ttlMgrConvertOneEntry);
|
int ret = tdbTbTraversal(pOldTtlIdx, &cvData, ttlMgrConvertOneEntry);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to convert ttl since %s", tstrerror(terrno));
|
metaError("failed to convert since %s", tstrerror(terrno));
|
||||||
}
|
}
|
||||||
|
|
||||||
metaInfo("ttlMgr convert ttl end.");
|
metaInfo("ttlMgr convert end.");
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
||||||
if (updCtx->ttlDays == 0) return 0;
|
if (updCtx->ttlDays == 0) return 0;
|
||||||
|
|
||||||
STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays, .changeTimeMs = updCtx->changeTimeMs};
|
STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays,
|
||||||
|
.changeTimeMs = updCtx->changeTimeMs,
|
||||||
|
.ttlDaysDirty = updCtx->ttlDays,
|
||||||
|
.changeTimeMsDirty = updCtx->changeTimeMs};
|
||||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
||||||
|
|
||||||
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr insert failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr insert failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr insert failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,11 +308,11 @@ _out:
|
||||||
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
||||||
if (delCtx->ttlDays == 0) return 0;
|
if (delCtx->ttlDays == 0) return 0;
|
||||||
|
|
||||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL};
|
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DELETE};
|
||||||
|
|
||||||
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr del failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr del failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -332,19 +336,22 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays, .changeTimeMs = pUpdCtimeCtx->changeTimeMs};
|
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays,
|
||||||
|
.changeTimeMs = oldData->changeTimeMs,
|
||||||
|
.ttlDaysDirty = oldData->ttlDays,
|
||||||
|
.changeTimeMsDirty = pUpdCtimeCtx->changeTimeMs};
|
||||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
||||||
|
|
||||||
ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr update ctime failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr update ctime failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
|
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
|
||||||
sizeof(dirtryEntry));
|
sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr update ctime failed to update ttl dirty uids since %s", pTtlMgr->logPrefix,
|
metaError("%s, ttlMgr update ctime failed to update dirty uids since %s", pTtlMgr->logPrefix,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
@ -396,27 +403,35 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
||||||
STtlIdxKeyV1 ttlKey;
|
STtlIdxKeyV1 ttlKey;
|
||||||
ttlMgrBuildKey(&ttlKey, cacheEntry->ttlDays, cacheEntry->changeTimeMs, *pUid);
|
ttlMgrBuildKey(&ttlKey, cacheEntry->ttlDays, cacheEntry->changeTimeMs, *pUid);
|
||||||
|
|
||||||
|
STtlIdxKeyV1 ttlKeyDirty;
|
||||||
|
ttlMgrBuildKey(&ttlKeyDirty, cacheEntry->ttlDaysDirty, cacheEntry->changeTimeMsDirty, *pUid);
|
||||||
|
|
||||||
if (pEntry->type == ENTRY_TYPE_UPSERT) {
|
if (pEntry->type == ENTRY_TYPE_UPSERT) {
|
||||||
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), &cacheEntry->ttlDays, sizeof(cacheEntry->ttlDays),
|
// delete old key & upsert new key
|
||||||
pTxn);
|
tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); // maybe first insert, ignore error
|
||||||
|
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKeyDirty, sizeof(ttlKeyDirty), &cacheEntry->ttlDaysDirty,
|
||||||
|
sizeof(cacheEntry->ttlDaysDirty), pTxn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr flush failed to flush ttl cache upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
} else if (pEntry->type == ENTRY_TYPE_DEL) {
|
|
||||||
|
cacheEntry->ttlDays = cacheEntry->ttlDaysDirty;
|
||||||
|
cacheEntry->changeTimeMs = cacheEntry->changeTimeMsDirty;
|
||||||
|
} else if (pEntry->type == ENTRY_TYPE_DELETE) {
|
||||||
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
|
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr flush failed to flush ttl cache del since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr flush failed to delete ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to remove cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
metaError("%s, ttlMgr flush failed to flush ttl cache, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
|
metaError("%s, ttlMgr flush failed, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue