From 9eafb7d8b9614cd9959e356d9e2b5c66052a12d0 Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 9 Oct 2021 16:24:07 +0800 Subject: [PATCH 1/3] [feature][TD-10557]add raft interface --- include/libs/raft/raft.h | 142 ++++++++++++++++++++++++++++++++++- src/raft/CMakeLists.txt | 7 ++ src/raft/inc/raft.h | 157 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 302 insertions(+), 4 deletions(-) create mode 100644 src/raft/CMakeLists.txt create mode 100644 src/raft/inc/raft.h diff --git a/include/libs/raft/raft.h b/include/libs/raft/raft.h index 1eedc0f4b8..5b7f93276b 100644 --- a/include/libs/raft/raft.h +++ b/include/libs/raft/raft.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 TAOS Data, Inc. + * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -13,15 +13,149 @@ * along with this program. If not, see . */ -#ifndef _TD_RAFT_H_ -#define _TD_RAFT_H_ +#ifndef TD_RAFT_H +#define TD_RAFT_H #ifdef __cplusplus extern "C" { #endif +#include +#include "taosdef.h" + +typedef unsigned int RaftId; +typedef unsigned int RaftGroupId; + +// buffer holding data +typedef struct RaftBuffer { + void* data; + size_t len; +} RaftBuffer; + +// a single server information in a cluster +typedef struct RaftServer { + RaftId id; + char fqdn[TSDB_FQDN_LEN]; + uint16_t port; +} RaftServer; + +// all servers in a cluster +typedef struct RaftConfiguration { + RaftServer *servers; + int nServer; +} RaftConfiguration; + +// raft lib module +struct Raft; +typedef struct Raft Raft; + +struct RaftNode; +typedef struct RaftNode RaftNode; + +// raft state machine +struct RaftFSM; +typedef struct RaftFSM { + // statemachine user data + void *data; + + // apply buffer data, bufs will be free by raft module + int (*apply)(struct RaftFSM *fsm, const RaftBuffer *bufs[], int nBufs); + + // configuration commit callback + int (*onConfigurationCommit)(const RaftConfiguration* cluster); + + // fsm return snapshot in ppBuf, bufs will be free by raft module + // TODO: getSnapshot SHOULD be async? + int (*getSnapshot)(struct RaftFSM *fsm, RaftBuffer **ppBuf); + + // fsm restore with pBuf data + int (*restore)(struct RaftFSM *fsm, RaftBuffer *pBuf); + + // fsm send data in buf to server,buf will be free by raft module + int (*send)(struct RaftFSM* fsm, const RaftServer* server, const RaftBuffer *buf); +} RaftFSM; + +typedef struct RaftNodeOptions { + // user define state machine + RaftFSM* pFSM; + + // election timeout(in ms) + // by default: 1000 + int electionTimeoutMS; + + // heart timeout(in ms) + // by default: 100 + int heartbeatTimeoutMS; + + // install snapshot timeout(in ms) + int installSnapshotTimeoutMS; + + /** + * number of log entries before starting a new snapshot. + * by default: 1024 + */ + int snapshotThreshold; + + /** + * Number of log entries to keep in the log after a snapshot has + * been taken. + * by default: 128. + */ + int snapshotTrailing; + + /** + * Enable or disable pre-vote support. + * by default: false + */ + bool preVote; + +} RaftNodeOptions; + +// create raft lib +int RaftCreate(Raft** ppRaft); + +int RaftDestroy(Raft* pRaft); + +// start a raft node with options,node id,group id +int RaftStart(Raft* pRaft, + RaftId selfId, + RaftGroupId selfGroupId, + const RaftConfiguration* cluster, + const RaftNodeOptions* options, + RaftNode **ppNode); + +// stop a raft node +int RaftStop(RaftNode* pNode); + +// client apply a cmd in buf +typedef void (*RaftApplyFp)(const RaftBuffer *pBuf, int result); + +int RaftApply(RaftNode *pNode, + const RaftBuffer *pBuf, + RaftApplyFp applyCb); + +// recv data from other servers in cluster,buf will be free in raft +int RaftRecv(RaftNode *pNode, const RaftBuffer* pBuf); + +// change cluster servers API +typedef void (*RaftChangeFp)(const RaftServer* pServer, int result); + +int RaftAddServer(RaftNode *pNode, + const RaftServer* pServer, + RaftChangeFp changeCb); + +int RaftRemoveServer(RaftNode *pNode, + const RaftServer* pServer, + RaftChangeFp changeCb); + +// transfer leader to id +typedef void (*RaftTransferFp)(RaftId id, int result); +int RaftTransfer(RaftNode *pNode, + RaftId id, + RaftTransferFp transferCb); + #ifdef __cplusplus } #endif -#endif /*_TD_RAFT_H_*/ \ No newline at end of file +#endif /* TD_RAFT_H */ \ No newline at end of file diff --git a/src/raft/CMakeLists.txt b/src/raft/CMakeLists.txt new file mode 100644 index 0000000000..3dc6601038 --- /dev/null +++ b/src/raft/CMakeLists.txt @@ -0,0 +1,7 @@ +aux_source_directory(source RAFT_SRC) +add_library(raft ${RAFT_SRC}) +target_include_directories( + raft + PUBLIC "${CMAKE_SOURCE_DIR}/include/raft" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/include" +) \ No newline at end of file diff --git a/src/raft/inc/raft.h b/src/raft/inc/raft.h new file mode 100644 index 0000000000..222d8fe69d --- /dev/null +++ b/src/raft/inc/raft.h @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_RAFT_H +#define TD_RAFT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include "taosdef.h" + +typedef unsigned int RaftId; +typedef unsigned int RaftGroupId; + +// buffer holding data +typedef struct RaftBuffer { + void* data; + size_t len; +} RaftBuffer; + +// a single server information in a cluster +typedef struct RaftServer { + RaftId id; + char fqdn[TSDB_FQDN_LEN]; + uint16_t port; +} RaftServer; + +// all servers in a cluster +typedef struct RaftConfiguration { + RaftServer *servers; + int nServer; +} RaftConfiguration; + +// raft lib module +struct Raft; +typedef struct Raft Raft; + +struct RaftNode; +typedef struct RaftNode RaftNode; + +// raft state machine +struct RaftFSM; +typedef struct RaftFSM { + // statemachine user data + void *data; + + // apply buffer data, bufs will be free by raft module + int (*apply)(struct RaftFSM *fsm, const RaftBuffer *bufs[], int nBufs); + + // fsm return snapshot in ppBuf, bufs will be free by raft module + // TODO: getSnapshot SHOULD be async? + int (*getSnapshot)(struct RaftFSM *fsm, RaftBuffer **ppBuf); + + // fsm restore with pBuf data + int (*restore)(struct RaftFSM *fsm, RaftBuffer *pBuf); + + // fsm send data in buf to server,buf will be free by raft module + int (*send)(struct RaftFSM* fsm, const RaftServer* server, const RaftBuffer *buf); +} RaftFSM; + +typedef struct RaftNodeOptions { + // user define state machine + RaftFSM* pFSM; + + // election timeout(in ms) + // by default: 150 + int electionTimeoutMS; + + // heart timeout(in ms) + // by default: 50 + int heartbeatTimeoutMS; + + // install snapshot timeout(in ms) + int installSnapshotTimeoutMS; + + /** + * number of log entries before starting a new snapshot. + * by default: 1024 + */ + int snapshotThreshold; + + /** + * Number of log entries to keep in the log after a snapshot has + * been taken. + * by default: 128. + */ + int snapshotTrailing; + + /** + * Enable or disable pre-vote support. + * by default: false + */ + bool preVote; + +} RaftNodeOptions; + +int RaftCreate(Raft** ppRaft); + +int RaftDestroy(Raft* pRaft); + +// start a raft node with options and cluster configuration +int RaftStart(Raft* pRaft, + RaftId selfId, + RaftGroupId selfGroupId, + const RaftNodeOptions* options, + const RaftConfiguration* cluster + RaftNode **ppNode); + +// stop a raft node +int RaftStop(RaftNode* pNode); + +// client apply a cmd in buf +typedef void (*RaftApplyFp)(const RaftBuffer *pBuf, int result); + +int RaftApply(RaftNode *pNode, + const RaftBuffer *pBuf, + RaftApplyFp applyCb); + +// recv data from other servers in cluster,buf will be free in raft +int RaftRecv(RaftNode *pNode, const RaftBuffer* pBuf); + +// change cluster servers API +typedef void (*RaftChangeFp)(const RaftServer* pServer, int result); + +int RaftAddServer(RaftNode *pNode, + const RaftServer* pServer, + RaftChangeFp changeCb); + +int RaftRemoveServer(RaftNode *pNode, + const RaftServer* pServer, + RaftChangeFp changeCb); + +// transfer leader to id +typedef void (*RaftTransferFp)(RaftId id, int result); +int RaftTransfer(RaftNode *pNode, + RaftId id, + RaftTransferFp transferCb); + +#ifdef __cplusplus +} +#endif + +#endif /* TD_RAFT_H */ \ No newline at end of file From 3279770f504c41209d6f9afc82ccf90443adf842 Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 9 Oct 2021 16:25:58 +0800 Subject: [PATCH 2/3] [feature][TD-10557]add raft interface --- src/raft/inc/raft.h | 157 -------------------------------------------- 1 file changed, 157 deletions(-) delete mode 100644 src/raft/inc/raft.h diff --git a/src/raft/inc/raft.h b/src/raft/inc/raft.h deleted file mode 100644 index 222d8fe69d..0000000000 --- a/src/raft/inc/raft.h +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TD_RAFT_H -#define TD_RAFT_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include "taosdef.h" - -typedef unsigned int RaftId; -typedef unsigned int RaftGroupId; - -// buffer holding data -typedef struct RaftBuffer { - void* data; - size_t len; -} RaftBuffer; - -// a single server information in a cluster -typedef struct RaftServer { - RaftId id; - char fqdn[TSDB_FQDN_LEN]; - uint16_t port; -} RaftServer; - -// all servers in a cluster -typedef struct RaftConfiguration { - RaftServer *servers; - int nServer; -} RaftConfiguration; - -// raft lib module -struct Raft; -typedef struct Raft Raft; - -struct RaftNode; -typedef struct RaftNode RaftNode; - -// raft state machine -struct RaftFSM; -typedef struct RaftFSM { - // statemachine user data - void *data; - - // apply buffer data, bufs will be free by raft module - int (*apply)(struct RaftFSM *fsm, const RaftBuffer *bufs[], int nBufs); - - // fsm return snapshot in ppBuf, bufs will be free by raft module - // TODO: getSnapshot SHOULD be async? - int (*getSnapshot)(struct RaftFSM *fsm, RaftBuffer **ppBuf); - - // fsm restore with pBuf data - int (*restore)(struct RaftFSM *fsm, RaftBuffer *pBuf); - - // fsm send data in buf to server,buf will be free by raft module - int (*send)(struct RaftFSM* fsm, const RaftServer* server, const RaftBuffer *buf); -} RaftFSM; - -typedef struct RaftNodeOptions { - // user define state machine - RaftFSM* pFSM; - - // election timeout(in ms) - // by default: 150 - int electionTimeoutMS; - - // heart timeout(in ms) - // by default: 50 - int heartbeatTimeoutMS; - - // install snapshot timeout(in ms) - int installSnapshotTimeoutMS; - - /** - * number of log entries before starting a new snapshot. - * by default: 1024 - */ - int snapshotThreshold; - - /** - * Number of log entries to keep in the log after a snapshot has - * been taken. - * by default: 128. - */ - int snapshotTrailing; - - /** - * Enable or disable pre-vote support. - * by default: false - */ - bool preVote; - -} RaftNodeOptions; - -int RaftCreate(Raft** ppRaft); - -int RaftDestroy(Raft* pRaft); - -// start a raft node with options and cluster configuration -int RaftStart(Raft* pRaft, - RaftId selfId, - RaftGroupId selfGroupId, - const RaftNodeOptions* options, - const RaftConfiguration* cluster - RaftNode **ppNode); - -// stop a raft node -int RaftStop(RaftNode* pNode); - -// client apply a cmd in buf -typedef void (*RaftApplyFp)(const RaftBuffer *pBuf, int result); - -int RaftApply(RaftNode *pNode, - const RaftBuffer *pBuf, - RaftApplyFp applyCb); - -// recv data from other servers in cluster,buf will be free in raft -int RaftRecv(RaftNode *pNode, const RaftBuffer* pBuf); - -// change cluster servers API -typedef void (*RaftChangeFp)(const RaftServer* pServer, int result); - -int RaftAddServer(RaftNode *pNode, - const RaftServer* pServer, - RaftChangeFp changeCb); - -int RaftRemoveServer(RaftNode *pNode, - const RaftServer* pServer, - RaftChangeFp changeCb); - -// transfer leader to id -typedef void (*RaftTransferFp)(RaftId id, int result); -int RaftTransfer(RaftNode *pNode, - RaftId id, - RaftTransferFp transferCb); - -#ifdef __cplusplus -} -#endif - -#endif /* TD_RAFT_H */ \ No newline at end of file From 4a883c23c5568392fbb58a9fe4b312e3bc08e868 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 9 Oct 2021 17:14:37 +0800 Subject: [PATCH 3/3] refactor tq --- include/libs/wal/wal.h | 4 ++-- include/server/vnode/tq/tq.h | 19 +++++-------------- source/server/vnode/tq/inc/tqInt.h | 11 +++++++++++ 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index e59d60f7dc..37cd263783 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -60,8 +60,8 @@ void walClose(twalh); //write int64_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); void walFsync(twalh, bool forceHint); -//int32_t walCommit(twalh, uint64_t ver); -//int32_t walRollback(twalh, uint64_t ver); +//int32_t walCommit(twalh, int64_t ver); +//int32_t walRollback(twalh, int64_t ver); //read int32_t walRead(twalh, SWalHead **, int64_t ver); diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 91688e890d..dd355c8381 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -25,23 +25,14 @@ extern "C" { typedef struct STQ STQ; STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); -void tqCleanUp(STQ* pTq); +void tqCleanUp(STQ*); -//create persistent storage for meta info such as consuming offset -//return value > 0: cgId -//return value <= 0: error code -int tqCreateGroup(STQ*); -//create ring buffer in memory and load consuming offset -int tqOpenGroup(STQ*, int cgId); -//destroy ring buffer and persist consuming offset -int tqCloseGroup(STQ*, int cgId); -//delete persistent storage for meta info -int tqDropGroup(STQ*, int cgId); - -int tqPushMsg(STQ*, void *, int64_t version); +//void* will be replace by a msg type +int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); -int tqHandleMsg(STQ*, void *msg); +//void* will be replace by a msg type +int tqHandleMsg(STQ*, void* msg); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index 416a915456..a51f0b03af 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -25,6 +25,17 @@ extern "C" { //implement the array index //implement the ring buffer +//create persistent storage for meta info such as consuming offset +//return value > 0: cgId +//return value <= 0: error code +int tqCreateGroup(STQ*); +//create ring buffer in memory and load consuming offset +int tqOpenGroup(STQ*, int cgId); +//destroy ring buffer and persist consuming offset +int tqCloseGroup(STQ*, int cgId); +//delete persistent storage for meta info +int tqDropGroup(STQ*, int cgId); + #ifdef __cplusplus } #endif