diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index f5392f02b1..85e990c4e1 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -119,6 +119,7 @@ typedef struct SRowBuffPos { bool beFlushed; bool beUsed; bool needFree; + bool beUpdated; } SRowBuffPos; // tq diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 0d82fcc9c3..e58e6a8055 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -279,7 +279,12 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SSHashObj* resWins) { SWinKey* pKey = tSimpleHashGetKey(pIte, NULL); uint64_t groupId = pKey->groupId; TSKEY ts = pKey->ts; - int32_t code = saveWinResultInfo(ts, groupId, *(SRowBuffPos**)pIte, resWins); + SRowBuffPos* pPos = *(SRowBuffPos**)pIte; + if (!pPos->beUpdated) { + continue; + } + pPos->beUpdated = false; + int32_t code = saveWinResultInfo(ts, groupId, pPos, resWins); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -866,6 +871,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + pResPos->beUpdated = true; tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index fc47498a3c..8642a990a6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -428,6 +428,7 @@ SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) { newPos->beUsed = true; newPos->beFlushed = false; newPos->needFree = false; + newPos->beUpdated = true; return newPos; } diff --git a/tests/script/tsim/stream/fillIntervalRange.sim b/tests/script/tsim/stream/fillIntervalRange.sim index 99c1fe8ad4..e5316e6a1e 100644 --- a/tests/script/tsim/stream/fillIntervalRange.sim +++ b/tests/script/tsim/stream/fillIntervalRange.sim @@ -64,7 +64,7 @@ endi sql select count(*) from streamt; if $data00 != 9098 then - print =====rows=$rows + print =====data00=$data00 goto loop1 endi diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 5bd17e076e..67678963ea 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -134,6 +134,162 @@ if $rows != 2 then goto loop1 endi +print step 1 max delay 2s +sql create database test3 vgroups 4; +sql use test3; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream13 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 interval(10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223001,2,2,3,1.1); + +$loop_count = 0 + +loop2: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop2 +endi + +$now02 = $data02 +$now12 = $data12 + + +print step1 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +print step 2 max delay 2s + +sql create database test4 vgroups 4; +sql use test4; + +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream stream14 trigger max_delay 2s into streamt14 as select _wstart, sum(a), now from st partition by tbname interval(10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223000,2,2,3,1.1); + +sql insert into t2 values(1648791213000,3,2,3,1.0); +sql insert into t2 values(1648791223000,4,2,3,1.1); + +$loop_count = 0 + +loop3: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt14 order by 2; +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 + +if $rows != 4 then + print ======rows=$rows + goto loop3 +endi + +$now02 = $data02 +$now12 = $data12 +$now22 = $data22 +$now32 = $data32 + +print step2 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt14 order by 2; +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +if $data22 != $now22 then + print ======data22=$data22 + return -1 +endi + +if $data32 != $now32 then + print ======data32=$data32 + return -1 +endi + +print step2 max delay 2s......... sleep 3s +sleep 3000 + +sql select * from streamt14 order by 2; +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + +if $data22 != $now22 then + print ======data22=$data22 + return -1 +endi + +if $data32 != $now32 then + print ======data32=$data32 + return -1 +endi + print ======over system sh/exec.sh -n dnode1 -s stop -x SIGINT