fix: search memory in ttlMgrFindExpired
This commit is contained in:
parent
8b5448bb2f
commit
a6ba7e678f
|
@ -52,7 +52,9 @@ int metaFinishCommit(SMeta *pMeta, TXN *txn) { return tdbPostCommit(pMeta->pEnv
|
||||||
int metaPrepareAsyncCommit(SMeta *pMeta) {
|
int metaPrepareAsyncCommit(SMeta *pMeta) {
|
||||||
// return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn);
|
// return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn);
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
metaWLock(pMeta);
|
||||||
code = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
|
code = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
|
||||||
|
metaULock(pMeta);
|
||||||
code = tdbCommit(pMeta->pEnv, pMeta->txn);
|
code = tdbCommit(pMeta->pEnv, pMeta->txn);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -931,21 +931,16 @@ end:
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaTtlFindExpired(SMeta *pMeta, int64_t timePointMs, SArray *tbUids, int32_t ttlDropMaxCount) {
|
int metaTtlFindExpired(SMeta *pMeta, int64_t timePointMs, SArray *tbUids, int32_t ttlDropMaxCount) {
|
||||||
metaWLock(pMeta);
|
metaRLock(pMeta);
|
||||||
int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
|
|
||||||
if (ret != 0) {
|
int ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount);
|
||||||
metaError("ttl failed to flush, ret:%d", ret);
|
|
||||||
goto _err;
|
metaULock(pMeta);
|
||||||
}
|
|
||||||
|
|
||||||
ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids, ttlDropMaxCount);
|
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
metaError("ttl failed to find expired table, ret:%d", ret);
|
metaError("ttl failed to find expired table, ret:%d", ret);
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_err:
|
|
||||||
metaULock(pMeta);
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -299,7 +299,7 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
||||||
ret = 0;
|
ret = 0;
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
|
metaTrace("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
|
||||||
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
|
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -323,7 +323,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
||||||
ret = 0;
|
ret = 0;
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
|
metaTrace("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -363,17 +363,37 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
|
||||||
ret = 0;
|
ret = 0;
|
||||||
|
|
||||||
_out:
|
_out:
|
||||||
metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
|
metaTrace("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
|
||||||
pUpdCtimeCtx->changeTimeMs);
|
pUpdCtimeCtx->changeTimeMs);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
|
int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
|
||||||
|
int ret = -1;
|
||||||
|
|
||||||
STtlIdxKeyV1 ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX};
|
STtlIdxKeyV1 ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX};
|
||||||
STtlExpiredCtx expiredCtx = {
|
STtlExpiredCtx expiredCtx = {
|
||||||
.ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids};
|
.ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids};
|
||||||
return tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry);
|
ret = tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry);
|
||||||
|
if (ret) {
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t vIdx = 0;
|
||||||
|
for (size_t i = 0; i < pTbUids->size; i++) {
|
||||||
|
tb_uid_t *pUid = taosArrayGet(pTbUids, i);
|
||||||
|
if (taosHashGet(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t)) == NULL) {
|
||||||
|
// not in dirty && expired in tdb => must be expired
|
||||||
|
taosArraySet(pTbUids, vIdx, pUid);
|
||||||
|
vIdx++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPopTailBatch(pTbUids, pTbUids->size - vIdx);
|
||||||
|
|
||||||
|
_out:
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {
|
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {
|
||||||
|
|
|
@ -160,7 +160,7 @@ static int32_t vnodePreProcessDropTtlMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // find expired uids
|
{ // find expired uids
|
||||||
tbUids = taosArrayInit(8, sizeof(int64_t));
|
tbUids = taosArrayInit(8, sizeof(tb_uid_t));
|
||||||
if (tbUids == NULL) {
|
if (tbUids == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -945,7 +945,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
int32_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
int32_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
||||||
|
|
||||||
char detail[1000] = {0};
|
char detail[1000] = {0};
|
||||||
sprintf(detail, "btime:%" PRId64 ", flags:%d, ttl:%d, type:%d",
|
sprintf(detail, "btime:%" PRId64 ", flags:%d, ttl:%d, type:%d",
|
||||||
pCreateReq->btime, pCreateReq->flags, pCreateReq->ttl, pCreateReq->type);
|
pCreateReq->btime, pCreateReq->flags, pCreateReq->ttl, pCreateReq->type);
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
|
|
Loading…
Reference in New Issue