From cb2527f71f7a7c6894e8eecf193428a727a00f77 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 23 May 2022 13:44:49 +0800 Subject: [PATCH] refactor: sync integrate into mnode --- include/dnode/mnode/mnode.h | 1 - source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 2 -- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 28 --------------------- source/dnode/mnode/impl/src/mnode.c | 6 ----- 4 files changed, 37 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 5b028d5055..f2c8c916c8 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -90,7 +90,6 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); -int32_t mndProcessApplyMsg(SRpcMsg *pMsg); /** * @brief Generate machine code diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 380ae63b8d..75e83d6547 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -33,7 +33,6 @@ typedef struct SMnodeMgmt { SSingleWorker readWorker; SSingleWorker writeWorker; SSingleWorker syncWorker; - SSingleWorker applyWorker; SSingleWorker monitorWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; @@ -60,7 +59,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 0f1d325237..f408992aa6 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -62,18 +62,6 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { mndProcessSyncMsg(pMsg); } -static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { - SMnodeMgmt *pMgmt = pInfo->ahandle; - dTrace("msg:%p, get from mnode-apply queue", pMsg); - - pMsg->info.node = pMgmt->pMnode; - mndProcessApplyMsg(pMsg); - - dTrace("msg:%p, is freed", pMsg); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pWorker->queue, pMsg); @@ -88,10 +76,6 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); } -int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutNodeMsgToWorker(&pMgmt->applyWorker, pMsg); -} - int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); } @@ -179,18 +163,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { return -1; } - SSingleWorkerCfg aCfg = { - .min = 1, - .max = 1, - .name = "mnode-apply", - .fp = (FItem)mmProcessApplyQueue, - .param = pMgmt, - }; - if (tSingleWorkerInit(&pMgmt->applyWorker, &aCfg) != 0) { - dError("failed to start mnode mnode-apply worker since %s", terrstr()); - return -1; - } - SSingleWorkerCfg mCfg = { .min = 1, .max = 1, diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 554556f5b0..4e3c8b8773 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -346,12 +346,6 @@ void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); } -int32_t mndProcessApplyMsg(SRpcMsg *pMsg) { - SSdbRaw *pRaw = pMsg->pCont; - SMnode *pMnode = pMsg->info.node; - return sdbWriteWithoutFree(pMnode->pSdb, pRaw); -} - int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; void *ahandle = pMsg->info.ahandle;