From a936a7a584f01b7dcf98ec98e7591f5df167e74a Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 16 Nov 2021 15:41:08 +0800 Subject: [PATCH] [TD-10645][raft]add node map --- source/libs/sync/inc/sync_raft_node_map.h | 36 ++++++++++ source/libs/sync/inc/sync_raft_proto.h | 1 + source/libs/sync/inc/sync_raft_quorum_joint.h | 7 +- source/libs/sync/inc/sync_type.h | 7 +- source/libs/sync/src/raft.c | 8 ++- .../sync/src/raft_handle_election_message.c | 18 ----- source/libs/sync/src/raft_replication.c | 6 +- .../libs/sync/src/sync_raft_config_change.c | 4 +- .../{raft_election.c => sync_raft_election.c} | 55 ++++++++++++---- source/libs/sync/src/sync_raft_impl.c | 5 +- source/libs/sync/src/sync_raft_node_map.c | 66 +++++++++++++++++++ .../sync/src/sync_raft_progress_tracker.c | 4 ++ source/libs/sync/src/sync_raft_quorum_joint.c | 14 ++-- .../libs/sync/src/sync_raft_quorum_majority.c | 1 + 14 files changed, 174 insertions(+), 58 deletions(-) create mode 100644 source/libs/sync/inc/sync_raft_node_map.h rename source/libs/sync/src/{raft_election.c => sync_raft_election.c} (56%) create mode 100644 source/libs/sync/src/sync_raft_node_map.c diff --git a/source/libs/sync/inc/sync_raft_node_map.h b/source/libs/sync/inc/sync_raft_node_map.h new file mode 100644 index 0000000000..bfb5f68489 --- /dev/null +++ b/source/libs/sync/inc/sync_raft_node_map.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_RAFT_NODE_MAP_H +#define _TD_LIBS_SYNC_RAFT_NODE_MAP_H + +#include "sync.h" +#include "sync_type.h" + +// TODO: is TSDB_MAX_REPLICA enough? +struct SSyncRaftNodeMap { + int32_t replica; + SyncNodeId nodeId[TSDB_MAX_REPLICA]; +}; + +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); + +void syncRaftCopyNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to); + +void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to); + +void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); + +#endif /* _TD_LIBS_SYNC_RAFT_NODE_MAP_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_proto.h b/source/libs/sync/inc/sync_raft_proto.h index c131e91139..dd153e8dad 100644 --- a/source/libs/sync/inc/sync_raft_proto.h +++ b/source/libs/sync/inc/sync_raft_proto.h @@ -17,6 +17,7 @@ #define TD_SYNC_RAFT_PROTO_H #include "sync_type.h" +#include "sync_raft_node_map.h" typedef enum ESyncRaftConfChangeType { SYNC_RAFT_Conf_AddNode = 0, diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index 0ef002fe1a..0637a9be9a 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -19,6 +19,7 @@ #include "taosdef.h" #include "sync.h" #include "sync_type.h" +#include "sync_raft_node_map.h" /** * SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping) @@ -36,8 +37,6 @@ typedef struct SSyncRaftQuorumJointConfig { **/ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes); -bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); - static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { return syncRaftIsInNodeMap(&config->outgoing, id); } @@ -59,7 +58,9 @@ static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigOutgoing(const SS } static FORCE_INLINE void syncRaftJointConfigClearOutgoing(SSyncRaftQuorumJointConfig* config) { - memset(&config->outgoing, 0, sizeof(SSyncCluster)); + memset(&config->outgoing, 0, sizeof(SSyncRaftNodeMap)); } +void syncRaftJointConfigIDS(const SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap); + #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index cb938c7319..6d29e019cc 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -32,6 +32,8 @@ typedef struct SSyncRaftProgress SSyncRaftProgress; typedef struct SSyncRaftProgressMap SSyncRaftProgressMap; typedef struct SSyncRaftProgressTrackerConfig SSyncRaftProgressTrackerConfig; +typedef struct SSyncRaftNodeMap SSyncRaftNodeMap; + typedef struct SSyncRaftProgressTracker SSyncRaftProgressTracker; typedef struct SSyncRaftChanger SSyncRaftChanger; @@ -68,11 +70,6 @@ typedef struct SSyncClusterConfig { const SSyncCluster* cluster; } SSyncClusterConfig; -typedef struct { - int32_t replica; - SyncNodeId nodeId[TSDB_MAX_REPLICA]; -} SSyncRaftNodeMap; - typedef enum { SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0, SYNC_RAFT_CAMPAIGN_ELECTION = 1, diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 23351277c4..39e0377545 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -140,6 +140,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) { int32_t syncRaftTick(SSyncRaft* pRaft) { pRaft->currentTick += 1; + pRaft->tickFp(pRaft); return 0; } @@ -212,8 +213,11 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi // If the the leadTransferee was removed or demoted, abort the leadership transfer. SyncNodeId leadTransferee = pRaft->leadTransferee; - if (leadTransferee != SYNC_NON_NODE_ID && !syncRaftIsInNodeMap(&pRaft->tracker->config.voters, leadTransferee)) { - abortLeaderTransfer(pRaft); + if (leadTransferee != SYNC_NON_NODE_ID) { + if (!syncRaftIsInNodeMap(&pRaft->tracker->config.voters.incoming, leadTransferee) && + !syncRaftIsInNodeMap(&pRaft->tracker->config.voters.outgoing, leadTransferee)) { + abortLeaderTransfer(pRaft); + } } } } diff --git a/source/libs/sync/src/raft_handle_election_message.c b/source/libs/sync/src/raft_handle_election_message.c index e536fc67c0..a58c8ba5cf 100644 --- a/source/libs/sync/src/raft_handle_election_message.c +++ b/source/libs/sync/src/raft_handle_election_message.c @@ -19,24 +19,6 @@ #include "raft_message.h" int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - if (pRaft->state == TAOS_SYNC_STATE_LEADER) { - syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId); - return 0; - } - - if (!syncRaftIsPromotable(pRaft)) { - syncDebug("[%d:%d] is unpromotable and can not campaign", pRaft->selfGroupId, pRaft->selfId); - return 0; - } - // if there is pending uncommitted config,cannot start election - if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) { - syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply", - pRaft->selfGroupId, pRaft->selfId, pRaft->term); - return 0; - } - - syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); - if (pRaft->preVote) { syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_PRE_ELECTION); } else { diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c index c19fcd1e68..74f40179c6 100644 --- a/source/libs/sync/src/raft_replication.c +++ b/source/libs/sync/src/raft_replication.c @@ -22,7 +22,7 @@ static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress); static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, SyncIndex prevIndex, SyncTerm prevTerm, - const SSyncRaftEntry *entries, int nEntry); + SSyncRaftEntry *entries, int nEntry); // syncRaftReplicate sends an append RPC with new entries to the given peer, // if necessary. Returns true if a message was sent. The sendIfEmpty @@ -68,7 +68,7 @@ static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) { static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, SyncIndex prevIndex, SyncTerm prevTerm, - const SSyncRaftEntry *entries, int nEntry) { + SSyncRaftEntry *entries, int nEntry) { SyncIndex lastIndex; SyncTerm logTerm = prevTerm; SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[progress->selfIndex]); @@ -87,7 +87,7 @@ static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, case PROGRESS_STATE_REPLICATE: lastIndex = entries[nEntry - 1].index; syncRaftProgressOptimisticNextIndex(progress, lastIndex); - syncRaftInflightAdd(&progress->inflights, lastIndex); + syncRaftInflightAdd(progress->inflights, lastIndex); break; case PROGRESS_STATE_PROBE: progress->probeSent = true; diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c index 4e7f2190ea..288fdc465e 100644 --- a/source/libs/sync/src/sync_raft_config_change.c +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -359,7 +359,7 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi // be turned into a learner in LeaveJoint(). // // Otherwise, add a regular learner right away. - bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); + bool inOutgoing = syncRaftIsInNodeMap(&config->voters.outgoing, id); if (inOutgoing) { nilAwareAdd(&config->learnersNext, id); } else { @@ -381,7 +381,7 @@ static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConf nilAwareDelete(&config->learnersNext, id); // If the peer is still a voter in the outgoing config, keep the Progress. - bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); + bool inOutgoing = syncRaftIsInNodeMap(&config->voters.outgoing, id); if (!inOutgoing) { syncRaftRemoveFromProgressMap(progressMap, id); } diff --git a/source/libs/sync/src/raft_election.c b/source/libs/sync/src/sync_raft_election.c similarity index 56% rename from source/libs/sync/src/raft_election.c rename to source/libs/sync/src/sync_raft_election.c index eb310c31ec..74c3e09dae 100644 --- a/source/libs/sync/src/raft_election.c +++ b/source/libs/sync/src/sync_raft_election.c @@ -17,15 +17,42 @@ #include "raft.h" #include "raft_log.h" #include "raft_message.h" +#include "sync_raft_progress_tracker.h" + +static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType); void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { - SyncTerm term; + if (pRaft->state == TAOS_SYNC_STATE_LEADER) { + syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId); + return; + } + + if (!syncRaftIsPromotable(pRaft)) { + syncWarn("[%d:%d] is unpromotable and can not campaign", pRaft->selfGroupId, pRaft->selfId); + return; + } + + // if there is pending uncommitted config,cannot start election + if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) { + syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply", + pRaft->selfGroupId, pRaft->selfId, pRaft->term); + return; + } + + syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); + + campaign(pRaft, cType); +} + +// campaign transitions the raft instance to candidate state. This must only be +// called after verifying that this is a legitimate transition. +static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) { bool preVote; - ESyncRaftMessageType voteMsgType; + SyncTerm term; if (syncRaftIsPromotable(pRaft)) { syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId); - return 0; + return; } if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { @@ -35,7 +62,6 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { term = pRaft->term + 1; } else { syncRaftBecomeCandidate(pRaft); - voteMsgType = RAFT_MSG_VOTE; term = pRaft->term; preVote = false; } @@ -43,10 +69,8 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { int quorum = syncRaftQuorum(pRaft); ESyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL); if (result == SYNC_RAFT_VOTE_WON) { - /** - * We won the election after voting for ourselves (which must mean that - * this is a single-node cluster). Advance to the next state. - **/ + // We won the election after voting for ourselves (which must mean that + // this is a single-node cluster). Advance to the next state. if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); } else { @@ -59,12 +83,17 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { int i; SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); - for (i = 0; i < pRaft->cluster.replica; ++i) { - if (i == pRaft->cluster.selfIndex) { + SSyncRaftNodeMap nodeMap; + syncRaftJointConfigIDS(&pRaft->tracker->config.voters, &nodeMap); + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + SyncNodeId nodeId = nodeMap.nodeId[i]; + if (nodeId == SYNC_NON_NODE_ID) { continue; } - SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId; + if (nodeId == pRaft->selfId) { + continue; + } SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId, term, cType, lastIndex, lastTerm); @@ -72,9 +101,9 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { continue; } - syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent %d request to %d at term %" PRId64 "", + syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent vote request to %d at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, lastTerm, - lastIndex, voteMsgType, nodeId, pRaft->term); + lastIndex, nodeId, pRaft->term); pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i])); } diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index d65e03c64f..ab2db10230 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -234,9 +234,7 @@ static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) { return 0; } -/** - * tickElection is run by followers and candidates per tick. - **/ +// tickElection is run by followers and candidates after r.electionTimeout. static void tickElection(SSyncRaft* pRaft) { pRaft->electionElapsed += 1; @@ -254,6 +252,7 @@ static void tickElection(SSyncRaft* pRaft) { syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId)); } +// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout. static void tickHeartbeat(SSyncRaft* pRaft) { } diff --git a/source/libs/sync/src/sync_raft_node_map.c b/source/libs/sync/src/sync_raft_node_map.c new file mode 100644 index 0000000000..e13c808075 --- /dev/null +++ b/source/libs/sync/src/sync_raft_node_map.c @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sync_raft_node_map.h" +#include "sync_type.h" + +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { + int i; + + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (nodeId == nodeMap->nodeId[i]) { + return true; + } + } + + return false; +} + +void syncRaftCopyNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) { + memcpy(to, nodeMap, sizeof(SSyncRaftNodeMap)); +} + +void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) { + int i, j, m; + + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + SyncNodeId id = nodeMap->nodeId[i]; + if (id == SYNC_NON_NODE_ID) { + continue; + } + + syncRaftAddToNodeMap(to, id); + } +} + +void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { + assert(nodeMap->replica < TSDB_MAX_REPLICA); + + int i, j; + for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) { + SyncNodeId id = nodeMap->nodeId[i]; + if (id == SYNC_NON_NODE_ID) { + if (j == -1) j = i; + continue; + } + if (id == nodeId) { + return; + } + } + + assert(j != -1); + nodeMap->nodeId[j] = nodeId; + nodeMap->replica += 1; +} \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index ea7f1ae4f5..f43414127d 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -49,6 +49,10 @@ void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyn } +int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + return 0; +} + /** * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the * election outcome is known. diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c index fa663b6fc3..8a99574d68 100644 --- a/source/libs/sync/src/sync_raft_quorum_joint.c +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "sync_raft_node_map.h" #include "sync_raft_quorum_majority.h" #include "sync_raft_quorum_joint.h" #include "sync_raft_quorum.h" @@ -71,15 +72,10 @@ void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, S assert(config->incoming.replica >= 0); } +void syncRaftJointConfigIDS(const SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap) { + int i, j, m; -bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { - int i; + syncRaftCopyNodeMap(&config->incoming, nodeMap); - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (nodeId == nodeMap->nodeId[i]) { - return true; - } - } - - return false; + syncRaftUnionNodeMap(&config->outgoing, nodeMap); } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c index 73eb378e09..8ff5752b97 100644 --- a/source/libs/sync/src/sync_raft_quorum_majority.c +++ b/source/libs/sync/src/sync_raft_quorum_majority.c @@ -15,6 +15,7 @@ #include "sync_raft_quorum.h" #include "sync_raft_quorum_majority.h" +#include "sync_raft_node_map.h" /** * syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns