From 81abf3fe6d38eb4bc8616dfb1df85b09fac6a842 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 19:04:50 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/src/tsort.c | 95 +++++++++++++++++++------------- 1 file changed, 56 insertions(+), 39 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 6f690b8911..21e9e5a70d 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1538,53 +1538,76 @@ static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSD return inc; } +static int32_t initMergeSup(SBlkMergeSupport* pSup, SArray* pBlockList, int32_t tsOrder, int32_t tsSlotId, SBlockOrderInfo* pPkOrderInfo) { + memset(pSup, 0, sizeof(SBlkMergeSupport)); + + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + + pSup->aRowIdx = taosMemoryCalloc(numOfBlocks, sizeof(int32_t)); + pSup->aTs = taosMemoryCalloc(numOfBlocks, sizeof(int64_t*)); + pSup->tsOrder = tsOrder; + pSup->aBlks = taosMemoryCalloc(numOfBlocks, sizeof(SSDataBlock*)); + + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pBlock = taosArrayGetP(pBlockList, i); + SColumnInfoData* col = taosArrayGet(pBlock->pDataBlock, tsSlotId); + pSup->aTs[i] = (int64_t*)col->pData; + pSup->aRowIdx[i] = 0; + pSup->aBlks[i] = pBlock; + } + + pSup->pPkOrder = pPkOrderInfo; + return TSDB_CODE_SUCCESS; +} + +static void cleanupMergeSup(SBlkMergeSupport* pSup) { + taosMemoryFree(pSup->aRowIdx); + taosMemoryFree(pSup->aTs); + taosMemoryFree(pSup->aBlks); +} + +static int32_t getTotalRows(SArray* pBlockList) { + int32_t totalRows = 0; + + for (int32_t i = 0; i < taosArrayGetSize(pBlockList); ++i) { + SSDataBlock* blk = taosArrayGetP(pBlockList, i); + totalRows += blk->info.rows; + } + + return totalRows; +} + static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) { int32_t code = TSDB_CODE_SUCCESS; - int32_t pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); - int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz); + int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock); + int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize); blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); blockDataCleanup(pHandle->pDataBlock); - int32_t numBlks = taosArrayGetSize(aBlk); + + SBlkMergeSupport sup = {0}; SBlockOrderInfo* pOrigBlockTsOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0); - SBlkMergeSupport sup = {0}; - sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t)); - sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*)); - sup.tsOrder = pOrigBlockTsOrder->order; - sup.aBlks = taosMemoryCalloc(numBlks, sizeof(SSDataBlock*)); - - for (int32_t i = 0; i < numBlks; ++i) { - SSDataBlock* blk = taosArrayGetP(aBlk, i); - SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockTsOrder->slotId); - sup.aTs[i] = (int64_t*)col->pData; - sup.aRowIdx[i] = 0; - sup.aBlks[i] = blk; - } - + SBlockOrderInfo* pOrigBlockPkOrder = NULL; if (pHandle->bSortPk) { pOrigBlockPkOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1); } - sup.pPkOrder = pOrigBlockPkOrder; - int32_t totalRows = 0; - for (int32_t i = 0; i < numBlks; ++i) { - SSDataBlock* blk = taosArrayGetP(aBlk, i); - totalRows += blk->info.rows; - } + initMergeSup(&sup, aBlk, pOrigBlockTsOrder->order, pOrigBlockTsOrder->slotId, pOrigBlockPkOrder); + + int32_t totalRows = getTotalRows(aBlk); SMultiwayMergeTreeInfo* pTree = NULL; __merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn; + code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn); if (TSDB_CODE_SUCCESS != code) { - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); + cleanupMergeSup(&sup); return code; } @@ -1592,7 +1615,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* int32_t nRows = 0; int32_t nMergedRows = 0; bool mergeLimitReached = false; - size_t blkPgSz = pgHeaderSz; + size_t blkPgSz = pageHeaderSize; int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN; while (nRows < totalRows) { @@ -1601,6 +1624,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* int32_t minRow = sup.aRowIdx[minIdx]; int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow); + if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; @@ -1608,15 +1632,13 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pTree); taosArrayDestroy(aPgId); - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); + cleanupMergeSup(&sup); return code; } nMergedRows += pHandle->pDataBlock->info.rows; blockDataCleanup(pHandle->pDataBlock); - blkPgSz = pgHeaderSz; + blkPgSz = pageHeaderSize; bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow); @@ -1639,7 +1661,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* } blkPgSz += bufInc; - ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pgHeaderSz); + ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize); ++nRows; @@ -1659,9 +1681,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(aPgId); taosMemoryFree(pTree); - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); + cleanupMergeSup(&sup); return code; } nMergedRows += pHandle->pDataBlock->info.rows; @@ -1679,10 +1699,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); - + cleanupMergeSup(&sup); tMergeTreeDestroy(&pTree); return 0;