diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3bd94dbab5..bd89f6af3a 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -258,8 +258,8 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId); -int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, - SRpcMsg* pRpcMsg); +int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, + SRpcMsg* pRpcMsg); int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId); diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index a0a0691694..fb5541f916 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -78,14 +78,14 @@ static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) { SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier); -int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); +int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, + SRaftId* pDestId, bool* pBarrier); +int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); -int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); +int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1dc6905b88..d9b13610e3 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -127,7 +127,7 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { return 0; } -SSyncRaftEntry* syncLogAppendEntriesToRaftEntry(const SyncAppendEntries* pMsg) { +SSyncRaftEntry* syncBuildRaftEntryFromAppendEntries(const SyncAppendEntries* pMsg) { SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen); if (pEntry == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -181,7 +181,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { goto _IGNORE; } - SSyncRaftEntry* pEntry = syncLogAppendEntriesToRaftEntry(pMsg); + SSyncRaftEntry* pEntry = syncBuildRaftEntryFromAppendEntries(pMsg); if (pEntry == NULL) { sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr()); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 467b4e2219..af2555153b 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -154,8 +154,8 @@ int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) { return 0; } -int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, - SRpcMsg* pRpcMsg) { +int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, + SRpcMsg* pRpcMsg) { uint32_t dataLen = pEntry->bytes; uint32_t bytes = sizeof(SyncAppendEntries) + dataLen; pRpcMsg->contLen = bytes; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 410986b87a..4a5dc46c76 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -617,7 +617,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } bool barrier = false; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate sync log entry since %s. index: %" PRId64 ", dest: %" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); goto _out; @@ -647,8 +647,7 @@ _out: return ret; } -int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, - SyncAppendEntriesReply* pMsg) { +int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { SSyncLogBuffer* pBuf = pNode->pLogBuf; SRaftId destId = pMsg->srcId; ASSERT(pMgr->restored == false); @@ -723,7 +722,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod // attempt to replicate the raft log at index (void)syncLogReplMgrReset(pMgr); - return syncLogReplMgrReplicateProbeOnce(pMgr, pNode, index); + return syncLogReplMgrReplicateProbe(pMgr, pNode, index); } int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { @@ -751,9 +750,9 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync } if (pMgr->restored) { - (void)syncLogReplMgrProcessReplyInNormalMode(pMgr, pNode, pMsg); + (void)syncLogReplMgrProcessReplyAsNormal(pMgr, pNode, pMsg); } else { - (void)syncLogReplMgrProcessReplyInRecoveryMode(pMgr, pNode, pMsg); + (void)syncLogReplMgrProcessReplyAsRecovery(pMgr, pNode, pMsg); } taosThreadMutexUnlock(&pBuf->mutex); return 0; @@ -761,14 +760,14 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->restored) { - (void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); + (void)syncLogReplMgrReplicateAttempt(pMgr, pNode); } else { - (void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode, pNode->pLogBuf->matchIndex); + (void)syncLogReplMgrReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex); } return 0; } -int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { +int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { ASSERT(!pMgr->restored); ASSERT(pMgr->startIndex >= 0); int64_t retryMaxWaitMs = SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF); @@ -783,7 +782,7 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; @@ -807,7 +806,7 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode return 0; } -int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { +int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ASSERT(pMgr->restored); SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; @@ -827,7 +826,7 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; @@ -857,7 +856,7 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p return 0; } -int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { +int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { ASSERT(pMgr->restored == true); if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) { if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) { @@ -876,7 +875,7 @@ int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pMgr->startIndex = pMgr->matchIndex; } - return syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); + return syncLogReplMgrReplicateAttempt(pMgr, pNode); } SSyncLogReplMgr* syncLogReplMgrCreate() { @@ -1066,12 +1065,11 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, return pEntry; } -int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier) { +int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, + SRaftId* pDestId, bool* pBarrier) { SSyncRaftEntry* pEntry = NULL; SRpcMsg msgOut = {0}; bool inBuf = false; - int32_t ret = -1; SyncTerm prevLogTerm = -1; SSyncLogBuffer* pBuf = pNode->pLogBuf; @@ -1097,14 +1095,13 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn } if (pTerm) *pTerm = pEntry->term; - int32_t code = syncBuildAppendEntriesFromRaftLog(pNode, pEntry, prevLogTerm, &msgOut); + int32_t code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut); if (code < 0) { sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index); goto _err; } (void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut); - ret = 0; sTrace("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);