fix(stream): fix dead lock.

This commit is contained in:
Haojun Liao 2024-11-15 17:16:47 +08:00
parent 9cc2aec9d6
commit 628808c9a4
2 changed files with 3 additions and 0 deletions

View File

@ -1170,6 +1170,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num, stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num,
(int32_t)taosArrayGetSize(pTask->upstreamInfo.pList)); (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList));
streamMutexUnlock(&pActiveInfo->lock);
return TSDB_CODE_STREAM_INTERNAL_ERROR; return TSDB_CODE_STREAM_INTERNAL_ERROR;
} }
@ -1412,6 +1413,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
if (size > 0) { if (size > 0) {
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0); STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0);
if (pReady == NULL) { if (pReady == NULL) {
streamMutexUnlock(&pActiveInfo->lock);
return terrno; return terrno;
} }

View File

@ -433,6 +433,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
// send hb msg to mnode before closing all tasks. // send hb msg to mnode before closing all tasks.
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList); int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
streamMetaRUnLock(pMeta);
return code; return code;
} }