diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index b267c46f35..8adb0b4736 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -112,4 +112,6 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs); +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); + #endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index 798e3d5eca..0ef002fe1a 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -36,34 +36,25 @@ typedef struct SSyncRaftQuorumJointConfig { **/ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes); -static FORCE_INLINE bool syncRaftJointConfigInCluster(const SSyncCluster* cluster, SyncNodeId id) { - int i; - for (i = 0; i < cluster->replica; ++i) { - if (cluster->nodeInfo[i].nodeId == id) { - return true; - } - } - - return false; -} +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { - return syncRaftJointConfigInCluster(&config->outgoing, id); + return syncRaftIsInNodeMap(&config->outgoing, id); } static FORCE_INLINE bool syncRaftJointConfigInIncoming(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { - return syncRaftJointConfigInCluster(&config->incoming, id); + return syncRaftIsInNodeMap(&config->incoming, id); } void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id); void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id); -static FORCE_INLINE const SSyncCluster* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) { +static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) { return &config->incoming; } -static FORCE_INLINE const SSyncCluster* syncRaftJointConfigOutgoing(const SSyncRaftQuorumJointConfig* config) { +static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigOutgoing(const SSyncRaftQuorumJointConfig* config) { return &config->outgoing; } diff --git a/source/libs/sync/inc/sync_raft_quorum_majority.h b/source/libs/sync/inc/sync_raft_quorum_majority.h index 8f148873e3..0512a4dc87 100644 --- a/source/libs/sync/inc/sync_raft_quorum_majority.h +++ b/source/libs/sync/inc/sync_raft_quorum_majority.h @@ -26,6 +26,6 @@ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a * quorum of no has been reached). **/ -ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteType* votes); +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes); #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */ diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index f8c3d1b0d4..23351277c4 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -30,6 +30,8 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfig* config, const SSyncRaftProgressMap* progressMap, SSyncConfigState* cs); +static void abortLeaderTransfer(SSyncRaft* pRaft); + static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); @@ -109,38 +111,6 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); - -#if 0 - - - - - - // restore fsm state from snapshot index + 1 until commitIndex - ++initIndex; - while (initIndex <= serverState.commitIndex) { - limit = MIN(RAFT_READ_LOG_MAX_NUM, serverState.commitIndex - initIndex + 1); - - if (logStore->logRead(logStore, initIndex, limit, buffer, &nBuf) != 0) { - return -1; - } - assert(limit == nBuf); - - for (i = 0; i < limit; ++i) { - fsm->applyLog(fsm, initIndex + i, &(buffer[i]), NULL); - free(buffer[i].data); - } - initIndex += nBuf; - } - assert(initIndex == serverState.commitIndex); - - //pRaft->heartbeatTimeoutTick = 1; - - syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); - - pRaft->selfIndex = pRaft->cluster.selfIndex; -#endif - syncInfo("[%d:%d] restore vgid %d state: snapshot index success", pRaft->selfGroupId, pRaft->selfId, pInfo->vgId); return 0; @@ -242,12 +212,16 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi // If the the leadTransferee was removed or demoted, abort the leadership transfer. SyncNodeId leadTransferee = pRaft->leadTransferee; - if (leadTransferee != SYNC_NON_NODE_ID) { - + if (leadTransferee != SYNC_NON_NODE_ID && !syncRaftIsInNodeMap(&pRaft->tracker->config.voters, leadTransferee)) { + abortLeaderTransfer(pRaft); } } } +static void abortLeaderTransfer(SSyncRaft* pRaft) { + pRaft->leadTransferee = SYNC_NON_NODE_ID; +} + /** * pre-handle message, return true means no need to continue * Handle the message term, which may result in our stepping down to a follower. diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c index b80562ffa3..4e7f2190ea 100644 --- a/source/libs/sync/src/sync_raft_config_change.c +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -26,8 +26,8 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config); static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css); -static int symDiff(const SSyncCluster* l, const SSyncCluster* r); +static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r); static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner); @@ -237,27 +237,27 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig // symdiff returns the count of the symmetric difference between the sets of // uint64s, i.e. len( (l - r) \union (r - l)). -static int symDiff(const SSyncCluster* l, const SSyncCluster* r) { +static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r) { int n; int i; int j0, j1; - const SSyncCluster* pairs[2][2] = { + const SSyncRaftNodeMap* pairs[2][2] = { {l, r}, // count elems in l but not in r {r, l}, // count elems in r but not in l }; for (n = 0, i = 0; i < 2; ++i) { - const SSyncCluster** pp = pairs[i]; + const SSyncRaftNodeMap** pp = pairs[i]; - const SSyncCluster* p0 = pp[0]; - const SSyncCluster* p1 = pp[1]; - for (j0 = 0; j0 < p0->replica; ++j0) { - SyncNodeId id = p0->nodeInfo[j0].nodeId; + const SSyncRaftNodeMap* p0 = pp[0]; + const SSyncRaftNodeMap* p1 = pp[1]; + for (j0 = 0; j0 < TSDB_MAX_REPLICA; ++j0) { + SyncNodeId id = p0->nodeId[j0]; if (id == SYNC_NON_NODE_ID) { continue; } for (j1 = 0; j1 < p1->replica; ++j1) { - if (p1->nodeInfo[j1].nodeId != SYNC_NON_NODE_ID && p1->nodeInfo[j1].nodeId != id) { + if (p1->nodeId[j1] != SYNC_NON_NODE_ID && p1->nodeId[j1] != id) { n+=1; } } diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index ba09291682..d65e03c64f 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -185,8 +185,17 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) { pRaft->voteFor = serverState->voteFor; } -void syncRaftBroadcastAppend(SSyncRaft* pRaft) { +static void visitProgressSendAppend(int i, SSyncRaftProgress* progress, void* arg) { + SSyncRaft* pRaft = (SSyncRaft*)arg; + if (pRaft->selfId == progress->id) { + return; + } + syncRaftReplicate(arg, progress, true); +} + +void syncRaftBroadcastAppend(SSyncRaft* pRaft) { + syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft); } static int convertClear(SSyncRaft* pRaft) { @@ -279,6 +288,7 @@ bool syncRaftMaybeCommit(SSyncRaft* pRaft) { * trigger I/O requests for newly appended log entries or heartbeats. **/ static int triggerAll(SSyncRaft* pRaft) { + #if 0 assert(pRaft->state == TAOS_SYNC_STATE_LEADER); int i; @@ -287,8 +297,10 @@ static int triggerAll(SSyncRaft* pRaft) { continue; } - syncRaftReplicate(pRaft, i); + syncRaftReplicate(pRaft, pRaft->tracker->progressMap.progress[i], true); } + #endif + return 0; } static void abortLeaderTransfer(SSyncRaft* pRaft) { diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c index 9a8e9eb7ba..fa663b6fc3 100644 --- a/source/libs/sync/src/sync_raft_quorum_joint.c +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -44,16 +44,16 @@ void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNo int i, min; for (i = 0, min = -1; i < TSDB_MAX_REPLICA; ++i) { - if (config->incoming.nodeInfo[i].nodeId == id) { + if (config->incoming.nodeId[i] == id) { return; } - if (min == -1 && config->incoming.nodeInfo[i].nodeId == SYNC_NON_NODE_ID) { + if (min == -1 && config->incoming.nodeId[i] == SYNC_NON_NODE_ID) { min = i; } } assert(min != -1); - config->incoming.nodeInfo[min].nodeId = id; + config->incoming.nodeId[min] = id; config->incoming.replica += 1; } @@ -61,12 +61,25 @@ void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, S int i; for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (config->incoming.nodeInfo[i].nodeId == id) { + if (config->incoming.nodeId[i] == id) { config->incoming.replica -= 1; - config->incoming.nodeInfo[i].nodeId = SYNC_NON_NODE_ID; + config->incoming.nodeId[i] = SYNC_NON_NODE_ID; break; } } assert(config->incoming.replica >= 0); +} + + +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { + int i; + + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (nodeId == nodeMap->nodeId[i]) { + return true; + } + } + + return false; } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c index 0361845414..73eb378e09 100644 --- a/source/libs/sync/src/sync_raft_quorum_majority.c +++ b/source/libs/sync/src/sync_raft_quorum_majority.c @@ -22,14 +22,14 @@ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a * quorum of no has been reached). **/ -ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteType* votes) { +ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes) { if (config->replica == 0) { return SYNC_RAFT_VOTE_WON; } int i, g, r, missing; for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) { - if (config->nodeInfo[i].nodeId == SYNC_NON_NODE_ID) { + if (config->nodeId[i] == SYNC_NON_NODE_ID) { continue; }