From 655233fd4fe8541a1af30c4ea24634307f7124c9 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 17 Jul 2023 15:05:30 +0800 Subject: [PATCH] fix: create initial source with blocks --- source/libs/executor/src/tsort.c | 172 +++++++++++++++++++++++-------- 1 file changed, 129 insertions(+), 43 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index c262153464..e710b619b8 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -787,40 +787,106 @@ static int32_t createPageBuf(SSortHandle* pHandle) { return 0; } -static int32_t addDataBlockToPageBuf(SSortHandle * pHandle, SSDataBlock* pDataBlock, SArray* aPgId) { - int32_t start = 0; - while (start < pDataBlock->info.rows) { - int32_t stop = 0; - blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize); - SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1); - if (p == NULL) { - taosArrayDestroy(aPgId); - return terrno; - } +typedef struct SBlkMergeSupport { + int64_t** aTs; + int32_t* aRowIdx; + int32_t order; +} SBlkMergeSupport; - int32_t pageId = -1; - void* pPage = getNewBufPage(pHandle->pBuf, &pageId); - if (pPage == NULL) { - taosArrayDestroy(aPgId); - blockDataDestroy(p); - return terrno; - } +static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* param) { + int32_t left = *(int32_t*)pLeft; + int32_t right = *(int32_t*)pRight; - taosArrayPush(aPgId, &pageId); - - int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t); - ASSERT(size <= getBufPageSize(pHandle->pBuf)); - - blockDataToBuf(pPage, p); - - setBufPageDirty(pPage, true); - releaseBufPage(pHandle->pBuf, pPage); - - blockDataDestroy(p); - start = stop + 1; + SBlkMergeSupport* pSup = (SBlkMergeSupport*)param; + if (pSup->aRowIdx[left] == -1) { + return 1; + } else if (pSup->aRowIdx[right] == -1) { + return -1; } - blockDataCleanup(pDataBlock); + int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]]; + int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]]; + + int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0); + if (pSup->order == TSDB_ORDER_DESC) { + ret = -1 * ret; + } + return ret; +} + +static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) { + int32_t pageId = -1; + void* pPage = getNewBufPage(pHandle->pBuf, &pageId); + taosArrayPush(aPgId, &pageId); + blockDataToBuf(pPage, blk); + + setBufPageDirty(pPage, true); + releaseBufPage(pHandle->pBuf, pPage); + blockDataCleanup(blk); + + return 0; +} + + +static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) { + int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, + blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock))); + blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); + blockDataCleanup(pHandle->pDataBlock); + + int32_t numBlks = taosArrayGetSize(aBlk); + + SBlkMergeSupport sup; + sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t)); + sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*)); + sup.order = order->order; + for (int i = 0; i < numBlks; ++i) { + SSDataBlock* blk = taosArrayGetP(aBlk, i); + SColumnInfoData* col = taosArrayGet(blk->pDataBlock, order->slotId); + sup.aTs[i] = (int64_t*)col->pData; + sup.aRowIdx[i] = 0; + } + + int32_t totalRows = 0; + for (int i = 0; i < numBlks; ++i) { + SSDataBlock* blk = taosArrayGetP(aBlk, i); + totalRows += blk->info.rows; + } + + SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); + + SMultiwayMergeTreeInfo* pTree = NULL; + tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn); + int32_t numEnded = 0; + int32_t nRows = 0; + while (nRows < totalRows) { + int32_t minIdx = tMergeTreeGetChosenIndex(pTree); + SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); + int32_t minRow = sup.aRowIdx[minIdx]; + appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); + ++nRows; + if (pHandle->pDataBlock->info.rows >= rowCap) { + appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + } + if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) { + sup.aRowIdx[minIdx] = -1; + ++numEnded; + } else { + ++sup.aRowIdx[minIdx]; + } + tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); + } + if (pHandle->pDataBlock->info.rows > 0) { + appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + } + SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); + doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); + + taosMemoryFree(sup.aRowIdx); + taosMemoryFree(sup.aTs); + + tMergeTreeDestroy(&pTree); + return 0; } @@ -931,26 +997,46 @@ static int32_t createInitialSources(SSortHandle* pHandle) { } } } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { + SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); - // pHandle->numOfPages = 1024; //todo check sortbufsize + pHandle->numOfPages = 1024; //todo check sortbufsize + size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize; createPageBuf(pHandle); SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); - SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); - while (pBlk != NULL) { - SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); - addDataBlockToPageBuf(pHandle, pBlk, aPgId); - SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); - code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(aExtSrc); - return code; - } - pBlk = pHandle->fetchfp(pSrc->param); - } + int32_t szSort = 0; + SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); + while (1) { + SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + if (pBlk == NULL) { + break; + }; + + szSort += blockDataGetSize(pBlk); + SSDataBlock* blk = createOneDataBlock(pBlk, true); + taosArrayPush(aBlkSort, &blk); + + if (szSort > maxBufSize) { + sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); + } + taosArrayClear(aBlkSort); + szSort = 0; + } + } + if (szSort > 0) { + sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); + } + taosArrayClear(aBlkSort); + szSort = 0; + } + taosArrayDestroy(aBlkSort); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); taosArrayDestroy(aExtSrc);