From 75420bfefbe7a3c03089b89eebdfd0b60b57a88e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 14 Nov 2023 23:35:09 +0800 Subject: [PATCH] fix(stream): fix the checkpoint msg failure. --- source/libs/stream/src/streamDispatch.c | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 42280b0d0f..82affe71d9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1143,8 +1143,21 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); } else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { - stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId); + // todo handle the agg task failure, add test case + if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER && + pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64 + ", set the current checkpoint failed, and send rsp to mnode", + id, pTask->checkpointingId); + { // send checkpoint failure msg to mnode directly + pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id + pTask->checkpointingId = pTask->checkpointingId; + streamTaskSendCheckpointReadyMsg(pTask); + } + } else { + stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + } } }