diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 12453f8b0e..d777883015 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -402,6 +402,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId for (int32_t i = 0; i < size; ++i) { STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); if (p == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -445,6 +446,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -459,6 +461,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); if (pReadyInfo == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -843,7 +846,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); if (pNotSendList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - stError("s-task:%s quit tmr function due to out of memory", id); + stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); streamMutexUnlock(&pActiveInfo->lock); stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); @@ -956,13 +959,14 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) streamMutexLock(&pInfo->lock); if (!pInfo->dispatchTrigger) { - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pInfo->lock); return false; } for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); if (pSendInfo == NULL) { + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -982,11 +986,11 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId); } - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pInfo->lock); return true; } - ASSERT(0); + streamMutexUnlock(&pInfo->lock); return false; } @@ -1043,6 +1047,7 @@ int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); if (p == NULL) { + streamMutexUnlock(&pInfo->lock); return num; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 96a9f2d297..f6e827b745 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -919,8 +919,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i); SRpcMsg msg = {0}; - int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, pInfo->checkpointId, - &msg); + int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, + pInfo->checkpointId, &msg); if (code == TSDB_CODE_SUCCESS) { code = tmsgSendReq(&pInfo->upstreamNodeEpset, &msg); if (code == TSDB_CODE_SUCCESS) {