fix(stream): add some logs.

This commit is contained in:
Haojun Liao 2023-10-25 00:03:46 +08:00
parent 8cde39eebd
commit 13d979a2ee
2 changed files with 9 additions and 2 deletions

View File

@ -959,6 +959,12 @@ int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
return code;
}
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) {
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",

View File

@ -254,6 +254,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
if (pInfo->stage != stage) {
return TASK_UPSTREAM_NEW_STAGE;
} else if (pTask->status.downstreamReady != 1) {
stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));
return TASK_DOWNSTREAM_NOT_READY;
} else {
return TASK_DOWNSTREAM_READY;
@ -398,8 +399,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
} else {
if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, not send check again, "
"roll-back needed",
"s-task:%s downstream taskId:0x%x (vgId:%d) detects current task vnode-transfer/leader-change/restart, not "
"send check again, roll-back needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
return 0;
}