enh(stream): log the checkpoint time.

This commit is contained in:
Haojun Liao 2023-09-19 09:53:33 +08:00
parent b95ad74c7f
commit 5c0b8ea804
2 changed files with 10 additions and 4 deletions

View File

@ -247,6 +247,7 @@ typedef struct SStreamTaskId {
} SStreamTaskId; } SStreamTaskId;
typedef struct SCheckpointInfo { typedef struct SCheckpointInfo {
int64_t startTs;
int64_t checkpointId; int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version int64_t checkpointVer; // latest checkpointId version
int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t nextProcessVer; // current offset in WAL, not serialize it

View File

@ -141,6 +141,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
pTask->status.taskStatus = TASK_STATUS__CK; pTask->status.taskStatus = TASK_STATUS__CK;
pTask->checkpointingId = pReq->checkpointId; pTask->checkpointingId = pReq->checkpointId;
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.startTs = taosGetTimestampMs();
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into
// inputQ, to make sure all blocks with less version have been handled by this task already. // inputQ, to make sure all blocks with less version have been handled by this task already.
@ -312,15 +313,19 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
ASSERT(remain >= 0); ASSERT(remain >= 0);
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
if (remain == 0) { // all tasks are ready if (remain == 0) { // all tasks are ready
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId); streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, elapsed time:%.2f Sec checkpointId:%" PRId64, pMeta->vgId,
pTask->checkpointingId); el, pTask->checkpointingId);
} else { } else {
qDebug("vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, not ready:%d/%d", pMeta->vgId, qDebug(
pTask->id.idStr, remain, pMeta->numOfStreamTasks); "vgId:%d vnode wide tasks not reach checkpoint ready status, ready s-task:%s, elapsed time:%.2f Sec not "
"ready:%d/%d",
pMeta->vgId, pTask->id.idStr, el, remain, pMeta->numOfStreamTasks);
} }
// send check point response to upstream task // send check point response to upstream task