enh: support group join

This commit is contained in:
dapan1121 2024-02-23 17:59:45 +08:00
parent a745319b81
commit 97aca25633
13 changed files with 187 additions and 29 deletions

View File

@ -138,10 +138,13 @@ typedef struct SJoinLogicNode {
SNode* pTagEqCond; SNode* pTagEqCond;
SNode* pTagOnCond; SNode* pTagOnCond;
SNode* pFullOnCond; // except prim eq cond SNode* pFullOnCond; // except prim eq cond
SNodeList* pLeftEqNodes;
SNodeList* pRightEqNodes;
bool isSingleTableJoin; bool isSingleTableJoin;
bool hasSubQuery; bool hasSubQuery;
bool isLowLevelJoin; bool isLowLevelJoin;
bool seqWinGroup; bool seqWinGroup;
bool grpJoin;
} SJoinLogicNode; } SJoinLogicNode;
typedef struct SAggLogicNode { typedef struct SAggLogicNode {
@ -503,6 +506,7 @@ typedef struct SSortMergeJoinPhysiNode {
SNodeList* pTargets; SNodeList* pTargets;
SQueryStat inputStat[2]; SQueryStat inputStat[2];
bool seqWinGroup; bool seqWinGroup;
bool grpJoin;
} SSortMergeJoinPhysiNode; } SSortMergeJoinPhysiNode;
typedef struct SHashJoinPhysiNode { typedef struct SHashJoinPhysiNode {

View File

@ -19,7 +19,7 @@
extern "C" { extern "C" {
#endif #endif
#if 0 #if 1
#define MJOIN_DEFAULT_BLK_ROWS_NUM 2 //4096 #define MJOIN_DEFAULT_BLK_ROWS_NUM 2 //4096
#define MJOIN_HJOIN_CART_THRESHOLD 10 #define MJOIN_HJOIN_CART_THRESHOLD 10
#define MJOIN_BLK_SIZE_LIMIT 0 //10485760 #define MJOIN_BLK_SIZE_LIMIT 0 //10485760
@ -95,6 +95,9 @@ typedef struct SMJoinTableCtx {
int32_t blkId; int32_t blkId;
SQueryStat inputStat; SQueryStat inputStat;
uint64_t lastInGid;
SSDataBlock* remainInBlk;
SMJoinColMap* primCol; SMJoinColMap* primCol;
char* primData; char* primData;
@ -237,6 +240,7 @@ typedef struct SMJoinWindowCtx {
bool lowerRowsAcq; bool lowerRowsAcq;
bool eqRowsAcq; bool eqRowsAcq;
bool greaterRowsAcq; bool greaterRowsAcq;
bool groupJoin;
int64_t seqGrpId; int64_t seqGrpId;
int64_t winBeginTs; int64_t winBeginTs;
@ -275,22 +279,28 @@ typedef struct SMJoinExecInfo {
int64_t expectRows; int64_t expectRows;
} SMJoinExecInfo; } SMJoinExecInfo;
typedef struct SMJoinRetrieveCtx {
bool grpRetrieve;
uint64_t lastGid[2];
SSDataBlock* remainBlk[2];
} SMJoinRetrieveCtx;
typedef struct SMJoinOperatorInfo { typedef struct SMJoinOperatorInfo {
SOperatorInfo* pOperator; SOperatorInfo* pOperator;
int32_t joinType; int32_t joinType;
int32_t subType; int32_t subType;
int32_t inputTsOrder; int32_t inputTsOrder;
int32_t errCode; int32_t errCode;
SMJoinTableCtx tbs[2]; SMJoinTableCtx tbs[2];
SMJoinTableCtx* build; SMJoinTableCtx* build;
SMJoinTableCtx* probe; SMJoinTableCtx* probe;
SFilterInfo* pFPreFilter; SMJoinRetrieveCtx retrieveCtx;
SFilterInfo* pPreFilter; SFilterInfo* pFPreFilter;
SFilterInfo* pFinFilter; SFilterInfo* pPreFilter;
joinImplFp joinFp; SFilterInfo* pFinFilter;
SMJoinCtx ctx; joinImplFp joinFp;
SMJoinExecInfo execInfo; SMJoinCtx ctx;
SMJoinExecInfo execInfo;
} SMJoinOperatorInfo; } SMJoinOperatorInfo;
#define MJOIN_DS_REQ_INIT(_pOp) ((_pOp)->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)(_pOp)->pOperatorGetParam->value)->initDownstream) #define MJOIN_DS_REQ_INIT(_pOp) ((_pOp)->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)(_pOp)->pOperatorGetParam->value)->initDownstream)

View File

@ -35,11 +35,13 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
int32_t buildGrpNum = taosArrayGetSize(cache->grps); int32_t buildGrpNum = taosArrayGetSize(cache->grps);
int64_t buildTotalRows = TMIN(cache->rowNum, pCtx->jLimit); int64_t buildTotalRows = TMIN(cache->rowNum, pCtx->jLimit);
pCtx->finBlk->info.id.groupId = pCtx->seqWinGrp ? pCtx->seqGrpId : 0; pCtx->finBlk->info.id.groupId = (pCtx->seqWinGrp || pCtx->groupJoin) ? pCtx->seqGrpId : 0;
if (buildGrpNum <= 0 || buildTotalRows <= 0) { if (buildGrpNum <= 0 || buildTotalRows <= 0) {
MJ_ERR_RET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, pCtx->seqWinGrp)); MJ_ERR_RET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, pCtx->seqWinGrp));
pCtx->seqGrpId++; if (pCtx->seqWinGrp) {
pCtx->seqGrpId++;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -87,9 +89,8 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
if (cache->grpIdx >= buildGrpNum) { if (cache->grpIdx >= buildGrpNum) {
cache->grpIdx = 0; cache->grpIdx = 0;
++probeGrp->readIdx; ++probeGrp->readIdx;
pCtx->seqGrpId++;
if (pCtx->seqWinGrp) { if (pCtx->seqWinGrp) {
pCtx->seqGrpId++;
break; break;
} }
} }
@ -2954,7 +2955,10 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
pCtx->pJoin = pJoin; pCtx->pJoin = pJoin;
pCtx->lastTs = INT64_MIN; pCtx->lastTs = INT64_MIN;
pCtx->seqWinGrp = pJoinNode->seqWinGroup; pCtx->seqWinGrp = pJoinNode->seqWinGroup;
pCtx->seqGrpId = 1; pCtx->groupJoin = pJoinNode->grpJoin;
if (pCtx->seqWinGrp) {
pCtx->seqGrpId = 1;
}
switch (pJoinNode->subType) { switch (pJoinNode->subType) {
case JOIN_STYPE_ASOF: case JOIN_STYPE_ASOF:

View File

@ -847,6 +847,8 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
} }
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
pJoin->retrieveCtx.grpRetrieve = pJoinNode->grpJoin;
if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType))) if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType)))
|| (JOIN_STYPE_WIN == pJoin->subType)) { || (JOIN_STYPE_WIN == pJoin->subType)) {
return mJoinInitWindowCtx(pJoin, pJoinNode); return mJoinInitWindowCtx(pJoin, pJoinNode);

View File

@ -2787,7 +2787,7 @@ void handleCaseEnd() {
} // namespace } // namespace
#if 0 #if 1
#if 1 #if 1
TEST(innerJoin, noCondTest) { TEST(innerJoin, noCondTest) {
SJoinTestParam param; SJoinTestParam param;
@ -2890,7 +2890,7 @@ TEST(innerJoin, fullCondTest) {
#endif #endif
#if 0 #if 1
#if 1 #if 1
TEST(leftOuterJoin, noCondTest) { TEST(leftOuterJoin, noCondTest) {
SJoinTestParam param; SJoinTestParam param;
@ -2992,7 +2992,7 @@ TEST(leftOuterJoin, fullCondTest) {
#endif #endif
#endif #endif
#if 0 #if 1
#if 1 #if 1
TEST(fullOuterJoin, noCondTest) { TEST(fullOuterJoin, noCondTest) {
SJoinTestParam param; SJoinTestParam param;
@ -3095,7 +3095,7 @@ TEST(fullOuterJoin, fullCondTest) {
#endif #endif
#endif #endif
#if 0 #if 1
#if 1 #if 1
TEST(leftSemiJoin, noCondTest) { TEST(leftSemiJoin, noCondTest) {
SJoinTestParam param; SJoinTestParam param;
@ -3198,7 +3198,7 @@ TEST(leftSemiJoin, fullCondTest) {
#endif #endif
#endif #endif
#if 0 #if 1
#if 1 #if 1
TEST(leftAntiJoin, noCondTest) { TEST(leftAntiJoin, noCondTest) {
SJoinTestParam param; SJoinTestParam param;
@ -3301,7 +3301,7 @@ TEST(leftAntiJoin, fullCondTest) {
#endif #endif
#endif #endif
#if 0 #if 1
#if 1 #if 1
TEST(leftAsofJoin, noCondGreaterThanTest) { TEST(leftAsofJoin, noCondGreaterThanTest) {
SJoinTestParam param; SJoinTestParam param;

View File

@ -483,9 +483,12 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
CLONE_NODE_FIELD(pTagEqCond); CLONE_NODE_FIELD(pTagEqCond);
CLONE_NODE_FIELD(pTagOnCond); CLONE_NODE_FIELD(pTagOnCond);
CLONE_NODE_FIELD(pFullOnCond); CLONE_NODE_FIELD(pFullOnCond);
CLONE_NODE_LIST_FIELD(pLeftEqNodes);
CLONE_NODE_LIST_FIELD(pRightEqNodes);
COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(isSingleTableJoin);
COPY_SCALAR_FIELD(hasSubQuery); COPY_SCALAR_FIELD(hasSubQuery);
COPY_SCALAR_FIELD(seqWinGroup); COPY_SCALAR_FIELD(seqWinGroup);
COPY_SCALAR_FIELD(grpJoin);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -2167,7 +2167,8 @@ static const char* jkJoinPhysiPlanLeftInputRowNum = "LeftInputRowNum";
static const char* jkJoinPhysiPlanRightInputRowNum = "RightInputRowNum"; static const char* jkJoinPhysiPlanRightInputRowNum = "RightInputRowNum";
static const char* jkJoinPhysiPlanLeftInputRowSize = "LeftInputRowSize"; static const char* jkJoinPhysiPlanLeftInputRowSize = "LeftInputRowSize";
static const char* jkJoinPhysiPlanRightInputRowSize = "RightInputRowSize"; static const char* jkJoinPhysiPlanRightInputRowSize = "RightInputRowSize";
static const char* jkJoinPhysiPlanSeqWinGroup = "seqWinGroup"; static const char* jkJoinPhysiPlanSeqWinGroup = "SeqWinGroup";
static const char* jkJoinPhysiPlanGroupJoin = "GroupJoin";
static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) {
const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj;
@ -2230,6 +2231,9 @@ static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkJoinPhysiPlanSeqWinGroup, pNode->seqWinGroup); code = tjsonAddBoolToObject(pJson, jkJoinPhysiPlanSeqWinGroup, pNode->seqWinGroup);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkJoinPhysiPlanGroupJoin, pNode->grpJoin);
}
return code; return code;
} }
@ -2295,6 +2299,9 @@ static int32_t jsonToPhysiMergeJoinNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkJoinPhysiPlanSeqWinGroup, &pNode->seqWinGroup); code = tjsonGetBoolValue(pJson, jkJoinPhysiPlanSeqWinGroup, &pNode->seqWinGroup);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkJoinPhysiPlanGroupJoin, &pNode->grpJoin);
}
return code; return code;
} }

View File

@ -2482,7 +2482,8 @@ enum {
PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0,
PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1,
PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1,
PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP,
PHY_SORT_MERGE_JOIN_CODE_GROUP_JOIN
}; };
static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2546,6 +2547,9 @@ static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP, pNode->seqWinGroup); code = tlvEncodeBool(pEncoder, PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP, pNode->seqWinGroup);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_SORT_MERGE_JOIN_CODE_GROUP_JOIN, pNode->grpJoin);
}
return code; return code;
} }
@ -2617,6 +2621,9 @@ static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP: case PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP:
code = tlvDecodeBool(pTlv, &pNode->seqWinGroup); code = tlvDecodeBool(pTlv, &pNode->seqWinGroup);
break; break;
case PHY_SORT_MERGE_JOIN_CODE_GROUP_JOIN:
code = tlvDecodeBool(pTlv, &pNode->grpJoin);
break;
default: default:
break; break;
} }

View File

@ -1293,6 +1293,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pLogicNode->pTagEqCond); nodesDestroyNode(pLogicNode->pTagEqCond);
nodesDestroyNode(pLogicNode->pTagOnCond); nodesDestroyNode(pLogicNode->pTagOnCond);
nodesDestroyNode(pLogicNode->pFullOnCond); nodesDestroyNode(pLogicNode->pFullOnCond);
nodesDestroyList(pLogicNode->pLeftEqNodes);
nodesDestroyList(pLogicNode->pRightEqNodes);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_AGG: { case QUERY_NODE_LOGIC_PLAN_AGG: {

View File

@ -1379,9 +1379,11 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
nodesCloneNode(nodesListGetNode(pCxt->pCurrRoot->pTargets, 0))); nodesCloneNode(nodesListGetNode(pCxt->pCurrRoot->pTargets, 0)));
} }
/*
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, NULL, fmIsAggFunc, &pPartition->pAggFuncs); code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, NULL, fmIsAggFunc, &pPartition->pAggFuncs);
} }
*/
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pPartition->pPartitionKeys = nodesCloneList(pSelect->pPartitionByList); pPartition->pPartitionKeys = nodesCloneList(pSelect->pPartitionByList);

View File

@ -892,8 +892,16 @@ static bool pdcJoinIsEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond, bool* allT
bool isEqual = false; bool isEqual = false;
if (pdcJoinColInTableColList(pOper->pLeft, pLeftCols)) { if (pdcJoinColInTableColList(pOper->pLeft, pLeftCols)) {
isEqual = pdcJoinColInTableColList(pOper->pRight, pRightCols); isEqual = pdcJoinColInTableColList(pOper->pRight, pRightCols);
if (isEqual) {
nodesListMakeStrictAppend(&pJoin->pLeftEqNodes, nodesCloneNode(pOper->pLeft));
nodesListMakeStrictAppend(&pJoin->pRightEqNodes, nodesCloneNode(pOper->pRight));
}
} else if (pdcJoinColInTableColList(pOper->pLeft, pRightCols)) { } else if (pdcJoinColInTableColList(pOper->pLeft, pRightCols)) {
isEqual = pdcJoinColInTableColList(pOper->pRight, pLeftCols); isEqual = pdcJoinColInTableColList(pOper->pRight, pLeftCols);
if (isEqual) {
nodesListMakeStrictAppend(&pJoin->pLeftEqNodes, nodesCloneNode(pOper->pRight));
nodesListMakeStrictAppend(&pJoin->pRightEqNodes, nodesCloneNode(pOper->pLeft));
}
} }
return isEqual; return isEqual;
@ -1208,6 +1216,27 @@ static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t pdcJoinHandleGrpJoinCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
switch (pJoin->subType) {
case JOIN_STYPE_ASOF:
case JOIN_STYPE_WIN:
if (NULL != pJoin->pColOnCond || NULL != pJoin->pTagOnCond) {
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND);
}
nodesDestroyNode(pJoin->pColEqCond);
pJoin->pColEqCond = NULL;
nodesDestroyNode(pJoin->pTagEqCond);
pJoin->pTagEqCond = NULL;
nodesDestroyNode(pJoin->pFullOnCond);
pJoin->pFullOnCond = NULL;
break;
default:
break;
}
return TSDB_CODE_SUCCESS;
}
static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1253,6 +1282,10 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
code = pdcJoinPartEqualOnCond(pCxt, pJoin); code = pdcJoinPartEqualOnCond(pCxt, pJoin);
} }
if (TSDB_CODE_SUCCESS == code) {
code = pdcJoinHandleGrpJoinCond(pCxt, pJoin);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = pdcJoinAddParentOnColsToTarget(pCxt, pJoin); code = pdcJoinAddParentOnColsToTarget(pCxt, pJoin);
} }
@ -4446,6 +4479,86 @@ static int32_t stableJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
return stbJoinOptRewriteStableJoin(pCxt, pNode, pLogicSubplan); return stbJoinOptRewriteStableJoin(pCxt, pNode, pLogicSubplan);
} }
static bool grpJoinOptShouldBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) {
return false;
}
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
if (JOIN_STYPE_ASOF != pJoin->subType && JOIN_STYPE_WIN != pJoin->subType) {
return false;
}
if (NULL == pJoin->pLeftEqNodes || pJoin->grpJoin) {
return false;
}
return true;
}
static int32_t grpJoinOptCreatePartitionNode(SLogicNode* pParent, SLogicNode* pChild, bool leftChild, SLogicNode** pNew) {
SPartitionLogicNode* pPartition = (SPartitionLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_PARTITION);
if (NULL == pPartition) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pPartition->node.groupAction = GROUP_ACTION_SET;
pPartition->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
pPartition->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pPartition->node.pTargets = nodesCloneList(pChild->pTargets);
if (NULL == pPartition->node.pTargets) {
nodesDestroyNode((SNode*)pPartition);
return TSDB_CODE_OUT_OF_MEMORY;
}
SJoinLogicNode* pJoin = (SJoinLogicNode*)pParent;
pPartition->pPartitionKeys = nodesCloneList(leftChild ? pJoin->pLeftEqNodes : pJoin->pRightEqNodes);
pChild->pParent = (SLogicNode*)pPartition;
pPartition->node.pParent = pParent;
nodesListMakeStrictAppend(&pPartition->node.pChildren, (SNode *)pChild);
*pNew = (SLogicNode*)pPartition;
return TSDB_CODE_SUCCESS;
}
static int32_t grpJoinOptInsertPartitionNode(SLogicNode* pJoin) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
SNode* pNew = NULL;
bool leftChild = true;
FOREACH(pNode, pJoin->pChildren) {
code = grpJoinOptCreatePartitionNode(pJoin, (SLogicNode*)pNode, leftChild, (SLogicNode**)&pNew);
if (code) {
break;
}
REPLACE_NODE(pNew);
leftChild = false;
}
return code;
}
static int32_t grpJoinOptRewriteGroupJoin(SOptimizeContext* pCxt, SLogicNode* pJoin, SLogicSubplan* pLogicSubplan) {
int32_t code = grpJoinOptInsertPartitionNode(pJoin);
if (TSDB_CODE_SUCCESS == code) {
((SJoinLogicNode*)pJoin)->grpJoin = true;
pCxt->optimized = true;
}
return code;
}
static int32_t groupJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, grpJoinOptShouldBeOptimized);
if (NULL == pNode) {
return TSDB_CODE_SUCCESS;
}
return grpJoinOptRewriteGroupJoin(pCxt, pNode, pLogicSubplan);
}
static bool partColOptShouldBeOptimized(SLogicNode* pNode) { static bool partColOptShouldBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
SPartitionLogicNode* pPartition = (SPartitionLogicNode*)pNode; SPartitionLogicNode* pPartition = (SPartitionLogicNode*)pNode;
@ -4585,6 +4698,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "ScanPath", .optimizeFunc = scanPathOptimize}, {.pName = "ScanPath", .optimizeFunc = scanPathOptimize},
{.pName = "PushDownCondition", .optimizeFunc = pdcOptimize}, {.pName = "PushDownCondition", .optimizeFunc = pdcOptimize},
{.pName = "StableJoin", .optimizeFunc = stableJoinOptimize}, {.pName = "StableJoin", .optimizeFunc = stableJoinOptimize},
{.pName = "GroupJoin", .optimizeFunc = groupJoinOptimize},
{.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize}, {.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize},
{.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize}, {.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize},
{.pName = "SortForjoin", .optimizeFunc = sortForJoinOptimize}, {.pName = "SortForjoin", .optimizeFunc = sortForJoinOptimize},

View File

@ -900,6 +900,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
pJoin->pJLimit = nodesCloneNode(pJoinLogicNode->pJLimit); pJoin->pJLimit = nodesCloneNode(pJoinLogicNode->pJLimit);
pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
pJoin->seqWinGroup = pJoinLogicNode->seqWinGroup; pJoin->seqWinGroup = pJoinLogicNode->seqWinGroup;
pJoin->grpJoin = pJoinLogicNode->grpJoin;
SDataBlockDescNode* pLeftDesc = NULL; SDataBlockDescNode* pLeftDesc = NULL;
SDataBlockDescNode* pRightDesc = NULL; SDataBlockDescNode* pRightDesc = NULL;

View File

@ -479,4 +479,6 @@ endi
sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1=a.ts;
sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1 > 1;