diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c
index 28e07fe98d..f10c42310d 100644
--- a/source/libs/planner/src/planOptimizer.c
+++ b/source/libs/planner/src/planOptimizer.c
@@ -3646,32 +3646,85 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
 }
 
+// Optimizes the partition node found via partColOptShouldBeOptimized:
+//  - root hint passes getSortForGroupOptHint: replace it with a sort node (calcGroupId = true);
+//  - else, parent is an agg node: append each partition key to the agg's group keys and remove the partition node.
+// Returns TSDB_CODE_SUCCESS when nothing applies; pCxt->optimized is set only when the plan was rewritten.
 static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
-  int32_t              code = TSDB_CODE_SUCCESS;
-  SPartitionLogicNode* pNode = (SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partColOptShouldBeOptimized);
+  SNode*               node;
+  int32_t              code = TSDB_CODE_SUCCESS;
+  SPartitionLogicNode* pNode =
+      (SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partColOptShouldBeOptimized);
   if (NULL == pNode) return TSDB_CODE_SUCCESS;
-
   SLogicNode* pRootNode = getLogicNodeRootNode((SLogicNode*)pNode);
-  if (!pRootNode->pHint || !getSortForGroupOptHint(pRootNode->pHint)) {
+
+
+  if (pRootNode->pHint && getSortForGroupOptHint(pRootNode->pHint)) {
+    // replace with sort node
+    SSortLogicNode* pSort = partColOptCreateSort(pNode);
+    if (!pSort) {
+      // if sort create failed, we eat the error, skip the optimization
+      code = TSDB_CODE_SUCCESS;
+    } else {
+      TSWAP(pSort->node.pChildren, pNode->node.pChildren);
+      TSWAP(pSort->node.pTargets, pNode->node.pTargets);
+      optResetParent((SLogicNode*)pSort);
+      pSort->calcGroupId = true;
+      code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort);
+      if (code == TSDB_CODE_SUCCESS) {
+        pCxt->optimized = true;
+      } else {
+        nodesDestroyNode((SNode*)pSort);
+      }
+    }
+    return code;
+  } else if (pNode->node.pParent && nodeType(pNode->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG) {
+    // Check if we can delete partition node
+    SAggLogicNode* pAgg = (SAggLogicNode*)pNode->node.pParent;
+    FOREACH(node, pNode->pPartitionKeys) {
+      SGroupingSetNode* pgsNode = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
+      if (!pgsNode) code = TSDB_CODE_OUT_OF_MEMORY;
+      if (code == TSDB_CODE_SUCCESS) {
+        pgsNode->groupingSetType = GP_TYPE_NORMAL;
+        pgsNode->pParameterList = nodesMakeList();
+        if (!pgsNode->pParameterList) code = TSDB_CODE_OUT_OF_MEMORY;
+      }
+      if (code == TSDB_CODE_SUCCESS) {
+        // Hold the clone in a local so a failed clone is reported and a failed append does not leak it
+        SNode* pKey = nodesCloneNode(node);
+        if (!pKey) {
+          code = TSDB_CODE_OUT_OF_MEMORY;
+        } else {
+          code = nodesListAppend(pgsNode->pParameterList, pKey);
+          if (code != TSDB_CODE_SUCCESS) nodesDestroyNode(pKey);
+        }
+      }
+      if (code == TSDB_CODE_SUCCESS) {
+        // Now we are using hash agg
+        code = nodesListMakeAppend(&pAgg->pGroupKeys, (SNode*)pgsNode);
+      }
+      if (code != TSDB_CODE_SUCCESS) {
+        nodesDestroyNode((SNode*)pgsNode);
+        break;
+      }
+    }
+
+    if (code == TSDB_CODE_SUCCESS) {
+      code =
+          replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)nodesListGetNode(pNode->node.pChildren, 0));
+    }
+    if (code == TSDB_CODE_SUCCESS) {
+      // Detach the child only once it has been re-parented; a failed replace leaves pNode's children intact
+      NODES_CLEAR_LIST(pNode->node.pChildren);
+      // For hash agg, nonblocking mode is meaningless, slimit is useless, so we reset it
+      pAgg->node.forceCreateNonBlockingOptr = false;
+      nodesDestroyNode(pAgg->node.pSlimit);
+      pAgg->node.pSlimit = NULL;
+      nodesDestroyNode((SNode*)pNode);
+      pCxt->optimized = true;
+    }
     return code;
   }
 
-  // replace with sort node
-  SSortLogicNode* pSort = partColOptCreateSort(pNode);
-  if (!pSort) {
-    // if sort create failed, we eat the error, skip the optimization
-    code = TSDB_CODE_SUCCESS;
-  } else {
-    TSWAP(pSort->node.pChildren, pNode->node.pChildren);
-    TSWAP(pSort->node.pTargets, pNode->node.pTargets);
-    optResetParent((SLogicNode*)pSort);
-    pSort->calcGroupId = true;
-    code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort);
-    if (code == TSDB_CODE_SUCCESS) {
-      pCxt->optimized = true;
-    } else {
-      nodesDestroyNode((SNode*)pSort);
-    }
-  }
   return code;
 }
 
