From 0b56a4049a894f55a949b2c431ae29639740e680 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 21 Apr 2022 17:51:39 +0800 Subject: [PATCH] feat: sql command 'union' --- source/libs/planner/src/planLogicCreater.c | 50 +++++++++- source/libs/planner/src/planSpliter.c | 106 +++++++++++++++++---- source/libs/planner/test/planSetOpTest.cpp | 6 ++ source/libs/planner/test/planTestUtil.cpp | 5 +- 4 files changed, 142 insertions(+), 25 deletions(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 83dd71a834..b13c0a8888 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -85,13 +85,19 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static int32_t rewriteExpr(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) { +static int32_t rewriteExprForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) { nodesWalkExprs(pExprs, doNameExpr, NULL); SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs }; nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt); return cxt.errCode; } +static int32_t rewriteExprs(SNodeList* pExprs, SNodeList* pTarget) { + SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs }; + nodesRewriteExprs(pTarget, doRewriteExpr, &cxt); + return cxt.errCode; +} + static int32_t pushLogicNode(SLogicPlanContext* pCxt, SLogicNode** pOldRoot, SLogicNode* pNewRoot) { if (NULL == pNewRoot->pChildren) { pNewRoot->pChildren = nodesMakeList(); @@ -433,10 +439,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, // rewrite the expression in subsequent clauses if (TSDB_CODE_SUCCESS == code) { - code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY); + code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY); } if (TSDB_CODE_SUCCESS == code) { - code = rewriteExpr(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY); + code = rewriteExprForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY); } if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) { @@ -472,7 +478,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm } if (TSDB_CODE_SUCCESS == code) { - code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); + code = rewriteExprForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); } if (TSDB_CODE_SUCCESS == code) { @@ -723,7 +729,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe // rewrite the expression in subsequent clauses if (TSDB_CODE_SUCCESS == code) { - code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT); + code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT); } // set the output @@ -850,6 +856,37 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator return code; } +static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) { + SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG); + if (NULL == pAgg) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; + pAgg->pGroupKeys = nodesCloneList(pSetOperator->pProjectionList); + if (NULL == pAgg->pGroupKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + // rewrite the expression in subsequent clauses + if (TSDB_CODE_SUCCESS == code) { + code = rewriteExprs(pAgg->pGroupKeys, pSetOperator->pOrderByList); + } + + // set the output + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets); + } + + if (TSDB_CODE_SUCCESS == code) { + *pLogicNode = (SLogicNode*)pAgg; + } else { + nodesDestroyNode(pAgg); + } + + return code; +} + static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) { SLogicNode* pSetOp = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -857,6 +894,9 @@ static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetO case SET_OP_TYPE_UNION_ALL: code = createSetOpProjectLogicNode(pCxt, pSetOperator, &pSetOp); break; + case SET_OP_TYPE_UNION: + code = createSetOpAggLogicNode(pCxt, pSetOperator, &pSetOp); + break; default: code = -1; break; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index f091682bc8..b419414ca6 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -50,6 +50,11 @@ typedef struct SUaInfo { SLogicSubplan* pSubplan; } SUaInfo; +typedef struct SUnInfo { + SAggLogicNode* pAgg; + SLogicSubplan* pSubplan; +} SUnInfo; + typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo); static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) { @@ -226,7 +231,6 @@ static SLogicSubplan* uaCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) { pSubplan->id.groupId = pCxt->groupId; pSubplan->subplanType = SUBPLAN_TYPE_SCAN; pSubplan->pNode = pNode; - // TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*); return pSubplan; } @@ -244,24 +248,22 @@ static int32_t uaCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan pSubplan->subplanType = SUBPLAN_TYPE_MERGE; - return nodesListMakeAppend(&pProject->node.pChildren, (SNode*)pExchange); + if (NULL == pProject->node.pParent) { + pSubplan->pNode = (SLogicNode*)pExchange; + nodesDestroyNode(pProject); + return TSDB_CODE_SUCCESS; + } - // if (NULL == pProject->node.pParent) { - // pSubplan->pNode = (SLogicNode*)pExchange; - // nodesDestroyNode(pProject); - // return TSDB_CODE_SUCCESS; - // } - - // SNode* pNode; - // FOREACH(pNode, pProject->node.pParent->pChildren) { - // if (nodesEqualNode(pNode, pProject)) { - // REPLACE_NODE(pExchange); - // nodesDestroyNode(pNode); - // return TSDB_CODE_SUCCESS; - // } - // } - // nodesDestroyNode(pExchange); - // return TSDB_CODE_FAILED; + SNode* pNode; + FOREACH(pNode, pProject->node.pParent->pChildren) { + if (nodesEqualNode(pNode, pProject)) { + REPLACE_NODE(pExchange); + nodesDestroyNode(pNode); + return TSDB_CODE_SUCCESS; + } + } + nodesDestroyNode(pExchange); + return TSDB_CODE_FAILED; } static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { @@ -291,10 +293,78 @@ static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { return code; } +static SLogicNode* unMatchByNode(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) { + return pNode; + } + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { + SLogicNode* pSplitNode = uaMatchByNode((SLogicNode*)pChild); + if (NULL != pSplitNode) { + return pSplitNode; + } + } + return NULL; +} + +static int32_t unCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) { + SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); + if (NULL == pExchange) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pExchange->srcGroupId = pCxt->groupId; + // pExchange->precision = pScan->pMeta->tableInfo.precision; + pExchange->node.pTargets = nodesCloneList(pAgg->node.pTargets); + if (NULL == pExchange->node.pTargets) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + + return nodesListMakeAppend(&pAgg->node.pChildren, pExchange); +} + +static bool unFindSplitNode(SLogicSubplan* pSubplan, SUnInfo* pInfo) { + SLogicNode* pSplitNode = unMatchByNode(pSubplan->pNode); + if (NULL != pSplitNode) { + pInfo->pAgg = (SAggLogicNode*)pSplitNode; + pInfo->pSubplan = pSubplan; + } + return NULL != pSplitNode; +} + +static int32_t unSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { + SUnInfo info = {0}; + if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unFindSplitNode, &info)) { + return TSDB_CODE_SUCCESS; + } + + int32_t code = TSDB_CODE_SUCCESS; + + SNode* pChild = NULL; + FOREACH(pChild, info.pAgg->node.pChildren) { + code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, uaCreateSubplan(pCxt, (SLogicNode*)pChild)); + if (TSDB_CODE_SUCCESS == code) { + REPLACE_NODE(NULL); + } else { + break; + } + } + if (TSDB_CODE_SUCCESS == code) { + nodesClearList(info.pAgg->node.pChildren); + info.pAgg->node.pChildren = NULL; + code = unCreateExchangeNode(pCxt, info.pSubplan, info.pAgg); + } + ++(pCxt->groupId); + pCxt->split = true; + return code; +} + static const SSplitRule splitRuleSet[] = { { .pName = "SuperTableScan", .splitFunc = stsSplit }, { .pName = "ChildTableJoin", .splitFunc = ctjSplit }, { .pName = "UnionAll", .splitFunc = uaSplit }, + { .pName = "Union", .splitFunc = unSplit } }; static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule)); diff --git a/source/libs/planner/test/planSetOpTest.cpp b/source/libs/planner/test/planSetOpTest.cpp index d25323f2f3..5ace503959 100644 --- a/source/libs/planner/test/planSetOpTest.cpp +++ b/source/libs/planner/test/planSetOpTest.cpp @@ -27,3 +27,9 @@ TEST_F(PlanSetOpTest, unionAll) { run("select c1, c2 from t1 where c1 > 10 union all select c1, c2 from t1 where c1 > 20"); } + +TEST_F(PlanSetOpTest, union) { + useDb("root", "test"); + + run("select c1, c2 from t1 where c1 > 10 union select c1, c2 from t1 where c1 > 20"); +} diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index 8198ae3fa0..222ad9780b 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -62,7 +62,7 @@ public: doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan); SQueryPlan* pPlan = nullptr; - doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan, NULL); + doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan); if (g_isDump) { dump(); @@ -162,7 +162,8 @@ private: res_.scaledLogicPlan_ = toString((SNode*)(*pLogicPlan)); } - void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) { + void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) { + SArray* pExecNodeList = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr)); DO_WITH_THROW(createPhysiPlan, pCxt, pLogicPlan, pPlan, pExecNodeList); res_.physiPlan_ = toString((SNode*)(*pPlan)); SNode* pNode;