diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index cfe960e99c..c2e03d0ca9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -443,9 +443,9 @@ static int32_t doDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpoi } tEncoderClear(&encoder); - initRpcMsg(&msg,TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead)); - qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, - pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId); + initRpcMsg(&msg, TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead)); + qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", + pTask->id.idStr, pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); return 0; @@ -518,16 +518,16 @@ int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pRpcMsgList); ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) == num); - qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, pTask->info.taskLevel, - num); + qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, + pTask->info.taskLevel, num); - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, i); tmsgSendRsp(pMsg); } taosArrayClear(pTask->pRpcMsgList); - qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream", pTask->id.idStr); + qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream", pTask->id.idStr, pTask->info.taskLevel); return TSDB_CODE_SUCCESS; } @@ -540,7 +540,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { tmsgSendRsp(pMsg); taosArrayClear(pTask->pRpcMsgList); - qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr); + qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); return TSDB_CODE_SUCCESS; } @@ -631,7 +631,7 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist tEncoderClear(&encoder); msg.info.noResp = 1; - initRpcMsg(&msg,TDMT_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); + initRpcMsg(&msg, TDMT_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); tmsgSendReq(pEpSet, &msg); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); @@ -797,7 +797,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; taosArrayPush(pTask->pRpcMsgList, &rspMsg); - qDebug("s-task:%s add checkpoint rsp msg, total:%d", pTask->id.idStr, (int32_t) taosArrayGetSize(pTask->pRpcMsgList)); + qDebug("s-task:%s add checkpoint rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pRpcMsgList)); return TSDB_CODE_SUCCESS; }