fix:middle agg plan

This commit is contained in:
wangmm0220 2023-12-13 14:51:34 +08:00
parent 7ba6135f2c
commit 80ee5c1f13
1 changed files with 103 additions and 21 deletions

View File

@ -488,53 +488,139 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
return code; return code;
} }
static int32_t stbSplCreateSemiWindowNode(SWindowLogicNode* pMidWindow, SLogicNode** pSemiWindow) { static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow, SLogicNode** pMidWindow) {
SNodeList* pFunc = pMidWindow->pFuncs; SNodeList* pFunc = pMergeWindow->pFuncs;
pMidWindow->pFuncs = NULL; pMergeWindow->pFuncs = NULL;
nodesDestroyList(pMidWindow->node.pTargets); SNodeList* pTargets = pMergeWindow->node.pTargets;
pMidWindow->node.pTargets = NULL; pMergeWindow->node.pTargets = NULL;
SNodeList* pChildren = pMidWindow->node.pChildren; SNodeList* pChildren = pMergeWindow->node.pChildren;
pMidWindow->node.pChildren = NULL; pMergeWindow->node.pChildren = NULL;
SNode* pConditions = pMergeWindow->node.pConditions;
pMergeWindow->node.pConditions = NULL;
SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMidWindow); SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow);
if (NULL == pPartWin) { if (NULL == pPartWin) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
SWindowLogicNode* pMidWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow);
if (NULL == pMidWin) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pPartWin->node.groupAction = GROUP_ACTION_KEEP;
pMidWin->node.groupAction = GROUP_ACTION_KEEP;
pMergeWindow->node.pTargets = pTargets;
pMergeWindow->node.pConditions = pConditions;
pPartWin->node.pChildren = pChildren; pPartWin->node.pChildren = pChildren;
splSetParent((SLogicNode*)pPartWin); splSetParent((SLogicNode*)pPartWin);
SNodeList* pFuncPart = NULL;
SNodeList* pFuncMerge = NULL;
int32_t code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMerge);
pPartWin->pFuncs = pFuncPart;
pMergeWindow->pFuncs = pFuncMerge;
pMidWin->pFuncs = nodesCloneList(pFuncMerge);
int32_t index = 0; int32_t index = 0;
int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMidWindow->pFuncs);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMidWindow->pTspk)->node.resType.precision); code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets); code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pMidWindow->pFuncs, &pMidWindow->node.pTargets); nodesDestroyNode(pMidWin->pTspk);
pMidWin->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
if (NULL == pMidWin->pTspk) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if(TSDB_CODE_SUCCESS == code){
for (int32_t i = 0; i < LIST_LENGTH(pMidWin->pFuncs); ++i) {
SFunctionNode* pFunc1 = (SFunctionNode*)nodesListGetNode(pPartWin->pFuncs, i);
SFunctionNode* pFunc2 = (SFunctionNode*)nodesListGetNode(pMidWin->pFuncs, i);
strcpy(pFunc2->node.aliasName, pFunc1->node.aliasName);
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode(pMidWindow->pTspk); code = stbSplAppendWStart(pMidWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
pMidWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index)); }
if (NULL == pMidWindow->pTspk) { if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pMidWin->pFuncs, &pMidWin->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode(pMergeWindow->pTspk);
pMergeWindow->pTspk = nodesCloneNode(nodesListGetNode(pMidWin->node.pTargets, index));
if (NULL == pMergeWindow->pTspk) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
} }
nodesDestroyList(pFunc); nodesDestroyList(pFunc);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pSemiWindow = (SLogicNode*)pPartWin; *pPartWindow = (SLogicNode*)pPartWin;
*pMidWindow = (SLogicNode*)pMidWin;
} else { } else {
nodesDestroyNode((SNode*)pPartWin); nodesDestroyNode((SNode*)pPartWin);
nodesDestroyNode((SNode*)pMidWin);
} }
return code; return code;
} }
//static int32_t stbSplCreateSemiWindowNode(SWindowLogicNode* pMidWindow, SLogicNode** pSemiWindow) {
// SNodeList* pFunc = pMidWindow->pFuncs;
// pMidWindow->pFuncs = NULL;
// nodesDestroyList(pMidWindow->node.pTargets);
// pMidWindow->node.pTargets = NULL;
// SNodeList* pChildren = pMidWindow->node.pChildren;
// pMidWindow->node.pChildren = NULL;
//
// SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMidWindow);
// if (NULL == pPartWin) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
//
// pPartWin->node.pChildren = pChildren;
// splSetParent((SLogicNode*)pPartWin);
//
// int32_t index = 0;
// int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMidWindow->pFuncs);
// if (TSDB_CODE_SUCCESS == code) {
// code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMidWindow->pTspk)->node.resType.precision);
// }
// if (TSDB_CODE_SUCCESS == code) {
// code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
// }
//
// if (TSDB_CODE_SUCCESS == code) {
// code = createColumnByRewriteExprs(pMidWindow->pFuncs, &pMidWindow->node.pTargets);
// }
//
// if (TSDB_CODE_SUCCESS == code) {
// nodesDestroyNode(pMidWindow->pTspk);
// pMidWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
// if (NULL == pMidWindow->pTspk) {
// code = TSDB_CODE_OUT_OF_MEMORY;
// }
// }
//
// nodesDestroyList(pFunc);
// if (TSDB_CODE_SUCCESS == code) {
// *pSemiWindow = (SLogicNode*)pPartWin;
// } else {
// nodesDestroyNode((SNode*)pPartWin);
// }
//
// return code;
//}
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) { static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups; return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups;
@ -702,17 +788,13 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL; SLogicNode* pPartWindow = NULL;
SLogicNode* pMidWindow = NULL; SLogicNode* pMidWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pMidWindow); int32_t code = stbSplCreatePartMidWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow, &pMidWindow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pMidWindow)->windowAlgo = INTERVAL_ALGO_STREAM_MID; ((SWindowLogicNode*)pMidWindow)->windowAlgo = INTERVAL_ALGO_STREAM_MID;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL; ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pMidWindow);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateSemiWindowNode((SWindowLogicNode*)pMidWindow, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI; ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pMidWindow);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateExchangeNode(pCxt, pMidWindow, pPartWindow); code = stbSplCreateExchangeNode(pCxt, pMidWindow, pPartWindow);
} }
} }