refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-04-30 09:51:40 +08:00
parent cd6261c8d4
commit 973ca3be80
1 changed files with 4 additions and 3 deletions

View File

@ -384,6 +384,8 @@ int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskI
} }
void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
const char* id = pTask->id.idStr;
SStreamTaskCheckReq req = { SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId, .streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId, .upstreamTaskId = pTask->id.taskId,
@ -398,7 +400,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId); setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet); streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
@ -413,8 +415,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId);
p->reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
break; break;
} }