feat[TS-6137]: support sliding in force_window_close (#30203)

* feat[TS-6137]: support sliding in force_window_close

* feat[TS-6137]: support sliding in force_window_close

* feat(stream): force window close support interval sliding

* feat[TS-6137]: support sliding in force_window_close

* feat[TS-6137]: support sliding in force_window_close

* feat[TS-6137]: support sliding in force_window_close

---------

Co-authored-by: 54liuyao <54liuyao@163.com>
This commit is contained in:
WANG MINGMING 2025-03-19 10:22:06 +08:00 committed by GitHub
parent 7c3d6a35b4
commit 3fafecb242
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 81 additions and 56 deletions

View File

@ -142,8 +142,11 @@ When creating a stream, you can specify the trigger mode of stream computing thr
1. AT_ONCE: Triggered immediately upon writing.
2. WINDOW_CLOSE: Triggered when the window closes (the closing of the window is determined by the event time, can be used in conjunction with watermark).
3. MAX_DELAY time: If the window closes, computation is triggered. If the window has not closed, and the duration since it has not closed exceeds the time specified by max delay, computation is triggered.
4. FORCE_WINDOW_CLOSE: Based on the current time of the operating system, only the results of the currently closed window are calculated and pushed out. The window is only calculated once at the moment of closure, and will not be recalculated subsequently. This mode currently only supports INTERVAL windows (does not support sliding); FILL_HISTORY must be 0, IGNORE EXPIRED must be 1, IGNORE UPDATE must be 1; FILL only supports PREV, NULL, NONE, VALUE.
4. FORCE_WINDOW_CLOSE: Based on the current time of the operating system, only the results of the currently closed window are calculated and pushed out. The window is only calculated once at the moment of closure, and will not be recalculated subsequently. This mode currently only supports INTERVAL windows (does support sliding); In this mode, FILL_HISTORY is automatically set to 0, IGNORE EXPIRED is automatically set to 1 and IGNORE UPDATE is automatically set to 1; FILL only supports PREV, NULL, NONE, VALUE.
- This mode can be used to implement continuous queries, such as creating a stream that queries the number of data entries in the past 10 seconds window every 1 second。SQL as follows:
```sql
create stream if not exists continuous_query_s trigger force_window_close into continuous_query as select count(*) from power.meters interval(10s) sliding(1s)
```
The closing of the window is determined by the event time, such as when the event stream is interrupted or continuously delayed, at which point the event time cannot be updated, possibly leading to outdated computation results.
Therefore, stream computing provides the MAX_DELAY trigger mode that combines event time with processing time: MAX_DELAY mode triggers computation immediately when the window closes, and its unit can be specified, specific units: a (milliseconds), s (seconds), m (minutes), h (hours), d (days), w (weeks). Additionally, when data is written, if the time that triggers computation exceeds the time specified by MAX_DELAY, computation is triggered immediately.

View File

@ -131,7 +131,11 @@ create stream if not exists count_history_s fill_history 1 into count_history as
1. AT_ONCE写入立即触发。
2. WINDOW_CLOSE窗口关闭时触发窗口关闭由事件时间决定可配合 watermark 使用)。
3. MAX_DELAY time若窗口关闭则触发计算。若窗口未关闭且未关闭时长超过 max delay 指定的时间,则触发计算。
4. FORCE_WINDOW_CLOSE以操作系统当前时间为准只计算当前关闭窗口的结果并推送出去。窗口只会在被关闭的时刻计算一次后续不会再重复计算。该模式当前只支持 INTERVAL 窗口不支持滑动FILL_HISTORY必须为 0IGNORE EXPIRED 必须为 1IGNORE UPDATE 必须为 1FILL 只支持 PREV 、NULL、 NONE、VALUE。
4. FORCE_WINDOW_CLOSE以操作系统当前时间为准只计算当前关闭窗口的结果并推送出去。窗口只会在被关闭的时刻计算一次后续不会再重复计算。该模式当前只支持 INTERVAL 窗口支持滑动该模式时FILL_HISTORY 自动设置为 0IGNORE EXPIRED 自动设置为 1IGNORE UPDATE 自动设置为 1FILL 只支持 PREV 、NULL、 NONE、VALUE。
- 该模式可用于实现连续查询,比如,创建一个流,每隔 1s 查询一次过去 10s 窗口内的数据条数。SQL 如下:
```sql
create stream if not exists continuous_query_s trigger force_window_close into continuous_query as select count(*) from power.meters interval(10s) sliding(1s)
```
窗口关闭是由事件时间决定的,如事件流中断、或持续延迟,此时事件时间无法更新,可能导致无法得到最新的计算结果。

View File

