fix(sync): fake match when prevLogIndex less than commitIndex

This commit is contained in:
Minghao Li 2022-06-16 20:08:27 +08:00
parent d3f6f07902
commit b42c963a5f
1 changed files with 23 additions and 3 deletions

View File

@ -880,13 +880,14 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
} }
} while (0); } while (0);
// fake match // fake match2
// //
// condition1: // condition1:
// preIndex <= my commit index // preIndex <= my commit index
// //
// operation: // operation:
// match my commit index // if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry
// match my-commit-index or my-commit-index + 1
// no operation on log // no operation on log
do { do {
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
@ -895,6 +896,25 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex, sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex,
ths->commitIndex); ths->commitIndex);
SyncIndex matchIndex = ths->commitIndex;
bool hasAppendEntries = pMsg->dataLen > 0;
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) {
// append entry
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ASSERT(pAppendEntry != NULL);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0);
// pre commit
code = syncNodePreCommit(ths, pAppendEntry);
ASSERT(code == 0);
matchIndex = pMsg->prevLogIndex + 1;
syncEntryDestory(pAppendEntry);
}
// prepare response msg // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
@ -902,7 +922,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = true; pReply->success = true;
pReply->matchIndex = ths->commitIndex; pReply->matchIndex = matchIndex;
// send response // send response
SRpcMsg rpcMsg; SRpcMsg rpcMsg;