diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index ff42643b75..265d9506f5 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -197,7 +197,7 @@ typedef struct SStoreCacheReader { SArray *pFuncTypeList, SColumnInfo* pPkCol, int32_t numOfPks); void (*closeReader)(void *pReader); int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, - SArray *pTableUidList); + SArray *pTableUidList, bool* pGotAllRows); int32_t (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables); } SStoreCacheReader; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2f56aac7d6..ffe6ca2e44 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -186,7 +186,7 @@ int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr, SArray *pFuncTypeList, SColumnInfo *pkCol, int32_t numOfPks); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, - SArray *pTableUids); + SArray *pTableUids, bool *pGotAllRows); void tsdbCacherowsReaderClose(void *pReader); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 72f2052867..f1051daf8c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -433,7 +433,7 @@ static int32_t tsdbCacheQueryReseek(void* pQHandle) { } int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, const int32_t* dstSlotIds, - SArray* pTableUidList) { + SArray* pTableUidList, bool* pGotAll) { if (pReader == NULL || pResBlock == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -642,7 +642,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArrayDestroyEx(pLastCols, tsdbCacheFreeSLastColItem); } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { - for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { + int32_t i = pr->tableIndex; + for (; i < pr->numOfTables; ++i) { tb_uid_t uid = pTableList[i].uid; if ((code = tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype)) != 0) { @@ -673,9 +674,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ++pr->tableIndex; if (pResBlock->info.rows >= pResBlock->info.capacity) { - goto _end; + break; } } + + if (pGotAll && i == pr->numOfTables) { + *pGotAll = true; + } } else { code = TSDB_CODE_INVALID_PARA; } diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index b575613efe..7c1ca294e7 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -48,6 +48,7 @@ typedef struct SCacheRowsScanInfo { SArray* pFuncTypeList; int32_t numOfPks; SColumnInfo pkCol; + bool gotAll; } SCacheRowsScanInfo; static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); @@ -292,12 +293,12 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - if (pInfo->indexOfBufferedRes >= pBufRes->info.rows) { + if (pInfo->indexOfBufferedRes >= pBufRes->info.rows && !pInfo->gotAll) { blockDataCleanup(pBufRes); taosArrayClear(pInfo->pUidList); - code = - pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList); + code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds, + pInfo->pUidList, &pInfo->gotAll); QUERY_CHECK_CODE(code, lino, _end); // check for tag values @@ -394,7 +395,7 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { taosArrayClear(pInfo->pUidList); code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds, - pInfo->pUidList); + pInfo->pUidList, NULL); QUERY_CHECK_CODE(code, lino, _end); pInfo->currentGroupIndex += 1;