diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 34f9dfc233..6b93f95099 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -837,6 +837,10 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, int32_t pageId = -1; void* pPage = getNewBufPage(pHandle->pBuf, &pageId); taosArrayPush(aPgId, &pageId); + + int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t); + ASSERT(size <= getBufPageSize(pHandle->pBuf)); + blockDataToBuf(pPage, blk); setBufPageDirty(pPage, true); @@ -846,13 +850,39 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, return 0; } +static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) { + int sz = 0; + int numCols = taosArrayGetSize(blk->pDataBlock); + if (!blk->info.hasVarCol) { + sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0); + sz += blockDataGetRowSize(blk); + } else { + for (int32_t i = 0; i < numCols; ++i) { + SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + if (pColInfoData->varmeta.offset[row] != -1) { + char* p = colDataGetData(pColInfoData, row); + sz += varDataTLen(p); + } + + sz += sizeof(pColInfoData->varmeta.offset[0]); + } else { + sz += pColInfoData->info.bytes; + + if (((rowIdxInPage) & 0x07) == 0) { + sz += 1; // bitmap + } + } + } + } + return sz; +} static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) { - int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, - blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock))); + int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); + int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz); blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); blockDataCleanup(pHandle->pDataBlock); - int32_t numBlks = taosArrayGetSize(aBlk); SBlkMergeSupport sup; @@ -878,16 +908,25 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn); int32_t numEnded = 0; int32_t nRows = 0; + + size_t blkPgSz = pgHeaderSz; + while (nRows < totalRows) { int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; + int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows); - appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); - ++nRows; - if (pHandle->pDataBlock->info.rows >= rowCap) { - appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { + appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + blkPgSz = pgHeaderSz; + bufInc = getPageBufIncForRow(minBlk, minRow, 0); } + blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1); + appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); + blkPgSz += bufInc; + + ++nRows; if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) { sup.aRowIdx[minIdx] = -1; @@ -1073,10 +1112,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) { } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { code = createBlocksMergeSortInitialSources(pHandle); } - qInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); + uInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); for (int i = 0; i < taosArrayGetSize(pHandle->pOrderedSource); ++i) { SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i); - qInfo("source %d, num of pages %zu", i,taosArrayGetSize(pSrc->pageIdList)); + uInfo("source %d, num of pages %zu", i,taosArrayGetSize(pSrc->pageIdList)); } return code; }