From 6d3efd1b1afd1c6cbf199ef50bd6bc8600a6ede4 Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 12 Nov 2021 10:20:03 +0800 Subject: [PATCH 1/8] [TD-10645][raft]refactor sync typedef --- source/libs/sync/inc/raft.h | 3 ++- source/libs/sync/inc/raft_log.h | 6 +++--- source/libs/sync/inc/raft_message.h | 16 ++++++++-------- source/libs/sync/inc/sync_raft_impl.h | 4 ++-- source/libs/sync/inc/sync_raft_progress.h | 12 ++++++------ .../libs/sync/inc/sync_raft_progress_tracker.h | 6 +++--- source/libs/sync/inc/sync_raft_quorum.h | 6 +++--- source/libs/sync/inc/sync_raft_quorum_joint.h | 2 +- source/libs/sync/inc/sync_raft_quorum_majority.h | 2 +- source/libs/sync/inc/sync_type.h | 4 ++-- source/libs/sync/src/raft.c | 4 ++-- source/libs/sync/src/raft_election.c | 6 +++--- .../sync/src/raft_handle_vote_resp_message.c | 2 +- source/libs/sync/src/sync_raft_impl.c | 4 ++-- source/libs/sync/src/sync_raft_progress.c | 8 ++++---- .../libs/sync/src/sync_raft_progress_tracker.c | 4 ++-- source/libs/sync/src/sync_raft_quorum_joint.c | 6 +++--- source/libs/sync/src/sync_raft_quorum_majority.c | 2 +- 18 files changed, 49 insertions(+), 48 deletions(-) diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index 5d85ed3cd4..5b6efb95e5 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -65,7 +65,8 @@ struct SSyncRaft { SSyncRaftLog *log; - int maxMsgSize; + uint64_t maxMsgSize; + uint64_t maxUncommittedSize; SSyncRaftProgressTracker *tracker; ESyncState state; diff --git a/source/libs/sync/inc/raft_log.h b/source/libs/sync/inc/raft_log.h index a44f5a7273..b36f0da3be 100644 --- a/source/libs/sync/inc/raft_log.h +++ b/source/libs/sync/inc/raft_log.h @@ -19,16 +19,16 @@ #include "sync.h" #include "sync_type.h" -typedef enum SyncEntryType { +typedef enum ESyncRaftEntryType { SYNC_ENTRY_TYPE_LOG = 1, -}SyncEntryType; +} ESyncRaftEntryType; struct SSyncRaftEntry { SyncTerm term; SyncIndex index; - SyncEntryType type; + ESyncRaftEntryType type; SSyncBuffer buffer; }; diff --git a/source/libs/sync/inc/raft_message.h b/source/libs/sync/inc/raft_message.h index 2cb625d1fb..0d81511756 100644 --- a/source/libs/sync/inc/raft_message.h +++ b/source/libs/sync/inc/raft_message.h @@ -28,7 +28,7 @@ * outter message start with RAFT_MSG_*, which communicate between cluster peers, * need to implement its decode/encode functions. **/ -typedef enum RaftMessageType { +typedef enum ESyncRaftMessageType { // client propose a cmd RAFT_MSG_INTERNAL_PROP = 1, @@ -40,7 +40,7 @@ typedef enum RaftMessageType { RAFT_MSG_APPEND = 5, RAFT_MSG_APPEND_RESP = 6, -} RaftMessageType; +} ESyncRaftMessageType; typedef struct RaftMsgInternal_Prop { const SSyncBuffer *pBuf; @@ -53,14 +53,14 @@ typedef struct RaftMsgInternal_Election { } RaftMsgInternal_Election; typedef struct RaftMsg_Vote { - SyncRaftElectionType cType; + ESyncRaftElectionType cType; SyncIndex lastIndex; SyncTerm lastTerm; } RaftMsg_Vote; typedef struct RaftMsg_VoteResp { bool rejected; - SyncRaftElectionType cType; + ESyncRaftElectionType cType; } RaftMsg_VoteResp; typedef struct RaftMsg_Append_Entries { @@ -85,7 +85,7 @@ typedef struct RaftMsg_Append_Resp { } RaftMsg_Append_Resp; typedef struct SSyncMessage { - RaftMessageType msgType; + ESyncRaftMessageType msgType; SyncTerm term; SyncGroupId groupId; SyncNodeId from; @@ -131,7 +131,7 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo } static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId from, - SyncTerm term, SyncRaftElectionType cType, + SyncTerm term, ESyncRaftElectionType cType, SyncIndex lastIndex, SyncTerm lastTerm) { SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage)); if (pMsg == NULL) { @@ -153,7 +153,7 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId } static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNodeId from, - SyncRaftElectionType cType, bool rejected) { + ESyncRaftElectionType cType, bool rejected) { SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage)); if (pMsg == NULL) { return NULL; @@ -213,7 +213,7 @@ static FORCE_INLINE SSyncMessage* syncNewEmptyAppendRespMsg(SyncGroupId groupId, return pMsg; } -static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) { +static FORCE_INLINE bool syncIsInternalMsg(ESyncRaftMessageType msgType) { return msgType == RAFT_MSG_INTERNAL_PROP || msgType == RAFT_MSG_INTERNAL_ELECTION; } diff --git a/source/libs/sync/inc/sync_raft_impl.h b/source/libs/sync/inc/sync_raft_impl.h index 26af06866b..2f96b970dc 100644 --- a/source/libs/sync/inc/sync_raft_impl.h +++ b/source/libs/sync/inc/sync_raft_impl.h @@ -26,7 +26,7 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft); void syncRaftBecomeCandidate(SSyncRaft* pRaft); void syncRaftBecomeLeader(SSyncRaft* pRaft); -void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType); +void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType); void syncRaftTriggerHeartbeat(SSyncRaft* pRaft); @@ -35,7 +35,7 @@ bool syncRaftIsPromotable(SSyncRaft* pRaft); bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft); int syncRaftQuorum(SSyncRaft* pRaft); -SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, +ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool accept, int* rejectNum, int *granted); diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h index fff0c13e31..6a376ad710 100644 --- a/source/libs/sync/inc/sync_raft_progress.h +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -34,7 +34,7 @@ * * PROGRESS_STATE_PROBE is the initial state. **/ -typedef enum RaftProgressState { +typedef enum ESyncRaftProgressState { /** * StateProbe indicates a follower whose last index isn't known. Such a * follower is "probed" (i.e. an append sent periodically) to narrow down @@ -56,7 +56,7 @@ typedef enum RaftProgressState { * return to StateReplicate. **/ PROGRESS_STATE_SNAPSHOT, -} RaftProgressState; +} ESyncRaftProgressState; /** * Progress represents a follower’s progress in the view of the leader. Leader maintains @@ -82,7 +82,7 @@ struct SSyncRaftProgress { * When in StateSnapshot, leader should have sent out snapshot * before and stops sending any replication message. **/ - RaftProgressState state; + ESyncRaftProgressState state; /** * pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT. @@ -187,15 +187,15 @@ static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progr return progress->nextIndex; } -static FORCE_INLINE RaftProgressState syncRaftProgressInReplicate(SSyncRaftProgress* progress) { +static FORCE_INLINE ESyncRaftProgressState syncRaftProgressInReplicate(SSyncRaftProgress* progress) { return progress->state == PROGRESS_STATE_REPLICATE; } -static FORCE_INLINE RaftProgressState syncRaftProgressInSnapshot(SSyncRaftProgress* progress) { +static FORCE_INLINE ESyncRaftProgressState syncRaftProgressInSnapshot(SSyncRaftProgress* progress) { return progress->state == PROGRESS_STATE_SNAPSHOT; } -static FORCE_INLINE RaftProgressState syncRaftProgressInProbe(SSyncRaftProgress* progress) { +static FORCE_INLINE ESyncRaftProgressState syncRaftProgressInProbe(SSyncRaftProgress* progress) { return progress->state == PROGRESS_STATE_PROBE; } diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index 887aeb2377..acfa38c378 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -87,8 +87,8 @@ struct SSyncRaftProgressTracker { SSyncRaftProgress progressMap[TSDB_MAX_REPLICA]; - SyncRaftVoteResult votes[TSDB_MAX_REPLICA]; - int maxInflight; + ESyncRaftVoteResult votes[TSDB_MAX_REPLICA]; + int maxInflightMsgs; }; SSyncRaftProgressTracker* syncRaftOpenProgressTracker(); @@ -108,6 +108,6 @@ void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant); * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the * election outcome is known. **/ -SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted); +ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted); #endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum.h b/source/libs/sync/inc/sync_raft_quorum.h index 42f65c9806..16ac1cd029 100644 --- a/source/libs/sync/inc/sync_raft_quorum.h +++ b/source/libs/sync/inc/sync_raft_quorum.h @@ -17,11 +17,11 @@ #define TD_SYNC_RAFT_QUORUM_H /** - * SSyncRaftVoteResult indicates the outcome of a vote. + * ESyncRaftVoteResult indicates the outcome of a vote. **/ typedef enum { /** - * SYNC_RAFT_VOTE_PENDING indicates that the decision of the vote depends on future + * SYNC_RAFT_VOTE_PENDING indicates that the decision of the vote depends on future * votes, i.e. neither "yes" or "no" has reached quorum yet. **/ SYNC_RAFT_VOTE_PENDING = 1, @@ -35,6 +35,6 @@ typedef enum { * SYNC_RAFT_VOTE_WON indicates that the quorum has voted "yes". **/ SYNC_RAFT_VOTE_WON = 3, -} SSyncRaftVoteResult; +} ESyncRaftVoteResult; #endif /* TD_SYNC_RAFT_QUORUM_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index 14c1f63754..dec1c39d90 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -33,6 +33,6 @@ typedef struct SSyncRaftQuorumJointConfig { * a result indicating whether the vote is pending, lost, or won. A joint quorum * requires both majority quorums to vote in favor. **/ -SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes); +ESyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteResult* votes); #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_majority.h b/source/libs/sync/inc/sync_raft_quorum_majority.h index b1857ef056..6955c7a072 100644 --- a/source/libs/sync/inc/sync_raft_quorum_majority.h +++ b/source/libs/sync/inc/sync_raft_quorum_majority.h @@ -25,6 +25,6 @@ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a * quorum of no has been reached). **/ -SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes); +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteResult* votes); #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */ diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index 525623b4cf..3925323415 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -50,7 +50,7 @@ typedef enum { SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0, SYNC_RAFT_CAMPAIGN_ELECTION = 1, SYNC_RAFT_CAMPAIGN_TRANSFER = 2, -} SyncRaftElectionType; +} ESyncRaftElectionType; typedef enum { // the init vote resp status @@ -61,6 +61,6 @@ typedef enum { //reject the vote request SYNC_RAFT_VOTE_RESP_REJECT = 2, -} SyncRaftVoteResult; +} ESyncRaftVoteResult; #endif /* _TD_LIBS_SYNC_TYPE_H */ diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 20d24e3267..cdf2abebea 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -101,7 +101,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) { return 0; } - RaftMessageType msgType = pMsg->msgType; + ESyncRaftMessageType msgType = pMsg->msgType; if (msgType == RAFT_MSG_INTERNAL_ELECTION) { syncRaftHandleElectionMessage(pRaft, pMsg); } else if (msgType == RAFT_MSG_VOTE) { @@ -140,7 +140,7 @@ static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { SyncNodeId leaderId = pMsg->from; - RaftMessageType msgType = pMsg->msgType; + ESyncRaftMessageType msgType = pMsg->msgType; if (msgType == RAFT_MSG_VOTE) { // TODO diff --git a/source/libs/sync/src/raft_election.c b/source/libs/sync/src/raft_election.c index 1ca3326810..eb310c31ec 100644 --- a/source/libs/sync/src/raft_election.c +++ b/source/libs/sync/src/raft_election.c @@ -18,10 +18,10 @@ #include "raft_log.h" #include "raft_message.h" -void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) { +void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { SyncTerm term; bool preVote; - RaftMessageType voteMsgType; + ESyncRaftMessageType voteMsgType; if (syncRaftIsPromotable(pRaft)) { syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId); @@ -41,7 +41,7 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) { } int quorum = syncRaftQuorum(pRaft); - SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL); + ESyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL); if (result == SYNC_RAFT_VOTE_WON) { /** * We won the election after voting for ourselves (which must mean that diff --git a/source/libs/sync/src/raft_handle_vote_resp_message.c b/source/libs/sync/src/raft_handle_vote_resp_message.c index 4ec5b88eb5..1781205ec0 100644 --- a/source/libs/sync/src/raft_handle_vote_resp_message.c +++ b/source/libs/sync/src/raft_handle_vote_resp_message.c @@ -36,7 +36,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { return 0; } - SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pMsg->from, + ESyncRaftVoteResult result = syncRaftPollVote(pRaft, pMsg->from, pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION, !pMsg->voteResp.rejected, &rejected, &granted); diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index 618efed570..4d1f66a474 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -127,7 +127,7 @@ int syncRaftQuorum(SSyncRaft* pRaft) { return pRaft->cluster.replica / 2 + 1; } -SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, +ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool grant, int* rejected, int *granted) { int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id); @@ -186,7 +186,7 @@ static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) { * StateCandidate, we may get stale MsgPreVoteResp messages in this term from * our pre-candidate state). **/ - RaftMessageType msgType = pMsg->msgType; + ESyncRaftMessageType msgType = pMsg->msgType; if (msgType == RAFT_MSG_INTERNAL_PROP) { return 0; diff --git a/source/libs/sync/src/sync_raft_progress.c b/source/libs/sync/src/sync_raft_progress.c index 437c083b4d..df954402f8 100644 --- a/source/libs/sync/src/sync_raft_progress.c +++ b/source/libs/sync/src/sync_raft_progress.c @@ -20,13 +20,13 @@ #include "sync.h" #include "syncInt.h" -static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state); +static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state); static void probeAcked(SSyncRaftProgress* progress); static void resumeProgress(SSyncRaftProgress* progress); void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) { - SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflight); + SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflightMsgs); if (inflights == NULL) { return; } @@ -153,7 +153,7 @@ void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snaps * ResetState moves the Progress into the specified State, resetting ProbeSent, * PendingSnapshot, and Inflights. **/ -static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) { +static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state) { progress->probeSent = false; progress->pendingSnapshotIndex = 0; progress->state = state; @@ -233,7 +233,7 @@ void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) { progress->state = PROGRESS_STATE_PROBE; } -RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) { +ESyncRaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) { return pRaft->leaderState.progress[i].state; } diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index 43b68a4b08..f5da538e1a 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -25,7 +25,7 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { } void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) { - memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteResult) * TSDB_MAX_REPLICA); + memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(ESyncRaftVoteResult) * TSDB_MAX_REPLICA); } void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) { @@ -48,7 +48,7 @@ void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) { * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the * election outcome is known. **/ -SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) { +ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) { int i; SSyncRaftProgress* progress; int r, g; diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c index a0e6a6782a..9084e1868a 100644 --- a/source/libs/sync/src/sync_raft_quorum_joint.c +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -22,9 +22,9 @@ * a result indicating whether the vote is pending, lost, or won. A joint quorum * requires both majority quorums to vote in favor. **/ -SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes) { - SyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->majorityConfig[0]), votes); - SyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->majorityConfig[1]), votes); +ESyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteResult* votes) { + ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->majorityConfig[0]), votes); + ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->majorityConfig[1]), votes); if (r1 == r2) { // If they agree, return the agreed state. diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c index ea543d7335..0d13998112 100644 --- a/source/libs/sync/src/sync_raft_quorum_majority.c +++ b/source/libs/sync/src/sync_raft_quorum_majority.c @@ -22,7 +22,7 @@ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a * quorum of no has been reached). **/ -SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes) { +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteResult* votes) { if (config->replica == 0) { return SYNC_RAFT_VOTE_WON; } From fb01dd562878f401f93de4788c1b18af272702f7 Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 12 Nov 2021 16:00:33 +0800 Subject: [PATCH 2/8] [TD-10645][raft]add restore process --- .../libs/sync/inc/sync_raft_config_change.h | 42 +++++ source/libs/sync/inc/sync_raft_progress.h | 6 + .../sync/inc/sync_raft_progress_tracker.h | 111 ++++++----- source/libs/sync/inc/sync_raft_proto.h | 61 ++++++ source/libs/sync/inc/sync_raft_quorum_joint.h | 26 ++- .../libs/sync/inc/sync_raft_quorum_majority.h | 3 +- source/libs/sync/inc/sync_raft_restore.h | 25 +++ source/libs/sync/inc/sync_type.h | 11 +- .../libs/sync/src/sync_raft_config_change.c | 154 +++++++++++++++ source/libs/sync/src/sync_raft_impl.c | 2 +- source/libs/sync/src/sync_raft_progress.c | 4 + .../sync/src/sync_raft_progress_tracker.c | 10 +- source/libs/sync/src/sync_raft_quorum_joint.c | 6 +- .../libs/sync/src/sync_raft_quorum_majority.c | 2 +- source/libs/sync/src/sync_raft_restore.c | 177 ++++++++++++++++++ 15 files changed, 571 insertions(+), 69 deletions(-) create mode 100644 source/libs/sync/inc/sync_raft_config_change.h create mode 100644 source/libs/sync/inc/sync_raft_proto.h create mode 100644 source/libs/sync/inc/sync_raft_restore.h create mode 100644 source/libs/sync/src/sync_raft_config_change.c create mode 100644 source/libs/sync/src/sync_raft_restore.c diff --git a/source/libs/sync/inc/sync_raft_config_change.h b/source/libs/sync/inc/sync_raft_config_change.h new file mode 100644 index 0000000000..98b8b49cb8 --- /dev/null +++ b/source/libs/sync/inc/sync_raft_config_change.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_CONFIG_CHANGE_H +#define TD_SYNC_RAFT_CONFIG_CHANGE_H + +#include "sync_type.h" +#include "sync_raft_proto.h" + +/** + * Changer facilitates configuration changes. It exposes methods to handle + * simple and joint consensus while performing the proper validation that allows + * refusing invalid configuration changes before they affect the active + * configuration. + **/ +struct SSyncRaftChanger { + SSyncRaftProgressTracker* tracker; + SyncIndex lastIndex; +}; + +typedef int (*configChangeFp)(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, + SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); + +int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, + SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); + +int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, + SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); + +#endif /* TD_SYNC_RAFT_CONFIG_CHANGE_H */ diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h index 6a376ad710..b5017f963d 100644 --- a/source/libs/sync/inc/sync_raft_progress.h +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -129,6 +129,10 @@ struct SSyncRaftProgress { bool isLearner; }; +struct SSyncRaftProgressMap { + SSyncRaftProgress progress[TSDB_MAX_REPLICA]; +}; + void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress); /** @@ -210,7 +214,9 @@ bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress); void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex); +void syncRaftProgressCopy(const SSyncRaftProgress* from, SSyncRaftProgress* to); +void syncRaftProgressMapCopy(const SSyncRaftProgressMap* from, SSyncRaftProgressMap* to); #if 0 diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index acfa38c378..61308d5df5 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -17,77 +17,72 @@ #define _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H #include "sync_type.h" +#include "sync_raft_quorum.h" #include "sync_raft_quorum_joint.h" #include "sync_raft_progress.h" struct SSyncRaftProgressTrackerConfig { SSyncRaftQuorumJointConfig voters; - /** - * autoLeave is true if the configuration is joint and a transition to the - * incoming configuration should be carried out automatically by Raft when - * this is possible. If false, the configuration will be joint until the - * application initiates the transition manually. - **/ + // autoLeave is true if the configuration is joint and a transition to the + // incoming configuration should be carried out automatically by Raft when + // this is possible. If false, the configuration will be joint until the + // application initiates the transition manually. bool autoLeave; - /** - * Learners is a set of IDs corresponding to the learners active in the - * current configuration. - * - * Invariant: Learners and Voters does not intersect, i.e. if a peer is in - * either half of the joint config, it can't be a learner; if it is a - * learner it can't be in either half of the joint config. This invariant - * simplifies the implementation since it allows peers to have clarity about - * its current role without taking into account joint consensus. - **/ - SyncNodeId learners[TSDB_MAX_REPLICA]; + // Learners is a set of IDs corresponding to the learners active in the + // current configuration. + // + // Invariant: Learners and Voters does not intersect, i.e. if a peer is in + // either half of the joint config, it can't be a learner; if it is a + // learner it can't be in either half of the joint config. This invariant + // simplifies the implementation since it allows peers to have clarity about + // its current role without taking into account joint consensus. + SSyncRaftNodeMap learners; - /** - * When we turn a voter into a learner during a joint consensus transition, - * we cannot add the learner directly when entering the joint state. This is - * because this would violate the invariant that the intersection of - * voters and learners is empty. For example, assume a Voter is removed and - * immediately re-added as a learner (or in other words, it is demoted): - * - * Initially, the configuration will be - * - * voters: {1 2 3} - * learners: {} - * - * and we want to demote 3. Entering the joint configuration, we naively get - * - * voters: {1 2} & {1 2 3} - * learners: {3} - * - * but this violates the invariant (3 is both voter and learner). Instead, - * we get - * - * voters: {1 2} & {1 2 3} - * learners: {} - * next_learners: {3} - * - * Where 3 is now still purely a voter, but we are remembering the intention - * to make it a learner upon transitioning into the final configuration: - * - * voters: {1 2} - * learners: {3} - * next_learners: {} - * - * Note that next_learners is not used while adding a learner that is not - * also a voter in the joint config. In this case, the learner is added - * right away when entering the joint configuration, so that it is caught up - * as soon as possible. - **/ - SyncNodeId learnersNext[TSDB_MAX_REPLICA]; + // When we turn a voter into a learner during a joint consensus transition, + // we cannot add the learner directly when entering the joint state. This is + // because this would violate the invariant that the intersection of + // voters and learners is empty. For example, assume a Voter is removed and + // immediately re-added as a learner (or in other words, it is demoted): + // + // Initially, the configuration will be + // + // voters: {1 2 3} + // learners: {} + // + // and we want to demote 3. Entering the joint configuration, we naively get + // + // voters: {1 2} & {1 2 3} + // learners: {3} + // + // but this violates the invariant (3 is both voter and learner). Instead, + // we get + // + // voters: {1 2} & {1 2 3} + // learners: {} + // next_learners: {3} + // + // Where 3 is now still purely a voter, but we are remembering the intention + // to make it a learner upon transitioning into the final configuration: + // + // voters: {1 2} + // learners: {3} + // next_learners: {} + // + // Note that next_learners is not used while adding a learner that is not + // also a voter in the joint config. In this case, the learner is added + // right away when entering the joint configuration, so that it is caught up + // as soon as possible. + SSyncRaftNodeMap learnersNext; }; struct SSyncRaftProgressTracker { SSyncRaftProgressTrackerConfig config; - SSyncRaftProgress progressMap[TSDB_MAX_REPLICA]; + SSyncRaftProgressMap progressMap; - ESyncRaftVoteResult votes[TSDB_MAX_REPLICA]; + ESyncRaftVoteType votes[TSDB_MAX_REPLICA]; int maxInflightMsgs; }; @@ -104,6 +99,10 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, voi **/ void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant); +void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressTrackerConfig* result); + +int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); + /** * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the * election outcome is known. diff --git a/source/libs/sync/inc/sync_raft_proto.h b/source/libs/sync/inc/sync_raft_proto.h new file mode 100644 index 0000000000..49d706875f --- /dev/null +++ b/source/libs/sync/inc/sync_raft_proto.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_PROTO_H +#define TD_SYNC_RAFT_PROTO_H + +#include "sync_type.h" + +typedef enum ESyncRaftConfChangeType { + SYNC_RAFT_Conf_AddNode = 0, + SYNC_RAFT_Conf_RemoveNode = 1, + SYNC_RAFT_Conf_UpdateNode = 2, + SYNC_RAFT_Conf_AddLearnerNode = 2, +} ESyncRaftConfChangeType; + +// ConfChangeSingle is an individual configuration change operation. Multiple +// such operations can be carried out atomically via a ConfChangeV2. +typedef struct SSyncConfChangeSingle { + ESyncRaftConfChangeType type; + SyncNodeId nodeId; +} SSyncConfChangeSingle; + +typedef struct SSyncConfChangeSingleArray { + int n; + SSyncConfChangeSingle* changes; +} SSyncConfChangeSingleArray; + +typedef struct SSyncConfigState { + // The voters in the incoming config. (If the configuration is not joint, + // then the outgoing config is empty). + SSyncRaftNodeMap voters; + + // The learners in the incoming config. + SSyncRaftNodeMap learners; + + // The voters in the outgoing config. + SSyncRaftNodeMap votersOutgoing; + + // The nodes that will become learners when the outgoing config is removed. + // These nodes are necessarily currently in nodes_joint (or they would have + // been added to the incoming config right away). + SSyncRaftNodeMap learnersNext; + + // If set, the config is joint and Raft will automatically transition into + // the final config (i.e. remove the outgoing config) when this is safe. + bool autoLeave; +} SSyncConfigState; + +#endif /* TD_SYNC_RAFT_PROTO_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index dec1c39d90..103a147de3 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -25,14 +25,34 @@ * majority configurations. Decisions require the support of both majorities. **/ typedef struct SSyncRaftQuorumJointConfig { - SSyncCluster majorityConfig[2]; -}SSyncRaftQuorumJointConfig; + SSyncCluster outgoing; + SSyncCluster incoming; +} SSyncRaftQuorumJointConfig; /** * syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns * a result indicating whether the vote is pending, lost, or won. A joint quorum * requires both majority quorums to vote in favor. **/ -ESyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteResult* votes); +ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes); + +static FORCE_INLINE bool syncRaftJointConfigInCluster(const SSyncCluster* cluster, SyncNodeId id) { + int i; + for (i = 0; i < cluster->replica; ++i) { + if (cluster->nodeInfo[i].nodeId == id) { + return true; + } + } + + return false; +} + +static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { + return syncRaftJointConfigInCluster(&config->outgoing, id); +} + +static FORCE_INLINE bool syncRaftJointConfigInIncoming(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { + return syncRaftJointConfigInCluster(&config->incoming, id); +} #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_majority.h b/source/libs/sync/inc/sync_raft_quorum_majority.h index 6955c7a072..8f148873e3 100644 --- a/source/libs/sync/inc/sync_raft_quorum_majority.h +++ b/source/libs/sync/inc/sync_raft_quorum_majority.h @@ -18,6 +18,7 @@ #include "sync.h" #include "sync_type.h" +#include "sync_raft_quorum.h" /** * syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns @@ -25,6 +26,6 @@ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a * quorum of no has been reached). **/ -ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteResult* votes); +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteType* votes); #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */ diff --git a/source/libs/sync/inc/sync_raft_restore.h b/source/libs/sync/inc/sync_raft_restore.h new file mode 100644 index 0000000000..fc65ae7440 --- /dev/null +++ b/source/libs/sync/inc/sync_raft_restore.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_RESTORE_H +#define TD_SYNC_RAFT_RESTORE_H + +#include "sync_type.h" +#include "sync_raft_proto.h" + +int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs, + SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); + +#endif /* TD_SYNC_RAFT_RESTORE_H */ diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index 3925323415..a7977f318e 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -17,6 +17,7 @@ #define _TD_LIBS_SYNC_TYPE_H #include +#include "sync.h" #include "osMath.h" #define SYNC_NON_NODE_ID -1 @@ -28,10 +29,13 @@ typedef uint32_t SyncTick; typedef struct SSyncRaft SSyncRaft; typedef struct SSyncRaftProgress SSyncRaftProgress; +typedef struct SSyncRaftProgressMap SSyncRaftProgressMap; typedef struct SSyncRaftProgressTrackerConfig SSyncRaftProgressTrackerConfig; typedef struct SSyncRaftProgressTracker SSyncRaftProgressTracker; +typedef struct SSyncRaftChanger SSyncRaftChanger; + typedef struct SSyncRaftLog SSyncRaftLog; typedef struct SSyncRaftEntry SSyncRaftEntry; @@ -46,6 +50,11 @@ typedef struct SSyncRaftEntry SSyncRaftEntry; #endif #endif +typedef struct { + int32_t replica; + SyncNodeId nodeId[TSDB_MAX_REPLICA]; +} SSyncRaftNodeMap; + typedef enum { SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0, SYNC_RAFT_CAMPAIGN_ELECTION = 1, @@ -61,6 +70,6 @@ typedef enum { //reject the vote request SYNC_RAFT_VOTE_RESP_REJECT = 2, -} ESyncRaftVoteResult; +} ESyncRaftVoteType; #endif /* _TD_LIBS_SYNC_TYPE_H */ diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c new file mode 100644 index 0000000000..e99da0e226 --- /dev/null +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncInt.h" +#include "sync_raft_config_change.h" +#include "sync_raft_progress.h" +#include "sync_raft_progress_tracker.h" + +static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); +static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); +static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); +static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); +static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config); +static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTrackerConfig* config, + const SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css); + +// Simple carries out a series of configuration changes that (in aggregate) +// mutates the incoming majority config Voters[0] by at most one. This method +// will return an error if that is not the case, if the resulting quorum is +// zero, or if the configuration is in a joint state (i.e. if there is an +// outgoing configuration). +int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, + SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + int ret; + + ret = checkAndCopy(changer, config, progressMap); + if (ret != 0) { + return ret; + } + + if (hasJointConfig(config)) { + return -1; + } + + ret = applyConfig(changer, config, progressMap, css); + if (ret != 0) { + return ret; + } + + return checkAndReturn(config, progressMap); +} + +// checkAndCopy copies the tracker's config and progress map (deeply enough for +// the purposes of the Changer) and returns those copies. It returns an error +// if checkInvariants does. +static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + syncRaftCloneTrackerConfig(&changer->tracker->config, config); + int i; + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + SSyncRaftProgress* progress = &(changer->tracker->progressMap.progress[i]); + if (progress->id == SYNC_NON_NODE_ID) { + continue; + } + syncRaftProgressCopy(progress, &(progressMap->progress[i])); + } + return checkAndReturn(config, progressMap); +} + +// checkAndReturn calls checkInvariants on the input and returns either the +// resulting error or the input. +static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + if (checkInvariants(config, progressMap) != 0) { + return -1; + } + + return 0; +} + +// checkInvariants makes sure that the config and progress are compatible with +// each other. This is used to check both what the Changer is initialized with, +// as well as what it returns. +static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + int ret = syncRaftCheckProgress(config, progressMap); + if (ret != 0) { + return ret; + } + + int i; + // Any staged learner was staged because it could not be directly added due + // to a conflicting voter in the outgoing config. + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (!syncRaftJointConfigInOutgoing(&config->voters, config->learnersNext.nodeId[i])) { + return -1; + } + if (progressMap->progress[i].id != SYNC_NON_NODE_ID && progressMap->progress[i].isLearner) { + syncError("%d is in LearnersNext, but is already marked as learner", progressMap->progress[i].id); + return -1; + } + } + // Conversely Learners and Voters doesn't intersect at all. + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (syncRaftJointConfigInIncoming(&config->voters, config->learners.nodeId[i])) { + syncError("%d is in Learners and voter.incoming", progressMap->progress[i].id); + return -1; + } + if (progressMap->progress[i].id != SYNC_NON_NODE_ID && !progressMap->progress[i].isLearner) { + syncError("%d is in Learners, but is not marked as learner", progressMap->progress[i].id); + return -1; + } + } + + if (!hasJointConfig(config)) { + // We enforce that empty maps are nil instead of zero. + if (config->learnersNext.replica > 0) { + syncError("cfg.LearnersNext must be nil when not joint"); + return -1; + } + if (config->autoLeave) { + syncError("AutoLeave must be false when not joint"); + return -1; + } + } + + return 0; +} + +static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) { + return config->voters.outgoing.replica > 0; +} + +static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTrackerConfig* config, + const SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css) { + int i; + + for (i = 0; i < css->n; ++i) { + const SSyncConfChangeSingle* cs = &(css->changes[i]); + if (cs->nodeId == SYNC_NON_NODE_ID) { + continue; + } + + ESyncRaftConfChangeType type = cs->type; + switch (type) { + + } + } + + if (config->voters.incoming.replica == 0) { + return -1; + } + + return 0; +} \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index 4d1f66a474..5e23474a89 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -243,7 +243,7 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { syncRaftLogAppend(pRaft->log, entries, n); - SSyncRaftProgress* progress = &(pRaft->tracker->progressMap[pRaft->cluster.selfIndex]); + SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->cluster.selfIndex]); syncRaftProgressMaybeUpdate(progress, lastIndex); // Regardless of maybeCommit's return, our caller will call bcastAppend. maybeCommit(pRaft); diff --git a/source/libs/sync/src/sync_raft_progress.c b/source/libs/sync/src/sync_raft_progress.c index df954402f8..e7de3fcf98 100644 --- a/source/libs/sync/src/sync_raft_progress.c +++ b/source/libs/sync/src/sync_raft_progress.c @@ -149,6 +149,10 @@ void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snaps progress->pendingSnapshotIndex = snapshotIndex; } +void syncRaftProgressCopy(const SSyncRaftProgress* progress, SSyncRaftProgress* out) { + +} + /** * ResetState moves the Progress into the specified State, resetting ProbeSent, * PendingSnapshot, and Inflights. diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index f5da538e1a..525b2eec1a 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -25,13 +25,13 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { } void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) { - memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(ESyncRaftVoteResult) * TSDB_MAX_REPLICA); + memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(ESyncRaftVoteType) * TSDB_MAX_REPLICA); } void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) { int i; for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - SSyncRaftProgress* progress = &(tracker->progressMap[i]); + SSyncRaftProgress* progress = &(tracker->progressMap.progress[i]); visit(i, progress, arg); } } @@ -44,6 +44,10 @@ void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) { tracker->votes[i] = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT; } +void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) { + +} + /** * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the * election outcome is known. @@ -54,7 +58,7 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r int r, g; for (i = 0, r = 0, g = 0; i < TSDB_MAX_REPLICA; ++i) { - progress = &(tracker->progressMap[i]); + progress = &(tracker->progressMap.progress[i]); if (progress->id == SYNC_NON_NODE_ID) { continue; } diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c index 9084e1868a..f8b5463ad8 100644 --- a/source/libs/sync/src/sync_raft_quorum_joint.c +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -22,9 +22,9 @@ * a result indicating whether the vote is pending, lost, or won. A joint quorum * requires both majority quorums to vote in favor. **/ -ESyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteResult* votes) { - ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->majorityConfig[0]), votes); - ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->majorityConfig[1]), votes); +ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes) { + ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->incoming), votes); + ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->outgoing), votes); if (r1 == r2) { // If they agree, return the agreed state. diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c index 0d13998112..0361845414 100644 --- a/source/libs/sync/src/sync_raft_quorum_majority.c +++ b/source/libs/sync/src/sync_raft_quorum_majority.c @@ -22,7 +22,7 @@ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a * quorum of no has been reached). **/ -ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteResult* votes) { +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteType* votes) { if (config->replica == 0) { return SYNC_RAFT_VOTE_WON; } diff --git a/source/libs/sync/src/sync_raft_restore.c b/source/libs/sync/src/sync_raft_restore.c new file mode 100644 index 0000000000..c7dfaa07b4 --- /dev/null +++ b/source/libs/sync/src/sync_raft_restore.c @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sync_raft_config_change.h" +#include "sync_raft_restore.h" +#include "sync_raft_progress_tracker.h" + +static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in); + +int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs, + SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + SSyncConfChangeSingleArray outgoing; + SSyncConfChangeSingleArray incoming; + SSyncConfChangeSingleArray css; + int i, ret; + + ret = toConfChangeSingle(cs, &outgoing, &incoming); + if (ret != 0) { + goto out; + } + + if (outgoing.n == 0) { + // No outgoing config, so just apply the incoming changes one by one. + for (i = 0; i < incoming.n; ++i) { + css = (SSyncConfChangeSingleArray) { + .n = 1, + .changes = &incoming.changes[i], + }; + ret = syncRaftChangerSimpleConfig(changer, &css, config, progressMap); + if (ret != 0) { + goto out; + } + syncRaftCloneTrackerConfig(config, &changer->tracker->config); + syncRaftProgressMapCopy(progressMap, &changer->tracker->progressMap); + } + } else { + // The ConfState describes a joint configuration. + // + // First, apply all of the changes of the outgoing config one by one, so + // that it temporarily becomes the incoming active config. For example, + // if the config is (1 2 3)&(2 3 4), this will establish (2 3 4)&(). + for (i = 0; i < outgoing.n; ++i) { + css = (SSyncConfChangeSingleArray) { + .n = 1, + .changes = &outgoing.changes[i], + }; + ret = syncRaftChangerSimpleConfig(changer, &css, config, progressMap); + if (ret != 0) { + goto out; + } + syncRaftCloneTrackerConfig(config, &changer->tracker->config); + syncRaftProgressMapCopy(progressMap, &changer->tracker->progressMap); + } + + ret = syncRaftChangerEnterJoint(changer, &incoming, config, progressMap); + if (ret != 0) { + goto out; + } + syncRaftCloneTrackerConfig(config, &changer->tracker->config); + syncRaftProgressMapCopy(progressMap, &changer->tracker->progressMap); + } + +out: + if (incoming.n != 0) free(incoming.changes); + if (outgoing.n != 0) free(outgoing.changes); + return ret; +} + +// toConfChangeSingle translates a conf state into 1) a slice of operations creating +// first the config that will become the outgoing one, and then the incoming one, and +// b) another slice that, when applied to the config resulted from 1), represents the +// ConfState. +static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in) { + int i; + + out->n = in->n = 0; + + out->n = cs->votersOutgoing.replica; + out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * out->n); + if (out->changes == NULL) { + out->n = 0; + return -1; + } + in->n = cs->votersOutgoing.replica + cs->voters.replica + cs->learners.replica + cs->learnersNext.replica; + out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * in->n); + if (in->changes == NULL) { + in->n = 0; + return -1; + } + + // Example to follow along this code: + // voters=(1 2 3) learners=(5) outgoing=(1 2 4 6) learners_next=(4) + // + // This means that before entering the joint config, the configuration + // had voters (1 2 4 6) and perhaps some learners that are already gone. + // The new set of voters is (1 2 3), i.e. (1 2) were kept around, and (4 6) + // are no longer voters; however 4 is poised to become a learner upon leaving + // the joint state. + // We can't tell whether 5 was a learner before entering the joint config, + // but it doesn't matter (we'll pretend that it wasn't). + // + // The code below will construct + // outgoing = add 1; add 2; add 4; add 6 + // incoming = remove 1; remove 2; remove 4; remove 6 + // add 1; add 2; add 3; + // add-learner 5; + // add-learner 4; + // + // So, when starting with an empty config, after applying 'outgoing' we have + // + // quorum=(1 2 4 6) + // + // From which we enter a joint state via 'incoming' + // + // quorum=(1 2 3)&&(1 2 4 6) learners=(5) learners_next=(4) + // + // as desired. + + for (i = 0; i < cs->votersOutgoing.replica; ++i) { + // If there are outgoing voters, first add them one by one so that the + // (non-joint) config has them all. + out->changes[i] = (SSyncConfChangeSingle) { + .type = SYNC_RAFT_Conf_AddNode, + .nodeId = cs->votersOutgoing.nodeId[i], + }; + } + + // We're done constructing the outgoing slice, now on to the incoming one + // (which will apply on top of the config created by the outgoing slice). + + // First, we'll remove all of the outgoing voters. + int j = 0; + for (i = 0; i < cs->votersOutgoing.replica; ++i) { + in->changes[j] = (SSyncConfChangeSingle) { + .type = SYNC_RAFT_Conf_RemoveNode, + .nodeId = cs->votersOutgoing.nodeId[i], + }; + j += 1; + } + // Then we'll add the incoming voters and learners. + for (i = 0; i < cs->voters.replica; ++i) { + in->changes[j] = (SSyncConfChangeSingle) { + .type = SYNC_RAFT_Conf_AddNode, + .nodeId = cs->voters.nodeId[i], + }; + j += 1; + } + for (i = 0; i < cs->learners.replica; ++i) { + in->changes[j] = (SSyncConfChangeSingle) { + .type = SYNC_RAFT_Conf_AddLearnerNode, + .nodeId = cs->learners.nodeId[i], + }; + j += 1; + } + // Same for LearnersNext; these are nodes we want to be learners but which + // are currently voters in the outgoing config. + for (i = 0; i < cs->learnersNext.replica; ++i) { + in->changes[j] = (SSyncConfChangeSingle) { + .type = SYNC_RAFT_Conf_AddLearnerNode, + .nodeId = cs->learnersNext.nodeId[i], + }; + j += 1; + } + return 0; +} \ No newline at end of file From 24b599f14bc1e6126dde16af6d7af84cd624dd55 Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 12 Nov 2021 18:09:09 +0800 Subject: [PATCH 3/8] [TD-10645][raft]add restore process --- source/libs/sync/inc/sync_raft_progress.h | 2 +- source/libs/sync/inc/sync_raft_quorum_joint.h | 8 +++ source/libs/sync/inc/sync_raft_restore.h | 11 +++- .../libs/sync/src/sync_raft_config_change.c | 60 ++++++++++++++++++- source/libs/sync/src/sync_raft_progress.c | 2 +- source/libs/sync/src/sync_raft_restore.c | 22 ++++--- 6 files changed, 91 insertions(+), 14 deletions(-) diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h index b5017f963d..98edfc5e4f 100644 --- a/source/libs/sync/inc/sync_raft_progress.h +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -214,7 +214,7 @@ bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress); void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex); -void syncRaftProgressCopy(const SSyncRaftProgress* from, SSyncRaftProgress* to); +void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to); void syncRaftProgressMapCopy(const SSyncRaftProgressMap* from, SSyncRaftProgressMap* to); diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index 103a147de3..cdde6d24f7 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -55,4 +55,12 @@ static FORCE_INLINE bool syncRaftJointConfigInIncoming(const SSyncRaftQuorumJoin return syncRaftJointConfigInCluster(&config->incoming, id); } +static FORCE_INLINE const SSyncCluster* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) { + return &config->incoming; +} + +static FORCE_INLINE const SSyncCluster* syncRaftJointConfigOutgoing(const SSyncRaftQuorumJointConfig* config) { + return &config->outgoing; +} + #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ diff --git a/source/libs/sync/inc/sync_raft_restore.h b/source/libs/sync/inc/sync_raft_restore.h index fc65ae7440..f7c4ce67b5 100644 --- a/source/libs/sync/inc/sync_raft_restore.h +++ b/source/libs/sync/inc/sync_raft_restore.h @@ -19,7 +19,14 @@ #include "sync_type.h" #include "sync_raft_proto.h" -int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs, - SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); +// Restore takes a Changer (which must represent an empty configuration), and +// runs a sequence of changes enacting the configuration described in the +// ConfState. +// +// TODO(tbg) it's silly that this takes a Changer. Unravel this by making sure +// the Changer only needs a ProgressMap (not a whole Tracker) at which point +// this can just take LastIndex and MaxInflight directly instead and cook up +// the results from that alone. +int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs); #endif /* TD_SYNC_RAFT_RESTORE_H */ diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c index e99da0e226..e9cad376aa 100644 --- a/source/libs/sync/src/sync_raft_config_change.c +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -25,6 +25,7 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config); static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTrackerConfig* config, const SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css); +static int symDiff(const SSyncCluster* l, const SSyncCluster* r); // Simple carries out a series of configuration changes that (in aggregate) // mutates the incoming majority config Voters[0] by at most one. This method @@ -49,9 +50,34 @@ int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChange return ret; } + int n = symDiff(syncRaftJointConfigIncoming(&changer->tracker->config.voters), + syncRaftJointConfigIncoming(&config->voters)); + if (n > 1) { + syncError("more than one voter changed without entering joint config"); + return -1; + } + return checkAndReturn(config, progressMap); } +// EnterJoint verifies that the outgoing (=right) majority config of the joint +// config is empty and initializes it with a copy of the incoming (=left) +// majority config. That is, it transitions from +// +// (1 2 3)&&() +// to +// (1 2 3)&&(1 2 3). +// +// The supplied changes are then applied to the incoming majority config, +// resulting in a joint configuration that in terms of the Raft thesis[1] +// (Section 4.3) corresponds to `C_{new,old}`. +// +// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf +int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, + SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + +} + // checkAndCopy copies the tracker's config and progress map (deeply enough for // the purposes of the Changer) and returns those copies. It returns an error // if checkInvariants does. @@ -63,7 +89,7 @@ static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi if (progress->id == SYNC_NON_NODE_ID) { continue; } - syncRaftProgressCopy(progress, &(progressMap->progress[i])); + syncRaftCopyProgress(progress, &(progressMap->progress[i])); } return checkAndReturn(config, progressMap); } @@ -151,4 +177,36 @@ static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTracker } return 0; +} + +// symdiff returns the count of the symmetric difference between the sets of +// uint64s, i.e. len( (l - r) \union (r - l)). +static int symDiff(const SSyncCluster* l, const SSyncCluster* r) { + int n; + int i; + int j0, j1; + const SSyncCluster* pairs[2][2] = { + {l, r}, // count elems in l but not in r + {r, l}, // count elems in r but not in l + }; + + for (n = 0, i = 0; i < 2; ++i) { + const SSyncCluster** pp = pairs[i]; + + const SSyncCluster* p0 = pp[0]; + const SSyncCluster* p1 = pp[1]; + for (j0 = 0; j0 < p0->replica; ++j0) { + SyncNodeId id = p0->nodeInfo[j0].nodeId; + if (id == SYNC_NON_NODE_ID) { + continue; + } + for (j1 = 0; j1 < p1->replica; ++j1) { + if (p1->nodeInfo[j1].nodeId != SYNC_NON_NODE_ID && p1->nodeInfo[j1].nodeId != id) { + n+=1; + } + } + } + } + + return n; } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_progress.c b/source/libs/sync/src/sync_raft_progress.c index e7de3fcf98..a3426aa999 100644 --- a/source/libs/sync/src/sync_raft_progress.c +++ b/source/libs/sync/src/sync_raft_progress.c @@ -149,7 +149,7 @@ void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snaps progress->pendingSnapshotIndex = snapshotIndex; } -void syncRaftProgressCopy(const SSyncRaftProgress* progress, SSyncRaftProgress* out) { +void syncRaftCopyProgress(const SSyncRaftProgress* progress, SSyncRaftProgress* out) { } diff --git a/source/libs/sync/src/sync_raft_restore.c b/source/libs/sync/src/sync_raft_restore.c index c7dfaa07b4..b7d9cc6888 100644 --- a/source/libs/sync/src/sync_raft_restore.c +++ b/source/libs/sync/src/sync_raft_restore.c @@ -19,11 +19,21 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in); -int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs, - SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { +// Restore takes a Changer (which must represent an empty configuration), and +// runs a sequence of changes enacting the configuration described in the +// ConfState. +// +// TODO(tbg) it's silly that this takes a Changer. Unravel this by making sure +// the Changer only needs a ProgressMap (not a whole Tracker) at which point +// this can just take LastIndex and MaxInflight directly instead and cook up +// the results from that alone. +int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) { SSyncConfChangeSingleArray outgoing; SSyncConfChangeSingleArray incoming; SSyncConfChangeSingleArray css; + SSyncRaftProgressTracker* tracker = changer->tracker; + SSyncRaftProgressTrackerConfig* config = &tracker->config; + SSyncRaftProgressMap* progressMap = &tracker->progressMap; int i, ret; ret = toConfChangeSingle(cs, &outgoing, &incoming); @@ -38,12 +48,10 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs, .n = 1, .changes = &incoming.changes[i], }; - ret = syncRaftChangerSimpleConfig(changer, &css, config, progressMap); + ret = syncRaftChangerSimpleConfig(changer, &css, &config, &progressMap); if (ret != 0) { goto out; } - syncRaftCloneTrackerConfig(config, &changer->tracker->config); - syncRaftProgressMapCopy(progressMap, &changer->tracker->progressMap); } } else { // The ConfState describes a joint configuration. @@ -60,16 +68,12 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs, if (ret != 0) { goto out; } - syncRaftCloneTrackerConfig(config, &changer->tracker->config); - syncRaftProgressMapCopy(progressMap, &changer->tracker->progressMap); } ret = syncRaftChangerEnterJoint(changer, &incoming, config, progressMap); if (ret != 0) { goto out; } - syncRaftCloneTrackerConfig(config, &changer->tracker->config); - syncRaftProgressMapCopy(progressMap, &changer->tracker->progressMap); } out: From aa438e7a535a2d5624baea3e6169ac6edd20c29a Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 15 Nov 2021 10:56:22 +0800 Subject: [PATCH 4/8] [TD-10645][raft]add restore process --- source/libs/sync/inc/sync_raft_progress.h | 6 + source/libs/sync/inc/sync_raft_proto.h | 2 +- source/libs/sync/inc/sync_raft_quorum_joint.h | 8 + .../libs/sync/src/sync_raft_config_change.c | 178 +++++++++++++++++- source/libs/sync/src/sync_raft_progress.c | 38 ++++ source/libs/sync/src/sync_raft_quorum_joint.c | 31 +++ 6 files changed, 256 insertions(+), 7 deletions(-) diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h index 98edfc5e4f..c733a8ea74 100644 --- a/source/libs/sync/inc/sync_raft_progress.h +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -207,6 +207,12 @@ static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progres return progress->recentActive; } +int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id); + +int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); + +void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); + /** * return true if progress's log is up-todate **/ diff --git a/source/libs/sync/inc/sync_raft_proto.h b/source/libs/sync/inc/sync_raft_proto.h index 49d706875f..c131e91139 100644 --- a/source/libs/sync/inc/sync_raft_proto.h +++ b/source/libs/sync/inc/sync_raft_proto.h @@ -22,7 +22,7 @@ typedef enum ESyncRaftConfChangeType { SYNC_RAFT_Conf_AddNode = 0, SYNC_RAFT_Conf_RemoveNode = 1, SYNC_RAFT_Conf_UpdateNode = 2, - SYNC_RAFT_Conf_AddLearnerNode = 2, + SYNC_RAFT_Conf_AddLearnerNode = 3, } ESyncRaftConfChangeType; // ConfChangeSingle is an individual configuration change operation. Multiple diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index cdde6d24f7..e2c2fd89b2 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -55,6 +55,10 @@ static FORCE_INLINE bool syncRaftJointConfigInIncoming(const SSyncRaftQuorumJoin return syncRaftJointConfigInCluster(&config->incoming, id); } +void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id); + +void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id); + static FORCE_INLINE const SSyncCluster* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) { return &config->incoming; } @@ -63,4 +67,8 @@ static FORCE_INLINE const SSyncCluster* syncRaftJointConfigOutgoing(const SSyncR return &config->outgoing; } +static FORCE_INLINE void syncRaftJointConfigClearOutgoing(SSyncRaftQuorumJointConfig* config) { + memset(&config->outgoing, 0, sizeof(SSyncCluster)); +} + #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c index e9cad376aa..7bf409fba0 100644 --- a/source/libs/sync/src/sync_raft_config_change.c +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -23,11 +23,24 @@ static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgr static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config); -static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTrackerConfig* config, - const SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css); +static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css); static int symDiff(const SSyncCluster* l, const SSyncCluster* r); -// Simple carries out a series of configuration changes that (in aggregate) + +static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner); + +static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id); +static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id); + +static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id); +static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id); +static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id); +// syncRaftChangerSimpleConfig carries out a series of configuration changes that (in aggregate) // mutates the incoming majority config Voters[0] by at most one. This method // will return an error if that is not the case, if the resulting quorum is // zero, or if the configuration is in a joint state (i.e. if there is an @@ -42,6 +55,7 @@ int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChange } if (hasJointConfig(config)) { + syncError("can't apply simple config change in joint config"); return -1; } @@ -75,7 +89,28 @@ int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChange // [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + int ret; + ret = checkAndCopy(changer, config, progressMap); + if (ret != 0) { + return ret; + } + if (hasJointConfig(config)) { + syncError("config is already joint"); + return -1; + } + + if(config->voters.incoming.replica == 0) { + // We allow adding nodes to an empty config for convenience (testing and + // bootstrap), but you can't enter a joint state. + syncError("can't make a zero-voter config joint"); + return -1; + } + + // Clear the outgoing config. + syncRaftJointConfigClearOutgoing(config); + + // Copy incoming to outgoing. } // checkAndCopy copies the tracker's config and progress map (deeply enough for @@ -156,8 +191,8 @@ static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) { return config->voters.outgoing.replica > 0; } -static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTrackerConfig* config, - const SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css) { +static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css) { int i; for (i = 0; i < css->n; ++i) { @@ -168,11 +203,22 @@ static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTracker ESyncRaftConfChangeType type = cs->type; switch (type) { - + case SYNC_RAFT_Conf_AddNode: + makeVoter(changer, config, progressMap, cs->nodeId); + break; + case SYNC_RAFT_Conf_AddLearnerNode: + makeLearner(changer, config, progressMap, cs->nodeId); + break; + case SYNC_RAFT_Conf_RemoveNode: + remove(changer, config, progressMap, cs->nodeId); + break; + case SYNC_RAFT_Conf_UpdateNode: + break; } } if (config->voters.incoming.replica == 0) { + syncError("removed all voters"); return -1; } @@ -209,4 +255,124 @@ static int symDiff(const SSyncCluster* l, const SSyncCluster* r) { } return n; +} + +static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner) { + +} + +// nilAwareDelete deletes from a map, nil'ing the map itself if it is empty after. +static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id) { + int i; + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (nodeMap->nodeId[i] == id) { + nodeMap->replica -= 1; + nodeMap->nodeId[i] = SYNC_NON_NODE_ID; + break; + } + } + + assert(nodeMap->replica >= 0); +} + +// nilAwareAdd populates a map entry, creating the map if necessary. +static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id) { + int i, j; + for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) { + if (nodeMap->nodeId[i] == id) { + return; + } + if (j == -1 && nodeMap->nodeId[i] == SYNC_NON_NODE_ID) { + j = i; + } + } + + assert(j != -1); + nodeMap->nodeId[j] = id; + nodeMap->replica += 1; +} + +// makeVoter adds or promotes the given ID to be a voter in the incoming +// majority config. +static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i = syncRaftFindProgressIndexByNodeId(progressMap, id); + if (i == -1) { + initProgress(changer, config, progressMap, id, false); + i = syncRaftFindProgressIndexByNodeId(progressMap, id); + } + + assert(i != -1); + SSyncRaftProgress* progress = &(progressMap->progress[i]); + + progress->isLearner = false; + nilAwareDelete(&config->learners, id); + nilAwareDelete(&config->learnersNext, id); + syncRaftJointConfigAddToIncoming(config, id); +} + +// makeLearner makes the given ID a learner or stages it to be a learner once +// an active joint configuration is exited. +// +// The former happens when the peer is not a part of the outgoing config, in +// which case we either add a new learner or demote a voter in the incoming +// config. +// +// The latter case occurs when the configuration is joint and the peer is a +// voter in the outgoing config. In that case, we do not want to add the peer +// as a learner because then we'd have to track a peer as a voter and learner +// simultaneously. Instead, we add the learner to LearnersNext, so that it will +// be added to Learners the moment the outgoing config is removed by +// LeaveJoint(). +static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i = syncRaftFindProgressIndexByNodeId(progressMap, id); + if (i == -1) { + initProgress(changer, config, progressMap, id, false); + i = syncRaftFindProgressIndexByNodeId(progressMap, id); + } + + assert(i != -1); + SSyncRaftProgress* progress = &(progressMap->progress[i]); + if (progress->isLearner) { + return; + } + // Remove any existing voter in the incoming config... + remove(changer, config, progressMap, id); + + // ... but save the Progress. + syncRaftAddToProgressMap(progressMap, id); + + // Use LearnersNext if we can't add the learner to Learners directly, i.e. + // if the peer is still tracked as a voter in the outgoing config. It will + // be turned into a learner in LeaveJoint(). + // + // Otherwise, add a regular learner right away. + bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); + if (inOutgoing) { + nilAwareAdd(&config->learnersNext, id); + } else { + nilAwareAdd(&config->learners, id); + progress->isLearner = true; + } +} + +// remove this peer as a voter or learner from the incoming config. +static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i = syncRaftFindProgressIndexByNodeId(progressMap, id); + if (i == -1) { + return; + } + + syncRaftJointConfigRemoveFromIncoming(&config->voters, id); + nilAwareDelete(&config->learners, id); + nilAwareDelete(&config->learnersNext, id); + + // If the peer is still a voter in the outgoing config, keep the Progress. + bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); + if (!inOutgoing) { + syncRaftRemoveFromProgressMap(progressMap, id); + } } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_progress.c b/source/libs/sync/src/sync_raft_progress.c index a3426aa999..a53aae93d0 100644 --- a/source/libs/sync/src/sync_raft_progress.c +++ b/source/libs/sync/src/sync_raft_progress.c @@ -112,6 +112,44 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { } } +int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i; + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (progressMap->progress[i].id == id) { + return i; + } + } + return -1; +} + +int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i, j; + + for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) { + if (progressMap->progress[i].id == id) { + return i; + } + if (j == -1 && progressMap->progress[i].id == SYNC_NON_NODE_ID) { + j = i; + } + } + + assert(j != -1); + + progressMap->progress[i].id = id; +} + +void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i; + + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (progressMap->progress[i].id == id) { + progressMap->progress[i].id = SYNC_NON_NODE_ID; + break; + } + } +} + bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) { return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; } diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c index f8b5463ad8..9a8e9eb7ba 100644 --- a/source/libs/sync/src/sync_raft_quorum_joint.c +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -39,3 +39,34 @@ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const E // One side won, the other one is pending, so the whole outcome is. return SYNC_RAFT_VOTE_PENDING; } + +void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) { + int i, min; + + for (i = 0, min = -1; i < TSDB_MAX_REPLICA; ++i) { + if (config->incoming.nodeInfo[i].nodeId == id) { + return; + } + if (min == -1 && config->incoming.nodeInfo[i].nodeId == SYNC_NON_NODE_ID) { + min = i; + } + } + + assert(min != -1); + config->incoming.nodeInfo[min].nodeId = id; + config->incoming.replica += 1; +} + +void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) { + int i; + + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (config->incoming.nodeInfo[i].nodeId == id) { + config->incoming.replica -= 1; + config->incoming.nodeInfo[i].nodeId = SYNC_NON_NODE_ID; + break; + } + } + + assert(config->incoming.replica >= 0); +} \ No newline at end of file From 83a40a7f5abe01f5d83211199b87954fc68ef642 Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 15 Nov 2021 11:11:06 +0800 Subject: [PATCH 5/8] [TD-10645][raft]refactor sync interface --- include/libs/sync/sync.h | 29 ++++++++--------------------- source/libs/sync/inc/sync_type.h | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index fc48d260f5..6a9772bcc4 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -109,33 +109,20 @@ typedef struct SSyncLogStore { SyncIndex (*logLastIndex)(struct SSyncLogStore* logStore); } SSyncLogStore; -typedef struct SSyncServerState { - SyncNodeId voteFor; - SyncTerm term; - SyncIndex commitIndex; -} SSyncServerState; - -typedef struct SSyncClusterConfig { - // Log index number of current cluster config. - SyncIndex index; - - // Log index number of previous cluster config. - SyncIndex prevIndex; - - // current cluster - const SSyncCluster* cluster; -} SSyncClusterConfig; - typedef struct SStateManager { void* pData; - int32_t (*saveServerState)(struct SStateManager* stateMng, SSyncServerState* state); + // save serialized server state data, buffer will be free by Sync + int32_t (*saveServerState)(struct SStateManager* stateMng, const char* buffer, int n); - int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state); + // read serialized server state data, buffer will be free by Sync + int32_t (*readServerState)(struct SStateManager* stateMng, char** ppBuffer, int* n); - void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); + // save serialized cluster state data, buffer will be free by Sync + void (*saveCluster)(struct SStateManager* stateMng, const char* buffer, int n); - const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); + // read serialized cluster state data, buffer will be free by Sync + int32_t (*readCluster)(struct SStateManager* stateMng, char** ppBuffer, int* n); } SStateManager; typedef struct { diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index a7977f318e..34be10dfd5 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -50,6 +50,24 @@ typedef struct SSyncRaftEntry SSyncRaftEntry; #endif #endif + +typedef struct SSyncServerState { + SyncNodeId voteFor; + SyncTerm term; + SyncIndex commitIndex; +} SSyncServerState; + +typedef struct SSyncClusterConfig { + // Log index number of current cluster config. + SyncIndex index; + + // Log index number of previous cluster config. + SyncIndex prevIndex; + + // current cluster + const SSyncCluster* cluster; +} SSyncClusterConfig; + typedef struct { int32_t replica; SyncNodeId nodeId[TSDB_MAX_REPLICA]; From 2521994923d9fe1e4c514a310dbf25653f89edf3 Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 15 Nov 2021 11:58:12 +0800 Subject: [PATCH 6/8] [TD-10645][raft]refactor sync interface --- .../libs/sync/inc/sync_raft_config_change.h | 2 +- source/libs/sync/inc/sync_raft_restore.h | 2 +- source/libs/sync/src/raft.c | 22 +++++++++++++-- .../libs/sync/src/sync_raft_config_change.c | 28 +++++++++++++------ source/libs/sync/src/sync_raft_restore.c | 4 +-- 5 files changed, 43 insertions(+), 15 deletions(-) diff --git a/source/libs/sync/inc/sync_raft_config_change.h b/source/libs/sync/inc/sync_raft_config_change.h index 98b8b49cb8..a54a7544fe 100644 --- a/source/libs/sync/inc/sync_raft_config_change.h +++ b/source/libs/sync/inc/sync_raft_config_change.h @@ -36,7 +36,7 @@ typedef int (*configChangeFp)(SSyncRaftChanger* changer, const SSyncConfChangeSi int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); -int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, +int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const SSyncConfChangeSingleArray* css, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); #endif /* TD_SYNC_RAFT_CONFIG_CHANGE_H */ diff --git a/source/libs/sync/inc/sync_raft_restore.h b/source/libs/sync/inc/sync_raft_restore.h index f7c4ce67b5..38eadb00c7 100644 --- a/source/libs/sync/inc/sync_raft_restore.h +++ b/source/libs/sync/inc/sync_raft_restore.h @@ -19,7 +19,7 @@ #include "sync_type.h" #include "sync_raft_proto.h" -// Restore takes a Changer (which must represent an empty configuration), and +// syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and // runs a sequence of changes enacting the configuration described in the // ConfState. // diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index cdf2abebea..d39824c99c 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -22,6 +22,9 @@ #define RAFT_READ_LOG_MAX_NUM 100 +static int deserializeServerStateFromBuffer(SSyncServerState* server, const char* buffer, int n); +static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n); + static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); @@ -35,7 +38,9 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { SyncIndex initIndex = pInfo->snapshotIndex; SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM]; int nBuf, limit, i; - + char* buf; + int n; + memset(pRaft, 0, sizeof(SSyncRaft)); memcpy(&pRaft->fsm, &pInfo->fsm, sizeof(SSyncFSM)); @@ -57,10 +62,15 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { return -1; } // read server state - if (stateManager->readServerState(stateManager, &serverState) != 0) { + if (stateManager->readServerState(stateManager, &buf, &n) != 0) { syncError("readServerState for vgid %d fail", pInfo->vgId); return -1; } + if (deserializeServerStateFromBuffer(&serverState, buf, n) != 0) { + syncError("deserializeServerStateFromBuffer for vgid %d fail", pInfo->vgId); + return -1; + } + assert(initIndex <= serverState.commitIndex); // restore fsm state from snapshot index + 1 until commitIndex @@ -119,6 +129,14 @@ int32_t syncRaftTick(SSyncRaft* pRaft) { return 0; } +static int deserializeServerStateFromBuffer(SSyncServerState* server, const char* buffer, int n) { + return 0; +} + +static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n) { + return 0; +} + /** * pre-handle message, return true means no need to continue * Handle the message term, which may result in our stepping down to a follower. diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c index 7bf409fba0..b80562ffa3 100644 --- a/source/libs/sync/src/sync_raft_config_change.c +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -17,6 +17,7 @@ #include "sync_raft_config_change.h" #include "sync_raft_progress.h" #include "sync_raft_progress_tracker.h" +#include "sync_raft_quorum_joint.h" static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); @@ -38,8 +39,8 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* SSyncRaftProgressMap* progressMap, SyncNodeId id); static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, SyncNodeId id); -static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, - SSyncRaftProgressMap* progressMap, SyncNodeId id); +static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id); // syncRaftChangerSimpleConfig carries out a series of configuration changes that (in aggregate) // mutates the incoming majority config Voters[0] by at most one. This method // will return an error if that is not the case, if the resulting quorum is @@ -87,7 +88,7 @@ int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChange // (Section 4.3) corresponds to `C_{new,old}`. // // [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf -int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, +int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const SSyncConfChangeSingleArray* css, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { int ret; @@ -108,9 +109,18 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSi } // Clear the outgoing config. - syncRaftJointConfigClearOutgoing(config); + syncRaftJointConfigClearOutgoing(&config->voters); // Copy incoming to outgoing. + memcpy(&config->voters.outgoing, &config->voters.incoming, sizeof(SSyncCluster)); + + ret = applyConfig(changer, config, progressMap, css); + if (ret != 0) { + return ret; + } + + config->autoLeave = autoLeave; + return checkAndReturn(config, progressMap); } // checkAndCopy copies the tracker's config and progress map (deeply enough for @@ -210,7 +220,7 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig makeLearner(changer, config, progressMap, cs->nodeId); break; case SYNC_RAFT_Conf_RemoveNode: - remove(changer, config, progressMap, cs->nodeId); + removeNodeId(changer, config, progressMap, cs->nodeId); break; case SYNC_RAFT_Conf_UpdateNode: break; @@ -309,7 +319,7 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* progress->isLearner = false; nilAwareDelete(&config->learners, id); nilAwareDelete(&config->learnersNext, id); - syncRaftJointConfigAddToIncoming(config, id); + syncRaftJointConfigAddToIncoming(&config->voters, id); } // makeLearner makes the given ID a learner or stages it to be a learner once @@ -339,7 +349,7 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi return; } // Remove any existing voter in the incoming config... - remove(changer, config, progressMap, id); + removeNodeId(changer, config, progressMap, id); // ... but save the Progress. syncRaftAddToProgressMap(progressMap, id); @@ -358,8 +368,8 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi } } -// remove this peer as a voter or learner from the incoming config. -static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, +// removeNodeId this peer as a voter or learner from the incoming config. +static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, SyncNodeId id) { int i = syncRaftFindProgressIndexByNodeId(progressMap, id); if (i == -1) { diff --git a/source/libs/sync/src/sync_raft_restore.c b/source/libs/sync/src/sync_raft_restore.c index b7d9cc6888..d2bdbd6351 100644 --- a/source/libs/sync/src/sync_raft_restore.c +++ b/source/libs/sync/src/sync_raft_restore.c @@ -19,7 +19,7 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in); -// Restore takes a Changer (which must represent an empty configuration), and +// syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and // runs a sequence of changes enacting the configuration described in the // ConfState. // @@ -70,7 +70,7 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) } } - ret = syncRaftChangerEnterJoint(changer, &incoming, config, progressMap); + ret = syncRaftChangerEnterJoint(changer, cs->autoLeave, &incoming, config, progressMap); if (ret != 0) { goto out; } From 8ab1eb642e50e802b8f2df905886cd37e5712150 Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 16 Nov 2021 11:42:34 +0800 Subject: [PATCH 7/8] [TD-10645][raft]refactor sync interface --- include/libs/sync/sync.h | 6 +- source/libs/sync/inc/raft_log.h | 2 + source/libs/sync/inc/raft_replication.h | 7 +- source/libs/sync/inc/sync_raft_impl.h | 12 ++ source/libs/sync/inc/sync_raft_progress.h | 14 ++ .../sync/inc/sync_raft_progress_tracker.h | 3 + source/libs/sync/inc/sync_raft_quorum_joint.h | 4 +- source/libs/sync/inc/sync_type.h | 2 +- source/libs/sync/src/raft.c | 119 +++++++++++++- .../src/raft_handle_append_entries_message.c | 2 +- source/libs/sync/src/raft_log.c | 4 + source/libs/sync/src/raft_replication.c | 153 +++++++----------- source/libs/sync/src/sync_raft_impl.c | 34 ++-- .../sync/src/sync_raft_progress_tracker.c | 8 + source/libs/sync/src/sync_raft_restore.c | 2 +- 15 files changed, 253 insertions(+), 119 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 6a9772bcc4..283604508f 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -119,15 +119,15 @@ typedef struct SStateManager { int32_t (*readServerState)(struct SStateManager* stateMng, char** ppBuffer, int* n); // save serialized cluster state data, buffer will be free by Sync - void (*saveCluster)(struct SStateManager* stateMng, const char* buffer, int n); + void (*saveClusterState)(struct SStateManager* stateMng, const char* buffer, int n); // read serialized cluster state data, buffer will be free by Sync - int32_t (*readCluster)(struct SStateManager* stateMng, char** ppBuffer, int* n); + int32_t (*readClusterState)(struct SStateManager* stateMng, char** ppBuffer, int* n); } SStateManager; typedef struct { SyncGroupId vgId; - SyncIndex snapshotIndex; + SyncIndex appliedIndex; SSyncCluster syncCfg; SSyncFSM fsm; SSyncLogStore logStore; diff --git a/source/libs/sync/inc/raft_log.h b/source/libs/sync/inc/raft_log.h index b36f0da3be..dc10c59b28 100644 --- a/source/libs/sync/inc/raft_log.h +++ b/source/libs/sync/inc/raft_log.h @@ -51,6 +51,8 @@ SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog); SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog); +void syncRaftLogAppliedTo(SSyncRaftLog* pLog, SyncIndex appliedIndex); + bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term); int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog); diff --git a/source/libs/sync/inc/raft_replication.h b/source/libs/sync/inc/raft_replication.h index e457063980..d0e55ef10e 100644 --- a/source/libs/sync/inc/raft_replication.h +++ b/source/libs/sync/inc/raft_replication.h @@ -20,6 +20,11 @@ #include "syncInt.h" #include "sync_type.h" -int syncRaftReplicate(SSyncRaft* pRaft, int i); +// syncRaftReplicate sends an append RPC with new entries to the given peer, +// if necessary. Returns true if a message was sent. The sendIfEmpty +// argument controls whether messages with no entries will be sent +// ("empty" messages are useful to convey updated Commit indexes, but +// are undesirable when we're sending multiple messages in a batch). +bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty); #endif /* TD_SYNC_RAFT_REPLICATION_H */ diff --git a/source/libs/sync/inc/sync_raft_impl.h b/source/libs/sync/inc/sync_raft_impl.h index 2f96b970dc..bd77978c28 100644 --- a/source/libs/sync/inc/sync_raft_impl.h +++ b/source/libs/sync/inc/sync_raft_impl.h @@ -35,8 +35,20 @@ bool syncRaftIsPromotable(SSyncRaft* pRaft); bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft); int syncRaftQuorum(SSyncRaft* pRaft); +bool syncRaftMaybeCommit(SSyncRaft* pRaft); + ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool accept, int* rejectNum, int *granted); +static FORCE_INLINE bool syncRaftIsEmptyServerState(const SSyncServerState* serverState) { + return serverState->commitIndex == 0 && + serverState->term == SYNC_NON_TERM && + serverState->voteFor == SYNC_NON_NODE_ID; +} + +void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState); + +void syncRaftBroadcastAppend(SSyncRaft* pRaft); + #endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */ diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h index c733a8ea74..173608a40a 100644 --- a/source/libs/sync/inc/sync_raft_progress.h +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -58,11 +58,20 @@ typedef enum ESyncRaftProgressState { PROGRESS_STATE_SNAPSHOT, } ESyncRaftProgressState; +static const char* kProgressStateString[] = { + "Probe", + "Replicate", + "Snapshot", +}; + /** * Progress represents a follower’s progress in the view of the leader. Leader maintains * progresses of all followers, and sends entries to the follower based on its progress. **/ struct SSyncRaftProgress { + // index in raft cluster config + int selfIndex; + SyncNodeId id; SyncIndex nextIndex; @@ -133,6 +142,11 @@ struct SSyncRaftProgressMap { SSyncRaftProgress progress[TSDB_MAX_REPLICA]; }; + +static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgress* progress) { + return kProgressStateString[progress->state]; +} + void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress); /** diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index 61308d5df5..b267c46f35 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -20,6 +20,7 @@ #include "sync_raft_quorum.h" #include "sync_raft_quorum_joint.h" #include "sync_raft_progress.h" +#include "sync_raft_proto.h" struct SSyncRaftProgressTrackerConfig { SSyncRaftQuorumJointConfig voters; @@ -109,4 +110,6 @@ int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaf **/ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted); +void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs); + #endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index e2c2fd89b2..798e3d5eca 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -25,8 +25,8 @@ * majority configurations. Decisions require the support of both majorities. **/ typedef struct SSyncRaftQuorumJointConfig { - SSyncCluster outgoing; - SSyncCluster incoming; + SSyncRaftNodeMap outgoing; + SSyncRaftNodeMap incoming; } SSyncRaftQuorumJointConfig; /** diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index 34be10dfd5..cb938c7319 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -86,7 +86,7 @@ typedef enum { // grant the vote request SYNC_RAFT_VOTE_RESP_GRANT = 1, - //reject the vote request + // reject the vote request SYNC_RAFT_VOTE_RESP_REJECT = 2, } ESyncRaftVoteType; diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index d39824c99c..f8c3d1b0d4 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -16,14 +16,19 @@ #include "raft.h" #include "raft_configuration.h" #include "raft_log.h" +#include "sync_raft_restore.h" #include "raft_replication.h" +#include "sync_raft_config_change.h" #include "sync_raft_progress_tracker.h" #include "syncInt.h" #define RAFT_READ_LOG_MAX_NUM 100 static int deserializeServerStateFromBuffer(SSyncServerState* server, const char* buffer, int n); -static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n); +static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const char* buffer, int n); + +static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfig* config, + const SSyncRaftProgressMap* progressMap, SSyncConfigState* cs); static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); @@ -32,14 +37,15 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { SSyncNode* pNode = pRaft->pNode; SSyncServerState serverState; + SSyncConfigState confState; SStateManager* stateManager; SSyncLogStore* logStore; SSyncFSM* fsm; - SyncIndex initIndex = pInfo->snapshotIndex; SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM]; int nBuf, limit, i; char* buf; int n; + SSyncRaftChanger changer; memset(pRaft, 0, sizeof(SSyncRaft)); @@ -70,8 +76,45 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { syncError("deserializeServerStateFromBuffer for vgid %d fail", pInfo->vgId); return -1; } + free(buf); + //assert(initIndex <= serverState.commitIndex); + + // read config state + if (stateManager->readClusterState(stateManager, &buf, &n) != 0) { + syncError("readClusterState for vgid %d fail", pInfo->vgId); + return -1; + } + if (deserializeClusterStateFromBuffer(&confState, buf, n) != 0) { + syncError("deserializeClusterStateFromBuffer for vgid %d fail", pInfo->vgId); + return -1; + } + free(buf); + + changer = (SSyncRaftChanger) { + .tracker = pRaft->tracker, + .lastIndex = syncRaftLogLastIndex(pRaft->log), + }; + if (syncRaftRestoreConfig(&changer, &confState) < 0) { + syncError("syncRaftRestoreConfig for vgid %d fail", pInfo->vgId); + return -1; + } + + if (!syncRaftIsEmptyServerState(&serverState)) { + syncRaftLoadState(pRaft, &serverState); + } + + if (pInfo->appliedIndex > 0) { + syncRaftLogAppliedTo(pRaft->log, pInfo->appliedIndex); + } + + syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); + + +#if 0 + + + - assert(initIndex <= serverState.commitIndex); // restore fsm state from snapshot index + 1 until commitIndex ++initIndex; @@ -96,6 +139,7 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); pRaft->selfIndex = pRaft->cluster.selfIndex; +#endif syncInfo("[%d:%d] restore vgid %d state: snapshot index success", pRaft->selfGroupId, pRaft->selfId, pInfo->vgId); @@ -133,10 +177,77 @@ static int deserializeServerStateFromBuffer(SSyncServerState* server, const char return 0; } -static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n) { +static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const char* buffer, int n) { return 0; } +static void visitProgressMaybeSendAppend(int i, SSyncRaftProgress* progress, void* arg) { + syncRaftReplicate(arg, progress, false); +} + +// switchToConfig reconfigures this node to use the provided configuration. It +// updates the in-memory state and, when necessary, carries out additional +// actions such as reacting to the removal of nodes or changed quorum +// requirements. +// +// The inputs usually result from restoring a ConfState or applying a ConfChange. +static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfig* config, + const SSyncRaftProgressMap* progressMap, SSyncConfigState* cs) { + SyncNodeId selfId = pRaft->selfId; + int i; + bool exist; + SSyncRaftProgress* progress = NULL; + + syncRaftConfigState(pRaft->tracker, cs); + i = syncRaftFindProgressIndexByNodeId(&pRaft->tracker->progressMap, selfId); + exist = (i != -1); + + // Update whether the node itself is a learner, resetting to false when the + // node is removed. + if (exist) { + progress = &pRaft->tracker->progressMap.progress[i]; + pRaft->isLearner = progress->isLearner; + } else { + pRaft->isLearner = false; + } + + if ((!exist || pRaft->isLearner) && pRaft->state == TAOS_SYNC_STATE_LEADER) { + // This node is leader and was removed or demoted. We prevent demotions + // at the time writing but hypothetically we handle them the same way as + // removing the leader: stepping down into the next Term. + // + // TODO(tbg): step down (for sanity) and ask follower with largest Match + // to TimeoutNow (to avoid interruption). This might still drop some + // proposals but it's better than nothing. + // + // TODO(tbg): test this branch. It is untested at the time of writing. + return; + } + + // The remaining steps only make sense if this node is the leader and there + // are other nodes. + if (pRaft->state != TAOS_SYNC_STATE_LEADER || cs->voters.replica == 0) { + return; + } + + if (syncRaftMaybeCommit(pRaft)) { + // If the configuration change means that more entries are committed now, + // broadcast/append to everyone in the updated config. + syncRaftBroadcastAppend(pRaft); + } else { + // Otherwise, still probe the newly added replicas; there's no reason to + // let them wait out a heartbeat interval (or the next incoming + // proposal). + syncRaftProgressVisit(pRaft->tracker, visitProgressMaybeSendAppend, pRaft); + + // If the the leadTransferee was removed or demoted, abort the leadership transfer. + SyncNodeId leadTransferee = pRaft->leadTransferee; + if (leadTransferee != SYNC_NON_NODE_ID) { + + } + } +} + /** * pre-handle message, return true means no need to continue * Handle the message term, which may result in our stepping down to a follower. diff --git a/source/libs/sync/src/raft_handle_append_entries_message.c b/source/libs/sync/src/raft_handle_append_entries_message.c index 8c014a56bc..4797b6ce03 100644 --- a/source/libs/sync/src/raft_handle_append_entries_message.c +++ b/source/libs/sync/src/raft_handle_append_entries_message.c @@ -33,7 +33,7 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs return 0; } - RaftMsg_Append_Entries *appendResp = &(pRespMsg->appendResp); + RaftMsg_Append_Resp *appendResp = &(pRespMsg->appendResp); // ignore committed logs if (syncRaftLogIsCommitted(pRaft->log, appendEntries->index)) { appendResp->index = pRaft->log->commitIndex; diff --git a/source/libs/sync/src/raft_log.c b/source/libs/sync/src/raft_log.c index 0654dbea6b..b6e6d292e8 100644 --- a/source/libs/sync/src/raft_log.c +++ b/source/libs/sync/src/raft_log.c @@ -31,6 +31,10 @@ SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) { return 0; } +void syncRaftLogAppliedTo(SSyncRaftLog* pLog, SyncIndex appliedIndex) { + +} + bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term) { return true; } diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c index 78536bc6a3..c19fcd1e68 100644 --- a/source/libs/sync/src/raft_replication.c +++ b/source/libs/sync/src/raft_replication.c @@ -16,106 +16,62 @@ #include "raft.h" #include "raft_log.h" #include "sync_raft_progress.h" +#include "syncInt.h" #include "raft_replication.h" -static int sendSnapshot(SSyncRaft* pRaft, int i); -static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex index, SyncTerm term); +static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress); +static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, + SyncIndex prevIndex, SyncTerm prevTerm, + const SSyncRaftEntry *entries, int nEntry); -int syncRaftReplicate(SSyncRaft* pRaft, int i) { -#if 0 +// syncRaftReplicate sends an append RPC with new entries to the given peer, +// if necessary. Returns true if a message was sent. The sendIfEmpty +// argument controls whether messages with no entries will be sent +// ("empty" messages are useful to convey updated Commit indexes, but +// are undesirable when we're sending multiple messages in a batch). +bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) { assert(pRaft->state == TAOS_SYNC_STATE_LEADER); - assert(i >= 0 && i < pRaft->leaderState.nProgress); + SyncNodeId nodeId = progress->id; - SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId; - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); if (syncRaftProgressIsPaused(progress)) { - syncInfo("node %d paused", nodeId); - return 0; + syncInfo("node [%d:%d] paused", pRaft->selfGroupId, nodeId); + return false; } SyncIndex nextIndex = syncRaftProgressNextIndex(progress); - SyncIndex snapshotIndex = syncRaftLogSnapshotIndex(pRaft->log); - bool inSnapshot = syncRaftProgressInSnapshot(progress); + SSyncRaftEntry *entries; + int nEntry; SyncIndex prevIndex; SyncTerm prevTerm; - /** - * From Section 3.5: - * - * When sending an AppendEntries RPC, the leader includes the index and - * term of the entry in its log that immediately precedes the new - * entries. If the follower does not find an entry in its log with the - * same index and term, then it refuses the new entries. The consistency - * check acts as an induction step: the initial empty state of the logs - * satisfies the Log Matching Property, and the consistency check - * preserves the Log Matching Property whenever logs are extended. As a - * result, whenever AppendEntries returns successfully, the leader knows - * that the follower's log is identical to its own log up through the new - * entries (Log Matching Property in Figure 3.2). - **/ - if (nextIndex == 1) { - /** - * We're including the very first entry, so prevIndex and prevTerm are - * null. If the first entry is not available anymore, send the last - * snapshot if we're not already sending one. - **/ - if (snapshotIndex > 0 && !inSnapshot) { - goto send_snapshot; - } + prevIndex = nextIndex - 1; + prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex); + int ret = syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry); - // otherwise send append entries from start - prevIndex = 0; - prevTerm = 0; - } else { - /** - * Set prevIndex and prevTerm to the index and term of the entry at - * nextIndex - 1. - **/ - prevIndex = nextIndex - 1; - prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex); - /** - * If the entry is not anymore in our log, send the last snapshot if we're - * not doing so already. - **/ - if (prevTerm == SYNC_NON_TERM && !inSnapshot) { - goto send_snapshot; - } + if (nEntry == 0 && !sendIfEmpty) { + return false; } - /* Send empty AppendEntries RPC when installing a snaphot */ - if (inSnapshot) { - prevIndex = syncRaftLogLastIndex(pRaft->log); - prevTerm = syncRaftLogLastTerm(pRaft->log); + if (ret != 0 || prevTerm == SYNC_NON_TERM) { + return sendSnapshot(pRaft, progress); } - return sendAppendEntries(pRaft, i, prevIndex, prevTerm); - -send_snapshot: - if (syncRaftProgressRecentActive(progress)) { - /* Only send a snapshot when we have heard from the server */ - return sendSnapshot(pRaft, i); - } else { - /* Send empty AppendEntries RPC when we haven't heard from the server */ - prevIndex = syncRaftLogLastIndex(pRaft->log); - prevTerm = syncRaftLogLastTerm(pRaft->log); - return sendAppendEntries(pRaft, i, prevIndex, prevTerm); - } -#endif - return 0; + return sendAppendEntries(pRaft, progress, prevIndex, prevTerm, entries, nEntry); } -static int sendSnapshot(SSyncRaft* pRaft, int i) { - return 0; +static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) { + if (!syncRaftProgressRecentActive(progress)) { + return false; + } + return true; } -static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncTerm prevTerm) { -#if 0 - SyncIndex nextIndex = prevIndex + 1; - SSyncRaftEntry *entries; - int nEntry; - SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[i]); - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry); +static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, + SyncIndex prevIndex, SyncTerm prevTerm, + const SSyncRaftEntry *entries, int nEntry) { + SyncIndex lastIndex; + SyncTerm logTerm = prevTerm; + SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[progress->selfIndex]); SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term, prevIndex, prevTerm, pRaft->log->commitIndex, @@ -125,24 +81,27 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT goto err_release_log; } - pRaft->io.send(msg, pNode); - - if (syncRaftProgressInReplicate(progress)) { - SyncIndex lastIndex = nextIndex + nEntry; - syncRaftProgressOptimisticNextIndex(progress, lastIndex); - syncRaftInflightAdd(&progress->inflights, lastIndex); - } else if (syncRaftProgressInProbe(progress)) { - syncRaftProgressPause(progress); - } else { - + if (nEntry != 0) { + switch (progress->state) { + // optimistically increase the next when in StateReplicate + case PROGRESS_STATE_REPLICATE: + lastIndex = entries[nEntry - 1].index; + syncRaftProgressOptimisticNextIndex(progress, lastIndex); + syncRaftInflightAdd(&progress->inflights, lastIndex); + break; + case PROGRESS_STATE_PROBE: + progress->probeSent = true; + break; + default: + syncFatal("[%d:%d] is sending append in unhandled state %s", + pRaft->selfGroupId, pRaft->selfId, syncRaftProgressStateString(progress)); + break; + } } - - syncRaftProgressUpdateSendTick(progress, pRaft->currentTick); - - return 0; + pRaft->io.send(msg, pNode); + return true; err_release_log: - syncRaftLogRelease(pRaft->log, nextIndex, entries, nEntry); -#endif - return 0; -} \ No newline at end of file + syncRaftLogRelease(pRaft->log, prevIndex + 1, entries, nEntry); + return false; +} diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index 5e23474a89..ba09291682 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -31,7 +31,6 @@ static void tickElection(SSyncRaft* pRaft); static void tickHeartbeat(SSyncRaft* pRaft); static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n); -static bool maybeCommit(SSyncRaft* pRaft); static void abortLeaderTransfer(SSyncRaft* pRaft); @@ -171,6 +170,25 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, return granted; */ +void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) { + SyncIndex commitIndex = serverState->commitIndex; + SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); + + if (commitIndex < pRaft->log->commitIndex || commitIndex > lastIndex) { + syncFatal("[%d:%d] state.commit %"PRId64" is out of range [%" PRId64 ",%" PRId64 "", + pRaft->selfGroupId, pRaft->selfId, commitIndex, pRaft->log->commitIndex, lastIndex); + return; + } + + pRaft->log->commitIndex = commitIndex; + pRaft->term = serverState->term; + pRaft->voteFor = serverState->voteFor; +} + +void syncRaftBroadcastAppend(SSyncRaft* pRaft) { + +} + static int convertClear(SSyncRaft* pRaft) { } @@ -245,16 +263,14 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->cluster.selfIndex]); syncRaftProgressMaybeUpdate(progress, lastIndex); - // Regardless of maybeCommit's return, our caller will call bcastAppend. - maybeCommit(pRaft); + // Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend. + syncRaftMaybeCommit(pRaft); } -/** - * maybeCommit attempts to advance the commit index. Returns true if - * the commit index changed (in which case the caller should call - * r.bcastAppend). - **/ -static bool maybeCommit(SSyncRaft* pRaft) { +// syncRaftMaybeCommit attempts to advance the commit index. Returns true if +// the commit index changed (in which case the caller should call +// r.bcastAppend). +bool syncRaftMaybeCommit(SSyncRaft* pRaft) { return true; } diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index 525b2eec1a..ea7f1ae4f5 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -14,6 +14,7 @@ */ #include "sync_raft_progress_tracker.h" +#include "sync_raft_proto.h" SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker)); @@ -77,4 +78,11 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r if (rejected) *rejected = r; if (granted) *granted = g; return syncRaftVoteResult(&(tracker->config.voters), tracker->votes); +} + +void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) { + memcpy(&cs->voters, &tracker->config.voters.incoming, sizeof(SSyncRaftNodeMap)); + memcpy(&cs->votersOutgoing, &tracker->config.voters.outgoing, sizeof(SSyncRaftNodeMap)); + memcpy(&cs->learners, &tracker->config.learners, sizeof(SSyncRaftNodeMap)); + memcpy(&cs->learnersNext, &tracker->config.learnersNext, sizeof(SSyncRaftNodeMap)); } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_restore.c b/source/libs/sync/src/sync_raft_restore.c index d2bdbd6351..01bc7da7eb 100644 --- a/source/libs/sync/src/sync_raft_restore.c +++ b/source/libs/sync/src/sync_raft_restore.c @@ -48,7 +48,7 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) .n = 1, .changes = &incoming.changes[i], }; - ret = syncRaftChangerSimpleConfig(changer, &css, &config, &progressMap); + ret = syncRaftChangerSimpleConfig(changer, &css, config, progressMap); if (ret != 0) { goto out; } From ffed86366f4207e7f82c7e1f8c23be3883ce1199 Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 16 Nov 2021 13:50:28 +0800 Subject: [PATCH 8/8] [TD-10645][raft]fix compile error --- .../sync/inc/sync_raft_progress_tracker.h | 2 + source/libs/sync/inc/sync_raft_quorum_joint.h | 19 +++------ .../libs/sync/inc/sync_raft_quorum_majority.h | 2 +- source/libs/sync/src/raft.c | 42 ++++--------------- .../libs/sync/src/sync_raft_config_change.c | 18 ++++---- source/libs/sync/src/sync_raft_impl.c | 16 ++++++- source/libs/sync/src/sync_raft_quorum_joint.c | 23 +++++++--- .../libs/sync/src/sync_raft_quorum_majority.c | 4 +- 8 files changed, 59 insertions(+), 67 deletions(-) diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index b267c46f35..8adb0b4736 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -112,4 +112,6 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs); +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); + #endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index 798e3d5eca..0ef002fe1a 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -36,34 +36,25 @@ typedef struct SSyncRaftQuorumJointConfig { **/ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes); -static FORCE_INLINE bool syncRaftJointConfigInCluster(const SSyncCluster* cluster, SyncNodeId id) { - int i; - for (i = 0; i < cluster->replica; ++i) { - if (cluster->nodeInfo[i].nodeId == id) { - return true; - } - } - - return false; -} +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { - return syncRaftJointConfigInCluster(&config->outgoing, id); + return syncRaftIsInNodeMap(&config->outgoing, id); } static FORCE_INLINE bool syncRaftJointConfigInIncoming(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { - return syncRaftJointConfigInCluster(&config->incoming, id); + return syncRaftIsInNodeMap(&config->incoming, id); } void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id); void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id); -static FORCE_INLINE const SSyncCluster* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) { +static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) { return &config->incoming; } -static FORCE_INLINE const SSyncCluster* syncRaftJointConfigOutgoing(const SSyncRaftQuorumJointConfig* config) { +static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigOutgoing(const SSyncRaftQuorumJointConfig* config) { return &config->outgoing; } diff --git a/source/libs/sync/inc/sync_raft_quorum_majority.h b/source/libs/sync/inc/sync_raft_quorum_majority.h index 8f148873e3..0512a4dc87 100644 --- a/source/libs/sync/inc/sync_raft_quorum_majority.h +++ b/source/libs/sync/inc/sync_raft_quorum_majority.h @@ -26,6 +26,6 @@ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a * quorum of no has been reached). **/ -ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteType* votes); +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes); #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */ diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index f8c3d1b0d4..23351277c4 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -30,6 +30,8 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfig* config, const SSyncRaftProgressMap* progressMap, SSyncConfigState* cs); +static void abortLeaderTransfer(SSyncRaft* pRaft); + static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); @@ -109,38 +111,6 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); - -#if 0 - - - - - - // restore fsm state from snapshot index + 1 until commitIndex - ++initIndex; - while (initIndex <= serverState.commitIndex) { - limit = MIN(RAFT_READ_LOG_MAX_NUM, serverState.commitIndex - initIndex + 1); - - if (logStore->logRead(logStore, initIndex, limit, buffer, &nBuf) != 0) { - return -1; - } - assert(limit == nBuf); - - for (i = 0; i < limit; ++i) { - fsm->applyLog(fsm, initIndex + i, &(buffer[i]), NULL); - free(buffer[i].data); - } - initIndex += nBuf; - } - assert(initIndex == serverState.commitIndex); - - //pRaft->heartbeatTimeoutTick = 1; - - syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); - - pRaft->selfIndex = pRaft->cluster.selfIndex; -#endif - syncInfo("[%d:%d] restore vgid %d state: snapshot index success", pRaft->selfGroupId, pRaft->selfId, pInfo->vgId); return 0; @@ -242,12 +212,16 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi // If the the leadTransferee was removed or demoted, abort the leadership transfer. SyncNodeId leadTransferee = pRaft->leadTransferee; - if (leadTransferee != SYNC_NON_NODE_ID) { - + if (leadTransferee != SYNC_NON_NODE_ID && !syncRaftIsInNodeMap(&pRaft->tracker->config.voters, leadTransferee)) { + abortLeaderTransfer(pRaft); } } } +static void abortLeaderTransfer(SSyncRaft* pRaft) { + pRaft->leadTransferee = SYNC_NON_NODE_ID; +} + /** * pre-handle message, return true means no need to continue * Handle the message term, which may result in our stepping down to a follower. diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c index b80562ffa3..4e7f2190ea 100644 --- a/source/libs/sync/src/sync_raft_config_change.c +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -26,8 +26,8 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config); static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css); -static int symDiff(const SSyncCluster* l, const SSyncCluster* r); +static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r); static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner); @@ -237,27 +237,27 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig // symdiff returns the count of the symmetric difference between the sets of // uint64s, i.e. len( (l - r) \union (r - l)). -static int symDiff(const SSyncCluster* l, const SSyncCluster* r) { +static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r) { int n; int i; int j0, j1; - const SSyncCluster* pairs[2][2] = { + const SSyncRaftNodeMap* pairs[2][2] = { {l, r}, // count elems in l but not in r {r, l}, // count elems in r but not in l }; for (n = 0, i = 0; i < 2; ++i) { - const SSyncCluster** pp = pairs[i]; + const SSyncRaftNodeMap** pp = pairs[i]; - const SSyncCluster* p0 = pp[0]; - const SSyncCluster* p1 = pp[1]; - for (j0 = 0; j0 < p0->replica; ++j0) { - SyncNodeId id = p0->nodeInfo[j0].nodeId; + const SSyncRaftNodeMap* p0 = pp[0]; + const SSyncRaftNodeMap* p1 = pp[1]; + for (j0 = 0; j0 < TSDB_MAX_REPLICA; ++j0) { + SyncNodeId id = p0->nodeId[j0]; if (id == SYNC_NON_NODE_ID) { continue; } for (j1 = 0; j1 < p1->replica; ++j1) { - if (p1->nodeInfo[j1].nodeId != SYNC_NON_NODE_ID && p1->nodeInfo[j1].nodeId != id) { + if (p1->nodeId[j1] != SYNC_NON_NODE_ID && p1->nodeId[j1] != id) { n+=1; } } diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index ba09291682..d65e03c64f 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -185,8 +185,17 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) { pRaft->voteFor = serverState->voteFor; } -void syncRaftBroadcastAppend(SSyncRaft* pRaft) { +static void visitProgressSendAppend(int i, SSyncRaftProgress* progress, void* arg) { + SSyncRaft* pRaft = (SSyncRaft*)arg; + if (pRaft->selfId == progress->id) { + return; + } + syncRaftReplicate(arg, progress, true); +} + +void syncRaftBroadcastAppend(SSyncRaft* pRaft) { + syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft); } static int convertClear(SSyncRaft* pRaft) { @@ -279,6 +288,7 @@ bool syncRaftMaybeCommit(SSyncRaft* pRaft) { * trigger I/O requests for newly appended log entries or heartbeats. **/ static int triggerAll(SSyncRaft* pRaft) { + #if 0 assert(pRaft->state == TAOS_SYNC_STATE_LEADER); int i; @@ -287,8 +297,10 @@ static int triggerAll(SSyncRaft* pRaft) { continue; } - syncRaftReplicate(pRaft, i); + syncRaftReplicate(pRaft, pRaft->tracker->progressMap.progress[i], true); } + #endif + return 0; } static void abortLeaderTransfer(SSyncRaft* pRaft) { diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c index 9a8e9eb7ba..fa663b6fc3 100644 --- a/source/libs/sync/src/sync_raft_quorum_joint.c +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -44,16 +44,16 @@ void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNo int i, min; for (i = 0, min = -1; i < TSDB_MAX_REPLICA; ++i) { - if (config->incoming.nodeInfo[i].nodeId == id) { + if (config->incoming.nodeId[i] == id) { return; } - if (min == -1 && config->incoming.nodeInfo[i].nodeId == SYNC_NON_NODE_ID) { + if (min == -1 && config->incoming.nodeId[i] == SYNC_NON_NODE_ID) { min = i; } } assert(min != -1); - config->incoming.nodeInfo[min].nodeId = id; + config->incoming.nodeId[min] = id; config->incoming.replica += 1; } @@ -61,12 +61,25 @@ void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, S int i; for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (config->incoming.nodeInfo[i].nodeId == id) { + if (config->incoming.nodeId[i] == id) { config->incoming.replica -= 1; - config->incoming.nodeInfo[i].nodeId = SYNC_NON_NODE_ID; + config->incoming.nodeId[i] = SYNC_NON_NODE_ID; break; } } assert(config->incoming.replica >= 0); +} + + +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { + int i; + + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (nodeId == nodeMap->nodeId[i]) { + return true; + } + } + + return false; } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c index 0361845414..73eb378e09 100644 --- a/source/libs/sync/src/sync_raft_quorum_majority.c +++ b/source/libs/sync/src/sync_raft_quorum_majority.c @@ -22,14 +22,14 @@ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a * quorum of no has been reached). **/ -ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteType* votes) { +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes) { if (config->replica == 0) { return SYNC_RAFT_VOTE_WON; } int i, g, r, missing; for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) { - if (config->nodeInfo[i].nodeId == SYNC_NON_NODE_ID) { + if (config->nodeId[i] == SYNC_NON_NODE_ID) { continue; }