From 9eb99615fe58b552ba0ad0222d9e8249b05eb99c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 17:58:46 +0800 Subject: [PATCH 1/6] fix invalid packet --- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/transComm.c | 13 +++++++++---- source/libs/transport/src/transSvr.c | 26 ++++++++++++++++++-------- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 6b52c74271..1efb0bb316 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -98,6 +98,7 @@ typedef void* queue[2]; #define TRANS_RETRY_INTERVAL 15 // retry interval (ms) #define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) +#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 4272ec0b1c..af49e5845b 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -112,15 +112,20 @@ int transClearBuffer(SConnBuffer* buf) { } int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { + static const int HEADSIZE = sizeof(STransMsgHead); + SConnBuffer* p = connBuf; if (p->left != 0) { return -1; } int total = connBuf->total; - *buf = taosMemoryCalloc(1, total); - memcpy(*buf, p->buf, total); - - transResetBuffer(connBuf); + if (total >= HEADSIZE) { + *buf = taosMemoryCalloc(1, total); + memcpy(*buf, p->buf, total); + transResetBuffer(connBuf); + } else { + total = -1; + } return total; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 3512b27bf8..a94eb01beb 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -184,10 +184,15 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { } static void uvHandleReq(SSvrConn* pConn) { - STransMsgHead* msg = NULL; - int msgLen = 0; + STrans* pTransInst = pConn->pTransInst; - msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + STransMsgHead* msg = NULL; + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + if (msgLen <= 0) { + tError("%s conn %p alread read complete packet", transLabel(pTransInst), pConn); + transUnrefSrvHandle(pConn); + return; + } STransMsgHead* pHead = (STransMsgHead*)msg; pHead->code = htonl(pHead->code); @@ -220,7 +225,6 @@ static void uvHandleReq(SSvrConn* pConn) { tDebug("conn %p acquired by server app", pConn); } } - STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pHead->traceId; if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); @@ -268,11 +272,17 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (nread > 0) { pBuf->len += nread; tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); - while (transReadComplete(pBuf)) { - tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - uvHandleReq(conn); + if (pBuf->len <= TRANS_PACKET_LIMIT) { + while (transReadComplete(pBuf)) { + tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); + uvHandleReq(conn); + } + return; + } else { + tError("%s conn %p read unexpected packet, exceed limit", transLabel(pTransInst), conn); + destroyConn(conn, true); + return; } - return; } if (nread == 0) { return; From e016c8f729c7d3677c66616257e2a15c856118d6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 20:04:04 +0800 Subject: [PATCH 2/6] fix invalid packet --- source/libs/transport/inc/transComm.h | 6 ++++++ source/libs/transport/src/transCli.c | 1 + source/libs/transport/src/transComm.c | 8 ++++++-- source/libs/transport/src/transSvr.c | 16 ++++++++++------ 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 1efb0bb316..bc1c6386f6 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -100,6 +100,10 @@ typedef void* queue[2]; #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_PACKET_LIMIT 1024 * 1024 * 512 +#define TRANS_MAGIC_NUM 0x5f375a86 + +#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) + typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; typedef SRpcCtxVal STransCtxVal; @@ -152,6 +156,7 @@ typedef struct { char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset char user[TSDB_UNI_LEN]; + uint32_t magicNum; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later @@ -204,6 +209,7 @@ typedef struct SConnBuffer { int cap; int left; int total; + int invalid; } SConnBuffer; typedef void (*AsyncCB)(uv_async_t* handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index add007e14d..8fdfcd5309 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -759,6 +759,7 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index af49e5845b..3ba8186e9d 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) { buf->left = -1; buf->len = 0; buf->total = 0; + buf->invalid = 0; return 0; } int transDestroyBuffer(SConnBuffer* p) { @@ -108,6 +109,7 @@ int transClearBuffer(SConnBuffer* buf) { p->left = -1; p->len = 0; p->total = 0; + p->invalid = 0; return 0; } @@ -119,7 +121,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { return -1; } int total = connBuf->total; - if (total >= HEADSIZE) { + if (total >= HEADSIZE && !p->invalid) { *buf = taosMemoryCalloc(1, total); memcpy(*buf, p->buf, total); transResetBuffer(connBuf); @@ -178,6 +180,7 @@ bool transReadComplete(SConnBuffer* connBuf) { memcpy((char*)&head, connBuf->buf, sizeof(head)); int32_t msgLen = (int32_t)htonl(head.msgLen); p->total = msgLen; + p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)); } if (p->total >= p->len) { p->left = p->total - p->len; @@ -185,7 +188,8 @@ bool transReadComplete(SConnBuffer* connBuf) { p->left = 0; } } - return p->left == 0 ? true : false; + + return (p->left == 0 || p->invalid) ? true : false; } int transSetConnOption(uv_tcp_t* stream) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a94eb01beb..be6eafba97 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -183,15 +183,14 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { tDebug("%p timeout since no activity", conn); } -static void uvHandleReq(SSvrConn* pConn) { +static bool uvHandleReq(SSvrConn* pConn) { STrans* pTransInst = pConn->pTransInst; STransMsgHead* msg = NULL; int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); if (msgLen <= 0) { - tError("%s conn %p alread read complete packet", transLabel(pTransInst), pConn); - transUnrefSrvHandle(pConn); - return; + tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); + return false; } STransMsgHead* pHead = (STransMsgHead*)msg; @@ -207,7 +206,7 @@ static void uvHandleReq(SSvrConn* pConn) { // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); if (uvRecvReleaseReq(pConn, pHead)) { - return; + return true; } STransMsg transMsg; @@ -262,6 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) { transReleaseExHandle(transGetRefMgt(), pConn->refId); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + return true; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -275,7 +275,10 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (pBuf->len <= TRANS_PACKET_LIMIT) { while (transReadComplete(pBuf)) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - uvHandleReq(conn); + if (uvHandleReq(conn) == false) { + destroyConn(conn, true); + break; + } } return; } else { @@ -374,6 +377,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); if (pConn->status == ConnNormal) { pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); From 4dc5df8453da96264097d857e88211212aa68197 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 10:34:55 +0800 Subject: [PATCH 3/6] fix invalid packet --- source/libs/transport/src/transSvr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index be6eafba97..10063df329 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -277,7 +277,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); if (uvHandleReq(conn) == false) { destroyConn(conn, true); - break; + return; } } return; From f3ec1593bcd85eadff34d141e4825e58314c3b86 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 13:55:56 +0800 Subject: [PATCH 4/6] fix invalid packet --- source/libs/transport/src/transSvr.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 10063df329..753ea00a45 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -265,10 +265,10 @@ static bool uvHandleReq(SSvrConn* pConn) { } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { - // opt - SSvrConn* conn = cli->data; + SSvrConn* conn = cli->data; + STrans* pTransInst = conn->pTransInst; + SConnBuffer* pBuf = &conn->readBuf; - STrans* pTransInst = conn->pTransInst; if (nread > 0) { pBuf->len += nread; tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); From 15110480bc3839889c851b45353f678b2b0ad3bf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 14:22:07 +0800 Subject: [PATCH 5/6] fix invalid packet --- source/libs/transport/src/transSvr.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 753ea00a45..c980b70abd 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -204,7 +204,6 @@ static bool uvHandleReq(SSvrConn* pConn) { // uv_read_stop((uv_stream_t*)pConn->pTcp); // transRefSrvHandle(pConn); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - if (uvRecvReleaseReq(pConn, pHead)) { return true; } From 13a93d68ebdddc3d27ce39fa1962586f044b70aa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 16:09:46 +0800 Subject: [PATCH 6/6] fix invalid packet --- source/libs/transport/src/thttp.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 62277a7569..dc0f9128b0 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -221,7 +221,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 uError("failed to receive response from %s:%u since %s", server, port, terrstr()); goto SEND_OVER; } - code = 0; SEND_OVER: