fix/retry-epset-inuse
This commit is contained in:
parent
c0bf7f06fb
commit
a01e55adcd
|
@ -290,8 +290,8 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode);
|
|||
// raft state change --------------
|
||||
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
|
||||
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term);
|
||||
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm);
|
||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
|
||||
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id);
|
||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr);
|
||||
void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr);
|
||||
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);
|
||||
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode);
|
||||
|
|
|
@ -134,7 +134,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
}
|
||||
|
||||
if(ths->raftCfg.cfg.nodeInfo[ths->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER){
|
||||
syncNodeStepDown(ths, pMsg->term);
|
||||
syncNodeStepDown(ths, pMsg->term, pMsg->srcId);
|
||||
resetElect = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
if (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
||||
if (pMsg->term != raftStoreGetTerm(ths)) {
|
||||
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
|
||||
syncNodeStepDown(ths, pMsg->term);
|
||||
syncNodeStepDown(ths, pMsg->term, pMsg->srcId);
|
||||
return TSDB_CODE_SYN_WRONG_TERM;
|
||||
}
|
||||
|
||||
|
@ -79,7 +79,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
|
||||
if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
||||
if (commitIndex >= ths->assignedCommitIndex) {
|
||||
syncNodeStepDown(ths, pMsg->term);
|
||||
syncNodeStepDown(ths, pMsg->term, pMsg->destId);
|
||||
}
|
||||
} else {
|
||||
TAOS_CHECK_RETURN(syncLogBufferCommit(ths->pLogBuf, ths, commitIndex));
|
||||
|
|
|
@ -300,7 +300,8 @@ int32_t syncLeaderTransfer(int64_t rid) {
|
|||
}
|
||||
|
||||
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||
syncNodeBecomeFollower(ths, "force election");
|
||||
SRaftId id = {0};
|
||||
syncNodeBecomeFollower(ths, id, "force election");
|
||||
|
||||
SRpcMsg rsp = {
|
||||
.code = 0,
|
||||
|
@ -755,12 +756,24 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
|
|||
return lastIndex;
|
||||
}
|
||||
|
||||
static SRaftId syncGetRaftIdByEp(SSyncNode* pSyncNode, const SEp* pEp) {
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
if (strcmp(pEp->fqdn, (pSyncNode->peersNodeInfo)[i].nodeFqdn) == 0 &&
|
||||
pEp->port == (pSyncNode->peersNodeInfo)[i].nodePort) {
|
||||
return pSyncNode->peersId[i];
|
||||
}
|
||||
}
|
||||
return EMPTY_RAFT_ID;
|
||||
}
|
||||
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
|
||||
pEpSet->numOfEps = 0;
|
||||
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) return;
|
||||
|
||||
int index = -1;
|
||||
|
||||
int j = 0;
|
||||
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||
if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
|
||||
|
@ -769,10 +782,23 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
|
|||
pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
|
||||
pEpSet->numOfEps++;
|
||||
sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
|
||||
SRaftId id = syncGetRaftIdByEp(pSyncNode, pEp);
|
||||
if (id.addr == pSyncNode->leaderCache.addr && id.vgId == pSyncNode->leaderCache.vgId && id.addr != 0 &&
|
||||
id.vgId != 0)
|
||||
index = j;
|
||||
j++;
|
||||
}
|
||||
if (pEpSet->numOfEps > 0) {
|
||||
if (index != -1) {
|
||||
pEpSet->inUse = index;
|
||||
} else {
|
||||
if (pSyncNode->myRaftId.addr == pSyncNode->leaderCache.addr &&
|
||||
pSyncNode->myRaftId.vgId == pSyncNode->leaderCache.vgId) {
|
||||
pEpSet->inUse = pSyncNode->raftCfg.cfg.myIndex;
|
||||
} else {
|
||||
pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
|
||||
}
|
||||
}
|
||||
// pEpSet->inUse = 0;
|
||||
}
|
||||
epsetSort(pEpSet);
|
||||
|
@ -1461,7 +1487,8 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
|
|||
// Raft 3.6.2 Committing entries from previous terms
|
||||
TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode));
|
||||
} else {
|
||||
syncNodeBecomeFollower(pSyncNode, "first start");
|
||||
SRaftId id = {0};
|
||||
syncNodeBecomeFollower(pSyncNode, id, "first start");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2021,7 +2048,7 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
|
|||
}
|
||||
}
|
||||
|
||||
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
|
||||
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm, SRaftId id) {
|
||||
SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
|
||||
if (currentTerm > newTerm) {
|
||||
sNTrace(pSyncNode, "step down, ignore, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
|
||||
|
@ -2043,18 +2070,18 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
|
|||
raftStoreSetTerm(pSyncNode, newTerm);
|
||||
char tmpBuf[64];
|
||||
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
|
||||
syncNodeBecomeFollower(pSyncNode, tmpBuf);
|
||||
syncNodeBecomeFollower(pSyncNode, id, tmpBuf);
|
||||
raftStoreClearVote(pSyncNode);
|
||||
} else {
|
||||
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||
syncNodeBecomeFollower(pSyncNode, "step down");
|
||||
syncNodeBecomeFollower(pSyncNode, id, "step down");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); }
|
||||
|
||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode, SRaftId leaderId, const char* debugStr) {
|
||||
int32_t code = 0; // maybe clear leader cache
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
pSyncNode->leaderCache = EMPTY_RAFT_ID;
|
||||
|
@ -2062,7 +2089,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
|
||||
pSyncNode->hbSlowNum = 0;
|
||||
|
||||
// state change
|
||||
pSyncNode->leaderCache = leaderId; // state change
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
|
||||
|
@ -2875,7 +2902,8 @@ int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
|||
|
||||
if (!incfg) {
|
||||
SyncTerm currentTerm = raftStoreGetTerm(ths);
|
||||
syncNodeStepDown(ths, currentTerm);
|
||||
SRaftId id = EMPTY_RAFT_ID;
|
||||
syncNodeStepDown(ths, currentTerm, id);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
@ -3664,7 +3692,8 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
syncLogRecvLocalCmd(ths, pMsg, "");
|
||||
|
||||
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
|
||||
syncNodeStepDown(ths, pMsg->currentTerm);
|
||||
SRaftId id = EMPTY_RAFT_ID;
|
||||
syncNodeStepDown(ths, pMsg->currentTerm, id);
|
||||
|
||||
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT || pMsg->cmd == SYNC_LOCAL_CMD_LEARNER_CMT) {
|
||||
if (syncLogBufferIsEmpty(ths->pLogBuf)) {
|
||||
|
|
|
@ -103,7 +103,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);
|
||||
// maybe update term
|
||||
if (pMsg->term > raftStoreGetTerm(ths)) {
|
||||
syncNodeStepDown(ths, pMsg->term);
|
||||
syncNodeStepDown(ths, pMsg->term, pMsg->srcId);
|
||||
}
|
||||
SyncTerm currentTerm = raftStoreGetTerm(ths);
|
||||
if (!(pMsg->term <= currentTerm)) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
|
@ -116,7 +116,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
raftStoreVote(ths, &(pMsg->srcId));
|
||||
|
||||
// candidate ?
|
||||
syncNodeStepDown(ths, currentTerm);
|
||||
syncNodeStepDown(ths, currentTerm, pMsg->srcId);
|
||||
|
||||
// forbid elect for this round
|
||||
resetElect = true;
|
||||
|
|
|
@ -58,7 +58,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
|
||||
if (pMsg->term > currentTerm) {
|
||||
syncLogRecvRequestVoteReply(ths, pMsg, "error term");
|
||||
syncNodeStepDown(ths, pMsg->term);
|
||||
syncNodeStepDown(ths, pMsg->term, pMsg->destId);
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_SYN_WRONG_TERM);
|
||||
}
|
||||
|
|
|
@ -1042,7 +1042,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
|||
|
||||
if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER) {
|
||||
if (pMsg->term > raftStoreGetTerm(pSyncNode)) {
|
||||
syncNodeStepDown(pSyncNode, pMsg->term);
|
||||
syncNodeStepDown(pSyncNode, pMsg->term, pMsg->srcId);
|
||||
}
|
||||
} else {
|
||||
syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);
|
||||
|
|
Loading…
Reference in New Issue