refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-08 19:04:50 +08:00
parent 25d84ab7d3
commit 81abf3fe6d
1 changed files with 56 additions and 39 deletions

View File

@ -1538,53 +1538,76 @@ static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSD
return inc;
}
static int32_t initMergeSup(SBlkMergeSupport* pSup, SArray* pBlockList, int32_t tsOrder, int32_t tsSlotId, SBlockOrderInfo* pPkOrderInfo) {
memset(pSup, 0, sizeof(SBlkMergeSupport));
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
pSup->aRowIdx = taosMemoryCalloc(numOfBlocks, sizeof(int32_t));
pSup->aTs = taosMemoryCalloc(numOfBlocks, sizeof(int64_t*));
pSup->tsOrder = tsOrder;
pSup->aBlks = taosMemoryCalloc(numOfBlocks, sizeof(SSDataBlock*));
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pBlock = taosArrayGetP(pBlockList, i);
SColumnInfoData* col = taosArrayGet(pBlock->pDataBlock, tsSlotId);
pSup->aTs[i] = (int64_t*)col->pData;
pSup->aRowIdx[i] = 0;
pSup->aBlks[i] = pBlock;
}
pSup->pPkOrder = pPkOrderInfo;
return TSDB_CODE_SUCCESS;
}
static void cleanupMergeSup(SBlkMergeSupport* pSup) {
taosMemoryFree(pSup->aRowIdx);
taosMemoryFree(pSup->aTs);
taosMemoryFree(pSup->aBlks);
}
static int32_t getTotalRows(SArray* pBlockList) {
int32_t totalRows = 0;
for (int32_t i = 0; i < taosArrayGetSize(pBlockList); ++i) {
SSDataBlock* blk = taosArrayGetP(pBlockList, i);
totalRows += blk->info.rows;
}
return totalRows;
}
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize);
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
blockDataCleanup(pHandle->pDataBlock);
int32_t numBlks = taosArrayGetSize(aBlk);
SBlkMergeSupport sup = {0};
SBlockOrderInfo* pOrigBlockTsOrder =
(!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlkMergeSupport sup = {0};
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
sup.tsOrder = pOrigBlockTsOrder->order;
sup.aBlks = taosMemoryCalloc(numBlks, sizeof(SSDataBlock*));
for (int32_t i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockTsOrder->slotId);
sup.aTs[i] = (int64_t*)col->pData;
sup.aRowIdx[i] = 0;
sup.aBlks[i] = blk;
}
SBlockOrderInfo* pOrigBlockPkOrder = NULL;
if (pHandle->bSortPk) {
pOrigBlockPkOrder =
(!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1);
}
sup.pPkOrder = pOrigBlockPkOrder;
int32_t totalRows = 0;
for (int32_t i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
totalRows += blk->info.rows;
}
initMergeSup(&sup, aBlk, pOrigBlockTsOrder->order, pOrigBlockTsOrder->slotId, pOrigBlockPkOrder);
int32_t totalRows = getTotalRows(aBlk);
SMultiwayMergeTreeInfo* pTree = NULL;
__merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn;
code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
cleanupMergeSup(&sup);
return code;
}
@ -1592,7 +1615,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
int32_t nRows = 0;
int32_t nMergedRows = 0;
bool mergeLimitReached = false;
size_t blkPgSz = pgHeaderSz;
size_t blkPgSz = pageHeaderSize;
int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
while (nRows < totalRows) {
@ -1601,6 +1624,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
int32_t minRow = sup.aRowIdx[minIdx];
int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow);
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
@ -1608,15 +1632,13 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pTree);
taosArrayDestroy(aPgId);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
cleanupMergeSup(&sup);
return code;
}
nMergedRows += pHandle->pDataBlock->info.rows;
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pgHeaderSz;
blkPgSz = pageHeaderSize;
bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow);
@ -1639,7 +1661,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
}
blkPgSz += bufInc;
ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pgHeaderSz);
ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize);
++nRows;
@ -1659,9 +1681,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(aPgId);
taosMemoryFree(pTree);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
cleanupMergeSup(&sup);
return code;
}
nMergedRows += pHandle->pDataBlock->info.rows;
@ -1679,10 +1699,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
cleanupMergeSup(&sup);
tMergeTreeDestroy(&pTree);
return 0;