[TD-10645][raft]<feature>add raft election message handle

This commit is contained in:
lichuang 2021-11-04 12:39:45 +08:00
parent 446b14f315
commit e05e6dba9a
12 changed files with 285 additions and 50 deletions

View File

@ -32,7 +32,6 @@ typedef enum {
TAOS_SYNC_ROLE_FOLLOWER = 0, TAOS_SYNC_ROLE_FOLLOWER = 0,
TAOS_SYNC_ROLE_CANDIDATE = 1, TAOS_SYNC_ROLE_CANDIDATE = 1,
TAOS_SYNC_ROLE_LEADER = 2, TAOS_SYNC_ROLE_LEADER = 2,
TAOS_SYNC_ROLE_PRE_CANDIDATE = 3,
} ESyncRole; } ESyncRole;
typedef struct { typedef struct {

View File

@ -29,9 +29,16 @@ typedef struct RaftLeaderState {
SSyncRaftProgress* progress; SSyncRaftProgress* progress;
} RaftLeaderState; } RaftLeaderState;
typedef struct RaftCandidateState {
/* votes results */
bool votes[TSDB_MAX_REPLICA];
/* true if in pre-vote phase */
bool inPreVote;
} RaftCandidateState;
typedef struct SSyncRaftIOMethods { typedef struct SSyncRaftIOMethods {
int (*send)(const SSyncMessage* pMsg, const SNodeInfo* pNode);
} SSyncRaftIOMethods; } SSyncRaftIOMethods;
typedef int (*SyncRaftStepFp)(SSyncRaft* pRaft, const SSyncMessage* pMsg); typedef int (*SyncRaftStepFp)(SSyncRaft* pRaft, const SSyncMessage* pMsg);
@ -41,7 +48,10 @@ struct SSyncRaft {
// owner sync node // owner sync node
SSyncNode* pNode; SSyncNode* pNode;
SSyncInfo info; //SSyncInfo info;
SSyncFSM fsm;
SSyncLogStore logStore;
SStateManager stateManager;
SyncTerm term; SyncTerm term;
SyncNodeId voteFor; SyncNodeId voteFor;
@ -65,6 +75,8 @@ struct SSyncRaft {
**/ **/
bool pendingConf; bool pendingConf;
SSyncCluster cluster;
ESyncRole state; ESyncRole state;
/** /**
@ -92,25 +104,22 @@ struct SSyncRaft {
SSyncRaftIOMethods io; SSyncRaftIOMethods io;
RaftLeaderState leaderState; union {
RaftLeaderState leaderState;
SSyncRaftUnstableLog *log; RaftCandidateState candidateState;
};
SSyncRaftLog *log;
SyncRaftStepFp stepFp; SyncRaftStepFp stepFp;
SyncRaftTickFp tickFp; SyncRaftTickFp tickFp;
}; };
typedef enum {
SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0,
SYNC_RAFT_CAMPAIGN_ELECTION = 1,
} SyncRaftCampaignType;
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo); int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo);
int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg); int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int32_t syncRaftTick(SSyncRaft* pRaft); int32_t syncRaftTick(SSyncRaft* pRaft);
void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId); void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId);
void syncRaftBecomePreCandidate(SSyncRaft* pRaft); void syncRaftBecomePreCandidate(SSyncRaft* pRaft);
void syncRaftBecomeCandidate(SSyncRaft* pRaft); void syncRaftBecomeCandidate(SSyncRaft* pRaft);
@ -120,6 +129,6 @@ void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft);
bool syncRaftIsPromotable(SSyncRaft* pRaft); bool syncRaftIsPromotable(SSyncRaft* pRaft);
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft); bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft);
int syncRaftQuorum(SSyncRaft* pRaft); int syncRaftQuorum(SSyncRaft* pRaft);
int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, RaftMessageType msgType, bool accept); int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool accept);
#endif /* _TD_LIBS_SYNC_RAFT_H */ #endif /* _TD_LIBS_SYNC_RAFT_H */

View File

@ -0,0 +1,26 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_CONFIGURATION_H
#define _TD_LIBS_SYNC_RAFT_CONFIGURATION_H
#include "sync.h"
#include "sync_type.h"
int syncRaftConfigurationIndexOfVoter(SSyncRaft *pRaft, SyncNodeId id);
int syncRaftConfigurationVoterCount(SSyncRaft *pRaft);
#endif /* _TD_LIBS_SYNC_RAFT_CONFIGURATION_H */

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_LOG_H
#define _TD_LIBS_SYNC_RAFT_LOG_H
#include "sync.h"
#include "sync_type.h"
struct SSyncRaftLog {
SyncIndex uncommittedConfigIndex;
SyncIndex commitIndex;
SyncIndex appliedIndex;
};
SSyncRaftLog* syncRaftLogOpen();
SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog);
SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog);
int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog);
bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog);
#endif /* _TD_LIBS_SYNC_RAFT_LOG_H */

View File

@ -35,8 +35,7 @@ typedef enum RaftMessageType {
RAFT_MSG_VOTE = 3, RAFT_MSG_VOTE = 3,
RAFT_MSG_VOTE_RESP = 4, RAFT_MSG_VOTE_RESP = 4,
RAFT_MSG_PRE_VOTE = 5,
RAFT_MSG_PRE_VOTE_RESP = 6,
} RaftMessageType; } RaftMessageType;
typedef struct RaftMsgInternal_Prop { typedef struct RaftMsgInternal_Prop {
@ -49,13 +48,21 @@ typedef struct RaftMsgInternal_Election {
} RaftMsgInternal_Election; } RaftMsgInternal_Election;
typedef struct RaftMsg_PreVoteResp { typedef struct RaftMsg_Vote {
SyncRaftCampaignType cType;
SyncIndex lastIndex;
SyncTerm lastTerm;
} RaftMsg_Vote;
typedef struct RaftMsg_VoteResp {
bool reject; bool reject;
} RaftMsg_PreVoteResp; SyncRaftCampaignType cType;
} RaftMsg_VoteResp;
typedef struct SSyncMessage { typedef struct SSyncMessage {
RaftMessageType msgType; RaftMessageType msgType;
SyncTerm term; SyncTerm term;
SyncGroupId groupId;
SyncNodeId from; SyncNodeId from;
SyncNodeId to; SyncNodeId to;
@ -64,7 +71,8 @@ typedef struct SSyncMessage {
RaftMsgInternal_Election election; RaftMsgInternal_Election election;
RaftMsg_PreVoteResp preVoteResp; RaftMsg_Vote vote;
RaftMsg_VoteResp voteResp;
}; };
} SSyncMessage; } SSyncMessage;
@ -95,14 +103,39 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo
return pMsg; return pMsg;
} }
static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId from, SyncNodeId to,
SyncTerm term, SyncRaftCampaignType cType,
SyncIndex lastIndex, SyncTerm lastTerm) {
SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
if (pMsg == NULL) {
return NULL;
}
*pMsg = (SSyncMessage) {
.groupId = groupId,
.from = from,
.to = to,
.term = term,
.vote = (RaftMsg_Vote) {
.cType = cType,
.lastIndex = lastIndex,
.lastTerm = lastTerm,
},
};
return pMsg;
}
static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) { static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) {
return msgType == RAFT_MSG_INTERNAL_PROP || return msgType == RAFT_MSG_INTERNAL_PROP ||
msgType == RAFT_MSG_INTERNAL_ELECTION; msgType == RAFT_MSG_INTERNAL_ELECTION;
} }
static FORCE_INLINE RaftMessageType SyncRaftVoteRespMsgType(RaftMessageType msgType) { static FORCE_INLINE bool syncIsPreVoteRespMsg(SSyncMessage* pMsg) {
if (msgType == RAFT_MSG_VOTE) return RAFT_MSG_PRE_VOTE_RESP; return pMsg->msgType == RAFT_MSG_VOTE_RESP && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION;
return RAFT_MSG_PRE_VOTE_RESP; }
static FORCE_INLINE bool syncIsPreVoteMsg(SSyncMessage* pMsg) {
return pMsg->msgType == RAFT_MSG_VOTE && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION;
} }
void syncFreeMessage(const SSyncMessage* pMsg); void syncFreeMessage(const SSyncMessage* pMsg);

View File

