diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ef2a99d1d1..cf5ea95088 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1961,8 +1961,10 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { } static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) { - if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { + if (pInfo->pUpdateInfo) { pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); + } + if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { checkUpdateData(pInfo, true, pBlock, true); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey); if (pInfo->pUpdateDataRes->info.rows > 0) { diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 6adab74344..bd247eba07 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -622,6 +622,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) { } setEventWindowFlag(pAggSup, &curInfo); if (!curInfo.pWinFlag->startFlag || curInfo.pWinFlag->endFlag) { + saveSessionOutputBuf(pAggSup, &curInfo.winInfo); continue; } diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 45d656c456..c71edccb99 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -449,13 +449,13 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState); void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); - if (!ppBuff) { - return TSDB_CODE_FAILED; + SArray* pWinStates = NULL; + if (ppBuff) { + pWinStates = (SArray*)(*ppBuff); } - SArray* pWinStates = (SArray*)(*ppBuff); - int32_t size = taosArrayGetSize(pWinStates); if (pCur->buffIndex >= 0) { + int32_t size = taosArrayGetSize(pWinStates); if (pCur->buffIndex >= size) { return TSDB_CODE_FAILED; } diff --git a/tests/script/tsim/stream/event2.sim b/tests/script/tsim/stream/event2.sim index eb9fca46e6..9fc7615fb8 100644 --- a/tests/script/tsim/stream/event2.sim +++ b/tests/script/tsim/stream/event2.sim @@ -31,6 +31,8 @@ sql insert into t4 values(1648791223000,1,1,10,3.0); sql insert into t4 values(1648791233000,0,2,11,1.0); sql insert into t4 values(1648791243000,1,9,12,2.0); +sleep 1000 + sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 fill_history 1 into streamt0 as select _wstart as s, count(*) c1, sum(b), max(c), _wend as e from st partition by tbname event_window start with a = 0 end with b = 9; sleep 1000 @@ -79,6 +81,37 @@ if $data21 != 2 then goto loop0 endi +sql insert into t3 values(1648791222000,0,1,7,3.0); + +$loop_count = 0 +loop1: + +sleep 300 +print 2 sql select * from streamt0 order by 1, 2, 3, 4; +sql select * from streamt0 order by 1, 2, 3, 4; + +print +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 4 then + print ======rows=$rows + goto loop1 +endi + +if $data01 != 5 then + print ======data01=$data01 + goto loop0 +endi + print event1 end system sh/exec.sh -n dnode1 -s stop -x SIGINT