diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index c56dab1e33..18c46cd03f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -24,6 +24,7 @@ #include "tpagedbuf.h" #include "tsort.h" #include "tutil.h" +#include "tsimplehash.h" struct STupleHandle { SSDataBlock* pBlock; @@ -975,15 +976,27 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { int32_t szSort = 0; SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); + SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); while (1) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); if (pBlk != NULL) { szSort += blockDataGetSize(pBlk); - SSDataBlock* blk = createOneDataBlock(pBlk, true); - taosArrayPush(aBlkSort, &blk); + + void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid)); + if (ppBlk != NULL) { + SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk); + blockDataMerge(tBlk, pBlk); + } else { + SSDataBlock* tBlk = createOneDataBlock(pBlk, true); + tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES); + taosArrayPush(aBlkSort, &tBlk); + } } + if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { + tSimpleHashClear(mUidBlk); + int64_t p = taosGetTimestampUs(); sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); int64_t el = taosGetTimestampUs() - p; @@ -1000,7 +1013,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { break; }; } - + tSimpleHashCleanup(mUidBlk); taosArrayDestroy(aBlkSort); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);