From cf35174d539d5b0b59ff6776af9121ce5c5d95c7 Mon Sep 17 00:00:00 2001
From: Xiaoyu Wang
Date: Thu, 30 Jun 2022 10:23:35 +0800
Subject: [PATCH 1/2] feat: the last_row function supports all scenes

---
 include/libs/function/functionMgt.h        |  1 +
 include/libs/nodes/plannodes.h             |  1 +
 source/libs/function/src/builtins.c        | 13 +++++++
 source/libs/planner/src/planLogicCreater.c |  8 ++--
 source/libs/planner/src/planOptimizer.c    | 50 +++++++++++++++++++++-
 source/libs/planner/src/planSpliter.c      |  2 +
 source/libs/planner/test/planBasicTest.cpp |  2 +
 7 files changed, 73 insertions(+), 4 deletions(-)

diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h
index 1ed78750d1..7697dd5047 100644
--- a/include/libs/function/functionMgt.h
+++ b/include/libs/function/functionMgt.h
@@ -125,6 +125,7 @@ typedef enum EFunctionType {
   FUNCTION_TYPE_BLOCK_DIST_INFO,  // block distribution pseudo column function
   FUNCTION_TYPE_TO_COLUMN,
   FUNCTION_TYPE_GROUP_KEY,
+  FUNCTION_TYPE_CACHE_LAST_ROW,
 
   // distributed splitting functions
   FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h
index 4671c8b81e..f8d79a863f 100644
--- a/include/libs/nodes/plannodes.h
+++ b/include/libs/nodes/plannodes.h
@@ -91,6 +91,7 @@ typedef struct SAggLogicNode {
   SLogicNode node;
   SNodeList* pGroupKeys;
   SNodeList* pAggFuncs;
+  bool hasLastRow;
 } SAggLogicNode;
 
 typedef struct SProjectLogicNode {
diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c
index fbebb12cc3..85a2ab22b6 100644
--- a/source/libs/function/src/builtins.c
+++ b/source/libs/function/src/builtins.c
@@ -1882,6 +1882,19 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
     .name = "last_row",
     .type = FUNCTION_TYPE_LAST_ROW,
     .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
+    .translateFunc = translateFirstLast,
+    .getEnvFunc = getFirstLastFuncEnv,
+    .initFunc = functionSetup,
+    .processFunc = lastFunction,
+    .finalizeFunc = firstLastFinalize,
+    .pPartialFunc = "_last_partial",
+    .pMergeFunc = "_last_merge",
+    .combineFunc = lastCombine,
+  },
+  {
+    .name = "_cache_last_row",
+    .type = FUNCTION_TYPE_CACHE_LAST_ROW,
+    .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
     .translateFunc = translateLastRow,
     .getEnvFunc = getMinmaxFuncEnv,
     .initFunc = minmaxFunctionSetup,
diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c
index ef8b109b62..05256e2696 100644
--- a/source/libs/planner/src/planLogicCreater.c
+++ b/source/libs/planner/src/planLogicCreater.c
@@ -160,9 +160,9 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNod
     return SCAN_TYPE_STREAM;
   }
 
-  if (pSelect->hasLastRowFunc) {
-    return SCAN_TYPE_LAST_ROW;
-  }
+  // if (pSelect->hasLastRowFunc) {
+  //   return SCAN_TYPE_LAST_ROW;
+  // }
 
   if (NULL == pScanCols) {
     // select count(*) from t
@@ -474,6 +474,8 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
     return TSDB_CODE_OUT_OF_MEMORY;
   }
 
+  pAgg->hasLastRow = pSelect->hasLastRowFunc;
+
   int32_t code = TSDB_CODE_SUCCESS;
 
   // set grouyp keys, agg funcs and having conditions
diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c
index 42f7f744c5..b733d75fb2 100644
--- a/source/libs/planner/src/planOptimizer.c
+++ b/source/libs/planner/src/planOptimizer.c
@@ -1616,6 +1616,53 @@ static int32_t rewriteUniqueOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
   return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef);
 }
 
+// Returns true when pNode is an aggregate flagged as using last_row that has no group keys, exactly one child
+// that is a scan node without conditions, and only FUNCTION_TYPE_LAST_ROW functions in pAggFuncs.
+static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
+  if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || !(((SAggLogicNode*)pNode)->hasLastRow) ||
+      NULL != ((SAggLogicNode*)pNode)->pGroupKeys || 1 != LIST_LENGTH(pNode->pChildren) ||
+      QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)) ||
+      NULL != ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->node.pConditions) {
+    return false;
+  }
+
+  SNode* pFunc = NULL;
+  FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
+    if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+// Rewrites every function of the aggregate matched by lastRowScanOptMayBeOptimized to "_cache_last_row" and marks
+// its child scan as SCAN_TYPE_LAST_ROW. Propagates the fmGetFuncInfo error code instead of ignoring it.
+static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
+  SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized);
+
+  if (NULL == pAgg) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SNode* pNode = NULL;
+  FOREACH(pNode, pAgg->pAggFuncs) {
+    SFunctionNode* pFunc = (SFunctionNode*)pNode;
+    // snprintf always NUL-terminates within the given size, so no manual terminator is needed.
+    snprintf(pFunc->functionName, sizeof(pFunc->functionName), "_cache_last_row");
+    int32_t code = fmGetFuncInfo(pFunc, NULL, 0);
+    if (TSDB_CODE_SUCCESS != code) {
+      return code;
+    }
+  }
+  pAgg->hasLastRow = false;
+
+  ((SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))->scanType = SCAN_TYPE_LAST_ROW;
+
+  pCxt->optimized = true;
+  return TSDB_CODE_SUCCESS;
+}
+
 // merge projects
 static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
   if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
