fix(stream): fix the checkpoint msg failure.
This commit is contained in:
parent
0a2fb5fe9a
commit
75420bfefb
|
@ -1143,8 +1143,21 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id,
|
||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
|
||||
} else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
|
||||
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
|
||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||
// todo handle the agg task failure, add test case
|
||||
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER &&
|
||||
pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
stError("s-task:%s failed to dispatch checkpoint-trigger msg, checkpointId:%" PRId64
|
||||
", set the current checkpoint failed, and send rsp to mnode",
|
||||
id, pTask->checkpointingId);
|
||||
{ // send checkpoint failure msg to mnode directly
|
||||
pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id
|
||||
pTask->checkpointingId = pTask->checkpointingId;
|
||||
streamTaskSendCheckpointReadyMsg(pTask);
|
||||
}
|
||||
} else {
|
||||
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
|
||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue