From d0e4a5d0f773bc2f8d39ef875d410776a916c54f Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 25 Oct 2023 20:29:25 +0800 Subject: [PATCH 1/3] opt max delay --- include/libs/executor/storageapi.h | 1 + .../executor/src/streamtimewindowoperator.c | 8 ++- source/libs/stream/src/tstreamFileState.c | 1 + tests/script/tsim/stream/windowClose.sim | 52 +++++++++++++++++++ 4 files changed, 61 insertions(+), 1 deletion(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index f5392f02b1..85e990c4e1 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -119,6 +119,7 @@ typedef struct SRowBuffPos { bool beFlushed; bool beUsed; bool needFree; + bool beUpdated; } SRowBuffPos; // tq diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8bfa8e1a5d..50093db91d 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -279,7 +279,12 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SSHashObj* resWins) { SWinKey* pKey = tSimpleHashGetKey(pIte, NULL); uint64_t groupId = pKey->groupId; TSKEY ts = pKey->ts; - int32_t code = saveWinResultInfo(ts, groupId, *(SRowBuffPos**)pIte, resWins); + SRowBuffPos* pPos = *(SRowBuffPos**)pIte; + if (!pPos->beUpdated) { + continue; + } + pPos->beUpdated = false; + int32_t code = saveWinResultInfo(ts, groupId, pPos, resWins); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -863,6 +868,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + pResPos->beUpdated = true; tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 584e81fafc..2d7c5cd970 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -407,6 +407,7 @@ SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) { newPos->beUsed = true; newPos->beFlushed = false; newPos->needFree = false; + newPos->beUpdated = true; return newPos; } diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 5bd17e076e..12449526c2 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -134,6 +134,58 @@ if $rows != 2 then goto loop1 endi +print max delay 2s +sql create database test3 vgroups 4; +sql use test3; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream13 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 interval(10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223001,2,2,3,1.1); + +$loop_count = 0 + +loop2: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop2 +endi + +$now02 = $data02 + +$now12 = $data12 + +$loop_count = 0 + +print max delay 2s......... sleep 5s +sleep 5000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + print ======over system sh/exec.sh -n dnode1 -s stop -x SIGINT From 4c45f5a01c9d22ac0b6326ee8ef13bc69f9461ba Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 26 Oct 2023 14:38:05 +0800 Subject: [PATCH 2/3] add ci --- tests/script/tsim/stream/windowClose.sim | 82 ++++++++++++++++++++++-- 1 file changed, 78 insertions(+), 4 deletions(-) diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 12449526c2..7539164f0c 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -134,7 +134,7 @@ if $rows != 2 then goto loop1 endi -print max delay 2s +print step 1 max delay 2s sql create database test3 vgroups 4; sql use test3; sql create table t1(ts timestamp, a int, b int , c int, d double); @@ -165,12 +165,10 @@ if $rows != 2 then endi $now02 = $data02 - $now12 = $data12 -$loop_count = 0 -print max delay 2s......... sleep 5s +print step1 max delay 2s......... sleep 5s sleep 5000 sql select * from streamt13; @@ -186,6 +184,82 @@ if $data12 != $now12 then return -1 endi +print step 2 max delay 2s + +sql create database test4 vgroups 4; +sql use test4; + +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream stream14 trigger max_delay 2s into streamt14 as select _wstart, sum(a), now from st partition by tbname interval(10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223000,2,2,3,1.1); + +sql insert into t2 values(1648791213000,3,2,3,1.0); +sql insert into t2 values(1648791223000,4,2,3,1.1); + +$loop_count = 0 + +loop3: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt14 order by 2; +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 + +if $rows != 4 then + print ======rows=$rows + goto loop3 +endi + +$now02 = $data02 +$now12 = $data12 +$now22 = $data22 +$now32 = $data32 + +print step2 max delay 2s......... sleep 5s +sleep 5000 + +sql select * from streamt14 order by 2; +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +if $data22 != $now22 then + print ======data22=$data22 + return -1 +endi + +if $data32 != $now32 then + print ======data32=$data32 + return -1 +endi + print ======over system sh/exec.sh -n dnode1 -s stop -x SIGINT From e151b3df89889c5e6b2addfc4f2592115ebd0832 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 26 Oct 2023 15:45:40 +0800 Subject: [PATCH 3/3] opt ci --- .../script/tsim/stream/fillIntervalRange.sim | 2 +- tests/script/tsim/stream/windowClose.sim | 38 +++++++++++++++++-- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/tests/script/tsim/stream/fillIntervalRange.sim b/tests/script/tsim/stream/fillIntervalRange.sim index 99c1fe8ad4..e5316e6a1e 100644 --- a/tests/script/tsim/stream/fillIntervalRange.sim +++ b/tests/script/tsim/stream/fillIntervalRange.sim @@ -64,7 +64,7 @@ endi sql select count(*) from streamt; if $data00 != 9098 then - print =====rows=$rows + print =====data00=$data00 goto loop1 endi diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 7539164f0c..67678963ea 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -168,8 +168,8 @@ $now02 = $data02 $now12 = $data12 -print step1 max delay 2s......... sleep 5s -sleep 5000 +print step1 max delay 2s......... sleep 3s +sleep 3000 sql select * from streamt13; @@ -230,8 +230,38 @@ $now12 = $data12 $now22 = $data22 $now32 = $data32 -print step2 max delay 2s......... sleep 5s -sleep 5000 +print step2 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt14 order by 2; +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +if $data22 != $now22 then + print ======data22=$data22 + return -1 +endi + +if $data32 != $now32 then + print ======data32=$data32 + return -1 +endi + +print step2 max delay 2s......... sleep 3s +sleep 3000 sql select * from streamt14 order by 2; print $data00 $data01 $data02