From b20794f365058bbc6d6a7266530f046e6d81003b Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sat, 21 May 2022 22:35:02 +0800 Subject: [PATCH] enh(sync) sync/mnode integration --- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 10 +++ source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 9 +- source/dnode/mnode/impl/src/mnode.c | 98 +++++++++++++++++++++ 3 files changed, 116 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 2ce42d7a5f..a894a4962d 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -237,6 +237,16 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 04fe891ce9..8574e01226 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -56,6 +56,13 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } +static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { + SMnodeMgmt *pMgmt = pInfo->ahandle; + pMsg->info.node = pMgmt->pMnode; + + mndProcessSyncMsg(pMsg); + return; +} static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; @@ -201,7 +208,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "mnode-sync", - .fp = (FItem)mmProcessQueue, + .fp = (FItem)mmProcessSyncQueue, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index f8e5a65f0f..23baa43e97 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -356,6 +356,104 @@ int32_t mndProcessApplyMsg(SRpcMsg *pMsg) { return code; } +#include "syncTools.h" + +int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { + SMnode *pMnode = pMsg->info.node; + void *ahandle = pMsg->info.ahandle; + + int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; + + if (syncEnvIsStart()) { + SSyncNode *pSyncNode = syncNodeAcquire(pMnode->syncMgmt.sync); + assert(pSyncNode != NULL); + + ESyncState state = syncGetMyRole(pMnode->syncMgmt.sync); + SyncTerm currentTerm = syncGetMyTerm(pMnode->syncMgmt.sync); + + SMsgHead *pHead = pMsg->pCont; + + char logBuf[512]; + char *syncNodeStr = sync2SimpleStr(pMnode->syncMgmt.sync); + snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); + syncRpcMsgLog2(logBuf, pMsg); + taosMemoryFree(syncNodeStr); + + SRpcMsg *pRpcMsg = pMsg; + + if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { + SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { + SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); + syncClientRequestDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { + SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { + SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + + ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + + } else { + mError("==mndProcessSyncMsg== error msg type:%d", pRpcMsg->msgType); + ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; + } + + syncNodeRelease(pSyncNode); + } else { + mError("==mndProcessSyncMsg== error syncEnv stop"); + ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; + } + + return ret; + + + return 0; +} + int32_t mndProcessMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; void *ahandle = pMsg->info.ahandle;