From e74b8473f1035b0af5597327909efc2c1318a968 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jun 2024 23:40:05 +0800 Subject: [PATCH] fix(stream): clear the active checkpoint info after report to mnode. --- source/libs/stream/src/streamCheckpoint.c | 1 - source/libs/stream/src/streamMeta.c | 19 +++++++++++-------- source/libs/stream/src/streamTask.c | 1 + 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b5e27fde87..4666cec4b6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -708,7 +708,6 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { } else { // clear the checkpoint info if failed taosThreadMutexLock(&pTask->lock); streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info - streamTaskClearCheckInfo(pTask, true); taosThreadMutexUnlock(&pTask->lock); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index bc07d1811a..a7f73d1b52 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1053,17 +1053,20 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } - if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) { - entry.checkpointInfo.failed = - ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; - entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId; - entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId; + SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo; + if (p->activeId != 0) { + entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0; + entry.checkpointInfo.activeId = p->activeId; + entry.checkpointInfo.activeTransId = p->transId; if (entry.checkpointInfo.failed) { - stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, - (*pTask)->chkInfo.pActiveInfo->transId); + stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d, clear the active checkpointInfo", + (*pTask)->id.idStr, p->transId); + + taosThreadMutexLock(&(*pTask)->lock); + streamTaskClearCheckInfo((*pTask), true); + taosThreadMutexUnlock(&(*pTask)->lock); } - (*pTask)->chkInfo.pActiveInfo->failedId = 0; } if ((*pTask)->exec.pWalReader != NULL) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 41dd4d7f26..ff020e88c9 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1042,6 +1042,7 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { pInfo->transId = 0; pInfo->allUpstreamTriggerRecv = 0; pInfo->dispatchTrigger = false; + pInfo->failedId = 0; taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pCheckpointReadyRecvList);