diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 6418654ed5..dab0499aad 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -299,8 +299,8 @@ typedef struct STmsSubTableInput { SArray* aBlockPages; SSDataBlock* pPageBlock; - int32_t pageIdx; + int32_t rowIdx; int64_t* aTs; } STmsSubTableInput; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 55cad4236e..5c490b9d3b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3431,46 +3431,6 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu return TSDB_CODE_SUCCESS; } -static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTablesMergeInfo* pSubTblsInfo, - STmsSubTableInput* pInput) { - taosArrayClear(pInput->aBlockPages); - int32_t start = 0; - SSDataBlock* pDataBlock = pInput->pReaderBlock; - while (start < pDataBlock->info.rows) { - int32_t stop = 0; - blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pInfo->bufPageSize); - SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1); - - int32_t pageId = -1; - void* pPage = getNewBufPage(pSubTblsInfo->pBlocksBuf, &pageId); - - taosArrayPush(pInput->aBlockPages, &pageId); - - int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t); - ASSERT(size <= getBufPageSize(pSubTblsInfo->pBlocksBuf)); - - blockDataToBuf(pPage, p); - - setBufPageDirty(pPage, true); - releaseBufPage(pSubTblsInfo->pBlocksBuf, pPage); - - blockDataDestroy(p); - start = stop + 1; - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput, int32_t pageIndex) { - int32_t pageId = *(int32_t*)taosArrayGet(pInput->aBlockPages, pageIndex); - void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId); - - blockDataFromBuf(pInput->pPageBlock, page); - - releaseBufPage(pSubTblsInfo->pBlocksBuf, page); - return TSDB_CODE_SUCCESS; -} - static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) { pInfo->bGroupProcessed = false; @@ -3491,11 +3451,7 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { if (pInput->rowIdx == -1) { continue; } - if (pInput->type == SUB_TABLE_EXT_PAGES) { - pInput->pageIdx = 0; - fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx); - pInput->rowIdx = 0; - } else { + if (pInput->type == SUB_TABLE_MEM_BLOCK) { pInput->rowIdx = 0; pInput->pageIdx = -1; } @@ -3564,11 +3520,6 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* pInput->rowIdx = 0; pInput->pageIdx = -1; } - if (i + 1 > pSubTblsInfo->numTableBlocksInMem && hasNext) { - pInput->aBlockPages = taosArrayInit(32, sizeof(int32_t)); - saveSubTableBlock(pInfo, pSubTblsInfo, pInput); - pInput->type = SUB_TABLE_EXT_PAGES; - } } return TSDB_CODE_SUCCESS; } @@ -3588,29 +3539,6 @@ static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubT return TSDB_CODE_SUCCESS; } -static int32_t adjustSubTableFromExtPages(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) { - STableMergeScanInfo* pInfo = pOperatorInfo->info; - STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); - if (pInput->pageIdx >= taosArrayGetSize(pInput->aBlockPages)) { - bool hasNext = true; - fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext); - if (!hasNext) { - pInput->rowIdx = -1; - ++pSubTblsInfo->numSubTablesCompleted; - } else { - saveSubTableBlock(pInfo, pSubTblsInfo, pInput); - pInput->pageIdx = 0; - fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx); - pInput->rowIdx = 0; - - } - } else { - fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx); - pInput->rowIdx = 0; - } - return TSDB_CODE_SUCCESS; -} - static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) { STableMergeScanInfo* pInfo = pOperatorInfo->info; STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); @@ -3621,9 +3549,6 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab } else if (pInput->rowIdx == pInputBlock->info.rows -1 ) { if (pInput->type == SUB_TABLE_MEM_BLOCK) { adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo); - } else if (pInput->type == SUB_TABLE_EXT_PAGES) { - ++pInput->pageIdx; - adjustSubTableFromExtPages(pOperatorInfo, pSubTblsInfo); } if (pInput->rowIdx != -1) { SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId); @@ -3732,7 +3657,7 @@ static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) { return TSDB_CODE_SUCCESS; } -SSDataBlock* doTableMergeScanSeqBlocksInMem(SOperatorInfo* pOperator) { +SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -4320,14 +4245,10 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, - pTableScanNode->paraTablesSort ? doTableMergeScanSeqBlocksInMem : doTableMergeScan, - NULL, - destroyTableMergeScanOperatorInfo, - optrDefaultBufFn, - getTableMergeScanExplainExecInfo, - optrDefaultGetNextExtFn, NULL); + pOperator->fpSet = createOperatorFpSet( + optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTables : doTableMergeScan, NULL, + destroyTableMergeScanOperatorInfo, optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, + NULL); pOperator->cost.openCost = 0; return pOperator;