@ -41,7 +41,7 @@ struct SSyncRaftUnstableLog {
/** /**
* return index of last in memory log, return 0 if log is empty * return index of last in memory log, return 0 if log is empty
**/ **/
SyncIndex syncRaftLogLastIndex(SSyncRaftUnstableLog* pLog); //SyncIndex syncRaftLogLastIndex(SSyncRaftUnstableLog* pLog);
#if 0 #if 0
void raftLogInit(RaftLog* pLog); void raftLogInit(RaftLog* pLog);

View File

@ -18,10 +18,10 @@
typedef int32_t SyncTime; typedef int32_t SyncTime;
typedef struct SSyncRaftUnstableLog SSyncRaftUnstableLog;
typedef struct SSyncRaft SSyncRaft; typedef struct SSyncRaft SSyncRaft;
typedef struct SSyncRaftLog SSyncRaftLog;
#ifndef MIN #ifndef MIN
#define MIN(x, y) (((x) < (y)) ? (x) : (y)) #define MIN(x, y) (((x) < (y)) ? (x) : (y))
#endif #endif
@ -30,4 +30,10 @@ typedef struct SSyncRaft SSyncRaft;
#define MAX(x, y) (((x) > (y)) ? (x) : (y)) #define MAX(x, y) (((x) > (y)) ? (x) : (y))
#endif #endif
typedef enum {
SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0,
SYNC_RAFT_CAMPAIGN_ELECTION = 1,
SYNC_RAFT_CAMPAIGN_TRANSFER = 3,
} SyncRaftCampaignType;
#endif /* _TD_LIBS_SYNC_TYPE_H */ #endif /* _TD_LIBS_SYNC_TYPE_H */

View File

@ -14,6 +14,7 @@
*/ */
#include "raft.h" #include "raft.h"
#include "raft_configuration.h"
#include "syncInt.h" #include "syncInt.h"
#define RAFT_READ_LOG_MAX_NUM 100 #define RAFT_READ_LOG_MAX_NUM 100
@ -22,6 +23,7 @@ static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int convertClear(SSyncRaft* pRaft);
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg); static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg); static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg); static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg);
@ -45,11 +47,18 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
memset(pRaft, 0, sizeof(SSyncRaft)); memset(pRaft, 0, sizeof(SSyncRaft));
memcpy(&pRaft->info, pInfo, sizeof(SSyncInfo)); memcpy(&pRaft->fsm, &pInfo->fsm, sizeof(SSyncFSM));
stateManager = &(pRaft->info.stateManager); memcpy(&pRaft->logStore, &pInfo->logStore, sizeof(SSyncLogStore));
logStore = &(pRaft->info.logStore); memcpy(&pRaft->stateManager, &pInfo->stateManager, sizeof(SStateManager));
fsm = &(pRaft->info.fsm);
stateManager = &(pRaft->stateManager);
logStore = &(pRaft->logStore);
fsm = &(pRaft->fsm);
// open raft log
if ((pRaft->log = syncRaftLogOpen()) == NULL) {
return -1;
}
// read server state // read server state
if (stateManager->readServerState(stateManager, &serverState) != 0) { if (stateManager->readServerState(stateManager, &serverState) != 0) {
syncError("readServerState for vgid %d fail", pInfo->vgId); syncError("readServerState for vgid %d fail", pInfo->vgId);
@ -79,7 +88,8 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
syncInfo("restore vgid %d state: snapshot index success", pInfo->vgId); syncInfo("[%d:%d] restore vgid %d state: snapshot index success",
pRaft->selfGroupId, pRaft->selfId, pInfo->vgId);
return 0; return 0;
} }
@ -95,7 +105,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
RaftMessageType msgType = pMsg->msgType; RaftMessageType msgType = pMsg->msgType;
if (msgType == RAFT_MSG_INTERNAL_ELECTION) { if (msgType == RAFT_MSG_INTERNAL_ELECTION) {
syncRaftHandleElectionMessage(pRaft, pMsg); syncRaftHandleElectionMessage(pRaft, pMsg);
} else if (msgType == RAFT_MSG_VOTE || msgType == RAFT_MSG_PRE_VOTE) { } else if (msgType == RAFT_MSG_VOTE) {
} else { } else {
pRaft->stepFp(pRaft, pMsg); pRaft->stepFp(pRaft, pMsg);
@ -125,11 +135,13 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
**/ **/
pRaft->stepFp = stepCandidate; pRaft->stepFp = stepCandidate;
pRaft->tickFp = tickElection; pRaft->tickFp = tickElection;
pRaft->state = TAOS_SYNC_ROLE_PRE_CANDIDATE; pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
pRaft->candidateState.inPreVote = true;
syncInfo("[%d:%d] became pre-candidate at term %d" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); syncInfo("[%d:%d] became pre-candidate at term %d" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
} }
void syncRaftBecomeCandidate(SSyncRaft* pRaft) { void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
pRaft->candidateState.inPreVote = false;
pRaft->stepFp = stepCandidate; pRaft->stepFp = stepCandidate;
// become candidate make term+1 // become candidate make term+1
resetRaft(pRaft, pRaft->term + 1); resetRaft(pRaft, pRaft->term + 1);
@ -157,9 +169,7 @@ void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) {
} }
bool syncRaftIsPromotable(SSyncRaft* pRaft) { bool syncRaftIsPromotable(SSyncRaft* pRaft) {
return pRaft->info.syncCfg.selfIndex >= 0 && return pRaft->selfId != SYNC_NON_NODE_ID;
pRaft->info.syncCfg.selfIndex < pRaft->info.syncCfg.replica &&
pRaft->selfId != SYNC_NON_NODE_ID;
} }
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) { bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
@ -167,17 +177,29 @@ bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
} }
int syncRaftQuorum(SSyncRaft* pRaft) { int syncRaftQuorum(SSyncRaft* pRaft) {
return pRaft->leaderState.nProgress / 2 + 1; return pRaft->cluster.replica / 2 + 1;
} }
int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, RaftMessageType msgType, bool accept) { int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool accept) {
if (accept) { if (accept) {
syncInfo("[%d:%d] received (pre-vote %d) from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
} else { } else {
syncInfo("[%d:%d] received rejection from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, id, pRaft->term);
} }
int voteIndex = syncRaftConfigurationIndexOfVoter(pRaft, id);
assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0);
pRaft->candidateState.votes[voteIndex] = accept;
int granted = 0;
int i;
for (i = 0; i < pRaft->cluster.replica; ++i) {
if (pRaft->candidateState.votes[i]) granted++;
}
return granted;
} }
/** /**
@ -201,13 +223,13 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
SyncNodeId leaderId = pMsg->from; SyncNodeId leaderId = pMsg->from;
RaftMessageType msgType = pMsg->msgType; RaftMessageType msgType = pMsg->msgType;
if (msgType == RAFT_MSG_VOTE || msgType == RAFT_MSG_PRE_VOTE) { if (msgType == RAFT_MSG_VOTE) {
leaderId = SYNC_NON_NODE_ID; leaderId = SYNC_NON_NODE_ID;
} }
if (msgType == RAFT_MSG_PRE_VOTE) { if (syncIsPreVoteMsg(pMsg)) {
// Never change our term in response to a PreVote // Never change our term in response to a PreVote
} else if (msgType == RAFT_MSG_PRE_VOTE_RESP && !pMsg->preVoteResp.reject) { } else if (syncIsPreVoteRespMsg(pMsg) && !pMsg->voteResp.reject) {
/** /**
* We send pre-vote requests with a term in our future. If the * We send pre-vote requests with a term in our future. If the
* pre-vote is granted, we will increment our term when we get a * pre-vote is granted, we will increment our term when we get a
@ -216,8 +238,8 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
* term. * term.
**/ **/
} else { } else {
syncInfo("%d [term:%" PRId64 "] received a %d message with higher term from %d [term:%" PRId64 "]", syncInfo("[%d:%d] [term:%" PRId64 "] received a %d message with higher term from %d [term:%" PRId64 "]",
pRaft->selfId, pRaft->term, msgType, pMsg->from, pMsg->term); pRaft->selfGroupId, pRaft->selfId, pRaft->term, msgType, pMsg->from, pMsg->term);
syncRaftBecomeFollower(pRaft, pMsg->term, leaderId); syncRaftBecomeFollower(pRaft, pMsg->term, leaderId);
} }
@ -230,15 +252,23 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
return true; return true;
} }
static int convertClear(SSyncRaft* pRaft) {
}
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) { static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
convertClear(pRaft);
return 0; return 0;
} }
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) { static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
convertClear(pRaft);
memset(pRaft->candidateState.votes, 0, sizeof(bool) * TSDB_MAX_REPLICA);
return 0; return 0;
} }
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) { static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
convertClear(pRaft);
return 0; return 0;
} }

