fix: create initial source with blocks

This commit is contained in:
slzhou 2023-07-17 15:05:30 +08:00
parent 9a0e9df567
commit 655233fd4f
1 changed files with 129 additions and 43 deletions

View File

@ -787,40 +787,106 @@ static int32_t createPageBuf(SSortHandle* pHandle) {
return 0;
}
static int32_t addDataBlockToPageBuf(SSortHandle * pHandle, SSDataBlock* pDataBlock, SArray* aPgId) {
int32_t start = 0;
while (start < pDataBlock->info.rows) {
int32_t stop = 0;
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
if (p == NULL) {
taosArrayDestroy(aPgId);
return terrno;
}
typedef struct SBlkMergeSupport {
int64_t** aTs;
int32_t* aRowIdx;
int32_t order;
} SBlkMergeSupport;
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
if (pPage == NULL) {
taosArrayDestroy(aPgId);
blockDataDestroy(p);
return terrno;
}
static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* param) {
int32_t left = *(int32_t*)pLeft;
int32_t right = *(int32_t*)pRight;
taosArrayPush(aPgId, &pageId);
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
ASSERT(size <= getBufPageSize(pHandle->pBuf));
blockDataToBuf(pPage, p);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
blockDataDestroy(p);
start = stop + 1;
SBlkMergeSupport* pSup = (SBlkMergeSupport*)param;
if (pSup->aRowIdx[left] == -1) {
return 1;
} else if (pSup->aRowIdx[right] == -1) {
return -1;
}
blockDataCleanup(pDataBlock);
int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]];
int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
if (pSup->order == TSDB_ORDER_DESC) {
ret = -1 * ret;
}
return ret;
}
static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) {
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
taosArrayPush(aPgId, &pageId);
blockDataToBuf(pPage, blk);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
blockDataCleanup(blk);
return 0;
}
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) {
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize,
blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock)));
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
blockDataCleanup(pHandle->pDataBlock);
int32_t numBlks = taosArrayGetSize(aBlk);
SBlkMergeSupport sup;
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
sup.order = order->order;
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, order->slotId);
sup.aTs[i] = (int64_t*)col->pData;
sup.aRowIdx[i] = 0;
}
int32_t totalRows = 0;
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
totalRows += blk->info.rows;
}
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
SMultiwayMergeTreeInfo* pTree = NULL;
tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
int32_t numEnded = 0;
int32_t nRows = 0;
while (nRows < totalRows) {
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
int32_t minRow = sup.aRowIdx[minIdx];
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
++nRows;
if (pHandle->pDataBlock->info.rows >= rowCap) {
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
}
if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
sup.aRowIdx[minIdx] = -1;
++numEnded;
} else {
++sup.aRowIdx[minIdx];
}
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
}
if (pHandle->pDataBlock->info.rows > 0) {
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
}
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
tMergeTreeDestroy(&pTree);
return 0;
}
@ -931,26 +997,46 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
}
}
} else if (pHandle->type == SORT_TABLE_MERGE_SCAN) {
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
// pHandle->numOfPages = 1024; //todo check sortbufsize
pHandle->numOfPages = 1024; //todo check sortbufsize
size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize;
createPageBuf(pHandle);
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
while (pBlk != NULL) {
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
addDataBlockToPageBuf(pHandle, pBlk, aPgId);
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(aExtSrc);
return code;
}
pBlk = pHandle->fetchfp(pSrc->param);
}
int32_t szSort = 0;
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
if (pBlk == NULL) {
break;
};
szSort += blockDataGetSize(pBlk);
SSDataBlock* blk = createOneDataBlock(pBlk, true);
taosArrayPush(aBlkSort, &blk);
if (szSort > maxBufSize) {
sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
szSort = 0;
}
}
if (szSort > 0) {
sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
szSort = 0;
}
taosArrayDestroy(aBlkSort);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
taosArrayDestroy(aExtSrc);