This commit is contained in:
54liuyao 2024-08-16 16:54:45 +08:00
parent ab5848f29c
commit 2a66a89adc
8 changed files with 500 additions and 18 deletions

View File

@ -583,8 +583,8 @@ static int32_t getQualifiedRowNumAsc(SExprSupp* pExprSup, SSDataBlock* pBlock, i
return rowId; return rowId;
} }
for (int32_t i = rowId; rowId < pBlock->info.rows; i++) { for (int32_t i = rowId; i < pBlock->info.rows; i++) {
if (!checkNullRow(pExprSup, pBlock, rowId, ignoreNull)) { if (!checkNullRow(pExprSup, pBlock, i, ignoreNull)) {
return i; return i;
} }
} }
@ -1201,6 +1201,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
TSDB_ORDER_ASC); TSDB_ORDER_ASC);
startPos += numOfWin; startPos += numOfWin;
int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull); int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull);
ASSERT(leftRowId >= 0);
left = needAdjustValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type); left = needAdjustValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type);
if (left) { if (left) {
transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow); transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow);
@ -1314,6 +1315,10 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore);
releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore);
if (pBlock->info.rows >= pBlock->info.capacity) {
pGroupResInfo->index++;
break;
}
} }
_end: _end:
@ -1540,6 +1545,7 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
} }
doStreamTimeSliceImpl(pOperator, pBlock); doStreamTimeSliceImpl(pOperator, pBlock);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
} }
if (pInfo->destHasPrimaryKey) { if (pInfo->destHasPrimaryKey) {

View File

@ -0,0 +1,188 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(prev);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648700000000,1,1,1,1.0) (1648710000000,100,100,100,100.0) (1648720000000,10,10,10,10.0);
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
# row 0
if $rows != 20001 then
print ======rows=$rows
goto loop0
endi
print step2
print =============== create database
sql create database test2 vgroups 1;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(next);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648700000000,1,1,1,1.0) (1648710000000,100,100,100,100.0) (1648720000000,10,10,10,10.0);
$loop_count = 0
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
# row 0
if $rows != 20001 then
print ======rows=$rows
goto loop2
endi
print step3
print =============== create database
sql create database test3 vgroups 1;
sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(NULL);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648700000000,1,1,1,1.0) (1648710000000,100,100,100,100.0) (1648720000000,10,10,10,10.0);
$loop_count = 0
loop3:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
# row 0
if $rows != 20001 then
print ======rows=$rows
goto loop3
endi
print step4
print =============== create database
sql create database test4 vgroups 1;
sql use test4;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(value, 1,2,3,4);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648700000000,1,1,1,1.0) (1648710000000,100,100,100,100.0) (1648720000000,10,10,10,10.0);
$loop_count = 0
loop4:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
# row 0
if $rows != 20001 then
print ======rows=$rows
goto loop4
endi
print step5
print =============== create database
sql create database test5 vgroups 1;
sql use test5;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(linear);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648700000000,1,1,1,1.0) (1648710000000,100,100,100,100.0) (1648720000000,10,10,10,10.0);
$loop_count = 0
loop5:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
print 0 sql select * from streamt;
sql select * from streamt;
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
# row 0
if $rows != 20001 then
print ======rows=$rows
goto loop5
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -31,8 +31,12 @@ endi
print 0 sql select * from streamt; print 0 sql select * from streamt;
sql select * from streamt; sql select * from streamt;
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0 # row 0
if $rows != 1 then if $rows != 1 then
@ -64,8 +68,12 @@ endi
print 1 sql select * from streamt; print 1 sql select * from streamt;
sql select * from streamt; sql select * from streamt;
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0 # row 0
if $rows != 1 then if $rows != 1 then

View File

@ -31,8 +31,12 @@ endi
print 0 sql select * from streamt; print 0 sql select * from streamt;
sql select * from streamt; sql select * from streamt;
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0 # row 0
if $rows != 1 then if $rows != 1 then
@ -141,8 +145,12 @@ endi
print 0 sql select * from streamt; print 0 sql select * from streamt;
sql select * from streamt; sql select * from streamt;
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0 # row 0
if $rows != 1 then if $rows != 1 then

View File

