diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index d94aa80369..d62990f181 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -48,10 +48,6 @@ static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; - SStreamTaskId id = pTask->id; - int64_t initTs = pTask->execInfo.init; - int64_t startTs = pTask->execInfo.start; char* p = NULL; int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); ETaskStatus status = streamTaskGetStatus(pTask, &p); @@ -70,13 +66,6 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { int64_t el = (pTask->execInfo.start - pTask->execInfo.init); stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p); - - taosThreadMutexUnlock(&pTask->lock); - - // todo: fix it, to avoid deadlock in: tqStreamTaskProcessUpdateReq - streamMetaUpdateTaskDownstreamStatus(pMeta, id.streamId, id.taskId, initTs, startTs, true); - - taosThreadMutexLock(&pTask->lock); return TSDB_CODE_SUCCESS; } @@ -392,6 +381,10 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { } streamTaskOnHandleEventSuccess(pTask->status.pSM, event); + + int64_t initTs = pTask->execInfo.init; + int64_t startTs = pTask->execInfo.start; + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); } static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {