From f0b31fed0f40da2ed62d563a3221c94492b3c89d Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 23 Oct 2021 22:29:00 +0800 Subject: [PATCH 1/2] [feeature][raft]add raft interface --- include/libs/raft/raft_sync.h | 131 ++++++++++++++++++++++++++++++++ source/libs/raft/CMakeLists.txt | 8 ++ source/libs/raft/src/raft.c | 4 +- 3 files changed, 142 insertions(+), 1 deletion(-) create mode 100644 include/libs/raft/raft_sync.h diff --git a/include/libs/raft/raft_sync.h b/include/libs/raft/raft_sync.h new file mode 100644 index 0000000000..d59ab5206a --- /dev/null +++ b/include/libs/raft/raft_sync.h @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_RAFT_SYNC_H +#define TDENGINE_RAFT_SYNC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include "taosdef.h" +#include "wal.h" + +typedef uint32_t SyncNodeId; +typedef int64_t SyncVersion; +typedef uint64_t SSyncTerm; + +typedef enum { + TAOS_SYNC_ROLE_FOLLOWER = 0, + TAOS_SYNC_ROLE_CANDIDATE = 1, + TAOS_SYNC_ROLE_LEADER = 2, +} ESyncRole; + +typedef struct { + void* data; + size_t len; +} SSyncBuffer; + +typedef struct { + SyncNodeId nodeId; // node ID assigned by TDengine + uint16_t nodePort; // node sync Port + char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN +} SNodeInfo; + +typedef struct { + int selfIndex; + int nNode; + SNodeInfo* nodeInfo; +} SSyncCluster; + +typedef struct { + int32_t selfIndex; + int nNode; + SyncNodeId* nodeId; + ESyncRole* role; +} SNodesRole; + +struct SSyncFSM; +typedef struct SSyncFSM { + void* pData; + + // apply committed log, bufs will be free by raft module + int (*applyLog)(struct SSyncFSM *fsm, SyncVersion index, const SSyncBuffer *buf, void *pData); + + // cluster commit callback + int (*onClusterChanged)(struct SSyncFSM *fsm, const SSyncCluster* cluster, void *pData); + + // fsm return snapshot in ppBuf, bufs will be free by raft module + // TODO: getSnapshot SHOULD be async? + int (*getSnapshot)(struct SSyncFSM *fsm, SSyncBuffer **ppBuf, int* objId, bool *isLast); + + // fsm apply snapshot with pBuf data + int (*applySnapshot)(struct SSyncFSM *fsm, SSyncBuffer *pBuf, int objId, bool isLast); + + // call when restore snapshot and log done + int (*onRestoreDone)(struct SSyncFSM *fsm); + + void (*onRollback)(struct SSyncFSM *fsm, SyncVersion index, const SSyncBuffer *buf); + + void (*onRoleChanged)(struct SSyncFSM *fsm, const SNodesRole* pRole); + +} SSyncFSM; + +typedef struct SSyncServerState { + SyncNodeId voteFor; + SSyncTerm term; +} SSyncServerState; + +typedef struct SStateManager { + void* pData; + + void (*saveServerState)(struct SStateManager* stateMng, const SSyncServerState* state); + + const SSyncServerState* (*readServerState)(struct SStateManager* stateMng); + + void (*saveCluster)(struct SStateManager* stateMng, const SSyncCluster* cluster); + + const SSyncCluster* (*readCluster)(struct SStateManager* stateMng); +} SStateManager; + +typedef struct { + int32_t vgId; // vgroup ID + + twalh walHandle; + + SyncVersion snapIndex; // initial version + SSyncCluster syncCfg; // configuration from mgmt + + SSyncFSM fsm; + + SStateManager stateManager; +} SSyncInfo; + +int32_t syncInit(); +void syncCleanUp(); + +SyncNodeId syncStart(const SSyncInfo *); +void syncStop(SyncNodeId); + +int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void *pData, bool isWeak); + +extern int32_t raftDebugFlag; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_RAFT_SYNC_H diff --git a/source/libs/raft/CMakeLists.txt b/source/libs/raft/CMakeLists.txt index 127adc106e..6cdc72dc8f 100644 --- a/source/libs/raft/CMakeLists.txt +++ b/source/libs/raft/CMakeLists.txt @@ -1,5 +1,13 @@ aux_source_directory(src RAFT_SRC) add_library(raft ${RAFT_SRC}) + +target_link_libraries( + raft + PUBLIC common + PUBLIC util + PUBLIC wal +) + target_include_directories( raft PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/raft" diff --git a/source/libs/raft/src/raft.c b/source/libs/raft/src/raft.c index 6dea4a4e57..852618186f 100644 --- a/source/libs/raft/src/raft.c +++ b/source/libs/raft/src/raft.c @@ -11,4 +11,6 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "raft_sync.h" \ No newline at end of file From 048329e95e0971a2d92327531cafb2ef83dfbace Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 25 Oct 2021 09:29:37 +0800 Subject: [PATCH 2/2] [feeature][raft]refactor raft interface to sync --- include/libs/raft/raft.h | 161 ------------------ .../libs/{raft/raft_sync.h => sync/sync.h} | 11 +- source/libs/CMakeLists.txt | 2 +- source/libs/{raft => sync}/CMakeLists.txt | 10 +- source/libs/{raft => sync}/inc/raftInt.h | 0 .../libs/{raft/src/raft.c => sync/src/sync.c} | 2 +- source/libs/{raft => sync}/test/raftTests.cpp | 0 7 files changed, 13 insertions(+), 173 deletions(-) delete mode 100644 include/libs/raft/raft.h rename include/libs/{raft/raft_sync.h => sync/sync.h} (90%) rename source/libs/{raft => sync}/CMakeLists.txt (52%) rename source/libs/{raft => sync}/inc/raftInt.h (100%) rename source/libs/{raft/src/raft.c => sync/src/sync.c} (96%) rename source/libs/{raft => sync}/test/raftTests.cpp (100%) diff --git a/include/libs/raft/raft.h b/include/libs/raft/raft.h deleted file mode 100644 index 5b7f93276b..0000000000 --- a/include/libs/raft/raft.h +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TD_RAFT_H -#define TD_RAFT_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include "taosdef.h" - -typedef unsigned int RaftId; -typedef unsigned int RaftGroupId; - -// buffer holding data -typedef struct RaftBuffer { - void* data; - size_t len; -} RaftBuffer; - -// a single server information in a cluster -typedef struct RaftServer { - RaftId id; - char fqdn[TSDB_FQDN_LEN]; - uint16_t port; -} RaftServer; - -// all servers in a cluster -typedef struct RaftConfiguration { - RaftServer *servers; - int nServer; -} RaftConfiguration; - -// raft lib module -struct Raft; -typedef struct Raft Raft; - -struct RaftNode; -typedef struct RaftNode RaftNode; - -// raft state machine -struct RaftFSM; -typedef struct RaftFSM { - // statemachine user data - void *data; - - // apply buffer data, bufs will be free by raft module - int (*apply)(struct RaftFSM *fsm, const RaftBuffer *bufs[], int nBufs); - - // configuration commit callback - int (*onConfigurationCommit)(const RaftConfiguration* cluster); - - // fsm return snapshot in ppBuf, bufs will be free by raft module - // TODO: getSnapshot SHOULD be async? - int (*getSnapshot)(struct RaftFSM *fsm, RaftBuffer **ppBuf); - - // fsm restore with pBuf data - int (*restore)(struct RaftFSM *fsm, RaftBuffer *pBuf); - - // fsm send data in buf to server,buf will be free by raft module - int (*send)(struct RaftFSM* fsm, const RaftServer* server, const RaftBuffer *buf); -} RaftFSM; - -typedef struct RaftNodeOptions { - // user define state machine - RaftFSM* pFSM; - - // election timeout(in ms) - // by default: 1000 - int electionTimeoutMS; - - // heart timeout(in ms) - // by default: 100 - int heartbeatTimeoutMS; - - // install snapshot timeout(in ms) - int installSnapshotTimeoutMS; - - /** - * number of log entries before starting a new snapshot. - * by default: 1024 - */ - int snapshotThreshold; - - /** - * Number of log entries to keep in the log after a snapshot has - * been taken. - * by default: 128. - */ - int snapshotTrailing; - - /** - * Enable or disable pre-vote support. - * by default: false - */ - bool preVote; - -} RaftNodeOptions; - -// create raft lib -int RaftCreate(Raft** ppRaft); - -int RaftDestroy(Raft* pRaft); - -// start a raft node with options,node id,group id -int RaftStart(Raft* pRaft, - RaftId selfId, - RaftGroupId selfGroupId, - const RaftConfiguration* cluster, - const RaftNodeOptions* options, - RaftNode **ppNode); - -// stop a raft node -int RaftStop(RaftNode* pNode); - -// client apply a cmd in buf -typedef void (*RaftApplyFp)(const RaftBuffer *pBuf, int result); - -int RaftApply(RaftNode *pNode, - const RaftBuffer *pBuf, - RaftApplyFp applyCb); - -// recv data from other servers in cluster,buf will be free in raft -int RaftRecv(RaftNode *pNode, const RaftBuffer* pBuf); - -// change cluster servers API -typedef void (*RaftChangeFp)(const RaftServer* pServer, int result); - -int RaftAddServer(RaftNode *pNode, - const RaftServer* pServer, - RaftChangeFp changeCb); - -int RaftRemoveServer(RaftNode *pNode, - const RaftServer* pServer, - RaftChangeFp changeCb); - -// transfer leader to id -typedef void (*RaftTransferFp)(RaftId id, int result); -int RaftTransfer(RaftNode *pNode, - RaftId id, - RaftTransferFp transferCb); - -#ifdef __cplusplus -} -#endif - -#endif /* TD_RAFT_H */ \ No newline at end of file diff --git a/include/libs/raft/raft_sync.h b/include/libs/sync/sync.h similarity index 90% rename from include/libs/raft/raft_sync.h rename to include/libs/sync/sync.h index d59ab5206a..ee3ea8db9b 100644 --- a/include/libs/raft/raft_sync.h +++ b/include/libs/sync/sync.h @@ -25,7 +25,8 @@ extern "C" { #include "wal.h" typedef uint32_t SyncNodeId; -typedef int64_t SyncVersion; +typedef int32_t SyncGroupId; +typedef int64_t SyncIndex; typedef uint64_t SSyncTerm; typedef enum { @@ -63,7 +64,7 @@ typedef struct SSyncFSM { void* pData; // apply committed log, bufs will be free by raft module - int (*applyLog)(struct SSyncFSM *fsm, SyncVersion index, const SSyncBuffer *buf, void *pData); + int (*applyLog)(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData); // cluster commit callback int (*onClusterChanged)(struct SSyncFSM *fsm, const SSyncCluster* cluster, void *pData); @@ -78,7 +79,7 @@ typedef struct SSyncFSM { // call when restore snapshot and log done int (*onRestoreDone)(struct SSyncFSM *fsm); - void (*onRollback)(struct SSyncFSM *fsm, SyncVersion index, const SSyncBuffer *buf); + void (*onRollback)(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf); void (*onRoleChanged)(struct SSyncFSM *fsm, const SNodesRole* pRole); @@ -102,11 +103,11 @@ typedef struct SStateManager { } SStateManager; typedef struct { - int32_t vgId; // vgroup ID + SyncGroupId vgId; twalh walHandle; - SyncVersion snapIndex; // initial version + SyncIndex snapshotIndex; // initial version SSyncCluster syncCfg; // configuration from mgmt SSyncFSM fsm; diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt index a7ec318eaa..3a975e679b 100644 --- a/source/libs/CMakeLists.txt +++ b/source/libs/CMakeLists.txt @@ -1,5 +1,5 @@ add_subdirectory(transport) -add_subdirectory(raft) +add_subdirectory(sync) add_subdirectory(tkv) add_subdirectory(index) add_subdirectory(wal) diff --git a/source/libs/raft/CMakeLists.txt b/source/libs/sync/CMakeLists.txt similarity index 52% rename from source/libs/raft/CMakeLists.txt rename to source/libs/sync/CMakeLists.txt index 6cdc72dc8f..124f4a1fee 100644 --- a/source/libs/raft/CMakeLists.txt +++ b/source/libs/sync/CMakeLists.txt @@ -1,15 +1,15 @@ -aux_source_directory(src RAFT_SRC) -add_library(raft ${RAFT_SRC}) +aux_source_directory(src SYNC_SRC) +add_library(sync ${SYNC_SRC}) target_link_libraries( - raft + sync PUBLIC common PUBLIC util PUBLIC wal ) target_include_directories( - raft - PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/raft" + sync + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) \ No newline at end of file diff --git a/source/libs/raft/inc/raftInt.h b/source/libs/sync/inc/raftInt.h similarity index 100% rename from source/libs/raft/inc/raftInt.h rename to source/libs/sync/inc/raftInt.h diff --git a/source/libs/raft/src/raft.c b/source/libs/sync/src/sync.c similarity index 96% rename from source/libs/raft/src/raft.c rename to source/libs/sync/src/sync.c index 852618186f..35611f3da8 100644 --- a/source/libs/raft/src/raft.c +++ b/source/libs/sync/src/sync.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "raft_sync.h" \ No newline at end of file +#include "sync.h" \ No newline at end of file diff --git a/source/libs/raft/test/raftTests.cpp b/source/libs/sync/test/raftTests.cpp similarity index 100% rename from source/libs/raft/test/raftTests.cpp rename to source/libs/sync/test/raftTests.cpp