Merge pull request #10233 from taosdata/feature/3.0_mhli

TD-13476 add sync.h
This commit is contained in:
Li Minghao 2022-02-12 21:14:14 +08:00 committed by GitHub
commit 67f91c6d06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 69 additions and 65 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com> * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* *
* This program is free software: you can use, redistribute, and/or modify * This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * it under the terms of the GNU Affero General Public License, version 3
@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include "taosdef.h" #include "taosdef.h"
typedef int32_t SyncNodeId; typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
typedef int64_t SyncIndex; typedef int64_t SyncIndex;
typedef uint64_t SyncTerm; typedef uint64_t SyncTerm;
@ -46,109 +46,113 @@ typedef struct {
} SNodeInfo; } SNodeInfo;
typedef struct { typedef struct {
int32_t selfIndex; int32_t replicaNum;
int32_t replica;
SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
} SSyncCluster; } SSyncCfg;
typedef struct { typedef struct {
int32_t selfIndex; int32_t replicaNum;
int32_t replica; SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
SNodeInfo node[TSDB_MAX_REPLICA];
ESyncState role[TSDB_MAX_REPLICA]; ESyncState role[TSDB_MAX_REPLICA];
} SNodesRole; } SNodesRole;
// abstract definition of snapshot
typedef struct SSnapshot {
void* data;
SyncIndex lastApplyIndex;
} SSnapshot;
typedef struct SSyncFSM { typedef struct SSyncFSM {
void* pData; void* data;
// apply committed log, bufs will be free by sync module // when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result
int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData); // user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code);
// cluster commit callback // when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result
int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData); // user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code);
// fsm return snapshot in ppBuf, bufs will be free by sync module // when log entry is updated by a new one, FpRollBackCb is called
// TODO: getSnapshot SHOULD be async? // user can do something to roll back. for example, delete data from tsdb, or just ignore it
int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code);
// fsm apply snapshot with pBuf data // user should implement this function, use "data" to take snapshot into "snapshot"
int32_t (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int32_t objId, bool isLast); int32_t (*FpTakeSnapshot)(SSnapshot* snapshot);
// call when restore snapshot and log done // user should implement this function, restore "data" from "snapshot"
int32_t (*onRestoreDone)(struct SSyncFSM* fsm); int32_t (*FpRestoreSnapshot)(const SSnapshot* snapshot);
void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf);
void (*onRoleChanged)(struct SSyncFSM* fsm, const SNodesRole* pRole);
} SSyncFSM; } SSyncFSM;
// abstract definition of log store in raft
// SWal implements it
typedef struct SSyncLogStore { typedef struct SSyncLogStore {
void* pData; void* data;
// write log with given index // append one log entry
int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf); int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf);
/** // get one log entry, user need to free pBuf->data
* read log from given index(included) with limit, return the actual num in nBuf, int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf);
* pBuf will be free in sync module
**/
int32_t (*logRead)(struct SSyncLogStore* logStore, SyncIndex index, int limit,
SSyncBuffer* pBuf, int* nBuf);
// mark log with given index has been commtted // update log store commit index with "index"
int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index); int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);
// prune log before given index(not included) // truncate log with index, entries after the given index (>index) will be deleted
int32_t (*logPrune)(struct SSyncLogStore* logStore, SyncIndex index); int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex index);
// rollback log after given index(included) // return commit index of log
int32_t (*logRollback)(struct SSyncLogStore* logStore, SyncIndex index); SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);
// return index of last entry
SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore);
// return term of last entry
SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore);
// return last index of log
SyncIndex (*logLastIndex)(struct SSyncLogStore* logStore);
} SSyncLogStore; } SSyncLogStore;
typedef struct SStateManager { // raft need to persist two variables in storage: currentTerm, voteFor
void* pData; typedef struct SStateMgr {
void* data;
// save serialized server state data, buffer will be free by Sync int32_t (*getCurrentTerm)(struct SStateMgr* pMgr, SyncTerm* pCurrentTerm);
int32_t (*saveServerState)(struct SStateManager* stateMng, const char* buffer, int n); int32_t (*persistCurrentTerm)(struct SStateMgr* pMgr, SyncTerm pCurrentTerm);
// read serialized server state data, buffer will be free by Sync int32_t (*getVoteFor)(struct SStateMgr* pMgr, SyncNodeId* pVoteFor);
int32_t (*readServerState)(struct SStateManager* stateMng, char** ppBuffer, int* n); int32_t (*persistVoteFor)(struct SStateMgr* pMgr, SyncNodeId voteFor);
// save serialized cluster state data, buffer will be free by Sync int32_t (*getSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg);
void (*saveClusterState)(struct SStateManager* stateMng, const char* buffer, int n); int32_t (*persistSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg);
// read serialized cluster state data, buffer will be free by Sync } SStateMgr;
int32_t (*readClusterState)(struct SStateManager* stateMng, char** ppBuffer, int* n);
} SStateManager;
typedef struct { typedef struct {
SyncGroupId vgId; SyncGroupId vgId;
SyncIndex appliedIndex; SSyncCfg syncCfg;
SSyncCluster syncCfg;
SSyncFSM fsm;
SSyncLogStore logStore; SSyncLogStore logStore;
SStateManager stateManager; SStateMgr stateManager;
SSyncFSM syncFsm;
} SSyncInfo; } SSyncInfo;
struct SSyncNode; // will be defined in syncInt.h, here just for complie
typedef struct SSyncNode SSyncNode; typedef struct SSyncNode {
} SSyncNode;
int32_t syncInit(); int32_t syncInit();
void syncCleanUp(); void syncCleanUp();
SSyncNode* syncStart(const SSyncInfo*); int64_t syncStart(const SSyncInfo*);
void syncReconfig(const SSyncNode*, const SSyncCluster*); void syncStop(int64_t rid);
void syncStop(const SSyncNode*); int32_t syncReconfig(int64_t rid, const SSyncCfg*);
int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak); // int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak);
int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole*);
int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
extern int32_t sDebugFlag; extern int32_t sDebugFlag;