other: add some logs.
This commit is contained in:
parent
39079ab64a
commit
6d7b6fbb74
|
@ -128,9 +128,9 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
|
||||||
pTask->checkReqId = req.reqId;
|
pTask->checkReqId = req.reqId;
|
||||||
|
|
||||||
qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
|
qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
|
||||||
"-%" PRId64 ", req:0x%" PRIx64,
|
"-%" PRId64 ", stage:%"PRId64" req:0x%" PRIx64,
|
||||||
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
|
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
|
||||||
pWindow->skey, pWindow->ekey, req.reqId);
|
pWindow->skey, pWindow->ekey, req.stage, req.reqId);
|
||||||
|
|
||||||
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
@ -149,8 +149,8 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
|
||||||
taosArrayPush(pTask->checkReqIds, &req.reqId);
|
taosArrayPush(pTask->checkReqIds, &req.reqId);
|
||||||
req.downstreamNodeId = pVgInfo->vgId;
|
req.downstreamNodeId = pVgInfo->vgId;
|
||||||
req.downstreamTaskId = pVgInfo->taskId;
|
req.downstreamTaskId = pVgInfo->taskId;
|
||||||
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId,
|
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, stage:%" PRId64,
|
||||||
req.downstreamTaskId, req.downstreamNodeId, i);
|
pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i, req.stage);
|
||||||
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -178,8 +178,8 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
|
||||||
.stage = pTask->pMeta->stage,
|
.stage = pTask->pMeta->stage,
|
||||||
};
|
};
|
||||||
|
|
||||||
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId,
|
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
|
||||||
req.downstreamTaskId, req.downstreamNodeId);
|
pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage);
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
|
streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
|
@ -190,6 +190,8 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
|
||||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
if (pVgInfo->taskId == req.downstreamTaskId) {
|
if (pVgInfo->taskId == req.downstreamTaskId) {
|
||||||
|
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
|
||||||
|
pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, req.stage);
|
||||||
streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
|
streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -494,7 +494,6 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask)
|
||||||
streamMetaCommit(pTask->pMeta);
|
streamMetaCommit(pTask->pMeta);
|
||||||
taosWUnLockLatch(&pTask->pMeta->lock);
|
taosWUnLockLatch(&pTask->pMeta->lock);
|
||||||
|
|
||||||
ASSERT(0);
|
|
||||||
// qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id,
|
// qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id,
|
||||||
// pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus));
|
// pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue