From 44311425db094fba75c0b7ad6ad83b8d42f45931 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 1 Apr 2022 19:52:14 +0800 Subject: [PATCH] multi process --- include/common/tmsgcb.h | 2 -- source/common/src/tmsgcb.c | 4 ---- source/dnode/mgmt/bm/src/bmInt.c | 1 - source/dnode/mgmt/main/src/dndTransport.c | 1 - source/dnode/mgmt/mm/src/mmInt.c | 7 +------ source/dnode/mgmt/qm/src/qmInt.c | 7 +------ source/dnode/mgmt/sm/src/smInt.c | 7 +------ source/dnode/mgmt/vm/src/vmInt.c | 6 +----- source/dnode/mgmt/vm/src/vmMsg.c | 6 +----- source/dnode/mnode/impl/src/mndProfile.c | 6 ------ 10 files changed, 5 insertions(+), 42 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index cb59599d9a..6c3671a8d6 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -50,7 +50,6 @@ typedef struct { PutToQueueFp queueFps[QUEUE_MAX]; GetQueueSizeFp qsizeFp; SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; SendRspFp sendRspFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp; ReleaseHandleFp releaseHandleFp; @@ -60,7 +59,6 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); -int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); void tmsgSendRsp(const SRpcMsg* pRsp); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgReleaseHandle(void* handle, int8_t type); diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index e90634a604..cb5e2b07c1 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -32,10 +32,6 @@ int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) { return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); } -int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { - return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); -} - void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); } void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { diff --git a/source/dnode/mgmt/bm/src/bmInt.c b/source/dnode/mgmt/bm/src/bmInt.c index 4b87f4463c..cbaf247a5b 100644 --- a/source/dnode/mgmt/bm/src/bmInt.c +++ b/source/dnode/mgmt/bm/src/bmInt.c @@ -22,7 +22,6 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { SMsgCb msgCb = {0}; msgCb.pWrapper = pMgmt->pWrapper; msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; diff --git a/source/dnode/mgmt/main/src/dndTransport.c b/source/dnode/mgmt/main/src/dndTransport.c index 07ea0309a8..a87937bc8d 100644 --- a/source/dnode/mgmt/main/src/dndTransport.c +++ b/source/dnode/mgmt/main/src/dndTransport.c @@ -398,7 +398,6 @@ SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) { .pWrapper = pWrapper, .registerBrokenLinkArgFp = dndRegisterBrokenLinkArg, .releaseHandleFp = dndReleaseHandle, - .sendMnodeReqFp = dndSendReqToMnode, .sendReqFp = dndSendReqToDnode, .sendRspFp = dndSendRsp, }; diff --git a/source/dnode/mgmt/mm/src/mmInt.c b/source/dnode/mgmt/mm/src/mmInt.c index f4e00b5921..301ca598e6 100644 --- a/source/dnode/mgmt/mm/src/mmInt.c +++ b/source/dnode/mgmt/mm/src/mmInt.c @@ -39,16 +39,11 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) { } static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue; msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue; msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue; msgCb.queueFps[SYNC_QUEUE] = mmPutMsgToWriteQueue; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/qm/src/qmInt.c b/source/dnode/mgmt/qm/src/qmInt.c index 11c80a2904..8079859b96 100644 --- a/source/dnode/mgmt/qm/src/qmInt.c +++ b/source/dnode/mgmt/qm/src/qmInt.c @@ -19,15 +19,10 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue; msgCb.qsizeFp = qmGetQueueSize; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/sm/src/smInt.c b/source/dnode/mgmt/sm/src/smInt.c index a639fc76bb..6d2e2aaefd 100644 --- a/source/dnode/mgmt/sm/src/smInt.c +++ b/source/dnode/mgmt/sm/src/smInt.c @@ -19,12 +19,7 @@ static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/vm/src/vmInt.c b/source/dnode/mgmt/vm/src/vmInt.c index b52c6253dc..d9ef7b5ae6 100644 --- a/source/dnode/mgmt/vm/src/vmInt.c +++ b/source/dnode/mgmt/vm/src/vmInt.c @@ -128,16 +128,12 @@ static void *vmOpenVnodeFunc(void *param) { pMgmt->state.openVnodes, pMgmt->state.totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SMsgCb msgCb = {0}; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.qsizeFp = vmGetQueueSize; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { diff --git a/source/dnode/mgmt/vm/src/vmMsg.c b/source/dnode/mgmt/vm/src/vmMsg.c index 89cf8dc5da..c3ad74d246 100644 --- a/source/dnode/mgmt/vm/src/vmMsg.c +++ b/source/dnode/mgmt/vm/src/vmMsg.c @@ -76,16 +76,12 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - SMsgCb msgCb = {0}; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.qsizeFp = vmGetQueueSize; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; vnodeCfg.msgCb = msgCb; vnodeCfg.pTfs = pMgmt->pTfs; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 6c38d8626c..cad89399a3 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -433,12 +433,6 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { pHeartbeat->connId = htonl(pHeartbeat->connId); pHeartbeat->pid = htonl(pHeartbeat->pid); - SRpcConnInfo info = {0}; - if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) { - mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr()); - return -1; - } - SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId); if (pConn == NULL) { pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);