View File

@ -0,0 +1,25 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft_configuration.h"
#include "raft.h"
int syncRaftConfigurationIndexOfVoter(SSyncRaft *pRaft, SyncNodeId id) {
return (int)(id);
}
int syncRaftConfigurationVoterCount(SSyncRaft *pRaft) {
return pRaft->cluster.replica;
}

View File

@ -25,34 +25,41 @@ void syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
return; return;
} }
// TODO: is there pending uncommitted config? // if there is pending uncommitted config,cannot campaign
if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) {
syncWarn("[%d:%d] cannot campaign at term %" PRId64 " since there are still pending configuration changes to apply",
pRaft->selfGroupId, pRaft->selfId, pRaft->term);
return;
}
syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
if (pRaft->preVote) { if (pRaft->preVote) {
campaign(pRaft, SYNC_RAFT_CAMPAIGN_PRE_ELECTION);
} else { } else {
campaign(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
} }
} }
static void campaign(SSyncRaft* pRaft, SyncRaftCampaignType cType) { static void campaign(SSyncRaft* pRaft, SyncRaftCampaignType cType) {
SyncTerm term; SyncTerm term;
bool preVote;
RaftMessageType voteMsgType; RaftMessageType voteMsgType;
if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
syncRaftBecomePreCandidate(pRaft); syncRaftBecomePreCandidate(pRaft);
voteMsgType = RAFT_MSG_PRE_VOTE; preVote = true;
// PreVote RPCs are sent for the next term before we've incremented r.Term. // PreVote RPCs are sent for the next term before we've incremented r.Term.
term = pRaft->term + 1; term = pRaft->term + 1;
} else { } else {
syncRaftBecomeCandidate(pRaft); syncRaftBecomeCandidate(pRaft);
voteMsgType = RAFT_MSG_VOTE; voteMsgType = RAFT_MSG_VOTE;
term = pRaft->term; term = pRaft->term;
preVote = false;
} }
int quorum = syncRaftQuorum(pRaft); int quorum = syncRaftQuorum(pRaft);
int granted = syncRaftNumOfGranted(pRaft, pRaft->selfId, SyncRaftVoteRespMsgType(voteMsgType), true); int granted = syncRaftNumOfGranted(pRaft, pRaft->selfId, preVote, true);
if (quorum <= granted) { if (quorum <= granted) {
/** /**
* We won the election after voting for ourselves (which must mean that * We won the election after voting for ourselves (which must mean that
@ -67,5 +74,25 @@ static void campaign(SSyncRaft* pRaft, SyncRaftCampaignType cType) {
} }
// broadcast vote message to other peers // broadcast vote message to other peers
int i;
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log);
for (i = 0; i < pRaft->cluster.replica; ++i) {
if (i == pRaft->cluster.selfIndex) {
continue;
}
SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId, nodeId, term, cType, lastIndex, lastTerm);
if (pMsg == NULL) {
continue;
}
syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %d] sent %d request to %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, lastTerm,
lastIndex, voteMsgType, nodeId, pRaft->term);
pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i]));
}
} }

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft_log.h"
SSyncRaftLog* syncRaftLogOpen() {
return NULL;
}
SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog) {
return 0;
}
SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) {
return 0;
}
int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog) {
return 0;
}
bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog) {
return pLog->commitIndex > pLog->appliedIndex;
}

View File

@ -16,6 +16,8 @@
#include "sync.h" #include "sync.h"
#include "raft_unstable_log.h" #include "raft_unstable_log.h"
/*
SyncIndex syncRaftLogLastIndex(SSyncRaftUnstableLog* pLog) { SyncIndex syncRaftLogLastIndex(SSyncRaftUnstableLog* pLog) {
return 0; return 0;
} }
*/