From 2ac374b2e01b300ef1b5a08106bf365ff31e14f3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 2 Feb 2024 14:02:13 +0800 Subject: [PATCH 1/3] max delay --- .../executor/src/streameventwindowoperator.c | 1 + .../executor/src/streamtimewindowoperator.c | 2 + tests/script/tsim/stream/windowClose.sim | 50 +++++++++++++++++++ 3 files changed, 53 insertions(+) diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 8aca76597b..d1138afb65 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -349,6 +349,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + curWin.winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&curWin.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 02f8b90864..14fabaecea 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2083,6 +2083,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); @@ -3425,6 +3426,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + curWin.winInfo.pStatePos->beUpdated = true; SSessionKey key = {0}; getSessionHashKey(&curWin.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 67678963ea..e4b8fc7d84 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -290,6 +290,56 @@ if $data32 != $now32 then return -1 endi +print step 1 max delay 2s +sql create database test3 vgroups 4; +sql use test3; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream13 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223001,2,2,3,1.1); + +$loop_count = 0 + +loop2: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop2 +endi + +$now02 = $data02 +$now12 = $data12 + + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + print ======over system sh/exec.sh -n dnode1 -s stop -x SIGINT From 54f216b6c635453261d8d26c666c3bb34d6cb113 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 2 Feb 2024 15:19:58 +0800 Subject: [PATCH 2/3] add ci --- .../executor/src/streamtimewindowoperator.c | 4 + tests/script/tsim/stream/windowClose.sim | 161 +++++++++++++++++- 2 files changed, 160 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 14fabaecea..f26ff7156b 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2287,6 +2287,10 @@ int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated) { int32_t iter = 0; while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { SResultWindowInfo* pWinInfo = pIte; + if (!pWinInfo->pStatePos->beUpdated) { + continue; + } + pWinInfo->pStatePos->beUpdated = false; saveResult(*pWinInfo, pStUpdated); } return TSDB_CODE_SUCCESS; diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index e4b8fc7d84..775ff81f51 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -290,17 +290,17 @@ if $data32 != $now32 then return -1 endi -print step 1 max delay 2s -sql create database test3 vgroups 4; -sql use test3; +print step 2 max delay 2s +sql create database test15 vgroups 4; +sql use test15; sql create table t1(ts timestamp, a int, b int , c int, d double); -sql create stream stream13 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s); +sql create stream stream15 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s); sleep 1000 sql insert into t1 values(1648791213000,1,2,3,1.0); -sql insert into t1 values(1648791223001,2,2,3,1.1); +sql insert into t1 values(1648791233001,2,2,3,1.1); $loop_count = 0 @@ -330,6 +330,157 @@ sleep 3000 sql select * from streamt13; +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step 3 max delay 2s +sql create database test16 vgroups 4; +sql use test16; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream16 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 state_window(a, 10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791233001,2,2,3,1.1); + +$loop_count = 0 + +loop2: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop2 +endi + +$now02 = $data02 +$now12 = $data12 + + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step 4 max delay 2s +sql create database test17 vgroups 4; +sql use test17; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream17 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 0 end with a = 9; + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791213001,9,2,3,1.0); + +sql insert into t1 values(1648791233001,1,2,3,1.1); +sql insert into t1 values(1648791233009,9,2,3,1.1); + +$loop_count = 0 + +loop2: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop2 +endi + +$now02 = $data02 +$now12 = $data12 + + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + if $data02 != $now02 then print ======data02=$data02 return -1 From c56cebdd49f9c89cd0363e2fbc5d344d6471f9e8 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 5 Feb 2024 09:34:18 +0800 Subject: [PATCH 3/3] opt ci --- tests/script/tsim/stream/windowClose.sim | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 775ff81f51..ce5c57572e 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -304,7 +304,7 @@ sql insert into t1 values(1648791233001,2,2,3,1.1); $loop_count = 0 -loop2: +loop4: sleep 1000 @@ -317,7 +317,7 @@ sql select * from streamt13; if $rows != 2 then print ======rows=$rows - goto loop2 + goto loop4 endi $now02 = $data02 @@ -356,12 +356,14 @@ if $data12 != $now12 then return -1 endi +print session max delay over + print step 3 max delay 2s sql create database test16 vgroups 4; sql use test16; sql create table t1(ts timestamp, a int, b int , c int, d double); -sql create stream stream16 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 state_window(a, 10s); +sql create stream stream16 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 state_window(a); sleep 1000 @@ -370,7 +372,7 @@ sql insert into t1 values(1648791233001,2,2,3,1.1); $loop_count = 0 -loop2: +loop5: sleep 1000 @@ -383,7 +385,7 @@ sql select * from streamt13; if $rows != 2 then print ======rows=$rows - goto loop2 + goto loop5 endi $now02 = $data02 @@ -422,12 +424,14 @@ if $data12 != $now12 then return -1 endi +print state max delay over + print step 4 max delay 2s sql create database test17 vgroups 4; sql use test17; sql create table t1(ts timestamp, a int, b int , c int, d double); -sql create stream stream17 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 0 end with a = 9; +sql create stream stream17 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 1 end with a = 9; sleep 1000 @@ -439,7 +443,7 @@ sql insert into t1 values(1648791233009,9,2,3,1.1); $loop_count = 0 -loop2: +loop6: sleep 1000 @@ -452,7 +456,7 @@ sql select * from streamt13; if $rows != 2 then print ======rows=$rows - goto loop2 + goto loop6 endi $now02 = $data02 @@ -491,6 +495,8 @@ if $data12 != $now12 then return -1 endi +print event max delay over + print ======over system sh/exec.sh -n dnode1 -s stop -x SIGINT