fix:partical func parameters for middle interval[checkStreamSTable1.sim]

This commit is contained in:
wangmm0220 2023-12-22 15:28:02 +08:00
parent a387e7e2d3
commit 902504b39b
3 changed files with 28 additions and 8 deletions

View File

@ -556,12 +556,13 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
SArray* pUpTaskList = taosArrayGetP(tasks, size - 2); SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);
SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList); SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
for(int i = begin; i < end && i < taosArrayGetSize(pUpTaskList); i++){ end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList): end;
for(int i = begin; i < end; i++){
SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i); SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask); streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
streamTaskSetUpstreamInfo(*pDownTask, pUpTask); streamTaskSetUpstreamInfo(*pDownTask, pUpTask);
} }
mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end, (*(pDownTask))->id.idStr); mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr);
} }
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) { static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) {

View File

@ -488,6 +488,16 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
return code; return code;
} }
static SNode* createColumnByFunc(const SFunctionNode* pFunc) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return NULL;
}
strcpy(pCol->colName, pFunc->node.aliasName);
pCol->node.resType = pFunc->node.resType;
return (SNode*)pCol;
}
static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow, SLogicNode** pMidWindow) { static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow, SLogicNode** pMidWindow) {
SNodeList* pFunc = pMergeWindow->pFuncs; SNodeList* pFunc = pMergeWindow->pFuncs;
pMergeWindow->pFuncs = NULL; pMergeWindow->pFuncs = NULL;
@ -521,7 +531,7 @@ static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLo
int32_t code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMerge); int32_t code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMerge);
pPartWin->pFuncs = pFuncPart; pPartWin->pFuncs = pFuncPart;
pMergeWindow->pFuncs = pFuncMerge; pMergeWindow->pFuncs = pFuncMerge;
pMidWin->pFuncs = nodesCloneList(pFuncMerge); pMidWin->pFuncs = nodesCloneList(pFuncPart);
int32_t index = 0; int32_t index = 0;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -543,7 +553,16 @@ static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLo
for (int32_t i = 0; i < LIST_LENGTH(pMidWin->pFuncs); ++i) { for (int32_t i = 0; i < LIST_LENGTH(pMidWin->pFuncs); ++i) {
SFunctionNode* pFunc1 = (SFunctionNode*)nodesListGetNode(pPartWin->pFuncs, i); SFunctionNode* pFunc1 = (SFunctionNode*)nodesListGetNode(pPartWin->pFuncs, i);
SFunctionNode* pFunc2 = (SFunctionNode*)nodesListGetNode(pMidWin->pFuncs, i); SFunctionNode* pFunc2 = (SFunctionNode*)nodesListGetNode(pMidWin->pFuncs, i);
strcpy(pFunc2->node.aliasName, pFunc1->node.aliasName); NODES_DESTORY_LIST(pFunc2->pParameterList);
SNodeList* pParameterList = NULL;
SNode* pRes = createColumnByFunc(pFunc1);
code = nodesListMakeStrictAppend(&pParameterList, pRes);
if(code == TSDB_CODE_SUCCESS){
pFunc2->pParameterList = pParameterList;
}else{
nodesDestroyNode(pRes);
}
} }
} }

View File

@ -37,11 +37,11 @@ class TDTestCase:
def case1(self): def case1(self):
tdLog.debug("========case1 start========") tdLog.debug("========case1 start========")
os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 12 > /dev/null 2>&1 &") os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 5 > /dev/null 2>&1 &")
time.sleep(4) time.sleep(10)
tdSql.query("use test") tdSql.query("use test")
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
tdLog.debug("========create stream useing snode and insert data ok========") tdLog.debug("========create stream and insert data ok========")
time.sleep(15) time.sleep(15)
tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
@ -64,7 +64,7 @@ class TDTestCase:
def case2(self): def case2(self):
tdLog.debug("========case2 start========") tdLog.debug("========case2 start========")
os.system("taosBenchmark -d db -t 20 -v 12 -n 1000 -y > /dev/null 2>&1") os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y > /dev/null 2>&1")
# create stream # create stream
tdSql.execute("use db") tdSql.execute("use db")
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)