fix(stream): save the checkpoint info for sink tasks.

This commit is contained in:
Haojun Liao 2024-01-15 17:40:35 +08:00
parent bc657dc2d8
commit cacbb4e369
1 changed files with 2 additions and 2 deletions

View File

@ -330,7 +330,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name);
// save the task if not sink task
if (p->info.taskLevel < TASK_LEVEL__SINK) {
if (p->info.taskLevel <= TASK_LEVEL__SINK) {
streamMetaWLock(pMeta);
code = streamMetaSaveTask(pMeta, p);
@ -455,7 +455,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
// sink task do not need to save the status, and generated the checkpoint
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
stDebug("s-task:%s level:%d start gen checkpoint", id, pTask->info.taskLevel);
stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);
code = streamBackendDoCheckpoint(pTask->pBackend, ckId);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));