diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index d22511a41e..05ccc2f5c3 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -84,7 +84,6 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); if (code) { - return code; } } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 7150b322da..64d79d6b42 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -332,6 +332,7 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerTyp STimeWindow window = getAlignQueryTimeWindow(&interval, now - trigger); p->pBlock->info.window = window; p->pBlock->info.type = STREAM_GET_RESULT; + stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, window.skey, window.ekey); } else { p->pBlock->info.type = STREAM_GET_ALL; } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 9b097696c5..2bdd85cec6 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "ttime.h" #include "streamInt.h" #include "ttimer.h" @@ -20,9 +21,10 @@ static void streamTaskResumeHelper(void* param, void* tmrId); static void streamTaskSchedHelper(void* param, void* tmrId); void streamSetupScheduleTrigger(SStreamTask* pTask) { - int64_t delay = 0; - int32_t code = 0; + int64_t delay = 0; + int32_t code = 0; const char* id = pTask->id.idStr; + if (pTask->info.fillHistory == 1) { return; } @@ -32,21 +34,28 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) { int64_t waterMark = 0; SInterval interval = {0}; code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval); - if (code == 0) { - pTask->info.delaySchedParam = interval.sliding; - pTask->info.watermark = waterMark; - pTask->info.interval = interval; + if (code) { + stError("s-task:%s failed to init scheduler info, code:%s", id, tstrerror(code)); + return; } - // todo: calculate the correct start delay time for force_window_close - delay = pTask->info.delaySchedParam; - stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64 - " unit:%c ", - id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit); - } + pTask->info.delaySchedParam = interval.sliding; + pTask->info.watermark = waterMark; + pTask->info.interval = interval; - if (delay == 0) { - return; + // calculate the first start timestamp + int64_t now = taosGetTimestamp(interval.precision); + STimeWindow curWin = getAlignQueryTimeWindow(&pTask->info.interval, now); + delay = (curWin.ekey + 1) - now + waterMark; + + stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64 + " unit:%c, initial start after:%" PRId64, + id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay); + } else { + delay = pTask->info.delaySchedParam; + if (delay == 0) { + return; + } } int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); @@ -153,7 +162,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) { if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger); } else { - if (status == TASK_TRIGGER_STATUS__ACTIVE) { + if ((status == TASK_TRIGGER_STATUS__ACTIVE) || + (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE)) { SStreamTrigger* pTrigger; int32_t code = streamCreateSinkResTrigger(&pTrigger, pTask->info.trigger, pTask->info.delaySchedParam); @@ -168,7 +178,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s failed to put retrieve block into trigger, code:%s", pTask->id.idStr, tstrerror(code)); + stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code)); goto _end; }