From 2a66a89adcdb796efb83cdcc1b59e502774a8448 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 16 Aug 2024 16:54:45 +0800 Subject: [PATCH] add ci --- .../executor/src/streamtimesliceoperator.c | 10 +- .../script/tsim/stream/streamInterpLarge.sim | 188 ++++++++++++++ .../script/tsim/stream/streamInterpNext0.sim | 16 +- .../script/tsim/stream/streamInterpNext1.sim | 16 +- .../script/tsim/stream/streamInterpOther.sim | 245 ++++++++++++++++++ .../script/tsim/stream/streamInterpOther1.sim | 11 + .../script/tsim/stream/streamInterpPrev0.sim | 16 +- .../script/tsim/stream/streamInterpPrev1.sim | 16 +- 8 files changed, 500 insertions(+), 18 deletions(-) create mode 100644 tests/script/tsim/stream/streamInterpLarge.sim diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 17840f8790..6cb53a36ad 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -583,8 +583,8 @@ static int32_t getQualifiedRowNumAsc(SExprSupp* pExprSup, SSDataBlock* pBlock, i return rowId; } - for (int32_t i = rowId; rowId < pBlock->info.rows; i++) { - if (!checkNullRow(pExprSup, pBlock, rowId, ignoreNull)) { + for (int32_t i = rowId; i < pBlock->info.rows; i++) { + if (!checkNullRow(pExprSup, pBlock, i, ignoreNull)) { return i; } } @@ -1201,6 +1201,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) TSDB_ORDER_ASC); startPos += numOfWin; int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull); + ASSERT(leftRowId >= 0); left = needAdjustValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type); if (left) { transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow); @@ -1314,6 +1315,10 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore); + if (pBlock->info.rows >= pBlock->info.capacity) { + pGroupResInfo->index++; + break; + } } _end: @@ -1540,6 +1545,7 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR } doStreamTimeSliceImpl(pOperator, pBlock); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); } if (pInfo->destHasPrimaryKey) { diff --git a/tests/script/tsim/stream/streamInterpLarge.sim b/tests/script/tsim/stream/streamInterpLarge.sim new file mode 100644 index 0000000000..85203d2d9e --- /dev/null +++ b/tests/script/tsim/stream/streamInterpLarge.sim @@ -0,0 +1,188 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648700000000,1,1,1,1.0) (1648710000000,100,100,100,100.0) (1648720000000,10,10,10,10.0); + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 30 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 20001 then + print ======rows=$rows + goto loop0 +endi + +print step2 +print =============== create database +sql create database test2 vgroups 1; +sql use test2; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(next); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648700000000,1,1,1,1.0) (1648710000000,100,100,100,100.0) (1648720000000,10,10,10,10.0); + +$loop_count = 0 + +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 30 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 20001 then + print ======rows=$rows + goto loop2 +endi + +print step3 +print =============== create database +sql create database test3 vgroups 1; +sql use test3; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(NULL); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648700000000,1,1,1,1.0) (1648710000000,100,100,100,100.0) (1648720000000,10,10,10,10.0); + +$loop_count = 0 + +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 30 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 20001 then + print ======rows=$rows + goto loop3 +endi + +print step4 +print =============== create database +sql create database test4 vgroups 1; +sql use test4; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(value, 1,2,3,4); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648700000000,1,1,1,1.0) (1648710000000,100,100,100,100.0) (1648720000000,10,10,10,10.0); + +$loop_count = 0 + +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 30 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 20001 then + print ======rows=$rows + goto loop4 +endi + +print step5 +print =============== create database +sql create database test5 vgroups 1; +sql use test5; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(linear); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648700000000,1,1,1,1.0) (1648710000000,100,100,100,100.0) (1648720000000,10,10,10,10.0); + +$loop_count = 0 + +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 30 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 20001 then + print ======rows=$rows + goto loop5 +endi + + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpNext0.sim b/tests/script/tsim/stream/streamInterpNext0.sim index 605144378e..abdbeda634 100644 --- a/tests/script/tsim/stream/streamInterpNext0.sim +++ b/tests/script/tsim/stream/streamInterpNext0.sim @@ -31,8 +31,12 @@ endi print 0 sql select * from streamt; sql select * from streamt; -print $data00 $data01 $data02 $data03 -print $data10 $data11 $data12 $data13 +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 # row 0 if $rows != 1 then @@ -64,8 +68,12 @@ endi print 1 sql select * from streamt; sql select * from streamt; -print $data00 $data01 $data02 $data03 -print $data10 $data11 $data12 $data13 +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 # row 0 if $rows != 1 then diff --git a/tests/script/tsim/stream/streamInterpNext1.sim b/tests/script/tsim/stream/streamInterpNext1.sim index 9d86ba5135..d924c82a50 100644 --- a/tests/script/tsim/stream/streamInterpNext1.sim +++ b/tests/script/tsim/stream/streamInterpNext1.sim @@ -31,8 +31,12 @@ endi print 0 sql select * from streamt; sql select * from streamt; -print $data00 $data01 $data02 $data03 -print $data10 $data11 $data12 $data13 +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 # row 0 if $rows != 1 then @@ -141,8 +145,12 @@ endi print 0 sql select * from streamt; sql select * from streamt; -print $data00 $data01 $data02 $data03 -print $data10 $data11 $data12 $data13 +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 # row 0 if $rows != 1 then diff --git a/tests/script/tsim/stream/streamInterpOther.sim b/tests/script/tsim/stream/streamInterpOther.sim index 8af1274bb6..8553e67ec8 100644 --- a/tests/script/tsim/stream/streamInterpOther.sim +++ b/tests/script/tsim/stream/streamInterpOther.sim @@ -360,4 +360,249 @@ if $rows != 0 then goto loop3 endi + +print step4 +print =============== create database +sql drop database if exists test4; +sql create database test4 vgroups 4; +sql use test4; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams4_1 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt4_1 as select interp(a, 1), _isfilled as a1 from t1 every(1s) fill(prev); +sql create stream streams4_2 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt4_2 as select interp(a, 1), _isfilled as a1 from t1 every(1s) fill(next); +sql create stream streams4_3 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt4_3 as select interp(a, 1), _isfilled as a1 from t1 every(1s) fill(linear); +sql create stream streams4_4 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt4_4 as select interp(a, 1), _isfilled as a1 from t1 every(1s) fill(NULL); +sql create stream streams4_5 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt4_5 as select interp(a, 1), _isfilled as a1 from t1 every(1s) fill(value,11); + +run tsim/stream/checkTaskStatus.sim + +sql insert into t1 values(1648791275000,NULL,0,0,0.0); + +sleep 500 + +sql insert into t1 values(1648791276000,NULL,1,0,0.0) (1648791277000,NULL,2,0,0.0) (1648791275000,NULL,3,0,0.0); + +$loop_count = 0 +loop4: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 + +print sql select * from streamt4_1; +sql select * from streamt4_1; + +if $rows != 0 then + print ======rows=$rows + goto loop4 +endi + +print sql select * from streamt4_2; +sql select * from streamt4_2; + +if $rows != 0 then + print ======rows=$rows + goto loop4 +endi + +print sql select * from streamt4_3; +sql select * from streamt4_3; + +if $rows != 0 then + print ======rows=$rows + goto loop4 +endi + +print sql select * from streamt4_4; +sql select * from streamt4_4; + +if $rows != 0 then + print ======rows=$rows + goto loop4 +endi + +print sql select * from streamt4_5; +sql select * from streamt4_5; + +if $rows != 0 then + print ======rows=$rows + goto loop4 +endi + +print step4_2 + +print sql insert into t1 values(1648791215000,1,0,0,0.0); +sql insert into t1 values(1648791215000,1,0,0,0.0); +sleep 500 + +sql insert into t1 values(1648791216000,2,1,0,0.0) (1648791217000,3,2,0,0.0) (1648791215000,4,3,0,0.0); + +$loop_count = 0 +loop5: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 + +print sql select * from streamt4_1; +sql select * from streamt4_1; + +if $rows != 0 then + print ======rows=$rows + goto loop5 +endi + +print sql select * from streamt4_2; +sql select * from streamt4_2; + +if $rows != 0 then + print ======rows=$rows + goto loop5 +endi + +print sql select * from streamt4_3; +sql select * from streamt4_3; + +if $rows != 0 then + print ======rows=$rows + goto loop5 +endi + +print sql select * from streamt4_4; +sql select * from streamt4_4; + +if $rows != 0 then + print ======rows=$rows + goto loop5 +endi + +print sql select * from streamt4_5; +sql select * from streamt4_5; + +if $rows != 0 then + print ======rows=$rows + goto loop5 +endi + +print step4_3 + +print sql insert into t1 values(1648791278000,NULL,2,0,0.0) (1648791278001,NULL,2,0,0.0) (1648791279000,1,2,0,0.0) (1648791279001,NULL,2,0,0.0) (1648791280000,NULL,2,0,0.0)(1648791280001,NULL,2,0,0.0)(1648791281000,20,2,0,0.0) (1648791281001,NULL,2,0,0.0)(1648791281002,NULL,2,0,0.0) (1648791282000,NULL,2,0,0.0); +sql insert into t1 values(1648791278000,NULL,2,0,0.0) (1648791278001,NULL,2,0,0.0) (1648791279000,1,2,0,0.0) (1648791279001,NULL,2,0,0.0) (1648791280000,NULL,2,0,0.0)(1648791280001,NULL,2,0,0.0)(1648791281000,20,2,0,0.0) (1648791281001,NULL,2,0,0.0)(1648791281002,NULL,2,0,0.0) (1648791282000,NULL,2,0,0.0); + +$loop_count = 0 +loop6: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 + +print sql select * from streamt4_1; +sql select * from streamt4_1; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 3 then + print ======rows=$rows + goto loop6 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop6 +endi + +print sql select * from streamt4_2; +sql select * from streamt4_2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 3 then + print ======rows=$rows + goto loop6 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop6 +endi + +print sql select * from streamt4_3; +sql select * from streamt4_3; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 3 then + print ======rows=$rows + goto loop6 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop6 +endi + +print sql select * from streamt4_4; +sql select * from streamt4_4; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 3 then + print ======rows=$rows + goto loop6 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop6 +endi + +print sql select * from streamt4_5; +sql select * from streamt4_5; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +if $rows != 3 then + print ======rows=$rows + goto loop6 +endi + +if $data12 != 1 then + print ======data12=$data12 + goto loop6 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpOther1.sim b/tests/script/tsim/stream/streamInterpOther1.sim index 045da1ce84..5f78bcbcc6 100644 --- a/tests/script/tsim/stream/streamInterpOther1.sim +++ b/tests/script/tsim/stream/streamInterpOther1.sim @@ -77,6 +77,8 @@ if $data51 != 32 then goto loop4_1 endi +print step4_2 + sql create database test4_2 vgroups 4; sql use test4_2; @@ -141,6 +143,8 @@ if $data51 != 32 then goto loop4_2 endi +print step4_3 + sql create database test4_3 vgroups 4; sql use test4_3; @@ -205,6 +209,8 @@ if $data51 != 32 then goto loop4_3 endi +print step4_4 + sql create database test4_4 vgroups 4; sql use test4_4; @@ -269,6 +275,8 @@ if $data51 != 32 then goto loop4_4 endi +print step4_5 + sql create database test4_5 vgroups 4; sql use test4_5; @@ -332,4 +340,7 @@ if $data51 != 32 then print ======data51=$data51 goto loop4_5 endi + + + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpPrev0.sim b/tests/script/tsim/stream/streamInterpPrev0.sim index a326fd87f1..86f7f95a5f 100644 --- a/tests/script/tsim/stream/streamInterpPrev0.sim +++ b/tests/script/tsim/stream/streamInterpPrev0.sim @@ -31,8 +31,12 @@ endi print 0 sql select * from streamt; sql select * from streamt; -print $data00 $data01 $data02 $data03 -print $data10 $data11 $data12 $data13 +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 # row 0 if $rows != 1 then @@ -64,8 +68,12 @@ endi print 1 sql select * from streamt; sql select * from streamt; -print $data00 $data01 $data02 $data03 -print $data10 $data11 $data12 $data13 +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 # row 0 if $rows != 1 then diff --git a/tests/script/tsim/stream/streamInterpPrev1.sim b/tests/script/tsim/stream/streamInterpPrev1.sim index 5d40d0a11f..be8b129a4b 100644 --- a/tests/script/tsim/stream/streamInterpPrev1.sim +++ b/tests/script/tsim/stream/streamInterpPrev1.sim @@ -31,8 +31,12 @@ endi print 0 sql select * from streamt; sql select * from streamt; -print $data00 $data01 $data02 $data03 -print $data10 $data11 $data12 $data13 +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 # row 0 if $rows != 1 then @@ -141,8 +145,12 @@ endi print 0 sql select * from streamt; sql select * from streamt; -print $data00 $data01 $data02 $data03 -print $data10 $data11 $data12 $data13 +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 # row 0 if $rows != 1 then