diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h
index 128dbf4050..09651edd44 100644
--- a/source/libs/sync/inc/syncElection.h
+++ b/source/libs/sync/inc/syncElection.h
@@ -40,6 +40,7 @@ extern "C" {
 //
 int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode);
 int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode);
+int32_t syncNodeDoRequestVote(SSyncNode* pSyncNode);
 
 int32_t syncNodeElect(SSyncNode* pSyncNode);
 int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg);
diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h
index 7b34718ad9..4ca2985ebb 100644
--- a/source/libs/sync/inc/syncInt.h
+++ b/source/libs/sync/inc/syncInt.h
@@ -299,6 +299,8 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
 
 int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode);
 
+bool syncNodeIsMnode(const SSyncNode* pSyncNode);
+
 // trace log
 void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
 void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h
index e1bb889393..58083997f9 100644
--- a/source/libs/sync/inc/syncReplication.h
+++ b/source/libs/sync/inc/syncReplication.h
@@ -62,6 +62,7 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
 int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg);
 
 int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg);
+int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
 
 #ifdef __cplusplus
 }
diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c
index 375f2e5730..bcecb32883 100644
--- a/source/libs/sync/src/syncElection.c
+++ b/source/libs/sync/src/syncElection.c
@@ -70,6 +70,28 @@ int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode) {
   return ret;
 }
 
+// Send a RequestVote carrying the current term and this node's last log index/term to every peer.
+// Must only be called while the node is a candidate; any failure trips an ASSERT.
+int32_t syncNodeDoRequestVote(SSyncNode* pSyncNode) {
+  ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
+
+  int32_t ret = 0;
+  for (int i = 0; i < pSyncNode->peersNum; ++i) {
+    SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId);
+    pMsg->srcId = pSyncNode->myRaftId;
+    pMsg->destId = pSyncNode->peersId[i];
+    pMsg->term = pSyncNode->pRaftStore->currentTerm;
+
+    ret = syncNodeGetLastIndexTerm(pSyncNode, &(pMsg->lastLogIndex), &(pMsg->lastLogTerm));
+    ASSERT(ret == 0);
+
+    ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg);
+    ASSERT(ret == 0);
+    syncRequestVoteDestroy(pMsg);
+  }
+  return ret;
+}
+
 int32_t syncNodeElect(SSyncNode* pSyncNode) {
   syncNodeEventLog(pSyncNode, "begin election");
 
@@ -98,20 +120,25 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
     return ret;
   }
 
-  switch (pSyncNode->pRaftCfg->snapshotStrategy) {
-    case SYNC_STRATEGY_NO_SNAPSHOT:
-      ret = syncNodeRequestVotePeers(pSyncNode);
-      break;
+  if (syncNodeIsMnode(pSyncNode)) {
+    switch (pSyncNode->pRaftCfg->snapshotStrategy) {
+      case SYNC_STRATEGY_NO_SNAPSHOT:
+        ret = syncNodeRequestVotePeers(pSyncNode);
+        break;
 
-    case SYNC_STRATEGY_STANDARD_SNAPSHOT:
-    case SYNC_STRATEGY_WAL_FIRST:
-      ret = syncNodeRequestVotePeersSnapshot(pSyncNode);
-      break;
+      case SYNC_STRATEGY_STANDARD_SNAPSHOT:
+      case SYNC_STRATEGY_WAL_FIRST:
+        ret = syncNodeRequestVotePeersSnapshot(pSyncNode);
+        break;
 
-    default:
-      ret = syncNodeRequestVotePeers(pSyncNode);
-      break;
+      default:
+        ret = syncNodeRequestVotePeers(pSyncNode);
+        break;
+    }
+  } else {
+    ret = syncNodeDoRequestVote(pSyncNode);
   }
+
   ASSERT(ret == 0);
   syncNodeResetElectTimer(pSyncNode);
 
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index b7a8e35cf3..3732dce70a 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -1262,6 +1262,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
     // Raft 3.6.2 Committing entries from previous terms
     syncNodeAppendNoop(pSyncNode);
     syncMaybeAdvanceCommitIndex(pSyncNode);
+
   } else {
     syncNodeBecomeFollower(pSyncNode, "first start");
   }
@@ -1491,6 +1492,14 @@ static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
 int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
   pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
   int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
+
+  do {
+    for (int i = 0; i < pSyncNode->peersNum; ++i) {
+      SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
+      syncHbTimerStart(pSyncNode, pSyncTimer);
+    }
+  } while (0);
+
   return ret;
 }
 
@@ -1514,6 +1523,13 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
 
   sTrace("vgId:%d, sync %s stop heartbeat timer", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
 
+  do {
+    for (int i = 0; i < pSyncNode->peersNum; ++i) {
+      SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
+      syncHbTimerStop(pSyncNode, pSyncTimer);
+    }
+  } while (0);
+
   return ret;
 }
 
@@ -2177,10 +2193,6 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
   // state change
   pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
   syncNodeStopHeartbeatTimer(pSyncNode);
-  for (int i = 0; i < pSyncNode->peersNum; ++i) {
-    SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
-    syncHbTimerStop(pSyncNode, pSyncTimer);
-  }
 
   // reset elect timer
   syncNodeResetElectTimer(pSyncNode);
@@ -2280,10 +2292,9 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
 
   // start heartbeat timer
   syncNodeStartHeartbeatTimer(pSyncNode);
-  for (int i = 0; i < pSyncNode->peersNum; ++i) {
-    SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
-    syncHbTimerStart(pSyncNode, pSyncTimer);
-  }
+
+  // send heartbeat right now
+  syncNodeHeartbeatPeers(pSyncNode);
 
   // call back
   if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
@@ -2318,6 +2329,9 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
   syncMaybeAdvanceCommitIndex(pSyncNode);
 }
 
+// The sync group with vgId 1 is treated as the mnode group.
+bool syncNodeIsMnode(const SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); }
+
 void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
   ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
   pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
@@ -2817,6 +2831,10 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
   SRpcMsg rpcMsg;
   syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);
 
+  if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
+    syncNodeBecomeFollower(ths, "become follower by hb");
+  }
+
   if (pMsg->term == ths->pRaftStore->currentTerm) {
-    sInfo("vgId:%d, heartbeat reset timer", 1);
+    sInfo("vgId:%d, heartbeat reset timer", ths->vgId);
     syncNodeResetElectTimer(ths);
diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c
index 8a3940a363..8acdb5cea1 100644
--- a/source/libs/sync/src/syncReplication.c
+++ b/source/libs/sync/src/syncReplication.c
@@ -506,4 +506,24 @@ int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const
   syncHeartbeat2RpcMsg(pMsg, &rpcMsg);
   syncNodeSendMsgById(&(pMsg->destId), pSyncNode, &rpcMsg);
   return ret;
+}
+
+// Send one heartbeat carrying the current term and commit index to every peer. Always returns 0.
+int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
+  for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
+    SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
+    pSyncMsg->srcId = pSyncNode->myRaftId;
+    pSyncMsg->destId = pSyncNode->peersId[i];
+    pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
+    pSyncMsg->commitIndex = pSyncNode->commitIndex;
+    pSyncMsg->privateTerm = 0;
+
+    // syncNodeHeartbeat() converts the message to an SRpcMsg and sends it itself, so no
+    // conversion is done here.
+    syncNodeHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
+
+    syncHeartbeatDestroy(pSyncMsg);
+  }
+
+  return 0;
 }
\ No newline at end of file
diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c
index 74c4babf18..7df025b5f8 100644
--- a/source/libs/sync/src/syncRequestVote.c
+++ b/source/libs/sync/src/syncRequestVote.c
@@ -215,4 +215,55 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
   return 0;
 }
 
