From cfc9bb1effdab4339fd3478f822d0a2887f0df66 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Feb 2023 11:37:43 +0800 Subject: [PATCH 1/5] opt transport opt --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 7f9a261cf2..c1ee87657d 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -284,9 +284,9 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.failFastThreshold = 3; // failed threshold rpcInit.ffp = dmFailFastFp; - int32_t connLimitNum = 10000 / (tsNumOfRpcThreads * 3); - connLimitNum = TMAX(connLimitNum, 100); - connLimitNum = TMIN(connLimitNum, 500); + int32_t connLimitNum = 1000 / (tsNumOfRpcThreads * 3); + connLimitNum = TMAX(connLimitNum, 10); + connLimitNum = TMIN(connLimitNum, 100); rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitLock = 1; From edb3d6f8f3a10c6cd0eefef14296bb6031bb2dcf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Feb 2023 19:19:44 +0800 Subject: [PATCH 2/5] change transport param --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index c1ee87657d..659dae142e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -286,7 +286,7 @@ int32_t dmInitClient(SDnode *pDnode) { int32_t connLimitNum = 1000 / (tsNumOfRpcThreads * 3); connLimitNum = TMAX(connLimitNum, 10); - connLimitNum = TMIN(connLimitNum, 100); + connLimitNum = TMIN(connLimitNum, 50); rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitLock = 1; From 098949b528a59c5af225bc23b7ab47f438bbd142 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 19 Feb 2023 18:10:42 +0800 Subject: [PATCH 3/5] add trans param --- include/common/tglobal.h | 9 +++++---- source/common/src/tglobal.c | 18 ++++++++++++++++-- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 4 ++-- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 5a0c0e0777..e92afc2222 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -49,6 +49,7 @@ extern int32_t tsTagFilterResCacheSize; // queue & threads extern int32_t tsNumOfRpcThreads; +extern int32_t tsNumOfRpcSessions; extern int32_t tsNumOfCommitThreads; extern int32_t tsNumOfTaskQueueThreads; extern int32_t tsNumOfMnodeQueryThreads; @@ -86,9 +87,9 @@ extern int32_t tsTelemInterval; extern char tsTelemServer[]; extern uint16_t tsTelemPort; extern bool tsEnableCrashReport; -extern char* tsTelemUri; -extern char* tsClientCrashReportUri; -extern char* tsSvrCrashReportUri; +extern char *tsTelemUri; +extern char *tsClientCrashReportUri; +extern char *tsSvrCrashReportUri; // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing @@ -159,7 +160,7 @@ extern int32_t tsUptimeInterval; extern int32_t tsRpcRetryLimit; extern int32_t tsRpcRetryInterval; -extern bool tsDisableStream; +extern bool tsDisableStream; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1e5291b7cb..e636dffdd9 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -41,6 +41,7 @@ bool tsPrintAuth = false; // queue & threads int32_t tsNumOfRpcThreads = 1; +int32_t tsNumOfRpcSessions = 2000; int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4; @@ -54,7 +55,6 @@ int32_t tsNumOfQnodeQueryThreads = 4; int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; - // sync raft int32_t tsElectInterval = 25 * 1000; int32_t tsHeartbeatInterval = 1000; @@ -392,6 +392,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1; + tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000); + if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 1024, 0) != 0) return -1; + tsNumOfCommitThreads = tsNumOfCores / 2; tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1; @@ -504,6 +507,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } + pItem = cfgGetItem(tsCfg, "numOfRpcSessions"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsNumOfRpcSessions = 2000; + tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000); + pItem->i32 = tsNumOfRpcSessions; + pItem->stype = stype; + } + pItem = cfgGetItem(tsCfg, "numOfCommitThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { tsNumOfCommitThreads = numOfCores / 2; @@ -721,6 +732,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; + tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; @@ -771,7 +783,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } - + tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; GRANT_CFG_GET; @@ -980,6 +992,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32; } else if (strcasecmp("numOfRpcThreads", name) == 0) { tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; + } else if (strcasecmp("numOfRpcSessions", name) == 0) { + tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; } else if (strcasecmp("numOfCommitThreads", name) == 0) { tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; } else if (strcasecmp("numOfMnodeReadThreads", name) == 0) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 659dae142e..3992dbc3e1 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -284,9 +284,9 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.failFastThreshold = 3; // failed threshold rpcInit.ffp = dmFailFastFp; - int32_t connLimitNum = 1000 / (tsNumOfRpcThreads * 3); + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); connLimitNum = TMAX(connLimitNum, 10); - connLimitNum = TMIN(connLimitNum, 50); + connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitLock = 1; From dd30b3577cbc67616d768e4899e8d3e66210c500 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 19 Feb 2023 18:29:11 +0800 Subject: [PATCH 4/5] add trans param --- source/common/src/tglobal.c | 2 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index e636dffdd9..e3f08e912a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -393,7 +393,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1; tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000); - if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 1024, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1; tsNumOfCommitThreads = tsNumOfCores / 2; tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 3992dbc3e1..3a1ca161a9 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -291,7 +291,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; - rpcInit.batchSize = 16 * 1024; + rpcInit.batchSize = 8 * 1024; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { From 1971ec0adb0ff52cc3b2a07267f765f1dad2059d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 19 Feb 2023 22:17:44 +0800 Subject: [PATCH 5/5] add trans param --- source/libs/transport/src/transCli.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2c862ed45b..7e1aeafaad 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2285,7 +2285,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } - if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) { + /*if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) { char key[TSDB_FQDN_LEN + 64] = {0}; char* ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet); uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet); @@ -2297,7 +2297,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_MAX_SESSIONS; } - } + }*/ TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());