From a01e55adcd122363a03cfa96a5d1777eb9264e91 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 29 Nov 2024 17:07:45 +0800 Subject: [PATCH] fix/retry-epset-inuse --- source/libs/sync/inc/syncInt.h | 4 +- source/libs/sync/src/syncAppendEntries.c | 2 +- source/libs/sync/src/syncAppendEntriesReply.c | 4 +- source/libs/sync/src/syncMain.c | 49 +++++++++++++++---- source/libs/sync/src/syncRequestVote.c | 4 +- source/libs/sync/src/syncRequestVoteReply.c | 2 +- source/libs/sync/src/syncSnapshot.c | 2 +- 7 files changed, 48 insertions(+), 19 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index b19d1184a7..83221c8158 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -290,8 +290,8 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode); // raft state change -------------- void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term); -void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm); -void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr); +void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id); +void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr); void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr); void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr); void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 9fc39ec463..13ce8c0bf0 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -134,7 +134,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } if(ths->raftCfg.cfg.nodeInfo[ths->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER){ - syncNodeStepDown(ths, pMsg->term); + syncNodeStepDown(ths, pMsg->term, pMsg->srcId); resetElect = true; } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index a7f36be9e9..9b2a7c46ff 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -61,7 +61,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (pMsg->term != raftStoreGetTerm(ths)) { syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); - syncNodeStepDown(ths, pMsg->term); + syncNodeStepDown(ths, pMsg->term, pMsg->srcId); return TSDB_CODE_SYN_WRONG_TERM; } @@ -79,7 +79,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely); if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (commitIndex >= ths->assignedCommitIndex) { - syncNodeStepDown(ths, pMsg->term); + syncNodeStepDown(ths, pMsg->term, pMsg->destId); } } else { TAOS_CHECK_RETURN(syncLogBufferCommit(ths->pLogBuf, ths, commitIndex)); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5bdac16f42..06e3765fcf 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -300,7 +300,8 @@ int32_t syncLeaderTransfer(int64_t rid) { } int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) { - syncNodeBecomeFollower(ths, "force election"); + SRaftId id = {0}; + syncNodeBecomeFollower(ths, id, "force election"); SRpcMsg rsp = { .code = 0, @@ -755,12 +756,24 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho return lastIndex; } +static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) { + for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { + if (strcmp(pEp->fqdn, (pSyncNode->peersNodeInfo)[i].nodeFqdn) == 0 && + pEp->port == (pSyncNode->peersNodeInfo)[i].nodePort) { + return pSyncNode->peersId[i]; + } + } + return EMPTY_RAFT_ID; +} + void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { pEpSet->numOfEps = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) return; + int index = -1; + int j = 0; for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) { if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue; @@ -769,10 +782,23 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort; pEpSet->numOfEps++; sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port); + SRaftId id = syncGetRaftIdByEp(pSyncNode, pEp); + if (id.addr == pSyncNode->leaderCache.addr && id.vgId == pSyncNode->leaderCache.vgId && id.addr != 0 && + id.vgId != 0) + index = j; j++; } if (pEpSet->numOfEps > 0) { - pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps; + if (index != -1) { + pEpSet->inUse = index; + } else { + if (pSyncNode->myRaftId.addr == pSyncNode->leaderCache.addr && + pSyncNode->myRaftId.vgId == pSyncNode->leaderCache.vgId) { + pEpSet->inUse = pSyncNode->raftCfg.cfg.myIndex; + } else { + pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps; + } + } // pEpSet->inUse = 0; } epsetSort(pEpSet); @@ -1461,7 +1487,8 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) { // Raft 3.6.2 Committing entries from previous terms TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode)); } else { - syncNodeBecomeFollower(pSyncNode, "first start"); + SRaftId id = {0}; + syncNodeBecomeFollower(pSyncNode, id, "first start"); } } @@ -2021,7 +2048,7 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { } } -void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { +void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) { SyncTerm currentTerm = raftStoreGetTerm(pSyncNode); if (currentTerm > newTerm) { sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm); @@ -2043,18 +2070,18 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { raftStoreSetTerm(pSyncNode, newTerm); char tmpBuf[64]; snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm); - syncNodeBecomeFollower(pSyncNode, tmpBuf); + syncNodeBecomeFollower(pSyncNode, id, tmpBuf); raftStoreClearVote(pSyncNode); } else { if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { - syncNodeBecomeFollower(pSyncNode, "step down"); + syncNodeBecomeFollower(pSyncNode, id, "step down"); } } } void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); } -void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { +void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) { int32_t code = 0; // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { pSyncNode->leaderCache = EMPTY_RAFT_ID; @@ -2062,7 +2089,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->hbSlowNum = 0; - // state change + pSyncNode->leaderCache = leaderId; // state change pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->roleTimeMs = taosGetTimestampMs(); if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) { @@ -2875,7 +2902,8 @@ int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) { if (!incfg) { SyncTerm currentTerm = raftStoreGetTerm(ths); - syncNodeStepDown(ths, currentTerm); + SRaftId id = EMPTY_RAFT_ID; + syncNodeStepDown(ths, currentTerm, id); return 1; } } @@ -3664,7 +3692,8 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncLogRecvLocalCmd(ths, pMsg, ""); if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { - syncNodeStepDown(ths, pMsg->currentTerm); + SRaftId id = EMPTY_RAFT_ID; + syncNodeStepDown(ths, pMsg->currentTerm, id); } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT || pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT) { if (syncLogBufferIsEmpty(ths->pLogBuf)) { diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index fe5b3eb7ad..c887846915 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -103,7 +103,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg); // maybe update term if (pMsg->term > raftStoreGetTerm(ths)) { - syncNodeStepDown(ths, pMsg->term); + syncNodeStepDown(ths, pMsg->term, pMsg->srcId); } SyncTerm currentTerm = raftStoreGetTerm(ths); if (!(pMsg->term <= currentTerm)) return TSDB_CODE_SYN_INTERNAL_ERROR; @@ -116,7 +116,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { raftStoreVote(ths, &(pMsg->srcId)); // candidate ? - syncNodeStepDown(ths, currentTerm); + syncNodeStepDown(ths, currentTerm, pMsg->srcId); // forbid elect for this round resetElect = true; diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 9f2d746755..b2222a1fe7 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -58,7 +58,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (pMsg->term > currentTerm) { syncLogRecvRequestVoteReply(ths, pMsg, "error term"); - syncNodeStepDown(ths, pMsg->term); + syncNodeStepDown(ths, pMsg->term, pMsg->destId); TAOS_RETURN(TSDB_CODE_SYN_WRONG_TERM); } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index a89667ad3d..1720935e9e 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -1042,7 +1042,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER) { if (pMsg->term > raftStoreGetTerm(pSyncNode)) { - syncNodeStepDown(pSyncNode, pMsg->term); + syncNodeStepDown(pSyncNode, pMsg->term, pMsg->srcId); } } else { syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);