diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 0340bf026b..e6147a2e0e 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -624,6 +624,9 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg); +int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 201f4a4180..b8f1a5238f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -256,70 +256,133 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { SRpcMsg *pRpcMsg = pMsg; - if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { - SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + // ToDo: ugly! use function pointer + // use different strategy + if (syncNodeSnapshotEnable(pSyncNode)) { + if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { + SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); - ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); - syncTimeoutDestroy(pSyncMsg); + } else if (pRpcMsg->msgType == TDMT_SYNC_PING) { + SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_SYNC_PING) { - SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); - ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); + } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); + syncClientRequestDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { + SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); - ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); - syncPingReplyDestroy(pSyncMsg); + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { - SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { + SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); - ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); - syncClientRequestDestroy(pSyncMsg); + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { - SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + } else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { + ret = vnodeSetStandBy(pVnode); + if (ret != 0 && terrno != 0) ret = terrno; + SRpcMsg rsp = {.code = ret, .info = pMsg->info}; + tmsgSendRsp(&rsp); + } else { + vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); + ret = -1; + } - ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); - syncRequestVoteDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); - - ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); - syncRequestVoteReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { - SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); - - ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); - syncAppendEntriesDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); - - ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); - - } else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { - ret = vnodeSetStandBy(pVnode); - if (ret != 0 && terrno != 0) ret = terrno; - SRpcMsg rsp = {.code = ret, .info = pMsg->info}; - tmsgSendRsp(&rsp); } else { - vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); - ret = -1; + // use wal first strategy + + if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { + SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_SYNC_PING) { + SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); + syncClientRequestDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { + SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); + syncClientRequestBatchDestroyDeep(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { + SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) { + SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg); + syncAppendEntriesBatchDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); + ASSERT(pSyncMsg != NULL); + ret = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { + ret = vnodeSetStandBy(pVnode); + if (ret != 0 && terrno != 0) ret = terrno; + SRpcMsg rsp = {.code = ret, .info = pMsg->info}; + tmsgSendRsp(&rsp); + } else { + vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); + ret = -1; + } } syncNodeRelease(pSyncNode); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 21e7f7c7af..8db619270c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -96,7 +96,7 @@ typedef void* queue[2]; #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit -#define TRANS_RETRY_INTERVAL 200 // ms retry interval +#define TRANS_RETRY_INTERVAL 30 // ms retry interval #define TRANS_CONN_TIMEOUT 3 // connect timeout typedef SRpcMsg STransMsg;