From b34a5bdf51dc8acab94042e85ddf4254aa0cb201 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Jan 2022 19:19:04 +0800 Subject: [PATCH] add more trace info and handle conn except --- source/libs/transport/src/transCli.c | 19 +++++++++++-------- source/libs/transport/src/transSrv.c | 7 ++++++- source/libs/transport/test/rclient.c | 2 +- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e5b80d8419..67afd46b1c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -267,20 +267,20 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (clientReadComplete(pBuf)) { - tDebug("conn read complete"); + tDebug("conn %p read complete", conn); clientHandleResp(conn); } else { - tDebug("conn read half packet, continue to read"); + tDebug("conn %p read partial packet, continue to read", conn); } return; } assert(nread <= 0); if (nread == 0) { - tError("conn closed"); + tError("conn %p closed", conn); return; } if (nread < 0) { - tError("conn read error: %s", uv_err_name(nread)); + tError("conn %p read error: %s", conn, uv_err_name(nread)); clientHandleExcept(conn); } // tDebug("Read error %s\n", uv_err_name(nread)); @@ -307,9 +307,9 @@ static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; if (status == 0) { - tDebug("conn data already was written out"); + tDebug("conn %p data already was written out", pConn); } else { - tError("failed to write: %s", uv_err_name(status)); + tError("conn %p failed to write: %s", pConn, uv_err_name(status)); clientHandleExcept(pConn); return; } @@ -334,7 +334,7 @@ static void clientWrite(SCliConn* pConn) { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("data write out, msgType : %d, len: %d", pHead->msgType, msgLen); + tDebug("conn %p data write out, msgType : %d, len: %d", pConn, pHead->msgType, msgLen); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { @@ -342,9 +342,11 @@ static void clientConnCb(uv_connect_t* req, int status) { SCliConn* pConn = req->data; if (status != 0) { // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); - tError("failed to connect server, errmsg: %s", uv_strerror(status)); + tError("conn %p failed to connect server: %s", pConn, uv_strerror(status)); clientHandleExcept(pConn); + return; } + tDebug("conn %p create", pConn); assert(pConn->stream == req->handle); clientWrite(pConn); @@ -360,6 +362,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { // impl later + tDebug("conn %p get from conn pool", conn); conn->data = pMsg; conn->writeReq->data = conn; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 3b7b4d6fea..34e621f0f3 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -27,7 +27,6 @@ typedef struct SConn { int ref; int persist; // persist connection or not SConnBuffer connBuf; // read buf, - int count; int inType; void* pTransInst; // rpc init void* ahandle; // @@ -272,6 +271,7 @@ static void uvHandleReq(SConn* pConn) { rpcMsg.ahandle = NULL; rpcMsg.handle = pConn; + pConn->ref++; (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth @@ -432,6 +432,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { assert(pending == UV_TCP); SConn* pConn = createConn(); + pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ pConn->pTimer = malloc(sizeof(uv_timer_t)); @@ -520,6 +521,7 @@ void* workerThread(void* arg) { static SConn* createConn() { SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); + ++pConn->ref; return pConn; } @@ -527,6 +529,9 @@ static void destroyConn(SConn* conn, bool clear) { if (conn == NULL) { return; } + if (--conn->ref == 0) { + return; + } if (clear) { uv_close((uv_handle_t*)conn->pTcp, NULL); } diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index fd3496cc17..84814f39fc 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -63,7 +63,7 @@ static void *sendRequest(void *param) { if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); // tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem); - tDebug("recv response"); + tDebug("recv response succefully"); // usleep(100000000); }