From de3164f16f6220d913fadd331d01413d2bb86413 Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 9 Nov 2021 14:58:10 +0800 Subject: [PATCH] [TD-10645][raft]add raft progress tracker --- source/libs/sync/inc/raft.h | 19 +- source/libs/sync/inc/sync_raft_impl.h | 42 +++ source/libs/sync/inc/sync_raft_progress.h | 2 + .../sync/inc/sync_raft_progress_tracker.h | 17 +- source/libs/sync/inc/sync_raft_quorum.h | 40 +++ source/libs/sync/inc/sync_raft_quorum_joint.h | 12 +- .../libs/sync/inc/sync_raft_quorum_majority.h | 30 ++ source/libs/sync/inc/sync_type.h | 2 +- source/libs/sync/src/raft.c | 271 +--------------- source/libs/sync/src/raft_election.c | 9 +- .../sync/src/raft_handle_election_message.c | 6 +- .../sync/src/raft_handle_vote_resp_message.c | 19 +- source/libs/sync/src/sync_raft_impl.c | 306 ++++++++++++++++++ source/libs/sync/src/sync_raft_progress.c | 2 + .../sync/src/sync_raft_progress_tracker.c | 41 ++- source/libs/sync/src/sync_raft_quorum_joint.c | 41 +++ .../libs/sync/src/sync_raft_quorum_majority.c | 54 ++++ 17 files changed, 608 insertions(+), 305 deletions(-) create mode 100644 source/libs/sync/inc/sync_raft_impl.h create mode 100644 source/libs/sync/inc/sync_raft_quorum.h create mode 100644 source/libs/sync/inc/sync_raft_quorum_majority.h create mode 100644 source/libs/sync/src/sync_raft_impl.c create mode 100644 source/libs/sync/src/sync_raft_quorum_joint.c create mode 100644 source/libs/sync/src/sync_raft_quorum_majority.c diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index c8bf63f81c..14f587d58e 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -19,7 +19,8 @@ #include "sync.h" #include "sync_type.h" #include "raft_message.h" - +#include "sync_raft_impl.h" +#include "sync_raft_quorum.h" typedef struct RaftLeaderState { @@ -140,20 +141,4 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo); int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg); int32_t syncRaftTick(SSyncRaft* pRaft); -void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId); -void syncRaftBecomePreCandidate(SSyncRaft* pRaft); -void syncRaftBecomeCandidate(SSyncRaft* pRaft); -void syncRaftBecomeLeader(SSyncRaft* pRaft); - -void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType); - -void syncRaftTriggerHeartbeat(SSyncRaft* pRaft); - -void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft); -bool syncRaftIsPromotable(SSyncRaft* pRaft); -bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft); -int syncRaftQuorum(SSyncRaft* pRaft); -int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, - bool preVote, bool accept, int* rejectNum); - #endif /* _TD_LIBS_SYNC_RAFT_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_impl.h b/source/libs/sync/inc/sync_raft_impl.h new file mode 100644 index 0000000000..26af06866b --- /dev/null +++ b/source/libs/sync/inc/sync_raft_impl.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_RAFT_IMPL_H +#define _TD_LIBS_SYNC_RAFT_IMPL_H + +#include "sync.h" +#include "sync_type.h" +#include "raft_message.h" +#include "sync_raft_quorum.h" + +void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId); +void syncRaftBecomePreCandidate(SSyncRaft* pRaft); +void syncRaftBecomeCandidate(SSyncRaft* pRaft); +void syncRaftBecomeLeader(SSyncRaft* pRaft); + +void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType); + +void syncRaftTriggerHeartbeat(SSyncRaft* pRaft); + +void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft); +bool syncRaftIsPromotable(SSyncRaft* pRaft); +bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft); +int syncRaftQuorum(SSyncRaft* pRaft); + +SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, + bool preVote, bool accept, + int* rejectNum, int *granted); + +#endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */ diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h index 1f693219be..fff0c13e31 100644 --- a/source/libs/sync/inc/sync_raft_progress.h +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -63,6 +63,8 @@ typedef enum RaftProgressState { * progresses of all followers, and sends entries to the follower based on its progress. **/ struct SSyncRaftProgress { + SyncNodeId id; + SyncIndex nextIndex; SyncIndex matchIndex; diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index 40d43895c8..887aeb2377 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -23,7 +23,8 @@ struct SSyncRaftProgressTrackerConfig { SSyncRaftQuorumJointConfig voters; - /** AutoLeave is true if the configuration is joint and a transition to the + /** + * autoLeave is true if the configuration is joint and a transition to the * incoming configuration should be carried out automatically by Raft when * this is possible. If false, the configuration will be joint until the * application initiates the transition manually. @@ -86,7 +87,7 @@ struct SSyncRaftProgressTracker { SSyncRaftProgress progressMap[TSDB_MAX_REPLICA]; - SyncRaftVoteRespType votes[TSDB_MAX_REPLICA]; + SyncRaftVoteResult votes[TSDB_MAX_REPLICA]; int maxInflight; }; @@ -97,4 +98,16 @@ void syncRaftResetVotes(SSyncRaftProgressTracker*); typedef void (*visitProgressFp)(int i, SSyncRaftProgress* progress, void* arg); void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg); +/** + * syncRaftRecordVote records that the node with the given id voted for this Raft + * instance if v == true (and declined it otherwise). + **/ +void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant); + +/** + * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the + * election outcome is known. + **/ +SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted); + #endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum.h b/source/libs/sync/inc/sync_raft_quorum.h new file mode 100644 index 0000000000..42f65c9806 --- /dev/null +++ b/source/libs/sync/inc/sync_raft_quorum.h @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_QUORUM_H +#define TD_SYNC_RAFT_QUORUM_H + +/** + * SSyncRaftVoteResult indicates the outcome of a vote. + **/ +typedef enum { + /** + * SYNC_RAFT_VOTE_PENDING indicates that the decision of the vote depends on future + * votes, i.e. neither "yes" or "no" has reached quorum yet. + **/ + SYNC_RAFT_VOTE_PENDING = 1, + + /** + * SYNC_RAFT_VOTE_LOST indicates that the quorum has voted "no". + **/ + SYNC_RAFT_VOTE_LOST = 2, + + /** + * SYNC_RAFT_VOTE_WON indicates that the quorum has voted "yes". + **/ + SYNC_RAFT_VOTE_WON = 3, +} SSyncRaftVoteResult; + +#endif /* TD_SYNC_RAFT_QUORUM_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index 4f7424db7e..14c1f63754 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -18,13 +18,21 @@ #include "taosdef.h" #include "sync.h" +#include "sync_type.h" /** - * JointConfig is a configuration of two groups of (possibly overlapping) + * SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping) * majority configurations. Decisions require the support of both majorities. **/ typedef struct SSyncRaftQuorumJointConfig { - SyncNodeId majorityConfig[2][TSDB_MAX_REPLICA]; + SSyncCluster majorityConfig[2]; }SSyncRaftQuorumJointConfig; +/** + * syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns + * a result indicating whether the vote is pending, lost, or won. A joint quorum + * requires both majority quorums to vote in favor. + **/ +SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes); + #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_majority.h b/source/libs/sync/inc/sync_raft_quorum_majority.h new file mode 100644 index 0000000000..b1857ef056 --- /dev/null +++ b/source/libs/sync/inc/sync_raft_quorum_majority.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H +#define _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H + +#include "sync.h" +#include "sync_type.h" + +/** + * syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns + * a result indicating whether the vote is pending (i.e. neither a quorum of + * yes/no has been reached), won (a quorum of yes has been reached), or lost (a + * quorum of no has been reached). + **/ +SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes); + +#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */ diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index 9faebe94b2..525623b4cf 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -61,6 +61,6 @@ typedef enum { //reject the vote request SYNC_RAFT_VOTE_RESP_REJECT = 2, -} SyncRaftVoteRespType; +} SyncRaftVoteResult; #endif /* _TD_LIBS_SYNC_TYPE_H */ diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index b43a35c03e..20d24e3267 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -26,23 +26,6 @@ static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); -static int convertClear(SSyncRaft* pRaft); -static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg); -static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg); -static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg); - -static int triggerAll(SSyncRaft* pRaft); - -static void tickElection(SSyncRaft* pRaft); -static void tickHeartbeat(SSyncRaft* pRaft); - -static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n); -static bool maybeCommit(SSyncRaft* pRaft); - -static void abortLeaderTransfer(SSyncRaft* pRaft); - -static void resetRaft(SSyncRaft* pRaft, SyncTerm term); - int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { SSyncNode* pNode = pRaft->pNode; SSyncServerState serverState; @@ -136,124 +119,6 @@ int32_t syncRaftTick(SSyncRaft* pRaft) { return 0; } -void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId) { - convertClear(pRaft); - - pRaft->stepFp = stepFollower; - resetRaft(pRaft, term); - pRaft->tickFp = tickElection; - pRaft->leaderId = leaderId; - pRaft->state = TAOS_SYNC_ROLE_FOLLOWER; - syncInfo("[%d:%d] became followe at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); -} - -void syncRaftBecomePreCandidate(SSyncRaft* pRaft) { - convertClear(pRaft); - - /** - * Becoming a pre-candidate changes our step functions and state, - * but doesn't change anything else. In particular it does not increase - * r.Term or change r.Vote. - **/ - pRaft->stepFp = stepCandidate; - pRaft->tickFp = tickElection; - pRaft->state = TAOS_SYNC_ROLE_CANDIDATE; - pRaft->candidateState.inPreVote = true; - syncInfo("[%d:%d] became pre-candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); -} - -void syncRaftBecomeCandidate(SSyncRaft* pRaft) { - convertClear(pRaft); - - pRaft->candidateState.inPreVote = false; - pRaft->stepFp = stepCandidate; - // become candidate make term+1 - resetRaft(pRaft, pRaft->term + 1); - pRaft->tickFp = tickElection; - pRaft->voteFor = pRaft->selfId; - pRaft->state = TAOS_SYNC_ROLE_CANDIDATE; - syncInfo("[%d:%d] became candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); -} - -void syncRaftBecomeLeader(SSyncRaft* pRaft) { - assert(pRaft->state != TAOS_SYNC_ROLE_FOLLOWER); - - pRaft->stepFp = stepLeader; - resetRaft(pRaft, pRaft->term); - pRaft->leaderId = pRaft->leaderId; - pRaft->state = TAOS_SYNC_ROLE_LEADER; - // TODO: check if there is pending config log - int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log); - if (nPendingConf > 1) { - syncFatal("unexpected multiple uncommitted config entry"); - } - - syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); - - // after become leader, send a no-op log - SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry)); - if (entry == NULL) { - return; - } - *entry = (SSyncRaftEntry) { - .buffer = (SSyncBuffer) { - .data = NULL, - .len = 0, - } - }; - appendEntries(pRaft, entry, 1); - //syncRaftTriggerHeartbeat(pRaft); -} - -void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) { - triggerAll(pRaft); -} - -void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) { - // electionTimeoutTick in [3,6] tick - pRaft->randomizedElectionTimeout = taosRand() % 4 + 3; -} - -bool syncRaftIsPromotable(SSyncRaft* pRaft) { - return pRaft->selfId != SYNC_NON_NODE_ID; -} - -bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) { - return pRaft->electionElapsed >= pRaft->randomizedElectionTimeout; -} - -int syncRaftQuorum(SSyncRaft* pRaft) { - return pRaft->cluster.replica / 2 + 1; -} - -int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool accept, int* rejectNum) { -/* - if (accept) { - syncInfo("[%d:%d] received (pre-vote %d) from %d at term %" PRId64 "", - pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term); - } else { - syncInfo("[%d:%d] received rejection from %d at term %" PRId64 "", - pRaft->selfGroupId, pRaft->selfId, id, pRaft->term); - } - - int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id); - assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0); - assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN); - - pRaft->candidateState.votes[voteIndex] = accept ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT; - int granted = 0, rejected = 0; - int i; - for (i = 0; i < pRaft->cluster.replica; ++i) { - if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) granted++; - else if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_REJECT) rejected++; - } - - if (rejectNum) *rejectNum = rejected; - return granted; -*/ - return 0; -} - /** * pre-handle message, return true means no need to continue * Handle the message term, which may result in our stepping down to a follower. @@ -335,138 +200,4 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) } return true; -} - -static int convertClear(SSyncRaft* pRaft) { - -} - -static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - - return 0; -} - -static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - /** - * Only handle vote responses corresponding to our candidacy (while in - * StateCandidate, we may get stale MsgPreVoteResp messages in this term from - * our pre-candidate state). - **/ - RaftMessageType msgType = pMsg->msgType; - - if (msgType == RAFT_MSG_INTERNAL_PROP) { - return 0; - } - - if (msgType == RAFT_MSG_VOTE_RESP) { - syncRaftHandleVoteRespMessage(pRaft, pMsg); - return 0; - } else if (msgType == RAFT_MSG_APPEND) { - syncRaftBecomeFollower(pRaft, pRaft->term, pMsg->from); - syncRaftHandleAppendEntriesMessage(pRaft, pMsg); - } - return 0; -} - -static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - convertClear(pRaft); - return 0; -} - -/** - * tickElection is run by followers and candidates per tick. - **/ -static void tickElection(SSyncRaft* pRaft) { - pRaft->electionElapsed += 1; - - if (!syncRaftIsPromotable(pRaft)) { - return; - } - - if (!syncRaftIsPastElectionTimeout(pRaft)) { - return; - } - - // election timeout - pRaft->electionElapsed = 0; - SSyncMessage msg; - syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId)); -} - -static void tickHeartbeat(SSyncRaft* pRaft) { - -} - -static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { - SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); - SyncTerm term = pRaft->term; - int i; - - for (i = 0; i < n; ++i) { - entries[i].term = term; - entries[i].index = lastIndex + 1 + i; - } - - syncRaftLogAppend(pRaft->log, entries, n); - - SSyncRaftProgress* progress = &(pRaft->tracker->progressMap[pRaft->cluster.selfIndex]); - syncRaftProgressMaybeUpdate(progress, lastIndex); - // Regardless of maybeCommit's return, our caller will call bcastAppend. - maybeCommit(pRaft); -} - -/** - * maybeCommit attempts to advance the commit index. Returns true if - * the commit index changed (in which case the caller should call - * r.bcastAppend). - **/ -static bool maybeCommit(SSyncRaft* pRaft) { - - return true; -} - -/** - * trigger I/O requests for newly appended log entries or heartbeats. - **/ -static int triggerAll(SSyncRaft* pRaft) { - assert(pRaft->state == TAOS_SYNC_ROLE_LEADER); - int i; - - for (i = 0; i < pRaft->cluster.replica; ++i) { - if (i == pRaft->cluster.selfIndex) { - continue; - } - - syncRaftReplicate(pRaft, i); - } -} - -static void abortLeaderTransfer(SSyncRaft* pRaft) { - pRaft->leadTransferee = SYNC_NON_NODE_ID; -} - -static void initProgress(int i, SSyncRaftProgress* progress, void* arg) { - syncRaftInitProgress(i, (SSyncRaft*)arg, progress); -} - -static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { - if (pRaft->term != term) { - pRaft->term = term; - pRaft->voteFor = SYNC_NON_NODE_ID; - } - - pRaft->leaderId = SYNC_NON_NODE_ID; - - pRaft->electionElapsed = 0; - pRaft->heartbeatElapsed = 0; - - syncRaftRandomizedElectionTimeout(pRaft); - - abortLeaderTransfer(pRaft); - - syncRaftResetVotes(pRaft->tracker); - syncRaftProgressVisit(pRaft->tracker, initProgress, pRaft); - - pRaft->pendingConfigIndex = 0; - pRaft->uncommittedSize = 0; -} +} \ No newline at end of file diff --git a/source/libs/sync/src/raft_election.c b/source/libs/sync/src/raft_election.c index 4ffb8d0943..1ca3326810 100644 --- a/source/libs/sync/src/raft_election.c +++ b/source/libs/sync/src/raft_election.c @@ -23,6 +23,11 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) { bool preVote; RaftMessageType voteMsgType; + if (syncRaftIsPromotable(pRaft)) { + syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId); + return 0; + } + if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { syncRaftBecomePreCandidate(pRaft); preVote = true; @@ -36,8 +41,8 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) { } int quorum = syncRaftQuorum(pRaft); - int granted = syncRaftNumOfGranted(pRaft, pRaft->selfId, preVote, true, NULL); - if (quorum <= granted) { + SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL); + if (result == SYNC_RAFT_VOTE_WON) { /** * We won the election after voting for ourselves (which must mean that * this is a single-node cluster). Advance to the next state. diff --git a/source/libs/sync/src/raft_handle_election_message.c b/source/libs/sync/src/raft_handle_election_message.c index 19471846ba..6ffa24ff30 100644 --- a/source/libs/sync/src/raft_handle_election_message.c +++ b/source/libs/sync/src/raft_handle_election_message.c @@ -20,10 +20,14 @@ int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { if (pRaft->state == TAOS_SYNC_ROLE_LEADER) { - syncDebug("%d ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfId); + syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId); return 0; } + if (!syncRaftIsPromotable(pRaft)) { + syncDebug("[%d:%d] is unpromotable and can not campaign", pRaft->selfGroupId, pRaft->selfId); + return 0; + } // if there is pending uncommitted config,cannot start election if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) { syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply", diff --git a/source/libs/sync/src/raft_handle_vote_resp_message.c b/source/libs/sync/src/raft_handle_vote_resp_message.c index 05464256af..b3a47aac7f 100644 --- a/source/libs/sync/src/raft_handle_vote_resp_message.c +++ b/source/libs/sync/src/raft_handle_vote_resp_message.c @@ -23,6 +23,8 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { int quorum; int voterIndex; + assert(pRaft->state == TAOS_SYNC_ROLE_CANDIDATE); + voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); if (voterIndex == -1) { syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from); @@ -34,24 +36,23 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { return 0; } - granted = syncRaftNumOfGranted(pRaft, pMsg->from, + SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pMsg->from, pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION, - !pMsg->voteResp.rejected, &rejected); - quorum = syncRaftQuorum(pRaft); + !pMsg->voteResp.rejected, &rejected, &granted); syncInfo("[%d:%d] [quorum:%d] has received %d votes and %d vote rejections", pRaft->selfGroupId, pRaft->selfId, quorum, granted, rejected); - if (granted >= quorum) { - if (pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { + if (result == SYNC_RAFT_VOTE_WON) { + if (pRaft->candidateState.inPreVote) { syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); } else { - syncRaftBecomeLeader(pRaft); - } + syncRaftBecomeLeader(pRaft); - return 0; - } else if (rejected == quorum) { + } + } else if (result == SYNC_RAFT_VOTE_LOST) { syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); } + return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c new file mode 100644 index 0000000000..b7353fd787 --- /dev/null +++ b/source/libs/sync/src/sync_raft_impl.c @@ -0,0 +1,306 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "raft.h" +#include "raft_configuration.h" +#include "raft_log.h" +#include "raft_replication.h" +#include "sync_raft_progress_tracker.h" +#include "syncInt.h" + +static int convertClear(SSyncRaft* pRaft); +static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg); +static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg); +static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg); + +static int triggerAll(SSyncRaft* pRaft); + +static void tickElection(SSyncRaft* pRaft); +static void tickHeartbeat(SSyncRaft* pRaft); + +static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n); +static bool maybeCommit(SSyncRaft* pRaft); + +static void abortLeaderTransfer(SSyncRaft* pRaft); + +static void resetRaft(SSyncRaft* pRaft, SyncTerm term); + +void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId) { + convertClear(pRaft); + + pRaft->stepFp = stepFollower; + resetRaft(pRaft, term); + pRaft->tickFp = tickElection; + pRaft->leaderId = leaderId; + pRaft->state = TAOS_SYNC_ROLE_FOLLOWER; + syncInfo("[%d:%d] became followe at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); +} + +void syncRaftBecomePreCandidate(SSyncRaft* pRaft) { + convertClear(pRaft); + + /** + * Becoming a pre-candidate changes our step functions and state, + * but doesn't change anything else. In particular it does not increase + * r.Term or change r.Vote. + **/ + pRaft->stepFp = stepCandidate; + pRaft->tickFp = tickElection; + pRaft->state = TAOS_SYNC_ROLE_CANDIDATE; + pRaft->candidateState.inPreVote = true; + syncInfo("[%d:%d] became pre-candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); +} + +void syncRaftBecomeCandidate(SSyncRaft* pRaft) { + convertClear(pRaft); + + pRaft->candidateState.inPreVote = false; + pRaft->stepFp = stepCandidate; + // become candidate make term+1 + resetRaft(pRaft, pRaft->term + 1); + pRaft->tickFp = tickElection; + pRaft->voteFor = pRaft->selfId; + pRaft->state = TAOS_SYNC_ROLE_CANDIDATE; + syncInfo("[%d:%d] became candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); +} + +void syncRaftBecomeLeader(SSyncRaft* pRaft) { + assert(pRaft->state != TAOS_SYNC_ROLE_FOLLOWER); + + pRaft->stepFp = stepLeader; + resetRaft(pRaft, pRaft->term); + pRaft->leaderId = pRaft->leaderId; + pRaft->state = TAOS_SYNC_ROLE_LEADER; + // TODO: check if there is pending config log + int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log); + if (nPendingConf > 1) { + syncFatal("unexpected multiple uncommitted config entry"); + } + + syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); + + // after become leader, send a no-op log + SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry)); + if (entry == NULL) { + return; + } + *entry = (SSyncRaftEntry) { + .buffer = (SSyncBuffer) { + .data = NULL, + .len = 0, + } + }; + appendEntries(pRaft, entry, 1); + //syncRaftTriggerHeartbeat(pRaft); +} + +void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) { + triggerAll(pRaft); +} + +void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) { + // electionTimeoutTick in [3,6] tick + pRaft->randomizedElectionTimeout = taosRand() % 4 + 3; +} + +bool syncRaftIsPromotable(SSyncRaft* pRaft) { + return pRaft->selfId != SYNC_NON_NODE_ID; +} + +bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) { + return pRaft->electionElapsed >= pRaft->randomizedElectionTimeout; +} + +int syncRaftQuorum(SSyncRaft* pRaft) { + return pRaft->cluster.replica / 2 + 1; +} + +SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, + bool preVote, bool grant, + int* rejected, int *granted) { + int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id); + if (voterIndex == -1) { + return SYNC_RAFT_VOTE_PENDING; + } + + if (grant) { + syncInfo("[%d:%d] received grant (pre-vote %d) from %d at term %" PRId64 "", + pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term); + } else { + syncInfo("[%d:%d] received rejection (pre-vote %d) from %d at term %" PRId64 "", + pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term); + } + + syncRaftRecordVote(pRaft->tracker, voterIndex, grant); + return syncRaftTallyVotes(pRaft->tracker, rejected, granted); +} +/* + if (accept) { + syncInfo("[%d:%d] received (pre-vote %d) from %d at term %" PRId64 "", + pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term); + } else { + syncInfo("[%d:%d] received rejection from %d at term %" PRId64 "", + pRaft->selfGroupId, pRaft->selfId, id, pRaft->term); + } + + int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id); + assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0); + assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN); + + pRaft->candidateState.votes[voteIndex] = accept ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT; + int granted = 0, rejected = 0; + int i; + for (i = 0; i < pRaft->cluster.replica; ++i) { + if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) granted++; + else if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_REJECT) rejected++; + } + + if (rejectNum) *rejectNum = rejected; + return granted; +*/ + +static int convertClear(SSyncRaft* pRaft) { + +} + +static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + + return 0; +} + +static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + /** + * Only handle vote responses corresponding to our candidacy (while in + * StateCandidate, we may get stale MsgPreVoteResp messages in this term from + * our pre-candidate state). + **/ + RaftMessageType msgType = pMsg->msgType; + + if (msgType == RAFT_MSG_INTERNAL_PROP) { + return 0; + } + + if (msgType == RAFT_MSG_VOTE_RESP) { + syncRaftHandleVoteRespMessage(pRaft, pMsg); + return 0; + } else if (msgType == RAFT_MSG_APPEND) { + syncRaftBecomeFollower(pRaft, pRaft->term, pMsg->from); + syncRaftHandleAppendEntriesMessage(pRaft, pMsg); + } + return 0; +} + +static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + convertClear(pRaft); + return 0; +} + +/** + * tickElection is run by followers and candidates per tick. + **/ +static void tickElection(SSyncRaft* pRaft) { + pRaft->electionElapsed += 1; + + if (!syncRaftIsPromotable(pRaft)) { + return; + } + + if (!syncRaftIsPastElectionTimeout(pRaft)) { + return; + } + + // election timeout + pRaft->electionElapsed = 0; + SSyncMessage msg; + syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId)); +} + +static void tickHeartbeat(SSyncRaft* pRaft) { + +} + +static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { + SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); + SyncTerm term = pRaft->term; + int i; + + for (i = 0; i < n; ++i) { + entries[i].term = term; + entries[i].index = lastIndex + 1 + i; + } + + syncRaftLogAppend(pRaft->log, entries, n); + + SSyncRaftProgress* progress = &(pRaft->tracker->progressMap[pRaft->cluster.selfIndex]); + syncRaftProgressMaybeUpdate(progress, lastIndex); + // Regardless of maybeCommit's return, our caller will call bcastAppend. + maybeCommit(pRaft); +} + +/** + * maybeCommit attempts to advance the commit index. Returns true if + * the commit index changed (in which case the caller should call + * r.bcastAppend). + **/ +static bool maybeCommit(SSyncRaft* pRaft) { + + return true; +} + +/** + * trigger I/O requests for newly appended log entries or heartbeats. + **/ +static int triggerAll(SSyncRaft* pRaft) { + assert(pRaft->state == TAOS_SYNC_ROLE_LEADER); + int i; + + for (i = 0; i < pRaft->cluster.replica; ++i) { + if (i == pRaft->cluster.selfIndex) { + continue; + } + + syncRaftReplicate(pRaft, i); + } +} + +static void abortLeaderTransfer(SSyncRaft* pRaft) { + pRaft->leadTransferee = SYNC_NON_NODE_ID; +} + +static void initProgress(int i, SSyncRaftProgress* progress, void* arg) { + syncRaftInitProgress(i, (SSyncRaft*)arg, progress); +} + +static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { + if (pRaft->term != term) { + pRaft->term = term; + pRaft->voteFor = SYNC_NON_NODE_ID; + } + + pRaft->leaderId = SYNC_NON_NODE_ID; + + pRaft->electionElapsed = 0; + pRaft->heartbeatElapsed = 0; + + syncRaftRandomizedElectionTimeout(pRaft); + + abortLeaderTransfer(pRaft); + + syncRaftResetVotes(pRaft->tracker); + syncRaftProgressVisit(pRaft->tracker, initProgress, pRaft); + + pRaft->pendingConfigIndex = 0; + pRaft->uncommittedSize = 0; +} \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_progress.c b/source/libs/sync/src/sync_raft_progress.c index ec98be7dfa..437c083b4d 100644 --- a/source/libs/sync/src/sync_raft_progress.c +++ b/source/libs/sync/src/sync_raft_progress.c @@ -34,6 +34,8 @@ void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) .matchIndex = i == pRaft->selfIndex ? syncRaftLogLastIndex(pRaft->log) : 0, .nextIndex = syncRaftLogLastIndex(pRaft->log) + 1, .inflights = inflights, + .isLearner = false, + .state = PROGRESS_STATE_PROBE, }; } diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index 7104794cbb..43b68a4b08 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -25,7 +25,7 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { } void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) { - memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteRespType) * TSDB_MAX_REPLICA); + memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteResult) * TSDB_MAX_REPLICA); } void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) { @@ -34,4 +34,43 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp vi SSyncRaftProgress* progress = &(tracker->progressMap[i]); visit(i, progress, arg); } +} + +void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) { + if (tracker->votes[i] != SYNC_RAFT_VOTE_RESP_UNKNOWN) { + return; + } + + tracker->votes[i] = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT; +} + +/** + * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the + * election outcome is known. + **/ +SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) { + int i; + SSyncRaftProgress* progress; + int r, g; + + for (i = 0, r = 0, g = 0; i < TSDB_MAX_REPLICA; ++i) { + progress = &(tracker->progressMap[i]); + if (progress->id == SYNC_NON_NODE_ID) { + continue; + } + + if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) { + continue; + } + + if (tracker->votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) { + g++; + } else { + r++; + } + } + + if (rejected) *rejected = r; + if (granted) *granted = g; + return syncRaftVoteResult(&(tracker->config.voters), tracker->votes); } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c new file mode 100644 index 0000000000..a0e6a6782a --- /dev/null +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sync_raft_quorum_majority.h" +#include "sync_raft_quorum_joint.h" +#include "sync_raft_quorum.h" + +/** + * syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns + * a result indicating whether the vote is pending, lost, or won. A joint quorum + * requires both majority quorums to vote in favor. + **/ +SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes) { + SyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->majorityConfig[0]), votes); + SyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->majorityConfig[1]), votes); + + if (r1 == r2) { + // If they agree, return the agreed state. + return r1; + } + + if (r1 == SYNC_RAFT_VOTE_LOST || r2 == SYNC_RAFT_VOTE_LOST) { + // If either config has lost, loss is the only possible outcome. + return SYNC_RAFT_VOTE_LOST; + } + + // One side won, the other one is pending, so the whole outcome is. + return SYNC_RAFT_VOTE_PENDING; +} diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c new file mode 100644 index 0000000000..ea543d7335 --- /dev/null +++ b/source/libs/sync/src/sync_raft_quorum_majority.c @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sync_raft_quorum.h" +#include "sync_raft_quorum_majority.h" + +/** + * syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns + * a result indicating whether the vote is pending (i.e. neither a quorum of + * yes/no has been reached), won (a quorum of yes has been reached), or lost (a + * quorum of no has been reached). + **/ +SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes) { + if (config->replica == 0) { + return SYNC_RAFT_VOTE_WON; + } + + int i, g, r, missing; + for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) { + if (config->nodeInfo[i].nodeId == SYNC_NON_NODE_ID) { + continue; + } + + if (votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) { + missing += 1; + } else if (votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) { + g +=1; + } else { + r += 1; + } + } + + int quorum = config->replica / 2 + 1; + if (g >= quorum) { + return SYNC_RAFT_VOTE_WON; + } + if (r + missing >= quorum) { + return SYNC_RAFT_VOTE_PENDING; + } + + return SYNC_RAFT_VOTE_LOST; +} \ No newline at end of file