From 853f1df3abcfd071a6d584f36aea036edbd976a3 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 24 Jun 2022 21:07:12 +0800 Subject: [PATCH] feat: support partition by expression and aggregate function output together --- include/libs/nodes/plannodes.h | 1 + source/libs/function/src/builtins.c | 18 ++++++-- source/libs/parser/src/parTranslater.c | 44 ++++++++++++++----- source/libs/parser/test/parSelectTest.cpp | 23 ++++++++++ source/libs/planner/src/planLogicCreater.c | 8 ++-- source/libs/planner/src/planOptimizer.c | 15 ++++++- source/libs/planner/src/planSpliter.c | 20 ++++----- source/libs/planner/test/planOptimizeTest.cpp | 10 ++++- source/libs/planner/test/planPartByTest.cpp | 6 +++ 9 files changed, 115 insertions(+), 30 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index f8cf58d8cb..a7b8c97740 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -75,6 +75,7 @@ typedef struct SScanLogicNode { double filesFactor; SArray* pSmaIndexes; SNodeList* pPartTags; + bool partSort; } SScanLogicNode; typedef struct SJoinLogicNode { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 6516a10795..af5bebc601 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1274,13 +1274,12 @@ static bool validateTimestampDigits(const SValueNode* pVal) { } int64_t tsVal = pVal->datum.i; - char fraction[20] = {0}; + char fraction[20] = {0}; NUM_TO_STRING(pVal->node.resType.type, &tsVal, sizeof(fraction), fraction); int32_t tsDigits = (int32_t)strlen(fraction); if (tsDigits > TSDB_TIME_PRECISION_SEC_DIGITS) { - if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS || - tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS || + if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS || tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS || tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { return true; } else { @@ -1510,6 +1509,11 @@ static int32_t translateBlockDistInfoFunc(SFunctionNode* pFunc, char* pErrBuf, i return TSDB_CODE_SUCCESS; } +static int32_t translateGroupKeyFunc(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + pFunc->node.resType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; + return TSDB_CODE_SUCCESS; +} + static bool getBlockDistFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(STableBlockDistInfo); return true; @@ -2499,6 +2503,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_BLOCK_DIST_INFO, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC, .translateFunc = translateBlockDistInfoFunc, + }, + { + .name = "_group_key", + .type = FUNCTION_TYPE_BLOCK_DIST_INFO, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateGroupKeyFunc, + .pPartialFunc = "_group_key", + .pMergeFunc = "_group_key" } }; // clang-format on diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5d464fdf27..d94ec3460b 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1355,6 +1355,24 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, SNode** pNode return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR; } +static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode) { + SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); + if (NULL == pFunc) { + pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } + + strcpy(pFunc->functionName, "_group_key"); + strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName); + pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode); + if (TSDB_CODE_SUCCESS == pCxt->errCode) { + *pNode = (SNode*)pFunc; + pCxt->errCode = fmGetFuncInfo(pFunc, pCxt->msgBuf.buf, pCxt->msgBuf.len); + } + + return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); +} + static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) { SCheckExprForGroupByCxt* pCxt = (SCheckExprForGroupByCxt*)pContext; if (!nodesIsExprNode(*pNode) || isAliasColumn(*pNode)) { @@ -1371,10 +1389,10 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) { if (isAggFunc(*pNode) && !isDistinctOrderBy(pCxt->pTranslateCxt)) { return DEAL_RES_IGNORE_CHILD; } - SNode* pGroupNode; + SNode* pGroupNode = NULL; FOREACH(pGroupNode, getGroupByList(pCxt->pTranslateCxt)) { if (nodesEqualNode(getGroupByNode(pGroupNode), *pNode)) { - return DEAL_RES_IGNORE_CHILD; + return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode); } } if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) { @@ -1441,22 +1459,28 @@ typedef struct CheckAggColCoexistCxt { bool existOtherAggFunc; } CheckAggColCoexistCxt; -static EDealRes doCheckAggColCoexist(SNode* pNode, void* pContext) { +static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) { CheckAggColCoexistCxt* pCxt = (CheckAggColCoexistCxt*)pContext; - if (isSelectFunc(pNode)) { + if (isSelectFunc(*pNode)) { ++(pCxt->selectFuncNum); - } else if (isAggFunc(pNode)) { + } else if (isAggFunc(*pNode)) { pCxt->existOtherAggFunc = true; } - if (isAggFunc(pNode)) { + if (isAggFunc(*pNode)) { pCxt->existAggFunc = true; return DEAL_RES_IGNORE_CHILD; } - if (isIndefiniteRowsFunc(pNode)) { + if (isIndefiniteRowsFunc(*pNode)) { pCxt->existIndefiniteRowsFunc = true; return DEAL_RES_IGNORE_CHILD; } - if (isScanPseudoColumnFunc(pNode) || QUERY_NODE_COLUMN == nodeType(pNode)) { + SNode* pPartKey = NULL; + FOREACH(pPartKey, pCxt->pTranslateCxt->pCurrSelectStmt->pPartitionByList) { + if (nodesEqualNode(pPartKey, *pNode)) { + return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode); + } + } + if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) { pCxt->existCol = true; } return DEAL_RES_CONTINUE; @@ -1472,9 +1496,9 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) .existIndefiniteRowsFunc = false, .selectFuncNum = 0, .existOtherAggFunc = false}; - nodesWalkExprs(pSelect->pProjectionList, doCheckAggColCoexist, &cxt); + nodesRewriteExprs(pSelect->pProjectionList, doCheckAggColCoexist, &cxt); if (!pSelect->isDistinct) { - nodesWalkExprs(pSelect->pOrderByList, doCheckAggColCoexist, &cxt); + nodesRewriteExprs(pSelect->pOrderByList, doCheckAggColCoexist, &cxt); } if (1 == cxt.selectFuncNum && !cxt.existOtherAggFunc) { return rewriteColsToSelectValFunc(pCxt, pSelect); diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index 5c8439f846..f8b8dc58fc 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -199,6 +199,20 @@ TEST_F(ParserSelectTest, tailFuncSemanticCheck) { run("SELECT TAIL(c1, 10) FROM t1 GROUP BY c2", TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC); } +TEST_F(ParserSelectTest, partitionBy) { + useDb("root", "test"); + + run("SELECT c1, c2 FROM t1 PARTITION BY c2"); + + run("SELECT SUM(c1), c2 FROM t1 PARTITION BY c2"); +} + +TEST_F(ParserSelectTest, partitionBySemanticCheck) { + useDb("root", "test"); + + run("SELECT SUM(c1), c2, c3 FROM t1 PARTITION BY c2", TSDB_CODE_PAR_NOT_SINGLE_GROUP); +} + TEST_F(ParserSelectTest, groupBy) { useDb("root", "test"); @@ -213,6 +227,15 @@ TEST_F(ParserSelectTest, groupBy) { run("SELECT COUNT(*), c1 + 10, c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c1 + 10, c2"); } +TEST_F(ParserSelectTest, groupBySemanticCheck) { + useDb("root", "test"); + + run("SELECT COUNT(*) cnt, c1 FROM t1 WHERE c1 > 0", TSDB_CODE_PAR_NOT_SINGLE_GROUP); + run("SELECT COUNT(*) cnt, c2 FROM t1 WHERE c1 > 0 GROUP BY c1", TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION); + run("SELECT COUNT(*) cnt, c2 FROM t1 WHERE c1 > 0 PARTITION BY c2 GROUP BY c1", + TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION); +} + TEST_F(ParserSelectTest, orderBy) { useDb("root", "test"); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index d4f18a663c..6d4aa5ae23 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -457,15 +457,15 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, } } + if (TSDB_CODE_SUCCESS == code && pSelect->hasAggFuncs) { + code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, fmIsAggFunc, &pAgg->pAggFuncs); + } + // rewrite the expression in subsequent clauses if (TSDB_CODE_SUCCESS == code) { code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY); } - if (TSDB_CODE_SUCCESS == code && pSelect->hasAggFuncs) { - code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, fmIsAggFunc, &pAgg->pAggFuncs); - } - // rewrite the expression in subsequent clauses if (TSDB_CODE_SUCCESS == code) { code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index d4964d3cd4..7abfdb1f50 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1075,6 +1075,16 @@ static int32_t partTagsOptRebuildTbanme(SNodeList* pPartKeys) { return code; } +static int32_t partTagOptRewriteGroupKey(SAggLogicNode* pAgg, SScanLogicNode* pScan) { + // SNode* pNode = NULL; + // FOREACH(pNode, pScan->pPartTags) { + // if (QUERY_NODE_COLUMN != nodeType(pNode)) { + // createColumnByRewriteExpr(pNode, ); + // } + // } + return TSDB_CODE_SUCCESS; +} + static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized); if (NULL == pNode) { @@ -1100,6 +1110,9 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub } } NODES_DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys); + if (TSDB_CODE_SUCCESS == code) { + code = partTagOptRewriteGroupKey((SAggLogicNode*)pNode, pScan); + } } if (TSDB_CODE_SUCCESS == code) { code = partTagsOptRebuildTbanme(pScan->pPartTags); @@ -1189,7 +1202,7 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize}, {.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}, {.pName = "SmaIndex", .optimizeFunc = smaOptimize}, - // {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize}, + {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize}, {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize} }; // clang-format on diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 8e8559ba3f..b4a966f206 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -390,9 +390,6 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge); } } - if (TSDB_CODE_SUCCESS == code && NULL != pSubplan) { - nodesDestroyNode((SNode*)pSplitNode); - } if (TSDB_CODE_SUCCESS != code) { nodesDestroyNode((SNode*)pMerge); } @@ -564,6 +561,8 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { return ((SScanLogicNode*)pNode)->pPartTags; + } else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { + return ((SPartitionLogicNode*)pNode)->pPartitionKeys; } else { return NULL; } @@ -574,14 +573,15 @@ static bool stbSplIsPartTbanme(SNodeList* pPartKeys) { return false; } SNode* pPartKey = nodesListGetNode(pPartKeys, 0); - return QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType; + return (QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) || + (QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType); } -static bool stbSplIsMultiTableWinodw(SWindowLogicNode* pWindow) { +static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) { return stbSplIsPartTbanme(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0))); } -static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { +static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) { case WINDOW_TYPE_INTERVAL: return stbSplSplitInterval(pCxt, pInfo); @@ -595,7 +595,7 @@ static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitI return TSDB_CODE_PLAN_INTERNAL_ERROR; } -static int32_t stbSplSplitWindowForMultiTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { +static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { if (pCxt->pPlanCxt->streamQuery) { SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT); return TSDB_CODE_SUCCESS; @@ -616,10 +616,10 @@ static int32_t stbSplSplitWindowForMultiTable(SSplitContext* pCxt, SStableSplitI } static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - if (stbSplIsMultiTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) { - return stbSplSplitWindowForMultiTable(pCxt, pInfo); + if (stbSplIsPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) { + return stbSplSplitWindowForPartTable(pCxt, pInfo); } else { - return stbSplSplitWindowForMergeTable(pCxt, pInfo); + return stbSplSplitWindowForCrossTable(pCxt, pInfo); } } diff --git a/source/libs/planner/test/planOptimizeTest.cpp b/source/libs/planner/test/planOptimizeTest.cpp index 8b3d263d66..9bd95a79f9 100644 --- a/source/libs/planner/test/planOptimizeTest.cpp +++ b/source/libs/planner/test/planOptimizeTest.cpp @@ -57,9 +57,15 @@ TEST_F(PlanOptimizeTest, orderByPrimaryKey) { TEST_F(PlanOptimizeTest, PartitionTags) { useDb("root", "test"); - run("SELECT c1 FROM st1 PARTITION BY tag1"); + run("SELECT c1, tag1 FROM st1 PARTITION BY tag1"); - run("SELECT SUM(c1) FROM st1 GROUP BY tag1"); + run("SELECT SUM(c1), tag1 FROM st1 PARTITION BY tag1"); + + run("SELECT SUM(c1), tag1 + 10 FROM st1 PARTITION BY tag1 + 10"); + + run("SELECT SUM(c1), tag1 FROM st1 GROUP BY tag1"); + + run("SELECT SUM(c1), tag1 + 10 FROM st1 GROUP BY tag1 + 10"); } TEST_F(PlanOptimizeTest, eliminateProjection) { diff --git a/source/libs/planner/test/planPartByTest.cpp b/source/libs/planner/test/planPartByTest.cpp index 9437f6ad3f..2abcb44dfd 100644 --- a/source/libs/planner/test/planPartByTest.cpp +++ b/source/libs/planner/test/planPartByTest.cpp @@ -34,6 +34,8 @@ TEST_F(PlanPartitionByTest, withAggFunc) { useDb("root", "test"); run("select count(*) from t1 partition by c1"); + + run("select count(*), c1 from t1 partition by c1"); } TEST_F(PlanPartitionByTest, withInterval) { @@ -41,8 +43,12 @@ TEST_F(PlanPartitionByTest, withInterval) { // normal/child table run("select count(*) from t1 partition by c1 interval(10s)"); + + run("select count(*), c1 from t1 partition by c1 interval(10s)"); // super table run("select count(*) from st1 partition by tag1, tag2 interval(10s)"); + + run("select count(*), tag1 from st1 partition by tag1, tag2 interval(10s)"); } TEST_F(PlanPartitionByTest, withGroupBy) {