opt status send

This commit is contained in:
yihaoDeng 2023-10-18 16:03:55 +08:00
parent c6dd73b68b
commit e62f3fa86a
5 changed files with 61 additions and 3 deletions

View File

@ -52,6 +52,7 @@ typedef struct {
void* data; void* data;
void* mgmt; void* mgmt;
void* clientRpc; void* clientRpc;
void* statusClientRpc;
void* serverRpc; void* serverRpc;
PutToQueueFp putToQueueFp; PutToQueueFp putToQueueFp;
GetQueueSizeFp qsizeFp; GetQueueSizeFp qsizeFp;

View File

@ -160,7 +160,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt->pData, &epSet); dmGetMnodeEpSet(pMgmt->pData, &epSet);
rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp); rpcSendRecv(pMgmt->msgCb.statusClientRpc, &epSet, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dmRotateMnodeEpSet(pMgmt->pData); dmRotateMnodeEpSet(pMgmt->pData);
char tbuf[256]; char tbuf[256];

View File

@ -48,6 +48,7 @@ typedef struct {
typedef struct { typedef struct {
void *serverRpc; void *serverRpc;
void *clientRpc; void *clientRpc;
void *statusClientRpc;
SDnodeHandle msgHandles[TDMT_MAX]; SDnodeHandle msgHandles[TDMT_MAX];
} SDnodeTrans; } SDnodeTrans;
@ -115,7 +116,9 @@ int32_t dmRunDnode(SDnode *pDnode);
int32_t dmInitServer(SDnode *pDnode); int32_t dmInitServer(SDnode *pDnode);
void dmCleanupServer(SDnode *pDnode); void dmCleanupServer(SDnode *pDnode);
int32_t dmInitClient(SDnode *pDnode); int32_t dmInitClient(SDnode *pDnode);
int32_t dmInitStatusClient(SDnode *pDnode);
void dmCleanupClient(SDnode *pDnode); void dmCleanupClient(SDnode *pDnode);
void dmCleanupStatusClient(SDnode *pDnode);
SMsgCb dmGetMsgcb(SDnode *pDnode); SMsgCb dmGetMsgcb(SDnode *pDnode);
int32_t dmInitMsgHandle(SDnode *pDnode); int32_t dmInitMsgHandle(SDnode *pDnode);
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);

View File

@ -20,8 +20,8 @@
#include "qworker.h" #include "qworker.h"
#include "tstream.h" #include "tstream.h"
#ifdef TD_TSZ #ifdef TD_TSZ
#include "tglobal.h"
#include "tcompression.h" #include "tcompression.h"
#include "tglobal.h"
#endif #endif
int32_t dmInitDnode(SDnode *pDnode) { int32_t dmInitDnode(SDnode *pDnode) {
@ -91,6 +91,7 @@ void dmCleanupDnode(SDnode *pDnode) {
if (pDnode == NULL) return; if (pDnode == NULL) return;
dmCleanupClient(pDnode); dmCleanupClient(pDnode);
dmCleanupStatusClient(pDnode);
dmCleanupServer(pDnode); dmCleanupServer(pDnode);
dmClearVars(pDnode); dmClearVars(pDnode);
rpcCleanup(); rpcCleanup();

View File

@ -358,6 +358,50 @@ int32_t dmInitClient(SDnode *pDnode) {
dDebug("dnode rpc client is initialized"); dDebug("dnode rpc client is initialized");
return 0; return 0;
} }
int32_t dmInitStatusClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0};
rpcInit.label = "DND-STATUS-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor;
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
rpcInit.failFastInterval = 5000; // interval threshold(ms)
rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp;
int32_t connLimitNum = 100;
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->statusClientRpc = rpcOpen(&rpcInit);
if (pTrans->statusClientRpc == NULL) {
dError("failed to init dnode rpc status client");
return -1;
}
dDebug("dnode rpc status client is initialized");
return 0;
}
void dmCleanupClient(SDnode *pDnode) { void dmCleanupClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
@ -367,6 +411,14 @@ void dmCleanupClient(SDnode *pDnode) {
dDebug("dnode rpc client is closed"); dDebug("dnode rpc client is closed");
} }
} }
void dmCleanupStatusClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
if (pTrans->statusClientRpc) {
rpcClose(pTrans->statusClientRpc);
pTrans->statusClientRpc = NULL;
dDebug("dnode rpc status client is closed");
}
}
int32_t dmInitServer(SDnode *pDnode) { int32_t dmInitServer(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
@ -405,6 +457,7 @@ void dmCleanupServer(SDnode *pDnode) {
SMsgCb dmGetMsgcb(SDnode *pDnode) { SMsgCb dmGetMsgcb(SDnode *pDnode) {
SMsgCb msgCb = { SMsgCb msgCb = {
.clientRpc = pDnode->trans.clientRpc, .clientRpc = pDnode->trans.clientRpc,
.statusClientRpc = pDnode->trans.statusClientRpc,
.serverRpc = pDnode->trans.serverRpc, .serverRpc = pDnode->trans.serverRpc,
.sendReqFp = dmSendReq, .sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp, .sendRspFp = dmSendRsp,