From b9e05ab1ec4c0509fdf101a49b1fada877472aac Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 31 Oct 2023 15:40:42 +0800 Subject: [PATCH] update data --- .../executor/src/streameventwindowoperator.c | 9 ++- .../tsim/stream/checkpointInterval1.sim | 2 + tests/script/tsim/stream/event1.sim | 77 +++++++++++++++++++ 3 files changed, 86 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index a820271705..bfa3cb8873 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -115,7 +115,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &leftWinKey.win) ) { bool inWin = isInTimeWindow(&leftWinKey.win, ts, 0); setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin); - if(inWin || !pCurWin->pWinFlag->endFlag) { + if(inWin || (pCurWin->pWinFlag->startFlag && !pCurWin->pWinFlag->endFlag) ) { pCurWin->winInfo.isOutput = true; goto _end; } @@ -124,7 +124,8 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin); SSessionKey rightWinKey = {0}; code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &rightWinKey, &pVal, &len); - if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &rightWinKey.win) && start && !end) { + bool inWin = isInTimeWindow(&rightWinKey.win, ts, 0); + if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &rightWinKey.win) && (inWin || (start && !end))) { int32_t endi = getEndCondIndex(pEnd, index, rows); if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) { setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin); @@ -158,6 +159,10 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start, SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) { *pRebuild = false; + if (!pWinInfo->pWinFlag->startFlag) { + return 1; + } + TSKEY maxTs = INT64_MAX; STimeWindow* pWin = &pWinInfo->winInfo.sessionWin.win; if (pWinInfo->pWinFlag->endFlag) { diff --git a/tests/script/tsim/stream/checkpointInterval1.sim b/tests/script/tsim/stream/checkpointInterval1.sim index 21825e7f48..36f361ad64 100644 --- a/tests/script/tsim/stream/checkpointInterval1.sim +++ b/tests/script/tsim/stream/checkpointInterval1.sim @@ -15,6 +15,8 @@ sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from st interval(10s); +sleep 1000 + sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t2 values(1648791213001,2,2,3,1.1); diff --git a/tests/script/tsim/stream/event1.sim b/tests/script/tsim/stream/event1.sim index d84cc9c5d3..e2ea19f63f 100644 --- a/tests/script/tsim/stream/event1.sim +++ b/tests/script/tsim/stream/event1.sim @@ -26,8 +26,10 @@ sleep 300 print 1 sql select * from streamt1; sql select * from streamt1; +print =============================== print $data00 $data01 $data02 $data03 print $data10 $data11 $data12 $data13 +print =============================== $loop_count = $loop_count + 1 if $loop_count == 10 then @@ -45,5 +47,80 @@ if $data01 != 2 then goto loop0 endi +print step2 +print =============== create database test2 +sql create database test2 vgroups 1; +sql use test2; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart as s, count(*) c1, sum(b), max(c) from t1 event_window start with a = 0 end with b = 9; +sleep 1000 + +sql insert into t1 values(1648791223000,0,3,3,3.0); + +sql insert into t1 values(1648791233000,0,1,1,1.0); +sql insert into t1 values(1648791243000,1,9,2,2.0); + + +$loop_count = 0 +loop1: + +sleep 300 +print 1 sql select * from streamt2; +sql select * from streamt2; + +print =============================== +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print =============================== + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 3 then + print ======data01=$data01 + goto loop1 +endi + + +sql insert into t1 values(1648791223000,1,1,4,4.0); + +$loop_count = 0 +loop2: + +sleep 300 +print 1 sql select * from streamt2; +sql select * from streamt2; + +print =============================== +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print =============================== + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 2 then + print ======data01=$data01 + goto loop2 +endi + + print event1 end system sh/exec.sh -n dnode1 -s stop -x SIGINT