fix(stream):add force window close check

This commit is contained in:
54liuyao 2024-10-18 17:38:14 +08:00
parent bef7fb6045
commit f04151d772
3 changed files with 45 additions and 0 deletions

View File

@ -10664,6 +10664,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Stream interp function only support force window close");
}
if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
if (pStmt->pOptions->fillHistory) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
@ -10722,6 +10723,18 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream unsupported Fill history");
}
if (pStmt->pOptions->ignoreExpired != 1) {
return generateSyntaxErrMsgExt(
&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream must not set ignore expired 0");
}
if (pStmt->pOptions->ignoreUpdate != 1) {
return generateSyntaxErrMsgExt(
&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream must not set ignore update 0");
}
}
if (NULL != pSelect->pGroupByList) {

View File

@ -1370,6 +1370,7 @@
#,,y,script,./test.sh -f tsim/stream/streamInterpError.sim
,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose.sim
,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose1.sim
,,y,script,./test.sh -f tsim/stream/streamInterpFwcError.sim
#,,y,script,./test.sh -f tsim/stream/streamInterpHistory.sim
#,,y,script,./test.sh -f tsim/stream/streamInterpHistory1.sim
#,,y,script,./test.sh -f tsim/stream/streamInterpLarge.sim

View File

@ -0,0 +1,31 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step2
sql create database test2 vgroups 1;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger force_window_close into streamt1 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev);
run tsim/stream/checkTaskStatus.sim
sql_error create stream streams2 trigger force_window_close IGNORE EXPIRED 0 into streamt2 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev);
sql_error create stream streams3 trigger force_window_close IGNORE UPDATE 0 into streamt3 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev);
sql create stream streams4 trigger force_window_close IGNORE EXPIRED 1 into streamt4 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev);
run tsim/stream/checkTaskStatus.sim
sql create stream streams5 trigger force_window_close IGNORE UPDATE 1 into streamt5 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev);
run tsim/stream/checkTaskStatus.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT