From ccf8f14fdb59a5301ccc4753dc4e3fd278685153 Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 8 Nov 2021 15:58:19 +0800 Subject: [PATCH] [TD-10645][raft]add raft progress tracker --- source/libs/sync/inc/raft.h | 75 +++++++------ source/libs/sync/inc/raft_log.h | 14 ++- source/libs/sync/inc/raft_progress.h | 26 +++-- .../sync/inc/sync_raft_progress_tracker.h | 100 ++++++++++++++++++ source/libs/sync/inc/sync_raft_quorum_joint.h | 30 ++++++ source/libs/sync/inc/sync_type.h | 10 ++ source/libs/sync/src/raft.c | 78 ++++++++++++-- .../src/raft_handle_append_entries_message.c | 4 +- .../libs/sync/src/raft_handle_vote_message.c | 2 +- source/libs/sync/src/raft_log.c | 4 + source/libs/sync/src/raft_progress.c | 44 +++----- source/libs/sync/src/raft_replication.c | 5 + .../sync/src/sync_raft_progress_tracker.c | 41 +++++++ 13 files changed, 349 insertions(+), 84 deletions(-) create mode 100644 source/libs/sync/inc/sync_raft_progress_tracker.h create mode 100644 source/libs/sync/inc/sync_raft_quorum_joint.h create mode 100644 source/libs/sync/src/sync_raft_progress_tracker.c diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index dd3eed9e02..795ea7cc99 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -20,17 +20,12 @@ #include "sync_type.h" #include "raft_message.h" -typedef struct SSyncRaftProgress SSyncRaftProgress; typedef struct RaftLeaderState { - int nProgress; - SSyncRaftProgress* progress; + } RaftLeaderState; typedef struct RaftCandidateState { - /* votes results */ - SyncRaftVoteRespType votes[TSDB_MAX_REPLICA]; - /* true if in pre-vote phase */ bool inPreVote; } RaftCandidateState; @@ -47,17 +42,34 @@ struct SSyncRaft { // owner sync node SSyncNode* pNode; - int maxMsgSize; + SSyncCluster cluster; + + SyncNodeId selfId; + SyncGroupId selfGroupId; + + SSyncRaftIOMethods io; SSyncFSM fsm; SSyncLogStore logStore; SStateManager stateManager; + union { + RaftLeaderState leaderState; + RaftCandidateState candidateState; + }; + SyncTerm term; SyncNodeId voteFor; - SyncNodeId selfId; - SyncGroupId selfGroupId; + SSyncRaftLog *log; + + int maxMsgSize; + SSyncRaftProgressTracker *tracker; + + ESyncRole state; + + // isLearner is true if the local raft node is a learner. + bool isLearner; /** * the leader id @@ -70,15 +82,23 @@ struct SSyncRaft { **/ SyncNodeId leadTransferee; - /** - * New configuration is ignored if there exists unapplied configuration. + /** + * Only one conf change may be pending (in the log, but not yet + * applied) at a time. This is enforced via pendingConfIndex, which + * is set to a value >= the log index of the latest pending + * configuration change (if any). Config changes are only allowed to + * be proposed if the leader's applied index is greater than this + * value. **/ - bool hasPendingConf; - - SSyncCluster cluster; - - ESyncRole state; + SyncIndex pendingConfigIndex; + /** + * an estimate of the size of the uncommitted tail of the Raft log. Used to + * prevent unbounded log growth. Only maintained by the leader. Reset on + * term changes. + **/ + uint32_t uncommittedSize; + /** * number of ticks since it reached last electionTimeout when it is leader * or candidate. @@ -96,24 +116,19 @@ struct SSyncRaft { // current tick count since start up uint32_t currentTick; - // election timeout tick(random in [3:6] tick) - uint16_t electionTimeoutTick; - - // heartbeat timeout tick(default: 1 tick) - uint16_t heartbeatTimeoutTick; - bool preVote; bool checkQuorum; - SSyncRaftIOMethods io; + int heartbeatTimeout; + int electionTimeout; - // union different state data - union { - RaftLeaderState leaderState; - RaftCandidateState candidateState; - }; - - SSyncRaftLog *log; + /** + * randomizedElectionTimeout is a random number between + * [electiontimeout, 2 * electiontimeout - 1]. It gets reset + * when raft changes its state to follower or candidate. + **/ + int randomizedElectionTimeout; + bool disableProposalForwarding; SyncRaftStepFp stepFp; diff --git a/source/libs/sync/inc/raft_log.h b/source/libs/sync/inc/raft_log.h index bab9932fb5..a44f5a7273 100644 --- a/source/libs/sync/inc/raft_log.h +++ b/source/libs/sync/inc/raft_log.h @@ -19,8 +19,18 @@ #include "sync.h" #include "sync_type.h" -struct SSyncRaftEntry { +typedef enum SyncEntryType { + SYNC_ENTRY_TYPE_LOG = 1, +}SyncEntryType; +struct SSyncRaftEntry { + SyncTerm term; + + SyncIndex index; + + SyncEntryType type; + + SSyncBuffer buffer; }; struct SSyncRaftLog { @@ -49,6 +59,8 @@ bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog); SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index); +int syncRaftLogAppend(SSyncRaftLog* pLog, SSyncRaftEntry *pEntries, int n); + int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize, SSyncRaftEntry **ppEntries, int *n); diff --git a/source/libs/sync/inc/raft_progress.h b/source/libs/sync/inc/raft_progress.h index 5840468a5d..41d66d59d0 100644 --- a/source/libs/sync/inc/raft_progress.h +++ b/source/libs/sync/inc/raft_progress.h @@ -73,6 +73,8 @@ typedef enum RaftProgressState { * progresses of all followers, and sends entries to the follower based on its progress. **/ struct SSyncRaftProgress { + SyncNodeId id; + SyncIndex nextIndex; SyncIndex matchIndex; @@ -108,16 +110,18 @@ struct SSyncRaftProgress { * flow control sliding window **/ SSyncRaftInflights inflights; + + // IsLearner is true if this progress is tracked for a learner. + bool isLearner; }; -int syncRaftProgressCreate(SSyncRaft* pRaft); -//int syncRaftProgressRecreate(SSyncRaft* pRaft, const RaftConfiguration* configuration); +void syncRaftInitProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress); /** * syncRaftProgressMaybeUpdate returns false if the given lastIndex index comes from i-th node's log. * Otherwise it updates the progress and returns true. **/ -bool syncRaftProgressMaybeUpdate(SSyncRaft* pRaft, int i, SyncIndex lastIndex); +bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex); static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) { progress->nextIndex = nextIndex + 1; @@ -127,7 +131,7 @@ static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* * syncRaftProgressMaybeDecrTo returns false if the given to index comes from an out of order message. * Otherwise it decreases the progress next index to min(rejected, last) and returns true. **/ -bool syncRaftProgressMaybeDecrTo(SSyncRaft* pRaft, int i, +bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, SyncIndex rejected, SyncIndex lastIndex); /** @@ -166,20 +170,20 @@ static FORCE_INLINE bool syncRaftProgressUpdateSendTick(SSyncRaftProgress* progr return progress->lastSendTick = current; } -void syncRaftProgressFailure(SSyncRaft* pRaft, int i); +void syncRaftProgressFailure(SSyncRaftProgress* progress); -bool syncRaftProgressNeedAbortSnapshot(SSyncRaft* pRaft, int i); +bool syncRaftProgressNeedAbortSnapshot(SSyncRaftProgress* progress); /** - * return true if i-th node's log is up-todate + * return true if progress's log is up-todate **/ -bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, int i); +bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress); -void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i); +void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress); -void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i); +void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress); -void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i, SyncIndex snapshotIndex); +void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex); /* inflights APIs */ int syncRaftInflightReset(SSyncRaftInflights* inflights); diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h new file mode 100644 index 0000000000..ffc134fec4 --- /dev/null +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H +#define _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H + +#include "sync_type.h" +#include "sync_raft_quorum_joint.h" +#include "raft_progress.h" + +struct SSyncRaftProgressTrackerConfig { + SSyncRaftQuorumJointConfig voters; + + /** AutoLeave is true if the configuration is joint and a transition to the + * incoming configuration should be carried out automatically by Raft when + * this is possible. If false, the configuration will be joint until the + * application initiates the transition manually. + **/ + bool autoLeave; + + /** + * Learners is a set of IDs corresponding to the learners active in the + * current configuration. + * + * Invariant: Learners and Voters does not intersect, i.e. if a peer is in + * either half of the joint config, it can't be a learner; if it is a + * learner it can't be in either half of the joint config. This invariant + * simplifies the implementation since it allows peers to have clarity about + * its current role without taking into account joint consensus. + **/ + SyncNodeId learners[TSDB_MAX_REPLICA]; + + /** + * When we turn a voter into a learner during a joint consensus transition, + * we cannot add the learner directly when entering the joint state. This is + * because this would violate the invariant that the intersection of + * voters and learners is empty. For example, assume a Voter is removed and + * immediately re-added as a learner (or in other words, it is demoted): + * + * Initially, the configuration will be + * + * voters: {1 2 3} + * learners: {} + * + * and we want to demote 3. Entering the joint configuration, we naively get + * + * voters: {1 2} & {1 2 3} + * learners: {3} + * + * but this violates the invariant (3 is both voter and learner). Instead, + * we get + * + * voters: {1 2} & {1 2 3} + * learners: {} + * next_learners: {3} + * + * Where 3 is now still purely a voter, but we are remembering the intention + * to make it a learner upon transitioning into the final configuration: + * + * voters: {1 2} + * learners: {3} + * next_learners: {} + * + * Note that next_learners is not used while adding a learner that is not + * also a voter in the joint config. In this case, the learner is added + * right away when entering the joint configuration, so that it is caught up + * as soon as possible. + **/ + SyncNodeId learnersNext[TSDB_MAX_REPLICA]; +}; + +struct SSyncRaftProgressTracker { + SSyncRaftProgressTrackerConfig config; + + SSyncRaftProgress progressMap[TSDB_MAX_REPLICA]; + + SyncRaftVoteRespType votes[TSDB_MAX_REPLICA]; + int maxInflight; +}; + +SSyncRaftProgressTracker* syncRaftOpenProgressTracker(); + +void syncRaftResetVotes(SSyncRaftProgressTracker*); + +typedef void (*visitProgressFp)(SSyncRaftProgress* progress, void* arg); +void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg); + +#endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h new file mode 100644 index 0000000000..4f7424db7e --- /dev/null +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H +#define _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H + +#include "taosdef.h" +#include "sync.h" + +/** + * JointConfig is a configuration of two groups of (possibly overlapping) + * majority configurations. Decisions require the support of both majorities. + **/ +typedef struct SSyncRaftQuorumJointConfig { + SyncNodeId majorityConfig[2][TSDB_MAX_REPLICA]; +}SSyncRaftQuorumJointConfig; + +#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index 130243a72a..9faebe94b2 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -16,6 +16,9 @@ #ifndef _TD_LIBS_SYNC_TYPE_H #define _TD_LIBS_SYNC_TYPE_H +#include +#include "osMath.h" + #define SYNC_NON_NODE_ID -1 #define SYNC_NON_TERM 0 @@ -24,10 +27,16 @@ typedef uint32_t SyncTick; typedef struct SSyncRaft SSyncRaft; +typedef struct SSyncRaftProgress SSyncRaftProgress; +typedef struct SSyncRaftProgressTrackerConfig SSyncRaftProgressTrackerConfig; + +typedef struct SSyncRaftProgressTracker SSyncRaftProgressTracker; + typedef struct SSyncRaftLog SSyncRaftLog; typedef struct SSyncRaftEntry SSyncRaftEntry; +#if 0 #ifndef MIN #define MIN(x, y) (((x) < (y)) ? (x) : (y)) #endif @@ -35,6 +44,7 @@ typedef struct SSyncRaftEntry SSyncRaftEntry; #ifndef MAX #define MAX(x, y) (((x) > (y)) ? (x) : (y)) #endif +#endif typedef enum { SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0, diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 39e7a80d0b..4a3654131c 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -17,6 +17,7 @@ #include "raft_configuration.h" #include "raft_log.h" #include "raft_replication.h" +#include "sync_raft_progress_tracker.h" #include "syncInt.h" #define RAFT_READ_LOG_MAX_NUM 100 @@ -35,6 +36,9 @@ static int triggerAll(SSyncRaft* pRaft); static void tickElection(SSyncRaft* pRaft); static void tickHeartbeat(SSyncRaft* pRaft); +static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n); +static bool maybeCommit(SSyncRaft* pRaft); + static void abortLeaderTransfer(SSyncRaft* pRaft); static void resetRaft(SSyncRaft* pRaft, SyncTerm term); @@ -59,6 +63,12 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { logStore = &(pRaft->logStore); fsm = &(pRaft->fsm); + // init progress tracker + pRaft->tracker = syncRaftOpenProgressTracker(); + if (pRaft->tracker == NULL) { + return -1; + } + // open raft log if ((pRaft->log = syncRaftLogOpen()) == NULL) { return -1; @@ -88,7 +98,7 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { } assert(initIndex == serverState.commitIndex); - pRaft->heartbeatTimeoutTick = 1; + //pRaft->heartbeatTimeoutTick = 1; syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); @@ -137,7 +147,7 @@ void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId void syncRaftBecomePreCandidate(SSyncRaft* pRaft) { convertClear(pRaft); - memset(pRaft->candidateState.votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteRespType) * TSDB_MAX_REPLICA); + /** * Becoming a pre-candidate changes our step functions and state, * but doesn't change anything else. In particular it does not increase @@ -152,7 +162,6 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft) { void syncRaftBecomeCandidate(SSyncRaft* pRaft) { convertClear(pRaft); - memset(pRaft->candidateState.votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteRespType) * TSDB_MAX_REPLICA); pRaft->candidateState.inPreVote = false; pRaft->stepFp = stepCandidate; @@ -176,14 +185,22 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) { if (nPendingConf > 1) { syncFatal("unexpected multiple uncommitted config entry"); } - if (nPendingConf == 1) { - pRaft->hasPendingConf = true; - } syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); - // after become leader, send initial heartbeat - syncRaftTriggerHeartbeat(pRaft); + // after become leader, send a no-op log + SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry)); + if (entry == NULL) { + return; + } + *entry = (SSyncRaftEntry) { + .buffer = (SSyncBuffer) { + .data = NULL, + .len = 0, + } + }; + appendEntries(pRaft, entry, 1); + //syncRaftTriggerHeartbeat(pRaft); } void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) { @@ -192,7 +209,7 @@ void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) { void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) { // electionTimeoutTick in [3,6] tick - pRaft->electionTimeoutTick = taosRand() % 4 + 3; + pRaft->randomizedElectionTimeout = taosRand() % 4 + 3; } bool syncRaftIsPromotable(SSyncRaft* pRaft) { @@ -200,7 +217,7 @@ bool syncRaftIsPromotable(SSyncRaft* pRaft) { } bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) { - return pRaft->electionElapsed >= pRaft->electionTimeoutTick; + return pRaft->electionElapsed >= pRaft->randomizedElectionTimeout; } int syncRaftQuorum(SSyncRaft* pRaft) { @@ -208,6 +225,7 @@ int syncRaftQuorum(SSyncRaft* pRaft) { } int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool accept, int* rejectNum) { +/* if (accept) { syncInfo("[%d:%d] received (pre-vote %d) from %d at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term); @@ -230,6 +248,8 @@ int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool acc if (rejectNum) *rejectNum = rejected; return granted; +*/ + return 0; } /** @@ -375,6 +395,34 @@ static void tickHeartbeat(SSyncRaft* pRaft) { } +static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { + SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); + SyncTerm term = pRaft->term; + int i; + + for (i = 0; i < n; ++i) { + entries[i].term = term; + entries[i].index = lastIndex + 1 + i; + } + + syncRaftLogAppend(pRaft->log, entries, n); + + SSyncRaftProgress* progress = &(pRaft->tracker->progressMap[pRaft->cluster.selfIndex]); + syncRaftProgressMaybeUpdate(progress, lastIndex); + // Regardless of maybeCommit's return, our caller will call bcastAppend. + maybeCommit(pRaft); +} + +/** + * maybeCommit attempts to advance the commit index. Returns true if + * the commit index changed (in which case the caller should call + * r.bcastAppend). + **/ +static bool maybeCommit(SSyncRaft* pRaft) { + + return true; +} + /** * trigger I/O requests for newly appended log entries or heartbeats. **/ @@ -395,6 +443,10 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) { pRaft->leadTransferee = SYNC_NON_NODE_ID; } +static void initProgress(SSyncRaftProgress* progress, void* arg) { + syncRaftInitProgress((SSyncRaft*)arg, progress); +} + static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { if (pRaft->term != term) { pRaft->term = term; @@ -410,5 +462,9 @@ static void resetRaft(SSyncRaft* pRaft, SyncTerm term) { abortLeaderTransfer(pRaft); - pRaft->hasPendingConf = false; + syncRaftResetVotes(pRaft->tracker); + syncRaftProgressVisit(pRaft->tracker, initProgress, pRaft); + + pRaft->pendingConfigIndex = 0; + pRaft->uncommittedSize = 0; } diff --git a/source/libs/sync/src/raft_handle_append_entries_message.c b/source/libs/sync/src/raft_handle_append_entries_message.c index d4d362848f..8c014a56bc 100644 --- a/source/libs/sync/src/raft_handle_append_entries_message.c +++ b/source/libs/sync/src/raft_handle_append_entries_message.c @@ -20,7 +20,7 @@ #include "raft_message.h" int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - RaftMsg_Append_Entries *appendEntries = &(pMsg->appendEntries); + const RaftMsg_Append_Entries *appendEntries = &(pMsg->appendEntries); int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); @@ -33,7 +33,7 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs return 0; } - RaftMsg_Append_Entries *appendResp = &(pMsg->appendResp); + RaftMsg_Append_Entries *appendResp = &(pRespMsg->appendResp); // ignore committed logs if (syncRaftLogIsCommitted(pRaft->log, appendEntries->index)) { appendResp->index = pRaft->log->commitIndex; diff --git a/source/libs/sync/src/raft_handle_vote_message.c b/source/libs/sync/src/raft_handle_vote_message.c index 2fab8ad5a9..709e319c3e 100644 --- a/source/libs/sync/src/raft_handle_vote_message.c +++ b/source/libs/sync/src/raft_handle_vote_message.c @@ -36,7 +36,7 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { if (pRespMsg == NULL) { return 0; } - syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 ", vote: %d] %s for %d" \ + syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 ", vote: %d] %s for %d"\ "[logterm: %" PRId64 ", index: %" PRId64 "] at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, lastTerm, lastIndex, pRaft->voteFor, grant ? "grant" : "reject", diff --git a/source/libs/sync/src/raft_log.c b/source/libs/sync/src/raft_log.c index a26650cbb7..0654dbea6b 100644 --- a/source/libs/sync/src/raft_log.c +++ b/source/libs/sync/src/raft_log.c @@ -47,6 +47,10 @@ SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index) { return SYNC_NON_TERM; } +int syncRaftLogAppend(SSyncRaftLog* pLog, SSyncRaftEntry *pEntries, int n) { + +} + int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize, SSyncRaftEntry **ppEntries, int *n) { return 0; diff --git a/source/libs/sync/src/raft_progress.c b/source/libs/sync/src/raft_progress.c index 8133b670ff..6edc808698 100644 --- a/source/libs/sync/src/raft_progress.c +++ b/source/libs/sync/src/raft_progress.c @@ -40,9 +40,15 @@ int syncRaftProgressRecreate(SSyncRaft* pRaft, const RaftConfiguration* configur } */ -bool syncRaftProgressMaybeUpdate(SSyncRaft* pRaft, int i, SyncIndex lastIndex) { - assert(i >= 0 && i < pRaft->leaderState.nProgress); - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); +void syncRaftInitProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress) { + *progress = (SSyncRaftProgress) { + .matchIndex = progress->id == pRaft->selfId ? syncRaftLogLastIndex(pRaft->log) : 0, + .nextIndex = syncRaftLogLastIndex(pRaft->log) + 1, + //.inflights = + }; +} + +bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) { bool updated = false; if (progress->matchIndex < lastIndex) { @@ -57,11 +63,8 @@ bool syncRaftProgressMaybeUpdate(SSyncRaft* pRaft, int i, SyncIndex lastIndex) { return updated; } -bool syncRaftProgressMaybeDecrTo(SSyncRaft* pRaft, int i, +bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, SyncIndex rejected, SyncIndex lastIndex) { - assert(i >= 0 && i < pRaft->leaderState.nProgress); - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - if (progress->state == PROGRESS_REPLICATE) { /** * the rejection must be stale if the progress has matched and "rejected" @@ -110,30 +113,19 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) { } } -void syncRaftProgressFailure(SSyncRaft* pRaft, int i) { - assert(i >= 0 && i < pRaft->leaderState.nProgress); - - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - +void syncRaftProgressFailure(SSyncRaftProgress* progress) { progress->pendingSnapshotIndex = 0; } -bool syncRaftProgressNeedAbortSnapshot(SSyncRaft* pRaft, int i) { - assert(i >= 0 && i < pRaft->leaderState.nProgress); - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - +bool syncRaftProgressNeedAbortSnapshot(SSyncRaftProgress* progress) { return progress->state == PROGRESS_SNAPSHOT && progress->matchIndex >= progress->pendingSnapshotIndex; } -bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, int i) { - assert(i >= 0 && i < pRaft->leaderState.nProgress); - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); +bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) { return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex; } -void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) { - assert(i >= 0 && i < pRaft->leaderState.nProgress); - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); +void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) { /** * If the original state is ProgressStateSnapshot, progress knows that * the pending snapshot has been sent to this peer successfully, then @@ -149,16 +141,12 @@ void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) { } } -void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) { - assert(i >= 0 && i < pRaft->leaderState.nProgress); - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); +void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) { resetProgressState(progress, PROGRESS_REPLICATE); progress->nextIndex = progress->matchIndex + 1; } -void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i, SyncIndex snapshotIndex) { - assert(i >= 0 && i < pRaft->leaderState.nProgress); - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); +void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) { resetProgressState(progress, PROGRESS_SNAPSHOT); progress->pendingSnapshotIndex = snapshotIndex; } diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c index b6ff1fb329..473499b795 100644 --- a/source/libs/sync/src/raft_replication.c +++ b/source/libs/sync/src/raft_replication.c @@ -22,6 +22,7 @@ static int sendSnapshot(SSyncRaft* pRaft, int i); static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex index, SyncTerm term); int syncRaftReplicate(SSyncRaft* pRaft, int i) { +#if 0 assert(pRaft->state == TAOS_SYNC_ROLE_LEADER); assert(i >= 0 && i < pRaft->leaderState.nProgress); @@ -99,6 +100,8 @@ send_snapshot: prevTerm = syncRaftLogLastTerm(pRaft->log); return sendAppendEntries(pRaft, i, prevIndex, prevTerm); } +#endif + return 0; } static int sendSnapshot(SSyncRaft* pRaft, int i) { @@ -106,6 +109,7 @@ static int sendSnapshot(SSyncRaft* pRaft, int i) { } static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncTerm prevTerm) { +#if 0 SyncIndex nextIndex = prevIndex + 1; SSyncRaftEntry *entries; int nEntry; @@ -139,5 +143,6 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT err_release_log: syncRaftLogRelease(pRaft->log, nextIndex, entries, nEntry); +#endif return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c new file mode 100644 index 0000000000..d349cbb9b2 --- /dev/null +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sync_raft_progress_tracker.h" + +SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { + SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker)); + if (tracker == NULL) { + return NULL; + } + + return tracker; +} + +void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) { + memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteRespType) * TSDB_MAX_REPLICA); +} + +void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) { + int i; + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + SSyncRaftProgress* progress = &(tracker->progressMap[i]); + if (progress->id == SYNC_NON_NODE_ID) { + continue; + } + + visit(progress, arg); + } +} \ No newline at end of file