support compress
This commit is contained in:
parent
6e548a96f8
commit
4507776c8e
|
@ -139,7 +139,7 @@ extern int32_t tsTtlPushInterval;
|
||||||
extern int32_t tsGrantHBInterval;
|
extern int32_t tsGrantHBInterval;
|
||||||
extern int32_t tsUptimeInterval;
|
extern int32_t tsUptimeInterval;
|
||||||
|
|
||||||
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
//#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
||||||
|
|
||||||
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
|
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
|
||||||
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
|
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
|
||||||
|
|
|
@ -82,6 +82,9 @@ typedef struct SRpcInit {
|
||||||
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
|
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
|
||||||
int32_t idleTime; // milliseconds, 0 means idle timer is disabled
|
int32_t idleTime; // milliseconds, 0 means idle timer is disabled
|
||||||
|
|
||||||
|
const int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
|
||||||
|
const int8_t encryption; // encrypt or not
|
||||||
|
|
||||||
// the following is for client app ecurity only
|
// the following is for client app ecurity only
|
||||||
char *user; // user name
|
char *user; // user name
|
||||||
|
|
||||||
|
@ -115,10 +118,9 @@ typedef struct {
|
||||||
} SRpcCtx;
|
} SRpcCtx;
|
||||||
|
|
||||||
int32_t rpcInit();
|
int32_t rpcInit();
|
||||||
|
void rpcCleanup();
|
||||||
|
|
||||||
void rpcCleanup();
|
|
||||||
void *rpcOpen(const SRpcInit *pRpc);
|
void *rpcOpen(const SRpcInit *pRpc);
|
||||||
|
|
||||||
void rpcClose(void *);
|
void rpcClose(void *);
|
||||||
void rpcCloseImpl(void *);
|
void rpcCloseImpl(void *);
|
||||||
void *rpcMallocCont(int32_t contLen);
|
void *rpcMallocCont(int32_t contLen);
|
||||||
|
|
|
@ -200,15 +200,13 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
|
||||||
|
|
||||||
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
|
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
|
||||||
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
|
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
|
||||||
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
|
#define transContFromHead(msg) (((char*)msg) + sizeof(STransMsgHead))
|
||||||
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
|
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
|
||||||
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
|
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
|
||||||
#define transIsReq(type) (type & 1U)
|
#define transIsReq(type) (type & 1U)
|
||||||
|
|
||||||
#define transLabel(trans) ((STrans*)trans)->label
|
#define transLabel(trans) ((STrans*)trans)->label
|
||||||
|
|
||||||
void transFreeMsg(void* msg);
|
|
||||||
//
|
|
||||||
typedef struct SConnBuffer {
|
typedef struct SConnBuffer {
|
||||||
char* buf;
|
char* buf;
|
||||||
int len;
|
int len;
|
||||||
|
@ -415,6 +413,10 @@ void transThreadOnce();
|
||||||
void transInit();
|
void transInit();
|
||||||
void transCleanup();
|
void transCleanup();
|
||||||
|
|
||||||
|
void transFreeMsg(void* msg);
|
||||||
|
int32_t transCompressMsg(char* msg, int32_t len);
|
||||||
|
int32_t transDecompressMsg(char** msg, int32_t len);
|
||||||
|
|
||||||
int32_t transOpenRefMgt(int size, void (*func)(void*));
|
int32_t transOpenRefMgt(int size, void (*func)(void*));
|
||||||
void transCloseRefMgt(int32_t refMgt);
|
void transCloseRefMgt(int32_t refMgt);
|
||||||
int64_t transAddExHandle(int32_t refMgt, void* p);
|
int64_t transAddExHandle(int32_t refMgt, void* p);
|
||||||
|
|
|
@ -16,9 +16,7 @@
|
||||||
#ifndef _TD_TRANSPORT_INT_H_
|
#ifndef _TD_TRANSPORT_INT_H_
|
||||||
#define _TD_TRANSPORT_INT_H_
|
#define _TD_TRANSPORT_INT_H_
|
||||||
|
|
||||||
#ifdef USE_UV
|
|
||||||
#include <uv.h>
|
#include <uv.h>
|
||||||
#endif
|
|
||||||
#include "lz4.h"
|
#include "lz4.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
@ -34,8 +32,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef USE_UV
|
|
||||||
|
|
||||||
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||||
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||||
|
|
||||||
|
@ -51,19 +47,20 @@ typedef struct {
|
||||||
char label[TSDB_LABEL_LEN];
|
char label[TSDB_LABEL_LEN];
|
||||||
char user[TSDB_UNI_LEN]; // meter ID
|
char user[TSDB_UNI_LEN]; // meter ID
|
||||||
|
|
||||||
|
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
|
||||||
|
int8_t encryption; // encrypt or not
|
||||||
|
|
||||||
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
||||||
bool (*retry)(int32_t code, tmsg_t msgType);
|
bool (*retry)(int32_t code, tmsg_t msgType);
|
||||||
bool (*startTimer)(int32_t code, tmsg_t msgType);
|
bool (*startTimer)(int32_t code, tmsg_t msgType);
|
||||||
int index;
|
|
||||||
|
|
||||||
|
int index;
|
||||||
void* parent;
|
void* parent;
|
||||||
void* tcphandle; // returned handle from TCP initialization
|
void* tcphandle; // returned handle from TCP initialization
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
} SRpcInfo;
|
} SRpcInfo;
|
||||||
|
|
||||||
#endif // USE_LIBUV
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -45,6 +45,10 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
if (pInit->label) {
|
if (pInit->label) {
|
||||||
tstrncpy(pRpc->label, pInit->label, TSDB_LABEL_LEN);
|
tstrncpy(pRpc->label, pInit->label, TSDB_LABEL_LEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pRpc->compressSize = pInit->compressSize;
|
||||||
|
pRpc->encryption = pInit->encryption;
|
||||||
|
|
||||||
// register callback handle
|
// register callback handle
|
||||||
pRpc->cfp = pInit->cfp;
|
pRpc->cfp = pInit->cfp;
|
||||||
pRpc->retry = pInit->rfp;
|
pRpc->retry = pInit->rfp;
|
||||||
|
@ -130,9 +134,6 @@ void* rpcReallocCont(void* ptr, int32_t contLen) {
|
||||||
return st + TRANS_MSG_OVERHEAD;
|
return st + TRANS_MSG_OVERHEAD;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; }
|
|
||||||
void rpcCancelRequest(int64_t rid) { return; }
|
|
||||||
|
|
||||||
int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||||
return transSendRequest(shandle, pEpSet, pMsg, NULL);
|
return transSendRequest(shandle, pEpSet, pMsg, NULL);
|
||||||
}
|
}
|
||||||
|
|
|
@ -319,13 +319,18 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STransMsgHead* pHead = NULL;
|
STransMsgHead* pHead = NULL;
|
||||||
if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) {
|
|
||||||
|
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
|
||||||
|
if (msgLen <= 0) {
|
||||||
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
|
||||||
|
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
}
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
pHead->msgLen = htonl(pHead->msgLen);
|
||||||
|
|
||||||
if (cliRecvReleaseReq(conn, pHead)) {
|
if (cliRecvReleaseReq(conn, pHead)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -553,7 +558,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
if (conn->list->size >= 50) {
|
if (conn->list->size >= 50) {
|
||||||
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
||||||
arg->param1 = conn;
|
arg->param1 = conn;
|
||||||
arg->param2 = thrd;
|
arg->param2 = NULL;
|
||||||
|
|
||||||
STrans* pTransInst = thrd->pTransInst;
|
STrans* pTransInst = thrd->pTransInst;
|
||||||
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
|
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
|
||||||
|
@ -772,20 +777,19 @@ void cliSend(SCliConn* pConn) {
|
||||||
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
|
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
|
||||||
pHead->traceId = pMsg->info.traceId;
|
pHead->traceId = pMsg->info.traceId;
|
||||||
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||||
|
if (pHead->persist == 1) {
|
||||||
|
CONN_SET_PERSIST_BY_APP(pConn);
|
||||||
|
}
|
||||||
|
|
||||||
STraceId* trace = &pMsg->info.traceId;
|
STraceId* trace = &pMsg->info.traceId;
|
||||||
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||||
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
|
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
|
||||||
|
|
||||||
if (pHead->persist == 1) {
|
|
||||||
CONN_SET_PERSIST_BY_APP(pConn);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
|
if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
|
||||||
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
|
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
|
||||||
if (timer == NULL) {
|
if (timer == NULL) {
|
||||||
tDebug("no avaiable timer, create");
|
|
||||||
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
|
||||||
|
tDebug("no available timer, create a timer %p", timer);
|
||||||
uv_timer_init(pThrd->loop, timer);
|
uv_timer_init(pThrd->loop, timer);
|
||||||
}
|
}
|
||||||
timer->data = pConn;
|
timer->data = pConn;
|
||||||
|
@ -795,6 +799,11 @@ void cliSend(SCliConn* pConn) {
|
||||||
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
|
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTransInst->compressSize != -1 && pTransInst->compressSize > pMsg->contLen) {
|
||||||
|
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
|
||||||
|
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||||
|
}
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
|
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
|
||||||
|
|
||||||
|
@ -1275,17 +1284,13 @@ FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
|
||||||
}
|
}
|
||||||
static FORCE_INLINE void doDelayTask(void* param) {
|
static FORCE_INLINE void doDelayTask(void* param) {
|
||||||
STaskArg* arg = param;
|
STaskArg* arg = param;
|
||||||
SCliMsg* pMsg = arg->param1;
|
cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2);
|
||||||
SCliThrd* pThrd = arg->param2;
|
|
||||||
taosMemoryFree(arg);
|
taosMemoryFree(arg);
|
||||||
|
|
||||||
cliHandleReq(pMsg, pThrd);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doCloseIdleConn(void* param) {
|
static void doCloseIdleConn(void* param) {
|
||||||
STaskArg* arg = param;
|
STaskArg* arg = param;
|
||||||
SCliConn* conn = arg->param1;
|
SCliConn* conn = arg->param1;
|
||||||
SCliThrd* pThrd = arg->param2;
|
|
||||||
tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
|
||||||
conn->task = NULL;
|
conn->task = NULL;
|
||||||
cliDestroyConn(conn, true);
|
cliDestroyConn(conn, true);
|
||||||
|
|
|
@ -23,52 +23,64 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
|
||||||
static int32_t refMgt;
|
static int32_t refMgt;
|
||||||
static int32_t instMgt;
|
static int32_t instMgt;
|
||||||
|
|
||||||
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) {
|
int32_t transCompressMsg(char* msg, int32_t len) {
|
||||||
return false;
|
int32_t ret = 0;
|
||||||
// SRpcHead* pHead = rpcHeadFromCont(pCont);
|
int compHdr = sizeof(STransCompMsg);
|
||||||
bool succ = false;
|
STransMsgHead* pHead = transHeadFromCont(msg);
|
||||||
int overhead = sizeof(STransCompMsg);
|
|
||||||
if (!NEEDTO_COMPRESSS_MSG(len)) {
|
|
||||||
return succ;
|
|
||||||
}
|
|
||||||
|
|
||||||
char* buf = taosMemoryMalloc(len + overhead + 8); // 8 extra bytes
|
char* buf = taosMemoryMalloc(len + compHdr + 8); // 8 extra bytes
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
|
tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
|
||||||
*flen = len;
|
ret = len;
|
||||||
return succ;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t clen = LZ4_compress_default(msg, buf, len, len + overhead);
|
int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
|
||||||
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, overhead);
|
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, compHdr);
|
||||||
/*
|
/*
|
||||||
* only the compressed size is less than the value of contLen - overhead, the compression is applied
|
* only the compressed size is less than the value of contLen - overhead, the compression is applied
|
||||||
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
|
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
|
||||||
*/
|
*/
|
||||||
if (clen > 0 && clen < len - overhead) {
|
if (clen > 0 && clen < len - compHdr) {
|
||||||
STransCompMsg* pComp = (STransCompMsg*)msg;
|
STransCompMsg* pComp = (STransCompMsg*)msg;
|
||||||
pComp->reserved = 0;
|
pComp->reserved = 0;
|
||||||
pComp->contLen = htonl(len);
|
pComp->contLen = htonl(len);
|
||||||
memcpy(msg + overhead, buf, clen);
|
memcpy(msg + compHdr, buf, clen);
|
||||||
|
|
||||||
tDebug("compress rpc msg, before:%d, after:%d", len, clen);
|
tDebug("compress rpc msg, before:%d, after:%d", len, clen);
|
||||||
*flen = clen + overhead;
|
ret = clen + compHdr;
|
||||||
succ = true;
|
pHead->comp = 1;
|
||||||
} else {
|
} else {
|
||||||
*flen = len;
|
ret = len;
|
||||||
succ = false;
|
pHead->comp = 0;
|
||||||
}
|
}
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
return succ;
|
return ret;
|
||||||
}
|
}
|
||||||
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) {
|
int32_t transDecompressMsg(char** msg, int32_t len) {
|
||||||
// impl later
|
STransMsgHead* pHead = (STransMsgHead*)(*msg);
|
||||||
return false;
|
if (pHead->comp == 0) return 0;
|
||||||
STransCompMsg* pComp = (STransCompMsg*)msg;
|
|
||||||
|
|
||||||
int overhead = sizeof(STransCompMsg);
|
char* pCont = transContFromHead(pHead);
|
||||||
int clen = 0;
|
STransCompMsg* pComp = (STransCompMsg*)pCont;
|
||||||
return false;
|
int32_t oriLen = htonl(pComp->contLen);
|
||||||
|
|
||||||
|
char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
|
||||||
|
STransMsgHead* pNewHead = (STransMsgHead*)buf;
|
||||||
|
|
||||||
|
int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), pNewHead->content,
|
||||||
|
len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
|
||||||
|
memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
|
||||||
|
|
||||||
|
pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));
|
||||||
|
|
||||||
|
taosMemoryFree(pHead);
|
||||||
|
|
||||||
|
*msg = buf;
|
||||||
|
if (decompLen != oriLen) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void transFreeMsg(void* msg) {
|
void transFreeMsg(void* msg) {
|
||||||
|
|
|
@ -186,16 +186,22 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
|
||||||
static bool uvHandleReq(SSvrConn* pConn) {
|
static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
STrans* pTransInst = pConn->pTransInst;
|
STrans* pTransInst = pConn->pTransInst;
|
||||||
|
|
||||||
STransMsgHead* msg = NULL;
|
STransMsgHead* pHead = NULL;
|
||||||
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
|
|
||||||
|
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead);
|
||||||
if (msgLen <= 0) {
|
if (msgLen <= 0) {
|
||||||
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
|
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
STransMsgHead* pHead = (STransMsgHead*)msg;
|
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
|
||||||
|
tDebug("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
pHead->msgLen = htonl(pHead->msgLen);
|
||||||
|
|
||||||
memcpy(pConn->user, pHead->user, strlen(pHead->user));
|
memcpy(pConn->user, pHead->user, strlen(pHead->user));
|
||||||
|
|
||||||
if (uvRecvReleaseReq(pConn, pHead)) {
|
if (uvRecvReleaseReq(pConn, pHead)) {
|
||||||
|
@ -399,17 +405,22 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
|
|
||||||
pHead->release = smsg->type == Release ? 1 : 0;
|
pHead->release = smsg->type == Release ? 1 : 0;
|
||||||
pHead->code = htonl(pMsg->code);
|
pHead->code = htonl(pMsg->code);
|
||||||
|
pHead->msgLen = htonl(pMsg->contLen + sizeof(STransMsgHead));
|
||||||
|
|
||||||
char* msg = (char*)pHead;
|
char* msg = (char*)pHead;
|
||||||
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
||||||
|
|
||||||
STrans* pTransInst = pConn->pTransInst;
|
STrans* pTransInst = pConn->pTransInst;
|
||||||
|
if (pTransInst->compressSize != -1 && pTransInst->compressSize > pMsg->contLen) {
|
||||||
|
len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
|
||||||
|
pHead->msgLen = (int32_t)htonl((uint32_t)len);
|
||||||
|
}
|
||||||
|
|
||||||
STraceId* trace = &pMsg->info.traceId;
|
STraceId* trace = &pMsg->info.traceId;
|
||||||
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
|
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
|
||||||
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
|
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
|
||||||
pHead->msgLen = htonl(len);
|
|
||||||
|
|
||||||
wb->base = msg;
|
wb->base = (char*)pHead;
|
||||||
wb->len = len;
|
wb->len = len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue