support compress
This commit is contained in:
parent
37a0b9c759
commit
841207666a
|
@ -379,7 +379,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
STraceId* trace = &transMsg.info.traceId;
|
STraceId* trace = &transMsg.info.traceId;
|
||||||
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
|
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
|
||||||
TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code));
|
TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
|
||||||
|
|
||||||
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
|
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
|
||||||
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
@ -782,8 +782,6 @@ void cliSend(SCliConn* pConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STraceId* trace = &pMsg->info.traceId;
|
STraceId* trace = &pMsg->info.traceId;
|
||||||
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
|
||||||
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
|
|
||||||
|
|
||||||
if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
|
if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
|
||||||
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
|
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
|
||||||
|
@ -799,10 +797,12 @@ void cliSend(SCliConn* pConn) {
|
||||||
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
|
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTransInst->compressSize != -1 && pTransInst->compressSize > pMsg->contLen) {
|
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
|
||||||
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
|
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
|
||||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||||
}
|
}
|
||||||
|
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||||
|
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
|
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
|
||||||
|
|
|
@ -36,7 +36,6 @@ int32_t transCompressMsg(char* msg, int32_t len) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
|
int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
|
||||||
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, compHdr);
|
|
||||||
/*
|
/*
|
||||||
* only the compressed size is less than the value of contLen - overhead, the compression is applied
|
* only the compressed size is less than the value of contLen - overhead, the compression is applied
|
||||||
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
|
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
|
||||||
|
|
|
@ -235,10 +235,10 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
|
|
||||||
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
|
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
|
||||||
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen);
|
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, pHead->msgLen);
|
||||||
} else {
|
} else {
|
||||||
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d", transLabel(pTransInst), pConn,
|
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d", transLabel(pTransInst), pConn,
|
||||||
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen, pHead->noResp, transMsg.code);
|
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, pHead->msgLen, pHead->noResp, transMsg.code);
|
||||||
}
|
}
|
||||||
|
|
||||||
// pHead->noResp = 1,
|
// pHead->noResp = 1,
|
||||||
|
@ -411,14 +411,14 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
||||||
|
|
||||||
STrans* pTransInst = pConn->pTransInst;
|
STrans* pTransInst = pConn->pTransInst;
|
||||||
if (pTransInst->compressSize != -1 && pTransInst->compressSize > pMsg->contLen) {
|
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
|
||||||
len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
|
len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
|
||||||
pHead->msgLen = (int32_t)htonl((uint32_t)len);
|
pHead->msgLen = (int32_t)htonl((uint32_t)len);
|
||||||
}
|
}
|
||||||
|
|
||||||
STraceId* trace = &pMsg->info.traceId;
|
STraceId* trace = &pMsg->info.traceId;
|
||||||
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
|
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
|
||||||
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
|
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len);
|
||||||
|
|
||||||
wb->base = (char*)pHead;
|
wb->base = (char*)pHead;
|
||||||
wb->len = len;
|
wb->len = len;
|
||||||
|
|
Loading…
Reference in New Issue