From aab31f655c14a06df56be761a397fb51b566b240 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Feb 2023 16:57:05 +0800 Subject: [PATCH] fix(query): fix bug in multi-group limit/offset of the merge sort . --- source/libs/executor/src/sortoperator.c | 32 +++++++++++++------------ 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 6d3da3e111..98ef6b8a36 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -544,7 +544,6 @@ typedef struct SMultiwayMergeOperatorInfo { SSDataBlock* pIntermediateBlock; // to hold the intermediate result int64_t startTs; // sort start time bool groupSort; - bool hasGroupId; uint64_t groupId; STupleHandle* prefetchedTuple; } SMultiwayMergeOperatorInfo; @@ -591,7 +590,9 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, SSDataBlock* p) { +static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, + SSDataBlock* p, bool* newgroup) { + *newgroup = false; while (1) { STupleHandle* pTupleHandle = NULL; @@ -600,8 +601,12 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pTupleHandle = tsortNextTuple(pHandle); } else { pTupleHandle = pInfo->prefetchedTuple; - pInfo->groupId = tsortGetGroupId(pTupleHandle); pInfo->prefetchedTuple = NULL; + uint64_t gid = tsortGetGroupId(pTupleHandle); + if (gid != pInfo->groupId) { + *newgroup = true; + pInfo->groupId = gid; + } } } else { pTupleHandle = tsortNextTuple(pHandle); @@ -614,12 +619,10 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* if (pInfo->groupSort) { uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); - if (!pInfo->hasGroupId) { + if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + p->info.id.groupId = tupleGroupId; pInfo->groupId = tupleGroupId; - pInfo->hasGroupId = true; - appendOneRowToDataBlock(p, pTupleHandle); - } else if (pInfo->groupId == tupleGroupId) { - appendOneRowToDataBlock(p, pTupleHandle); } else { pInfo->prefetchedTuple = pTupleHandle; break; @@ -632,11 +635,6 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* break; } } - - if (pInfo->groupSort) { - pInfo->hasGroupId = false; - } - } SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo, @@ -660,14 +658,18 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData } SSDataBlock* p = pInfo->pIntermediateBlock; + bool newgroup = false; while (1) { - doGetSortedBlockData(pInfo, pHandle, capacity, p); + doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup); if (p->info.rows == 0) { break; } - // todo fix it: we need to decide whether this block is belonged to previous group or not . + if (newgroup) { + resetLimitInfoForNextGroup(&pInfo->limitInfo); + } + bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); if (limitReached) { resetLimitInfoForNextGroup(&pInfo->limitInfo);