diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index bfa3cb8873..73d347e153 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -108,7 +108,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI pCurWin->winInfo.sessionWin.win.skey = ts; pCurWin->winInfo.sessionWin.win.ekey = ts; SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin); - SSessionKey leftWinKey = {0}; + SSessionKey leftWinKey = {.groupId = groupId}; void* pVal = NULL; int32_t len = 0; code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &leftWinKey, &pVal, &len); @@ -122,7 +122,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI } pAggSup->stateStore.streamStateFreeCur(pCur); pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin); - SSessionKey rightWinKey = {0}; + SSessionKey rightWinKey = {.groupId = groupId}; code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &rightWinKey, &pVal, &len); bool inWin = isInTimeWindow(&rightWinKey.win, ts, 0); if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &rightWinKey.win) && (inWin || (start && !end))) { diff --git a/tests/script/tsim/stream/event1.sim b/tests/script/tsim/stream/event1.sim index 7be3d375cf..588d4a5a73 100644 --- a/tests/script/tsim/stream/event1.sim +++ b/tests/script/tsim/stream/event1.sim @@ -230,5 +230,61 @@ if $data01 != 2 then goto loop6 endi +print step3 +print =============== create database test3 +sql create database test3 vgroups 1; +sql use test3; + +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname event_window start with a = 0 end with b = 9; +sleep 1000 + +sql insert into t1 values(1648791223000,0,3,3,3.0); + +sql insert into t1 values(1648791233000,0,1,1,1.0); +sql insert into t1 values(1648791243000,1,9,2,2.0); + +sql insert into t2 values(1648791223000,0,3,3,3.0); + +sql insert into t2 values(1648791233000,0,1,1,1.0); +sql insert into t2 values(1648791243000,1,9,2,2.0); + + +$loop_count = 0 +loop7: + +sleep 300 +print 1 sql select * from streamt3; +sql select * from streamt3; + +print +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 2 then + print ======rows=$rows + goto loop7 +endi + +if $data01 != 3 then + print ======data01=$data01 + goto loop7 +endi + +if $data11 != 3 then + print ======data11=$data11 + goto loop6 +endi + print event1 end system sh/exec.sh -n dnode1 -s stop -x SIGINT