-int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) { return 0; }
\ No newline at end of file
+// Handle a RequestVote from a peer: call syncNodeUpdateTerm() if the request carries a newer term,
+// grant the vote when the terms match, the candidate's log passes syncNodeOnRequestVoteLogOK() and
+// this node has not voted for anyone else, then send a RequestVoteReply back to the candidate.
+// Returns -1 if the sender is not in the raft group (and this node is not standby), 0 otherwise.
+int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) {
+  // if already drop replica, do not process
+  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
+    syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped");
+    return -1;
+  }
+
+  bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
+
+  // maybe update term
+  if (pMsg->term > ths->pRaftStore->currentTerm) {
+    syncNodeUpdateTerm(ths, pMsg->term);
+  }
+  ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
+
+  bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
+               ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
+  if (grant) {
+    // maybe has already voted for pMsg->srcId
+    // vote again, no harm
+    raftStoreVote(ths->pRaftStore, &(pMsg->srcId));
+
+    // forbid elect for this round
+    syncNodeResetElectTimer(ths);
+  }
+
+  // send msg
+  SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId);
+  pReply->srcId = ths->myRaftId;
+  pReply->destId = pMsg->srcId;
+  pReply->term = ths->pRaftStore->currentTerm;
+  pReply->voteGranted = grant;
+
+  // trace log
+  do {
+    char logBuf[32];
+    snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted);
+    syncLogRecvRequestVote(ths, pMsg, logBuf);
+    syncLogSendRequestVoteReply(ths, pReply, "");
+  } while (0);
+
+  SRpcMsg rpcMsg;
+  syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
+  syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
+  syncRequestVoteReplyDestroy(pReply);
+
+  return 0;
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c
index 84b3df0bf5..74db7a3e51 100644
--- a/source/libs/sync/src/syncRequestVoteReply.c
+++ b/source/libs/sync/src/syncRequestVoteReply.c
@@ -157,4 +157,59 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
   return 0;
 }
 
-int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg) { return 0; }
\ No newline at end of file
+// Handle a RequestVoteReply from a peer. Replies from nodes outside the raft group (unless this
+// node is standby) and replies whose term differs from the current term are logged and rejected
+// with -1. While this node is still a candidate the reply is tallied, and the node promotes itself
+// to leader once voteGrantedMajority() reports a majority. Returns 0 in every other case.
+int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
+  // if already drop replica, do not process
+  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
+    syncLogRecvRequestVoteReply(ths, pMsg, "maybe replica already dropped");
+    return -1;
+  }
+
+  // drop stale response
+  if (pMsg->term < ths->pRaftStore->currentTerm) {
+    syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
+    return -1;
+  }
+
+  // A reply answers a request this node sent, so its term is not expected to be ahead of ours;
+  // treat that as an error instead of updating the term.
+  if (pMsg->term > ths->pRaftStore->currentTerm) {
+    syncLogRecvRequestVoteReply(ths, pMsg, "error term");
+    return -1;
+  }
+
+  syncLogRecvRequestVoteReply(ths, pMsg, "");
+  ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
+
+  // Votes are only tallied while this node is still a candidate; otherwise the reply is ignored.
+  if (ths->state == TAOS_SYNC_STATE_CANDIDATE) {
+    if (ths->pVotesRespond->term != pMsg->term) {
+      char logBuf[128];
+      snprintf(logBuf, sizeof(logBuf), "vote respond error vote-respond-mgr term:%lu, msg term:%lu",
+               ths->pVotesRespond->term, pMsg->term);
+      syncNodeErrorLog(ths, logBuf);
+      return -1;
+    }
+
+    votesRespondAdd(ths->pVotesRespond, pMsg);
+    if (pMsg->voteGranted) {
+      // add vote
+      voteGrantedVote(ths->pVotesGranted, pMsg);
+
+      // maybe to leader
+      if (voteGrantedMajority(ths->pVotesGranted)) {
+        if (!ths->pVotesGranted->toLeader) {
+          syncNodeCandidate2Leader(ths);
+
+          // prevent to leader again!
+          ths->pVotesGranted->toLeader = true;
+        }
+      }
+    }
+  }
+
+  return 0;
+}
\ No newline at end of file
diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c
index c3c8131cbb..ed09922b9e 100644
--- a/source/libs/sync/src/syncTimeout.c
+++ b/source/libs/sync/src/syncTimeout.c
@@ -101,8 +101,9 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
       ++(ths->heartbeatTimerCounter);
       sTrace("vgId:%d, sync timer, type:replicate count:%d, heartbeatTimerLogicClockUser:%ld", ths->vgId,
              ths->heartbeatTimerCounter, ths->heartbeatTimerLogicClockUser);
-      syncNodeReplicate(ths, true);
+      // syncNodeReplicate(ths, true);
     }
+
   } else {
     sError("vgId:%d, unknown timeout-type:%d", ths->vgId, pMsg->timeoutType);
   }