From b765f21e052676002cf6e585337db055b1ec3f9a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 16 May 2022 16:32:30 +0800 Subject: [PATCH] refactor: adjust SRpcMsg --- include/common/tmsgcb.h | 4 +-- source/common/src/tmsgcb.c | 16 ++------- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 5 ++- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 34 +++---------------- source/dnode/mgmt/node_util/inc/dmUtil.h | 2 ++ source/dnode/mgmt/node_util/src/dmEps.c | 18 ++++++++++ source/libs/transport/src/transSrv.c | 4 +-- 7 files changed, 33 insertions(+), 50 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index d507372d56..fb30ef324c 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -42,8 +42,7 @@ typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef void (*SendRspFp)(const SRpcMsg* pRsp); -typedef void (*SendMnodeRecvFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq, SRpcMsg* pRsp); -typedef void (*SendRedirectRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp, const SEpSet* pNewEpSet); +typedef void (*SendRedirectRspFp)(const SRpcMsg* pRsp, const SEpSet* pNewEpSet); typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type); typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const char* desc); @@ -56,7 +55,6 @@ typedef struct { GetQueueSizeFp qsizeFp; SendReqFp sendReqFp; SendRspFp sendRspFp; - SendMnodeRecvFp sendMnodeRecvFp; SendRedirectRspFp sendRedirectRspFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp; ReleaseHandleFp releaseHandleFp; diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index f2655fb03c..4487644c05 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -46,21 +46,9 @@ void tmsgSendRsp(SRpcMsg* pRsp) { } void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet) { + // cannot be empty, but not checked for faster detect SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp; - if (fp != NULL) { - (*fp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet); - } else { - terrno = TSDB_CODE_INVALID_PTR; - } -} - -void tmsgSendMnodeRecv(SRpcMsg* pReq, SRpcMsg* pRsp) { - SendMnodeRecvFp fp = tsDefaultMsgCb.sendMnodeRecvFp; - if (fp != NULL) { - (*fp)(tsDefaultMsgCb.pWrapper, pReq, pRsp); - } else { - terrno = TSDB_CODE_INVALID_PTR; - } + (*fp)(pRsp, pNewEpSet); } void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index ddc22b9d05..210b68e82b 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -87,7 +87,10 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SRpcMsg rpcRsp = {0}; dTrace("send status msg to mnode, app:%p", rpcMsg.info.ahandle); - tmsgSendMnodeRecv(&rpcMsg, &rpcRsp); + + SEpSet epSet = {0}; + dmGetMnodeEpSet(pMgmt->pData, &epSet); + rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp); dmProcessStatusRsp(pMgmt, &rpcRsp); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 29dbed36c5..4b817c7cef 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -21,26 +21,6 @@ #define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" -static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { - SDnodeData *pData = &pDnode->data; - taosRLockLatch(&pData->latch); - *pEpSet = pData->mnodeEps; - taosRUnLockLatch(&pData->latch); -} - -static void dmSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { - dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); - SDnodeData *pData = &pDnode->data; - - taosWLockLatch(&pData->latch); - pData->mnodeEps = *pEpSet; - for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { - dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); - } - - taosWUnLockLatch(&pData->latch); -} - static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) { SRpcConnInfo connInfo = {0}; if (IsReq(pRpc) && rpcGetConnInfo(pRpc->info.handle, &connInfo) != 0) { @@ -214,7 +194,7 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { SEpSet epSet = {0}; - dmGetMnodeEpSet(pDnode, &epSet); + dmGetMnodeEpSet(&pDnode->data, &epSet); dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { @@ -257,12 +237,6 @@ static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRp } } -static inline void dmSendToMnodeRecv(SMgmtWrapper *pWrapper, SRpcMsg *pReq, SRpcMsg *pRsp) { - SEpSet epSet = {0}; - dmGetMnodeEpSet(pWrapper->pDnode, &epSet); - dmSendRecv(pWrapper->pDnode, &epSet, pReq, pRsp); -} - static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { SDnode *pDnode = pWrapper->pDnode; if (pDnode->status != DND_STAT_RUNNING || pDnode->trans.clientRpc == NULL) { @@ -286,7 +260,8 @@ static inline void dmSendRsp(const SRpcMsg *pRsp) { } } -static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { +static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { + SMgmtWrapper *pWrapper = pRsp->info.wrapper; if (InChildProc(pWrapper->proc.ptype)) { dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP); } else { @@ -407,7 +382,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s SRpcMsg rpcRsp = {0}; SEpSet epSet = {0}; dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt); - dmGetMnodeEpSet(pDnode, &epSet); + dmGetMnodeEpSet(&pDnode->data, &epSet); dmSendRecv(pDnode, &epSet, &rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { @@ -469,7 +444,6 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { .clientRpc = pWrapper->pDnode->trans.clientRpc, .sendReqFp = dmSendReq, .sendRspFp = dmSendRsp, - .sendMnodeRecvFp = dmSendToMnodeRecv, .sendRedirectRspFp = dmSendRedirectRsp, .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, .releaseHandleFp = dmReleaseHandle, diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index b86d71cb90..dc5999a7e6 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -179,6 +179,8 @@ int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm); int32_t dmReadEps(SDnodeData *pData); int32_t dmWriteEps(SDnodeData *pData); void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps); +void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); +void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index 69e4a1efc4..e488aa1082 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -307,3 +307,21 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) { taosRUnLockLatch(&pData->latch); return changed; } + +void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { + taosRLockLatch(&pData->latch); + *pEpSet = pData->mnodeEps; + taosRUnLockLatch(&pData->latch); +} + +void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { + dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); + + taosWLockLatch(&pData->latch); + pData->mnodeEps = *pEpSet; + for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { + dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); + } + + taosWUnLockLatch(&pData->latch); +} diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 8b3ed50c71..8091344d44 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -440,9 +440,9 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); - tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), + tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d msglen:%d", pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), - ntohs(pConn->locaddr.sin_port)); + ntohs(pConn->locaddr.sin_port), len); pHead->msgLen = htonl(len); wb->base = msg;