diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 63411df11a..b5c265af0d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1477,20 +1477,25 @@ int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now) { } int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { - const char* id = pTask->id.idStr; - int32_t vgId = pTask->pMeta->vgId; - SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; - int64_t now = taosGetTimestampMs(); - bool allRsp = false; - int32_t notRsp = 0; - int32_t numOfFailed = 0; - bool triggerDispatchRsp = false; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; + int64_t now = taosGetTimestampMs(); + bool allRsp = false; + int32_t notRsp = 0; + int32_t numOfFailed = 0; + bool triggerDispatchRsp = false; + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; // we only set the dispatch msg info for current checkpoint trans + int64_t tmpCheckpointId = -1; + int32_t tmpTranId = -1; + streamMutexLock(&pTask->lock); triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) && - (pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) && - (pTask->chkInfo.pActiveInfo->transId != pMsgInfo->transId); + (pInfo->activeId == pMsgInfo->checkpointId) && (pInfo->transId != pMsgInfo->transId); + tmpCheckpointId = pInfo->activeId; + tmpTranId = pInfo->transId; streamMutexUnlock(&pTask->lock); streamMutexLock(&pMsgInfo->lock); @@ -1556,8 +1561,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i streamTaskSetTriggerDispatchConfirmed(pTask, pRsp->downstreamNodeId); } else { stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 - " transId:%d discard, since expired", - pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId); + " transId:%d discard, current active checkpointId:%" PRId64 " active transId:%d, since expired", + pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId, tmpCheckpointId, tmpTranId); } } }