From 32ce4b6a4ce9452f6fd4b14bfb1fc59295850932 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Dec 2023 23:12:59 +0800 Subject: [PATCH] fix(stream): fix race condition. --- source/libs/stream/src/streamStart.c | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index d94aa80369..d62990f181 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -48,10 +48,6 @@ static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; - SStreamTaskId id = pTask->id; - int64_t initTs = pTask->execInfo.init; - int64_t startTs = pTask->execInfo.start; char* p = NULL; int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); ETaskStatus status = streamTaskGetStatus(pTask, &p); @@ -70,13 +66,6 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { int64_t el = (pTask->execInfo.start - pTask->execInfo.init); stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p); - - taosThreadMutexUnlock(&pTask->lock); - - // todo: fix it, to avoid deadlock in: tqStreamTaskProcessUpdateReq - streamMetaUpdateTaskDownstreamStatus(pMeta, id.streamId, id.taskId, initTs, startTs, true); - - taosThreadMutexLock(&pTask->lock); return TSDB_CODE_SUCCESS; } @@ -392,6 +381,10 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { } streamTaskOnHandleEventSuccess(pTask->status.pSM, event); + + int64_t initTs = pTask->execInfo.init; + int64_t startTs = pTask->execInfo.start; + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); } static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {