fix(stream): re-send checkmsg to downstream for fixed dispatch type.

This commit is contained in:
Haojun Liao 2024-04-19 10:00:47 +08:00
parent 815d12fd77
commit 3769da535e
2 changed files with 27 additions and 14 deletions

View File

@ -26,7 +26,7 @@
extern "C" {
#endif
#define CHECK_RSP_INTERVAL 200
#define CHECK_RSP_INTERVAL 300
#define LAUNCH_HTASK_INTERVAL 100
#define WAIT_FOR_MINIMAL_INTERVAL 100.00
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40

View File

@ -1058,22 +1058,35 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
.stage = pTask->pMeta->stage,
};
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
req.reqId = p->reqId;
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) req:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
if (p->taskId == pVgInfo->taskId) {
req.reqId = p->reqId;
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
break;
if (p->taskId == pVgInfo->taskId) {
req.reqId = p->reqId;
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
break;
}
}
} else {
ASSERT(0);
}
}
@ -1192,7 +1205,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
}
}
stDebug("s-task:%s %d downstream tasks not ready, send check msg again", pTask->id.idStr, numOfNotReady);
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", pTask->id.idStr, numOfNotReady);
}
if (numOfTimeout > 0) {