This commit is contained in:
Yihao Deng 2024-05-30 03:00:56 +00:00
parent f74fad7e60
commit 5a91039f04
3 changed files with 18 additions and 7 deletions

View File

@ -62,6 +62,7 @@ typedef struct SRpcHandleInfo {
SRpcConnInfo conn;
int8_t forbiddenIp;
int8_t notFreeAhandle;
int8_t compressed;
} SRpcHandleInfo;
typedef struct SRpcMsg {

View File

@ -76,6 +76,9 @@ typedef struct SCliConn {
SDelayTask* task;
uint32_t clientIp;
uint32_t serverIp;
char* dstAddr;
char src[32];
char dst[32];
@ -1089,7 +1092,7 @@ void cliSendBatch(SCliConn* pConn) {
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
if (pHead->comp == 0) {
if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) {
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
@ -1167,7 +1170,7 @@ void cliSend(SCliConn* pConn) {
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
}
if (pHead->comp == 0) {
if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) {
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
@ -1398,6 +1401,12 @@ void cliConnCb(uv_connect_t* req, int status) {
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
transSockInfo2Str(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&sockname;
struct sockaddr_in saddr = *(struct sockaddr_in*)&peername;
pConn->clientIp = addr.sin_addr.s_addr;
pConn->serverIp = saddr.sin_addr.s_addr;
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
if (pConn->pBatch != NULL) {
cliSendBatch(pConn);

View File

@ -623,7 +623,8 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
int32_t len = transMsgLenFromCont(pMsg->contLen);
STrans* pTransInst = pConn->pTransInst;
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
if (pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp && pTransInst->compressSize != -1 &&
pTransInst->compressSize < pMsg->contLen) {
len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)len);
}
@ -688,11 +689,11 @@ static void destroyAllConn(SWorkThrd* pThrd) {
QUEUE_REMOVE(h);
QUEUE_INIT(h);
SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue);
while (T_REF_VAL_GET(c) >= 2) {
transUnrefSrvHandle(c);
SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue);
while (T_REF_VAL_GET(c) >= 2) {
transUnrefSrvHandle(c);
}
transUnrefSrvHandle(c);
transUnrefSrvHandle(c);
}
}
void uvWorkerAsyncCb(uv_async_t* handle) {