fix(stream): update checkpoint info only it is in ck status.

This commit is contained in:
Haojun Liao 2024-08-09 02:02:41 +08:00
parent dfa74f82d7
commit 506a72d50f
1 changed files with 6 additions and 7 deletions

View File

@ -567,14 +567,13 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
pInfo->processedVer <= pReq->checkpointVer);
pInfo->checkpointId = pReq->checkpointId;
pInfo->checkpointVer = pReq->checkpointVer;
pInfo->checkpointTime = pReq->checkpointTs;
streamTaskClearCheckInfo(pTask, true);
// update only it is in checkpoint status.
if (pStatus.state == TASK_STATUS__CK) {
// todo handle error
pInfo->checkpointId = pReq->checkpointId;
pInfo->checkpointVer = pReq->checkpointVer;
pInfo->checkpointTime = pReq->checkpointTs;
streamTaskClearCheckInfo(pTask, true);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
} else {
stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus.name);