From 370fb56a0d064e056400ae3dd37e4733eed4faac Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 27 Sep 2024 18:24:28 +0800 Subject: [PATCH] add ci --- source/libs/executor/src/groupoperator.c | 1 + .../executor/src/streamtimesliceoperator.c | 4 +- .../stream/streamInterpForceWindowClose1.sim | 315 ++++++++++++++++++ 3 files changed, 319 insertions(+), 1 deletion(-) create mode 100644 tests/script/tsim/stream/streamInterpForceWindowClose1.sim diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 1c9279b84f..0d85118271 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1503,6 +1503,7 @@ static int32_t doStreamHashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** case STREAM_CREATE_CHILD_TABLE: case STREAM_RETRIEVE: case STREAM_CHECKPOINT: + case STREAM_GET_RESULT: case STREAM_GET_ALL: { (*ppRes) = pBlock; return code; diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 8dcaa007f6..d03f4fba38 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -967,7 +967,9 @@ static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamF } else { pFillInfo->pos = FILL_POS_INVALID; setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); - copyNonFillValueInfo(pFillSup, pFillInfo); + if (pFillSup->cur.pRowVal != NULL) { + copyNonFillValueInfo(pFillSup, pFillInfo); + } } } break; case TSDB_FILL_PREV: { diff --git a/tests/script/tsim/stream/streamInterpForceWindowClose1.sim b/tests/script/tsim/stream/streamInterpForceWindowClose1.sim new file mode 100644 index 0000000000..f0147c3f02 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpForceWindowClose1.sim @@ -0,0 +1,315 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step prev +print =============== create database +sql create database test vgroups 3; +sql use test; +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); + +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(2,2,2); + +sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a) as a, _isfilled, tbname, b, c from st partition by tbname, b,c every(5s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now,1,1,1,1.0) (now + 10s,2,1,1,2.0)(now + 20s,3,1,1,3.0) +sql insert into t2 values(now,21,1,1,1.0) (now + 10s,22,1,1,2.0)(now + 20s,23,1,1,3.0) +sql insert into t3 values(now,31,1,1,1.0) (now + 10s,32,1,1,2.0)(now + 20s,33,1,1,3.0) + +print sql select * from t1; +sql select * from t1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select * from t2; +sql select * from t2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select * from t3; +sql select * from t3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop0: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where a == 1; +sql select * from streamt where a == 1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop0 +endi + +$loop_count = 0 +loop1: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where a == 2; +sql select * from streamt where a == 2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop1 +endi + +$loop_count = 0 +loop2: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where a == 3; +sql select * from streamt where a == 3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 2 then + print ======rows=$rows + goto loop2 +endi + +sleep 4000 + +$loop_count = 0 +loop3: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where a == 3; +sql select * from streamt where a == 3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 5 then + print ======rows=$rows + goto loop3 +endi + +print step2 +print =============== create database +sql create database test2 vgroups 1; +sql use test2; + +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); + +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(2,2,2); + +sql create stream streams2 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a) as a, _isfilled, tbname, b, c from st partition by tbname, b,c every(2s) fill(NULL); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(now,1,1,1,1.0) (now + 10s,2,1,1,2.0)(now + 20s,3,1,1,3.0) +sql insert into t2 values(now,21,1,1,1.0) (now + 10s,22,1,1,2.0)(now + 20s,23,1,1,3.0) +sql insert into t3 values(now,31,1,1,1.0) (now + 10s,32,1,1,2.0)(now + 20s,33,1,1,3.0) + +print sql select * from t1; +sql select * from t1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select * from t2; +sql select * from t2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select * from t3; +sql select * from t3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop4: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where a is null; +sql select * from streamt where a is null; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 5 then + print ======rows=$rows + goto loop4 +endi + +print step3 +print =============== create database +sql create database test3 vgroups 1; +sql use test3; + +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); + +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(2,2,2); + +sql create stream streams3 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a) as a, _isfilled, tbname, b, c from st partition by tbname, b,c every(2s) fill(value,100,200); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(now,1,1,1,1.0) (now + 10s,2,1,1,2.0)(now + 20s,3,1,1,3.0) +sql insert into t2 values(now,21,1,1,1.0) (now + 10s,22,1,1,2.0)(now + 20s,23,1,1,3.0) +sql insert into t3 values(now,31,1,1,1.0) (now + 10s,32,1,1,2.0)(now + 20s,33,1,1,3.0) + +print sql select * from t1; +sql select * from t1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select * from t2; +sql select * from t2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +print sql select * from t3; +sql select * from t3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop4: + +sleep 2000 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +print 2 sql select * from streamt where a == 100; +sql select * from streamt where a == 100; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +# row 0 +if $rows < 5 then + print ======rows=$rows + goto loop4 +endi + +print end + +system sh/exec.sh -n dnode1 -s stop -x SIGINT