From fae4f2c4ed96427cfefa25c6609455ed804d1e46 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 20 Jan 2023 18:08:34 +0800 Subject: [PATCH 1/6] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 3 ++- source/libs/executor/src/exchangeoperator.c | 30 ++++++--------------- source/libs/executor/src/executil.c | 4 +++ source/libs/executor/src/projectoperator.c | 29 +++----------------- source/libs/executor/src/scanoperator.c | 16 +++++------ source/libs/executor/src/sortoperator.c | 28 +++++-------------- 6 files changed, 32 insertions(+), 78 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c68f7c4697..4ae178d508 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -704,9 +704,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG SDiskbasedBuf* pBuf); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); +bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo); -bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator); +bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 037b33dc9f..08b7d371e2 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -707,6 +707,8 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { } int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + if (pLimitInfo->remainGroupOffset > 0) { if (pLimitInfo->currentGroupId == 0) { // it is the first group pLimitInfo->currentGroupId = pBlock->info.id.groupId; @@ -750,36 +752,20 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa // set current group id pLimitInfo->currentGroupId = pBlock->info.id.groupId; - if (pLimitInfo->remainOffset >= pBlock->info.rows) { - pLimitInfo->remainOffset -= pBlock->info.rows; - blockDataCleanup(pBlock); + bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo); + if (pBlock->info.rows == 0) { return PROJECT_RETRIEVE_CONTINUE; - } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) { - blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); - pLimitInfo->remainOffset = 0; - } - - // check for the limitation in each group - if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { - int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); - blockDataKeepFirstNRows(pBlock, keepRows); - if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) { + } else { + if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { setOperatorCompleted(pOperator); - } else { - // current group limitation is reached, and future blocks of this group need to be discarded. 
- if (pBlock->info.rows == 0) { - return PROJECT_RETRIEVE_CONTINUE; - } + return PROJECT_RETRIEVE_DONE; } - - return PROJECT_RETRIEVE_DONE; } // todo optimize performance // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the // they may not belong to the same group the limit/offset value is not valid in this case. - if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 || - pLimitInfo->slimit.limit != -1) { + if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) { return PROJECT_RETRIEVE_DONE; } else { // not full enough, continue to accumulate the output data in the buffer. return PROJECT_RETRIEVE_CONTINUE; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 757324a773..92d52fbb0a 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1749,6 +1749,10 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) { pLimitInfo->slimit.offset != -1); } +bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo) { + return (pLimitInfo->slimit.limit != -1 || pLimitInfo->slimit.offset != -1); +} + void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) { SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)}; SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)}; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 3e3610827b..b1dc217bf5 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -185,36 +185,15 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS SOperatorInfo* pOperator) { // set current group id pLimitInfo->currentGroupId = groupId; - - if (pLimitInfo->remainOffset >= pBlock->info.rows) { - pLimitInfo->remainOffset -= pBlock->info.rows; - blockDataCleanup(pBlock); + bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pOperator->pTaskInfo); + if (pBlock->info.rows == 0) { return PROJECT_RETRIEVE_CONTINUE; - } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) { - blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); - pLimitInfo->remainOffset = 0; - } - - // check for the limitation in each group - if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { - int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); - blockDataKeepFirstNRows(pBlock, keepRows); - - // TODO: optimize it later when partition by + limit - // all retrieved requirement has been fulfilled, let's finish this - if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || - (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { + } else { + if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { setOperatorCompleted(pOperator); - } else { - // Even current group is done, there may be many vgroups remain existed, and we need to continue to retrieve data - // from next group. 
So let's continue this retrieve process - if (keepRows == 0) { - return PROJECT_RETRIEVE_CONTINUE; - } } } - pLimitInfo->numOfOutputRows += pBlock->info.rows; return PROJECT_RETRIEVE_DONE; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 813763fffa..2813ef3505 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -256,12 +256,11 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo } } -// todo handle the slimit info -bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) { +bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SLimit* pLimit = &pLimitInfo->limit; const char* id = GET_TASKID(pTaskInfo); - if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) { + if (pLimitInfo->remainOffset > 0) { if (pLimitInfo->remainOffset >= pBlock->info.rows) { pLimitInfo->remainOffset -= pBlock->info.rows; blockDataEmpty(pBlock); @@ -276,12 +275,14 @@ bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) { // limit the output rows int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows); - blockDataKeepFirstNRows(pBlock, keep); + + pLimitInfo->numOfOutputRows += pBlock->info.rows; qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id); return true; } + pLimitInfo->numOfOutputRows += pBlock->info.rows; return false; } @@ -393,13 +394,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca } } - bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator); + bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo); if (limitReached) { // set operator flag is done setOperatorCompleted(pOperator); } pCost->totalRows += pBlock->info.rows; - pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows; return TSDB_CODE_SUCCESS; } @@ -2714,9 +2714,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* } } - applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator); - pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows; - + applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); qDebug("%s get sorted row block, rows:%d, limit:%"PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, pInfo->limitInfo.numOfOutputRows); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 97b4fd9dc4..e91d41897d 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -222,6 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } + // multi-group case not handle here SSDataBlock* pBlock = NULL; while (1) { pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, @@ -236,28 +237,13 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { continue; } - // todo add the limit/offset info - if (pInfo->limitInfo.remainOffset > 0) { - if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) { - pInfo->limitInfo.remainOffset -= pBlock->info.rows; - continue; - } - - blockDataTrimFirstNRows(pBlock, pInfo->limitInfo.remainOffset); - pInfo->limitInfo.remainOffset = 0; + bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo); + 
if (limitReached) { + resetLimitInfoForNextGroup(&pInfo->limitInfo); } - if (pInfo->limitInfo.limit.limit > 0 && - pInfo->limitInfo.limit.limit <= pInfo->limitInfo.numOfOutputRows + blockDataGetNumOfRows(pBlock)) { - int32_t remain = pInfo->limitInfo.limit.limit - pInfo->limitInfo.numOfOutputRows; - blockDataKeepFirstNRows(pBlock, remain); - } - - size_t numOfRows = blockDataGetNumOfRows(pBlock); - pInfo->limitInfo.numOfOutputRows += numOfRows; - pOperator->resultInfo.totalRows += numOfRows; - - if (numOfRows > 0) { + pOperator->resultInfo.totalRows += pBlock->info.rows; + if (pBlock->info.rows > 0) { break; } } @@ -680,7 +666,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData break; } - bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator); + bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); if (limitReached) { resetLimitInfoForNextGroup(&pInfo->limitInfo); } From 7803104b7e31ab9283b29aa33ec391804b721123 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 20 Jan 2023 22:50:35 +0800 Subject: [PATCH 2/6] fix(query): do some internal refactor, and identify a bug. --- source/common/src/tdatablock.c | 1 + source/libs/executor/src/projectoperator.c | 11 ++++++++++- source/libs/executor/src/sortoperator.c | 2 ++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 43f272d599..f41eb1adaf 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1431,6 +1431,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pBlock->info.rows = 0; pBlock->info.capacity = 0; pBlock->info.rowSize = 0; + pBlock->info.id = pDataBlock->info.id; size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index b1dc217bf5..d641810cee 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -90,7 +90,16 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->binfo.pRes = pResBlock; pInfo->pFinalRes = createOneDataBlock(pResBlock, false); - pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? false : pProjPhyNode->mergeDataBlock; + + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + pInfo->mergeDataBlocks = false; + } else { + if (!pProjPhyNode->ignoreGroupId) { + pInfo->mergeDataBlocks = false; + } else { + pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; + } + } int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index e91d41897d..6d3da3e111 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -237,6 +237,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { continue; } + // there are bugs? bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo); if (limitReached) { resetLimitInfoForNextGroup(&pInfo->limitInfo); @@ -666,6 +667,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData break; } + // todo fix it: we need to decide whether this block is belonged to previous group or not . 
bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); if (limitReached) { resetLimitInfoForNextGroup(&pInfo->limitInfo); From a898be4f7d1bc5fb9c528353d4c2ddf039c86b1e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 20 Jan 2023 23:38:31 +0800 Subject: [PATCH 3/6] fix(query): set correct total rsp rows. --- source/libs/executor/src/exchangeoperator.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 08b7d371e2..e5089ab4a9 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -218,10 +218,7 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { if (status == PROJECT_RETRIEVE_CONTINUE) { continue; } else if (status == PROJECT_RETRIEVE_DONE) { - size_t rows = pBlock->info.rows; - pExchangeInfo->limitInfo.numOfOutputRows += rows; - - if (rows == 0) { + if (pBlock->info.rows == 0) { setOperatorCompleted(pOperator); return NULL; } else { From beb3de8530ddd38e29efdc094badaf46158e360b Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 28 Jan 2023 10:53:43 +0800 Subject: [PATCH 4/6] fix: limit push down error --- source/libs/planner/src/planSpliter.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index d85e4ca10d..4c8b996a75 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -560,6 +560,8 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla if (NULL == pMerge->node.pLimit) { code = TSDB_CODE_OUT_OF_MEMORY; } + ((SLimitNode*)pSplitNode->pLimit)->limit += ((SLimitNode*)pSplitNode->pLimit)->offset; + ((SLimitNode*)pSplitNode->pLimit)->offset = 0; } if (TSDB_CODE_SUCCESS == code) { if (NULL == pSubplan) { From 8258c68b6dd76e7c14b50ceb3f4fde4183ecb79f Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 28 Jan 2023 11:11:37 +0800 Subject: [PATCH 5/6] fix: subquery output ignores group id --- source/libs/planner/src/planLogicCreater.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 084d99cae5..bd1823a770 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1016,7 +1016,7 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel TSWAP(pProject->node.pLimit, pSelect->pLimit); TSWAP(pProject->node.pSlimit, pSelect->pSlimit); - pProject->ignoreGroupId = (NULL == pSelect->pPartitionByList); + pProject->ignoreGroupId = pSelect->isSubquery ? true : (NULL == pSelect->pPartitionByList); pProject->node.groupAction = (!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; From aab31f655c14a06df56be761a397fb51b566b240 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Feb 2023 16:57:05 +0800 Subject: [PATCH 6/6] fix(query): fix bug in multi-group limit/offset of the merge sort . 
--- source/libs/executor/src/sortoperator.c | 32 +++++++++++++------------ 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 6d3da3e111..98ef6b8a36 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -544,7 +544,6 @@ typedef struct SMultiwayMergeOperatorInfo { SSDataBlock* pIntermediateBlock; // to hold the intermediate result int64_t startTs; // sort start time bool groupSort; - bool hasGroupId; uint64_t groupId; STupleHandle* prefetchedTuple; } SMultiwayMergeOperatorInfo; @@ -591,7 +590,9 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, SSDataBlock* p) { +static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, + SSDataBlock* p, bool* newgroup) { + *newgroup = false; while (1) { STupleHandle* pTupleHandle = NULL; @@ -600,8 +601,12 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pTupleHandle = tsortNextTuple(pHandle); } else { pTupleHandle = pInfo->prefetchedTuple; - pInfo->groupId = tsortGetGroupId(pTupleHandle); pInfo->prefetchedTuple = NULL; + uint64_t gid = tsortGetGroupId(pTupleHandle); + if (gid != pInfo->groupId) { + *newgroup = true; + pInfo->groupId = gid; + } } } else { pTupleHandle = tsortNextTuple(pHandle); @@ -614,12 +619,10 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* if (pInfo->groupSort) { uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); - if (!pInfo->hasGroupId) { + if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + p->info.id.groupId = tupleGroupId; pInfo->groupId = tupleGroupId; - pInfo->hasGroupId = true; - appendOneRowToDataBlock(p, pTupleHandle); - } else if (pInfo->groupId == tupleGroupId) { - appendOneRowToDataBlock(p, pTupleHandle); } else { pInfo->prefetchedTuple = pTupleHandle; break; @@ -632,11 +635,6 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* break; } } - - if (pInfo->groupSort) { - pInfo->hasGroupId = false; - } - } SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo, @@ -660,14 +658,18 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData } SSDataBlock* p = pInfo->pIntermediateBlock; + bool newgroup = false; while (1) { - doGetSortedBlockData(pInfo, pHandle, capacity, p); + doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup); if (p->info.rows == 0) { break; } - // todo fix it: we need to decide whether this block is belonged to previous group or not . + if (newgroup) { + resetLimitInfoForNextGroup(&pInfo->limitInfo); + } + bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); if (limitReached) { resetLimitInfoForNextGroup(&pInfo->limitInfo);
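
Note on the pattern these patches converge on: applyLimitOffset() first consumes the group's remaining offset, then truncates the block once the group's limit has been produced and reports that the limit was reached, while callers invoke resetLimitInfoForNextGroup() whenever a new group id arrives (the newgroup flag added in patch 6 exists for exactly this). The self-contained C sketch below illustrates only that pattern; it is not TDengine code, and every name in it (LimitState, Block, applyLimitOffsetSketch, resetForNextGroup) is a simplified stand-in, with the data block reduced to a bare row count.

    #include <inttypes.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef struct {
      int64_t limit;            /* -1 means "no limit" */
      int64_t remainOffset;     /* rows still to be skipped in the current group */
      int64_t numOfOutputRows;  /* rows already emitted for the current group */
    } LimitState;

    typedef struct {
      int64_t groupId;
      int32_t rows;             /* a data block reduced to a bare row count */
    } Block;

    /* Re-arm offset/limit accounting when the next group starts. */
    static void resetForNextGroup(LimitState* s, int64_t originalOffset) {
      s->remainOffset = originalOffset;
      s->numOfOutputRows = 0;
    }

    /* Apply the remaining offset, then the limit, to one block.
     * Returns true once the current group's limit has been fully produced. */
    static bool applyLimitOffsetSketch(LimitState* s, Block* b) {
      if (s->remainOffset > 0) {
        if (s->remainOffset >= b->rows) {        /* the whole block is skipped */
          s->remainOffset -= b->rows;
          b->rows = 0;
          return false;
        }
        b->rows -= (int32_t)s->remainOffset;     /* drop the first remainOffset rows */
        s->remainOffset = 0;
      }

      if (s->limit != -1 && s->limit <= s->numOfOutputRows + b->rows) {
        b->rows = (int32_t)(s->limit - s->numOfOutputRows);  /* keep only the first N rows */
        s->numOfOutputRows += b->rows;
        return true;                             /* limit reached for this group */
      }

      s->numOfOutputRows += b->rows;
      return false;
    }

    int main(void) {
      const int64_t offset = 3, limit = 5;
      LimitState s = {.limit = limit, .remainOffset = offset, .numOfOutputRows = 0};

      Block blocks[] = {{1, 4}, {1, 6}, {2, 4}, {2, 2}};
      int64_t prevGroup = -1;

      for (size_t i = 0; i < sizeof(blocks) / sizeof(blocks[0]); ++i) {
        Block b = blocks[i];
        if (b.groupId != prevGroup) {            /* a new group arrived: re-arm the state */
          resetForNextGroup(&s, offset);
          prevGroup = b.groupId;
        }
        bool reached = applyLimitOffsetSketch(&s, &b);
        printf("group %" PRId64 ": emit %d rows%s\n", b.groupId, b.rows,
               reached ? " (limit reached)" : "");
      }
      return 0;
    }

In the real operators the same state lives in SLimitInfo, the block is an SSDataBlock trimmed with blockDataTrimFirstNRows()/blockDataKeepFirstNRows(), and the return value is what lets the scan, sort and exchange operators choose between PROJECT_RETRIEVE_CONTINUE, PROJECT_RETRIEVE_DONE and setOperatorCompleted(). The planner change in patch 4 is complementary: folding the offset into the pushed-down limit (limit += offset; offset = 0) lets each partial input deliver enough rows for the merge stage above to apply the offset once, globally.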