From 902504b39b14343b30b4f7fc83c6c9007e6d0d24 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 22 Dec 2023 15:28:02 +0800 Subject: [PATCH] fix:partical func parameters for middle interval[checkStreamSTable1.sim] --- source/dnode/mnode/impl/src/mndScheduler.c | 5 ++-- source/libs/planner/src/planSpliter.c | 23 +++++++++++++++++-- .../system-test/8-stream/stream_multi_agg.py | 8 +++---- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 2cf5330f09..a4721b8a11 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -556,12 +556,13 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { SArray* pUpTaskList = taosArrayGetP(tasks, size - 2); SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList); - for(int i = begin; i < end && i < taosArrayGetSize(pUpTaskList); i++){ + end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList): end; + for(int i = begin; i < end; i++){ SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i); streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask); streamTaskSetUpstreamInfo(*pDownTask, pUpTask); } - mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end, (*(pDownTask))->id.idStr); + mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr); } static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index dc0e7da6a1..d1f9901b30 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -488,6 +488,16 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic return code; } +static SNode* createColumnByFunc(const SFunctionNode* pFunc) { + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + return NULL; + } + strcpy(pCol->colName, pFunc->node.aliasName); + pCol->node.resType = pFunc->node.resType; + return (SNode*)pCol; +} + static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow, SLogicNode** pMidWindow) { SNodeList* pFunc = pMergeWindow->pFuncs; pMergeWindow->pFuncs = NULL; @@ -521,7 +531,7 @@ static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLo int32_t code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMerge); pPartWin->pFuncs = pFuncPart; pMergeWindow->pFuncs = pFuncMerge; - pMidWin->pFuncs = nodesCloneList(pFuncMerge); + pMidWin->pFuncs = nodesCloneList(pFuncPart); int32_t index = 0; if (TSDB_CODE_SUCCESS == code) { @@ -543,7 +553,16 @@ static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLo for (int32_t i = 0; i < LIST_LENGTH(pMidWin->pFuncs); ++i) { SFunctionNode* pFunc1 = (SFunctionNode*)nodesListGetNode(pPartWin->pFuncs, i); SFunctionNode* pFunc2 = (SFunctionNode*)nodesListGetNode(pMidWin->pFuncs, i); - strcpy(pFunc2->node.aliasName, pFunc1->node.aliasName); + NODES_DESTORY_LIST(pFunc2->pParameterList); + + SNodeList* pParameterList = NULL; + SNode* pRes = createColumnByFunc(pFunc1); + code = nodesListMakeStrictAppend(&pParameterList, pRes); + if(code == TSDB_CODE_SUCCESS){ + pFunc2->pParameterList = pParameterList; + }else{ + nodesDestroyNode(pRes); + } } } diff --git a/tests/system-test/8-stream/stream_multi_agg.py b/tests/system-test/8-stream/stream_multi_agg.py index 5c478d7e14..92ee4540e4 100644 --- a/tests/system-test/8-stream/stream_multi_agg.py +++ b/tests/system-test/8-stream/stream_multi_agg.py @@ -37,11 +37,11 @@ class TDTestCase: def case1(self): tdLog.debug("========case1 start========") - os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 12 > /dev/null 2>&1 &") - time.sleep(4) + os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 5 > /dev/null 2>&1 &") + time.sleep(10) tdSql.query("use test") tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") - tdLog.debug("========create stream useing snode and insert data ok========") + tdLog.debug("========create stream and insert data ok========") time.sleep(15) tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") @@ -64,7 +64,7 @@ class TDTestCase: def case2(self): tdLog.debug("========case2 start========") - os.system("taosBenchmark -d db -t 20 -v 12 -n 1000 -y > /dev/null 2>&1") + os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y > /dev/null 2>&1") # create stream tdSql.execute("use db") tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)