From 72cb9c01ea200299b35f47af94ef70e77b0c0081 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 2 Nov 2021 14:53:09 +0800 Subject: [PATCH] minor changes --- source/dnode/mgmt/src/dnodeTransport.c | 63 ++++++++++++-------------- 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/source/dnode/mgmt/src/dnodeTransport.c b/source/dnode/mgmt/src/dnodeTransport.c index 222d59ba55..8170c8c21d 100644 --- a/source/dnode/mgmt/src/dnodeTransport.c +++ b/source/dnode/mgmt/src/dnodeTransport.c @@ -25,11 +25,11 @@ #include "dnodeMnode.h" #include "dnodeVnodes.h" #include "mnode.h" -#include "vnode.h" + static struct { - void *serverRpc; - void *clientRpc; + void *peerRpc; void *shellRpc; + void *clientRpc; MsgFp msgFp[TSDB_MSG_TYPE_MAX]; } tsTrans; @@ -120,7 +120,7 @@ static void dnodeInitMsgFp() { } static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; + SRpcMsg rspMsg = {.handle = pMsg->handle}; int32_t msgType = pMsg->msgType; if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { @@ -154,7 +154,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } } -static int32_t dnodeInitServer() { +static int32_t dnodeInitPeerServer() { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = tsDnodeDnodePort; @@ -165,8 +165,8 @@ static int32_t dnodeInitServer() { rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; - tsTrans.serverRpc = rpcOpen(&rpcInit); - if (tsTrans.serverRpc == NULL) { + tsTrans.peerRpc = rpcOpen(&rpcInit); + if (tsTrans.peerRpc == NULL) { dError("failed to init peer rpc server"); return -1; } @@ -175,10 +175,10 @@ static int32_t dnodeInitServer() { return 0; } -static void dnodeCleanupServer() { - if (tsTrans.serverRpc) { - rpcClose(tsTrans.serverRpc); - tsTrans.serverRpc = NULL; +static void dnodeCleanupPeerServer() { + if (tsTrans.peerRpc) { + rpcClose(tsTrans.peerRpc); + tsTrans.peerRpc = NULL; dInfo("dnode peer server is closed"); } } @@ -205,7 +205,8 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } static int32_t dnodeInitClient() { - char secret[TSDB_KEY_LEN] = "secret"; + char secret[TSDB_KEY_LEN] = "secret"; + SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.label = "DND-C"; @@ -237,7 +238,7 @@ static void dnodeCleanupClient() { } static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; + SRpcMsg rspMsg = {.handle = pMsg->handle}; int32_t msgType = pMsg->msgType; if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { @@ -272,14 +273,6 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } } -void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); } - -void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { - SRpcEpSet epSet = {0}; - dnodeGetMnodeEpSetForPeer(&epSet); - dnodeSendMsgToDnode(&epSet, rpcMsg); -} - static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { SRpcEpSet epSet = {0}; dnodeGetMnodeEpSetForPeer(&epSet); @@ -293,20 +286,16 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); tstrncpy(pMsg->user, user, sizeof(pMsg->user)); - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pMsg; - rpcMsg.contLen = sizeof(SAuthMsg); - rpcMsg.msgType = TSDB_MSG_TYPE_AUTH; - dDebug("user:%s, send auth msg to mnodes", user); + SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TSDB_MSG_TYPE_AUTH}; SRpcMsg rpcRsp = {0}; dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); } else { - SAuthRsp *pRsp = rpcRsp.pCont; dDebug("user:%s, auth msg received from mnodes", user); + SAuthRsp *pRsp = rpcRsp.pCont; memcpy(secret, pRsp->secret, TSDB_KEY_LEN); memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN); *spi = pRsp->spi; @@ -317,7 +306,7 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c return rpcRsp.code; } -static int32_t dnodeInitShell() { +static int32_t dnodeInitShellServer() { int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); if (numOfThreads < 1) { numOfThreads = 1; @@ -344,7 +333,7 @@ static int32_t dnodeInitShell() { return 0; } -static void dnodeCleanupShell() { +static void dnodeCleanupShellServer() { if (tsTrans.shellRpc) { rpcClose(tsTrans.shellRpc); tsTrans.shellRpc = NULL; @@ -356,11 +345,11 @@ int32_t dnodeInitTrans() { return -1; } - if (dnodeInitServer() != 0) { + if (dnodeInitPeerServer() != 0) { return -1; } - if (dnodeInitShell() != 0) { + if (dnodeInitShellServer() != 0) { return -1; } @@ -368,7 +357,15 @@ int32_t dnodeInitTrans() { } void dnodeCleanupTrans() { - dnodeCleanupShell(); - dnodeCleanupServer(); + dnodeCleanupShellServer(); + dnodeCleanupPeerServer(); dnodeCleanupClient(); } + +void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); } + +void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { + SRpcEpSet epSet = {0}; + dnodeGetMnodeEpSetForPeer(&epSet); + dnodeSendMsgToDnode(&epSet, rpcMsg); +} \ No newline at end of file