From dc9452fb0a0acbeb8e42e9d3186b84b036b16a4e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 16:21:48 +0800 Subject: [PATCH 1/9] fix(sort): set correct output row index. --- source/libs/executor/src/tsort.c | 170 +++++++++++++++++-------------- 1 file changed, 91 insertions(+), 79 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index cd1a858175..b1688755e6 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -38,11 +38,11 @@ typedef struct SSortMemFileRegion { int32_t bufRegOffset; int32_t bufLen; - char* buf; + char* buf; } SSortMemFileRegion; typedef struct SSortMemFile { - char* writeBuf; + char* writeBuf; int32_t writeBufSize; int64_t writeFileOffset; @@ -55,7 +55,7 @@ typedef struct SSortMemFile { int32_t blockSize; FILE* pTdFile; - char memFilePath[PATH_MAX]; + char memFilePath[PATH_MAX]; } SSortMemFile; struct SSortHandle { @@ -260,6 +260,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->cmpParam.orderInfo = pSortInfo; pSortHandle->cmpParam.cmpGroupId = false; pSortHandle->cmpParam.sortType = type; + if (type == SORT_BLOCK_TS_MERGE) { SBlockOrderInfo* pTsOrder = TARRAY_GET_ELEM(pSortInfo, 0); pSortHandle->cmpParam.tsSlotId = pTsOrder->slotId; @@ -522,10 +523,9 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32 static void appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i); - bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL); + bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL); if (isNull) { colDataSetVal(pColInfo, pBlock->info.rows, NULL, true); } else { @@ -557,7 +557,9 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT pSource->pageIndex = -1; pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); } else { - if (pSource->pageIndex % 512 == 0) qDebug("begin source %p page %d", pSource, pSource->pageIndex); + if (pSource->pageIndex % 512 == 0) { + qDebug("begin source %p page %d", pSource, pSource->pageIndex); + } int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); @@ -635,7 +637,7 @@ static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparPa // TODO: improve this function performance -int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, +int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t leftRowIndex, int32_t rightRowIndex, void* pCompareOrder) { SBlockOrderInfo* pOrder = pCompareOrder; SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); @@ -680,7 +682,7 @@ int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, left1 = colDataGetData(pLeftColInfoData, leftRowIndex); right1 = colDataGetData(pRightColInfoData, rightRowIndex); __compar_fn_t fn = pOrder->compFn; - int ret = fn(left1, right1); + int32_t ret = fn(left1, right1); return ret; } @@ -719,7 +721,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { int64_t* leftTs = (int64_t*)(pLeftTsCol->pData) + pLeftSource->src.rowIndex; int64_t* rightTs = (int64_t*)(pRightTsCol->pData) + pRightSource->src.rowIndex; - int ret = pParam->cmpTsFn(leftTs, rightTs); + int32_t ret = pParam->cmpTsFn(leftTs, rightTs); if (ret == 0 && pParam->pPkOrder) { ret = tsortComparBlockCell(pLeftBlock, pRightBlock, pLeftSource->src.rowIndex, pRightSource->src.rowIndex, (SBlockOrderInfo*)pParam->pPkOrder); @@ -782,7 +784,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { pOrder->compFn = fn; } - int ret = fn(left1, right1); + int32_t ret = fn(left1, right1); if (ret == 0) { continue; } else { @@ -855,7 +857,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return code; } - int nMergedRows = 0; + int32_t nMergedRows = 0; SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while (1) { @@ -1075,7 +1077,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i } taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET); int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize); - int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile); + int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile); if (ret != 1) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; @@ -1095,7 +1097,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock); taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET); int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen)); - int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile); + int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile); if (ret != 1) { taosMemoryFreeClear(*ppRow); terrno = TAOS_SYSTEM_ERROR(errno); @@ -1214,7 +1216,7 @@ static int32_t tsortCloseRegion(SSortHandle* pHandle) { pRegion->regionSize = pMemFile->currRegionOffset; int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset); if (writeBytes > 0) { - int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); + int32_t ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); if (ret != 1) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; @@ -1247,7 +1249,7 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p { if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) { int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); - int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); + int32_t ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); if (ret != 1) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; @@ -1317,6 +1319,7 @@ static void initRowIdSort(SSortHandle* pHandle) { blockDataAppendColInfo(pSortInput, &offsetCol); SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); blockDataAppendColInfo(pSortInput, &lengthCol); + if (pHandle->bSortPk) { pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5); blockDataAppendColInfo(pSortInput, &pkCol); @@ -1324,20 +1327,21 @@ static void initRowIdSort(SSortHandle* pHandle) { blockDataDestroy(pHandle->pDataBlock); pHandle->pDataBlock = pSortInput; - int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock); - size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock); +// int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock); +// size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock); pHandle->pageSize = 256 * 1024; // 256k pHandle->numOfPages = 256; - SArray* aOrder = taosArrayInit(1, sizeof(SBlockOrderInfo)); + SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo)); + + int32_t tsOrder = ((SBlockOrderInfo*)taosArrayGet(pHandle->pSortInfo, 0))->order; - SBlockOrderInfo* pTsOrder = taosArrayGet(pHandle->pSortInfo, 0); SBlockOrderInfo biTs = {0}; - biTs.order = pTsOrder->order; + biTs.order = tsOrder; biTs.slotId = 0; biTs.nullFirst = (biTs.order == TSDB_ORDER_ASC); biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order); - taosArrayPush(aOrder, &biTs); + taosArrayPush(pOrderInfoList, &biTs); if (pHandle->bSortPk) { SBlockOrderInfo biPk = {0}; @@ -1345,11 +1349,11 @@ static void initRowIdSort(SSortHandle* pHandle) { biPk.slotId = 4; biPk.nullFirst = (biPk.order == TSDB_ORDER_ASC); biPk.compFn = getKeyComparFunc(pkCol.info.type, biPk.order); - taosArrayPush(aOrder, &biPk); + taosArrayPush(pOrderInfoList, &biPk); } + taosArrayDestroy(pHandle->pSortInfo); - pHandle->pSortInfo = aOrder; - return; + pHandle->pSortInfo = pOrderInfoList; } int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) { @@ -1441,8 +1445,8 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, } static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) { - int sz = 0; - int numCols = taosArrayGetSize(blk->pDataBlock); + int32_t sz = 0; + int32_t numCols = taosArrayGetSize(blk->pDataBlock); if (!blk->info.hasVarCol) { sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0); sz += blockDataGetRowSize(blk); @@ -1470,42 +1474,46 @@ static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdx static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) { int32_t code = TSDB_CODE_SUCCESS; - int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); + int32_t pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz); + blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); blockDataCleanup(pHandle->pDataBlock); int32_t numBlks = taosArrayGetSize(aBlk); - SBlockOrderInfo* pOrigBlockTsOrder = (!pHandle->bSortByRowId) ? - taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); + SBlockOrderInfo* pOrigBlockTsOrder = + (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); + SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0); SBlkMergeSupport sup = {0}; sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t)); sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*)); sup.tsOrder = pOrigBlockTsOrder->order; sup.aBlks = taosMemoryCalloc(numBlks, sizeof(SSDataBlock*)); - for (int i = 0; i < numBlks; ++i) { - SSDataBlock* blk = taosArrayGetP(aBlk, i); + + for (int32_t i = 0; i < numBlks; ++i) { + SSDataBlock* blk = taosArrayGetP(aBlk, i); SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockTsOrder->slotId); sup.aTs[i] = (int64_t*)col->pData; sup.aRowIdx[i] = 0; sup.aBlks[i] = blk; } + SBlockOrderInfo* pOrigBlockPkOrder = NULL; if (pHandle->bSortPk) { - pOrigBlockPkOrder = (!pHandle->bSortByRowId) ? - taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1); + pOrigBlockPkOrder = + (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1); } sup.pPkOrder = pOrigBlockPkOrder; int32_t totalRows = 0; - for (int i = 0; i < numBlks; ++i) { + for (int32_t i = 0; i < numBlks; ++i) { SSDataBlock* blk = taosArrayGetP(aBlk, i); totalRows += blk->info.rows; } SMultiwayMergeTreeInfo* pTree = NULL; - __merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn; + __merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn; code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn); if (TSDB_CODE_SUCCESS != code) { taosMemoryFree(sup.aRowIdx); @@ -1517,50 +1525,53 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); int32_t nRows = 0; int32_t nMergedRows = 0; - bool mergeLimitReached = false; - size_t blkPgSz = pgHeaderSz; + bool mergeLimitReached = false; + size_t blkPgSz = pgHeaderSz; int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN; - int64_t currTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN; + while (nRows < totalRows) { - int32_t minIdx = tMergeTreeGetChosenIndex(pTree); + int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); - int32_t minRow = sup.aRowIdx[minIdx]; + int32_t minRow = sup.aRowIdx[minIdx]; SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; - int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows); + int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows); if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { - SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); - lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; - code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pTree); - taosArrayDestroy(aPgId); - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); - return code; + SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); + lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; + code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pTree); + taosArrayDestroy(aPgId); + taosMemoryFree(sup.aRowIdx); + taosMemoryFree(sup.aTs); + taosMemoryFree(sup.aBlks); + return code; + } + + nMergedRows += pHandle->pDataBlock->info.rows; + blockDataCleanup(pHandle->pDataBlock); + blkPgSz = pgHeaderSz; + incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; + bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows); + + if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { + mergeLimitReached = true; + if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) || + (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { + pHandle->currMergeLimitTs = lastPageBufTs; } - nMergedRows += pHandle->pDataBlock->info.rows; - blockDataCleanup(pHandle->pDataBlock); - blkPgSz = pgHeaderSz; - incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; - bufInc = getPageBufIncForRow(incBlock, minRow, 0); - - if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { - mergeLimitReached = true; - if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) || - (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { - pHandle->currMergeLimitTs = lastPageBufTs; - } - break; - } + break; + } } + blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1); if (!pHandle->bSortByRowId) { - appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); + appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); } else { - appendToRowIndexDataBlock(pHandle, minBlk, &minRow); + appendToRowIndexDataBlock(pHandle, minBlk, &minRow); } + blkPgSz += bufInc; ++nRows; @@ -1572,6 +1583,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* } tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); } + if (pHandle->pDataBlock->info.rows > 0) { if (!mergeLimitReached) { SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); @@ -1584,14 +1596,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* taosMemoryFree(sup.aTs); taosMemoryFree(sup.aBlks); return code; - } + } nMergedRows += pHandle->pDataBlock->info.rows; if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { - mergeLimitReached = true; - if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) || - (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { - pHandle->currMergeLimitTs = lastPageBufTs; - } + mergeLimitReached = true; + if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) || + (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { + pHandle->currMergeLimitTs = lastPageBufTs; + } } } blockDataCleanup(pHandle->pDataBlock); @@ -1724,7 +1736,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc); if (code != TSDB_CODE_SUCCESS) { - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { blockDataDestroy(taosArrayGetP(aBlkSort, i)); } taosArrayClear(aBlkSort); @@ -1736,7 +1748,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { blockDataDestroy(taosArrayGetP(aBlkSort, i)); } taosArrayClear(aBlkSort); @@ -1750,7 +1762,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (tsortIsClosed(pHandle)) { tSimpleHashClear(mUidBlk); - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { blockDataDestroy(taosArrayGetP(aBlkSort, i)); } taosArrayClear(aBlkSort); @@ -1759,7 +1771,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } tSimpleHashCleanup(mUidBlk); - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { blockDataDestroy(taosArrayGetP(aBlkSort, i)); } taosArrayDestroy(aBlkSort); @@ -2048,10 +2060,10 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) if (!lData) return pOrder->nullFirst ? -1 : 1; if (!rData) return pOrder->nullFirst ? 1 : -1; - int type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type; + int32_t type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type; __compar_fn_t fn = getKeyComparFunc(type, pOrder->order); - int ret = fn(lData, rData); + int32_t ret = fn(lData, rData); if (ret == 0) { continue; } else { From 4f59754a6327d3d3a9b0d6008a59ee53f998c17a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 16:22:02 +0800 Subject: [PATCH 2/9] refactor: do some internal refactor. --- source/libs/executor/src/scanoperator.c | 2 ++ source/libs/stream/src/streamMeta.c | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b53989e59e..38d3fa8e96 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4320,6 +4320,7 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; @@ -4339,6 +4340,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { return code; } } + tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit); tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo); tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c401141821..ab7a23d34f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1734,7 +1734,7 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta bool hasFillhistoryTask = false; STaskId hId = {0}; - stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId); + stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId); streamMetaRLock(pMeta); From f120a62434aa2255d383f867567ac141a7cb13ce Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 16:37:31 +0800 Subject: [PATCH 3/9] refactor: do some internal refactor. --- source/libs/executor/src/tsort.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index b1688755e6..6fe7e73f77 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1553,7 +1553,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* blockDataCleanup(pHandle->pDataBlock); blkPgSz = pgHeaderSz; incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; - bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows); + bufInc = getPageBufIncForRow(incBlock, minRow, 0); if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { mergeLimitReached = true; @@ -1573,6 +1573,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* } blkPgSz += bufInc; + ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock)); ++nRows; From 6e832b68bdea2d1a4a5bb26b00e0df87da8214d3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 16:40:51 +0800 Subject: [PATCH 4/9] refactor: do some internal refactor. --- source/libs/executor/src/tsort.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 6fe7e73f77..272629627a 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1573,7 +1573,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* } blkPgSz += bufInc; - ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock)); + ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pgHeaderSz); ++nRows; From ca022259a04bb1a165d53708fb48030cbd77a653 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 18:17:22 +0800 Subject: [PATCH 5/9] fix(query): set correct row length for rowid sort. --- source/libs/executor/src/tsort.c | 90 ++++++++++++++++++++++++-------- 1 file changed, 68 insertions(+), 22 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 272629627a..2ef40cd738 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1243,7 +1243,9 @@ static int32_t tsortFinalizeRegions(SSortHandle* pHandle) { return TSDB_CODE_SUCCESS; } -static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) { +static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, + int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) { + SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); { @@ -1257,11 +1259,13 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p pMemFile->writeFileOffset = pRegion->fileOffset + pMemFile->currRegionOffset; } } + *pRegionId = pMemFile->currRegionId; *pOffset = pMemFile->currRegionOffset; int32_t writeBufOffset = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->writeBuf + writeBufOffset); *pLength = blockLen; + pMemFile->currRegionOffset += blockLen; pMemFile->bRegionDirty = true; return TSDB_CODE_SUCCESS; @@ -1324,6 +1328,7 @@ static void initRowIdSort(SSortHandle* pHandle) { pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5); blockDataAppendColInfo(pSortInput, &pkCol); } + blockDataDestroy(pHandle->pDataBlock); pHandle->pDataBlock = pSortInput; @@ -1444,32 +1449,61 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, return 0; } -static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) { - int32_t sz = 0; - int32_t numCols = taosArrayGetSize(blk->pDataBlock); - if (!blk->info.hasVarCol) { - sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0); - sz += blockDataGetRowSize(blk); +static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, int32_t dstRowIndex) { + int32_t size = 0; + int32_t numCols = taosArrayGetSize(pSrcBlock->pDataBlock); + + if (!pSrcBlock->info.hasVarCol) { + size += numCols * ((dstRowIndex & 0x7) == 0 ? 1: 0); + size += blockDataGetRowSize(pSrcBlock); } else { for (int32_t i = 0; i < numCols; ++i) { - SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i); + SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pSrcBlock->pDataBlock, i); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - if ((pColInfoData->varmeta.offset[row] != -1) && (pColInfoData->pData)) { - char* p = colDataGetData(pColInfoData, row); - sz += varDataTLen(p); + if ((pColInfoData->varmeta.offset[srcRowIndex] != -1) && (pColInfoData->pData)) { + char* p = colDataGetData(pColInfoData, srcRowIndex); + size += varDataTLen(p); } - sz += sizeof(pColInfoData->varmeta.offset[0]); + size += sizeof(pColInfoData->varmeta.offset[0]); } else { - sz += pColInfoData->info.bytes; + size += pColInfoData->info.bytes; - if (((rowIdxInPage) & 0x07) == 0) { - sz += 1; // bitmap + if (((dstRowIndex) & 0x07) == 0) { + size += 1; // bitmap } } } } - return sz; + + return size; +} + +static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex, SColumnInfoData* pPkCol) { + int32_t size = 0; + int32_t numOfCols = blockDataGetNumOfCols(pDstBlock); + + if (pPkCol == NULL) { + ASSERT(!pDstBlock->info.hasVarCol); + size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0); + size += blockDataGetRowSize(pDstBlock); + } else { + if (IS_VAR_DATA_TYPE(pPkCol->info.type)) { + if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) { + char* p = colDataGetData(pPkCol, srcRowIndex); + size += varDataTLen(p); + } + + size += sizeof(pPkCol->varmeta.offset[0]); + } else { + size += pPkCol->info.bytes; + if (((dstRowIndex) & 0x07) == 0) { + size += 1; // bitmap + } + } + } + + return size; } static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) { @@ -1533,8 +1567,19 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; - SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; - int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows); + + int32_t bufInc = 0; + if (pHandle->bSortByRowId) { + if (!pHandle->bSortPk) { + bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, NULL); + } else { // there may be varchar column exists, so we need to get the pk info, and then calculate the row length + SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1); + SColumnInfoData* pPkCol = taosArrayGet(minBlk->pDataBlock, extRowsPkOrder->slotId); + bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, pPkCol); + } + } else { + bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows); + } if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); @@ -1552,7 +1597,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* nMergedRows += pHandle->pDataBlock->info.rows; blockDataCleanup(pHandle->pDataBlock); blkPgSz = pgHeaderSz; - incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; + + SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; bufInc = getPageBufIncForRow(incBlock, minRow, 0); if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { @@ -1566,10 +1612,10 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* } blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1); - if (!pHandle->bSortByRowId) { - appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); - } else { + if (pHandle->bSortByRowId) { appendToRowIndexDataBlock(pHandle, minBlk, &minRow); + } else { + appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); } blkPgSz += bufInc; From 75e0ce8cbc16d10dd16e665038d382772be90c2d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 18:25:56 +0800 Subject: [PATCH 6/9] fix(query): calculate the correct dst row length. --- source/libs/executor/src/tsort.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 2ef40cd738..906fe0626b 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1484,10 +1484,16 @@ static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowI int32_t numOfCols = blockDataGetNumOfCols(pDstBlock); if (pPkCol == NULL) { - ASSERT(!pDstBlock->info.hasVarCol); + ASSERT((numOfCols == 4) && (!pDstBlock->info.hasVarCol)); + size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0); size += blockDataGetRowSize(pDstBlock); } else { + ASSERT(numOfCols == 5); + + size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0)? 1:0); + size += (8 + 4 + 4 + 4); // todo refactor later + if (IS_VAR_DATA_TYPE(pPkCol->info.type)) { if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) { char* p = colDataGetData(pPkCol, srcRowIndex); From 25d84ab7d3336d2b92fc4d2086a2626fa05af282 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 18:41:57 +0800 Subject: [PATCH 7/9] refactor: do some internal refactor. --- source/libs/executor/src/tsort.c | 50 ++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 906fe0626b..6f690b8911 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1479,11 +1479,12 @@ static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, return size; } -static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex, SColumnInfoData* pPkCol) { +static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex, + SColumnInfoData* pPkCol) { int32_t size = 0; int32_t numOfCols = blockDataGetNumOfCols(pDstBlock); - if (pPkCol == NULL) { + if (pPkCol == NULL) { // no var column ASSERT((numOfCols == 4) && (!pDstBlock->info.hasVarCol)); size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0); @@ -1492,8 +1493,12 @@ static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowI ASSERT(numOfCols == 5); size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0)? 1:0); - size += (8 + 4 + 4 + 4); // todo refactor later + for(int32_t i = 0; i < numOfCols - 1; ++i) { + SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pDstBlock->pDataBlock, i); + size += pColInfo->info.bytes; + } + // handle the pk column, the last column, may be the var char column if (IS_VAR_DATA_TYPE(pPkCol->info.type)) { if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) { char* p = colDataGetData(pPkCol, srcRowIndex); @@ -1512,6 +1517,27 @@ static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowI return size; } +static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSDataBlock* pSrcBlock, + int32_t srcRowIndex) { + int32_t inc = 0; + + if (pHandle->bSortByRowId) { + SColumnInfoData* pPkCol = NULL; + + // there may be varchar column exists, so we need to get the pk info, and then calculate the row length + if (pHandle->bSortPk) { + SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1); + pPkCol = taosArrayGet(pSrcBlock->pDataBlock, extRowsPkOrder->slotId); + } + + inc = getPageBufIncForRowIdSort(pHandle->pDataBlock, srcRowIndex, dstRowIndex, pPkCol); + } else { + inc = getPageBufIncForRow(pSrcBlock, srcRowIndex, dstRowIndex); + } + + return inc; +} + static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) { int32_t code = TSDB_CODE_SUCCESS; int32_t pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); @@ -1574,19 +1600,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; - int32_t bufInc = 0; - if (pHandle->bSortByRowId) { - if (!pHandle->bSortPk) { - bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, NULL); - } else { // there may be varchar column exists, so we need to get the pk info, and then calculate the row length - SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1); - SColumnInfoData* pPkCol = taosArrayGet(minBlk->pDataBlock, extRowsPkOrder->slotId); - bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, pPkCol); - } - } else { - bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows); - } - + int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow); if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; @@ -1604,8 +1618,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* blockDataCleanup(pHandle->pDataBlock); blkPgSz = pgHeaderSz; - SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; - bufInc = getPageBufIncForRow(incBlock, minRow, 0); + bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow); if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { mergeLimitReached = true; @@ -1613,6 +1626,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { pHandle->currMergeLimitTs = lastPageBufTs; } + break; } } From 81abf3fe6d38eb4bc8616dfb1df85b09fac6a842 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 19:04:50 +0800 Subject: [PATCH 8/9] refactor: do some internal refactor. --- source/libs/executor/src/tsort.c | 95 +++++++++++++++++++------------- 1 file changed, 56 insertions(+), 39 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 6f690b8911..21e9e5a70d 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1538,53 +1538,76 @@ static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSD return inc; } +static int32_t initMergeSup(SBlkMergeSupport* pSup, SArray* pBlockList, int32_t tsOrder, int32_t tsSlotId, SBlockOrderInfo* pPkOrderInfo) { + memset(pSup, 0, sizeof(SBlkMergeSupport)); + + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + + pSup->aRowIdx = taosMemoryCalloc(numOfBlocks, sizeof(int32_t)); + pSup->aTs = taosMemoryCalloc(numOfBlocks, sizeof(int64_t*)); + pSup->tsOrder = tsOrder; + pSup->aBlks = taosMemoryCalloc(numOfBlocks, sizeof(SSDataBlock*)); + + for (int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* pBlock = taosArrayGetP(pBlockList, i); + SColumnInfoData* col = taosArrayGet(pBlock->pDataBlock, tsSlotId); + pSup->aTs[i] = (int64_t*)col->pData; + pSup->aRowIdx[i] = 0; + pSup->aBlks[i] = pBlock; + } + + pSup->pPkOrder = pPkOrderInfo; + return TSDB_CODE_SUCCESS; +} + +static void cleanupMergeSup(SBlkMergeSupport* pSup) { + taosMemoryFree(pSup->aRowIdx); + taosMemoryFree(pSup->aTs); + taosMemoryFree(pSup->aBlks); +} + +static int32_t getTotalRows(SArray* pBlockList) { + int32_t totalRows = 0; + + for (int32_t i = 0; i < taosArrayGetSize(pBlockList); ++i) { + SSDataBlock* blk = taosArrayGetP(pBlockList, i); + totalRows += blk->info.rows; + } + + return totalRows; +} + static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) { int32_t code = TSDB_CODE_SUCCESS; - int32_t pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); - int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz); + int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock); + int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize); blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); blockDataCleanup(pHandle->pDataBlock); - int32_t numBlks = taosArrayGetSize(aBlk); + + SBlkMergeSupport sup = {0}; SBlockOrderInfo* pOrigBlockTsOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0); - SBlkMergeSupport sup = {0}; - sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t)); - sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*)); - sup.tsOrder = pOrigBlockTsOrder->order; - sup.aBlks = taosMemoryCalloc(numBlks, sizeof(SSDataBlock*)); - - for (int32_t i = 0; i < numBlks; ++i) { - SSDataBlock* blk = taosArrayGetP(aBlk, i); - SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockTsOrder->slotId); - sup.aTs[i] = (int64_t*)col->pData; - sup.aRowIdx[i] = 0; - sup.aBlks[i] = blk; - } - + SBlockOrderInfo* pOrigBlockPkOrder = NULL; if (pHandle->bSortPk) { pOrigBlockPkOrder = (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1); } - sup.pPkOrder = pOrigBlockPkOrder; - int32_t totalRows = 0; - for (int32_t i = 0; i < numBlks; ++i) { - SSDataBlock* blk = taosArrayGetP(aBlk, i); - totalRows += blk->info.rows; - } + initMergeSup(&sup, aBlk, pOrigBlockTsOrder->order, pOrigBlockTsOrder->slotId, pOrigBlockPkOrder); + + int32_t totalRows = getTotalRows(aBlk); SMultiwayMergeTreeInfo* pTree = NULL; __merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn; + code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn); if (TSDB_CODE_SUCCESS != code) { - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); + cleanupMergeSup(&sup); return code; } @@ -1592,7 +1615,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* int32_t nRows = 0; int32_t nMergedRows = 0; bool mergeLimitReached = false; - size_t blkPgSz = pgHeaderSz; + size_t blkPgSz = pageHeaderSize; int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN; while (nRows < totalRows) { @@ -1601,6 +1624,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* int32_t minRow = sup.aRowIdx[minIdx]; int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow); + if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; @@ -1608,15 +1632,13 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pTree); taosArrayDestroy(aPgId); - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); + cleanupMergeSup(&sup); return code; } nMergedRows += pHandle->pDataBlock->info.rows; blockDataCleanup(pHandle->pDataBlock); - blkPgSz = pgHeaderSz; + blkPgSz = pageHeaderSize; bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow); @@ -1639,7 +1661,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* } blkPgSz += bufInc; - ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pgHeaderSz); + ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize); ++nRows; @@ -1659,9 +1681,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(aPgId); taosMemoryFree(pTree); - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); + cleanupMergeSup(&sup); return code; } nMergedRows += pHandle->pDataBlock->info.rows; @@ -1679,10 +1699,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); - taosMemoryFree(sup.aRowIdx); - taosMemoryFree(sup.aTs); - taosMemoryFree(sup.aBlks); - + cleanupMergeSup(&sup); tMergeTreeDestroy(&pTree); return 0; From cbf994f04fe4e4ec07eed35fb9aa3b84734e37b7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 22:28:49 +0800 Subject: [PATCH 9/9] fix(query): set correct length for json value. --- source/libs/executor/src/tsort.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 21e9e5a70d..daac98bbfc 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1462,7 +1462,12 @@ static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if ((pColInfoData->varmeta.offset[srcRowIndex] != -1) && (pColInfoData->pData)) { char* p = colDataGetData(pColInfoData, srcRowIndex); - size += varDataTLen(p); + + if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { + size += getJsonValueLen(p); + } else { + size += varDataTLen(p); + } } size += sizeof(pColInfoData->varmeta.offset[0]);