From eb6053ffc50748b6944f281c5a7992680130e883 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 17 Sep 2024 09:44:11 +0800 Subject: [PATCH] Merge branch '3.0' into enh/opt-transport --- source/libs/transport/src/transCli.c | 8 ++++++-- source/libs/transport/src/transSvr.c | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5513ea692b..54b73d14f7 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -889,6 +889,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { while (transReadComplete(pBuf)) { tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); if (pBuf->invalid) { + conn->broken = true; transUnrefCliHandle(conn); return; break; @@ -1191,7 +1192,11 @@ bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msg int32_t cliBatchSend(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - int32_t size = transQueueSize(&pConn->reqsToSend); + + if (pConn->broken) { + return 0; + } + int32_t size = transQueueSize(&pConn->reqsToSend); int32_t totalLen = 0; if (size == 0) { @@ -1263,7 +1268,6 @@ int32_t cliBatchSend(SCliConn* pConn) { TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); } - transRefCliHandle(pConn); uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn); tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, size, totalLen); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 46bc1bfd2d..41038196d1 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -614,6 +614,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (true == pBuf->invalid || false == uvHandleReq(conn)) { tError("%s conn %p read invalid packet, received from %s, local info:%s", transLabel(pInst), conn, conn->dst, conn->src); + conn->broken = true; transUnrefSrvHandle(conn); return; }