diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 306d6a0239..87bd6d2187 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -14,6 +14,7 @@ */ #include "streamInt.h" +#include "ttime.h" static int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) { *pSubmit = NULL; @@ -330,7 +331,7 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigge .intervalUnit = pInterval->intervalUnit, .slidingUnit = pInterval->slidingUnit}; - ts = taosGetTimestampMs(); + ts = taosGetTimestamp(pInterval->precision); if (pLatestWindow->skey == INT64_MIN) { STimeWindow window = getAlignQueryTimeWindow(&interval, ts - trigger); diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 8c79abfd02..72d1859197 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -24,7 +24,7 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { int64_t delay = 0; int32_t code = 0; const char* id = pTask->id.idStr; - int64_t* pTaskRefId = NULL; + int64_t* pTaskRefId = NULL; if (pTask->info.fillHistory == 1) { return; @@ -41,7 +41,6 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { return; } - pTask->status.latestForceWindow = lastTimeWindow; pTask->info.delaySchedParam = interval.sliding; pTask->info.watermark = waterMark; pTask->info.interval = interval; @@ -51,9 +50,17 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { STimeWindow curWin = getAlignQueryTimeWindow(&pTask->info.interval, now); delay = (curWin.ekey + 1) - now + waterMark; + if (lastTimeWindow.skey == INT64_MIN) { // start from now, not the exec task timestamp after delay + pTask->status.latestForceWindow.skey = curWin.skey - pTask->info.interval.interval; + pTask->status.latestForceWindow.ekey = now; + } else { + pTask->status.latestForceWindow = lastTimeWindow; + } + stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64 - " unit:%c, initial start after:%" PRId64, - id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay); + " unit:%c, initial start after:%" PRId64" last_win:%"PRId64"-%"PRId64, + id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay, pTask->status.latestForceWindow.skey, + pTask->status.latestForceWindow.ekey); } else { delay = pTask->info.delaySchedParam; if (delay == 0) { @@ -199,16 +206,11 @@ void streamTaskSchedHelper(void* param, void* tmrId) { return; } - if (streamTaskShouldPause(pTask)) { - stDebug("s-task:%s is paused, check in nextTrigger:%ds", id, nextTrigger/1000); - streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, - "sched-run-tmr"); - } - if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); } else { if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t num = 0; SStreamTrigger* pTrigger = NULL; while (1) { @@ -227,6 +229,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) { goto _end; } + num += 1; + // check whether the time window gaps exist or not int64_t now = taosGetTimestamp(pTask->info.interval.precision); int64_t intervalEndTs = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval; @@ -235,13 +239,14 @@ void streamTaskSchedHelper(void* param, void* tmrId) { STimeWindow w = pTrigger->pBlock->info.window; w.ekey = w.skey + pTask->info.interval.interval; if (w.skey <= pTask->status.latestForceWindow.skey) { - stFatal("s-task:%s invalid new time window in force_window_close model, skey:%" PRId64 + stFatal("s-task:%s invalid new time window in force_window_close trigger model, skey:%" PRId64 " should be greater than latestForceWindow skey:%" PRId64, pTask->id.idStr, w.skey, pTask->status.latestForceWindow.skey); } pTask->status.latestForceWindow = w; if (intervalEndTs + pTask->info.watermark + pTask->info.interval.interval > now) { + stDebug("s-task:%s generate %d time window(s)", id, num); break; } else { stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey);