From 403f743301b393ad47f78a93365b31e8fb4cf098 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Thu, 18 Jan 2024 12:34:49 +0800 Subject: [PATCH] agg tsma --- source/libs/planner/src/planOptimizer.c | 2 +- source/libs/planner/src/planSpliter.c | 32 +++++++++++++++++++++---- tests/system-test/2-query/tsma.py | 2 +- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index bfe1039cc5..bcb51aef59 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -6073,7 +6073,7 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc } // the main tsma - if (endOfSkeyFirstWin < startOfEkeyFirstWin) { + if (endOfSkeyFirstWin <= startOfEkeyFirstWin) { scanRange.ekey = TMIN(pScanRange->ekey, startOfEkeyFirstWin - 1); if (!isSkeyAlignedWithTsma) { scanRange.skey = endOfSkeyFirstWin; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index c6f375bcee..841d66657b 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1119,20 +1119,41 @@ static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* p static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartAgg = NULL; int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg); - + bool hasExchange = false; if (TSDB_CODE_SUCCESS == code) { // if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg if (pInfo->pSplitNode->forceCreateNonBlockingOptr) - code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg); - else + code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg); //TODO test slimit + else { code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg); + hasExchange = true; + } } else { nodesDestroyNode((SNode*)pPartAgg); } + + SLogicSubplan* pScanSubplan = NULL; if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, - (SNode*)splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT)); + pScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT); + if (!pScanSubplan) code = TSDB_CODE_OUT_OF_MEMORY; } + + if (code == TSDB_CODE_SUCCESS) { + SNode* pNode; + FOREACH(pNode, pInfo->pSubplan->pChildren) { + SLogicSubplan* pSubplan = (SLogicSubplan*)pNode; + pSubplan->id.groupId = pCxt->groupId; + pSubplan->id.queryId = pCxt->queryId; + pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT; + splSetSubplanVgroups(pSubplan, pSubplan->pNode); + code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pSubplan->pNode); + } + } + + if (code == TSDB_CODE_SUCCESS) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; ++(pCxt->groupId); return code; @@ -1732,6 +1753,7 @@ static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionDistinctSplitInfo* pInfo) { if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) { pInfo->pAgg = (SAggLogicNode*)pNode; + if (!pInfo->pAgg->pGroupKeys) return false; pInfo->pSubplan = pSubplan; return true; } diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 8c1cf61c9d..53befb0697 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -279,7 +279,7 @@ class TDTestCase: self.init_data() self.create_tsma('tsma1', 'test', 'meters', ['avg'], ['c1', 'c2'], '5m') self.create_tsma('tsma2', 'test', 'meters', ['avg'], ['c1', 'c2'], '30m') - #time.sleep(9999999) + time.sleep(9999999) self.test_query_with_tsma_interval() self.test_query_with_tsma_agg()