From aa438e7a535a2d5624baea3e6169ac6edd20c29a Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 15 Nov 2021 10:56:22 +0800 Subject: [PATCH] [TD-10645][raft]add restore process --- source/libs/sync/inc/sync_raft_progress.h | 6 + source/libs/sync/inc/sync_raft_proto.h | 2 +- source/libs/sync/inc/sync_raft_quorum_joint.h | 8 + .../libs/sync/src/sync_raft_config_change.c | 178 +++++++++++++++++- source/libs/sync/src/sync_raft_progress.c | 38 ++++ source/libs/sync/src/sync_raft_quorum_joint.c | 31 +++ 6 files changed, 256 insertions(+), 7 deletions(-) diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h index 98edfc5e4f..c733a8ea74 100644 --- a/source/libs/sync/inc/sync_raft_progress.h +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -207,6 +207,12 @@ static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progres return progress->recentActive; } +int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id); + +int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); + +void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id); + /** * return true if progress's log is up-todate **/ diff --git a/source/libs/sync/inc/sync_raft_proto.h b/source/libs/sync/inc/sync_raft_proto.h index 49d706875f..c131e91139 100644 --- a/source/libs/sync/inc/sync_raft_proto.h +++ b/source/libs/sync/inc/sync_raft_proto.h @@ -22,7 +22,7 @@ typedef enum ESyncRaftConfChangeType { SYNC_RAFT_Conf_AddNode = 0, SYNC_RAFT_Conf_RemoveNode = 1, SYNC_RAFT_Conf_UpdateNode = 2, - SYNC_RAFT_Conf_AddLearnerNode = 2, + SYNC_RAFT_Conf_AddLearnerNode = 3, } ESyncRaftConfChangeType; // ConfChangeSingle is an individual configuration change operation. Multiple diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index cdde6d24f7..e2c2fd89b2 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -55,6 +55,10 @@ static FORCE_INLINE bool syncRaftJointConfigInIncoming(const SSyncRaftQuorumJoin return syncRaftJointConfigInCluster(&config->incoming, id); } +void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id); + +void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id); + static FORCE_INLINE const SSyncCluster* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) { return &config->incoming; } @@ -63,4 +67,8 @@ static FORCE_INLINE const SSyncCluster* syncRaftJointConfigOutgoing(const SSyncR return &config->outgoing; } +static FORCE_INLINE void syncRaftJointConfigClearOutgoing(SSyncRaftQuorumJointConfig* config) { + memset(&config->outgoing, 0, sizeof(SSyncCluster)); +} + #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c index e9cad376aa..7bf409fba0 100644 --- a/source/libs/sync/src/sync_raft_config_change.c +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -23,11 +23,24 @@ static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgr static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config); -static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTrackerConfig* config, - const SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css); +static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css); static int symDiff(const SSyncCluster* l, const SSyncCluster* r); -// Simple carries out a series of configuration changes that (in aggregate) + +static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner); + +static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id); +static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id); + +static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id); +static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id); +static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id); +// syncRaftChangerSimpleConfig carries out a series of configuration changes that (in aggregate) // mutates the incoming majority config Voters[0] by at most one. This method // will return an error if that is not the case, if the resulting quorum is // zero, or if the configuration is in a joint state (i.e. if there is an @@ -42,6 +55,7 @@ int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChange } if (hasJointConfig(config)) { + syncError("can't apply simple config change in joint config"); return -1; } @@ -75,7 +89,28 @@ int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChange // [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + int ret; + ret = checkAndCopy(changer, config, progressMap); + if (ret != 0) { + return ret; + } + if (hasJointConfig(config)) { + syncError("config is already joint"); + return -1; + } + + if(config->voters.incoming.replica == 0) { + // We allow adding nodes to an empty config for convenience (testing and + // bootstrap), but you can't enter a joint state. + syncError("can't make a zero-voter config joint"); + return -1; + } + + // Clear the outgoing config. + syncRaftJointConfigClearOutgoing(config); + + // Copy incoming to outgoing. } // checkAndCopy copies the tracker's config and progress map (deeply enough for @@ -156,8 +191,8 @@ static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config) { return config->voters.outgoing.replica > 0; } -static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTrackerConfig* config, - const SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css) { +static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css) { int i; for (i = 0; i < css->n; ++i) { @@ -168,11 +203,22 @@ static int applyConfig(SSyncRaftChanger* changer, const SSyncRaftProgressTracker ESyncRaftConfChangeType type = cs->type; switch (type) { - + case SYNC_RAFT_Conf_AddNode: + makeVoter(changer, config, progressMap, cs->nodeId); + break; + case SYNC_RAFT_Conf_AddLearnerNode: + makeLearner(changer, config, progressMap, cs->nodeId); + break; + case SYNC_RAFT_Conf_RemoveNode: + remove(changer, config, progressMap, cs->nodeId); + break; + case SYNC_RAFT_Conf_UpdateNode: + break; } } if (config->voters.incoming.replica == 0) { + syncError("removed all voters"); return -1; } @@ -209,4 +255,124 @@ static int symDiff(const SSyncCluster* l, const SSyncCluster* r) { } return n; +} + +static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner) { + +} + +// nilAwareDelete deletes from a map, nil'ing the map itself if it is empty after. +static void nilAwareDelete(SSyncRaftNodeMap* nodeMap, SyncNodeId id) { + int i; + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (nodeMap->nodeId[i] == id) { + nodeMap->replica -= 1; + nodeMap->nodeId[i] = SYNC_NON_NODE_ID; + break; + } + } + + assert(nodeMap->replica >= 0); +} + +// nilAwareAdd populates a map entry, creating the map if necessary. +static void nilAwareAdd(SSyncRaftNodeMap* nodeMap, SyncNodeId id) { + int i, j; + for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) { + if (nodeMap->nodeId[i] == id) { + return; + } + if (j == -1 && nodeMap->nodeId[i] == SYNC_NON_NODE_ID) { + j = i; + } + } + + assert(j != -1); + nodeMap->nodeId[j] = id; + nodeMap->replica += 1; +} + +// makeVoter adds or promotes the given ID to be a voter in the incoming +// majority config. +static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i = syncRaftFindProgressIndexByNodeId(progressMap, id); + if (i == -1) { + initProgress(changer, config, progressMap, id, false); + i = syncRaftFindProgressIndexByNodeId(progressMap, id); + } + + assert(i != -1); + SSyncRaftProgress* progress = &(progressMap->progress[i]); + + progress->isLearner = false; + nilAwareDelete(&config->learners, id); + nilAwareDelete(&config->learnersNext, id); + syncRaftJointConfigAddToIncoming(config, id); +} + +// makeLearner makes the given ID a learner or stages it to be a learner once +// an active joint configuration is exited. +// +// The former happens when the peer is not a part of the outgoing config, in +// which case we either add a new learner or demote a voter in the incoming +// config. +// +// The latter case occurs when the configuration is joint and the peer is a +// voter in the outgoing config. In that case, we do not want to add the peer +// as a learner because then we'd have to track a peer as a voter and learner +// simultaneously. Instead, we add the learner to LearnersNext, so that it will +// be added to Learners the moment the outgoing config is removed by +// LeaveJoint(). +static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i = syncRaftFindProgressIndexByNodeId(progressMap, id); + if (i == -1) { + initProgress(changer, config, progressMap, id, false); + i = syncRaftFindProgressIndexByNodeId(progressMap, id); + } + + assert(i != -1); + SSyncRaftProgress* progress = &(progressMap->progress[i]); + if (progress->isLearner) { + return; + } + // Remove any existing voter in the incoming config... + remove(changer, config, progressMap, id); + + // ... but save the Progress. + syncRaftAddToProgressMap(progressMap, id); + + // Use LearnersNext if we can't add the learner to Learners directly, i.e. + // if the peer is still tracked as a voter in the outgoing config. It will + // be turned into a learner in LeaveJoint(). + // + // Otherwise, add a regular learner right away. + bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); + if (inOutgoing) { + nilAwareAdd(&config->learnersNext, id); + } else { + nilAwareAdd(&config->learners, id); + progress->isLearner = true; + } +} + +// remove this peer as a voter or learner from the incoming config. +static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, + SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i = syncRaftFindProgressIndexByNodeId(progressMap, id); + if (i == -1) { + return; + } + + syncRaftJointConfigRemoveFromIncoming(&config->voters, id); + nilAwareDelete(&config->learners, id); + nilAwareDelete(&config->learnersNext, id); + + // If the peer is still a voter in the outgoing config, keep the Progress. + bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); + if (!inOutgoing) { + syncRaftRemoveFromProgressMap(progressMap, id); + } } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_progress.c b/source/libs/sync/src/sync_raft_progress.c index a3426aa999..a53aae93d0 100644 --- a/source/libs/sync/src/sync_raft_progress.c +++ b/source/libs/sync/src/sync_raft_progress.c @@ -112,6 +112,44 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { } } +int syncRaftFindProgressIndexByNodeId(const SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i; + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (progressMap->progress[i].id == id) { + return i; + } + } + return -1; +} + +int syncRaftAddToProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i, j; + + for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) { + if (progressMap->progress[i].id == id) { + return i; + } + if (j == -1 && progressMap->progress[i].id == SYNC_NON_NODE_ID) { + j = i; + } + } + + assert(j != -1); + + progressMap->progress[i].id = id; +} + +void syncRaftRemoveFromProgressMap(SSyncRaftProgressMap* progressMap, SyncNodeId id) { + int i; + + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (progressMap->progress[i].id == id) { + progressMap->progress[i].id = SYNC_NON_NODE_ID; + break; + } + } +} + bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) { return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; } diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c index f8b5463ad8..9a8e9eb7ba 100644 --- a/source/libs/sync/src/sync_raft_quorum_joint.c +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -39,3 +39,34 @@ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const E // One side won, the other one is pending, so the whole outcome is. return SYNC_RAFT_VOTE_PENDING; } + +void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) { + int i, min; + + for (i = 0, min = -1; i < TSDB_MAX_REPLICA; ++i) { + if (config->incoming.nodeInfo[i].nodeId == id) { + return; + } + if (min == -1 && config->incoming.nodeInfo[i].nodeId == SYNC_NON_NODE_ID) { + min = i; + } + } + + assert(min != -1); + config->incoming.nodeInfo[min].nodeId = id; + config->incoming.replica += 1; +} + +void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id) { + int i; + + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (config->incoming.nodeInfo[i].nodeId == id) { + config->incoming.replica -= 1; + config->incoming.nodeInfo[i].nodeId = SYNC_NON_NODE_ID; + break; + } + } + + assert(config->incoming.replica >= 0); +} \ No newline at end of file