From 09786a127d4fd0a3abc960a05b0f43a6651be223 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 11 Apr 2023 17:57:24 +0800 Subject: [PATCH] enh: refactor func names doOnce, attempt, probe, and sendTo of syncLogRepl --- source/libs/sync/inc/syncPipeline.h | 14 ++++++++------ source/libs/sync/src/syncPipeline.c | 24 ++++++++++++------------ source/libs/sync/src/syncReplication.c | 2 +- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index 68db811b12..d709e33cd4 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -77,18 +77,19 @@ static FORCE_INLINE int32_t syncLogReplGetNextRetryBackoff(SSyncLogReplMgr* pMgr SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); -int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier); -int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); +int32_t syncLogReplDoOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); + +int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId, + bool* pBarrier); int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); -int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode); // SSyncLogBuffer SSyncLogBuffer* syncLogBufferCreate(); @@ -100,6 +101,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf); bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf); + int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 22c87343ef..519a19e8c7 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -666,7 +666,7 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } bool barrier = false; - if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); goto _out; @@ -774,7 +774,7 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod // attempt to replicate the raft log at index (void)syncLogReplReset(pMgr); - return syncLogReplReplicateProbe(pMgr, pNode, index); + return syncLogReplProbe(pMgr, pNode, index); } int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { @@ -809,16 +809,16 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp return 0; } -int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { +int32_t syncLogReplDoOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->restored) { - (void)syncLogReplReplicateAttempt(pMgr, pNode); + (void)syncLogReplAttempt(pMgr, pNode); } else { - (void)syncLogReplReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex); + (void)syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex); } return 0; } -int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { +int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { ASSERT(!pMgr->restored); ASSERT(pMgr->startIndex >= 0); int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs(); @@ -833,7 +833,7 @@ int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; - if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; @@ -856,7 +856,7 @@ int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI return 0; } -int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { +int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ASSERT(pMgr->restored); SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; @@ -878,7 +878,7 @@ int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; - if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { + if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; @@ -931,7 +931,7 @@ int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, pMgr->startIndex = pMgr->matchIndex; } - return syncLogReplReplicateAttempt(pMgr, pNode); + return syncLogReplAttempt(pMgr, pNode); } SSyncLogReplMgr* syncLogReplCreate() { @@ -1126,8 +1126,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, return pEntry; } -int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, - SRaftId* pDestId, bool* pBarrier) { +int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId, + bool* pBarrier) { SSyncRaftEntry* pEntry = NULL; SRpcMsg msgOut = {0}; bool inBuf = false; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 43d2bc839b..8ac9a860e3 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -74,7 +74,7 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) { continue; } SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; - (void)syncLogReplReplicateOnce(pMgr, pNode); + (void)syncLogReplDoOnce(pMgr, pNode); } return 0; }