From 3d4764b3836a1e7c71741cb2a1598d540647b324 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 2 Feb 2024 13:17:13 +0800 Subject: [PATCH] fix: when rows is greater than merge limit, skip the block --- source/libs/executor/src/tsort.c | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 3559e57ebc..fde5f22aaa 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1043,7 +1043,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO return 0; } -static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock) { +static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock, bool *pSkipBlock) { int64_t nRows = 0; int64_t prevRows = 0; void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid)); @@ -1062,9 +1062,15 @@ static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSH if (pHandle->mergeLimitReachedFn) { pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam); } - keepRows = pHandle->mergeLimit - prevRows; + keepRows = pHandle->mergeLimit > prevRows ? (pHandle->mergeLimit - prevRows) : 0; } - + + if (keepRows == 0) { + *pSkipBlock = true; + return pOrigBlk; + } + + *pSkipBlock = false; SSDataBlock* pBlock = NULL; if (keepRows != pOrigBlk->info.rows) { pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); @@ -1106,8 +1112,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { int64_t p = taosGetTimestampUs(); bool bExtractedBlock = false; + bool bSkipBlock = false; if (pBlk != NULL && pHandle->mergeLimit > 0) { - pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock); + pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock); + if (bSkipBlock) { + continue; + } } if (pBlk != NULL) { @@ -1121,7 +1131,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (pBlk != NULL) { szSort += blockDataGetSize(pBlk); - void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid)); if (ppBlk != NULL) { SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk); @@ -1138,7 +1147,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { tSimpleHashClear(mUidBlk); - code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); if (code != TSDB_CODE_SUCCESS) { tSimpleHashCleanup(mUidBlk);