From a5420dfaad260a06907080b3540b0d1b56546c99 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 5 Aug 2022 18:36:19 +0800 Subject: [PATCH] fix rpc perf --- source/libs/transport/src/transCli.c | 24 ++++++++++++++++-------- source/libs/transport/src/transSvr.c | 3 +-- source/libs/transport/test/svrBench.c | 17 ++++++++--------- 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 140d2ef792..11ba2a50d3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -16,7 +16,8 @@ #include "transComm.h" typedef struct SConnList { - queue conn; + queue conn; + int32_t size; } SConnList; typedef struct SCliConn { @@ -518,15 +519,18 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { if (QUEUE_IS_EMPTY(&plist->conn)) { return NULL; } + + plist->size -= 1; queue* h = QUEUE_HEAD(&plist->conn); SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); - transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); - conn->task = NULL; - + if (conn->task != NULL) { + transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + conn->task = NULL; + } return conn; } static void addConnToPool(void* pool, SCliConn* conn) { @@ -555,13 +559,17 @@ static void addConnToPool(void* pool, SCliConn* conn) { assert(conn->list != NULL); QUEUE_INIT(&conn->q); QUEUE_PUSH(&conn->list->conn, &conn->q); + conn->list->size += 1; + conn->task = NULL; assert(!QUEUE_IS_EMPTY(&conn->list->conn)); - STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); - arg->param1 = conn; - arg->param2 = thrd; - conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); + if (conn->list->size >= 10) { + STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); + arg->param1 = conn; + arg->param2 = thrd; + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); + } } static int32_t allocConnRef(SCliConn* conn, bool update) { if (update) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 8b27d95e52..14b8b35478 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -75,7 +75,6 @@ typedef struct SWorkThrd { SAsyncPool* asyncPool; uv_prepare_t* prepare; queue msg; - TdThreadMutex msgMtx; queue conn; void* pTransInst; @@ -499,6 +498,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { tError("unexcept occurred, continue"); continue; } + // release handle to rpc init if (msg->type == Quit) { (*transAsyncHandle[msg->type])(msg, pThrd); @@ -743,7 +743,6 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { pThrd->pipe->data = pThrd; QUEUE_INIT(&pThrd->msg); - taosThreadMutexInit(&pThrd->msgMtx, NULL); pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); uv_prepare_init(pThrd->loop, pThrd->prepare); diff --git a/source/libs/transport/test/svrBench.c b/source/libs/transport/test/svrBench.c index 224f527385..6eb80c8504 100644 --- a/source/libs/transport/test/svrBench.c +++ b/source/libs/transport/test/svrBench.c @@ -75,15 +75,14 @@ void processShellMsg() { void *handle = pRpcMsg->info.handle; taosFreeQitem(pRpcMsg); - - { - SRpcMsg nRpcMsg = {0}; - nRpcMsg.pCont = rpcMallocCont(msgSize); - nRpcMsg.contLen = msgSize; - nRpcMsg.info.handle = handle; - nRpcMsg.code = TSDB_CODE_CTG_NOT_READY; - rpcSendResponse(&nRpcMsg); - } + //{ + // SRpcMsg nRpcMsg = {0}; + // nRpcMsg.pCont = rpcMallocCont(msgSize); + // nRpcMsg.contLen = msgSize; + // nRpcMsg.info.handle = handle; + // nRpcMsg.code = TSDB_CODE_CTG_NOT_READY; + // rpcSendResponse(&nRpcMsg); + //} } taosUpdateItemSize(qinfo.queue, numOfMsgs);