diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 42280b0d0f..82affe71d9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1143,8 +1143,21 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); } else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { - stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId); + // todo handle the agg task failure, add test case + if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER && + pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64 + ", set the current checkpoint failed, and send rsp to mnode", + id, pTask->checkpointingId); + { // send checkpoint failure msg to mnode directly + pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id + pTask->checkpointingId = pTask->checkpointingId; + streamTaskSendCheckpointReadyMsg(pTask); + } + } else { + stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + } } }