enh: refactor func names doOnce, attempt, probe, and sendTo of syncLogRepl
This commit is contained in:
parent
f6be5f2c0e
commit
09786a127d
|
@ -77,18 +77,19 @@ static FORCE_INLINE int32_t syncLogReplGetNextRetryBackoff(SSyncLogReplMgr* pMgr
|
||||||
|
|
||||||
SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
|
SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
|
||||||
|
|
||||||
int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
int32_t syncLogReplDoOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
||||||
int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
|
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
||||||
SRaftId* pDestId, bool* pBarrier);
|
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
|
||||||
int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
|
||||||
int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
|
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
||||||
|
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
|
||||||
|
bool* pBarrier);
|
||||||
|
|
||||||
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
||||||
int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
||||||
int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
|
||||||
|
|
||||||
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
|
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
|
||||||
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
|
|
||||||
|
|
||||||
// SSyncLogBuffer
|
// SSyncLogBuffer
|
||||||
SSyncLogBuffer* syncLogBufferCreate();
|
SSyncLogBuffer* syncLogBufferCreate();
|
||||||
|
@ -100,6 +101,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
|
||||||
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
|
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
|
||||||
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf);
|
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf);
|
||||||
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf);
|
bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf);
|
||||||
|
|
||||||
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
|
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
|
||||||
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
|
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
|
||||||
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
|
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
|
||||||
|
|
|
@ -666,7 +666,7 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool barrier = false;
|
bool barrier = false;
|
||||||
if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||||
sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId,
|
sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId,
|
||||||
terrstr(), index, pDestId->addr);
|
terrstr(), index, pDestId->addr);
|
||||||
goto _out;
|
goto _out;
|
||||||
|
@ -774,7 +774,7 @@ int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNod
|
||||||
|
|
||||||
// attempt to replicate the raft log at index
|
// attempt to replicate the raft log at index
|
||||||
(void)syncLogReplReset(pMgr);
|
(void)syncLogReplReset(pMgr);
|
||||||
return syncLogReplReplicateProbe(pMgr, pNode, index);
|
return syncLogReplProbe(pMgr, pNode, index);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
|
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
|
||||||
|
@ -809,16 +809,16 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogReplReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
int32_t syncLogReplDoOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
if (pMgr->restored) {
|
if (pMgr->restored) {
|
||||||
(void)syncLogReplReplicateAttempt(pMgr, pNode);
|
(void)syncLogReplAttempt(pMgr, pNode);
|
||||||
} else {
|
} else {
|
||||||
(void)syncLogReplReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex);
|
(void)syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
|
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
|
||||||
ASSERT(!pMgr->restored);
|
ASSERT(!pMgr->restored);
|
||||||
ASSERT(pMgr->startIndex >= 0);
|
ASSERT(pMgr->startIndex >= 0);
|
||||||
int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
|
int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
|
||||||
|
@ -833,7 +833,7 @@ int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
|
||||||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||||
bool barrier = false;
|
bool barrier = false;
|
||||||
SyncTerm term = -1;
|
SyncTerm term = -1;
|
||||||
if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||||
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
||||||
terrstr(), index, pDestId->addr);
|
terrstr(), index, pDestId->addr);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -856,7 +856,7 @@ int32_t syncLogReplReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncI
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
ASSERT(pMgr->restored);
|
ASSERT(pMgr->restored);
|
||||||
|
|
||||||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||||
|
@ -878,7 +878,7 @@ int32_t syncLogReplReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||||
bool barrier = false;
|
bool barrier = false;
|
||||||
SyncTerm term = -1;
|
SyncTerm term = -1;
|
||||||
if (syncLogReplReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||||
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
||||||
terrstr(), index, pDestId->addr);
|
terrstr(), index, pDestId->addr);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -931,7 +931,7 @@ int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode,
|
||||||
pMgr->startIndex = pMgr->matchIndex;
|
pMgr->startIndex = pMgr->matchIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
return syncLogReplReplicateAttempt(pMgr, pNode);
|
return syncLogReplAttempt(pMgr, pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncLogReplMgr* syncLogReplCreate() {
|
SSyncLogReplMgr* syncLogReplCreate() {
|
||||||
|
@ -1126,8 +1126,8 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
|
||||||
return pEntry;
|
return pEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogReplReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
|
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
|
||||||
SRaftId* pDestId, bool* pBarrier) {
|
bool* pBarrier) {
|
||||||
SSyncRaftEntry* pEntry = NULL;
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
SRpcMsg msgOut = {0};
|
SRpcMsg msgOut = {0};
|
||||||
bool inBuf = false;
|
bool inBuf = false;
|
||||||
|
|
|
@ -74,7 +74,7 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
|
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
|
||||||
(void)syncLogReplReplicateOnce(pMgr, pNode);
|
(void)syncLogReplDoOnce(pMgr, pNode);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue