From cacc7db61cc0b8be237a523ba49b94fb62e0c6e2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Oct 2024 19:33:11 +0800 Subject: [PATCH] fix(stream): set the correct start ts. --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamData.c | 8 ++++---- source/libs/stream/src/streamSched.c | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 6d49e6b10c..190f009e88 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -240,7 +240,7 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock); int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger); int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, - STimeWindow* pLatestWindow); + STimeWindow* pLatestWindow, const char* id); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 4bbff3ad21..306d6a0239 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -307,7 +307,7 @@ void streamFreeQitem(SStreamQueueItem* data) { } } -int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow) { +int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow, const char* id) { QRY_PARAM_CHECK(pTrigger); int64_t ts = INT64_MIN; SStreamTrigger* p = NULL; @@ -333,7 +333,7 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigge ts = taosGetTimestampMs(); if (pLatestWindow->skey == INT64_MIN) { - STimeWindow window = getAlignQueryTimeWindow(&interval, ts); + STimeWindow window = getAlignQueryTimeWindow(&interval, ts - trigger); p->pBlock->info.window.skey = window.skey; p->pBlock->info.window.ekey = TMAX(ts, window.ekey); @@ -344,8 +344,8 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigge } p->pBlock->info.type = STREAM_GET_RESULT; - stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, p->pBlock->info.window.skey, - p->pBlock->info.window.ekey); + stDebug("s-task:%s force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, id, + p->pBlock->info.window.skey, p->pBlock->info.window.ekey); *pTrigger = p; return code; diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 042d082ed9..74cbc01617 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -181,7 +181,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { while (1) { code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval, - &pTask->status.latestForceWindow); + &pTask->status.latestForceWindow, id); if (code != 0) { stError("s-task:%s failed to prepare force window close trigger, code:%s, try again in %dms", id, tstrerror(code), nextTrigger);