From 58886a026406c2e7fcc414ef5a963cc2e50e1f88 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Nov 2024 17:48:02 +0800 Subject: [PATCH 1/4] fix(stream): the timestamp when the stream is created is set to be the initial force_window_close start time. --- source/libs/stream/src/streamData.c | 3 ++- source/libs/stream/src/streamSched.c | 27 ++++++++++++++++----------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 306d6a0239..87bd6d2187 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -14,6 +14,7 @@ */ #include "streamInt.h" +#include "ttime.h" static int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) { *pSubmit = NULL; @@ -330,7 +331,7 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigge .intervalUnit = pInterval->intervalUnit, .slidingUnit = pInterval->slidingUnit}; - ts = taosGetTimestampMs(); + ts = taosGetTimestamp(pInterval->precision); if (pLatestWindow->skey == INT64_MIN) { STimeWindow window = getAlignQueryTimeWindow(&interval, ts - trigger); diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 8c79abfd02..72d1859197 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -24,7 +24,7 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { int64_t delay = 0; int32_t code = 0; const char* id = pTask->id.idStr; - int64_t* pTaskRefId = NULL; + int64_t* pTaskRefId = NULL; if (pTask->info.fillHistory == 1) { return; @@ -41,7 +41,6 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { return; } - pTask->status.latestForceWindow = lastTimeWindow; pTask->info.delaySchedParam = interval.sliding; pTask->info.watermark = waterMark; pTask->info.interval = interval; @@ -51,9 +50,17 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { STimeWindow curWin = getAlignQueryTimeWindow(&pTask->info.interval, now); delay = (curWin.ekey + 1) - now + waterMark; + if (lastTimeWindow.skey == INT64_MIN) { // start from now, not the exec task timestamp after delay + pTask->status.latestForceWindow.skey = curWin.skey - pTask->info.interval.interval; + pTask->status.latestForceWindow.ekey = now; + } else { + pTask->status.latestForceWindow = lastTimeWindow; + } + stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64 - " unit:%c, initial start after:%" PRId64, - id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay); + " unit:%c, initial start after:%" PRId64" last_win:%"PRId64"-%"PRId64, + id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay, pTask->status.latestForceWindow.skey, + pTask->status.latestForceWindow.ekey); } else { delay = pTask->info.delaySchedParam; if (delay == 0) { @@ -199,16 +206,11 @@ void streamTaskSchedHelper(void* param, void* tmrId) { return; } - if (streamTaskShouldPause(pTask)) { - stDebug("s-task:%s is paused, check in nextTrigger:%ds", id, nextTrigger/1000); - streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, - "sched-run-tmr"); - } - if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); } else { if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t num = 0; SStreamTrigger* pTrigger = NULL; while (1) { @@ -227,6 +229,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) { goto _end; } + num += 1; + // check whether the time window gaps exist or not int64_t now = taosGetTimestamp(pTask->info.interval.precision); int64_t intervalEndTs = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval; @@ -235,13 +239,14 @@ void streamTaskSchedHelper(void* param, void* tmrId) { STimeWindow w = pTrigger->pBlock->info.window; w.ekey = w.skey + pTask->info.interval.interval; if (w.skey <= pTask->status.latestForceWindow.skey) { - stFatal("s-task:%s invalid new time window in force_window_close model, skey:%" PRId64 + stFatal("s-task:%s invalid new time window in force_window_close trigger model, skey:%" PRId64 " should be greater than latestForceWindow skey:%" PRId64, pTask->id.idStr, w.skey, pTask->status.latestForceWindow.skey); } pTask->status.latestForceWindow = w; if (intervalEndTs + pTask->info.watermark + pTask->info.interval.interval > now) { + stDebug("s-task:%s generate %d time window(s)", id, num); break; } else { stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey); From ec71572d749bbbc1a9ab965f69619636d5f1ddde Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 28 Nov 2024 14:52:39 +0800 Subject: [PATCH 2/4] refactor: do some internal refactor. --- source/libs/stream/src/streamData.c | 30 ++--- source/libs/stream/src/streamSched.c | 166 +++++++++++++++++---------- 2 files changed, 114 insertions(+), 82 deletions(-) diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 87bd6d2187..5c699ad842 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -308,13 +308,17 @@ void streamFreeQitem(SStreamQueueItem* data) { } } -int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow, const char* id) { +int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t interval, SInterval* pInterval, + STimeWindow* pLatestWindow, const char* id) { QRY_PARAM_CHECK(pTrigger); - int64_t ts = INT64_MIN; + SStreamTrigger* p = NULL; + int64_t ts = taosGetTimestamp(pInterval->precision); + int64_t skey = pLatestWindow->skey + interval; int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p); if (code) { + stError("s-task:%s failed to create force_window trigger, code:%s", id, tstrerror(code)); return code; } @@ -325,26 +329,10 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigge return terrno; } - // let's calculate the previous time window - SInterval interval = {.interval = trigger, - .sliding = trigger, - .intervalUnit = pInterval->intervalUnit, - .slidingUnit = pInterval->slidingUnit}; - - ts = taosGetTimestamp(pInterval->precision); - - if (pLatestWindow->skey == INT64_MIN) { - STimeWindow window = getAlignQueryTimeWindow(&interval, ts - trigger); - - p->pBlock->info.window.skey = window.skey; - p->pBlock->info.window.ekey = TMAX(ts, window.ekey); - } else { - int64_t skey = pLatestWindow->skey + trigger; - p->pBlock->info.window.skey = skey; - p->pBlock->info.window.ekey = TMAX(ts, skey + trigger); - } - + p->pBlock->info.window.skey = skey; + p->pBlock->info.window.ekey = TMAX(ts, skey + interval); p->pBlock->info.type = STREAM_GET_RESULT; + stDebug("s-task:%s force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, id, p->pBlock->info.window.skey, p->pBlock->info.window.ekey); diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 72d1859197..468c5f2139 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -17,6 +17,9 @@ #include "streamInt.h" #include "ttimer.h" +#define TRIGGER_RECHECK_INTERVAL (5 * 1000) +#define INITIAL_TRIGGER_INTERVAL (120 * 1000) + static void streamTaskResumeHelper(void* param, void* tmrId); static void streamTaskSchedHelper(void* param, void* tmrId); @@ -32,8 +35,8 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { // dynamic set the trigger & triggerParam for STREAM_TRIGGER_FORCE_WINDOW_CLOSE if ((pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) && (pTask->info.taskLevel == TASK_LEVEL__SOURCE)) { - int64_t waterMark = 0; - SInterval interval = {0}; + int64_t waterMark = 0; + SInterval interval = {0}; STimeWindow lastTimeWindow = {0}; code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval, &lastTimeWindow); if (code) { @@ -46,21 +49,32 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { pTask->info.interval = interval; // calculate the first start timestamp - int64_t now = taosGetTimestamp(interval.precision); + int64_t now = taosGetTimestamp(interval.precision); STimeWindow curWin = getAlignQueryTimeWindow(&pTask->info.interval, now); - delay = (curWin.ekey + 1) - now + waterMark; if (lastTimeWindow.skey == INT64_MIN) { // start from now, not the exec task timestamp after delay pTask->status.latestForceWindow.skey = curWin.skey - pTask->info.interval.interval; pTask->status.latestForceWindow.ekey = now; + + delay = (curWin.ekey + 1) - now + waterMark; + delay = convertTimePrecision(delay, interval.precision, TSDB_TIME_PRECISION_MILLI); + } else { pTask->status.latestForceWindow = lastTimeWindow; + // It's the current calculated time window + int64_t calEkey = lastTimeWindow.skey + pTask->info.interval.interval * 2; + if (calEkey + waterMark < now) { // unfinished time window existed + delay = INITIAL_TRIGGER_INTERVAL; // wait for 2min to start to calculate + } else { + delay = (curWin.ekey + 1) - now + waterMark; + delay = convertTimePrecision(delay, interval.precision, TSDB_TIME_PRECISION_MILLI); + } } stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64 - " unit:%c, initial start after:%" PRId64" last_win:%"PRId64"-%"PRId64, - id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay, pTask->status.latestForceWindow.skey, - pTask->status.latestForceWindow.ekey); + " unit:%c, initial start after:%" PRId64 "ms last_win:%" PRId64 "-%" PRId64, + id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay, + pTask->status.latestForceWindow.skey, pTask->status.latestForceWindow.ekey); } else { delay = pTask->info.delaySchedParam; if (delay == 0) { @@ -73,8 +87,8 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { stDebug("s-task:%s refId:%" PRId64 " enable the scheduler trigger, delay:%" PRId64, pTask->id.idStr, pTask->id.refId, delay); - streamTmrStart(streamTaskSchedHelper, (int32_t)delay, pTaskRefId, streamTimer, - &pTask->schedInfo.pDelayTimer, pTask->pMeta->vgId, "sched-tmr"); + streamTmrStart(streamTaskSchedHelper, (int32_t)delay, pTaskRefId, streamTimer, &pTask->schedInfo.pDelayTimer, + pTask->pMeta->vgId, "sched-tmr"); pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } } @@ -172,7 +186,62 @@ void streamTaskResumeHelper(void* param, void* tmrId) { streamTaskFreeRefId(param); } +static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrigger) { + int32_t num = 0; + int32_t code = 0; + const char* id = pTask->id.idStr; + int8_t precision = pTask->info.interval.precision; + SStreamTrigger* pTrigger = NULL; + + while (1) { + code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval, + &pTask->status.latestForceWindow, id); + if (code != 0) { + *pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); + stError("s-task:%s failed to prepare force window close trigger, code:%s, try again in %dms", id, + tstrerror(code), *pNextTrigger); + return code; + } + + // in the force window close model, status trigger does not matter. So we do not set the trigger model + code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } + + num += 1; + + // check whether the time window gaps exist or not + int64_t now = taosGetTimestamp(precision); + int64_t ekey = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval; + + // there are gaps, needs to be filled + STimeWindow w = pTrigger->pBlock->info.window; + w.ekey = w.skey + pTask->info.interval.interval; + if (w.skey <= pTask->status.latestForceWindow.skey) { + stFatal("s-task:%s invalid new time window in force_window_close trigger model, skey:%" PRId64 + " should be greater than latestForceWindow skey:%" PRId64, + pTask->id.idStr, w.skey, pTask->status.latestForceWindow.skey); + } + + pTask->status.latestForceWindow = w; + if (ekey + pTask->info.watermark + pTask->info.interval.interval > now) { + int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); + + *pNextTrigger = ekey + pTask->info.watermark + pTask->info.interval.interval - now; + *pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); + stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d", id, num, prev, + *pNextTrigger); + return code; + } else { + stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey); + } + } +} + void streamTaskSchedHelper(void* param, void* tmrId) { + int32_t code = 0; int64_t taskRefId = *(int64_t*)param; SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); if (pTask == NULL) { @@ -181,15 +250,20 @@ void streamTaskSchedHelper(void* param, void* tmrId) { return; } - stDebug("s-task:%s acquire task, refId:%"PRId64, pTask->id.idStr, pTask->id.refId); + stDebug("s-task:%s acquire task, refId:%" PRId64, pTask->id.idStr, pTask->id.refId); - const char* id = pTask->id.idStr; - int32_t nextTrigger = (int32_t)pTask->info.delaySchedParam; - int32_t vgId = pTask->pMeta->vgId; - int32_t code = 0; + const char* id = pTask->id.idStr; + int32_t nextTrigger = (int32_t)pTask->info.delaySchedParam; + int32_t vgId = pTask->pMeta->vgId; int8_t status = atomic_load_8(&pTask->schedInfo.status); - stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); + + if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t next = convertTimePrecision(nextTrigger, pTask->info.interval.precision, TSDB_TIME_PRECISION_MILLI); + stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, next); + } else { + stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); + } if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s should stop, jump out of schedTimer", id); @@ -199,60 +273,30 @@ void streamTaskSchedHelper(void* param, void* tmrId) { } if (streamTaskShouldPause(pTask)) { - stDebug("s-task:%s is paused, recheck in %.2fs", id, nextTrigger/1000.0); - streamTmrStart(streamTaskSchedHelper, nextTrigger, param, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, - "sched-run-tmr"); + stDebug("s-task:%s is paused, recheck in %.2fs", id, TRIGGER_RECHECK_INTERVAL / 1000.0); + streamTmrStart(streamTaskSchedHelper, TRIGGER_RECHECK_INTERVAL, param, streamTimer, &pTask->schedInfo.pDelayTimer, + vgId, "sched-run-tmr"); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + if (pTask->status.downstreamReady == 0) { + stDebug("s-task:%s downstream not ready, recheck in %.2fs", id, TRIGGER_RECHECK_INTERVAL / 1000.0); + streamTmrStart(streamTaskSchedHelper, TRIGGER_RECHECK_INTERVAL, param, streamTimer, &pTask->schedInfo.pDelayTimer, + vgId, "sched-run-tmr"); streamMetaReleaseTask(pTask->pMeta, pTask); return; } if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { - stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); + nextTrigger = TRIGGER_RECHECK_INTERVAL; // retry in 10 seec + stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, TRIGGER_RECHECK_INTERVAL); } else { if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - int32_t num = 0; - SStreamTrigger* pTrigger = NULL; - - while (1) { - code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval, - &pTask->status.latestForceWindow, id); - if (code != 0) { - stError("s-task:%s failed to prepare force window close trigger, code:%s, try again in %dms", id, - tstrerror(code), nextTrigger); - goto _end; - } - - // in the force window close model, status trigger does not matter. So we do not set the trigger model - code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code)); - goto _end; - } - - num += 1; - - // check whether the time window gaps exist or not - int64_t now = taosGetTimestamp(pTask->info.interval.precision); - int64_t intervalEndTs = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval; - - // there are gaps, needs to be filled - STimeWindow w = pTrigger->pBlock->info.window; - w.ekey = w.skey + pTask->info.interval.interval; - if (w.skey <= pTask->status.latestForceWindow.skey) { - stFatal("s-task:%s invalid new time window in force_window_close trigger model, skey:%" PRId64 - " should be greater than latestForceWindow skey:%" PRId64, - pTask->id.idStr, w.skey, pTask->status.latestForceWindow.skey); - } - - pTask->status.latestForceWindow = w; - if (intervalEndTs + pTask->info.watermark + pTask->info.interval.interval > now) { - stDebug("s-task:%s generate %d time window(s)", id, num); - break; - } else { - stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey); - } + code = doCreateForceWindowTrigger(pTask, &nextTrigger); + if (code != TSDB_CODE_SUCCESS) { + goto _end; } - } else if (status == TASK_TRIGGER_STATUS__MAY_ACTIVE) { SStreamTrigger* pTrigger = NULL; code = streamCreateSinkResTrigger(&pTrigger); From 05e9032fb90a1d20feef96cd97c0c6c492fee649 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 28 Nov 2024 15:29:43 +0800 Subject: [PATCH 3/4] test(stream): add test cases. --- tests/script/tsim/stream/forcewindowclose.sim | 47 ++++++++++++++++++- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/tests/script/tsim/stream/forcewindowclose.sim b/tests/script/tsim/stream/forcewindowclose.sim index 77def52b3c..f591f91ab9 100644 --- a/tests/script/tsim/stream/forcewindowclose.sim +++ b/tests/script/tsim/stream/forcewindowclose.sim @@ -18,7 +18,47 @@ sql create stable st(ts timestamp, a int) tags(t int); sql create table tu1 using st tags(1); sql create stream stream1 trigger force_window_close into str_dst as select _wstart, count(*) from st partition by tbname interval(5s); +run tsim/stream/checkTaskStatus.sim +sql insert into tu1 values(now, 1); +sleep 5500 + +sql pause stream stream1 + +$loop_count = 0 + +loop1: +sleep 500 +$loop_count = $loop_count + 1 +if $loop_count == 20 then + goto end_loop1 +endi + +sql insert into tu1 values(now, 1); +goto loop1 + +end_loop1: +sql resume stream stream1 +sleep 5000 + +sql select * from str_dst + +if $rows != 4 then + print expect 4, actual: $rows + return -1 +endi + +sql drop database test + +print ============ test on micro precision db +print ============ create db +sql create database test vgroups 2 precision 'us'; + +sql use test +sql create stable st(ts timestamp, a int) tags(t int); +sql create table tu1 using st tags(1); + +sql create stream stream1 trigger force_window_close into str_dst as select _wstart, count(*) from st partition by tbname interval(5s); run tsim/stream/checkTaskStatus.sim sql insert into tu1 values(now, 1); @@ -41,10 +81,13 @@ goto loop0 end_loop: sql resume stream stream1 +sleep 5000 + sql select * from str_dst -if $rows != 3 then - print expect 3, actual: $rows +if $rows != 4 then + print expect 4, actual: $rows + return -1 endi system sh/exec.sh -n dnode1 -s stop -x SIGINT From 6580e7751f1d21e11d99c92a85ced288f8de5afa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 28 Nov 2024 16:57:11 +0800 Subject: [PATCH 4/4] test: add test cases. --- tests/script/tsim/stream/forcewindowclose.sim | 61 ++++++++++++++++--- 1 file changed, 53 insertions(+), 8 deletions(-) diff --git a/tests/script/tsim/stream/forcewindowclose.sim b/tests/script/tsim/stream/forcewindowclose.sim index f591f91ab9..ab54278e39 100644 --- a/tests/script/tsim/stream/forcewindowclose.sim +++ b/tests/script/tsim/stream/forcewindowclose.sim @@ -4,7 +4,7 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect -print =============== create database +print ========================================== create database sql create database test vgroups 2; sql select * from information_schema.ins_databases if $rows != 3 then @@ -41,16 +41,16 @@ end_loop1: sql resume stream stream1 sleep 5000 -sql select * from str_dst +sql select sum(`count(*)`) from (select * from str_dst) -if $rows != 4 then - print expect 4, actual: $rows +if $data00 != 20 then + print expect 20, actual: $data00 return -1 endi sql drop database test -print ============ test on micro precision db +print ===================================== micro precision db test print ============ create db sql create database test vgroups 2 precision 'us'; @@ -83,10 +83,55 @@ end_loop: sql resume stream stream1 sleep 5000 -sql select * from str_dst +sql select sum(`count(*)`) from (select * from str_dst) -if $rows != 4 then - print expect 4, actual: $rows +if $data00 != 20 then + print expect 20, actual: $data00 + return -1 +endi + +sql drop stream stream1 +sql drop table str_dst + +print ============================= too long watermark test +sql drop table tu1; +sql create table tu1 using st tags(1); +sql create stream stream2 trigger force_window_close watermark 30s into str_dst as select _wstart, count(*), now() from st partition by tbname interval(5s); +run tsim/stream/checkTaskStatus.sim + +$loop_count = 0 + +loop2: +sleep 500 +$loop_count = $loop_count + 1 +if $loop_count == 20 then + goto end_loop3 +endi + +sql insert into tu1 values(now, 1); +goto loop2 + +end_loop3: + +sql select count(*) from str_dst +print =================rows: $data00 + +if $data00 != 0 then + print expect 0, actual $data00 + return -1 +endi + +sleep 35000 + +sql select sum(`count(*)`) from (select * from str_dst) +if $data00 != 19 then + print expect 19, actual: $data00 + return -1 +endi + +sql select round(timediff(`now()`, `_wstart`)/1000000) from str_dst; +if $data00 != 35.000000000 then + print expect 35.000000000 , actual $data00 return -1 endi