From d364397e0d981c575135fd47ebd73d3f23740aed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 Jul 2024 19:15:17 +0800 Subject: [PATCH] fix(stream): fix dead lock caused by refactor. --- source/libs/stream/src/streamCheckpoint.c | 12 +++++++++--- source/libs/stream/src/streamDispatch.c | 4 ++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4dc7b14cc3..0e89cb003e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -390,6 +390,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId for (int32_t i = 0; i < size; ++i) { STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); if (p == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -433,6 +434,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -447,6 +449,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -830,6 +833,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { if (pNotSendList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); + streamMutexUnlock(&pActiveInfo->lock); return; } @@ -938,13 +942,14 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) streamMutexLock(&pInfo->lock); if (!pInfo->dispatchTrigger) { - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pInfo->lock); return false; } for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); if (pSendInfo == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -964,11 +969,11 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId); } - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pInfo->lock); return true; } - ASSERT(0); + streamMutexUnlock(&pInfo->lock); return false; } @@ -1028,6 +1033,7 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); if (p == NULL) { + streamMutexUnlock(&pInfo->lock); return num; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 4e128ace54..255afb44f9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -907,8 +907,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i); SRpcMsg msg = {0}; - int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, pInfo->checkpointId, - &msg); + int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, + pInfo->checkpointId, &msg); if (code == TSDB_CODE_SUCCESS) { code = tmsgSendReq(&pInfo->upstreamNodeEpset, &msg); if (code == TSDB_CODE_SUCCESS) {