From dcb8673063f17d85e437b0b1dfe1d3bf75e8d833 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 10 Jul 2024 09:01:39 +0800 Subject: [PATCH 1/3] set key for session window state --- source/libs/executor/src/streamtimewindowoperator.c | 2 +- source/libs/stream/src/streamSessionState.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 5b9c018bba..1948fe998f 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -3121,6 +3121,7 @@ _error: static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) { tSimpleHashClear(pInfo->streamAggSup.pResultRows); pInfo->streamAggSup.stateStore.streamStateSessionClear(pInfo->streamAggSup.pState); + pInfo->clearState = false; } void deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, @@ -3170,7 +3171,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { // semi session operator clear disk buffer clearStreamSessionOperator(pInfo); setStreamOperatorCompleted(pOperator); - pInfo->clearState = false; return NULL; } } diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 84db657392..0887f9e965 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -242,7 +242,6 @@ _end: int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; pNewPos->beFlushed = true; void* pBuff = NULL; @@ -250,6 +249,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v if (code != TSDB_CODE_SUCCESS) { return code; } + memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); memcpy(pNewPos->pRowBuff, pBuff, *pVLen); taosMemoryFreeClear(pBuff); (*pVal) = pNewPos; From 9c7178fedd137b1c27c93be87b4be997871bae6e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 10 Jul 2024 09:34:50 +0800 Subject: [PATCH 2/3] add ci --- .../stream/distributeMultiLevelInterval0.sim | 29 ++++++ .../script/tsim/stream/distributeSession0.sim | 98 +++++++++++++++++++ 2 files changed, 127 insertions(+) diff --git a/tests/script/tsim/stream/distributeMultiLevelInterval0.sim b/tests/script/tsim/stream/distributeMultiLevelInterval0.sim index 784ab7f4a5..e8bd246008 100644 --- a/tests/script/tsim/stream/distributeMultiLevelInterval0.sim +++ b/tests/script/tsim/stream/distributeMultiLevelInterval0.sim @@ -21,8 +21,37 @@ sql create table ts3 using st tags(3,2,2); sql create table ts4 using st tags(4,2,2); sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4 from st interval(10s); +print ====check task status start + +$loop_count = 0 + +loopCheck: + sleep 1000 +$loop_count = $loop_count + 1 +if $loop_count == 30 then + return -1 +endi + +print 1 select * from information_schema.ins_stream_tasks; +sql select * from information_schema.ins_stream_tasks; + +if $rows == 0 then + print rows=$rows + goto loopCheck +endi + +print 1 select * from information_schema.ins_stream_tasks where status != "ready"; +sql select * from information_schema.ins_stream_tasks where status != "ready"; + +if $rows != 0 then + print rows=$rows + goto loopCheck +endi + +print ====check task status end + sql insert into ts1 values(1648791213000,1,1,3,4.1); sql insert into ts1 values(1648791223000,2,2,3,1.1); sql insert into ts1 values(1648791233000,3,3,3,2.1); diff --git a/tests/script/tsim/stream/distributeSession0.sim b/tests/script/tsim/stream/distributeSession0.sim index 7eb8c725c8..9a112c2cdf 100644 --- a/tests/script/tsim/stream/distributeSession0.sim +++ b/tests/script/tsim/stream/distributeSession0.sim @@ -41,8 +41,37 @@ sql create table ts1 using st tags(1,1,1); sql create table ts2 using st tags(2,2,2); sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST as select _wstart, count(*) c1, sum(a) c2 , max(b) c3 from st session(ts, 10s) ; +print ====check task status start + +$loop_count = 0 + +loopCheck: + sleep 1000 +$loop_count = $loop_count + 1 +if $loop_count == 30 then + return -1 +endi + +print 1 select * from information_schema.ins_stream_tasks; +sql select * from information_schema.ins_stream_tasks; + +if $rows == 0 then + print rows=$rows + goto loopCheck +endi + +print 1 select * from information_schema.ins_stream_tasks where status != "ready"; +sql select * from information_schema.ins_stream_tasks where status != "ready"; + +if $rows != 0 then + print rows=$rows + goto loopCheck +endi + +print ====check task status end + sql insert into ts1 values(1648791211000,1,1,1) (1648791211005,1,1,1); sql insert into ts2 values(1648791221004,1,2,3) (1648791221008,2,2,3); sql insert into ts1 values(1648791211005,1,1,1); @@ -79,4 +108,73 @@ if $data03 != 7 then return -1 endi +print ===== step3 + +sql create database test1 vgroups 4; +sql use test1; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create stream stream_t2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST2 as select _wstart, count(*) c1, sum(a) c2 , max(b) c3 from st partition by a session(ts, 10s) ; + +print ====check task status start + +$loop_count = 0 + +loopCheck1: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 30 then + return -1 +endi + +print 1 select * from information_schema.ins_stream_tasks; +sql select * from information_schema.ins_stream_tasks; + +if $rows == 0 then + print rows=$rows + goto loopCheck1 +endi + +print 1 select * from information_schema.ins_stream_tasks where status != "ready"; +sql select * from information_schema.ins_stream_tasks where status != "ready"; + +if $rows != 0 then + print rows=$rows + goto loopCheck1 +endi + +print ====check task status end + +sql insert into ts1 values(1648791201000,1,1,1) (1648791210000,1,1,1); +sql insert into ts1 values(1648791211000,2,1,1) (1648791212000,2,1,1); +sql insert into ts2 values(1648791211000,3,1,1) (1648791212000,3,1,1); + +sql delete from st where ts = 1648791211000; + +$loop_count = 0 +loop2: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 1000 +print 2 select * from streamtST2; +sql select * from streamtST2; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 + +if $rows != 3 then + print =====rows=$rows + goto loop2 +endi + system sh/stop_dnodes.sh From 331d692a6f70f0b2466dee9b00ea28bc9f8a5f61 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 10 Jul 2024 09:38:42 +0800 Subject: [PATCH 3/3] add ci --- tests/script/tsim/stream/distributeMultiLevelInterval0.sim | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/script/tsim/stream/distributeMultiLevelInterval0.sim b/tests/script/tsim/stream/distributeMultiLevelInterval0.sim index e8bd246008..3ac1fbdd2b 100644 --- a/tests/script/tsim/stream/distributeMultiLevelInterval0.sim +++ b/tests/script/tsim/stream/distributeMultiLevelInterval0.sim @@ -152,6 +152,8 @@ if $data31 != 2 then goto loop1 endi +sleep 5000 + sql delete from ts2 where ts = 1648791243000 ; $loop_count = 0