agg tsma
This commit is contained in:
parent
54100b1c0d
commit
403f743301
|
@ -6073,7 +6073,7 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
|
|||
}
|
||||
|
||||
// the main tsma
|
||||
if (endOfSkeyFirstWin < startOfEkeyFirstWin) {
|
||||
if (endOfSkeyFirstWin <= startOfEkeyFirstWin) {
|
||||
scanRange.ekey = TMIN(pScanRange->ekey, startOfEkeyFirstWin - 1);
|
||||
if (!isSkeyAlignedWithTsma) {
|
||||
scanRange.skey = endOfSkeyFirstWin;
|
||||
|
|
|
@ -1119,20 +1119,41 @@ static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* p
|
|||
static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
SLogicNode* pPartAgg = NULL;
|
||||
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
|
||||
|
||||
bool hasExchange = false;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
// if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
|
||||
if (pInfo->pSplitNode->forceCreateNonBlockingOptr)
|
||||
code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
|
||||
else
|
||||
code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg); //TODO test slimit
|
||||
else {
|
||||
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
|
||||
hasExchange = true;
|
||||
}
|
||||
} else {
|
||||
nodesDestroyNode((SNode*)pPartAgg);
|
||||
}
|
||||
|
||||
SLogicSubplan* pScanSubplan = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||
(SNode*)splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT));
|
||||
pScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
|
||||
if (!pScanSubplan) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pInfo->pSubplan->pChildren) {
|
||||
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
|
||||
pSubplan->id.groupId = pCxt->groupId;
|
||||
pSubplan->id.queryId = pCxt->queryId;
|
||||
pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
|
||||
splSetSubplanVgroups(pSubplan, pSubplan->pNode);
|
||||
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pSubplan->pNode);
|
||||
}
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
|
||||
}
|
||||
|
||||
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||
++(pCxt->groupId);
|
||||
return code;
|
||||
|
@ -1732,6 +1753,7 @@ static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan,
|
|||
SUnionDistinctSplitInfo* pInfo) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
|
||||
pInfo->pAgg = (SAggLogicNode*)pNode;
|
||||
if (!pInfo->pAgg->pGroupKeys) return false;
|
||||
pInfo->pSubplan = pSubplan;
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -279,7 +279,7 @@ class TDTestCase:
|
|||
self.init_data()
|
||||
self.create_tsma('tsma1', 'test', 'meters', ['avg'], ['c1', 'c2'], '5m')
|
||||
self.create_tsma('tsma2', 'test', 'meters', ['avg'], ['c1', 'c2'], '30m')
|
||||
#time.sleep(9999999)
|
||||
time.sleep(9999999)
|
||||
self.test_query_with_tsma_interval()
|
||||
self.test_query_with_tsma_agg()
|
||||
|
||||
|
|
Loading…
Reference in New Issue