From 349a6a47711dbd26dcc3d97df411b2aa2a74b185 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 3 Nov 2021 11:47:44 +0800 Subject: [PATCH] [TD-10645][raft]add raft message handle --- include/libs/sync/sync.h | 4 +- source/libs/sync/inc/raft.h | 66 +++++++++-- source/libs/sync/inc/raft_message.h | 56 ++++++--- source/libs/sync/inc/raft_progress.h | 1 + source/libs/sync/src/raft.c | 168 +++++++++++++++++++++++++-- source/libs/sync/src/raft_message.c | 4 +- source/libs/sync/src/raft_progress.c | 15 +-- source/libs/sync/src/sync.c | 2 +- 8 files changed, 271 insertions(+), 45 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index ef8773f5cc..ced9cc72fc 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -133,9 +133,9 @@ typedef struct SStateManager { int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state); - // void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); + void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); - // const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); + const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); } SStateManager; typedef struct { diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index 869baecdda..0e2d1769b3 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -20,6 +20,8 @@ #include "sync_type.h" #include "raft_message.h" +#define SYNC_NON_NODE_ID -1 + typedef struct SSyncRaftProgress SSyncRaftProgress; typedef struct RaftLeaderState { @@ -28,38 +30,84 @@ typedef struct RaftLeaderState { } RaftLeaderState; typedef struct SSyncRaftIOMethods { - SyncTime (*time)(SSyncRaft*); + } SSyncRaftIOMethods; +typedef int (*SyncRaftStepFp)(SSyncRaft* pRaft, const SSyncMessage* pMsg); +typedef void (*SyncRaftTickFp)(SSyncRaft* pRaft); + struct SSyncRaft { // owner sync node SSyncNode* pNode; SSyncInfo info; + SSyncTerm term; + SyncNodeId voteFor; + + SyncNodeId selfId; + + /** + * the leader id + **/ + SyncNodeId leaderId; + + /** + * leadTransferee is id of the leader transfer target when its value is not zero. + * Follow the procedure defined in raft thesis 3.10. + **/ + SyncNodeId leadTransferee; + + /** + * New configuration is ignored if there exists unapplied configuration. + **/ + bool pendingConf; + + ESyncRole state; + + /** + * number of ticks since it reached last electionTimeout when it is leader + * or candidate. + * number of ticks since it reached last electionTimeout or received a + * valid message from current leader when it is a follower. + **/ + uint16_t electionElapsed; + + /** + * number of ticks since it reached last heartbeatTimeout. + * only leader keeps heartbeatElapsed. + **/ + uint16_t heartbeatElapsed; + // election timeout tick(random in [3:6] tick) - uint16_t electionTick; + uint16_t electionTimeoutTick; // heartbeat timeout tick(default: 1 tick) - uint16_t heartbeatTick; - - int installSnapShotTimeoutMS; - - // - int heartbeatTimeoutMS; + uint16_t heartbeatTimeoutTick; bool preVote; + bool checkQuorum; SSyncRaftIOMethods io; RaftLeaderState leaderState; SSyncRaftUnstableLog *log; + + SyncRaftStepFp stepFp; + + SyncRaftTickFp tickFp; }; int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo); -int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg); +int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg); int32_t syncRaftTick(SSyncRaft* pRaft); + +void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term, SyncNodeId leaderId); +void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft); +bool syncRaftIsPromotable(SSyncRaft* pRaft); +bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft); + #endif /* _TD_LIBS_SYNC_RAFT_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/raft_message.h b/source/libs/sync/inc/raft_message.h index faf14840c9..9e690855c7 100644 --- a/source/libs/sync/inc/raft_message.h +++ b/source/libs/sync/inc/raft_message.h @@ -28,15 +28,14 @@ typedef enum RaftMessageType { // client propose a cmd RAFT_MSG_INTERNAL_PROP = 1, - RAFT_MSG_APPEND, - RAFT_MSG_APPEND_RESP, + // node election timeout + RAFT_MSG_INTERNAL_ELECTION = 2, - RAFT_MSG_VOTE, - RAFT_MSG_VOTE_RESP, - - RAFT_MSG_PRE_VOTE, - RAFT_MSG_PRE_VOTE_RESP, + RAFT_MSG_VOTE = 3, + RAFT_MSG_VOTE_RESP = 4, + RAFT_MSG_PRE_VOTE = 5, + RAFT_MSG_PRE_VOTE_RESP = 6, } RaftMessageType; typedef struct RaftMsgInternal_Prop { @@ -45,7 +44,15 @@ typedef struct RaftMsgInternal_Prop { void* pData; } RaftMsgInternal_Prop; -typedef struct RaftMessage { +typedef struct RaftMsgInternal_Election { + +} RaftMsgInternal_Election; + +typedef struct RaftMsg_PreVoteResp { + bool reject; +} RaftMsg_PreVoteResp; + +typedef struct SSyncMessage { RaftMessageType msgType; SSyncTerm term; SyncNodeId from; @@ -53,12 +60,17 @@ typedef struct RaftMessage { union { RaftMsgInternal_Prop propose; - }; -} RaftMessage; -static FORCE_INLINE RaftMessage* syncInitPropMsg(RaftMessage* pMsg, const SSyncBuffer* pBuf, void* pData, bool isWeak) { - *pMsg = (RaftMessage) { + RaftMsgInternal_Election election; + + RaftMsg_PreVoteResp preVoteResp; + }; +} SSyncMessage; + +static FORCE_INLINE SSyncMessage* syncInitPropMsg(SSyncMessage* pMsg, const SSyncBuffer* pBuf, void* pData, bool isWeak) { + *pMsg = (SSyncMessage) { .msgType = RAFT_MSG_INTERNAL_PROP, + .term = 0, .propose = (RaftMsgInternal_Prop) { .isWeak = isWeak, .pBuf = pBuf, @@ -69,10 +81,24 @@ static FORCE_INLINE RaftMessage* syncInitPropMsg(RaftMessage* pMsg, const SSyncB return pMsg; } -static FORCE_INLINE bool syncIsInternalMsg(const RaftMessage* pMsg) { - return pMsg->msgType == RAFT_MSG_INTERNAL_PROP; +static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNodeId from) { + *pMsg = (SSyncMessage) { + .msgType = RAFT_MSG_INTERNAL_ELECTION, + .term = 0, + .from = from, + .election = (RaftMsgInternal_Election) { + + }, + }; + + return pMsg; } -void syncFreeMessage(const RaftMessage* pMsg); +static FORCE_INLINE bool syncIsInternalMsg(const SSyncMessage* pMsg) { + return pMsg->msgType == RAFT_MSG_INTERNAL_PROP || + pMsg->msgType == RAFT_MSG_INTERNAL_ELECTION; +} + +void syncFreeMessage(const SSyncMessage* pMsg); #endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/raft_progress.h b/source/libs/sync/inc/raft_progress.h index 73aa9db59f..159a80fa0e 100644 --- a/source/libs/sync/inc/raft_progress.h +++ b/source/libs/sync/inc/raft_progress.h @@ -148,6 +148,7 @@ void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i); void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i, SyncIndex snapshotIndex); +/* inflights APIs */ int syncRaftInflightReset(SSyncRaftInflights* inflights); bool syncRaftInflightFull(SSyncRaftInflights* inflights); void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex); diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 42b220e642..09f29cbd28 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -18,7 +18,20 @@ #define RAFT_READ_LOG_MAX_NUM 100 -static void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term); +static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); +static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); +static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); + +static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg); +static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg); +static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg); + +static void tickElection(SSyncRaft* pRaft); +static void tickHeartbeat(SSyncRaft* pRaft); + +static void abortLeaderTransfer(SSyncRaft* pRaft); + +static void resetRaft(SSyncRaft* pRaft, SSyncTerm term); int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { SSyncNode* pNode = pRaft->pNode; @@ -30,6 +43,8 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM]; int nBuf, limit, i; + memset(pRaft, 0, sizeof(SSyncRaft)); + memcpy(&pRaft->info, pInfo, sizeof(SSyncInfo)); stateManager = &(pRaft->info.stateManager); logStore = &(pRaft->info.logStore); @@ -60,15 +75,30 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { } assert(initIndex == serverState.commitIndex); - pRaft->heartbeatTick = 1; + pRaft->heartbeatTimeoutTick = 1; - syncRaftBecomeFollower(pRaft, 1); + syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); syncInfo("restore vgid %d state: snapshot index success", pInfo->vgId); return 0; } -int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg) { +int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + syncDebug("from "); + if (preHandleMessage(pRaft, pMsg)) { + syncFreeMessage(pMsg); + return 0; + } + + RaftMessageType msgType = pMsg->msgType; + if (msgType == RAFT_MSG_INTERNAL_ELECTION) { + + } else if (msgType == RAFT_MSG_VOTE || msgType == RAFT_MSG_PRE_VOTE) { + + } else { + pRaft->stepFp(pRaft, pMsg); + } + syncFreeMessage(pMsg); return 0; } @@ -77,7 +107,131 @@ int32_t syncRaftTick(SSyncRaft* pRaft) { return 0; } -static void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term) { - pRaft->electionTick = taosRand() % 3 + 3; - return; +void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term, SyncNodeId leaderId) { + pRaft->stepFp = stepFollower; + resetRaft(pRaft, term); + pRaft->tickFp = tickElection; + pRaft->leaderId = leaderId; + pRaft->state = TAOS_SYNC_ROLE_FOLLOWER; +} + +void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) { + // electionTimeoutTick in [3,6] tick + pRaft->electionTimeoutTick = taosRand() % 4 + 3; +} + +bool syncRaftIsPromotable(SSyncRaft* pRaft) { + return pRaft->info.syncCfg.selfIndex >= 0 && + pRaft->info.syncCfg.selfIndex < pRaft->info.syncCfg.replica && + pRaft->selfId != SYNC_NON_NODE_ID; +} + +bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) { + return pRaft->electionElapsed >= pRaft->electionTimeoutTick; +} + +/** + * pre-handle message, return true is no need to continue + * Handle the message term, which may result in our stepping down to a follower. + **/ +static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + // local message? + if (pMsg->term == 0) { + return false; + } + + if (pMsg->term > pRaft->term) { + return preHandleNewTermMessage(pRaft, pMsg); + } + + return preHandleOldTermMessage(pRaft, pMsg);; +} + +static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + SyncNodeId leaderId = pMsg->from; + RaftMessageType msgType = pMsg->msgType; + + if (msgType == RAFT_MSG_VOTE || msgType == RAFT_MSG_PRE_VOTE) { + leaderId = SYNC_NON_NODE_ID; + } + + if (msgType == RAFT_MSG_PRE_VOTE) { + // Never change our term in response to a PreVote + } else if (msgType == RAFT_MSG_PRE_VOTE_RESP && !pMsg->preVoteResp.reject) { + /** + * We send pre-vote requests with a term in our future. If the + * pre-vote is granted, we will increment our term when we get a + * quorum. If it is not, the term comes from the node that + * rejected our vote so we should become a follower at the new + * term. + **/ + } else { + syncRaftBecomeFollower(pRaft, pMsg->term, leaderId); + } + + return false; +} + +static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + + // if receive old term message, no need to continue + return true; +} + +static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + return 0; +} + +static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + return 0; +} + +static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) { + return 0; +} + +/** + * tickElection is run by followers and candidates per tick. + **/ +static void tickElection(SSyncRaft* pRaft) { + pRaft->electionElapsed += 1; + + if (!syncRaftIsPromotable(pRaft)) { + return; + } + + if (!syncRaftIsPastElectionTimeout(pRaft)) { + return; + } + + // election timeout + pRaft->electionElapsed = 0; + SSyncMessage msg; + syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId)); +} + +static void tickHeartbeat(SSyncRaft* pRaft) { + +} + +static void abortLeaderTransfer(SSyncRaft* pRaft) { + pRaft->leadTransferee = SYNC_NON_NODE_ID; +} + +static void resetRaft(SSyncRaft* pRaft, SSyncTerm term) { + if (pRaft->term != term) { + pRaft->term = term; + pRaft->voteFor = SYNC_NON_NODE_ID; + } + + pRaft->leaderId = SYNC_NON_NODE_ID; + + pRaft->electionElapsed = 0; + pRaft->heartbeatElapsed = 0; + + syncRaftRandomizedElectionTimeout(pRaft); + + abortLeaderTransfer(pRaft); + + pRaft->pendingConf = false; } \ No newline at end of file diff --git a/source/libs/sync/src/raft_message.c b/source/libs/sync/src/raft_message.c index 912314daf2..d17a5b732b 100644 --- a/source/libs/sync/src/raft_message.c +++ b/source/libs/sync/src/raft_message.c @@ -15,8 +15,8 @@ #include "raft_message.h" -void syncFreeMessage(const RaftMessage* pMsg) { +void syncFreeMessage(const SSyncMessage* pMsg) { if (!syncIsInternalMsg(pMsg)) { - free((RaftMessage*)pMsg); + free((SSyncMessage*)pMsg); } } \ No newline at end of file diff --git a/source/libs/sync/src/raft_progress.c b/source/libs/sync/src/raft_progress.c index 0f51d20531..ba09973f48 100644 --- a/source/libs/sync/src/raft_progress.c +++ b/source/libs/sync/src/raft_progress.c @@ -177,14 +177,6 @@ void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i, SyncIndex snapshotI progress->pendingSnapshotIndex = snapshotIndex; } -static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) { - progress->paused = false; - progress->pendingSnapshotIndex = 0; - progress->state = state; - syncRaftInflightReset(&(progress->inflights)); -} - - int syncRaftInflightReset(SSyncRaftInflights* inflights) { inflights->count = 0; inflights->start = 0; @@ -240,7 +232,12 @@ void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) { syncRaftInflightFreeTo(inflights, inflights->buffer[inflights->start]); } - +static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) { + progress->paused = false; + progress->pendingSnapshotIndex = 0; + progress->state = state; + syncRaftInflightReset(&(progress->inflights)); +} diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c index e3d0606c08..9077be3f2d 100644 --- a/source/libs/sync/src/sync.c +++ b/source/libs/sync/src/sync.c @@ -157,7 +157,7 @@ void syncStop(const SSyncNode* pNode) { } int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak) { - RaftMessage msg; + SSyncMessage msg; pthread_mutex_lock(&syncNode->mutex); int32_t ret = syncRaftStep(&syncNode->raft, syncInitPropMsg(&msg, pBuf, pData, isWeak));