diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4f6778a286..e95c90fcaa 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -270,80 +270,116 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) { int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { SStreamMeta* pMeta = p->pMeta; int32_t vgId = pMeta->vgId; + const char* id = p->id.idStr; int32_t code = 0; if (p->info.fillHistory == 1) { return code; } - streamMetaWLock(pMeta); - ASSERT(p->chkInfo.checkpointId < p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId); + taosThreadMutexLock(&p->lock); + ASSERT(p->chkInfo.checkpointId < p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId); p->chkInfo.checkpointId = p->chkInfo.checkpointingId; + streamTaskClearCheckInfo(p); char* str = NULL; streamTaskGetStatus(p, &str); code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + taosThreadMutexUnlock(&p->lock); + if (code != TSDB_CODE_SUCCESS) { - stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId); - streamMetaWUnLock(pMeta); + stDebug("s-task:%s vgId:%d handle event:checkpoint-done failed", id, vgId); return -1; - } else { // save the task - streamMetaSaveTask(pMeta, p); } - stDebug( - "vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, " - "checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", - vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, str); + stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64 + ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s", + vgId, id, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, str); - code = streamMetaCommit(pMeta); - if (code < 0) { - stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId, - checkpointId, terrstr()); - } else { - stInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 " DONE", pMeta->vgId, checkpointId); + // save the task if not sink task + if (p->info.taskLevel != TASK_LEVEL__SINK) { + streamMetaWLock(pMeta); + + code = streamMetaSaveTask(pMeta, p); + if (code != TSDB_CODE_SUCCESS) { + streamMetaWUnLock(pMeta); + stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id, + vgId, checkpointId, terrstr()); + return code; + } + + code = streamMetaCommit(pMeta); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", + id, vgId, checkpointId, terrstr()); + } + + streamMetaWUnLock(pMeta); } - - streamMetaWUnLock(pMeta); return code; } +void streamTaskSetFailedId(SStreamTask* pTask) { + pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; + pTask->chkInfo.checkpointId = pTask->chkInfo.checkpointingId; +} + int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t code = TSDB_CODE_SUCCESS; - - // check for all tasks, and do generate the vnode-wide checkpoint data. - int64_t checkpointStartTs = pTask->chkInfo.startTs; + int64_t startTs = pTask->chkInfo.startTs; + int64_t ckId = pTask->chkInfo.checkpointingId; // sink task do not need to save the status, and generated the checkpoint if (pTask->info.taskLevel != TASK_LEVEL__SINK) { stDebug("s-task:%s level:%d start gen checkpoint", pTask->id.idStr, pTask->info.taskLevel); - streamBackendDoCheckpoint(pTask->pBackend, pTask->chkInfo.checkpointingId); - streamSaveTaskCheckpointInfo(pTask, pTask->chkInfo.checkpointingId); + code = streamBackendDoCheckpoint(pTask->pBackend, ckId); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, tstrerror(terrno)); + } } - double el = (taosGetTimestampMs() - checkpointStartTs) / 1000.0; - stInfo("s-task:%s vgId:%d checkpointId:%" PRId64 " save all tasks status, level:%d elapsed time:%.2f Sec ", - pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.checkpointingId, pTask->info.taskLevel, el); - // send check point response to upstream task - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - code = streamTaskSendCheckpointSourceRsp(pTask); - } else { - code = streamTaskSendCheckpointReadyMsg(pTask); + if (code == TSDB_CODE_SUCCESS) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + code = streamTaskSendCheckpointSourceRsp(pTask); + } else { + code = streamTaskSendCheckpointReadyMsg(pTask); + } + + if (code != TSDB_CODE_SUCCESS) { + // todo: let's retry send rsp to upstream/mnode + stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, + ckId, tstrerror(code)); + } } - if (code != TSDB_CODE_SUCCESS) { - // record the failure checkpoint id - pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; - - // todo: let's retry send rsp to upstream/mnode - stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", pTask->id.idStr, - pTask->chkInfo.checkpointingId, tstrerror(code)); + // clear the checkpoint info, and commit the newest checkpoint info if all works are done successfully + if (code == TSDB_CODE_SUCCESS) { + code = streamSaveTaskCheckpointInfo(pTask, ckId); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, + tstrerror(terrno)); + } } + if (code != TSDB_CODE_SUCCESS) { // clear the checkpoint info if failed + taosThreadMutexLock(&pTask->lock); + streamTaskClearCheckInfo(pTask); + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + taosThreadMutexUnlock(&pTask->lock); + + streamTaskSetFailedId(pTask); + stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%"PRId64, pTask->id.idStr, ckId); + } + + double el = (taosGetTimestampMs() - startTs) / 1000.0; + stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", + pTask->id.idStr, pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, + (code == TSDB_CODE_SUCCESS) ? "succ" : "failed"); + return code; }