From aab981f667f930117088f1e17c65d22d1488ed32 Mon Sep 17 00:00:00 2001
From: lichuang
Date: Wed, 3 Nov 2021 15:30:54 +0800
Subject: [PATCH] [TD-10645][raft]add raft election message handle

---
Review notes (free-form text between "---" and the diffstat; not part of the
commit message):

 * Line breaks of this patch were restored from the hunk headers. The number
   of blank context lines per hunk matches the @@ counts, but their exact
   position and the indentation of context lines are best-effort; re-verify
   against the target tree before applying.
 * The fixes below touch '+' lines only and replace lines one-for-one, so the
   hunk counts and the diffstat are unchanged. The post-image blob ids on the
   "index" lines no longer match the modified content.
 * raft_message.h: SyncRaftVoteRespMsgType() returned RAFT_MSG_PRE_VOTE_RESP
   for both vote kinds; RAFT_MSG_VOTE now maps to RAFT_MSG_VOTE_RESP
   (assumed to be a member of RaftMessageType - not visible here, confirm).
 * raft.c: "term %d" PRId64 expanded to "%d" followed by a literal "ld"/"lld";
   SyncTerm is uint64_t, so term conversions on added lines use PRIu64.
 * raft.c: syncRaftBecomeLeader() assigned leaderId to itself; it now records
   selfId as the leader.
 * raft.c: syncRaftNumOfGranted() is non-void but had no return statement
   while campaign() uses its result; it returns 0 until the vote tally is
   implemented, so a campaign never reports a won election by accident.
 * raft_handle_election_message.c: the preVote branches were empty and the
   static campaign() was never called; they now start the matching campaign.

 include/libs/sync/sync.h                      |  5 +-
 source/libs/sync/inc/raft.h                   | 16 ++++-
 source/libs/sync/inc/raft_message.h           | 17 +++--
 source/libs/sync/inc/raft_unstable_log.h      |  6 +-
 source/libs/sync/src/raft.c                   | 62 ++++++++++++++--
 .../sync/src/raft_handle_election_message.c   | 71 +++++++++++++++++++
 source/libs/sync/src/raft_message.c           |  2 +-
 7 files changed, 162 insertions(+), 17 deletions(-)
 create mode 100644 source/libs/sync/src/raft_handle_election_message.c

diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h
index ced9cc72fc..b938bbba77 100644
--- a/include/libs/sync/sync.h
+++ b/include/libs/sync/sync.h
@@ -26,12 +26,13 @@ extern "C" {
 typedef int32_t SyncNodeId;
 typedef int32_t SyncGroupId;
 typedef int64_t SyncIndex;
-typedef uint64_t SSyncTerm;
+typedef uint64_t SyncTerm;
 
 typedef enum {
   TAOS_SYNC_ROLE_FOLLOWER = 0,
   TAOS_SYNC_ROLE_CANDIDATE = 1,
   TAOS_SYNC_ROLE_LEADER = 2,
+  TAOS_SYNC_ROLE_PRE_CANDIDATE = 3,
 } ESyncRole;
 
 typedef struct {
@@ -111,7 +112,7 @@ typedef struct SSyncLogStore {
 
 typedef struct SSyncServerState {
   SyncNodeId voteFor;
-  SSyncTerm term;
+  SyncTerm term;
   SyncIndex commitIndex;
 } SSyncServerState;
 
diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h
index 0e2d1769b3..702fcd00cf 100644
--- a/source/libs/sync/inc/raft.h
+++ b/source/libs/sync/inc/raft.h
@@ -43,10 +43,11 @@ struct SSyncRaft {
 
   SSyncInfo info;
 
-  SSyncTerm term;
+  SyncTerm term;
   SyncNodeId voteFor;
 
   SyncNodeId selfId;
+  SyncGroupId selfGroupId;
 
   /**
    * the leader id
@@ -100,14 +101,25 @@ struct SSyncRaft {
   SyncRaftTickFp tickFp;
 };
 
+typedef enum {
+  SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0,
+  SYNC_RAFT_CAMPAIGN_ELECTION = 1,
+} SyncRaftCampaignType;
+
 int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo);
 int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg);
 int32_t syncRaftTick(SSyncRaft* pRaft);
 
 
-void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term, SyncNodeId leaderId);
+void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId);
+void syncRaftBecomePreCandidate(SSyncRaft* pRaft);
+void syncRaftBecomeCandidate(SSyncRaft* pRaft);
+void syncRaftBecomeLeader(SSyncRaft* pRaft);
+
 void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft);
 bool syncRaftIsPromotable(SSyncRaft* pRaft);
 bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft);
+int syncRaftQuorum(SSyncRaft* pRaft);
+int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, RaftMessageType msgType, bool accept);
 
 #endif /* _TD_LIBS_SYNC_RAFT_H */
\ No newline at end of file
diff --git a/source/libs/sync/inc/raft_message.h b/source/libs/sync/inc/raft_message.h
index 9e690855c7..71fe37bebd 100644
--- a/source/libs/sync/inc/raft_message.h
+++ b/source/libs/sync/inc/raft_message.h
@@ -17,6 +17,7 @@
 #define _TD_LIBS_SYNC_RAFT_MESSAGE_H
 
 #include "sync.h"
+#include "sync_type.h"
 
 /**
  * below define message type which handled by Raft node thread
@@ -54,7 +55,7 @@ typedef struct RaftMsg_PreVoteResp {
 
 typedef struct SSyncMessage {
   RaftMessageType msgType;
-  SSyncTerm term;
+  SyncTerm term;
   SyncNodeId from;
   SyncNodeId to;
 
@@ -94,11 +95,19 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo
   return pMsg;
 }
 
-static FORCE_INLINE bool syncIsInternalMsg(const SSyncMessage* pMsg) {
-  return pMsg->msgType == RAFT_MSG_INTERNAL_PROP ||
-         pMsg->msgType == RAFT_MSG_INTERNAL_ELECTION;
+static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) {
+  return msgType == RAFT_MSG_INTERNAL_PROP ||
+         msgType == RAFT_MSG_INTERNAL_ELECTION;
+}
+
+static FORCE_INLINE RaftMessageType SyncRaftVoteRespMsgType(RaftMessageType msgType) {
+  if (msgType == RAFT_MSG_VOTE) return RAFT_MSG_VOTE_RESP;
+  return RAFT_MSG_PRE_VOTE_RESP;
 }
 
 void syncFreeMessage(const SSyncMessage* pMsg);
 
+// message handlers
+void syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+
 #endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */
\ No newline at end of file
diff --git a/source/libs/sync/inc/raft_unstable_log.h b/source/libs/sync/inc/raft_unstable_log.h
index 2b7b30c15a..0c9957cb90 100644
--- a/source/libs/sync/inc/raft_unstable_log.h
+++ b/source/libs/sync/inc/raft_unstable_log.h
@@ -67,13 +67,13 @@ int raftLogNumEntries(const RaftLog* pLog);
 /**
  * return last term of in memory log, return 0 if log is empty
  **/
-SSyncTerm raftLogLastTerm(RaftLog* pLog);
+SyncTerm raftLogLastTerm(RaftLog* pLog);
 
 /**
  * return term of log with the given index, return 0 if the term of index cannot be found
  * , errCode will save the error code.
  **/
-SSyncTerm raftLogTermOf(RaftLog* pLog, SyncIndex index, RaftCode* errCode);
+SyncTerm raftLogTermOf(RaftLog* pLog, SyncIndex index, RaftCode* errCode);
 
 /**
  * Get the last index of the most recent snapshot. Return 0 if there are no *
@@ -83,7 +83,7 @@ SyncIndex raftLogSnapshotIndex(RaftLog* pLog);
 
 /* Append a new entry to the log. */
 int raftLogAppend(RaftLog* pLog,
-                  SSyncTerm term,
+                  SyncTerm term,
                   const SSyncBuffer *buf);
 
 /**
diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c
index 09f29cbd28..87750eca9e 100644
--- a/source/libs/sync/src/raft.c
+++ b/source/libs/sync/src/raft.c
@@ -31,7 +31,7 @@ static void tickHeartbeat(SSyncRaft* pRaft);
 
 static void abortLeaderTransfer(SSyncRaft* pRaft);
 
-static void resetRaft(SSyncRaft* pRaft, SSyncTerm term);
+static void resetRaft(SSyncRaft* pRaft, SyncTerm term);
 
 int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
   SSyncNode* pNode = pRaft->pNode;
@@ -84,7 +84,9 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
 }
 
 int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
-  syncDebug("from ");
+  syncDebug("from %d, to %d, type:%d, term:%" PRIu64 ", state:%d",
+    pMsg->from, pMsg->to, pMsg->msgType, pMsg->term, pRaft->state);
+
   if (preHandleMessage(pRaft, pMsg)) {
     syncFreeMessage(pMsg);
     return 0;
@@ -92,7 +94,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
 
   RaftMessageType msgType = pMsg->msgType;
   if (msgType == RAFT_MSG_INTERNAL_ELECTION) {
-
+    syncRaftHandleElectionMessage(pRaft, pMsg);
   } else if (msgType == RAFT_MSG_VOTE || msgType == RAFT_MSG_PRE_VOTE) {
 
   } else {
@@ -107,7 +109,7 @@ int32_t syncRaftTick(SSyncRaft* pRaft) {
   return 0;
 }
 
-void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term, SyncNodeId leaderId) {
+void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId) {
   pRaft->stepFp = stepFollower;
   resetRaft(pRaft, term);
   pRaft->tickFp = tickElection;
@@ -115,6 +117,40 @@ void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term, SyncNodeId leaderI
   pRaft->state = TAOS_SYNC_ROLE_FOLLOWER;
 }
 
+void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
+  /**
+   * Becoming a pre-candidate changes our step functions and state,
+   * but doesn't change anything else. In particular it does not increase
+   * r.Term or change r.Vote.
+   **/
+  pRaft->stepFp = stepCandidate;
+  pRaft->tickFp = tickElection;
+  pRaft->state = TAOS_SYNC_ROLE_PRE_CANDIDATE;
+  syncInfo("[%d:%d] became pre-candidate at term %" PRIu64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+}
+
+void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
+  pRaft->stepFp = stepCandidate;
+  // become candidate make term+1
+  resetRaft(pRaft, pRaft->term + 1);
+  pRaft->tickFp = tickElection;
+  pRaft->voteFor = pRaft->selfId;
+  pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
+  syncInfo("[%d:%d] became candidate at term %" PRIu64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+}
+
+void syncRaftBecomeLeader(SSyncRaft* pRaft) {
+  assert(pRaft->state != TAOS_SYNC_ROLE_FOLLOWER);
+
+  pRaft->stepFp = stepLeader;
+  resetRaft(pRaft, pRaft->term);
+  pRaft->leaderId = pRaft->selfId;
+  pRaft->state = TAOS_SYNC_ROLE_LEADER;
+  // TODO: check if there is pending config log
+
+  syncInfo("[%d:%d] became leader at term %" PRIu64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+}
+
 void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) {
   // electionTimeoutTick in [3,6] tick
   pRaft->electionTimeoutTick = taosRand() % 4 + 3;
@@ -130,6 +166,20 @@ bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
   return pRaft->electionElapsed >= pRaft->electionTimeoutTick;
 }
 
+int syncRaftQuorum(SSyncRaft* pRaft) {
+  return pRaft->leaderState.nProgress / 2 + 1;
+}
+
+int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, RaftMessageType msgType, bool accept) {
+  if (accept) {
+
+  } else {
+
+  }
+
+  return 0; /* TODO: vote tally not implemented yet; 0 never reaches a quorum */
+}
+
 /**
  * pre-handle message, return true is no need to continue
  * Handle the message term, which may result in our stepping down to a follower.
@@ -166,6 +216,8 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
      * term.
      **/
   } else {
+    syncInfo("%d [term:%" PRIu64 "] received a %d message with higher term from %d [term:%" PRIu64 "]",
+      pRaft->selfId, pRaft->term, msgType, pMsg->from, pMsg->term);
     syncRaftBecomeFollower(pRaft, pMsg->term, leaderId);
   }
 
@@ -218,7 +270,7 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) {
   pRaft->leadTransferee = SYNC_NON_NODE_ID;
 }
 
-static void resetRaft(SSyncRaft* pRaft, SSyncTerm term) {
+static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
   if (pRaft->term != term) {
     pRaft->term = term;
     pRaft->voteFor = SYNC_NON_NODE_ID;
diff --git a/source/libs/sync/src/raft_handle_election_message.c b/source/libs/sync/src/raft_handle_election_message.c
new file mode 100644
index 0000000000..2586cd918d
--- /dev/null
+++ b/source/libs/sync/src/raft_handle_election_message.c
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "syncInt.h"
+#include "raft.h"
+#include "raft_message.h"
+
+static void campaign(SSyncRaft* pRaft, SyncRaftCampaignType cType);
+
+void syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  if (pRaft->state == TAOS_SYNC_ROLE_LEADER) {
+    syncDebug("%d ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfId);
+    return;
+  }
+
+  // TODO: is there pending uncommitted config?
+
+  syncInfo("[%d:%d] is starting a new election at term %" PRIu64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+
+  if (pRaft->preVote) {
+    campaign(pRaft, SYNC_RAFT_CAMPAIGN_PRE_ELECTION);
+  } else {
+    campaign(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
+  }
+}
+
+static void campaign(SSyncRaft* pRaft, SyncRaftCampaignType cType) {
+  SyncTerm term;
+  RaftMessageType voteMsgType;
+
+  if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
+    syncRaftBecomePreCandidate(pRaft);
+    voteMsgType = RAFT_MSG_PRE_VOTE;
+    // PreVote RPCs are sent for the next term before we've incremented r.Term.
+    term = pRaft->term + 1;
+  } else {
+    syncRaftBecomeCandidate(pRaft);
+    voteMsgType = RAFT_MSG_VOTE;
+    term = pRaft->term;
+  }
+
+  int quorum = syncRaftQuorum(pRaft);
+  int granted = syncRaftNumOfGranted(pRaft, pRaft->selfId, SyncRaftVoteRespMsgType(voteMsgType), true);
+  if (quorum <= granted) {
+    /**
+     * We won the election after voting for ourselves (which must mean that
+     * this is a single-node cluster). Advance to the next state.
+     **/
+    if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
+      campaign(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
+    } else {
+      syncRaftBecomeLeader(pRaft);
+    }
+    return;
+  }
+
+  // broadcast vote message to other peers
+
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/raft_message.c b/source/libs/sync/src/raft_message.c
index d17a5b732b..e706127f29 100644
--- a/source/libs/sync/src/raft_message.c
+++ b/source/libs/sync/src/raft_message.c
@@ -16,7 +16,7 @@
 #include "raft_message.h"
 
 void syncFreeMessage(const SSyncMessage* pMsg) {
-  if (!syncIsInternalMsg(pMsg)) {
+  if (!syncIsInternalMsg(pMsg->msgType)) {
     free((SSyncMessage*)pMsg);
   }
 }
\ No newline at end of file