fix(stream): set the correct start ts.

This commit is contained in:
Haojun Liao 2024-10-28 19:33:11 +08:00
parent c0932d419d
commit cacc7db61c
3 changed files with 6 additions and 6 deletions

View File

@ -240,7 +240,7 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger);
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval,
STimeWindow* pLatestWindow);
STimeWindow* pLatestWindow, const char* id);
#ifdef __cplusplus
}

View File

@ -307,7 +307,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
}
}
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow) {
int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigger, SInterval* pInterval, STimeWindow* pLatestWindow, const char* id) {
QRY_PARAM_CHECK(pTrigger);
int64_t ts = INT64_MIN;
SStreamTrigger* p = NULL;
@ -333,7 +333,7 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigge
ts = taosGetTimestampMs();
if (pLatestWindow->skey == INT64_MIN) {
STimeWindow window = getAlignQueryTimeWindow(&interval, ts);
STimeWindow window = getAlignQueryTimeWindow(&interval, ts - trigger);
p->pBlock->info.window.skey = window.skey;
p->pBlock->info.window.ekey = TMAX(ts, window.ekey);
@ -344,8 +344,8 @@ int32_t streamCreateForcewindowTrigger(SStreamTrigger** pTrigger, int32_t trigge
}
p->pBlock->info.type = STREAM_GET_RESULT;
stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, p->pBlock->info.window.skey,
p->pBlock->info.window.ekey);
stDebug("s-task:%s force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, id,
p->pBlock->info.window.skey, p->pBlock->info.window.ekey);
*pTrigger = p;
return code;

View File

@ -181,7 +181,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
while (1) {
code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval,
&pTask->status.latestForceWindow);
&pTask->status.latestForceWindow, id);
if (code != 0) {
stError("s-task:%s failed to prepare force window close trigger, code:%s, try again in %dms", id,
tstrerror(code), nextTrigger);