diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index a7bc4df281..f31f3dd1ae 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -717,24 +717,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // maybe update commit index, leader notice me if (pMsg->commitIndex > ths->commitIndex) { + SyncIndex lastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + + SyncIndex beginIndex = 0; + SyncIndex endIndex = -1; + // has commit entry in local - if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { - // advance commit index to sanpshot first - SSnapshot snapshot; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { - SyncIndex commitBegin = ths->commitIndex; - SyncIndex commitEnd = snapshot.lastApplyIndex; - ths->commitIndex = snapshot.lastApplyIndex; - - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, - commitBegin, commitEnd); - syncNodeEventLog(ths, eventLog); - } - - SyncIndex beginIndex = ths->commitIndex + 1; - SyncIndex endIndex = pMsg->commitIndex; + if (pMsg->commitIndex <= lastIndex) { + beginIndex = ths->commitIndex + 1; + endIndex = pMsg->commitIndex; // update commit index ths->commitIndex = pMsg->commitIndex; @@ -743,10 +734,22 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); ASSERT(code == 0); - code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); + } else if (pMsg->commitIndex > lastIndex && ths->commitIndex < lastIndex) { + beginIndex = ths->commitIndex + 1; + endIndex = lastIndex; + + // update commit index, speed up + ths->commitIndex = lastIndex; + + // call back Wal + code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); ASSERT(code == 0); } + + code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); + ASSERT(code == 0); } + return 0; } } while (0); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 15617523fa..48ebf824e8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2803,6 +2803,23 @@ bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) { } int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { + if (beginIndex > endIndex) { + return 0; + } + + // advance commit index to sanpshot first + SSnapshot snapshot = {0}; + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); + if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) { + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, + snapshot.lastApplyIndex); + syncNodeEventLog(ths, eventLog); + + // update begin index + beginIndex = snapshot.lastApplyIndex + 1; + } + int32_t code = 0; ESyncState state = flag; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 4f2dbcae67..bc703e519c 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -140,9 +140,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 ", match-index:%d, raftid:%" PRId64, pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); - - // syncNodeRestartNowHeartbeatTimer(pSyncNode); - syncNodeStartNowHeartbeatTimer(pSyncNode); return -1; }