From 70191f6a5d85862a7e0620dd66349749963eb326 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 21 Jul 2024 02:06:09 +0800 Subject: [PATCH 1/2] fix(stream): fix race condition when starting check downstream status and dropping task are executed concurrently. --- source/libs/stream/src/streamCheckStatus.c | 7 +++++++ source/libs/stream/src/streamMeta.c | 3 +++ 2 files changed, 10 insertions(+) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 5e67f1766f..55209fc427 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -280,6 +280,13 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { streamMutexLock(&pInfo->checkInfoLock); + // drop procedure already started, not start check downstream now + ETaskStatus s = streamTaskGetStatus(pTask).state; + if (s == TASK_STATUS__DROPPING) { + streamMutexUnlock(&pInfo->checkInfoLock); + return; + } + int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { streamMutexUnlock(&pInfo->checkInfoLock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 42d2f86dac..6f3b7d8b32 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -742,7 +742,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamMetaRLock(pMeta); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { + // to make sure check status will not start the check downstream status when we start to check timerActive count. + streamMutexLock(&pTask->taskCheckInfo.checkInfoLock); timerActive = (*ppTask)->status.timerActive; + streamMutexUnlock(&pTask->taskCheckInfo.checkInfoLock); } streamMetaRUnLock(pMeta); From 6cc009653437f8e6d296f76ac660b4f664e799cd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 21 Jul 2024 03:10:21 +0800 Subject: [PATCH 2/2] fix(stream): fix memory leak. --- source/dnode/vnode/src/tq/tqStreamTask.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b4829d1dd8..0e5b1b6fb7 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -365,6 +365,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { code = streamTrySchedExec(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); + taosArrayDestroy(pTaskList); return -1; } }