From c0932d419d53768c7f0a3565364b8b0e837aa996 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Oct 2024 18:07:11 +0800 Subject: [PATCH 1/4] fix(stream): fix error in pause/resume for force_window_close --- include/libs/stream/tstream.h | 5 +- source/libs/stream/inc/streamInt.h | 4 +- source/libs/stream/src/streamCheckStatus.c | 2 +- source/libs/stream/src/streamData.c | 52 +++++++++++++---- source/libs/stream/src/streamQueue.c | 2 +- source/libs/stream/src/streamSched.c | 65 ++++++++++++++++++---- 6 files changed, 103 insertions(+), 27 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d018682a10..3be581a7a0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -113,7 +113,7 @@ enum { enum { TASK_TRIGGER_STATUS__INACTIVE = 1, - TASK_TRIGGER_STATUS__ACTIVE, + TASK_TRIGGER_STATUS__MAY_ACTIVE, }; typedef enum { @@ -294,9 +294,10 @@ typedef struct SStreamStatus { int32_t timerActive; // timer is active int64_t lastExecTs; // last exec time stamp int32_t inScanHistorySentinel; - bool appendTranstateBlock; // has append the transfer state data block already + bool appendTranstateBlock; // has appended the transfer state data block already bool removeBackendFiles; // remove backend files on disk when free stream tasks SConsenChkptInfo consenChkptInfo; + STimeWindow latestForceWindow; // latest generated time window, only valid in } SStreamStatus; typedef struct SDataRange { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 83c6625526..6d49e6b10c 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -238,7 +238,9 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 int64_t checkpointId, SRpcMsg* pMsg); int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock); -int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger); +int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger); +int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, + STimeWindow* pLatestWindow); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index c1c54b3c0b..f830a8e3cf 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -749,7 +749,7 @@ void rspMonitorFn(void* param, void* tmrId) { streamTaskCompleteCheckRsp(pInfo, true, id); - // not record the failed of the current task if try to close current vnode + // not record the failure of the current task if try to close current vnode // otherwise, the put of message operation may incur invalid read of message queue. if (!pMeta->closeFlag) { int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index a315c9c726..4bbff3ad21 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -307,8 +307,9 @@ void streamFreeQitem(SStreamQueueItem* data) { } } -int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger) { +int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow) { QRY_PARAM_CHECK(pTrigger); + int64_t ts = INT64_MIN; SStreamTrigger* p = NULL; int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p); @@ -324,22 +325,49 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerTyp } // let's calculate the previous time window - // todo get the time precision for ts - if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { - SInterval interval = {.interval = trigger, .sliding = trigger, .intervalUnit = 'a', .slidingUnit = 'a'}; - int64_t now = taosGetTimestampMs(); + SInterval interval = {.interval = trigger, + .sliding = trigger, + .intervalUnit = pInterval->intervalUnit, + .slidingUnit = pInterval->slidingUnit}; + + ts = taosGetTimestampMs(); + + if (pLatestWindow->skey == INT64_MIN) { + STimeWindow window = getAlignQueryTimeWindow(&interval, ts); - STimeWindow window = getAlignQueryTimeWindow(&interval, now - trigger); p->pBlock->info.window.skey = window.skey; - p->pBlock->info.window.ekey = TMAX(now, window.ekey); - p->pBlock->info.type = STREAM_GET_RESULT; - stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, - p->pBlock->info.window.skey, p->pBlock->info.window.ekey); + p->pBlock->info.window.ekey = TMAX(ts, window.ekey); } else { - p->pBlock->info.type = STREAM_GET_ALL; + int64_t skey = pLatestWindow->skey + trigger; + p->pBlock->info.window.skey = skey; + p->pBlock->info.window.ekey = TMAX(ts, skey + trigger); } - *pTrigger = p; + p->pBlock->info.type = STREAM_GET_RESULT; + stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, p->pBlock->info.window.skey, + p->pBlock->info.window.ekey); + *pTrigger = p; + return code; +} + +int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger) { + QRY_PARAM_CHECK(pTrigger); + SStreamTrigger* p = NULL; + + int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p); + if (code) { + return code; + } + + p->type = STREAM_INPUT__GET_RES; + p->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (p->pBlock == NULL) { + taosFreeQitem(p); + return terrno; + } + + p->pBlock->info.type = STREAM_GET_ALL; + *pTrigger = p; return code; } \ No newline at end of file diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 6af6ebd044..c1d70f4259 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -352,7 +352,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.delaySchedParam != 0)) { (void)atomic_val_compare_exchange_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE, - TASK_TRIGGER_STATUS__ACTIVE); + TASK_TRIGGER_STATUS__MAY_ACTIVE); stDebug("s-task:%s new data arrived, active the sched-trigger, triggerStatus:%d", pTask->id.idStr, pTask->schedInfo.status); } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index cfb033fb71..042d082ed9 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -40,6 +40,7 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { return; } + pTask->status.latestForceWindow = lastTimeWindow; pTask->info.delaySchedParam = interval.sliding; pTask->info.watermark = waterMark; pTask->info.interval = interval; @@ -156,27 +157,71 @@ void streamTaskSchedHelper(void* param, void* tmrId) { const char* id = pTask->id.idStr; int32_t nextTrigger = (int32_t)pTask->info.delaySchedParam; int32_t vgId = pTask->pMeta->vgId; + int32_t code = 0; int8_t status = atomic_load_8(&pTask->schedInfo.status); stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); - if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { + if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s should stop, jump out of schedTimer", id); return; } + if (streamTaskShouldPause(pTask)) { + stDebug("s-task:%s is paused, check in nextTrigger:%ds", id, nextTrigger/1000); + streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, + "sched-run-tmr"); + } + if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); } else { - if ((status == TASK_TRIGGER_STATUS__ACTIVE) || - (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE)) { - SStreamTrigger* pTrigger; + if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + SStreamTrigger* pTrigger = NULL; - int32_t code = streamCreateSinkResTrigger(&pTrigger, pTask->info.trigger, pTask->info.delaySchedParam); + while (1) { + code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval, + &pTask->status.latestForceWindow); + if (code != 0) { + stError("s-task:%s failed to prepare force window close trigger, code:%s, try again in %dms", id, + tstrerror(code), nextTrigger); + goto _end; + } + + // in the force window close model, status trigger does not matter. So we do not set the trigger model + code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code)); + goto _end; + } + + // check whether the time window gaps exist or not + int64_t now = taosGetTimestamp(pTask->info.interval.precision); + int64_t intervalEndTs = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval; + + // there are gaps, needs to be filled + STimeWindow w = pTrigger->pBlock->info.window; + w.ekey = w.skey + pTask->info.interval.interval; + if (w.skey <= pTask->status.latestForceWindow.skey) { + stFatal("s-task:%s invalid new time window in force_window_close model, skey:%" PRId64 + " should be greater than latestForceWindow skey:%" PRId64, + pTask->id.idStr, w.skey, pTask->status.latestForceWindow.skey); + } + + pTask->status.latestForceWindow = w; + if (intervalEndTs + pTask->info.watermark + pTask->info.interval.interval > now) { + break; + } else { + stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey); + } + } + + } else if (status == TASK_TRIGGER_STATUS__MAY_ACTIVE) { + SStreamTrigger* pTrigger = NULL; + code = streamCreateSinkResTrigger(&pTrigger); if (code) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, tstrerror(code), nextTrigger); - terrno = code; goto _end; } @@ -187,11 +232,11 @@ void streamTaskSchedHelper(void* param, void* tmrId) { stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code)); goto _end; } + } - code = streamTrySchedExec(pTask); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s failed to sched to run, wait for next time", pTask->id.idStr); - } + code = streamTrySchedExec(pTask); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to sched to run, wait for next time", pTask->id.idStr); } } From 570b69c30d4626fa6df8f15411615598ee61999d Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Mon, 28 Oct 2024 18:55:14 +0800 Subject: [PATCH 2/4] test:verify the output of the stream with tag filtering is correct when a new child table with matching tag conditions is created --- .../8-stream/force_window_close_interval.py | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/tests/system-test/8-stream/force_window_close_interval.py b/tests/system-test/8-stream/force_window_close_interval.py index 0cad878895..8eeca329da 100644 --- a/tests/system-test/8-stream/force_window_close_interval.py +++ b/tests/system-test/8-stream/force_window_close_interval.py @@ -111,7 +111,7 @@ class TDTestCase: if partition: tdLog.info("create stream with partition by tag and tbname ") partition_elm_new = f'partition by {partition}, t1' - self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_tag{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) + self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_tag{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, t1 as t_t1, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) partition_elm_new = f'partition by {partition}, c1' self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_column1{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_column1{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, c1 as c_c1, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) partition_elm_new = f'partition by {partition}, c2' @@ -125,6 +125,7 @@ class TDTestCase: tdLog.info("create stream general table") self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.tb_name} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) + # insert data self.tdCom.date_time = self.tdCom.genTs(precision=self.tdCom.precision)[0] start_time = self.tdCom.date_time time.sleep(1) @@ -224,9 +225,13 @@ class TDTestCase: # get query time range using interval count windows tdSql.query(f'select _wstart, _wend ,last(ts) from {self.stb_name} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ') + # getData don't support negative index end_new_ts = tdSql.getData(tdSql.queryRows-1, 1) + end_last_but_one_ts = tdSql.getData(tdSql.queryRows-2, 1) + #source data include that fill valuse is null and "_isfilled" column of the stream output is false tdSql.execute(f"insert into {self.ctb_name} (ts,c1) values(\"{end_new_ts}\",-102) ") tdSql.execute(f"insert into {self.tb_name} (ts,c1) values(\"{end_new_ts}\",-51) ") + tdSql.execute(f"insert into {self.ctb_name} (ts,c1) values(\"{end_last_but_one_ts}\",NULL) ") for i in range(self.tdCom.range_count): ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s' @@ -240,6 +245,7 @@ class TDTestCase: self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value) # wait for the stream to process the data + # print(self.tdCom.dataDict["interval"]*(final_range_count+2)) time.sleep(self.tdCom.dataDict["interval"]*(final_range_count+2)) # check the data @@ -247,11 +253,13 @@ class TDTestCase: print(tbname) tdSql.query(f'select _wstart, _wend ,last(ts) from {tbname} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ') start_new_ts = tdSql.getData(0, 1) - if tbname != self.tb_name: + if tbname == self.ctb_name: if "value" in fill_value.lower(): fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11' if partition == "tbname": - # print(self.tdCom.dataDict["interval"]*(final_range_count+2)) + self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value) + elif tbname == self.stb_name: + if partition == "tbname": self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value) # check tag and tbname filter self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {self.stb_stream_des_where_tag_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tag} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value) @@ -262,6 +270,18 @@ class TDTestCase: if partition == "tbname": self.tdCom.check_query_data(f'select irowts, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts , _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value) + # Recreate a sub-table that meets the filtering "where_tag" and check if the streaming results are automatically included within it." + where_tag_ctbname = f"{self.ctb_name}_where_tag" + tdSql.execute(f"create table {where_tag_ctbname} using {self.stb_name} (t1) tags({tag_t1_value}) ") + where_tag_timestamp = self.tdCom.genTs(precision=self.tdCom.precision)[0] + where_tag_ts_start_value = str(where_tag_timestamp)+ '+2s' + self.tdCom.sinsert_rows(tbname=where_tag_ctbname, ts_value=where_tag_ts_start_value) + time.sleep(self.tdCom.dataDict["interval"]+5) + tdSql.query(f'select distinct(table_name) from {self.stb_stream_des_where_tag_table} where table_name="{where_tag_ctbname}"') + tdSql.checkEqual(tdSql.queryResult[0][0], where_tag_ctbname) + + + if self.delete: self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=start_ts, end_ts=ts_cast_delete_value) self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=start_ts, end_ts=ts_cast_delete_value) From cacc7db61cc0b8be237a523ba49b94fb62e0c6e2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Oct 2024 19:33:11 +0800 Subject: [PATCH 3/4] fix(stream): set the correct start ts. --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamData.c | 8 ++++---- source/libs/stream/src/streamSched.c | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 6d49e6b10c..190f009e88 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -240,7 +240,7 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock); int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger); int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, - STimeWindow* pLatestWindow); + STimeWindow* pLatestWindow, const char* id); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 4bbff3ad21..306d6a0239 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -307,7 +307,7 @@ void streamFreeQitem(SStreamQueueItem* data) { } } -int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow) { +int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow, const char* id) { QRY_PARAM_CHECK(pTrigger); int64_t ts = INT64_MIN; SStreamTrigger* p = NULL; @@ -333,7 +333,7 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigge ts = taosGetTimestampMs(); if (pLatestWindow->skey == INT64_MIN) { - STimeWindow window = getAlignQueryTimeWindow(&interval, ts); + STimeWindow window = getAlignQueryTimeWindow(&interval, ts - trigger); p->pBlock->info.window.skey = window.skey; p->pBlock->info.window.ekey = TMAX(ts, window.ekey); @@ -344,8 +344,8 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigge } p->pBlock->info.type = STREAM_GET_RESULT; - stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, p->pBlock->info.window.skey, - p->pBlock->info.window.ekey); + stDebug("s-task:%s force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, id, + p->pBlock->info.window.skey, p->pBlock->info.window.ekey); *pTrigger = p; return code; diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 042d082ed9..74cbc01617 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -181,7 +181,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { while (1) { code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval, - &pTask->status.latestForceWindow); + &pTask->status.latestForceWindow, id); if (code != 0) { stError("s-task:%s failed to prepare force window close trigger, code:%s, try again in %dms", id, tstrerror(code), nextTrigger); From 4bd1e0c3cf2b62d2d20cf3c01013519f475cd0cc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 29 Oct 2024 00:20:04 +0800 Subject: [PATCH 4/4] test: add test cases. --- tests/script/tsim/stream/forcewindowclose.sim | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 tests/script/tsim/stream/forcewindowclose.sim diff --git a/tests/script/tsim/stream/forcewindowclose.sim b/tests/script/tsim/stream/forcewindowclose.sim new file mode 100644 index 0000000000..77def52b3c --- /dev/null +++ b/tests/script/tsim/stream/forcewindowclose.sim @@ -0,0 +1,50 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database test vgroups 2; +sql select * from information_schema.ins_databases +if $rows != 3 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use test +sql create stable st(ts timestamp, a int) tags(t int); +sql create table tu1 using st tags(1); + +sql create stream stream1 trigger force_window_close into str_dst as select _wstart, count(*) from st partition by tbname interval(5s); + +run tsim/stream/checkTaskStatus.sim + +sql insert into tu1 values(now, 1); +sleep 5500 + +sql pause stream stream1 + +$loop_count = 0 + +loop0: +sleep 500 +$loop_count = $loop_count + 1 +if $loop_count == 20 then + goto end_loop +endi + +sql insert into tu1 values(now, 1); +goto loop0 + +end_loop: + +sql resume stream stream1 +sql select * from str_dst + +if $rows != 3 then + print expect 3, actual: $rows +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT