From 0c654c517ebdb84082a7243bb1235c8c7365fa8d Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 24 Jul 2024 05:13:42 +0000 Subject: [PATCH] refactor sync --- source/libs/sync/src/syncMessage.c | 47 +++++++++++------------------- 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 9e035f60c2..1c67c12a98 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -25,8 +25,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l pMsg->msgType = (timeoutType == SYNC_TIMEOUT_ELECTION) ? TDMT_SYNC_TIMEOUT_ELECTION : TDMT_SYNC_TIMEOUT; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncTimeout* pTimeout = pMsg->pCont; @@ -43,13 +42,13 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seqNum, bool isWeak, int32_t vgId) { int32_t bytes = sizeof(SyncClientRequest) + pOriginal->contLen; + pMsg->pCont = rpcMallocCont(bytes); + if (pMsg->pCont == NULL) { + return terrno = TSDB_CODE_OUT_OF_MEMORY; + } pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; pMsg->contLen = bytes; - if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } SyncClientRequest* pClientRequest = pMsg->pCont; pClientRequest->bytes = bytes; @@ -70,8 +69,7 @@ int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncClientRequest* pClientRequest = pMsg->pCont; @@ -91,8 +89,7 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) { pMsg->msgType = TDMT_SYNC_REQUEST_VOTE; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncRequestVote* pRequestVote = pMsg->pCont; @@ -108,8 +105,7 @@ int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) { pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont; @@ -125,8 +121,7 @@ int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncAppendEntries* pAppendEntries = pMsg->pCont; @@ -143,8 +138,7 @@ int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) { pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncAppendEntriesReply* pAppendEntriesReply = pMsg->pCont; @@ -161,8 +155,7 @@ int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pE pRpcMsg->contLen = bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); if (pRpcMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncAppendEntries* pMsg = pRpcMsg->pCont; @@ -188,8 +181,7 @@ int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) { pMsg->msgType = TDMT_SYNC_HEARTBEAT; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncHeartbeat* pHeartbeat = pMsg->pCont; @@ -205,8 +197,7 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) { pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncHeartbeatReply* pHeartbeatReply = pMsg->pCont; @@ -222,8 +213,7 @@ int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncSnapshotSend* pSnapshotSend = pMsg->pCont; @@ -240,8 +230,7 @@ int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncSnapshotRsp* pPreSnapshotRsp = pMsg->pCont; @@ -257,8 +246,7 @@ int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) { pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncLeaderTransfer* pLeaderTransfer = pMsg->pCont; @@ -274,8 +262,7 @@ int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId) { pMsg->msgType = TDMT_SYNC_LOCAL_CMD; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return terrno = TSDB_CODE_OUT_OF_MEMORY; } SyncLocalCmd* pLocalCmd = pMsg->pCont;