Merge pull request #17305 from taosdata/feature/rpc_compress

support compress
This commit is contained in:
Shengliang Guan 2022-10-13 13:58:36 +08:00 committed by GitHub
commit bf5c14b64c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 111 additions and 70 deletions

View File

@ -140,7 +140,7 @@ extern int32_t tsTtlPushInterval;
extern int32_t tsGrantHBInterval; extern int32_t tsGrantHBInterval;
extern int32_t tsUptimeInterval; extern int32_t tsUptimeInterval;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) //#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc); const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);

View File

@ -82,6 +82,9 @@ typedef struct SRpcInit {
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int32_t idleTime; // milliseconds, 0 means idle timer is disabled int32_t idleTime; // milliseconds, 0 means idle timer is disabled
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not
// the following is for client app ecurity only // the following is for client app ecurity only
char *user; // user name char *user; // user name
@ -115,10 +118,9 @@ typedef struct {
} SRpcCtx; } SRpcCtx;
int32_t rpcInit(); int32_t rpcInit();
void rpcCleanup();
void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc); void *rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *); void rpcClose(void *);
void rpcCloseImpl(void *); void rpcCloseImpl(void *);
void *rpcMallocCont(int32_t contLen); void *rpcMallocCont(int32_t contLen);

View File

@ -71,7 +71,8 @@ static void deregisterRequest(SRequestObj *pRequest) {
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int64_t duration = taosGetTimestampUs() - pRequest->metric.start; int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%.2f ms, " tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64
" elapsed:%.2f ms, "
"current:%d, app current:%d", "current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
@ -84,7 +85,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
"us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%"PRIx64, "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64,
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd, pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
pRequest->metric.planEnd - pRequest->metric.semanticEnd, pRequest->metric.planEnd - pRequest->metric.semanticEnd,
@ -144,6 +145,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user; rpcInit.user = (char *)user;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.compressSize = tsCompressMsgSize;
void *pDnodeConn = rpcOpen(&rpcInit); void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) { if (pDnodeConn == NULL) {
tscError("failed to init connection to server"); tscError("failed to init connection to server");

View File

@ -1991,6 +1991,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
rpcInit.sessions = 16; rpcInit.sessions = 16;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.user = "_dnd"; rpcInit.user = "_dnd";
clientRpc = rpcOpen(&rpcInit); clientRpc = rpcOpen(&rpcInit);

View File

@ -277,6 +277,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode; rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp; rpcInit.rfp = rpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) { if (pTrans->clientRpc == NULL) {

View File

@ -657,7 +657,8 @@ int32_t udfdOpenClientRpc() {
rpcInit.user = TSDB_DEFAULT_USER; rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.parent = &global; rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp; rpcInit.rfp = udfdRpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
global.clientRpc = rpcOpen(&rpcInit); global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) { if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client"); fnError("failed to init dnode rpc client");

View File

@ -200,15 +200,13 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) #define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) #define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) #define transContFromHead(msg) (((char*)msg) + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead)) #define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); #define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U) #define transIsReq(type) (type & 1U)
#define transLabel(trans) ((STrans*)trans)->label #define transLabel(trans) ((STrans*)trans)->label
void transFreeMsg(void* msg);
//
typedef struct SConnBuffer { typedef struct SConnBuffer {
char* buf; char* buf;
int len; int len;
@ -415,6 +413,10 @@ void transThreadOnce();
void transInit(); void transInit();
void transCleanup(); void transCleanup();
void transFreeMsg(void* msg);
int32_t transCompressMsg(char* msg, int32_t len);
int32_t transDecompressMsg(char** msg, int32_t len);
int32_t transOpenRefMgt(int size, void (*func)(void*)); int32_t transOpenRefMgt(int size, void (*func)(void*));
void transCloseRefMgt(int32_t refMgt); void transCloseRefMgt(int32_t refMgt);
int64_t transAddExHandle(int32_t refMgt, void* p); int64_t transAddExHandle(int32_t refMgt, void* p);

View File

@ -16,9 +16,7 @@
#ifndef _TD_TRANSPORT_INT_H_ #ifndef _TD_TRANSPORT_INT_H_
#define _TD_TRANSPORT_INT_H_ #define _TD_TRANSPORT_INT_H_
#ifdef USE_UV
#include <uv.h> #include <uv.h>
#endif
#include "lz4.h" #include "lz4.h"
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
@ -34,8 +32,6 @@
extern "C" { extern "C" {
#endif #endif
#ifdef USE_UV
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
@ -51,19 +47,20 @@ typedef struct {
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType); bool (*retry)(int32_t code, tmsg_t msgType);
bool (*startTimer)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType);
int index;
int index;
void* parent; void* parent;
void* tcphandle; // returned handle from TCP initialization void* tcphandle; // returned handle from TCP initialization
int64_t refId; int64_t refId;
TdThreadMutex mutex; TdThreadMutex mutex;
} SRpcInfo; } SRpcInfo;
#endif // USE_LIBUV
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -45,6 +45,10 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->label) { if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, TSDB_LABEL_LEN); tstrncpy(pRpc->label, pInit->label, TSDB_LABEL_LEN);
} }
pRpc->compressSize = pInit->compressSize;
pRpc->encryption = pInit->encryption;
// register callback handle // register callback handle
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp; pRpc->retry = pInit->rfp;
@ -130,9 +134,6 @@ void* rpcReallocCont(void* ptr, int32_t contLen) {
return st + TRANS_MSG_OVERHEAD; return st + TRANS_MSG_OVERHEAD;
} }
int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
return transSendRequest(shandle, pEpSet, pMsg, NULL); return transSendRequest(shandle, pEpSet, pMsg, NULL);
} }

