diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index 44ee6a3b69..cba9434414 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -31,7 +31,7 @@ typedef struct RaftLeaderState { typedef struct RaftCandidateState { /* votes results */ - bool votes[TSDB_MAX_REPLICA]; + SyncRaftVoteRespType votes[TSDB_MAX_REPLICA]; /* true if in pre-vote phase */ bool inPreVote; @@ -125,10 +125,15 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft); void syncRaftBecomeCandidate(SSyncRaft* pRaft); void syncRaftBecomeLeader(SSyncRaft* pRaft); +void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType); + +void syncRaftTriggerReplicate(SSyncRaft* pRaft); + void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft); bool syncRaftIsPromotable(SSyncRaft* pRaft); bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft); int syncRaftQuorum(SSyncRaft* pRaft); -int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool accept); +int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, + bool preVote, bool accept, int* rejectNum); #endif /* _TD_LIBS_SYNC_RAFT_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/raft_configuration.h b/source/libs/sync/inc/raft_configuration.h index ed0cc33115..993f863f33 100644 --- a/source/libs/sync/inc/raft_configuration.h +++ b/source/libs/sync/inc/raft_configuration.h @@ -19,6 +19,7 @@ #include "sync.h" #include "sync_type.h" +// return -1 if cannot find this id int syncRaftConfigurationIndexOfVoter(SSyncRaft *pRaft, SyncNodeId id); int syncRaftConfigurationVoterCount(SSyncRaft *pRaft); diff --git a/source/libs/sync/inc/raft_message.h b/source/libs/sync/inc/raft_message.h index d4736d6169..da2e3bc52f 100644 --- a/source/libs/sync/inc/raft_message.h +++ b/source/libs/sync/inc/raft_message.h @@ -35,7 +35,7 @@ typedef enum RaftMessageType { RAFT_MSG_VOTE = 3, RAFT_MSG_VOTE_RESP = 4, - + RAFT_MSG_APPEND = 5, } RaftMessageType; typedef struct RaftMsgInternal_Prop { @@ -49,14 +49,14 @@ typedef struct RaftMsgInternal_Election { } RaftMsgInternal_Election; typedef struct RaftMsg_Vote { - SyncRaftCampaignType cType; + SyncRaftElectionType cType; SyncIndex lastIndex; SyncTerm lastTerm; } RaftMsg_Vote; typedef struct RaftMsg_VoteResp { bool reject; - SyncRaftCampaignType cType; + SyncRaftElectionType cType; } RaftMsg_VoteResp; typedef struct SSyncMessage { @@ -104,7 +104,7 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo } static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId from, SyncNodeId to, - SyncTerm term, SyncRaftCampaignType cType, + SyncTerm term, SyncRaftElectionType cType, SyncIndex lastIndex, SyncTerm lastTerm) { SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage)); if (pMsg == NULL) { @@ -134,13 +134,14 @@ static FORCE_INLINE bool syncIsPreVoteRespMsg(SSyncMessage* pMsg) { return pMsg->msgType == RAFT_MSG_VOTE_RESP && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION; } -static FORCE_INLINE bool syncIsPreVoteMsg(SSyncMessage* pMsg) { +static FORCE_INLINE bool syncIsPreVoteMsg(const SSyncMessage* pMsg) { return pMsg->msgType == RAFT_MSG_VOTE && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION; } void syncFreeMessage(const SSyncMessage* pMsg); // message handlers -void syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); +int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); +int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); #endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index 4343e607cb..f9632f6ae8 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -34,6 +34,12 @@ typedef enum { SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0, SYNC_RAFT_CAMPAIGN_ELECTION = 1, SYNC_RAFT_CAMPAIGN_TRANSFER = 3, -} SyncRaftCampaignType; +} SyncRaftElectionType; + +typedef enum { + SYNC_RAFT_VOTE_RESP_UNKNOWN = 0, + SYNC_RAFT_VOTE_RESP_GRANT = 1, + SYNC_RAFT_VOTE_RESP_REJECT = 2, +} SyncRaftVoteRespType; #endif /* _TD_LIBS_SYNC_TYPE_H */ diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index a6e013758e..83ae76fa5e 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -15,6 +15,7 @@ #include "raft.h" #include "raft_configuration.h" +#include "raft_log.h" #include "syncInt.h" #define RAFT_READ_LOG_MAX_NUM 100 @@ -120,14 +121,19 @@ int32_t syncRaftTick(SSyncRaft* pRaft) { } void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId) { + convertClear(pRaft); + pRaft->stepFp = stepFollower; resetRaft(pRaft, term); pRaft->tickFp = tickElection; pRaft->leaderId = leaderId; pRaft->state = TAOS_SYNC_ROLE_FOLLOWER; + syncInfo("[%d:%d] became followe at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); } void syncRaftBecomePreCandidate(SSyncRaft* pRaft) { + convertClear(pRaft); + memset(pRaft->candidateState.votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteRespType) * TSDB_MAX_REPLICA); /** * Becoming a pre-candidate changes our step functions and state, * but doesn't change anything else. In particular it does not increase @@ -137,10 +143,13 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft) { pRaft->tickFp = tickElection; pRaft->state = TAOS_SYNC_ROLE_CANDIDATE; pRaft->candidateState.inPreVote = true; - syncInfo("[%d:%d] became pre-candidate at term %d" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); + syncInfo("[%d:%d] became pre-candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); } void syncRaftBecomeCandidate(SSyncRaft* pRaft) { + convertClear(pRaft); + memset(pRaft->candidateState.votes, SYNC_RAFT_VOTE_RESP_UNKNOWN, sizeof(SyncRaftVoteRespType) * TSDB_MAX_REPLICA); + pRaft->candidateState.inPreVote = false; pRaft->stepFp = stepCandidate; // become candidate make term+1 @@ -148,7 +157,7 @@ void syncRaftBecomeCandidate(SSyncRaft* pRaft) { pRaft->tickFp = tickElection; pRaft->voteFor = pRaft->selfId; pRaft->state = TAOS_SYNC_ROLE_CANDIDATE; - syncInfo("[%d:%d] became candidate at term %d" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); + syncInfo("[%d:%d] became candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); } void syncRaftBecomeLeader(SSyncRaft* pRaft) { @@ -160,7 +169,11 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) { pRaft->state = TAOS_SYNC_ROLE_LEADER; // TODO: check if there is pending config log - syncInfo("[%d:%d] became leader at term %d" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); + syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); +} + +void syncRaftTriggerReplicate(SSyncRaft* pRaft) { + } void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) { @@ -180,7 +193,7 @@ int syncRaftQuorum(SSyncRaft* pRaft) { return pRaft->cluster.replica / 2 + 1; } -int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool accept) { +int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool accept, int* rejectNum) { if (accept) { syncInfo("[%d:%d] received (pre-vote %d) from %d at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term); @@ -188,17 +201,20 @@ int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool acc syncInfo("[%d:%d] received rejection from %d at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, id, pRaft->term); } - + int voteIndex = syncRaftConfigurationIndexOfVoter(pRaft, id); assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0); + assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN); - pRaft->candidateState.votes[voteIndex] = accept; - int granted = 0; + pRaft->candidateState.votes[voteIndex] = accept ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT; + int granted = 0, rejected = 0; int i; for (i = 0; i < pRaft->cluster.replica; ++i) { - if (pRaft->candidateState.votes[i]) granted++; + if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) granted++; + else if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_REJECT) rejected++; } + if (rejectNum) *rejectNum = rejected; return granted; } @@ -262,8 +278,20 @@ static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) { } static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - convertClear(pRaft); - memset(pRaft->candidateState.votes, 0, sizeof(bool) * TSDB_MAX_REPLICA); + /** + * Only handle vote responses corresponding to our candidacy (while in + * StateCandidate, we may get stale MsgPreVoteResp messages in this term from + * our pre-candidate state). + **/ + RaftMessageType msgType = pMsg->msgType; + + if (msgType == RAFT_MSG_INTERNAL_PROP) { + return 0; + } + + if (msgType == RAFT_MSG_VOTE_RESP) { + return 0; + } return 0; } diff --git a/source/libs/sync/src/raft_election.c b/source/libs/sync/src/raft_election.c new file mode 100644 index 0000000000..7ebeb45254 --- /dev/null +++ b/source/libs/sync/src/raft_election.c @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncInt.h" +#include "raft.h" +#include "raft_message.h" + +void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) { + SyncTerm term; + bool preVote; + RaftMessageType voteMsgType; + + if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { + syncRaftBecomePreCandidate(pRaft); + preVote = true; + // PreVote RPCs are sent for the next term before we've incremented r.Term. + term = pRaft->term + 1; + } else { + syncRaftBecomeCandidate(pRaft); + voteMsgType = RAFT_MSG_VOTE; + term = pRaft->term; + preVote = false; + } + + int quorum = syncRaftQuorum(pRaft); + int granted = syncRaftNumOfGranted(pRaft, pRaft->selfId, preVote, true, NULL); + if (quorum <= granted) { + /** + * We won the election after voting for ourselves (which must mean that + * this is a single-node cluster). Advance to the next state. + **/ + if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { + syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); + } else { + syncRaftBecomeLeader(pRaft); + } + return; + } + + // broadcast vote message to other peers + int i; + SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); + SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); + for (i = 0; i < pRaft->cluster.replica; ++i) { + if (i == pRaft->cluster.selfIndex) { + continue; + } + + SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId; + + SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId, + nodeId, term, cType, lastIndex, lastTerm); + if (pMsg == NULL) { + continue; + } + + syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %d] sent %d request to %d at term %" PRId64 "", + pRaft->selfGroupId, pRaft->selfId, lastTerm, + lastIndex, voteMsgType, nodeId, pRaft->term); + + pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i])); + } +} \ No newline at end of file diff --git a/source/libs/sync/src/raft_handle_election_message.c b/source/libs/sync/src/raft_handle_election_message.c index 0d2004dec2..19471846ba 100644 --- a/source/libs/sync/src/raft_handle_election_message.c +++ b/source/libs/sync/src/raft_handle_election_message.c @@ -15,84 +15,29 @@ #include "syncInt.h" #include "raft.h" +#include "raft_log.h" #include "raft_message.h" -static void campaign(SSyncRaft* pRaft, SyncRaftCampaignType cType); - -void syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { +int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { if (pRaft->state == TAOS_SYNC_ROLE_LEADER) { syncDebug("%d ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfId); - return; + return 0; } - // if there is pending uncommitted config,cannot campaign + // if there is pending uncommitted config,cannot start election if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) { - syncWarn("[%d:%d] cannot campaign at term %" PRId64 " since there are still pending configuration changes to apply", + syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply", pRaft->selfGroupId, pRaft->selfId, pRaft->term); - return; + return 0; } syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); if (pRaft->preVote) { - campaign(pRaft, SYNC_RAFT_CAMPAIGN_PRE_ELECTION); + syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_PRE_ELECTION); } else { - campaign(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); + syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); } + + return 0; } - -static void campaign(SSyncRaft* pRaft, SyncRaftCampaignType cType) { - SyncTerm term; - bool preVote; - RaftMessageType voteMsgType; - - if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { - syncRaftBecomePreCandidate(pRaft); - preVote = true; - // PreVote RPCs are sent for the next term before we've incremented r.Term. - term = pRaft->term + 1; - } else { - syncRaftBecomeCandidate(pRaft); - voteMsgType = RAFT_MSG_VOTE; - term = pRaft->term; - preVote = false; - } - - int quorum = syncRaftQuorum(pRaft); - int granted = syncRaftNumOfGranted(pRaft, pRaft->selfId, preVote, true); - if (quorum <= granted) { - /** - * We won the election after voting for ourselves (which must mean that - * this is a single-node cluster). Advance to the next state. - **/ - if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { - campaign(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); - } else { - syncRaftBecomeLeader(pRaft); - } - return; - } - - // broadcast vote message to other peers - int i; - SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); - SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); - for (i = 0; i < pRaft->cluster.replica; ++i) { - if (i == pRaft->cluster.selfIndex) { - continue; - } - - SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId; - - SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId, nodeId, term, cType, lastIndex, lastTerm); - if (pMsg == NULL) { - continue; - } - - syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %d] sent %d request to %d at term %" PRId64 "", - pRaft->selfGroupId, pRaft->selfId, lastTerm, - lastIndex, voteMsgType, nodeId, pRaft->term); - - pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i])); - } -} \ No newline at end of file diff --git a/source/libs/sync/src/raft_handle_vote_resp_message.c b/source/libs/sync/src/raft_handle_vote_resp_message.c new file mode 100644 index 0000000000..e5d5d6cae7 --- /dev/null +++ b/source/libs/sync/src/raft_handle_vote_resp_message.c @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncInt.h" +#include "raft.h" +#include "raft_message.h" + +int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + int granted, rejected; + int quorum; + int voterIndex; + + voterIndex = syncRaftConfigurationIndexOfVoter(pRaft, pMsg->from); + if (voterIndex == -1) { + syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from); + return 0; + } + + if (pRaft->state != TAOS_SYNC_ROLE_CANDIDATE) { + syncError("[%d:%d] is not candidate, ignore vote resp", pRaft->selfGroupId, pRaft->selfId); + return 0; + } + + granted = syncRaftNumOfGranted(pRaft, pMsg->from, + pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION, + !pMsg->voteResp.reject, &rejected); + quorum = syncRaftQuorum(pRaft); + + syncInfo("[%d:%d] [quorum:%d] has received %d votes and %d vote rejections", + pRaft->selfGroupId, pRaft->selfId, quorum, granted, rejected); + + if (granted >= quorum) { + if (pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { + syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); + } else { + syncRaftBecomeLeader(pRaft); + syncRaftTriggerReplicate(pRaft); + } + + return 0; + } else if (rejected == quorum) { + syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); + } + return 0; +} \ No newline at end of file diff --git a/source/libs/sync/src/raft_progress.c b/source/libs/sync/src/raft_progress.c index ba09973f48..458f829394 100644 --- a/source/libs/sync/src/raft_progress.c +++ b/source/libs/sync/src/raft_progress.c @@ -14,7 +14,7 @@ */ #include "raft.h" -#include "raft_unstable_log.h" +#include "raft_log.h" #include "raft_progress.h" #include "sync.h" #include "syncInt.h"