From 74df121cf5e36a1b364030eb38dc5e2e60f3c678 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 3 Jan 2024 22:29:46 +0800 Subject: [PATCH] fix(stream): fix the deadlock. --- source/libs/stream/src/streamExec.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e73c5ffddb..d68288222c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -734,8 +734,8 @@ static void doStreamExecTaskHelper(void* param, void* tmrId) { static int32_t schedTaskInFuture(SStreamTask* pTask) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d", - pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref); + stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d", pTask->id.idStr, + pTask->status.schedIdleTime, ref); // add one ref count for task SStreamTask* pAddRefTask = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId); @@ -761,6 +761,9 @@ int32_t streamResumeTask(SStreamTask* pTask) { if (pTask->status.schedIdleTime > 0) { stDebug("s-task:%s idled, and will be invoked in %dms", id, pTask->status.schedIdleTime); schedTaskInFuture(pTask); + + taosThreadMutexUnlock(&pTask->lock); + return 0; } else { int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {