diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e8dab42129..701c7ee0f3 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -909,6 +909,11 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond); +int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, + uint64_t taskId, SNode* pTagCond); +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SArray* dataReaders, + SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f5d813f2bf..f6a5899ad8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4675,33 +4675,43 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; +// +// tsdbReaderT pDataReader = +// doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); +// if (pDataReader == NULL && terrno != 0) { +// return NULL; +// } +// +// int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); +// if (code) { +// tsdbCleanupReadHandle(pDataReader); +// return NULL; +// } +// +// SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); +// code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json +// taosArrayDestroy(groupKeys); +// if (code){ +// tsdbCleanupReadHandle(pDataReader); +// return NULL; +// } +// +// SOperatorInfo* pOperator = +// createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); +// +// STableScanInfo* pScanInfo = pOperator->info; +// pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; - tsdbReaderT pDataReader = - doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); - if (pDataReader == NULL && terrno != 0) { - return NULL; - } - - int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); - if (code) { - tsdbCleanupReadHandle(pDataReader); - return NULL; - } - + SArray* dataReaders = taosArrayInit(8, POINTER_BYTES); + createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond); + extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); - code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json + generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json taosArrayDestroy(groupKeys); - if (code){ - tsdbCleanupReadHandle(pDataReader); - return NULL; - } - SOperatorInfo* pOperator = - createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); - + createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; - return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 47289221c0..e342912727 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1865,7 +1865,7 @@ _error: static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableScanInfo* pInfo = pOperator->info; + STableMergeScanInfo* pInfo = pOperator->info; SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; @@ -2154,7 +2154,6 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; - blockDataDestroy(pTableScanInfo->pResBlock); clearupQueryTableDataCond(&pTableScanInfo->cond); for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {