refactor: update logs.
This commit is contained in:
parent
8825065364
commit
1e388cace7
|
@ -1486,16 +1486,18 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
int32_t numOfFailed = 0;
|
||||
bool triggerDispatchRsp = false;
|
||||
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
||||
int64_t tmpCheckpointId = -1;
|
||||
int32_t tmpTranId = -1;
|
||||
const char* pStatus = NULL;
|
||||
|
||||
// we only set the dispatch msg info for current checkpoint trans
|
||||
int64_t tmpCheckpointId = -1;
|
||||
int32_t tmpTranId = -1;
|
||||
|
||||
streamMutexLock(&pTask->lock);
|
||||
triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) &&
|
||||
(pInfo->activeId == pMsgInfo->checkpointId) && (pInfo->transId != pMsgInfo->transId);
|
||||
SStreamTaskState s = streamTaskGetStatus(pTask);
|
||||
triggerDispatchRsp = (s.state == TASK_STATUS__CK) && (pInfo->activeId == pMsgInfo->checkpointId) &&
|
||||
(pInfo->transId != pMsgInfo->transId);
|
||||
tmpCheckpointId = pInfo->activeId;
|
||||
tmpTranId = pInfo->transId;
|
||||
pStatus = s.name;
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
streamMutexLock(&pMsgInfo->lock);
|
||||
|
@ -1561,8 +1563,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
streamTaskSetTriggerDispatchConfirmed(pTask, pRsp->downstreamNodeId);
|
||||
} else {
|
||||
stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64
|
||||
" transId:%d discard, current active checkpointId:%" PRId64 " active transId:%d, since expired",
|
||||
pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId, tmpCheckpointId, tmpTranId);
|
||||
" transId:%d discard, current status:%s, active checkpointId:%" PRId64
|
||||
" active transId:%d, since expired",
|
||||
pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId, pStatus, tmpCheckpointId, tmpTranId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue