From 1a45d406070b534869e37ca699aa1792ef0f5284 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 24 Jan 2024 14:37:46 +0800 Subject: [PATCH] fix(stream): fix deadlock. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamExec.c | 3 +-- source/libs/stream/src/streamMeta.c | 6 ++++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 63da78a174..46f4b0959f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -879,6 +879,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); +SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask); void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 53232ccb84..5cff4b318f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -763,8 +763,7 @@ static int32_t schedTaskInFuture(SStreamTask* pTask) { pTask->status.schedIdleTime, ref); // add one ref count for task - // todo this may be failed, and add ref may be failed. - SStreamTask* pAddRefTask = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); + /*SStreamTask* pAddRefTask = */streamMetaAcquireOneTask(pTask); if (pTask->schedInfo.pIdleTimer == NULL) { pTask->schedInfo.pIdleTimer = taosTmrStart(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 112777da9e..87c558a99e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -647,6 +647,12 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t return p; } +SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask) { + int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); + stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref); + return pTask; +} + void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); if (ref > 0) {