@ -3981,7 +3981,7 @@ FETCH_NEXT_BLOCK:
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->updateResIndex = 0;
pInfo->lastScanRange = pBlock->info.window;
TSKEY endKey = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval);
TSKEY endKey = getNextTimeWindowStart(&pInfo->interval, pBlock->info.window.skey, TSDB_ORDER_ASC) - 1;
if (pInfo->useGetResultRange == true) {
endKey = pBlock->info.window.ekey;
}

View File

@ -441,7 +441,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock**
return code;
}
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, 1, INT64_MAX);
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, pInfo->nbSup.numOfKeep, pInfo->nbSup.tsOfKeep);
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;
return code;
@ -533,7 +533,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock**
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, 1, INT64_MAX);
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, pInfo->nbSup.numOfKeep, pInfo->nbSup.tsOfKeep);
setStreamOperatorCompleted(pOperator);
}

View File

@ -12608,10 +12608,8 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
}
if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
if (pStmt->pOptions->fillHistory) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream interp unsupported Fill history");
} else if (pSelect->pFill != NULL) {
pStmt->pOptions->fillHistory = 0;
if (pSelect->pFill != NULL) {
EFillMode mode = ((SFillNode*)(pSelect->pFill))->mode;
if (mode == FILL_MODE_NEXT) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
@ -12660,32 +12658,9 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
}
if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
if (pStmt->pOptions->fillHistory) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream unsupported Fill history");
}
if (pStmt->pOptions->ignoreExpired != 1) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream must not set ignore expired 0");
}
if (pStmt->pOptions->ignoreUpdate != 1) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream must not set ignore update 0");
}
if (pSelect->pWindow != NULL && QUERY_NODE_INTERVAL_WINDOW == nodeType(pSelect->pWindow)) {
SIntervalWindowNode* pWindow = (SIntervalWindowNode*)pSelect->pWindow;
if (NULL != pWindow->pSliding) {
int64_t interval = ((SValueNode*)pWindow->pInterval)->datum.i;
int64_t sliding = ((SValueNode*)pWindow->pSliding)->datum.i;
if (interval != sliding) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream unsupported sliding");
}
}
}
pStmt->pOptions->fillHistory = 0;
pStmt->pOptions->ignoreExpired = 1;
pStmt->pOptions->ignoreUpdate = 1;
if ((SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta &&
TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&

View File

@ -318,7 +318,7 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t interv
SStreamTrigger* p = NULL;
int64_t ts = taosGetTimestamp(pInterval->precision);
int64_t skey = pLatestWindow->skey + interval;
int64_t skey = pLatestWindow->skey + pInterval->sliding;
int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p);
if (code) {
@ -334,7 +334,7 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t interv
}
p->pBlock->info.window.skey = skey;
p->pBlock->info.window.ekey = TMAX(ts, skey + interval);
p->pBlock->info.window.ekey = TMAX(ts, skey + pInterval->interval);
p->pBlock->info.type = STREAM_GET_RESULT;
stDebug("s-task:%s force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, id,

View File

@ -60,7 +60,6 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
stError("s-task:%s failed to init scheduler info, code:%s", id, tstrerror(code));
return;
}
pTask->info.delaySchedParam = interval.sliding;
pTask->info.watermark = waterMark;
pTask->info.interval = interval;

View File

@ -1638,6 +1638,7 @@
,,y,script,./test.sh -f tsim/stream/streamInterpError.sim
,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose.sim
,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose1.sim
,,y,script,./test.sh -f tsim/stream/forcewindowclose.sim
,,y,script,./test.sh -f tsim/stream/streamInterpFwcError.sim
,,y,script,./test.sh -f tsim/stream/streamInterpHistory.sim
#,,y,script,./test.sh -f tsim/stream/streamInterpHistory1.sim

View File

@ -1351,6 +1351,7 @@
,,y,script,./test.sh -f tsim/stream/streamInterpError.sim
,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose.sim
,,y,script,./test.sh -f tsim/stream/streamInterpForceWindowClose1.sim
,,y,script,./test.sh -f tsim/stream/forcewindowclose.sim
,,y,script,./test.sh -f tsim/stream/streamInterpFwcError.sim
,,y,script,./test.sh -f tsim/stream/streamInterpHistory.sim
,,y,script,./test.sh -f tsim/stream/streamInterpLarge.sim

View File

@ -4,10 +4,51 @@ system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print ===================================== force window close with sliding test
print ============ create db
sql create database test1 vgroups 2 precision 'us';
sql use test1
sql create stable st1(ts timestamp, a int) tags(t int);
sql create table tu11 using st1 tags(1);
sql_error create stream stream11 trigger force_window_close into str_dst1 as select _wstart, count(*) from st1 partition by tbname interval(5s) sliding(6s);
sql_error create stream stream11 trigger force_window_close into str_dst1 as select _wstart, count(*) from st1 partition by tbname interval(5s) sliding(9a);
sql_error create stream stream11 trigger force_window_close into str_dst1 as select _wstart, count(*) from st1 partition by tbname interval(5s) sliding(1.1s);
sql create stream stream11 trigger force_window_close into str_dst1 as select _wstart, _wend, count(*) from st1 partition by tbname interval(5s) sliding(1s);
run tsim/stream/checkTaskStatus.sim
sql insert into tu11 values(now, 1);
sleep 5500
$loop_count = 0
loop01:
sleep 500
$loop_count = $loop_count + 1
if $loop_count == 20 then
goto end_loop0
endi
print insert data
sql insert into tu11 values(now, 1);
goto loop01
end_loop0:
sleep 10000
sql select sum(`count(*)`) from (select * from str_dst1)
if $data00 != 100 then
print expect 100, actual: $data00
return -1
endi
print ========================================== create database
sql create database test vgroups 2;
sql select * from information_schema.ins_databases
if $rows != 3 then
if $rows != 4 then
return -1
endi
@ -135,4 +176,5 @@ if $data00 != 35.000000000 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -73,11 +73,11 @@ sql create stream streams2_6_3 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 I
sql create stream streams2_6_4 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL);
sql create stream streams2_6_5 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44);
sql_error create stream streams2_6_6 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_6 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev);
sql create stream streams2_6_6 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_6 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev);
sql_error create stream streams2_6_7 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_7 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(next);
sql_error create stream streams2_6_8 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_8 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(linear);
sql_error create stream streams2_6_9 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_9 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL);
sql_error create stream streams2_6_10 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_10 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44);
sql create stream streams2_6_9 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_9 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL);
sql create stream streams2_6_10 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt2_6_10 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44);
run tsim/stream/checkTaskStatus.sim

