From 3769da535edd9ee08e1894adaef46de12594ea69 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 Apr 2024 10:00:47 +0800 Subject: [PATCH] fix(stream): re-send checkmsg to downstream for fixed dispatch type. --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamTask.c | 39 +++++++++++++++++++---------- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index e2370abb2c..b3ed86cff8 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -26,7 +26,7 @@ extern "C" { #endif -#define CHECK_RSP_INTERVAL 200 +#define CHECK_RSP_INTERVAL 300 #define LAUNCH_HTASK_INTERVAL 100 #define WAIT_FOR_MINIMAL_INTERVAL 100.00 #define MAX_RETRY_LAUNCH_HISTORY_TASK 40 diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 860a5f9170..70c2619f6f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1058,22 +1058,35 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { .stage = pTask->pMeta->stage, }; - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t numOfVgs = taosArrayGetSize(vgInfo); + STaskOutputInfo* pOutputInfo = &pTask->outputInfo; + if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { + req.reqId = p->reqId; + req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId; + req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId; + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) req:0x%" PRIx64, + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet); + } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgs = taosArrayGetSize(vgInfo); - if (p->taskId == pVgInfo->taskId) { - req.reqId = p->reqId; - req.downstreamNodeId = pVgInfo->vgId; - req.downstreamTaskId = pVgInfo->taskId; + for (int32_t i = 0; i < numOfVgs; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); - streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); - break; + if (p->taskId == pVgInfo->taskId) { + req.reqId = p->reqId; + req.downstreamNodeId = pVgInfo->vgId; + req.downstreamTaskId = pVgInfo->taskId; + + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); + streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + break; + } } + } else { + ASSERT(0); } } @@ -1192,7 +1205,7 @@ static void rspMonitorFn(void* param, void* tmrId) { } } - stDebug("s-task:%s %d downstream tasks not ready, send check msg again", pTask->id.idStr, numOfNotReady); + stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", pTask->id.idStr, numOfNotReady); } if (numOfTimeout > 0) {