From 80ee5c1f13d37c26dee2003834ba45590bcf62a7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 13 Dec 2023 14:51:34 +0800 Subject: [PATCH] fix:middle agg plan --- source/libs/planner/src/planSpliter.c | 124 +++++++++++++++++++++----- 1 file changed, 103 insertions(+), 21 deletions(-) diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 4cd7749b6a..accf5bf100 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -488,53 +488,139 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic return code; } -static int32_t stbSplCreateSemiWindowNode(SWindowLogicNode* pMidWindow, SLogicNode** pSemiWindow) { - SNodeList* pFunc = pMidWindow->pFuncs; - pMidWindow->pFuncs = NULL; - nodesDestroyList(pMidWindow->node.pTargets); - pMidWindow->node.pTargets = NULL; - SNodeList* pChildren = pMidWindow->node.pChildren; - pMidWindow->node.pChildren = NULL; +static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow, SLogicNode** pMidWindow) { + SNodeList* pFunc = pMergeWindow->pFuncs; + pMergeWindow->pFuncs = NULL; + SNodeList* pTargets = pMergeWindow->node.pTargets; + pMergeWindow->node.pTargets = NULL; + SNodeList* pChildren = pMergeWindow->node.pChildren; + pMergeWindow->node.pChildren = NULL; + SNode* pConditions = pMergeWindow->node.pConditions; + pMergeWindow->node.pConditions = NULL; - SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMidWindow); + SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow); if (NULL == pPartWin) { return TSDB_CODE_OUT_OF_MEMORY; } + SWindowLogicNode* pMidWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow); + if (NULL == pMidWin) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pPartWin->node.groupAction = GROUP_ACTION_KEEP; + pMidWin->node.groupAction = GROUP_ACTION_KEEP; + pMergeWindow->node.pTargets = pTargets; + pMergeWindow->node.pConditions = pConditions; + pPartWin->node.pChildren = pChildren; splSetParent((SLogicNode*)pPartWin); + SNodeList* pFuncPart = NULL; + SNodeList* pFuncMerge = NULL; + int32_t code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMerge); + pPartWin->pFuncs = pFuncPart; + pMergeWindow->pFuncs = pFuncMerge; + pMidWin->pFuncs = nodesCloneList(pFuncMerge); + int32_t index = 0; - int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMidWindow->pFuncs); if (TSDB_CODE_SUCCESS == code) { - code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMidWindow->pTspk)->node.resType.precision); + code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision); } if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExprs(pMidWindow->pFuncs, &pMidWindow->node.pTargets); + nodesDestroyNode(pMidWin->pTspk); + pMidWin->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index)); + if (NULL == pMidWin->pTspk) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if(TSDB_CODE_SUCCESS == code){ + for (int32_t i = 0; i < LIST_LENGTH(pMidWin->pFuncs); ++i) { + SFunctionNode* pFunc1 = (SFunctionNode*)nodesListGetNode(pPartWin->pFuncs, i); + SFunctionNode* pFunc2 = (SFunctionNode*)nodesListGetNode(pMidWin->pFuncs, i); + strcpy(pFunc2->node.aliasName, pFunc1->node.aliasName); + } } if (TSDB_CODE_SUCCESS == code) { - nodesDestroyNode(pMidWindow->pTspk); - pMidWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index)); - if (NULL == pMidWindow->pTspk) { + code = stbSplAppendWStart(pMidWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision); + } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExprs(pMidWin->pFuncs, &pMidWin->node.pTargets); + } + + if (TSDB_CODE_SUCCESS == code) { + nodesDestroyNode(pMergeWindow->pTspk); + pMergeWindow->pTspk = nodesCloneNode(nodesListGetNode(pMidWin->node.pTargets, index)); + if (NULL == pMergeWindow->pTspk) { code = TSDB_CODE_OUT_OF_MEMORY; } } nodesDestroyList(pFunc); if (TSDB_CODE_SUCCESS == code) { - *pSemiWindow = (SLogicNode*)pPartWin; + *pPartWindow = (SLogicNode*)pPartWin; + *pMidWindow = (SLogicNode*)pMidWin; } else { nodesDestroyNode((SNode*)pPartWin); + nodesDestroyNode((SNode*)pMidWin); } return code; } +//static int32_t stbSplCreateSemiWindowNode(SWindowLogicNode* pMidWindow, SLogicNode** pSemiWindow) { +// SNodeList* pFunc = pMidWindow->pFuncs; +// pMidWindow->pFuncs = NULL; +// nodesDestroyList(pMidWindow->node.pTargets); +// pMidWindow->node.pTargets = NULL; +// SNodeList* pChildren = pMidWindow->node.pChildren; +// pMidWindow->node.pChildren = NULL; +// +// SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMidWindow); +// if (NULL == pPartWin) { +// return TSDB_CODE_OUT_OF_MEMORY; +// } +// +// pPartWin->node.pChildren = pChildren; +// splSetParent((SLogicNode*)pPartWin); +// +// int32_t index = 0; +// int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMidWindow->pFuncs); +// if (TSDB_CODE_SUCCESS == code) { +// code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMidWindow->pTspk)->node.resType.precision); +// } +// if (TSDB_CODE_SUCCESS == code) { +// code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets); +// } +// +// if (TSDB_CODE_SUCCESS == code) { +// code = createColumnByRewriteExprs(pMidWindow->pFuncs, &pMidWindow->node.pTargets); +// } +// +// if (TSDB_CODE_SUCCESS == code) { +// nodesDestroyNode(pMidWindow->pTspk); +// pMidWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index)); +// if (NULL == pMidWindow->pTspk) { +// code = TSDB_CODE_OUT_OF_MEMORY; +// } +// } +// +// nodesDestroyList(pFunc); +// if (TSDB_CODE_SUCCESS == code) { +// *pSemiWindow = (SLogicNode*)pPartWin; +// } else { +// nodesDestroyNode((SNode*)pPartWin); +// } +// +// return code; +//} + static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups; @@ -702,17 +788,13 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartWindow = NULL; SLogicNode* pMidWindow = NULL; - int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pMidWindow); + int32_t code = stbSplCreatePartMidWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow, &pMidWindow); if (TSDB_CODE_SUCCESS == code) { ((SWindowLogicNode*)pMidWindow)->windowAlgo = INTERVAL_ALGO_STREAM_MID; ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL; + ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI; code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pMidWindow); - } - - if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateSemiWindowNode((SWindowLogicNode*)pMidWindow, &pPartWindow); if (TSDB_CODE_SUCCESS == code) { - ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI; code = stbSplCreateExchangeNode(pCxt, pMidWindow, pPartWindow); } }