diff --git a/tests/system-test/2-query/partition_by_col.py b/tests/system-test/2-query/partition_by_col.py
index 1a394649d6..230b7582d9 100644
--- a/tests/system-test/2-query/partition_by_col.py
+++ b/tests/system-test/2-query/partition_by_col.py
@@ -141,12 +141,12 @@ class TDTestCase:
     def test_sort_for_partition_hint(self):
         sql = 'select count(*), c1 from meters partition by c1'
         sql_hint = 'select /*+ sort_for_group() */count(*), c1 from meters partition by c1'
-        self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
+        #self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
         self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
 
         sql = 'select count(*), c1, tbname from meters partition by tbname, c1'
         sql_hint = 'select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1'
-        self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
+        #self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
         self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
 
         sql = 'select count(*), c1, tbname from meters partition by tbname, c1 interval(1s)'
@@ -156,7 +156,7 @@ class TDTestCase:
 
         sql = 'select count(*), c1, t1 from meters partition by t1, c1'
         sql_hint = 'select /*+ sort_for_group() */ count(*), c1, t1 from meters partition by t1, c1'
-        self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
+        #self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
         self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
 
         sql = 'select count(*), c1, t1 from meters partition by t1, c1 interval(1s)'
@@ -208,7 +208,7 @@ class TDTestCase:
             sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list)
             sql = self.add_order_by(sql, order_by, select_list)
             self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
-            self.check_explain_res_has_row("Partition", self.explain_sql(sql))
+            #self.check_explain_res_has_row("Partition", self.explain_sql(sql))
             self.query_and_compare_res(sql, sql_hint, compare_what=compare_what)
 
     def test_sort_for_partition_res(self):
diff --git a/tests/system-test/2-query/partition_by_col_agg.py b/tests/system-test/2-query/partition_by_col_agg.py
index c522eb1e2b..011415867b 100644
--- a/tests/system-test/2-query/partition_by_col_agg.py
+++ b/tests/system-test/2-query/partition_by_col_agg.py
@@ -137,6 +137,10 @@ class TDTestCase:
         if not plan_found:
             tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(rows)))
 
+    def check_explain_res_no_row(self, plan_str_not_expect: str, res):
+        for row in res:
+            if str(row).find(plan_str_not_expect) >= 0:
+                tdLog.exit('plan: [%s] found in: [%s]' % (plan_str_not_expect, str(row)))
 
     def test_sort_for_partition_hint(self):
         pass
@@ -146,6 +150,9 @@ class TDTestCase:
 
     def add_hint(self, sql: str) -> str:
         return "select /*+ sort_for_group() */ %s" % sql[6:]
+
+    def add_remove_partition_hint(self, sql: str) -> str:
+        return "select /*+ remove_partition() */ %s" % sql[6:]
 
     def query_with_time(self, sql):
         start = datetime.now()
@@ -200,8 +207,8 @@ class TDTestCase:
     def check_explain(self, sql):
         sql_hint = self.add_hint(sql)
         explain_res = self.explain_sql(sql)
-        self.check_explain_res_has_row('SortMerge', explain_res)
-        self.check_explain_res_has_row("blocking=0", explain_res)
+        #self.check_explain_res_has_row('SortMerge', explain_res)
+        #self.check_explain_res_has_row("blocking=0", explain_res)
         explain_res = self.explain_sql(sql_hint)
         self.check_explain_res_has_row('SortMerge', explain_res)
         self.check_explain_res_has_row('blocking=0', explain_res)
@@ -233,14 +240,27 @@ class TDTestCase:
             sql_hint = self.add_hint(sql)
             sql = self.add_order_by(sql, ele[3])
             sql_no_slimit = sql_template % (ele[0], ele[1], '')
+
             sql_no_slimit = self.add_order_by(sql_no_slimit, ele[3])
             self.query_and_compare_first_rows(sql_hint, sql_no_slimit)
 
+    def test_remove_partition(self):
+        sql = 'select c1, count(*) from meters partition by c1 slimit 10'
+        explain_res = self.explain_sql(sql)
+        self.check_explain_res_no_row("Partition", explain_res)
+        self.check_explain_res_has_row("blocking=1", explain_res)
+
+        sql = 'select c1, count(*) from meters partition by c1,c2 slimit 10'
+        explain_res = self.explain_sql(sql)
+        self.check_explain_res_no_row("Partition", explain_res)
+        self.check_explain_res_has_row("blocking=1", explain_res)
+
     def run(self):
         self.prepareTestEnv()
         #time.sleep(99999999)
         self.test_pipelined_agg_plan_with_slimit()
         self.test_pipelined_agg_data_with_slimit()
+        self.test_remove_partition()
 
     def stop(self):
         tdSql.close()