diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 9ffd74c229..726fbc0621 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -26,7 +26,7 @@ extern "C" { typedef int32_t SyncNodeId; typedef int32_t SyncGroupId; typedef int64_t SyncIndex; -typedef uint64_t SSyncTerm; +typedef uint64_t SyncTerm; typedef enum { TAOS_SYNC_ROLE_FOLLOWER = 0, @@ -61,13 +61,13 @@ typedef struct { typedef struct SSyncFSM { void* pData; - // apply committed log, bufs will be free by raft module + // apply committed log, bufs will be freed by the sync module int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData); // cluster commit callback int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData); - // fsm return snapshot in ppBuf, bufs will be free by raft module + // fsm returns snapshot in ppBuf, bufs will be freed by the sync module // TODO: getSnapshot SHOULD be async? int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast); @@ -89,19 +89,30 @@ typedef struct SSyncLogStore { // write log with given index int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf); + /** + * read logs starting from the given index (included) with limit, return the actual number in nBuf, + * pBuf will be freed by the sync module + **/ + int32_t (*logRead)(struct SSyncLogStore* logStore, SyncIndex index, int limit, + SSyncBuffer* pBuf, int* nBuf); + // mark log with given index has been commtted int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index); - // prune log before given index + // prune log before the given index (not included) int32_t (*logPrune)(struct SSyncLogStore* logStore, SyncIndex index); - // rollback log after given index + // rollback log after the given index (included) int32_t (*logRollback)(struct SSyncLogStore* logStore, SyncIndex index); + + // return the last index of the log + SyncIndex (*logLastIndex)(struct SSyncLogStore* logStore); }
SSyncLogStore; typedef struct SSyncServerState { SyncNodeId voteFor; - SSyncTerm term; + SyncTerm term; + SyncIndex commitIndex; } SSyncServerState; typedef struct SSyncClusterConfig { @@ -122,9 +133,9 @@ typedef struct SStateManager { int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state); - // void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); + void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); - // const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); + const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); } SStateManager; typedef struct { @@ -146,13 +157,13 @@ SSyncNode* syncStart(const SSyncInfo*); void syncReconfig(const SSyncNode*, const SSyncCluster*); void syncStop(const SSyncNode*); -int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak); +int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak); // int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); // int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode); -extern int32_t syncDebugFlag; +extern int32_t sDebugFlag; #ifdef __cplusplus } diff --git a/source/libs/sync/CMakeLists.txt b/source/libs/sync/CMakeLists.txt index 124f4a1fee..37ee5194c8 100644 --- a/source/libs/sync/CMakeLists.txt +++ b/source/libs/sync/CMakeLists.txt @@ -4,6 +4,7 @@ add_library(sync ${SYNC_SRC}) target_link_libraries( sync PUBLIC common + PUBLIC transport PUBLIC util PUBLIC wal ) diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h new file mode 100644 index 0000000000..14f587d58e --- /dev/null +++ b/source/libs/sync/inc/raft.h @@ -0,0 +1,144 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc.
+ * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_RAFT_H +#define _TD_LIBS_SYNC_RAFT_H + +#include "sync.h" +#include "sync_type.h" +#include "raft_message.h" +#include "sync_raft_impl.h" +#include "sync_raft_quorum.h" + +typedef struct RaftLeaderState { + +} RaftLeaderState; + +typedef struct RaftCandidateState { + /* true if in pre-vote phase */ + bool inPreVote; +} RaftCandidateState; + +typedef struct SSyncRaftIOMethods { + // send SSyncMessage to node + int (*send)(const SSyncMessage* pMsg, const SNodeInfo* pNode); +} SSyncRaftIOMethods; + +typedef int (*SyncRaftStepFp)(SSyncRaft* pRaft, const SSyncMessage* pMsg); +typedef void (*SyncRaftTickFp)(SSyncRaft* pRaft); + +struct SSyncRaft { + // owner sync node + SSyncNode* pNode; + + SSyncCluster cluster; + + int selfIndex; + SyncNodeId selfId; + SyncGroupId selfGroupId; + + SSyncRaftIOMethods io; + + SSyncFSM fsm; + SSyncLogStore logStore; + SStateManager stateManager; + + union { + RaftLeaderState leaderState; + RaftCandidateState candidateState; + }; + + SyncTerm term; + SyncNodeId voteFor; + + SSyncRaftLog *log; + + int maxMsgSize; + SSyncRaftProgressTracker *tracker; + + ESyncRole state; + + // isLearner is true if the local raft node is a learner. + bool isLearner; + + /** + * the leader id + **/ + SyncNodeId leaderId; + + /** + * leadTransferee is the id of the leader transfer target when its value is not zero. + * Follow the procedure defined in raft thesis 3.10.
+ **/ + SyncNodeId leadTransferee; + + /** + * Only one conf change may be pending (in the log, but not yet + * applied) at a time. This is enforced via pendingConfigIndex, which + * is set to a value >= the log index of the latest pending + * configuration change (if any). Config changes are only allowed to + * be proposed if the leader's applied index is greater than this + * value. + **/ + SyncIndex pendingConfigIndex; + + /** + * an estimate of the size of the uncommitted tail of the Raft log. Used to + * prevent unbounded log growth. Only maintained by the leader. Reset on + * term changes. + **/ + uint32_t uncommittedSize; + + /** + * number of ticks since it reached last electionTimeout when it is leader + * or candidate. + * number of ticks since it reached last electionTimeout or received a + * valid message from current leader when it is a follower. + **/ + uint16_t electionElapsed; + + /** + * number of ticks since it reached last heartbeatTimeout. + * only leader keeps heartbeatElapsed. + **/ + uint16_t heartbeatElapsed; + + bool preVote; + bool checkQuorum; + + int heartbeatTimeout; + int electionTimeout; + + /** + * randomizedElectionTimeout is a random number between + * [electionTimeout, 2 * electionTimeout - 1]. It gets reset + * when raft changes its state to follower or candidate.
+ **/ + int randomizedElectionTimeout; + bool disableProposalForwarding; + + // current tick count since start up + uint32_t currentTick; + + SyncRaftStepFp stepFp; + + SyncRaftTickFp tickFp; +}; + +int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo); +int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg); +int32_t syncRaftTick(SSyncRaft* pRaft); + +#endif /* _TD_LIBS_SYNC_RAFT_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/raft_configuration.h b/source/libs/sync/inc/raft_configuration.h new file mode 100644 index 0000000000..ac9bbb5e55 --- /dev/null +++ b/source/libs/sync/inc/raft_configuration.h @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_RAFT_CONFIGURATION_H +#define _TD_LIBS_SYNC_RAFT_CONFIGURATION_H + +#include "sync.h" +#include "sync_type.h" + +// return -1 if the given id cannot be found +int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id); + +int syncRaftConfigurationVoterCount(SSyncRaft *pRaft); + +#endif /* _TD_LIBS_SYNC_RAFT_CONFIGURATION_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/raft_log.h b/source/libs/sync/inc/raft_log.h new file mode 100644 index 0000000000..a44f5a7273 --- /dev/null +++ b/source/libs/sync/inc/raft_log.h @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc.
+ * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_RAFT_LOG_H +#define _TD_LIBS_SYNC_RAFT_LOG_H + +#include "sync.h" +#include "sync_type.h" + +typedef enum SyncEntryType { + SYNC_ENTRY_TYPE_LOG = 1, +}SyncEntryType; + +struct SSyncRaftEntry { + SyncTerm term; + + SyncIndex index; + + SyncEntryType type; + + SSyncBuffer buffer; +}; + +struct SSyncRaftLog { + SyncIndex uncommittedConfigIndex; + + SyncIndex commitIndex; + + SyncIndex appliedIndex; + + +}; + +SSyncRaftLog* syncRaftLogOpen(); + +SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog); + +SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog); + +SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog); + +bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term); + +int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog); + +bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog); + +SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index); + +int syncRaftLogAppend(SSyncRaftLog* pLog, SSyncRaftEntry *pEntries, int n); + +int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize, + SSyncRaftEntry **ppEntries, int *n); + +void syncRaftLogRelease(SSyncRaftLog* pLog, SyncIndex index, + SSyncRaftEntry *pEntries, int n); + +bool syncRaftLogMatchTerm(); +/* returns true only when index is strictly below pLog->commitIndex. NOTE(review): an index equal to commitIndex is reported as not committed - confirm callers rely on the strict comparison before changing it */ +static FORCE_INLINE bool syncRaftLogIsCommitted(SSyncRaftLog* pLog, SyncIndex index) { + return pLog->commitIndex > index; +} + +#endif /* _TD_LIBS_SYNC_RAFT_LOG_H */ diff --git a/source/libs/sync/inc/raft_message.h
b/source/libs/sync/inc/raft_message.h
new file mode 100644
index 0000000000..2cb625d1fb
--- /dev/null
+++ b/source/libs/sync/inc/raft_message.h
@@ -0,0 +1,249 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _TD_LIBS_SYNC_RAFT_MESSAGE_H
+#define _TD_LIBS_SYNC_RAFT_MESSAGE_H
+
+#include "sync.h"
+#include "sync_type.h"
+
+/**
+ * The message types handled by Raft are defined below.
+ *
+ * Internal messages, which are passed between threads, start with RAFT_MSG_INTERNAL_*.
+ * Internal messages only use pointers and stack memory, so they need not be decoded/encoded or freed.
+ *
+ * Outer messages start with RAFT_MSG_*; they are exchanged between cluster peers,
+ * so their decode/encode functions need to be implemented.
+ **/
+typedef enum RaftMessageType {
+  // client propose a cmd
+  RAFT_MSG_INTERNAL_PROP = 1,
+
+  // node election timeout
+  RAFT_MSG_INTERNAL_ELECTION = 2,
+
+  RAFT_MSG_VOTE = 3,
+  RAFT_MSG_VOTE_RESP = 4,
+
+  RAFT_MSG_APPEND = 5,
+  RAFT_MSG_APPEND_RESP = 6,
+} RaftMessageType;
+
+typedef struct RaftMsgInternal_Prop {
+  const SSyncBuffer *pBuf;
+  bool isWeak;
+  void* pData;
+} RaftMsgInternal_Prop;
+
+typedef struct RaftMsgInternal_Election {
+
+} RaftMsgInternal_Election;
+
+typedef struct RaftMsg_Vote {
+  SyncRaftElectionType cType;
+  SyncIndex lastIndex;
+  SyncTerm lastTerm;
+} RaftMsg_Vote;
+
+typedef struct RaftMsg_VoteResp {
+  bool rejected;
+  SyncRaftElectionType cType;
+} RaftMsg_VoteResp;
+
+typedef struct RaftMsg_Append_Entries {
+  // index of log entry preceding new ones
+  SyncIndex index;
+
+  // term of entry at prevIndex
+  SyncTerm term;
+
+  // leader's commit index.
+  SyncIndex commitIndex;
+
+  // size of the log entries array
+  int nEntries;
+
+  // log entries array
+  SSyncRaftEntry* entries;
+} RaftMsg_Append_Entries;
+
+typedef struct RaftMsg_Append_Resp {
+  SyncIndex index;
+} RaftMsg_Append_Resp;
+
+typedef struct SSyncMessage {
+  RaftMessageType msgType;
+  SyncTerm term;
+  SyncGroupId groupId;
+  SyncNodeId from;
+
+  union {
+    RaftMsgInternal_Prop propose;
+
+    RaftMsgInternal_Election election;
+
+    RaftMsg_Vote vote;
+    RaftMsg_VoteResp voteResp;
+
+    RaftMsg_Append_Entries appendEntries;
+    RaftMsg_Append_Resp appendResp;
+  };
+} SSyncMessage;
+
+// fill the caller-provided pMsg as an internal propose message (term 0); returns pMsg
+static FORCE_INLINE SSyncMessage* syncInitPropMsg(SSyncMessage* pMsg, const SSyncBuffer* pBuf, void* pData, bool isWeak) {
+  *pMsg = (SSyncMessage) {
+    .msgType = RAFT_MSG_INTERNAL_PROP,
+    .term = 0,
+    .propose = (RaftMsgInternal_Prop) {
+      .isWeak = isWeak,
+      .pBuf = pBuf,
+      .pData = pData,
+    },
+  };
+
+  return pMsg;
+}
+
+// fill the caller-provided pMsg as an internal election message (term 0); returns pMsg
+static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNodeId from) {
+  *pMsg = (SSyncMessage) {
+    .msgType = RAFT_MSG_INTERNAL_ELECTION,
+    .term = 0,
+    .from = from,
+    .election = (RaftMsgInternal_Election) {
+
+    },
+  };
+
+  return pMsg;
+}
+
+// malloc and fill a RAFT_MSG_VOTE message (payload in the `vote` member); returns NULL if malloc fails
+static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId from,
+                                                 SyncTerm term, SyncRaftElectionType cType,
+                                                 SyncIndex lastIndex, SyncTerm lastTerm) {
+  SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
+  if (pMsg == NULL) {
+    return NULL;
+  }
+  *pMsg = (SSyncMessage) {
+    .groupId = groupId,
+    .from = from,
+    .term = term,
+    .msgType = RAFT_MSG_VOTE,
+    .vote = (RaftMsg_Vote) {
+      .cType = cType,
+      .lastIndex = lastIndex,
+      .lastTerm = lastTerm,
+    },
+  };
+
+  return pMsg;
+}
+
+// malloc and fill a RAFT_MSG_VOTE_RESP message; returns NULL if malloc fails. NOTE(review): term is never set here and stays 0
+static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNodeId from,
+                                                     SyncRaftElectionType cType, bool rejected) {
+  SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
+  if (pMsg == NULL) {
+    return NULL;
+  }
+  *pMsg = (SSyncMessage) {
+    .groupId = groupId,
+    .from = from,
+    .msgType = RAFT_MSG_VOTE_RESP,
+    .voteResp = (RaftMsg_VoteResp) {
+      .cType = cType,
+      .rejected = rejected,
+    },
+  };
+
+  return pMsg;
+}
+
+// malloc and fill a RAFT_MSG_APPEND message; the entries pointer is stored as given, not copied; returns NULL if malloc fails
+static FORCE_INLINE SSyncMessage* syncNewAppendMsg(SyncGroupId groupId, SyncNodeId from,
+                                                   SyncTerm term, SyncIndex logIndex, SyncTerm logTerm,
+                                                   SyncIndex commitIndex, int nEntries, SSyncRaftEntry* entries) {
+  SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
+  if (pMsg == NULL) {
+    return NULL;
+  }
+  *pMsg = (SSyncMessage) {
+    .groupId = groupId,
+    .from = from,
+    .term = term,
+    .msgType = RAFT_MSG_APPEND,
+    .appendEntries = (RaftMsg_Append_Entries) {
+      .index = logIndex,
+      .term = logTerm,
+      .commitIndex = commitIndex,
+      .nEntries = nEntries,
+      .entries = entries,
+    },
+  };
+
+  return pMsg;
+}
+
+// malloc a RAFT_MSG_APPEND_RESP message whose appendResp payload is left empty; returns NULL if malloc fails
+static FORCE_INLINE SSyncMessage* syncNewEmptyAppendRespMsg(SyncGroupId groupId, SyncNodeId from, SyncTerm term) {
+  SSyncMessage* pMsg = (SSyncMessage*)malloc(sizeof(SSyncMessage));
+  if (pMsg == NULL) {
+    return NULL;
+  }
+  *pMsg = (SSyncMessage) {
+    .groupId = groupId,
+    .from = from,
+    .term = term,
+    .msgType = RAFT_MSG_APPEND_RESP,
+    .appendResp = (RaftMsg_Append_Resp) {
+
+    },
+  };
+
+  return pMsg;
+}
+
+// true for the RAFT_MSG_INTERNAL_* message types
+static FORCE_INLINE bool syncIsInternalMsg(RaftMessageType msgType) {
+  return msgType == RAFT_MSG_INTERNAL_PROP ||
+         msgType == RAFT_MSG_INTERNAL_ELECTION;
+}
+
+// true if pMsg is a vote response of a pre-election campaign (payload in the `voteResp` member)
+static FORCE_INLINE bool syncIsPreVoteRespMsg(const SSyncMessage* pMsg) {
+  return pMsg->msgType == RAFT_MSG_VOTE_RESP && pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION;
+}
+
+/**
+ * true if pMsg is a vote request of a pre-election campaign. A RAFT_MSG_VOTE message keeps its
+ * payload in the `vote` union member (see syncNewVoteMsg), so cType must be read from vote, not voteResp.
+ **/
+static FORCE_INLINE bool syncIsPreVoteMsg(const SSyncMessage* pMsg) {
+  return pMsg->msgType == RAFT_MSG_VOTE && pMsg->vote.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION;
+}
+
+void syncFreeMessage(const SSyncMessage* pMsg);
+
+// message handlers
+int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+
+#endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */
\ No newline at end of file
diff --git a/source/libs/sync/inc/raft_replication.h b/source/libs/sync/inc/raft_replication.h
new file mode 100644
index 0000000000..e457063980
--- /dev/null
+++ b/source/libs/sync/inc/raft_replication.h
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ + +#ifndef TD_SYNC_RAFT_REPLICATION_H +#define TD_SYNC_RAFT_REPLICATION_H + +#include "sync.h" +#include "syncInt.h" +#include "sync_type.h" + +int syncRaftReplicate(SSyncRaft* pRaft, int i); + +#endif /* TD_SYNC_RAFT_REPLICATION_H */ diff --git a/source/libs/sync/inc/raft_unstable_log.h b/source/libs/sync/inc/raft_unstable_log.h new file mode 100644 index 0000000000..0748a425a1 --- /dev/null +++ b/source/libs/sync/inc/raft_unstable_log.h @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_UNSTABLE_LOG_H +#define TD_SYNC_RAFT_UNSTABLE_LOG_H + +#include "sync_type.h" + +/* in-memory unstable raft log storage */ +struct SSyncRaftUnstableLog { +#if 0 + /* Circular buffer of log entries */ + RaftEntry *entries; + + /* size of Circular buffer */ + int size; + + /* Indexes of used slots [front, back) */ + int front, back; + + /* Index of first entry is offset + 1 */ + SyncIndex offset; + + /* meta data of snapshot */ + SSyncRaftUnstableLog snapshot; +#endif +}; + +/** + * return index of last in memory log, return 0 if log is empty + **/ +//SyncIndex syncRaftLogLastIndex(SSyncRaftUnstableLog* pLog); + +#if 0 +void raftLogInit(RaftLog* pLog); + +void raftLogClose(RaftLog* pLog); + +/** + * When populating log entries loaded from disk at startup, + * init raft memory log with snapshot index, term and log start index.
+ **/ +/* +void raftLogStart(RaftLog* pLog, + RaftSnapshotMeta snapshot, + SyncIndex startIndex); +*/ +/** + * Get the number of entries in the log. + **/ +int raftLogNumEntries(const RaftLog* pLog); + + + +/** + * return last term of in memory log, return 0 if log is empty + **/ +SyncTerm raftLogLastTerm(RaftLog* pLog); + +/** + * return term of log with the given index, return 0 if the term of index cannot be found + * , errCode will save the error code. + **/ +SyncTerm raftLogTermOf(RaftLog* pLog, SyncIndex index, RaftCode* errCode); + +/** + * Get the last index of the most recent snapshot. Return 0 if there are no + * snapshots. + **/ +SyncIndex raftLogSnapshotIndex(RaftLog* pLog); + +/* Append a new entry to the log. */ +int raftLogAppend(RaftLog* pLog, + SyncTerm term, + const SSyncBuffer *buf); + +/** + * acquire log from given index onwards. + **/ +/* +int raftLogAcquire(RaftLog* pLog, + SyncIndex index, + RaftEntry **ppEntries, + int *n); + +void raftLogRelease(RaftLog* pLog, + SyncIndex index, + RaftEntry *pEntries, + int n); +*/ +/* Delete all entries from the given index (included) onwards. */ +void raftLogTruncate(RaftLog* pLog, SyncIndex index); + +/** + * when taking a new snapshot, the function will update the last snapshot information and delete + * all entries up to last_index - trailing (included). If the log contains no entry + * at last_index - trailing, then no entry will be deleted. + **/ +void raftLogSnapshot(RaftLog* pLog, SyncIndex index, SyncIndex trailing); + +#endif + +#endif /* TD_SYNC_RAFT_UNSTABLE_LOG_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h new file mode 100644 index 0000000000..f99fb066ae --- /dev/null +++ b/source/libs/sync/inc/syncInt.h @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc.
+ * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_INT_H +#define _TD_LIBS_SYNC_INT_H + +#include "thash.h" +#include "os.h" +#include "sync.h" +#include "sync_type.h" +#include "raft.h" +#include "tlog.h" + +#define TAOS_SYNC_MAX_WORKER 3 + +typedef struct SSyncWorker { + pthread_t thread; +} SSyncWorker; + +struct SSyncNode { + pthread_mutex_t mutex; + int32_t refCount; + SyncGroupId vgId; + SSyncRaft raft; + void* syncTimer; +}; + +typedef struct SSyncManager { + pthread_mutex_t mutex; + + // sync server rpc + void* serverRpc; + // rpc server hash table keyed by FQDN:port + SHashObj* rpcServerTable; + + // sync client rpc + void* clientRpc; + + // worker threads + SSyncWorker worker[TAOS_SYNC_MAX_WORKER]; + + // vgroup hash table + SHashObj* vgroupTable; + + // timer manager + void* syncTimerManager; + +} SSyncManager; + +extern SSyncManager* gSyncManager; + +#define syncFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYNC FATAL ", 255, __VA_ARGS__); }} while(0) +#define syncError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYNC ERROR ", 255, __VA_ARGS__); }} while(0) +#define syncWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYNC WARN ", 255, __VA_ARGS__); }} while(0) +#define syncInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYNC ", 255, __VA_ARGS__); }} while(0) +#define syncDebug(...)
do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYNC ", sDebugFlag, __VA_ARGS__); }} while(0) +#define syncTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYNC ", sDebugFlag, __VA_ARGS__); }} while(0) + +#endif /* _TD_LIBS_SYNC_INT_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_impl.h b/source/libs/sync/inc/sync_raft_impl.h new file mode 100644 index 0000000000..26af06866b --- /dev/null +++ b/source/libs/sync/inc/sync_raft_impl.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see .
+ */ + +#ifndef _TD_LIBS_SYNC_RAFT_IMPL_H +#define _TD_LIBS_SYNC_RAFT_IMPL_H + +#include "sync.h" +#include "sync_type.h" +#include "raft_message.h" +#include "sync_raft_quorum.h" + +void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId); +void syncRaftBecomePreCandidate(SSyncRaft* pRaft); +void syncRaftBecomeCandidate(SSyncRaft* pRaft); +void syncRaftBecomeLeader(SSyncRaft* pRaft); + +void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType); + +void syncRaftTriggerHeartbeat(SSyncRaft* pRaft); + +void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft); +bool syncRaftIsPromotable(SSyncRaft* pRaft); +bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft); +int syncRaftQuorum(SSyncRaft* pRaft); + +SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, + bool preVote, bool accept, + int* rejectNum, int *granted); + +#endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */ diff --git a/source/libs/sync/inc/sync_raft_inflights.h b/source/libs/sync/inc/sync_raft_inflights.h new file mode 100644 index 0000000000..6d249c9274 --- /dev/null +++ b/source/libs/sync/inc/sync_raft_inflights.h @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_INFLIGHTS_H +#define TD_SYNC_RAFT_INFLIGHTS_H + +#include "sync.h" + +/** + * SSyncRaftInflights limits the number of MsgApp (represented by the largest index + * contained within) sent to followers but not yet acknowledged by them.
Callers + * use syncRaftInflightFull() to check whether more messages can be sent, + * call syncRaftInflightAdd() whenever they are sending a new append, + * and release "quota" via syncRaftInflightFreeLE() whenever an ack is received. +**/ +typedef struct SSyncRaftInflights { + /* the starting index in the buffer */ + int start; + + /* number of inflights in the buffer */ + int count; + + /* the size of the buffer */ + int size; + + /** + * buffer contains the index of the last entry + * inside one message. + **/ + SyncIndex* buffer; +} SSyncRaftInflights; + +SSyncRaftInflights* syncRaftOpenInflights(int size); +void syncRaftCloseInflights(SSyncRaftInflights*); + +static FORCE_INLINE void syncRaftInflightReset(SSyncRaftInflights* inflights) { + inflights->count = 0; + inflights->start = 0; +} + +static FORCE_INLINE bool syncRaftInflightFull(SSyncRaftInflights* inflights) { + return inflights->count == inflights->size; +} + +/** + * syncRaftInflightAdd notifies the Inflights that a new message with the given index is being + * dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd() + * to verify that there is room for one more message, + * and consecutive calls to syncRaftInflightAdd() must provide a + * monotonic sequence of indexes. + **/ +void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex); + +/** + * syncRaftInflightFreeLE frees the inflights smaller or equal to the given `toIndex`. + **/ +void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex); + +/** + * syncRaftInflightFreeFirstOne releases the first inflight. + * This is a no-op if nothing is inflight.
+ **/ +void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights); + +#endif /* TD_SYNC_RAFT_INFLIGHTS_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_progress.h b/source/libs/sync/inc/sync_raft_progress.h new file mode 100644 index 0000000000..fff0c13e31 --- /dev/null +++ b/source/libs/sync/inc/sync_raft_progress.h @@ -0,0 +1,237 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_PROGRESS_H +#define TD_SYNC_RAFT_PROGRESS_H + +#include "sync_type.h" +#include "sync_raft_inflights.h" + +/** + * State defines how the leader should interact with the follower. + * + * When in PROGRESS_STATE_PROBE, leader sends at most one replication message + * per heartbeat interval. It also probes actual progress of the follower. + * + * When in PROGRESS_STATE_REPLICATE, leader optimistically increases next + * to the latest entry sent after sending replication message. This is + * an optimized state for fast replicating log entries to the follower. + * + * When in PROGRESS_STATE_SNAPSHOT, leader should have sent out snapshot + * before and stops sending any replication message. + * + * PROGRESS_STATE_PROBE is the initial state. + **/ +typedef enum RaftProgressState { + /** + * StateProbe indicates a follower whose last index isn't known. Such a + * follower is "probed" (i.e. an append sent periodically) to narrow down + * its last index.
In the ideal (and common) case, only one round of probing + * is necessary as the follower will react with a hint. Followers that are + * probed over extended periods of time are often offline. + **/ + PROGRESS_STATE_PROBE = 0, + + /** + * StateReplicate is the steady state in which a follower eagerly receives + * log entries to append to its log. + **/ + PROGRESS_STATE_REPLICATE, + + /** + * StateSnapshot indicates a follower that needs log entries not available + * from the leader's Raft log. Such a follower needs a full snapshot to + * return to StateReplicate. + **/ + PROGRESS_STATE_SNAPSHOT, +} RaftProgressState; + +/** + * Progress represents a follower’s progress in the view of the leader. Leader maintains + * progresses of all followers, and sends entries to the follower based on its progress. + **/ +struct SSyncRaftProgress { + SyncNodeId id; + + SyncIndex nextIndex; + + SyncIndex matchIndex; + + /** + * State defines how the leader should interact with the follower. + * + * When in StateProbe, leader sends at most one replication message + * per heartbeat interval. It also probes actual progress of the follower. + * + * When in StateReplicate, leader optimistically increases next + * to the latest entry sent after sending replication message. This is + * an optimized state for fast replicating log entries to the follower. + * + * When in StateSnapshot, leader should have sent out snapshot + * before and stops sending any replication message. + **/ + RaftProgressState state; + + /** + * pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT. + * If there is a pending snapshot, the pendingSnapshotIndex will be set to the + * index of the snapshot. If pendingSnapshotIndex is set, the replication process of + * this Progress will be paused. raft will not resend snapshot until the pending one + * is reported to be failed. + **/ + SyncIndex pendingSnapshotIndex; + + /** + * recentActive is true if the progress is recently active.
Receiving any messages + * from the corresponding follower indicates the progress is active. + * recentActive can be reset to false after an election timeout. + **/ + bool recentActive; + + /** + * probeSent is used while this follower is in StateProbe. When probeSent is + * true, raft should pause sending replication message to this peer until + * probeSent is reset. See ProbeAcked() and IsPaused(). + **/ + bool probeSent; + + /** + * inflights is a sliding window for the inflight messages. + * Each inflight message contains one or more log entries. + * The max number of entries per message is defined in raft config as MaxSizePerMsg. + * Thus inflight effectively limits both the number of inflight messages + * and the bandwidth each Progress can use. + * When inflights is Full, no more message should be sent. + * When a leader sends out a message, the index of the last + * entry should be added to inflights. The index MUST be added + * into inflights in order. + * When a leader receives a reply, the previous inflights should + * be freed by calling syncRaftInflightFreeLE() with the index of the last + * received entry. + **/ + SSyncRaftInflights* inflights; + + /** + * isLearner is true if this progress is tracked for a learner. + **/ + bool isLearner; +}; + +void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress); + +/** + * syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or, + * optionally and if larger, the index of the pending snapshot. + **/ +void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress); + +/** + * syncRaftProgressBecomeReplicate transitions into StateReplicate, resetting Next to Match+1. + **/ +void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress); + +/** + * syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the + * index acked by it. The method returns false if the given index comes from + * an outdated message.
Otherwise it updates the progress and returns true. + **/ +bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex); + +/** + * syncRaftProgressOptimisticNextIndex signals that appends all the way up to and including the index + * passed as `nextIndex` are in-flight. As a result, progress->nextIndex is set to that index plus one. + **/ +static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress* progress, SyncIndex nextIndex) { + progress->nextIndex = nextIndex + 1; +} + +/** + * syncRaftProgressMaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The + * arguments are the index of the append message rejected by the follower, and + * the hint that we want to decrease to. + * + * Rejections can happen spuriously as messages are sent out of order or + * duplicated. In such cases, the rejection pertains to an index that the + * Progress already knows were previously acknowledged, and false is returned + * without changing the Progress. + * + * If the rejection is genuine, Next is lowered sensibly, and the Progress is + * cleared for sending log entries. +**/ +bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress, + SyncIndex rejected, SyncIndex matchHint); + +/** + * syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled. + * This is done when a node has rejected recent MsgApps, is currently waiting + * for a snapshot, or has reached the MaxInflightMsgs limit. In normal + * operation, this is false. A throttled node will be contacted less frequently + * until it has reached a state in which it's able to accept a steady stream of + * log entries again.
+ **/
+bool syncRaftProgressIsPaused(SSyncRaftProgress* progress);
+
+/**
+ * syncRaftProgressNextIndex returns progress->nextIndex.
+ **/
+static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progress) {
+  return progress->nextIndex;
+}
+
+/**
+ * syncRaftProgressInReplicate returns true if the progress is in PROGRESS_STATE_REPLICATE.
+ * The result is a truth value, so the return type is bool (not RaftProgressState).
+ **/
+static FORCE_INLINE bool syncRaftProgressInReplicate(SSyncRaftProgress* progress) {
+  return progress->state == PROGRESS_STATE_REPLICATE;
+}
+
+/**
+ * syncRaftProgressInSnapshot returns true if the progress is in PROGRESS_STATE_SNAPSHOT.
+ **/
+static FORCE_INLINE bool syncRaftProgressInSnapshot(SSyncRaftProgress* progress) {
+  return progress->state == PROGRESS_STATE_SNAPSHOT;
+}
+
+/**
+ * syncRaftProgressInProbe returns true if the progress is in PROGRESS_STATE_PROBE.
+ **/
+static FORCE_INLINE bool syncRaftProgressInProbe(SSyncRaftProgress* progress) {
+  return progress->state == PROGRESS_STATE_PROBE;
+}
+
+/**
+ * syncRaftProgressRecentActive returns progress->recentActive.
+ **/
+static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progress) {
+  return progress->recentActive;
+}
+
+/**
+ * return true if progress's log is up-to-date
+ **/
+bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress);
+
+void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex);
+
+
+
+#if 0
+
+void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i);
+
+
+
+SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i);
+
+void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i);
+
+void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i);
+
+bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i);
+
+void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i);
+
+
+
+void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i);
+
+#endif
+
+#endif /* TD_SYNC_RAFT_PROGRESS_H */
\ No newline at end of file
diff --git a/source/libs/sync/inc/sync_raft_progress_tracker.h b/source/libs/sync/inc/sync_raft_progress_tracker.h
new file mode 100644
index 0000000000..887aeb2377
--- /dev/null
+++ b/source/libs/sync/inc/sync_raft_progress_tracker.h
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H
+#define _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H
+
+#include "sync_type.h"
+#include "sync_raft_quorum_joint.h"
+#include "sync_raft_progress.h"
+
+/**
+ * SSyncRaftProgressTrackerConfig describes the membership tracked by a progress tracker:
+ * the (possibly joint) voter configuration plus the learner sets.
+ **/
+struct SSyncRaftProgressTrackerConfig {
+  SSyncRaftQuorumJointConfig voters;
+
+  /**
+   * autoLeave is true if the configuration is joint and a transition to the
+   * incoming configuration should be carried out automatically by Raft when
+   * this is possible. If false, the configuration will be joint until the
+   * application initiates the transition manually.
+   **/
+  bool autoLeave;
+
+  /**
+   * learners is a set of IDs corresponding to the learners active in the
+   * current configuration.
+   *
+   * Invariant: learners and voters do not intersect, i.e. if a peer is in
+   * either half of the joint config, it can't be a learner; if it is a
+   * learner it can't be in either half of the joint config. This invariant
+   * simplifies the implementation since it allows peers to have clarity about
+   * its current role without taking into account joint consensus.
+   **/
+  SyncNodeId learners[TSDB_MAX_REPLICA];
+
+  /**
+   * When we turn a voter into a learner during a joint consensus transition,
+   * we cannot add the learner directly when entering the joint state. This is
+   * because this would violate the invariant that the intersection of
+   * voters and learners is empty. For example, assume a Voter is removed and
+   * immediately re-added as a learner (or in other words, it is demoted):
+   *
+   * Initially, the configuration will be
+   *
+   *   voters:       {1 2 3}
+   *   learners:     {}
+   *
+   * and we want to demote 3. Entering the joint configuration, we naively get
+   *
+   *   voters:       {1 2} & {1 2 3}
+   *   learners:     {3}
+   *
+   * but this violates the invariant (3 is both voter and learner). Instead,
+   * we get
+   *
+   *   voters:       {1 2} & {1 2 3}
+   *   learners:     {}
+   *   learnersNext: {3}
+   *
+   * Where 3 is now still purely a voter, but we are remembering the intention
+   * to make it a learner upon transitioning into the final configuration:
+   *
+   *   voters:       {1 2}
+   *   learners:     {3}
+   *   learnersNext: {}
+   *
+   * Note that learnersNext is not used while adding a learner that is not
+   * also a voter in the joint config. In this case, the learner is added
+   * right away when entering the joint configuration, so that it is caught up
+   * as soon as possible.
+   **/
+  SyncNodeId learnersNext[TSDB_MAX_REPLICA];
+};
+
+/**
+ * SSyncRaftProgressTracker bundles the active configuration with one progress slot and
+ * one recorded vote per possible replica (arrays of TSDB_MAX_REPLICA entries).
+ **/
+struct SSyncRaftProgressTracker {
+  SSyncRaftProgressTrackerConfig config;
+
+  SSyncRaftProgress progressMap[TSDB_MAX_REPLICA];
+
+  SyncRaftVoteResult votes[TSDB_MAX_REPLICA];
+  int maxInflight;
+};
+
+/**
+ * syncRaftOpenProgressTracker returns a new tracker; callers in this patch treat a
+ * NULL result as failure.
+ **/
+SSyncRaftProgressTracker* syncRaftOpenProgressTracker();
+
+/**
+ * syncRaftResetVotes - NOTE(review): presumably clears every entry of `votes`
+ * before a new election round; confirm against the implementation.
+ **/
+void syncRaftResetVotes(SSyncRaftProgressTracker*);
+
+/**
+ * visitProgressFp is the callback type accepted by syncRaftProgressVisit: it receives
+ * an int `i`, the progress entry and the caller-supplied `arg`.
+ * NOTE(review): presumably `i` is the entry's index in progressMap - confirm.
+ **/
+typedef void (*visitProgressFp)(int i, SSyncRaftProgress* progress, void* arg);
+void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, void* arg);
+
+/**
+ * syncRaftRecordVote records that the node identified by `i` voted for this Raft
+ * instance if grant == true (and declined it otherwise).
+ **/
+void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant);
+
+/**
+ * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
+ * election outcome is known.
+ **/
+// NOTE(review): the declared return type is SyncRaftVoteResult (the per-voter
+// unknown/grant/reject enum from sync_type.h), while the comment above describes an
+// election outcome, which matches SSyncRaftVoteResult (pending/lost/won) from
+// sync_raft_quorum.h. Confirm which enum the implementation really returns.
+SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted);
+
+#endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
diff --git a/source/libs/sync/inc/sync_raft_quorum.h b/source/libs/sync/inc/sync_raft_quorum.h
new file mode 100644
index 0000000000..42f65c9806
--- /dev/null
+++ b/source/libs/sync/inc/sync_raft_quorum.h
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef TD_SYNC_RAFT_QUORUM_H
+#define TD_SYNC_RAFT_QUORUM_H
+
+/**
+ * SSyncRaftVoteResult indicates the outcome of a vote.
+ *
+ * NOTE(review): sync_type.h declares a different enum whose name differs by a single
+ * letter, SyncRaftVoteResult (UNKNOWN/GRANT/REJECT responses of one voter). The two
+ * are easy to mix up; consider renaming one of them.
+ **/
+typedef enum {
+  /**
+   * SYNC_RAFT_VOTE_PENDING indicates that the decision of the vote depends on future
+   * votes, i.e. neither "yes" nor "no" has reached quorum yet.
+   **/
+  SYNC_RAFT_VOTE_PENDING = 1,
+
+  /**
+   * SYNC_RAFT_VOTE_LOST indicates that the quorum has voted "no".
+   **/
+  SYNC_RAFT_VOTE_LOST = 2,
+
+  /**
+   * SYNC_RAFT_VOTE_WON indicates that the quorum has voted "yes".
+   **/
+  SYNC_RAFT_VOTE_WON = 3,
+} SSyncRaftVoteResult;
+
+#endif /* TD_SYNC_RAFT_QUORUM_H */
\ No newline at end of file
diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h
new file mode 100644
index 0000000000..14c1f63754
--- /dev/null
+++ b/source/libs/sync/inc/sync_raft_quorum_joint.h
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H
+#define _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H
+
+#include "taosdef.h"
+#include "sync.h"
+#include "sync_type.h"
+
+/**
+ * SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping)
+ * majority configurations. Decisions require the support of both majorities.
+ * The two groups are stored as majorityConfig[0] and majorityConfig[1].
+ **/
+typedef struct SSyncRaftQuorumJointConfig {
+  SSyncCluster majorityConfig[2];
+}SSyncRaftQuorumJointConfig;
+
+/**
+ * syncRaftVoteResult takes the recorded votes of the voters (an array of
+ * SyncRaftVoteResult entries) and returns a result indicating whether the vote is
+ * pending, lost, or won. A joint quorum requires both majority quorums to vote in favor.
+ *
+ * NOTE(review): pending/lost/won are the values of SSyncRaftVoteResult
+ * (sync_raft_quorum.h), yet the declared return type is SyncRaftVoteResult
+ * (unknown/grant/reject) - confirm which enum is intended.
+ **/
+SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes);
+
+#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */
diff --git a/source/libs/sync/inc/sync_raft_quorum_majority.h b/source/libs/sync/inc/sync_raft_quorum_majority.h
new file mode 100644
index 0000000000..b1857ef056
--- /dev/null
+++ b/source/libs/sync/inc/sync_raft_quorum_majority.h
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H
+#define _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H
+
+#include "sync.h"
+#include "sync_type.h"
+
+/**
+ * syncRaftMajorityVoteResult takes the recorded votes of the voters (an array of
+ * SyncRaftVoteResult entries) and returns a result indicating whether the vote is
+ * pending (i.e. neither a quorum of yes/no has been reached), won (a quorum of yes
+ * has been reached), or lost (a quorum of no has been reached).
+ *
+ * NOTE(review): pending/won/lost are the values of SSyncRaftVoteResult
+ * (sync_raft_quorum.h), yet the declared return type is SyncRaftVoteResult
+ * (unknown/grant/reject) - confirm which enum is intended.
+ **/
+SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes);
+
+#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */
diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h
new file mode 100644
index 0000000000..525623b4cf
--- /dev/null
+++ b/source/libs/sync/inc/sync_type.h
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _TD_LIBS_SYNC_TYPE_H
+#define _TD_LIBS_SYNC_TYPE_H
+
+#include <stdint.h>
+#include "osMath.h"
+
+// sentinel node id meaning "no node" (parenthesized so it is safe inside expressions)
+#define SYNC_NON_NODE_ID (-1)
+// sentinel term meaning "no term"
+#define SYNC_NON_TERM 0
+
+typedef int32_t SyncTime;
+typedef uint32_t SyncTick;
+
+// forward declarations of the raft types shared between the sync headers
+typedef struct SSyncRaft SSyncRaft;
+
+typedef struct SSyncRaftProgress SSyncRaftProgress;
+typedef struct SSyncRaftProgressTrackerConfig SSyncRaftProgressTrackerConfig;
+
+typedef struct SSyncRaftProgressTracker SSyncRaftProgressTracker;
+
+typedef struct SSyncRaftLog SSyncRaftLog;
+
+typedef struct SSyncRaftEntry SSyncRaftEntry;
+
+// disabled: MIN/MAX are expected to come from "osMath.h" included above
+#if 0
+#ifndef MIN
+#define MIN(x, y) (((x) < (y)) ? (x) : (y))
+#endif
+
+#ifndef MAX
+#define MAX(x, y) (((x) > (y)) ? (x) : (y))
+#endif
+#endif
+
+// kind of campaign a vote request belongs to
+typedef enum {
+  SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0,
+  SYNC_RAFT_CAMPAIGN_ELECTION = 1,
+  SYNC_RAFT_CAMPAIGN_TRANSFER = 2,
+} SyncRaftElectionType;
+
+// response of a single voter to a vote request
+typedef enum {
+  // the init vote resp status
+  SYNC_RAFT_VOTE_RESP_UNKNOWN = 0,
+
+  // grant the vote request
+  SYNC_RAFT_VOTE_RESP_GRANT = 1,
+
+  // reject the vote request
+  SYNC_RAFT_VOTE_RESP_REJECT = 2,
+} SyncRaftVoteResult;
+
+#endif /* _TD_LIBS_SYNC_TYPE_H */
diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c
new file mode 100644
index 0000000000..20d24e3267
--- /dev/null
+++ b/source/libs/sync/src/raft.c
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "raft.h"
+#include "raft_configuration.h"
+#include "raft_log.h"
+#include "raft_replication.h"
+#include "sync_raft_progress_tracker.h"
+#include "syncInt.h"
+
+#define RAFT_READ_LOG_MAX_NUM 100
+
+static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+
+/**
+ * syncRaftStart initializes pRaft from pInfo and replays committed log entries into the FSM.
+ *
+ * pRaft is zeroed, the fsm/logStore/stateManager callbacks are copied from pInfo, the
+ * progress tracker and the raft log are opened, and every entry in the range
+ * (pInfo->snapshotIndex, serverState.commitIndex] is read from the log store in batches of
+ * at most RAFT_READ_LOG_MAX_NUM and handed to fsm->applyLog. Finally the node becomes a
+ * follower.
+ *
+ * Returns 0 on success and -1 if any step fails.
+ *
+ * NOTE(review): the tracker/log opened before a failing step are not released on the -1
+ * paths; no close routine is visible from here - confirm and add cleanup.
+ * NOTE(review): the memset clears every field of pRaft the caller may have set beforehand
+ * (e.g. pNode, cluster, term) - confirm nothing is expected to survive it.
+ **/
+int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
+  SSyncServerState serverState;
+  SStateManager* stateManager;
+  SSyncLogStore* logStore;
+  SSyncFSM* fsm;
+  SyncIndex initIndex = pInfo->snapshotIndex;
+  SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM];
+  int nBuf, limit, i;
+
+  memset(pRaft, 0, sizeof(SSyncRaft));
+
+  memcpy(&pRaft->fsm, &pInfo->fsm, sizeof(SSyncFSM));
+  memcpy(&pRaft->logStore, &pInfo->logStore, sizeof(SSyncLogStore));
+  memcpy(&pRaft->stateManager, &pInfo->stateManager, sizeof(SStateManager));
+
+  stateManager = &(pRaft->stateManager);
+  logStore = &(pRaft->logStore);
+  fsm = &(pRaft->fsm);
+
+  // init progress tracker
+  pRaft->tracker = syncRaftOpenProgressTracker();
+  if (pRaft->tracker == NULL) {
+    return -1;
+  }
+
+  // open raft log
+  if ((pRaft->log = syncRaftLogOpen()) == NULL) {
+    return -1;
+  }
+  // read server state
+  if (stateManager->readServerState(stateManager, &serverState) != 0) {
+    syncError("readServerState for vgid %d fail", pInfo->vgId);
+    return -1;
+  }
+  assert(initIndex <= serverState.commitIndex);
+
+  // restore fsm state from snapshot index + 1 until commitIndex
+  ++initIndex;
+  while (initIndex <= serverState.commitIndex) {
+    limit = MIN(RAFT_READ_LOG_MAX_NUM, serverState.commitIndex - initIndex + 1);
+
+    if (logStore->logRead(logStore, initIndex, limit, buffer, &nBuf) != 0) {
+      return -1;
+    }
+    assert(limit == nBuf);
+    // with asserts compiled out, never touch more buffers than were actually returned
+    // and never spin forever on an empty read
+    if (nBuf <= 0 || nBuf > limit) {
+      return -1;
+    }
+
+    for (i = 0; i < nBuf; ++i) {
+      fsm->applyLog(fsm, initIndex + i, &(buffer[i]), NULL);
+      free(buffer[i].data);
+    }
+    initIndex += nBuf;
+  }
+  // the loop stops one past the last applied entry
+  assert(initIndex == serverState.commitIndex + 1);
+
+  //pRaft->heartbeatTimeoutTick = 1;
+
+  syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
+
+  pRaft->selfIndex = pRaft->cluster.selfIndex;
+
+  syncInfo("[%d:%d] restore vgid %d state: snapshot index success",
+    pRaft->selfGroupId, pRaft->selfId, pInfo->vgId);
+  return 0;
+}
+
+int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  syncDebug("from %d, type:%d, term:%" PRId64 ", state:%d",
+    pMsg->from, pMsg->msgType, pMsg->term, pRaft->state);
+
+  if (preHandleMessage(pRaft, pMsg)) {
+    syncFreeMessage(pMsg);
+    return 0;
+  }
+
+  RaftMessageType msgType = pMsg->msgType;
+  if (msgType == RAFT_MSG_INTERNAL_ELECTION) {
+    syncRaftHandleElectionMessage(pRaft, pMsg);
+  } else if (msgType == RAFT_MSG_VOTE) {
+    syncRaftHandleVoteMessage(pRaft, pMsg);
+  } else {
+    pRaft->stepFp(pRaft, pMsg);
+  }
+
+  syncFreeMessage(pMsg);
+  return 0;
+}
+
+int32_t syncRaftTick(SSyncRaft* pRaft) {
+  pRaft->currentTick += 1;
+  return 0;
+}
+
+/**
+ * pre-handle message, return true means no need to continue
+ * Handle the message term, which may result in our stepping down to a follower.
+ **/
+static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  // local message?
+  if (pMsg->term == 0) {
+    return false;
+  }
+
+  // dispatch on how the message term compares with ours; equal terms need no pre-handling
+  if (pMsg->term > pRaft->term) {
+    return preHandleNewTermMessage(pRaft, pMsg);
+  } else if (pMsg->term < pRaft->term) {
+    return preHandleOldTermMessage(pRaft, pMsg);
+  }
+
+  return false;
+}
+
+/**
+ * preHandleNewTermMessage handles a message whose term is higher than ours.
+ * Unless the message is a pre-vote request or a granted pre-vote response, we become a
+ * follower at the message's term. Always returns false, so the caller keeps processing
+ * the message.
+ **/
+static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  SyncNodeId leaderId = pMsg->from;
+  RaftMessageType msgType = pMsg->msgType;
+
+  // a vote request does not come from a leader, so do not record its sender as leader
+  if (msgType == RAFT_MSG_VOTE) {
+    // TODO
+    leaderId = SYNC_NON_NODE_ID;
+  }
+
+  if (syncIsPreVoteMsg(pMsg)) {
+    // Never change our term in response to a PreVote
+  } else if (syncIsPreVoteRespMsg(pMsg) && !pMsg->voteResp.rejected) {
+    /**
+     * We send pre-vote requests with a term in our future. If the
+     * pre-vote is granted, we will increment our term when we get a
+     * quorum. If it is not, the term comes from the node that
+     * rejected our vote so we should become a follower at the new
+     * term.
+     **/
+  } else {
+    syncInfo("[%d:%d] [term:%" PRId64 "] received a %d message with higher term from %d [term:%" PRId64 "]",
+      pRaft->selfGroupId, pRaft->selfId, pRaft->term, msgType, pMsg->from, pMsg->term);
+    syncRaftBecomeFollower(pRaft, pMsg->term, leaderId);
+  }
+
+  return false;
+}
+
+/**
+ * preHandleOldTermMessage handles a message whose term is lower than ours.
+ * With checkQuorum enabled, an append message from a known peer is answered with an
+ * empty append response carrying our term; every other message is only logged.
+ * Always returns true, so the caller drops the message.
+ **/
+static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  if (pRaft->checkQuorum && pMsg->msgType == RAFT_MSG_APPEND) {
+    /**
+     * We have received messages from a leader at a lower term. It is possible
+     * that these messages were simply delayed in the network, but this could
+     * also mean that this node has advanced its term number during a network
+     * partition, and it is now unable to either win an election or to rejoin
+     * the majority on the old term. If checkQuorum is false, this will be
+     * handled by incrementing term numbers in response to MsgVote with a
+     * higher term, but if checkQuorum is true we may not advance the term on
+     * MsgVote and must generate other messages to advance the term. The net
+     * result of these two features is to minimize the disruption caused by
+     * nodes that have been removed from the cluster's configuration: a
+     * removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
+     * but it will not receive MsgApp or MsgHeartbeat, so it will not create
+     * disruptive term increases
+     **/
+    int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
+    if (peerIndex < 0) {
+      return true;
+    }
+    SSyncMessage* msg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+    if (msg == NULL) {
+      return true;
+    }
+
+    pRaft->io.send(msg, &(pRaft->cluster.nodeInfo[peerIndex]));
+  } else {
+    // ignore other cases
+    syncInfo("[%d:%d] [term:%" PRId64 "] ignored a %d message with lower term from %d [term:%" PRId64 "]",
+      pRaft->selfGroupId, pRaft->selfId, pRaft->term, pMsg->msgType, pMsg->from, pMsg->term);
+  }
+
+  return true;
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/raft_configuration.c b/source/libs/sync/src/raft_configuration.c
new file mode 100644
index 0000000000..e16cb34989
--- /dev/null
+++ b/source/libs/sync/src/raft_configuration.c
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "raft_configuration.h"
+#include "raft.h"
+
+/**
+ * syncRaftConfigurationIndexOfNode returns the index used for node `id`.
+ * NOTE(review): this currently just casts the id; pRaft is not consulted, so an
+ * unknown id is not reported as negative unless the id itself is negative.
+ **/
+int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id) {
+  return (int)(id);
+}
+
+/**
+ * syncRaftConfigurationVoterCount returns pRaft->cluster.replica.
+ **/
+int syncRaftConfigurationVoterCount(SSyncRaft *pRaft) {
+  return pRaft->cluster.replica;
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/raft_election.c b/source/libs/sync/src/raft_election.c
new file mode 100644
index 0000000000..1ca3326810
--- /dev/null
+++ b/source/libs/sync/src/raft_election.c
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "syncInt.h"
+#include "raft.h"
+#include "raft_log.h"
+#include "raft_message.h"
+
+/**
+ * syncRaftStartElection starts a campaign of the given type.
+ *
+ * The node becomes pre-candidate (pre-election) or candidate (any other type), records
+ * its own vote through syncRaftPollVote and, unless that single vote already wins,
+ * sends a vote request to every other node of the cluster. A won pre-election
+ * immediately starts the real election; a won election makes this node leader.
+ *
+ * Nothing is done when the node is not promotable.
+ **/
+void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) {
+  SyncTerm term;
+  bool preVote;
+  // both campaign kinds travel as RAFT_MSG_VOTE; the request carries cType to tell them
+  // apart, so this value is valid on the pre-vote path as well (it is only logged)
+  RaftMessageType voteMsgType = RAFT_MSG_VOTE;
+
+  // callers are expected to check this first (see syncRaftHandleElectionMessage)
+  if (!syncRaftIsPromotable(pRaft)) {
+    syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId);
+    return;
+  }
+
+  if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
+    syncRaftBecomePreCandidate(pRaft);
+    preVote = true;
+    // PreVote RPCs are sent for the next term before we've incremented r.Term.
+    term = pRaft->term + 1;
+  } else {
+    syncRaftBecomeCandidate(pRaft);
+    term = pRaft->term;
+    preVote = false;
+  }
+
+  SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL);
+  if (result == SYNC_RAFT_VOTE_WON) {
+    /**
+     * We won the election after voting for ourselves (which must mean that
+     * this is a single-node cluster). Advance to the next state.
+     **/
+    if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) {
+      syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
+    } else {
+      syncRaftBecomeLeader(pRaft);
+    }
+    return;
+  }
+
+  // broadcast vote message to other peers
+  int i;
+  SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
+  SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log);
+  for (i = 0; i < pRaft->cluster.replica; ++i) {
+    if (i == pRaft->cluster.selfIndex) {
+      continue;
+    }
+
+    SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
+
+    SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId,
+                                        term, cType, lastIndex, lastTerm);
+    // best effort: a peer whose request cannot be allocated is skipped
+    if (pMsg == NULL) {
+      continue;
+    }
+
+    syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent %d request to %d at term %" PRId64 "",
+      pRaft->selfGroupId, pRaft->selfId, lastTerm,
+      lastIndex, voteMsgType, nodeId, pRaft->term);
+
+    pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i]));
+  }
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/raft_handle_append_entries_message.c b/source/libs/sync/src/raft_handle_append_entries_message.c
new file mode 100644
index 0000000000..8c014a56bc
--- /dev/null
+++ b/source/libs/sync/src/raft_handle_append_entries_message.c
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "syncInt.h"
+#include "raft.h"
+#include "raft_log.h"
+#include "raft_configuration.h"
+#include "raft_message.h"
+
+/**
+ * syncRaftHandleAppendEntriesMessage answers an append-entries message.
+ *
+ * A response is sent to the sender whenever the sender maps to a non-negative index and
+ * the response message can be allocated. If the entry at the request's index is already
+ * committed, the response carries our commit index; otherwise the request is only logged.
+ * Always returns 0.
+ *
+ * NOTE(review): the response payload is accessed through a RaftMsg_Append_Entries
+ * pointer although it points at pReply->appendResp - confirm the two share a type.
+ **/
+int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  const RaftMsg_Append_Entries *request = &(pMsg->appendEntries);
+
+  const int senderIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
+  if (senderIndex < 0) {
+    return 0;
+  }
+
+  SSyncMessage* pReply = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+  if (pReply == NULL) {
+    return 0;
+  }
+
+  RaftMsg_Append_Entries *reply = &(pReply->appendResp);
+  if (syncRaftLogIsCommitted(pRaft->log, request->index)) {
+    // already committed: just report how far our commit index is
+    reply->index = pRaft->log->commitIndex;
+  } else {
+    syncInfo("[%d:%d] recv append from %d index %" PRId64"",
+      pRaft->selfGroupId, pRaft->selfId, pMsg->from, request->index);
+  }
+
+  pRaft->io.send(pReply, &(pRaft->cluster.nodeInfo[senderIndex]));
+  return 0;
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/raft_handle_election_message.c b/source/libs/sync/src/raft_handle_election_message.c
new file mode 100644
index 0000000000..6ffa24ff30
--- /dev/null
+++ b/source/libs/sync/src/raft_handle_election_message.c
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "syncInt.h"
+#include "raft.h"
+#include "raft_log.h"
+#include "raft_message.h"
+
+/**
+ * syncRaftHandleElectionMessage reacts to the internal "start an election" message.
+ *
+ * The request is ignored when this node is already leader, is not promotable, or still
+ * has pending configuration changes together with unapplied log entries. Otherwise a
+ * pre-election (when pRaft->preVote is set) or a regular election is started.
+ * Always returns 0.
+ **/
+int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  const bool alreadyLeader = (pRaft->state == TAOS_SYNC_ROLE_LEADER);
+  if (alreadyLeader) {
+    syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId);
+    return 0;
+  }
+
+  if (!syncRaftIsPromotable(pRaft)) {
+    syncDebug("[%d:%d] is unpromotable and can not campaign", pRaft->selfGroupId, pRaft->selfId);
+    return 0;
+  }
+
+  // a pending, not yet applied configuration change blocks campaigning
+  const bool hasPendingConf = syncRaftLogNumOfPendingConf(pRaft->log) > 0;
+  if (hasPendingConf && syncRaftHasUnappliedLog(pRaft->log)) {
+    syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply",
+      pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+    return 0;
+  }
+
+  syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+
+  const SyncRaftElectionType campaign =
+      pRaft->preVote ? SYNC_RAFT_CAMPAIGN_PRE_ELECTION : SYNC_RAFT_CAMPAIGN_ELECTION;
+  syncRaftStartElection(pRaft, campaign);
+
+  return 0;
+}
diff --git a/source/libs/sync/src/raft_handle_vote_message.c b/source/libs/sync/src/raft_handle_vote_message.c
new file mode 100644
index 0000000000..709e319c3e
--- /dev/null
+++ b/source/libs/sync/src/raft_handle_vote_message.c
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "syncInt.h"
+#include "raft.h"
+#include "raft_configuration.h"
+#include "raft_log.h"
+#include "raft_message.h"
+
+static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+
+/**
+ * syncRaftHandleVoteMessage answers a vote request.
+ *
+ * The decision is taken by canGrantVoteMessage and sent back to the requester as a vote
+ * response (built with the request's campaign type and the "rejected" flag). Nothing is
+ * sent when the sender maps to index -1 or the response cannot be allocated.
+ * Always returns 0.
+ **/
+int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  const int senderIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
+  if (senderIndex == -1) {
+    return 0;
+  }
+
+  // our own log position, only used for the log line below
+  const SyncIndex localLastIndex = syncRaftLogLastIndex(pRaft->log);
+  const SyncTerm localLastTerm = syncRaftLogLastTerm(pRaft->log);
+
+  const bool granted = canGrantVoteMessage(pRaft, pMsg);
+  SSyncMessage* pReply = syncNewVoteRespMsg(pRaft->selfGroupId, pRaft->selfId, pMsg->vote.cType, !granted);
+  if (pReply == NULL) {
+    return 0;
+  }
+
+  const char* verdict = granted ? "grant" : "reject";
+  syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 ", vote: %d] %s for %d"\
+    "[logterm: %" PRId64 ", index: %" PRId64 "] at term %" PRId64 "",
+    pRaft->selfGroupId, pRaft->selfId, localLastTerm, localLastIndex, pRaft->voteFor,
+    verdict,
+    pMsg->from, pMsg->vote.lastTerm, pMsg->vote.lastIndex, pRaft->term);
+
+  pRaft->io.send(pReply, &(pRaft->cluster.nodeInfo[senderIndex]));
+  return 0;
+}
+
+/**
+ * canGrantVoteMessage decides whether the vote request in pMsg may be granted.
+ *
+ * The vote must still be available to the sender (we have not voted yet, the request
+ * carries a higher term, or we already voted for this sender) and the sender's log,
+ * described by vote.lastIndex/vote.lastTerm, must pass syncRaftLogIsUptodate.
+ **/
+static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  const bool voteAvailable = pRaft->voteFor == SYNC_NON_NODE_ID
+                          || pMsg->term > pRaft->term
+                          || pRaft->voteFor == pMsg->from;
+
+  // the log check is only evaluated when the vote is available
+  return voteAvailable && syncRaftLogIsUptodate(pRaft->log, pMsg->vote.lastIndex, pMsg->vote.lastTerm);
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/raft_handle_vote_resp_message.c b/source/libs/sync/src/raft_handle_vote_resp_message.c
new file mode 100644
index 0000000000..b3a47aac7f
--- /dev/null
+++ b/source/libs/sync/src/raft_handle_vote_resp_message.c
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "syncInt.h"
+#include "raft.h"
+#include "raft_configuration.h"
+#include "raft_message.h"
+
+/**
+ * Handle a vote response while campaigning: record the vote, then either
+ * move on (pre-vote won -> real election, election won -> leader) or step
+ * down to follower when the vote is lost. Always returns 0.
+ *
+ * Responses that arrive when this node is no longer a candidate are logged
+ * and ignored instead of aborting the process.
+ **/
+int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  int granted = 0;
+  int rejected = 0;
+
+  if (pRaft->state != TAOS_SYNC_ROLE_CANDIDATE) {
+    syncError("[%d:%d] is not candidate, ignore vote resp", pRaft->selfGroupId, pRaft->selfId);
+    return 0;
+  }
+
+  int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
+  if (voterIndex == -1) {
+    syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from);
+    return 0;
+  }
+
+  // quorum is only reported in the log line; it was printed uninitialized before
+  int quorum = syncRaftQuorum(pRaft);
+
+  SSyncRaftVoteResult result = syncRaftPollVote(pRaft, pMsg->from,
+                                                pMsg->voteResp.cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION,
+                                                !pMsg->voteResp.rejected, &rejected, &granted);
+
+  syncInfo("[%d:%d] [quorum:%d] has received %d votes and %d vote rejections",
+           pRaft->selfGroupId, pRaft->selfId, quorum, granted, rejected);
+
+  if (result == SYNC_RAFT_VOTE_WON) {
+    if (pRaft->candidateState.inPreVote) {
+      // pre-vote succeeded, now run the real election
+      syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION);
+    } else {
+      syncRaftBecomeLeader(pRaft);
+    }
+  } else if (result == SYNC_RAFT_VOTE_LOST) {
+    syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
+  }
+
+  return 0;
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/raft_log.c b/source/libs/sync/src/raft_log.c
new file mode 100644
index 0000000000..0654dbea6b
--- /dev/null
+++ b/source/libs/sync/src/raft_log.c
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "raft_log.h"
+
+/*
+ * Most functions in this file are placeholders that return a fixed value;
+ * the raft log itself is not implemented yet.
+ */
+
+// placeholder: no log is created, callers get NULL
+SSyncRaftLog* syncRaftLogOpen() {
+  return NULL;
+}
+
+// placeholder: always reports index 0
+SyncIndex syncRaftLogLastIndex(SSyncRaftLog* pLog) {
+  return 0;
+}
+
+// placeholder: always reports snapshot index 0
+SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog) {
+  return 0;
+}
+
+// placeholder: always reports term 0
+SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) {
+  return 0;
+}
+
+// placeholder: every (index, term) pair is accepted as up to date
+bool syncRaftLogIsUptodate(SSyncRaftLog* pLog, SyncIndex index, SyncTerm term) {
+  return true;
+}
+
+// placeholder: reports no pending configuration entries
+int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog) {
+  return 0;
+}
+
+// true when entries have been committed but not applied yet
+bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog) {
+  return pLog->commitIndex > pLog->appliedIndex;
+}
+
+// placeholder: no term is known for any index
+SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index) {
+  return SYNC_NON_TERM;
+}
+
+// placeholder: nothing is stored; returns 0 like the other stubs so that the
+// function no longer falls off the end without a return value
+int syncRaftLogAppend(SSyncRaftLog* pLog, SSyncRaftEntry *pEntries, int n) {
+  return 0;
+}
+
+// placeholder: *ppEntries and *n are left untouched
+int syncRaftLogAcquire(SSyncRaftLog* pLog, SyncIndex index, int maxMsgSize,
+                      SSyncRaftEntry **ppEntries, int *n) {
+  return 0;
+}
+
+// placeholder: nothing to release
+void syncRaftLogRelease(SSyncRaftLog* pLog, SyncIndex index,
+                      SSyncRaftEntry *pEntries, int n) {
+  return;
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/raft_message.c b/source/libs/sync/src/raft_message.c
new file mode 100644
index 0000000000..e706127f29
--- /dev/null
+++ b/source/libs/sync/src/raft_message.c
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "raft_message.h"
+
+/**
+ * Release a message. Internal messages are left alone; every other message
+ * is passed to free().
+ **/
+void syncFreeMessage(const SSyncMessage* pMsg) {
+  if (syncIsInternalMsg(pMsg->msgType)) {
+    return;
+  }
+
+  free((SSyncMessage*)pMsg);
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c
new file mode 100644
index 0000000000..3c7216239a
--- /dev/null
+++ b/source/libs/sync/src/raft_replication.c
@@ -0,0 +1,148 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "raft.h"
+#include "raft_log.h"
+#include "sync_raft_progress.h"
+#include "raft_replication.h"
+
+static int sendSnapshot(SSyncRaft* pRaft, int i);
+static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex index, SyncTerm term);
+
+/**
+ * Replicate log entries to the node at position i.
+ *
+ * The whole body is currently compiled out with "#if 0", so the function
+ * does nothing and returns 0. The disabled code picks prevIndex/prevTerm for
+ * an AppendEntries message, or falls back to sending a snapshot when the
+ * needed entry is no longer in the log.
+ **/
+int syncRaftReplicate(SSyncRaft* pRaft, int i) {
+#if 0
+  assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
+  assert(i >= 0 && i < pRaft->leaderState.nProgress);
+
+  SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  if (syncRaftProgressIsPaused(progress)) {
+    syncInfo("node %d paused", nodeId);
+    return 0;
+  }
+
+  SyncIndex nextIndex = syncRaftProgressNextIndex(progress);
+  SyncIndex snapshotIndex = syncRaftLogSnapshotIndex(pRaft->log);
+  bool inSnapshot = syncRaftProgressInSnapshot(progress);
+  SyncIndex prevIndex;
+  SyncTerm prevTerm;
+
+  /**
+   * From Section 3.5:
+   *
+   * When sending an AppendEntries RPC, the leader includes the index and
+   * term of the entry in its log that immediately precedes the new
+   * entries. If the follower does not find an entry in its log with the
+   * same index and term, then it refuses the new entries. The consistency
+   * check acts as an induction step: the initial empty state of the logs
+   * satisfies the Log Matching Property, and the consistency check
+   * preserves the Log Matching Property whenever logs are extended. As a
+   * result, whenever AppendEntries returns successfully, the leader knows
+   * that the follower's log is identical to its own log up through the new
+   * entries (Log Matching Property in Figure 3.2).
+   **/
+  if (nextIndex == 1) {
+    /**
+     * We're including the very first entry, so prevIndex and prevTerm are
+     * null. If the first entry is not available anymore, send the last
+     * snapshot if we're not already sending one.
+     **/
+    if (snapshotIndex > 0 && !inSnapshot) {
+      goto send_snapshot;
+    }
+
+    // otherwise send append entries from start
+    prevIndex = 0;
+    prevTerm = 0;
+  } else {
+    /**
+     * Set prevIndex and prevTerm to the index and term of the entry at
+     * nextIndex - 1.
+     **/
+    prevIndex = nextIndex - 1;
+    prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex);
+    /**
+     * If the entry is not anymore in our log, send the last snapshot if we're
+     * not doing so already.
+     **/
+    if (prevTerm == SYNC_NON_TERM && !inSnapshot) {
+      goto send_snapshot;
+    }
+  }
+
+  /* Send empty AppendEntries RPC when installing a snapshot */
+  if (inSnapshot) {
+    prevIndex = syncRaftLogLastIndex(pRaft->log);
+    prevTerm = syncRaftLogLastTerm(pRaft->log);
+  }
+
+  return sendAppendEntries(pRaft, i, prevIndex, prevTerm);
+
+send_snapshot:
+  if (syncRaftProgressRecentActive(progress)) {
+    /* Only send a snapshot when we have heard from the server */
+    return sendSnapshot(pRaft, i);
+  } else {
+    /* Send empty AppendEntries RPC when we haven't heard from the server */
+    prevIndex = syncRaftLogLastIndex(pRaft->log);
+    prevTerm = syncRaftLogLastTerm(pRaft->log);
+    return sendAppendEntries(pRaft, i, prevIndex, prevTerm);
+  }
+#endif
+  return 0;
+}
+
+/* Placeholder: no snapshot is sent yet, always returns 0. */
+static int sendSnapshot(SSyncRaft* pRaft, int i) {
+  return 0;
+}
+
+/**
+ * Send the entries that follow (prevIndex, prevTerm) to the node at
+ * position i. The body is compiled out with "#if 0"; the function currently
+ * returns 0 without doing anything.
+ *
+ * NOTE(review), for when the body is enabled:
+ *  - lastIndex is computed as nextIndex + nEntry, which is one past the last
+ *    entry sent; confirm whether nextIndex + nEntry - 1 is intended.
+ *  - the success path returns without calling syncRaftLogRelease(); confirm
+ *    who owns the acquired entries after io.send().
+ **/
+static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncTerm prevTerm) {
+#if 0
+  SyncIndex nextIndex = prevIndex + 1;
+  SSyncRaftEntry *entries;
+  int nEntry;
+  SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[i]);
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry);
+
+  SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term,
+                                      prevIndex, prevTerm, pRaft->log->commitIndex,
+                                      nEntry, entries);
+
+  if (msg == NULL) {
+    goto err_release_log;
+  }
+
+  pRaft->io.send(msg, pNode);
+
+  if (syncRaftProgressInReplicate(progress)) {
+    SyncIndex lastIndex = nextIndex + nEntry;
+    syncRaftProgressOptimisticNextIndex(progress, lastIndex);
+    syncRaftInflightAdd(&progress->inflights, lastIndex);
+  } else if (syncRaftProgressInProbe(progress)) {
+    syncRaftProgressPause(progress);
+  } else {
+
+  }
+
+  syncRaftProgressUpdateSendTick(progress, pRaft->currentTick);
+
+  return 0;
+
+err_release_log:
+  syncRaftLogRelease(pRaft->log, nextIndex, entries, nEntry);
+#endif
+  return 0;
+}
\ No newline at end of file
diff --git a/source/libs/sync/inc/raftInt.h b/source/libs/sync/src/raft_unstable_log.c
similarity index 74%
rename from source/libs/sync/inc/raftInt.h
rename to source/libs/sync/src/raft_unstable_log.c
index 75c1c2187f..e798e20662 100644
--- a/source/libs/sync/inc/raftInt.h
+++ b/source/libs/sync/src/raft_unstable_log.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019 TAOS Data, Inc.
+ * Copyright (c) 2019 TAOS Data, Inc.
  *
  * This program is free software: you can use, redistribute, and/or modify
  * it under the terms of the GNU Affero General Public License, version 3
@@ -13,15 +13,11 @@
  * along with this program. If not, see .
  */
 
-#ifndef _TD_RAFT_INT_H_
-#define _TD_RAFT_INT_H_
+#include "sync.h"
+#include "raft_unstable_log.h"
 
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#ifdef __cplusplus
+/*
+SyncIndex syncRaftLogLastIndex(SSyncRaftUnstableLog* pLog) {
+  return 0;
 }
-#endif
-
-#endif /*_TD_RAFT_INT_H_*/
\ No newline at end of file
+*/
\ No newline at end of file
diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c
index 879f2d4f6d..fa35917668 100644
--- a/source/libs/sync/src/sync.c
+++ b/source/libs/sync/src/sync.c
@@ -13,14 +13,282 @@
  * along with this program. If not, see .
*/
 
-#include "sync.h"
+#include "syncInt.h"
+#include "trpc.h"
+#include "ttimer.h"
 
-int32_t syncInit() { return 0; }
+SSyncManager* gSyncManager = NULL;
 
-void syncCleanUp() {}
+#define SYNC_TICK_TIMER 50
+#define SYNC_ACTIVITY_TIMER 5
+#define SYNC_SERVER_WORKER 2
+// room for "<fqdn>:<port>": 255-byte FQDN + ':' + port digits + NUL, rounded up
+#define SYNC_RPC_SERVER_KEY_LEN 272
 
