diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b67b81b611..d6e1286093 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1215,6 +1215,7 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->status.timerActive >= 1) { + stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart", pTask->id.idStr, pMeta->vgId); inTimer = true; } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 5ce7668048..94f635a480 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -48,9 +48,13 @@ static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { - char* p = NULL; - int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); - ETaskStatus status = streamTaskGetStatus(pTask, &p); + SStreamMeta* pMeta = pTask->pMeta; + SStreamTaskId id = pTask->id; + int64_t initTs = pTask->execInfo.init; + int64_t startTs = pTask->execInfo.start; + char* p = NULL; + int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); + ETaskStatus status = streamTaskGetStatus(pTask, &p); if ((status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__STREAM_SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { @@ -67,8 +71,12 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p); - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, - pTask->execInfo.start, true); + taosThreadMutexUnlock(&pTask->lock); + + // to avoid deadlock + streamMetaUpdateTaskDownstreamStatus(pMeta, id.streamId, id.taskId, initTs, startTs, true); + + taosThreadMutexLock(&pTask->lock); return TSDB_CODE_SUCCESS; } @@ -461,8 +469,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 ", current stage:%" PRId64 - ", " - "not check wait for downstream task nodeUpdate, and all tasks restart", + ", not check wait for downstream task nodeUpdate, and all tasks restart", id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); } else {