fix:error in split plan

This commit is contained in:
wangmm0220 2023-12-12 16:37:22 +08:00
parent 623ab98392
commit 7ba6135f2c
4 changed files with 53 additions and 3 deletions

View File

@ -363,7 +363,8 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream,
SEpSet* pEpset, int64_t nextWindowSkey) {
// create exec stream task, since only one level, the exec task is also the source task
addNewTaskList(pStream);
void* pIter = NULL;
SSdb* pSdb = pMnode->pSdb;
while (1) {

View File

@ -422,6 +422,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pCreate->sql = NULL;
pCreate->ast = NULL;
qDebugL("ast:%s", pObj->ast);
// deserialize ast
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
goto FAIL;

File diff suppressed because one or more lines are too long

View File

@ -488,6 +488,53 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
return code;
}
static int32_t stbSplCreateSemiWindowNode(SWindowLogicNode* pMidWindow, SLogicNode** pSemiWindow) {
SNodeList* pFunc = pMidWindow->pFuncs;
pMidWindow->pFuncs = NULL;
nodesDestroyList(pMidWindow->node.pTargets);
pMidWindow->node.pTargets = NULL;
SNodeList* pChildren = pMidWindow->node.pChildren;
pMidWindow->node.pChildren = NULL;
SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMidWindow);
if (NULL == pPartWin) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pPartWin->node.pChildren = pChildren;
splSetParent((SLogicNode*)pPartWin);
int32_t index = 0;
int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMidWindow->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMidWindow->pTspk)->node.resType.precision);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pMidWindow->pFuncs, &pMidWindow->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode(pMidWindow->pTspk);
pMidWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
if (NULL == pMidWindow->pTspk) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
nodesDestroyList(pFunc);
if (TSDB_CODE_SUCCESS == code) {
*pSemiWindow = (SLogicNode*)pPartWin;
} else {
nodesDestroyNode((SNode*)pPartWin);
}
return code;
}
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups;
@ -663,7 +710,7 @@ static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInf
}
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreatePartWindowNode((SWindowLogicNode*)pMidWindow, &pPartWindow);
code = stbSplCreateSemiWindowNode((SWindowLogicNode*)pMidWindow, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
code = stbSplCreateExchangeNode(pCxt, pMidWindow, pPartWindow);