feat: (errcode) metaTtl.c
This commit is contained in:
parent
b8b9dd1c47
commit
4ec034a266
|
@ -85,18 +85,18 @@ typedef struct {
|
||||||
TXN* pTxn;
|
TXN* pTxn;
|
||||||
} STtlDelTtlCtx;
|
} STtlDelTtlCtx;
|
||||||
|
|
||||||
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix, int32_t flushThreshold);
|
int32_t ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix, int32_t flushThreshold);
|
||||||
void ttlMgrClose(STtlManger* pTtlMgr);
|
void ttlMgrClose(STtlManger* pTtlMgr);
|
||||||
|
|
||||||
bool ttlMgrNeedUpgrade(TDB* pEnv);
|
bool ttlMgrNeedUpgrade(TDB* pEnv);
|
||||||
int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta);
|
int32_t ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta);
|
||||||
|
|
||||||
int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx);
|
int32_t ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx);
|
||||||
int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
|
int32_t ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
|
||||||
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
|
int32_t ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
|
||||||
|
|
||||||
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
|
int32_t ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
|
||||||
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids, int32_t ttlDropMaxCount);
|
int32_t ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids, int32_t ttlDropMaxCount);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,39 +46,38 @@ static bool ttlMgrNeedFlush(STtlManger *pTtlMgr);
|
||||||
const char *ttlTbname = "ttl.idx";
|
const char *ttlTbname = "ttl.idx";
|
||||||
const char *ttlV1Tbname = "ttlv1.idx";
|
const char *ttlV1Tbname = "ttlv1.idx";
|
||||||
|
|
||||||
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) {
|
int32_t ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) {
|
||||||
int ret = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int64_t startNs = taosGetTimestampNs();
|
int64_t startNs = taosGetTimestampNs();
|
||||||
|
|
||||||
*ppTtlMgr = NULL;
|
*ppTtlMgr = NULL;
|
||||||
|
|
||||||
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
|
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
|
||||||
if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (pTtlMgr == NULL) TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
|
||||||
char *logBuffer = (char *)tdbOsCalloc(1, strlen(logPrefix) + 1);
|
char *logBuffer = (char *)tdbOsCalloc(1, strlen(logPrefix) + 1);
|
||||||
if (logBuffer == NULL) {
|
if (logBuffer == NULL) {
|
||||||
tdbOsFree(pTtlMgr);
|
tdbOsFree(pTtlMgr);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
strcpy(logBuffer, logPrefix);
|
(void)strcpy(logBuffer, logPrefix);
|
||||||
pTtlMgr->logPrefix = logBuffer;
|
pTtlMgr->logPrefix = logBuffer;
|
||||||
pTtlMgr->flushThreshold = flushThreshold;
|
pTtlMgr->flushThreshold = flushThreshold;
|
||||||
|
|
||||||
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
|
code = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
|
||||||
if (ret < 0) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(terrno));
|
metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(code));
|
||||||
tdbOsFree(pTtlMgr);
|
tdbOsFree(pTtlMgr);
|
||||||
return ret;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
ret = ttlMgrFillCache(pTtlMgr);
|
if ((code = ttlMgrFillCache(pTtlMgr)) != TSDB_CODE_SUCCESS) {
|
||||||
if (ret < 0) {
|
|
||||||
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
ttlMgrCleanup(pTtlMgr);
|
ttlMgrCleanup(pTtlMgr);
|
||||||
return ret;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t endNs = taosGetTimestampNs();
|
int64_t endNs = taosGetTimestampNs();
|
||||||
|
@ -86,7 +85,7 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo
|
||||||
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
|
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
|
||||||
|
|
||||||
*ppTtlMgr = pTtlMgr;
|
*ppTtlMgr = pTtlMgr;
|
||||||
return TSDB_CODE_SUCCESS;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); }
|
void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); }
|
||||||
|
@ -99,37 +98,34 @@ bool ttlMgrNeedUpgrade(TDB *pEnv) {
|
||||||
return needUpgrade;
|
return needUpgrade;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
|
int32_t ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
|
||||||
SMeta *meta = (SMeta *)pMeta;
|
SMeta *meta = (SMeta *)pMeta;
|
||||||
int ret = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS;
|
if (!tdbTbExist(ttlTbname, meta->pEnv)) TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix);
|
metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix);
|
||||||
|
|
||||||
int64_t startNs = taosGetTimestampNs();
|
int64_t startNs = taosGetTimestampNs();
|
||||||
|
|
||||||
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
|
code = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
|
||||||
if (ret < 0) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(terrno));
|
metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(code));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
|
if ((code = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta)) != TSDB_CODE_SUCCESS) {
|
||||||
if (ret < 0) {
|
metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(code));
|
||||||
metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
|
if ((code = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn)) != TSDB_CODE_SUCCESS) {
|
||||||
if (ret < 0) {
|
metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(code));
|
||||||
metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = ttlMgrFillCache(pTtlMgr);
|
if ((code = ttlMgrFillCache(pTtlMgr)) != TSDB_CODE_SUCCESS) {
|
||||||
if (ret < 0) {
|
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(code));
|
||||||
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,14 +137,14 @@ _out:
|
||||||
tdbTbClose(pTtlMgr->pOldTtlIdx);
|
tdbTbClose(pTtlMgr->pOldTtlIdx);
|
||||||
pTtlMgr->pOldTtlIdx = NULL;
|
pTtlMgr->pOldTtlIdx = NULL;
|
||||||
|
|
||||||
return ret;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
|
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
|
||||||
taosMemoryFree(pTtlMgr->logPrefix);
|
taosMemoryFree(pTtlMgr->logPrefix);
|
||||||
taosHashCleanup(pTtlMgr->pTtlCache);
|
taosHashCleanup(pTtlMgr->pTtlCache);
|
||||||
taosHashCleanup(pTtlMgr->pDirtyUids);
|
taosHashCleanup(pTtlMgr->pDirtyUids);
|
||||||
tdbTbClose(pTtlMgr->pTtlIdx);
|
(void)tdbTbClose(pTtlMgr->pTtlIdx);
|
||||||
taosMemoryFree(pTtlMgr);
|
taosMemoryFree(pTtlMgr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,30 +211,30 @@ static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void
|
||||||
return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
|
return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData) {
|
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData) {
|
||||||
SConvertData *pData = (SConvertData *)pConvertData;
|
SConvertData *pData = (SConvertData *)pConvertData;
|
||||||
|
|
||||||
STtlIdxKey *ttlKey = (STtlIdxKey *)pKey;
|
STtlIdxKey *ttlKey = (STtlIdxKey *)pKey;
|
||||||
tb_uid_t uid = ttlKey->uid;
|
tb_uid_t uid = ttlKey->uid;
|
||||||
int64_t ttlDays = 0;
|
int64_t ttlDays = 0;
|
||||||
|
|
||||||
int ret = metaGetTableTtlByUid(pData->pMeta, uid, &ttlDays);
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (ret < 0) {
|
if ((code = metaGetTableTtlByUid(pData->pMeta, uid, &ttlDays)) != TSDB_CODE_SUCCESS) {
|
||||||
metaError("ttlMgr convert failed to get ttl since %s", tstrerror(terrno));
|
metaError("ttlMgr convert failed to get ttl since %s", tstrerror(code));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
STtlIdxKeyV1 ttlKeyV1 = {.deleteTimeMs = ttlKey->deleteTimeSec * 1000, .uid = uid};
|
STtlIdxKeyV1 ttlKeyV1 = {.deleteTimeMs = ttlKey->deleteTimeSec * 1000, .uid = uid};
|
||||||
ret = tdbTbUpsert(pData->pNewTtlIdx, &ttlKeyV1, sizeof(ttlKeyV1), &ttlDays, sizeof(ttlDays), pData->pMeta->txn);
|
code = tdbTbUpsert(pData->pNewTtlIdx, &ttlKeyV1, sizeof(ttlKeyV1), &ttlDays, sizeof(ttlDays), pData->pMeta->txn);
|
||||||
if (ret < 0) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
metaError("ttlMgr convert failed to upsert since %s", tstrerror(terrno));
|
metaError("ttlMgr convert failed to upsert since %s", tstrerror(code));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = 0;
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
return ret;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
|
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
|
||||||
|
@ -248,7 +244,10 @@ static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const voi
|
||||||
|
|
||||||
int c = ttlIdxKeyV1Cmpr(&pCtx->expiredKey, sizeof(pCtx->expiredKey), pKey, keyLen);
|
int c = ttlIdxKeyV1Cmpr(&pCtx->expiredKey, sizeof(pCtx->expiredKey), pKey, keyLen);
|
||||||
if (c > 0) {
|
if (c > 0) {
|
||||||
taosArrayPush(pCtx->pTbUids, &((STtlIdxKeyV1 *)pKey)->uid);
|
if (NULL == taosArrayPush(pCtx->pTbUids, &((STtlIdxKeyV1 *)pKey)->uid)) {
|
||||||
|
metaError("ttlMgr find expired failed since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
pCtx->count++;
|
pCtx->count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,16 +261,16 @@ static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
|
||||||
|
|
||||||
SConvertData cvData = {.pNewTtlIdx = pNewTtlIdx, .pMeta = meta};
|
SConvertData cvData = {.pNewTtlIdx = pNewTtlIdx, .pMeta = meta};
|
||||||
|
|
||||||
int ret = tdbTbTraversal(pOldTtlIdx, &cvData, ttlMgrConvertOneEntry);
|
int code = TSDB_CODE_SUCCESS;
|
||||||
if (ret < 0) {
|
if ((code = tdbTbTraversal(pOldTtlIdx, &cvData, ttlMgrConvertOneEntry)) != TSDB_CODE_SUCCESS) {
|
||||||
metaError("failed to convert since %s", tstrerror(terrno));
|
metaError("failed to convert since %s", tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
metaInfo("ttlMgr convert end.");
|
metaInfo("ttlMgr convert end.");
|
||||||
return ret;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
int32_t ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
||||||
if (updCtx->ttlDays == 0) return 0;
|
if (updCtx->ttlDays == 0) return 0;
|
||||||
|
|
||||||
STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays,
|
STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays,
|
||||||
|
@ -280,56 +279,55 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
||||||
.changeTimeMsDirty = updCtx->changeTimeMs};
|
.changeTimeMsDirty = updCtx->changeTimeMs};
|
||||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
||||||
|
|
||||||
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
int32_t code = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||||
if (ret < 0) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
metaError("%s, ttlMgr insert failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr insert failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
code = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
metaError("%s, ttlMgr insert failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr insert failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ttlMgrNeedFlush(pTtlMgr)) {
|
if (ttlMgrNeedFlush(pTtlMgr)) {
|
||||||
ttlMgrFlush(pTtlMgr, updCtx->pTxn);
|
(void)ttlMgrFlush(pTtlMgr, updCtx->pTxn);
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = 0;
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
metaTrace("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
|
metaTrace("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
|
||||||
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
|
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
|
||||||
|
|
||||||
return ret;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
int32_t ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
||||||
if (delCtx->ttlDays == 0) return 0;
|
if (delCtx->ttlDays == 0) return 0;
|
||||||
|
|
||||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DELETE};
|
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DELETE};
|
||||||
|
|
||||||
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
int32_t code = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
metaError("%s, ttlMgr del failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr del failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ttlMgrNeedFlush(pTtlMgr)) {
|
if (ttlMgrNeedFlush(pTtlMgr)) {
|
||||||
ttlMgrFlush(pTtlMgr, delCtx->pTxn);
|
(void)ttlMgrFlush(pTtlMgr, delCtx->pTxn);
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = 0;
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
metaTrace("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
|
metaTrace("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
|
||||||
|
TAOS_RETURN(code);
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
|
int32_t ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
|
||||||
int ret = 0;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
|
STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
|
||||||
if (oldData == NULL) {
|
if (oldData == NULL) {
|
||||||
|
@ -342,43 +340,39 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
|
||||||
.changeTimeMsDirty = pUpdCtimeCtx->changeTimeMs};
|
.changeTimeMsDirty = pUpdCtimeCtx->changeTimeMs};
|
||||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
||||||
|
|
||||||
ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
code = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||||
if (ret < 0) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
metaError("%s, ttlMgr update ctime failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr update ctime failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
|
code = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
|
||||||
sizeof(dirtryEntry));
|
sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
metaError("%s, ttlMgr update ctime failed to update dirty uids since %s", pTtlMgr->logPrefix,
|
metaError("%s, ttlMgr update ctime failed to update dirty uids since %s", pTtlMgr->logPrefix,
|
||||||
tstrerror(terrno));
|
tstrerror(code));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ttlMgrNeedFlush(pTtlMgr)) {
|
if (ttlMgrNeedFlush(pTtlMgr)) {
|
||||||
ttlMgrFlush(pTtlMgr, pUpdCtimeCtx->pTxn);
|
(void)ttlMgrFlush(pTtlMgr, pUpdCtimeCtx->pTxn);
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = 0;
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
metaTrace("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
|
metaTrace("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
|
||||||
pUpdCtimeCtx->changeTimeMs);
|
pUpdCtimeCtx->changeTimeMs);
|
||||||
|
TAOS_RETURN(code);
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
|
int32_t ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
|
||||||
int ret = -1;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
STtlIdxKeyV1 ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX};
|
STtlIdxKeyV1 ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX};
|
||||||
STtlExpiredCtx expiredCtx = {
|
STtlExpiredCtx expiredCtx = {
|
||||||
.ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids};
|
.ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids};
|
||||||
ret = tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry);
|
TAOS_CHECK_GOTO(tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry), NULL, _out);
|
||||||
if (ret) {
|
|
||||||
goto _out;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t vIdx = 0;
|
size_t vIdx = 0;
|
||||||
for (size_t i = 0; i < pTbUids->size; i++) {
|
for (size_t i = 0; i < pTbUids->size; i++) {
|
||||||
|
@ -393,20 +387,20 @@ int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids,
|
||||||
taosArrayPopTailBatch(pTbUids, pTbUids->size - vIdx);
|
taosArrayPopTailBatch(pTbUids, pTbUids->size - vIdx);
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
return ret;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {
|
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {
|
||||||
return pTtlMgr->flushThreshold > 0 && taosHashGetSize(pTtlMgr->pDirtyUids) > pTtlMgr->flushThreshold;
|
return pTtlMgr->flushThreshold > 0 && taosHashGetSize(pTtlMgr->pDirtyUids) > pTtlMgr->flushThreshold;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
||||||
int64_t startNs = taosGetTimestampNs();
|
int64_t startNs = taosGetTimestampNs();
|
||||||
int64_t endNs = startNs;
|
int64_t endNs = startNs;
|
||||||
|
|
||||||
metaTrace("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
|
metaTrace("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
|
||||||
|
|
||||||
int ret = -1;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
void *pIter = taosHashIterate(pTtlMgr->pDirtyUids, NULL);
|
void *pIter = taosHashIterate(pTtlMgr->pDirtyUids, NULL);
|
||||||
while (pIter != NULL) {
|
while (pIter != NULL) {
|
||||||
|
@ -415,8 +409,8 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
||||||
|
|
||||||
STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
||||||
if (cacheEntry == NULL) {
|
if (cacheEntry == NULL) {
|
||||||
metaError("%s, ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix,
|
metaError("%s, ttlMgr flush failed to get ttl cache, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, *pUid,
|
||||||
tstrerror(terrno), *pUid, pEntry->type);
|
pEntry->type);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -428,26 +422,26 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
||||||
|
|
||||||
if (pEntry->type == ENTRY_TYPE_UPSERT) {
|
if (pEntry->type == ENTRY_TYPE_UPSERT) {
|
||||||
// delete old key & upsert new key
|
// delete old key & upsert new key
|
||||||
tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); // maybe first insert, ignore error
|
(void)tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); // maybe first insert, ignore error
|
||||||
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKeyDirty, sizeof(ttlKeyDirty), &cacheEntry->ttlDaysDirty,
|
code = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKeyDirty, sizeof(ttlKeyDirty), &cacheEntry->ttlDaysDirty,
|
||||||
sizeof(cacheEntry->ttlDaysDirty), pTxn);
|
sizeof(cacheEntry->ttlDaysDirty), pTxn);
|
||||||
if (ret < 0) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
metaError("%s, ttlMgr flush failed to upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to upsert since %s", pTtlMgr->logPrefix, tstrerror(code));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
cacheEntry->ttlDays = cacheEntry->ttlDaysDirty;
|
cacheEntry->ttlDays = cacheEntry->ttlDaysDirty;
|
||||||
cacheEntry->changeTimeMs = cacheEntry->changeTimeMsDirty;
|
cacheEntry->changeTimeMs = cacheEntry->changeTimeMsDirty;
|
||||||
} else if (pEntry->type == ENTRY_TYPE_DELETE) {
|
} else if (pEntry->type == ENTRY_TYPE_DELETE) {
|
||||||
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
|
code = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
|
||||||
if (ret < 0) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(code));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
code = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
||||||
if (ret < 0) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
metaError("%s, ttlMgr flush failed to remove cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to remove cache since %s", pTtlMgr->logPrefix, tstrerror(code));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -457,16 +451,16 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
||||||
|
|
||||||
void *pIterTmp = pIter;
|
void *pIterTmp = pIter;
|
||||||
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIterTmp);
|
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIterTmp);
|
||||||
taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t));
|
(void)taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashClear(pTtlMgr->pDirtyUids);
|
taosHashClear(pTtlMgr->pDirtyUids);
|
||||||
|
|
||||||
ret = 0;
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
endNs = taosGetTimestampNs();
|
endNs = taosGetTimestampNs();
|
||||||
metaTrace("%s, ttl mgr flush end, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, endNs - startNs);
|
metaTrace("%s, ttl mgr flush end, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, endNs - startNs);
|
||||||
|
|
||||||
return ret;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue