From 7917716b4f3b090366b5880b7b169afd9d132cfd Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 31 May 2022 19:06:55 +0800 Subject: [PATCH] enh(sync): syncNodeOnAppendEntriesReplySnapshotCb --- source/libs/sync/src/syncAppendEntriesReply.c | 60 ++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 5cda56cb9a..8ba169d093 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -95,4 +95,62 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p return ret; } -int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { return 0; } \ No newline at end of file +int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { + int32_t ret = 0; + + char logBuf[128] = {0}; + snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplySnapshotCb== term:%lu", ths->pRaftStore->currentTerm); + syncAppendEntriesReplyLog2(logBuf, pMsg); + + if (pMsg->term < ths->pRaftStore->currentTerm) { + sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, + ths->pRaftStore->currentTerm); + return ret; + } + + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== before pNextIndex", ths->pNextIndex); + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== before pMatchIndex", ths->pMatchIndex); + + // no need this code, because if I receive reply.term, then I must have sent for that term. + // if (pMsg->term > ths->pRaftStore->currentTerm) { + // syncNodeUpdateTerm(ths, pMsg->term); + // } + + if (pMsg->term > ths->pRaftStore->currentTerm) { + char logBuf[128] = {0}; + snprintf(logBuf, sizeof(logBuf), "syncNodeOnAppendEntriesReplySnapshotCb error term, receive:%lu current:%lu", + pMsg->term, ths->pRaftStore->currentTerm); + syncNodeLog2(logBuf, ths); + sError("%s", logBuf); + return ret; + } + + assert(pMsg->term == ths->pRaftStore->currentTerm); + + if (pMsg->success) { + // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1); + + // matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] + syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); + + // maybe commit + syncMaybeAdvanceCommitIndex(ths); + + } else { + SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); + + // notice! int64, uint64 + if (nextIndex > SYNC_INDEX_BEGIN) { + --nextIndex; + } else { + nextIndex = SYNC_INDEX_BEGIN; + } + syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); + } + + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== after pNextIndex", ths->pNextIndex); + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplySnapshotCb== after pMatchIndex", ths->pMatchIndex); + + return ret; +} \ No newline at end of file