diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index c3b5beb3e3..b7a459f957 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -62,6 +62,7 @@ typedef struct SRpcHandleInfo { SRpcConnInfo conn; int8_t forbiddenIp; int8_t notFreeAhandle; + int8_t compressed; } SRpcHandleInfo; typedef struct SRpcMsg { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d0fb5cbfdb..8969d98228 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -76,6 +76,9 @@ typedef struct SCliConn { SDelayTask* task; + uint32_t clientIp; + uint32_t serverIp; + char* dstAddr; char src[32]; char dst[32]; @@ -1089,7 +1092,7 @@ void cliSendBatch(SCliConn* pConn) { } pHead->timestamp = taosHton64(taosGetTimestampUs()); - if (pHead->comp == 0) { + if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); @@ -1167,7 +1170,7 @@ void cliSend(SCliConn* pConn) { uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0); } - if (pHead->comp == 0) { + if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); @@ -1398,6 +1401,12 @@ void cliConnCb(uv_connect_t* req, int status) { uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen); transSockInfo2Str(&sockname, pConn->src); + struct sockaddr_in addr = *(struct sockaddr_in*)&sockname; + struct sockaddr_in saddr = *(struct sockaddr_in*)&peername; + + pConn->clientIp = addr.sin_addr.s_addr; + pConn->serverIp = saddr.sin_addr.s_addr; + tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); if (pConn->pBatch != NULL) { cliSendBatch(pConn); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 89c5c71eb0..049554a7c9 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -623,7 +623,8 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { int32_t len = transMsgLenFromCont(pMsg->contLen); STrans* pTransInst = pConn->pTransInst; - if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { + if (pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp && pTransInst->compressSize != -1 && + pTransInst->compressSize < pMsg->contLen) { len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)len); } @@ -688,11 +689,11 @@ static void destroyAllConn(SWorkThrd* pThrd) { QUEUE_REMOVE(h); QUEUE_INIT(h); - SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue); - while (T_REF_VAL_GET(c) >= 2) { - transUnrefSrvHandle(c); + SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue); + while (T_REF_VAL_GET(c) >= 2) { + transUnrefSrvHandle(c); } - transUnrefSrvHandle(c); + transUnrefSrvHandle(c); } } void uvWorkerAsyncCb(uv_async_t* handle) {