fix(stream): fix send msg error.

This commit is contained in:
Haojun Liao 2023-11-15 16:31:07 +08:00
parent d7fb9c5b6e
commit f7ee65d059
2 changed files with 2 additions and 2 deletions

View File

@ -297,7 +297,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
continue; continue;
} }
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId && ASSERT(p->chkInfo.checkpointId <= p->checkpointingId && p->checkpointingId == checkpointId &&
p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
p->chkInfo.checkpointId = p->checkpointingId; p->chkInfo.checkpointId = p->checkpointingId;

View File

@ -1125,7 +1125,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
{ // send checkpoint failure msg to mnode directly { // send checkpoint failure msg to mnode directly
pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id pTask->chkInfo.failedId = pTask->checkpointingId; // record the latest failed checkpoint id
pTask->checkpointingId = pTask->checkpointingId; pTask->checkpointingId = pTask->checkpointingId;
streamTaskSendCheckpointReadyMsg(pTask); streamTaskSendCheckpointSourceRsp(pTask);
} }
} else { } else {
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,