From b8f0c5244ea85b8ca2bd54422414287bf61f9adb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 25 Jan 2022 10:18:01 +0800 Subject: [PATCH 1/2] handle mem leak --- source/libs/transport/src/transCli.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4d384814ec..f265acf8c1 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -105,24 +105,23 @@ static void* clientThread(void* arg); static void clientHandleResp(SCliConn* conn) { STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; SRpcInfo* pRpc = pCtx->pTransInst; - SRpcMsg rpcMsg; STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + SRpcMsg rpcMsg; rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.pCont = transContFromHead(pHead); rpcMsg.code = pHead->code; rpcMsg.msgType = pHead->msgType; - - // rpcMsg.pCont = conn->readBuf.buf; - // rpcMsg.contLen = conn->readBuf.len; rpcMsg.ahandle = pCtx->ahandle; + (pRpc->cfp)(NULL, &rpcMsg, NULL); conn->notifyCount += 1; SCliThrdObj* pThrd = conn->hostThrd; + tfree(conn->data); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); // start thread's timer of conn pool if not active @@ -145,7 +144,7 @@ static void clientHandleExcept(SCliConn* pConn) { rpcMsg.code = -1; // SRpcInfo* pRpc = pMsg->ctx->pRpc; (pRpc->cfp)(NULL, &rpcMsg, NULL); - + tfree(pConn->data); pConn->notifyCount += 1; destroyTransConnCtx(pCtx); clientConnDestroy(pConn, true); From 4cfe9a48e9043c5338597ab421cccdb3ed36ac83 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 25 Jan 2022 10:21:38 +0800 Subject: [PATCH 2/2] set rpc client max recv_thread num --- source/libs/transport/src/rpcMain.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index d870ae98ab..54c33e7443 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -240,6 +240,9 @@ void *rpcOpen(const SRpcInit *pInit) { pRpc->connType = pInit->connType; if (pRpc->connType == TAOS_CONN_CLIENT) { pRpc->numOfThreads = pInit->numOfThreads; + if (pRpc->numOfThreads >= 10) { + pRpc->numOfThreads = 10; + } } else { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; } @@ -752,8 +755,8 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); - tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d", pRpc->label, pConn, pConn->linkUid, sid, - hashstr, pConn->spi); + tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d", pRpc->label, pConn, pConn->linkUid, + sid, hashstr, pConn->spi); } return pConn;