From 8f39b9d2e458afe4fcfebcbc99a289e121834963 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 27 Jun 2023 19:36:51 +0800 Subject: [PATCH] enh: optimize query plan --- include/libs/nodes/nodes.h | 1 + include/libs/nodes/plannodes.h | 28 +- include/libs/nodes/querynodes.h | 7 + source/libs/command/src/explain.c | 6 +- source/libs/executor/src/mergejoinoperator.c | 18 +- source/libs/nodes/src/nodesCodeFuncs.c | 14 +- source/libs/nodes/src/nodesMsgFuncs.c | 16 +- source/libs/nodes/src/nodesUtilFuncs.c | 8 +- source/libs/parser/src/parTranslater.c | 1 + source/libs/planner/src/planLogicCreater.c | 9 +- source/libs/planner/src/planOptimizer.c | 261 ++++++++++++++++++- source/libs/planner/src/planPhysiCreater.c | 73 +++++- source/libs/planner/src/planSpliter.c | 4 +- source/libs/planner/src/planner.c | 2 +- 14 files changed, 384 insertions(+), 64 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index f3ec0d85e0..86286a4de3 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -232,6 +232,7 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN_PARTITION, QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC, QUERY_NODE_LOGIC_PLAN_INTERP_FUNC, + QUERY_NODE_LOGIC_PLAN_GROUP_CACHE, QUERY_NODE_LOGIC_SUBPLAN, QUERY_NODE_LOGIC_PLAN, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 88d1780230..dcadb5d5e8 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -108,13 +108,15 @@ typedef struct SScanLogicNode { } SScanLogicNode; typedef struct SJoinLogicNode { - SLogicNode node; - EJoinType joinType; - SNode* pPrimKeyEqCond; - SNode* pColEqCond; - SNode* pTagEqCond; - SNode* pOtherOnCond; - bool isSingleTableJoin; + SLogicNode node; + EJoinType joinType; + EJoinAlgorithm joinAlgo; + SNode* pPrimKeyEqCond; + SNode* pColEqCond; + SNode* pTagEqCond; + SNode* pOtherOnCond; + bool isSingleTableJoin; + bool hasSubQuery; } SJoinLogicNode; typedef struct SAggLogicNode { @@ -153,6 +155,12 @@ typedef struct SInterpFuncLogicNode { SNode* pTimeSeries; // SColumnNode } SInterpFuncLogicNode; +typedef struct SGroupCacheLogicNode { + SLogicNode node; + SNode* pGroupCol; +} SGroupCacheLogicNode; + + typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType; typedef struct SVnodeModifyLogicNode { @@ -404,10 +412,10 @@ typedef struct SInterpFuncPhysiNode { typedef struct SSortMergeJoinPhysiNode { SPhysiNode node; EJoinType joinType; - SNode* pMergeCondition; - SNode* pOnConditions; + SNode* pPrimKeyCond; + SNode* pColEqCond; + SNode* pOtherOnCond; SNodeList* pTargets; - SNode* pColEqualOnConditions; } SSortMergeJoinPhysiNode; typedef struct SHashJoinPhysiNode { diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 7ff0231519..abcb6a095b 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -174,9 +174,16 @@ typedef enum EJoinType { JOIN_TYPE_RIGHT, } EJoinType; +typedef enum EJoinAlgorithm { + JOIN_ALGO_UNKNOWN = 0, + JOIN_ALGO_MERGE, + JOIN_ALGO_HASH, +} EJoinAlgorithm; + typedef struct SJoinTableNode { STableNode table; // QUERY_NODE_JOIN_TABLE EJoinType joinType; + bool hasSubQuery; SNode* pLeft; SNode* pRight; SNode* pOnCond; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 884c0f7b20..5e13e6890d 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -575,11 +575,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT); QRY_ERR_RET( - nodesNodeToSQL(pJoinNode->pMergeCondition, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - if (pJoinNode->pOnConditions) { + nodesNodeToSQL(pJoinNode->pPrimKeyCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + if (pJoinNode->pOtherOnCond) { EXPLAIN_ROW_APPEND(" AND "); QRY_ERR_RET( - nodesNodeToSQL(pJoinNode->pOnConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + nodesNodeToSQL(pJoinNode->pOtherOnCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); } EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 5e0df27715..7b526a95d8 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -74,13 +74,13 @@ static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInf static void extractTimeCondition(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num, SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) { - SNode* pMergeCondition = pJoinNode->pMergeCondition; - if (nodeType(pMergeCondition) != QUERY_NODE_OPERATOR) { + SNode* pPrimKeyCond = pJoinNode->pPrimKeyCond; + if (nodeType(pPrimKeyCond) != QUERY_NODE_OPERATOR) { qError("not support this in join operator, %s", idStr); return; // do not handle this } - SOperatorNode* pNode = (SOperatorNode*)pMergeCondition; + SOperatorNode* pNode = (SOperatorNode*)pPrimKeyCond; SColumnNode* col1 = (SColumnNode*)pNode->pLeft; SColumnNode* col2 = (SColumnNode*)pNode->pRight; SColumnNode* leftTsCol = NULL; @@ -222,7 +222,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode, GET_TASKID(pTaskInfo)); - if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) { + if (pJoinNode->pOtherOnCond != NULL && pJoinNode->node.pConditions != NULL) { pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); if (pInfo->pCondAfterMerge == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -236,11 +236,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } - nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions)); + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOtherOnCond)); nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); pLogicCond->condType = LOGIC_COND_TYPE_AND; - } else if (pJoinNode->pOnConditions != NULL) { - pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOnConditions); + } else if (pJoinNode->pOtherOnCond != NULL) { + pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pOtherOnCond); + } else if (pJoinNode->pColEqCond != NULL) { + pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->pColEqCond); } else if (pJoinNode->node.pConditions != NULL) { pInfo->pCondAfterMerge = nodesCloneNode(pJoinNode->node.pConditions); } else { @@ -259,7 +261,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->inputOrder = TSDB_ORDER_DESC; } - pInfo->pColEqualOnConditions = pJoinNode->pColEqualOnConditions; + pInfo->pColEqualOnConditions = pJoinNode->pColEqCond; if (pInfo->pColEqualOnConditions != NULL) { pInfo->leftEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); pInfo->rightEqOnCondCols = taosArrayInit(4, sizeof(SColumn)); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 8b7f708b57..cc66ffbd9c 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1892,7 +1892,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) { static const char* jkJoinPhysiPlanJoinType = "JoinType"; static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder"; -static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition"; +static const char* jkJoinPhysiPlanPrimKeyCondition = "PrimKeyCondition"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; static const char* jkJoinPhysiPlanColEqualOnConditions = "ColumnEqualOnConditions"; @@ -1905,16 +1905,16 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinPhysiPlanMergeCondition, nodeToJson, pNode->pMergeCondition); + code = tjsonAddObject(pJson, jkJoinPhysiPlanPrimKeyCondition, nodeToJson, pNode->pPrimKeyCond); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pOnConditions); + code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pOtherOnCond); } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, jkJoinPhysiPlanColEqualOnConditions, nodeToJson, pNode->pColEqualOnConditions); + code = tjsonAddObject(pJson, jkJoinPhysiPlanColEqualOnConditions, nodeToJson, pNode->pColEqCond); } return code; } @@ -1927,16 +1927,16 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions); + code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOtherOnCond); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinPhysiPlanMergeCondition, &pNode->pMergeCondition); + code = jsonToNodeObject(pJson, jkJoinPhysiPlanPrimKeyCondition, &pNode->pPrimKeyCond); } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToNodeObject(pJson, jkJoinPhysiPlanColEqualOnConditions, &pNode->pColEqualOnConditions); + code = jsonToNodeObject(pJson, jkJoinPhysiPlanColEqualOnConditions, &pNode->pColEqCond); } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 1ca37defa4..635712658d 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2330,7 +2330,7 @@ static int32_t msgToPhysiProjectNode(STlvDecoder* pDecoder, void* pObj) { enum { PHY_SORT_MERGE_JOIN_CODE_BASE_NODE = 1, PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE, - PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION, + PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, PHY_SORT_MERGE_JOIN_CODE_TARGETS, PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, @@ -2345,16 +2345,16 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE, pNode->joinType); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION, nodeToMsg, pNode->pMergeCondition); + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION, nodeToMsg, pNode->pPrimKeyCond); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pOnConditions); + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pOtherOnCond); } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pColEqualOnConditions); + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pColEqCond); } return code; } @@ -2372,17 +2372,17 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE: code = tlvDecodeEnum(pTlv, &pNode->joinType, sizeof(pNode->joinType)); break; - case PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION: - code = msgToNodeFromTlv(pTlv, (void**)&pNode->pMergeCondition); + case PHY_SORT_MERGE_JOIN_CODE_PRIM_KEY_CONDITION: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pPrimKeyCond); break; case PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS: - code = msgToNodeFromTlv(pTlv, (void**)&pNode->pOnConditions); + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pOtherOnCond); break; case PHY_SORT_MERGE_JOIN_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS: - code = msgToNodeFromTlv(pTlv, (void**)&pNode->pColEqualOnConditions); + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pColEqCond); break; default: break; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index f6baa734dd..0c76efb6d7 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -490,6 +490,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SIndefRowsFuncLogicNode)); case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: return makeNode(type, sizeof(SInterpFuncLogicNode)); + case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: + return makeNode(type, sizeof(SGroupCacheLogicNode)); case QUERY_NODE_LOGIC_SUBPLAN: return makeNode(type, sizeof(SLogicSubplan)); case QUERY_NODE_LOGIC_PLAN: @@ -1222,10 +1224,10 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { SSortMergeJoinPhysiNode* pPhyNode = (SSortMergeJoinPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); - nodesDestroyNode(pPhyNode->pMergeCondition); - nodesDestroyNode(pPhyNode->pOnConditions); + nodesDestroyNode(pPhyNode->pPrimKeyCond); + nodesDestroyNode(pPhyNode->pOtherOnCond); nodesDestroyList(pPhyNode->pTargets); - nodesDestroyNode(pPhyNode->pColEqualOnConditions); + nodesDestroyNode(pPhyNode->pColEqCond); break; } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index b3a043fe12..bd292213fc 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2763,6 +2763,7 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { pJoinTable->table.precision = calcJoinTablePrecision(pJoinTable); pJoinTable->table.singleTable = joinTableIsSingleTable(pJoinTable); code = translateExpr(pCxt, &pJoinTable->pOnCond); + pJoinTable->hasSubQuery = (nodeType(pJoinTable->pLeft) != QUERY_NODE_REAL_TABLE) || (nodeType(pJoinTable->pRight) != QUERY_NODE_REAL_TABLE); } break; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index e56c859561..4da38f1869 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -436,6 +436,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->joinType = pJoinTable->joinType; pJoin->isSingleTableJoin = pJoinTable->table.singleTable; + pJoin->hasSubQuery = pJoinTable->hasSubQuery; pJoin->node.inputTsOrder = ORDER_ASC; pJoin->node.groupAction = GROUP_ACTION_CLEAR; pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; @@ -475,12 +476,12 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect // set the output if (TSDB_CODE_SUCCESS == code) { - pJoin->node.pTargets = nodesCloneList(pLeft->pTargets); - if (NULL == pJoin->node.pTargets) { - code = TSDB_CODE_OUT_OF_MEMORY; + SNodeList* pColList = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, NULL, COLLECT_COL_TYPE_ALL, &pColList); } if (TSDB_CODE_SUCCESS == code) { - code = nodesListStrictAppendList(pJoin->node.pTargets, nodesCloneList(pRight->pTargets)); + code = createColumnByRewriteExprs(pColList, &pJoin->node.pTargets); } } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 25b5d21f09..6bad4d67f1 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -689,11 +689,14 @@ static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNo int32_t code = TSDB_CODE_SUCCESS; SNodeList* pOnConds = NULL; SNode* pCond = NULL; - FOREACH(pCond, pLogicCond->pParameterList) { + WHERE_EACH(pCond, pLogicCond->pParameterList) { if (pushDownCondOptIsPriKeyEqualCond(pJoin, pCond)) { + nodesDestroyNode(*ppPrimKeyEqCond); *ppPrimKeyEqCond = nodesCloneNode(pCond); + ERASE_NODE(pLogicCond->pParameterList); } else { code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond)); + WHERE_NEXT; } } @@ -721,9 +724,8 @@ static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppPr } if (pushDownCondOptIsPriKeyEqualCond(pJoin, pJoin->pOtherOnCond)) { - *ppPrimKeyEqCond = nodesCloneNode(pJoin->pOtherOnCond); + *ppPrimKeyEqCond = pJoin->pOtherOnCond; *ppOnCond = NULL; - nodesDestroyNode(pJoin->pOtherOnCond); pJoin->pOtherOnCond = NULL; return TSDB_CODE_SUCCESS; } else { @@ -1814,15 +1816,9 @@ static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild, nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); if (!cxt.canUse) return false; } - if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) { + if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && ((SJoinLogicNode*)pChild)->pOtherOnCond) { SJoinLogicNode* pJoinLogicNode = (SJoinLogicNode*)pChild; CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = false}; - nodesWalkExpr(pJoinLogicNode->pPrimKeyEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); - if (!cxt.canUse) return false; - nodesWalkExpr(pJoinLogicNode->pColEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); - if (!cxt.canUse) return false; - nodesWalkExpr(pJoinLogicNode->pTagEqCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); - if (!cxt.canUse) return false; nodesWalkExpr(pJoinLogicNode->pOtherOnCond, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); if (!cxt.canUse) return false; } @@ -2969,6 +2965,246 @@ static int32_t sortNonPriKeyOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog return TSDB_CODE_SUCCESS; } +static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) { + return false; + } + + SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode; + if (pJoin->isSingleTableJoin || NULL == pJoin->pTagEqCond || pNode->pChildren->length != 2 || pJoin->hasSubQuery || pJoin->joinAlgo != UNKNOWN_JOIN_ALGO) { + return false; + } + + return true; +} + +int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) { + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + SJoinLogicNode* pJoinNode = (SJoinLogicNode*)pJoin; + + pScan->scanType = SCAN_TYPE_TAG; + NODES_DESTORY_LIST(pScan->pScanCols); + NODES_DESTORY_NODE(pScan->node.pConditions); + pScan->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pScan->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; + + SNodeList* pTags = nodesMakeList(); + int32_t code = nodesCollectColumnsFromNode(pJoinNode->pTagEqCond, NULL, COLLECT_COL_TYPE_TAG, &pTags); + if (TSDB_CODE_SUCCESS == code) { + SNode* pTarget = NULL; + SNode* pTag = NULL; + bool found = false; + WHERE_EACH(pTarget, pScan->node.pTargets) { + found = false; + FOREACH(pTag, pTags) { + if (nodesEqualNode(pTarget, pTag)) { + found = true; + break; + } + } + if (!found) { + ERASE_NODE(pScan->node.pTargets); + } else { + WHERE_NEXT; + } + } + } + if (TSDB_CODE_SUCCESS == code) { + SFunctionNode* pUidFunc = createFunction("_tbuid", NULL); + snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p", + pUidFunc->functionName, pUidFunc); + nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pUidFunc); + code = createColumnByRewriteExpr(pUidFunc, &pScan->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + SFunctionNode* pVgidFunc = createFunction("_vgid", NULL); + snprintf(pVgidFunc->node.aliasName, sizeof(pVgidFunc->node.aliasName), "%s.%p", + pVgidFunc->functionName, pVgidFunc); + nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pVgidFunc); + code = createColumnByRewriteExpr(pVgidFunc, &pScan->node.pTargets); + } + + if (code) { + nodesDestroyList(pTags); + } + + return code; +} + +static int32_t stbJoinOptCreateTagScanNode(SLogicNode* pJoin, SNodeList** ppList) { + SNodeList* pList = nodesCloneList(pJoin->pChildren); + if (NULL == pList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; + SNode* pNode = NULL; + FOREACH(pNode, pList) { + code = stbJoinOptRewriteToTagScan(pJoin, pNode); + if (code) { + break; + } + } + + if (TSDB_CODE_SUCCESS == code) { + *ppList = pList; + } else { + nodesDestroyList(pList); + } + + return code; +} + +static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pChildren, SLogicNode** ppLogic) { + SJoinLogicNode* pOrigJoin = (SJoinLogicNode*)pOrig; + SJoinLogicNode* pJoin = (SJoinLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_JOIN); + if (NULL == pJoin) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pJoin->joinType = pOrigJoin->joinType; + pJoin->joinAlgo = JOIN_ALGO_HASH; + pJoin->isSingleTableJoin = pOrigJoin->isSingleTableJoin; + pJoin->hasSubQuery = pOrigJoin->hasSubQuery; + pJoin->node.inputTsOrder = pOrigJoin->node.inputTsOrder; + pJoin->node.groupAction = pOrigJoin->node.groupAction; + pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; + pJoin->pTagEqCond = nodesCloneNode(pOrigJoin->pTagEqCond); + + int32_t code = TSDB_CODE_SUCCESS; + pJoin->node.pChildren = pChildren; + + SNode* pNode = NULL; + FOREACH(pNode, pChildren) { + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + SNode* pCol = NULL; + FOREACH(pCol, pScan->pScanPseudoCols) { + if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID || ((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) { + code = createColumnByRewriteExpr(pCol, &pJoin->node.pTargets); + if (code) { + break; + } + } + } + if (code) { + break; + } + } + + if (TSDB_CODE_SUCCESS == code) { + *ppLogic = (SLogicNode*)pJoin; + } else { + nodesDestroyNode((SNode*)pJoin); + } + + return code; +} + +static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppList) { + SNodeList* pList = nodesCloneList(pJoin->pChildren); + if (NULL == pList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SNode* pNode = NULL; + FOREACH(pNode, pList) { + code = stbJoinOptAddUidToScan(pJoin, pNode); + if (code) { + break; + } + } + + *ppList = pList; + + return TSDB_CODE_SUCCESS; +} + +static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pOrig, SNodeList* pChildren, SLogicNode** ppLogic) { + SJoinLogicNode* pOrigJoin = (SJoinLogicNode*)pOrig; + SGroupCacheLogicNode* pGrpCache = (SJoinLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_GROUP_CACHE); + if (NULL == pGrpCache) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; + pGrpCache->node.pTargets = nodesMakeList(); + if (NULL == pGrpCache->node.pTargets) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS == code) { + SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pChildren, 0); + code = nodesListStrictAppendList(pGrpCache->node.pTargets, nodesCloneList(pScan->node.pTargets)); + } + + if (TSDB_CODE_SUCCESS == code) { + *ppLogic = (SLogicNode*)pGrpCache; + } else { + nodesDestroyNode((SNode*)pGrpCache); + } + + return code; +} + + +static int32_t stbJoinOptCreateDynTaskCtrlNode(SLogicNode* pJoin, SLogicNode* pHJoinNode, SLogicNode* pMJoinNode, SLogicNode** ppDynNode) { + int32_t code = TSDB_CODE_SUCCESS; + + if (TSDB_CODE_SUCCESS == code) { + pDynNode->pChildren = nodesMakeList(); + if (NULL == pDynNode->pChildren) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = nodesListStrictAppend(pDynNode->pChildren, (SNode*)pHJoinNode); + } +} + +static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* pJoin, SLogicSubplan* pLogicSubplan) { + SNodeList* pTagScanNodes = NULL; + SNodeList* pTbScanNodes = NULL; + SLogicNode* pGrpCacheNode = NULL; + SLogicNode* pHJoinNode = NULL; + SLogicNode* pMJoinNode = NULL; + SLogicNode* pDynNode = NULL; + int32_t code = stbJoinOptCreateTagScanNode(pJoin, &pTagScanNodes); + if (TSDB_CODE_SUCCESS == code) { + code = stbJoinOptCreateTagHashJoinNode(pJoin, pTagScanNodes, &pHJoinNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = stbJoinOptCreateTableScanNodes(pJoin, pTbScanNodes); + } + if (TSDB_CODE_SUCCESS == code) { + code = stbJoinOptCreateGroupCacheNode(pJoin, pTbScanNodes, &pGrpCacheNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = stbJoinOptCreateMergeJoinNode(pJoin, pGrpCacheNode, &pMJoinNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = stbJoinOptCreateDynTaskCtrlNode(pJoin, pHJoinNode, pMJoinNode, &pDynNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = replaceLogicNode(pLogicSubplan, pJoin, (SLogicNode*)pDynNode); + } + if (TSDB_CODE_SUCCESS == code) { + nodesDestroyNode((SNode*)pJoin); + pCxt->optimized = true; + } + return code; +} + +static int32_t stableJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, stbJoinOptShouldBeOptimized); + if (NULL == pNode) { + return TSDB_CODE_SUCCESS; + } + + return stbJoinOptRewriteStableJoin(pCxt, pNode, pLogicSubplan); +} + + // clang-format off static const SOptimizeRule optimizeRuleSet[] = { {.pName = "ScanPath", .optimizeFunc = scanPathOptimize}, @@ -2977,6 +3213,7 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize}, {.pName = "SmaIndex", .optimizeFunc = smaIndexOptimize}, {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize}, + {.pName = "StableJoin", .optimizeFunc = stableJoinOptimize}, {.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize}, {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}, {.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize}, @@ -2998,9 +3235,9 @@ static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) { char* pStr = NULL; nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL); if (NULL == pRuleName) { - qDebugL("before optimize: %s", pStr); + qDebugL("before optimize, JsonPlan: %s", pStr); } else { - qDebugL("apply optimize %s rule: %s", pRuleName, pStr); + qDebugL("apply optimize %s rule, JsonPlan: %s", pRuleName, pStr); } taosMemoryFree(pStr); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 653072ace1..0269fc5f66 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -665,6 +665,49 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, return TSDB_CODE_FAILED; } +static int32_t mergeEqCond(SNode** ppDst, SNode** ppSrc) { + if (NULL == *ppSrc) { + return TSDB_CODE_SUCCESS; + } + if (NULL == *ppDst) { + *ppDst = *ppSrc; + *ppSrc = NULL; + return TSDB_CODE_SUCCESS; + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc)) { + TSWAP(*ppDst, *ppSrc); + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppDst)) { + SLogicConditionNode* pLogic = (SLogicConditionNode*)*ppDst; + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*ppSrc)) { + nodesListStrictAppendList(pLogic->pParameterList, ((SLogicConditionNode*)(*ppSrc))->pParameterList); + ((SLogicConditionNode*)(*ppSrc))->pParameterList = NULL; + } else { + nodesListStrictAppend(pLogic->pParameterList, *ppSrc); + *ppSrc = NULL; + } + nodesDestroyNode(*ppSrc); + *ppSrc = NULL; + return TSDB_CODE_SUCCESS; + } + + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (NULL == pLogicCond) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL; + pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; + pLogicCond->condType = LOGIC_COND_TYPE_AND; + pLogicCond->pParameterList = nodesMakeList(); + nodesListStrictAppend(pLogicCond->pParameterList, *ppSrc); + nodesListStrictAppend(pLogicCond->pParameterList, *ppDst); + + *ppDst = (SNode*)pLogicCond; + *ppSrc = NULL; + + return TSDB_CODE_SUCCESS; +} + static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) { SSortMergeJoinPhysiNode* pJoin = @@ -680,40 +723,58 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren pJoin->joinType = pJoinLogicNode->joinType; pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, - &pJoin->pMergeCondition); + &pJoin->pPrimKeyCond); + if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets); } - if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); - } if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { SNodeList* pCondCols = nodesMakeList(); + SNodeList* pTargets = NULL; + SNodeList* pFinTargets = NULL; if (NULL == pCondCols) { code = TSDB_CODE_OUT_OF_MEMORY; } else { code = nodesCollectColumnsFromNode(pJoinLogicNode->pOtherOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols); } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExprs(pCondCols, &pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pTargets, &pFinTargets); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListStrictAppendList(pJoin->pTargets, pFinTargets); + } if (TSDB_CODE_SUCCESS == code) { code = addDataBlockSlots(pCxt, pCondCols, pJoin->node.pOutputDataBlockDesc); } + + nodesDestroyList(pTargets); nodesDestroyList(pCondCols); } if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, - pJoinLogicNode->pOtherOnCond, &pJoin->pOnConditions); + pJoinLogicNode->pOtherOnCond, &pJoin->pOtherOnCond); } + if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) { + code = mergeEqCond(&pJoinLogicNode->pColEqCond, &pJoinLogicNode->pTagEqCond); + } if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) { - code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqualOnConditions); + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond); } if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); } + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); + } + if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pJoin; } else { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 246ee13fb0..cdf641f584 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1621,9 +1621,9 @@ static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) { char* pStr = NULL; nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL); if (NULL == pRuleName) { - qDebugL("before split: %s", pStr); + qDebugL("before split, JsonPlan: %s", pStr); } else { - qDebugL("apply split %s rule: %s", pRuleName, pStr); + qDebugL("apply split %s rule, JsonPlan: %s", pRuleName, pStr); } taosMemoryFree(pStr); } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 2fcc8510d4..6dd9c544cc 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -33,7 +33,7 @@ static void dumpQueryPlan(SQueryPlan* pPlan) { } char* pStr = NULL; nodesNodeToString((SNode*)pPlan, false, &pStr, NULL); - planDebugL("QID:0x%" PRIx64 " Query Plan: %s", pPlan->queryId, pStr); + planDebugL("QID:0x%" PRIx64 " Query Plan, JsonPlan: %s", pPlan->queryId, pStr); taosMemoryFree(pStr); }