diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4ad469f2eb..bf03ea26ba 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -175,6 +175,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { #define CONN_SHOULD_RELEASE(conn, head) \ do { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + int connStatus = conn->status; \ uint64_t ahandle = head->ahandle; \ CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ conn->status = ConnRelease; \ @@ -186,7 +187,9 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } \ destroyCmsg(pMsg); \ cliReleaseUnfinishedMsg(conn); \ - addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \ + if (connStatus != ConnInPool) { \ + addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \ + } \ return; \ } \ } while (0) @@ -450,7 +453,7 @@ void* destroyConnPool(void* pool) { while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conn)) { queue* h = QUEUE_HEAD(&connList->conn); - QUEUE_REMOVE(h); + // QUEUE_REMOVE(h); SCliConn* c = QUEUE_DATA(h, SCliConn, conn); cliDestroyConn(c, true); } @@ -481,6 +484,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { conn->status = ConnNormal; QUEUE_REMOVE(&conn->conn); QUEUE_INIT(&conn->conn); + assert(h == &conn->conn); return conn; } static void addConnToPool(void* pool, SCliConn* conn) { @@ -562,9 +566,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { } static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); - QUEUE_REMOVE(&conn->conn); - QUEUE_INIT(&conn->conn); if (clear) { uv_close((uv_handle_t*)conn->stream, cliDestroy); } @@ -1008,7 +1010,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { tTrace("%s use remote epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse, pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT); } - addConnToPool(pThrd->pool, pConn); + if (pConn->status != ConnInPool) { + addConnToPool(pThrd->pool, pConn); + } STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = pMsg;