refactor(sync): use new interface in mnode

This commit is contained in:
Minghao Li 2022-10-17 17:01:42 +08:00
parent 092a07475d
commit 94d8c09e58
1 changed files with 142 additions and 90 deletions

View File

@ -501,99 +501,151 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
return -1; return -1;
} }
// ToDo: ugly! use function pointer if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_STANDARD_SNAPSHOT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); syncTimeoutDestroy(pSyncMsg);
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_PING) {
} else if (pMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
code = syncNodeOnPingCb(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
syncClientRequestDestroy(pSyncMsg); code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { syncClientRequestDestroy(pSyncMsg);
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
syncRequestVoteDestroy(pSyncMsg); SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { code = syncNodeOnRequestVote(pSyncNode, pSyncMsg);
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); syncRequestVoteDestroy(pSyncMsg);
code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
code = syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg); SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
syncSnapshotSendDestroy(pSyncMsg); code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { syncAppendEntriesReplyDestroy(pSyncMsg);
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
syncSnapshotRspDestroy(pSyncMsg); code = syncSetStandby(pMgmt->sync);
} else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { SRpcMsg rsp = {.code = code, .info = pMsg->info};
code = syncSetStandby(pMgmt->sync); tmsgSendRsp(&rsp);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = -1;
}
} else { } else {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); code = -1;
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
code = syncSetStandby(pMgmt->sync);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = -1;
}
} }
/*
// ToDo: ugly! use function pointer
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_STANDARD_SNAPSHOT) {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
syncSnapshotSendDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
code = syncSetStandby(pMgmt->sync);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = -1;
}
} else {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
code = syncSetStandby(pMgmt->sync);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = -1;
}
}
*/
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
if (code != 0) { if (code != 0) {