From 81a2bf10cfa22eed7a5c9cdcdfd035942a9aca92 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 28 Jun 2023 19:39:04 +0800 Subject: [PATCH] enh: optimize stable join --- include/libs/nodes/nodes.h | 1 + include/libs/nodes/plannodes.h | 8 +- include/libs/nodes/querynodes.h | 4 + source/libs/nodes/src/nodesCloneFuncs.c | 2 + source/libs/nodes/src/nodesCodeFuncs.c | 86 +++++++- source/libs/nodes/src/nodesUtilFuncs.c | 13 ++ source/libs/planner/src/planOptimizer.c | 241 ++++++++++++++++++--- source/libs/planner/src/planPhysiCreater.c | 95 +++++--- source/libs/planner/src/planSpliter.c | 2 +- 9 files changed, 385 insertions(+), 67 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 86286a4de3..be9708e0a9 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -233,6 +233,7 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC, QUERY_NODE_LOGIC_PLAN_INTERP_FUNC, QUERY_NODE_LOGIC_PLAN_GROUP_CACHE, + QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL, QUERY_NODE_LOGIC_SUBPLAN, QUERY_NODE_LOGIC_PLAN, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index dcadb5d5e8..bac7e5b47d 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -42,6 +42,7 @@ typedef enum EGroupAction { typedef struct SLogicNode { ENodeType type; + bool dynamicOp; SNodeList* pTargets; // SColumnNode SNode* pConditions; SNodeList* pChildren; @@ -114,6 +115,7 @@ typedef struct SJoinLogicNode { SNode* pPrimKeyEqCond; SNode* pColEqCond; SNode* pTagEqCond; + SNode* pTagOnCond; SNode* pOtherOnCond; bool isSingleTableJoin; bool hasSubQuery; @@ -157,9 +159,13 @@ typedef struct SInterpFuncLogicNode { typedef struct SGroupCacheLogicNode { SLogicNode node; - SNode* pGroupCol; + SNodeList* pGroupCols; } SGroupCacheLogicNode; +typedef struct SDynQueryCtrlLogicNode { + SLogicNode node; + EDynQueryType qType; +} SDynQueryCtrlLogicNode; typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index abcb6a095b..174960be77 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -180,6 +180,10 @@ typedef enum EJoinAlgorithm { JOIN_ALGO_HASH, } EJoinAlgorithm; +typedef enum EDynQueryType { + DYN_QTYPE_STB_HASH = 1, +} EDynQueryType; + typedef struct SJoinTableNode { STableNode table; // QUERY_NODE_JOIN_TABLE EJoinType joinType; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index a853d712b3..449f4efc2b 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -403,11 +403,13 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(joinType); + COPY_SCALAR_FIELD(joinAlgo); CLONE_NODE_FIELD(pPrimKeyEqCond); CLONE_NODE_FIELD(pColEqCond); CLONE_NODE_FIELD(pTagEqCond); CLONE_NODE_FIELD(pOtherOnCond); COPY_SCALAR_FIELD(isSingleTableJoin); + COPY_SCALAR_FIELD(hasSubQuery); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index cc66ffbd9c..f94fdf9081 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -295,6 +295,10 @@ const char* nodesNodeName(ENodeType type) { return "LogicIndefRowsFunc"; case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: return "LogicInterpFunc"; + case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: + return "LogicGroupCache"; + case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: + return "LogicDynamicQueryCtrl"; case QUERY_NODE_LOGIC_SUBPLAN: return "LogicSubplan"; case QUERY_NODE_LOGIC_PLAN: @@ -1172,6 +1176,55 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkGroupCacheLogicPlanGroupCols = "GroupCols"; + +static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) { + const SGroupCacheLogicNode* pNode = (const SGroupCacheLogicNode*)pObj; + + int32_t code = logicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkGroupCacheLogicPlanGroupCols, pNode->pGroupCols); + } + + return code; +} + +static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) { + SGroupCacheLogicNode* pNode = (SGroupCacheLogicNode*)pObj; + + int32_t code = jsonToLogicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkGroupCacheLogicPlanGroupCols, &pNode->pGroupCols); + } + + return code; +} + +static const char* jkDynQueryCtrlLogicPlanQueryType = "QueryType"; + +static int32_t logicDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) { + const SDynQueryCtrlLogicNode* pNode = (const SDynQueryCtrlLogicNode*)pObj; + + int32_t code = logicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType); + } + + return code; +} + +static int32_t jsonToLogicDynQueryCtrlNode(const SJson* pJson, void* pObj) { + SDynQueryCtrlLogicNode* pNode = (SDynQueryCtrlLogicNode*)pObj; + + int32_t code = jsonToLogicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType, code); + } + + return code; +} + + static const char* jkSubplanIdQueryId = "QueryId"; static const char* jkSubplanIdGroupId = "GroupId"; static const char* jkSubplanIdSubplanId = "SubplanId"; @@ -1426,9 +1479,11 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) { } static const char* jkJoinLogicPlanJoinType = "JoinType"; -static const char* jkJoinLogicPlanOnConditions = "OnConditions"; +static const char* jkJoinLogicPlanJoinAlgo = "JoinAlgo"; +static const char* jkJoinLogicPlanOnConditions = "OtherOnCond"; static const char* jkJoinLogicPlanPrimKeyEqCondition = "PrimKeyEqCond"; static const char* jkJoinLogicPlanColEqCondition = "ColumnEqCond"; +static const char* jkJoinLogicPlanTagEqCondition = "TagEqCond"; static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj; @@ -1437,14 +1492,20 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinLogicPlanJoinType, pNode->joinType); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinLogicPlanJoinAlgo, pNode->joinAlgo); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinLogicPlanPrimKeyEqCondition, nodeToJson, pNode->pPrimKeyEqCond); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOtherOnCond); + code = tjsonAddObject(pJson, jkJoinLogicPlanColEqCondition, nodeToJson, pNode->pColEqCond); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinLogicPlanColEqCondition, nodeToJson, pNode->pColEqCond); + code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqCondition, nodeToJson, pNode->pTagEqCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOtherOnCond); } return code; } @@ -1457,14 +1518,21 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) { tjsonGetNumberValue(pJson, jkJoinLogicPlanJoinType, pNode->joinType, code); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinLogicPlanPrimKeyEqCondition, &pNode->pPrimKeyEqCond); + tjsonGetNumberValue(pJson, jkJoinLogicPlanJoinAlgo, pNode->joinAlgo, code); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOtherOnCond); + code = jsonToNodeObject(pJson, jkJoinLogicPlanPrimKeyEqCondition, &pNode->pPrimKeyEqCond); } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinLogicPlanColEqCondition, &pNode->pColEqCond); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqCondition, &pNode->pTagEqCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOtherOnCond); + } + return code; } @@ -6577,6 +6645,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return logicIndefRowsFuncNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: return logicInterpFuncNodeToJson(pObj, pJson); + case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: + return logicGroupCacheNodeToJson(pObj, pJson); + case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: + return logicDynQueryCtrlNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_SUBPLAN: return logicSubplanToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN: @@ -6895,6 +6967,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToLogicIndefRowsFuncNode(pJson, pObj); case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: return jsonToLogicInterpFuncNode(pJson, pObj); + case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: + return jsonToLogicGroupCacheNode(pJson, pObj); + case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: + return jsonToLogicDynQueryCtrlNode(pJson, pObj); case QUERY_NODE_LOGIC_SUBPLAN: return jsonToLogicSubplan(pJson, pObj); case QUERY_NODE_LOGIC_PLAN: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 0c76efb6d7..42ce5370b0 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -492,6 +492,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SInterpFuncLogicNode)); case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: return makeNode(type, sizeof(SGroupCacheLogicNode)); + case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: + return makeNode(type, sizeof(SDynQueryCtrlLogicNode)); case QUERY_NODE_LOGIC_SUBPLAN: return makeNode(type, sizeof(SLogicSubplan)); case QUERY_NODE_LOGIC_PLAN: @@ -1180,6 +1182,17 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pLogicNode->pTimeSeries); break; } + case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: { + SGroupCacheLogicNode* pLogicNode = (SGroupCacheLogicNode*)pNode; + destroyLogicNode((SLogicNode*)pLogicNode); + nodesDestroyList(pLogicNode->pGroupCols); + break; + } + case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: { + SDynQueryCtrlLogicNode* pLogicNode = (SDynQueryCtrlLogicNode*)pNode; + destroyLogicNode((SLogicNode*)pLogicNode); + break; + } case QUERY_NODE_LOGIC_SUBPLAN: { SLogicSubplan* pSubplan = (SLogicSubplan*)pNode; nodesDestroyList(pSubplan->pChildren); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 6bad4d67f1..11e5868d13 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -763,14 +763,18 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond, return false; } SOperatorNode* pOper = (SOperatorNode*)pCond; - if (OP_TYPE_EQUAL != pOper->opType) { - return false; - } if (QUERY_NODE_COLUMN != nodeType(pOper->pLeft) || QUERY_NODE_COLUMN != nodeType(pOper->pRight)) { return false; } SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft); SColumnNode* pRight = (SColumnNode*)(pOper->pRight); + + *allTags = (COLUMN_TYPE_TAG == pLeft->colType) && (COLUMN_TYPE_TAG == pRight->colType); + + if (OP_TYPE_EQUAL != pOper->opType) { + return false; + } + //TODO: add cast to operator and remove this restriction of optimization if (pLeft->node.resType.type != pRight->node.resType.type || pLeft->node.resType.bytes != pRight->node.resType.bytes) { return false; @@ -784,7 +788,6 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond, isEqual = pushDownCondOptIsTableColumn(pOper->pRight, pLeftCols); } if (isEqual) { - *allTags = (COLUMN_TYPE_TAG == pLeft->colType) && (COLUMN_TYPE_TAG == pRight->colType); } return isEqual; } @@ -795,30 +798,42 @@ static int32_t pushDownCondOptJoinExtractEqualOnLogicCond(SJoinLogicNode* pJoin) int32_t code = TSDB_CODE_SUCCESS; SNodeList* pColEqOnConds = NULL; SNodeList* pTagEqOnConds = NULL; + SNodeList* pTagOnConds = NULL; SNode* pCond = NULL; bool allTags = false; FOREACH(pCond, pLogicCond->pParameterList) { + allTags = false; if (pushDownCondOptIsColEqualOnCond(pJoin, pCond, &allTags)) { if (allTags) { code = nodesListMakeAppend(&pTagEqOnConds, nodesCloneNode(pCond)); } else { code = nodesListMakeAppend(&pColEqOnConds, nodesCloneNode(pCond)); } + } else if (allTags) { + code = nodesListMakeAppend(&pTagOnConds, nodesCloneNode(pCond)); + } + if (code) { + break; } } SNode* pTempTagEqCond = NULL; SNode* pTempColEqCond = NULL; + SNode* pTempTagOnCond = NULL; if (TSDB_CODE_SUCCESS == code) { code = nodesMergeConds(&pTempColEqCond, &pColEqOnConds); } if (TSDB_CODE_SUCCESS == code) { code = nodesMergeConds(&pTempTagEqCond, &pTagEqOnConds); } + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempTagOnCond, &pTagOnConds); + } if (TSDB_CODE_SUCCESS == code) { pJoin->pColEqCond = pTempColEqCond; pJoin->pTagEqCond = pTempTagEqCond; + pJoin->pTagOnCond = pTempTagOnCond; return TSDB_CODE_SUCCESS; } else { nodesDestroyList(pColEqOnConds); @@ -846,15 +861,62 @@ static int32_t pushDownCondOptJoinExtractEqualOnCond(SOptimizeContext* pCxt, SJo } else { pJoin->pColEqCond = nodesCloneNode(pJoin->pOtherOnCond); } + } else if (allTags) { + pJoin->pTagOnCond = nodesCloneNode(pJoin->pOtherOnCond); } return TSDB_CODE_SUCCESS; } +static int32_t pushDownCondOptAppendFilterCol(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (NULL == pJoin->pOtherOnCond) { + return TSDB_CODE_SUCCESS; + } + + int32_t code = TSDB_CODE_SUCCESS; + SNodeList* pCondCols = nodesMakeList(); + SNodeList* pTargets = NULL; + if (NULL == pCondCols) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + code = nodesCollectColumnsFromNode(pJoin->pOtherOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExprs(pCondCols, &pTargets); + } + + nodesDestroyList(pCondCols); + + if (TSDB_CODE_SUCCESS == code) { + SNode* pNode = NULL; + FOREACH(pNode, pTargets) { + SNode* pTmp = NULL; + bool found = false; + FOREACH(pTmp, pJoin->node.pTargets) { + if (nodesEqualNode(pTmp, pNode)) { + found = true; + break; + } + } + if (!found) { + nodesListStrictAppend(pJoin->node.pTargets, pNode); + } + } + } + + nodesDestroyList(pTargets); + + return code; +} + + static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; } + if (pJoin->joinAlgo != JOIN_ALGO_UNKNOWN) { + return TSDB_CODE_SUCCESS; + } if (NULL == pJoin->node.pConditions) { int32_t code = pushDownCondOptJoinExtractCond(pCxt, pJoin); @@ -889,6 +951,10 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p code = pushDownCondOptJoinExtractEqualOnCond(pCxt, pJoin); } + if (TSDB_CODE_SUCCESS == code) { + code = pushDownCondOptAppendFilterCol(pCxt, pJoin); + } + if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); pCxt->optimized = true; @@ -2971,13 +3037,23 @@ static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) { } SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; - if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2 || pJoin->hasSubQuery || pJoin->joinAlgo != UNKNOWN_JOIN_ALGO) { + if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2 || pJoin->hasSubQuery || pJoin->joinAlgo != JOIN_ALGO_UNKNOWN) { return false; } return true; } + +int32_t stbJoinOptAddFuncToScanNode(char* funcName, SScanLogicNode* pScan) { + SFunctionNode* pUidFunc = createFunction(funcName, NULL); + snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p", + pUidFunc->functionName, pUidFunc); + nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pUidFunc); + return createColumnByRewriteExpr((SNode*)pUidFunc, &pScan->node.pTargets); +} + + int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) { SScanLogicNode* pScan = (SScanLogicNode*)pNode; SJoinLogicNode* pJoinNode = (SJoinLogicNode*)pJoin; @@ -2990,6 +3066,9 @@ int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) { SNodeList* pTags = nodesMakeList(); int32_t code = nodesCollectColumnsFromNode(pJoinNode->pTagEqCond, NULL, COLLECT_COL_TYPE_TAG, &pTags); + if (TSDB_CODE_SUCCESS == code) { + code = nodesCollectColumnsFromNode(pJoinNode->pTagOnCond, NULL, COLLECT_COL_TYPE_TAG, &pTags); + } if (TSDB_CODE_SUCCESS == code) { SNode* pTarget = NULL; SNode* pTag = NULL; @@ -3010,18 +3089,10 @@ int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) { } } if (TSDB_CODE_SUCCESS == code) { - SFunctionNode* pUidFunc = createFunction("_tbuid", NULL); - snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p", - pUidFunc->functionName, pUidFunc); - nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pUidFunc); - code = createColumnByRewriteExpr(pUidFunc, &pScan->node.pTargets); + code = stbJoinOptAddFuncToScanNode("_tbuid", pScan); } if (TSDB_CODE_SUCCESS == code) { - SFunctionNode* pVgidFunc = createFunction("_vgid", NULL); - snprintf(pVgidFunc->node.aliasName, sizeof(pVgidFunc->node.aliasName), "%s.%p", - pVgidFunc->functionName, pVgidFunc); - nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pVgidFunc); - code = createColumnByRewriteExpr(pVgidFunc, &pScan->node.pTargets); + code = stbJoinOptAddFuncToScanNode("_vgid", pScan); } if (code) { @@ -3071,6 +3142,7 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->pTagEqCond = nodesCloneNode(pOrigJoin->pTagEqCond); + pJoin->pTagOnCond = nodesCloneNode(pOrigJoin->pTagOnCond); int32_t code = TSDB_CODE_SUCCESS; pJoin->node.pChildren = pChildren; @@ -3090,6 +3162,7 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh if (code) { break; } + pScan->node.pParent = (SLogicNode*)pJoin; } if (TSDB_CODE_SUCCESS == code) { @@ -3107,27 +3180,36 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL return TSDB_CODE_OUT_OF_MEMORY; } + int32_t code = TSDB_CODE_SUCCESS; SNode* pNode = NULL; FOREACH(pNode, pList) { - code = stbJoinOptAddUidToScan(pJoin, pNode); + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + code = stbJoinOptAddFuncToScanNode("_tbuid", pScan); if (code) { break; } + pScan->node.dynamicOp = true; } - *ppList = pList; + if (TSDB_CODE_SUCCESS == code) { + *ppList = pList; + } else { + nodesDestroyList(pList); + *ppList = NULL; + } - return TSDB_CODE_SUCCESS; + return code; } -static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pOrig, SNodeList* pChildren, SLogicNode** ppLogic) { - SJoinLogicNode* pOrigJoin = (SJoinLogicNode*)pOrig; - SGroupCacheLogicNode* pGrpCache = (SJoinLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_GROUP_CACHE); +static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode** ppLogic) { + int32_t code = TSDB_CODE_SUCCESS; + SGroupCacheLogicNode* pGrpCache = (SGroupCacheLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_GROUP_CACHE); if (NULL == pGrpCache) { return TSDB_CODE_OUT_OF_MEMORY; } - - int32_t code = TSDB_CODE_SUCCESS; + + pGrpCache->node.dynamicOp = true; + pGrpCache->node.pChildren = pChildren; pGrpCache->node.pTargets = nodesMakeList(); if (NULL == pGrpCache->node.pTargets) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -3136,7 +3218,24 @@ static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pOrig, SNodeList* pChi SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pChildren, 0); code = nodesListStrictAppendList(pGrpCache->node.pTargets, nodesCloneList(pScan->node.pTargets)); } - + + SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pChildren, 0); + SNode* pCol = NULL; + FOREACH(pCol, pScan->pScanPseudoCols) { + if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID || ((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) { + code = createColumnByRewriteExpr(pCol, &pGrpCache->pGroupCols); + if (code) { + break; + } + } + } + + SNode* pNode = NULL; + FOREACH(pNode, pChildren) { + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + pScan->node.pParent = (SLogicNode*)pGrpCache; + } + if (TSDB_CODE_SUCCESS == code) { *ppLogic = (SLogicNode*)pGrpCache; } else { @@ -3146,20 +3245,95 @@ static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pOrig, SNodeList* pChi return code; } +static void stbJoinOptRemoveTagEqCond(SJoinLogicNode* pJoin) { + if (QUERY_NODE_OPERATOR == nodeType(pJoin->pOtherOnCond) && nodesEqualNode(pJoin->pOtherOnCond, pJoin->pTagEqCond)) { + NODES_DESTORY_NODE(pJoin->pOtherOnCond); + return; + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOtherOnCond)) { + SLogicConditionNode* pLogic = (SLogicConditionNode*)pJoin->pOtherOnCond; + SNode* pNode = NULL; + FOREACH(pNode, pLogic->pParameterList) { + if (nodesEqualNode(pNode, pJoin->pTagEqCond)) { + ERASE_NODE(pLogic->pParameterList); + break; + } else if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pTagEqCond)) { + SLogicConditionNode* pTags = (SLogicConditionNode*)pJoin->pTagEqCond; + SNode* pTag = NULL; + FOREACH(pTag, pTags->pParameterList) { + if (nodesEqualNode(pTag, pNode)) { + ERASE_NODE(pLogic->pParameterList); + break; + } + } + } + } -static int32_t stbJoinOptCreateDynTaskCtrlNode(SLogicNode* pJoin, SLogicNode* pHJoinNode, SLogicNode* pMJoinNode, SLogicNode** ppDynNode) { + if (pLogic->pParameterList->length <= 0) { + NODES_DESTORY_NODE(pJoin->pOtherOnCond); + } + } +} + +static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChild, SLogicNode** ppLogic) { + SJoinLogicNode* pOrigJoin = (SJoinLogicNode*)pOrig; + SJoinLogicNode* pJoin = (SJoinLogicNode*)nodesCloneNode((SNode*)pOrig); + if (NULL == pJoin) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pJoin->joinAlgo = JOIN_ALGO_MERGE; + pJoin->node.dynamicOp = true; + + stbJoinOptRemoveTagEqCond(pJoin); + NODES_DESTORY_NODE(pJoin->pTagEqCond); + + SNode* pNode = NULL; + FOREACH(pNode, pJoin->node.pChildren) { + ERASE_NODE(pJoin->node.pChildren); + } + nodesListStrictAppend(pJoin->node.pChildren, (SNode *)pChild); + pChild->pParent = (SLogicNode*)pJoin; + + *ppLogic = (SLogicNode*)pJoin; + + return TSDB_CODE_SUCCESS; +} + + +static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* pPost, SLogicNode** ppDynNode) { int32_t code = TSDB_CODE_SUCCESS; + SDynQueryCtrlLogicNode* pDynCtrl = (SDynQueryCtrlLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL); + if (NULL == pDynCtrl) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pDynCtrl->qType = DYN_QTYPE_STB_HASH; if (TSDB_CODE_SUCCESS == code) { - pDynNode->pChildren = nodesMakeList(); - if (NULL == pDynNode->pChildren) { + pDynCtrl->node.pChildren = nodesMakeList(); + if (NULL == pDynCtrl->node.pChildren) { code = TSDB_CODE_OUT_OF_MEMORY; } } if (TSDB_CODE_SUCCESS == code) { - code = nodesListStrictAppend(pDynNode->pChildren, (SNode*)pHJoinNode); + nodesListStrictAppend(pDynCtrl->node.pChildren, (SNode*)pPrev); + nodesListStrictAppend(pDynCtrl->node.pChildren, (SNode*)pPost); + pDynCtrl->node.pTargets = nodesCloneList(pPost->pTargets); } + + if (TSDB_CODE_SUCCESS == code) { + pPrev->pParent = (SLogicNode*)pDynCtrl; + pPost->pParent = (SLogicNode*)pDynCtrl; + + *ppDynNode = (SLogicNode*)pDynCtrl; + } else { + nodesDestroyNode((SNode*)pDynCtrl); + *ppDynNode = NULL; + } + + return code; } static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* pJoin, SLogicSubplan* pLogicSubplan) { @@ -3174,16 +3348,16 @@ static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* p code = stbJoinOptCreateTagHashJoinNode(pJoin, pTagScanNodes, &pHJoinNode); } if (TSDB_CODE_SUCCESS == code) { - code = stbJoinOptCreateTableScanNodes(pJoin, pTbScanNodes); + code = stbJoinOptCreateTableScanNodes(pJoin, &pTbScanNodes); } if (TSDB_CODE_SUCCESS == code) { - code = stbJoinOptCreateGroupCacheNode(pJoin, pTbScanNodes, &pGrpCacheNode); + code = stbJoinOptCreateGroupCacheNode(pTbScanNodes, &pGrpCacheNode); } if (TSDB_CODE_SUCCESS == code) { code = stbJoinOptCreateMergeJoinNode(pJoin, pGrpCacheNode, &pMJoinNode); } if (TSDB_CODE_SUCCESS == code) { - code = stbJoinOptCreateDynTaskCtrlNode(pJoin, pHJoinNode, pMJoinNode, &pDynNode); + code = stbJoinOptCreateDynQueryCtrlNode(pHJoinNode, pMJoinNode, &pDynNode); } if (TSDB_CODE_SUCCESS == code) { code = replaceLogicNode(pLogicSubplan, pJoin, (SLogicNode*)pDynNode); @@ -3215,14 +3389,14 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize}, {.pName = "StableJoin", .optimizeFunc = stableJoinOptimize}, {.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize}, - {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}, - {.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize}, {.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize}, {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize}, {.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize}, {.pName = "TagScan", .optimizeFunc = tagScanOptimize}, {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize}, {.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize}, + {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}, + {.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize}, }; // clang-format on @@ -3257,6 +3431,7 @@ static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubpla if (cxt.optimized) { optimized = true; dumpLogicSubplan(optimizeRuleSet[i].pName, pLogicSubplan); + break; } } } while (optimized); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 0269fc5f66..231d36b3b7 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -708,7 +708,8 @@ static int32_t mergeEqCond(SNode** ppDst, SNode** ppSrc) { return TSDB_CODE_SUCCESS; } -static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, + +static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) { SSortMergeJoinPhysiNode* pJoin = (SSortMergeJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN); @@ -730,32 +731,6 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren &pJoin->pTargets); } - if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { - SNodeList* pCondCols = nodesMakeList(); - SNodeList* pTargets = NULL; - SNodeList* pFinTargets = NULL; - if (NULL == pCondCols) { - code = TSDB_CODE_OUT_OF_MEMORY; - } else { - code = nodesCollectColumnsFromNode(pJoinLogicNode->pOtherOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols); - } - if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExprs(pCondCols, &pTargets); - } - if (TSDB_CODE_SUCCESS == code) { - code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pTargets, &pFinTargets); - } - if (TSDB_CODE_SUCCESS == code) { - code = nodesListStrictAppendList(pJoin->pTargets, pFinTargets); - } - if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockSlots(pCxt, pCondCols, pJoin->node.pOutputDataBlockDesc); - } - - nodesDestroyList(pTargets); - nodesDestroyList(pCondCols); - } - if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOtherOnCond, &pJoin->pOtherOnCond); @@ -784,6 +759,72 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren return code; } + +static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, + SPhysiNode** pPhyNode) { + SHashJoinPhysiNode* pJoin = + (SHashJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN); + if (NULL == pJoin) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; + SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc; + int32_t code = TSDB_CODE_SUCCESS; + + pJoin->joinType = pJoinLogicNode->joinType; + pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; + + SNode* pPrimKeyCond = NULL; + SNode* pColEqCond = NULL; + SNode* pTagEqCond = NULL; + SNode* pTagOnCond = NULL; + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pPrimKeyCond); + if (TSDB_CODE_SUCCESS == code) { + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pColEqCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pTagEqCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pTagOnCond) { + code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pTagOnCond, &pTagOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets); + } + + if (TSDB_CODE_SUCCESS == code) { + code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); + } + + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); + } + + if (TSDB_CODE_SUCCESS == code) { + *pPhyNode = (SPhysiNode*)pJoin; + } else { + nodesDestroyNode((SNode*)pJoin); + } + + return code; +} + +static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, + SPhysiNode** pPhyNode) { + switch (pJoinLogicNode->joinAlgo) { + case JOIN_ALGO_MERGE: + return createMergeJoinPhysiNode(pCxt, pChildren, pJoinLogicNode, pPhyNode); + case JOIN_ALGO_HASH: + return createHashJoinPhysiNode(pCxt, pChildren, pJoinLogicNode, pPhyNode); + default: + planError("Invalid join algorithm:%d", pJoinLogicNode->joinAlgo); + break; + } + + return TSDB_CODE_FAILED; +} + typedef struct SRewritePrecalcExprsCxt { int32_t errCode; int32_t planNodeId; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index cdf641f584..7759f04227 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -280,7 +280,7 @@ static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) { } static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) { - if (pJoin->isSingleTableJoin) { + if (pJoin->isSingleTableJoin || JOIN_ALGO_HASH == pJoin->joinAlgo) { return false; } SNode* pChild = NULL;