From b74e98b25c11d40a290d03988dd848d6a75319c2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Dec 2023 14:17:46 +0800 Subject: [PATCH] fix(stream): set the correct split timestamp for session/state window. --- source/dnode/mnode/impl/src/mndScheduler.c | 6 +++--- source/libs/parser/src/parTranslater.c | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 3ef4c9a4d2..39121cecd9 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -251,7 +251,7 @@ int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, in static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, bool fillHistory, - bool hasExtraSink, int64_t firstWindowSkey, bool hasFillHistory) { + bool hasExtraSink, int64_t nextWindowSkey, bool hasFillHistory) { SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillHistory); if (pTask == NULL) { @@ -262,7 +262,7 @@ static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, STimeWindow* pWindow = &pTask->dataRange.window; pWindow->skey = INT64_MIN; - pWindow->ekey = firstWindowSkey - 1; + pWindow->ekey = nextWindowSkey - 1; mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pWindow->skey, pWindow->ekey); // sink or dispatch @@ -382,7 +382,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, bool isFillhistory, int64_t ui epsetAssign(&(pTask)->info.mnodeEpset, pEpset); - // todo set the correct ts, which should be last key of queried table. + // set the correct ts, which is the last key of queried table. STimeWindow* pWindow = &pTask->dataRange.window; pWindow->skey = INT64_MIN; pWindow->ekey = nextWindowSkey - 1; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 6f3915c4d7..cf6db9f549 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7991,7 +7991,7 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void if (interval.interval > 0) { pStmt->pReq->lastTs = taosTimeAdd(taosTimeTruncate(lastTs, &interval), interval.interval, interval.intervalUnit, interval.precision); } else { - pStmt->pReq->lastTs = lastTs; + pStmt->pReq->lastTs = lastTs + 1; // start key of the next time window } code = buildCmdMsg(&cxt, TDMT_MND_CREATE_STREAM, (FSerializeFunc)tSerializeSCMCreateStreamReq, pStmt->pReq); }