From 97aca25633ba3830a2b5374d8435546a42352e7d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 23 Feb 2024 17:59:45 +0800 Subject: [PATCH] enh: support group join --- include/libs/nodes/plannodes.h | 4 + source/libs/executor/inc/mergejoin.h | 40 ++++--- source/libs/executor/src/mergejoin.c | 16 ++- source/libs/executor/src/mergejoinoperator.c | 2 + source/libs/executor/test/joinTests.cpp | 12 +- source/libs/nodes/src/nodesCloneFuncs.c | 3 + source/libs/nodes/src/nodesCodeFuncs.c | 9 +- source/libs/nodes/src/nodesMsgFuncs.c | 9 +- source/libs/nodes/src/nodesUtilFuncs.c | 2 + source/libs/planner/src/planLogicCreater.c | 2 + source/libs/planner/src/planOptimizer.c | 114 +++++++++++++++++++ source/libs/planner/src/planPhysiCreater.c | 1 + tests/script/tsim/join/left_asof_join.sim | 2 + 13 files changed, 187 insertions(+), 29 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 605bbf0165..faf196f5f3 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -138,10 +138,13 @@ typedef struct SJoinLogicNode { SNode* pTagEqCond; SNode* pTagOnCond; SNode* pFullOnCond; // except prim eq cond + SNodeList* pLeftEqNodes; + SNodeList* pRightEqNodes; bool isSingleTableJoin; bool hasSubQuery; bool isLowLevelJoin; bool seqWinGroup; + bool grpJoin; } SJoinLogicNode; typedef struct SAggLogicNode { @@ -503,6 +506,7 @@ typedef struct SSortMergeJoinPhysiNode { SNodeList* pTargets; SQueryStat inputStat[2]; bool seqWinGroup; + bool grpJoin; } SSortMergeJoinPhysiNode; typedef struct SHashJoinPhysiNode { diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index b8f1ea45a0..65ea21ccad 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -19,7 +19,7 @@ extern "C" { #endif -#if 0 +#if 1 #define MJOIN_DEFAULT_BLK_ROWS_NUM 2 //4096 #define MJOIN_HJOIN_CART_THRESHOLD 10 #define MJOIN_BLK_SIZE_LIMIT 0 //10485760 @@ -95,6 +95,9 @@ typedef struct SMJoinTableCtx { int32_t blkId; SQueryStat inputStat; + uint64_t lastInGid; + SSDataBlock* remainInBlk; + SMJoinColMap* primCol; char* primData; @@ -237,6 +240,7 @@ typedef struct SMJoinWindowCtx { bool lowerRowsAcq; bool eqRowsAcq; bool greaterRowsAcq; + bool groupJoin; int64_t seqGrpId; int64_t winBeginTs; @@ -275,22 +279,28 @@ typedef struct SMJoinExecInfo { int64_t expectRows; } SMJoinExecInfo; +typedef struct SMJoinRetrieveCtx { + bool grpRetrieve; + uint64_t lastGid[2]; + SSDataBlock* remainBlk[2]; +} SMJoinRetrieveCtx; typedef struct SMJoinOperatorInfo { - SOperatorInfo* pOperator; - int32_t joinType; - int32_t subType; - int32_t inputTsOrder; - int32_t errCode; - SMJoinTableCtx tbs[2]; - SMJoinTableCtx* build; - SMJoinTableCtx* probe; - SFilterInfo* pFPreFilter; - SFilterInfo* pPreFilter; - SFilterInfo* pFinFilter; - joinImplFp joinFp; - SMJoinCtx ctx; - SMJoinExecInfo execInfo; + SOperatorInfo* pOperator; + int32_t joinType; + int32_t subType; + int32_t inputTsOrder; + int32_t errCode; + SMJoinTableCtx tbs[2]; + SMJoinTableCtx* build; + SMJoinTableCtx* probe; + SMJoinRetrieveCtx retrieveCtx; + SFilterInfo* pFPreFilter; + SFilterInfo* pPreFilter; + SFilterInfo* pFinFilter; + joinImplFp joinFp; + SMJoinCtx ctx; + SMJoinExecInfo execInfo; } SMJoinOperatorInfo; #define MJOIN_DS_REQ_INIT(_pOp) ((_pOp)->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)(_pOp)->pOperatorGetParam->value)->initDownstream) diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index e9234d96ba..338da457ad 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -35,11 +35,13 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { int32_t buildGrpNum = taosArrayGetSize(cache->grps); int64_t buildTotalRows = TMIN(cache->rowNum, pCtx->jLimit); - pCtx->finBlk->info.id.groupId = pCtx->seqWinGrp ? pCtx->seqGrpId : 0; + pCtx->finBlk->info.id.groupId = (pCtx->seqWinGrp || pCtx->groupJoin) ? pCtx->seqGrpId : 0; if (buildGrpNum <= 0 || buildTotalRows <= 0) { - MJ_ERR_RET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, pCtx->seqWinGrp)); - pCtx->seqGrpId++; + MJ_ERR_RET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, pCtx->seqWinGrp)); + if (pCtx->seqWinGrp) { + pCtx->seqGrpId++; + } return TSDB_CODE_SUCCESS; } @@ -87,9 +89,8 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { if (cache->grpIdx >= buildGrpNum) { cache->grpIdx = 0; ++probeGrp->readIdx; - pCtx->seqGrpId++; - if (pCtx->seqWinGrp) { + pCtx->seqGrpId++; break; } } @@ -2954,7 +2955,10 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p pCtx->pJoin = pJoin; pCtx->lastTs = INT64_MIN; pCtx->seqWinGrp = pJoinNode->seqWinGroup; - pCtx->seqGrpId = 1; + pCtx->groupJoin = pJoinNode->grpJoin; + if (pCtx->seqWinGrp) { + pCtx->seqGrpId = 1; + } switch (pJoinNode->subType) { case JOIN_STYPE_ASOF: diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index afb82021d7..4ac708504d 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -847,6 +847,8 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin } static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { + pJoin->retrieveCtx.grpRetrieve = pJoinNode->grpJoin; + if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType))) || (JOIN_STYPE_WIN == pJoin->subType)) { return mJoinInitWindowCtx(pJoin, pJoinNode); diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index cde17a435a..62077033bc 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -2787,7 +2787,7 @@ void handleCaseEnd() { } // namespace -#if 0 +#if 1 #if 1 TEST(innerJoin, noCondTest) { SJoinTestParam param; @@ -2890,7 +2890,7 @@ TEST(innerJoin, fullCondTest) { #endif -#if 0 +#if 1 #if 1 TEST(leftOuterJoin, noCondTest) { SJoinTestParam param; @@ -2992,7 +2992,7 @@ TEST(leftOuterJoin, fullCondTest) { #endif #endif -#if 0 +#if 1 #if 1 TEST(fullOuterJoin, noCondTest) { SJoinTestParam param; @@ -3095,7 +3095,7 @@ TEST(fullOuterJoin, fullCondTest) { #endif #endif -#if 0 +#if 1 #if 1 TEST(leftSemiJoin, noCondTest) { SJoinTestParam param; @@ -3198,7 +3198,7 @@ TEST(leftSemiJoin, fullCondTest) { #endif #endif -#if 0 +#if 1 #if 1 TEST(leftAntiJoin, noCondTest) { SJoinTestParam param; @@ -3301,7 +3301,7 @@ TEST(leftAntiJoin, fullCondTest) { #endif #endif -#if 0 +#if 1 #if 1 TEST(leftAsofJoin, noCondGreaterThanTest) { SJoinTestParam param; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 6dba258f9e..42c36078ed 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -483,9 +483,12 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { CLONE_NODE_FIELD(pTagEqCond); CLONE_NODE_FIELD(pTagOnCond); CLONE_NODE_FIELD(pFullOnCond); + CLONE_NODE_LIST_FIELD(pLeftEqNodes); + CLONE_NODE_LIST_FIELD(pRightEqNodes); COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(hasSubQuery); COPY_SCALAR_FIELD(seqWinGroup); + COPY_SCALAR_FIELD(grpJoin); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index d81a32e644..8e6d530ce3 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2167,7 +2167,8 @@ static const char* jkJoinPhysiPlanLeftInputRowNum = "LeftInputRowNum"; static const char* jkJoinPhysiPlanRightInputRowNum = "RightInputRowNum"; static const char* jkJoinPhysiPlanLeftInputRowSize = "LeftInputRowSize"; static const char* jkJoinPhysiPlanRightInputRowSize = "RightInputRowSize"; -static const char* jkJoinPhysiPlanSeqWinGroup = "seqWinGroup"; +static const char* jkJoinPhysiPlanSeqWinGroup = "SeqWinGroup"; +static const char* jkJoinPhysiPlanGroupJoin = "GroupJoin"; static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; @@ -2230,6 +2231,9 @@ static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkJoinPhysiPlanSeqWinGroup, pNode->seqWinGroup); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkJoinPhysiPlanGroupJoin, pNode->grpJoin); + } return code; } @@ -2295,6 +2299,9 @@ static int32_t jsonToPhysiMergeJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkJoinPhysiPlanSeqWinGroup, &pNode->seqWinGroup); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkJoinPhysiPlanGroupJoin, &pNode->grpJoin); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 86deed6bac..6e69756a15 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2482,7 +2482,8 @@ enum { PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1, - PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP + PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP, + PHY_SORT_MERGE_JOIN_CODE_GROUP_JOIN }; static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2546,6 +2547,9 @@ static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP, pNode->seqWinGroup); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_SORT_MERGE_JOIN_CODE_GROUP_JOIN, pNode->grpJoin); + } return code; } @@ -2617,6 +2621,9 @@ static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP: code = tlvDecodeBool(pTlv, &pNode->seqWinGroup); break; + case PHY_SORT_MERGE_JOIN_CODE_GROUP_JOIN: + code = tlvDecodeBool(pTlv, &pNode->grpJoin); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index ce000f6eef..470c6c4b9f 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1293,6 +1293,8 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pLogicNode->pTagEqCond); nodesDestroyNode(pLogicNode->pTagOnCond); nodesDestroyNode(pLogicNode->pFullOnCond); + nodesDestroyList(pLogicNode->pLeftEqNodes); + nodesDestroyList(pLogicNode->pRightEqNodes); break; } case QUERY_NODE_LOGIC_PLAN_AGG: { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 64df135206..37930a5c5d 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1379,9 +1379,11 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS nodesCloneNode(nodesListGetNode(pCxt->pCurrRoot->pTargets, 0))); } +/* if (TSDB_CODE_SUCCESS == code) { code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, NULL, fmIsAggFunc, &pPartition->pAggFuncs); } +*/ if (TSDB_CODE_SUCCESS == code) { pPartition->pPartitionKeys = nodesCloneList(pSelect->pPartitionByList); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 79815a4acf..73fbbc6b5a 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -892,8 +892,16 @@ static bool pdcJoinIsEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond, bool* allT bool isEqual = false; if (pdcJoinColInTableColList(pOper->pLeft, pLeftCols)) { isEqual = pdcJoinColInTableColList(pOper->pRight, pRightCols); + if (isEqual) { + nodesListMakeStrictAppend(&pJoin->pLeftEqNodes, nodesCloneNode(pOper->pLeft)); + nodesListMakeStrictAppend(&pJoin->pRightEqNodes, nodesCloneNode(pOper->pRight)); + } } else if (pdcJoinColInTableColList(pOper->pLeft, pRightCols)) { isEqual = pdcJoinColInTableColList(pOper->pRight, pLeftCols); + if (isEqual) { + nodesListMakeStrictAppend(&pJoin->pLeftEqNodes, nodesCloneNode(pOper->pRight)); + nodesListMakeStrictAppend(&pJoin->pRightEqNodes, nodesCloneNode(pOper->pLeft)); + } } return isEqual; @@ -1208,6 +1216,27 @@ static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin return TSDB_CODE_SUCCESS; } +static int32_t pdcJoinHandleGrpJoinCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + switch (pJoin->subType) { + case JOIN_STYPE_ASOF: + case JOIN_STYPE_WIN: + if (NULL != pJoin->pColOnCond || NULL != pJoin->pTagOnCond) { + return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND); + } + nodesDestroyNode(pJoin->pColEqCond); + pJoin->pColEqCond = NULL; + nodesDestroyNode(pJoin->pTagEqCond); + pJoin->pTagEqCond = NULL; + nodesDestroyNode(pJoin->pFullOnCond); + pJoin->pFullOnCond = NULL; + break; + default: + break; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; @@ -1253,6 +1282,10 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { code = pdcJoinPartEqualOnCond(pCxt, pJoin); } + if (TSDB_CODE_SUCCESS == code) { + code = pdcJoinHandleGrpJoinCond(pCxt, pJoin); + } + if (TSDB_CODE_SUCCESS == code) { code = pdcJoinAddParentOnColsToTarget(pCxt, pJoin); } @@ -4446,6 +4479,86 @@ static int32_t stableJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicS return stbJoinOptRewriteStableJoin(pCxt, pNode, pLogicSubplan); } +static bool grpJoinOptShouldBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) { + return false; + } + + SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; + if (JOIN_STYPE_ASOF != pJoin->subType && JOIN_STYPE_WIN != pJoin->subType) { + return false; + } + + if (NULL == pJoin->pLeftEqNodes || pJoin->grpJoin) { + return false; + } + + return true; +} + +static int32_t grpJoinOptCreatePartitionNode(SLogicNode* pParent, SLogicNode* pChild, bool leftChild, SLogicNode** pNew) { + SPartitionLogicNode* pPartition = (SPartitionLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_PARTITION); + if (NULL == pPartition) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pPartition->node.groupAction = GROUP_ACTION_SET; + pPartition->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; + pPartition->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + + pPartition->node.pTargets = nodesCloneList(pChild->pTargets); + if (NULL == pPartition->node.pTargets) { + nodesDestroyNode((SNode*)pPartition); + return TSDB_CODE_OUT_OF_MEMORY; + } + + SJoinLogicNode* pJoin = (SJoinLogicNode*)pParent; + pPartition->pPartitionKeys = nodesCloneList(leftChild ? pJoin->pLeftEqNodes : pJoin->pRightEqNodes); + pChild->pParent = (SLogicNode*)pPartition; + pPartition->node.pParent = pParent; + nodesListMakeStrictAppend(&pPartition->node.pChildren, (SNode *)pChild); + + *pNew = (SLogicNode*)pPartition; + + return TSDB_CODE_SUCCESS; +} + +static int32_t grpJoinOptInsertPartitionNode(SLogicNode* pJoin) { + int32_t code = TSDB_CODE_SUCCESS; + SNode* pNode = NULL; + SNode* pNew = NULL; + bool leftChild = true; + FOREACH(pNode, pJoin->pChildren) { + code = grpJoinOptCreatePartitionNode(pJoin, (SLogicNode*)pNode, leftChild, (SLogicNode**)&pNew); + if (code) { + break; + } + REPLACE_NODE(pNew); + leftChild = false; + } + + return code; +} + +static int32_t grpJoinOptRewriteGroupJoin(SOptimizeContext* pCxt, SLogicNode* pJoin, SLogicSubplan* pLogicSubplan) { + int32_t code = grpJoinOptInsertPartitionNode(pJoin); + if (TSDB_CODE_SUCCESS == code) { + ((SJoinLogicNode*)pJoin)->grpJoin = true; + pCxt->optimized = true; + } + return code; +} + + +static int32_t groupJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, grpJoinOptShouldBeOptimized); + if (NULL == pNode) { + return TSDB_CODE_SUCCESS; + } + + return grpJoinOptRewriteGroupJoin(pCxt, pNode, pLogicSubplan); +} + static bool partColOptShouldBeOptimized(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { SPartitionLogicNode* pPartition = (SPartitionLogicNode*)pNode; @@ -4585,6 +4698,7 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "ScanPath", .optimizeFunc = scanPathOptimize}, {.pName = "PushDownCondition", .optimizeFunc = pdcOptimize}, {.pName = "StableJoin", .optimizeFunc = stableJoinOptimize}, + {.pName = "GroupJoin", .optimizeFunc = groupJoinOptimize}, {.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize}, {.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize}, {.pName = "SortForjoin", .optimizeFunc = sortForJoinOptimize}, diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 28816a8bec..23cb15a741 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -900,6 +900,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi pJoin->pJLimit = nodesCloneNode(pJoinLogicNode->pJLimit); pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; pJoin->seqWinGroup = pJoinLogicNode->seqWinGroup; + pJoin->grpJoin = pJoinLogicNode->grpJoin; SDataBlockDescNode* pLeftDesc = NULL; SDataBlockDescNode* pRightDesc = NULL; diff --git a/tests/script/tsim/join/left_asof_join.sim b/tests/script/tsim/join/left_asof_join.sim index c6f69e77cc..7c676a8a39 100644 --- a/tests/script/tsim/join/left_asof_join.sim +++ b/tests/script/tsim/join/left_asof_join.sim @@ -479,4 +479,6 @@ endi +sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1=a.ts; +sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1 > 1;