Merge pull request #10634 from taosdata/feature/3.0_mhli
Feature/3.0 mhli
This commit is contained in:
commit
0e11e65773
|
@ -31,9 +31,9 @@ typedef int64_t SyncIndex;
|
||||||
typedef uint64_t SyncTerm;
|
typedef uint64_t SyncTerm;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_SYNC_STATE_FOLLOWER = 0,
|
TAOS_SYNC_STATE_FOLLOWER = 100,
|
||||||
TAOS_SYNC_STATE_CANDIDATE = 1,
|
TAOS_SYNC_STATE_CANDIDATE = 101,
|
||||||
TAOS_SYNC_STATE_LEADER = 2,
|
TAOS_SYNC_STATE_LEADER = 102,
|
||||||
} ESyncState;
|
} ESyncState;
|
||||||
|
|
||||||
typedef struct SSyncBuffer {
|
typedef struct SSyncBuffer {
|
||||||
|
@ -134,6 +134,7 @@ typedef struct SSyncInfo {
|
||||||
SyncGroupId vgId;
|
SyncGroupId vgId;
|
||||||
SSyncCfg syncCfg;
|
SSyncCfg syncCfg;
|
||||||
char path[TSDB_FILENAME_LEN];
|
char path[TSDB_FILENAME_LEN];
|
||||||
|
char walPath[TSDB_FILENAME_LEN];
|
||||||
SSyncFSM* pFsm;
|
SSyncFSM* pFsm;
|
||||||
|
|
||||||
void* rpcClient;
|
void* rpcClient;
|
||||||
|
|
|
@ -23,6 +23,7 @@ extern "C" {
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
#include "cJSON.h"
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
@ -99,8 +100,8 @@ typedef struct SRaftStore SRaftStore;
|
||||||
struct SVotesGranted;
|
struct SVotesGranted;
|
||||||
typedef struct SVotesGranted SVotesGranted;
|
typedef struct SVotesGranted SVotesGranted;
|
||||||
|
|
||||||
struct SVotesResponded;
|
struct SVotesRespond;
|
||||||
typedef struct SVotesResponded SVotesResponded;
|
typedef struct SVotesRespond SVotesRespond;
|
||||||
|
|
||||||
typedef struct SRaftId {
|
typedef struct SRaftId {
|
||||||
SyncNodeId addr; // typedef uint64_t SyncNodeId;
|
SyncNodeId addr; // typedef uint64_t SyncNodeId;
|
||||||
|
@ -112,17 +113,18 @@ typedef struct SSyncNode {
|
||||||
SyncGroupId vgId;
|
SyncGroupId vgId;
|
||||||
SSyncCfg syncCfg;
|
SSyncCfg syncCfg;
|
||||||
char path[TSDB_FILENAME_LEN];
|
char path[TSDB_FILENAME_LEN];
|
||||||
|
char walPath[TSDB_FILENAME_LEN];
|
||||||
void* rpcClient;
|
void* rpcClient;
|
||||||
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
|
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||||
void* queue;
|
void* queue;
|
||||||
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
|
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
|
||||||
|
|
||||||
// init internal
|
// init internal
|
||||||
SNodeInfo me;
|
SNodeInfo myNodeInfo;
|
||||||
SRaftId raftId;
|
SRaftId myRaftId;
|
||||||
|
|
||||||
int32_t peersNum;
|
int32_t peersNum;
|
||||||
SNodeInfo peers[TSDB_MAX_REPLICA];
|
SNodeInfo peersNodeInfo[TSDB_MAX_REPLICA];
|
||||||
SRaftId peersId[TSDB_MAX_REPLICA];
|
SRaftId peersId[TSDB_MAX_REPLICA];
|
||||||
|
|
||||||
int32_t replicaNum;
|
int32_t replicaNum;
|
||||||
|
@ -143,7 +145,7 @@ typedef struct SSyncNode {
|
||||||
|
|
||||||
// tla+ candidate vars
|
// tla+ candidate vars
|
||||||
SVotesGranted* pVotesGranted;
|
SVotesGranted* pVotesGranted;
|
||||||
SVotesResponded* pVotesResponded;
|
SVotesRespond* pVotesRespond;
|
||||||
|
|
||||||
// tla+ leader vars
|
// tla+ leader vars
|
||||||
SHashObj* pNextIndex;
|
SHashObj* pNextIndex;
|
||||||
|
@ -153,7 +155,7 @@ typedef struct SSyncNode {
|
||||||
SSyncLogStore* pLogStore;
|
SSyncLogStore* pLogStore;
|
||||||
SyncIndex commitIndex;
|
SyncIndex commitIndex;
|
||||||
|
|
||||||
// timer
|
// ping timer
|
||||||
tmr_h pPingTimer;
|
tmr_h pPingTimer;
|
||||||
int32_t pingTimerMS;
|
int32_t pingTimerMS;
|
||||||
uint64_t pingTimerLogicClock;
|
uint64_t pingTimerLogicClock;
|
||||||
|
@ -161,6 +163,7 @@ typedef struct SSyncNode {
|
||||||
TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp
|
TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp
|
||||||
uint64_t pingTimerCounter;
|
uint64_t pingTimerCounter;
|
||||||
|
|
||||||
|
// elect timer
|
||||||
tmr_h pElectTimer;
|
tmr_h pElectTimer;
|
||||||
int32_t electTimerMS;
|
int32_t electTimerMS;
|
||||||
uint64_t electTimerLogicClock;
|
uint64_t electTimerLogicClock;
|
||||||
|
@ -168,6 +171,7 @@ typedef struct SSyncNode {
|
||||||
TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp
|
TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp
|
||||||
uint64_t electTimerCounter;
|
uint64_t electTimerCounter;
|
||||||
|
|
||||||
|
// heartbeat timer
|
||||||
tmr_h pHeartbeatTimer;
|
tmr_h pHeartbeatTimer;
|
||||||
int32_t heartbeatTimerMS;
|
int32_t heartbeatTimerMS;
|
||||||
uint64_t heartbeatTimerLogicClock;
|
uint64_t heartbeatTimerLogicClock;
|
||||||
|
@ -188,6 +192,8 @@ typedef struct SSyncNode {
|
||||||
|
|
||||||
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
|
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
|
||||||
void syncNodeClose(SSyncNode* pSyncNode);
|
void syncNodeClose(SSyncNode* pSyncNode);
|
||||||
|
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
|
||||||
|
char* syncNode2Str(const SSyncNode* pSyncNode);
|
||||||
|
|
||||||
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
|
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
|
||||||
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
|
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -53,6 +53,7 @@ extern "C" {
|
||||||
//
|
//
|
||||||
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
|
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
|
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
|
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -44,6 +44,11 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest);
|
||||||
// ---- misc ----
|
// ---- misc ----
|
||||||
int32_t syncUtilRand(int32_t max);
|
int32_t syncUtilRand(int32_t max);
|
||||||
int32_t syncUtilElectRandomMS();
|
int32_t syncUtilElectRandomMS();
|
||||||
|
int32_t syncUtilQuorum(int32_t replicaNum);
|
||||||
|
cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p);
|
||||||
|
cJSON* syncUtilRaftId2Json(const SRaftId* p);
|
||||||
|
char* syncUtilRaftId2Str(const SRaftId* p);
|
||||||
|
const char* syncUtilState2String(ESyncState state);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,10 +28,14 @@ extern "C" {
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
|
||||||
|
// SVotesGranted -----------------------------
|
||||||
typedef struct SVotesGranted {
|
typedef struct SVotesGranted {
|
||||||
|
SRaftId (*replicas)[TSDB_MAX_REPLICA];
|
||||||
|
int32_t replicaNum;
|
||||||
|
bool isGranted[TSDB_MAX_REPLICA];
|
||||||
|
int32_t votes;
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
int32_t quorum;
|
int32_t quorum;
|
||||||
int32_t votes;
|
|
||||||
bool toLeader;
|
bool toLeader;
|
||||||
SSyncNode *pSyncNode;
|
SSyncNode *pSyncNode;
|
||||||
} SVotesGranted;
|
} SVotesGranted;
|
||||||
|
@ -41,7 +45,10 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted);
|
||||||
bool voteGrantedMajority(SVotesGranted *pVotesGranted);
|
bool voteGrantedMajority(SVotesGranted *pVotesGranted);
|
||||||
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
|
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
|
||||||
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
|
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
|
||||||
|
cJSON * voteGranted2Json(SVotesGranted *pVotesGranted);
|
||||||
|
char * voteGranted2Str(SVotesGranted *pVotesGranted);
|
||||||
|
|
||||||
|
// SVotesRespond -----------------------------
|
||||||
typedef struct SVotesRespond {
|
typedef struct SVotesRespond {
|
||||||
SRaftId (*replicas)[TSDB_MAX_REPLICA];
|
SRaftId (*replicas)[TSDB_MAX_REPLICA];
|
||||||
bool isRespond[TSDB_MAX_REPLICA];
|
bool isRespond[TSDB_MAX_REPLICA];
|
||||||
|
@ -51,6 +58,7 @@ typedef struct SVotesRespond {
|
||||||
} SVotesRespond;
|
} SVotesRespond;
|
||||||
|
|
||||||
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode);
|
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode);
|
||||||
|
void votesRespondDestory(SVotesRespond *pVotesRespond);
|
||||||
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
|
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
|
||||||
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
|
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
|
||||||
void Reset(SVotesRespond *pVotesRespond, SyncTerm term);
|
void Reset(SVotesRespond *pVotesRespond, SyncTerm term);
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "syncElection.h"
|
#include "syncElection.h"
|
||||||
#include "syncMessage.h"
|
#include "syncMessage.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
|
||||||
// TLA+ Spec
|
// TLA+ Spec
|
||||||
// RequestVote(i, j) ==
|
// RequestVote(i, j) ==
|
||||||
|
@ -28,11 +29,31 @@
|
||||||
// mdest |-> j])
|
// mdest |-> j])
|
||||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||||
//
|
//
|
||||||
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {}
|
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
|
||||||
|
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||||
|
|
||||||
|
int32_t ret = 0;
|
||||||
|
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
|
SyncRequestVote* pMsg = syncRequestVoteBuild();
|
||||||
|
pMsg->srcId = pSyncNode->myRaftId;
|
||||||
|
pMsg->destId = pSyncNode->peersId[i];
|
||||||
|
pMsg->currentTerm = pSyncNode->pRaftStore->currentTerm;
|
||||||
|
pMsg->lastLogIndex = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore);
|
||||||
|
pMsg->lastLogTerm = pSyncNode->pLogStore->getLastTerm(pSyncNode->pLogStore);
|
||||||
|
|
||||||
|
ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg);
|
||||||
|
assert(ret == 0);
|
||||||
|
syncRequestVoteDestroy(pMsg);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||||
|
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||||
|
|
||||||
// start election
|
// start election
|
||||||
syncNodeRequestVotePeers(pSyncNode);
|
int32_t ret = syncNodeRequestVotePeers(pSyncNode);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
|
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
|
||||||
|
|
|
@ -20,10 +20,12 @@
|
||||||
#include "syncEnv.h"
|
#include "syncEnv.h"
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
#include "syncRaft.h"
|
#include "syncRaft.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
#include "syncRequestVote.h"
|
#include "syncRequestVote.h"
|
||||||
#include "syncRequestVoteReply.h"
|
#include "syncRequestVoteReply.h"
|
||||||
#include "syncTimeout.h"
|
#include "syncTimeout.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
|
#include "syncVoteMgr.h"
|
||||||
|
|
||||||
static int32_t tsNodeRefId = -1;
|
static int32_t tsNodeRefId = -1;
|
||||||
|
|
||||||
|
@ -35,6 +37,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
|
||||||
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
||||||
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
||||||
|
|
||||||
|
static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
|
||||||
static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
|
static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
|
||||||
static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
|
static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
|
||||||
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
|
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
|
||||||
|
@ -71,29 +74,60 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
||||||
assert(pSyncNode != NULL);
|
assert(pSyncNode != NULL);
|
||||||
memset(pSyncNode, 0, sizeof(SSyncNode));
|
memset(pSyncNode, 0, sizeof(SSyncNode));
|
||||||
|
|
||||||
|
// init by SSyncInfo
|
||||||
pSyncNode->vgId = pSyncInfo->vgId;
|
pSyncNode->vgId = pSyncInfo->vgId;
|
||||||
pSyncNode->syncCfg = pSyncInfo->syncCfg;
|
pSyncNode->syncCfg = pSyncInfo->syncCfg;
|
||||||
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
|
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
|
||||||
pSyncNode->pFsm = pSyncInfo->pFsm;
|
memcpy(pSyncNode->walPath, pSyncInfo->walPath, sizeof(pSyncNode->walPath));
|
||||||
|
|
||||||
pSyncNode->rpcClient = pSyncInfo->rpcClient;
|
pSyncNode->rpcClient = pSyncInfo->rpcClient;
|
||||||
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
|
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
|
||||||
pSyncNode->queue = pSyncInfo->queue;
|
pSyncNode->queue = pSyncInfo->queue;
|
||||||
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
|
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
|
||||||
|
|
||||||
pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
|
// init internal
|
||||||
pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
|
pSyncNode->myNodeInfo = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
|
||||||
|
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncInfo->vgId, &pSyncNode->myRaftId);
|
||||||
|
|
||||||
|
// init peersNum, peers, peersId
|
||||||
|
pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
|
||||||
int j = 0;
|
int j = 0;
|
||||||
for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
|
for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
|
||||||
if (i != pSyncInfo->syncCfg.myIndex) {
|
if (i != pSyncInfo->syncCfg.myIndex) {
|
||||||
pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i];
|
pSyncNode->peersNodeInfo[j] = pSyncInfo->syncCfg.nodeInfo[i];
|
||||||
j++;
|
j++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
|
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncInfo->vgId, &pSyncNode->peersId[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// init replicaNum, replicasId
|
||||||
|
pSyncNode->replicaNum = pSyncInfo->syncCfg.replicaNum;
|
||||||
|
for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
|
||||||
|
syncUtilnodeInfo2raftId(&pSyncInfo->syncCfg.nodeInfo[i], pSyncInfo->vgId, &pSyncNode->replicasId[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// init raft algorithm
|
||||||
|
pSyncNode->pFsm = pSyncInfo->pFsm;
|
||||||
|
pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum);
|
||||||
|
pSyncNode->leaderCache = EMPTY_RAFT_ID;
|
||||||
|
|
||||||
|
// init life cycle
|
||||||
|
|
||||||
|
// init server vars
|
||||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||||
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);
|
pSyncNode->pRaftStore = raftStoreOpen(pSyncInfo->walPath);
|
||||||
|
assert(pSyncNode->pRaftStore != NULL);
|
||||||
|
|
||||||
|
// init candidate vars
|
||||||
|
pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
|
||||||
|
assert(pSyncNode->pVotesGranted != NULL);
|
||||||
|
pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
|
||||||
|
assert(pSyncNode->pVotesRespond != NULL);
|
||||||
|
|
||||||
|
// init leader vars
|
||||||
|
pSyncNode->pNextIndex = NULL;
|
||||||
|
pSyncNode->pMatchIndex = NULL;
|
||||||
|
|
||||||
// init ping timer
|
// init ping timer
|
||||||
pSyncNode->pPingTimer = NULL;
|
pSyncNode->pPingTimer = NULL;
|
||||||
|
@ -119,6 +153,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
||||||
pSyncNode->FpHeartbeatTimer = syncNodeEqHeartbeatTimer;
|
pSyncNode->FpHeartbeatTimer = syncNodeEqHeartbeatTimer;
|
||||||
pSyncNode->heartbeatTimerCounter = 0;
|
pSyncNode->heartbeatTimerCounter = 0;
|
||||||
|
|
||||||
|
// init callback
|
||||||
pSyncNode->FpOnPing = syncNodeOnPingCb;
|
pSyncNode->FpOnPing = syncNodeOnPingCb;
|
||||||
pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
|
pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
|
||||||
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
|
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
|
||||||
|
@ -135,6 +170,134 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
||||||
free(pSyncNode);
|
free(pSyncNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
|
||||||
|
char u64buf[128];
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
// init by SSyncInfo
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId);
|
||||||
|
cJSON_AddStringToObject(pRoot, "path", pSyncNode->path);
|
||||||
|
cJSON_AddStringToObject(pRoot, "walPath", pSyncNode->walPath);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient);
|
||||||
|
cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->queue);
|
||||||
|
cJSON_AddStringToObject(pRoot, "queue", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);
|
||||||
|
|
||||||
|
// init internal
|
||||||
|
cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
|
||||||
|
cJSON_AddItemToObject(pRoot, "myNodeInfo", pMe);
|
||||||
|
cJSON* pRaftId = syncUtilRaftId2Json(&pSyncNode->myRaftId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "myRaftId", pRaftId);
|
||||||
|
|
||||||
|
cJSON_AddNumberToObject(pRoot, "peersNum", pSyncNode->peersNum);
|
||||||
|
cJSON* pPeers = cJSON_CreateArray();
|
||||||
|
cJSON_AddItemToObject(pRoot, "peersNodeInfo", pPeers);
|
||||||
|
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
|
cJSON_AddItemToArray(pPeers, syncUtilNodeInfo2Json(&pSyncNode->peersNodeInfo[i]));
|
||||||
|
}
|
||||||
|
cJSON* pPeersId = cJSON_CreateArray();
|
||||||
|
cJSON_AddItemToObject(pRoot, "peersId", pPeersId);
|
||||||
|
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
|
cJSON_AddItemToArray(pPeersId, syncUtilRaftId2Json(&pSyncNode->peersId[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncNode->replicaNum);
|
||||||
|
cJSON* pReplicasId = cJSON_CreateArray();
|
||||||
|
cJSON_AddItemToObject(pRoot, "replicasId", pReplicasId);
|
||||||
|
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||||
|
cJSON_AddItemToArray(pReplicasId, syncUtilRaftId2Json(&pSyncNode->replicasId[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
// raft algorithm
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pFsm);
|
||||||
|
cJSON_AddStringToObject(pRoot, "pFsm", u64buf);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "quorum", pSyncNode->quorum);
|
||||||
|
cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
|
||||||
|
cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);
|
||||||
|
|
||||||
|
// tla+ server vars
|
||||||
|
cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
|
||||||
|
cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
|
||||||
|
|
||||||
|
// tla+ candidate vars
|
||||||
|
|
||||||
|
// tla+ leader vars
|
||||||
|
|
||||||
|
// tla+ log vars
|
||||||
|
|
||||||
|
// ping timer
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pPingTimer);
|
||||||
|
cJSON_AddStringToObject(pRoot, "pPingTimer", u64buf);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "pingTimerMS", pSyncNode->pingTimerMS);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerLogicClock);
|
||||||
|
cJSON_AddStringToObject(pRoot, "pingTimerLogicClock", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerLogicClockUser);
|
||||||
|
cJSON_AddStringToObject(pRoot, "pingTimerLogicClockUser", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpPingTimer);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpPingTimer", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->pingTimerCounter);
|
||||||
|
cJSON_AddStringToObject(pRoot, "pingTimerCounter", u64buf);
|
||||||
|
|
||||||
|
// elect timer
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pElectTimer);
|
||||||
|
cJSON_AddStringToObject(pRoot, "pElectTimer", u64buf);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "electTimerMS", pSyncNode->electTimerMS);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerLogicClock);
|
||||||
|
cJSON_AddStringToObject(pRoot, "electTimerLogicClock", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerLogicClockUser);
|
||||||
|
cJSON_AddStringToObject(pRoot, "electTimerLogicClockUser", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpElectTimer);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpElectTimer", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->electTimerCounter);
|
||||||
|
cJSON_AddStringToObject(pRoot, "electTimerCounter", u64buf);
|
||||||
|
|
||||||
|
// heartbeat timer
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pHeartbeatTimer);
|
||||||
|
cJSON_AddStringToObject(pRoot, "pHeartbeatTimer", u64buf);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "heartbeatTimerMS", pSyncNode->heartbeatTimerMS);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerLogicClock);
|
||||||
|
cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClock", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerLogicClockUser);
|
||||||
|
cJSON_AddStringToObject(pRoot, "heartbeatTimerLogicClockUser", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpHeartbeatTimer);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpHeartbeatTimer", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pSyncNode->heartbeatTimerCounter);
|
||||||
|
cJSON_AddStringToObject(pRoot, "heartbeatTimerCounter", u64buf);
|
||||||
|
|
||||||
|
// callback
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPing);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpOnPing", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnPingReply);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpOnPingReply", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVote);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpOnRequestVote", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnRequestVoteReply);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpOnRequestVoteReply", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntries);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpOnAppendEntries", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnAppendEntriesReply);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpOnAppendEntriesReply", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpOnTimeout);
|
||||||
|
cJSON_AddStringToObject(pRoot, "FpOnTimeout", u64buf);
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SSyncNode", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncNode2Str(const SSyncNode* pSyncNode) {
|
||||||
|
cJSON* pJson = syncNode2Json(pSyncNode);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
|
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
syncUtilraftId2EpSet(destRaftId, &epSet);
|
syncUtilraftId2EpSet(destRaftId, &epSet);
|
||||||
|
@ -183,7 +346,7 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) {
|
||||||
for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
|
for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
|
||||||
SRaftId destId;
|
SRaftId destId;
|
||||||
syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId);
|
syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId);
|
||||||
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
|
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId);
|
||||||
ret = syncNodePing(pSyncNode, &destId, pMsg);
|
ret = syncNodePing(pSyncNode, &destId, pMsg);
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
syncPingDestroy(pMsg);
|
syncPingDestroy(pMsg);
|
||||||
|
@ -194,8 +357,8 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
SRaftId destId;
|
SRaftId destId;
|
||||||
syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &destId);
|
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &destId);
|
||||||
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId);
|
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &destId);
|
||||||
ret = syncNodePing(pSyncNode, &destId, pMsg);
|
ret = syncNodePing(pSyncNode, &destId, pMsg);
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
syncPingDestroy(pMsg);
|
syncPingDestroy(pMsg);
|
||||||
|
@ -204,7 +367,7 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
|
int32_t syncNodePingSelf(SSyncNode* pSyncNode) {
|
||||||
int32_t ret;
|
int32_t ret;
|
||||||
SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId);
|
SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId);
|
||||||
ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
|
ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
syncPingDestroy(pMsg);
|
syncPingDestroy(pMsg);
|
||||||
|
@ -285,7 +448,7 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId);
|
SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId);
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
|
syncPingReply2RpcMsg(pMsgReply, &rpcMsg);
|
||||||
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
||||||
|
@ -351,7 +514,38 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {}
|
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)param;
|
||||||
|
if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
|
||||||
|
atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
|
||||||
|
SyncTimeout* pSyncMsg =
|
||||||
|
syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
|
||||||
|
pSyncNode->heartbeatTimerMS, pSyncNode);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||||
|
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
|
||||||
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
// reset timer ms
|
||||||
|
// pSyncNode->heartbeatTimerMS += 100;
|
||||||
|
|
||||||
|
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
|
||||||
|
&pSyncNode->pHeartbeatTimer);
|
||||||
|
} else {
|
||||||
|
sTrace("syncNodeEqHeartbeatTimer: heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu",
|
||||||
|
pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
|
||||||
|
if (term > pSyncNode->pRaftStore->currentTerm) {
|
||||||
|
pSyncNode->pRaftStore->currentTerm = term;
|
||||||
|
pSyncNode->pRaftStore->voteFor = EMPTY_RAFT_ID;
|
||||||
|
raftStorePersist(pSyncNode->pRaftStore);
|
||||||
|
syncNodeBecomeFollower(pSyncNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
|
static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
@ -383,7 +577,7 @@ static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
|
||||||
//
|
//
|
||||||
static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
|
static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
|
||||||
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
||||||
pSyncNode->leaderCache = pSyncNode->raftId;
|
pSyncNode->leaderCache = pSyncNode->myRaftId;
|
||||||
|
|
||||||
// next Index +=1
|
// next Index +=1
|
||||||
// match Index = 0;
|
// match Index = 0;
|
||||||
|
|
|
@ -41,7 +41,16 @@
|
||||||
// mdest |-> j])
|
// mdest |-> j])
|
||||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||||
//
|
//
|
||||||
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {}
|
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
|
||||||
|
int32_t ret = 0;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
|
||||||
|
// start replicate
|
||||||
|
int32_t ret = syncNodeAppendEntriesPeers(pSyncNode);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
|
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
|
||||||
sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode);
|
sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode);
|
||||||
|
|
|
@ -44,7 +44,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
|
||||||
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
|
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
|
||||||
if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) {
|
if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) {
|
||||||
++(ths->heartbeatTimerCounter);
|
++(ths->heartbeatTimerCounter);
|
||||||
syncNodeAppendEntriesPeers(ths);
|
syncNodeReplicate(ths);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
sTrace("unknown timeoutType:%d", pMsg->timeoutType);
|
sTrace("unknown timeoutType:%d", pMsg->timeoutType);
|
||||||
|
|
|
@ -97,4 +97,55 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
|
||||||
|
|
||||||
int32_t syncUtilRand(int32_t max) { return rand() % max; }
|
int32_t syncUtilRand(int32_t max) { return rand() % max; }
|
||||||
|
|
||||||
int32_t syncUtilElectRandomMS() { ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); }
|
int32_t syncUtilElectRandomMS() { return ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); }
|
||||||
|
|
||||||
|
int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; }
|
||||||
|
|
||||||
|
cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p) {
|
||||||
|
char u64buf[128];
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
cJSON_AddStringToObject(pRoot, "nodeFqdn", p->nodeFqdn);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "nodePort", p->nodePort);
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* syncUtilRaftId2Json(const SRaftId* p) {
|
||||||
|
char u64buf[128];
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", p->addr);
|
||||||
|
cJSON_AddStringToObject(pRoot, "addr", u64buf);
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(p->addr, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pRoot, "host", host);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "port", port);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", p->vgId);
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SRaftId", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncUtilRaftId2Str(const SRaftId* p) {
|
||||||
|
cJSON* pJson = syncUtilRaftId2Json(p);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
const char* syncUtilState2String(ESyncState state) {
|
||||||
|
if (state == TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
|
return "TAOS_SYNC_STATE_FOLLOWER";
|
||||||
|
} else if (state == TAOS_SYNC_STATE_CANDIDATE) {
|
||||||
|
return "TAOS_SYNC_STATE_CANDIDATE";
|
||||||
|
} else if (state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
return "TAOS_SYNC_STATE_LEADER";
|
||||||
|
} else {
|
||||||
|
return "TAOS_SYNC_STATE_UNKNOWN";
|
||||||
|
}
|
||||||
|
}
|
|
@ -14,15 +14,25 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "syncVoteMgr.h"
|
#include "syncVoteMgr.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
|
||||||
|
// SVotesGranted -----------------------------
|
||||||
|
static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
|
||||||
|
memset(pVotesGranted->isGranted, 0, sizeof(pVotesGranted->isGranted));
|
||||||
|
pVotesGranted->votes = 0;
|
||||||
|
}
|
||||||
|
|
||||||
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
|
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
|
||||||
SVotesGranted *pVotesGranted = malloc(sizeof(SVotesGranted));
|
SVotesGranted *pVotesGranted = malloc(sizeof(SVotesGranted));
|
||||||
assert(pVotesGranted != NULL);
|
assert(pVotesGranted != NULL);
|
||||||
memset(pVotesGranted, 0, sizeof(SVotesGranted));
|
memset(pVotesGranted, 0, sizeof(SVotesGranted));
|
||||||
|
|
||||||
pVotesGranted->quorum = pSyncNode->quorum;
|
pVotesGranted->replicas = &(pSyncNode->replicasId);
|
||||||
|
pVotesGranted->replicaNum = pSyncNode->replicaNum;
|
||||||
|
voteGrantedClearVotes(pVotesGranted);
|
||||||
|
|
||||||
pVotesGranted->term = 0;
|
pVotesGranted->term = 0;
|
||||||
pVotesGranted->votes = 0;
|
pVotesGranted->quorum = pSyncNode->quorum;
|
||||||
pVotesGranted->toLeader = false;
|
pVotesGranted->toLeader = false;
|
||||||
pVotesGranted->pSyncNode = pSyncNode;
|
pVotesGranted->pSyncNode = pSyncNode;
|
||||||
|
|
||||||
|
@ -43,15 +53,73 @@ bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
|
||||||
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
|
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
|
||||||
assert(pMsg->voteGranted == true);
|
assert(pMsg->voteGranted == true);
|
||||||
assert(pMsg->term == pVotesGranted->term);
|
assert(pMsg->term == pVotesGranted->term);
|
||||||
pVotesGranted->votes++;
|
assert(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId));
|
||||||
|
|
||||||
|
int j = -1;
|
||||||
|
for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
|
||||||
|
if (syncUtilSameId(&((*(pVotesGranted->replicas))[i]), &(pMsg->srcId))) {
|
||||||
|
j = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert(j != -1);
|
||||||
|
assert(j >= 0 && j < pVotesGranted->replicaNum);
|
||||||
|
|
||||||
|
if (pVotesGranted->isGranted[j] != true) {
|
||||||
|
++(pVotesGranted->votes);
|
||||||
|
pVotesGranted->isGranted[j] = true;
|
||||||
|
}
|
||||||
|
assert(pVotesGranted->votes <= pVotesGranted->replicaNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
|
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
|
||||||
pVotesGranted->term = term;
|
pVotesGranted->term = term;
|
||||||
pVotesGranted->votes = 0;
|
voteGrantedClearVotes(pVotesGranted);
|
||||||
pVotesGranted->toLeader = false;
|
pVotesGranted->toLeader = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
|
||||||
|
char u64buf[128];
|
||||||
|
cJSON *pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum);
|
||||||
|
cJSON *pReplicas = cJSON_CreateArray();
|
||||||
|
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
|
||||||
|
for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
|
||||||
|
cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i]));
|
||||||
|
}
|
||||||
|
int *arr = (int *)malloc(sizeof(int) * pVotesGranted->replicaNum);
|
||||||
|
for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
|
||||||
|
arr[i] = pVotesGranted->isGranted[i];
|
||||||
|
}
|
||||||
|
cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum);
|
||||||
|
free(arr);
|
||||||
|
cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted);
|
||||||
|
|
||||||
|
cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pVotesGranted->term);
|
||||||
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode);
|
||||||
|
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
|
||||||
|
|
||||||
|
bool majority = voteGrantedMajority(pVotesGranted);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "majority", majority);
|
||||||
|
|
||||||
|
cJSON *pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *voteGranted2Str(SVotesGranted *pVotesGranted) {
|
||||||
|
cJSON *pJson = voteGranted2Json(pVotesGranted);
|
||||||
|
char * serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
// SVotesRespond -----------------------------
|
||||||
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
|
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
|
||||||
SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond));
|
SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond));
|
||||||
assert(pVotesRespond != NULL);
|
assert(pVotesRespond != NULL);
|
||||||
|
@ -65,6 +133,12 @@ SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
|
||||||
return pVotesRespond;
|
return pVotesRespond;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void votesRespondDestory(SVotesRespond *pVotesRespond) {
|
||||||
|
if (pVotesRespond != NULL) {
|
||||||
|
free(pVotesRespond);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
|
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
|
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
|
||||||
|
|
|
@ -10,6 +10,9 @@ add_executable(syncIOSendMsgServerTest "")
|
||||||
add_executable(syncRaftStoreTest "")
|
add_executable(syncRaftStoreTest "")
|
||||||
add_executable(syncEnqTest "")
|
add_executable(syncEnqTest "")
|
||||||
add_executable(syncIndexTest "")
|
add_executable(syncIndexTest "")
|
||||||
|
add_executable(syncInitTest "")
|
||||||
|
add_executable(syncUtilTest "")
|
||||||
|
add_executable(syncVotesGrantedTest "")
|
||||||
|
|
||||||
|
|
||||||
target_sources(syncTest
|
target_sources(syncTest
|
||||||
|
@ -60,6 +63,18 @@ target_sources(syncIndexTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"syncIndexTest.cpp"
|
"syncIndexTest.cpp"
|
||||||
)
|
)
|
||||||
|
target_sources(syncInitTest
|
||||||
|
PRIVATE
|
||||||
|
"syncInitTest.cpp"
|
||||||
|
)
|
||||||
|
target_sources(syncUtilTest
|
||||||
|
PRIVATE
|
||||||
|
"syncUtilTest.cpp"
|
||||||
|
)
|
||||||
|
target_sources(syncVotesGrantedTest
|
||||||
|
PRIVATE
|
||||||
|
"syncVotesGrantedTest.cpp"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
target_include_directories(syncTest
|
target_include_directories(syncTest
|
||||||
|
@ -122,6 +137,21 @@ target_include_directories(syncIndexTest
|
||||||
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
target_include_directories(syncInitTest
|
||||||
|
PUBLIC
|
||||||
|
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
target_include_directories(syncUtilTest
|
||||||
|
PUBLIC
|
||||||
|
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
target_include_directories(syncVotesGrantedTest
|
||||||
|
PUBLIC
|
||||||
|
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
target_link_libraries(syncTest
|
target_link_libraries(syncTest
|
||||||
|
@ -172,6 +202,18 @@ target_link_libraries(syncIndexTest
|
||||||
sync
|
sync
|
||||||
gtest_main
|
gtest_main
|
||||||
)
|
)
|
||||||
|
target_link_libraries(syncInitTest
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
|
target_link_libraries(syncUtilTest
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
|
target_link_libraries(syncVotesGrantedTest
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
enable_testing()
|
enable_testing()
|
||||||
|
|
|
@ -84,7 +84,7 @@ int main(int argc, char** argv) {
|
||||||
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||||
|
|
||||||
for (int i = 0; i < 10; ++i) {
|
for (int i = 0; i < 10; ++i) {
|
||||||
SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId);
|
SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId);
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
|
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
|
||||||
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
|
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncEnv.h"
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
|
||||||
|
int32_t replicaNum = 5;
|
||||||
|
int32_t myIndex = 0;
|
||||||
|
|
||||||
|
SRaftId ids[TSDB_MAX_REPLICA];
|
||||||
|
SSyncInfo syncInfo;
|
||||||
|
SSyncFSM* pFsm;
|
||||||
|
|
||||||
|
SSyncNode* syncNodeInit() {
|
||||||
|
syncInfo.vgId = 1234;
|
||||||
|
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||||
|
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||||
|
syncInfo.queue = gSyncIO->pMsgQ;
|
||||||
|
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||||
|
syncInfo.pFsm = pFsm;
|
||||||
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path");
|
||||||
|
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path");
|
||||||
|
|
||||||
|
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
||||||
|
pCfg->myIndex = myIndex;
|
||||||
|
pCfg->replicaNum = replicaNum;
|
||||||
|
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
pCfg->nodeInfo[i].nodePort = ports[i];
|
||||||
|
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
|
||||||
|
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
|
||||||
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
|
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||||
|
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||||
|
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||||
|
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||||
|
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||||
|
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||||
|
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||||
|
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||||
|
gSyncIO->pSyncNode = pSyncNode;
|
||||||
|
|
||||||
|
return pSyncNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* syncInitTest() { return syncNodeInit(); }
|
||||||
|
|
||||||
|
void initRaftId(SSyncNode* pSyncNode) {
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
ids[i] = pSyncNode->replicasId[i];
|
||||||
|
char* s = syncUtilRaftId2Str(&ids[i]);
|
||||||
|
printf("raftId[%d] : %s\n", i, s);
|
||||||
|
free(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = 143 + 64;
|
||||||
|
|
||||||
|
myIndex = 0;
|
||||||
|
if (argc >= 2) {
|
||||||
|
myIndex = atoi(argv[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
ret = syncEnvStart();
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode = syncInitTest();
|
||||||
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
|
char* serialized = syncNode2Str(pSyncNode);
|
||||||
|
printf("%s\n", serialized);
|
||||||
|
free(serialized);
|
||||||
|
|
||||||
|
initRaftId(pSyncNode);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -25,7 +25,8 @@ SSyncNode* doSync(int myIndex) {
|
||||||
syncInfo.queue = gSyncIO->pMsgQ;
|
syncInfo.queue = gSyncIO->pMsgQ;
|
||||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||||
syncInfo.pFsm = pFsm;
|
syncInfo.pFsm = pFsm;
|
||||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./path");
|
||||||
|
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./wal_path");
|
||||||
|
|
||||||
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
||||||
pCfg->myIndex = myIndex;
|
pCfg->myIndex = myIndex;
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
#include "syncUtil.h"
|
||||||
|
//#include <gtest/gtest.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
void electRandomMSTest() {
|
||||||
|
for (int i = 0; i < 10; ++i) {
|
||||||
|
int32_t ms = syncUtilElectRandomMS();
|
||||||
|
printf("syncUtilElectRandomMS: %d \n", ms);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = 143 + 64;
|
||||||
|
logTest();
|
||||||
|
electRandomMSTest();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,156 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncEnv.h"
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
#include "syncVoteMgr.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
|
||||||
|
int32_t replicaNum = 3;
|
||||||
|
int32_t myIndex = 0;
|
||||||
|
|
||||||
|
SRaftId ids[TSDB_MAX_REPLICA];
|
||||||
|
SSyncInfo syncInfo;
|
||||||
|
SSyncFSM* pFsm;
|
||||||
|
SSyncNode* pSyncNode;
|
||||||
|
|
||||||
|
SSyncNode* syncNodeInit() {
|
||||||
|
syncInfo.vgId = 1234;
|
||||||
|
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||||
|
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||||
|
syncInfo.queue = gSyncIO->pMsgQ;
|
||||||
|
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||||
|
syncInfo.pFsm = pFsm;
|
||||||
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path");
|
||||||
|
snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path");
|
||||||
|
|
||||||
|
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
||||||
|
pCfg->myIndex = myIndex;
|
||||||
|
pCfg->replicaNum = replicaNum;
|
||||||
|
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
pCfg->nodeInfo[i].nodePort = ports[i];
|
||||||
|
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
|
||||||
|
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||||
|
}
|
||||||
|
|
||||||
|
pSyncNode = syncNodeOpen(&syncInfo);
|
||||||
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
|
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||||
|
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||||
|
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
|
||||||
|
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
|
||||||
|
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
|
||||||
|
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
|
||||||
|
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
|
||||||
|
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
|
||||||
|
gSyncIO->pSyncNode = pSyncNode;
|
||||||
|
|
||||||
|
return pSyncNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* syncInitTest() { return syncNodeInit(); }
|
||||||
|
|
||||||
|
void initRaftId(SSyncNode* pSyncNode) {
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
ids[i] = pSyncNode->replicasId[i];
|
||||||
|
char* s = syncUtilRaftId2Str(&ids[i]);
|
||||||
|
printf("raftId[%d] : %s\n", i, s);
|
||||||
|
free(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = 143 + 64;
|
||||||
|
|
||||||
|
myIndex = 0;
|
||||||
|
if (argc >= 2) {
|
||||||
|
myIndex = atoi(argv[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
ret = syncEnvStart();
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode = syncInitTest();
|
||||||
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
|
char* serialized = syncNode2Str(pSyncNode);
|
||||||
|
printf("%s\n", serialized);
|
||||||
|
free(serialized);
|
||||||
|
|
||||||
|
initRaftId(pSyncNode);
|
||||||
|
|
||||||
|
SVotesGranted* pVotesGranted = voteGrantedCreate(pSyncNode);
|
||||||
|
assert(pVotesGranted != NULL);
|
||||||
|
|
||||||
|
printf("---------------------------------------\n");
|
||||||
|
{
|
||||||
|
char* serialized = voteGranted2Str(pVotesGranted);
|
||||||
|
assert(serialized != NULL);
|
||||||
|
printf("%s\n", serialized);
|
||||||
|
free(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncTerm term = 1234;
|
||||||
|
printf("---------------------------------------\n");
|
||||||
|
voteGrantedReset(pVotesGranted, term);
|
||||||
|
{
|
||||||
|
char* serialized = voteGranted2Str(pVotesGranted);
|
||||||
|
assert(serialized != NULL);
|
||||||
|
printf("%s\n", serialized);
|
||||||
|
free(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
SyncRequestVoteReply* reply = SyncRequestVoteReplyBuild();
|
||||||
|
reply->destId = pSyncNode->myRaftId;
|
||||||
|
reply->srcId = ids[i];
|
||||||
|
reply->term = term;
|
||||||
|
reply->voteGranted = true;
|
||||||
|
|
||||||
|
voteGrantedVote(pVotesGranted, reply);
|
||||||
|
{
|
||||||
|
char* serialized = voteGranted2Str(pVotesGranted);
|
||||||
|
assert(serialized != NULL);
|
||||||
|
printf("%s\n", serialized);
|
||||||
|
free(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
voteGrantedVote(pVotesGranted, reply);
|
||||||
|
{
|
||||||
|
char* serialized = voteGranted2Str(pVotesGranted);
|
||||||
|
assert(serialized != NULL);
|
||||||
|
printf("%s\n", serialized);
|
||||||
|
free(serialized);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("---------------------------------------\n");
|
||||||
|
voteGrantedReset(pVotesGranted, 123456789);
|
||||||
|
{
|
||||||
|
char* serialized = voteGranted2Str(pVotesGranted);
|
||||||
|
assert(serialized != NULL);
|
||||||
|
printf("%s\n", serialized);
|
||||||
|
free(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
voteGrantedDestroy(pVotesGranted);
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue