diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 906fe0626b..6f690b8911 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1479,11 +1479,12 @@ static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, return size; } -static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex, SColumnInfoData* pPkCol) { +static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex, + SColumnInfoData* pPkCol) { int32_t size = 0; int32_t numOfCols = blockDataGetNumOfCols(pDstBlock); - if (pPkCol == NULL) { + if (pPkCol == NULL) { // no var column ASSERT((numOfCols == 4) && (!pDstBlock->info.hasVarCol)); size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0); @@ -1492,8 +1493,12 @@ static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowI ASSERT(numOfCols == 5); size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0)? 1:0); - size += (8 + 4 + 4 + 4); // todo refactor later + for(int32_t i = 0; i < numOfCols - 1; ++i) { + SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pDstBlock->pDataBlock, i); + size += pColInfo->info.bytes; + } + // handle the pk column, the last column, may be the var char column if (IS_VAR_DATA_TYPE(pPkCol->info.type)) { if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) { char* p = colDataGetData(pPkCol, srcRowIndex); @@ -1512,6 +1517,27 @@ static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowI return size; } +static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSDataBlock* pSrcBlock, + int32_t srcRowIndex) { + int32_t inc = 0; + + if (pHandle->bSortByRowId) { + SColumnInfoData* pPkCol = NULL; + + // there may be varchar column exists, so we need to get the pk info, and then calculate the row length + if (pHandle->bSortPk) { + SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1); + pPkCol = taosArrayGet(pSrcBlock->pDataBlock, extRowsPkOrder->slotId); + } + + inc = getPageBufIncForRowIdSort(pHandle->pDataBlock, srcRowIndex, dstRowIndex, pPkCol); + } else { + inc = getPageBufIncForRow(pSrcBlock, srcRowIndex, dstRowIndex); + } + + return inc; +} + static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) { int32_t code = TSDB_CODE_SUCCESS; int32_t pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); @@ -1574,19 +1600,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; - int32_t bufInc = 0; - if (pHandle->bSortByRowId) { - if (!pHandle->bSortPk) { - bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, NULL); - } else { // there may be varchar column exists, so we need to get the pk info, and then calculate the row length - SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1); - SColumnInfoData* pPkCol = taosArrayGet(minBlk->pDataBlock, extRowsPkOrder->slotId); - bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, pPkCol); - } - } else { - bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows); - } - + int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow); if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; @@ -1604,8 +1618,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* blockDataCleanup(pHandle->pDataBlock); blkPgSz = pgHeaderSz; - SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; - bufInc = getPageBufIncForRow(incBlock, minRow, 0); + bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow); if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { mergeLimitReached = true; @@ -1613,6 +1626,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { pHandle->currMergeLimitTs = lastPageBufTs; } + break; } }