diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index e44a4ebac8..e62ca7d69a 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -302,8 +302,8 @@ int transClearBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); -int transResetBuffer(SConnBuffer* connBuf); -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf); +int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); int transSetConnOption(uv_tcp_t* stream, int keepalive); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index bfae9a7111..d0fb5cbfdb 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -382,13 +382,18 @@ void cliHandleResp(SCliConn* conn) { STransMsgHead* pHead = NULL; - int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead); + int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1; + int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf); if (msgLen <= 0) { taosMemoryFree(pHead); tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); return; } + if (resetBuf == 0) { + tTrace("%s conn %p not reset read buf", transLabel(pTransInst), conn); + } + if (transDecompressMsg((char**)&pHead, msgLen) < 0) { tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 6584814053..e894d73c59 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -126,7 +126,7 @@ int transClearBuffer(SConnBuffer* buf) { return 0; } -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) { static const int HEADSIZE = sizeof(STransMsgHead); SConnBuffer* p = connBuf; @@ -137,7 +137,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { if (total >= HEADSIZE && !p->invalid) { *buf = taosMemoryCalloc(1, total); memcpy(*buf, p->buf, total); - if (transResetBuffer(connBuf) < 0) { + if (transResetBuffer(connBuf, resetBuf) < 0) { return -1; } } else { @@ -146,7 +146,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { return total; } -int transResetBuffer(SConnBuffer* connBuf) { +int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) { SConnBuffer* p = connBuf; if (p->total < p->len) { int left = p->len - p->total; @@ -159,8 +159,10 @@ int transResetBuffer(SConnBuffer* connBuf) { p->total = 0; p->len = 0; if (p->cap > BUFFER_CAP) { - p->cap = BUFFER_CAP; - p->buf = taosMemoryRealloc(p->buf, p->cap); + if (resetBuf) { + p->cap = BUFFER_CAP; + p->buf = taosMemoryRealloc(p->buf, p->cap); + } } } else { ASSERTS(0, "invalid read from sock buf"); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 21ad5be869..04f6ac8c95 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -342,11 +342,15 @@ static bool uvHandleReq(SSvrConn* pConn) { STransMsgHead* pHead = NULL; - int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead); + int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1; + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf); if (msgLen <= 0) { tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); return false; } + if (resetBuf == 0) { + tTrace("%s conn %p not reset read buf", transLabel(pTransInst), pConn); + } if (transDecompressMsg((char**)&pHead, msgLen) < 0) { tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); @@ -676,7 +680,8 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) { taosMemoryFree(smsg); } static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); } -static void destroyAllConn(SWorkThrd* pThrd) { + +static void destroyAllConn(SWorkThrd* pThrd) { tTrace("thread %p destroy all conn ", pThrd); while (!QUEUE_IS_EMPTY(&pThrd->conn)) { queue* h = QUEUE_HEAD(&pThrd->conn);