diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0d1440fbde..0d214661c4 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -68,11 +68,12 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("task %d at node %d check downstream task %d at node %d", pTask->id.taskId, pTask->nodeId, req.downstreamTaskId, + qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchOneCheckReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t vgSz = taosArrayGetSize(vgInfo); pTask->recoverTryingDownstream = vgSz; pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t)); @@ -83,14 +84,15 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->id.taskId, pTask->nodeId, + qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchOneCheckReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { - qDebug("task %d at node %d direct launch recover since no downstream", pTask->id.taskId, pTask->nodeId); + qDebug("s-task:%s at node %d direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId); streamTaskLaunchRecover(pTask, version); } + return 0; }