refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-08 18:41:57 +08:00
parent 75e0ce8cbc
commit 25d84ab7d3
1 changed files with 32 additions and 18 deletions

View File

@ -1479,11 +1479,12 @@ static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex,
return size; return size;
} }
static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex, SColumnInfoData* pPkCol) { static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex,
SColumnInfoData* pPkCol) {
int32_t size = 0; int32_t size = 0;
int32_t numOfCols = blockDataGetNumOfCols(pDstBlock); int32_t numOfCols = blockDataGetNumOfCols(pDstBlock);
if (pPkCol == NULL) { if (pPkCol == NULL) { // no var column
ASSERT((numOfCols == 4) && (!pDstBlock->info.hasVarCol)); ASSERT((numOfCols == 4) && (!pDstBlock->info.hasVarCol));
size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0); size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0);
@ -1492,8 +1493,12 @@ static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowI
ASSERT(numOfCols == 5); ASSERT(numOfCols == 5);
size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0)? 1:0); size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0)? 1:0);
size += (8 + 4 + 4 + 4); // todo refactor later for(int32_t i = 0; i < numOfCols - 1; ++i) {
SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pDstBlock->pDataBlock, i);
size += pColInfo->info.bytes;
}
// handle the pk column, the last column, may be the var char column
if (IS_VAR_DATA_TYPE(pPkCol->info.type)) { if (IS_VAR_DATA_TYPE(pPkCol->info.type)) {
if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) { if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) {
char* p = colDataGetData(pPkCol, srcRowIndex); char* p = colDataGetData(pPkCol, srcRowIndex);
@ -1512,6 +1517,27 @@ static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowI
return size; return size;
} }
static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSDataBlock* pSrcBlock,
int32_t srcRowIndex) {
int32_t inc = 0;
if (pHandle->bSortByRowId) {
SColumnInfoData* pPkCol = NULL;
// there may be varchar column exists, so we need to get the pk info, and then calculate the row length
if (pHandle->bSortPk) {
SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
pPkCol = taosArrayGet(pSrcBlock->pDataBlock, extRowsPkOrder->slotId);
}
inc = getPageBufIncForRowIdSort(pHandle->pDataBlock, srcRowIndex, dstRowIndex, pPkCol);
} else {
inc = getPageBufIncForRow(pSrcBlock, srcRowIndex, dstRowIndex);
}
return inc;
}
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) { static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); int32_t pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
@ -1574,19 +1600,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
int32_t minRow = sup.aRowIdx[minIdx]; int32_t minRow = sup.aRowIdx[minIdx];
int32_t bufInc = 0; int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow);
if (pHandle->bSortByRowId) {
if (!pHandle->bSortPk) {
bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, NULL);
} else { // there may be varchar column exists, so we need to get the pk info, and then calculate the row length
SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
SColumnInfoData* pPkCol = taosArrayGet(minBlk->pDataBlock, extRowsPkOrder->slotId);
bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, pPkCol);
}
} else {
bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
}
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
@ -1604,8 +1618,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
blockDataCleanup(pHandle->pDataBlock); blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pgHeaderSz; blkPgSz = pgHeaderSz;
SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow);
bufInc = getPageBufIncForRow(incBlock, minRow, 0);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true; mergeLimitReached = true;
@ -1613,6 +1626,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs; pHandle->currMergeLimitTs = lastPageBufTs;
} }
break; break;
} }
} }