-SSyncNode* syncStart(const SSyncInfo* pInfo) { return NULL; }
+static void syncProcessRsp(SRpcMsg *pMsg, SEpSet *pEpSet);
+static void syncProcessReqMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
 
-void syncStop(const SSyncNode* pNode) {}
+static int syncInitRpcServer(SSyncManager* syncManager, const SSyncCluster* pSyncCfg);
+static int syncInitRpcClient(SSyncManager* syncManager);
+static int syncOpenWorkerPool(SSyncManager* syncManager);
+static int syncCloseWorkerPool(SSyncManager* syncManager);
+static void *syncWorkerMain(void *argv);
+static void syncNodeTick(void *param, void *tmrId);
 
-void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
\ No newline at end of file
+/**
+ * Create the global sync manager: rpc client, timer manager, worker pool and
+ * the vgroup hash table. Calling it again after a successful init is a no-op.
+ * Returns 0 on success and -1 on failure (everything already set up is torn
+ * down again through syncCleanUp()).
+ **/
+int32_t syncInit() {
+  if (gSyncManager != NULL) {
+    return 0;
+  }
+
+  // calloc takes (count, size); the arguments used to be (size, 0), which
+  // allocates zero bytes
+  gSyncManager = (SSyncManager*)calloc(1, sizeof(SSyncManager));
+  if (gSyncManager == NULL) {
+    syncError("malloc SSyncManager fail");
+    return -1;
+  }
+
+  pthread_mutex_init(&gSyncManager->mutex, NULL);
+
+  // init client rpc
+  if (syncInitRpcClient(gSyncManager) != 0) {
+    syncCleanUp();
+    return -1;
+  }
+
+  // init sync timer manager
+  gSyncManager->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
+  if (gSyncManager->syncTimerManager == NULL) {
+    syncCleanUp();
+    return -1;
+  }
+
+  // init worker pool
+  if (syncOpenWorkerPool(gSyncManager) != 0) {
+    syncCleanUp();
+    return -1;
+  }
+
+  // init vgroup hash table
+  gSyncManager->vgroupTable = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
+  if (gSyncManager->vgroupTable == NULL) {
+    syncCleanUp();
+    return -1;
+  }
+  return 0;
+}
+
+/**
+ * Tear down whatever syncInit() managed to create and free the manager.
+ * Safe to call when the manager was never created.
+ **/
+void syncCleanUp() {
+  if (gSyncManager == NULL) {
+    return;
+  }
+  pthread_mutex_lock(&gSyncManager->mutex);
+  if (gSyncManager->vgroupTable) {
+    taosHashCleanup(gSyncManager->vgroupTable);
+  }
+  if (gSyncManager->clientRpc) {
+    rpcClose(gSyncManager->clientRpc);
+    syncInfo("sync inter-sync rpc client is closed");
+  }
+  if (gSyncManager->syncTimerManager) {
+    taosTmrCleanUp(gSyncManager->syncTimerManager);
+  }
+  syncCloseWorkerPool(gSyncManager);
+  pthread_mutex_unlock(&gSyncManager->mutex);
+  pthread_mutex_destroy(&gSyncManager->mutex);
+  free(gSyncManager);
+  gSyncManager = NULL;
+}
+
+/**
+ * Start the sync node of a vgroup and register it in the vgroup table.
+ * Returns the node (the existing one when the vgroup was already started),
+ * or NULL on failure.
+ **/
+SSyncNode* syncStart(const SSyncInfo* pInfo) {
+  pthread_mutex_lock(&gSyncManager->mutex);
+
+  SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId));
+  if (ppNode != NULL) {
+    SSyncNode *pExist = *ppNode;
+    syncInfo("vgroup %d already exist", pInfo->vgId);
+    pthread_mutex_unlock(&gSyncManager->mutex);
+    return pExist;
+  }
+
+  // init rpc server
+  if (syncInitRpcServer(gSyncManager, &pInfo->syncCfg) != 0) {
+    pthread_mutex_unlock(&gSyncManager->mutex);
+    return NULL;
+  }
+
+  // zeroed so that no field is read uninitialized later on
+  SSyncNode *pNode = (SSyncNode*)calloc(1, sizeof(SSyncNode));
+  if (pNode == NULL) {
+    syncError("malloc vgroup %d node fail", pInfo->vgId);
+    pthread_mutex_unlock(&gSyncManager->mutex);
+    return NULL;
+  }
+
+  // syncStop() and syncNodeTick() look the node up by pNode->vgId
+  pNode->vgId = pInfo->vgId;
+  pthread_mutex_init(&pNode->mutex, NULL);
+
+  // start raft
+  pNode->raft.pNode = pNode;
+  if (syncRaftStart(&pNode->raft, pInfo) != 0) {
+    syncError("raft start at %d node fail", pInfo->vgId);
+    pthread_mutex_destroy(&pNode->mutex);
+    free(pNode);
+    pthread_mutex_unlock(&gSyncManager->mutex);
+    return NULL;
+  }
+
+  taosHashPut(gSyncManager->vgroupTable, &pInfo->vgId, sizeof(SyncGroupId), &pNode, sizeof(SSyncNode *));
+
+  // arm the tick only once the node can be found in the table: a tick that
+  // misses the node returns without re-arming itself
+  pNode->syncTimer = taosTmrStart(syncNodeTick, SYNC_TICK_TIMER, (void*)(intptr_t)pInfo->vgId, gSyncManager->syncTimerManager);
+
+  pthread_mutex_unlock(&gSyncManager->mutex);
+  return pNode;
+}
+
+/**
+ * Stop the sync node of a vgroup: stop its timer, drop it from the vgroup
+ * table and free it. Unknown vgroups are only logged.
+ **/
+void syncStop(const SSyncNode* pNode) {
+  pthread_mutex_lock(&gSyncManager->mutex);
+
+  SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId));
+  if (ppNode == NULL) {
+    syncInfo("vgroup %d not exist", pNode->vgId);
+    pthread_mutex_unlock(&gSyncManager->mutex);
+    return;
+  }
+
+  // copy the pointer out: ppNode points into the table entry, which is gone
+  // after taosHashRemove()
+  SSyncNode *pStored = *ppNode;
+  assert(pStored == pNode);
+  taosTmrStop(pStored->syncTimer);
+
+  taosHashRemove(gSyncManager->vgroupTable, &pStored->vgId, sizeof(SyncGroupId));
+  pthread_mutex_unlock(&gSyncManager->mutex);
+
+  pthread_mutex_destroy(&pStored->mutex);
+  free(pStored);
+}
+
+/**
+ * Hand a proposal to the raft state machine of the node, under the node
+ * mutex. Returns whatever syncRaftStep() returns.
+ **/
+int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak) {
+  SSyncMessage msg;
+
+  pthread_mutex_lock(&syncNode->mutex);
+  int32_t ret = syncRaftStep(&syncNode->raft, syncInitPropMsg(&msg, pBuf, pData, isWeak));
+  pthread_mutex_unlock(&syncNode->mutex);
+  return ret;
+}
+
+// placeholder: reconfiguration is not implemented yet
+void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
+
+// process rpc rsp message from other sync server (placeholder)
+static void syncProcessRsp(SRpcMsg *pMsg, SEpSet *pEpSet) {
+
+}
+
+// process rpc message from other sync server (placeholder)
+static void syncProcessReqMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
+
+}
+
+/**
+ * Open the rpc server for this node's own endpoint unless one is already
+ * registered under the same "fqdn:port" key. Returns 0 on success, -1 on
+ * failure.
+ **/
+static int syncInitRpcServer(SSyncManager* syncManager, const SSyncCluster* pSyncCfg) {
+  if (syncManager->rpcServerTable == NULL) {
+    syncManager->rpcServerTable = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
+    if (syncManager->rpcServerTable == NULL) {
+      syncError("init sync rpc server hash table error");
+      return -1;
+    }
+  }
+  assert(pSyncCfg->selfIndex < pSyncCfg->replica && pSyncCfg->selfIndex >= 0);
+  // the local node sits at selfIndex; nodeInfo[replica] is one past the end
+  const SNodeInfo* pNode = &(pSyncCfg->nodeInfo[pSyncCfg->selfIndex]);
+  char buffer[SYNC_RPC_SERVER_KEY_LEN] = {'\0'};
+  int n = snprintf(buffer, sizeof(buffer), "%s:%d", &(pNode->nodeFqdn[0]), pNode->nodePort);
+  if (n < 0 || (size_t)n >= sizeof(buffer)) {
+    // a truncated key could collide with another endpoint
+    syncError("sync rpc server endpoint is too long");
+    return -1;
+  }
+  size_t len = (size_t)n;
+  void** ppRpcServer = taosHashGet(syncManager->rpcServerTable, buffer, len);
+  if (ppRpcServer != NULL) {
+    // already inited
+    syncInfo("sync rpc server for %s already exist", buffer);
+    return 0;
+  }
+
+  SRpcInit rpcInit;
+  memset(&rpcInit, 0, sizeof(rpcInit));
+  rpcInit.localPort = pNode->nodePort;
+  rpcInit.label = "sync-server";
+  rpcInit.numOfThreads = SYNC_SERVER_WORKER;
+  rpcInit.cfp = syncProcessReqMsg;
+  rpcInit.sessions = TSDB_MAX_VNODES << 4;
+  rpcInit.connType = TAOS_CONN_SERVER;
+  rpcInit.idleTime = SYNC_ACTIVITY_TIMER * 1000;
+
+  void* rpcServer = rpcOpen(&rpcInit);
+  if (rpcServer == NULL) {
+    syncError("rpcOpen for sync rpc server for %s fail", buffer);
+    return -1;
+  }
+
+  // store the handle itself (pointer-sized value), matching the void** that
+  // taosHashGet() hands back above
+  taosHashPut(syncManager->rpcServerTable, buffer, len, &rpcServer, sizeof(void*));
+  syncInfo("sync rpc server for %s init success", buffer);
+
+  return 0;
+}
+
+/**
+ * Open the rpc client used to talk to other sync servers.
+ * Returns 0 on success, -1 on failure.
+ **/
+static int syncInitRpcClient(SSyncManager* syncManager) {
+  char secret[TSDB_KEY_LEN] = "secret";
+  SRpcInit rpcInit;
+  memset(&rpcInit, 0, sizeof(rpcInit));
+  rpcInit.label = "sync-client";
+  rpcInit.numOfThreads = 1;
+  rpcInit.cfp = syncProcessRsp;
+  rpcInit.sessions = TSDB_MAX_VNODES << 4;
+  rpcInit.connType = TAOS_CONN_CLIENT;
+  rpcInit.idleTime = SYNC_ACTIVITY_TIMER * 1000;
+  rpcInit.user = "t";
+  rpcInit.ckey = "key";
+  rpcInit.secret = secret;
+
+  syncManager->clientRpc = rpcOpen(&rpcInit);
+  if (syncManager->clientRpc == NULL) {
+    syncError("failed to init sync rpc client");
+    return -1;
+  }
+
+  syncInfo("sync inter-sync rpc client is initialized");
+  return 0;
+}
+
+/**
+ * Start TAOS_SYNC_MAX_WORKER joinable worker threads.
+ * Returns 0 on success, -1 as soon as one thread cannot be created.
+ **/
+static int syncOpenWorkerPool(SSyncManager* syncManager) {
+  int i;
+  int ret = 0;
+  pthread_attr_t thattr;
+
+  pthread_attr_init(&thattr);
+  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
+
+  for (i = 0; i < TAOS_SYNC_MAX_WORKER; ++i) {
+    SSyncWorker* pWorker = &(syncManager->worker[i]);
+
+    // pthread_create reports the error through its return value, not errno
+    int code = pthread_create(&(pWorker->thread), &thattr, syncWorkerMain, pWorker);
+    if (code != 0) {
+      syncError("failed to create sync worker since %s", strerror(code));
+      ret = -1;
+      break;
+    }
+  }
+
+  // released on the failure path as well
+  pthread_attr_destroy(&thattr);
+
+  return ret;
+}
+
+// placeholder: worker threads are not stopped or joined yet
+static int syncCloseWorkerPool(SSyncManager* syncManager) {
+  return 0;
+}
+
+// worker thread entry; it only sets up the thread and returns for now
+static void *syncWorkerMain(void *argv) {
+  SSyncWorker* pWorker = (SSyncWorker *)argv;
+
+  taosBlockSIGPIPE();
+  setThreadName("syncWorker");
+
+  return NULL;
+}
+
+/**
+ * Timer callback: param carries the vgroup id. Runs one raft tick on the
+ * node under its mutex and re-arms the timer. When the vgroup is not in the
+ * table, the callback returns without re-arming.
+ **/
+static void syncNodeTick(void *param, void *tmrId) {
+  // the id travels inside the pointer value; convert through intptr_t
+  SyncGroupId vgId = (SyncGroupId)(intptr_t)param;
+  SSyncNode **ppNode = taosHashGet(gSyncManager->vgroupTable, &vgId, sizeof(SyncGroupId));
+  if (ppNode == NULL) {
+    return;
+  }
+  SSyncNode *pNode = *ppNode;
+
+  pthread_mutex_lock(&pNode->mutex);
+  syncRaftTick(&pNode->raft);
+  pthread_mutex_unlock(&pNode->mutex);
+
+  pNode->syncTimer = taosTmrStart(syncNodeTick, SYNC_TICK_TIMER, (void*)(intptr_t)pNode->vgId, gSyncManager->syncTimerManager);
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c
new file mode 100644
index 0000000000..b7353fd787
--- /dev/null
+++ b/source/libs/sync/src/sync_raft_impl.c
@@ -0,0 +1,306 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "raft.h"
+#include "raft_configuration.h"
+#include "raft_log.h"
+#include "raft_replication.h"
+#include "sync_raft_progress_tracker.h"
+#include "syncInt.h"
+
+static int convertClear(SSyncRaft* pRaft);
+static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg);
+
+static int triggerAll(SSyncRaft* pRaft);
+
+static void tickElection(SSyncRaft* pRaft);
+static void tickHeartbeat(SSyncRaft* pRaft);
+
+static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n);
+static bool maybeCommit(SSyncRaft* pRaft);
+
+static void abortLeaderTransfer(SSyncRaft* pRaft);
+
+static void resetRaft(SSyncRaft* pRaft, SyncTerm term);
+
+/**
+ * Switch to follower at the given term, following leaderId (which may be
+ * SYNC_NON_NODE_ID when the leader is unknown).
+ **/
+void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId) {
+  convertClear(pRaft);
+
+  pRaft->stepFp = stepFollower;
+  resetRaft(pRaft, term);
+  pRaft->tickFp = tickElection;
+  // resetRaft() clears leaderId, so it is set afterwards
+  pRaft->leaderId = leaderId;
+  pRaft->state = TAOS_SYNC_ROLE_FOLLOWER;
+  syncInfo("[%d:%d] became follower at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+}
+
+void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
+  convertClear(pRaft);
+
+  /**
+   * Becoming a pre-candidate changes our step functions and state,
+   * but doesn't change anything else. In particular it does not increase
+   * r.Term or change r.Vote.
+   **/
+  pRaft->stepFp = stepCandidate;
+  pRaft->tickFp = tickElection;
+  pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
+  pRaft->candidateState.inPreVote = true;
+  syncInfo("[%d:%d] became pre-candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+}
+
+/**
+ * Switch to candidate: the term is increased by one and the node votes for
+ * itself.
+ **/
+void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
+  convertClear(pRaft);
+
+  pRaft->candidateState.inPreVote = false;
+  pRaft->stepFp = stepCandidate;
+  // become candidate make term+1
+  resetRaft(pRaft, pRaft->term + 1);
+  pRaft->tickFp = tickElection;
+  pRaft->voteFor = pRaft->selfId;
+  pRaft->state = TAOS_SYNC_ROLE_CANDIDATE;
+  syncInfo("[%d:%d] became candidate at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+}
+
+/**
+ * Switch to leader at the current term and append an empty (no-op) entry.
+ * Must not be called on a follower.
+ **/
+void syncRaftBecomeLeader(SSyncRaft* pRaft) {
+  assert(pRaft->state != TAOS_SYNC_ROLE_FOLLOWER);
+
+  pRaft->stepFp = stepLeader;
+  resetRaft(pRaft, pRaft->term);
+  // a leader ticks heartbeats, like the other roles install their tick function
+  pRaft->tickFp = tickHeartbeat;
+  // resetRaft() cleared leaderId; the leader is this node itself
+  // (this used to be a self-assignment that left it cleared)
+  pRaft->leaderId = pRaft->selfId;
+  pRaft->state = TAOS_SYNC_ROLE_LEADER;
+  // TODO: check if there is pending config log
+  int nPendingConf = syncRaftLogNumOfPendingConf(pRaft->log);
+  if (nPendingConf > 1) {
+    syncFatal("unexpected multiple uncommitted config entry");
+  }
+
+  syncInfo("[%d:%d] became leader at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term);
+
+  // after become leader, send a no-op log
+  // NOTE(review): the entry is handed to appendEntries(); confirm that the
+  // log takes ownership, otherwise it is leaked
+  SSyncRaftEntry* entry = (SSyncRaftEntry*)malloc(sizeof(SSyncRaftEntry));
+  if (entry == NULL) {
+    return;
+  }
+  *entry = (SSyncRaftEntry) {
+    .buffer = (SSyncBuffer) {
+      .data = NULL,
+      .len = 0,
+    }
+  };
+  appendEntries(pRaft, entry, 1);
+  //syncRaftTriggerHeartbeat(pRaft);
+}
+
+// ask every other node to replicate (see triggerAll)
+void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) {
+  triggerAll(pRaft);
+}
+
+void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) {
+  // electionTimeoutTick in [3,6] tick
+  pRaft->randomizedElectionTimeout = taosRand() % 4 + 3;
+}
+
+// a node without a valid id cannot campaign
+bool syncRaftIsPromotable(SSyncRaft* pRaft) {
+  return pRaft->selfId != SYNC_NON_NODE_ID;
+}
+
+// true once the ticks since the last reset reach the randomized timeout
+bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
+  return pRaft->electionElapsed >= pRaft->randomizedElectionTimeout;
+}
+
+// majority of the configured replicas
+int syncRaftQuorum(SSyncRaft* pRaft) {
+  return pRaft->cluster.replica / 2 + 1;
+}
+
+/**
+ * Record the vote of node `id` and tally all votes received so far.
+ * The counts are written to *rejected and *granted by the tally; a vote from
+ * a node outside the configuration returns SYNC_RAFT_VOTE_PENDING without
+ * touching them.
+ **/
+SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
+                                    bool preVote, bool grant,
+                                    int* rejected, int *granted) {
+  int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
+  if (voterIndex == -1) {
+    return SYNC_RAFT_VOTE_PENDING;
+  }
+
+  if (grant) {
+    syncInfo("[%d:%d] received grant (pre-vote %d) from %d at term %" PRId64 "",
+      pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
+  } else {
+    syncInfo("[%d:%d] received rejection (pre-vote %d) from %d at term %" PRId64 "",
+      pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
+  }
+
+  syncRaftRecordVote(pRaft->tracker, voterIndex, grant);
+  return syncRaftTallyVotes(pRaft->tracker, rejected, granted);
+}
+/*
+  if (accept) {
+    syncInfo("[%d:%d] received (pre-vote %d) from %d at term %" PRId64 "",
+      pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
+  } else {
+    syncInfo("[%d:%d] received rejection from %d at term %" PRId64 "",
+      pRaft->selfGroupId, pRaft->selfId, id, pRaft->term);
+  }
+
+  int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
+  assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0);
+  assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN);
+
+  pRaft->candidateState.votes[voteIndex] = accept ?
SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT;
+  int granted = 0, rejected = 0;
+  int i;
+  for (i = 0; i < pRaft->cluster.replica; ++i) {
+    if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) granted++;
+    else if (pRaft->candidateState.votes[i] == SYNC_RAFT_VOTE_RESP_REJECT) rejected++;
+  }
+
+  if (rejectNum) *rejectNum = rejected;
+  return granted;
+*/
+
+// placeholder for per-role cleanup on a state change; nothing to clear yet.
+// Returns 0 so that the function no longer ends without a return value.
+static int convertClear(SSyncRaft* pRaft) {
+  return 0;
+}
+
+// placeholder: followers do not process any message yet
+static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+
+  return 0;
+}
+
+static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  /**
+   * Only handle vote responses corresponding to our candidacy (while in
+   * StateCandidate, we may get stale MsgPreVoteResp messages in this term from
+   * our pre-candidate state).
+   **/
+  RaftMessageType msgType = pMsg->msgType;
+
+  // proposals are ignored while campaigning
+  if (msgType == RAFT_MSG_INTERNAL_PROP) {
+    return 0;
+  }
+
+  if (msgType == RAFT_MSG_VOTE_RESP) {
+    syncRaftHandleVoteRespMessage(pRaft, pMsg);
+    return 0;
+  } else if (msgType == RAFT_MSG_APPEND) {
+    // an append message means the sender leads this term: follow it
+    syncRaftBecomeFollower(pRaft, pRaft->term, pMsg->from);
+    syncRaftHandleAppendEntriesMessage(pRaft, pMsg);
+  }
+  return 0;
+}
+
+// placeholder: leaders do not process any message yet
+static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
+  convertClear(pRaft);
+  return 0;
+}
+
+/**
+ * tickElection is run by followers and candidates per tick.
+ **/
+static void tickElection(SSyncRaft* pRaft) {
+  pRaft->electionElapsed += 1;
+
+  if (!syncRaftIsPromotable(pRaft)) {
+    return;
+  }
+
+  if (!syncRaftIsPastElectionTimeout(pRaft)) {
+    return;
+  }
+
+  // election timeout
+  pRaft->electionElapsed = 0;
+  SSyncMessage msg;
+  syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId));
+}
+
+// placeholder: heartbeats are not implemented yet
+static void tickHeartbeat(SSyncRaft* pRaft) {
+
+}
+
+/**
+ * Stamp the n entries with the current term and consecutive indexes that
+ * follow the last log index, append them to the log, and advance this node's
+ * own progress to the last appended index.
+ **/
+static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
+  SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
+  SyncTerm term = pRaft->term;
+  int i;
+
+  for (i = 0; i < n; ++i) {
+    entries[i].term = term;
+    entries[i].index = lastIndex + 1 + i;
+  }
+
+  syncRaftLogAppend(pRaft->log, entries, n);
+
+  // index of the last entry just appended; the progress used to be updated
+  // with the index from before the append, so the new entries never counted
+  // as matched on the leader itself
+  SyncIndex newLastIndex = lastIndex + n;
+
+  SSyncRaftProgress* progress = &(pRaft->tracker->progressMap[pRaft->cluster.selfIndex]);
+  syncRaftProgressMaybeUpdate(progress, newLastIndex);
+  // Regardless of maybeCommit's return, our caller will call bcastAppend.
+  maybeCommit(pRaft);
+}
+
+/**
+ * maybeCommit attempts to advance the commit index. Returns true if
+ * the commit index changed (in which case the caller should call
+ * r.bcastAppend).
+ *
+ * Placeholder for now: nothing is advanced and true is always returned.
+ **/
+static bool maybeCommit(SSyncRaft* pRaft) {
+
+  return true;
+}
+
+/**
+ * trigger I/O requests for newly appended log entries or heartbeats.
+ **/
+static int triggerAll(SSyncRaft* pRaft) {
+  assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
+  int i;
+
+  for (i = 0; i < pRaft->cluster.replica; ++i) {
+    // no need to replicate to ourselves
+    if (i == pRaft->cluster.selfIndex) {
+      continue;
+    }
+
+    syncRaftReplicate(pRaft, i);
+  }
+
+  // the function is declared int; it used to end without a return value
+  return 0;
+}
+
+// forget any leadership transfer in progress
+static void abortLeaderTransfer(SSyncRaft* pRaft) {
+  pRaft->leadTransferee = SYNC_NON_NODE_ID;
+}
+
+// adapter for syncRaftProgressVisit(): arg is the SSyncRaft being reset
+static void initProgress(int i, SSyncRaftProgress* progress, void* arg) {
+  syncRaftInitProgress(i, (SSyncRaft*)arg, progress);
+}
+
+/**
+ * Reset the per-term state. The vote is only cleared when the term actually
+ * changes; leader id, timers, leader transfer, votes and progress are always
+ * reset.
+ **/
+static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
+  if (pRaft->term != term) {
+    pRaft->term = term;
+    pRaft->voteFor = SYNC_NON_NODE_ID;
+  }
+
+  pRaft->leaderId = SYNC_NON_NODE_ID;
+
+  pRaft->electionElapsed = 0;
+  pRaft->heartbeatElapsed = 0;
+
+  syncRaftRandomizedElectionTimeout(pRaft);
+
+  abortLeaderTransfer(pRaft);
+
+  syncRaftResetVotes(pRaft->tracker);
+  syncRaftProgressVisit(pRaft->tracker, initProgress, pRaft);
+
+  pRaft->pendingConfigIndex = 0;
+  pRaft->uncommittedSize = 0;
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/sync_raft_inflights.c b/source/libs/sync/src/sync_raft_inflights.c
new file mode 100644
index 0000000000..3d740b5a9e
--- /dev/null
+++ b/source/libs/sync/src/sync_raft_inflights.c
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "sync_raft_inflights.h"
+
+/**
+ * Allocate an inflight window that can hold up to `size` indexes.
+ * Returns NULL when either allocation fails. Release the result with
+ * syncRaftCloseInflights().
+ **/
+SSyncRaftInflights* syncRaftOpenInflights(int size) {
+  SSyncRaftInflights* inflights = (SSyncRaftInflights*)malloc(sizeof(SSyncRaftInflights));
+  if (inflights == NULL) {
+    return NULL;
+  }
+  SyncIndex* buffer = (SyncIndex*)malloc(sizeof(SyncIndex) * size);
+  if (buffer == NULL) {
+    free(inflights);
+    return NULL;
+  }
+  *inflights = (SSyncRaftInflights) {
+    .buffer = buffer,
+    .count  = 0,
+    // capacity of the ring buffer allocated above; it used to be recorded as
+    // 0, which left the wrap-around in syncRaftInflightAdd() without a
+    // usable capacity
+    .size   = size,
+    .start  = 0,
+  };
+
+  return inflights;
+}
+
+// free the window and its buffer
+void syncRaftCloseInflights(SSyncRaftInflights* inflights) {
+  free(inflights->buffer);
+  free(inflights);
+}
+
+/**
+ * syncRaftInflightAdd notifies the Inflights that a new message with the given index is being
+ * dispatched. syncRaftInflightFull() must be called prior to syncRaftInflightAdd()
+ * to verify that there is room for one more message,
+ * and consecutive calls to add syncRaftInflightAdd() must provide a
+ * monotonic sequence of indexes.
+ **/
+void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) {
+  assert(!syncRaftInflightFull(inflights));
+
+  int next = inflights->start + inflights->count;
+  int size = inflights->size;
+  /* is next wrapped around buffer? */
+  if (next >= size) {
+    next -= size;
+  }
+
+  inflights->buffer[next] = inflightIndex;
+  inflights->count++;
+}
+
+/**
+ * syncRaftInflightFreeLE frees the inflights smaller or equal to the given `to` flight.
+ **/
+void syncRaftInflightFreeLE(SSyncRaftInflights* inflights, SyncIndex toIndex) {
+  if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) {
+    /* out of the left side of the window */
+    return;
+  }
+
+  int i, idx;
+  for (i = 0, idx = inflights->start; i < inflights->count; i++) {
+    if (toIndex < inflights->buffer[idx]) { // found the first large inflight
+      break;
+    }
+
+    // increase index and maybe rotate
+    int size = inflights->size;
+    idx++;
+    if (idx >= size) {
+      idx -= size;
+    }
+  }
+
+  // free i inflights and set new start index
+  inflights->count -= i;
+  inflights->start  = idx;
+  assert(inflights->count >= 0);
+  if (inflights->count == 0) {
+    // inflights is empty, reset the start index so that we don't grow the
+    // buffer unnecessarily.
+    inflights->start = 0;
+  }
+}
+
+/**
+ * syncRaftInflightFreeFirstOne releases the first inflight.
+ * This is a no-op if nothing is inflight.
+ **/
+void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) {
+  // check before reading the slot: with nothing in flight, buffer[start]
+  // holds no valid index
+  if (inflights->count == 0) {
+    return;
+  }
+
+  syncRaftInflightFreeLE(inflights, inflights->buffer[inflights->start]);
+}
diff --git a/source/libs/sync/src/sync_raft_progress.c b/source/libs/sync/src/sync_raft_progress.c
new file mode 100644
index 0000000000..437c083b4d
--- /dev/null
+++ b/source/libs/sync/src/sync_raft_progress.c
@@ -0,0 +1,242 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "raft.h"
+#include "raft_log.h"
+#include "sync_raft_progress.h"
+#include "sync_raft_progress_tracker.h"
+#include "sync.h"
+#include "syncInt.h"
+
+static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state);
+static void probeAcked(SSyncRaftProgress* progress);
+
+static void resumeProgress(SSyncRaftProgress* progress);
+
+/**
+ * syncRaftInitProgress initializes the progress of the peer at slot `i`.
+ * The peer starts in probe state with nextIndex set to last log index + 1;
+ * matchIndex is the last log index for the local node (i == selfIndex) and 0
+ * for every other peer.
+ *
+ * NOTE: when the inflight window cannot be allocated the function returns
+ * early and leaves *progress untouched.
+ **/
+void syncRaftInitProgress(int i, SSyncRaft* pRaft, SSyncRaftProgress* progress) {
+  SSyncRaftInflights* inflights = syncRaftOpenInflights(pRaft->tracker->maxInflight);
+  if (inflights == NULL) {
+    return;
+  }
+  *progress = (SSyncRaftProgress) {
+    .matchIndex = i == pRaft->selfIndex ? syncRaftLogLastIndex(pRaft->log) : 0,
+    .nextIndex  = syncRaftLogLastIndex(pRaft->log) + 1,
+    .inflights  = inflights,
+    .isLearner  = false,
+    .state      = PROGRESS_STATE_PROBE,
+  };
+}
+
+/**
+ * syncRaftProgressMaybeUpdate is called when an MsgAppResp arrives from the follower, with the
+ * index acked by it. The method returns false if the given n index comes from
+ * an outdated message. Otherwise it updates the progress and returns true.
+ **/
+bool syncRaftProgressMaybeUpdate(SSyncRaftProgress* progress, SyncIndex lastIndex) {
+  bool updated = false;
+
+  if (progress->matchIndex < lastIndex) {
+    progress->matchIndex = lastIndex;
+    updated = true;
+    probeAcked(progress);
+  }
+
+  /* nextIndex never moves backwards here, even for an outdated ack */
+  progress->nextIndex = MAX(progress->nextIndex, lastIndex + 1);
+
+  return updated;
+}
+
+/**
+ * syncRaftProgressMaybeDecrTo handles the rejection of an append at index
+ * `rejected`; `matchHint` is the index hinted by the rejecting peer.
+ * Returns false when the rejection is considered stale, otherwise lowers
+ * nextIndex and returns true.
+ **/
+bool syncRaftProgressMaybeDecrTo(SSyncRaftProgress* progress,
+                                 SyncIndex rejected, SyncIndex matchHint) {
+  if (progress->state == PROGRESS_STATE_REPLICATE) {
+    /**
+     * the rejection must be stale if the progress has matched and "rejected"
+     * is smaller than "match".
+     **/
+    if (rejected <= progress->matchIndex) {
+      syncDebug("match index is up to date,ignore");
+      return false;
+    }
+
+    /* directly decrease next to match + 1 */
+    progress->nextIndex = progress->matchIndex + 1;
+    return true;
+  }
+
+  /**
+   * The rejection must be stale if "rejected" does not match next - 1. This
+   * is because non-replicating followers are probed one entry at a time.
+   **/
+  if (rejected != progress->nextIndex - 1) {
+    syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore",
+              rejected, progress->nextIndex);
+    return false;
+  }
+
+  /* back off to the smaller of rejected / matchHint + 1, but never below 1 */
+  progress->nextIndex = MAX(MIN(rejected, matchHint + 1), 1);
+
+  progress->probeSent = false;
+  return true;
+}
+
+/**
+ * syncRaftProgressIsPaused returns whether sending log entries to this node has been throttled.
+ * This is done when a node has rejected recent MsgApps, is currently waiting
+ * for a snapshot, or has reached the MaxInflightMsgs limit. In normal
+ * operation, this is false. A throttled node will be contacted less frequently
+ * until it has reached a state in which it's able to accept a steady stream of
+ * log entries again.
+ **/
+bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
+  switch (progress->state) {
+    case PROGRESS_STATE_PROBE:
+      return progress->probeSent;
+    case PROGRESS_STATE_REPLICATE:
+      return syncRaftInflightFull(progress->inflights);
+    case PROGRESS_STATE_SNAPSHOT:
+      return true;
+    default:
+      syncFatal("error sync state:%d", progress->state);
+      /* never fall off the end of a non-void function: treat an unknown state as paused */
+      return true;
+  }
+}
+
+/**
+ * syncRaftProgressIsUptodate returns true when nextIndex points right past
+ * the last entry of the local log.
+ **/
+bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
+  return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex;
+}
+
+/**
+ * syncRaftProgressBecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
+ * optionally and if larger, the index of the pending snapshot.
+ **/
+void syncRaftProgressBecomeProbe(SSyncRaftProgress* progress) {
+  /**
+   * Remember what resetProgressState() is about to wipe: coming out of
+   * ProgressStateSnapshot means the pending snapshot reached the peer, so
+   * probing may resume from pendingSnapshot + 1.
+   **/
+  const bool      wasSnapshot = (progress->state == PROGRESS_STATE_SNAPSHOT);
+  const SyncIndex snapshotIdx = progress->pendingSnapshotIndex;
+
+  resetProgressState(progress, PROGRESS_STATE_PROBE);
+
+  SyncIndex next = progress->matchIndex + 1;
+  if (wasSnapshot && snapshotIdx + 1 > next) {
+    next = snapshotIdx + 1;
+  }
+  progress->nextIndex = next;
+}
+
+/**
+ * syncRaftProgressBecomeReplicate switches the peer to StateReplicate and
+ * restarts Next at Match+1.
+ **/
+void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) {
+  resetProgressState(progress, PROGRESS_STATE_REPLICATE);
+
+  const SyncIndex matched = progress->matchIndex;
+  progress->nextIndex = matched + 1;
+}
+
+/**
+ * syncRaftProgressBecomeSnapshot switches the peer to StateSnapshot and
+ * records the index of the snapshot that is pending for it.
+ **/
+void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) {
+  /* the reset clears pendingSnapshotIndex, so it has to be stored afterwards */
+  resetProgressState(progress, PROGRESS_STATE_SNAPSHOT);
+  progress->pendingSnapshotIndex = snapshotIndex;
+}
+
+/**
+ * resetProgressState puts the Progress into the given state and clears
+ * ProbeSent, PendingSnapshot and the inflight window.
+ **/
+static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) {
+  progress->state                = state;
+  progress->pendingSnapshotIndex = 0;
+  progress->probeSent            = false;
+  syncRaftInflightReset(progress->inflights);
+}
+
+/**
+ * probeAcked is called when this peer has accepted an append. It resets
+ * ProbeSent to signal that additional append messages should be sent without
+ * further delay.
+ **/
+static void probeAcked(SSyncRaftProgress* progress) {
+  progress->probeSent = false;
+}
+
+/* Disabled accessors that address a peer by its slot `i` in pRaft->leaderState.progress. */
+#if 0
+
+SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i) {
+  return pRaft->leaderState.progress[i].nextIndex;
+}
+
+SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i) {
+  return pRaft->leaderState.progress[i].matchIndex;
+}
+
+void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i) {
+  pRaft->leaderState.progress[i].lastSend = pRaft->io.time(pRaft);
+}
+
+void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i) {
+  pRaft->leaderState.progress[i].lastSendSnapshot = pRaft->io.time(pRaft);
+}
+
+/* clears the recentRecv flag of peer i and returns its previous value */
+bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i) {
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  bool prev = progress->recentRecv;
+  progress->recentRecv = false;
+  return prev;
+}
+
+void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i) {
+  pRaft->leaderState.progress[i].recentRecv = true;
+}
+
+bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) {
+  return pRaft->leaderState.progress[i].recentRecv;
+}
+
+void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) {
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  resetProgressState(progress, PROGRESS_STATE_SNAPSHOT);
+  progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log);
+}
+
+void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) {
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+
+  if (progress->state == PROGRESS_STATE_SNAPSHOT) {
+    assert(progress->pendingSnapshotIndex > 0);
+    SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex;
+    resetProgressState(progress, PROGRESS_STATE_PROBE);
+    progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex);
+  } else {
+    resetProgressState(progress, PROGRESS_STATE_PROBE);
+    progress->nextIndex = progress->matchIndex + 1;
+  }
+}
+
+void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) {
+  /* address peer i like every other accessor here, not always the first slot */
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  resetProgressState(progress, PROGRESS_STATE_REPLICATE);
+  progress->nextIndex = progress->matchIndex + 1;
+}
+
+/* drops the pending snapshot of peer i and puts it back into probe state */
+void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) {
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  progress->pendingSnapshotIndex = 0;
+  progress->state = PROGRESS_STATE_PROBE;
+}
+
+RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) {
+  return pRaft->leaderState.progress[i].state;
+}
+
+
+
+#endif
\ No newline at end of file
diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c
new file mode 100644
index 0000000000..43b68a4b08
--- /dev/null
+++ b/source/libs/sync/src/sync_raft_progress_tracker.c
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "sync_raft_progress_tracker.h"
+
+/**
+ * syncRaftOpenProgressTracker allocates a progress tracker.
+ * The memory is zero-filled so that no field starts out with indeterminate
+ * content. Returns NULL when the allocation fails.
+ **/
+SSyncRaftProgressTracker* syncRaftOpenProgressTracker() {
+  return (SSyncRaftProgressTracker*)calloc(1, sizeof(SSyncRaftProgressTracker));
+}
+
+/**
+ * syncRaftResetVotes marks the vote of every replica slot as unknown.
+ **/
+void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) {
+  /*
+   * Assign element by element: memset() replicates a single byte, which only
+   * yields the enum value when all of its bytes happen to be identical.
+   */
+  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
+    tracker->votes[i] = SYNC_RAFT_VOTE_RESP_UNKNOWN;
+  }
+}
+
+/**
+ * syncRaftProgressVisit calls `visit(i, progress, arg)` for every slot of the
+ * progress map, in slot order; unused slots are passed to the callback as well.
+ **/
+void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) {
+  int i;
+  for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
+    SSyncRaftProgress* progress = &(tracker->progressMap[i]);
+    visit(i, progress, arg);
+  }
+}
+
+/**
+ * syncRaftRecordVote stores the vote of the replica at slot `i`.
+ * Only the first answer of a replica counts: a slot that already holds a vote
+ * is left unchanged. Out-of-range slots are ignored.
+ **/
+void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) {
+  if (i < 0 || i >= TSDB_MAX_REPLICA) {
+    return;
+  }
+
+  if (tracker->votes[i] != SYNC_RAFT_VOTE_RESP_UNKNOWN) {
+    return;
+  }
+
+  tracker->votes[i] = grant ? SYNC_RAFT_VOTE_RESP_GRANT : SYNC_RAFT_VOTE_RESP_REJECT;
+}
+
+/**
+ * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
+ * election outcome is known.
+ *
+ * `rejected` and `granted` are optional output counters; either may be NULL.
+ **/
+SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* rejected, int *granted) {
+  int nGranted  = 0;
+  int nRejected = 0;
+
+  for (int slot = 0; slot < TSDB_MAX_REPLICA; ++slot) {
+    const SSyncRaftProgress* peer = &(tracker->progressMap[slot]);
+
+    /* skip unused slots and replicas that have not answered yet */
+    if (peer->id == SYNC_NON_NODE_ID || tracker->votes[slot] == SYNC_RAFT_VOTE_RESP_UNKNOWN) {
+      continue;
+    }
+
+    if (tracker->votes[slot] == SYNC_RAFT_VOTE_RESP_GRANT) {
+      nGranted++;
+    } else {
+      nRejected++;
+    }
+  }
+
+  if (rejected != NULL) {
+    *rejected = nRejected;
+  }
+  if (granted != NULL) {
+    *granted = nGranted;
+  }
+
+  /* the outcome itself is decided by the joint quorum over the voters */
+  return syncRaftVoteResult(&(tracker->config.voters), tracker->votes);
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c
new file mode 100644
index 0000000000..a0e6a6782a
--- /dev/null
+++ b/source/libs/sync/src/sync_raft_quorum_joint.c
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "sync_raft_quorum_majority.h"
+#include "sync_raft_quorum_joint.h"
+#include "sync_raft_quorum.h"
+
+/**
+ * syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
+ * a result indicating whether the vote is pending, lost, or won. A joint quorum
+ * requires both majority quorums to vote in favor.
+ **/
+SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const SyncRaftVoteResult* votes) {
+  const SyncRaftVoteResult first  = syncRaftMajorityVoteResult(&(config->majorityConfig[0]), votes);
+  const SyncRaftVoteResult second = syncRaftMajorityVoteResult(&(config->majorityConfig[1]), votes);
+
+  // A loss in either half settles the joint vote as lost.
+  if (first == SYNC_RAFT_VOTE_LOST || second == SYNC_RAFT_VOTE_LOST) {
+    return SYNC_RAFT_VOTE_LOST;
+  }
+
+  // Neither half lost: when both halves agree, that is the joint result;
+  // otherwise one side won while the other is still pending.
+  return (first == second) ? first : SYNC_RAFT_VOTE_PENDING;
+}
diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c
new file mode 100644
index 0000000000..ea543d7335
--- /dev/null
+++ b/source/libs/sync/src/sync_raft_quorum_majority.c
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "sync_raft_quorum.h"
+#include "sync_raft_quorum_majority.h"
+
+/**
+ * syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
+ * a result indicating whether the vote is pending (i.e. neither a quorum of
+ * yes/no has been reached), won (a quorum of yes has been reached), or lost (a
+ * quorum of no has been reached).
+ *
+ * `votes` is indexed by replica slot; slots whose node id is SYNC_NON_NODE_ID
+ * are not counted. A config without replicas always reports a won vote.
+ **/
+SyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const SyncRaftVoteResult* votes) {
+  if (config->replica == 0) {
+    return SYNC_RAFT_VOTE_WON;
+  }
+
+  int granted = 0;
+  int missing = 0;
+  for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
+    if (config->nodeInfo[i].nodeId == SYNC_NON_NODE_ID) {
+      continue;
+    }
+
+    /* rejections need no counter of their own: they are what is left over */
+    if (votes[i] == SYNC_RAFT_VOTE_RESP_UNKNOWN) {
+      missing += 1;
+    } else if (votes[i] == SYNC_RAFT_VOTE_RESP_GRANT) {
+      granted += 1;
+    }
+  }
+
+  int quorum = config->replica / 2 + 1;
+  if (granted >= quorum) {
+    return SYNC_RAFT_VOTE_WON;
+  }
+  /* still winnable only if the outstanding answers could lift the grants to a quorum */
+  if (granted + missing >= quorum) {
+    return SYNC_RAFT_VOTE_PENDING;
+  }
+
+  return SYNC_RAFT_VOTE_LOST;
+}
\ No newline at end of file