From ce654f835a5b31cff3fbcfe5c52b488689a0335f Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 19 Nov 2021 10:18:56 +0800 Subject: [PATCH] [TD-10645][raft]refactor node and progress map --- source/libs/sync/inc/sync_const.h | 25 +++ source/libs/sync/inc/sync_raft_inflights.h | 41 ++--- source/libs/sync/inc/sync_raft_node_map.h | 4 +- source/libs/sync/inc/sync_raft_progress.h | 159 ++++++++---------- .../sync/inc/sync_raft_progress_tracker.h | 24 ++- source/libs/sync/inc/sync_raft_quorum_joint.h | 25 +-- .../libs/sync/inc/sync_raft_quorum_majority.h | 4 + source/libs/sync/inc/sync_type.h | 2 + source/libs/sync/src/sync_raft_election.c | 2 +- source/libs/sync/src/sync_raft_inflights.c | 25 +-- source/libs/sync/src/sync_raft_node_map.c | 11 +- source/libs/sync/src/sync_raft_progress.c | 155 +++++------------ .../sync/src/sync_raft_progress_tracker.c | 67 ++++++-- source/libs/sync/src/sync_raft_quorum_joint.c | 11 +- .../libs/sync/src/sync_raft_quorum_majority.c | 68 +++++++- 15 files changed, 333 insertions(+), 290 deletions(-) create mode 100644 source/libs/sync/inc/sync_const.h diff --git a/source/libs/sync/inc/sync_const.h b/source/libs/sync/inc/sync_const.h new file mode 100644 index 0000000000..b49c17f82e --- /dev/null +++ b/source/libs/sync/inc/sync_const.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_CONST_H +#define _TD_LIBS_SYNC_CONST_H + +#include "sync.h" + +static int kSyncRaftMaxInflghtMsgs = 20; + +static SyncIndex kMaxCommitIndex = UINT64_MAX; + +#endif /* _TD_LIBS_SYNC_CONST_H */ diff --git a/source/libs/sync/inc/sync_raft_inflights.h b/source/libs/sync/inc/sync_raft_inflights.h index 6d249c9274..627bf9a26f 100644 --- a/source/libs/sync/inc/sync_raft_inflights.h +++ b/source/libs/sync/inc/sync_raft_inflights.h @@ -18,54 +18,47 @@ #include "sync.h" -/** - * SSyncRaftInflights limits the number of MsgApp (represented by the largest index - * contained within) sent to followers but not yet acknowledged by them. Callers - * use syncRaftInflightFull() to check whether more messages can be sent, - * call syncRaftInflightAdd() whenever they are sending a new append, - * and release "quota" via FreeLE() whenever an ack is received. -**/ +// Inflights limits the number of MsgApp (represented by the largest index +// contained within) sent to followers but not yet acknowledged by them. Callers +// use Full() to check whether more messages can be sent, call Add() whenever +// they are sending a new append, and release "quota" via FreeLE() whenever an +// ack is received. typedef struct SSyncRaftInflights { - /* the starting index in the buffer */ + // the starting index in the buffer int start; - /* number of inflights in the buffer */ + // number of inflights in the buffer int count; - /* the size of the buffer */ + // the size of the buffer int size; - /** - * buffer contains the index of the last entry - * inside one message. - **/ + // buffer contains the index of the last entry + // inside one message. SyncIndex* buffer; } SSyncRaftInflights; SSyncRaftInflights* syncRaftOpenInflights(int size); void syncRaftCloseInflights(SSyncRaftInflights*); +// reset frees all inflights. static FORCE_INLINE void syncRaftInflightReset(SSyncRaftInflights* inflights) { inflights->count = 0; inflights->start = 0; } +// Full returns true if no more messages can be sent at the moment. static FORCE_INLINE bool syncRaftInflightFull(SSyncRaftInflights* inflights) { return inflights->count == inflights->size; } -/** - * syncRaftInflightAdd notifies the Inflights that a new message with the given index is being - * dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd() - * to verify that there is room for one more message, - * and consecutive calls to add syncRaftInflightAdd() must provide a - * monotonic sequence of indexes. - **/ +// Add notifies the Inflights that a new message with the given index is being +// dispatched. Full() must be called prior to Add() to verify that there is room +// for one more message, and consecutive calls to add Add() must provide a +// monotonic sequence of indexes. void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex); -/** - * syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight. - **/ +// FreeLE frees the inflights smaller or equal to the given `to` flight. void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex); /** diff --git a/source/libs/sync/inc/sync_raft_node_map.h b/source/libs/sync/inc/sync_raft_node_map.h index 2de4887bf4..b4cf04056d 100644 --- a/source/libs/sync/inc/sync_raft_node_map.h +++ b/source/libs/sync/inc/sync_raft_node_map.h @@ -31,9 +31,9 @@ void syncRaftClearNodeMap(SSyncRaftNodeMap* nodeMap); bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); -void syncRaftCopyNodeMap(const SSyncRaftNodeMap* from, SSyncRaftNodeMap* to); +void syncRaftCopyNodeMap(SSyncRaftNodeMap* from, SSyncRaftNodeMap* to); -void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to); +void syncRaftUnionNodeMap(SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to); void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h index 7d80ce5438..32c21281cd 100644 --- a/source/libs/sync/inc/sync_raft_progress.h +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -65,10 +65,13 @@ static const char* kProgressStateString[] = { "Snapshot", }; -/** - * Progress represents a follower’s progress in the view of the leader. Leader maintains - * progresses of all followers, and sends entries to the follower based on its progress. - **/ +// Progress represents a follower’s progress in the view of the leader. Leader +// maintains progresses of all followers, and sends entries to the follower +// based on its progress. +// +// NB(tbg): Progress is basically a state machine whose transitions are mostly +// strewn around `*raft.raft`. Additionally, some fields are only used when in a +// certain State. All of this isn't ideal. struct SSyncRaftProgress { SyncGroupId groupId; @@ -80,63 +83,53 @@ struct SSyncRaftProgress { SyncIndex matchIndex; - /** - * State defines how the leader should interact with the follower. - * - * When in StateProbe, leader sends at most one replication message - * per heartbeat interval. It also probes actual progress of the follower. - * - * When in StateReplicate, leader optimistically increases next - * to the latest entry sent after sending replication message. This is - * an optimized state for fast replicating log entries to the follower. - * - * When in StateSnapshot, leader should have sent out snapshot - * before and stops sending any replication message. - **/ + // State defines how the leader should interact with the follower. + // + // When in StateProbe, leader sends at most one replication message + // per heartbeat interval. It also probes actual progress of the follower. + // + // When in StateReplicate, leader optimistically increases next + // to the latest entry sent after sending replication message. This is + // an optimized state for fast replicating log entries to the follower. + // + // When in StateSnapshot, leader should have sent out snapshot + // before and stops sending any replication message. ESyncRaftProgressState state; - /** - * pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT. - * If there is a pending snapshot, the pendingSnapshotIndex will be set to the - * index of the snapshot. If pendingSnapshotIndex is set, the replication process of - * this Progress will be paused. raft will not resend snapshot until the pending one - * is reported to be failed. - **/ + // PendingSnapshot is used in StateSnapshot. + // If there is a pending snapshot, the pendingSnapshot will be set to the + // index of the snapshot. If pendingSnapshot is set, the replication process of + // this Progress will be paused. raft will not resend snapshot until the pending one + // is reported to be failed. SyncIndex pendingSnapshotIndex; - /** - * recentActive is true if the progress is recently active. Receiving any messages - * from the corresponding follower indicates the progress is active. - * RecentActive can be reset to false after an election timeout. - **/ + // RecentActive is true if the progress is recently active. Receiving any messages + // from the corresponding follower indicates the progress is active. + // RecentActive can be reset to false after an election timeout. + // + // TODO(tbg): the leader should always have this set to true. bool recentActive; - /** - * probeSent is used while this follower is in StateProbe. When probeSent is - * true, raft should pause sending replication message to this peer until - * probeSent is reset. See ProbeAcked() and IsPaused(). - **/ + // ProbeSent is used while this follower is in StateProbe. When ProbeSent is + // true, raft should pause sending replication message to this peer until + // ProbeSent is reset. See ProbeAcked() and IsPaused(). bool probeSent; - /** - * inflights is a sliding window for the inflight messages. - * Each inflight message contains one or more log entries. - * The max number of entries per message is defined in raft config as MaxSizePerMsg. - * Thus inflight effectively limits both the number of inflight messages - * and the bandwidth each Progress can use. - * When inflights is Full, no more message should be sent. - * When a leader sends out a message, the index of the last - * entry should be added to inflights. The index MUST be added - * into inflights in order. - * When a leader receives a reply, the previous inflights should - * be freed by calling inflights.FreeLE with the index of the last - * received entry. - **/ + // Inflights is a sliding window for the inflight messages. + // Each inflight message contains one or more log entries. + // The max number of entries per message is defined in raft config as MaxSizePerMsg. + // Thus inflight effectively limits both the number of inflight messages + // and the bandwidth each Progress can use. + // When inflights is Full, no more message should be sent. + // When a leader sends out a message, the index of the last + // entry should be added to inflights. The index MUST be added + // into inflights in order. + // When a leader receives a reply, the previous inflights should + // be freed by calling inflights.FreeLE with the index of the last + // received entry. SSyncRaftInflights* inflights; - /** - * IsLearner is true if this progress is tracked for a learner. - **/ + // IsLearner is true if this progress is tracked for a learner. bool isLearner; }; @@ -151,56 +144,44 @@ static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgr void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress); -/** - * syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, - * optionally and if larger, the index of the pending snapshot. - **/ +// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, +// optionally and if larger, the index of the pending snapshot. void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress); -/** - * syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1. - **/ +// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1. void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress); -/** - * syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the - * index acked by it. The method returns false if the given n index comes from - * an outdated message. Otherwise it updates the progress and returns true. - **/ +// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the +// index acked by it. The method returns false if the given n index comes from +// an outdated message. Otherwise it updates the progress and returns true. bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex); -/** - * syncRaftProgressOptimisticNextIndex signals that appends all the way up to and including index n - * are in-flight. As a result, Next is increased to n+1. - **/ +// OptimisticUpdate signals that appends all the way up to and including index n +// are in-flight. As a result, Next is increased to n+1. static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) { progress->nextIndex = nextIndex + 1; } -/** - * syncRaftProgressMaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The - * arguments are the index of the append message rejected by the follower, and - * the hint that we want to decrease to. - * - * Rejections can happen spuriously as messages are sent out of order or - * duplicated. In such cases, the rejection pertains to an index that the - * Progress already knows were previously acknowledged, and false is returned - * without changing the Progress. - * - * If the rejection is genuine, Next is lowered sensibly, and the Progress is - * cleared for sending log entries. -**/ +// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The +// arguments are the index of the append message rejected by the follower, and +// the hint that we want to decrease to. +// +// Rejections can happen spuriously as messages are sent out of order or +// duplicated. In such cases, the rejection pertains to an index that the +// Progress already knows were previously acknowledged, and false is returned +// without changing the Progress. +// +// If the rejection is genuine, Next is lowered sensibly, and the Progress is +// cleared for sending log entries. bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, SyncIndex rejected, SyncIndex matchHint); -/** - * syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. - * This is done when a node has rejected recent MsgApps, is currently waiting - * for a snapshot, or has reached the MaxInflightMsgs limit. In normal - * operation, this is false. A throttled node will be contacted less frequently - * until it has reached a state in which it's able to accept a steady stream of - * log entries again. - **/ +// IsPaused returns whether sending log entries to this node has been throttled. +// This is done when a node has rejected recent MsgApps, is currently waiting +// for a snapshot, or has reached the MaxInflightMsgs limit. In normal +// operation, this is false. A throttled node will be contacted less frequently +// until it has reached a state in which it's able to accept a steady stream of +// log entries again. bool syncRaftProgressIsPaused(SSyncRaftProgress* progress); static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) { @@ -242,6 +223,8 @@ bool syncRaftIsInProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); **/ bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress); +// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending +// snapshot index. void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex); void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to); diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index ff69b7b1d1..0a3c7dd6fc 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -23,6 +23,7 @@ #include "sync_raft_proto.h" #include "thash.h" +// Config reflects the configuration tracked in a ProgressTracker. struct SSyncRaftProgressTrackerConfig { SSyncRaftQuorumJointConfig voters; @@ -99,27 +100,32 @@ void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config); void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config); +// ResetVotes prepares for a new round of vote counting via recordVote. void syncRaftResetVotes(SSyncRaftProgressTracker*); void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg); -/** - * syncRaftRecordVote records that the node with the given id voted for this Raft - * instance if v == true (and declined it otherwise). - **/ +// RecordVote records that the node with the given id voted for this Raft +// instance if v == true (and declined it otherwise). void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant); void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to); int syncRaftCheckTrackerConfigInProgress(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); -/** - * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the - * election outcome is known. - **/ +// TallyVotes returns the number of granted and rejected Votes, and whether the +// election outcome is known. ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted); -void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs); +void syncRaftConfigState(SSyncRaftProgressTracker* tracker, SSyncConfigState* cs); + +// Committed returns the largest log index known to be committed based on what +// the voting members of the group have acknowledged. +SyncIndex syncRaftCommittedIndex(SSyncRaftProgressTracker* tracker); + +// QuorumActive returns true if the quorum is active from the view of the local +// raft state machine. Otherwise, it returns false. +bool syncRaftQuorumActive(SSyncRaftProgressTracker* tracker); bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index 92cddaaec1..9d5f10ab51 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -22,20 +22,25 @@ #include "sync_raft_node_map.h" #include "thash.h" -/** - * SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping) - * majority configurations. Decisions require the support of both majorities. - **/ +// JointConfig is a configuration of two groups of (possibly overlapping) +// majority configurations. Decisions require the support of both majorities. typedef struct SSyncRaftQuorumJointConfig { SSyncRaftNodeMap outgoing; SSyncRaftNodeMap incoming; } SSyncRaftQuorumJointConfig; -/** - * syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns - * a result indicating whether the vote is pending, lost, or won. A joint quorum - * requires both majority quorums to vote in favor. - **/ +// IDs returns a newly initialized map representing the set of voters present +// in the joint configuration. +void syncRaftJointConfigIDs(SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap); + +// CommittedIndex returns the largest committed index for the given joint +// quorum. An index is jointly committed if it is committed in both constituent +// majorities. +SyncIndex syncRaftJointConfigCommittedIndex(const SSyncRaftQuorumJointConfig* config, matchAckIndexerFp indexer, void* arg); + +// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns +// a result indicating whether the vote is pending, lost, or won. A joint quorum +// requires both majority quorums to vote in favor. ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, SHashObj* votesMap); void syncRaftInitQuorumJointConfig(SSyncRaftQuorumJointConfig* config); @@ -76,6 +81,4 @@ static FORCE_INLINE bool syncRaftJointConfigIsInOutgoing(const SSyncRaftQuorumJo return syncRaftIsInNodeMap(&config->outgoing, id); } -void syncRaftJointConfigIDS(const SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap); - #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_majority.h b/source/libs/sync/inc/sync_raft_quorum_majority.h index 38df40147a..399bd71db8 100644 --- a/source/libs/sync/inc/sync_raft_quorum_majority.h +++ b/source/libs/sync/inc/sync_raft_quorum_majority.h @@ -29,4 +29,8 @@ **/ ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap); +// CommittedIndex computes the committed index from those supplied via the +// provided AckedIndexer (for the active config). +SyncIndex syncRaftMajorityConfigCommittedIndex(const SSyncRaftNodeMap* config, matchAckIndexerFp indexer, void* arg); + #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */ diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index e00700d724..9c4bc9e63c 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -86,4 +86,6 @@ typedef enum { typedef void (*visitProgressFp)(SSyncRaftProgress* progress, void* arg); +typedef void (*matchAckIndexerFp)(SyncNodeId id, void* arg, SyncIndex* index); + #endif /* _TD_LIBS_SYNC_TYPE_H */ diff --git a/source/libs/sync/src/sync_raft_election.c b/source/libs/sync/src/sync_raft_election.c index 6d36d38267..d961978be2 100644 --- a/source/libs/sync/src/sync_raft_election.c +++ b/source/libs/sync/src/sync_raft_election.c @@ -84,7 +84,7 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) { SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); SSyncRaftNodeMap nodeMap; - syncRaftJointConfigIDS(&pRaft->tracker->config.voters, &nodeMap); + syncRaftJointConfigIDs(&pRaft->tracker->config.voters, &nodeMap); SyncNodeId *pNodeId = NULL; while (!syncRaftIterateNodeMap(&nodeMap, pNodeId)) { SyncNodeId nodeId = *pNodeId; diff --git a/source/libs/sync/src/sync_raft_inflights.c b/source/libs/sync/src/sync_raft_inflights.c index 3d740b5a9e..7b97aca014 100644 --- a/source/libs/sync/src/sync_raft_inflights.c +++ b/source/libs/sync/src/sync_raft_inflights.c @@ -40,19 +40,16 @@ void syncRaftCloseInflights(SSyncRaftInflights* inflights) { free(inflights); } -/** - * syncRaftInflightAdd notifies the Inflights that a new message with the given index is being - * dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd() - * to verify that there is room for one more message, - * and consecutive calls to add syncRaftInflightAdd() must provide a - * monotonic sequence of indexes. - **/ +// Add notifies the Inflights that a new message with the given index is being +// dispatched. Full() must be called prior to Add() to verify that there is room +// for one more message, and consecutive calls to add Add() must provide a +// monotonic sequence of indexes. void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) { assert(!syncRaftInflightFull(inflights)); int next = inflights->start + inflights->count; int size = inflights->size; - /* is next wrapped around buffer? */ + if (next >= size) { next -= size; } @@ -61,12 +58,10 @@ void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) inflights->count++; } -/** - * syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight. - **/ +// FreeLE frees the inflights smaller or equal to the given `to` flight. void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) { if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) { - /* out of the left side of the window */ + // out of the left side of the window return; } @@ -95,10 +90,8 @@ void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) { } } -/** - * syncRaftInflightFreeFirstOne releases the first inflight. - * This is a no-op if nothing is inflight. - **/ +// FreeFirstOne releases the first inflight. This is a no-op if nothing is +// inflight. void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) { syncRaftInflightFreeLE(inflights, inflights->buffer[inflights->start]); } diff --git a/source/libs/sync/src/sync_raft_node_map.c b/source/libs/sync/src/sync_raft_node_map.c index 1c54d32b59..642eebe65b 100644 --- a/source/libs/sync/src/sync_raft_node_map.c +++ b/source/libs/sync/src/sync_raft_node_map.c @@ -37,11 +37,10 @@ bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { return true; } -void syncRaftCopyNodeMap(const SSyncRaftNodeMap* from, SSyncRaftNodeMap* to) { - SyncNodeId** ppId = (SyncNodeId**)taosHashIterate(from->nodeIdMap, NULL); - while (ppId) { - taosHashPut(to->nodeIdMap, ppId, sizeof(SyncNodeId*), ppId, sizeof(SyncNodeId*)); - ppId = taosHashIterate(from->nodeIdMap, ppId); +void syncRaftCopyNodeMap(SSyncRaftNodeMap* from, SSyncRaftNodeMap* to) { + SyncNodeId *pId = NULL; + while (!syncRaftIterateNodeMap(from, pId)) { + taosHashPut(to->nodeIdMap, &pId, sizeof(SyncNodeId*), &pId, sizeof(SyncNodeId*)); } } @@ -66,7 +65,7 @@ bool syncRaftIsAllNodeInProgressMap(SSyncRaftNodeMap* nodeMap, SSyncRaftProgress return true; } -void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) { +void syncRaftUnionNodeMap(SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) { syncRaftCopyNodeMap(nodeMap, to); } diff --git a/source/libs/sync/src/sync_raft_progress.c b/source/libs/sync/src/sync_raft_progress.c index 65676655ec..a3ab93c0fc 100644 --- a/source/libs/sync/src/sync_raft_progress.c +++ b/source/libs/sync/src/sync_raft_progress.c @@ -47,11 +47,9 @@ void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress) { }; } -/** - * syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the - * index acked by it. The method returns false if the given n index comes from - * an outdated message. Otherwise it updates the progress and returns true. - **/ +// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the +// index acked by it. The method returns false if the given n index comes from +// an outdated message. Otherwise it updates the progress and returns true. bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) { bool updated = false; @@ -66,27 +64,36 @@ bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastInde return updated; } +// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The +// arguments are the index of the append message rejected by the follower, and +// the hint that we want to decrease to. +// +// Rejections can happen spuriously as messages are sent out of order or +// duplicated. In such cases, the rejection pertains to an index that the +// Progress already knows were previously acknowledged, and false is returned +// without changing the Progress. +// +// If the rejection is genuine, Next is lowered sensibly, and the Progress is +// cleared for sending log entries. bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, SyncIndex rejected, SyncIndex matchHint) { if (progress->state == PROGRESS_STATE_REPLICATE) { - /** - * the rejection must be stale if the progress has matched and "rejected" - * is smaller than "match". - **/ + // The rejection must be stale if the progress has matched and "rejected" + // is smaller than "match". if (rejected <= progress->matchIndex) { syncDebug("match index is up to date,ignore"); return false; } - /* directly decrease next to match + 1 */ + // Directly decrease next to match + 1. + // + // TODO(tbg): why not use matchHint if it's larger? progress->nextIndex = progress->matchIndex + 1; return true; } - /** - * The rejection must be stale if "rejected" does not match next - 1. This - * is because non-replicating followers are probed one entry at a time. - **/ + // The rejection must be stale if "rejected" does not match next - 1. This + // is because non-replicating followers are probed one entry at a time. if (rejected != progress->nextIndex - 1) { syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore" , rejected, progress->nextIndex); @@ -99,14 +106,12 @@ bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, return true; } -/** - * syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. - * This is done when a node has rejected recent MsgApps, is currently waiting - * for a snapshot, or has reached the MaxInflightMsgs limit. In normal - * operation, this is false. A throttled node will be contacted less frequently - * until it has reached a state in which it's able to accept a steady stream of - * log entries again. - **/ +// IsPaused returns whether sending log entries to this node has been throttled. +// This is done when a node has rejected recent MsgApps, is currently waiting +// for a snapshot, or has reached the MaxInflightMsgs limit. In normal +// operation, this is false. A throttled node will be contacted less frequently +// until it has reached a state in which it's able to accept a steady stream of +// log entries again. bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { switch (progress->state) { case PROGRESS_STATE_PROBE: @@ -152,16 +157,12 @@ bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) { return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; } -/** - * syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, - * optionally and if larger, the index of the pending snapshot. - **/ +// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, +// optionally and if larger, the index of the pending snapshot. void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) { - /** - * If the original state is ProgressStateSnapshot, progress knows that - * the pending snapshot has been sent to this peer successfully, then - * probes from pendingSnapshot + 1. - **/ + // If the original state is StateSnapshot, progress knows that + // the pending snapshot has been sent to this peer successfully, then + // probes from pendingSnapshot + 1. if (progress->state == PROGRESS_STATE_SNAPSHOT) { SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; resetProgressState(progress, PROGRESS_STATE_PROBE); @@ -172,14 +173,14 @@ void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) { } } -/** - * syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1. - **/ +// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1. void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) { resetProgressState(progress, PROGRESS_STATE_REPLICATE); progress->nextIndex = progress->matchIndex + 1; } +// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending +// snapshot index. void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) { resetProgressState(progress, PROGRESS_STATE_SNAPSHOT); progress->pendingSnapshotIndex = snapshotIndex; @@ -242,10 +243,8 @@ static void unrefProgress(SSyncRaftProgress* progress, void* arg) { } } -/** - * ResetState moves the Progress into the specified State, resetting ProbeSent, - * PendingSnapshot, and Inflights. - **/ +// ResetState moves the Progress into the specified State, resetting ProbeSent, +// PendingSnapshot, and Inflights. static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state) { progress->probeSent = false; progress->pendingSnapshotIndex = 0; @@ -253,83 +252,9 @@ static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressSta syncRaftInflightReset(progress->inflights); } -/** - * probeAcked is called when this peer has accepted an append. It resets - * ProbeSent to signal that additional append messages should be sent without - * further delay. - **/ +// ProbeAcked is called when this peer has accepted an append. It resets +// ProbeSent to signal that additional append messages should be sent without +// further delay. static void probeAcked(SSyncRaftProgress* progress) { progress->probeSent = false; } - -#if 0 - -SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i) { - return pRaft->leaderState.progress[i].nextIndex; -} - -SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i) { - return pRaft->leaderState.progress[i].matchIndex; -} - -void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i) { - pRaft->leaderState.progress[i].lastSend = pRaft->io.time(pRaft); -} - -void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i) { - pRaft->leaderState.progress[i].lastSendSnapshot = pRaft->io.time(pRaft); -} - -bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i) { - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - bool prev = progress->recentRecv; - progress->recentRecv = false; - return prev; -} - -void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i) { - pRaft->leaderState.progress[i].recentRecv = true; -} - -bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) { - return pRaft->leaderState.progress[i].recentRecv; -} - -void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) { - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - resetProgressState(progress, PROGRESS_STATE_SNAPSHOT); - progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log); -} - -void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) { - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - - if (progress->state == PROGRESS_STATE_SNAPSHOT) { - assert(progress->pendingSnapshotIndex > 0); - SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; - resetProgressState(progress, PROGRESS_STATE_PROBE); - progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex); - } else { - resetProgressState(progress, PROGRESS_STATE_PROBE); - progress->nextIndex = progress->matchIndex + 1; - } -} - -void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) { - resetProgressState(pRaft->leaderState.progress, PROGRESS_STATE_REPLICATE); - pRaft->leaderState.progress->nextIndex = pRaft->leaderState.progress->matchIndex + 1; -} - -void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) { - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - progress->pendingSnapshotIndex = 0; - progress->state = PROGRESS_STATE_PROBE; -} - -ESyncRaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) { - return pRaft->leaderState.progress[i].state; -} - - - -#endif \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index 60e3ccea6a..e0b4afae21 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#include "raft.h" +#include "sync_const.h" #include "sync_raft_progress_tracker.h" #include "sync_raft_proto.h" @@ -22,9 +24,11 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft) { return NULL; } + tracker->votesMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + syncRaftInitTrackConfig(&tracker->config); - syncRaftInitNodeMap(&tracker->config.learnersNext); tracker->pRaft = pRaft; + tracker->maxInflightMsgs = kSyncRaftMaxInflghtMsgs; return tracker; } @@ -39,9 +43,11 @@ void syncRaftInitTrackConfig(SSyncRaftProgressTrackerConfig* config) { void syncRaftFreeTrackConfig(SSyncRaftProgressTrackerConfig* config) { syncRaftFreeNodeMap(&config->learners); syncRaftFreeNodeMap(&config->learnersNext); - syncRaftFreeQuorumJointConfig(&config->voters); + syncRaftFreeNodeMap(&config->voters.incoming); + syncRaftFreeNodeMap(&config->voters.outgoing); } +// ResetVotes prepares for a new round of vote counting via recordVote. void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) { taosHashClear(tracker->votesMap); } @@ -50,14 +56,15 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp vi syncRaftVisitProgressMap(&tracker->progressMap, visit, arg); } +// RecordVote records that the node with the given id voted for this Raft +// instance if v == true (and declined it otherwise). void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant) { ESyncRaftVoteType* pType = taosHashGet(tracker->votesMap, &id, sizeof(SyncNodeId*)); if (pType != NULL) { return; } - ESyncRaftVoteType type = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT; - taosHashPut(tracker->votesMap, &id, sizeof(SyncNodeId), &type, sizeof(ESyncRaftVoteType*)); + taosHashPut(tracker->votesMap, &id, sizeof(SyncNodeId), &grant, sizeof(bool*)); } void syncRaftCopyTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) { @@ -78,26 +85,27 @@ int syncRaftCheckTrackerConfigInProgress(SSyncRaftProgressTrackerConfig* config, return 0; } -/** - * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the - * election outcome is known. - **/ +// TallyVotes returns the number of granted and rejected Votes, and whether the +// election outcome is known. ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) { - int i; - SSyncRaftProgress* progress; + SSyncRaftProgress* progress = NULL; int r, g; + // Make sure to populate granted/rejected correctly even if the Votes slice + // contains members no longer part of the configuration. This doesn't really + // matter in the way the numbers are used (they're informational), but might + // as well get it right. while (!syncRaftIterateProgressMap(&tracker->progressMap, progress)) { if (progress->id == SYNC_NON_NODE_ID) { continue; } - ESyncRaftVoteType* pType = taosHashGet(tracker->votesMap, &progress->id, sizeof(SyncNodeId*)); - if (pType == NULL) { + bool* v = taosHashGet(tracker->votesMap, &progress->id, sizeof(SyncNodeId*)); + if (v == NULL) { continue; } - if (*pType == SYNC_RAFT_VOTE_RESP_GRANT) { + if (*v) { g++; } else { r++; @@ -109,9 +117,40 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r return syncRaftVoteResult(&(tracker->config.voters), tracker->votesMap); } -void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) { +void syncRaftConfigState(SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) { syncRaftCopyNodeMap(&tracker->config.voters.incoming, &cs->voters); syncRaftCopyNodeMap(&tracker->config.voters.outgoing, &cs->votersOutgoing); syncRaftCopyNodeMap(&tracker->config.learners, &cs->learners); syncRaftCopyNodeMap(&tracker->config.learnersNext, &cs->learnersNext); + cs->autoLeave = tracker->config.autoLeave; +} + +static void matchAckIndexer(SyncNodeId id, void* arg, SyncIndex* index) { + SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)arg; + SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&tracker->progressMap, id); + if (progress == NULL) { + *index = 0; + return; + } + *index = progress->matchIndex; +} + +// Committed returns the largest log index known to be committed based on what +// the voting members of the group have acknowledged. +SyncIndex syncRaftCommittedIndex(SSyncRaftProgressTracker* tracker) { + return syncRaftJointConfigCommittedIndex(&tracker->config.voters, matchAckIndexer, tracker); +} + +static void visitProgressActive(SSyncRaftProgress* progress, void* arg) { + SHashObj* votesMap = (SHashObj*)arg; + taosHashPut(votesMap, &progress->id, sizeof(SyncNodeId), &progress->recentActive, sizeof(bool)); +} + +// QuorumActive returns true if the quorum is active from the view of the local +// raft state machine. Otherwise, it returns false. +bool syncRaftQuorumActive(SSyncRaftProgressTracker* tracker) { + SHashObj* votesMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + syncRaftVisitProgressMap(&tracker->progressMap, visitProgressActive, votesMap); + + return syncRaftVoteResult(&tracker->config.voters, votesMap) == SYNC_RAFT_VOTE_WON; } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c index 500bd908c0..70c078b6f5 100644 --- a/source/libs/sync/src/sync_raft_quorum_joint.c +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -59,8 +59,17 @@ void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, S syncRaftRemoveFromNodeMap(&config->incoming, id); } -void syncRaftJointConfigIDS(const SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap) { +void syncRaftJointConfigIDs(SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap) { syncRaftCopyNodeMap(&config->incoming, nodeMap); syncRaftUnionNodeMap(&config->outgoing, nodeMap); +} + +SyncIndex syncRaftJointConfigCommittedIndex(const SSyncRaftQuorumJointConfig* config, matchAckIndexerFp indexer, void* arg) { + SyncIndex index0, index1; + + index0 = syncRaftMajorityConfigCommittedIndex(&config->incoming, indexer, arg); + index1 = syncRaftMajorityConfigCommittedIndex(&config->outgoing, indexer, arg); + + return index0 < index1 ? index0 : index1; } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c index ff5ba64876..313f213cda 100644 --- a/source/libs/sync/src/sync_raft_quorum_majority.c +++ b/source/libs/sync/src/sync_raft_quorum_majority.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "sync_const.h" #include "sync_raft_quorum.h" #include "sync_raft_quorum_majority.h" #include "sync_raft_node_map.h" @@ -34,13 +35,13 @@ ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashOb i = g = r = missing = 0; SyncNodeId* pId = NULL; while (!syncRaftIterateNodeMap(config, pId)) { - const ESyncRaftVoteType* pType = taosHashGet(votesMap, pId, sizeof(SyncNodeId*)); - if (pType == NULL) { + const bool* v = (const bool*)taosHashGet(votesMap, pId, sizeof(SyncNodeId*)); + if (v == NULL) { missing += 1; continue; } - if (*pType == SYNC_RAFT_VOTE_RESP_GRANT) { + if (*v) { g +=1; } else { r += 1; @@ -56,4 +57,65 @@ ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashOb } return SYNC_RAFT_VOTE_LOST; +} + +int compSyncIndex(const void * elem1, const void * elem2) { + SyncIndex index1 = *((SyncIndex*)elem1); + SyncIndex index2 = *((SyncIndex*)elem1); + if (index1 > index2) return 1; + if (index1 < index2) return -1; + return 0; +} + +SyncIndex syncRaftMajorityConfigCommittedIndex(const SSyncRaftNodeMap* config, matchAckIndexerFp indexer, void* arg) { + int n = syncRaftNodeMapSize(config); + if (n == 0) { + // This plays well with joint quorums which, when one half is the zero + // MajorityConfig, should behave like the other half. + return kMaxCommitIndex; + } + + // Use an on-stack slice to collect the committed indexes when n <= 7 + // (otherwise we alloc). The alternative is to stash a slice on + // MajorityConfig, but this impairs usability (as is, MajorityConfig is just + // a map, and that's nice). The assumption is that running with a + // replication factor of >7 is rare, and in cases in which it happens + // performance is a lesser concern (additionally the performance + // implications of an allocation here are far from drastic). + SyncIndex* srt = NULL; + SyncIndex srk[TSDB_MAX_REPLICA]; + if (n > TSDB_MAX_REPLICA) { + srt = (SyncIndex*)malloc(sizeof(SyncIndex) * n); + if (srt == NULL) { + return kMaxCommitIndex; + } + } else { + srt = &srk[0]; + } + + // Fill the slice with the indexes observed. Any unused slots will be + // left as zero; these correspond to voters that may report in, but + // haven't yet. We fill from the right (since the zeroes will end up on + // the left after sorting below anyway). + SyncNodeId *pId = NULL; + int i = 0; + SyncIndex index; + while (!syncRaftIterateNodeMap(config, pId)) { + indexer(*pId, arg, &index); + srt[i++] = index; + } + + // Sort by index. Use a bespoke algorithm (copied from the stdlib's sort + // package) to keep srt on the stack. + qsort(srt, n, sizeof(SyncIndex), compSyncIndex); + + // The smallest index into the array for which the value is acked by a + // quorum. In other words, from the end of the slice, move n/2+1 to the + // left (accounting for zero-indexing). + index = srt[n - (n/2 + 1)]; + if (srt != &srk[0]) { + free(srt); + } + + return index; } \ No newline at end of file