diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 49ded65af2..fb2eb29c0b 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -128,9 +128,9 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { pTask->checkReqId = req.reqId; qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 - "-%" PRId64 ", req:0x%" PRIx64, + "-%" PRId64 ", stage:%"PRId64" req:0x%" PRIx64, pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer, - pWindow->skey, pWindow->ekey, req.reqId); + pWindow->skey, pWindow->ekey, req.stage, req.reqId); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -149,8 +149,8 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId, - req.downstreamTaskId, req.downstreamNodeId, i); + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, stage:%" PRId64, + pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i, req.stage); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { @@ -178,8 +178,8 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p .stage = pTask->pMeta->stage, }; - qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId, - req.downstreamTaskId, req.downstreamNodeId); + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr, + pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage); if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); @@ -190,6 +190,8 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); if (pVgInfo->taskId == req.downstreamTaskId) { + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr, + pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage); streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 0046ee3307..2f957e7dea 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -494,7 +494,6 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) streamMetaCommit(pTask->pMeta); taosWUnLockLatch(&pTask->pMeta->lock); - ASSERT(0); // qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id, // pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus));