diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index b71fac459d..debfce5f2d 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -82,6 +82,7 @@ typedef struct SScanLogicNode { typedef struct SJoinLogicNode { SLogicNode node; EJoinType joinType; + SNode* pMergeCondition; SNode* pOnConditions; bool isSingleTableJoin; } SJoinLogicNode; @@ -329,6 +330,7 @@ typedef struct SInterpFuncPhysiNode { typedef struct SJoinPhysiNode { SPhysiNode node; EJoinType joinType; + SNode* pMergeCondition; SNode* pOnConditions; SNodeList* pTargets; } SJoinPhysiNode; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 3478926fef..dd337bcee0 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -376,6 +376,7 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit typedef enum ECollectColType { COLLECT_COL_TYPE_COL = 1, COLLECT_COL_TYPE_TAG, COLLECT_COL_TYPE_ALL } ECollectColType; int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type, SNodeList** pCols); +int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, ECollectColType type, SNodeList** pCols); typedef bool (*FFuncClassifier)(int32_t funcId); int32_t nodesCollectFuncs(SSelectStmt* pSelect, ESqlClause clause, FFuncClassifier classifier, SNodeList** pFuncs); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 1073ebc316..6ead922d95 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -274,15 +274,14 @@ static void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) { SDnodeEp dnodeEp = {0}; dnodeEp.id = pDnode->id; - dnodeEp.isMnode = 0; dnodeEp.ep.port = pDnode->port; memcpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + sdbRelease(pSdb, pDnode); + dnodeEp.isMnode = 0; if (mndIsMnode(pMnode, pDnode->id)) { dnodeEp.isMnode = 1; } - - sdbRelease(pSdb, pDnode); taosArrayPush(pDnodeEps, &dnodeEp); } } diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index fbf66da632..3db0087334 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -131,7 +131,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { hashType = TSDB_DATA_TYPE_BINARY; } - SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(hashType), true, HASH_NO_LOCK); + SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(hashType), true, HASH_ENTRY_LOCK); if (hash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b562b0631e..f0f0361031 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -680,7 +680,7 @@ typedef struct SJoinOperatorInfo { SSDataBlock *pRight; int32_t rightPos; SColumnInfo rightCol; - SNode *pOnCondition; + SNode *pCondAfterMerge; } SJoinOperatorInfo; #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 6fbda77808..e9995ed77a 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -53,13 +53,28 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - SNode* pOnCondition = pJoinNode->pOnConditions; - if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) { - SOperatorNode* pNode = (SOperatorNode*)pOnCondition; + SNode* pMergeCondition = pJoinNode->pMergeCondition; + if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { + SOperatorNode* pNode = (SOperatorNode*)pMergeCondition; setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft); setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); - } else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) { - extractTimeCondition(pInfo, (SLogicConditionNode*)pOnCondition); + } else { + ASSERT(false); + } + + if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) { + pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge); + pLogicCond->pParameterList = nodesMakeList(); + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions)); + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); + pLogicCond->condType = LOGIC_COND_TYPE_AND; + } else if (pJoinNode->pOnConditions != NULL) { + pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOnConditions); + } else if (pJoinNode->node.pConditions != NULL) { + pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions); + } else { + pInfo->pCondAfterMerge = NULL; } pOperator->fpSet = @@ -88,15 +103,12 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; + nodesDestroyNode(pJoinOperator->pCondAfterMerge); } -SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { +static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { SJoinOperatorInfo* pJoinInfo = pOperator->info; - SSDataBlock* pRes = pJoinInfo->pRes; - blockDataCleanup(pRes); - blockDataEnsureCapacity(pRes, 4096); - int32_t nrows = 0; while (1) { @@ -181,7 +193,28 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { break; } } +} +SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { + SJoinOperatorInfo* pJoinInfo = pOperator->info; + + SSDataBlock* pRes = pJoinInfo->pRes; + blockDataCleanup(pRes); + blockDataEnsureCapacity(pRes, 4096); + while (true) { + int32_t numOfRowsBefore = pRes->info.rows; + doMergeJoinImpl(pOperator, pRes); + int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore; + if (numOfNewRows == 0) { + break; + } + if (pJoinInfo->pCondAfterMerge != NULL) { + doFilter(pJoinInfo->pCondAfterMerge, pRes); + } + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + } return (pRes->info.rows > 0) ? pRes : NULL; } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index da9474ede0..1bc759e833 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1565,6 +1565,10 @@ void constructUdfService(void *argsThread) { //TODO return value of uv_run uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); uv_loop_close(&udfc->uvLoop); + + uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL); + uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); + uv_loop_close(&udfc->uvLoop); } int32_t udfcOpen() { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 9c5cec07df..25a41fb15c 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -368,6 +368,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(joinType); + CLONE_NODE_FIELD(pMergeCondition); CLONE_NODE_FIELD(pOnConditions); COPY_SCALAR_FIELD(isSingleTableJoin); return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 4179930033..34f92dac0b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1254,6 +1254,7 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) { static const char* jkJoinLogicPlanJoinType = "JoinType"; static const char* jkJoinLogicPlanOnConditions = "OnConditions"; +static const char* jkJoinLogicPlanMergeCondition = "MergeConditions"; static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj; @@ -1262,6 +1263,9 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinLogicPlanJoinType, pNode->joinType); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinLogicPlanMergeCondition, nodeToJson, pNode->pMergeCondition); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions); } @@ -1617,6 +1621,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) { } static const char* jkJoinPhysiPlanJoinType = "JoinType"; +static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; @@ -1627,6 +1632,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanMergeCondition, nodeToJson, pNode->pMergeCondition); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pOnConditions); } @@ -1648,6 +1656,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanMergeCondition, &pNode->pMergeCondition); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); } diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 3747dde9ed..b12e3b14c7 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -470,6 +470,9 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode; res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext); + } if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkPhysiPlan(pJoin->pOnConditions, order, walker, pContext); } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index bf4eac3698..118cd80807 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -718,6 +718,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_LOGIC_PLAN_JOIN: { SJoinLogicNode* pLogicNode = (SJoinLogicNode*)pNode; destroyLogicNode((SLogicNode*)pLogicNode); + nodesDestroyNode(pLogicNode->pMergeCondition); nodesDestroyNode(pLogicNode->pOnConditions); break; } @@ -828,6 +829,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { SJoinPhysiNode* pPhyNode = (SJoinPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); + nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pOnConditions); nodesDestroyList(pPhyNode->pTargets); break; @@ -1493,6 +1495,38 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* return TSDB_CODE_SUCCESS; } +int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, ECollectColType type, SNodeList** pCols) { + if (NULL == pCols) { + return TSDB_CODE_FAILED; + } + SCollectColumnsCxt cxt = { + .errCode = TSDB_CODE_SUCCESS, + .pTableAlias = pTableAlias, + .collectType = type, + .pCols = (NULL == *pCols ? nodesMakeList() : *pCols), + .pColHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)}; + if (NULL == cxt.pCols || NULL == cxt.pColHash) { + return TSDB_CODE_OUT_OF_MEMORY; + } + *pCols = NULL; + + nodesWalkExpr(node, collectColumns, &cxt); + + taosHashCleanup(cxt.pColHash); + if (TSDB_CODE_SUCCESS != cxt.errCode) { + nodesDestroyList(cxt.pCols); + return cxt.errCode; + } + if (LIST_LENGTH(cxt.pCols) > 0) { + *pCols = cxt.pCols; + } else { + nodesDestroyList(cxt.pCols); + } + + return TSDB_CODE_SUCCESS; + +} + typedef struct SCollectFuncsCxt { int32_t errCode; FFuncClassifier classifier; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 88930b6269..a4f30228d5 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -480,12 +480,18 @@ static int32_t pushDownCondOptPushCondToProject(SOptimizeContext* pCxt, SProject return pushDownCondOptAppendCond(&pProject->node.pConditions, pCond); } +static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode * pJoin, SNode** pCond) { + return pushDownCondOptAppendCond(&pJoin->node.pConditions, pCond); +} + static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) { switch (nodeType(pChild)) { case QUERY_NODE_LOGIC_PLAN_SCAN: return pushDownCondOptPushCondToScan(pCxt, (SScanLogicNode*)pChild, pCond); case QUERY_NODE_LOGIC_PLAN_PROJECT: return pushDownCondOptPushCondToProject(pCxt, (SProjectLogicNode*)pChild, pCond); + case QUERY_NODE_LOGIC_PLAN_JOIN: + return pushDownCondOptPushCondToJoin(pCxt, (SJoinLogicNode*)pChild, pCond); default: break; } @@ -554,13 +560,83 @@ static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogic return TSDB_CODE_SUCCESS; } +static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions); + + int32_t code = TSDB_CODE_SUCCESS; + SNodeList* pOnConds = NULL; + SNode* pCond = NULL; + FOREACH(pCond, pLogicCond->pParameterList) { + if (pushDownCondOptIsPriKeyEqualCond(pJoin, pCond)) { + *ppMergeCond = nodesCloneNode(pCond); + } else { + code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond)); + } + } + + SNode* pTempOnCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempOnCond, &pOnConds); + } + + if (TSDB_CODE_SUCCESS == code && NULL != *ppMergeCond) { + *ppOnCond = pTempOnCond; + nodesDestroyNode(pJoin->pOnConditions); + pJoin->pOnConditions = NULL; + return TSDB_CODE_SUCCESS; + } else { + nodesDestroyList(pOnConds); + nodesDestroyNode(pTempOnCond); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } +} + +static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) { + return pushDownCondOptPartJoinOnCondLogicCond(pJoin, ppMergeCond, ppOnCond); + } + + if (pushDownCondOptIsPriKeyEqualCond(pJoin, pJoin->pOnConditions)) { + *ppMergeCond = nodesCloneNode(pJoin->pOnConditions); + *ppOnCond = NULL; + nodesDestroyNode(pJoin->pOnConditions); + pJoin->pOnConditions = NULL; + return TSDB_CODE_SUCCESS; + } else { + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } +} + +static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + int32_t code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin); + SNode* pJoinMergeCond = NULL; + SNode* pJoinOnCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = pushDownCondOptPartJoinOnCond(pJoin, &pJoinMergeCond, &pJoinOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + pJoin->pMergeCondition = pJoinMergeCond; + pJoin->pOnConditions = pJoinOnCond; + } else { + nodesDestroyNode(pJoinMergeCond); + nodesDestroyNode(pJoinOnCond); + } + return code; +} + static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; } if (NULL == pJoin->node.pConditions) { - return pushDownCondOptCheckJoinOnCond(pCxt, pJoin); + int32_t code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin); + if (TSDB_CODE_SUCCESS == code) { + OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); + pCxt->optimized = true; + } + return code; } SNode* pOnCond = NULL; @@ -579,10 +655,13 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p pushDownCondOptPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond); } + if (TSDB_CODE_SUCCESS == code) { + code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin); + } + if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); pCxt->optimized = true; - code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin); } else { nodesDestroyNode(pOnCond); nodesDestroyNode(pLeftChildCond); @@ -720,7 +799,8 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg // TODO: remove it after full implementation of pushing down to child if (1 != LIST_LENGTH(pAgg->node.pChildren) || QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0)) && - QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) { + QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(nodesListGetNode(pAgg->node.pChildren, 0)) && + QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 0eb05ccbe9..d10908c519 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -612,10 +612,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren int32_t code = TSDB_CODE_SUCCESS; pJoin->joinType = pJoinLogicNode->joinType; - if (NULL != pJoinLogicNode->pOnConditions) { - code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions, - &pJoin->pOnConditions); - } + setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition, + &pJoin->pMergeCondition); if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets); @@ -623,6 +621,21 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren if (TSDB_CODE_SUCCESS == code) { code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); } + + SNodeList* condCols = nodesMakeList(); + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) { + code = nodesCollectColumnsFromNode(pJoinLogicNode->pOnConditions, NULL, COLLECT_COL_TYPE_ALL, &condCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, condCols, pJoin->node.pOutputDataBlockDesc); + nodesDestroyList(condCols); + } + + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) { + code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOnConditions, + &pJoin->pOnConditions); + } + if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); }