From 8ab1eb642e50e802b8f2df905886cd37e5712150 Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 16 Nov 2021 11:42:34 +0800 Subject: [PATCH] [TD-10645][raft]refactor sync interface --- include/libs/sync/sync.h | 6 +- source/libs/sync/inc/raft_log.h | 2 + source/libs/sync/inc/raft_replication.h | 7 +- source/libs/sync/inc/sync_raft_impl.h | 12 ++ source/libs/sync/inc/sync_raft_progress.h | 14 ++ .../sync/inc/sync_raft_progress_tracker.h | 3 + source/libs/sync/inc/sync_raft_quorum_joint.h | 4 +- source/libs/sync/inc/sync_type.h | 2 +- source/libs/sync/src/raft.c | 119 +++++++++++++- .../src/raft_handle_append_entries_message.c | 2 +- source/libs/sync/src/raft_log.c | 4 + source/libs/sync/src/raft_replication.c | 153 +++++++----------- source/libs/sync/src/sync_raft_impl.c | 34 ++-- .../sync/src/sync_raft_progress_tracker.c | 8 + source/libs/sync/src/sync_raft_restore.c | 2 +- 15 files changed, 253 insertions(+), 119 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 6a9772bcc4..283604508f 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -119,15 +119,15 @@ typedef struct SStateManager { int32_t (*readServerState)(struct SStateManager* stateMng, char** ppBuffer, int* n); // save serialized cluster state data, buffer will be free by Sync - void (*saveCluster)(struct SStateManager* stateMng, const char* buffer, int n); + void (*saveClusterState)(struct SStateManager* stateMng, const char* buffer, int n); // read serialized cluster state data, buffer will be free by Sync - int32_t (*readCluster)(struct SStateManager* stateMng, char** ppBuffer, int* n); + int32_t (*readClusterState)(struct SStateManager* stateMng, char** ppBuffer, int* n); } SStateManager; typedef struct { SyncGroupId vgId; - SyncIndex snapshotIndex; + SyncIndex appliedIndex; SSyncCluster syncCfg; SSyncFSM fsm; SSyncLogStore logStore; diff --git a/source/libs/sync/inc/raft_log.h b/source/libs/sync/inc/raft_log.h index b36f0da3be..dc10c59b28 100644 --- a/source/libs/sync/inc/raft_log.h +++ b/source/libs/sync/inc/raft_log.h @@ -51,6 +51,8 @@ SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog); SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog); +void syncRaftLogAppliedTo(SSyncRaftLog* pLog, SyncIndex appliedIndex); + bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term); int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog); diff --git a/source/libs/sync/inc/raft_replication.h b/source/libs/sync/inc/raft_replication.h index e457063980..d0e55ef10e 100644 --- a/source/libs/sync/inc/raft_replication.h +++ b/source/libs/sync/inc/raft_replication.h @@ -20,6 +20,11 @@ #include "syncInt.h" #include "sync_type.h" -int syncRaftReplicate(SSyncRaft* pRaft, int i); +// syncRaftReplicate sends an append RPC with new entries to the given peer, +// if necessary. Returns true if a message was sent. The sendIfEmpty +// argument controls whether messages with no entries will be sent +// ("empty" messages are useful to convey updated Commit indexes, but +// are undesirable when we're sending multiple messages in a batch). +bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty); #endif /* TD_SYNC_RAFT_REPLICATION_H */ diff --git a/source/libs/sync/inc/sync_raft_impl.h b/source/libs/sync/inc/sync_raft_impl.h index 2f96b970dc..bd77978c28 100644 --- a/source/libs/sync/inc/sync_raft_impl.h +++ b/source/libs/sync/inc/sync_raft_impl.h @@ -35,8 +35,20 @@ bool syncRaftIsPromotable(SSyncRaft* pRaft); bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft); int syncRaftQuorum(SSyncRaft* pRaft); +bool syncRaftMaybeCommit(SSyncRaft* pRaft); + ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool accept, int* rejectNum, int *granted); +static FORCE_INLINE bool syncRaftIsEmptyServerState(const SSyncServerState* serverState) { + return serverState->commitIndex == 0 && + serverState->term == SYNC_NON_TERM && + serverState->voteFor == SYNC_NON_NODE_ID; +} + +void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState); + +void syncRaftBroadcastAppend(SSyncRaft* pRaft); + #endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */ diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h index c733a8ea74..173608a40a 100644 --- a/source/libs/sync/inc/sync_raft_progress.h +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -58,11 +58,20 @@ typedef enum ESyncRaftProgressState { PROGRESS_STATE_SNAPSHOT, } ESyncRaftProgressState; +static const char* kProgressStateString[] = { + "Probe", + "Replicate", + "Snapshot", +}; + /** * Progress represents a follower’s progress in the view of the leader. Leader maintains * progresses of all followers, and sends entries to the follower based on its progress. **/ struct SSyncRaftProgress { + // index in raft cluster config + int selfIndex; + SyncNodeId id; SyncIndex nextIndex; @@ -133,6 +142,11 @@ struct SSyncRaftProgressMap { SSyncRaftProgress progress[TSDB_MAX_REPLICA]; }; + +static FORCE_INLINE const char* syncRaftProgressStateString(const SSyncRaftProgress* progress) { + return kProgressStateString[progress->state]; +} + void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress); /** diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h index 61308d5df5..b267c46f35 100644 --- a/source/libs/sync/inc/sync_raft_progress_tracker.h +++ b/source/libs/sync/inc/sync_raft_progress_tracker.h @@ -20,6 +20,7 @@ #include "sync_raft_quorum.h" #include "sync_raft_quorum_joint.h" #include "sync_raft_progress.h" +#include "sync_raft_proto.h" struct SSyncRaftProgressTrackerConfig { SSyncRaftQuorumJointConfig voters; @@ -109,4 +110,6 @@ int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaf **/ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted); +void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs); + #endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */ diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index e2c2fd89b2..798e3d5eca 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -25,8 +25,8 @@ * majority configurations. Decisions require the support of both majorities. **/ typedef struct SSyncRaftQuorumJointConfig { - SSyncCluster outgoing; - SSyncCluster incoming; + SSyncRaftNodeMap outgoing; + SSyncRaftNodeMap incoming; } SSyncRaftQuorumJointConfig; /** diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index 34be10dfd5..cb938c7319 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -86,7 +86,7 @@ typedef enum { // grant the vote request SYNC_RAFT_VOTE_RESP_GRANT = 1, - //reject the vote request + // reject the vote request SYNC_RAFT_VOTE_RESP_REJECT = 2, } ESyncRaftVoteType; diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index d39824c99c..f8c3d1b0d4 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -16,14 +16,19 @@ #include "raft.h" #include "raft_configuration.h" #include "raft_log.h" +#include "sync_raft_restore.h" #include "raft_replication.h" +#include "sync_raft_config_change.h" #include "sync_raft_progress_tracker.h" #include "syncInt.h" #define RAFT_READ_LOG_MAX_NUM 100 static int deserializeServerStateFromBuffer(SSyncServerState* server, const char* buffer, int n); -static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n); +static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const char* buffer, int n); + +static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfig* config, + const SSyncRaftProgressMap* progressMap, SSyncConfigState* cs); static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); @@ -32,14 +37,15 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { SSyncNode* pNode = pRaft->pNode; SSyncServerState serverState; + SSyncConfigState confState; SStateManager* stateManager; SSyncLogStore* logStore; SSyncFSM* fsm; - SyncIndex initIndex = pInfo->snapshotIndex; SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM]; int nBuf, limit, i; char* buf; int n; + SSyncRaftChanger changer; memset(pRaft, 0, sizeof(SSyncRaft)); @@ -70,8 +76,45 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { syncError("deserializeServerStateFromBuffer for vgid %d fail", pInfo->vgId); return -1; } + free(buf); + //assert(initIndex <= serverState.commitIndex); + + // read config state + if (stateManager->readClusterState(stateManager, &buf, &n) != 0) { + syncError("readClusterState for vgid %d fail", pInfo->vgId); + return -1; + } + if (deserializeClusterStateFromBuffer(&confState, buf, n) != 0) { + syncError("deserializeClusterStateFromBuffer for vgid %d fail", pInfo->vgId); + return -1; + } + free(buf); + + changer = (SSyncRaftChanger) { + .tracker = pRaft->tracker, + .lastIndex = syncRaftLogLastIndex(pRaft->log), + }; + if (syncRaftRestoreConfig(&changer, &confState) < 0) { + syncError("syncRaftRestoreConfig for vgid %d fail", pInfo->vgId); + return -1; + } + + if (!syncRaftIsEmptyServerState(&serverState)) { + syncRaftLoadState(pRaft, &serverState); + } + + if (pInfo->appliedIndex > 0) { + syncRaftLogAppliedTo(pRaft->log, pInfo->appliedIndex); + } + + syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); + + +#if 0 + + + - assert(initIndex <= serverState.commitIndex); // restore fsm state from snapshot index + 1 until commitIndex ++initIndex; @@ -96,6 +139,7 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); pRaft->selfIndex = pRaft->cluster.selfIndex; +#endif syncInfo("[%d:%d] restore vgid %d state: snapshot index success", pRaft->selfGroupId, pRaft->selfId, pInfo->vgId); @@ -133,10 +177,77 @@ static int deserializeServerStateFromBuffer(SSyncServerState* server, const char return 0; } -static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n) { +static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const char* buffer, int n) { return 0; } +static void visitProgressMaybeSendAppend(int i, SSyncRaftProgress* progress, void* arg) { + syncRaftReplicate(arg, progress, false); +} + +// switchToConfig reconfigures this node to use the provided configuration. It +// updates the in-memory state and, when necessary, carries out additional +// actions such as reacting to the removal of nodes or changed quorum +// requirements. +// +// The inputs usually result from restoring a ConfState or applying a ConfChange. +static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfig* config, + const SSyncRaftProgressMap* progressMap, SSyncConfigState* cs) { + SyncNodeId selfId = pRaft->selfId; + int i; + bool exist; + SSyncRaftProgress* progress = NULL; + + syncRaftConfigState(pRaft->tracker, cs); + i = syncRaftFindProgressIndexByNodeId(&pRaft->tracker->progressMap, selfId); + exist = (i != -1); + + // Update whether the node itself is a learner, resetting to false when the + // node is removed. + if (exist) { + progress = &pRaft->tracker->progressMap.progress[i]; + pRaft->isLearner = progress->isLearner; + } else { + pRaft->isLearner = false; + } + + if ((!exist || pRaft->isLearner) && pRaft->state == TAOS_SYNC_STATE_LEADER) { + // This node is leader and was removed or demoted. We prevent demotions + // at the time writing but hypothetically we handle them the same way as + // removing the leader: stepping down into the next Term. + // + // TODO(tbg): step down (for sanity) and ask follower with largest Match + // to TimeoutNow (to avoid interruption). This might still drop some + // proposals but it's better than nothing. + // + // TODO(tbg): test this branch. It is untested at the time of writing. + return; + } + + // The remaining steps only make sense if this node is the leader and there + // are other nodes. + if (pRaft->state != TAOS_SYNC_STATE_LEADER || cs->voters.replica == 0) { + return; + } + + if (syncRaftMaybeCommit(pRaft)) { + // If the configuration change means that more entries are committed now, + // broadcast/append to everyone in the updated config. + syncRaftBroadcastAppend(pRaft); + } else { + // Otherwise, still probe the newly added replicas; there's no reason to + // let them wait out a heartbeat interval (or the next incoming + // proposal). + syncRaftProgressVisit(pRaft->tracker, visitProgressMaybeSendAppend, pRaft); + + // If the the leadTransferee was removed or demoted, abort the leadership transfer. + SyncNodeId leadTransferee = pRaft->leadTransferee; + if (leadTransferee != SYNC_NON_NODE_ID) { + + } + } +} + /** * pre-handle message, return true means no need to continue * Handle the message term, which may result in our stepping down to a follower. diff --git a/source/libs/sync/src/raft_handle_append_entries_message.c b/source/libs/sync/src/raft_handle_append_entries_message.c index 8c014a56bc..4797b6ce03 100644 --- a/source/libs/sync/src/raft_handle_append_entries_message.c +++ b/source/libs/sync/src/raft_handle_append_entries_message.c @@ -33,7 +33,7 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs return 0; } - RaftMsg_Append_Entries *appendResp = &(pRespMsg->appendResp); + RaftMsg_Append_Resp *appendResp = &(pRespMsg->appendResp); // ignore committed logs if (syncRaftLogIsCommitted(pRaft->log, appendEntries->index)) { appendResp->index = pRaft->log->commitIndex; diff --git a/source/libs/sync/src/raft_log.c b/source/libs/sync/src/raft_log.c index 0654dbea6b..b6e6d292e8 100644 --- a/source/libs/sync/src/raft_log.c +++ b/source/libs/sync/src/raft_log.c @@ -31,6 +31,10 @@ SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) { return 0; } +void syncRaftLogAppliedTo(SSyncRaftLog* pLog, SyncIndex appliedIndex) { + +} + bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term) { return true; } diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c index 78536bc6a3..c19fcd1e68 100644 --- a/source/libs/sync/src/raft_replication.c +++ b/source/libs/sync/src/raft_replication.c @@ -16,106 +16,62 @@ #include "raft.h" #include "raft_log.h" #include "sync_raft_progress.h" +#include "syncInt.h" #include "raft_replication.h" -static int sendSnapshot(SSyncRaft* pRaft, int i); -static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex index, SyncTerm term); +static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress); +static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, + SyncIndex prevIndex, SyncTerm prevTerm, + const SSyncRaftEntry *entries, int nEntry); -int syncRaftReplicate(SSyncRaft* pRaft, int i) { -#if 0 +// syncRaftReplicate sends an append RPC with new entries to the given peer, +// if necessary. Returns true if a message was sent. The sendIfEmpty +// argument controls whether messages with no entries will be sent +// ("empty" messages are useful to convey updated Commit indexes, but +// are undesirable when we're sending multiple messages in a batch). +bool syncRaftReplicate(SSyncRaft* pRaft, SSyncRaftProgress* progress, bool sendIfEmpty) { assert(pRaft->state == TAOS_SYNC_STATE_LEADER); - assert(i >= 0 && i < pRaft->leaderState.nProgress); + SyncNodeId nodeId = progress->id; - SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId; - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); if (syncRaftProgressIsPaused(progress)) { - syncInfo("node %d paused", nodeId); - return 0; + syncInfo("node [%d:%d] paused", pRaft->selfGroupId, nodeId); + return false; } SyncIndex nextIndex = syncRaftProgressNextIndex(progress); - SyncIndex snapshotIndex = syncRaftLogSnapshotIndex(pRaft->log); - bool inSnapshot = syncRaftProgressInSnapshot(progress); + SSyncRaftEntry *entries; + int nEntry; SyncIndex prevIndex; SyncTerm prevTerm; - /** - * From Section 3.5: - * - * When sending an AppendEntries RPC, the leader includes the index and - * term of the entry in its log that immediately precedes the new - * entries. If the follower does not find an entry in its log with the - * same index and term, then it refuses the new entries. The consistency - * check acts as an induction step: the initial empty state of the logs - * satisfies the Log Matching Property, and the consistency check - * preserves the Log Matching Property whenever logs are extended. As a - * result, whenever AppendEntries returns successfully, the leader knows - * that the follower's log is identical to its own log up through the new - * entries (Log Matching Property in Figure 3.2). - **/ - if (nextIndex == 1) { - /** - * We're including the very first entry, so prevIndex and prevTerm are - * null. If the first entry is not available anymore, send the last - * snapshot if we're not already sending one. - **/ - if (snapshotIndex > 0 && !inSnapshot) { - goto send_snapshot; - } + prevIndex = nextIndex - 1; + prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex); + int ret = syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry); - // otherwise send append entries from start - prevIndex = 0; - prevTerm = 0; - } else { - /** - * Set prevIndex and prevTerm to the index and term of the entry at - * nextIndex - 1. - **/ - prevIndex = nextIndex - 1; - prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex); - /** - * If the entry is not anymore in our log, send the last snapshot if we're - * not doing so already. - **/ - if (prevTerm == SYNC_NON_TERM && !inSnapshot) { - goto send_snapshot; - } + if (nEntry == 0 && !sendIfEmpty) { + return false; } - /* Send empty AppendEntries RPC when installing a snaphot */ - if (inSnapshot) { - prevIndex = syncRaftLogLastIndex(pRaft->log); - prevTerm = syncRaftLogLastTerm(pRaft->log); + if (ret != 0 || prevTerm == SYNC_NON_TERM) { + return sendSnapshot(pRaft, progress); } - return sendAppendEntries(pRaft, i, prevIndex, prevTerm); - -send_snapshot: - if (syncRaftProgressRecentActive(progress)) { - /* Only send a snapshot when we have heard from the server */ - return sendSnapshot(pRaft, i); - } else { - /* Send empty AppendEntries RPC when we haven't heard from the server */ - prevIndex = syncRaftLogLastIndex(pRaft->log); - prevTerm = syncRaftLogLastTerm(pRaft->log); - return sendAppendEntries(pRaft, i, prevIndex, prevTerm); - } -#endif - return 0; + return sendAppendEntries(pRaft, progress, prevIndex, prevTerm, entries, nEntry); } -static int sendSnapshot(SSyncRaft* pRaft, int i) { - return 0; +static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) { + if (!syncRaftProgressRecentActive(progress)) { + return false; + } + return true; } -static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncTerm prevTerm) { -#if 0 - SyncIndex nextIndex = prevIndex + 1; - SSyncRaftEntry *entries; - int nEntry; - SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[i]); - SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); - syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry); +static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, + SyncIndex prevIndex, SyncTerm prevTerm, + const SSyncRaftEntry *entries, int nEntry) { + SyncIndex lastIndex; + SyncTerm logTerm = prevTerm; + SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[progress->selfIndex]); SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term, prevIndex, prevTerm, pRaft->log->commitIndex, @@ -125,24 +81,27 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT goto err_release_log; } - pRaft->io.send(msg, pNode); - - if (syncRaftProgressInReplicate(progress)) { - SyncIndex lastIndex = nextIndex + nEntry; - syncRaftProgressOptimisticNextIndex(progress, lastIndex); - syncRaftInflightAdd(&progress->inflights, lastIndex); - } else if (syncRaftProgressInProbe(progress)) { - syncRaftProgressPause(progress); - } else { - + if (nEntry != 0) { + switch (progress->state) { + // optimistically increase the next when in StateReplicate + case PROGRESS_STATE_REPLICATE: + lastIndex = entries[nEntry - 1].index; + syncRaftProgressOptimisticNextIndex(progress, lastIndex); + syncRaftInflightAdd(&progress->inflights, lastIndex); + break; + case PROGRESS_STATE_PROBE: + progress->probeSent = true; + break; + default: + syncFatal("[%d:%d] is sending append in unhandled state %s", + pRaft->selfGroupId, pRaft->selfId, syncRaftProgressStateString(progress)); + break; + } } - - syncRaftProgressUpdateSendTick(progress, pRaft->currentTick); - - return 0; + pRaft->io.send(msg, pNode); + return true; err_release_log: - syncRaftLogRelease(pRaft->log, nextIndex, entries, nEntry); -#endif - return 0; -} \ No newline at end of file + syncRaftLogRelease(pRaft->log, prevIndex + 1, entries, nEntry); + return false; +} diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index 5e23474a89..ba09291682 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -31,7 +31,6 @@ static void tickElection(SSyncRaft* pRaft); static void tickHeartbeat(SSyncRaft* pRaft); static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n); -static bool maybeCommit(SSyncRaft* pRaft); static void abortLeaderTransfer(SSyncRaft* pRaft); @@ -171,6 +170,25 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, return granted; */ +void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) { + SyncIndex commitIndex = serverState->commitIndex; + SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); + + if (commitIndex < pRaft->log->commitIndex || commitIndex > lastIndex) { + syncFatal("[%d:%d] state.commit %"PRId64" is out of range [%" PRId64 ",%" PRId64 "", + pRaft->selfGroupId, pRaft->selfId, commitIndex, pRaft->log->commitIndex, lastIndex); + return; + } + + pRaft->log->commitIndex = commitIndex; + pRaft->term = serverState->term; + pRaft->voteFor = serverState->voteFor; +} + +void syncRaftBroadcastAppend(SSyncRaft* pRaft) { + +} + static int convertClear(SSyncRaft* pRaft) { } @@ -245,16 +263,14 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->cluster.selfIndex]); syncRaftProgressMaybeUpdate(progress, lastIndex); - // Regardless of maybeCommit's return, our caller will call bcastAppend. - maybeCommit(pRaft); + // Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend. + syncRaftMaybeCommit(pRaft); } -/** - * maybeCommit attempts to advance the commit index. Returns true if - * the commit index changed (in which case the caller should call - * r.bcastAppend). - **/ -static bool maybeCommit(SSyncRaft* pRaft) { +// syncRaftMaybeCommit attempts to advance the commit index. Returns true if +// the commit index changed (in which case the caller should call +// r.bcastAppend). +bool syncRaftMaybeCommit(SSyncRaft* pRaft) { return true; } diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index 525b2eec1a..ea7f1ae4f5 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -14,6 +14,7 @@ */ #include "sync_raft_progress_tracker.h" +#include "sync_raft_proto.h" SSyncRaftProgressTracker* syncRaftOpenProgressTracker() { SSyncRaftProgressTracker* tracker = (SSyncRaftProgressTracker*)malloc(sizeof(SSyncRaftProgressTracker)); @@ -77,4 +78,11 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r if (rejected) *rejected = r; if (granted) *granted = g; return syncRaftVoteResult(&(tracker->config.voters), tracker->votes); +} + +void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs) { + memcpy(&cs->voters, &tracker->config.voters.incoming, sizeof(SSyncRaftNodeMap)); + memcpy(&cs->votersOutgoing, &tracker->config.voters.outgoing, sizeof(SSyncRaftNodeMap)); + memcpy(&cs->learners, &tracker->config.learners, sizeof(SSyncRaftNodeMap)); + memcpy(&cs->learnersNext, &tracker->config.learnersNext, sizeof(SSyncRaftNodeMap)); } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_restore.c b/source/libs/sync/src/sync_raft_restore.c index d2bdbd6351..01bc7da7eb 100644 --- a/source/libs/sync/src/sync_raft_restore.c +++ b/source/libs/sync/src/sync_raft_restore.c @@ -48,7 +48,7 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) .n = 1, .changes = &incoming.changes[i], }; - ret = syncRaftChangerSimpleConfig(changer, &css, &config, &progressMap); + ret = syncRaftChangerSimpleConfig(changer, &css, config, progressMap); if (ret != 0) { goto out; }