diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 92e7b555a4..c5fdc27426 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -243,6 +243,8 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId); +int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, + SRpcMsg* pRpcMsg); int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8dd972506a..c95a3aba62 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2313,33 +2313,6 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { return 0; } -int32_t syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, SRpcMsg* pRpcMsg) { - uint32_t dataLen = pEntry->bytes; - uint32_t bytes = sizeof(SyncAppendEntries) + dataLen; - pRpcMsg->contLen = bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - if (pRpcMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - SyncAppendEntries* pMsg = pRpcMsg->pCont; - pMsg->bytes = pRpcMsg->contLen; - pMsg->msgType = pRpcMsg->msgType = TDMT_SYNC_APPEND_ENTRIES; - pMsg->dataLen = dataLen; - - (void)memcpy(pMsg->data, pEntry, dataLen); - - pMsg->prevLogIndex = pEntry->index - 1; - pMsg->prevLogTerm = prevLogTerm; - pMsg->vgId = pNode->vgId; - pMsg->srcId = pNode->myRaftId; - pMsg->term = pNode->pRaftStore->currentTerm; - pMsg->commitIndex = pNode->commitIndex; - pMsg->privateTerm = 0; - return 0; -} - // TLA+ Spec // ClientRequest(i, v) == // /\ state[i] = Leader diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index ce98419980..ef1d585a89 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "syncMessage.h" #include "syncRaftEntry.h" +#include "syncRaftStore.h" int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, SSyncNode* pNode) { @@ -152,6 +153,34 @@ int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) { return 0; } +int32_t syncBuildAppendEntriesFromRaftLog(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm, + SRpcMsg* pRpcMsg) { + uint32_t dataLen = pEntry->bytes; + uint32_t bytes = sizeof(SyncAppendEntries) + dataLen; + pRpcMsg->contLen = bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + if (pRpcMsg->pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + SyncAppendEntries* pMsg = pRpcMsg->pCont; + pMsg->bytes = pRpcMsg->contLen; + pMsg->msgType = pRpcMsg->msgType = TDMT_SYNC_APPEND_ENTRIES; + pMsg->dataLen = dataLen; + + (void)memcpy(pMsg->data, pEntry, dataLen); + + pMsg->prevLogIndex = pEntry->index - 1; + pMsg->prevLogTerm = prevLogTerm; + pMsg->vgId = pNode->vgId; + pMsg->srcId = pNode->myRaftId; + pMsg->term = pNode->pRaftStore->currentTerm; + pMsg->commitIndex = pNode->commitIndex; + pMsg->privateTerm = 0; + return 0; +} + int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) { int32_t bytes = sizeof(SyncHeartbeat); pMsg->pCont = rpcMallocCont(bytes); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index c8bb6260e8..501efa8782 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -911,7 +911,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn } if (pTerm) *pTerm = pEntry->term; - int32_t code = syncLogToAppendEntries(pNode, pEntry, prevLogTerm, &msgOut); + int32_t code = syncBuildAppendEntriesFromRaftLog(pNode, pEntry, prevLogTerm, &msgOut); if (code < 0) { sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index); goto _err;