View File

@ -319,13 +319,18 @@ void cliHandleResp(SCliConn* conn) {
} }
STransMsgHead* pHead = NULL; STransMsgHead* pHead = NULL;
if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) {
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
if (msgLen <= 0) {
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
return; return;
} }
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
}
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
if (cliRecvReleaseReq(conn, pHead)) { if (cliRecvReleaseReq(conn, pHead)) {
return; return;
} }
@ -374,7 +379,7 @@ void cliHandleResp(SCliConn* conn) {
STraceId* trace = &transMsg.info.traceId; STraceId* trace = &transMsg.info.traceId;
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn, tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code)); TMSG_INFO(pHead->msgType), conn->dst, conn->src, msgLen, tstrerror(transMsg.code));
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
@ -553,7 +558,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->list->size >= 50) { if (conn->list->size >= 50) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn; arg->param1 = conn;
arg->param2 = thrd; arg->param2 = NULL;
STrans* pTransInst = thrd->pTransInst; STrans* pTransInst = thrd->pTransInst;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
@ -772,20 +777,17 @@ void cliSend(SCliConn* pConn) {
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->magicNum = htonl(TRANS_MAGIC_NUM);
STraceId* trace = &pMsg->info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen);
if (pHead->persist == 1) { if (pHead->persist == 1) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
} }
STraceId* trace = &pMsg->info.traceId;
if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) { if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) { if (timer == NULL) {
tDebug("no avaiable timer, create");
timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
tDebug("no available timer, create a timer %p", timer);
uv_timer_init(pThrd->loop, timer); uv_timer_init(pThrd->loop, timer);
} }
timer->data = pConn; timer->data = pConn;
@ -795,6 +797,13 @@ void cliSend(SCliConn* pConn) {
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0); uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
} }
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
@ -1275,17 +1284,13 @@ FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) {
} }
static FORCE_INLINE void doDelayTask(void* param) { static FORCE_INLINE void doDelayTask(void* param) {
STaskArg* arg = param; STaskArg* arg = param;
SCliMsg* pMsg = arg->param1; cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2);
SCliThrd* pThrd = arg->param2;
taosMemoryFree(arg); taosMemoryFree(arg);
cliHandleReq(pMsg, pThrd);
} }
static void doCloseIdleConn(void* param) { static void doCloseIdleConn(void* param) {
STaskArg* arg = param; STaskArg* arg = param;
SCliConn* conn = arg->param1; SCliConn* conn = arg->param1;
SCliThrd* pThrd = arg->param2;
tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
conn->task = NULL; conn->task = NULL;
cliDestroyConn(conn, true); cliDestroyConn(conn, true);

View File

@ -23,52 +23,63 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt; static int32_t refMgt;
static int32_t instMgt; static int32_t instMgt;
bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { int32_t transCompressMsg(char* msg, int32_t len) {
return false; int32_t ret = 0;
// SRpcHead* pHead = rpcHeadFromCont(pCont); int compHdr = sizeof(STransCompMsg);
bool succ = false; STransMsgHead* pHead = transHeadFromCont(msg);
int overhead = sizeof(STransCompMsg);
if (!NEEDTO_COMPRESSS_MSG(len)) {
return succ;
}
char* buf = taosMemoryMalloc(len + overhead + 8); // 8 extra bytes char* buf = taosMemoryMalloc(len + compHdr + 8); // 8 extra bytes
if (buf == NULL) { if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d", len); tError("failed to allocate memory for rpc msg compression, contLen:%d", len);
*flen = len; ret = len;
return succ; return ret;
} }
int32_t clen = LZ4_compress_default(msg, buf, len, len + overhead); int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr);
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, overhead);
/* /*
* only the compressed size is less than the value of contLen - overhead, the compression is applied * only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/ */
if (clen > 0 && clen < len - overhead) { if (clen > 0 && clen < len - compHdr) {
STransCompMsg* pComp = (STransCompMsg*)msg; STransCompMsg* pComp = (STransCompMsg*)msg;
pComp->reserved = 0; pComp->reserved = 0;
pComp->contLen = htonl(len); pComp->contLen = htonl(len);
memcpy(msg + overhead, buf, clen); memcpy(msg + compHdr, buf, clen);
tDebug("compress rpc msg, before:%d, after:%d", len, clen); tDebug("compress rpc msg, before:%d, after:%d", len, clen);
*flen = clen + overhead; ret = clen + compHdr;
succ = true; pHead->comp = 1;
} else { } else {
*flen = len; ret = len;
succ = false; pHead->comp = 0;
} }
taosMemoryFree(buf); taosMemoryFree(buf);
return succ; return ret;
} }
bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) { int32_t transDecompressMsg(char** msg, int32_t len) {
// impl later STransMsgHead* pHead = (STransMsgHead*)(*msg);
return false; if (pHead->comp == 0) return 0;
STransCompMsg* pComp = (STransCompMsg*)msg;
int overhead = sizeof(STransCompMsg); char* pCont = transContFromHead(pHead);
int clen = 0; STransCompMsg* pComp = (STransCompMsg*)pCont;
return false; int32_t oriLen = htonl(pComp->contLen);
char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead));
STransMsgHead* pNewHead = (STransMsgHead*)buf;
int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), pNewHead->content,
len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen);
memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead));
pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead));
taosMemoryFree(pHead);
*msg = buf;
if (decompLen != oriLen) {
return -1;
}
return 0;
} }
void transFreeMsg(void* msg) { void transFreeMsg(void* msg) {

View File

@ -186,16 +186,22 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
static bool uvHandleReq(SSvrConn* pConn) { static bool uvHandleReq(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
STransMsgHead* msg = NULL; STransMsgHead* pHead = NULL;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead);
if (msgLen <= 0) { if (msgLen <= 0) {
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
return false; return false;
} }
STransMsgHead* pHead = (STransMsgHead*)msg; if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
return false;
}
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
memcpy(pConn->user, pHead->user, strlen(pHead->user)); memcpy(pConn->user, pHead->user, strlen(pHead->user));
if (uvRecvReleaseReq(pConn, pHead)) { if (uvRecvReleaseReq(pConn, pHead)) {
@ -229,10 +235,10 @@ static bool uvHandleReq(SSvrConn* pConn) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d", transLabel(pTransInst), pConn, tGDebug("%s conn %p %s received from %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen); TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen);
} else { } else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d", transLabel(pTransInst), pConn, tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen, pHead->noResp, transMsg.code); TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, transMsg.code);
} }
// pHead->noResp = 1, // pHead->noResp = 1,
@ -399,17 +405,22 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->release = smsg->type == Release ? 1 : 0; pHead->release = smsg->type == Release ? 1 : 0;
pHead->code = htonl(pMsg->code); pHead->code = htonl(pMsg->code);
pHead->msgLen = htonl(pMsg->contLen + sizeof(STransMsgHead));
char* msg = (char*)pHead; char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen); int32_t len = transMsgLenFromCont(pMsg->contLen);
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
pHead->msgLen = (int32_t)htonl((uint32_t)len);
}
STraceId* trace = &pMsg->info.traceId; STraceId* trace = &pMsg->info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn, tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen); TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len);
pHead->msgLen = htonl(len);
wb->base = msg; wb->base = (char*)pHead;
wb->len = len; wb->len = len;
} }
@ -1160,6 +1171,11 @@ _return2:
return -1; return -1;
} }
int transSendResponse(const STransMsg* msg) { int transSendResponse(const STransMsg* msg) {
if (msg->info.noResp) {
rpcFreeCont(msg->pCont);
tTrace("no need send resp");
return 0;
}
SExHandle* exh = msg->info.handle; SExHandle* exh = msg->info.handle;
int64_t refId = msg->info.refId; int64_t refId = msg->info.refId;
ASYNC_CHECK_HANDLE(exh, refId); ASYNC_CHECK_HANDLE(exh, refId);
@ -1198,6 +1214,8 @@ int transRegisterMsg(const STransMsg* msg) {
ASYNC_CHECK_HANDLE(exh, refId); ASYNC_CHECK_HANDLE(exh, refId);
STransMsg tmsg = *msg; STransMsg tmsg = *msg;
tmsg.info.noResp = 1;
tmsg.info.refId = refId; tmsg.info.refId = refId;
SWorkThrd* pThrd = exh->pThrd; SWorkThrd* pThrd = exh->pThrd;