[TD-10645][raft]<feature>refactor node and progress map

This commit is contained in:
lichuang 2021-11-19 10:18:56 +08:00
parent 7e2590f108
commit ce654f835a
15 changed files with 333 additions and 290 deletions

View File

@ -0,0 +1,25 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_CONST_H
#define _TD_LIBS_SYNC_CONST_H
#include "sync.h"
static int kSyncRaftMaxInflghtMsgs = 20;
static SyncIndex kMaxCommitIndex = UINT64_MAX;
#endif /* _TD_LIBS_SYNC_CONST_H */

View File

@ -18,54 +18,47 @@
#include "sync.h" #include "sync.h"
/** // Inflights limits the number of MsgApp (represented by the largest index
* SSyncRaftInflights limits the number of MsgApp (represented by the largest index // contained within) sent to followers but not yet acknowledged by them. Callers
* contained within) sent to followers but not yet acknowledged by them. Callers // use Full() to check whether more messages can be sent, call Add() whenever
* use syncRaftInflightFull() to check whether more messages can be sent, // they are sending a new append, and release "quota" via FreeLE() whenever an
* call syncRaftInflightAdd() whenever they are sending a new append, // ack is received.
* and release "quota" via FreeLE() whenever an ack is received.
**/
typedef struct SSyncRaftInflights { typedef struct SSyncRaftInflights {
/* the starting index in the buffer */ // the starting index in the buffer
int start; int start;
/* number of inflights in the buffer */ // number of inflights in the buffer
int count; int count;
/* the size of the buffer */ // the size of the buffer
int size; int size;
/** // buffer contains the index of the last entry
* buffer contains the index of the last entry // inside one message.
* inside one message.
**/
SyncIndex* buffer; SyncIndex* buffer;
} SSyncRaftInflights; } SSyncRaftInflights;
SSyncRaftInflights* syncRaftOpenInflights(int size); SSyncRaftInflights* syncRaftOpenInflights(int size);
void syncRaftCloseInflights(SSyncRaftInflights*); void syncRaftCloseInflights(SSyncRaftInflights*);
// reset frees all inflights.
static FORCE_INLINE void syncRaftInflightReset(SSyncRaftInflights* inflights) { static FORCE_INLINE void syncRaftInflightReset(SSyncRaftInflights* inflights) {
inflights->count = 0; inflights->count = 0;
inflights->start = 0; inflights->start = 0;
} }
// Full returns true if no more messages can be sent at the moment.
static FORCE_INLINE bool syncRaftInflightFull(SSyncRaftInflights* inflights) { static FORCE_INLINE bool syncRaftInflightFull(SSyncRaftInflights* inflights) {
return inflights->count == inflights->size; return inflights->count == inflights->size;
} }
/** // Add notifies the Inflights that a new message with the given index is being
* syncRaftInflightAdd notifies the Inflights that a new message with the given index is being // dispatched. Full() must be called prior to Add() to verify that there is room
* dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd() // for one more message, and consecutive calls to add Add() must provide a
* to verify that there is room for one more message, // monotonic sequence of indexes.
* and consecutive calls to add syncRaftInflightAdd() must provide a
* monotonic sequence of indexes.
**/
void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex); void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex);
/** // FreeLE frees the inflights smaller or equal to the given `to` flight.
* syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight.
**/
void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex); void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex);
/** /**

View File

@ -31,9 +31,9 @@ void syncRaftClearNodeMap(SSyncRaftNodeMap* nodeMap);
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
void syncRaftCopyNodeMap(const SSyncRaftNodeMap* from, SSyncRaftNodeMap* to); void syncRaftCopyNodeMap(SSyncRaftNodeMap* from, SSyncRaftNodeMap* to);
void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to); void syncRaftUnionNodeMap(SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to);
void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);

View File

@ -65,10 +65,13 @@ static const char* kProgressStateString[] = {
"Snapshot", "Snapshot",
}; };
/** // Progress represents a followers progress in the view of the leader. Leader
* Progress represents a followers progress in the view of the leader. Leader maintains // maintains progresses of all followers, and sends entries to the follower
* progresses of all followers, and sends entries to the follower based on its progress. // based on its progress.
**/ //
// NB(tbg): Progress is basically a state machine whose transitions are mostly
// strewn around `*raft.raft`. Additionally, some fields are only used when in a
// certain State. All of this isn't ideal.
struct SSyncRaftProgress { struct SSyncRaftProgress {
SyncGroupId groupId; SyncGroupId groupId;
@ -80,63 +83,53 @@ struct SSyncRaftProgress {
SyncIndex matchIndex; SyncIndex matchIndex;
/** // State defines how the leader should interact with the follower.
* State defines how the leader should interact with the follower. //
* // When in StateProbe, leader sends at most one replication message
* When in StateProbe, leader sends at most one replication message // per heartbeat interval. It also probes actual progress of the follower.
* per heartbeat interval. It also probes actual progress of the follower. //
* // When in StateReplicate, leader optimistically increases next
* When in StateReplicate, leader optimistically increases next // to the latest entry sent after sending replication message. This is
* to the latest entry sent after sending replication message. This is // an optimized state for fast replicating log entries to the follower.
* an optimized state for fast replicating log entries to the follower. //
* // When in StateSnapshot, leader should have sent out snapshot
* When in StateSnapshot, leader should have sent out snapshot // before and stops sending any replication message.
* before and stops sending any replication message.
**/
ESyncRaftProgressState state; ESyncRaftProgressState state;
/** // PendingSnapshot is used in StateSnapshot.
* pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT. // If there is a pending snapshot, the pendingSnapshot will be set to the
* If there is a pending snapshot, the pendingSnapshotIndex will be set to the // index of the snapshot. If pendingSnapshot is set, the replication process of
* index of the snapshot. If pendingSnapshotIndex is set, the replication process of // this Progress will be paused. raft will not resend snapshot until the pending one
* this Progress will be paused. raft will not resend snapshot until the pending one // is reported to be failed.
* is reported to be failed.
**/
SyncIndex pendingSnapshotIndex; SyncIndex pendingSnapshotIndex;
/** // RecentActive is true if the progress is recently active. Receiving any messages
* recentActive is true if the progress is recently active. Receiving any messages // from the corresponding follower indicates the progress is active.
* from the corresponding follower indicates the progress is active. // RecentActive can be reset to false after an election timeout.
* RecentActive can be reset to false after an election timeout. //
**/ // TODO(tbg): the leader should always have this set to true.
bool recentActive; bool recentActive;
/** // ProbeSent is used while this follower is in StateProbe. When ProbeSent is
* probeSent is used while this follower is in StateProbe. When probeSent is // true, raft should pause sending replication message to this peer until
* true, raft should pause sending replication message to this peer until // ProbeSent is reset. See ProbeAcked() and IsPaused().
* probeSent is reset. See ProbeAcked() and IsPaused().
**/
bool probeSent; bool probeSent;
/** // Inflights is a sliding window for the inflight messages.
* inflights is a sliding window for the inflight messages. // Each inflight message contains one or more log entries.
* Each inflight message contains one or more log entries. // The max number of entries per message is defined in raft config as MaxSizePerMsg.
* The max number of entries per message is defined in raft config as MaxSizePerMsg. // Thus inflight effectively limits both the number of inflight messages
* Thus inflight effectively limits both the number of inflight messages // and the bandwidth each Progress can use.
* and the bandwidth each Progress can use. // When inflights is Full, no more message should be sent.
* When inflights is Full, no more message should be sent. // When a leader sends out a message, the index of the last
* When a leader sends out a message, the index of the last // entry should be added to inflights. The index MUST be added
* entry should be added to inflights. The index MUST be added // into inflights in order.
* into inflights in order. // When a leader receives a reply, the previous inflights should
* When a leader receives a reply, the previous inflights should // be freed by calling inflights.FreeLE with the index of the last
* be freed by calling inflights.FreeLE with the index of the last // received entry.
* received entry.
**/
SSyncRaftInflights* inflights; SSyncRaftInflights* inflights;
/** // IsLearner is true if this progress is tracked for a learner.
* IsLearner is true if this progress is tracked for a learner.
**/
bool isLearner; bool isLearner;
}; };
@ -151,56 +144,44 @@ static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgr
void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress); void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress);
/** // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
* syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, // optionally and if larger, the index of the pending snapshot.
* optionally and if larger, the index of the pending snapshot.
**/
void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress); void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress);
/** // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
* syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
**/
void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress); void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress);
/** // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
* syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from
* index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true.
* an outdated message. Otherwise it updates the progress and returns true.
**/
bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex); bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex);
/** // OptimisticUpdate signals that appends all the way up to and including index n
* syncRaftProgressOptimisticNextIndex signals that appends all the way up to and including index n // are in-flight. As a result, Next is increased to n+1.
* are in-flight. As a result, Next is increased to n+1.
**/
static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) { static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) {
progress->nextIndex = nextIndex + 1; progress->nextIndex = nextIndex + 1;
} }
/** // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
* syncRaftProgressMaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The // arguments are the index of the append message rejected by the follower, and
* arguments are the index of the append message rejected by the follower, and // the hint that we want to decrease to.
* the hint that we want to decrease to. //
* // Rejections can happen spuriously as messages are sent out of order or
* Rejections can happen spuriously as messages are sent out of order or // duplicated. In such cases, the rejection pertains to an index that the
* duplicated. In such cases, the rejection pertains to an index that the // Progress already knows were previously acknowledged, and false is returned
* Progress already knows were previously acknowledged, and false is returned // without changing the Progress.
* without changing the Progress. //
* // If the rejection is genuine, Next is lowered sensibly, and the Progress is
* If the rejection is genuine, Next is lowered sensibly, and the Progress is // cleared for sending log entries.
* cleared for sending log entries.
**/
bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
SyncIndex rejected, SyncIndex matchHint); SyncIndex rejected, SyncIndex matchHint);
/** // IsPaused returns whether sending log entries to this node has been throttled.
* syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. // This is done when a node has rejected recent MsgApps, is currently waiting
* This is done when a node has rejected recent MsgApps, is currently waiting // for a snapshot, or has reached the MaxInflightMsgs limit. In normal
* for a snapshot, or has reached the MaxInflightMsgs limit. In normal // operation, this is false. A throttled node will be contacted less frequently
* operation, this is false. A throttled node will be contacted less frequently // until it has reached a state in which it's able to accept a steady stream of
* until it has reached a state in which it's able to accept a steady stream of // log entries again.
* log entries again.
**/
bool syncRaftProgressIsPaused(SSyncRaftProgress* progress); bool syncRaftProgressIsPaused(SSyncRaftProgress* progress);
static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) { static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) {
@ -242,6 +223,8 @@ bool syncRaftIsInProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id);
**/ **/
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress); bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress);
// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
// snapshot index.
void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex); void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex);
void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to); void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to);

View File

@ -23,6 +23,7 @@
#include "sync_raft_proto.h" #include "sync_raft_proto.h"
#include "thash.h" #include "thash.h"
// Config reflects the configuration tracked in a ProgressTracker.
struct SSyncRaftProgressTrackerConfig { struct SSyncRaftProgressTrackerConfig {
SSyncRaftQuorumJointConfig voters; SSyncRaftQuorumJointConfig voters;
@ -99,27 +100,32 @@ void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config);
void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config); void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config);
// ResetVotes prepares for a new round of vote counting via recordVote.
void syncRaftResetVotes(SSyncRaftProgressTracker*); void syncRaftResetVotes(SSyncRaftProgressTracker*);
void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg); void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg);
/** // RecordVote records that the node with the given id voted for this Raft
* syncRaftRecordVote records that the node with the given id voted for this Raft // instance if v == true (and declined it otherwise).
* instance if v == true (and declined it otherwise).
**/
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant); void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant);
void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to); void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to);
int syncRaftCheckTrackerConfigInProgress(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); int syncRaftCheckTrackerConfigInProgress(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
/** // TallyVotes returns the number of granted and rejected Votes, and whether the
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the // election outcome is known.
* election outcome is known.
**/
ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted); ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted);
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs); void syncRaftConfigState(SSyncRaftProgressTracker* tracker, SSyncConfigState* cs);
// Committed returns the largest log index known to be committed based on what
// the voting members of the group have acknowledged.
SyncIndex syncRaftCommittedIndex(SSyncRaftProgressTracker* tracker);
// QuorumActive returns true if the quorum is active from the view of the local
// raft state machine. Otherwise, it returns false.
bool syncRaftQuorumActive(SSyncRaftProgressTracker* tracker);
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);

