fix(stream): update logs.

This commit is contained in:
Haojun Liao 2023-09-21 19:14:34 +08:00
parent bcecc2418a
commit 65cc4040f3
2 changed files with 7 additions and 5 deletions

View File

@ -285,8 +285,10 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
// save the task
streamMetaSaveTask(pMeta, p);
streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks
stDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
stDebug(
"vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, "
"checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer,
streamGetTaskStatusStr(prev));
}

View File

@ -837,7 +837,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
info.msg.info.noResp = 1; // refactor later.
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d",
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.downstreamNodeId, index);
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index);
if (pTask->pReadyMsgList == NULL) {
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));