diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 8fb52ceb85..3a9dd3c713 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -101,6 +101,7 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN_PROJECT, QUERY_NODE_LOGIC_PLAN_VNODE_MODIF, QUERY_NODE_LOGIC_PLAN_EXCHANGE, + QUERY_NODE_LOGIC_PLAN_WINDOW, QUERY_NODE_LOGIC_SUBPLAN, QUERY_NODE_LOGIC_PLAN, @@ -115,6 +116,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_AGG, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, QUERY_NODE_PHYSICAL_PLAN_SORT, + QUERY_NODE_PHYSICAL_PLAN_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_SUBPLAN, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 717baf24cd..2d870008df 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -80,6 +80,11 @@ typedef struct SExchangeLogicNode { int32_t srcGroupId; } SExchangeLogicNode; +typedef struct SWindowLogicNode { + SLogicNode node; + SNode* pWindow; +} SWindowLogicNode; + typedef enum ESubplanType { SUBPLAN_TYPE_MERGE = 1, SUBPLAN_TYPE_PARTIAL, @@ -191,6 +196,14 @@ typedef struct SExchangePhysiNode { SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhysiNode; +typedef struct SIntervalPhysiNode { + SPhysiNode node; + int64_t interval; + int64_t sliding; + int64_t offset; + SFillNode* pFill; +} SIntervalPhysiNode; + typedef struct SDataSinkNode { ENodeType type; SDataBlockDescNode* pInputDataBlockDesc; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index a93985e8ba..543bc05554 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -257,6 +257,18 @@ static SNodeList* createColumnByRewriteExps(SLogicPlanContext* pCxt, SNodeList* return cxt.pList; } +static SLogicNode* createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) { + if (NULL == pSelect->pWindow) { + return NULL; + } + + SWindowLogicNode* pAgg = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW); + CHECK_ALLOC(pAgg, NULL); + pAgg->node.id = pCxt->planNodeId++; + + +} + static SLogicNode* createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) { SNodeList* pAggFuncs = NULL; CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pAggFuncs), NULL); @@ -345,6 +357,9 @@ static SLogicNode* createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p pRoot->pConditions = nodesCloneNode(pSelect->pWhere); CHECK_ALLOC(pRoot->pConditions, pRoot); } + if (TSDB_CODE_SUCCESS == pCxt->errCode) { + pRoot = pushLogicNode(pCxt, pRoot, createWindowLogicNode(pCxt, pSelect)); + } if (TSDB_CODE_SUCCESS == pCxt->errCode) { pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect)); }