diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index bc1c6386f6..c6f3066be7 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -104,6 +104,12 @@ typedef void* queue[2]; #define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) +#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 + +#define TRANS_MAGIC_NUM 0x5f375a86 + +#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) + typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; typedef SRpcCtxVal STransCtxVal; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8fdfcd5309..763483cbf6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -318,10 +318,17 @@ void cliHandleResp(SCliConn* conn) { } STransMsgHead* pHead = NULL; - transDumpFromBuffer(&conn->readBuf, (char**)&pHead); + if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) { + tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); + return; + } pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + if (cliRecvReleaseReq(conn, pHead)) { + return; + } + STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = transContFromHead((char*)pHead); @@ -333,10 +340,6 @@ void cliHandleResp(SCliConn* conn) { SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; - if (cliRecvReleaseReq(conn, pHead)) { - return; - } - if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); @@ -598,7 +601,12 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { pBuf->len += nread; while (transReadComplete(pBuf)) { tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); - cliHandleResp(conn); + if (pBuf->invalid) { + cliHandleExcept(conn); + break; + } else { + cliHandleResp(conn); + } } return; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 3ba8186e9d..a4d679b281 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -188,7 +188,6 @@ bool transReadComplete(SConnBuffer* connBuf) { p->left = 0; } } - return (p->left == 0 || p->invalid) ? true : false; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index c980b70abd..db05aefe7b 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -198,15 +198,16 @@ static bool uvHandleReq(SSvrConn* pConn) { pHead->msgLen = htonl(pHead->msgLen); memcpy(pConn->user, pHead->user, strlen(pHead->user)); + if (uvRecvReleaseReq(pConn, pHead)) { + return true; + } + // TODO(dengyihao): time-consuming task throwed into BG Thread // uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t)); // wreq->data = pConn; // uv_read_stop((uv_stream_t*)pConn->pTcp); // transRefSrvHandle(pConn); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - if (uvRecvReleaseReq(pConn, pHead)) { - return true; - } STransMsg transMsg; memset(&transMsg, 0, sizeof(transMsg)); @@ -274,14 +275,16 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (pBuf->len <= TRANS_PACKET_LIMIT) { while (transReadComplete(pBuf)) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - if (uvHandleReq(conn) == false) { + if (pBuf->invalid) { + tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn); destroyConn(conn, true); return; + } else { + if (false == uvHandleReq(conn)) break; } } return; } else { - tError("%s conn %p read unexpected packet, exceed limit", transLabel(pTransInst), conn); destroyConn(conn, true); return; } @@ -872,6 +875,7 @@ static int reallocConnRef(SSvrConn* conn) { } static void uvDestroyConn(uv_handle_t* handle) { SSvrConn* conn = handle->data; + if (conn == NULL) { return; } @@ -887,9 +891,8 @@ static void uvDestroyConn(uv_handle_t* handle) { SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); destroySmsg(msg); } - - transReqQueueClear(&conn->wreqQueue); transQueueDestroy(&conn->srvMsgs); + transReqQueueClear(&conn->wreqQueue); QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp);