diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index 6fa6c6e346..129f0f4dbc 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -47,7 +47,6 @@ struct SSyncRaft { // hash map nodeId -> SNodeInfo* SHashObj* nodeInfoMap; - int selfIndex; SyncNodeId selfId; SyncGroupId selfGroupId; diff --git a/source/libs/sync/inc/sync_raft_node_map.h b/source/libs/sync/inc/sync_raft_node_map.h index 285717ed78..15e559a733 100644 --- a/source/libs/sync/inc/sync_raft_node_map.h +++ b/source/libs/sync/inc/sync_raft_node_map.h @@ -43,4 +43,6 @@ int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap); // return true if reach the end bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId); +bool syncRaftIsAllInProgressMap(const SSyncRaftNodeMap* nodeMap, const SSyncRaftProgressMap* progressMap); + #endif /* _TD_LIBS_SYNC_RAFT_NODE_MAP_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h index 3d2995ed77..5664cfd15e 100644 --- a/source/libs/sync/inc/sync_raft_progress.h +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -147,7 +147,7 @@ static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgr return kProgressStateString[progress->state]; } -void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress); +void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress); /** * syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, @@ -227,6 +227,8 @@ int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgres void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); +bool syncRaftIsInProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); + /** * return true if progress's log is up-todate **/ @@ -237,7 +239,9 @@ void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snaps void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to); // return true if reach the end -bool syncRaftIterateProgressMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftProgress *pProgress); +bool syncRaftIterateProgressMap(const SSyncRaftProgressMap* progressMap, SSyncRaftProgress *pProgress); + +bool syncRaftVisitProgressMap(SSyncRaftProgressMap* progressMap, visitProgressFp fp, void* arg); #if 0 diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index 3a448290c8..35daf8139c 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -88,13 +88,14 @@ struct SSyncRaftProgressTracker { SHashObj* votesMap; int maxInflightMsgs; + + SSyncRaft* pRaft; }; -SSyncRaftProgressTracker* syncRaftOpenProgressTracker(); +SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft); void syncRaftResetVotes(SSyncRaftProgressTracker*); -typedef void (*visitProgressFp)(int i, SSyncRaftProgress* progress, void* arg); void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg); /** diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index fcb0940609..e00700d724 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -84,4 +84,6 @@ typedef enum { SYNC_RAFT_VOTE_RESP_REJECT = 2, } ESyncRaftVoteType; +typedef void (*visitProgressFp)(SSyncRaftProgress* progress, void* arg); + #endif /* _TD_LIBS_SYNC_TYPE_H */ diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index b2170e0b68..85c330ece3 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -65,7 +65,7 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { } // init progress tracker - pRaft->tracker = syncRaftOpenProgressTracker(); + pRaft->tracker = syncRaftOpenProgressTracker(pRaft); if (pRaft->tracker == NULL) { return -1; } @@ -157,7 +157,7 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch return 0; } -static void visitProgressMaybeSendAppend(int i, SSyncRaftProgress* progress, void* arg) { +static void visitProgressMaybeSendAppend(SSyncRaftProgress* progress, void* arg) { syncRaftReplicate(arg, progress, false); } @@ -175,13 +175,12 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi SSyncRaftProgress* progress = NULL; syncRaftConfigState(pRaft->tracker, cs); - i = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, selfId); - exist = (i != -1); + progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, selfId); + exist = (progress != NULL); // Update whether the node itself is a learner, resetting to false when the // node is removed. if (exist) { - progress = &pRaft->tracker->progressMap.progress[i]; pRaft->isLearner = progress->isLearner; } else { pRaft->isLearner = false; diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c index 0fe31b9f08..1f7aab064f 100644 --- a/source/libs/sync/src/sync_raft_config_change.c +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "raft.h" #include "syncInt.h" #include "sync_raft_config_change.h" #include "sync_raft_progress.h" @@ -168,7 +169,7 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, SyncNodeId id) { SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id); - if (progress == -1) { + if (progress == NULL) { initProgress(changer, config, progressMap, id, false); return; } @@ -250,31 +251,34 @@ static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConf } else { nilAwareAdd(&config->learners, id); } -} -// checkAndCopy copies the tracker's config and progress map (deeply enough for -// the purposes of the Changer) and returns those copies. It returns an error -// if checkInvariants does. -static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { - syncRaftCloneTrackerConfig(&changer->tracker->config, config); - int i; + SSyncRaftProgress* pProgress = (SSyncRaftProgress*)malloc(sizeof(SSyncRaftProgress)); + assert (pProgress != NULL); + *pProgress = (SSyncRaftProgress) { + // Initializing the Progress with the last index means that the follower + // can be probed (with the last index). + // + // TODO(tbg): seems awfully optimistic. Using the first index would be + // better. The general expectation here is that the follower has no log + // at all (and will thus likely need a snapshot), though the app may + // have applied a snapshot out of band before adding the replica (thus + // making the first index the better choice). + .id = id, + .groupId = changer->tracker->pRaft->selfGroupId, + .nextIndex = changer->lastIndex, + .matchIndex = 0, + .state = PROGRESS_STATE_PROBE, + .pendingSnapshotIndex = 0, + .probeSent = false, + .inflights = syncRaftOpenInflights(changer->tracker->maxInflightMsgs), + .isLearner = isLearner, + // When a node is first added, we should mark it as recently active. + // Otherwise, CheckQuorum may cause us to step down if it is invoked + // before the added node has had a chance to communicate with us. + .recentActive = true, + }; - SSyncRaftProgress* pProgress = NULL; - while (!syncRaftIterateProgressMap(&changer->tracker->progressMap, pProgress)) { - syncRaftAddToProgressMap(progressMap, pProgress); - } - - return checkAndReturn(config, progressMap); -} - -// checkAndReturn calls checkInvariants on the input and returns either the -// resulting error or the input. -static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { - if (checkInvariants(config, progressMap) != 0) { - return -1; - } - - return 0; + syncRaftAddToProgressMap(progressMap, pProgress); } // checkInvariants makes sure that the config and progress are compatible with @@ -304,7 +308,7 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg } // Conversely Learners and Voters doesn't intersect at all. - SyncNodeId* pNodeId = NULL; + pNodeId = NULL; while (!syncRaftIterateNodeMap(&config->learners, pNodeId)) { SyncNodeId nodeId = *pNodeId; if (syncRaftJointConfigInIncoming(&config->voters, nodeId)) { @@ -336,6 +340,31 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg return 0; } +// checkAndCopy copies the tracker's config and progress map (deeply enough for +// the purposes of the Changer) and returns those copies. It returns an error +// if checkInvariants does. +static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + syncRaftCloneTrackerConfig(&changer->tracker->config, config); + int i; + + SSyncRaftProgress* pProgress = NULL; + while (!syncRaftIterateProgressMap(&changer->tracker->progressMap, pProgress)) { + syncRaftAddToProgressMap(progressMap, pProgress); + } + + return checkAndReturn(config, progressMap); +} + +// checkAndReturn calls checkInvariants on the input and returns either the +// resulting error or the input. +static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + if (checkInvariants(config, progressMap) != 0) { + return -1; + } + + return 0; +} + static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) { return !syncRaftJointConfigIsOutgoingEmpty(&config->voters); } diff --git a/source/libs/sync/src/sync_raft_election.c b/source/libs/sync/src/sync_raft_election.c index d4013bbc08..6d36d38267 100644 --- a/source/libs/sync/src/sync_raft_election.c +++ b/source/libs/sync/src/sync_raft_election.c @@ -86,12 +86,7 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) { SSyncRaftNodeMap nodeMap; syncRaftJointConfigIDS(&pRaft->tracker->config.voters, &nodeMap); SyncNodeId *pNodeId = NULL; - while (true) { - syncRaftIterateNodeMap(&nodeMap, &pNodeId); - if (pNodeId == NULL || *pNodeId == NULL) { - break; - } - + while (!syncRaftIterateNodeMap(&nodeMap, pNodeId)) { SyncNodeId nodeId = *pNodeId; if (nodeId == SYNC_NON_NODE_ID) { continue; diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index 73a02c4b80..2093bcb046 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -186,7 +186,7 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) { pRaft->voteFor = serverState->voteFor; } -static void visitProgressSendAppend(int i, SSyncRaftProgress* progress, void* arg) { +static void visitProgressSendAppend(SSyncRaftProgress* progress, void* arg) { SSyncRaft* pRaft = (SSyncRaft*)arg; if (pRaft->selfId == progress->id) { return; @@ -279,7 +279,7 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { syncRaftLogAppend(pRaft->log, entries, n); - SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->selfIndex]); + SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId); syncRaftProgressMaybeUpdate(progress, lastIndex); // Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend. syncRaftMaybeCommit(pRaft); @@ -316,8 +316,8 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) { pRaft->leadTransferee = SYNC_NON_NODE_ID; } -static void initProgress(int i, SSyncRaftProgress* progress, void* arg) { - syncRaftInitProgress(i, (SSyncRaft*)arg, progress); +static void resetProgress(SSyncRaftProgress* progress, void* arg) { + syncRaftResetProgress((SSyncRaft*)arg, progress); } static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { @@ -336,7 +336,7 @@ static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { abortLeaderTransfer(pRaft); syncRaftResetVotes(pRaft->tracker); - syncRaftProgressVisit(pRaft->tracker, initProgress, pRaft); + syncRaftProgressVisit(pRaft->tracker, resetProgress, pRaft); pRaft->pendingConfigIndex = 0; pRaft->uncommittedSize = 0; diff --git a/source/libs/sync/src/sync_raft_node_map.c b/source/libs/sync/src/sync_raft_node_map.c index 9adb3844f5..022f3b2dcb 100644 --- a/source/libs/sync/src/sync_raft_node_map.c +++ b/source/libs/sync/src/sync_raft_node_map.c @@ -15,6 +15,7 @@ #include "sync_raft_node_map.h" #include "sync_type.h" +#include "sync_raft_progress.h" void syncRaftInitNodeMap(SSyncRaftNodeMap* nodeMap) { nodeMap->nodeIdMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); @@ -50,6 +51,17 @@ bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId) { return false; } +bool syncRaftIsAllInProgressMap(const SSyncRaftNodeMap* nodeMap, const SSyncRaftProgressMap* progressMap) { + SyncNodeId *pId = NULL; + while (!syncRaftIterateNodeMap(nodeMap, pId)) { + if (!syncRaftIsInProgressMap(progressMap, *pId)) { + return false; + } + } + + return true; +} + void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) { syncRaftCopyNodeMap(nodeMap, to); } @@ -63,5 +75,5 @@ void syncRaftRemoveFromNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { } int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap) { - return taosHashGetSize(nodeMap); + return taosHashGetSize(nodeMap->nodeIdMap); } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_progress.c b/source/libs/sync/src/sync_raft_progress.c index 436250e594..e63d37cee9 100644 --- a/source/libs/sync/src/sync_raft_progress.c +++ b/source/libs/sync/src/sync_raft_progress.c @@ -25,13 +25,16 @@ static void probeAcked(SSyncRaftProgress* progress); static void resumeProgress(SSyncRaftProgress* progress); -void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) { +void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress) { + if (progress->inflights) { + syncRaftCloseInflights(progress->inflights); + } SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflightMsgs); if (inflights == NULL) { return; } *progress = (SSyncRaftProgress) { - .matchIndex = i == pRaft->selfIndex ? syncRaftLogLastIndex(pRaft->log) : 0, + .matchIndex = progress->id == pRaft->selfId ? syncRaftLogLastIndex(pRaft->log) : 0, .nextIndex = syncRaftLogLastIndex(pRaft->log) + 1, .inflights = inflights, .isLearner = false, @@ -113,7 +116,7 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { } SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) { - SSyncRaftProgress** ppProgress = (SSyncRaftProgress**)taosHashGet(progressMap, &id, sizeof(SyncNodeId*)); + SSyncRaftProgress** ppProgress = (SSyncRaftProgress**)taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*)); if (ppProgress == NULL) { return NULL; } @@ -126,9 +129,18 @@ int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgres } void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { + SSyncRaftProgress** ppProgress = (SSyncRaftProgress**)taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*)); + if (ppProgress == NULL) { + return; + } + free(*ppProgress); taosHashRemove(progressMap->progressMap, &id, sizeof(SyncNodeId*)); } +bool syncRaftIsInProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { + return taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*)) != NULL; +} + bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) { return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; } @@ -170,8 +182,8 @@ void syncRaftCopyProgress(const SSyncRaftProgress* progress, SSyncRaftProgress* memcpy(out, progress, sizeof(SSyncRaftProgress)); } -bool syncRaftIterateProgressMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftProgress *pProgress) { - SSyncRaftProgress **ppProgress = taosHashIterate(nodeMap->nodeIdMap, pProgress); +bool syncRaftIterateProgressMap(const SSyncRaftProgressMap* progressMap, SSyncRaftProgress *pProgress) { + SSyncRaftProgress **ppProgress = taosHashIterate(progressMap->progressMap, pProgress); if (ppProgress == NULL) { return true; } @@ -180,6 +192,13 @@ bool syncRaftIterateProgressMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftProgre return false; } +bool syncRaftVisitProgressMap(SSyncRaftProgressMap* progressMap, visitProgressFp fp, void* arg) { + SSyncRaftProgress *pProgress; + while (!syncRaftIterateProgressMap(progressMap, pProgress)) { + fp(pProgress, arg); + } +} + /** * ResetState moves the Progress into the specified State, resetting ProbeSent, * PendingSnapshot, and Inflights. diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index 3dd0a5ffe1..e6a016b7cf 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -16,7 +16,7 @@ #include "sync_raft_progress_tracker.h" #include "sync_raft_proto.h" -SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { +SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft) { SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker)); if (tracker == NULL) { return NULL; @@ -24,6 +24,7 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { syncRaftInitNodeMap(&tracker->config.learners); syncRaftInitNodeMap(&tracker->config.learnersNext); + tracker->pRaft = pRaft; return tracker; } @@ -33,11 +34,7 @@ void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) { } void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) { - int i; - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - SSyncRaftProgress* progress = &(tracker->progressMap.progress[i]); - visit(i, progress, arg); - } + syncRaftVisitProgressMap(&tracker->progressMap, visit, arg); } void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant) { @@ -51,10 +48,20 @@ void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool g } void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) { - + memcpy(to, from, sizeof(SSyncRaftProgressTrackerConfig)); } int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + // NB: intentionally allow the empty config. In production we'll never see a + // non-empty config (we prevent it from being created) but we will need to + // be able to *create* an initial config, for example during bootstrap (or + // during tests). Instead of having to hand-code this, we allow + // transitioning from an empty config into any other legal and non-empty + // config. + if (!syncRaftIsAllInProgressMap(&config->voters.incoming, progressMap)) return -1; + if (!syncRaftIsAllInProgressMap(&config->voters.outgoing, progressMap)) return -1; + if (!syncRaftIsAllInProgressMap(&config->learners, progressMap)) return -1; + if (!syncRaftIsAllInProgressMap(&config->learnersNext, progressMap)) return -1; return 0; } @@ -67,8 +74,7 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r SSyncRaftProgress* progress; int r, g; - for (i = 0, r = 0, g = 0; i < TSDB_MAX_REPLICA; ++i) { - progress = &(tracker->progressMap.progress[i]); + while (!syncRaftIterateProgressMap(&tracker->progressMap, progress)) { if (progress->id == SYNC_NON_NODE_ID) { continue; } @@ -91,8 +97,8 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r } void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) { - syncRaftCopyNodeMap(&cs->voters, &tracker->config.voters.incoming); - syncRaftCopyNodeMap(&cs->votersOutgoing, &tracker->config.voters.outgoing); - syncRaftCopyNodeMap(&cs->learners, &tracker->config.learners); - syncRaftCopyNodeMap(&cs->learnersNext, &tracker->config.learnersNext); + syncRaftCopyNodeMap(&tracker->config.voters.incoming, &cs->voters); + syncRaftCopyNodeMap(&tracker->config.voters.outgoing, &cs->votersOutgoing); + syncRaftCopyNodeMap(&tracker->config.learners, &cs->learners); + syncRaftCopyNodeMap(&tracker->config.learnersNext, &cs->learnersNext); } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c index 014a8c7303..ff5ba64876 100644 --- a/source/libs/sync/src/sync_raft_quorum_majority.c +++ b/source/libs/sync/src/sync_raft_quorum_majority.c @@ -17,24 +17,24 @@ #include "sync_raft_quorum_majority.h" #include "sync_raft_node_map.h" -/** - * syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns - * a result indicating whether the vote is pending (i.e. neither a quorum of - * yes/no has been reached), won (a quorum of yes has been reached), or lost (a - * quorum of no has been reached). - **/ +// VoteResult takes a mapping of voters to yes/no (true/false) votes and returns +// a result indicating whether the vote is pending (i.e. neither a quorum of +// yes/no has been reached), won (a quorum of yes has been reached), or lost (a +// quorum of no has been reached). ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap) { - if (config->replica == 0) { + int n = syncRaftNodeMapSize(config); + if (n == 0) { + // By convention, the elections on an empty config win. This comes in + // handy with joint quorums because it'll make a half-populated joint + // quorum behave like a majority quorum. return SYNC_RAFT_VOTE_WON; } int i, g, r, missing; - for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) { - if (config->nodeId[i] == SYNC_NON_NODE_ID) { - continue; - } - - const ESyncRaftVoteType* pType = taosHashGet(votesMap, &config->nodeId[i], sizeof(SyncNodeId*)); + i = g = r = missing = 0; + SyncNodeId* pId = NULL; + while (!syncRaftIterateNodeMap(config, pId)) { + const ESyncRaftVoteType* pType = taosHashGet(votesMap, pId, sizeof(SyncNodeId*)); if (pType == NULL) { missing += 1; continue; @@ -47,11 +47,11 @@ ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashOb } } - int quorum = config->replica / 2 + 1; + int quorum = n / 2 + 1; if (g >= quorum) { return SYNC_RAFT_VOTE_WON; } - if (r + missing >= quorum) { + if (g + missing >= quorum) { return SYNC_RAFT_VOTE_PENDING; } diff --git a/source/libs/sync/src/sync_raft_restore.c b/source/libs/sync/src/sync_raft_restore.c index 01bc7da7eb..17269254bd 100644 --- a/source/libs/sync/src/sync_raft_restore.c +++ b/source/libs/sync/src/sync_raft_restore.c @@ -17,6 +17,7 @@ #include "sync_raft_restore.h" #include "sync_raft_progress_tracker.h" +static void addToConfChangeSingleArray(SSyncConfChangeSingleArray* out, int* i, const SSyncRaftNodeMap* nodeMap, ESyncRaftConfChangeType t); static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in); // syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and @@ -82,6 +83,18 @@ out: return ret; } +static void addToConfChangeSingleArray(SSyncConfChangeSingleArray* out, int* i, const SSyncRaftNodeMap* nodeMap, ESyncRaftConfChangeType t) { + SyncNodeId* pId = NULL; + + while (!syncRaftIterateNodeMap(nodeMap, pId)) { + out->changes[*i] = (SSyncConfChangeSingle) { + .type = t, + .nodeId = *pId, + }; + *i += 1; + } +} + // toConfChangeSingle translates a conf state into 1) a slice of operations creating // first the config that will become the outgoing one, and then the incoming one, and // b) another slice that, when applied to the config resulted from 1), represents the @@ -91,13 +104,16 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleA out->n = in->n = 0; - out->n = cs->votersOutgoing.replica; + out->n = syncRaftNodeMapSize(&cs->votersOutgoing); out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * out->n); if (out->changes == NULL) { out->n = 0; return -1; } - in->n = cs->votersOutgoing.replica + cs->voters.replica + cs->learners.replica + cs->learnersNext.replica; + in->n = syncRaftNodeMapSize(&cs->votersOutgoing) + + syncRaftNodeMapSize(&cs->voters) + + syncRaftNodeMapSize(&cs->learners) + + syncRaftNodeMapSize(&cs->learnersNext); out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * in->n); if (in->changes == NULL) { in->n = 0; @@ -132,50 +148,24 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleA // // as desired. - for (i = 0; i < cs->votersOutgoing.replica; ++i) { - // If there are outgoing voters, first add them one by one so that the - // (non-joint) config has them all. - out->changes[i] = (SSyncConfChangeSingle) { - .type = SYNC_RAFT_Conf_AddNode, - .nodeId = cs->votersOutgoing.nodeId[i], - }; - } + // If there are outgoing voters, first add them one by one so that the + // (non-joint) config has them all. + i = 0; + addToConfChangeSingleArray(out, &i, &cs->votersOutgoing, SYNC_RAFT_Conf_AddNode); + assert(i == out->n); // We're done constructing the outgoing slice, now on to the incoming one // (which will apply on top of the config created by the outgoing slice). - + i = 0; + // First, we'll remove all of the outgoing voters. - int j = 0; - for (i = 0; i < cs->votersOutgoing.replica; ++i) { - in->changes[j] = (SSyncConfChangeSingle) { - .type = SYNC_RAFT_Conf_RemoveNode, - .nodeId = cs->votersOutgoing.nodeId[i], - }; - j += 1; - } + addToConfChangeSingleArray(in, &i, &cs->votersOutgoing, SYNC_RAFT_Conf_RemoveNode); + // Then we'll add the incoming voters and learners. - for (i = 0; i < cs->voters.replica; ++i) { - in->changes[j] = (SSyncConfChangeSingle) { - .type = SYNC_RAFT_Conf_AddNode, - .nodeId = cs->voters.nodeId[i], - }; - j += 1; - } - for (i = 0; i < cs->learners.replica; ++i) { - in->changes[j] = (SSyncConfChangeSingle) { - .type = SYNC_RAFT_Conf_AddLearnerNode, - .nodeId = cs->learners.nodeId[i], - }; - j += 1; - } - // Same for LearnersNext; these are nodes we want to be learners but which - // are currently voters in the outgoing config. - for (i = 0; i < cs->learnersNext.replica; ++i) { - in->changes[j] = (SSyncConfChangeSingle) { - .type = SYNC_RAFT_Conf_AddLearnerNode, - .nodeId = cs->learnersNext.nodeId[i], - }; - j += 1; - } + addToConfChangeSingleArray(in, &i, &cs->voters, SYNC_RAFT_Conf_AddNode); + addToConfChangeSingleArray(in, &i, &cs->learners, SYNC_RAFT_Conf_AddLearnerNode); + addToConfChangeSingleArray(in, &i, &cs->learnersNext, SYNC_RAFT_Conf_AddLearnerNode); + assert(i == in->n); + return 0; } \ No newline at end of file