diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 8dd7057f0c..f0f0361031 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -680,8 +680,7 @@ typedef struct SJoinOperatorInfo { SSDataBlock *pRight; int32_t rightPos; SColumnInfo rightCol; - SNode *pOnConditions; - SNode *pOtherConditions; + SNode *pCondAfterMerge; } SJoinOperatorInfo; #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 71f04815dc..e9995ed77a 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -62,11 +62,19 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ASSERT(false); } - //TODO: merge these two conditions - ASSERT(pJoinNode->pOnConditions); - pInfo->pOnConditions = nodesCloneNode(pJoinNode->pOnConditions); - if (pJoinNode->node.pConditions != NULL) { - pInfo->pOtherConditions = pJoinNode->node.pConditions; + if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) { + pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge); + pLogicCond->pParameterList = nodesMakeList(); + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions)); + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); + pLogicCond->condType = LOGIC_COND_TYPE_AND; + } else if (pJoinNode->pOnConditions != NULL) { + pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOnConditions); + } else if (pJoinNode->node.pConditions != NULL) { + pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions); + } else { + pInfo->pCondAfterMerge = NULL; } pOperator->fpSet = @@ -95,8 +103,7 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; - nodesDestroyNode(pJoinOperator->pOnConditions); - nodesDestroyNode(pJoinOperator->pOtherConditions); + nodesDestroyNode(pJoinOperator->pCondAfterMerge); } static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { @@ -201,8 +208,9 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { if (numOfNewRows == 0) { break; } - doFilter(pJoinInfo->pOnConditions, pRes); - doFilter(pJoinInfo->pOtherConditions, pRes); + if (pJoinInfo->pCondAfterMerge != NULL) { + doFilter(pJoinInfo->pCondAfterMerge, pRes); + } if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 7e0c5d6f5a..495ef9e698 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -546,15 +546,11 @@ static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* } return hasPrimaryKeyEqualCond; } else { - bool isPriKeyEqualCond = pushDownCondOptIsPriKeyEqualCond(pJoin, pCond); - if (isPriKeyEqualCond) { - pJoin->pMergeCondition = nodesCloneNode(pCond); - } - return isPriKeyEqualCond; + return pushDownCondOptIsPriKeyEqualCond(pJoin, pCond); } } -static int32_t pushDownCondOptExtractJoinMergeCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { +static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (NULL == pJoin->pOnConditions) { return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN); } @@ -564,13 +560,61 @@ static int32_t pushDownCondOptExtractJoinMergeCond(SOptimizeContext* pCxt, SJoin return TSDB_CODE_SUCCESS; } +static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions); + + int32_t code = TSDB_CODE_SUCCESS; + SNodeList* pOnConds = NULL; + SNode* pCond = NULL; + FOREACH(pCond, pLogicCond->pParameterList) { + if (pushDownCondOptIsPriKeyEqualCond(pJoin, pCond)) { + *ppMergeCond = nodesCloneNode(pCond); + } else { + code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond)); + } + } + + SNode* pTempOnCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempOnCond, &pOnConds); + } + + if (TSDB_CODE_SUCCESS == code && NULL != *ppMergeCond) { + *ppOnCond = pTempOnCond; + nodesDestroyNode(pJoin->pOnConditions); + pJoin->pOnConditions = NULL; + return TSDB_CODE_SUCCESS; + } else { + nodesDestroyList(pOnConds); + nodesDestroyNode(pTempOnCond); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } +} + +static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) { + return pushDownCondOptPartJoinOnCondLogicCond(pJoin, ppMergeCond, ppOnCond); + } + + if (pushDownCondOptIsPriKeyEqualCond(pJoin, pJoin->pOnConditions)) { + *ppMergeCond = nodesCloneNode(pJoin->pOnConditions); + *ppOnCond = NULL; + nodesDestroyNode(pJoin->pOnConditions); + pJoin->pOnConditions = NULL; + return TSDB_CODE_SUCCESS; + } else { + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } +} + static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; } if (NULL == pJoin->node.pConditions) { - return pushDownCondOptExtractJoinMergeCond(pCxt, pJoin); + return pushDownCondOptCheckJoinOnCond(pCxt, pJoin); } SNode* pOnCond = NULL; @@ -589,11 +633,26 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p pushDownCondOptPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond); } + if (TSDB_CODE_SUCCESS == code) { + code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin); + } + + SNode* pJoinMergeCond = NULL; + SNode* pJoinOnCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = pushDownCondOptPartJoinOnCond(pJoin, &pJoinMergeCond, &pJoinOnCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pJoinMergeCond) { + pJoin->pMergeCondition = pJoinMergeCond; + pJoin->pOnConditions = pJoinOnCond; + } + if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); pCxt->optimized = true; - code = pushDownCondOptExtractJoinMergeCond(pCxt, pJoin); } else { + nodesDestroyNode(pJoinMergeCond); + nodesDestroyNode(pJoinOnCond); nodesDestroyNode(pOnCond); nodesDestroyNode(pLeftChildCond); nodesDestroyNode(pRightChildCond);