From 226ee06209053eda652d985491b1b7a0c5e95638 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 29 Jun 2023 19:28:32 +0800 Subject: [PATCH] enh: add physical plan processing --- include/libs/nodes/nodes.h | 2 + include/libs/nodes/plannodes.h | 10 + source/libs/nodes/src/nodesCloneFuncs.c | 19 ++ source/libs/nodes/src/nodesCodeFuncs.c | 149 +++++++++++++- source/libs/nodes/src/nodesMsgFuncs.c | 195 ++++++++++++++++++- source/libs/nodes/src/nodesUtilFuncs.c | 26 +++ source/libs/planner/src/planOptimizer.c | 5 +- source/libs/planner/src/planPhysiCreater.c | 215 +++++++++++++++++++-- 8 files changed, 600 insertions(+), 21 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index be9708e0a9..daffc0624a 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -276,6 +276,8 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, QUERY_NODE_PHYSICAL_PLAN_DELETE, + QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, + QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, QUERY_NODE_PHYSICAL_SUBPLAN, QUERY_NODE_PHYSICAL_PLAN, QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 550c03d641..eecfae0812 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -435,6 +435,16 @@ typedef struct SHashJoinPhysiNode { SQueryStat inputStat[2]; } SHashJoinPhysiNode; +typedef struct SGroupCachePhysiNode { + SPhysiNode node; + SNodeList* pGroupCols; +} SGroupCachePhysiNode; + +typedef struct SDynQueryCtrlPhysiNode { + SPhysiNode node; + EDynQueryType qType; +} SDynQueryCtrlPhysiNode; + typedef struct SAggPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of group_by_clause and parameter expression of aggregate function diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index afc3315f64..824af67f36 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -407,6 +407,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { CLONE_NODE_FIELD(pPrimKeyEqCond); CLONE_NODE_FIELD(pColEqCond); CLONE_NODE_FIELD(pTagEqCond); + CLONE_NODE_FIELD(pTagOnCond); CLONE_NODE_FIELD(pOtherOnCond); COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(hasSubQuery); @@ -534,6 +535,18 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc return TSDB_CODE_SUCCESS; } +static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCacheLogicNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); + CLONE_NODE_LIST_FIELD(pGroupCols); + return TSDB_CODE_SUCCESS; +} + +static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQueryCtrlLogicNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); + COPY_SCALAR_FIELD(qType); + return TSDB_CODE_SUCCESS; +} + static int32_t logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) { COPY_OBJECT_FIELD(id, sizeof(SSubplanId)); CLONE_NODE_FIELD_EX(pNode, SLogicNode*); @@ -807,6 +820,12 @@ SNode* nodesCloneNode(const SNode* pNode) { case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: code = logicInterpFuncCopy((const SInterpFuncLogicNode*)pNode, (SInterpFuncLogicNode*)pDst); break; + case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: + code = logicGroupCacheCopy((const SGroupCacheLogicNode*)pNode, (SGroupCacheLogicNode*)pDst); + break; + case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: + code = logicDynQueryCtrlCopy((const SDynQueryCtrlLogicNode*)pNode, (SDynQueryCtrlLogicNode*)pDst); + break; case QUERY_NODE_LOGIC_SUBPLAN: code = logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst); break; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 916652c701..97df998aa3 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -328,7 +328,9 @@ const char* nodesNodeName(ENodeType type) { case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return "PhysiProject"; case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: - return "PhysiJoin"; + return "PhysiMergeJoin"; + case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: + return "PhysiHashJoin"; case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: return "PhysiAgg"; case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: @@ -380,6 +382,10 @@ const char* nodesNodeName(ENodeType type) { return "PhysiQueryInsert"; case QUERY_NODE_PHYSICAL_PLAN_DELETE: return "PhysiDelete"; + case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: + return "PhysiGroupCache"; + case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: + return "PhysiDynamicQueryCtrl"; case QUERY_NODE_PHYSICAL_SUBPLAN: return "PhysiSubplan"; case QUERY_NODE_PHYSICAL_PLAN: @@ -1960,12 +1966,16 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) { static const char* jkJoinPhysiPlanJoinType = "JoinType"; static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder"; +static const char* jkJoinPhysiPlanOnLeftCols = "OnLeftColumns"; +static const char* jkJoinPhysiPlanOnRightCols = "OnRightColumns"; static const char* jkJoinPhysiPlanPrimKeyCondition = "PrimKeyCondition"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; static const char* jkJoinPhysiPlanColEqualOnConditions = "ColumnEqualOnConditions"; +static const char* jkJoinPhysiPlanInputRowNum = "InputRowNum"; +static const char* jkJoinPhysiPlanInputRowSize = "InputRowSize"; -static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { +static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; int32_t code = physicPlanNodeToJson(pObj, pJson); @@ -1987,7 +1997,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { return code; } -static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { +static int32_t jsonToPhysiMergeJoinNode(const SJson* pJson, void* pObj) { SSortMergeJoinPhysiNode* pNode = (SSortMergeJoinPhysiNode*)pObj; int32_t code = jsonToPhysicPlanNode(pJson, pObj); @@ -2009,6 +2019,76 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { return code; } +static int32_t physiHashJoinNodeToJson(const void* pObj, SJson* pJson) { + const SHashJoinPhysiNode* pNode = (const SHashJoinPhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkJoinPhysiPlanOnLeftCols, pNode->pOnLeft); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkJoinPhysiPlanOnRightCols, pNode->pOnRight); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pFilterConditions); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[0].inputRowNum); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[0].inputRowSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[1].inputRowNum); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[1].inputRowSize); + } + return code; +} + + +static int32_t jsonToPhysiHashJoinNode(const SJson* pJson, void* pObj) { + SHashJoinPhysiNode* pNode = (SHashJoinPhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkJoinPhysiPlanOnLeftCols, &pNode->pOnLeft); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkJoinPhysiPlanOnRightCols, &pNode->pOnRight); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pFilterConditions); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[0].inputRowNum, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[0].inputRowSize, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowNum, pNode->inputStat[1].inputRowNum, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputRowSize, pNode->inputStat[1].inputRowSize, code); + } + return code; +} + + static const char* jkAggPhysiPlanExprs = "Exprs"; static const char* jkAggPhysiPlanGroupKeys = "GroupKeys"; static const char* jkAggPhysiPlanAggFuncs = "AggFuncs"; @@ -2830,6 +2910,53 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkGroupCachePhysiPlanGroupCols = "GroupColumns"; + + +static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) { + const SGroupCachePhysiNode* pNode = (const SGroupCachePhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkGroupCachePhysiPlanGroupCols, pNode->pGroupCols); + } + return code; +} + +static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) { + SGroupCachePhysiNode* pNode = (SGroupCachePhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkGroupCachePhysiPlanGroupCols, &pNode->pGroupCols); + } + return code; +} + +static const char* jkDynQueryCtrlPhysiPlanQueryType = "QueryType"; + +static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) { + const SDynQueryCtrlPhysiNode* pNode = (const SDynQueryCtrlPhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType); + } + return code; +} + +static int32_t jsonToPhysiDynQueryCtrlNode(const SJson* pJson, void* pObj) { + SDynQueryCtrlPhysiNode* pNode = (SDynQueryCtrlPhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType, code); + } + return code; +} + + + static const char* jkQueryNodeAddrId = "Id"; static const char* jkQueryNodeAddrInUse = "InUse"; static const char* jkQueryNodeAddrNumOfEps = "NumOfEps"; @@ -6675,7 +6802,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return physiProjectNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: - return physiJoinNodeToJson(pObj, pJson); + return physiMergeJoinNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: + return physiHashJoinNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: return physiAggNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: @@ -6721,6 +6850,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return physiQueryInsertNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_DELETE: return physiDeleteNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: + return physiGroupCacheNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: + return physiDynQueryCtrlNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_SUBPLAN: return subplanToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN: @@ -6997,7 +7130,9 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return jsonToPhysiProjectNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: - return jsonToPhysiJoinNode(pJson, pObj); + return jsonToPhysiMergeJoinNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: + return jsonToPhysiHashJoinNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: return jsonToPhysiAggNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: @@ -7041,6 +7176,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiQueryInsertNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_DELETE: return jsonToPhysiDeleteNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: + return jsonToPhysiGroupCacheNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: + return jsonToPhysiDynQueryCtrlNode(pJson, pObj); case QUERY_NODE_PHYSICAL_SUBPLAN: return jsonToSubplan(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN: diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index ff34b7bfbb..bd93046b9d 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2337,7 +2337,7 @@ enum { PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS }; -static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { +static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; int32_t code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); @@ -2359,7 +2359,7 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { return code; } -static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) { +static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) { SSortMergeJoinPhysiNode* pNode = (SSortMergeJoinPhysiNode*)pObj; int32_t code = TSDB_CODE_SUCCESS; @@ -2392,6 +2392,100 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) { return code; } +enum { + PHY_HASH_JOIN_CODE_BASE_NODE = 1, + PHY_HASH_JOIN_CODE_JOIN_TYPE, + PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN, + PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN, + PHY_HASH_JOIN_CODE_ON_CONDITIONS, + PHY_HASH_JOIN_CODE_TARGETS, + PHY_HASH_JOIN_CODE_INPUT_ROW_NUM0, + PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE0, + PHY_HASH_JOIN_CODE_INPUT_ROW_NUM1, + PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE1 +}; + +static int32_t physiHashJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SHashJoinPhysiNode* pNode = (const SHashJoinPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_HASH_JOIN_CODE_JOIN_TYPE, pNode->joinType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN, nodeListToMsg, pNode->pOnLeft); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN, nodeListToMsg, pNode->pOnRight); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pFilterConditions); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_HASH_JOIN_CODE_INPUT_ROW_NUM0, pNode->inputStat[0].inputRowNum); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_HASH_JOIN_CODE_INPUT_ROW_NUM1, pNode->inputStat[1].inputRowNum); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE0, pNode->inputStat[0].inputRowSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE1, pNode->inputStat[1].inputRowSize); + } + return code; +} + + +static int32_t msgToPhysiHashJoinNode(STlvDecoder* pDecoder, void* pObj) { + SHashJoinPhysiNode* pNode = (SHashJoinPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_HASH_JOIN_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_HASH_JOIN_CODE_JOIN_TYPE: + code = tlvDecodeEnum(pTlv, &pNode->joinType, sizeof(pNode->joinType)); + break; + case PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pOnLeft); + break; + case PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pOnRight); + break; + case PHY_HASH_JOIN_CODE_ON_CONDITIONS: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pFilterConditions); + break; + case PHY_HASH_JOIN_CODE_TARGETS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); + break; + case PHY_HASH_JOIN_CODE_INPUT_ROW_NUM0: + code = tlvDecodeI64(pTlv, &pNode->inputStat[0].inputRowNum); + break; + case PHY_HASH_JOIN_CODE_INPUT_ROW_NUM1: + code = tlvDecodeI64(pTlv, &pNode->inputStat[1].inputRowNum); + break; + case PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE0: + code = tlvDecodeI32(pTlv, &pNode->inputStat[0].inputRowSize); + break; + case PHY_HASH_JOIN_CODE_INPUT_ROW_SIZE1: + code = tlvDecodeI32(pTlv, &pNode->inputStat[1].inputRowSize); + break; + default: + break; + } + } + + return code; +} + + enum { PHY_AGG_CODE_BASE_NODE = 1, PHY_AGG_CODE_EXPR, @@ -3430,6 +3524,81 @@ static int32_t msgToPhysiDeleteNode(STlvDecoder* pDecoder, void* pObj) { return code; } +enum { + PHY_GROUP_CACHE_CODE_BASE_NODE = 1, + PHY_GROUP_CACHE_CODE_GROUP_COLUMNS +}; + +static int32_t physiGroupCacheNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SGroupCachePhysiNode* pNode = (const SGroupCachePhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_GROUP_CACHE_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_GROUP_CACHE_CODE_GROUP_COLUMNS, nodeListToMsg, pNode->pGroupCols); + } + return code; +} + +static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) { + SGroupCachePhysiNode* pNode = (SGroupCachePhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_GROUP_CACHE_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_GROUP_CACHE_CODE_GROUP_COLUMNS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pGroupCols); + break; + default: + break; + } + } + + return code; +} + + +enum { + PHY_DYN_QUERY_CTRL_CODE_BASE_NODE = 1, + PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE +}; + +static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SDynQueryCtrlPhysiNode* pNode = (const SDynQueryCtrlPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_DYN_QUERY_CTRL_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE, pNode->qType); + } + return code; +} + +static int32_t msgToPhysiDynQueryCtrlNode(STlvDecoder* pDecoder, void* pObj) { + SDynQueryCtrlPhysiNode* pNode = (SDynQueryCtrlPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_DYN_QUERY_CTRL_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE: + code = tlvDecodeEnum(pTlv, &pNode->qType, sizeof(pNode->qType)); + break; + default: + break; + } + } + + return code; +} + + + enum { SUBPLAN_ID_CODE_QUERY_ID = 1, SUBPLAN_ID_CODE_GROUP_ID, SUBPLAN_ID_CODE_SUBPLAN_ID }; static int32_t subplanIdInlineToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -3738,7 +3907,10 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { code = physiProjectNodeToMsg(pObj, pEncoder); break; case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: - code = physiJoinNodeToMsg(pObj, pEncoder); + code = physiMergeJoinNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: + code = physiHashJoinNodeToMsg(pObj, pEncoder); break; case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: code = physiAggNodeToMsg(pObj, pEncoder); @@ -3799,6 +3971,12 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { case QUERY_NODE_PHYSICAL_PLAN_DELETE: code = physiDeleteNodeToMsg(pObj, pEncoder); break; + case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: + code = physiGroupCacheNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: + code = physiDynQueryCtrlNodeToMsg(pObj, pEncoder); + break; case QUERY_NODE_PHYSICAL_SUBPLAN: code = subplanToMsg(pObj, pEncoder); break; @@ -3881,7 +4059,10 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { code = msgToPhysiProjectNode(pDecoder, pObj); break; case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: - code = msgToPhysiJoinNode(pDecoder, pObj); + code = msgToPhysiMergeJoinNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: + code = msgToPhysiHashJoinNode(pDecoder, pObj); break; case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: code = msgToPhysiAggNode(pDecoder, pObj); @@ -3942,6 +4123,12 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_DELETE: code = msgToPhysiDeleteNode(pDecoder, pObj); break; + case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: + code = msgToPhysiGroupCacheNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: + code = msgToPhysiDynQueryCtrlNode(pDecoder, pObj); + break; case QUERY_NODE_PHYSICAL_SUBPLAN: code = msgToSubplan(pDecoder, pObj); break; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 42ce5370b0..72943b55a8 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -520,6 +520,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SProjectPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: return makeNode(type, sizeof(SSortMergeJoinPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: + return makeNode(type, sizeof(SHashJoinPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: return makeNode(type, sizeof(SAggPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: @@ -575,6 +577,10 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SQueryInserterNode)); case QUERY_NODE_PHYSICAL_PLAN_DELETE: return makeNode(type, sizeof(SDataDeleterNode)); + case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: + return makeNode(type, sizeof(SGroupCachePhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: + return makeNode(type, sizeof(SDynQueryCtrlPhysiNode)); case QUERY_NODE_PHYSICAL_SUBPLAN: return makeNode(type, sizeof(SSubplan)); case QUERY_NODE_PHYSICAL_PLAN: @@ -1243,6 +1249,15 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pPhyNode->pColEqCond); break; } + case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: { + SHashJoinPhysiNode* pPhyNode = (SHashJoinPhysiNode*)pNode; + destroyPhysiNode((SPhysiNode*)pPhyNode); + nodesDestroyList(pPhyNode->pOnLeft); + nodesDestroyList(pPhyNode->pOnRight); + nodesDestroyNode(pPhyNode->pFilterConditions); + nodesDestroyList(pPhyNode->pTargets); + break; + } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { SAggPhysiNode* pPhyNode = (SAggPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); @@ -1361,6 +1376,17 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pSink->pEndTs); break; } + case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: { + SGroupCachePhysiNode* pPhyNode = (SGroupCachePhysiNode*)pNode; + destroyPhysiNode((SPhysiNode*)pPhyNode); + nodesDestroyList(pPhyNode->pGroupCols); + break; + } + case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: { + SDynQueryCtrlPhysiNode* pPhyNode = (SDynQueryCtrlPhysiNode*)pNode; + destroyPhysiNode((SPhysiNode*)pPhyNode); + break; + } case QUERY_NODE_PHYSICAL_SUBPLAN: { SSubplan* pSubplan = (SSubplan*)pNode; nodesClearList(pSubplan->pChildren); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 99b47079cf..a9de5ce309 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1882,6 +1882,9 @@ static bool eliminateProjOptCanChildConditionUseChildTargets(SLogicNode* pChild, nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); if (!cxt.canUse) return false; } + if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && ((SJoinLogicNode*)pChild)->joinAlgo != JOIN_ALGO_UNKNOWN) { + return false; + } if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild) && ((SJoinLogicNode*)pChild)->pOtherOnCond) { SJoinLogicNode* pJoinLogicNode = (SJoinLogicNode*)pChild; CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = false}; @@ -3160,7 +3163,7 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; pJoin->pTagEqCond = nodesCloneNode(pOrigJoin->pTagEqCond); - pJoin->pTagOnCond = nodesCloneNode(pOrigJoin->pTagOnCond); + pJoin->pOtherOnCond = nodesCloneNode(pOrigJoin->pTagOnCond); int32_t code = TSDB_CODE_SUCCESS; pJoin->node.pChildren = pChildren; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index b6d33a50b7..31bf048917 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -708,6 +708,19 @@ static int32_t mergeEqCond(SNode** ppDst, SNode** ppSrc) { return TSDB_CODE_SUCCESS; } +static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SDataBlockDescNode** ppDesc) { + if (2 == pChildren->length) { + *ppDesc = ((SPhysiNode*)nodesListGetNode(pChildren, idx))->pOutputDataBlockDesc; + } else if (1 == pChildren->length && nodeType(nodesListGetNode(pChildren, 0)) == QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE) { + SGroupCachePhysiNode* pGrpCache = (SGroupCachePhysiNode*)nodesListGetNode(pChildren, 0); + *ppDesc = ((SPhysiNode*)nodesListGetNode(pGrpCache->node.pChildren, idx))->pOutputDataBlockDesc; + } else { + planError("Invalid join children num:%d or child type:%d", pChildren->length, nodeType(nodesListGetNode(pChildren, 0))); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; +} static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) { @@ -717,14 +730,20 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi return TSDB_CODE_OUT_OF_MEMORY; } - SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; - SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc; - int32_t code = TSDB_CODE_SUCCESS; - pJoin->joinType = pJoinLogicNode->joinType; pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; - setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, - &pJoin->pPrimKeyCond); + + SDataBlockDescNode* pLeftDesc = NULL; + SDataBlockDescNode* pRightDesc = NULL; + int32_t code = getJoinDataBlockDescNode(pChildren, 0, &pLeftDesc); + if (TSDB_CODE_SUCCESS == code) { + code = getJoinDataBlockDescNode(pChildren, 1, &pRightDesc); + } + + if (TSDB_CODE_SUCCESS == code) { + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, + &pJoin->pPrimKeyCond); + } if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, @@ -759,6 +778,131 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi return code; } +static int32_t extractHashJoinOpCols(int16_t lBlkId, int16_t rBlkId, SNode* pEq, SHashJoinPhysiNode* pJoin) { + if (QUERY_NODE_OPERATOR == nodeType(pEq)) { + SOperatorNode* pOp = (SOperatorNode*)pEq; + SColumnNode* pLeft = (SColumnNode*)pOp->pLeft; + SColumnNode* pRight = (SColumnNode*)pOp->pRight; + if (lBlkId == pLeft->dataBlockId && rBlkId == pRight->dataBlockId) { + nodesListStrictAppend(pJoin->pOnLeft, nodesCloneNode(pOp->pLeft)); + nodesListStrictAppend(pJoin->pOnRight, nodesCloneNode(pOp->pRight)); + } else if (rBlkId == pLeft->dataBlockId && lBlkId == pRight->dataBlockId) { + nodesListStrictAppend(pJoin->pOnLeft, nodesCloneNode(pOp->pRight)); + nodesListStrictAppend(pJoin->pOnRight, nodesCloneNode(pOp->pLeft)); + } else { + planError("Invalid join equal cond, lbid:%d, rbid:%d, oplid:%d, oprid:%d", lBlkId, rBlkId, pLeft->dataBlockId, pRight->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; + } + + planError("Invalid join equal node type:%d", nodeType(pEq)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; +} + +static int32_t extractHashJoinOnCols(int16_t lBlkId, int16_t rBlkId, SNode* pEq, SHashJoinPhysiNode* pJoin) { + if (NULL == pEq) { + return TSDB_CODE_SUCCESS; + } + + int32_t code = TSDB_CODE_SUCCESS; + if (QUERY_NODE_OPERATOR == nodeType(pEq)) { + code = extractHashJoinOpCols(lBlkId, rBlkId, pEq, pJoin); + } else if (QUERY_NODE_LOGIC_CONDITION == nodeType(pEq)) { + SLogicConditionNode* pLogic = (SLogicConditionNode*)pEq; + SNode* pNode = NULL; + FOREACH(pNode, pLogic->pParameterList) { + code = extractHashJoinOpCols(lBlkId, rBlkId, pNode, pJoin); + if (code) { + break; + } + } + } else { + planError("Invalid join equal node type:%d", nodeType(pEq)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + return code; +} + +static int32_t createHashJoinColList(int16_t lBlkId, int16_t rBlkId, SNode* pEq1, SNode* pEq2, SNode* pEq3, SHashJoinPhysiNode* pJoin) { + int32_t code = TSDB_CODE_SUCCESS; + pJoin->pOnLeft = nodesMakeList(); + pJoin->pOnRight = nodesMakeList(); + if (NULL == pJoin->pOnLeft || NULL == pJoin->pOnRight) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + code = extractHashJoinOnCols(lBlkId, rBlkId, pEq1, pJoin); + if (TSDB_CODE_SUCCESS == code) { + code = extractHashJoinOnCols(lBlkId, rBlkId, pEq2, pJoin); + } + if (TSDB_CODE_SUCCESS == code) { + code = extractHashJoinOnCols(lBlkId, rBlkId, pEq3, pJoin); + } + if (TSDB_CODE_SUCCESS == code && pJoin->pOnLeft->length <= 0) { + planError("Invalid join equal column num: %d", pJoin->pOnLeft->length); + code = TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + return code; +} + +static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhysiNode* pJoin) { + SNode* pNode = NULL; + char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; + SSHashObj* pHash = tSimpleHashInit(pJoin->pTargets->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); + if (NULL == pHash) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SNodeList* pNew = nodesMakeList(); + + FOREACH(pNode, pJoin->pTargets) { + SColumnNode* pCol = (SColumnNode*)pNode; + int32_t len = getSlotKey(pNode, NULL, name); + tSimpleHashPut(pHash, name, len, &pCol, POINTER_BYTES); + } + + nodesClearList(pJoin->pTargets); + pJoin->pTargets = pNew; + + FOREACH(pNode, pJoin->pOnLeft) { + SColumnNode* pCol = (SColumnNode*)pNode; + int32_t len = getSlotKey(pNode, NULL, name); + SNode** p = tSimpleHashGet(pHash, name, len); + if (p) { + nodesListStrictAppend(pJoin->pTargets, *p); + tSimpleHashRemove(pHash, name, len); + } + } + FOREACH(pNode, pJoin->pOnRight) { + SColumnNode* pCol = (SColumnNode*)pNode; + int32_t len = getSlotKey(pNode, NULL, name); + SNode** p = tSimpleHashGet(pHash, name, len); + if (p) { + nodesListStrictAppend(pJoin->pTargets, *p); + tSimpleHashRemove(pHash, name, len); + } + } + + if (tSimpleHashGetSize(pHash) > 0) { + SNode** p = NULL; + int32_t iter = 0; + while (1) { + p = tSimpleHashIterate(pHash, p, &iter); + if (p == NULL) { + break; + } + + nodesListStrictAppend(pJoin->pTargets, *p); + } + } + + tSimpleHashCleanup(pHash); + + return TSDB_CODE_SUCCESS; +} static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) { @@ -778,7 +922,6 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil SNode* pPrimKeyCond = NULL; SNode* pColEqCond = NULL; SNode* pTagEqCond = NULL; - SNode* pTagOnCond = NULL; code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pPrimKeyCond); if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pColEqCond); @@ -786,21 +929,29 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil if (TSDB_CODE_SUCCESS == code) { code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pTagEqCond); } - if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pTagOnCond) { - code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pTagOnCond, &pTagOnCond); + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { + code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOtherOnCond, &pJoin->pFilterConditions); } if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets); } - if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); } - + if (TSDB_CODE_SUCCESS == code) { + code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pPrimKeyCond, pColEqCond, pTagEqCond, pJoin); + } + if (TSDB_CODE_SUCCESS == code) { + code = sortHashJoinTargets(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); + } if (TSDB_CODE_SUCCESS == code) { code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); } + nodesDestroyNode(pPrimKeyCond); + nodesDestroyNode(pColEqCond); + nodesDestroyNode(pTagEqCond); + if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pJoin; } else { @@ -825,6 +976,44 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren return TSDB_CODE_FAILED; } +static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SGroupCacheLogicNode* pLogicNode, + SPhysiNode** pPhyNode) { + SGroupCachePhysiNode* pGrpCache = + (SGroupCachePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE); + if (NULL == pGrpCache) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; + int32_t code = TSDB_CODE_SUCCESS; + if (TSDB_CODE_SUCCESS == code) { + code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pLogicNode->pGroupCols, &pGrpCache->pGroupCols); + } + + if (TSDB_CODE_SUCCESS == code) { + *pPhyNode = (SPhysiNode*)pGrpCache; + } else { + nodesDestroyNode((SNode*)pGrpCache); + } + + return code; +} + +static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode, + SPhysiNode** pPhyNode) { + SDynQueryCtrlPhysiNode* pDynCtrl = + (SDynQueryCtrlPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL); + if (NULL == pDynCtrl) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pDynCtrl->qType = pLogicNode->qType; + + *pPhyNode = (SPhysiNode*)pDynCtrl; + + return TSDB_CODE_SUCCESS; +} + typedef struct SRewritePrecalcExprsCxt { int32_t errCode; int32_t planNodeId; @@ -1737,6 +1926,10 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_MERGE: return createMergePhysiNode(pCxt, (SMergeLogicNode*)pLogicNode, pPhyNode); + case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: + return createGroupCachePhysiNode(pCxt, pChildren, (SGroupCacheLogicNode*)pLogicNode, pPhyNode); + case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: + return createDynQueryCtrlPhysiNode(pCxt, pChildren, (SDynQueryCtrlLogicNode*)pLogicNode, pPhyNode); default: break; }