Merge pull request #19408 from taosdata/FIX/TD-21428-main
enh: vote for a candidate with a higher lastLogTerm even if its lastLogIndex is below the local commitIndex
This commit is contained in:
commit
9c5b5e93d9
|
@ -258,8 +258,8 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
|
|||
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId);
|
||||
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
|
||||
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId);
|
||||
int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
|
||||
SRpcMsg* pRpcMsg);
|
||||
int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
|
||||
SRpcMsg* pRpcMsg);
|
||||
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId);
|
||||
int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId);
|
||||
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId);
|
||||
|
|
|
@ -78,14 +78,14 @@ static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) {
|
|||
SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
|
||||
|
||||
int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
||||
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
|
||||
SRaftId* pDestId, bool* pBarrier);
|
||||
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
||||
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
|
||||
int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
|
||||
SRaftId* pDestId, bool* pBarrier);
|
||||
int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
||||
int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
|
||||
|
||||
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
||||
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
||||
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
||||
int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
||||
int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
||||
|
||||
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
|
||||
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
||||
|
|
|
@ -128,7 +128,7 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SSyncRaftEntry* syncLogAppendEntriesToRaftEntry(const SyncAppendEntries* pMsg) {
|
||||
SSyncRaftEntry* syncBuildRaftEntryFromAppendEntries(const SyncAppendEntries* pMsg) {
|
||||
SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
|
||||
if (pEntry == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -182,7 +182,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
goto _IGNORE;
|
||||
}
|
||||
|
||||
SSyncRaftEntry* pEntry = syncLogAppendEntriesToRaftEntry(pMsg);
|
||||
SSyncRaftEntry* pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
|
||||
|
||||
if (pEntry == NULL) {
|
||||
sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
|
||||
|
|
|
@ -154,8 +154,8 @@ int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
|
||||
SRpcMsg* pRpcMsg) {
|
||||
int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
|
||||
SRpcMsg* pRpcMsg) {
|
||||
uint32_t dataLen = pEntry->bytes;
|
||||
uint32_t bytes = sizeof(SyncAppendEntries) + dataLen;
|
||||
pRpcMsg->contLen = bytes;
|
||||
|
|
|
@ -642,7 +642,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
|||
}
|
||||
|
||||
bool barrier = false;
|
||||
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||
if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||
sError("vgId:%d, failed to replicate sync log entry since %s. index: %" PRId64 ", dest: %" PRIx64 "", pNode->vgId,
|
||||
terrstr(), index, pDestId->addr);
|
||||
goto _out;
|
||||
|
@ -674,8 +674,7 @@ _out:
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode,
|
||||
SyncAppendEntriesReply* pMsg) {
|
||||
int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
|
||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||
SRaftId destId = pMsg->srcId;
|
||||
ASSERT(pMgr->restored == false);
|
||||
|
@ -750,7 +749,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
|
|||
|
||||
// attempt to replicate the raft log at index
|
||||
(void)syncLogReplMgrReset(pMgr);
|
||||
return syncLogReplMgrReplicateProbeOnce(pMgr, pNode, index);
|
||||
return syncLogReplMgrReplicateProbe(pMgr, pNode, index);
|
||||
}
|
||||
|
||||
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
|
||||
|
@ -778,9 +777,9 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
|
|||
}
|
||||
|
||||
if (pMgr->restored) {
|
||||
(void)syncLogReplMgrProcessReplyInNormalMode(pMgr, pNode, pMsg);
|
||||
(void)syncLogReplMgrProcessReplyAsNormal(pMgr, pNode, pMsg);
|
||||
} else {
|
||||
(void)syncLogReplMgrProcessReplyInRecoveryMode(pMgr, pNode, pMsg);
|
||||
(void)syncLogReplMgrProcessReplyAsRecovery(pMgr, pNode, pMsg);
|
||||
}
|
||||
taosThreadMutexUnlock(&pBuf->mutex);
|
||||
return 0;
|
||||
|
@ -788,14 +787,14 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
|
|||
|
||||
int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||
if (pMgr->restored) {
|
||||
(void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
|
||||
(void)syncLogReplMgrReplicateAttempt(pMgr, pNode);
|
||||
} else {
|
||||
(void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode, pNode->pLogBuf->matchIndex);
|
||||
(void)syncLogReplMgrReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
|
||||
int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
|
||||
ASSERT(!pMgr->restored);
|
||||
ASSERT(pMgr->startIndex >= 0);
|
||||
int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
|
||||
|
@ -810,7 +809,7 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
|
|||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||
bool barrier = false;
|
||||
SyncTerm term = -1;
|
||||
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||
if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
||||
terrstr(), index, pDestId->addr);
|
||||
return -1;
|
||||
|
@ -833,7 +832,7 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||
int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||
ASSERT(pMgr->restored);
|
||||
|
||||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||
|
@ -854,7 +853,8 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
|
|||
int64_t pos = index % pMgr->size;
|
||||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||
bool barrier = false;
|
||||
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||
SyncTerm term = -1;
|
||||
if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
||||
terrstr(), index, pDestId->addr);
|
||||
return -1;
|
||||
|
@ -888,7 +888,7 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
|
||||
int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
|
||||
ASSERT(pMgr->restored == true);
|
||||
if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) {
|
||||
if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
|
||||
|
@ -907,7 +907,7 @@ int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode*
|
|||
pMgr->startIndex = pMgr->matchIndex;
|
||||
}
|
||||
|
||||
return syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
|
||||
return syncLogReplMgrReplicateAttempt(pMgr, pNode);
|
||||
}
|
||||
|
||||
SSyncLogReplMgr* syncLogReplMgrCreate() {
|
||||
|
@ -1101,12 +1101,11 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
|
|||
return pEntry;
|
||||
}
|
||||
|
||||
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
|
||||
SRaftId* pDestId, bool* pBarrier) {
|
||||
int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
|
||||
SRaftId* pDestId, bool* pBarrier) {
|
||||
SSyncRaftEntry* pEntry = NULL;
|
||||
SRpcMsg msgOut = {0};
|
||||
bool inBuf = false;
|
||||
int32_t ret = -1;
|
||||
SyncTerm prevLogTerm = -1;
|
||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||
|
||||
|
@ -1132,14 +1131,13 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
|
|||
}
|
||||
if (pTerm) *pTerm = pEntry->term;
|
||||
|
||||
int32_t code = syncBuildAppendEntriesFromRaftLog(pNode, pEntry, prevLogTerm, &msgOut);
|
||||
int32_t code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut);
|
||||
if (code < 0) {
|
||||
sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
(void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut);
|
||||
ret = 0;
|
||||
|
||||
sTrace("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64,
|
||||
pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);
|
||||
|
|
|
@ -48,15 +48,6 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) {
|
|||
SyncTerm myLastTerm = syncNodeGetLastTerm(ths);
|
||||
SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
|
||||
|
||||
if (pMsg->lastLogIndex < ths->commitIndex) {
|
||||
sNTrace(ths,
|
||||
"logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
|
||||
", recv-term:%" PRIu64 "}",
|
||||
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if (myLastTerm == SYNC_TERM_INVALID) {
|
||||
sNTrace(ths,
|
||||
"logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
|
||||
|
@ -70,6 +61,13 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) {
|
|||
"logok:1, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
|
||||
", recv-term:%" PRIu64 "}",
|
||||
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
|
||||
|
||||
if (pMsg->lastLogIndex < ths->commitIndex) {
|
||||
sNWarn(ths,
|
||||
"logok:1, commit rollback required. {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64
|
||||
", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}",
|
||||
myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -137,4 +135,4 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
syncLogSendRequestVoteReply(ths, pReply, "");
|
||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue