From 3baddbe67aa1c9fceb88527d5d725021ff19dc8c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 23 Sep 2023 18:05:14 +0800 Subject: [PATCH] fix(stream): update the checkpoint record start time. --- source/libs/stream/src/streamCheckpoint.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 103302e75d..735136ba5b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -306,7 +306,6 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { return TSDB_CODE_SUCCESS; } -// todo: handle the case: during the checkpoint procedure, leader/follower changes happened. int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t code = 0; @@ -316,9 +315,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { ASSERT(remain >= 0); double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0; + pTask->chkInfo.startTs = 0; // clear the recorded start time if (remain == 0) { // all tasks are ready - pTask->chkInfo.startTs = 0; // clear the recorded start time stDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);