View File

@ -22,20 +22,25 @@
#include "sync_raft_node_map.h" #include "sync_raft_node_map.h"
#include "thash.h" #include "thash.h"
/** // JointConfig is a configuration of two groups of (possibly overlapping)
* SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping) // majority configurations. Decisions require the support of both majorities.
* majority configurations. Decisions require the support of both majorities.
**/
typedef struct SSyncRaftQuorumJointConfig { typedef struct SSyncRaftQuorumJointConfig {
SSyncRaftNodeMap outgoing; SSyncRaftNodeMap outgoing;
SSyncRaftNodeMap incoming; SSyncRaftNodeMap incoming;
} SSyncRaftQuorumJointConfig; } SSyncRaftQuorumJointConfig;
/** // IDs returns a newly initialized map representing the set of voters present
* syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns // in the joint configuration.
* a result indicating whether the vote is pending, lost, or won. A joint quorum void syncRaftJointConfigIDs(SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap);
* requires both majority quorums to vote in favor.
**/ // CommittedIndex returns the largest committed index for the given joint
// quorum. An index is jointly committed if it is committed in both constituent
// majorities.
SyncIndex syncRaftJointConfigCommittedIndex(const SSyncRaftQuorumJointConfig* config, matchAckIndexerFp indexer, void* arg);
// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns
// a result indicating whether the vote is pending, lost, or won. A joint quorum
// requires both majority quorums to vote in favor.
ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap); ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap);
void syncRaftInitQuorumJointConfig(SSyncRaftQuorumJointConfig* config); void syncRaftInitQuorumJointConfig(SSyncRaftQuorumJointConfig* config);
@ -76,6 +81,4 @@ static FORCE_INLINE bool syncRaftJointConfigIsInOutgoing(const SSyncRaftQuorumJo
return syncRaftIsInNodeMap(&config->outgoing, id); return syncRaftIsInNodeMap(&config->outgoing, id);
} }
void syncRaftJointConfigIDS(const SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap);
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */

View File

@ -29,4 +29,8 @@
**/ **/
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap); ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap);
// CommittedIndex computes the committed index from those supplied via the
// provided AckedIndexer (for the active config).
SyncIndex syncRaftMajorityConfigCommittedIndex(const SSyncRaftNodeMap* config, matchAckIndexerFp indexer, void* arg);
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */ #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */

View File

@ -86,4 +86,6 @@ typedef enum {
typedef void (*visitProgressFp)(SSyncRaftProgress* progress, void* arg); typedef void (*visitProgressFp)(SSyncRaftProgress* progress, void* arg);
typedef void (*matchAckIndexerFp)(SyncNodeId id, void* arg, SyncIndex* index);
#endif /* _TD_LIBS_SYNC_TYPE_H */ #endif /* _TD_LIBS_SYNC_TYPE_H */

View File

@ -84,7 +84,7 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log);
SSyncRaftNodeMap nodeMap; SSyncRaftNodeMap nodeMap;
syncRaftJointConfigIDS(&pRaft->tracker->config.voters, &nodeMap); syncRaftJointConfigIDs(&pRaft->tracker->config.voters, &nodeMap);
SyncNodeId *pNodeId = NULL; SyncNodeId *pNodeId = NULL;
while (!syncRaftIterateNodeMap(&nodeMap, pNodeId)) { while (!syncRaftIterateNodeMap(&nodeMap, pNodeId)) {
SyncNodeId nodeId = *pNodeId; SyncNodeId nodeId = *pNodeId;

View File

@ -40,19 +40,16 @@ void syncRaftCloseInflights(SSyncRaftInflights* inflights) {
free(inflights); free(inflights);
} }
/** // Add notifies the Inflights that a new message with the given index is being
* syncRaftInflightAdd notifies the Inflights that a new message with the given index is being // dispatched. Full() must be called prior to Add() to verify that there is room
* dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd() // for one more message, and consecutive calls to add Add() must provide a
* to verify that there is room for one more message, // monotonic sequence of indexes.
* and consecutive calls to add syncRaftInflightAdd() must provide a
* monotonic sequence of indexes.
**/
void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) { void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) {
assert(!syncRaftInflightFull(inflights)); assert(!syncRaftInflightFull(inflights));
int next = inflights->start + inflights->count; int next = inflights->start + inflights->count;
int size = inflights->size; int size = inflights->size;
/* is next wrapped around buffer? */
if (next >= size) { if (next >= size) {
next -= size; next -= size;
} }
@ -61,12 +58,10 @@ void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex)
inflights->count++; inflights->count++;
} }
/** // FreeLE frees the inflights smaller or equal to the given `to` flight.
* syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight.
**/
void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) { void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) {
if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) { if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) {
/* out of the left side of the window */ // out of the left side of the window
return; return;
} }
@ -95,10 +90,8 @@ void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) {
} }
} }
/** // FreeFirstOne releases the first inflight. This is a no-op if nothing is
* syncRaftInflightFreeFirstOne releases the first inflight. // inflight.
* This is a no-op if nothing is inflight.
**/
void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) { void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) {
syncRaftInflightFreeLE(inflights, inflights->buffer[inflights->start]); syncRaftInflightFreeLE(inflights, inflights->buffer[inflights->start]);
} }

View File

