From e261023ee60eb5d03a71196f56c25517047e0615 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 18:42:44 +0800 Subject: [PATCH] fix(stream): add lock, and fix race condition. --- source/dnode/vnode/src/tq/tqUtil.c | 6 ++++++ source/libs/stream/src/streamTask.c | 11 ++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 6029575e2c..d8440e996f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -501,6 +501,10 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b } // extract the required source task for a given stream, identified by streamId + streamMetaRLock(pMeta); + + numOfTasks = taosArrayGetSize(pMeta->pTaskList); + for (int32_t i = 0; i < numOfTasks; ++i) { STaskId* pId = taosArrayGet(pMeta->pTaskList, i); if (pId->streamId != streamId) { @@ -552,5 +556,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b walCloseReader(pReader); } + streamMetaRUnLock(pMeta); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 44f70f8b19..88c8c85dec 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -733,15 +733,12 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask) { bool streamTaskSetSchedStatusWait(SStreamTask* pTask) { bool ret = false; - // double check + taosThreadMutexLock(&pTask->lock); if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) { - taosThreadMutexLock(&pTask->lock); - if (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE) { - pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; - ret = true; - } - taosThreadMutexUnlock(&pTask->lock); + pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; + ret = true; } + taosThreadMutexUnlock(&pTask->lock); return ret; }