From 033ebb5fa59362bdf3a4ddf982e67342687dec39 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 24 Jan 2024 11:11:45 +0800 Subject: [PATCH] feat: prepare for switch between one reader and multi readers --- source/libs/executor/inc/executorInt.h | 2 - source/libs/executor/src/scanoperator.c | 76 +++++++++++++------------ 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index b86ff2f7fa..013fe41f4e 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -326,7 +326,6 @@ typedef struct STableMergeScanInfo { SSDataBlock* pSortInputBlock; SSDataBlock* pReaderBlock; int64_t startTs; // sort start time - SArray* sortSourceParams; SLimitInfo limitInfo; int64_t numOfRows; SScanInfo scanInfo; @@ -348,7 +347,6 @@ typedef struct STableMergeScanInfo { bool rtnNextDurationBlocks; int32_t nextDurationBlocksIdx; - STmsSubTablesMergeInfo* pSubTablesMergeInfo; } STableMergeScanInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d73b43ee3e..068bdeb075 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3437,7 +3437,7 @@ static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) { pInfo->tableEndIndex = i - 1; } -static int32_t initSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { +static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) { STmsSubTableInput * pInput = pSubTblsInfo->aInputs + i; if (pInput->rowIdx == -1) { @@ -3628,34 +3628,31 @@ static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) { initSubTableInputs(pOperator, pInfo); - initSubTablesMergeSort(pInfo->pSubTablesMergeInfo); + openSubTablesMergeSort(pInfo->pSubTablesMergeInfo); return TSDB_CODE_SUCCESS; } -static int32_t stopSubTablesTableMergeScan(SOperatorInfo* pOperator) { - STableMergeScanInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; +static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) { STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo; + if (pSubTblsInfo != NULL) { + tMergeTreeDestroy(&pSubTblsInfo->pTree); - SReadHandle* pHandle = &pInfo->base.readHandle; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + for (int32_t i = 0; i <= pSubTblsInfo->numSubTables; ++i) { + STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; - tMergeTreeDestroy(&pSubTblsInfo->pTree); + blockDataDestroy(pInput->pBlock); + taosArrayDestroy(pInput->aBlockPages); + pInfo->base.readerAPI.tsdReaderClose(pInput->pReader); + pInput->pReader = NULL; + } - for (int32_t i = 0; i <= pSubTblsInfo->numSubTables; ++i) { - STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; + destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf); + taosMemoryFree(pSubTblsInfo->aInputs); - blockDataDestroy(pInput->pBlock); - taosArrayDestroy(pInput->aBlockPages); - pAPI->tsdReader.tsdReaderClose(pInput->pReader); + taosMemoryFree(pSubTblsInfo); + pInfo->pSubTablesMergeInfo = NULL; } - - destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf); - taosMemoryFree(pSubTblsInfo->aInputs); - - taosMemoryFree(pSubTblsInfo); - pInfo->pSubTablesMergeInfo = NULL; return TSDB_CODE_SUCCESS; } @@ -4136,10 +4133,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; - cleanupQueryTableDataCond(&pTableScanInfo->base.cond); - - int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams); + // start one reader variable pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.dataReader = NULL; @@ -4150,20 +4145,24 @@ void destroyTableMergeScanOperatorInfo(void* param) { } } - taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; tSimpleHashCleanup(pTableScanInfo->mTableNumRows); pTableScanInfo->mTableNumRows = NULL; taosHashCleanup(pTableScanInfo->mSkipTables); pTableScanInfo->mSkipTables = NULL; + pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); + // end one reader variable + + cleanupQueryTableDataCond(&pTableScanInfo->base.cond); destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); - pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock); - taosArrayDestroy(pTableScanInfo->pSortInfo); + + stopSubTablesTableMergeScan(pTableScanInfo); + taosMemoryFreeClear(param); } @@ -4237,35 +4236,38 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN goto _error; } - initResultSizeInfo(&pOperator->resultInfo, 1024); - pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); - blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); - - pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); - - pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); - pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); - pInfo->mTableNumRows = tSimpleHashInit(1024, - taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); pInfo->mergeLimit = -1; bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1; if (hasLimit) { pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset; pInfo->mSkipTables = NULL; } + + initResultSizeInfo(&pOperator->resultInfo, 1024); + pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); + blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); + + pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); + pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable; + int32_t rowSize = pInfo->pResBlock->info.rowSize; uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock); pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols); + + //start one reader variable + pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); + pInfo->mTableNumRows = tSimpleHashInit(1024, + taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); + if (!tsExperimental) { pInfo->filesetDelimited = false; } else { pInfo->filesetDelimited = pTableScanNode->filesetDelimited; } - pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable; - + // end one reader variable setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols;