[TD-10645][raft]<feature>refactor node and progress map

This commit is contained in:
lichuang 2021-11-17 15:50:57 +08:00
parent da029f32a7
commit 98a6b1918c
14 changed files with 180 additions and 122 deletions

View File

@ -47,7 +47,6 @@ struct SSyncRaft {
// hash map nodeId -> SNodeInfo* // hash map nodeId -> SNodeInfo*
SHashObj* nodeInfoMap; SHashObj* nodeInfoMap;
int selfIndex;
SyncNodeId selfId; SyncNodeId selfId;
SyncGroupId selfGroupId; SyncGroupId selfGroupId;

View File

@ -43,4 +43,6 @@ int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap);
// return true if reach the end // return true if reach the end
bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId); bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId);
bool syncRaftIsAllInProgressMap(const SSyncRaftNodeMap* nodeMap, const SSyncRaftProgressMap* progressMap);
#endif /* _TD_LIBS_SYNC_RAFT_NODE_MAP_H */ #endif /* _TD_LIBS_SYNC_RAFT_NODE_MAP_H */

View File

@ -147,7 +147,7 @@ static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgr
return kProgressStateString[progress->state]; return kProgressStateString[progress->state];
} }
void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress); void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress);
/** /**
* syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, * syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
@ -227,6 +227,8 @@ int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgres
void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id);
bool syncRaftIsInProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id);
/** /**
* return true if progress's log is up-todate * return true if progress's log is up-todate
**/ **/
@ -237,7 +239,9 @@ void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snaps
void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to); void syncRaftCopyProgress(const SSyncRaftProgress* from, SSyncRaftProgress* to);
// return true if reach the end // return true if reach the end
bool syncRaftIterateProgressMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftProgress *pProgress); bool syncRaftIterateProgressMap(const SSyncRaftProgressMap* progressMap, SSyncRaftProgress *pProgress);
bool syncRaftVisitProgressMap(SSyncRaftProgressMap* progressMap, visitProgressFp fp, void* arg);
#if 0 #if 0

View File

@ -88,13 +88,14 @@ struct SSyncRaftProgressTracker {
SHashObj* votesMap; SHashObj* votesMap;
int maxInflightMsgs; int maxInflightMsgs;
SSyncRaft* pRaft;
}; };
SSyncRaftProgressTracker* syncRaftOpenProgressTracker(); SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft);
void syncRaftResetVotes(SSyncRaftProgressTracker*); void syncRaftResetVotes(SSyncRaftProgressTracker*);
typedef void (*visitProgressFp)(int i, SSyncRaftProgress* progress, void* arg);
void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg); void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg);
/** /**

View File

@ -84,4 +84,6 @@ typedef enum {
SYNC_RAFT_VOTE_RESP_REJECT = 2, SYNC_RAFT_VOTE_RESP_REJECT = 2,
} ESyncRaftVoteType; } ESyncRaftVoteType;
typedef void (*visitProgressFp)(SSyncRaftProgress* progress, void* arg);
#endif /* _TD_LIBS_SYNC_TYPE_H */ #endif /* _TD_LIBS_SYNC_TYPE_H */

View File

@ -65,7 +65,7 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
} }
// init progress tracker // init progress tracker
pRaft->tracker = syncRaftOpenProgressTracker(); pRaft->tracker = syncRaftOpenProgressTracker(pRaft);
if (pRaft->tracker == NULL) { if (pRaft->tracker == NULL) {
return -1; return -1;
} }
@ -157,7 +157,7 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch
return 0; return 0;
} }
static void visitProgressMaybeSendAppend(int i, SSyncRaftProgress* progress, void* arg) { static void visitProgressMaybeSendAppend(SSyncRaftProgress* progress, void* arg) {
syncRaftReplicate(arg, progress, false); syncRaftReplicate(arg, progress, false);
} }
@ -175,13 +175,12 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi
SSyncRaftProgress* progress = NULL; SSyncRaftProgress* progress = NULL;
syncRaftConfigState(pRaft->tracker, cs); syncRaftConfigState(pRaft->tracker, cs);
i = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, selfId); progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, selfId);
exist = (i != -1); exist = (progress != NULL);
// Update whether the node itself is a learner, resetting to false when the // Update whether the node itself is a learner, resetting to false when the
// node is removed. // node is removed.
if (exist) { if (exist) {
progress = &pRaft->tracker->progressMap.progress[i];
pRaft->isLearner = progress->isLearner; pRaft->isLearner = progress->isLearner;
} else { } else {
pRaft->isLearner = false; pRaft->isLearner = false;

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "raft.h"
#include "syncInt.h" #include "syncInt.h"
#include "sync_raft_config_change.h" #include "sync_raft_config_change.h"
#include "sync_raft_progress.h" #include "sync_raft_progress.h"
@ -168,7 +169,7 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig
static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id) { SSyncRaftProgressMap* progressMap, SyncNodeId id) {
SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id); SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(progressMap, id);
if (progress == -1) { if (progress == NULL) {
initProgress(changer, config, progressMap, id, false); initProgress(changer, config, progressMap, id, false);
return; return;
} }
@ -250,31 +251,34 @@ static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConf
} else { } else {
nilAwareAdd(&config->learners, id); nilAwareAdd(&config->learners, id);
} }
}
// checkAndCopy copies the tracker's config and progress map (deeply enough for SSyncRaftProgress* pProgress = (SSyncRaftProgress*)malloc(sizeof(SSyncRaftProgress));
// the purposes of the Changer) and returns those copies. It returns an error assert (pProgress != NULL);
// if checkInvariants does. *pProgress = (SSyncRaftProgress) {
static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { // Initializing the Progress with the last index means that the follower
syncRaftCloneTrackerConfig(&changer->tracker->config, config); // can be probed (with the last index).
int i; //
// TODO(tbg): seems awfully optimistic. Using the first index would be
// better. The general expectation here is that the follower has no log
// at all (and will thus likely need a snapshot), though the app may
// have applied a snapshot out of band before adding the replica (thus
// making the first index the better choice).
.id = id,
.groupId = changer->tracker->pRaft->selfGroupId,
.nextIndex = changer->lastIndex,
.matchIndex = 0,
.state = PROGRESS_STATE_PROBE,
.pendingSnapshotIndex = 0,
.probeSent = false,
.inflights = syncRaftOpenInflights(changer->tracker->maxInflightMsgs),
.isLearner = isLearner,
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has had a chance to communicate with us.
.recentActive = true,
};
SSyncRaftProgress* pProgress = NULL; syncRaftAddToProgressMap(progressMap, pProgress);
while (!syncRaftIterateProgressMap(&changer->tracker->progressMap, pProgress)) {
syncRaftAddToProgressMap(progressMap, pProgress);
}
return checkAndReturn(config, progressMap);
}
// checkAndReturn calls checkInvariants on the input and returns either the
// resulting error or the input.
static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
if (checkInvariants(config, progressMap) != 0) {
return -1;
}
return 0;
} }
// checkInvariants makes sure that the config and progress are compatible with // checkInvariants makes sure that the config and progress are compatible with
@ -304,7 +308,7 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg
} }
// Conversely Learners and Voters doesn't intersect at all. // Conversely Learners and Voters doesn't intersect at all.
SyncNodeId* pNodeId = NULL; pNodeId = NULL;
while (!syncRaftIterateNodeMap(&config->learners, pNodeId)) { while (!syncRaftIterateNodeMap(&config->learners, pNodeId)) {
SyncNodeId nodeId = *pNodeId; SyncNodeId nodeId = *pNodeId;
if (syncRaftJointConfigInIncoming(&config->voters, nodeId)) { if (syncRaftJointConfigInIncoming(&config->voters, nodeId)) {
@ -336,6 +340,31 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg
return 0; return 0;
} }
// checkAndCopy copies the tracker's config and progress map (deeply enough for
// the purposes of the Changer) and returns those copies. It returns an error
// if checkInvariants does.
static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
syncRaftCloneTrackerConfig(&changer->tracker->config, config);
int i;
SSyncRaftProgress* pProgress = NULL;
while (!syncRaftIterateProgressMap(&changer->tracker->progressMap, pProgress)) {
syncRaftAddToProgressMap(progressMap, pProgress);
}
return checkAndReturn(config, progressMap);
}
// checkAndReturn calls checkInvariants on the input and returns either the
// resulting error or the input.
static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
if (checkInvariants(config, progressMap) != 0) {
return -1;
}
return 0;
}
static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) { static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) {
return !syncRaftJointConfigIsOutgoingEmpty(&config->voters); return !syncRaftJointConfigIsOutgoingEmpty(&config->voters);
} }

View File

@ -86,12 +86,7 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
SSyncRaftNodeMap nodeMap; SSyncRaftNodeMap nodeMap;
syncRaftJointConfigIDS(&pRaft->tracker->config.voters, &nodeMap); syncRaftJointConfigIDS(&pRaft->tracker->config.voters, &nodeMap);
SyncNodeId *pNodeId = NULL; SyncNodeId *pNodeId = NULL;
while (true) { while (!syncRaftIterateNodeMap(&nodeMap, pNodeId)) {
syncRaftIterateNodeMap(&nodeMap, &pNodeId);
if (pNodeId == NULL || *pNodeId == NULL) {
break;
}
SyncNodeId nodeId = *pNodeId; SyncNodeId nodeId = *pNodeId;
if (nodeId == SYNC_NON_NODE_ID) { if (nodeId == SYNC_NON_NODE_ID) {
continue; continue;

View File

@ -186,7 +186,7 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) {
pRaft->voteFor = serverState->voteFor; pRaft->voteFor = serverState->voteFor;
} }
static void visitProgressSendAppend(int i, SSyncRaftProgress* progress, void* arg) { static void visitProgressSendAppend(SSyncRaftProgress* progress, void* arg) {
SSyncRaft* pRaft = (SSyncRaft*)arg; SSyncRaft* pRaft = (SSyncRaft*)arg;
if (pRaft->selfId == progress->id) { if (pRaft->selfId == progress->id) {
return; return;
@ -279,7 +279,7 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
syncRaftLogAppend(pRaft->log, entries, n); syncRaftLogAppend(pRaft->log, entries, n);
SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->selfIndex]); SSyncRaftProgress* progress = syncRaftFindProgressByNodeId(&pRaft->tracker->progressMap, pRaft->selfId);
syncRaftProgressMaybeUpdate(progress, lastIndex); syncRaftProgressMaybeUpdate(progress, lastIndex);
// Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend. // Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend.
syncRaftMaybeCommit(pRaft); syncRaftMaybeCommit(pRaft);
@ -316,8 +316,8 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) {
pRaft->leadTransferee = SYNC_NON_NODE_ID; pRaft->leadTransferee = SYNC_NON_NODE_ID;
} }
static void initProgress(int i, SSyncRaftProgress* progress, void* arg) { static void resetProgress(SSyncRaftProgress* progress, void* arg) {
syncRaftInitProgress(i, (SSyncRaft*)arg, progress); syncRaftResetProgress((SSyncRaft*)arg, progress);
} }
static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
@ -336,7 +336,7 @@ static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
abortLeaderTransfer(pRaft); abortLeaderTransfer(pRaft);
syncRaftResetVotes(pRaft->tracker); syncRaftResetVotes(pRaft->tracker);
syncRaftProgressVisit(pRaft->tracker, initProgress, pRaft); syncRaftProgressVisit(pRaft->tracker, resetProgress, pRaft);
pRaft->pendingConfigIndex = 0; pRaft->pendingConfigIndex = 0;
pRaft->uncommittedSize = 0; pRaft->uncommittedSize = 0;

View File

@ -15,6 +15,7 @@
#include "sync_raft_node_map.h" #include "sync_raft_node_map.h"
#include "sync_type.h" #include "sync_type.h"
#include "sync_raft_progress.h"
void syncRaftInitNodeMap(SSyncRaftNodeMap* nodeMap) { void syncRaftInitNodeMap(SSyncRaftNodeMap* nodeMap) {
nodeMap->nodeIdMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); nodeMap->nodeIdMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
@ -50,6 +51,17 @@ bool syncRaftIterateNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId *pId) {
return false; return false;
} }
bool syncRaftIsAllInProgressMap(const SSyncRaftNodeMap* nodeMap, const SSyncRaftProgressMap* progressMap) {
SyncNodeId *pId = NULL;
while (!syncRaftIterateNodeMap(nodeMap, pId)) {
if (!syncRaftIsInProgressMap(progressMap, *pId)) {
return false;
}
}
return true;
}
void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) { void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) {
syncRaftCopyNodeMap(nodeMap, to); syncRaftCopyNodeMap(nodeMap, to);
} }
@ -63,5 +75,5 @@ void syncRaftRemoveFromNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
} }
int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap) { int32_t syncRaftNodeMapSize(const SSyncRaftNodeMap* nodeMap) {
return taosHashGetSize(nodeMap); return taosHashGetSize(nodeMap->nodeIdMap);
} }

View File

@ -25,13 +25,16 @@ static void probeAcked(SSyncRaftProgress* progress);
static void resumeProgress(SSyncRaftProgress* progress); static void resumeProgress(SSyncRaftProgress* progress);
void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) { void syncRaftResetProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
if (progress->inflights) {
syncRaftCloseInflights(progress->inflights);
}
SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflightMsgs); SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflightMsgs);
if (inflights == NULL) { if (inflights == NULL) {
return; return;
} }
*progress = (SSyncRaftProgress) { *progress = (SSyncRaftProgress) {
.matchIndex = i == pRaft->selfIndex ? syncRaftLogLastIndex(pRaft->log) : 0, .matchIndex = progress->id == pRaft->selfId ? syncRaftLogLastIndex(pRaft->log) : 0,
.nextIndex = syncRaftLogLastIndex(pRaft->log) + 1, .nextIndex = syncRaftLogLastIndex(pRaft->log) + 1,
.inflights = inflights, .inflights = inflights,
.isLearner = false, .isLearner = false,
@ -113,7 +116,7 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
} }
SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) { SSyncRaftProgress* syncRaftFindProgressByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) {
SSyncRaftProgress** ppProgress = (SSyncRaftProgress**)taosHashGet(progressMap, &id, sizeof(SyncNodeId*)); SSyncRaftProgress** ppProgress = (SSyncRaftProgress**)taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*));
if (ppProgress == NULL) { if (ppProgress == NULL) {
return NULL; return NULL;
} }
@ -126,9 +129,18 @@ int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SSyncRaftProgres
} }
void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) {
SSyncRaftProgress** ppProgress = (SSyncRaftProgress**)taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*));
if (ppProgress == NULL) {
return;
}
free(*ppProgress);
taosHashRemove(progressMap->progressMap, &id, sizeof(SyncNodeId*)); taosHashRemove(progressMap->progressMap, &id, sizeof(SyncNodeId*));
} }
bool syncRaftIsInProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) {
return taosHashGet(progressMap->progressMap, &id, sizeof(SyncNodeId*)) != NULL;
}
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) { bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex;
} }
@ -170,8 +182,8 @@ void syncRaftCopyProgress(const SSyncRaftProgress* progress, SSyncRaftProgress*
memcpy(out, progress, sizeof(SSyncRaftProgress)); memcpy(out, progress, sizeof(SSyncRaftProgress));
} }
bool syncRaftIterateProgressMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftProgress *pProgress) { bool syncRaftIterateProgressMap(const SSyncRaftProgressMap* progressMap, SSyncRaftProgress *pProgress) {
SSyncRaftProgress **ppProgress = taosHashIterate(nodeMap->nodeIdMap, pProgress); SSyncRaftProgress **ppProgress = taosHashIterate(progressMap->progressMap, pProgress);
if (ppProgress == NULL) { if (ppProgress == NULL) {
return true; return true;
} }
@ -180,6 +192,13 @@ bool syncRaftIterateProgressMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftProgre
return false; return false;
} }
bool syncRaftVisitProgressMap(SSyncRaftProgressMap* progressMap, visitProgressFp fp, void* arg) {
SSyncRaftProgress *pProgress;
while (!syncRaftIterateProgressMap(progressMap, pProgress)) {
fp(pProgress, arg);
}
}
/** /**
* ResetState moves the Progress into the specified State, resetting ProbeSent, * ResetState moves the Progress into the specified State, resetting ProbeSent,
* PendingSnapshot, and Inflights. * PendingSnapshot, and Inflights.

View File

@ -16,7 +16,7 @@
#include "sync_raft_progress_tracker.h" #include "sync_raft_progress_tracker.h"
#include "sync_raft_proto.h" #include "sync_raft_proto.h"
SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { SSyncRaftProgressTracker* syncRaftOpenProgressTracker(SSyncRaft* pRaft) {
SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker)); SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker));
if (tracker == NULL) { if (tracker == NULL) {
return NULL; return NULL;
@ -24,6 +24,7 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() {
syncRaftInitNodeMap(&tracker->config.learners); syncRaftInitNodeMap(&tracker->config.learners);
syncRaftInitNodeMap(&tracker->config.learnersNext); syncRaftInitNodeMap(&tracker->config.learnersNext);
tracker->pRaft = pRaft;
return tracker; return tracker;
} }
@ -33,11 +34,7 @@ void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) {
} }
void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) { void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) {
int i; syncRaftVisitProgressMap(&tracker->progressMap, visit, arg);
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
SSyncRaftProgress* progress = &(tracker->progressMap.progress[i]);
visit(i, progress, arg);
}
} }
void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant) { void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool grant) {
@ -51,10 +48,20 @@ void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, SyncNodeId id, bool g
} }
void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) { void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyncRaftProgressTrackerConfig* to) {
memcpy(to, from, sizeof(SSyncRaftProgressTrackerConfig));
} }
int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
// NB: intentionally allow the empty config. In production we'll never see a
// non-empty config (we prevent it from being created) but we will need to
// be able to *create* an initial config, for example during bootstrap (or
// during tests). Instead of having to hand-code this, we allow
// transitioning from an empty config into any other legal and non-empty
// config.
if (!syncRaftIsAllInProgressMap(&config->voters.incoming, progressMap)) return -1;
if (!syncRaftIsAllInProgressMap(&config->voters.outgoing, progressMap)) return -1;
if (!syncRaftIsAllInProgressMap(&config->learners, progressMap)) return -1;
if (!syncRaftIsAllInProgressMap(&config->learnersNext, progressMap)) return -1;
return 0; return 0;
} }
@ -67,8 +74,7 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r
SSyncRaftProgress* progress; SSyncRaftProgress* progress;
int r, g; int r, g;
for (i = 0, r = 0, g = 0; i < TSDB_MAX_REPLICA; ++i) { while (!syncRaftIterateProgressMap(&tracker->progressMap, progress)) {
progress = &(tracker->progressMap.progress[i]);
if (progress->id == SYNC_NON_NODE_ID) { if (progress->id == SYNC_NON_NODE_ID) {
continue; continue;
} }
@ -91,8 +97,8 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r
} }
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) { void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) {
syncRaftCopyNodeMap(&cs->voters, &tracker->config.voters.incoming); syncRaftCopyNodeMap(&tracker->config.voters.incoming, &cs->voters);
syncRaftCopyNodeMap(&cs->votersOutgoing, &tracker->config.voters.outgoing); syncRaftCopyNodeMap(&tracker->config.voters.outgoing, &cs->votersOutgoing);
syncRaftCopyNodeMap(&cs->learners, &tracker->config.learners); syncRaftCopyNodeMap(&tracker->config.learners, &cs->learners);
syncRaftCopyNodeMap(&cs->learnersNext, &tracker->config.learnersNext); syncRaftCopyNodeMap(&tracker->config.learnersNext, &cs->learnersNext);
} }

View File

@ -17,24 +17,24 @@
#include "sync_raft_quorum_majority.h" #include "sync_raft_quorum_majority.h"
#include "sync_raft_node_map.h" #include "sync_raft_node_map.h"
/** // VoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns // a result indicating whether the vote is pending (i.e. neither a quorum of
* a result indicating whether the vote is pending (i.e. neither a quorum of // yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a // quorum of no has been reached).
* quorum of no has been reached).
**/
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap) { ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashObj* votesMap) {
if (config->replica == 0) { int n = syncRaftNodeMapSize(config);
if (n == 0) {
// By convention, the elections on an empty config win. This comes in
// handy with joint quorums because it'll make a half-populated joint
// quorum behave like a majority quorum.
return SYNC_RAFT_VOTE_WON; return SYNC_RAFT_VOTE_WON;
} }
int i, g, r, missing; int i, g, r, missing;
for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) { i = g = r = missing = 0;
if (config->nodeId[i] == SYNC_NON_NODE_ID) { SyncNodeId* pId = NULL;
continue; while (!syncRaftIterateNodeMap(config, pId)) {
} const ESyncRaftVoteType* pType = taosHashGet(votesMap, pId, sizeof(SyncNodeId*));
const ESyncRaftVoteType* pType = taosHashGet(votesMap, &config->nodeId[i], sizeof(SyncNodeId*));
if (pType == NULL) { if (pType == NULL) {
missing += 1; missing += 1;
continue; continue;
@ -47,11 +47,11 @@ ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, SHashOb
} }
} }
int quorum = config->replica / 2 + 1; int quorum = n / 2 + 1;
if (g >= quorum) { if (g >= quorum) {
return SYNC_RAFT_VOTE_WON; return SYNC_RAFT_VOTE_WON;
} }
if (r + missing >= quorum) { if (g + missing >= quorum) {
return SYNC_RAFT_VOTE_PENDING; return SYNC_RAFT_VOTE_PENDING;
} }

