From aff649deb783cf172c2cc083016d9be96825b38d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 27 Nov 2022 23:00:32 +0800 Subject: [PATCH 1/6] adjust parameer --- include/util/tdef.h | 2 +- source/common/src/tglobal.c | 3 +++ source/libs/transport/src/trans.c | 3 +++ source/libs/transport/src/transSvr.c | 2 +- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 48dedd3e3e..ad44daed46 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -406,7 +406,7 @@ typedef enum ELogicConditionType { #ifdef WINDOWS #define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections. #else -#define TSDB_MAX_RPC_THREADS 10 +#define TSDB_MAX_RPC_THREADS 20 #endif #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f2d8b9aa7c..e27793bd56 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -306,6 +306,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); + if (tsNumOfTaskQueueThreads >= 10) { + tsNumOfTaskQueueThreads = 10; + } if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, 0) != 0) return -1; return 0; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 94bc128de9..88888f2f84 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -58,6 +58,9 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->destroyFp = pInit->dfp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + if (pRpc->numOfThreads <= 0) { + pRpc->numOfThreads = 1; + } uint32_t ip = 0; if (pInit->connType == TAOS_CONN_SERVER) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index b7fe404a4e..d00624c4d2 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -846,7 +846,7 @@ static bool addHandleToAcceptloop(void* arg) { return true; } void* transWorkerThread(void* arg) { - setThreadName("trans-worker"); + setThreadName("trans-srv-work"); SWorkThrd* pThrd = (SWorkThrd*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); From 0611ecbe313e8d56ed1343da513e98c43744e0ee Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 28 Nov 2022 10:53:27 +0800 Subject: [PATCH 2/6] change paramter --- source/client/src/clientImpl.c | 2 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/libs/transport/src/transCli.c | 5 +++-- source/libs/transport/src/transComm.c | 2 +- source/libs/transport/src/transSvr.c | 2 +- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c3140371c4..d8d1edc3d3 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -131,7 +131,7 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; taosThreadMutexInit(&p->qnodeMutex, NULL); - p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); + p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores / 2); p->pAppHbMgr = appHbMgrInit(p, key); if (NULL == p->pAppHbMgr) { destroyAppInst(p); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 5546d762f4..f78fd33e47 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -250,7 +250,7 @@ int32_t dmInitClient(SDnode *pDnode) { SRpcInit rpcInit = {0}; rpcInit.label = "DND-C"; - rpcInit.numOfThreads = 4; + rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 55bfb57a82..2b54ce36f5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -653,9 +653,10 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static SCliConn* cliCreateConn(SCliThrd* pThrd) { SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); // read/write stream handle - conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); + conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_stream_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; + transSetConnOption((uv_tcp_t*)conn->stream); uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { @@ -1182,7 +1183,7 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { static void* cliWorkThread(void* arg) { SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); - setThreadName("trans-cli-work"); + setThreadName("trans-cli-worker"); uv_run(pThrd->loop, UV_RUN_DEFAULT); tDebug("thread quit-thread:%08" PRId64, pThrd->pid); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 2759fb5aeb..5a5806417e 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -202,7 +202,7 @@ bool transReadComplete(SConnBuffer* connBuf) { } int transSetConnOption(uv_tcp_t* stream) { - uv_tcp_nodelay(stream, 0); + uv_tcp_nodelay(stream, 1); int ret = uv_tcp_keepalive(stream, 5, 60); return ret; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index d00624c4d2..f93eb436a5 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -846,7 +846,7 @@ static bool addHandleToAcceptloop(void* arg) { return true; } void* transWorkerThread(void* arg) { - setThreadName("trans-srv-work"); + setThreadName("trans-svr-worker"); SWorkThrd* pThrd = (SWorkThrd*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); From 2bb59df74be05c7987cf095b5d051623160d0fa9 Mon Sep 17 00:00:00 2001 From: gccgdb1234 Date: Mon, 28 Nov 2022 17:47:49 +0800 Subject: [PATCH 3/6] fix compile --- source/libs/transport/src/transCli.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2b54ce36f5..fa7c375ee3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -653,10 +653,10 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static SCliConn* cliCreateConn(SCliThrd* pThrd) { SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); // read/write stream handle - conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_stream_t)); + conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - transSetConnOption((uv_tcp_t*)conn->stream); + //transSetConnOption((uv_tcp_t*)conn->stream); uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { From 25889595d8461b92a162b5310cae71bd53734d55 Mon Sep 17 00:00:00 2001 From: gccgdb1234 Date: Mon, 28 Nov 2022 18:01:15 +0800 Subject: [PATCH 4/6] fix compile --- source/libs/transport/src/transCli.c | 2 +- source/libs/transport/src/transSvr.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fa7c375ee3..a0619def53 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1183,7 +1183,7 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { static void* cliWorkThread(void* arg) { SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); - setThreadName("trans-cli-worker"); + setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); tDebug("thread quit-thread:%08" PRId64, pThrd->pid); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f93eb436a5..395e28d68f 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -846,7 +846,7 @@ static bool addHandleToAcceptloop(void* arg) { return true; } void* transWorkerThread(void* arg) { - setThreadName("trans-svr-worker"); + setThreadName("trans-svr-work"); SWorkThrd* pThrd = (SWorkThrd*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); From 6e7e02c3cdffcbd47c8da0f1398ba4cc78a68d6b Mon Sep 17 00:00:00 2001 From: gccgdb1234 Date: Mon, 28 Nov 2022 19:13:11 +0800 Subject: [PATCH 5/6] fix compile --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a0619def53..71cc14493f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -656,7 +656,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - //transSetConnOption((uv_tcp_t*)conn->stream); + transSetConnOption((uv_tcp_t*)conn->stream); uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { From 3727cedeb3318becf2e725d66964f1b42504cd69 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 28 Nov 2022 20:43:33 +0800 Subject: [PATCH 6/6] change paramter --- source/libs/transport/src/transComm.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 5a5806417e..ad8d57c97a 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -202,9 +202,8 @@ bool transReadComplete(SConnBuffer* connBuf) { } int transSetConnOption(uv_tcp_t* stream) { - uv_tcp_nodelay(stream, 1); - int ret = uv_tcp_keepalive(stream, 5, 60); - return ret; + return uv_tcp_nodelay(stream, 1); + // int ret = uv_tcp_keepalive(stream, 5, 60); } SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {