feat: split join on condition into merge condition and on condition

This commit is contained in:
slzhou 2022-07-05 13:42:49 +08:00
parent 024796599c
commit 73e0e09dd8
3 changed files with 85 additions and 19 deletions

View File

@ -680,8 +680,7 @@ typedef struct SJoinOperatorInfo {
SSDataBlock *pRight; SSDataBlock *pRight;
int32_t rightPos; int32_t rightPos;
SColumnInfo rightCol; SColumnInfo rightCol;
SNode *pOnConditions; SNode *pCondAfterMerge;
SNode *pOtherConditions;
} SJoinOperatorInfo; } SJoinOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)

View File

@ -62,11 +62,19 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
ASSERT(false); ASSERT(false);
} }
//TODO: merge these two conditions if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
ASSERT(pJoinNode->pOnConditions); pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
pInfo->pOnConditions = nodesCloneNode(pJoinNode->pOnConditions); SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge);
if (pJoinNode->node.pConditions != NULL) { pLogicCond->pParameterList = nodesMakeList();
pInfo->pOtherConditions = pJoinNode->node.pConditions; nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions));
nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions));
pLogicCond->condType = LOGIC_COND_TYPE_AND;
} else if (pJoinNode->pOnConditions != NULL) {
pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOnConditions);
} else if (pJoinNode->node.pConditions != NULL) {
pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions);
} else {
pInfo->pCondAfterMerge = NULL;
} }
pOperator->fpSet = pOperator->fpSet =
@ -95,8 +103,7 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
nodesDestroyNode(pJoinOperator->pOnConditions); nodesDestroyNode(pJoinOperator->pCondAfterMerge);
nodesDestroyNode(pJoinOperator->pOtherConditions);
} }
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
@ -201,8 +208,9 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
if (numOfNewRows == 0) { if (numOfNewRows == 0) {
break; break;
} }
doFilter(pJoinInfo->pOnConditions, pRes); if (pJoinInfo->pCondAfterMerge != NULL) {
doFilter(pJoinInfo->pOtherConditions, pRes); doFilter(pJoinInfo->pCondAfterMerge, pRes);
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) { if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break; break;
} }

View File

@ -546,15 +546,11 @@ static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode*
} }
return hasPrimaryKeyEqualCond; return hasPrimaryKeyEqualCond;
} else { } else {
bool isPriKeyEqualCond = pushDownCondOptIsPriKeyEqualCond(pJoin, pCond); return pushDownCondOptIsPriKeyEqualCond(pJoin, pCond);
if (isPriKeyEqualCond) {
pJoin->pMergeCondition = nodesCloneNode(pCond);
}
return isPriKeyEqualCond;
} }
} }
static int32_t pushDownCondOptExtractJoinMergeCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->pOnConditions) { if (NULL == pJoin->pOnConditions) {
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN); return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN);
} }
@ -564,13 +560,61 @@ static int32_t pushDownCondOptExtractJoinMergeCond(SOptimizeContext* pCxt, SJoin
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions);
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pOnConds = NULL;
SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) {
if (pushDownCondOptIsPriKeyEqualCond(pJoin, pCond)) {
*ppMergeCond = nodesCloneNode(pCond);
} else {
code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond));
}
}
SNode* pTempOnCond = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = nodesMergeConds(&pTempOnCond, &pOnConds);
}
if (TSDB_CODE_SUCCESS == code && NULL != *ppMergeCond) {
*ppOnCond = pTempOnCond;
nodesDestroyNode(pJoin->pOnConditions);
pJoin->pOnConditions = NULL;
return TSDB_CODE_SUCCESS;
} else {
nodesDestroyList(pOnConds);
nodesDestroyNode(pTempOnCond);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
}
static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) &&
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) {
return pushDownCondOptPartJoinOnCondLogicCond(pJoin, ppMergeCond, ppOnCond);
}
if (pushDownCondOptIsPriKeyEqualCond(pJoin, pJoin->pOnConditions)) {
*ppMergeCond = nodesCloneNode(pJoin->pOnConditions);
*ppOnCond = NULL;
nodesDestroyNode(pJoin->pOnConditions);
pJoin->pOnConditions = NULL;
return TSDB_CODE_SUCCESS;
} else {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
}
static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (NULL == pJoin->node.pConditions) { if (NULL == pJoin->node.pConditions) {
return pushDownCondOptExtractJoinMergeCond(pCxt, pJoin); return pushDownCondOptCheckJoinOnCond(pCxt, pJoin);
} }
SNode* pOnCond = NULL; SNode* pOnCond = NULL;
@ -589,11 +633,26 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
pushDownCondOptPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond); pushDownCondOptPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond);
} }
if (TSDB_CODE_SUCCESS == code) {
code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin);
}
SNode* pJoinMergeCond = NULL;
SNode* pJoinOnCond = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = pushDownCondOptPartJoinOnCond(pJoin, &pJoinMergeCond, &pJoinOnCond);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinMergeCond) {
pJoin->pMergeCondition = pJoinMergeCond;
pJoin->pOnConditions = pJoinOnCond;
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
pCxt->optimized = true; pCxt->optimized = true;
code = pushDownCondOptExtractJoinMergeCond(pCxt, pJoin);
} else { } else {
nodesDestroyNode(pJoinMergeCond);
nodesDestroyNode(pJoinOnCond);
nodesDestroyNode(pOnCond); nodesDestroyNode(pOnCond);
nodesDestroyNode(pLeftChildCond); nodesDestroyNode(pLeftChildCond);
nodesDestroyNode(pRightChildCond); nodesDestroyNode(pRightChildCond);