diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4a7cf8da42..70d4d43f09 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1064,20 +1064,13 @@ static void addConnToPool(void* pool, SCliConn* conn) { uv_read_stop(conn->stream); conn->seq = 0; - int32_t code = allocConnRef(conn, true); - if (code != 0) { - cliDestroyConn(conn, true); - return; - } SCliThrd* thrd = conn->hostThrd; cliResetConnTimer(conn); if (T_REF_VAL_GET(conn) > 1) { transUnrefCliHandle(conn); } - cliDestroyConnMsgs(conn, false); - conn->seq = 0; if (conn->list == NULL && conn->dstAddr != NULL) { conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr)); @@ -1248,7 +1241,6 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); - // TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception); transQueuePush(&pConn->reqs, pReq); @@ -1332,7 +1324,44 @@ _failed: taosMemoryFree(conn); return code; } -static void cliDestroyConn(SCliConn* conn, bool clear) {} +static void cliDestroyConn(SCliConn* conn, bool clear) { + SCliThrd* pThrd = conn->hostThrd; + tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); + conn->broken = true; + QUEUE_REMOVE(&conn->q); + QUEUE_INIT(&conn->q); + + conn->broken = true; + if (conn->list == NULL) { + conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr)); + } + + if (conn->list) { + SConnList* list = conn->list; + list->list->numOfConn--; + if (conn->status == ConnInPool) { + list->size--; + } + } + conn->list = NULL; + + (void)transReleaseExHandle(transGetRefMgt(), conn->refId); + (void)transRemoveExHandle(transGetRefMgt(), conn->refId); + conn->refId = -1; + + if (conn->task != NULL) { + transDQCancel(pThrd->timeoutQueue, conn->task); + conn->task = NULL; + } + // cliResetTimer(pThrd, conn); + + if (clear) { + if (!uv_is_closing((uv_handle_t*)conn->stream)) { + (void)uv_read_stop(conn->stream); + uv_close((uv_handle_t*)conn->stream, cliDestroy); + } + } +} static void cliDestroy(uv_handle_t* handle) { if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) { return; @@ -1350,6 +1379,8 @@ static void cliDestroy(uv_handle_t* handle) { cliDestroyConnMsgs(conn, true); + delConnFromHeapCache(pThrd->connHeapCache, conn); + tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); (void)transDestroyBuffer(&conn->readBuf); @@ -1435,7 +1466,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) { int32_t totalLen = 0; if (size == 0) { - tError("%s conn %p not msg to send", pInst->label, pConn); + tDebug("%s conn %p not msg to send", pInst->label, pConn); return; } uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t)); @@ -3590,6 +3621,7 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) { code = transHeapInsert(p, pConn); tDebug("add conn %p to heap cache for key:%s,status:%d, refCnt:%d", pConn, pConn->dstAddr, pConn->inHeap, pConn->reqRefCnt); + return code; } static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) { @@ -3656,7 +3688,7 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p) { // impl later if (p->inHeap == 0) { tDebug("failed to del conn %p since not in heap", p); - return TSDB_CODE_INVALID_PARA; + return 0; } p->inHeap = 0; p->reqRefCnt--;