diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b5c265af0d..eb6c9c1a66 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1486,16 +1486,18 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t numOfFailed = 0; bool triggerDispatchRsp = false; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + int64_t tmpCheckpointId = -1; + int32_t tmpTranId = -1; + const char* pStatus = NULL; // we only set the dispatch msg info for current checkpoint trans - int64_t tmpCheckpointId = -1; - int32_t tmpTranId = -1; - streamMutexLock(&pTask->lock); - triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) && - (pInfo->activeId == pMsgInfo->checkpointId) && (pInfo->transId != pMsgInfo->transId); + SStreamTaskState s = streamTaskGetStatus(pTask); + triggerDispatchRsp = (s.state == TASK_STATUS__CK) && (pInfo->activeId == pMsgInfo->checkpointId) && + (pInfo->transId != pMsgInfo->transId); tmpCheckpointId = pInfo->activeId; tmpTranId = pInfo->transId; + pStatus = s.name; streamMutexUnlock(&pTask->lock); streamMutexLock(&pMsgInfo->lock); @@ -1561,8 +1563,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i streamTaskSetTriggerDispatchConfirmed(pTask, pRsp->downstreamNodeId); } else { stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 - " transId:%d discard, current active checkpointId:%" PRId64 " active transId:%d, since expired", - pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId, tmpCheckpointId, tmpTranId); + " transId:%d discard, current status:%s, active checkpointId:%" PRId64 + " active transId:%d, since expired", + pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId, pStatus, tmpCheckpointId, tmpTranId); } } }