diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 48b6486e05..ed59f36af8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -297,7 +297,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { continue; } - ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId && + ASSERT(p->chkInfo.checkpointId <= p->checkpointingId && p->checkpointingId == checkpointId && p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointId = p->checkpointingId; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 545d6310f7..505fc6d099 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1125,7 +1125,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i { // send checkpoint failure msg to mnode directly pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id pTask->checkpointingId = pTask->checkpointingId; - streamTaskSendCheckpointReadyMsg(pTask); + streamTaskSendCheckpointSourceRsp(pTask); } } else { stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,