View File

@ -17,6 +17,7 @@
#include "sync_raft_restore.h" #include "sync_raft_restore.h"
#include "sync_raft_progress_tracker.h" #include "sync_raft_progress_tracker.h"
static void addToConfChangeSingleArray(SSyncConfChangeSingleArray* out, int* i, const SSyncRaftNodeMap* nodeMap, ESyncRaftConfChangeType t);
static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in); static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in);
// syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and // syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and
@ -82,6 +83,18 @@ out:
return ret; return ret;
} }
static void addToConfChangeSingleArray(SSyncConfChangeSingleArray* out, int* i, const SSyncRaftNodeMap* nodeMap, ESyncRaftConfChangeType t) {
SyncNodeId* pId = NULL;
while (!syncRaftIterateNodeMap(nodeMap, pId)) {
out->changes[*i] = (SSyncConfChangeSingle) {
.type = t,
.nodeId = *pId,
};
*i += 1;
}
}
// toConfChangeSingle translates a conf state into 1) a slice of operations creating // toConfChangeSingle translates a conf state into 1) a slice of operations creating
// first the config that will become the outgoing one, and then the incoming one, and // first the config that will become the outgoing one, and then the incoming one, and
// b) another slice that, when applied to the config resulted from 1), represents the // b) another slice that, when applied to the config resulted from 1), represents the
@ -91,13 +104,16 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleA
out->n = in->n = 0; out->n = in->n = 0;
out->n = cs->votersOutgoing.replica; out->n = syncRaftNodeMapSize(&cs->votersOutgoing);
out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * out->n); out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * out->n);
if (out->changes == NULL) { if (out->changes == NULL) {
out->n = 0; out->n = 0;
return -1; return -1;
} }
in->n = cs->votersOutgoing.replica + cs->voters.replica + cs->learners.replica + cs->learnersNext.replica; in->n = syncRaftNodeMapSize(&cs->votersOutgoing) +
syncRaftNodeMapSize(&cs->voters) +
syncRaftNodeMapSize(&cs->learners) +
syncRaftNodeMapSize(&cs->learnersNext);
out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * in->n); out->changes = (SSyncConfChangeSingle*)malloc(sizeof(SSyncConfChangeSingle) * in->n);
if (in->changes == NULL) { if (in->changes == NULL) {
in->n = 0; in->n = 0;
@ -132,50 +148,24 @@ static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleA
// //
// as desired. // as desired.
for (i = 0; i < cs->votersOutgoing.replica; ++i) { // If there are outgoing voters, first add them one by one so that the
// If there are outgoing voters, first add them one by one so that the // (non-joint) config has them all.
// (non-joint) config has them all. i = 0;
out->changes[i] = (SSyncConfChangeSingle) { addToConfChangeSingleArray(out, &i, &cs->votersOutgoing, SYNC_RAFT_Conf_AddNode);
.type = SYNC_RAFT_Conf_AddNode, assert(i == out->n);
.nodeId = cs->votersOutgoing.nodeId[i],
};
}
// We're done constructing the outgoing slice, now on to the incoming one // We're done constructing the outgoing slice, now on to the incoming one
// (which will apply on top of the config created by the outgoing slice). // (which will apply on top of the config created by the outgoing slice).
i = 0;
// First, we'll remove all of the outgoing voters. // First, we'll remove all of the outgoing voters.
int j = 0; addToConfChangeSingleArray(in, &i, &cs->votersOutgoing, SYNC_RAFT_Conf_RemoveNode);
for (i = 0; i < cs->votersOutgoing.replica; ++i) {
in->changes[j] = (SSyncConfChangeSingle) {
.type = SYNC_RAFT_Conf_RemoveNode,
.nodeId = cs->votersOutgoing.nodeId[i],
};
j += 1;
}
// Then we'll add the incoming voters and learners. // Then we'll add the incoming voters and learners.
for (i = 0; i < cs->voters.replica; ++i) { addToConfChangeSingleArray(in, &i, &cs->voters, SYNC_RAFT_Conf_AddNode);
in->changes[j] = (SSyncConfChangeSingle) { addToConfChangeSingleArray(in, &i, &cs->learners, SYNC_RAFT_Conf_AddLearnerNode);
.type = SYNC_RAFT_Conf_AddNode, addToConfChangeSingleArray(in, &i, &cs->learnersNext, SYNC_RAFT_Conf_AddLearnerNode);
.nodeId = cs->voters.nodeId[i], assert(i == in->n);
};
j += 1;
}
for (i = 0; i < cs->learners.replica; ++i) {
in->changes[j] = (SSyncConfChangeSingle) {
.type = SYNC_RAFT_Conf_AddLearnerNode,
.nodeId = cs->learners.nodeId[i],
};
j += 1;
}
// Same for LearnersNext; these are nodes we want to be learners but which
// are currently voters in the outgoing config.
for (i = 0; i < cs->learnersNext.replica; ++i) {
in->changes[j] = (SSyncConfChangeSingle) {
.type = SYNC_RAFT_Conf_AddLearnerNode,
.nodeId = cs->learnersNext.nodeId[i],
};
j += 1;
}
return 0; return 0;
} }