refactor: update logs.
This commit is contained in:
parent
c38eb033a9
commit
8825065364
|
@ -1477,20 +1477,25 @@ int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now) {
|
|||
}
|
||||
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
bool allRsp = false;
|
||||
int32_t notRsp = 0;
|
||||
int32_t numOfFailed = 0;
|
||||
bool triggerDispatchRsp = false;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
bool allRsp = false;
|
||||
int32_t notRsp = 0;
|
||||
int32_t numOfFailed = 0;
|
||||
bool triggerDispatchRsp = false;
|
||||
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
||||
|
||||
// we only set the dispatch msg info for current checkpoint trans
|
||||
int64_t tmpCheckpointId = -1;
|
||||
int32_t tmpTranId = -1;
|
||||
|
||||
streamMutexLock(&pTask->lock);
|
||||
triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) &&
|
||||
(pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) &&
|
||||
(pTask->chkInfo.pActiveInfo->transId != pMsgInfo->transId);
|
||||
(pInfo->activeId == pMsgInfo->checkpointId) && (pInfo->transId != pMsgInfo->transId);
|
||||
tmpCheckpointId = pInfo->activeId;
|
||||
tmpTranId = pInfo->transId;
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
streamMutexLock(&pMsgInfo->lock);
|
||||
|
@ -1556,8 +1561,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
streamTaskSetTriggerDispatchConfirmed(pTask, pRsp->downstreamNodeId);
|
||||
} else {
|
||||
stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64
|
||||
" transId:%d discard, since expired",
|
||||
pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId);
|
||||
" transId:%d discard, current active checkpointId:%" PRId64 " active transId:%d, since expired",
|
||||
pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId, tmpCheckpointId, tmpTranId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue