Merge pull request #28398 from taosdata/fix/ly_stream
feat(stream):add max delay check
This commit is contained in:
commit
a069cb9b8d
|
@ -10609,6 +10609,19 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
|
||||||
"Non window query only support scalar function, aggregate function is not allowed");
|
"Non window query only support scalar function, aggregate function is not allowed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (NULL != pStmt->pOptions->pDelay) {
|
||||||
|
SValueNode* pVal = (SValueNode*)pStmt->pOptions->pDelay;
|
||||||
|
int64_t minDelay = 0;
|
||||||
|
char* str = "5s";
|
||||||
|
if (DEAL_RES_ERROR != translateValue(pCxt, pVal) && TSDB_CODE_SUCCESS ==
|
||||||
|
parseNatualDuration(str, strlen(str), &minDelay, &pVal->unit, pVal->node.resType.precision, false)) {
|
||||||
|
if (pVal->datum.i < minDelay) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
|
||||||
|
"stream max delay must be bigger than 5 session");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -477,6 +477,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
|
||||||
if (!pStr) {
|
if (!pStr) {
|
||||||
if (onlyCache && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
if (onlyCache && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
||||||
(*pWinCode) = TSDB_CODE_FAILED;
|
(*pWinCode) = TSDB_CODE_FAILED;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
(*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal);
|
(*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal);
|
||||||
if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
|
||||||
|
|
|
@ -133,4 +133,17 @@ if $data13 != -111 then
|
||||||
goto loop1
|
goto loop1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step 2====================
|
||||||
|
|
||||||
|
sql create database test vgroups 1 ;
|
||||||
|
sql use test;
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
|
sql_error create stream streams1 trigger max_delay 4000a ignore update 0 ignore expired 0 into streamtST1 as select _wstart, count(*) from st interval(5s);
|
||||||
|
sql_error create stream streams2 trigger max_delay 4s ignore update 0 ignore expired 0 into streamtST2 as select _wstart, count(*) from st interval(5s);
|
||||||
|
sql create stream streams3 trigger max_delay 5000a ignore update 0 ignore expired 0 into streamtST3 as select _wstart, count(*) from st interval(5s);
|
||||||
|
sql create stream streams4 trigger max_delay 5s ignore update 0 ignore expired 0 into streamtST4 as select _wstart, count(*) from st interval(5s);
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
|
@ -48,15 +48,15 @@ sql create table t1 using st tags(1);
|
||||||
sql create table t2 using st tags(2);
|
sql create table t2 using st tags(2);
|
||||||
|
|
||||||
sql create stream stream2 trigger window_close into streamt2 as select _wstart, sum(a) from st interval(10s);
|
sql create stream stream2 trigger window_close into streamt2 as select _wstart, sum(a) from st interval(10s);
|
||||||
sql create stream stream3 trigger max_delay 1s into streamt3 as select _wstart, sum(a) from st interval(10s);
|
sql create stream stream3 trigger max_delay 5s into streamt3 as select _wstart, sum(a) from st interval(10s);
|
||||||
sql create stream stream4 trigger window_close into streamt4 as select _wstart, sum(a) from t1 interval(10s);
|
sql create stream stream4 trigger window_close into streamt4 as select _wstart, sum(a) from t1 interval(10s);
|
||||||
sql create stream stream5 trigger max_delay 1s into streamt5 as select _wstart, sum(a) from t1 interval(10s);
|
sql create stream stream5 trigger max_delay 5s into streamt5 as select _wstart, sum(a) from t1 interval(10s);
|
||||||
sql create stream stream6 trigger window_close into streamt6 as select _wstart, sum(a) from st session(ts, 10s);
|
sql create stream stream6 trigger window_close into streamt6 as select _wstart, sum(a) from st session(ts, 10s);
|
||||||
sql create stream stream7 trigger max_delay 1s into streamt7 as select _wstart, sum(a) from st session(ts, 10s);
|
sql create stream stream7 trigger max_delay 5s into streamt7 as select _wstart, sum(a) from st session(ts, 10s);
|
||||||
sql create stream stream8 trigger window_close into streamt8 as select _wstart, sum(a) from t1 session(ts, 10s);
|
sql create stream stream8 trigger window_close into streamt8 as select _wstart, sum(a) from t1 session(ts, 10s);
|
||||||
sql create stream stream9 trigger max_delay 1s into streamt9 as select _wstart, sum(a) from t1 session(ts, 10s);
|
sql create stream stream9 trigger max_delay 5s into streamt9 as select _wstart, sum(a) from t1 session(ts, 10s);
|
||||||
sql create stream stream10 trigger window_close into streamt10 as select _wstart, sum(a) from t1 state_window(b);
|
sql create stream stream10 trigger window_close into streamt10 as select _wstart, sum(a) from t1 state_window(b);
|
||||||
sql create stream stream11 trigger max_delay 1s into streamt11 as select _wstart, sum(a) from t1 state_window(b);
|
sql create stream stream11 trigger max_delay 5s into streamt11 as select _wstart, sum(a) from t1 state_window(b);
|
||||||
|
|
||||||
run tsim/stream/checkTaskStatus.sim
|
run tsim/stream/checkTaskStatus.sim
|
||||||
|
|
||||||
|
@ -138,12 +138,12 @@ if $rows != 2 then
|
||||||
goto loop1
|
goto loop1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print step 1 max delay 2s
|
print step 1 max delay 5s
|
||||||
sql create database test3 vgroups 4;
|
sql create database test3 vgroups 4;
|
||||||
sql use test3;
|
sql use test3;
|
||||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
|
||||||
sql create stream stream13 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 interval(10s);
|
sql create stream stream13 trigger max_delay 5s into streamt13 as select _wstart, sum(a), now from t1 interval(10s);
|
||||||
|
|
||||||
run tsim/stream/checkTaskStatus.sim
|
run tsim/stream/checkTaskStatus.sim
|
||||||
|
|
||||||
|
@ -172,8 +172,8 @@ $now02 = $data02
|
||||||
$now12 = $data12
|
$now12 = $data12
|
||||||
|
|
||||||
|
|
||||||
print step1 max delay 2s......... sleep 3s
|
print step1 max delay 5s......... sleep 6s
|
||||||
sleep 3000
|
sleep 6000
|
||||||
|
|
||||||
sql select * from streamt13;
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
@ -188,7 +188,7 @@ if $data12 != $now12 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print step 2 max delay 2s
|
print step 2 max delay 5s
|
||||||
|
|
||||||
sql create database test4 vgroups 4;
|
sql create database test4 vgroups 4;
|
||||||
sql use test4;
|
sql use test4;
|
||||||
|
@ -197,7 +197,7 @@ sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,t
|
||||||
sql create table t1 using st tags(1,1,1);
|
sql create table t1 using st tags(1,1,1);
|
||||||
sql create table t2 using st tags(2,2,2);
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
sql create stream stream14 trigger max_delay 2s into streamt14 as select _wstart, sum(a), now from st partition by tbname interval(10s);
|
sql create stream stream14 trigger max_delay 5s into streamt14 as select _wstart, sum(a), now from st partition by tbname interval(10s);
|
||||||
|
|
||||||
run tsim/stream/checkTaskStatus.sim
|
run tsim/stream/checkTaskStatus.sim
|
||||||
|
|
||||||
|
@ -234,8 +234,8 @@ $now12 = $data12
|
||||||
$now22 = $data22
|
$now22 = $data22
|
||||||
$now32 = $data32
|
$now32 = $data32
|
||||||
|
|
||||||
print step2 max delay 2s......... sleep 3s
|
print step2 max delay 5s......... sleep 6s
|
||||||
sleep 3000
|
sleep 6000
|
||||||
|
|
||||||
sql select * from streamt14 order by 2;
|
sql select * from streamt14 order by 2;
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
@ -264,8 +264,8 @@ if $data32 != $now32 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print step2 max delay 2s......... sleep 3s
|
print step2 max delay 5s......... sleep 6s
|
||||||
sleep 3000
|
sleep 6000
|
||||||
|
|
||||||
sql select * from streamt14 order by 2;
|
sql select * from streamt14 order by 2;
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
@ -294,12 +294,12 @@ if $data32 != $now32 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print step 2 max delay 2s
|
print step 2 max delay 5s
|
||||||
sql create database test15 vgroups 4;
|
sql create database test15 vgroups 4;
|
||||||
sql use test15;
|
sql use test15;
|
||||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
|
||||||
sql create stream stream15 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s);
|
sql create stream stream15 trigger max_delay 5s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s);
|
||||||
|
|
||||||
run tsim/stream/checkTaskStatus.sim
|
run tsim/stream/checkTaskStatus.sim
|
||||||
|
|
||||||
|
@ -328,8 +328,8 @@ $now02 = $data02
|
||||||
$now12 = $data12
|
$now12 = $data12
|
||||||
|
|
||||||
|
|
||||||
print step1 max delay 2s......... sleep 3s
|
print step1 max delay 5s......... sleep 6s
|
||||||
sleep 3000
|
sleep 6000
|
||||||
|
|
||||||
sql select * from streamt13;
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
@ -344,8 +344,8 @@ if $data12 != $now12 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print step1 max delay 2s......... sleep 3s
|
print step1 max delay 5s......... sleep 6s
|
||||||
sleep 3000
|
sleep 6000
|
||||||
|
|
||||||
sql select * from streamt13;
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
@ -362,12 +362,12 @@ endi
|
||||||
|
|
||||||
print session max delay over
|
print session max delay over
|
||||||
|
|
||||||
print step 3 max delay 2s
|
print step 3 max delay 5s
|
||||||
sql create database test16 vgroups 4;
|
sql create database test16 vgroups 4;
|
||||||
sql use test16;
|
sql use test16;
|
||||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
|
||||||
sql create stream stream16 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 state_window(a);
|
sql create stream stream16 trigger max_delay 5s into streamt13 as select _wstart, sum(a), now from t1 state_window(a);
|
||||||
|
|
||||||
run tsim/stream/checkTaskStatus.sim
|
run tsim/stream/checkTaskStatus.sim
|
||||||
|
|
||||||
|
@ -396,8 +396,8 @@ $now02 = $data02
|
||||||
$now12 = $data12
|
$now12 = $data12
|
||||||
|
|
||||||
|
|
||||||
print step1 max delay 2s......... sleep 3s
|
print step1 max delay 5s......... sleep 6s
|
||||||
sleep 3000
|
sleep 6000
|
||||||
|
|
||||||
sql select * from streamt13;
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
@ -412,8 +412,8 @@ if $data12 != $now12 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print step1 max delay 2s......... sleep 3s
|
print step1 max delay 5s......... sleep 6s
|
||||||
sleep 3000
|
sleep 6000
|
||||||
|
|
||||||
sql select * from streamt13;
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
@ -430,12 +430,12 @@ endi
|
||||||
|
|
||||||
print state max delay over
|
print state max delay over
|
||||||
|
|
||||||
print step 4 max delay 2s
|
print step 4 max delay 5s
|
||||||
sql create database test17 vgroups 4;
|
sql create database test17 vgroups 4;
|
||||||
sql use test17;
|
sql use test17;
|
||||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
|
||||||
sql create stream stream17 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 1 end with a = 9;
|
sql create stream stream17 trigger max_delay 5s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 1 end with a = 9;
|
||||||
|
|
||||||
run tsim/stream/checkTaskStatus.sim
|
run tsim/stream/checkTaskStatus.sim
|
||||||
|
|
||||||
|
@ -467,8 +467,8 @@ $now02 = $data02
|
||||||
$now12 = $data12
|
$now12 = $data12
|
||||||
|
|
||||||
|
|
||||||
print step1 max delay 2s......... sleep 3s
|
print step1 max delay 5s......... sleep 6s
|
||||||
sleep 3000
|
sleep 6000
|
||||||
|
|
||||||
sql select * from streamt13;
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
@ -483,8 +483,8 @@ if $data12 != $now12 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print step1 max delay 2s......... sleep 3s
|
print step1 max delay 5s......... sleep 6s
|
||||||
sleep 3000
|
sleep 6000
|
||||||
|
|
||||||
sql select * from streamt13;
|
sql select * from streamt13;
|
||||||
|
|
||||||
|
|
|
@ -92,7 +92,7 @@ class TDTestCase:
|
||||||
def run(self):
|
def run(self):
|
||||||
for fill_history_value in [None, 1]:
|
for fill_history_value in [None, 1]:
|
||||||
for watermark in [None, random.randint(20, 30)]:
|
for watermark in [None, random.randint(20, 30)]:
|
||||||
self.watermark_max_delay_session(session=random.randint(10, 15), watermark=watermark, max_delay=f"{random.randint(1, 3)}s", fill_history_value=fill_history_value)
|
self.watermark_max_delay_session(session=random.randint(10, 15), watermark=watermark, max_delay=f"{random.randint(5, 8)}s", fill_history_value=fill_history_value)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
Loading…
Reference in New Issue