From 13735d725715922aba2a8881ace4c52a992f635c Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 13 Apr 2022 19:04:25 +0800 Subject: [PATCH] feat(query): distributed splitting of child/normal table JOIN --- include/libs/nodes/nodes.h | 1 + source/libs/executor/src/scanoperator.c | 2 +- source/libs/nodes/src/nodesTraverseFuncs.c | 50 ++-- source/libs/parser/src/parAstCreater.c | 22 +- .../libs/parser/test/mockCatalogService.cpp | 11 +- source/libs/planner/src/planLogicCreater.c | 1 - source/libs/planner/src/planOptimizer.c | 244 ++++++++++++++++-- source/libs/planner/src/planSpliter.c | 147 ++++++++--- source/libs/planner/test/plannerTest.cpp | 16 +- 9 files changed, 396 insertions(+), 98 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 3a1d7954a7..4ffacf3787 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -231,6 +231,7 @@ typedef enum EDealRes { DEAL_RES_CONTINUE = 1, DEAL_RES_IGNORE_CHILD, DEAL_RES_ERROR, + DEAL_RES_END } EDealRes; typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4179999c4b..5079a49a62 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -548,7 +548,7 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { char* dbName = nodesGetValueFromNode(node); strncpy(pContext, varDataVal(dbName), varDataLen(dbName)); *((char*)pContext + varDataLen(dbName)) = 0; - return DEAL_RES_ERROR; // stop walk + return DEAL_RES_END; // stop walk } default: break; diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 4a782cce08..99a08923bb 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -46,7 +46,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker case QUERY_NODE_OPERATOR: { SOperatorNode* pOpNode = (SOperatorNode*)pNode; res = walkNode(pOpNode->pLeft, order, walker, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pOpNode->pRight, order, walker, pContext); } break; @@ -63,10 +63,10 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker case QUERY_NODE_JOIN_TABLE: { SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode; res = walkNode(pJoinTableNode->pLeft, order, walker, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pJoinTableNode->pRight, order, walker, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pJoinTableNode->pOnCond, order, walker, pContext); } break; @@ -80,7 +80,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker case QUERY_NODE_STATE_WINDOW: { SStateWindowNode* pState = (SStateWindowNode*)pNode; res = walkNode(pState->pExpr, order, walker, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pState->pCol, order, walker, pContext); } break; @@ -88,7 +88,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker case QUERY_NODE_SESSION_WINDOW: { SSessionWindowNode* pSession = (SSessionWindowNode*)pNode; res = walkNode(pSession->pCol, order, walker, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pSession->pGap, order, walker, pContext); } break; @@ -96,16 +96,16 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker case QUERY_NODE_INTERVAL_WINDOW: { SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode; res = walkNode(pInterval->pInterval, order, walker, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pInterval->pOffset, order, walker, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pInterval->pSliding, order, walker, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pInterval->pFill, order, walker, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pInterval->pCol, order, walker, pContext); } break; @@ -126,7 +126,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker break; } - if (DEAL_RES_ERROR != res && TRAVERSAL_POSTORDER == order) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res && TRAVERSAL_POSTORDER == order) { res = walker(pNode, pContext); } @@ -136,8 +136,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext) { SNode* node; FOREACH(node, pNodeList) { - if (DEAL_RES_ERROR == walkNode(node, order, walker, pContext)) { - return DEAL_RES_ERROR; + EDealRes res = walkNode(node, order, walker, pContext); + if (DEAL_RES_ERROR == res || DEAL_RES_END == res) { + return res; } } return DEAL_RES_CONTINUE; @@ -185,7 +186,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit case QUERY_NODE_OPERATOR: { SOperatorNode* pOpNode = (SOperatorNode*)pNode; res = rewriteNode(&(pOpNode->pLeft), order, rewriter, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pOpNode->pRight), order, rewriter, pContext); } break; @@ -202,10 +203,10 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit case QUERY_NODE_JOIN_TABLE: { SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode; res = rewriteNode(&(pJoinTableNode->pLeft), order, rewriter, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pJoinTableNode->pRight), order, rewriter, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pJoinTableNode->pOnCond), order, rewriter, pContext); } break; @@ -219,7 +220,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit case QUERY_NODE_STATE_WINDOW: { SStateWindowNode* pState = (SStateWindowNode*)pNode; res = rewriteNode(&pState->pExpr, order, rewriter, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&pState->pCol, order, rewriter, pContext); } break; @@ -227,7 +228,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit case QUERY_NODE_SESSION_WINDOW: { SSessionWindowNode* pSession = (SSessionWindowNode*)pNode; res = rewriteNode(&pSession->pCol, order, rewriter, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&pSession->pGap, order, rewriter, pContext); } break; @@ -235,16 +236,16 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit case QUERY_NODE_INTERVAL_WINDOW: { SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode; res = rewriteNode(&(pInterval->pInterval), order, rewriter, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pInterval->pOffset), order, rewriter, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pInterval->pSliding), order, rewriter, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pInterval->pFill), order, rewriter, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pInterval->pCol), order, rewriter, pContext); } break; @@ -265,7 +266,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit break; } - if (DEAL_RES_ERROR != res && TRAVERSAL_POSTORDER == order) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res && TRAVERSAL_POSTORDER == order) { res = rewriter(pRawNode, pContext); } @@ -275,8 +276,9 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit static EDealRes rewriteList(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) { SNode** pNode; FOREACH_FOR_REWRITE(pNode, pNodeList) { - if (DEAL_RES_ERROR == rewriteNode(pNode, order, rewriter, pContext)) { - return DEAL_RES_ERROR; + EDealRes res = rewriteNode(pNode, order, rewriter, pContext); + if (DEAL_RES_ERROR == res || DEAL_RES_END == res) { + return res; } } return DEAL_RES_CONTINUE; diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 318f7ca5f7..9a26e38c98 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -328,8 +328,26 @@ SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType typ CHECK_OUT_OF_MEM(cond); cond->condType = type; cond->pParameterList = nodesMakeList(); - nodesListAppend(cond->pParameterList, pParam1); - nodesListAppend(cond->pParameterList, pParam2); + if ((QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1) && type != ((SLogicConditionNode*)pParam1)->condType) || + (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2) && type != ((SLogicConditionNode*)pParam2)->condType)) { + nodesListAppend(cond->pParameterList, pParam1); + nodesListAppend(cond->pParameterList, pParam2); + } else { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1)) { + nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam1)->pParameterList); + ((SLogicConditionNode*)pParam1)->pParameterList = NULL; + nodesDestroyNode(pParam1); + } else { + nodesListAppend(cond->pParameterList, pParam1); + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2)) { + nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam2)->pParameterList); + ((SLogicConditionNode*)pParam2)->pParameterList = NULL; + nodesDestroyNode(pParam2); + } else { + nodesListAppend(cond->pParameterList, pParam2); + } + } return (SNode*)cond; } diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 402caeb252..3da3678563 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -146,6 +146,7 @@ public: meta_[db][tbname].reset(new MockTableMeta()); meta_[db][tbname]->schema = table.release(); meta_[db][tbname]->schema->uid = id_++; + meta_[db][tbname]->schema->tableType = TSDB_CHILD_TABLE; SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0,}; addEpIntoEpSet(&vgroup.epSet, "dnode_1", 6030); @@ -197,11 +198,11 @@ public: std::cout << "Table:" << table.first << std::endl; std::cout << SH("Field") << SH("Type") << SH("DataType") << IH("Bytes") << std::endl; std::cout << SL(3, 1) << std::endl; - int16_t numOfTags = schema->tableInfo.numOfTags; - int16_t numOfFields = numOfTags + schema->tableInfo.numOfColumns; + int16_t numOfColumns = schema->tableInfo.numOfColumns; + int16_t numOfFields = numOfColumns + schema->tableInfo.numOfTags; for (int16_t i = 0; i < numOfFields; ++i) { const SSchema* col = schema->schema + i; - std::cout << SF(std::string(col->name)) << SH(ftToString(i, numOfTags)) << SH(dtToString(col->type)) << IF(col->bytes) << std::endl; + std::cout << SF(std::string(col->name)) << SH(ftToString(i, numOfColumns)) << SH(dtToString(col->type)) << IF(col->bytes) << std::endl; } std::cout << std::endl; } @@ -262,8 +263,8 @@ private: return tDataTypes[type].name; } - std::string ftToString(int16_t colid, int16_t numOfTags) const { - return (0 == colid ? "column" : (colid <= numOfTags ? "tag" : "column")); + std::string ftToString(int16_t colid, int16_t numOfColumns) const { + return (0 == colid ? "column" : (colid <= numOfColumns ? "tag" : "column")); } STableMeta* getTableSchemaMeta(const std::string& db, const std::string& tbname) const { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 1d8400e1eb..3c60b62434 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -694,7 +694,6 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS } return code; - return TSDB_CODE_SUCCESS; } static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index c56e113d40..19e6718fe8 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -41,6 +41,21 @@ typedef struct SOsdInfo { SNodeList* pDsoFuncs; } SOsdInfo; +typedef struct SCpdIsMultiTableCondCxt { + SNodeList* pLeftCols; + SNodeList* pRightCols; + bool havaLeftCol; + bool haveRightCol; +} SCpdIsMultiTableCondCxt; + +typedef enum ECondAction { + COND_ACTION_STAY = 1, + COND_ACTION_PUSH_JOIN, + COND_ACTION_PUSH_LEFT_CHILD, + COND_ACTION_PUSH_RIGHT_CHILD + // after supporting outer join, there are other possibilities +} ECondAction; + static bool osdMayBeOptimized(SLogicNode* pNode) { if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OSD)) { return false; @@ -152,34 +167,227 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* return TSDB_CODE_SUCCESS; } -static int32_t cpdPartitionCondition(SJoinLogicNode* pJoin, SNodeList** pMultiTableCond, SNodeList** pSingleTableCond) { - // todo +static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) { + SNode* pTableCol = NULL; + FOREACH(pTableCol, pTableCols) { + if (nodesEqualNode(pCondCol, pTableCol)) { + return true; + } + } + return false; +} + +static EDealRes cpdIsMultiTableCondImpl(SNode* pNode, void* pContext) { + SCpdIsMultiTableCondCxt* pCxt = pContext; + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + if (belongThisTable(pNode, pCxt->pLeftCols)) { + pCxt->havaLeftCol = true; + } else if (belongThisTable(pNode, pCxt->pRightCols)) { + pCxt->haveRightCol = true; + } + return pCxt->havaLeftCol && pCxt->haveRightCol ? DEAL_RES_END : DEAL_RES_CONTINUE; + } + return DEAL_RES_CONTINUE; +} + +static ECondAction cpdCondAction(EJoinType joinType, SNodeList* pLeftCols, SNodeList* pRightCols, SNode* pNode) { + SCpdIsMultiTableCondCxt cxt = { .pLeftCols = pLeftCols, .pRightCols = pRightCols, .havaLeftCol = false, .haveRightCol = false }; + nodesWalkExpr(pNode, cpdIsMultiTableCondImpl, &cxt); + return (JOIN_TYPE_INNER != joinType ? COND_ACTION_STAY : + (cxt.havaLeftCol && cxt.haveRightCol ? COND_ACTION_PUSH_JOIN : (cxt.havaLeftCol ? COND_ACTION_PUSH_LEFT_CHILD : COND_ACTION_PUSH_RIGHT_CHILD))); +} + +static int32_t cpdMakeCond(SNodeList** pConds, SNode** pCond) { + if (NULL == *pConds) { + return TSDB_CODE_SUCCESS; + } + + if (1 == LIST_LENGTH(*pConds)) { + *pCond = nodesListGetNode(*pConds, 0); + nodesClearList(*pConds); + } else { + SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (NULL == pLogicCond) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pLogicCond->condType = LOGIC_COND_TYPE_AND; + pLogicCond->pParameterList = *pConds; + *pCond = (SNode*)pLogicCond; + } + *pConds = NULL; + return TSDB_CODE_SUCCESS; } -static int32_t cpdPushJoinCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNodeList* pMultiTableCond) { - // todo +static int32_t cpdPartitionLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pJoin->node.pConditions; + if (LOGIC_COND_TYPE_AND != pLogicCond->condType) { + return TSDB_CODE_SUCCESS; + } + + SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; + SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; + int32_t code = TSDB_CODE_SUCCESS; + + SNodeList* pOnConds = NULL; + SNodeList* pLeftChildConds = NULL; + SNodeList* pRightChildConds = NULL; + SNodeList* pRemainConds = NULL; + SNode* pCond = NULL; + FOREACH(pCond, pLogicCond->pParameterList) { + ECondAction condAction = cpdCondAction(pJoin->joinType, pLeftCols, pRightCols, pCond); + if (COND_ACTION_PUSH_JOIN == condAction) { + code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond)); + } else if (COND_ACTION_PUSH_LEFT_CHILD == condAction) { + code = nodesListMakeAppend(&pLeftChildConds, nodesCloneNode(pCond)); + } else if (COND_ACTION_PUSH_RIGHT_CHILD == condAction) { + code = nodesListMakeAppend(&pRightChildConds, nodesCloneNode(pCond)); + } else { + code = nodesListMakeAppend(&pRemainConds, nodesCloneNode(pCond)); + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + + SNode* pTempOnCond = NULL; + SNode* pTempLeftChildCond = NULL; + SNode* pTempRightChildCond = NULL; + SNode* pTempRemainCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = cpdMakeCond(&pOnConds, &pTempOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = cpdMakeCond(&pLeftChildConds, &pTempLeftChildCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = cpdMakeCond(&pRightChildConds, &pTempRightChildCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = cpdMakeCond(&pRemainConds, &pTempRemainCond); + } + + if (TSDB_CODE_SUCCESS == code) { + *pOnCond = pTempOnCond; + *pLeftChildCond = pTempLeftChildCond; + *pRightChildCond = pTempRightChildCond; + nodesDestroyNode(pJoin->node.pConditions); + pJoin->node.pConditions = pTempRemainCond; + } else { + nodesDestroyList(pOnConds); + nodesDestroyList(pLeftChildConds); + nodesDestroyList(pRightChildConds); + nodesDestroyList(pRemainConds); + nodesDestroyNode(pTempOnCond); + nodesDestroyNode(pTempLeftChildCond); + nodesDestroyNode(pTempRightChildCond); + nodesDestroyNode(pTempRemainCond); + } + + return code; +} + +static int32_t cpdPartitionOpCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) { + SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; + SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; + ECondAction condAction = cpdCondAction(pJoin->joinType, pLeftCols, pRightCols, pJoin->node.pConditions); + if (COND_ACTION_STAY == condAction) { + return TSDB_CODE_SUCCESS; + } else if (COND_ACTION_PUSH_JOIN == condAction) { + *pOnCond = pJoin->node.pConditions; + } else if (COND_ACTION_PUSH_LEFT_CHILD == condAction) { + *pLeftChildCond = pJoin->node.pConditions; + } else if (COND_ACTION_PUSH_RIGHT_CHILD == condAction) { + *pRightChildCond = pJoin->node.pConditions; + } + pJoin->node.pConditions = NULL; return TSDB_CODE_SUCCESS; } -static int32_t cpdPushJoinCondToChildren(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNodeList* pSingleTableCond) { - // todo - return TSDB_CODE_SUCCESS; +static int32_t cpdPartitionCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->node.pConditions)) { + return cpdPartitionLogicCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond); + } else { + return cpdPartitionOpCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond); + } +} + +static int32_t cpdCondAppend(SOptimizeContext* pCxt, SNode** pCond, SNode** pAdditionalCond) { + if (NULL == *pCond) { + TSWAP(*pCond, *pAdditionalCond, SNode*); + return TSDB_CODE_SUCCESS; + } + + int32_t code = TSDB_CODE_SUCCESS; + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCond)) { + code = nodesListAppend(((SLogicConditionNode*)*pCond)->pParameterList, *pAdditionalCond); + if (TSDB_CODE_SUCCESS == code) { + *pAdditionalCond = NULL; + } + } else { + SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (NULL == pLogicCond) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pLogicCond->condType = LOGIC_COND_TYPE_AND; + code = nodesListMakeAppend(&pLogicCond->pParameterList, *pAdditionalCond); + if (TSDB_CODE_SUCCESS == code) { + *pAdditionalCond = NULL; + code = nodesListMakeAppend(&pLogicCond->pParameterList, *pCond); + } + if (TSDB_CODE_SUCCESS == code) { + *pCond = (SNode*)pLogicCond; + } else { + nodesDestroyNode(pLogicCond); + } + } + return code; +} + +static int32_t cpdPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) { + return cpdCondAppend(pCxt, &pJoin->pOnConditions, pCond); +} + +static int32_t cpdPushCondToScan(SOptimizeContext* pCxt, SScanLogicNode* pScan, SNode** pCond) { + return cpdCondAppend(pCxt, &pScan->node.pConditions, pCond); +} + +static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) { + switch (nodeType(pChild)) { + case QUERY_NODE_LOGIC_PLAN_SCAN: + return cpdPushCondToScan(pCxt, (SScanLogicNode*)pChild, pCond); + default: + break; + } + return TSDB_CODE_PLAN_INTERNAL_ERROR; } static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { - if (NULL != pJoin->node.pConditions) { - SNodeList* pMultiTableCond = NULL; - SNodeList* pSingleTableCond = NULL; - int32_t code = cpdPartitionCondition(pJoin, &pMultiTableCond, &pSingleTableCond); - if (TSDB_CODE_SUCCESS == code && NULL != pMultiTableCond) { - code = cpdPushJoinCondToOnCond(pCxt, pJoin, pMultiTableCond); - } - if (TSDB_CODE_SUCCESS == code && NULL != pSingleTableCond) { - code = cpdPushJoinCondToChildren(pCxt, pJoin, pSingleTableCond); - } + if (NULL == pJoin->node.pConditions) { + return TSDB_CODE_SUCCESS; } - return TSDB_CODE_SUCCESS; + + SNode* pOnCond = NULL; + SNode* pLeftChildCond = NULL; + SNode* pRightChildCond = NULL; + int32_t code = cpdPartitionCond(pJoin, &pOnCond, &pLeftChildCond, &pRightChildCond); + if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) { + code = cpdPushCondToOnCond(pCxt, pJoin, &pOnCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) { + code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pRightChildCond) { + code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond); + } + + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(pOnCond); + nodesDestroyNode(pLeftChildCond); + nodesDestroyNode(pRightChildCond); + } + + return code; } static int32_t cpdPushAggCondition(SOptimizeContext* pCxt, SAggLogicNode* pAgg) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 0b21052955..e54cf33934 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -18,6 +18,7 @@ #define SPLIT_FLAG_MASK(n) (1 << n) #define SPLIT_FLAG_STS SPLIT_FLAG_MASK(0) +#define SPLIT_FLAG_CTJ SPLIT_FLAG_MASK(1) #define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask) #define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0) @@ -39,43 +40,14 @@ typedef struct SStsInfo { SLogicSubplan* pSubplan; } SStsInfo; -static SLogicNode* stsMatchByNode(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && - NULL != ((SScanLogicNode*)pNode)->pVgroupList && ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > 1) { - return pNode; - } - SNode* pChild; - FOREACH(pChild, pNode->pChildren) { - SLogicNode* pSplitNode = stsMatchByNode((SLogicNode*)pChild); - if (NULL != pSplitNode) { - return pSplitNode; - } - } - return NULL; -} +typedef struct SCtjInfo { + SScanLogicNode* pScan; + SLogicSubplan* pSubplan; +} SCtjInfo; -static void stsFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) { - SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode); - if (NULL != pSplitNode) { - pInfo->pScan = (SScanLogicNode*)pSplitNode; - pInfo->pSubplan = pSubplan; - } -} -static void stsMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, SStsInfo* pInfo) { - if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS)) { - stsFindSplitNode(pSubplan, pInfo); - } - SNode* pChild; - FOREACH(pChild, pSubplan->pChildren) { - stsMatch(pCxt, (SLogicSubplan*)pChild, pInfo); - if (NULL != pInfo->pScan) { - break; - } - } - return; -} +typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, SStsInfo* pInfo); -static SLogicSubplan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan) { +static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) { SLogicSubplan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); if (NULL == pSubplan) { return NULL; @@ -84,11 +56,11 @@ static SLogicSubplan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pSubplan->subplanType = SUBPLAN_TYPE_SCAN; pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan); TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*); - SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS); + SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, flag); return pSubplan; } -static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) { +static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan, ESubplanType subplanType) { SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); if (NULL == pExchange) { return TSDB_CODE_OUT_OF_MEMORY; @@ -119,10 +91,48 @@ static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla return TSDB_CODE_FAILED; } +static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) { + if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag)) { + if (func(pSubplan, pInfo)) { + return true; + } + } + SNode* pChild; + FOREACH(pChild, pSubplan->pChildren) { + if (splMatch(pCxt, (SLogicSubplan*)pChild, flag, func, pInfo)) { + return true; + } + } + return false; +} + +static SLogicNode* stsMatchByNode(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && + NULL != ((SScanLogicNode*)pNode)->pVgroupList && ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > 1) { + return pNode; + } + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { + SLogicNode* pSplitNode = stsMatchByNode((SLogicNode*)pChild); + if (NULL != pSplitNode) { + return pSplitNode; + } + } + return NULL; +} + +static bool stsFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) { + SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode); + if (NULL != pSplitNode) { + pInfo->pScan = (SScanLogicNode*)pSplitNode; + pInfo->pSubplan = pSubplan; + } + return NULL != pSplitNode; +} + static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { SStsInfo info = {0}; - stsMatch(pCxt, pSubplan, &info); - if (NULL == info.pScan) { + if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STS, stsFindSplitNode, &info)) { return TSDB_CODE_SUCCESS; } if (NULL == info.pSubplan->pChildren) { @@ -131,9 +141,61 @@ static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { return TSDB_CODE_OUT_OF_MEMORY; } } - int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, stsCreateScanSubplan(pCxt, info.pScan)); + int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_STS)); if (TSDB_CODE_SUCCESS == code) { - code = stsCreateExchangeNode(pCxt, info.pSubplan, info.pScan); + code = splCreateExchangeNode(pCxt, info.pSubplan, info.pScan, SUBPLAN_TYPE_MERGE); + } + ++(pCxt->groupId); + pCxt->split = true; + return code; +} + +static bool ctjIsSingleTable(int8_t tableType) { + return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType); +} + +static SLogicNode* ctjMatchByNode(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) { + SLogicNode* pLeft = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0); + SLogicNode* pRight = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1); + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pLeft) && ctjIsSingleTable(((SScanLogicNode*)pLeft)->pMeta->tableType) && + QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pRight) && ctjIsSingleTable(((SScanLogicNode*)pRight)->pMeta->tableType)) { + return pRight; + } + } + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { + SLogicNode* pSplitNode = ctjMatchByNode((SLogicNode*)pChild); + if (NULL != pSplitNode) { + return pSplitNode; + } + } + return NULL; +} + +static bool ctjFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) { + SLogicNode* pSplitNode = ctjMatchByNode(pSubplan->pNode); + if (NULL != pSplitNode) { + pInfo->pScan = (SScanLogicNode*)pSplitNode; + pInfo->pSubplan = pSubplan; + } + return NULL != pSplitNode; +} + +static int32_t ctjSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { + SCtjInfo info = {0}; + if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_CTJ, ctjFindSplitNode, &info)) { + return TSDB_CODE_SUCCESS; + } + if (NULL == info.pSubplan->pChildren) { + info.pSubplan->pChildren = nodesMakeList(); + if (NULL == info.pSubplan->pChildren) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_CTJ)); + if (TSDB_CODE_SUCCESS == code) { + code = splCreateExchangeNode(pCxt, info.pSubplan, info.pScan, info.pSubplan->subplanType); } ++(pCxt->groupId); pCxt->split = true; @@ -141,7 +203,8 @@ static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { } static const SSplitRule splitRuleSet[] = { - { .pName = "SuperTableScan", .splitFunc = stsSplit } + { .pName = "SuperTableScan", .splitFunc = stsSplit }, + { .pName = "ChildTableJoin", .splitFunc = ctjSplit }, }; static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule)); diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index fd0084c01e..69c76fc3ae 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -70,6 +70,12 @@ protected: cout << "unformatted logic plan : " << endl; cout << toString((const SNode*)pLogicNode, false) << endl; + code = optimizeLogicPlan(&cxt, pLogicNode); + if (code != TSDB_CODE_SUCCESS) { + cout << "sql:[" << cxt_.pSql << "] optimizeLogicPlan code:" << code << ", strerror:" << tstrerror(code) << endl; + return false; + } + SLogicSubplan* pLogicSubplan = nullptr; code = splitLogicPlan(&cxt, pLogicNode, &pLogicSubplan); if (code != TSDB_CODE_SUCCESS) { @@ -174,13 +180,13 @@ TEST_F(PlannerTest, selectStableBasic) { TEST_F(PlannerTest, selectJoin) { setDatabase("root", "test"); - bind("SELECT * FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts"); - ASSERT_TRUE(run()); + // bind("SELECT t1.c1, t2.c2 FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts"); + // ASSERT_TRUE(run()); - bind("SELECT * FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1"); - ASSERT_TRUE(run()); + // bind("SELECT t1.*, t2.* FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts"); + // ASSERT_TRUE(run()); - bind("SELECT t1.* FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1"); + bind("SELECT t1.c1, t2.c1 FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1 and t1.c2 = 'abc' and t2.c2 = 'qwe'"); ASSERT_TRUE(run()); }