Merge pull request #17557 from taosdata/feature/stream_ly
feat(stream):change stream mode & add ci
This commit is contained in:
commit
93557c3670
|
@ -4411,6 +4411,11 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
|||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||
}
|
||||
}
|
||||
|
||||
if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
|
||||
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||
}
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
|
|
|
@ -5,7 +5,7 @@ sleep 50
|
|||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql create database test vgroups 1
|
||||
sql create database test vgroups 1;
|
||||
sql select * from information_schema.ins_databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
|
@ -29,4 +29,100 @@ if $rows != 0 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql create database test1 vgroups 4;
|
||||
sql use test1;
|
||||
sql create stable st(ts timestamp, a int, b int) tags(t int);
|
||||
sql create table t1 using st tags(1);
|
||||
sql create table t2 using st tags(2);
|
||||
|
||||
sql create stream stream2 trigger window_close into streamt2 as select _wstart, sum(a) from st interval(10s);
|
||||
sql create stream stream3 trigger max_delay 1s into streamt3 as select _wstart, sum(a) from st interval(10s);
|
||||
sql create stream stream4 trigger window_close into streamt4 as select _wstart, sum(a) from t1 interval(10s);
|
||||
sql create stream stream5 trigger max_delay 1s into streamt5 as select _wstart, sum(a) from t1 interval(10s);
|
||||
sql create stream stream6 trigger window_close into streamt6 as select _wstart, sum(a) from st session(ts, 10s);
|
||||
sql create stream stream7 trigger max_delay 1s into streamt7 as select _wstart, sum(a) from st session(ts, 10s);
|
||||
sql create stream stream8 trigger window_close into streamt8 as select _wstart, sum(a) from t1 session(ts, 10s);
|
||||
sql create stream stream9 trigger max_delay 1s into streamt9 as select _wstart, sum(a) from t1 session(ts, 10s);
|
||||
sql create stream stream10 trigger window_close into streamt10 as select _wstart, sum(a) from t1 state_window(b);
|
||||
sql create stream stream11 trigger max_delay 1s into streamt11 as select _wstart, sum(a) from t1 state_window(b);
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1);
|
||||
sql insert into t1 values(1648791213001,2,1);
|
||||
sql insert into t1 values(1648791213002,3,1);
|
||||
|
||||
sql insert into t1 values(1648791233000,4,2);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop1:
|
||||
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt2;
|
||||
|
||||
if $rows != 1 then
|
||||
print ======streamt2=$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt3;
|
||||
if $rows != 2 then
|
||||
print ======streamt3=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt4;
|
||||
if $rows != 1 then
|
||||
print ======streamt4=$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt5;
|
||||
if $rows != 2 then
|
||||
print ======streamt5=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt6;
|
||||
if $rows != 1 then
|
||||
print ======streamt6=$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt7;
|
||||
if $rows != 2 then
|
||||
print ======streamt7=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt8;
|
||||
if $rows != 1 then
|
||||
print ======streamt8=$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt9;
|
||||
if $rows != 2 then
|
||||
print ======streamt9=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt10;
|
||||
if $rows != 1 then
|
||||
print ======streamt10=$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt11;
|
||||
if $rows != 2 then
|
||||
print ======streamt11=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue