diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 37d0dff095..5af5b5f988 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1562,7 +1562,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { return serialized; } -void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { +inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { int32_t userStrLen = strlen(str); SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; @@ -1634,7 +1634,7 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { taosMemoryFree(pCfgStr); } -void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { +inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { int32_t userStrLen = strlen(str); SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; @@ -1701,7 +1701,7 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { taosMemoryFree(pCfgStr); } -char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { +inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { int len = 256; char* s = (char*)taosMemoryMalloc(len); @@ -1724,7 +1724,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { return s; } -bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { +inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { bool b1 = false; bool b2 = false; @@ -2942,7 +2942,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { return true; } -void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { +inline void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -2953,7 +2953,7 @@ void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c syncNodeEventLog(pSyncNode, logBuf); } -void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { +inline void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) { char logBuf[256]; char host[64]; uint16_t port; @@ -2964,7 +2964,7 @@ void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, c syncNodeEventLog(pSyncNode, logBuf); } -void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { +inline void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -2974,7 +2974,7 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl syncNodeEventLog(pSyncNode, logBuf); } -void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { +inline void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -2984,7 +2984,7 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl syncNodeEventLog(pSyncNode, logBuf); } -void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { +inline void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -2999,7 +2999,7 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs syncNodeEventLog(pSyncNode, logBuf); } -void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { +inline void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -3014,7 +3014,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs syncNodeEventLog(pSyncNode, logBuf); } -void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { +inline void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -3027,7 +3027,7 @@ void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries syncNodeEventLog(pSyncNode, logBuf); } -void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { +inline void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); @@ -3040,7 +3040,7 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries syncNodeEventLog(pSyncNode, logBuf); } -void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { +inline void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); @@ -3052,7 +3052,7 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries syncNodeEventLog(pSyncNode, logBuf); } -void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { +inline void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { char host[64]; uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index bad32c5f91..806c138b72 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -51,15 +51,21 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { return -1; } + bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) || + ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && + (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore))); + + // log not ok, do not update term, ignore it + if (pMsg->term > ths->pRaftStore->currentTerm && !logOK) { + return -1; + } + // maybe update term if (pMsg->term > ths->pRaftStore->currentTerm) { syncNodeUpdateTerm(ths, pMsg->term); } ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) || - ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && - (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore))); bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); if (grant) { @@ -94,48 +100,6 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { return ret; } -#if 0 -int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { - int32_t ret = 0; - - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteCb== term:%" PRIu64, ths->pRaftStore->currentTerm); - syncRequestVoteLog2(logBuf, pMsg); - - if (pMsg->term > ths->pRaftStore->currentTerm) { - syncNodeUpdateTerm(ths, pMsg->term); - } - ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); - - bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) || - ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && - (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore))); - bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK && - ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)))); - if (grant) { - // maybe has already voted for pMsg->srcId - // vote again, no harm - raftStoreVote(ths->pRaftStore, &(pMsg->srcId)); - - // forbid elect for this round - syncNodeResetElectTimer(ths); - } - - SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->voteGranted = grant; - - SRpcMsg rpcMsg; - syncRequestVoteReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncRequestVoteReplyDestroy(pReply); - - return ret; -} -#endif - static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) { SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode); SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 566b80881f..05a1708b0b 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -93,65 +93,6 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) return 0; } -#if 0 -int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { - int32_t ret = 0; - - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteReplyCb== term:%" PRIu64, ths->pRaftStore->currentTerm); - syncRequestVoteReplyLog2(logBuf, pMsg); - - if (pMsg->term < ths->pRaftStore->currentTerm) { - sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, - ths->pRaftStore->currentTerm); - return ret; - } - - // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); - // no need this code, because if I receive reply.term, then I must have sent for that term. - // if (pMsg->term > ths->pRaftStore->currentTerm) { - // syncNodeUpdateTerm(ths, pMsg->term); - // } - - if (pMsg->term > ths->pRaftStore->currentTerm) { - char logBuf[128] = {0}; - snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplyCb error term, receive:%" PRIu64 " current:%" PRIu64, pMsg->term, - ths->pRaftStore->currentTerm); - syncNodePrint2(logBuf, ths); - sError("%s", logBuf); - return ret; - } - - ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - - // This tallies votes even when the current state is not Candidate, - // but they won't be looked at, so it doesn't matter. - if (ths->state == TAOS_SYNC_STATE_CANDIDATE) { - votesRespondAdd(ths->pVotesRespond, pMsg); - if (pMsg->voteGranted) { - // add vote - voteGrantedVote(ths->pVotesGranted, pMsg); - - // maybe to leader - if (voteGrantedMajority(ths->pVotesGranted)) { - if (!ths->pVotesGranted->toLeader) { - syncNodeCandidate2Leader(ths); - - // prevent to leader again! - ths->pVotesGranted->toLeader = true; - } - } - } else { - ; - // do nothing - // UNCHANGED <> - } - } - - return ret; -} -#endif - int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t ret = 0;