refactor(sync): rename function

This commit is contained in:
Minghao Li 2022-10-18 14:16:39 +08:00
parent e810f2ad64
commit 49c19e13f2
6 changed files with 43 additions and 103 deletions

View File

@ -37,13 +37,10 @@ extern "C" {
// msource |-> i, // msource |-> i,
// mdest |-> j]) // mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
//
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode);
int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode);
int32_t syncNodeDoRequestVote(SSyncNode* pSyncNode);
int32_t syncNodeElect(SSyncNode* pSyncNode); int32_t syncNodeElect(SSyncNode* pSyncNode);
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode);
int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -50,15 +50,15 @@ extern "C" {
// msource |-> i, // msource |-> i,
// mdest |-> j]) // mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
//
int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg);
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode); int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncHeartbeat* pMsg);
int32_t syncNodeDoReplicate(SSyncNode* pSyncNode); int32_t syncNodeReplicate(SSyncNode* pSyncNode);
int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId); int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId);
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncAppendEntries* pMsg);
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncAppendEntries* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -129,7 +129,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs
ASSERT(pState != NULL); ASSERT(pState != NULL);
if (pMsg->lastSendIndex == pState->lastSendIndex) { if (pMsg->lastSendIndex == pState->lastSendIndex) {
syncNodeDoAppendEntries(ths, &(pMsg->srcId)); syncNodeReplicateOne(ths, &(pMsg->srcId));
} }
} }

View File

@ -30,68 +30,6 @@
// msource |-> i, // msource |-> i,
// mdest |-> j]) // mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
//
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId);
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = pSyncNode->peersId[i];
pMsg->term = pSyncNode->pRaftStore->currentTerm;
pMsg->lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
pMsg->lastLogTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg);
ASSERT(ret == 0);
syncRequestVoteDestroy(pMsg);
}
return ret;
}
int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId);
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = pSyncNode->peersId[i];
pMsg->term = pSyncNode->pRaftStore->currentTerm;
ret = syncNodeGetLastIndexTerm(pSyncNode, &(pMsg->lastLogIndex), &(pMsg->lastLogTerm));
ASSERT(ret == 0);
ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg);
ASSERT(ret == 0);
syncRequestVoteDestroy(pMsg);
}
return ret;
}
int32_t syncNodeDoRequestVote(SSyncNode* pSyncNode) {
if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
syncNodeEventLog(pSyncNode, "not candidate, stop elect");
return 0;
}
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId);
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = pSyncNode->peersId[i];
pMsg->term = pSyncNode->pRaftStore->currentTerm;
ret = syncNodeGetLastIndexTerm(pSyncNode, &(pMsg->lastLogIndex), &(pMsg->lastLogTerm));
ASSERT(ret == 0);
ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg);
ASSERT(ret == 0);
syncRequestVoteDestroy(pMsg);
}
return ret;
}
int32_t syncNodeElect(SSyncNode* pSyncNode) { int32_t syncNodeElect(SSyncNode* pSyncNode) {
syncNodeEventLog(pSyncNode, "begin election"); syncNodeEventLog(pSyncNode, "begin election");
@ -121,32 +59,38 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
return ret; return ret;
} }
if (syncNodeIsMnode(pSyncNode)) {
switch (pSyncNode->pRaftCfg->snapshotStrategy) {
case SYNC_STRATEGY_NO_SNAPSHOT:
ret = syncNodeRequestVotePeers(pSyncNode); ret = syncNodeRequestVotePeers(pSyncNode);
break;
case SYNC_STRATEGY_STANDARD_SNAPSHOT:
case SYNC_STRATEGY_WAL_FIRST:
ret = syncNodeRequestVotePeersSnapshot(pSyncNode);
break;
default:
ret = syncNodeRequestVotePeers(pSyncNode);
break;
}
} else {
ret = syncNodeDoRequestVote(pSyncNode);
}
ASSERT(ret == 0); ASSERT(ret == 0);
syncNodeResetElectTimer(pSyncNode); syncNodeResetElectTimer(pSyncNode);
return ret; return ret;
} }
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
syncNodeEventLog(pSyncNode, "not candidate, stop elect");
return 0;
}
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId);
pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = pSyncNode->peersId[i];
pMsg->term = pSyncNode->pRaftStore->currentTerm;
ret = syncNodeGetLastIndexTerm(pSyncNode, &(pMsg->lastLogIndex), &(pMsg->lastLogTerm));
ASSERT(ret == 0);
ret = syncNodeSendRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg);
ASSERT(ret == 0);
syncRequestVoteDestroy(pMsg);
}
return ret;
}
int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
int32_t ret = 0; int32_t ret = 0;
syncLogSendRequestVote(pSyncNode, pMsg, ""); syncLogSendRequestVote(pSyncNode, pMsg, "");

View File

@ -2299,7 +2299,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
syncMaybeAdvanceCommitIndex(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode);
if (pSyncNode->replicaNum > 1) { if (pSyncNode->replicaNum > 1) {
syncNodeDoReplicate(pSyncNode); syncNodeReplicate(pSyncNode);
} }
} }
@ -2696,7 +2696,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
#endif #endif
// send msg // send msg
syncNodeHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg); syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
syncHeartbeatDestroy(pSyncMsg); syncHeartbeatDestroy(pSyncMsg);
@ -2900,7 +2900,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd
// if mulit replica, start replicate right now // if mulit replica, start replicate right now
if (ths->replicaNum > 1) { if (ths->replicaNum > 1) {
syncNodeDoReplicate(ths); syncNodeReplicate(ths);
} }
// if only myself, maybe commit right now // if only myself, maybe commit right now

View File

@ -47,9 +47,8 @@
// msource |-> i, // msource |-> i,
// mdest |-> j]) // mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
//
int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) { int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
// next index // next index
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
@ -118,7 +117,7 @@ int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) {
return 0; return 0;
} }
int32_t syncNodeDoReplicate(SSyncNode* pSyncNode) { int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
syncNodeEventLog(pSyncNode, "do replicate"); syncNodeEventLog(pSyncNode, "do replicate");
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
@ -128,7 +127,7 @@ int32_t syncNodeDoReplicate(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) { for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId* pDestId = &(pSyncNode->peersId[i]); SRaftId* pDestId = &(pSyncNode->peersId[i]);
ret = syncNodeDoAppendEntries(pSyncNode, pDestId); ret = syncNodeReplicateOne(pSyncNode, pDestId);
if (ret != 0) { if (ret != 0) {
char host[64]; char host[64];
int16_t port; int16_t port;
@ -175,7 +174,7 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
return ret; return ret;
} }
int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg) { int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg) {
int32_t ret = 0; int32_t ret = 0;
syncLogSendHeartbeat(pSyncNode, pMsg, ""); syncLogSendHeartbeat(pSyncNode, pMsg, "");
@ -198,7 +197,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg); syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);
// send msg // send msg
syncNodeHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg); syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
syncHeartbeatDestroy(pSyncMsg); syncHeartbeatDestroy(pSyncMsg);
} }