From e62f3fa86a76a025ea5d8c53d8f42ad26ee8e86f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 18 Oct 2023 16:03:55 +0800 Subject: [PATCH] opt status send --- include/common/tmsgcb.h | 1 + source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 3 ++ source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 5 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 53 +++++++++++++++++++ 5 files changed, 61 insertions(+), 3 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 311bffb7da..7ab69759e3 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -52,6 +52,7 @@ typedef struct { void* data; void* mgmt; void* clientRpc; + void* statusClientRpc; void* serverRpc; PutToQueueFp putToQueueFp; GetQueueSizeFp qsizeFp; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index c7af552da4..3453731c03 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -160,7 +160,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); - rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp); + rpcSendRecv(pMgmt->msgCb.statusClientRpc, &epSet, &rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { dmRotateMnodeEpSet(pMgmt->pData); char tbuf[256]; diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 3cf7a360f9..f11378bb71 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -48,6 +48,7 @@ typedef struct { typedef struct { void *serverRpc; void *clientRpc; + void *statusClientRpc; SDnodeHandle msgHandles[TDMT_MAX]; } SDnodeTrans; @@ -115,7 +116,9 @@ int32_t dmRunDnode(SDnode *pDnode); int32_t dmInitServer(SDnode *pDnode); void dmCleanupServer(SDnode *pDnode); int32_t dmInitClient(SDnode *pDnode); +int32_t dmInitStatusClient(SDnode *pDnode); void dmCleanupClient(SDnode *pDnode); +void dmCleanupStatusClient(SDnode *pDnode); SMsgCb dmGetMsgcb(SDnode *pDnode); int32_t dmInitMsgHandle(SDnode *pDnode); int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 15697dc448..5164d60ba6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -20,8 +20,8 @@ #include "qworker.h" #include "tstream.h" #ifdef TD_TSZ -#include "tglobal.h" #include "tcompression.h" +#include "tglobal.h" #endif int32_t dmInitDnode(SDnode *pDnode) { @@ -66,7 +66,7 @@ int32_t dmInitDnode(SDnode *pDnode) { goto _OVER; } - if(dmInitModule(pDnode) != 0) { + if (dmInitModule(pDnode) != 0) { goto _OVER; } @@ -91,6 +91,7 @@ void dmCleanupDnode(SDnode *pDnode) { if (pDnode == NULL) return; dmCleanupClient(pDnode); + dmCleanupStatusClient(pDnode); dmCleanupServer(pDnode); dmClearVars(pDnode); rpcCleanup(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index dc48ff71f8..b273706feb 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -358,6 +358,50 @@ int32_t dmInitClient(SDnode *pDnode) { dDebug("dnode rpc client is initialized"); return 0; } +int32_t dmInitStatusClient(SDnode *pDnode) { + SDnodeTrans *pTrans = &pDnode->trans; + + SRpcInit rpcInit = {0}; + rpcInit.label = "DND-STATUS-C"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; + rpcInit.sessions = 1024; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.user = TSDB_DEFAULT_USER; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.parent = pDnode; + rpcInit.rfp = rpcRfp; + rpcInit.compressSize = tsCompressMsgSize; + + rpcInit.retryMinInterval = tsRedirectPeriod; + rpcInit.retryStepFactor = tsRedirectFactor; + rpcInit.retryMaxInterval = tsRedirectMaxPeriod; + rpcInit.retryMaxTimeout = tsMaxRetryWaitTime; + + rpcInit.failFastInterval = 5000; // interval threshold(ms) + rpcInit.failFastThreshold = 3; // failed threshold + rpcInit.ffp = dmFailFastFp; + + int32_t connLimitNum = 100; + connLimitNum = TMAX(connLimitNum, 10); + connLimitNum = TMIN(connLimitNum, 500); + + rpcInit.connLimitNum = connLimitNum; + rpcInit.connLimitLock = 1; + rpcInit.supportBatch = 1; + rpcInit.batchSize = 8 * 1024; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + + pTrans->statusClientRpc = rpcOpen(&rpcInit); + if (pTrans->statusClientRpc == NULL) { + dError("failed to init dnode rpc status client"); + return -1; + } + + dDebug("dnode rpc status client is initialized"); + return 0; +} void dmCleanupClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; @@ -367,6 +411,14 @@ void dmCleanupClient(SDnode *pDnode) { dDebug("dnode rpc client is closed"); } } +void dmCleanupStatusClient(SDnode *pDnode) { + SDnodeTrans *pTrans = &pDnode->trans; + if (pTrans->statusClientRpc) { + rpcClose(pTrans->statusClientRpc); + pTrans->statusClientRpc = NULL; + dDebug("dnode rpc status client is closed"); + } +} int32_t dmInitServer(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; @@ -405,6 +457,7 @@ void dmCleanupServer(SDnode *pDnode) { SMsgCb dmGetMsgcb(SDnode *pDnode) { SMsgCb msgCb = { .clientRpc = pDnode->trans.clientRpc, + .statusClientRpc = pDnode->trans.statusClientRpc, .serverRpc = pDnode->trans.serverRpc, .sendReqFp = dmSendReq, .sendRspFp = dmSendRsp,