@ -360,4 +360,249 @@ if $rows != 0 then
goto loop3 goto loop3
endi endi
print step4
print =============== create database
sql drop database if exists test4;
sql create database test4 vgroups 4;
sql use test4;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams4_1 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt4_1 as select interp(a, 1), _isfilled as a1 from t1 every(1s) fill(prev);
sql create stream streams4_2 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt4_2 as select interp(a, 1), _isfilled as a1 from t1 every(1s) fill(next);
sql create stream streams4_3 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt4_3 as select interp(a, 1), _isfilled as a1 from t1 every(1s) fill(linear);
sql create stream streams4_4 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt4_4 as select interp(a, 1), _isfilled as a1 from t1 every(1s) fill(NULL);
sql create stream streams4_5 trigger at_once watermark 10s IGNORE EXPIRED 1 IGNORE UPDATE 0 into streamt4_5 as select interp(a, 1), _isfilled as a1 from t1 every(1s) fill(value,11);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791275000,NULL,0,0,0.0);
sleep 500
sql insert into t1 values(1648791276000,NULL,1,0,0.0) (1648791277000,NULL,2,0,0.0) (1648791275000,NULL,3,0,0.0);
$loop_count = 0
loop4:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print sql select * from streamt4_1;
sql select * from streamt4_1;
if $rows != 0 then
print ======rows=$rows
goto loop4
endi
print sql select * from streamt4_2;
sql select * from streamt4_2;
if $rows != 0 then
print ======rows=$rows
goto loop4
endi
print sql select * from streamt4_3;
sql select * from streamt4_3;
if $rows != 0 then
print ======rows=$rows
goto loop4
endi
print sql select * from streamt4_4;
sql select * from streamt4_4;
if $rows != 0 then
print ======rows=$rows
goto loop4
endi
print sql select * from streamt4_5;
sql select * from streamt4_5;
if $rows != 0 then
print ======rows=$rows
goto loop4
endi
print step4_2
print sql insert into t1 values(1648791215000,1,0,0,0.0);
sql insert into t1 values(1648791215000,1,0,0,0.0);
sleep 500
sql insert into t1 values(1648791216000,2,1,0,0.0) (1648791217000,3,2,0,0.0) (1648791215000,4,3,0,0.0);
$loop_count = 0
loop5:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print sql select * from streamt4_1;
sql select * from streamt4_1;
if $rows != 0 then
print ======rows=$rows
goto loop5
endi
print sql select * from streamt4_2;
sql select * from streamt4_2;
if $rows != 0 then
print ======rows=$rows
goto loop5
endi
print sql select * from streamt4_3;
sql select * from streamt4_3;
if $rows != 0 then
print ======rows=$rows
goto loop5
endi
print sql select * from streamt4_4;
sql select * from streamt4_4;
if $rows != 0 then
print ======rows=$rows
goto loop5
endi
print sql select * from streamt4_5;
sql select * from streamt4_5;
if $rows != 0 then
print ======rows=$rows
goto loop5
endi
print step4_3
print sql insert into t1 values(1648791278000,NULL,2,0,0.0) (1648791278001,NULL,2,0,0.0) (1648791279000,1,2,0,0.0) (1648791279001,NULL,2,0,0.0) (1648791280000,NULL,2,0,0.0)(1648791280001,NULL,2,0,0.0)(1648791281000,20,2,0,0.0) (1648791281001,NULL,2,0,0.0)(1648791281002,NULL,2,0,0.0) (1648791282000,NULL,2,0,0.0);
sql insert into t1 values(1648791278000,NULL,2,0,0.0) (1648791278001,NULL,2,0,0.0) (1648791279000,1,2,0,0.0) (1648791279001,NULL,2,0,0.0) (1648791280000,NULL,2,0,0.0)(1648791280001,NULL,2,0,0.0)(1648791281000,20,2,0,0.0) (1648791281001,NULL,2,0,0.0)(1648791281002,NULL,2,0,0.0) (1648791282000,NULL,2,0,0.0);
$loop_count = 0
loop6:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print sql select * from streamt4_1;
sql select * from streamt4_1;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 3 then
print ======rows=$rows
goto loop6
endi
if $data12 != 1 then
print ======data12=$data12
goto loop6
endi
print sql select * from streamt4_2;
sql select * from streamt4_2;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 3 then
print ======rows=$rows
goto loop6
endi
if $data12 != 1 then
print ======data12=$data12
goto loop6
endi
print sql select * from streamt4_3;
sql select * from streamt4_3;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 3 then
print ======rows=$rows
goto loop6
endi
if $data12 != 1 then
print ======data12=$data12
goto loop6
endi
print sql select * from streamt4_4;
sql select * from streamt4_4;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 3 then
print ======rows=$rows
goto loop6
endi
if $data12 != 1 then
print ======data12=$data12
goto loop6
endi
print sql select * from streamt4_5;
sql select * from streamt4_5;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 3 then
print ======rows=$rows
goto loop6
endi
if $data12 != 1 then
print ======data12=$data12
goto loop6
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -77,6 +77,8 @@ if $data51 != 32 then
goto loop4_1 goto loop4_1
endi endi
print step4_2
sql create database test4_2 vgroups 4; sql create database test4_2 vgroups 4;
sql use test4_2; sql use test4_2;
@ -141,6 +143,8 @@ if $data51 != 32 then
goto loop4_2 goto loop4_2
endi endi
print step4_3
sql create database test4_3 vgroups 4; sql create database test4_3 vgroups 4;
sql use test4_3; sql use test4_3;
@ -205,6 +209,8 @@ if $data51 != 32 then
goto loop4_3 goto loop4_3
endi endi
print step4_4
sql create database test4_4 vgroups 4; sql create database test4_4 vgroups 4;
sql use test4_4; sql use test4_4;
@ -269,6 +275,8 @@ if $data51 != 32 then
goto loop4_4 goto loop4_4
endi endi
print step4_5
sql create database test4_5 vgroups 4; sql create database test4_5 vgroups 4;
sql use test4_5; sql use test4_5;
@ -332,4 +340,7 @@ if $data51 != 32 then
print ======data51=$data51 print ======data51=$data51
goto loop4_5 goto loop4_5
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -31,8 +31,12 @@ endi
print 0 sql select * from streamt; print 0 sql select * from streamt;
sql select * from streamt; sql select * from streamt;
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0 # row 0
if $rows != 1 then if $rows != 1 then
@ -64,8 +68,12 @@ endi
print 1 sql select * from streamt; print 1 sql select * from streamt;
sql select * from streamt; sql select * from streamt;
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0 # row 0
if $rows != 1 then if $rows != 1 then

View File

@ -31,8 +31,12 @@ endi
print 0 sql select * from streamt; print 0 sql select * from streamt;
sql select * from streamt; sql select * from streamt;
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0 # row 0
if $rows != 1 then if $rows != 1 then
@ -141,8 +145,12 @@ endi
print 0 sql select * from streamt; print 0 sql select * from streamt;
sql select * from streamt; sql select * from streamt;
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0 # row 0
if $rows != 1 then if $rows != 1 then