[TD-10645][raft]<feature>add restore process
This commit is contained in:
parent
5833c9c076
commit
aa438e7a53
|
@ -207,6 +207,12 @@ static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progres
|
||||||
return progress->recentActive;
|
return progress->recentActive;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id);
|
||||||
|
|
||||||
|
int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id);
|
||||||
|
|
||||||
|
void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* return true if progress's log is up-todate
|
* return true if progress's log is up-todate
|
||||||
**/
|
**/
|
||||||
|
|
|
@ -22,7 +22,7 @@ typedef enum ESyncRaftConfChangeType {
|
||||||
SYNC_RAFT_Conf_AddNode = 0,
|
SYNC_RAFT_Conf_AddNode = 0,
|
||||||
SYNC_RAFT_Conf_RemoveNode = 1,
|
SYNC_RAFT_Conf_RemoveNode = 1,
|
||||||
SYNC_RAFT_Conf_UpdateNode = 2,
|
SYNC_RAFT_Conf_UpdateNode = 2,
|
||||||
SYNC_RAFT_Conf_AddLearnerNode = 2,
|
SYNC_RAFT_Conf_AddLearnerNode = 3,
|
||||||
} ESyncRaftConfChangeType;
|
} ESyncRaftConfChangeType;
|
||||||
|
|
||||||
// ConfChangeSingle is an individual configuration change operation. Multiple
|
// ConfChangeSingle is an individual configuration change operation. Multiple
|
||||||
|
|
|
@ -55,6 +55,10 @@ static FORCE_INLINE bool syncRaftJointConfigInIncoming(const SSyncRaftQuorumJoin
|
||||||
return syncRaftJointConfigInCluster(&config->incoming, id);
|
return syncRaftJointConfigInCluster(&config->incoming, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id);
|
||||||
|
|
||||||
|
void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id);
|
||||||
|
|
||||||
static FORCE_INLINE const SSyncCluster* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) {
|
static FORCE_INLINE const SSyncCluster* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) {
|
||||||
return &config->incoming;
|
return &config->incoming;
|
||||||
}
|
}
|
||||||
|
@ -63,4 +67,8 @@ static FORCE_INLINE const SSyncCluster* syncRaftJointConfigOutgoing(const SSyncR
|
||||||
return &config->outgoing;
|
return &config->outgoing;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void syncRaftJointConfigClearOutgoing(SSyncRaftQuorumJointConfig* config) {
|
||||||
|
memset(&config->outgoing, 0, sizeof(SSyncCluster));
|
||||||
|
}
|
||||||
|
|
||||||
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */
|
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */
|
||||||
|
|
|
@ -23,11 +23,24 @@ static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgr
|
||||||
static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
|
static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
|
||||||
static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
|
static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
|
||||||
static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config);
|
static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config);
|
||||||
static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTrackerConfig* config,
|
static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
const SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css);
|
SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css);
|
||||||
static int symDiff(const SSyncCluster* l, const SSyncCluster* r);
|
static int symDiff(const SSyncCluster* l, const SSyncCluster* r);
|
||||||
|
|
||||||
// Simple carries out a series of configuration changes that (in aggregate)
|
|
||||||
|
static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
|
SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner);
|
||||||
|
|
||||||
|
static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id);
|
||||||
|
static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id);
|
||||||
|
|
||||||
|
static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
|
SSyncRaftProgressMap* progressMap, SyncNodeId id);
|
||||||
|
static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
|
SSyncRaftProgressMap* progressMap, SyncNodeId id);
|
||||||
|
static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
|
SSyncRaftProgressMap* progressMap, SyncNodeId id);
|
||||||
|
// syncRaftChangerSimpleConfig carries out a series of configuration changes that (in aggregate)
|
||||||
// mutates the incoming majority config Voters[0] by at most one. This method
|
// mutates the incoming majority config Voters[0] by at most one. This method
|
||||||
// will return an error if that is not the case, if the resulting quorum is
|
// will return an error if that is not the case, if the resulting quorum is
|
||||||
// zero, or if the configuration is in a joint state (i.e. if there is an
|
// zero, or if the configuration is in a joint state (i.e. if there is an
|
||||||
|
@ -42,6 +55,7 @@ int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChange
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasJointConfig(config)) {
|
if (hasJointConfig(config)) {
|
||||||
|
syncError("can't apply simple config change in joint config");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,7 +89,28 @@ int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChange
|
||||||
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
|
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
|
||||||
int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css,
|
int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css,
|
||||||
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
|
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
ret = checkAndCopy(changer, config, progressMap);
|
||||||
|
if (ret != 0) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
if (hasJointConfig(config)) {
|
||||||
|
syncError("config is already joint");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(config->voters.incoming.replica == 0) {
|
||||||
|
// We allow adding nodes to an empty config for convenience (testing and
|
||||||
|
// bootstrap), but you can't enter a joint state.
|
||||||
|
syncError("can't make a zero-voter config joint");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear the outgoing config.
|
||||||
|
syncRaftJointConfigClearOutgoing(config);
|
||||||
|
|
||||||
|
// Copy incoming to outgoing.
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkAndCopy copies the tracker's config and progress map (deeply enough for
|
// checkAndCopy copies the tracker's config and progress map (deeply enough for
|
||||||
|
@ -156,8 +191,8 @@ static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) {
|
||||||
return config->voters.outgoing.replica > 0;
|
return config->voters.outgoing.replica > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTrackerConfig* config,
|
static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
const SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css) {
|
SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css) {
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for (i = 0; i < css->n; ++i) {
|
for (i = 0; i < css->n; ++i) {
|
||||||
|
@ -168,11 +203,22 @@ static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTracker
|
||||||
|
|
||||||
ESyncRaftConfChangeType type = cs->type;
|
ESyncRaftConfChangeType type = cs->type;
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
case SYNC_RAFT_Conf_AddNode:
|
||||||
|
makeVoter(changer, config, progressMap, cs->nodeId);
|
||||||
|
break;
|
||||||
|
case SYNC_RAFT_Conf_AddLearnerNode:
|
||||||
|
makeLearner(changer, config, progressMap, cs->nodeId);
|
||||||
|
break;
|
||||||
|
case SYNC_RAFT_Conf_RemoveNode:
|
||||||
|
remove(changer, config, progressMap, cs->nodeId);
|
||||||
|
break;
|
||||||
|
case SYNC_RAFT_Conf_UpdateNode:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config->voters.incoming.replica == 0) {
|
if (config->voters.incoming.replica == 0) {
|
||||||
|
syncError("removed all voters");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,4 +255,124 @@ static int symDiff(const SSyncCluster* l, const SSyncCluster* r) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return n;
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
|
SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// nilAwareDelete deletes from a map, nil'ing the map itself if it is empty after.
|
||||||
|
static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id) {
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
if (nodeMap->nodeId[i] == id) {
|
||||||
|
nodeMap->replica -= 1;
|
||||||
|
nodeMap->nodeId[i] = SYNC_NON_NODE_ID;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(nodeMap->replica >= 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// nilAwareAdd populates a map entry, creating the map if necessary.
|
||||||
|
static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id) {
|
||||||
|
int i, j;
|
||||||
|
for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
if (nodeMap->nodeId[i] == id) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (j == -1 && nodeMap->nodeId[i] == SYNC_NON_NODE_ID) {
|
||||||
|
j = i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(j != -1);
|
||||||
|
nodeMap->nodeId[j] = id;
|
||||||
|
nodeMap->replica += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// makeVoter adds or promotes the given ID to be a voter in the incoming
|
||||||
|
// majority config.
|
||||||
|
static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
|
SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
||||||
|
int i = syncRaftFindProgressIndexByNodeId(progressMap, id);
|
||||||
|
if (i == -1) {
|
||||||
|
initProgress(changer, config, progressMap, id, false);
|
||||||
|
i = syncRaftFindProgressIndexByNodeId(progressMap, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(i != -1);
|
||||||
|
SSyncRaftProgress* progress = &(progressMap->progress[i]);
|
||||||
|
|
||||||
|
progress->isLearner = false;
|
||||||
|
nilAwareDelete(&config->learners, id);
|
||||||
|
nilAwareDelete(&config->learnersNext, id);
|
||||||
|
syncRaftJointConfigAddToIncoming(config, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// makeLearner makes the given ID a learner or stages it to be a learner once
|
||||||
|
// an active joint configuration is exited.
|
||||||
|
//
|
||||||
|
// The former happens when the peer is not a part of the outgoing config, in
|
||||||
|
// which case we either add a new learner or demote a voter in the incoming
|
||||||
|
// config.
|
||||||
|
//
|
||||||
|
// The latter case occurs when the configuration is joint and the peer is a
|
||||||
|
// voter in the outgoing config. In that case, we do not want to add the peer
|
||||||
|
// as a learner because then we'd have to track a peer as a voter and learner
|
||||||
|
// simultaneously. Instead, we add the learner to LearnersNext, so that it will
|
||||||
|
// be added to Learners the moment the outgoing config is removed by
|
||||||
|
// LeaveJoint().
|
||||||
|
static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
|
SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
||||||
|
int i = syncRaftFindProgressIndexByNodeId(progressMap, id);
|
||||||
|
if (i == -1) {
|
||||||
|
initProgress(changer, config, progressMap, id, false);
|
||||||
|
i = syncRaftFindProgressIndexByNodeId(progressMap, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(i != -1);
|
||||||
|
SSyncRaftProgress* progress = &(progressMap->progress[i]);
|
||||||
|
if (progress->isLearner) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Remove any existing voter in the incoming config...
|
||||||
|
remove(changer, config, progressMap, id);
|
||||||
|
|
||||||
|
// ... but save the Progress.
|
||||||
|
syncRaftAddToProgressMap(progressMap, id);
|
||||||
|
|
||||||
|
// Use LearnersNext if we can't add the learner to Learners directly, i.e.
|
||||||
|
// if the peer is still tracked as a voter in the outgoing config. It will
|
||||||
|
// be turned into a learner in LeaveJoint().
|
||||||
|
//
|
||||||
|
// Otherwise, add a regular learner right away.
|
||||||
|
bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id);
|
||||||
|
if (inOutgoing) {
|
||||||
|
nilAwareAdd(&config->learnersNext, id);
|
||||||
|
} else {
|
||||||
|
nilAwareAdd(&config->learners, id);
|
||||||
|
progress->isLearner = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove this peer as a voter or learner from the incoming config.
|
||||||
|
static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
|
||||||
|
SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
||||||
|
int i = syncRaftFindProgressIndexByNodeId(progressMap, id);
|
||||||
|
if (i == -1) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
syncRaftJointConfigRemoveFromIncoming(&config->voters, id);
|
||||||
|
nilAwareDelete(&config->learners, id);
|
||||||
|
nilAwareDelete(&config->learnersNext, id);
|
||||||
|
|
||||||
|
// If the peer is still a voter in the outgoing config, keep the Progress.
|
||||||
|
bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id);
|
||||||
|
if (!inOutgoing) {
|
||||||
|
syncRaftRemoveFromProgressMap(progressMap, id);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -112,6 +112,44 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
if (progressMap->progress[i].id == id) {
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
||||||
|
int i, j;
|
||||||
|
|
||||||
|
for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
if (progressMap->progress[i].id == id) {
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
if (j == -1 && progressMap->progress[i].id == SYNC_NON_NODE_ID) {
|
||||||
|
j = i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(j != -1);
|
||||||
|
|
||||||
|
progressMap->progress[i].id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) {
|
||||||
|
int i;
|
||||||
|
|
||||||
|
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
if (progressMap->progress[i].id == id) {
|
||||||
|
progressMap->progress[i].id = SYNC_NON_NODE_ID;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
|
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
|
||||||
return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex;
|
return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex;
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,3 +39,34 @@ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const E
|
||||||
// One side won, the other one is pending, so the whole outcome is.
|
// One side won, the other one is pending, so the whole outcome is.
|
||||||
return SYNC_RAFT_VOTE_PENDING;
|
return SYNC_RAFT_VOTE_PENDING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
|
||||||
|
int i, min;
|
||||||
|
|
||||||
|
for (i = 0, min = -1; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
if (config->incoming.nodeInfo[i].nodeId == id) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (min == -1 && config->incoming.nodeInfo[i].nodeId == SYNC_NON_NODE_ID) {
|
||||||
|
min = i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(min != -1);
|
||||||
|
config->incoming.nodeInfo[min].nodeId = id;
|
||||||
|
config->incoming.replica += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
|
||||||
|
int i;
|
||||||
|
|
||||||
|
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
if (config->incoming.nodeInfo[i].nodeId == id) {
|
||||||
|
config->incoming.replica -= 1;
|
||||||
|
config->incoming.nodeInfo[i].nodeId = SYNC_NON_NODE_ID;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(config->incoming.replica >= 0);
|
||||||
|
}
|
Loading…
Reference in New Issue