From da106e29b2b0435d8b466f36901e610990bece99 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 4 Nov 2021 15:51:30 +0800 Subject: [PATCH] [TD-10645][raft]add raft vote message handle --- source/libs/sync/inc/raft.h | 2 + source/libs/sync/inc/raft_log.h | 2 + source/libs/sync/inc/raft_message.h | 36 ++++++++++-- source/libs/sync/src/raft.c | 4 +- source/libs/sync/src/raft_election.c | 3 +- .../libs/sync/src/raft_handle_vote_message.c | 57 +++++++++++++++++++ .../sync/src/raft_handle_vote_resp_message.c | 2 +- source/libs/sync/src/raft_log.c | 4 ++ 8 files changed, 101 insertions(+), 9 deletions(-) create mode 100644 source/libs/sync/src/raft_handle_vote_message.c diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index cba9434414..2ce2dcb5de 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -38,6 +38,7 @@ typedef struct RaftCandidateState { } RaftCandidateState; typedef struct SSyncRaftIOMethods { + // send SSyncMessage to node int (*send)(const SSyncMessage* pMsg, const SNodeInfo* pNode); } SSyncRaftIOMethods; @@ -104,6 +105,7 @@ struct SSyncRaft { SSyncRaftIOMethods io; + // union different state data union { RaftLeaderState leaderState; RaftCandidateState candidateState; diff --git a/source/libs/sync/inc/raft_log.h b/source/libs/sync/inc/raft_log.h index 7ffb946c82..3545bf7ba1 100644 --- a/source/libs/sync/inc/raft_log.h +++ b/source/libs/sync/inc/raft_log.h @@ -35,6 +35,8 @@ SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog); SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog); +bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term); + int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog); bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog); diff --git a/source/libs/sync/inc/raft_message.h b/source/libs/sync/inc/raft_message.h index da2e3bc52f..d51822f8b3 100644 --- a/source/libs/sync/inc/raft_message.h +++ b/source/libs/sync/inc/raft_message.h @@ -20,10 +20,13 @@ #include "sync_type.h" /** - * below define message type which handled by Raft node thread - * internal message, which communicate in threads, start with RAFT_MSG_INTERNAL_*, - * internal message use pointer only, need not to be decode/encode - * outter message start with RAFT_MSG_*, need to implement its decode/encode functions + * below define message type which handled by Raft. + * + * internal message, which communicate between threads, start with RAFT_MSG_INTERNAL_*. + * internal message use pointer only and stack memory, need not to be decode/encode and free. + * + * outter message start with RAFT_MSG_*, which communicate between cluster peers, + * need to implement its decode/encode functions. **/ typedef enum RaftMessageType { // client propose a cmd @@ -36,6 +39,7 @@ typedef enum RaftMessageType { RAFT_MSG_VOTE_RESP = 4, RAFT_MSG_APPEND = 5, + RAFT_MSG_APPEND_RESP = 6, } RaftMessageType; typedef struct RaftMsgInternal_Prop { @@ -55,7 +59,7 @@ typedef struct RaftMsg_Vote { } RaftMsg_Vote; typedef struct RaftMsg_VoteResp { - bool reject; + bool rejected; SyncRaftElectionType cType; } RaftMsg_VoteResp; @@ -115,6 +119,7 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId .from = from, .to = to, .term = term, + .msgType = RAFT_MSG_VOTE, .vote = (RaftMsg_Vote) { .cType = cType, .lastIndex = lastIndex, @@ -125,6 +130,26 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId return pMsg; } +static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNodeId from, SyncNodeId to, + SyncRaftElectionType cType, bool rejected) { + SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage)); + if (pMsg == NULL) { + return NULL; + } + *pMsg = (SSyncMessage) { + .groupId = groupId, + .from = from, + .to = to, + .msgType = RAFT_MSG_VOTE_RESP, + .voteResp = (RaftMsg_VoteResp) { + .cType = cType, + .rejected = rejected, + }, + }; + + return pMsg; +} + static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) { return msgType == RAFT_MSG_INTERNAL_PROP || msgType == RAFT_MSG_INTERNAL_ELECTION; @@ -142,6 +167,7 @@ void syncFreeMessage(const SSyncMessage* pMsg); // message handlers int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); +int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); #endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */ \ No newline at end of file diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 83ae76fa5e..6e8e359305 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -107,7 +107,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) { if (msgType == RAFT_MSG_INTERNAL_ELECTION) { syncRaftHandleElectionMessage(pRaft, pMsg); } else if (msgType == RAFT_MSG_VOTE) { - + syncRaftHandleVoteMessage(pRaft, pMsg); } else { pRaft->stepFp(pRaft, pMsg); } @@ -245,7 +245,7 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) if (syncIsPreVoteMsg(pMsg)) { // Never change our term in response to a PreVote - } else if (syncIsPreVoteRespMsg(pMsg) && !pMsg->voteResp.reject) { + } else if (syncIsPreVoteRespMsg(pMsg) && !pMsg->voteResp.rejected) { /** * We send pre-vote requests with a term in our future. If the * pre-vote is granted, we will increment our term when we get a diff --git a/source/libs/sync/src/raft_election.c b/source/libs/sync/src/raft_election.c index 7ebeb45254..bb4a7541c2 100644 --- a/source/libs/sync/src/raft_election.c +++ b/source/libs/sync/src/raft_election.c @@ -15,6 +15,7 @@ #include "syncInt.h" #include "raft.h" +#include "raft_log.h" #include "raft_message.h" void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) { @@ -66,7 +67,7 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) { continue; } - syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %d] sent %d request to %d at term %" PRId64 "", + syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent %d request to %d at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, lastTerm, lastIndex, voteMsgType, nodeId, pRaft->term); diff --git a/source/libs/sync/src/raft_handle_vote_message.c b/source/libs/sync/src/raft_handle_vote_message.c new file mode 100644 index 0000000000..a575c5df1a --- /dev/null +++ b/source/libs/sync/src/raft_handle_vote_message.c @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncInt.h" +#include "raft.h" +#include "raft_log.h" +#include "raft_message.h" + +static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); + +int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + SSyncMessage* pRespMsg; + int voteIndex = syncRaftConfigurationIndexOfVoter(pRaft, pMsg->from); + if (voteIndex == -1) { + return 0; + } + bool grant; + SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); + SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); + + grant = canGrantVoteMessage(pRaft, pMsg); + pRespMsg = syncNewVoteRespMsg(pRaft->selfGroupId, pRaft->selfId, pMsg->to, pMsg->vote.cType, !grant); + if (pRespMsg == NULL) { + return 0; + } + syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 ", vote: %d] %s for %d" \ + "[logterm: %" PRId64 ", index: %" PRId64 ", vote: %d] at term %" PRId64 "", + pRaft->selfGroupId, pRaft->selfId, lastTerm, lastIndex, pRaft->voteFor, + grant ? "grant" : "reject", + pMsg->from, pMsg->vote.lastTerm, pMsg->vote.lastIndex, pRaft->term); + + pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[voteIndex])); + return 0; +} + +static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + if (!(pRaft->voteFor == SYNC_NON_NODE_ID || pMsg->term > pRaft->term || pRaft->voteFor == pMsg->from)) { + return false; + } + if (!syncRaftLogIsUptodate(pRaft, pMsg->vote.lastIndex, pMsg->vote.lastTerm)) { + return false; + } + + return true; +} \ No newline at end of file diff --git a/source/libs/sync/src/raft_handle_vote_resp_message.c b/source/libs/sync/src/raft_handle_vote_resp_message.c index e5d5d6cae7..a155f0fe63 100644 --- a/source/libs/sync/src/raft_handle_vote_resp_message.c +++ b/source/libs/sync/src/raft_handle_vote_resp_message.c @@ -35,7 +35,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { granted = syncRaftNumOfGranted(pRaft, pMsg->from, pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION, - !pMsg->voteResp.reject, &rejected); + !pMsg->voteResp.rejected, &rejected); quorum = syncRaftQuorum(pRaft); syncInfo("[%d:%d] [quorum:%d] has received %d votes and %d vote rejections", diff --git a/source/libs/sync/src/raft_log.c b/source/libs/sync/src/raft_log.c index 46c4e4b304..f93595e9f3 100644 --- a/source/libs/sync/src/raft_log.c +++ b/source/libs/sync/src/raft_log.c @@ -27,6 +27,10 @@ SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) { return 0; } +bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term) { + return true; +} + int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog) { return 0; }