From b6707fbbd207ff64c5721f7c2681db0d49e1a6ad Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 26 Feb 2025 16:55:06 +0800 Subject: [PATCH] refactor(stream): move the generate checkpoint procedure out of lock, to avoid blocking heartbeat, and resulting in leader/follower switch. --- source/libs/stream/src/streamCheckpoint.c | 5 ++++- source/libs/stream/src/streamExec.c | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index c53e1a19a3..41e3d40542 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -910,9 +910,12 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int64_t startTs = pTask->chkInfo.startTs; int64_t ckId = pTask->chkInfo.pActiveInfo->activeId; const char* id = pTask->id.idStr; - bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); SStreamMeta* pMeta = pTask->pMeta; + streamMutexLock(&pTask->lock); + bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + streamMutexUnlock(&pTask->lock); + // sink task does not need to save the status, and generated the checkpoint if (pTask->info.taskLevel != TASK_LEVEL__SINK) { stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 267dc88807..999a6b5a4c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -698,6 +698,8 @@ static int32_t doHandleChkptBlock(SStreamTask* pTask) { streamMutexLock(&pTask->lock); SStreamTaskState pState = streamTaskGetStatus(pTask); + streamMutexUnlock(&pTask->lock); + if (pState.state == TASK_STATUS__CK) { // todo other thread may change the status stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue @@ -715,7 +717,7 @@ static int32_t doHandleChkptBlock(SStreamTask* pTask) { } } - streamMutexUnlock(&pTask->lock); +// streamMutexUnlock(&pTask->lock); return code; }