From 0f9328330b044c893d89bd16f8015ab43f525641 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 Oct 2023 11:56:36 +0800 Subject: [PATCH] fix(stream): disable continue check for downstream tasks. --- source/libs/stream/src/streamStart.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 965f116572..b060121680 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -390,20 +390,21 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs doProcessDownstreamReadyRsp(pTask); } } else { // not ready, wait for 100ms and retry - if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { + if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { stError( - "s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, not send check again, " - "roll-back needed", - id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, " + "not check wait for downstream task nodeUpdate, and all tasks restart", + id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage); } else { - if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { + if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { stError( - "s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, continue check " - "till downstream nodeUpdate", - id, pRsp->oldStage, (int32_t)pTask->pMeta->stage); + "s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, not send check again, " + "roll-back needed", + id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);