enh: retrieveRows no more if got all data

This commit is contained in:
Shungang Li 2024-09-03 19:16:10 +08:00
parent fcf39a444a
commit 0fd4928dc2
4 changed files with 15 additions and 9 deletions

View File

@ -197,7 +197,7 @@ typedef struct SStoreCacheReader {
SArray *pFuncTypeList, SColumnInfo* pPkCol, int32_t numOfPks);
void (*closeReader)(void *pReader);
int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUidList);
SArray *pTableUidList, bool* pGotAllRows);
int32_t (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables);
} SStoreCacheReader;

View File

@ -186,7 +186,7 @@ int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList,
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr,
SArray *pFuncTypeList, SColumnInfo *pkCol, int32_t numOfPks);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUids);
SArray *pTableUids, bool *pGotAllRows);
void tsdbCacherowsReaderClose(void *pReader);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);

View File

@ -433,7 +433,7 @@ static int32_t tsdbCacheQueryReseek(void* pQHandle) {
}
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, const int32_t* dstSlotIds,
SArray* pTableUidList) {
SArray* pTableUidList, bool* pGotAll) {
if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
@ -642,7 +642,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
taosArrayDestroyEx(pLastCols, tsdbCacheFreeSLastColItem);
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
int32_t i = pr->tableIndex;
for (; i < pr->numOfTables; ++i) {
tb_uid_t uid = pTableList[i].uid;
if ((code = tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype)) != 0) {
@ -673,9 +674,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
++pr->tableIndex;
if (pResBlock->info.rows >= pResBlock->info.capacity) {
goto _end;
break;
}
}
if (pGotAll && i == pr->numOfTables) {
*pGotAll = true;
}
} else {
code = TSDB_CODE_INVALID_PARA;
}

View File

@ -48,6 +48,7 @@ typedef struct SCacheRowsScanInfo {
SArray* pFuncTypeList;
int32_t numOfPks;
SColumnInfo pkCol;
bool gotAll;
} SCacheRowsScanInfo;
static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
@ -292,12 +293,12 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if (pInfo->indexOfBufferedRes >= pBufRes->info.rows) {
if (pInfo->indexOfBufferedRes >= pBufRes->info.rows && !pInfo->gotAll) {
blockDataCleanup(pBufRes);
taosArrayClear(pInfo->pUidList);
code =
pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList);
code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
pInfo->pUidList, &pInfo->gotAll);
QUERY_CHECK_CODE(code, lino, _end);
// check for tag values
@ -394,7 +395,7 @@ static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
taosArrayClear(pInfo->pUidList);
code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
pInfo->pUidList);
pInfo->pUidList, NULL);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->currentGroupIndex += 1;