diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 272629627a..2ef40cd738 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1243,7 +1243,9 @@ static int32_t tsortFinalizeRegions(SSortHandle* pHandle) { return TSDB_CODE_SUCCESS; } -static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) { +static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, + int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) { + SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); { @@ -1257,11 +1259,13 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p pMemFile->writeFileOffset = pRegion->fileOffset + pMemFile->currRegionOffset; } } + *pRegionId = pMemFile->currRegionId; *pOffset = pMemFile->currRegionOffset; int32_t writeBufOffset = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->writeBuf + writeBufOffset); *pLength = blockLen; + pMemFile->currRegionOffset += blockLen; pMemFile->bRegionDirty = true; return TSDB_CODE_SUCCESS; @@ -1324,6 +1328,7 @@ static void initRowIdSort(SSortHandle* pHandle) { pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5); blockDataAppendColInfo(pSortInput, &pkCol); } + blockDataDestroy(pHandle->pDataBlock); pHandle->pDataBlock = pSortInput; @@ -1444,32 +1449,61 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, return 0; } -static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) { - int32_t sz = 0; - int32_t numCols = taosArrayGetSize(blk->pDataBlock); - if (!blk->info.hasVarCol) { - sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0); - sz += blockDataGetRowSize(blk); +static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, int32_t dstRowIndex) { + int32_t size = 0; + int32_t numCols = taosArrayGetSize(pSrcBlock->pDataBlock); + + if (!pSrcBlock->info.hasVarCol) { + size += numCols * ((dstRowIndex & 0x7) == 0 ? 1: 0); + size += blockDataGetRowSize(pSrcBlock); } else { for (int32_t i = 0; i < numCols; ++i) { - SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i); + SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pSrcBlock->pDataBlock, i); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - if ((pColInfoData->varmeta.offset[row] != -1) && (pColInfoData->pData)) { - char* p = colDataGetData(pColInfoData, row); - sz += varDataTLen(p); + if ((pColInfoData->varmeta.offset[srcRowIndex] != -1) && (pColInfoData->pData)) { + char* p = colDataGetData(pColInfoData, srcRowIndex); + size += varDataTLen(p); } - sz += sizeof(pColInfoData->varmeta.offset[0]); + size += sizeof(pColInfoData->varmeta.offset[0]); } else { - sz += pColInfoData->info.bytes; + size += pColInfoData->info.bytes; - if (((rowIdxInPage) & 0x07) == 0) { - sz += 1; // bitmap + if (((dstRowIndex) & 0x07) == 0) { + size += 1; // bitmap } } } } - return sz; + + return size; +} + +static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex, SColumnInfoData* pPkCol) { + int32_t size = 0; + int32_t numOfCols = blockDataGetNumOfCols(pDstBlock); + + if (pPkCol == NULL) { + ASSERT(!pDstBlock->info.hasVarCol); + size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0); + size += blockDataGetRowSize(pDstBlock); + } else { + if (IS_VAR_DATA_TYPE(pPkCol->info.type)) { + if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) { + char* p = colDataGetData(pPkCol, srcRowIndex); + size += varDataTLen(p); + } + + size += sizeof(pPkCol->varmeta.offset[0]); + } else { + size += pPkCol->info.bytes; + if (((dstRowIndex) & 0x07) == 0) { + size += 1; // bitmap + } + } + } + + return size; } static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) { @@ -1533,8 +1567,19 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; - SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; - int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows); + + int32_t bufInc = 0; + if (pHandle->bSortByRowId) { + if (!pHandle->bSortPk) { + bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, NULL); + } else { // there may be varchar column exists, so we need to get the pk info, and then calculate the row length + SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1); + SColumnInfoData* pPkCol = taosArrayGet(minBlk->pDataBlock, extRowsPkOrder->slotId); + bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, pPkCol); + } + } else { + bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows); + } if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); @@ -1552,7 +1597,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* nMergedRows += pHandle->pDataBlock->info.rows; blockDataCleanup(pHandle->pDataBlock); blkPgSz = pgHeaderSz; - incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; + + SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; bufInc = getPageBufIncForRow(incBlock, minRow, 0); if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { @@ -1566,10 +1612,10 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* } blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1); - if (!pHandle->bSortByRowId) { - appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); - } else { + if (pHandle->bSortByRowId) { appendToRowIndexDataBlock(pHandle, minBlk, &minRow); + } else { + appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); } blkPgSz += bufInc;