diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 012ba114ce..05553694f5 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -82,6 +82,7 @@ typedef struct SScanLogicNode { typedef struct SJoinLogicNode { SLogicNode node; EJoinType joinType; + SNode* pMergeCondition; SNode* pOnConditions; bool isSingleTableJoin; } SJoinLogicNode; @@ -327,6 +328,7 @@ typedef struct SInterpFuncPhysiNode { typedef struct SJoinPhysiNode { SPhysiNode node; EJoinType joinType; + SNode* pMergeCondition; SNode* pOnConditions; SNodeList* pTargets; } SJoinPhysiNode; diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 6fbda77808..33954d5517 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -90,13 +90,9 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; } -SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { +static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { SJoinOperatorInfo* pJoinInfo = pOperator->info; - SSDataBlock* pRes = pJoinInfo->pRes; - blockDataCleanup(pRes); - blockDataEnsureCapacity(pRes, 4096); - int32_t nrows = 0; while (1) { @@ -181,7 +177,26 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { break; } } +} +SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { + SJoinOperatorInfo* pJoinInfo = pOperator->info; + + SSDataBlock* pRes = pJoinInfo->pRes; + blockDataCleanup(pRes); + blockDataEnsureCapacity(pRes, 4096); + while (true) { + int32_t numOfRowsBefore = pRes->info.rows; + doMergeJoinImpl(pOperator, pRes); + int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore; + if (numOfNewRows == 0) { + break; + } + doFilter(pJoinInfo->pOnCondition, pRes); + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + } return (pRes->info.rows > 0) ? pRes : NULL; }