@ -37,11 +37,10 @@ bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
return true; return true;
} }
void syncRaftCopyNodeMap(const SSyncRaftNodeMap* from, SSyncRaftNodeMap* to) { void syncRaftCopyNodeMap(SSyncRaftNodeMap* from, SSyncRaftNodeMap* to) {
SyncNodeId** ppId = (SyncNodeId**)taosHashIterate(from->nodeIdMap, NULL); SyncNodeId *pId = NULL;
while (ppId) { while (!syncRaftIterateNodeMap(from, pId)) {
taosHashPut(to->nodeIdMap, ppId, sizeof(SyncNodeId*), ppId, sizeof(SyncNodeId*)); taosHashPut(to->nodeIdMap, &pId, sizeof(SyncNodeId*), &pId, sizeof(SyncNodeId*));
ppId = taosHashIterate(from->nodeIdMap, ppId);
} }
} }
@ -66,7 +65,7 @@ bool syncRaftIsAllNodeInProgressMap(SSyncRaftNodeMap* nodeMap, SSyncRaftProgress
return true; return true;
} }
void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) { void syncRaftUnionNodeMap(SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) {
syncRaftCopyNodeMap(nodeMap, to); syncRaftCopyNodeMap(nodeMap, to);
} }

View File

@ -47,11 +47,9 @@ void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
}; };
} }
/** // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
* syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from
* index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true.
* an outdated message. Otherwise it updates the progress and returns true.
**/
bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) { bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) {
bool updated = false; bool updated = false;
@ -66,27 +64,36 @@ bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastInde
return updated; return updated;
} }
// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
// arguments are the index of the append message rejected by the follower, and
// the hint that we want to decrease to.
//
// Rejections can happen spuriously as messages are sent out of order or
// duplicated. In such cases, the rejection pertains to an index that the
// Progress already knows were previously acknowledged, and false is returned
// without changing the Progress.
//
// If the rejection is genuine, Next is lowered sensibly, and the Progress is
// cleared for sending log entries.
bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
SyncIndex rejected, SyncIndex matchHint) { SyncIndex rejected, SyncIndex matchHint) {
if (progress->state == PROGRESS_STATE_REPLICATE) { if (progress->state == PROGRESS_STATE_REPLICATE) {
/** // The rejection must be stale if the progress has matched and "rejected"
* the rejection must be stale if the progress has matched and "rejected" // is smaller than "match".
* is smaller than "match".
**/
if (rejected <= progress->matchIndex) { if (rejected <= progress->matchIndex) {
syncDebug("match index is up to date,ignore"); syncDebug("match index is up to date,ignore");
return false; return false;
} }
/* directly decrease next to match + 1 */ // Directly decrease next to match + 1.
//
// TODO(tbg): why not use matchHint if it's larger?
progress->nextIndex = progress->matchIndex + 1; progress->nextIndex = progress->matchIndex + 1;
return true; return true;
} }
/** // The rejection must be stale if "rejected" does not match next - 1. This
* The rejection must be stale if "rejected" does not match next - 1. This // is because non-replicating followers are probed one entry at a time.
* is because non-replicating followers are probed one entry at a time.
**/
if (rejected != progress->nextIndex - 1) { if (rejected != progress->nextIndex - 1) {
syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore" syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore"
, rejected, progress->nextIndex); , rejected, progress->nextIndex);
@ -99,14 +106,12 @@ bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
return true; return true;
} }
/** // IsPaused returns whether sending log entries to this node has been throttled.
* syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. // This is done when a node has rejected recent MsgApps, is currently waiting
* This is done when a node has rejected recent MsgApps, is currently waiting // for a snapshot, or has reached the MaxInflightMsgs limit. In normal
* for a snapshot, or has reached the MaxInflightMsgs limit. In normal // operation, this is false. A throttled node will be contacted less frequently
* operation, this is false. A throttled node will be contacted less frequently // until it has reached a state in which it's able to accept a steady stream of
* until it has reached a state in which it's able to accept a steady stream of // log entries again.
* log entries again.
**/
bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
switch (progress->state) { switch (progress->state) {
case PROGRESS_STATE_PROBE: case PROGRESS_STATE_PROBE:
@ -152,16 +157,12 @@ bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex;
} }
/** // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
* syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, // optionally and if larger, the index of the pending snapshot.
* optionally and if larger, the index of the pending snapshot.
**/
void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) { void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) {
/** // If the original state is StateSnapshot, progress knows that
* If the original state is ProgressStateSnapshot, progress knows that // the pending snapshot has been sent to this peer successfully, then
* the pending snapshot has been sent to this peer successfully, then // probes from pendingSnapshot + 1.
* probes from pendingSnapshot + 1.
**/
if (progress->state == PROGRESS_STATE_SNAPSHOT) { if (progress->state == PROGRESS_STATE_SNAPSHOT) {
SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex;
resetProgressState(progress, PROGRESS_STATE_PROBE); resetProgressState(progress, PROGRESS_STATE_PROBE);
@ -172,14 +173,14 @@ void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) {
} }
} }
/** // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
* syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
**/
void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) { void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) {
resetProgressState(progress, PROGRESS_STATE_REPLICATE); resetProgressState(progress, PROGRESS_STATE_REPLICATE);
progress->nextIndex = progress->matchIndex + 1; progress->nextIndex = progress->matchIndex + 1;
} }
// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
// snapshot index.
void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) { void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) {
resetProgressState(progress, PROGRESS_STATE_SNAPSHOT); resetProgressState(progress, PROGRESS_STATE_SNAPSHOT);
progress->pendingSnapshotIndex = snapshotIndex; progress->pendingSnapshotIndex = snapshotIndex;
@ -242,10 +243,8 @@ static void unrefProgress(SSyncRaftProgress* progress, void* arg) {
} }
} }
/** // ResetState moves the Progress into the specified State, resetting ProbeSent,
* ResetState moves the Progress into the specified State, resetting ProbeSent, // PendingSnapshot, and Inflights.
* PendingSnapshot, and Inflights.
**/
static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state) { static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state) {
progress->probeSent = false; progress->probeSent = false;
progress->pendingSnapshotIndex = 0; progress->pendingSnapshotIndex = 0;
@ -253,83 +252,9 @@ static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressSta
syncRaftInflightReset(progress->inflights); syncRaftInflightReset(progress->inflights);
} }
/** // ProbeAcked is called when this peer has accepted an append. It resets
* probeAcked is called when this peer has accepted an append. It resets // ProbeSent to signal that additional append messages should be sent without
* ProbeSent to signal that additional append messages should be sent without // further delay.
* further delay.
**/
static void probeAcked(SSyncRaftProgress* progress) { static void probeAcked(SSyncRaftProgress* progress) {
progress->probeSent = false; progress->probeSent = false;
} }
#if 0
SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].nextIndex;
}
SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].matchIndex;
}
void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i) {
pRaft->leaderState.progress[i].lastSend = pRaft->io.time(pRaft);
}
void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i) {
pRaft->leaderState.progress[i].lastSendSnapshot = pRaft->io.time(pRaft);
}
bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
bool prev = progress->recentRecv;
progress->recentRecv = false;
return prev;
}
void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i) {
pRaft->leaderState.progress[i].recentRecv = true;
}
bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].recentRecv;
}
void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
resetProgressState(progress, PROGRESS_STATE_SNAPSHOT);
progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log);
}
void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
if (progress->state == PROGRESS_STATE_SNAPSHOT) {
assert(progress->pendingSnapshotIndex > 0);
SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex;
resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex);
} else {
resetProgressState(progress, PROGRESS_STATE_PROBE);
progress->nextIndex = progress->matchIndex + 1;
}
}
void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) {
resetProgressState(pRaft->leaderState.progress, PROGRESS_STATE_REPLICATE);
pRaft->leaderState.progress->nextIndex = pRaft->leaderState.progress->matchIndex + 1;
}
void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
progress->pendingSnapshotIndex = 0;
progress->state = PROGRESS_STATE_PROBE;
}
ESyncRaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].state;
}
#endif

View File

@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "raft.h"
#include "sync_const.h"
#include "sync_raft_progress_tracker.h" #include "sync_raft_progress_tracker.h"
#include "sync_raft_proto.h" #include "sync_raft_proto.h"
@ -22,9 +24,11 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft) {
return NULL; return NULL;
} }
tracker->votesMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
syncRaftInitTrackConfig(&tracker->config); syncRaftInitTrackConfig(&tracker->config);
syncRaftInitNodeMap(&tracker->config.learnersNext);
tracker->pRaft = pRaft; tracker->pRaft = pRaft;
tracker->maxInflightMsgs = kSyncRaftMaxInflghtMsgs;
return tracker; return tracker;
} }
@ -39,9 +43,11 @@ void syncRaftInitTrackConfig(SSyncRaftProgressTrackerConfig* config) {
void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config) { void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config) {
syncRaftFreeNodeMap(&config->learners); syncRaftFreeNodeMap(&config->learners);
syncRaftFreeNodeMap(&config->learnersNext); syncRaftFreeNodeMap(&config->learnersNext);
syncRaftFreeQuorumJointConfig(&config->voters); syncRaftFreeNodeMap(&config->voters.incoming);
syncRaftFreeNodeMap(&config->voters.outgoing);
} }
// ResetVotes prepares for a new round of vote counting via recordVote.
void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) { void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) {
taosHashClear(tracker->votesMap); taosHashClear(tracker->votesMap);
} }
@ -50,14 +56,15 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp vi
syncRaftVisitProgressMap(&tracker->progressMap, visit, arg); syncRaftVisitProgressMap(&tracker->progressMap, visit, arg);
} }
// RecordVote records that the node with the given id voted for this Raft
// instance if v == true (and declined it otherwise).
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant) { void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant) {
ESyncRaftVoteType* pType = taosHashGet(tracker->votesMap, &id, sizeof(SyncNodeId*)); ESyncRaftVoteType* pType = taosHashGet(tracker->votesMap, &id, sizeof(SyncNodeId*));
if (pType != NULL) { if (pType != NULL) {
return; return;
} }
ESyncRaftVoteType type = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT; taosHashPut(tracker->votesMap, &id, sizeof(SyncNodeId), &grant, sizeof(bool*));
taosHashPut(tracker->votesMap, &id, sizeof(SyncNodeId), &type, sizeof(ESyncRaftVoteType*));
} }
void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) { void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) {
@ -78,26 +85,27 @@ int syncRaftCheckTrackerConfigInProgress(SSyncRaftProgressTrackerConfig* config,
return 0; return 0;
} }
/** // TallyVotes returns the number of granted and rejected Votes, and whether the
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the // election outcome is known.
* election outcome is known.
**/
ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) { ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) {
int i; SSyncRaftProgress* progress = NULL;
SSyncRaftProgress* progress;
int r, g; int r, g;
// Make sure to populate granted/rejected correctly even if the Votes slice
// contains members no longer part of the configuration. This doesn't really
// matter in the way the numbers are used (they're informational), but might
// as well get it right.
while (!syncRaftIterateProgressMap(&tracker->progressMap, progress)) { while (!syncRaftIterateProgressMap(&tracker->progressMap, progress)) {
if (progress->id == SYNC_NON_NODE_ID) { if (progress->id == SYNC_NON_NODE_ID) {
continue; continue;
} }
ESyncRaftVoteType* pType = taosHashGet(tracker->votesMap, &progress->id, sizeof(SyncNodeId*)); bool* v = taosHashGet(tracker->votesMap, &progress->id, sizeof(SyncNodeId*));
if (pType == NULL) { if (v == NULL) {
continue; continue;
} }
if (*pType == SYNC_RAFT_VOTE_RESP_GRANT) { if (*v) {
g++; g++;
} else { } else {
r++; r++;
@ -109,9 +117,40 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r
return syncRaftVoteResult(&(tracker->config.voters), tracker->votesMap); return syncRaftVoteResult(&(tracker->config.voters), tracker->votesMap);
} }
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) { void syncRaftConfigState(SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) {
syncRaftCopyNodeMap(&tracker->config.voters.incoming, &cs->voters); syncRaftCopyNodeMap(&tracker->config.voters.incoming, &cs->voters);
syncRaftCopyNodeMap(&tracker->config.voters.outgoing, &cs->votersOutgoing); syncRaftCopyNodeMap(&tracker->config.voters.outgoing, &cs->votersOutgoing);
syncRaftCopyNodeMap(&tracker->config.learners, &cs->learners); syncRaftCopyNodeMap(&tracker->config.learners, &cs->learners);
syncRaftCopyNodeMap(&tracker->config.learnersNext, &cs->learnersNext); syncRaftCopyNodeMap(&tracker->config.learnersNext, &cs->learnersNext);
cs->autoLeave = tracker->config.autoLeave;
}
static void matchAckIndexer(SyncNodeId id, void* arg, SyncIndex* index) {
SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)arg;
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&tracker->progressMap, id);
if (progress == NULL) {
*index = 0;
return;
}
*index = progress->matchIndex;
}
// Committed returns the largest log index known to be committed based on what
// the voting members of the group have acknowledged.
SyncIndex syncRaftCommittedIndex(SSyncRaftProgressTracker* tracker) {
return syncRaftJointConfigCommittedIndex(&tracker->config.voters, matchAckIndexer, tracker);
}
static void visitProgressActive(SSyncRaftProgress* progress, void* arg) {
SHashObj* votesMap = (SHashObj*)arg;
taosHashPut(votesMap, &progress->id, sizeof(SyncNodeId), &progress->recentActive, sizeof(bool));
}
// QuorumActive returns true if the quorum is active from the view of the local
// raft state machine. Otherwise, it returns false.
bool syncRaftQuorumActive(SSyncRaftProgressTracker* tracker) {
SHashObj* votesMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
syncRaftVisitProgressMap(&tracker->progressMap, visitProgressActive, votesMap);
return syncRaftVoteResult(&tracker->config.voters, votesMap) == SYNC_RAFT_VOTE_WON;
} }

View File

@ -59,8 +59,17 @@ void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, S
syncRaftRemoveFromNodeMap(&config->incoming, id); syncRaftRemoveFromNodeMap(&config->incoming, id);
} }
void syncRaftJointConfigIDS(const SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap) { void syncRaftJointConfigIDs(SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap) {
syncRaftCopyNodeMap(&config->incoming, nodeMap); syncRaftCopyNodeMap(&config->incoming, nodeMap);
syncRaftUnionNodeMap(&config->outgoing, nodeMap); syncRaftUnionNodeMap(&config->outgoing, nodeMap);
}
SyncIndex syncRaftJointConfigCommittedIndex(const SSyncRaftQuorumJointConfig* config, matchAckIndexerFp indexer, void* arg) {
SyncIndex index0, index1;
index0 = syncRaftMajorityConfigCommittedIndex(&config->incoming, indexer, arg);
index1 = syncRaftMajorityConfigCommittedIndex(&config->outgoing, indexer, arg);
return index0 < index1 ? index0 : index1;
} }

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync_const.h"
#include "sync_raft_quorum.h" #include "sync_raft_quorum.h"
#include "sync_raft_quorum_majority.h" #include "sync_raft_quorum_majority.h"
#include "sync_raft_node_map.h" #include "sync_raft_node_map.h"
@ -34,13 +35,13 @@ ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashOb
i = g = r = missing = 0; i = g = r = missing = 0;
SyncNodeId* pId = NULL; SyncNodeId* pId = NULL;
while (!syncRaftIterateNodeMap(config, pId)) { while (!syncRaftIterateNodeMap(config, pId)) {
const ESyncRaftVoteType* pType = taosHashGet(votesMap, pId, sizeof(SyncNodeId*)); const bool* v = (const bool*)taosHashGet(votesMap, pId, sizeof(SyncNodeId*));
if (pType == NULL) { if (v == NULL) {
missing += 1; missing += 1;
continue; continue;
} }
if (*pType == SYNC_RAFT_VOTE_RESP_GRANT) { if (*v) {
g +=1; g +=1;
} else { } else {
r += 1; r += 1;
@ -56,4 +57,65 @@ ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashOb
} }
return SYNC_RAFT_VOTE_LOST; return SYNC_RAFT_VOTE_LOST;
}
int compSyncIndex(const void * elem1, const void * elem2) {
SyncIndex index1 = *((SyncIndex*)elem1);
SyncIndex index2 = *((SyncIndex*)elem1);
if (index1 > index2) return 1;
if (index1 < index2) return -1;
return 0;
}
SyncIndex syncRaftMajorityConfigCommittedIndex(const SSyncRaftNodeMap* config, matchAckIndexerFp indexer, void* arg) {
int n = syncRaftNodeMapSize(config);
if (n == 0) {
// This plays well with joint quorums which, when one half is the zero
// MajorityConfig, should behave like the other half.
return kMaxCommitIndex;
}
// Use an on-stack slice to collect the committed indexes when n <= 7
// (otherwise we alloc). The alternative is to stash a slice on
// MajorityConfig, but this impairs usability (as is, MajorityConfig is just
// a map, and that's nice). The assumption is that running with a
// replication factor of >7 is rare, and in cases in which it happens
// performance is a lesser concern (additionally the performance
// implications of an allocation here are far from drastic).
SyncIndex* srt = NULL;
SyncIndex srk[TSDB_MAX_REPLICA];
if (n > TSDB_MAX_REPLICA) {
srt = (SyncIndex*)malloc(sizeof(SyncIndex) * n);
if (srt == NULL) {
return kMaxCommitIndex;
}
} else {
srt = &srk[0];
}
// Fill the slice with the indexes observed. Any unused slots will be
// left as zero; these correspond to voters that may report in, but
// haven't yet. We fill from the right (since the zeroes will end up on
// the left after sorting below anyway).
SyncNodeId *pId = NULL;
int i = 0;
SyncIndex index;
while (!syncRaftIterateNodeMap(config, pId)) {
indexer(*pId, arg, &index);
srt[i++] = index;
}
// Sort by index. Use a bespoke algorithm (copied from the stdlib's sort
// package) to keep srt on the stack.
qsort(srt, n, sizeof(SyncIndex), compSyncIndex);
// The smallest index into the array for which the value is acked by a
// quorum. In other words, from the end of the slice, move n/2+1 to the
// left (accounting for zero-indexing).
index = srt[n - (n/2 + 1)];
if (srt != &srk[0]) {
free(srt);
}
return index;
} }