View File

@ -17,8 +17,8 @@ sql create stream streams1 trigger force_window_close into streamt1 as select _
run tsim/stream/checkTaskStatus.sim
sql_error create stream streams2 trigger force_window_close IGNORE EXPIRED 0 into streamt2 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev);
sql_error create stream streams3 trigger force_window_close IGNORE UPDATE 0 into streamt3 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev);
sql create stream streams2 trigger force_window_close IGNORE EXPIRED 0 into streamt2 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev);
sql create stream streams3 trigger force_window_close IGNORE UPDATE 0 into streamt3 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev);
sql create stream streams4 trigger force_window_close IGNORE EXPIRED 1 into streamt4 as select _irowts, _isfilled as a1, interp(a) as a2 from st partition by tbname every(1s) fill(prev);

View File

@ -26,7 +26,7 @@ sql_error create stream streams8 trigger force_window_close IGNORE EXPIRED 1 IGN
sql_error create stream streams9 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt9 as select _wstart, elapsed(ts) from st partition by tbname,ta interval(2s) fill(prev);
sql_error create stream streams10 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 as select _wstart, sum(a) from st partition by tbname,ta interval(2s) SLIDING(1s);
sql create stream streams10 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 as select _wstart, sum(a) from st partition by tbname,ta interval(2s) SLIDING(1s);
sql create stream streams11 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt11 as select _wstart, avg(a) from st partition by tbname,ta interval(2s) SLIDING(2s);
sql_error create stream streams10 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 as select _wstart, sum(a) from st interval(2s);

View File

@ -81,29 +81,29 @@ class TDTestCase:
# create error stream
tdLog.info("create error stream")
sleep(10)
tdSql.error(
f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;"
tdSql.execute(
f"create stream itp_force_error_0 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_0_s as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;"
)
tdSql.execute(
f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_1_s as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;"
)
tdSql.error(
f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;"
f"create stream itp_force_error_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_2_s as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;"
)
tdSql.error(
f"create stream itp_force_error_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;"
f"create stream itp_force_error_3 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_3_s as select _irowts,tbname,_isfilled,interp(c11,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;"
)
tdSql.error(
f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c11,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;"
)
tdSql.error(
f"create stream itp_1d_next_error_1 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;"
f"create stream itp_1d_next_error_0 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t0 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;"
)
tdSql.error(
f"create stream itp_1d_next_error_1 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;"
)
tdSql.error(
f"create stream itp_1d_next_error_1 trigger window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;"
f"create stream itp_1d_next_error_2 trigger window_close FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t2 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;"
)
tdSql.error(
f"create stream itp_1d_next_error_1 trigger max_delay 5s FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t1 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;"
f"create stream itp_1d_next_error_3 trigger max_delay 5s FILL_HISTORY 1 IGNORE EXPIRED 1 IGNORE UPDATE 1 into itp_1d_next_error_t3 as select _irowts,tbname,_isfilled,interp(current) from {self.stb_name} where groupid=100 partition by every(5s) fill(next) ;"
)
# function name : interp