From 5c0b8ea804fbf84f934a1bc36dd5f2a081ea26c1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 19 Sep 2023 09:53:33 +0800 Subject: [PATCH] enh(stream): log the checkpoint time. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamCheckpoint.c | 13 +++++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a55d188978..5329da2f17 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -247,6 +247,7 @@ typedef struct SStreamTaskId { } SStreamTaskId; typedef struct SCheckpointInfo { + int64_t startTs; int64_t checkpointId; int64_t checkpointVer; // latest checkpointId version int64_t nextProcessVer; // current offset in WAL, not serialize it diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a48f74ce86..3f8b69785d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -141,6 +141,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo pTask->status.taskStatus = TASK_STATUS__CK; pTask->checkpointingId = pReq->checkpointId; pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); + pTask->chkInfo.startTs = taosGetTimestampMs(); // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into // inputQ, to make sure all blocks with less version have been handled by this task already. @@ -312,15 +313,19 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); ASSERT(remain >= 0); + double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0; + if (remain == 0) { // all tasks are ready qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); - qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, - pTask->checkpointingId); + qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, elapsed time:%.2f Sec checkpointId:%" PRId64, pMeta->vgId, + el, pTask->checkpointingId); } else { - qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId, - pTask->id.idStr, remain, pMeta->numOfStreamTasks); + qDebug( + "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, elapsed time:%.2f Sec not " + "ready:%d/%d", + pMeta->vgId, pTask->id.idStr, el, remain, pMeta->numOfStreamTasks); } // send check point response to upstream task