[td-11818] Update the physical plan in case of the subscribe processing.
This commit is contained in:
parent
afd1ce63db
commit
43b0b23cd3
|
@ -296,6 +296,7 @@ static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNo
|
||||||
if (needMultiNodeScan(pTable)) {
|
if (needMultiNodeScan(pTable)) {
|
||||||
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
|
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
|
||||||
}
|
}
|
||||||
|
|
||||||
return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
|
return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -386,6 +387,33 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
|
||||||
// todo deal subquery
|
// todo deal subquery
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void postCreateDag(SQueryPlanNode* pQueryNode, SQueryDag* pDag, SArray* pNodeList) {
|
||||||
|
// The exchange operator is not necessary, in case of the stream scan.
|
||||||
|
// Here we need to remove it from the DAG.
|
||||||
|
if (pQueryNode->info.type == QNODE_STREAMSCAN) {
|
||||||
|
SArray* pRootLevel = taosArrayGetP(pDag->pSubplans, 0);
|
||||||
|
SSubplan *pSubplan = taosArrayGetP(pRootLevel, 0);
|
||||||
|
|
||||||
|
if (pSubplan->pNode->info.type == OP_Exchange) {
|
||||||
|
ASSERT(taosArrayGetSize(pRootLevel) == 1);
|
||||||
|
|
||||||
|
taosArrayRemove(pDag->pSubplans, 0);
|
||||||
|
// And then update the number of the subplans.
|
||||||
|
pDag->numOfSubplans -= 1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Traverse the dag again to acquire the execution node.
|
||||||
|
if (pNodeList != NULL) {
|
||||||
|
SArray** pSubLevel = taosArrayGetLast(pDag->pSubplans);
|
||||||
|
size_t num = taosArrayGetSize(*pSubLevel);
|
||||||
|
for (int32_t j = 0; j < num; ++j) {
|
||||||
|
SSubplan* pPlan = taosArrayGetP(*pSubLevel, j);
|
||||||
|
taosArrayPush(pNodeList, &pPlan->execNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) {
|
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) {
|
||||||
TRY(TSDB_MAX_TAG_CONDITIONS) {
|
TRY(TSDB_MAX_TAG_CONDITIONS) {
|
||||||
SPlanContext context = {
|
SPlanContext context = {
|
||||||
|
@ -407,16 +435,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
} END_TRY
|
} END_TRY
|
||||||
|
|
||||||
// traverse the dag again to acquire the execution node.
|
postCreateDag(pQueryNode, *pDag, pNodeList);
|
||||||
if (pNodeList != NULL) {
|
|
||||||
SArray** pSubLevel = taosArrayGetLast((*pDag)->pSubplans);
|
|
||||||
size_t num = taosArrayGetSize(*pSubLevel);
|
|
||||||
for (int32_t j = 0; j < num; ++j) {
|
|
||||||
SSubplan* pPlan = taosArrayGetP(*pSubLevel, j);
|
|
||||||
taosArrayPush(pNodeList, &pPlan->execNode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue