From cacbb4e3692dae65fa880eddd015f385aa4d9aab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Jan 2024 17:40:35 +0800 Subject: [PATCH] fix(stream): save the checkpoint info for sink tasks. --- source/libs/stream/src/streamCheckpoint.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cb3f7a3504..60e4d74b43 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -330,7 +330,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name); // save the task if not sink task - if (p->info.taskLevel < TASK_LEVEL__SINK) { + if (p->info.taskLevel <= TASK_LEVEL__SINK) { streamMetaWLock(pMeta); code = streamMetaSaveTask(pMeta, p); @@ -455,7 +455,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { // sink task do not need to save the status, and generated the checkpoint if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - stDebug("s-task:%s level:%d start gen checkpoint", id, pTask->info.taskLevel); + stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId); code = streamBackendDoCheckpoint(pTask->pBackend, ckId); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));