From 64d224a0d2b5450e221b0136b8a80b8d57184025 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 3 Mar 2022 17:28:00 +0800 Subject: [PATCH] syncInt --- include/libs/sync/sync.h | 4 +- source/libs/sync/inc/syncInt.h | 72 +++++++++++++++++++----------- source/libs/sync/inc/syncVoteMgr.h | 6 +++ source/libs/sync/src/syncMain.c | 2 +- 4 files changed, 54 insertions(+), 30 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index cd04783dbc..53fad4607a 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -34,9 +34,7 @@ typedef enum { TAOS_SYNC_STATE_FOLLOWER = 0, TAOS_SYNC_STATE_CANDIDATE = 1, TAOS_SYNC_STATE_LEADER = 2, -} ESyncRole; - -typedef ESyncRole ESyncState; +} ESyncState; typedef struct SSyncBuffer { void* data; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0901330488..aedb9662b1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -25,6 +25,7 @@ extern "C" { #include #include "sync.h" #include "taosdef.h" +#include "tglobal.h" #include "tlog.h" #include "ttimer.h" @@ -91,31 +92,61 @@ typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; struct SSyncEnv; typedef struct SSyncEnv SSyncEnv; +struct SRaftStore; +typedef struct SRaftStore SRaftStore; + +struct SVotesGranted; +typedef struct SVotesGranted SVotesGranted; + +struct SVotesResponded; +typedef struct SVotesResponded SVotesResponded; + typedef struct SRaftId { SyncNodeId addr; // typedef uint64_t SyncNodeId; SyncGroupId vgId; // typedef int32_t SyncGroupId; } SRaftId; typedef struct SSyncNode { + // init by SSyncInfo SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; - SSyncFSM* pFsm; - - // passed from outside - void* rpcClient; + void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); + // init internal + SNodeInfo me; + int32_t peersNum; + SNodeInfo peers[TSDB_MAX_REPLICA]; + + // raft algorithm + SSyncFSM* pFsm; + SRaftId raftId; + SRaftId peersId[TSDB_MAX_REPLICA]; + int32_t replicaNum; + int32_t quorum; + + // life cycle int32_t refCount; int64_t rid; - SNodeInfo me; - SNodeInfo peers[TSDB_MAX_REPLICA]; - int32_t peersNum; + // tla+ server vars + ESyncState state; + SRaftStore* pRaftStore; - ESyncRole role; - SRaftId raftId; + // tla+ candidate vars + SVotesGranted* pVotesGranted; + SVotesResponded* pVotesResponded; + // tla+ leader vars + SHashObj* pNextIndex; + SHashObj* pMatchIndex; + + // tla+ log vars + SSyncLogStore* pLogStore; + SyncIndex commitIndex; + + // timer tmr_h pPingTimer; int32_t pingTimerMS; uint8_t pingTimerStart; @@ -136,32 +167,21 @@ typedef struct SSyncNode { // callback int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg); - int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg); - int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg); - int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); - int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); - int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); } SSyncNode; SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); - -void syncNodeClose(SSyncNode* pSyncNode); - -void syncNodePingAll(SSyncNode* pSyncNode); - -void syncNodePingPeers(SSyncNode* pSyncNode); - -void syncNodePingSelf(SSyncNode* pSyncNode); - -int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); - -int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); +void syncNodeClose(SSyncNode* pSyncNode); +void syncNodePingAll(SSyncNode* pSyncNode); +void syncNodePingPeers(SSyncNode* pSyncNode); +void syncNodePingSelf(SSyncNode* pSyncNode); +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index cfcf58bee2..b841f2e316 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -26,6 +26,12 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +typedef struct SVotesGranted { +} SVotesGranted; + +typedef struct SVotesResponded { +} SVotesResponded; + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9cb3a61fff..7e01e7e81c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -88,7 +88,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { } } - pSyncNode->role = TAOS_SYNC_STATE_FOLLOWER; + pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); pSyncNode->pPingTimer = NULL;