diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index f5392f02b1..85e990c4e1 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -119,6 +119,7 @@ typedef struct SRowBuffPos { bool beFlushed; bool beUsed; bool needFree; + bool beUpdated; } SRowBuffPos; // tq diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8bfa8e1a5d..50093db91d 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -279,7 +279,12 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SSHashObj* resWins) { SWinKey* pKey = tSimpleHashGetKey(pIte, NULL); uint64_t groupId = pKey->groupId; TSKEY ts = pKey->ts; - int32_t code = saveWinResultInfo(ts, groupId, *(SRowBuffPos**)pIte, resWins); + SRowBuffPos* pPos = *(SRowBuffPos**)pIte; + if (!pPos->beUpdated) { + continue; + } + pPos->beUpdated = false; + int32_t code = saveWinResultInfo(ts, groupId, pPos, resWins); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -863,6 +868,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + pResPos->beUpdated = true; tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 584e81fafc..2d7c5cd970 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -407,6 +407,7 @@ SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) { newPos->beUsed = true; newPos->beFlushed = false; newPos->needFree = false; + newPos->beUpdated = true; return newPos; } diff --git a/tests/script/tsim/stream/windowClose.sim b/tests/script/tsim/stream/windowClose.sim index 5bd17e076e..12449526c2 100644 --- a/tests/script/tsim/stream/windowClose.sim +++ b/tests/script/tsim/stream/windowClose.sim @@ -134,6 +134,58 @@ if $rows != 2 then goto loop1 endi +print max delay 2s +sql create database test3 vgroups 4; +sql use test3; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream stream13 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 interval(10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223001,2,2,3,1.1); + +$loop_count = 0 + +loop2: + +sleep 1000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamt13; + +if $rows != 2 then + print ======rows=$rows + goto loop2 +endi + +$now02 = $data02 + +$now12 = $data12 + +$loop_count = 0 + +print max delay 2s......... sleep 5s +sleep 5000 + +sql select * from streamt13; + + +if $data02 != $now02 then + print ======data02=$data02 + return -1 +endi + +if $data12 != $now12 then + print ======data12=$data12 + return -1 +endi + print ======over system sh/exec.sh -n dnode1 -s stop -x SIGINT