From 1f756f58f683f49578bd639d61233854e44fd500 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 5 Nov 2023 01:24:23 +0800 Subject: [PATCH] fix(stream): add need update node into update node list. --- source/libs/stream/src/streamStart.c | 60 +++++++++++++++------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 5ebc60dc13..4f757f09fb 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -318,6 +318,31 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { streamTaskOnHandleEventSuccess(pTask->status.pSM, event); } +static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { + int32_t vgId = pTask->pMeta->vgId; + + taosThreadMutexLock(&pTask->lock); + int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); + bool existed = false; + for (int i = 0; i < num; ++i) { + SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i); + if (p->nodeId == nodeId) { + existed = true; + break; + } + } + + if (!existed) { + SDownstreamTaskEpset t = {.nodeId = nodeId}; + taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t); + + stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId, + t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList)); + } + + taosThreadMutexUnlock(&pTask->lock); +} + int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); const char* id = pTask->id.idStr; @@ -372,35 +397,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, " "not check wait for downstream task nodeUpdate, and all tasks restart", id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage); - } else { - if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { - stError( - "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " - "downstream again, nodeUpdate needed", - id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); - taosThreadMutexLock(&pTask->lock); - int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); - bool existed = false; - for (int i = 0; i < num; ++i) { - SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i); - if (p->nodeId == pRsp->downstreamNodeId) { - existed = true; - break; - } - } - - if (!existed) { - SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId}; - taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t); - stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", id, vgId, - t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList)); - } - - taosThreadMutexUnlock(&pTask->lock); - return 0; - } + addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); + } else if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { + stError( + "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " + "downstream again, nodeUpdate needed", + id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); + } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);