fix(stream): adjust schedule interval.

This commit is contained in:
Haojun Liao 2024-12-08 00:20:28 +08:00
parent cea647daf6
commit b715df1ac4
1 changed files with 7 additions and 4 deletions

View File

@ -192,6 +192,7 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig
const char* id = pTask->id.idStr;
int8_t precision = pTask->info.interval.precision;
SStreamTrigger* pTrigger = NULL;
bool isFull = false;
while (1) {
code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval,
@ -225,13 +226,15 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig
}
pTask->status.latestForceWindow = w;
if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) ||
streamQueueIsFull(pTask->inputq.queue)) {
isFull = streamQueueIsFull(pTask->inputq.queue);
if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) || isFull) {
int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
if (!isFull) {
*pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now;
}
*pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now;
*pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
pTask->chkInfo.nextProcessVer = w.ekey + pTask->info.interval.interval;
stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d, set ver:%" PRId64, id,
num, prev, *pNextTrigger, pTask->chkInfo.nextProcessVer);