From 70191f6a5d85862a7e0620dd66349749963eb326 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 21 Jul 2024 02:06:09 +0800 Subject: [PATCH] fix(stream): fix race condition when starting check downstream status and dropping task are executed concurrently. --- source/libs/stream/src/streamCheckStatus.c | 7 +++++++ source/libs/stream/src/streamMeta.c | 3 +++ 2 files changed, 10 insertions(+) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 5e67f1766f..55209fc427 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -280,6 +280,13 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { streamMutexLock(&pInfo->checkInfoLock); + // drop procedure already started, not start check downstream now + ETaskStatus s = streamTaskGetStatus(pTask).state; + if (s == TASK_STATUS__DROPPING) { + streamMutexUnlock(&pInfo->checkInfoLock); + return; + } + int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { streamMutexUnlock(&pInfo->checkInfoLock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 42d2f86dac..6f3b7d8b32 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -742,7 +742,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamMetaRLock(pMeta); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { + // to make sure check status will not start the check downstream status when we start to check timerActive count. + streamMutexLock(&pTask->taskCheckInfo.checkInfoLock); timerActive = (*ppTask)->status.timerActive; + streamMutexUnlock(&pTask->taskCheckInfo.checkInfoLock); } streamMetaRUnLock(pMeta);