This commit is contained in:
Hongze Cheng 2021-10-25 15:07:41 +08:00
parent c7b69f4bc9
commit e938d5a29f
1 changed files with 29 additions and 29 deletions

View File

@ -25,36 +25,36 @@ extern "C" {
#include "wal.h" #include "wal.h"
typedef uint32_t SyncNodeId; typedef uint32_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
typedef int64_t SyncIndex; typedef int64_t SyncIndex;
typedef uint64_t SSyncTerm; typedef uint64_t SSyncTerm;
typedef enum { typedef enum {
TAOS_SYNC_ROLE_FOLLOWER = 0, TAOS_SYNC_ROLE_FOLLOWER = 0,
TAOS_SYNC_ROLE_CANDIDATE = 1, TAOS_SYNC_ROLE_CANDIDATE = 1,
TAOS_SYNC_ROLE_LEADER = 2, TAOS_SYNC_ROLE_LEADER = 2,
} ESyncRole; } ESyncRole;
typedef struct { typedef struct {
void* data; void* data;
size_t len; size_t len;
} SSyncBuffer; } SSyncBuffer;
typedef struct { typedef struct {
SyncNodeId nodeId; // node ID assigned by TDengine SyncNodeId nodeId; // node ID assigned by TDengine
uint16_t nodePort; // node sync Port uint16_t nodePort; // node sync Port
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN
} SNodeInfo; } SNodeInfo;
typedef struct { typedef struct {
int selfIndex; int selfIndex;
int nNode; int nNode;
SNodeInfo* nodeInfo; SNodeInfo* nodeInfo;
} SSyncCluster; } SSyncCluster;
typedef struct { typedef struct {
int32_t selfIndex; int32_t selfIndex;
int nNode; int nNode;
SyncNodeId* nodeId; SyncNodeId* nodeId;
ESyncRole* role; ESyncRole* role;
} SNodesRole; } SNodesRole;
@ -64,30 +64,30 @@ typedef struct SSyncFSM {
void* pData; void* pData;
// apply committed log, bufs will be free by raft module // apply committed log, bufs will be free by raft module
int (*applyLog)(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData); int (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData);
// cluster commit callback // cluster commit callback
int (*onClusterChanged)(struct SSyncFSM *fsm, const SSyncCluster* cluster, void *pData); int (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData);
// fsm return snapshot in ppBuf, bufs will be free by raft module // fsm return snapshot in ppBuf, bufs will be free by raft module
// TODO: getSnapshot SHOULD be async? // TODO: getSnapshot SHOULD be async?
int (*getSnapshot)(struct SSyncFSM *fsm, SSyncBuffer **ppBuf, int* objId, bool *isLast); int (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int* objId, bool* isLast);
// fsm apply snapshot with pBuf data // fsm apply snapshot with pBuf data
int (*applySnapshot)(struct SSyncFSM *fsm, SSyncBuffer *pBuf, int objId, bool isLast); int (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int objId, bool isLast);
// call when restore snapshot and log done // call when restore snapshot and log done
int (*onRestoreDone)(struct SSyncFSM *fsm); int (*onRestoreDone)(struct SSyncFSM* fsm);
void (*onRollback)(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf); void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf);
void (*onRoleChanged)(struct SSyncFSM *fsm, const SNodesRole* pRole); void (*onRoleChanged)(struct SSyncFSM* fsm, const SNodesRole* pRole);
} SSyncFSM; } SSyncFSM;
typedef struct SSyncServerState { typedef struct SSyncServerState {
SyncNodeId voteFor; SyncNodeId voteFor;
SSyncTerm term; SSyncTerm term;
} SSyncServerState; } SSyncServerState;
typedef struct SStateManager { typedef struct SStateManager {
@ -103,12 +103,12 @@ typedef struct SStateManager {
} SStateManager; } SStateManager;
typedef struct { typedef struct {
SyncGroupId vgId; SyncGroupId vgId;
twalh walHandle; twalh walHandle;
SyncIndex snapshotIndex; // initial version SyncIndex snapshotIndex; // initial version
SSyncCluster syncCfg; // configuration from mgmt SSyncCluster syncCfg; // configuration from mgmt
SSyncFSM fsm; SSyncFSM fsm;
@ -118,12 +118,12 @@ typedef struct {
int32_t syncInit(); int32_t syncInit();
void syncCleanUp(); void syncCleanUp();
SyncNodeId syncStart(const SSyncInfo *); SyncNodeId syncStart(const SSyncInfo*);
void syncStop(SyncNodeId); void syncStop(SyncNodeId);
int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void *pData, bool isWeak); int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void* pData, bool isWeak);
extern int32_t raftDebugFlag; extern int32_t raftDebugFlag;
#ifdef __cplusplus #ifdef __cplusplus
} }