[TD-10645][raft]<feature>refactor sync typedef
This commit is contained in:
parent
5a4f137b14
commit
6d3efd1b1a
|
@ -65,7 +65,8 @@ struct SSyncRaft {
|
||||||
|
|
||||||
SSyncRaftLog *log;
|
SSyncRaftLog *log;
|
||||||
|
|
||||||
int maxMsgSize;
|
uint64_t maxMsgSize;
|
||||||
|
uint64_t maxUncommittedSize;
|
||||||
SSyncRaftProgressTracker *tracker;
|
SSyncRaftProgressTracker *tracker;
|
||||||
|
|
||||||
ESyncState state;
|
ESyncState state;
|
||||||
|
|
|
@ -19,16 +19,16 @@
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
#include "sync_type.h"
|
#include "sync_type.h"
|
||||||
|
|
||||||
typedef enum SyncEntryType {
|
typedef enum ESyncRaftEntryType {
|
||||||
SYNC_ENTRY_TYPE_LOG = 1,
|
SYNC_ENTRY_TYPE_LOG = 1,
|
||||||
}SyncEntryType;
|
} ESyncRaftEntryType;
|
||||||
|
|
||||||
struct SSyncRaftEntry {
|
struct SSyncRaftEntry {
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
|
|
||||||
SyncIndex index;
|
SyncIndex index;
|
||||||
|
|
||||||
SyncEntryType type;
|
ESyncRaftEntryType type;
|
||||||
|
|
||||||
SSyncBuffer buffer;
|
SSyncBuffer buffer;
|
||||||
};
|
};
|
||||||
|
|
|
@ -28,7 +28,7 @@
|
||||||
* outter message start with RAFT_MSG_*, which communicate between cluster peers,
|
* outter message start with RAFT_MSG_*, which communicate between cluster peers,
|
||||||
* need to implement its decode/encode functions.
|
* need to implement its decode/encode functions.
|
||||||
**/
|
**/
|
||||||
typedef enum RaftMessageType {
|
typedef enum ESyncRaftMessageType {
|
||||||
// client propose a cmd
|
// client propose a cmd
|
||||||
RAFT_MSG_INTERNAL_PROP = 1,
|
RAFT_MSG_INTERNAL_PROP = 1,
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ typedef enum RaftMessageType {
|
||||||
|
|
||||||
RAFT_MSG_APPEND = 5,
|
RAFT_MSG_APPEND = 5,
|
||||||
RAFT_MSG_APPEND_RESP = 6,
|
RAFT_MSG_APPEND_RESP = 6,
|
||||||
} RaftMessageType;
|
} ESyncRaftMessageType;
|
||||||
|
|
||||||
typedef struct RaftMsgInternal_Prop {
|
typedef struct RaftMsgInternal_Prop {
|
||||||
const SSyncBuffer *pBuf;
|
const SSyncBuffer *pBuf;
|
||||||
|
@ -53,14 +53,14 @@ typedef struct RaftMsgInternal_Election {
|
||||||
} RaftMsgInternal_Election;
|
} RaftMsgInternal_Election;
|
||||||
|
|
||||||
typedef struct RaftMsg_Vote {
|
typedef struct RaftMsg_Vote {
|
||||||
SyncRaftElectionType cType;
|
ESyncRaftElectionType cType;
|
||||||
SyncIndex lastIndex;
|
SyncIndex lastIndex;
|
||||||
SyncTerm lastTerm;
|
SyncTerm lastTerm;
|
||||||
} RaftMsg_Vote;
|
} RaftMsg_Vote;
|
||||||
|
|
||||||
typedef struct RaftMsg_VoteResp {
|
typedef struct RaftMsg_VoteResp {
|
||||||
bool rejected;
|
bool rejected;
|
||||||
SyncRaftElectionType cType;
|
ESyncRaftElectionType cType;
|
||||||
} RaftMsg_VoteResp;
|
} RaftMsg_VoteResp;
|
||||||
|
|
||||||
typedef struct RaftMsg_Append_Entries {
|
typedef struct RaftMsg_Append_Entries {
|
||||||
|
@ -85,7 +85,7 @@ typedef struct RaftMsg_Append_Resp {
|
||||||
} RaftMsg_Append_Resp;
|
} RaftMsg_Append_Resp;
|
||||||
|
|
||||||
typedef struct SSyncMessage {
|
typedef struct SSyncMessage {
|
||||||
RaftMessageType msgType;
|
ESyncRaftMessageType msgType;
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
SyncGroupId groupId;
|
SyncGroupId groupId;
|
||||||
SyncNodeId from;
|
SyncNodeId from;
|
||||||
|
@ -131,7 +131,7 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId from,
|
static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId from,
|
||||||
SyncTerm term, SyncRaftElectionType cType,
|
SyncTerm term, ESyncRaftElectionType cType,
|
||||||
SyncIndex lastIndex, SyncTerm lastTerm) {
|
SyncIndex lastIndex, SyncTerm lastTerm) {
|
||||||
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
|
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
|
@ -153,7 +153,7 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNodeId from,
|
static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNodeId from,
|
||||||
SyncRaftElectionType cType, bool rejected) {
|
ESyncRaftElectionType cType, bool rejected) {
|
||||||
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
|
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -213,7 +213,7 @@ static FORCE_INLINE SSyncMessage* syncNewEmptyAppendRespMsg(SyncGroupId groupId,
|
||||||
return pMsg;
|
return pMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) {
|
static FORCE_INLINE bool syncIsInternalMsg(ESyncRaftMessageType msgType) {
|
||||||
return msgType == RAFT_MSG_INTERNAL_PROP ||
|
return msgType == RAFT_MSG_INTERNAL_PROP ||
|
||||||
msgType == RAFT_MSG_INTERNAL_ELECTION;
|
msgType == RAFT_MSG_INTERNAL_ELECTION;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft);
|
||||||
void syncRaftBecomeCandidate(SSyncRaft* pRaft);
|
void syncRaftBecomeCandidate(SSyncRaft* pRaft);
|
||||||
void syncRaftBecomeLeader(SSyncRaft* pRaft);
|
void syncRaftBecomeLeader(SSyncRaft* pRaft);
|
||||||
|
|
||||||
void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType);
|
void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType);
|
||||||
|
|
||||||
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft);
|
void syncRaftTriggerHeartbeat(SSyncRaft* pRaft);
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ bool syncRaftIsPromotable(SSyncRaft* pRaft);
|
||||||
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft);
|
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft);
|
||||||
int syncRaftQuorum(SSyncRaft* pRaft);
|
int syncRaftQuorum(SSyncRaft* pRaft);
|
||||||
|
|
||||||
SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
|
ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
|
||||||
bool preVote, bool accept,
|
bool preVote, bool accept,
|
||||||
int* rejectNum, int *granted);
|
int* rejectNum, int *granted);
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,7 @@
|
||||||
*
|
*
|
||||||
* PROGRESS_STATE_PROBE is the initial state.
|
* PROGRESS_STATE_PROBE is the initial state.
|
||||||
**/
|
**/
|
||||||
typedef enum RaftProgressState {
|
typedef enum ESyncRaftProgressState {
|
||||||
/**
|
/**
|
||||||
* StateProbe indicates a follower whose last index isn't known. Such a
|
* StateProbe indicates a follower whose last index isn't known. Such a
|
||||||
* follower is "probed" (i.e. an append sent periodically) to narrow down
|
* follower is "probed" (i.e. an append sent periodically) to narrow down
|
||||||
|
@ -56,7 +56,7 @@ typedef enum RaftProgressState {
|
||||||
* return to StateReplicate.
|
* return to StateReplicate.
|
||||||
**/
|
**/
|
||||||
PROGRESS_STATE_SNAPSHOT,
|
PROGRESS_STATE_SNAPSHOT,
|
||||||
} RaftProgressState;
|
} ESyncRaftProgressState;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Progress represents a follower’s progress in the view of the leader. Leader maintains
|
* Progress represents a follower’s progress in the view of the leader. Leader maintains
|
||||||
|
@ -82,7 +82,7 @@ struct SSyncRaftProgress {
|
||||||
* When in StateSnapshot, leader should have sent out snapshot
|
* When in StateSnapshot, leader should have sent out snapshot
|
||||||
* before and stops sending any replication message.
|
* before and stops sending any replication message.
|
||||||
**/
|
**/
|
||||||
RaftProgressState state;
|
ESyncRaftProgressState state;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT.
|
* pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT.
|
||||||
|
@ -187,15 +187,15 @@ static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progr
|
||||||
return progress->nextIndex;
|
return progress->nextIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE RaftProgressState syncRaftProgressInReplicate(SSyncRaftProgress* progress) {
|
static FORCE_INLINE ESyncRaftProgressState syncRaftProgressInReplicate(SSyncRaftProgress* progress) {
|
||||||
return progress->state == PROGRESS_STATE_REPLICATE;
|
return progress->state == PROGRESS_STATE_REPLICATE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE RaftProgressState syncRaftProgressInSnapshot(SSyncRaftProgress* progress) {
|
static FORCE_INLINE ESyncRaftProgressState syncRaftProgressInSnapshot(SSyncRaftProgress* progress) {
|
||||||
return progress->state == PROGRESS_STATE_SNAPSHOT;
|
return progress->state == PROGRESS_STATE_SNAPSHOT;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE RaftProgressState syncRaftProgressInProbe(SSyncRaftProgress* progress) {
|
static FORCE_INLINE ESyncRaftProgressState syncRaftProgressInProbe(SSyncRaftProgress* progress) {
|
||||||
return progress->state == PROGRESS_STATE_PROBE;
|
return progress->state == PROGRESS_STATE_PROBE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -87,8 +87,8 @@ struct SSyncRaftProgressTracker {
|
||||||
|
|
||||||
SSyncRaftProgress progressMap[TSDB_MAX_REPLICA];
|
SSyncRaftProgress progressMap[TSDB_MAX_REPLICA];
|
||||||
|
|
||||||
SyncRaftVoteResult votes[TSDB_MAX_REPLICA];
|
ESyncRaftVoteResult votes[TSDB_MAX_REPLICA];
|
||||||
int maxInflight;
|
int maxInflightMsgs;
|
||||||
};
|
};
|
||||||
|
|
||||||
SSyncRaftProgressTracker* syncRaftOpenProgressTracker();
|
SSyncRaftProgressTracker* syncRaftOpenProgressTracker();
|
||||||
|
@ -108,6 +108,6 @@ void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant);
|
||||||
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
|
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
|
||||||
* election outcome is known.
|
* election outcome is known.
|
||||||
**/
|
**/
|
||||||
SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted);
|
ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted);
|
||||||
|
|
||||||
#endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
|
#endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
|
||||||
|
|
|
@ -17,11 +17,11 @@
|
||||||
#define TD_SYNC_RAFT_QUORUM_H
|
#define TD_SYNC_RAFT_QUORUM_H
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* SSyncRaftVoteResult indicates the outcome of a vote.
|
* ESyncRaftVoteResult indicates the outcome of a vote.
|
||||||
**/
|
**/
|
||||||
typedef enum {
|
typedef enum {
|
||||||
/**
|
/**
|
||||||
* SYNC_RAFT_VOTE_PENDING indicates that the decision of the vote depends on future
|
* SYNC_RAFT_VOTE_PENDING indicates that the decision of the vote depends on future
|
||||||
* votes, i.e. neither "yes" or "no" has reached quorum yet.
|
* votes, i.e. neither "yes" or "no" has reached quorum yet.
|
||||||
**/
|
**/
|
||||||
SYNC_RAFT_VOTE_PENDING = 1,
|
SYNC_RAFT_VOTE_PENDING = 1,
|
||||||
|
@ -35,6 +35,6 @@ typedef enum {
|
||||||
* SYNC_RAFT_VOTE_WON indicates that the quorum has voted "yes".
|
* SYNC_RAFT_VOTE_WON indicates that the quorum has voted "yes".
|
||||||
**/
|
**/
|
||||||
SYNC_RAFT_VOTE_WON = 3,
|
SYNC_RAFT_VOTE_WON = 3,
|
||||||
} SSyncRaftVoteResult;
|
} ESyncRaftVoteResult;
|
||||||
|
|
||||||
#endif /* TD_SYNC_RAFT_QUORUM_H */
|
#endif /* TD_SYNC_RAFT_QUORUM_H */
|
|
@ -33,6 +33,6 @@ typedef struct SSyncRaftQuorumJointConfig {
|
||||||
* a result indicating whether the vote is pending, lost, or won. A joint quorum
|
* a result indicating whether the vote is pending, lost, or won. A joint quorum
|
||||||
* requires both majority quorums to vote in favor.
|
* requires both majority quorums to vote in favor.
|
||||||
**/
|
**/
|
||||||
SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes);
|
ESyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteResult* votes);
|
||||||
|
|
||||||
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */
|
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */
|
||||||
|
|
|
@ -25,6 +25,6 @@
|
||||||
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
|
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
|
||||||
* quorum of no has been reached).
|
* quorum of no has been reached).
|
||||||
**/
|
**/
|
||||||
SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes);
|
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteResult* votes);
|
||||||
|
|
||||||
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */
|
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */
|
||||||
|
|
|
@ -50,7 +50,7 @@ typedef enum {
|
||||||
SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0,
|
SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0,
|
||||||
SYNC_RAFT_CAMPAIGN_ELECTION = 1,
|
SYNC_RAFT_CAMPAIGN_ELECTION = 1,
|
||||||
SYNC_RAFT_CAMPAIGN_TRANSFER = 2,
|
SYNC_RAFT_CAMPAIGN_TRANSFER = 2,
|
||||||
} SyncRaftElectionType;
|
} ESyncRaftElectionType;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
// the init vote resp status
|
// the init vote resp status
|
||||||
|
@ -61,6 +61,6 @@ typedef enum {
|
||||||
|
|
||||||
//reject the vote request
|
//reject the vote request
|
||||||
SYNC_RAFT_VOTE_RESP_REJECT = 2,
|
SYNC_RAFT_VOTE_RESP_REJECT = 2,
|
||||||
} SyncRaftVoteResult;
|
} ESyncRaftVoteResult;
|
||||||
|
|
||||||
#endif /* _TD_LIBS_SYNC_TYPE_H */
|
#endif /* _TD_LIBS_SYNC_TYPE_H */
|
||||||
|
|
|
@ -101,7 +101,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
RaftMessageType msgType = pMsg->msgType;
|
ESyncRaftMessageType msgType = pMsg->msgType;
|
||||||
if (msgType == RAFT_MSG_INTERNAL_ELECTION) {
|
if (msgType == RAFT_MSG_INTERNAL_ELECTION) {
|
||||||
syncRaftHandleElectionMessage(pRaft, pMsg);
|
syncRaftHandleElectionMessage(pRaft, pMsg);
|
||||||
} else if (msgType == RAFT_MSG_VOTE) {
|
} else if (msgType == RAFT_MSG_VOTE) {
|
||||||
|
@ -140,7 +140,7 @@ static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
||||||
|
|
||||||
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
||||||
SyncNodeId leaderId = pMsg->from;
|
SyncNodeId leaderId = pMsg->from;
|
||||||
RaftMessageType msgType = pMsg->msgType;
|
ESyncRaftMessageType msgType = pMsg->msgType;
|
||||||
|
|
||||||
if (msgType == RAFT_MSG_VOTE) {
|
if (msgType == RAFT_MSG_VOTE) {
|
||||||
// TODO
|
// TODO
|
||||||
|
|
|
@ -18,10 +18,10 @@
|
||||||
#include "raft_log.h"
|
#include "raft_log.h"
|
||||||
#include "raft_message.h"
|
#include "raft_message.h"
|
||||||
|
|
||||||
void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) {
|
void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
bool preVote;
|
bool preVote;
|
||||||
RaftMessageType voteMsgType;
|
ESyncRaftMessageType voteMsgType;
|
||||||
|
|
||||||
if (syncRaftIsPromotable(pRaft)) {
|
if (syncRaftIsPromotable(pRaft)) {
|
||||||
syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId);
|
syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId);
|
||||||
|
@ -41,7 +41,7 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int quorum = syncRaftQuorum(pRaft);
|
int quorum = syncRaftQuorum(pRaft);
|
||||||
SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL);
|
ESyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL);
|
||||||
if (result == SYNC_RAFT_VOTE_WON) {
|
if (result == SYNC_RAFT_VOTE_WON) {
|
||||||
/**
|
/**
|
||||||
* We won the election after voting for ourselves (which must mean that
|
* We won the election after voting for ourselves (which must mean that
|
||||||
|
|
|
@ -36,7 +36,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pMsg->from,
|
ESyncRaftVoteResult result = syncRaftPollVote(pRaft, pMsg->from,
|
||||||
pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION,
|
pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION,
|
||||||
!pMsg->voteResp.rejected, &rejected, &granted);
|
!pMsg->voteResp.rejected, &rejected, &granted);
|
||||||
|
|
||||||
|
|
|
@ -127,7 +127,7 @@ int syncRaftQuorum(SSyncRaft* pRaft) {
|
||||||
return pRaft->cluster.replica / 2 + 1;
|
return pRaft->cluster.replica / 2 + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
|
ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
|
||||||
bool preVote, bool grant,
|
bool preVote, bool grant,
|
||||||
int* rejected, int *granted) {
|
int* rejected, int *granted) {
|
||||||
int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
|
int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
|
||||||
|
@ -186,7 +186,7 @@ static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
|
||||||
* StateCandidate, we may get stale MsgPreVoteResp messages in this term from
|
* StateCandidate, we may get stale MsgPreVoteResp messages in this term from
|
||||||
* our pre-candidate state).
|
* our pre-candidate state).
|
||||||
**/
|
**/
|
||||||
RaftMessageType msgType = pMsg->msgType;
|
ESyncRaftMessageType msgType = pMsg->msgType;
|
||||||
|
|
||||||
if (msgType == RAFT_MSG_INTERNAL_PROP) {
|
if (msgType == RAFT_MSG_INTERNAL_PROP) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -20,13 +20,13 @@
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
|
|
||||||
static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state);
|
static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state);
|
||||||
static void probeAcked(SSyncRaftProgress* progress);
|
static void probeAcked(SSyncRaftProgress* progress);
|
||||||
|
|
||||||
static void resumeProgress(SSyncRaftProgress* progress);
|
static void resumeProgress(SSyncRaftProgress* progress);
|
||||||
|
|
||||||
void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) {
|
void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) {
|
||||||
SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflight);
|
SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflightMsgs);
|
||||||
if (inflights == NULL) {
|
if (inflights == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -153,7 +153,7 @@ void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snaps
|
||||||
* ResetState moves the Progress into the specified State, resetting ProbeSent,
|
* ResetState moves the Progress into the specified State, resetting ProbeSent,
|
||||||
* PendingSnapshot, and Inflights.
|
* PendingSnapshot, and Inflights.
|
||||||
**/
|
**/
|
||||||
static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) {
|
static void resetProgressState(SSyncRaftProgress* progress, ESyncRaftProgressState state) {
|
||||||
progress->probeSent = false;
|
progress->probeSent = false;
|
||||||
progress->pendingSnapshotIndex = 0;
|
progress->pendingSnapshotIndex = 0;
|
||||||
progress->state = state;
|
progress->state = state;
|
||||||
|
@ -233,7 +233,7 @@ void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) {
|
||||||
progress->state = PROGRESS_STATE_PROBE;
|
progress->state = PROGRESS_STATE_PROBE;
|
||||||
}
|
}
|
||||||
|
|
||||||
RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) {
|
ESyncRaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) {
|
||||||
return pRaft->leaderState.progress[i].state;
|
return pRaft->leaderState.progress[i].state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) {
|
void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) {
|
||||||
memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteResult) * TSDB_MAX_REPLICA);
|
memset(tracker->votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(ESyncRaftVoteResult) * TSDB_MAX_REPLICA);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) {
|
void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) {
|
||||||
|
@ -48,7 +48,7 @@ void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) {
|
||||||
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
|
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
|
||||||
* election outcome is known.
|
* election outcome is known.
|
||||||
**/
|
**/
|
||||||
SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) {
|
ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) {
|
||||||
int i;
|
int i;
|
||||||
SSyncRaftProgress* progress;
|
SSyncRaftProgress* progress;
|
||||||
int r, g;
|
int r, g;
|
||||||
|
|
|
@ -22,9 +22,9 @@
|
||||||
* a result indicating whether the vote is pending, lost, or won. A joint quorum
|
* a result indicating whether the vote is pending, lost, or won. A joint quorum
|
||||||
* requires both majority quorums to vote in favor.
|
* requires both majority quorums to vote in favor.
|
||||||
**/
|
**/
|
||||||
SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes) {
|
ESyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteResult* votes) {
|
||||||
SyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->majorityConfig[0]), votes);
|
ESyncRaftVoteResult r1 = syncRaftMajorityVoteResult(&(config->majorityConfig[0]), votes);
|
||||||
SyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->majorityConfig[1]), votes);
|
ESyncRaftVoteResult r2 = syncRaftMajorityVoteResult(&(config->majorityConfig[1]), votes);
|
||||||
|
|
||||||
if (r1 == r2) {
|
if (r1 == r2) {
|
||||||
// If they agree, return the agreed state.
|
// If they agree, return the agreed state.
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
|
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
|
||||||
* quorum of no has been reached).
|
* quorum of no has been reached).
|
||||||
**/
|
**/
|
||||||
SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes) {
|
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteResult* votes) {
|
||||||
if (config->replica == 0) {
|
if (config->replica == 0) {
|
||||||
return SYNC_RAFT_VOTE_WON;
|
return SYNC_RAFT_VOTE_WON;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue