refactor sync
This commit is contained in:
parent
c50a783b0f
commit
0c654c517e
|
@ -25,8 +25,7 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l
|
||||||
pMsg->msgType = (timeoutType == SYNC_TIMEOUT_ELECTION) ? TDMT_SYNC_TIMEOUT_ELECTION : TDMT_SYNC_TIMEOUT;
|
pMsg->msgType = (timeoutType == SYNC_TIMEOUT_ELECTION) ? TDMT_SYNC_TIMEOUT_ELECTION : TDMT_SYNC_TIMEOUT;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncTimeout* pTimeout = pMsg->pCont;
|
SyncTimeout* pTimeout = pMsg->pCont;
|
||||||
|
@ -43,13 +42,13 @@ int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t l
|
||||||
|
|
||||||
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seqNum, bool isWeak, int32_t vgId) {
|
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seqNum, bool isWeak, int32_t vgId) {
|
||||||
int32_t bytes = sizeof(SyncClientRequest) + pOriginal->contLen;
|
int32_t bytes = sizeof(SyncClientRequest) + pOriginal->contLen;
|
||||||
|
|
||||||
pMsg->pCont = rpcMallocCont(bytes);
|
pMsg->pCont = rpcMallocCont(bytes);
|
||||||
|
if (pMsg->pCont == NULL) {
|
||||||
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncClientRequest* pClientRequest = pMsg->pCont;
|
SyncClientRequest* pClientRequest = pMsg->pCont;
|
||||||
pClientRequest->bytes = bytes;
|
pClientRequest->bytes = bytes;
|
||||||
|
@ -70,8 +69,7 @@ int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry*
|
||||||
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncClientRequest* pClientRequest = pMsg->pCont;
|
SyncClientRequest* pClientRequest = pMsg->pCont;
|
||||||
|
@ -91,8 +89,7 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
|
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncRequestVote* pRequestVote = pMsg->pCont;
|
SyncRequestVote* pRequestVote = pMsg->pCont;
|
||||||
|
@ -108,8 +105,7 @@ int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
|
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont;
|
SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont;
|
||||||
|
@ -125,8 +121,7 @@ int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
|
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncAppendEntries* pAppendEntries = pMsg->pCont;
|
SyncAppendEntries* pAppendEntries = pMsg->pCont;
|
||||||
|
@ -143,8 +138,7 @@ int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
|
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncAppendEntriesReply* pAppendEntriesReply = pMsg->pCont;
|
SyncAppendEntriesReply* pAppendEntriesReply = pMsg->pCont;
|
||||||
|
@ -161,8 +155,7 @@ int32_t syncBuildAppendEntriesFromRaftEntry(SSyncNode* pNode, SSyncRaftEntry* pE
|
||||||
pRpcMsg->contLen = bytes;
|
pRpcMsg->contLen = bytes;
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
if (pRpcMsg->pCont == NULL) {
|
if (pRpcMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
||||||
|
@ -188,8 +181,7 @@ int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_HEARTBEAT;
|
pMsg->msgType = TDMT_SYNC_HEARTBEAT;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncHeartbeat* pHeartbeat = pMsg->pCont;
|
SyncHeartbeat* pHeartbeat = pMsg->pCont;
|
||||||
|
@ -205,8 +197,7 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
|
pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncHeartbeatReply* pHeartbeatReply = pMsg->pCont;
|
SyncHeartbeatReply* pHeartbeatReply = pMsg->pCont;
|
||||||
|
@ -222,8 +213,7 @@ int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND;
|
pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncSnapshotSend* pSnapshotSend = pMsg->pCont;
|
SyncSnapshotSend* pSnapshotSend = pMsg->pCont;
|
||||||
|
@ -240,8 +230,7 @@ int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
|
pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncSnapshotRsp* pPreSnapshotRsp = pMsg->pCont;
|
SyncSnapshotRsp* pPreSnapshotRsp = pMsg->pCont;
|
||||||
|
@ -257,8 +246,7 @@ int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER;
|
pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncLeaderTransfer* pLeaderTransfer = pMsg->pCont;
|
SyncLeaderTransfer* pLeaderTransfer = pMsg->pCont;
|
||||||
|
@ -274,8 +262,7 @@ int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
|
pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncLocalCmd* pLocalCmd = pMsg->pCont;
|
SyncLocalCmd* pLocalCmd = pMsg->pCont;
|
||||||
|
|
Loading…
Reference in New Issue