From 4507776c8e5e38e56c72f7447f3f71640f26563a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Oct 2022 22:09:35 +0800 Subject: [PATCH 1/5] support compress --- include/common/tglobal.h | 2 +- include/libs/transport/trpc.h | 6 ++- source/libs/transport/inc/transComm.h | 8 +-- source/libs/transport/inc/transportInt.h | 11 ++-- source/libs/transport/src/trans.c | 7 +-- source/libs/transport/src/transCli.c | 31 ++++++----- source/libs/transport/src/transComm.c | 66 ++++++++++++++---------- source/libs/transport/src/transSvr.c | 23 ++++++--- 8 files changed, 92 insertions(+), 62 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index bd5e74387e..6db2ffb7ea 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -139,7 +139,7 @@ extern int32_t tsTtlPushInterval; extern int32_t tsGrantHBInterval; extern int32_t tsUptimeInterval; -#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) +//#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 655c903c0b..f8a08288de 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -82,6 +82,9 @@ typedef struct SRpcInit { int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int32_t idleTime; // milliseconds, 0 means idle timer is disabled + const int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size + const int8_t encryption; // encrypt or not + // the following is for client app ecurity only char *user; // user name @@ -115,10 +118,9 @@ typedef struct { } SRpcCtx; int32_t rpcInit(); +void rpcCleanup(); -void rpcCleanup(); void *rpcOpen(const SRpcInit *pRpc); - void rpcClose(void *); void rpcCloseImpl(void *); void *rpcMallocCont(int32_t contLen); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index c6f3066be7..b83f84e3f2 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -200,15 +200,13 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co #define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) #define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) -#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) +#define transContFromHead(msg) (((char*)msg) + sizeof(STransMsgHead)) #define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead)) #define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); #define transIsReq(type) (type & 1U) #define transLabel(trans) ((STrans*)trans)->label -void transFreeMsg(void* msg); -// typedef struct SConnBuffer { char* buf; int len; @@ -415,6 +413,10 @@ void transThreadOnce(); void transInit(); void transCleanup(); +void transFreeMsg(void* msg); +int32_t transCompressMsg(char* msg, int32_t len); +int32_t transDecompressMsg(char** msg, int32_t len); + int32_t transOpenRefMgt(int size, void (*func)(void*)); void transCloseRefMgt(int32_t refMgt); int64_t transAddExHandle(int32_t refMgt, void* p); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 6aeeffa192..b9167501e2 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -16,9 +16,7 @@ #ifndef _TD_TRANSPORT_INT_H_ #define _TD_TRANSPORT_INT_H_ -#ifdef USE_UV #include -#endif #include "lz4.h" #include "os.h" #include "taoserror.h" @@ -34,8 +32,6 @@ extern "C" { #endif -#ifdef USE_UV - void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); @@ -51,19 +47,20 @@ typedef struct { char label[TSDB_LABEL_LEN]; char user[TSDB_UNI_LEN]; // meter ID + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size + int8_t encryption; // encrypt or not + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType); - int index; + int index; void* parent; void* tcphandle; // returned handle from TCP initialization int64_t refId; TdThreadMutex mutex; } SRpcInfo; -#endif // USE_LIBUV - #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 9e0a8f2a10..e5a783292d 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -45,6 +45,10 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->label) { tstrncpy(pRpc->label, pInit->label, TSDB_LABEL_LEN); } + + pRpc->compressSize = pInit->compressSize; + pRpc->encryption = pInit->encryption; + // register callback handle pRpc->cfp = pInit->cfp; pRpc->retry = pInit->rfp; @@ -130,9 +134,6 @@ void* rpcReallocCont(void* ptr, int32_t contLen) { return st + TRANS_MSG_OVERHEAD; } -int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; } -void rpcCancelRequest(int64_t rid) { return; } - int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { return transSendRequest(shandle, pEpSet, pMsg, NULL); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index bd99a23267..819eb51f1f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -319,13 +319,18 @@ void cliHandleResp(SCliConn* conn) { } STransMsgHead* pHead = NULL; - if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) { + + int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead); + if (msgLen <= 0) { tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); return; } + + if (transDecompressMsg((char**)&pHead, msgLen) < 0) { + tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); + } pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); - if (cliRecvReleaseReq(conn, pHead)) { return; } @@ -553,7 +558,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->list->size >= 50) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; - arg->param2 = thrd; + arg->param2 = NULL; STrans* pTransInst = thrd->pTransInst; conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); @@ -772,20 +777,19 @@ void cliSend(SCliConn* pConn) { memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); + if (pHead->persist == 1) { + CONN_SET_PERSIST_BY_APP(pConn); + } STraceId* trace = &pMsg->info.traceId; tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen); - if (pHead->persist == 1) { - CONN_SET_PERSIST_BY_APP(pConn); - } - if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) { uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { - tDebug("no avaiable timer, create"); timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + tDebug("no available timer, create a timer %p", timer); uv_timer_init(pThrd->loop, timer); } timer->data = pConn; @@ -795,6 +799,11 @@ void cliSend(SCliConn* pConn) { uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0); } + if (pTransInst->compressSize != -1 && pTransInst->compressSize > pMsg->contLen) { + msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + } + uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); @@ -1275,17 +1284,13 @@ FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst) { } static FORCE_INLINE void doDelayTask(void* param) { STaskArg* arg = param; - SCliMsg* pMsg = arg->param1; - SCliThrd* pThrd = arg->param2; + cliHandleReq((SCliMsg*)arg->param1, (SCliThrd*)arg->param2); taosMemoryFree(arg); - - cliHandleReq(pMsg, pThrd); } static void doCloseIdleConn(void* param) { STaskArg* arg = param; SCliConn* conn = arg->param1; - SCliThrd* pThrd = arg->param2; tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn); conn->task = NULL; cliDestroyConn(conn, true); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index e86314d4bd..eaa2e6c661 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -23,52 +23,64 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static int32_t refMgt; static int32_t instMgt; -bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { - return false; - // SRpcHead* pHead = rpcHeadFromCont(pCont); - bool succ = false; - int overhead = sizeof(STransCompMsg); - if (!NEEDTO_COMPRESSS_MSG(len)) { - return succ; - } +int32_t transCompressMsg(char* msg, int32_t len) { + int32_t ret = 0; + int compHdr = sizeof(STransCompMsg); + STransMsgHead* pHead = transHeadFromCont(msg); - char* buf = taosMemoryMalloc(len + overhead + 8); // 8 extra bytes + char* buf = taosMemoryMalloc(len + compHdr + 8); // 8 extra bytes if (buf == NULL) { tError("failed to allocate memory for rpc msg compression, contLen:%d", len); - *flen = len; - return succ; + ret = len; + return ret; } - int32_t clen = LZ4_compress_default(msg, buf, len, len + overhead); - tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, overhead); + int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr); + tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, compHdr); /* * only the compressed size is less than the value of contLen - overhead, the compression is applied * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message */ - if (clen > 0 && clen < len - overhead) { + if (clen > 0 && clen < len - compHdr) { STransCompMsg* pComp = (STransCompMsg*)msg; pComp->reserved = 0; pComp->contLen = htonl(len); - memcpy(msg + overhead, buf, clen); + memcpy(msg + compHdr, buf, clen); tDebug("compress rpc msg, before:%d, after:%d", len, clen); - *flen = clen + overhead; - succ = true; + ret = clen + compHdr; + pHead->comp = 1; } else { - *flen = len; - succ = false; + ret = len; + pHead->comp = 0; } taosMemoryFree(buf); - return succ; + return ret; } -bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) { - // impl later - return false; - STransCompMsg* pComp = (STransCompMsg*)msg; +int32_t transDecompressMsg(char** msg, int32_t len) { + STransMsgHead* pHead = (STransMsgHead*)(*msg); + if (pHead->comp == 0) return 0; - int overhead = sizeof(STransCompMsg); - int clen = 0; - return false; + char* pCont = transContFromHead(pHead); + STransCompMsg* pComp = (STransCompMsg*)pCont; + int32_t oriLen = htonl(pComp->contLen); + + char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead)); + STransMsgHead* pNewHead = (STransMsgHead*)buf; + + int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), pNewHead->content, + len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen); + memcpy((char*)pNewHead, (char*)pHead, sizeof(STransMsgHead)); + + pNewHead->msgLen = htonl(oriLen + sizeof(STransMsgHead)); + + taosMemoryFree(pHead); + + *msg = buf; + if (decompLen != oriLen) { + return -1; + } + return 0; } void transFreeMsg(void* msg) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index c63f8f398b..99855d7557 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -186,16 +186,22 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { static bool uvHandleReq(SSvrConn* pConn) { STrans* pTransInst = pConn->pTransInst; - STransMsgHead* msg = NULL; - int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + STransMsgHead* pHead = NULL; + + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead); if (msgLen <= 0) { tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); return false; } - STransMsgHead* pHead = (STransMsgHead*)msg; + if (transDecompressMsg((char**)&pHead, msgLen) < 0) { + tDebug("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); + return false; + } + pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + memcpy(pConn->user, pHead->user, strlen(pHead->user)); if (uvRecvReleaseReq(pConn, pHead)) { @@ -399,17 +405,22 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->release = smsg->type == Release ? 1 : 0; pHead->code = htonl(pMsg->code); + pHead->msgLen = htonl(pMsg->contLen + sizeof(STransMsgHead)); char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); - STrans* pTransInst = pConn->pTransInst; + STrans* pTransInst = pConn->pTransInst; + if (pTransInst->compressSize != -1 && pTransInst->compressSize > pMsg->contLen) { + len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); + pHead->msgLen = (int32_t)htonl((uint32_t)len); + } + STraceId* trace = &pMsg->info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn, TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen); - pHead->msgLen = htonl(len); - wb->base = msg; + wb->base = (char*)pHead; wb->len = len; } From 37a0b9c759d8c676be100b639f96963e434f90aa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Oct 2022 22:18:44 +0800 Subject: [PATCH 2/5] support compress --- include/libs/transport/trpc.h | 4 ++-- source/client/src/clientEnv.c | 6 ++++-- source/client/src/clientImpl.c | 9 +++++---- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 1 + source/libs/function/src/udfd.c | 3 ++- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index f8a08288de..56c5750475 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -82,8 +82,8 @@ typedef struct SRpcInit { int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int32_t idleTime; // milliseconds, 0 means idle timer is disabled - const int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size - const int8_t encryption; // encrypt or not + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size + int8_t encryption; // encrypt or not // the following is for client app ecurity only char *user; // user name diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 5792f498ef..fc489f08af 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -71,7 +71,8 @@ static void deregisterRequest(SRequestObj *pRequest) { int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int64_t duration = taosGetTimestampUs() - pRequest->metric.start; - tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%.2f ms, " + tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 + " elapsed:%.2f ms, " "current:%d, app current:%d", pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); @@ -84,7 +85,7 @@ static void deregisterRequest(SRequestObj *pRequest) { atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 - "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%"PRIx64, + "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64, duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.ctgEnd, pRequest->metric.planEnd - pRequest->metric.semanticEnd, @@ -144,6 +145,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.user = (char *)user; rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.compressSize = tsCompressMsgSize; void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { tscError("failed to init connection to server"); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 8ffc88ec28..ec5c39062e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -868,10 +868,10 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { return code; } -//todo refacto the error code mgmt +// todo refacto the error code mgmt void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { SRequestObj* pRequest = (SRequestObj*)param; - STscObj* pTscObj = pRequest->pTscObj; + STscObj* pTscObj = pRequest->pTscObj; pRequest->code = code; if (pResult) { @@ -899,8 +899,8 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { pRequest->requestId); if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) { - tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, - pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId); + tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, + tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->prevCode = code; schedulerFreeJob(&pRequest->body.queryJob, 0); qDestroyQuery(pRequest->pQuery); @@ -1970,6 +1970,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de rpcInit.sessions = 16; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.compressSize = tsCompressMsgSize; rpcInit.user = "_dnd"; clientRpc = rpcOpen(&rpcInit); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index e83f1f7cab..542baaec09 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -277,6 +277,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.parent = pDnode; rpcInit.rfp = rpcRfp; + rpcInit.compressSize = tsCompressMsgSize; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 14e358ea00..a319b5a25b 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -657,7 +657,8 @@ int32_t udfdOpenClientRpc() { rpcInit.user = TSDB_DEFAULT_USER; rpcInit.parent = &global; rpcInit.rfp = udfdRpcRfp; - + rpcInit.compressSize = tsCompressMsgSize; + global.clientRpc = rpcOpen(&rpcInit); if (global.clientRpc == NULL) { fnError("failed to init dnode rpc client"); From 841207666a28467eb9f2a8ba5434dbd1180a579a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 12 Oct 2022 10:20:01 +0800 Subject: [PATCH 3/5] support compress --- source/libs/transport/src/transCli.c | 8 ++++---- source/libs/transport/src/transComm.c | 1 - source/libs/transport/src/transSvr.c | 8 ++++---- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 819eb51f1f..2e33b42609 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -379,7 +379,7 @@ void cliHandleResp(SCliConn* conn) { STraceId* trace = &transMsg.info.traceId; tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn, - TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, tstrerror(transMsg.code)); + TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code)); if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); @@ -782,8 +782,6 @@ void cliSend(SCliConn* pConn) { } STraceId* trace = &pMsg->info.traceId; - tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn, - TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen); if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) { uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; @@ -799,10 +797,12 @@ void cliSend(SCliConn* pConn) { uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0); } - if (pTransInst->compressSize != -1 && pTransInst->compressSize > pMsg->contLen) { + if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); } + tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index eaa2e6c661..42079ff005 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -36,7 +36,6 @@ int32_t transCompressMsg(char* msg, int32_t len) { } int32_t clen = LZ4_compress_default(msg, buf, len, len + compHdr); - tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", len, clen, compHdr); /* * only the compressed size is less than the value of contLen - overhead, the compression is applied * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 99855d7557..42204c71f6 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -235,10 +235,10 @@ static bool uvHandleReq(SSvrConn* pConn) { transRefSrvHandle(pConn); tGDebug("%s conn %p %s received from %s, local info:%s, len:%d", transLabel(pTransInst), pConn, - TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen); + TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, pHead->msgLen); } else { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d", transLabel(pTransInst), pConn, - TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen, pHead->noResp, transMsg.code); + TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, pHead->msgLen, pHead->noResp, transMsg.code); } // pHead->noResp = 1, @@ -411,14 +411,14 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { int32_t len = transMsgLenFromCont(pMsg->contLen); STrans* pTransInst = pConn->pTransInst; - if (pTransInst->compressSize != -1 && pTransInst->compressSize > pMsg->contLen) { + if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { len = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); pHead->msgLen = (int32_t)htonl((uint32_t)len); } STraceId* trace = &pMsg->info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, len:%d", transLabel(pTransInst), pConn, - TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, pMsg->contLen); + TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len); wb->base = (char*)pHead; wb->len = len; From 5badb8f05689d384e992988e6e95a7b1db2fdc3d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 12 Oct 2022 11:38:07 +0800 Subject: [PATCH 4/5] fix(tsc): fix invalid free --- source/libs/transport/src/transCli.c | 2 +- source/libs/transport/src/transSvr.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2e33b42609..83323725be 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -379,7 +379,7 @@ void cliHandleResp(SCliConn* conn) { STraceId* trace = &transMsg.info.traceId; tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn, - TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code)); + TMSG_INFO(pHead->msgType), conn->dst, conn->src, msgLen, tstrerror(transMsg.code)); if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 42204c71f6..683d488bbf 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -235,10 +235,10 @@ static bool uvHandleReq(SSvrConn* pConn) { transRefSrvHandle(pConn); tGDebug("%s conn %p %s received from %s, local info:%s, len:%d", transLabel(pTransInst), pConn, - TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, pHead->msgLen); + TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen); } else { tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, resp:%d, code:%d", transLabel(pTransInst), pConn, - TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, pHead->msgLen, pHead->noResp, transMsg.code); + TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, transMsg.code); } // pHead->noResp = 1, From 2bd62bc586f005ed402b267476be86ae7e4a81f1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 12 Oct 2022 16:05:59 +0800 Subject: [PATCH 5/5] fix: [TD-19500] del duplicate msg --- source/libs/transport/src/transSvr.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 683d488bbf..57bf8a34b3 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1171,6 +1171,11 @@ _return2: return -1; } int transSendResponse(const STransMsg* msg) { + if (msg->info.noResp) { + rpcFreeCont(msg->pCont); + tTrace("no need send resp"); + return 0; + } SExHandle* exh = msg->info.handle; int64_t refId = msg->info.refId; ASYNC_CHECK_HANDLE(exh, refId); @@ -1209,6 +1214,8 @@ int transRegisterMsg(const STransMsg* msg) { ASYNC_CHECK_HANDLE(exh, refId); STransMsg tmsg = *msg; + tmsg.info.noResp = 1; + tmsg.info.refId = refId; SWorkThrd* pThrd = exh->pThrd;