diff --git a/source/libs/sync/inc/raft_replication.h b/source/libs/sync/inc/raft_replication.h index d0e55ef10e..180a2db61f 100644 --- a/source/libs/sync/inc/raft_replication.h +++ b/source/libs/sync/inc/raft_replication.h @@ -20,11 +20,11 @@ #include "syncInt.h" #include "sync_type.h" -// syncRaftReplicate sends an append RPC with new entries to the given peer, +// syncRaftMaybeSendAppend sends an append RPC with new entries to the given peer, // if necessary. Returns true if a message was sent. The sendIfEmpty // argument controls whether messages with no entries will be sent // ("empty" messages are useful to convey updated Commit indexes, but // are undesirable when we're sending multiple messages in a batch). -bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty); +bool syncRaftMaybeSendAppend(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty); #endif /* TD_SYNC_RAFT_REPLICATION_H */ diff --git a/source/libs/sync/inc/sync_raft_impl.h b/source/libs/sync/inc/sync_raft_impl.h index a8615f17eb..1a6c13f65f 100644 --- a/source/libs/sync/inc/sync_raft_impl.h +++ b/source/libs/sync/inc/sync_raft_impl.h @@ -28,6 +28,8 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft); void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType); +void syncRaftCampaign(SSyncRaft* pRaft, ESyncRaftElectionType cType); + void syncRaftTriggerHeartbeat(SSyncRaft* pRaft); void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft); diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 3b4c9e5f36..72b0d268a8 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -169,7 +169,7 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch } static void visitProgressMaybeSendAppend(SSyncRaftProgress* progress, void* arg) { - syncRaftReplicate(arg, progress, false); + syncRaftMaybeSendAppend(arg, progress, false); } // switchToConfig reconfigures this node to use the provided 
configuration. It diff --git a/source/libs/sync/src/raft_handle_vote_message.c b/source/libs/sync/src/raft_handle_vote_message.c index 9997c5226d..0219e39df9 100644 --- a/source/libs/sync/src/raft_handle_vote_message.c +++ b/source/libs/sync/src/raft_handle_vote_message.c @@ -48,12 +48,14 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { } static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - if (!(pRaft->voteFor == SYNC_NON_NODE_ID || pMsg->term > pRaft->term || pRaft->voteFor == pMsg->from)) { - return false; - } - if (!syncRaftLogIsUptodate(pRaft->log, pMsg->vote.lastIndex, pMsg->vote.lastTerm)) { - return false; - } + bool canVote = + // We can vote if this is a repeat of a vote we've already cast... + pRaft->voteFor == pMsg->from || + // ...we haven't voted and we don't think there's a leader yet in this term... + (pRaft->voteFor == SYNC_NON_NODE_ID && pRaft->leaderId == SYNC_NON_NODE_ID) || + // ...or this is a PreVote for a future term... + (pMsg->vote.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION && pMsg->term > pRaft->term); - return true; + // ...and we believe the candidate is up to date. 
+ return canVote && syncRaftLogIsUptodate(pRaft->log, pMsg->vote.lastIndex, pMsg->vote.lastTerm); } \ No newline at end of file diff --git a/source/libs/sync/src/raft_handle_vote_resp_message.c b/source/libs/sync/src/raft_handle_vote_resp_message.c index 744d654cc5..87a5cfcd15 100644 --- a/source/libs/sync/src/raft_handle_vote_resp_message.c +++ b/source/libs/sync/src/raft_handle_vote_resp_message.c @@ -45,12 +45,14 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { if (result == SYNC_RAFT_VOTE_WON) { if (pRaft->candidateState.inPreVote) { - syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); + syncRaftCampaign(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); } else { syncRaftBecomeLeader(pRaft); - + syncRaftBroadcastAppend(pRaft); } } else if (result == SYNC_RAFT_VOTE_LOST) { + // A pre-vote response carries the pre-candidate's future term + // (pMsg->term > pRaft->term); step down at our current pRaft->term. syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); } diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c index 228d8195f6..c8c2d2c379 100644 --- a/source/libs/sync/src/raft_replication.c +++ b/source/libs/sync/src/raft_replication.c @@ -24,12 +24,12 @@ static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, SyncIndex prevIndex, SyncTerm prevTerm, SSyncRaftEntry *entries, int nEntry); -// syncRaftReplicate sends an append RPC with new entries to the given peer, +// syncRaftMaybeSendAppend sends an append RPC with new entries to the given peer, // if necessary. Returns true if a message was sent. The sendIfEmpty // argument controls whether messages with no entries will be sent // ("empty" messages are useful to convey updated Commit indexes, but // are undesirable when we're sending multiple messages in a batch).
-bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) { +bool syncRaftMaybeSendAppend(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) { assert(pRaft->state == TAOS_SYNC_STATE_LEADER); SyncNodeId nodeId = progress->id; diff --git a/source/libs/sync/src/sync_raft_election.c b/source/libs/sync/src/sync_raft_election.c index d961978be2..fe2e0fd9d3 100644 --- a/source/libs/sync/src/sync_raft_election.c +++ b/source/libs/sync/src/sync_raft_election.c @@ -19,8 +19,6 @@ #include "raft_message.h" #include "sync_raft_progress_tracker.h" -static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType); - void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { if (pRaft->state == TAOS_SYNC_STATE_LEADER) { syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId); @@ -41,17 +39,17 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); - campaign(pRaft, cType); + syncRaftCampaign(pRaft, cType); } -// campaign transitions the raft instance to candidate state. This must only be +// syncRaftCampaign transitions the raft instance to candidate state. This must only be // called after verifying that this is a legitimate transition.
-static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) { +void syncRaftCampaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) { bool preVote; SyncTerm term; - if (syncRaftIsPromotable(pRaft)) { - syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId); + if (!syncRaftIsPromotable(pRaft)) { + syncDebug("[%d:%d] is unpromotable; syncRaftCampaign() should not have been called", pRaft->selfGroupId, pRaft->selfId); return; } diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index 2093bcb046..4d8222e826 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -25,6 +25,8 @@ static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg); static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg); static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg); +static bool increaseUncommittedSize(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n); + static int triggerAll(SSyncRaft* pRaft); static void tickElection(SSyncRaft* pRaft); @@ -82,13 +84,22 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) { resetRaft(pRaft, pRaft->term); - pRaft->leaderId = pRaft->leaderId; + pRaft->leaderId = pRaft->selfId; pRaft->state = TAOS_SYNC_STATE_LEADER; - // TODO: check if there is pending config log - int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log); - if (nPendingConf > 1) { - syncFatal("unexpected multiple uncommitted config entry"); - } - syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); + SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId); + assert(progress != NULL); + // Followers enter replicate mode when they've been successfully probed + // (perhaps after having received a snapshot as a result). The leader is + // trivially in this state. Note that resetRaft() has initialized this + // progress with the last index already.
+ syncRaftProgressBecomeReplicate(progress); + + // Conservatively set the pendingConfigIndex to the last index in the + // log. There may or may not be a pending config change, but it's + // safe to delay any future proposals until we commit all our + // pending log entries, and scanning the entire tail of the log + // could be expensive. + SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); + pRaft->pendingConfigIndex = lastIndex; // after become leader, send a no-op log SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry)); @@ -103,6 +114,7 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) { }; appendEntries(pRaft, entry, 1); //syncRaftTriggerHeartbeat(pRaft); + syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); } void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) { @@ -192,9 +204,11 @@ static void visitProgressSendAppend(SSyncRaftProgress* progress, void* arg) { return; } - syncRaftReplicate(arg, progress, true); + syncRaftMaybeSendAppend(arg, progress, true); } +// syncRaftBroadcastAppend sends an append RPC, with entries, to all peers that are not +// up-to-date according to the progress recorded in pRaft->tracker. void syncRaftBroadcastAppend(SSyncRaft* pRaft) { syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft); } @@ -267,6 +281,11 @@ static void tickHeartbeat(SSyncRaft* pRaft) { } +// TODO: enforce an uncommitted-size limit; until then accept every proposal (true = keep it). +static bool increaseUncommittedSize(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { + return true; +} + static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncTerm term = pRaft->term; @@ -277,9 +296,16 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { entries[i].index = lastIndex + 1 + i; } + // Track the size of this uncommitted proposal. + if (!increaseUncommittedSize(pRaft, entries, n)) { + // Drop the proposal.
+ return; + } + syncRaftLogAppend(pRaft->log, entries, n); SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId); + assert(progress != NULL); - syncRaftProgressMaybeUpdate(progress, lastIndex); + syncRaftProgressMaybeUpdate(progress, lastIndex + n); // Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend. syncRaftMaybeCommit(pRaft); @@ -306,7 +332,7 @@ static int triggerAll(SSyncRaft* pRaft) { continue; } - syncRaftReplicate(pRaft, pRaft->tracker->progressMap.progress[i], true); + syncRaftMaybeSendAppend(pRaft, pRaft->tracker->progressMap.progress[i], true); } #endif return 0;