diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1c9361fa61..fb9d111124 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -294,11 +294,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { int32_t code = 0; SCheckpointInfo* pCKInfo = &p->chkInfo; - if (p->info.fillHistory == 1) { - return code; - } - - if (p->info.taskLevel > TASK_LEVEL__SINK) { + // fill-history task, rsma task, and sink task will not generate the checkpoint + if ((p->info.fillHistory == 1) || (p->info.taskLevel >= TASK_LEVEL__SINK)) { return code; } @@ -333,7 +330,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name); // save the task if not sink task - if (p->info.taskLevel != TASK_LEVEL__SINK) { + if (p->info.taskLevel < TASK_LEVEL__SINK) { streamMetaWLock(pMeta); code = streamMetaSaveTask(pMeta, p); @@ -451,16 +448,17 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL); } int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t startTs = pTask->chkInfo.startTs; - int64_t ckId = pTask->chkInfo.checkpointingId; + int32_t code = TSDB_CODE_SUCCESS; + int64_t startTs = pTask->chkInfo.startTs; + int64_t ckId = pTask->chkInfo.checkpointingId; + const char* id = pTask->id.idStr; // sink task do not need to save the status, and generated the checkpoint if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - stDebug("s-task:%s level:%d start gen checkpoint", pTask->id.idStr, pTask->info.taskLevel); + stDebug("s-task:%s level:%d start gen checkpoint", id, pTask->info.taskLevel); code = streamBackendDoCheckpoint(pTask->pBackend, ckId); if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, tstrerror(terrno)); + stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); } } @@ -474,39 +472,38 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { // todo: let's retry send rsp to upstream/mnode - stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, - ckId, tstrerror(code)); + stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId, + tstrerror(code)); } } // clear the checkpoint info, and commit the newest checkpoint info if all works are done successfully if (code == TSDB_CODE_SUCCESS) { code = streamSaveTaskCheckpointInfo(pTask, ckId); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, - tstrerror(code)); - } else { - code = streamTaskUploadChkp(pTask, ckId, (char*)pTask->id.idStr); - if (code != 0) { - stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", pTask->id.idStr, ckId); + if (code == TSDB_CODE_SUCCESS) { + code = streamTaskUploadChkp(pTask, ckId, (char*)id); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId); } + } else { + stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code)); } } - if (code != TSDB_CODE_SUCCESS) { // clear the checkpoint info if failed + // clear the checkpoint info if failed + if (code != TSDB_CODE_SUCCESS) { taosThreadMutexLock(&pTask->lock); streamTaskClearCheckInfo(pTask, false); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); taosThreadMutexUnlock(&pTask->lock); streamTaskSetCheckpointFailedId(pTask); - stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, pTask->id.idStr, - ckId); + stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } double el = (taosGetTimestampMs() - startTs) / 1000.0; - stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", - pTask->id.idStr, pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, + stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id, + pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, (code == TSDB_CODE_SUCCESS) ? "succ" : "failed"); return code;