From 8872e017af58ef060c4e1edfc0155f5f1331e509 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 17 Jul 2023 11:15:17 +0800 Subject: [PATCH] fix: group table scan issue --- source/libs/executor/src/scanoperator.c | 109 ++++++++++++++---------- 1 file changed, 65 insertions(+), 44 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 46db8b6197..ccb837f26a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -823,6 +823,15 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { + setOperatorCompleted(pOperator); + if (pOperator->dynamicTask) { + taosArrayClear(pInfo->base.pTableListInfo->pTableList); + taosHashClear(pInfo->base.pTableListInfo->map); + } + return NULL; + } + // reset value for the next group data output pOperator->status = OP_OPENED; resetLimitInfoForNextGroup(&pInfo->base.limitInfo); @@ -844,10 +853,55 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { return result; } - setOperatorCompleted(pOperator); return NULL; } +static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { + STableScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + + if (pInfo->currentGroupId == -1) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { + setOperatorCompleted(pOperator); + return NULL; + } + + int32_t num = 0; + STableKeyInfo* pList = NULL; + tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); + ASSERT(pInfo->base.dataReader == NULL); + + int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, + (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + + if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) { + pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity; + } + } + + SSDataBlock* result = doGroupedTableScan(pOperator); + if (result != NULL) { + if (pInfo->base.pTableListInfo->oneTableForEachGroup) { + STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId); + result->info.id.groupId = pKeyInfo->uid; + } + return result; + } + + while (true) { + result = startNextGroupScan(pOperator); + if (result || pOperator->status == OP_EXEC_DONE) { + return result; + } + } + + return result; +} + static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -862,8 +916,15 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } if (pOperator->status == OP_EXEC_DONE) { - pInfo->currentGroupId = 0; - return startNextGroupScan(pOperator); + pInfo->currentGroupId = -1; + pOperator->status = OP_OPENED; + SSDataBlock* result = NULL; + while (true) { + result = startNextGroupScan(pOperator); + if (result || pOperator->status == OP_EXEC_DONE) { + return result; + } + } } } @@ -901,47 +962,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { pInfo->scanTimes = 0; } } else { // scan table group by group sequentially - if (pInfo->currentGroupId == -1) { - if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { - setOperatorCompleted(pOperator); - return NULL; - } - - int32_t num = 0; - STableKeyInfo* pList = NULL; - tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); - ASSERT(pInfo->base.dataReader == NULL); - - int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, - (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - - if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) { - pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity; - } - } - - SSDataBlock* result = doGroupedTableScan(pOperator); - if (result != NULL) { - if (pInfo->base.pTableListInfo->oneTableForEachGroup) { - STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId); - result->info.id.groupId = pKeyInfo->uid; - } - return result; - } - - if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { - setOperatorCompleted(pOperator); - if (pOperator->dynamicTask) { - taosArrayClear(pInfo->base.pTableListInfo->pTableList); - taosHashClear(pInfo->base.pTableListInfo->map); - } - return NULL; - } - - return startNextGroupScan(pOperator); + return groupSeqTableScan(pOperator); } }