fix: add to page buf in the same way as single source sort
This commit is contained in:
parent
97a6e89d11
commit
f79fc81d9c
|
@ -837,6 +837,10 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk,
|
||||||
int32_t pageId = -1;
|
int32_t pageId = -1;
|
||||||
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||||
taosArrayPush(aPgId, &pageId);
|
taosArrayPush(aPgId, &pageId);
|
||||||
|
|
||||||
|
int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
|
||||||
|
ASSERT(size <= getBufPageSize(pHandle->pBuf));
|
||||||
|
|
||||||
blockDataToBuf(pPage, blk);
|
blockDataToBuf(pPage, blk);
|
||||||
|
|
||||||
setBufPageDirty(pPage, true);
|
setBufPageDirty(pPage, true);
|
||||||
|
@ -846,13 +850,39 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) {
|
||||||
|
int sz = 0;
|
||||||
|
int numCols = taosArrayGetSize(blk->pDataBlock);
|
||||||
|
if (!blk->info.hasVarCol) {
|
||||||
|
sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0);
|
||||||
|
sz += blockDataGetRowSize(blk);
|
||||||
|
} else {
|
||||||
|
for (int32_t i = 0; i < numCols; ++i) {
|
||||||
|
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i);
|
||||||
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
if (pColInfoData->varmeta.offset[row] != -1) {
|
||||||
|
char* p = colDataGetData(pColInfoData, row);
|
||||||
|
sz += varDataTLen(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
sz += sizeof(pColInfoData->varmeta.offset[0]);
|
||||||
|
} else {
|
||||||
|
sz += pColInfoData->info.bytes;
|
||||||
|
|
||||||
|
if (((rowIdxInPage) & 0x07) == 0) {
|
||||||
|
sz += 1; // bitmap
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sz;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) {
|
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) {
|
||||||
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize,
|
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
|
||||||
blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock)));
|
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
|
||||||
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
|
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
|
||||||
blockDataCleanup(pHandle->pDataBlock);
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
|
|
||||||
int32_t numBlks = taosArrayGetSize(aBlk);
|
int32_t numBlks = taosArrayGetSize(aBlk);
|
||||||
|
|
||||||
SBlkMergeSupport sup;
|
SBlkMergeSupport sup;
|
||||||
|
@ -878,16 +908,25 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
||||||
tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
|
tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
|
||||||
int32_t numEnded = 0;
|
int32_t numEnded = 0;
|
||||||
int32_t nRows = 0;
|
int32_t nRows = 0;
|
||||||
|
|
||||||
|
size_t blkPgSz = pgHeaderSz;
|
||||||
|
|
||||||
while (nRows < totalRows) {
|
while (nRows < totalRows) {
|
||||||
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
||||||
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
||||||
int32_t minRow = sup.aRowIdx[minIdx];
|
int32_t minRow = sup.aRowIdx[minIdx];
|
||||||
|
int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
|
||||||
|
|
||||||
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
|
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
||||||
++nRows;
|
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||||
if (pHandle->pDataBlock->info.rows >= rowCap) {
|
blkPgSz = pgHeaderSz;
|
||||||
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
|
||||||
}
|
}
|
||||||
|
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
|
||||||
|
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
|
||||||
|
blkPgSz += bufInc;
|
||||||
|
|
||||||
|
++nRows;
|
||||||
|
|
||||||
if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
|
if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
|
||||||
sup.aRowIdx[minIdx] = -1;
|
sup.aRowIdx[minIdx] = -1;
|
||||||
|
@ -1073,10 +1112,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
} else if (pHandle->type == SORT_TABLE_MERGE_SCAN) {
|
} else if (pHandle->type == SORT_TABLE_MERGE_SCAN) {
|
||||||
code = createBlocksMergeSortInitialSources(pHandle);
|
code = createBlocksMergeSortInitialSources(pHandle);
|
||||||
}
|
}
|
||||||
qInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource));
|
uInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource));
|
||||||
for (int i = 0; i < taosArrayGetSize(pHandle->pOrderedSource); ++i) {
|
for (int i = 0; i < taosArrayGetSize(pHandle->pOrderedSource); ++i) {
|
||||||
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i);
|
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i);
|
||||||
qInfo("source %d, num of pages %zu", i,taosArrayGetSize(pSrc->pageIdList));
|
uInfo("source %d, num of pages %zu", i,taosArrayGetSize(pSrc->pageIdList));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue