From 629ab2b85fb59eeebc3e4634de1db0c9b0b91586 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 15 May 2023 16:10:11 +0800 Subject: [PATCH] fix: extract tag equal condition --- include/libs/nodes/plannodes.h | 2 + source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 18 ++++-- source/libs/nodes/src/nodesMsgFuncs.c | 10 ++- source/libs/nodes/src/nodesUtilFuncs.c | 2 + source/libs/planner/src/planOptimizer.c | 73 ++++++++++++++++++++++ source/libs/planner/src/planPhysiCreater.c | 3 + 7 files changed, 103 insertions(+), 6 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index ad4b59714c..0e235191b1 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -112,6 +112,7 @@ typedef struct SJoinLogicNode { SNode* pOnConditions; bool isSingleTableJoin; EOrder inputTsOrder; + SNode* pTagEqualConditions; } SJoinLogicNode; typedef struct SAggLogicNode { @@ -405,6 +406,7 @@ typedef struct SSortMergeJoinPhysiNode { SNode* pOnConditions; SNodeList* pTargets; EOrder inputTsOrder; + SNode* pTagEqualCondtions; } SSortMergeJoinPhysiNode; typedef struct SAggPhysiNode { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index d9a4c5178f..91db527a02 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -401,6 +401,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_SCALAR_FIELD(joinType); CLONE_NODE_FIELD(pMergeCondition); CLONE_NODE_FIELD(pOnConditions); + CLONE_NODE_FIELD(pTagEqualConditions); COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(inputTsOrder); return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0aeb83ce08..b4e2d95e26 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1416,6 +1416,7 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) { static const char* jkJoinLogicPlanJoinType = "JoinType"; static const char* jkJoinLogicPlanOnConditions = "OnConditions"; static const char* jkJoinLogicPlanMergeCondition = "MergeConditions"; +static const char* jkJoinLogicPlanTagEqualConditions = "TagEqualConditions"; static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj; @@ -1430,7 +1431,9 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqualConditions, nodeToJson, pNode->pTagEqualConditions); + } return code; } @@ -1447,7 +1450,9 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOnConditions); } - + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqualConditions, &pNode->pTagEqualConditions); + } return code; } @@ -1878,6 +1883,7 @@ static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder"; static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; +static const char* jkJoinPhysiPlanTagEqualConditions = "TagEqualConditions"; static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; @@ -1898,7 +1904,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanTagEqualConditions, nodeToJson, pNode->pTagEqualCondtions); + } return code; } @@ -1921,7 +1929,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); } - + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanTagEqualConditions, &pNode->pTagEqualCondtions); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 6c6b6c0e81..45cebb4559 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2317,7 +2317,8 @@ enum { PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, PHY_SORT_MERGE_JOIN_CODE_TARGETS, - PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER + PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, + PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS }; static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2339,7 +2340,9 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, pNode->inputTsOrder); } - + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pTagEqualCondtions); + } return code; } @@ -2368,6 +2371,9 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER: code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder)); break; + case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagEqualCondtions); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index f71eef7969..d97b17e30c 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1072,6 +1072,7 @@ void nodesDestroyNode(SNode* pNode) { destroyLogicNode((SLogicNode*)pLogicNode); nodesDestroyNode(pLogicNode->pMergeCondition); nodesDestroyNode(pLogicNode->pOnConditions); + nodesDestroyNode(pLogicNode->pTagEqualConditions); break; } case QUERY_NODE_LOGIC_PLAN_AGG: { @@ -1204,6 +1205,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pOnConditions); nodesDestroyList(pPhyNode->pTargets); + nodesDestroyNode(pPhyNode->pTagEqualCondtions); break; } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 66d85a9c89..f5ef02e207 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -740,6 +740,75 @@ static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoin return code; } +static bool pushDownCondOptIsTag(SNode* pNode, SNodeList* pTableCols) { + if (QUERY_NODE_COLUMN != nodeType(pNode)) { + return false; + } + SColumnNode* pCol = (SColumnNode*)pNode; + if (COLUMN_TYPE_TAG != pCol->colType) { + return false; + } + return pushDownCondOptBelongThisTable(pNode, pTableCols); +} + +static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { + if (QUERY_NODE_OPERATOR != nodeType(pCond)) { + return false; + } + SOperatorNode* pOper = (SOperatorNode*)pCond; + if (OP_TYPE_EQUAL != pOper->opType) { + return false; + } + SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; + SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; + if (pushDownCondOptIsTag(pOper->pLeft, pLeftCols)) { + return pushDownCondOptIsTag(pOper->pRight, pRightCols); + } else if (pushDownCondOptIsTag(pOper->pLeft, pRightCols)) { + return pushDownCondOptIsTag(pOper->pRight, pLeftCols); + } + return false; +} + +static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions); + + int32_t code = TSDB_CODE_SUCCESS; + SNodeList* pTagEqualConds = NULL; + SNode* pCond = NULL; + FOREACH(pCond, pLogicCond->pParameterList) { + if (pushDownCondOptIsTagEqualCond(pJoin, pCond)) { + code = nodesListMakeAppend(&pTagEqualConds, nodesCloneNode(pCond)); + } + } + + SNode* pTempTagEqCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempTagEqCond, &pTagEqualConds); + } + + if (TSDB_CODE_SUCCESS == code) { + pJoin->pTagEqualConditions = pTempTagEqCond; + return TSDB_CODE_SUCCESS; + } else { + nodesDestroyList(pTagEqualConds); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t pushDownCondOptJoinExtractTagEqualCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) { + return pushDownCondOptJoinExtractTagEqualLogicCond(pJoin); + } + + if (pushDownCondOptIsTagEqualCond(pJoin, pJoin->pOnConditions)) { + pJoin->pTagEqualConditions = nodesCloneNode(pJoin->pOnConditions); + } + + return TSDB_CODE_SUCCESS; +} + static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; @@ -774,6 +843,10 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin); } + if (TSDB_CODE_SUCCESS == code) { + code = pushDownCondOptJoinExtractTagEqualCond(pCxt, pJoin); + } + if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); pCxt->optimized = true; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index e2c2e4c655..9c6c227480 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -705,6 +705,9 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren pJoinLogicNode->pOnConditions, &pJoin->pOnConditions); } + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pTagEqualConditions) { + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqualConditions, &pJoin->pTagEqualCondtions); + } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); }