diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 0436ae7ee4..1d0c882003 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -192,6 +192,7 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig const char* id = pTask->id.idStr; int8_t precision = pTask->info.interval.precision; SStreamTrigger* pTrigger = NULL; + bool isFull = false; while (1) { code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval, @@ -225,13 +226,15 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig } pTask->status.latestForceWindow = w; - if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) || - streamQueueIsFull(pTask->inputq.queue)) { + isFull = streamQueueIsFull(pTask->inputq.queue); + + if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) || isFull) { int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); + if (!isFull) { + *pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now; + } - *pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now; *pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); - pTask->chkInfo.nextProcessVer = w.ekey + pTask->info.interval.interval; stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d, set ver:%" PRId64, id, num, prev, *pNextTrigger, pTask->chkInfo.nextProcessVer);