From be17aa822b8ef88bfb32f252448c8a35850fa6d3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 25 Feb 2023 14:21:15 +0800 Subject: [PATCH] fix: limit session num --- source/libs/transport/src/transCli.c | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 393360b4e9..182d2b59b8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -143,7 +143,7 @@ typedef struct { // add expire timeout and capacity limit static void* createConnPool(int size); static void* destroyConnPool(void* pool); -static SCliConn* getConnFromPool(SCliThrd* thread, char* key); +static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); @@ -577,7 +577,7 @@ void* destroyConnPool(void* pool) { return NULL; } -static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key) { +static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { void* pool = pThrd->pool; SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); STrans* pTranInst = pThrd->pTransInst; @@ -591,11 +591,12 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key) { list.list = nList; taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + return NULL; } SMsgList* msglist = plist->list; if (QUEUE_IS_EMPTY(&plist->conns) && msglist->numOfConn >= pTranInst->connLimitNum) { + *exceed = true; return NULL; } @@ -1131,11 +1132,12 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port); - SCliConn* conn = getConnFromPool(pThrd, key); + bool exceed = false; + SCliConn* conn = getConnFromPool(pThrd, key, &exceed); - if (conn == NULL) { - tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen, - pBatch->batchSize); + if (conn == NULL && exceed) { + tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen, + pBatch->batchSize, pTransInst->connLimitNum); cliDestroyBatch(pBatch); return; } @@ -1471,12 +1473,10 @@ static void doFreeTimeoutMsg(void* param) { taosMemoryFree(arg); } void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { - STrans* pTransInst = pThrd->pTransInst; - STraceId* trace = &pMsg->msg.info.traceId; + STrans* pTransInst = pThrd->pTransInst; cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { - tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); destroyCmsg(pMsg); return; } @@ -1501,6 +1501,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { if (conn == NULL && pMsg == NULL) { return; } + STraceId* trace = &pMsg->msg.info.traceId; if (conn != NULL) { transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);