diff --git a/docs/en/06-advanced/03-stream.md b/docs/en/06-advanced/03-stream.md index 9c8dd44655..9920168220 100644 --- a/docs/en/06-advanced/03-stream.md +++ b/docs/en/06-advanced/03-stream.md @@ -142,8 +142,11 @@ When creating a stream, you can specify the trigger mode of stream computing thr 1. AT_ONCE: Triggered immediately upon writing. 2. WINDOW_CLOSE: Triggered when the window closes (the closing of the window is determined by the event time, can be used in conjunction with watermark). 3. MAX_DELAY time: If the window closes, computation is triggered. If the window has not closed, and the duration since it has not closed exceeds the time specified by max delay, computation is triggered. -4. FORCE_WINDOW_CLOSE: Based on the current time of the operating system, only the results of the currently closed window are calculated and pushed out. The window is only calculated once at the moment of closure, and will not be recalculated subsequently. This mode currently only supports INTERVAL windows (does not support sliding); FILL_HISTORY must be 0, IGNORE EXPIRED must be 1, IGNORE UPDATE must be 1; FILL only supports PREV, NULL, NONE, VALUE. - +4. FORCE_WINDOW_CLOSE: Based on the current time of the operating system, only the results of the currently closed window are calculated and pushed out. The window is only calculated once at the moment of closure, and will not be recalculated subsequently. This mode currently only supports INTERVAL windows (does support sliding); In this mode, FILL_HISTORY is automatically set to 0, IGNORE EXPIRED is automatically set to 1 and IGNORE UPDATE is automatically set to 1; FILL only supports PREV, NULL, NONE, VALUE. + - This mode can be used to implement continuous queries, such as creating a stream that queries the number of data entries in the past 10 seconds window every 1 second。SQL as follows: + ```sql + create stream if not exists continuous_query_s trigger force_window_close into continuous_query as select count(*) from power.meters interval(10s) sliding(1s) + ``` The closing of the window is determined by the event time, such as when the event stream is interrupted or continuously delayed, at which point the event time cannot be updated, possibly leading to outdated computation results. Therefore, stream computing provides the MAX_DELAY trigger mode that combines event time with processing time: MAX_DELAY mode triggers computation immediately when the window closes, and its unit can be specified, specific units: a (milliseconds), s (seconds), m (minutes), h (hours), d (days), w (weeks). Additionally, when data is written, if the time that triggers computation exceeds the time specified by MAX_DELAY, computation is triggered immediately. diff --git a/docs/zh/06-advanced/03-stream.md b/docs/zh/06-advanced/03-stream.md index 4bd5d517fd..1bac7f826c 100644 --- a/docs/zh/06-advanced/03-stream.md +++ b/docs/zh/06-advanced/03-stream.md @@ -131,7 +131,11 @@ create stream if not exists count_history_s fill_history 1 into count_history as 1. AT_ONCE:写入立即触发。 2. WINDOW_CLOSE:窗口关闭时触发(窗口关闭由事件时间决定,可配合 watermark 使用)。 3. MAX_DELAY time:若窗口关闭,则触发计算。若窗口未关闭,且未关闭时长超过 max delay 指定的时间,则触发计算。 -4. FORCE_WINDOW_CLOSE:以操作系统当前时间为准,只计算当前关闭窗口的结果,并推送出去。窗口只会在被关闭的时刻计算一次,后续不会再重复计算。该模式当前只支持 INTERVAL 窗口(不支持滑动);FILL_HISTORY必须为 0,IGNORE EXPIRED 必须为 1,IGNORE UPDATE 必须为 1;FILL 只支持 PREV 、NULL、 NONE、VALUE。 +4. FORCE_WINDOW_CLOSE:以操作系统当前时间为准,只计算当前关闭窗口的结果,并推送出去。窗口只会在被关闭的时刻计算一次,后续不会再重复计算。该模式当前只支持 INTERVAL 窗口(支持滑动);该模式时,FILL_HISTORY 自动设置为 0,IGNORE EXPIRED 自动设置为 1,IGNORE UPDATE 自动设置为 1;FILL 只支持 PREV 、NULL、 NONE、VALUE。 + - 该模式可用于实现连续查询,比如,创建一个流,每隔 1s 查询一次过去 10s 窗口内的数据条数。SQL 如下: + ```sql + create stream if not exists continuous_query_s trigger force_window_close into continuous_query as select count(*) from power.meters interval(10s) sliding(1s) + ``` 窗口关闭是由事件时间决定的,如事件流中断、或持续延迟,此时事件时间无法更新,可能导致无法得到最新的计算结果。 diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7b54ef878c..07dc3a31c5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3981,7 +3981,7 @@ FETCH_NEXT_BLOCK: pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->updateResIndex = 0; pInfo->lastScanRange = pBlock->info.window; - TSKEY endKey = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval); + TSKEY endKey = getNextTimeWindowStart(&pInfo->interval, pBlock->info.window.skey, TSDB_ORDER_ASC) - 1; if (pInfo->useGetResultRange == true) { endKey = pBlock->info.window.ekey; } diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index 277cbda36b..60c7ea867d 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -441,7 +441,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** return code; } - pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, 1, INT64_MAX); + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, pInfo->nbSup.numOfKeep, pInfo->nbSup.tsOfKeep); setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; return code; @@ -533,7 +533,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** (*ppRes) = pInfo->pCheckpointRes; return code; } - pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, 1, INT64_MAX); + pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, pInfo->nbSup.numOfKeep, pInfo->nbSup.tsOfKeep); setStreamOperatorCompleted(pOperator); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8f2758fe7a..319149716f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -12608,10 +12608,8 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm } if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - if (pStmt->pOptions->fillHistory) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "When trigger was force window close, Stream interp unsupported Fill history"); - } else if (pSelect->pFill != NULL) { + pStmt->pOptions->fillHistory = 0; + if (pSelect->pFill != NULL) { EFillMode mode = ((SFillNode*)(pSelect->pFill))->mode; if (mode == FILL_MODE_NEXT) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, @@ -12660,32 +12658,9 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm } if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - if (pStmt->pOptions->fillHistory) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "When trigger was force window close, Stream unsupported Fill history"); - } - - if (pStmt->pOptions->ignoreExpired != 1) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "When trigger was force window close, Stream must not set ignore expired 0"); - } - - if (pStmt->pOptions->ignoreUpdate != 1) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "When trigger was force window close, Stream must not set ignore update 0"); - } - - if (pSelect->pWindow != NULL && QUERY_NODE_INTERVAL_WINDOW == nodeType(pSelect->pWindow)) { - SIntervalWindowNode* pWindow = (SIntervalWindowNode*)pSelect->pWindow; - if (NULL != pWindow->pSliding) { - int64_t interval = ((SValueNode*)pWindow->pInterval)->datum.i; - int64_t sliding = ((SValueNode*)pWindow->pSliding)->datum.i; - if (interval != sliding) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "When trigger was force window close, Stream unsupported sliding"); - } - } - } + pStmt->pOptions->fillHistory = 0; + pStmt->pOptions->ignoreExpired = 1; + pStmt->pOptions->ignoreUpdate = 1; if ((SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index aa1f56af33..4faef0594b 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -318,7 +318,7 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t interv SStreamTrigger* p = NULL; int64_t ts = taosGetTimestamp(pInterval->precision); - int64_t skey = pLatestWindow->skey + interval; + int64_t skey = pLatestWindow->skey + pInterval->sliding; int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p); if (code) { @@ -334,7 +334,7 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t interv } p->pBlock->info.window.skey = skey; - p->pBlock->info.window.ekey = TMAX(ts, skey + interval); + p->pBlock->info.window.ekey = TMAX(ts, skey + pInterval->interval); p->pBlock->info.type = STREAM_GET_RESULT; stDebug("s-task:%s force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, id, diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 6a9409959b..7319953d35 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -60,7 +60,6 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { stError("s-task:%s failed to init scheduler info, code:%s", id, tstrerror(code)); return; } - pTask->info.delaySchedParam = interval.sliding; pTask->info.watermark = waterMark; pTask->info.interval = interval; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 098075db39..f0c547c3cf 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1638,6 +1638,7 @@ ,,y,script,./test.sh -f tsim/stream/streamInterpError.sim ,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose.sim ,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose1.sim +,,y,script,./test.sh -f tsim/stream/forcewindowclose.sim ,,y,script,./test.sh -f tsim/stream/streamInterpFwcError.sim ,,y,script,./test.sh -f tsim/stream/streamInterpHistory.sim #,,y,script,./test.sh -f tsim/stream/streamInterpHistory1.sim diff --git a/tests/parallel_test/cases_tdengine.task b/tests/parallel_test/cases_tdengine.task index 2983df6a05..e5732957a1 100644 --- a/tests/parallel_test/cases_tdengine.task +++ b/tests/parallel_test/cases_tdengine.task @@ -1351,6 +1351,7 @@ ,,y,script,./test.sh -f tsim/stream/streamInterpError.sim ,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose.sim ,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose1.sim +,,y,script,./test.sh -f tsim/stream/forcewindowclose.sim ,,y,script,./test.sh -f tsim/stream/streamInterpFwcError.sim ,,y,script,./test.sh -f tsim/stream/streamInterpHistory.sim ,,y,script,./test.sh -f tsim/stream/streamInterpLarge.sim diff --git a/tests/script/tsim/stream/forcewindowclose.sim b/tests/script/tsim/stream/forcewindowclose.sim index ab54278e39..46fa9192ed 100644 --- a/tests/script/tsim/stream/forcewindowclose.sim +++ b/tests/script/tsim/stream/forcewindowclose.sim @@ -4,10 +4,51 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +print ===================================== force window close with sliding test +print ============ create db +sql create database test1 vgroups 2 precision 'us'; + +sql use test1 +sql create stable st1(ts timestamp, a int) tags(t int); +sql create table tu11 using st1 tags(1); + +sql_error create stream stream11 trigger force_window_close into str_dst1 as select _wstart, count(*) from st1 partition by tbname interval(5s) sliding(6s); +sql_error create stream stream11 trigger force_window_close into str_dst1 as select _wstart, count(*) from st1 partition by tbname interval(5s) sliding(9a); +sql_error create stream stream11 trigger force_window_close into str_dst1 as select _wstart, count(*) from st1 partition by tbname interval(5s) sliding(1.1s); +sql create stream stream11 trigger force_window_close into str_dst1 as select _wstart, _wend, count(*) from st1 partition by tbname interval(5s) sliding(1s); +run tsim/stream/checkTaskStatus.sim + +sql insert into tu11 values(now, 1); +sleep 5500 + +$loop_count = 0 + +loop01: +sleep 500 +$loop_count = $loop_count + 1 +if $loop_count == 20 then + goto end_loop0 +endi + +print insert data +sql insert into tu11 values(now, 1); +goto loop01 + +end_loop0: + +sleep 10000 + +sql select sum(`count(*)`) from (select * from str_dst1) + +if $data00 != 100 then + print expect 100, actual: $data00 + return -1 +endi + print ========================================== create database sql create database test vgroups 2; sql select * from information_schema.ins_databases -if $rows != 3 then +if $rows != 4 then return -1 endi @@ -135,4 +176,5 @@ if $data00 != 35.000000000 then return -1 endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpError.sim b/tests/script/tsim/stream/streamInterpError.sim index f0f4e80ade..258cbee703 100644 --- a/tests/script/tsim/stream/streamInterpError.sim +++ b/tests/script/tsim/stream/streamInterpError.sim @@ -73,11 +73,11 @@ sql create stream streams2_6_3 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 I sql create stream streams2_6_4 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL); sql create stream streams2_6_5 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44); -sql_error create stream streams2_6_6 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_6 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev); +sql create stream streams2_6_6 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_6 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev); sql_error create stream streams2_6_7 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_7 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(next); sql_error create stream streams2_6_8 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_8 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(linear); -sql_error create stream streams2_6_9 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_9 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL); -sql_error create stream streams2_6_10 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_10 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44); +sql create stream streams2_6_9 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_9 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL); +sql create stream streams2_6_10 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_10 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44); run tsim/stream/checkTaskStatus.sim diff --git a/tests/script/tsim/stream/streamInterpFwcError.sim b/tests/script/tsim/stream/streamInterpFwcError.sim index a53a6fe189..67316e9661 100644 --- a/tests/script/tsim/stream/streamInterpFwcError.sim +++ b/tests/script/tsim/stream/streamInterpFwcError.sim @@ -17,8 +17,8 @@ sql create stream streams1 trigger force_window_close into streamt1 as select _ run tsim/stream/checkTaskStatus.sim -sql_error create stream streams2 trigger force_window_close IGNORE EXPIRED 0 into streamt2 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev); -sql_error create stream streams3 trigger force_window_close IGNORE UPDATE 0 into streamt3 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev); +sql create stream streams2 trigger force_window_close IGNORE EXPIRED 0 into streamt2 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev); +sql create stream streams3 trigger force_window_close IGNORE UPDATE 0 into streamt3 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev); sql create stream streams4 trigger force_window_close IGNORE EXPIRED 1 into streamt4 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev); diff --git a/tests/script/tsim/stream/streamTwaError.sim b/tests/script/tsim/stream/streamTwaError.sim index cda5fa9c4b..67757eaa7e 100644 --- a/tests/script/tsim/stream/streamTwaError.sim +++ b/tests/script/tsim/stream/streamTwaError.sim @@ -26,7 +26,7 @@ sql_error create stream streams8 trigger force_window_close IGNORE EXPIRED 1 IGN sql_error create stream streams9 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt9 as select _wstart, elapsed(ts) from st partition by tbname,ta interval(2s) fill(prev); -sql_error create stream streams10 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 as select _wstart, sum(a) from st partition by tbname,ta interval(2s) SLIDING(1s); +sql create stream streams10 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 as select _wstart, sum(a) from st partition by tbname,ta interval(2s) SLIDING(1s); sql create stream streams11 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt11 as select _wstart, avg(a) from st partition by tbname,ta interval(2s) SLIDING(2s); sql_error create stream streams10 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 as select _wstart, sum(a) from st interval(2s); diff --git a/tests/system-test/8-stream/force_window_close_interp.py b/tests/system-test/8-stream/force_window_close_interp.py index f78330411b..e2a412e629 100644 --- a/tests/system-test/8-stream/force_window_close_interp.py +++ b/tests/system-test/8-stream/force_window_close_interp.py @@ -81,29 +81,29 @@ class TDTestCase: # create error stream tdLog.info("create error stream") sleep(10) - tdSql.error( - f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;" + tdSql.execute( + f"create stream itp_force_error_0 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_0_s as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;" + ) + tdSql.execute( + f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_1_s as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;" ) tdSql.error( - f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;" + f"create stream itp_force_error_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_2_s as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;" ) tdSql.error( - f"create stream itp_force_error_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;" + f"create stream itp_force_error_3 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_3_s as select _irowts,tbname,_isfilled,interp(c11,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;" ) tdSql.error( - f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c11,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;" - ) - tdSql.error( - f"create stream itp_1d_next_error_1 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;" + f"create stream itp_1d_next_error_0 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t0 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;" ) tdSql.error( f"create stream itp_1d_next_error_1 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;" ) tdSql.error( - f"create stream itp_1d_next_error_1 trigger window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;" + f"create stream itp_1d_next_error_2 trigger window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t2 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;" ) tdSql.error( - f"create stream itp_1d_next_error_1 trigger max_delay 5s FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;" + f"create stream itp_1d_next_error_3 trigger max_delay 5s FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t3 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;" ) # function name : interp