From 5585a141d53ffff9940027d248932c9eccc88bda Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jun 2024 23:16:03 +0800 Subject: [PATCH] fix(stream): set the failed id before clear the checkpoint info. --- source/libs/stream/src/streamCheckpoint.c | 4 ++-- source/libs/stream/src/streamMeta.c | 1 + source/libs/stream/src/streamTask.c | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 52265963bb..b5e27fde87 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -707,11 +707,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { } } else { // clear the checkpoint info if failed taosThreadMutexLock(&pTask->lock); - streamTaskClearCheckInfo(pTask, false); + streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info + streamTaskClearCheckInfo(pTask, true); taosThreadMutexUnlock(&pTask->lock); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); - streamTaskSetFailedCheckpointId(pTask); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 08e7c97150..bc07d1811a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1063,6 +1063,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId); } + (*pTask)->chkInfo.pActiveInfo->failedId = 0; } if ((*pTask)->exec.pWalReader != NULL) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8abac4ce85..41dd4d7f26 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1039,7 +1039,6 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { pInfo->activeId = 0; // clear the checkpoint id - pInfo->failedId = 0; pInfo->transId = 0; pInfo->allUpstreamTriggerRecv = 0; pInfo->dispatchTrigger = false;