diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 724d6638db..873b95b29f 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -106,6 +106,8 @@ typedef struct SMCtbCursor { void *pVal; int kLen; int vLen; + int8_t paused; + int lock; } SMCtbCursor; typedef struct SRowBuffPos { @@ -295,7 +297,9 @@ int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); */ SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock); - void (*closeCtbCursor)(SMCtbCursor *pCtbCur, int lock); + int32_t (*resumeCtbCursor)(SMCtbCursor* pCtbCur, int8_t first); + void (*pauseCtbCursor)(SMCtbCursor* pCtbCur); + void (*closeCtbCursor)(SMCtbCursor *pCtbCur); tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur); } SStoreMeta; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e3b2d3e41e..4f6a5bb765 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -168,7 +168,9 @@ int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq); int64_t metaGetTimeSeriesNum(SMeta* pMeta); SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock); -void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock); +int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first); +void metaPauseCtbCursor(SMCtbCursor* pCtbCur); +void metaCloseCtbCursor(SMCtbCursor* pCtbCur); tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur); SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid); void metaCloseStbCursor(SMStbCursor* pStbCur); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 0dea0f346d..a46186c92e 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -423,40 +423,75 @@ SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) { pCtbCur->pMeta = pMeta; pCtbCur->suid = uid; - if (lock) { - metaRLock(pMeta); - } + pCtbCur->lock = lock; + pCtbCur->paused = 1; - ret = tdbTbcOpen(pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL); + ret = metaResumeCtbCursor(pCtbCur, 1); if (ret < 0) { - metaULock(pMeta); - taosMemoryFree(pCtbCur); return NULL; } - - // move to the suid - ctbIdxKey.suid = uid; - ctbIdxKey.uid = INT64_MIN; - tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c); - if (c > 0) { - tdbTbcMoveToNext(pCtbCur->pCur); - } - return pCtbCur; } -void metaCloseCtbCursor(SMCtbCursor *pCtbCur, int lock) { +void metaCloseCtbCursor(SMCtbCursor *pCtbCur) { if (pCtbCur) { - if (pCtbCur->pMeta && lock) metaULock(pCtbCur->pMeta); - if (pCtbCur->pCur) { - tdbTbcClose(pCtbCur->pCur); + if (!pCtbCur->paused) { + if (pCtbCur->pMeta && pCtbCur->lock) metaULock(pCtbCur->pMeta); + if (pCtbCur->pCur) { + tdbTbcClose(pCtbCur->pCur); + } + } + tdbFree(pCtbCur->pKey); + tdbFree(pCtbCur->pVal); + } + taosMemoryFree(pCtbCur); +} - tdbFree(pCtbCur->pKey); - tdbFree(pCtbCur->pVal); +void metaPauseCtbCursor(SMCtbCursor* pCtbCur) { + if (!pCtbCur->paused) { + tdbTbcClose((TBC*)pCtbCur->pCur); + if (pCtbCur->lock) { + metaULock(pCtbCur->pMeta); + } + pCtbCur->paused = 1; + } +} + +int32_t metaResumeCtbCursor(SMCtbCursor* pCtbCur, int8_t first) { + if (pCtbCur->paused) { + pCtbCur->paused = 0; + + if (pCtbCur->lock) { + metaRLock(pCtbCur->pMeta); + } + int ret = 0; + ret = tdbTbcOpen(pCtbCur->pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL); + if (ret < 0) { + metaCloseCtbCursor(pCtbCur); + return -1; } - taosMemoryFree(pCtbCur); + if (first) { + SCtbIdxKey ctbIdxKey; + // move to the suid + ctbIdxKey.suid = pCtbCur->suid; + ctbIdxKey.uid = INT64_MIN; + int c = 0; + tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c); + if (c > 0) { + tdbTbcMoveToNext(pCtbCur->pCur); + } + } else { + int c = 0; + ret = tdbTbcMoveTo(pCtbCur->pCur, pCtbCur->pKey, pCtbCur->kLen, &c); + if (c < 0) { + tdbTbcMoveToPrev(pCtbCur->pCur); + } else { + tdbTbcMoveToNext(pCtbCur->pCur); + } + } } + return 0; } tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { @@ -1414,7 +1449,7 @@ int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) { } taosHashCleanup(pSepecifiedUidMap); - metaCloseCtbCursor(pCur, 1); + metaCloseCtbCursor(pCur); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index dca8dd271c..c72ecd4824 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -98,6 +98,8 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache; pMeta->openCtbCursor = metaOpenCtbCursor; + pMeta->resumeCtbCursor = metaResumeCtbCursor; + pMeta->pauseCtbCursor = metaPauseCtbCursor; pMeta->closeCtbCursor = metaCloseCtbCursor; pMeta->ctbCursorNext = metaCtbCursorNext; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 48f8ec021d..a40accf4fe 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -452,7 +452,7 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) { taosArrayPush(list, &info); } - metaCloseCtbCursor(pCur, 1); + metaCloseCtbCursor(pCur); return TSDB_CODE_SUCCESS; } @@ -473,7 +473,7 @@ int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) { taosArrayPush(list, &id); } - metaCloseCtbCursor(pCur, 1); + metaCloseCtbCursor(pCur); return TSDB_CODE_SUCCESS; } @@ -536,7 +536,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { ++(*num); } - metaCloseCtbCursor(pCur, 0); + metaCloseCtbCursor(pCur); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 184cb558fe..4caa604c3e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2863,12 +2863,14 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { if (pInfo->pCtbCursor == NULL) { pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1); + } else { + pAPI->metaFn.resumeCtbCursor(pInfo->pCtbCursor, 0); } SArray* aUidTags = pInfo->aUidTags; SArray* aFilterIdxs = pInfo->aFilterIdxs; int32_t count = 0; - + bool ctbCursorFinished = false; while (1) { taosArrayClearEx(aUidTags, tagScanFreeUidTag); taosArrayClear(aFilterIdxs); @@ -2878,6 +2880,7 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { SMCtbCursor* pCur = pInfo->pCtbCursor; tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor); if (uid == 0) { + ctbCursorFinished = true; break; } STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal}; @@ -2906,7 +2909,15 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { break; } } - + + if (count > 0) { + pAPI->metaFn.pauseCtbCursor(pInfo->pCtbCursor); + } + if (count == 0 || ctbCursorFinished) { + pAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor); + pInfo->pCtbCursor = NULL; + setOperatorCompleted(pOperator); + } pRes->info.rows = count; pOperator->resultInfo.totalRows += count; return (pRes->info.rows == 0) ? NULL : pInfo->pRes; @@ -2971,7 +2982,7 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) { static void destroyTagScanOperatorInfo(void* param) { STagScanInfo* pInfo = (STagScanInfo*)param; if (pInfo->pCtbCursor != NULL) { - pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1); + pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor); } taosHashCleanup(pInfo->filterCtx.colHash); taosArrayDestroy(pInfo->filterCtx.cInfoList);