Merge pull request #25340 from taosdata/fix/TD-29533
fix: arb assigned step down need to reset token
This commit is contained in:
commit
2a7a64cf5a
|
@ -78,21 +78,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
|
SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
|
||||||
if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
||||||
if (commitIndex >= ths->assignedCommitIndex) {
|
if (commitIndex >= ths->assignedCommitIndex) {
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
syncNodeStepDown(ths, pMsg->term);
|
||||||
raftStoreNextTerm(ths);
|
|
||||||
if (terrno != TSDB_CODE_SUCCESS) {
|
|
||||||
sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (syncNodeAssignedLeader2Leader(ths) != 0) {
|
|
||||||
sError("vgId:%d, failed to change state from assigned leader to leader", ths->vgId);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexLock(&ths->arbTokenMutex);
|
|
||||||
syncUtilGenerateArbToken(ths->myNodeInfo.nodeId, ths->vgId, ths->arbToken);
|
|
||||||
sInfo("vgId:%d, assigned leader to leader, arbToken:%s", ths->vgId, ths->arbToken);
|
|
||||||
taosThreadMutexUnlock(&ths->arbTokenMutex);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
(void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex);
|
(void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex);
|
||||||
|
|
|
@ -503,20 +503,6 @@ int32_t syncEndSnapshot(int64_t rid) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
|
||||||
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
sError("sync step down error");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
syncNodeStepDown(pSyncNode, newTerm);
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
|
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
@ -1277,7 +1263,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
||||||
|
|
||||||
// start in syncNodeStart
|
// start in syncNodeStart
|
||||||
// start raft
|
// start raft
|
||||||
// syncNodeBecomeFollower(pSyncNode);
|
|
||||||
|
|
||||||
int64_t timeNow = taosGetTimestampMs();
|
int64_t timeNow = taosGetTimestampMs();
|
||||||
pSyncNode->startTime = timeNow;
|
pSyncNode->startTime = timeNow;
|
||||||
|
@ -1848,20 +1833,6 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
||||||
|
|
||||||
// persist cfg
|
// persist cfg
|
||||||
syncWriteCfgFile(pSyncNode);
|
syncWriteCfgFile(pSyncNode);
|
||||||
|
|
||||||
#if 0
|
|
||||||
// change isStandBy to normal (election timeout)
|
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
|
||||||
syncNodeBecomeLeader(pSyncNode, "");
|
|
||||||
|
|
||||||
// Raft 3.6.2 Committing entries from previous terms
|
|
||||||
syncNodeAppendNoop(pSyncNode);
|
|
||||||
// syncMaybeAdvanceCommitIndex(pSyncNode);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
syncNodeBecomeFollower(pSyncNode, "");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
} else {
|
} else {
|
||||||
// persist cfg
|
// persist cfg
|
||||||
syncWriteCfgFile(pSyncNode);
|
syncWriteCfgFile(pSyncNode);
|
||||||
|
@ -1874,18 +1845,6 @@ _END:
|
||||||
}
|
}
|
||||||
|
|
||||||
// raft state change --------------
|
// raft state change --------------
|
||||||
#ifdef BUILD_NO_CALL
|
|
||||||
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
|
|
||||||
if (term > raftStoreGetTerm(pSyncNode)) {
|
|
||||||
raftStoreSetTerm(pSyncNode, term);
|
|
||||||
char tmpBuf[64];
|
|
||||||
snprintf(tmpBuf, sizeof(tmpBuf), "update term to %" PRId64, term);
|
|
||||||
syncNodeBecomeFollower(pSyncNode, tmpBuf);
|
|
||||||
raftStoreClearVote(pSyncNode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
|
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
|
||||||
if (term > raftStoreGetTerm(pSyncNode)) {
|
if (term > raftStoreGetTerm(pSyncNode)) {
|
||||||
raftStoreSetTerm(pSyncNode, term);
|
raftStoreSetTerm(pSyncNode, term);
|
||||||
|
@ -1903,13 +1862,19 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
|
||||||
sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
|
sNTrace(pSyncNode, "step down, new-term:%" PRId64 ", current-term:%" PRId64, newTerm, currentTerm);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
if (pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
||||||
|
taosThreadMutexLock(&pSyncNode->arbTokenMutex);
|
||||||
|
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncNode->vgId, pSyncNode->arbToken);
|
||||||
|
sInfo("vgId:%d, step down as assigned leader, new arbToken:%s", pSyncNode->vgId, pSyncNode->arbToken);
|
||||||
|
taosThreadMutexUnlock(&pSyncNode->arbTokenMutex);
|
||||||
|
}
|
||||||
|
|
||||||
if (currentTerm < newTerm) {
|
if (currentTerm < newTerm) {
|
||||||
raftStoreSetTerm(pSyncNode, newTerm);
|
raftStoreSetTerm(pSyncNode, newTerm);
|
||||||
char tmpBuf[64];
|
char tmpBuf[64];
|
||||||
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
|
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRId64, newTerm);
|
||||||
syncNodeBecomeFollower(pSyncNode, tmpBuf);
|
syncNodeBecomeFollower(pSyncNode, tmpBuf);
|
||||||
raftStoreClearVote(pSyncNode);
|
raftStoreClearVote(pSyncNode);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
|
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
syncNodeBecomeFollower(pSyncNode, "step down");
|
syncNodeBecomeFollower(pSyncNode, "step down");
|
||||||
|
@ -2170,28 +2135,6 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
|
||||||
sNTrace(pSyncNode, "follower to candidate");
|
sNTrace(pSyncNode, "follower to candidate");
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
|
||||||
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
|
|
||||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
|
|
||||||
syncNodeBecomeFollower(pSyncNode, "leader to follower");
|
|
||||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
|
||||||
sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
|
|
||||||
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
|
|
||||||
|
|
||||||
sNTrace(pSyncNode, "leader to follower");
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
|
|
||||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
|
||||||
syncNodeBecomeFollower(pSyncNode, "candidate to follower");
|
|
||||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
|
||||||
sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
|
|
||||||
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
|
|
||||||
|
|
||||||
sNTrace(pSyncNode, "candidate to follower");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
|
int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) {
|
||||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
|
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER);
|
||||||
syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");
|
syncNodeBecomeLeader(pSyncNode, "assigned leader to leader");
|
||||||
|
|
Loading…
Reference in New Issue