Merge pull request #23185 from taosdata/feat/TD-26639

feat: remove partition node for agg
This commit is contained in:
dapan1121 2023-10-11 08:36:48 +08:00 committed by GitHub
commit c5ec45ddb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 27 deletions

View File

@ -3646,32 +3646,73 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
}
static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
int32_t code = TSDB_CODE_SUCCESS;
SPartitionLogicNode* pNode = (SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partColOptShouldBeOptimized);
SNode* node;
int32_t code = TSDB_CODE_SUCCESS;
SPartitionLogicNode* pNode =
(SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partColOptShouldBeOptimized);
if (NULL == pNode) return TSDB_CODE_SUCCESS;
SLogicNode* pRootNode = getLogicNodeRootNode((SLogicNode*)pNode);
if (!pRootNode->pHint || !getSortForGroupOptHint(pRootNode->pHint)) {
if (pRootNode->pHint && getSortForGroupOptHint(pRootNode->pHint)) {
// replace with sort node
SSortLogicNode* pSort = partColOptCreateSort(pNode);
if (!pSort) {
// if sort create failed, we eat the error, skip the optimization
code = TSDB_CODE_SUCCESS;
} else {
TSWAP(pSort->node.pChildren, pNode->node.pChildren);
TSWAP(pSort->node.pTargets, pNode->node.pTargets);
optResetParent((SLogicNode*)pSort);
pSort->calcGroupId = true;
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort);
if (code == TSDB_CODE_SUCCESS) {
pCxt->optimized = true;
} else {
nodesDestroyNode((SNode*)pSort);
}
}
return code;
} else if (pNode->node.pParent && nodeType(pNode->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG) {
// Check if we can delete partition node
SAggLogicNode* pAgg = (SAggLogicNode*)pNode->node.pParent;
FOREACH(node, pNode->pPartitionKeys) {
SGroupingSetNode* pgsNode = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
if (!pgsNode) code = TSDB_CODE_OUT_OF_MEMORY;
if (code == TSDB_CODE_SUCCESS) {
pgsNode->groupingSetType = GP_TYPE_NORMAL;
pgsNode->pParameterList = nodesMakeList();
if (!pgsNode->pParameterList) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == TSDB_CODE_SUCCESS) {
code = nodesListAppend(pgsNode->pParameterList, nodesCloneNode(node));
}
if (code == TSDB_CODE_SUCCESS) {
// Now we are using hash agg
code = nodesListMakeAppend(&pAgg->pGroupKeys, (SNode*)pgsNode);
}
if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pgsNode);
break;
}
}
if (code == TSDB_CODE_SUCCESS) {
code =
replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)nodesListGetNode(pNode->node.pChildren, 0));
NODES_CLEAR_LIST(pNode->node.pChildren);
}
if (code == TSDB_CODE_SUCCESS) {
// For hash agg, nonblocking mode is meaningless, slimit is useless, so we reset it
pAgg->node.forceCreateNonBlockingOptr = false;
nodesDestroyNode(pAgg->node.pSlimit);
pAgg->node.pSlimit = NULL;
nodesDestroyNode((SNode*)pNode);
pCxt->optimized = true;
}
return code;
}
// replace with sort node
SSortLogicNode* pSort = partColOptCreateSort(pNode);
if (!pSort) {
// if sort create failed, we eat the error, skip the optimization
code = TSDB_CODE_SUCCESS;
} else {
TSWAP(pSort->node.pChildren, pNode->node.pChildren);
TSWAP(pSort->node.pTargets, pNode->node.pTargets);
optResetParent((SLogicNode*)pSort);
pSort->calcGroupId = true;
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort);
if (code == TSDB_CODE_SUCCESS) {
pCxt->optimized = true;
} else {
nodesDestroyNode((SNode*)pSort);
}
}
return code;
}

View File

@ -141,12 +141,12 @@ class TDTestCase:
def test_sort_for_partition_hint(self):
sql = 'select count(*), c1 from meters partition by c1'
sql_hint = 'select /*+ sort_for_group() */count(*), c1 from meters partition by c1'
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
#self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
sql = 'select count(*), c1, tbname from meters partition by tbname, c1'
sql_hint = 'select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1'
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
#self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
sql = 'select count(*), c1, tbname from meters partition by tbname, c1 interval(1s)'
@ -156,7 +156,7 @@ class TDTestCase:
sql = 'select count(*), c1, t1 from meters partition by t1, c1'
sql_hint = 'select /*+ sort_for_group() */ count(*), c1, t1 from meters partition by t1, c1'
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
#self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
sql = 'select count(*), c1, t1 from meters partition by t1, c1 interval(1s)'
@ -208,7 +208,7 @@ class TDTestCase:
sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list)
sql = self.add_order_by(sql, order_by, select_list)
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
self.check_explain_res_has_row("Partition", self.explain_sql(sql))
#self.check_explain_res_has_row("Partition", self.explain_sql(sql))
self.query_and_compare_res(sql, sql_hint, compare_what=compare_what)
def test_sort_for_partition_res(self):

View File

@ -137,6 +137,10 @@ class TDTestCase:
if not plan_found:
tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(rows)))
def check_explain_res_no_row(self, plan_str_not_expect: str, res):
for row in res:
if str(row).find(plan_str_not_expect) >= 0:
tdLog.exit('plan: [%s] found in: [%s]' % (plan_str_not_expect, str(row)))
def test_sort_for_partition_hint(self):
pass
@ -146,6 +150,9 @@ class TDTestCase:
def add_hint(self, sql: str) -> str:
return "select /*+ sort_for_group() */ %s" % sql[6:]
def add_remove_partition_hint(self, sql: str) -> str:
return "select /*+ remove_partition() */ %s" % sql[6:]
def query_with_time(self, sql):
start = datetime.now()
@ -200,8 +207,8 @@ class TDTestCase:
def check_explain(self, sql):
sql_hint = self.add_hint(sql)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row('SortMerge', explain_res)
self.check_explain_res_has_row("blocking=0", explain_res)
#self.check_explain_res_has_row('SortMerge', explain_res)
#self.check_explain_res_has_row("blocking=0", explain_res)
explain_res = self.explain_sql(sql_hint)
self.check_explain_res_has_row('SortMerge', explain_res)
self.check_explain_res_has_row('blocking=0', explain_res)
@ -233,14 +240,27 @@ class TDTestCase:
sql_hint = self.add_hint(sql)
sql = self.add_order_by(sql, ele[3])
sql_no_slimit = sql_template % (ele[0], ele[1], '')
sql_no_slimit = self.add_order_by(sql_no_slimit, ele[3])
self.query_and_compare_first_rows(sql_hint, sql_no_slimit)
def test_remove_partition(self):
sql = 'select c1, count(*) from meters partition by c1 slimit 10'
explain_res = self.explain_sql(sql)
self.check_explain_res_no_row("Partition", explain_res)
self.check_explain_res_has_row("blocking=1", explain_res)
sql = 'select c1, count(*) from meters partition by c1,c2 slimit 10'
explain_res = self.explain_sql(sql)
self.check_explain_res_no_row("Partition", explain_res)
self.check_explain_res_has_row("blocking=1", explain_res)
def run(self):
self.prepareTestEnv()
#time.sleep(99999999)
self.test_pipelined_agg_plan_with_slimit()
self.test_pipelined_agg_data_with_slimit()
self.test_remove_partition()
def stop(self):
tdSql.close()