From 5d9f6fd4812db0bdee7f328c955e1aeb60af5864 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 20 Jul 2023 12:29:34 +0800 Subject: [PATCH] fix: fix limit/offset bugs --- source/libs/executor/src/scanoperator.c | 38 ++++++++++++++----------- source/libs/executor/src/tsort.c | 33 ++++++++++++--------- 2 files changed, 41 insertions(+), 30 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6edd7b2d47..27cbdb66cf 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2905,28 +2905,32 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataCleanup(pResBlock); - + STupleHandle* pTupleHandle = NULL; while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); - if (pTupleHandle == NULL) { - break; + while (1) { + pTupleHandle = tsortNextTuple(pHandle); + if (pTupleHandle == NULL) { + break; + } + + appendOneRowToDataBlock(pResBlock, pTupleHandle); + if (pResBlock->info.rows >= capacity) { + break; + } } - appendOneRowToDataBlock(pResBlock, pTupleHandle); - if (pResBlock->info.rows >= capacity) { - break; + if (tsortIsClosed(pHandle)) { + terrno = TSDB_CODE_TSC_QUERY_CANCELLED; + T_LONG_JMP(pOperator->pTaskInfo->env, terrno); } + + bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); + qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, + pInfo->limitInfo.numOfOutputRows); + if (pTupleHandle == NULL || limitReached || pResBlock->info.rows > 0) { + break; + } } - - if (tsortIsClosed(pHandle)) { - terrno = TSDB_CODE_TSC_QUERY_CANCELLED; - T_LONG_JMP(pOperator->pTaskInfo->env, terrno); - } - - bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); - qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, - pInfo->limitInfo.numOfOutputRows); - return (pResBlock->info.rows > 0) ? pResBlock : NULL; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 18c46cd03f..b51f515240 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -714,7 +714,9 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { taosArrayDestroy(pResList); return code; } - int nRows = 0; + + int nMergedRows = 0; + SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while (1) { if (tsortIsClosed(pHandle)) { @@ -722,15 +724,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return code; } - if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) { - break; - } - SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); if (pDataBlock == NULL) { break; } - nRows += pDataBlock->info.rows; int32_t pageId = -1; void* pPage = getNewBufPage(pHandle->pBuf, &pageId); @@ -750,7 +747,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { setBufPageDirty(pPage, true); releaseBufPage(pHandle->pBuf, pPage); + nMergedRows += pDataBlock->info.rows; + blockDataCleanup(pDataBlock); + if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { + break; + } } sortComparCleanup(&pHandle->cmpParam); @@ -915,15 +917,12 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO SMultiwayMergeTreeInfo* pTree = NULL; tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn); - int32_t numEnded = 0; int32_t nRows = 0; - + int32_t nMergedRows = 0; + bool mergeLimitReached = false; size_t blkPgSz = pgHeaderSz; while (nRows < totalRows) { - if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) { - break; - } int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; @@ -931,9 +930,15 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + nMergedRows += pHandle->pDataBlock->info.rows; + blockDataCleanup(pHandle->pDataBlock); blkPgSz = pgHeaderSz; bufInc = getPageBufIncForRow(minBlk, minRow, 0); + if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { + mergeLimitReached = true; + break; + } } blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1); appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); @@ -943,14 +948,16 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) { sup.aRowIdx[minIdx] = -1; - ++numEnded; } else { ++sup.aRowIdx[minIdx]; } tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); } if (pHandle->pDataBlock->info.rows > 0) { - appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + if (!mergeLimitReached) { + appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + nMergedRows += pHandle->pDataBlock->info.rows; + } blockDataCleanup(pHandle->pDataBlock); } SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);