From 3f441bb8cfa9d2652fe86cf25c48c24930a6c940 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 22 Jan 2024 15:18:49 +0800 Subject: [PATCH] feat: extract rows within limit before sort to disk --- source/libs/executor/src/tsort.c | 38 ++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index ee1d831a24..24df12d06b 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1040,6 +1040,33 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO return 0; } +static SSDataBlock* getBlockWithinLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk) { + int64_t keepRows = pOrigBlk->info.rows; + int64_t nRows = 0; + int64_t prevRows = 0; + void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid)); + if (pNum == NULL) { + prevRows = 0; + nRows = pOrigBlk->info.rows; + tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows)); + } else { + prevRows = *(int64_t*)pNum; + *(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows; + nRows = *(int64_t*)pNum; + } + + if (nRows >= pHandle->mergeLimit) { + keepRows = pHandle->mergeLimit - prevRows; + } + SSDataBlock* pBlock = NULL; + if (keepRows != pOrigBlk->info.rows) { + pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); + } else { + pBlock = createOneDataBlock(pOrigBlk, true); + } + return pBlock; +} + static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); @@ -1062,10 +1089,17 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { pHandle->currMergeLimitTs = INT64_MIN; } + SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); while (1) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + + int64_t p = taosGetTimestampUs(); + if (pBlk != NULL && pHandle->mergeLimit != -1) { + pBlk = getBlockWithinLimit(pHandle, mTableNumRows, pBlk); + } + if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId); int64_t firstRowTs = *(int64_t*)tsCol->pData; @@ -1074,6 +1108,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { continue; } } + if (pBlk != NULL) { szSort += blockDataGetSize(pBlk); @@ -1091,7 +1126,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { tSimpleHashClear(mUidBlk); - int64_t p = taosGetTimestampUs(); code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); if (code != TSDB_CODE_SUCCESS) { tSimpleHashCleanup(mUidBlk); @@ -1131,7 +1165,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); } taosArrayDestroy(aExtSrc); - + tSimpleHashCleanup(mTableNumRows); pHandle->type = SORT_SINGLESOURCE_SORT; return TSDB_CODE_SUCCESS; }