@@ -1704,7 +1751,8 @@ static const SOptimizeRule optimizeRuleSet[] = {
   {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
   {.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
   {.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
-  {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize}
+  {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
+  {.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize}
 };
 // clang-format on
 
diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c
index 9d23df5bda..2bc226804f 100644
--- a/source/libs/planner/src/planSpliter.c
+++ b/source/libs/planner/src/planSpliter.c
@@ -197,6 +197,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
       return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
     case QUERY_NODE_LOGIC_PLAN_JOIN:
       return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
+    // case QUERY_NODE_LOGIC_PLAN_PARTITION:
+    //   return stbSplHasMultiTbScan(streamQuery, pNode);
     case QUERY_NODE_LOGIC_PLAN_AGG:
       return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
     case QUERY_NODE_LOGIC_PLAN_WINDOW:
diff --git a/source/libs/planner/test/planBasicTest.cpp b/source/libs/planner/test/planBasicTest.cpp
index ff725c444e..b886fca2af 100644
--- a/source/libs/planner/test/planBasicTest.cpp
+++ b/source/libs/planner/test/planBasicTest.cpp
@@ -99,6 +99,8 @@ TEST_F(PlanBasicTest, lastRowFunc) {
   run("SELECT LAST_ROW(c1, c2) FROM t1");
 
   run("SELECT LAST_ROW(c1) FROM st1");
+
+  run("SELECT LAST_ROW(c1), SUM(c3) FROM t1");
 }
 
 TEST_F(PlanBasicTest, sampleFunc) {

From 2ad65998107b88498917bcb902500e9912c4efef Mon Sep 17 00:00:00 2001
From: Xiaoyu Wang
Date: Thu, 30 Jun 2022 11:04:24 +0800
Subject: [PATCH 2/2] feat: partition by distributed split

---
 source/libs/planner/src/planLogicCreater.c | 10 +++-------
 source/libs/planner/src/planPhysiCreater.c |  2 +-
 source/libs/planner/src/planSpliter.c      | 19 ++++++++++++++++---
 3 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c
index 05256e2696..d170482c48 100644
--- a/source/libs/planner/src/planLogicCreater.c
+++ b/source/libs/planner/src/planLogicCreater.c
@@ -154,16 +154,12 @@ static int32_t createSelectRootLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
   return createRootLogicNode(pCxt, pSelect, pSelect->precision, (FCreateLogicNode)func, pRoot);
 }
 
-static EScanType getScanType(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNodeList* pScanPseudoCols,
-                             SNodeList* pScanCols, int8_t tableType) {
+static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols,
+                             int8_t tableType) {
   if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) {
     return SCAN_TYPE_STREAM;
   }
 
-  // if (pSelect->hasLastRowFunc) {
-  //   return SCAN_TYPE_LAST_ROW;
-  // }
-
   if (NULL == pScanCols) {
     // select count(*) from t
     return NULL == pScanPseudoCols
@@ -279,7 +275,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
     code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM);
   }
 
-  pScan->scanType = getScanType(pCxt, pSelect, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType);
+  pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType);
 
   if (TSDB_CODE_SUCCESS == code) {
     code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols);
diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c
index 46747af3a9..0f19db26a5 100644
--- a/source/libs/planner/src/planPhysiCreater.c
+++ b/source/libs/planner/src/planPhysiCreater.c
@@ -1344,7 +1344,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
     }
   }
 
-  if (TSDB_CODE_SUCCESS == code) {
+  if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) {
     code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
                          &pMerge->pMergeKeys);
   }
diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c
index 2bc226804f..60c04c2c30 100644
--- a/source/libs/planner/src/planSpliter.c
+++ b/source/libs/planner/src/planSpliter.c
@@ -197,8 +197,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
       return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
     case QUERY_NODE_LOGIC_PLAN_JOIN:
       return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
-    // case QUERY_NODE_LOGIC_PLAN_PARTITION:
-    //   return stbSplHasMultiTbScan(streamQuery, pNode);
+    case QUERY_NODE_LOGIC_PLAN_PARTITION:
+      return stbSplHasMultiTbScan(streamQuery, pNode);
     case QUERY_NODE_LOGIC_PLAN_AGG:
       return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
     case QUERY_NODE_LOGIC_PLAN_WINDOW:
@@ -433,7 +433,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
     SNodeList* pMergeKeys = NULL;
     code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
     if (TSDB_CODE_SUCCESS == code) {
-      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, false);
+      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true);
     }
     if (TSDB_CODE_SUCCESS != code) {
       nodesDestroyList(pMergeKeys);
@@ -889,6 +889,16 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
   return code;
 }
 
+static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
+  int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true);
+  if (TSDB_CODE_SUCCESS == code) {
+    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
+                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
+  }
+  ++(pCxt->groupId);
+  return code;
+}
+
 static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
   if (pCxt->pPlanCxt->rSmaQuery) {
     return TSDB_CODE_SUCCESS;
@@ -907,6 +917,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
     case QUERY_NODE_LOGIC_PLAN_JOIN:
       code = stbSplSplitJoinNode(pCxt, &info);
       break;
+    case QUERY_NODE_LOGIC_PLAN_PARTITION:
+      code = stbSplSplitPartitionNode(pCxt, &info);
+      break;
     case QUERY_NODE_LOGIC_PLAN_AGG:
       code = stbSplSplitAggNode(pCxt, &info);
       break;