diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 95a82cec68..1320e6e3ba 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -31,6 +31,7 @@ typedef struct SCliConn { char secured; uint64_t expireTime; int8_t notifyCount; // timers already notify to client + int32_t ref; } SCliConn; typedef struct SCliMsg { @@ -126,6 +127,7 @@ static void clientHandleResp(SCliConn* conn) { // buf alread translated to rpcMsg.pCont transClearBuffer(&conn->readBuf); + uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); // start thread's timer of conn pool if not active @@ -137,6 +139,7 @@ static void clientHandleResp(SCliConn* conn) { static void clientHandleExcept(SCliConn* pConn) { if (pConn->data == NULL) { clientConnDestroy(pConn, true); + return; } tDebug("conn %p destroy", pConn); @@ -258,7 +261,7 @@ static bool clientReadComplete(SConnBuffer* data) { return false; } } -static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { +static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; transAllocBuffer(pBuf, buf); @@ -270,6 +273,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (clientReadComplete(pBuf)) { + uv_read_stop((uv_stream_t*)conn->stream); tDebug("conn %p read complete", conn); clientHandleResp(conn); } else { @@ -291,16 +295,19 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf static void clientConnDestroy(SCliConn* conn, bool clear) { // - tDebug("conn %p remove from conn pool", conn); - QUEUE_REMOVE(&conn->conn); - tDebug("conn %p remove from conn pool successfully", conn); - if (clear) { - uv_close((uv_handle_t*)conn->stream, clientDestroy); + conn->ref--; + if (conn->ref == 0) { + tDebug("conn %p remove from conn pool", conn); + QUEUE_REMOVE(&conn->conn); + tDebug("conn %p remove from conn pool successfully", conn); + if (clear) { + uv_close((uv_handle_t*)conn->stream, clientDestroy); + } } } static void clientDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; - transDestroyBuffer(&conn->readBuf); + // transDestroyBuffer(&conn->readBuf); free(conn->stream); free(conn->writeReq); @@ -325,7 +332,7 @@ static void clientWriteCb(uv_write_t* req, int status) { // uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream); // pConn->stream->data = pConn; //} - uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb); + uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb); // impl later } @@ -371,11 +378,11 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tDebug("conn %p get from conn pool", conn); conn->data = pMsg; conn->writeReq->data = conn; - transDestroyBuffer(&conn->readBuf); + // transDestroyBuffer(&conn->readBuf); clientWrite(conn); } else { SCliConn* conn = calloc(1, sizeof(SCliConn)); - + conn->ref++; // read/write stream handle conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));