diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index da6d71e07b..00944115c0 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -302,8 +302,8 @@ int transClearBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); -int transResetBuffer(SConnBuffer* connBuf); -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf); +int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); int transSetConnOption(uv_tcp_t* stream, int keepalive); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dfd7630f35..a1b09b9ee8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -382,13 +382,18 @@ void cliHandleResp(SCliConn* conn) { STransMsgHead* pHead = NULL; - int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead); + int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1; + int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf); if (msgLen <= 0) { taosMemoryFree(pHead); tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); return; } + if (resetBuf == 0) { + tTrace("%s conn %p not reset read buf", transLabel(pTransInst), conn); + } + if (transDecompressMsg((char**)&pHead, msgLen) < 0) { tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index fff13e7ebb..5598aa39e3 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -126,7 +126,7 @@ int transClearBuffer(SConnBuffer* buf) { return 0; } -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) { static const int HEADSIZE = sizeof(STransMsgHead); SConnBuffer* p = connBuf; @@ -137,7 +137,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { if (total >= HEADSIZE && !p->invalid) { *buf = taosMemoryCalloc(1, total); memcpy(*buf, p->buf, total); - if (transResetBuffer(connBuf) < 0) { + if (transResetBuffer(connBuf, resetBuf) < 0) { return -1; } } else { @@ -146,7 +146,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { return total; } -int transResetBuffer(SConnBuffer* connBuf) { +int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) { SConnBuffer* p = connBuf; if (p->total < p->len) { int left = p->len - p->total; @@ -159,8 +159,10 @@ int transResetBuffer(SConnBuffer* connBuf) { p->total = 0; p->len = 0; if (p->cap > BUFFER_CAP) { - p->cap = BUFFER_CAP; - p->buf = taosMemoryRealloc(p->buf, p->cap); + if (resetBuf) { + p->cap = BUFFER_CAP; + p->buf = taosMemoryRealloc(p->buf, p->cap); + } } } else { ASSERTS(0, "invalid read from sock buf"); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 21ad5be869..c0b2eba90a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -342,11 +342,15 @@ static bool uvHandleReq(SSvrConn* pConn) { STransMsgHead* pHead = NULL; - int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead); + int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1; + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf); if (msgLen <= 0) { tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); return false; } + if (resetBuf == 0) { + tTrace("%s conn %p not reset read buf", transLabel(pTransInst), pConn); + } if (transDecompressMsg((char**)&pHead, msgLen) < 0) { tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); @@ -677,17 +681,17 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) { } static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); } static void destroyAllConn(SWorkThrd* pThrd) { - tTrace("thread %p destroy all conn ", pThrd); - while (!QUEUE_IS_EMPTY(&pThrd->conn)) { - queue* h = QUEUE_HEAD(&pThrd->conn); - QUEUE_REMOVE(h); - QUEUE_INIT(h); + tTrace("thread %p destroy all conn ", pThrd); + while (!QUEUE_IS_EMPTY(&pThrd->conn)) { + queue* h = QUEUE_HEAD(&pThrd->conn); + QUEUE_REMOVE(h); + QUEUE_INIT(h); - SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue); - while (T_REF_VAL_GET(c) >= 2) { - transUnrefSrvHandle(c); + SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue); + while (T_REF_VAL_GET(c) >= 2) { + transUnrefSrvHandle(c); } - transUnrefSrvHandle(c); + transUnrefSrvHandle(c); } } void uvWorkerAsyncCb(uv_async_t* handle) {