enh(sync): add snapshotEnable2
This commit is contained in:
parent
bdc9acbf2e
commit
d6c57bd372
|
@ -394,7 +394,24 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||||
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||||
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||||
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||||
|
int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||||
|
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||||
|
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||||
|
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||||
|
|
||||||
|
// option ----------------------------------
|
||||||
|
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
|
||||||
|
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
|
||||||
|
typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SyncClientRequest* pMsg);
|
||||||
|
typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||||
|
typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||||
|
typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||||
|
typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||||
|
typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
|
||||||
|
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "mndDef.h"
|
#include "mndDef.h"
|
||||||
|
|
||||||
#include "sdb.h"
|
#include "sdb.h"
|
||||||
|
#include "sync.h"
|
||||||
#include "syncTools.h"
|
#include "syncTools.h"
|
||||||
#include "tcache.h"
|
#include "tcache.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
|
|
@ -368,41 +368,84 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
syncRpcMsgLog2(logBuf, pMsg);
|
syncRpcMsgLog2(logBuf, pMsg);
|
||||||
taosMemoryFree(syncNodeStr);
|
taosMemoryFree(syncNodeStr);
|
||||||
|
|
||||||
if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
|
|
||||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
// ugly! use function pointer
|
||||||
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
if (syncNodeSnapshotEnable(pSyncNode)) {
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
|
||||||
} else if (pMsg->msgType == TDMT_VND_SYNC_PING) {
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
||||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
|
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||||
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
syncPingDestroy(pSyncMsg);
|
} else if (pMsg->msgType == TDMT_VND_SYNC_PING) {
|
||||||
} else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
|
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
|
||||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
|
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||||
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
syncPingDestroy(pSyncMsg);
|
||||||
syncPingReplyDestroy(pSyncMsg);
|
} else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
|
||||||
} else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
|
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
|
||||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||||
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
syncPingReplyDestroy(pSyncMsg);
|
||||||
syncClientRequestDestroy(pSyncMsg);
|
} else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
|
||||||
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
||||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||||
code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
syncClientRequestDestroy(pSyncMsg);
|
||||||
syncRequestVoteDestroy(pSyncMsg);
|
|
||||||
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
|
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
|
||||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
|
||||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
syncRequestVoteDestroy(pSyncMsg);
|
||||||
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
|
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
|
||||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
|
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesDestroy(pSyncMsg);
|
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||||
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
|
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
|
||||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
syncAppendEntriesDestroy(pSyncMsg);
|
||||||
|
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
|
||||||
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
||||||
|
code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg);
|
||||||
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
|
} else {
|
||||||
|
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
|
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
|
||||||
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
||||||
|
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||||
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
} else if (pMsg->msgType == TDMT_VND_SYNC_PING) {
|
||||||
|
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
|
||||||
|
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||||
|
syncPingDestroy(pSyncMsg);
|
||||||
|
} else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
|
||||||
|
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
|
||||||
|
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||||
|
syncPingReplyDestroy(pSyncMsg);
|
||||||
|
} else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
|
||||||
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
||||||
|
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||||
|
syncClientRequestDestroy(pSyncMsg);
|
||||||
|
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
|
||||||
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||||
|
code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
||||||
|
syncRequestVoteDestroy(pSyncMsg);
|
||||||
|
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
|
||||||
|
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
|
||||||
|
code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
||||||
|
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||||
|
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
|
||||||
|
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
|
||||||
|
code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
||||||
|
syncAppendEntriesDestroy(pSyncMsg);
|
||||||
|
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
|
||||||
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
||||||
|
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||||
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
|
} else {
|
||||||
|
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
|
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -93,6 +93,7 @@ extern "C" {
|
||||||
// /\ UNCHANGED <<candidateVars, leaderVars>>
|
// /\ UNCHANGED <<candidateVars, leaderVars>>
|
||||||
//
|
//
|
||||||
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||||
|
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ extern "C" {
|
||||||
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
|
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
|
||||||
//
|
//
|
||||||
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||||
|
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -137,14 +137,25 @@ typedef struct SSyncNode {
|
||||||
uint64_t heartbeatTimerCounter;
|
uint64_t heartbeatTimerCounter;
|
||||||
|
|
||||||
// callback
|
// callback
|
||||||
|
FpOnPingCb FpOnPing;
|
||||||
|
FpOnPingReplyCb FpOnPingReply;
|
||||||
|
FpOnClientRequestCb FpOnClientRequest;
|
||||||
|
FpOnTimeoutCb FpOnTimeout;
|
||||||
|
FpOnRequestVoteCb FpOnRequestVote;
|
||||||
|
FpOnRequestVoteReplyCb FpOnRequestVoteReply;
|
||||||
|
FpOnAppendEntriesCb FpOnAppendEntries;
|
||||||
|
FpOnAppendEntriesReplyCb FpOnAppendEntriesReply;
|
||||||
|
|
||||||
|
/*
|
||||||
int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg);
|
int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg);
|
||||||
int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg);
|
int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg);
|
||||||
int32_t (*FpOnClientRequest)(SSyncNode* ths, SyncClientRequest* pMsg);
|
int32_t (*FpOnClientRequest)(SSyncNode* ths, SyncClientRequest* pMsg);
|
||||||
|
int32_t (*FpOnTimeout)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
|
||||||
int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg);
|
int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||||
int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||||
int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg);
|
int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||||
int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||||
int32_t (*FpOnTimeout)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
|
*/
|
||||||
|
|
||||||
// tools
|
// tools
|
||||||
SSyncRespMgr* pSyncRespMgr;
|
SSyncRespMgr* pSyncRespMgr;
|
||||||
|
@ -164,6 +175,9 @@ void syncNodeStart(SSyncNode* pSyncNode);
|
||||||
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
||||||
void syncNodeClose(SSyncNode* pSyncNode);
|
void syncNodeClose(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
|
// option
|
||||||
|
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
// ping --------------
|
// ping --------------
|
||||||
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
|
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
|
||||||
int32_t syncNodePingSelf(SSyncNode* pSyncNode);
|
int32_t syncNodePingSelf(SSyncNode* pSyncNode);
|
||||||
|
|
|
@ -52,6 +52,7 @@ extern "C" {
|
||||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||||
//
|
//
|
||||||
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
|
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
|
||||||
|
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
|
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ extern "C" {
|
||||||
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
|
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
|
||||||
//
|
//
|
||||||
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
|
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||||
|
int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,7 @@ extern "C" {
|
||||||
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
|
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
|
||||||
//
|
//
|
||||||
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||||
|
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -427,3 +427,5 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) { return 0; }
|
|
@ -94,3 +94,5 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { return 0; }
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "syncElection.h"
|
#include "syncElection.h"
|
||||||
#include "syncMessage.h"
|
#include "syncMessage.h"
|
||||||
|
#include "syncRaftCfg.h"
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
#include "syncVoteMgr.h"
|
#include "syncVoteMgr.h"
|
||||||
|
|
||||||
|
@ -73,7 +74,12 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = syncNodeRequestVotePeers(pSyncNode);
|
if (pSyncNode->pRaftCfg->snapshotEnable) {
|
||||||
|
ret = syncNodeRequestVotePeersSnapshot(pSyncNode);
|
||||||
|
} else {
|
||||||
|
ret = syncNodeRequestVotePeers(pSyncNode);
|
||||||
|
}
|
||||||
|
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
syncNodeResetElectTimer(pSyncNode);
|
syncNodeResetElectTimer(pSyncNode);
|
||||||
|
|
||||||
|
|
|
@ -555,12 +555,20 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
||||||
pSyncNode->FpOnPing = syncNodeOnPingCb;
|
pSyncNode->FpOnPing = syncNodeOnPingCb;
|
||||||
pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
|
pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
|
||||||
pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb;
|
pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb;
|
||||||
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
|
|
||||||
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
|
|
||||||
pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
|
|
||||||
pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
|
|
||||||
pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
|
pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
|
||||||
|
|
||||||
|
if (pSyncNode->pRaftCfg->snapshotEnable) {
|
||||||
|
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
|
||||||
|
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
|
||||||
|
pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
|
||||||
|
pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
|
||||||
|
} else {
|
||||||
|
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteSnapshotCb;
|
||||||
|
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplySnapshotCb;
|
||||||
|
pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesSnapshotCb;
|
||||||
|
pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplySnapshotCb;
|
||||||
|
}
|
||||||
|
|
||||||
// tools
|
// tools
|
||||||
pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0);
|
pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0);
|
||||||
assert(pSyncNode->pSyncRespMgr != NULL);
|
assert(pSyncNode->pSyncRespMgr != NULL);
|
||||||
|
@ -675,6 +683,9 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
||||||
// taosMemoryFree(pSyncNode);
|
// taosMemoryFree(pSyncNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// option
|
||||||
|
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
|
||||||
|
|
||||||
// ping --------------
|
// ping --------------
|
||||||
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
|
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
|
||||||
syncPingLog2((char*)"==syncNodePing==", pMsg);
|
syncPingLog2((char*)"==syncNodePing==", pMsg);
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "syncReplication.h"
|
#include "syncReplication.h"
|
||||||
#include "syncIndexMgr.h"
|
#include "syncIndexMgr.h"
|
||||||
#include "syncMessage.h"
|
#include "syncMessage.h"
|
||||||
|
#include "syncRaftCfg.h"
|
||||||
#include "syncRaftEntry.h"
|
#include "syncRaftEntry.h"
|
||||||
#include "syncRaftLog.h"
|
#include "syncRaftLog.h"
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
|
@ -113,9 +114,17 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { return 0; }
|
||||||
|
|
||||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
|
int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
|
||||||
// start replicate
|
// start replicate
|
||||||
int32_t ret = syncNodeAppendEntriesPeers(pSyncNode);
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
if (pSyncNode->pRaftCfg->snapshotEnable) {
|
||||||
|
ret = syncNodeAppendEntriesPeersSnapshot(pSyncNode);
|
||||||
|
} else {
|
||||||
|
ret = syncNodeAppendEntriesPeers(pSyncNode);
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,3 +77,5 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { return 0; }
|
|
@ -92,3 +92,5 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { return 0; }
|
Loading…
Reference in New Issue