Merge pull request #10527 from taosdata/feature/3.0_mhli

Feature/3.0 mhli
This commit is contained in:
Li Minghao 2022-03-03 18:44:19 +08:00 committed by GitHub
commit 405e9e2c5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 30 deletions

View File

@ -34,9 +34,7 @@ typedef enum {
TAOS_SYNC_STATE_FOLLOWER = 0, TAOS_SYNC_STATE_FOLLOWER = 0,
TAOS_SYNC_STATE_CANDIDATE = 1, TAOS_SYNC_STATE_CANDIDATE = 1,
TAOS_SYNC_STATE_LEADER = 2, TAOS_SYNC_STATE_LEADER = 2,
} ESyncRole; } ESyncState;
typedef ESyncRole ESyncState;
typedef struct SSyncBuffer { typedef struct SSyncBuffer {
void* data; void* data;

View File

@ -25,6 +25,7 @@ extern "C" {
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "sync.h"
#include "taosdef.h" #include "taosdef.h"
#include "tglobal.h"
#include "tlog.h" #include "tlog.h"
#include "ttimer.h" #include "ttimer.h"
@ -91,31 +92,61 @@ typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
struct SSyncEnv; struct SSyncEnv;
typedef struct SSyncEnv SSyncEnv; typedef struct SSyncEnv SSyncEnv;
struct SRaftStore;
typedef struct SRaftStore SRaftStore;
struct SVotesGranted;
typedef struct SVotesGranted SVotesGranted;
struct SVotesResponded;
typedef struct SVotesResponded SVotesResponded;
typedef struct SRaftId { typedef struct SRaftId {
SyncNodeId addr; // typedef uint64_t SyncNodeId; SyncNodeId addr; // typedef uint64_t SyncNodeId;
SyncGroupId vgId; // typedef int32_t SyncGroupId; SyncGroupId vgId; // typedef int32_t SyncGroupId;
} SRaftId; } SRaftId;
typedef struct SSyncNode { typedef struct SSyncNode {
// init by SSyncInfo
SyncGroupId vgId; SyncGroupId vgId;
SSyncCfg syncCfg; SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
SSyncFSM* pFsm;
// passed from outside
void* rpcClient; void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
// init internal
SNodeInfo me;
int32_t peersNum;
SNodeInfo peers[TSDB_MAX_REPLICA];
// raft algorithm
SSyncFSM* pFsm;
SRaftId raftId;
SRaftId peersId[TSDB_MAX_REPLICA];
int32_t replicaNum;
int32_t quorum;
// life cycle
int32_t refCount; int32_t refCount;
int64_t rid; int64_t rid;
SNodeInfo me; // tla+ server vars
SNodeInfo peers[TSDB_MAX_REPLICA]; ESyncState state;
int32_t peersNum; SRaftStore* pRaftStore;
ESyncRole role; // tla+ candidate vars
SRaftId raftId; SVotesGranted* pVotesGranted;
SVotesResponded* pVotesResponded;
// tla+ leader vars
SHashObj* pNextIndex;
SHashObj* pMatchIndex;
// tla+ log vars
SSyncLogStore* pLogStore;
SyncIndex commitIndex;
// timer
tmr_h pPingTimer; tmr_h pPingTimer;
int32_t pingTimerMS; int32_t pingTimerMS;
uint8_t pingTimerStart; uint8_t pingTimerStart;
@ -136,31 +167,20 @@ typedef struct SSyncNode {
// callback // callback
int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg); int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg);
int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg); int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg);
int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg); int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
} SSyncNode; } SSyncNode;
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void syncNodeClose(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePingAll(SSyncNode* pSyncNode); void syncNodePingAll(SSyncNode* pSyncNode);
void syncNodePingPeers(SSyncNode* pSyncNode); void syncNodePingPeers(SSyncNode* pSyncNode);
void syncNodePingSelf(SSyncNode* pSyncNode); void syncNodePingSelf(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -26,6 +26,12 @@ extern "C" {
#include "syncInt.h" #include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
typedef struct SVotesGranted {
} SVotesGranted;
typedef struct SVotesResponded {
} SVotesResponded;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -88,7 +88,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
} }
} }
pSyncNode->role = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);
pSyncNode->pPingTimer = NULL; pSyncNode->pPingTimer = NULL;