From bde2cf7b34f8121059c0fb1eee7c022dc29d5760 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 20 Dec 2023 09:54:27 +0800 Subject: [PATCH] enhance: remove length col and refactor --- source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/src/scanoperator.c | 49 ++++++++++++++----------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e3eeab4e98..fa5ad6ff7c 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -312,7 +312,7 @@ typedef struct STableMergeScanInfo { bool rtnNextDurationBlocks; int32_t nextDurationBlocksIdx; bool bSortRowId; - STmsSortRowIdInfo tmsSortRowIdInfo; + STmsSortRowIdInfo sortRowIdInfo; } STableMergeScanInfo; typedef struct STagScanFilterContext { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7a778624b2..43cfdedd46 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3221,12 +3221,11 @@ _error: // ========================= table merge scan static int32_t saveBlockRowToBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) { - SDiskbasedBuf* pResultBuf = pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf; + SDiskbasedBuf* pResultBuf = pInfo->sortRowIdInfo.pExtSrcRowsBuf; int32_t rowBytes = blockDataGetRowSize(pBlock) + taosArrayGetSize(pBlock->pDataBlock); SFilePage* pFilePage = NULL; - // in the first scan, new space needed for results int32_t pageId = -1; SArray* list = getDataBufPagesIdList(pResultBuf); @@ -3244,7 +3243,6 @@ static int32_t saveBlockRowToBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock pageId = getPageId(pi); if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) { - // release current page first, and prepare the next one releaseBufPageInfo(pResultBuf, pi); pFilePage = getNewBufPage(pResultBuf, &pageId); @@ -3255,8 +3253,10 @@ static int32_t saveBlockRowToBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock } if (pFilePage == NULL) { - return -1; + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; } + *pPageId = pageId; *pOffset = pFilePage->num; char* buf = (char*)pFilePage + (*pOffset); @@ -3298,8 +3298,8 @@ static int32_t saveBlockRowToBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock return 0; } -static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) { - STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo; +static int32_t fillSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) { + STmsSortRowIdInfo* pSortInfo = &pInfo->sortRowIdInfo; int32_t nRows = pSrcBlock->info.rows; pSortInputBlk->info.window = pSrcBlock->info.window; @@ -3321,7 +3321,6 @@ static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlo saveBlockRowToBuf(pInfo, pSrcBlock, i, &pageId, &offset, &length); colDataSetInt32(pageIdCol, i, &pageId); colDataSetInt32(offsetCol, i, &offset); - colDataSetInt32(lengthCol, i, &length); } pSortInputBlk->info.rows = nRows; @@ -3329,13 +3328,12 @@ static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlo return 0; } -void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) { - STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo; +static void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) { + STmsSortRowIdInfo* pSortInfo = &pInfo->sortRowIdInfo; int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1); int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2); - int32_t length = *(int32_t*)tsortGetValue(pTupleHandle, 3); - void* page = getBufPage(pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf, pageId); + void* page = getBufPage(pInfo->sortRowIdInfo.pExtSrcRowsBuf, pageId); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); char* buf = (char*)page + offset; @@ -3359,7 +3357,7 @@ void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBloc colDataSetNULL(pColInfo, pBlock->info.rows); } } - releaseBufPage(pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf, page); + releaseBufPage(pInfo->sortRowIdInfo.pExtSrcRowsBuf, page); pBlock->info.dataLoad = 1; pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; @@ -3511,7 +3509,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { SSDataBlock* pSortInputBlk = NULL; if (pInfo->bSortRowId) { blockDataCleanup(pInfo->pSortInputBlock); - transformIntoSortInputBlock(pInfo, pBlock, pInfo->pSortInputBlock); + fillSortInputBlock(pInfo, pBlock, pInfo->pSortInputBlock); pSortInputBlk = pInfo->pSortInputBlock; } else { pSortInputBlk = pBlock; @@ -3568,17 +3566,17 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* } int32_t startRowIdSort(STableMergeScanInfo *pInfo) { - STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo; + STmsSortRowIdInfo* pSort = &pInfo->sortRowIdInfo; int32_t pageSize = getProperSortPageSize(blockDataGetRowSize(pInfo->pResBlock), taosArrayGetSize(pInfo->pResBlock->pDataBlock)); - int32_t memSize = pageSize * 4 * 8192; - createDiskbasedBuf(&pSort->pExtSrcRowsBuf, pageSize * 4, memSize, "tms-ext-src-block", tsTempDir); + int32_t memSize = pageSize * 1024; + int32_t code = createDiskbasedBuf(&pSort->pExtSrcRowsBuf, pageSize, memSize, "tms-ext-src-block", tsTempDir); dBufSetPrintInfo(pSort->pExtSrcRowsBuf); - return 0; + return code; } int32_t stopRowIdSort(STableMergeScanInfo *pInfo) { - STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo; + STmsSortRowIdInfo* pSort = &pInfo->sortRowIdInfo; destroyDiskbasedBuf(pSort->pExtSrcRowsBuf); pSort->pExtSrcRowsBuf = NULL; @@ -3595,7 +3593,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->bNewFilesetEvent = false; pInfo->bNextDurationBlockEvent = false; - startRowIdSort(pInfo); + code = startRowIdSort(pInfo); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + pInfo->sortBufSize = 2048 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, @@ -3814,6 +3816,11 @@ void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->base.cond); + if (pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf != NULL) { + destroyDiskbasedBuf(pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf); + pTableScanInfo->sortRowIdInfo.pExtSrcRowsBuf = NULL; + } + int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams); pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); @@ -3931,8 +3938,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN blockDataAppendColInfo(pSortInput, &pageIdCol); SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); blockDataAppendColInfo(pSortInput, &offsetCol); - SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); - blockDataAppendColInfo(pSortInput, &lengthCol); pInfo->pSortInputBlock = pSortInput; int32_t srcTsSlotId = 0; @@ -3942,7 +3947,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN srcTsSlotId = colInfo->dstSlotId; } } - pInfo->tmsSortRowIdInfo.srcTsSlotId = srcTsSlotId; + pInfo->sortRowIdInfo.srcTsSlotId = srcTsSlotId; SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo)); SBlockOrderInfo bi = {0}; bi.order = pInfo->base.cond.order;