diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 49486bc12d..c3566c7c82 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -258,8 +258,8 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId); -int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, - SRpcMsg* pRpcMsg); +int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, + SRpcMsg* pRpcMsg); int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId); diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index 55cb0d7db6..504a9f0bd7 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -78,14 +78,14 @@ static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) { SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier); -int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); +int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, + SRaftId* pDestId, bool* pBarrier); +int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 83d1777f44..e77a8d4be3 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -128,7 +128,7 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { return 0; } -SSyncRaftEntry* syncLogAppendEntriesToRaftEntry(const SyncAppendEntries* pMsg) { +SSyncRaftEntry* syncBuildRaftEntryFromAppendEntries(const SyncAppendEntries* pMsg) { SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen); if (pEntry == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -182,7 +182,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { goto _IGNORE; } - SSyncRaftEntry* pEntry = syncLogAppendEntriesToRaftEntry(pMsg); + SSyncRaftEntry* pEntry = syncBuildRaftEntryFromAppendEntries(pMsg); if (pEntry == NULL) { sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr()); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 29f327c35c..7d534c671e 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -154,8 +154,8 @@ int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) { return 0; } -int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, - SRpcMsg* pRpcMsg) { +int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, + SRpcMsg* pRpcMsg) { uint32_t dataLen = pEntry->bytes; uint32_t bytes = sizeof(SyncAppendEntries) + dataLen; pRpcMsg->contLen = bytes; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index b1f955b8df..b61fc2e90d 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -642,7 +642,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } bool barrier = false; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate sync log entry since %s. index: %" PRId64 ", dest: %" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); goto _out; @@ -674,8 +674,7 @@ _out: return ret; } -int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, - SyncAppendEntriesReply* pMsg) { +int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { SSyncLogBuffer* pBuf = pNode->pLogBuf; SRaftId destId = pMsg->srcId; ASSERT(pMgr->restored == false); @@ -750,7 +749,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod // attempt to replicate the raft log at index (void)syncLogReplMgrReset(pMgr); - return syncLogReplMgrReplicateProbeOnce(pMgr, pNode, index); + return syncLogReplMgrReplicateProbe(pMgr, pNode, index); } int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { @@ -778,9 +777,9 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync } if (pMgr->restored) { - (void)syncLogReplMgrProcessReplyInNormalMode(pMgr, pNode, pMsg); + (void)syncLogReplMgrProcessReplyAsNormal(pMgr, pNode, pMsg); } else { - (void)syncLogReplMgrProcessReplyInRecoveryMode(pMgr, pNode, pMsg); + (void)syncLogReplMgrProcessReplyAsRecovery(pMgr, pNode, pMsg); } taosThreadMutexUnlock(&pBuf->mutex); return 0; @@ -788,14 +787,14 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->restored) { - (void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); + (void)syncLogReplMgrReplicateAttempt(pMgr, pNode); } else { - (void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode, pNode->pLogBuf->matchIndex); + (void)syncLogReplMgrReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex); } return 0; } -int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { +int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { ASSERT(!pMgr->restored); ASSERT(pMgr->startIndex >= 0); int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs(); @@ -810,7 +809,7 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; @@ -833,7 +832,7 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode return 0; } -int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { +int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ASSERT(pMgr->restored); SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; @@ -854,7 +853,8 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p int64_t pos = index % pMgr->size; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + SyncTerm term = -1; + if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; @@ -888,7 +888,7 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p return 0; } -int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { +int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { ASSERT(pMgr->restored == true); if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) { @@ -907,7 +907,7 @@ int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pMgr->startIndex = pMgr->matchIndex; } - return syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); + return syncLogReplMgrReplicateAttempt(pMgr, pNode); } SSyncLogReplMgr* syncLogReplMgrCreate() { @@ -1101,12 +1101,11 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, return pEntry; } -int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier) { +int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, + SRaftId* pDestId, bool* pBarrier) { SSyncRaftEntry* pEntry = NULL; SRpcMsg msgOut = {0}; bool inBuf = false; - int32_t ret = -1; SyncTerm prevLogTerm = -1; SSyncLogBuffer* pBuf = pNode->pLogBuf; @@ -1132,14 +1131,13 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn } if (pTerm) *pTerm = pEntry->term; - int32_t code = syncBuildAppendEntriesFromRaftLog(pNode, pEntry, prevLogTerm, &msgOut); + int32_t code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut); if (code < 0) { sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index); goto _err; } (void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut); - ret = 0; sTrace("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index e9a18dfe86..069ea2ea88 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -48,15 +48,6 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) { SyncTerm myLastTerm = syncNodeGetLastTerm(ths); SyncIndex myLastIndex = syncNodeGetLastIndex(ths); - if (pMsg->lastLogIndex < ths->commitIndex) { - sNTrace(ths, - "logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 - ", recv-term:%" PRIu64 "}", - myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); - - return false; - } - if (myLastTerm == SYNC_TERM_INVALID) { sNTrace(ths, "logok:0, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 @@ -70,6 +61,13 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* ths, SyncRequestVote* pMsg) { "logok:1, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}", myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); + + if (pMsg->lastLogIndex < ths->commitIndex) { + sNWarn(ths, + "logok:1, commit rollback required. {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 + ", recv-lindex:%" PRId64 ", recv-term:%" PRIu64 "}", + myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term); + } return true; } @@ -137,4 +135,4 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncLogSendRequestVoteReply(ths, pReply, ""); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); return 0; -} \ No newline at end of file +}