fix: ttlMgrDeleteTtl should ignore ttl 0 tables
This commit is contained in:
parent
f4249234ef
commit
a39a9428af
|
@ -38,6 +38,8 @@ typedef struct STtlManger {
|
||||||
SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime}
|
SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime}
|
||||||
SHashObj* pDirtyUids; // dirty tuid
|
SHashObj* pDirtyUids; // dirty tuid
|
||||||
TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl>
|
TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl>
|
||||||
|
|
||||||
|
char* logPrefix;
|
||||||
} STtlManger;
|
} STtlManger;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -77,9 +79,10 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
TXN* pTxn;
|
TXN* pTxn;
|
||||||
|
int64_t ttlDays;
|
||||||
} STtlDelTtlCtx;
|
} STtlDelTtlCtx;
|
||||||
|
|
||||||
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
|
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix);
|
||||||
void ttlMgrClose(STtlManger* pTtlMgr);
|
void ttlMgrClose(STtlManger* pTtlMgr);
|
||||||
int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta);
|
int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta);
|
||||||
|
|
||||||
|
|
|
@ -128,7 +128,9 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// open pTtlMgr ("ttlv1.idx")
|
// open pTtlMgr ("ttlv1.idx")
|
||||||
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0);
|
char logPrefix[128] = {0};
|
||||||
|
sprintf(logPrefix, "vgId:%d", TD_VID(pVnode));
|
||||||
|
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
|
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -971,7 +971,15 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
|
static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
|
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
|
||||||
|
|
||||||
STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
|
STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
|
||||||
|
if (pME->type == TSDB_CHILD_TABLE) {
|
||||||
|
ctx.ttlDays = pME->ctbEntry.ttlDays;
|
||||||
|
} else {
|
||||||
|
ctx.ttlDays = pME->ntbEntry.ttlDays;
|
||||||
|
}
|
||||||
|
|
||||||
return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
|
return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1965,7 +1973,6 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
|
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
|
||||||
|
|
||||||
STtlUpdTtlCtx ctx = {.uid = pME->uid};
|
STtlUpdTtlCtx ctx = {.uid = pME->uid};
|
||||||
|
|
||||||
if (pME->type == TSDB_CHILD_TABLE) {
|
if (pME->type == TSDB_CHILD_TABLE) {
|
||||||
ctx.ttlDays = pME->ctbEntry.ttlDays;
|
ctx.ttlDays = pME->ctbEntry.ttlDays;
|
||||||
ctx.changeTimeMs = pME->ctbEntry.btime;
|
ctx.changeTimeMs = pME->ctbEntry.btime;
|
||||||
|
|
|
@ -39,7 +39,7 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr);
|
||||||
const char *ttlTbname = "ttl.idx";
|
const char *ttlTbname = "ttl.idx";
|
||||||
const char *ttlV1Tbname = "ttlv1.idx";
|
const char *ttlV1Tbname = "ttlv1.idx";
|
||||||
|
|
||||||
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix) {
|
||||||
int ret = TSDB_CODE_SUCCESS;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
int64_t startNs = taosGetTimestampNs();
|
int64_t startNs = taosGetTimestampNs();
|
||||||
|
|
||||||
|
@ -48,9 +48,17 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
||||||
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
|
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
|
||||||
if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
|
char *logBuffer = (char *)tdbOsCalloc(1, strlen(logPrefix) + 1);
|
||||||
|
if (logBuffer == NULL) {
|
||||||
|
tdbOsFree(pTtlMgr);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
strcpy(logBuffer, logPrefix);
|
||||||
|
pTtlMgr->logPrefix = logBuffer;
|
||||||
|
|
||||||
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
|
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno));
|
metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(terrno));
|
||||||
tdbOsFree(pTtlMgr);
|
tdbOsFree(pTtlMgr);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -62,14 +70,14 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
|
||||||
|
|
||||||
ret = ttlMgrFillCache(pTtlMgr);
|
ret = ttlMgrFillCache(pTtlMgr);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to fill hash since %s", tstrerror(terrno));
|
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
ttlMgrCleanup(pTtlMgr);
|
ttlMgrCleanup(pTtlMgr);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t endNs = taosGetTimestampNs();
|
int64_t endNs = taosGetTimestampNs();
|
||||||
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
|
metaInfo("%s, ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
|
||||||
endNs - startNs);
|
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
|
||||||
|
|
||||||
*ppTtlMgr = pTtlMgr;
|
*ppTtlMgr = pTtlMgr;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -91,37 +99,37 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
|
||||||
|
|
||||||
if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS;
|
if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
metaInfo("ttl mgr start upgrade");
|
metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix);
|
||||||
|
|
||||||
int64_t startNs = taosGetTimestampNs();
|
int64_t startNs = taosGetTimestampNs();
|
||||||
|
|
||||||
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
|
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
|
metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
|
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to convert ttl index since %s", tstrerror(terrno));
|
metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
|
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to drop old ttl index since %s", tstrerror(terrno));
|
metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = ttlMgrFillCache(pTtlMgr);
|
ret = ttlMgrFillCache(pTtlMgr);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to fill hash since %s", tstrerror(terrno));
|
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t endNs = taosGetTimestampNs();
|
int64_t endNs = taosGetTimestampNs();
|
||||||
metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
|
metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
|
||||||
endNs - startNs);
|
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
|
||||||
_out:
|
_out:
|
||||||
tdbTbClose(pTtlMgr->pOldTtlIdx);
|
tdbTbClose(pTtlMgr->pOldTtlIdx);
|
||||||
pTtlMgr->pOldTtlIdx = NULL;
|
pTtlMgr->pOldTtlIdx = NULL;
|
||||||
|
@ -130,11 +138,12 @@ _out:
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
|
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
|
||||||
|
taosMemoryFree(pTtlMgr->logPrefix);
|
||||||
taosHashCleanup(pTtlMgr->pTtlCache);
|
taosHashCleanup(pTtlMgr->pTtlCache);
|
||||||
taosHashCleanup(pTtlMgr->pDirtyUids);
|
taosHashCleanup(pTtlMgr->pDirtyUids);
|
||||||
tdbTbClose(pTtlMgr->pTtlIdx);
|
tdbTbClose(pTtlMgr->pTtlIdx);
|
||||||
taosThreadRwlockDestroy(&pTtlMgr->lock);
|
taosThreadRwlockDestroy(&pTtlMgr->lock);
|
||||||
tdbOsFree(pTtlMgr);
|
taosMemoryFree(pTtlMgr);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
|
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
|
||||||
|
@ -250,13 +259,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
||||||
|
|
||||||
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("ttlMgr insert failed to update ttl cache since %s", tstrerror(terrno));
|
metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("ttlMgr insert failed to update ttl dirty uids since %s", tstrerror(terrno));
|
metaError("%s, ttlMgr insert failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,20 +273,21 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
||||||
_out:
|
_out:
|
||||||
ttlMgrULock(pTtlMgr);
|
ttlMgrULock(pTtlMgr);
|
||||||
|
|
||||||
metaDebug("ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, updCtx->uid,
|
metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
|
||||||
updCtx->changeTimeMs, updCtx->ttlDays);
|
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
||||||
|
if (delCtx->ttlDays == 0) return 0;
|
||||||
ttlMgrWLock(pTtlMgr);
|
ttlMgrWLock(pTtlMgr);
|
||||||
|
|
||||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL};
|
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL};
|
||||||
|
|
||||||
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("ttlMgr del failed to update ttl dirty uids since %s", tstrerror(terrno));
|
metaError("%s, ttlMgr del failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,7 +295,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
||||||
_out:
|
_out:
|
||||||
ttlMgrULock(pTtlMgr);
|
ttlMgrULock(pTtlMgr);
|
||||||
|
|
||||||
metaDebug("ttl mgr delete ttl, uid: %" PRId64, delCtx->uid);
|
metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -293,6 +303,8 @@ _out:
|
||||||
int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
|
int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
|
||||||
ttlMgrWLock(pTtlMgr);
|
ttlMgrWLock(pTtlMgr);
|
||||||
|
|
||||||
|
int ret = 0;
|
||||||
|
|
||||||
STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
|
STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
|
||||||
if (oldData == NULL) {
|
if (oldData == NULL) {
|
||||||
goto _out;
|
goto _out;
|
||||||
|
@ -301,17 +313,17 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
|
||||||
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays, .changeTimeMs = pUpdCtimeCtx->changeTimeMs};
|
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays, .changeTimeMs = pUpdCtimeCtx->changeTimeMs};
|
||||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
||||||
|
|
||||||
int ret =
|
ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||||
taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("ttlMgr update ctime failed to update ttl cache since %s", tstrerror(terrno));
|
metaError("%s, ttlMgr update ctime failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
|
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
|
||||||
sizeof(dirtryEntry));
|
sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("ttlMgr update ctime failed to update ttl dirty uids since %s", tstrerror(terrno));
|
metaError("%s, ttlMgr update ctime failed to update ttl dirty uids since %s", pTtlMgr->logPrefix,
|
||||||
|
tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -319,7 +331,8 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
|
||||||
_out:
|
_out:
|
||||||
ttlMgrULock(pTtlMgr);
|
ttlMgrULock(pTtlMgr);
|
||||||
|
|
||||||
metaDebug("ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pUpdCtimeCtx->uid, pUpdCtimeCtx->changeTimeMs);
|
metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
|
||||||
|
pUpdCtimeCtx->changeTimeMs);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -366,7 +379,7 @@ _out:
|
||||||
int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
||||||
ttlMgrWLock(pTtlMgr);
|
ttlMgrWLock(pTtlMgr);
|
||||||
|
|
||||||
metaInfo("ttl mgr flush start.");
|
metaInfo("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
|
||||||
|
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
|
|
||||||
|
@ -377,9 +390,9 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
||||||
|
|
||||||
STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
||||||
if (cacheEntry == NULL) {
|
if (cacheEntry == NULL) {
|
||||||
metaError("ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", tstrerror(terrno), *pUid,
|
metaError("%s, ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix,
|
||||||
pEntry->type);
|
tstrerror(terrno), *pUid, pEntry->type);
|
||||||
goto _out;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
STtlIdxKeyV1 ttlKey;
|
STtlIdxKeyV1 ttlKey;
|
||||||
|
@ -389,27 +402,29 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
||||||
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), &cacheEntry->ttlDays, sizeof(cacheEntry->ttlDays),
|
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), &cacheEntry->ttlDays, sizeof(cacheEntry->ttlDays),
|
||||||
pTxn);
|
pTxn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("ttlMgr flush failed to flush ttl cache upsert since %s", tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to flush ttl cache upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
} else if (pEntry->type == ENTRY_TYPE_DEL) {
|
} else if (pEntry->type == ENTRY_TYPE_DEL) {
|
||||||
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
|
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("ttlMgr flush failed to flush ttl cache del since %s", tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to flush ttl cache del since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("ttlMgr flush failed to delete ttl cache since %s", tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to delete ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
metaError("ttlMgr flush failed to flush ttl cache, unknown type: %d", pEntry->type);
|
metaError("%s, ttlMgr flush failed to flush ttl cache, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter);
|
void *pIterTmp = pIter;
|
||||||
|
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIterTmp);
|
||||||
|
taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashClear(pTtlMgr->pDirtyUids);
|
taosHashClear(pTtlMgr->pDirtyUids);
|
||||||
|
@ -418,7 +433,7 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
||||||
_out:
|
_out:
|
||||||
ttlMgrULock(pTtlMgr);
|
ttlMgrULock(pTtlMgr);
|
||||||
|
|
||||||
metaInfo("ttl mgr flush end.");
|
metaInfo("%s, ttl mgr flush end.", pTtlMgr->logPrefix);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -426,7 +441,7 @@ _out:
|
||||||
static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
|
static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
metaTrace("ttlMgr rlock %p", &pTtlMgr->lock);
|
metaTrace("%s, ttlMgr rlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
|
||||||
|
|
||||||
ret = taosThreadRwlockRdlock(&pTtlMgr->lock);
|
ret = taosThreadRwlockRdlock(&pTtlMgr->lock);
|
||||||
|
|
||||||
|
@ -436,7 +451,7 @@ static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
|
||||||
static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
|
static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
metaTrace("ttlMgr wlock %p", &pTtlMgr->lock);
|
metaTrace("%s, ttlMgr wlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
|
||||||
|
|
||||||
ret = taosThreadRwlockWrlock(&pTtlMgr->lock);
|
ret = taosThreadRwlockWrlock(&pTtlMgr->lock);
|
||||||
|
|
||||||
|
@ -446,7 +461,7 @@ static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
|
||||||
static int32_t ttlMgrULock(STtlManger *pTtlMgr) {
|
static int32_t ttlMgrULock(STtlManger *pTtlMgr) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
metaTrace("ttlMgr ulock %p", &pTtlMgr->lock);
|
metaTrace("%s, ttlMgr ulock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
|
||||||
|
|
||||||
ret = taosThreadRwlockUnlock(&pTtlMgr->lock);
|
ret = taosThreadRwlockUnlock(&pTtlMgr->lock);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue