From 13d979a2eeb3771348896fad933ba40d7430cc0e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 25 Oct 2023 00:03:46 +0800 Subject: [PATCH] fix(stream): add some logs. --- source/dnode/vnode/src/tq/tq.c | 6 ++++++ source/libs/stream/src/streamStart.c | 5 +++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index dbd1e02732..711c9a52bc 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -959,6 +959,12 @@ int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); + if (!vnodeIsRoleLeader(pTq->pVnode)) { + tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, + rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); + return code; + } + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index c386a49e2d..56910c0c53 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -254,6 +254,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ if (pInfo->stage != stage) { return TASK_UPSTREAM_NEW_STAGE; } else if (pTask->status.downstreamReady != 1) { + stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER)); return TASK_DOWNSTREAM_NOT_READY; } else { return TASK_DOWNSTREAM_READY; @@ -398,8 +399,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } else { if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { stError( - "s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, not send check again, " - "roll-back needed", + "s-task:%s downstream taskId:0x%x (vgId:%d) detects current task vnode-transfer/leader-change/restart, not " + "send check again, roll-back needed", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); return 0; }