diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e0fa199199..5807240f5e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1170,6 +1170,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList)); + streamMutexUnlock(&pActiveInfo->lock); return TSDB_CODE_STREAM_INTERNAL_ERROR; } @@ -1412,6 +1413,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa if (size > 0) { STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0); if (pReady == NULL) { + streamMutexUnlock(&pActiveInfo->lock); return terrno; } diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index ed12687e41..9c16ff036e 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -433,6 +433,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { // send hb msg to mnode before closing all tasks. int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList); if (code != TSDB_CODE_SUCCESS) { + streamMetaRUnLock(pMeta); return code; }