refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-05-01 14:58:59 +08:00
parent 563fca5c54
commit 6c86847b12
1 changed files with 5 additions and 3 deletions

View File

@ -68,11 +68,12 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
pTask->checkReqId = req.reqId;
qDebug("task %d at node %d check downstream task %d at node %d", pTask->id.taskId, pTask->nodeId, req.downstreamTaskId,
qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
req.downstreamNodeId);
streamDispatchOneCheckReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t vgSz = taosArrayGetSize(vgInfo);
pTask->recoverTryingDownstream = vgSz;
pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t));
@ -83,14 +84,15 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
taosArrayPush(pTask->checkReqIds, &req.reqId);
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->id.taskId, pTask->nodeId,
qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId);
streamDispatchOneCheckReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else {
qDebug("task %d at node %d direct launch recover since no downstream", pTask->id.taskId, pTask->nodeId);
qDebug("s-task:%s at node %d direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId);
streamTaskLaunchRecover(pTask, version);
}
return 0;
}