From 43b0b23cd34d05026215327aac60def6766f37b5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jan 2022 11:59:08 +0800 Subject: [PATCH] [td-11818] Update the physical plan in case of the subscribe processing. --- source/libs/planner/src/physicalPlan.c | 39 +++++++++++++++++++------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 530d618aee..5d0600c820 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -296,6 +296,7 @@ static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNo if (needMultiNodeScan(pTable)) { return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable)); } + return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan); } @@ -386,6 +387,33 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { // todo deal subquery } +static void postCreateDag(SQueryPlanNode* pQueryNode, SQueryDag* pDag, SArray* pNodeList) { + // The exchange operator is not necessary, in case of the stream scan. + // Here we need to remove it from the DAG. + if (pQueryNode->info.type == QNODE_STREAMSCAN) { + SArray* pRootLevel = taosArrayGetP(pDag->pSubplans, 0); + SSubplan *pSubplan = taosArrayGetP(pRootLevel, 0); + + if (pSubplan->pNode->info.type == OP_Exchange) { + ASSERT(taosArrayGetSize(pRootLevel) == 1); + + taosArrayRemove(pDag->pSubplans, 0); + // And then update the number of the subplans. + pDag->numOfSubplans -= 1; + } + } else { + // Traverse the dag again to acquire the execution node. + if (pNodeList != NULL) { + SArray** pSubLevel = taosArrayGetLast(pDag->pSubplans); + size_t num = taosArrayGetSize(*pSubLevel); + for (int32_t j = 0; j < num; ++j) { + SSubplan* pPlan = taosArrayGetP(*pSubLevel, j); + taosArrayPush(pNodeList, &pPlan->execNode); + } + } + } +} + int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) { TRY(TSDB_MAX_TAG_CONDITIONS) { SPlanContext context = { @@ -407,16 +435,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD return TSDB_CODE_FAILED; } END_TRY - // traverse the dag again to acquire the execution node. - if (pNodeList != NULL) { - SArray** pSubLevel = taosArrayGetLast((*pDag)->pSubplans); - size_t num = taosArrayGetSize(*pSubLevel); - for (int32_t j = 0; j < num; ++j) { - SSubplan* pPlan = taosArrayGetP(*pSubLevel, j); - taosArrayPush(pNodeList, &pPlan->execNode); - } - } - + postCreateDag(pQueryNode, *pDag, pNodeList); return TSDB_CODE_SUCCESS; }