enh: refactor some sync func names for pipelining

This commit is contained in:
Benguang Zhao 2023-01-05 15:40:43 +08:00
parent 3e215fc64c
commit 47ace00090
5 changed files with 28 additions and 31 deletions

View File

@ -258,8 +258,8 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
SRpcMsg* pRpcMsg);
int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
SRpcMsg* pRpcMsg);
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId);

View File

@ -78,14 +78,14 @@ static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) {
SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
SRaftId* pDestId, bool* pBarrier);
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
SRaftId* pDestId, bool* pBarrier);
int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);

View File

@ -127,7 +127,7 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
return 0;
}
SSyncRaftEntry* syncLogAppendEntriesToRaftEntry(const SyncAppendEntries* pMsg) {
SSyncRaftEntry* syncBuildRaftEntryFromAppendEntries(const SyncAppendEntries* pMsg) {
SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
if (pEntry == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -181,7 +181,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
goto _IGNORE;
}
SSyncRaftEntry* pEntry = syncLogAppendEntriesToRaftEntry(pMsg);
SSyncRaftEntry* pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
if (pEntry == NULL) {
sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());

View File

@ -154,8 +154,8 @@ int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
return 0;
}
int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
SRpcMsg* pRpcMsg) {
int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm,
SRpcMsg* pRpcMsg) {
uint32_t dataLen = pEntry->bytes;
uint32_t bytes = sizeof(SyncAppendEntries) + dataLen;
pRpcMsg->contLen = bytes;

View File

@ -617,7 +617,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
}
bool barrier = false;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate sync log entry since %s. index: %" PRId64 ", dest: %" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
goto _out;
@ -647,8 +647,7 @@ _out:
return ret;
}
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode,
SyncAppendEntriesReply* pMsg) {
int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
SRaftId destId = pMsg->srcId;
ASSERT(pMgr->restored == false);
@ -723,7 +722,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
// attempt to replicate the raft log at index
(void)syncLogReplMgrReset(pMgr);
return syncLogReplMgrReplicateProbeOnce(pMgr, pNode, index);
return syncLogReplMgrReplicateProbe(pMgr, pNode, index);
}
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
@ -751,9 +750,9 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
}
if (pMgr->restored) {
(void)syncLogReplMgrProcessReplyInNormalMode(pMgr, pNode, pMsg);
(void)syncLogReplMgrProcessReplyAsNormal(pMgr, pNode, pMsg);
} else {
(void)syncLogReplMgrProcessReplyInRecoveryMode(pMgr, pNode, pMsg);
(void)syncLogReplMgrProcessReplyAsRecovery(pMgr, pNode, pMsg);
}
taosThreadMutexUnlock(&pBuf->mutex);
return 0;
@ -761,14 +760,14 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (pMgr->restored) {
(void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
(void)syncLogReplMgrReplicateAttempt(pMgr, pNode);
} else {
(void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode, pNode->pLogBuf->matchIndex);
(void)syncLogReplMgrReplicateProbe(pMgr, pNode, pNode->pLogBuf->matchIndex);
}
return 0;
}
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
ASSERT(!pMgr->restored);
ASSERT(pMgr->startIndex >= 0);
int64_t retryMaxWaitMs = SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
@ -783,7 +782,7 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
SyncTerm term = -1;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
return -1;
@ -807,7 +806,7 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
return 0;
}
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
ASSERT(pMgr->restored);
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
@ -827,7 +826,7 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
SyncTerm term = -1;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
return -1;
@ -857,7 +856,7 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
return 0;
}
int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
ASSERT(pMgr->restored == true);
if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) {
if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
@ -876,7 +875,7 @@ int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode*
pMgr->startIndex = pMgr->matchIndex;
}
return syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode);
return syncLogReplMgrReplicateAttempt(pMgr, pNode);
}
SSyncLogReplMgr* syncLogReplMgrCreate() {
@ -1066,12 +1065,11 @@ SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode,
return pEntry;
}
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
SRaftId* pDestId, bool* pBarrier) {
int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
SRaftId* pDestId, bool* pBarrier) {
SSyncRaftEntry* pEntry = NULL;
SRpcMsg msgOut = {0};
bool inBuf = false;
int32_t ret = -1;
SyncTerm prevLogTerm = -1;
SSyncLogBuffer* pBuf = pNode->pLogBuf;
@ -1097,14 +1095,13 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
}
if (pTerm) *pTerm = pEntry->term;
int32_t code = syncBuildAppendEntriesFromRaftLog(pNode, pEntry, prevLogTerm, &msgOut);
int32_t code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut);
if (code < 0) {
sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index);
goto _err;
}
(void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut);
ret = 0;
sTrace("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64,
pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);