fix invalid packet

This commit is contained in:
yihaoDeng 2022-08-17 17:58:46 +08:00
parent b694c2e490
commit 9eb99615fe
3 changed files with 28 additions and 12 deletions

View File

@ -98,6 +98,7 @@ typedef void* queue[2];
#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) #define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
typedef SRpcMsg STransMsg; typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx; typedef SRpcCtx STransCtx;

View File

@ -112,15 +112,20 @@ int transClearBuffer(SConnBuffer* buf) {
} }
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
static const int HEADSIZE = sizeof(STransMsgHead);
SConnBuffer* p = connBuf; SConnBuffer* p = connBuf;
if (p->left != 0) { if (p->left != 0) {
return -1; return -1;
} }
int total = connBuf->total; int total = connBuf->total;
if (total >= HEADSIZE) {
*buf = taosMemoryCalloc(1, total); *buf = taosMemoryCalloc(1, total);
memcpy(*buf, p->buf, total); memcpy(*buf, p->buf, total);
transResetBuffer(connBuf); transResetBuffer(connBuf);
} else {
total = -1;
}
return total; return total;
} }

View File

@ -184,10 +184,15 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
} }
static void uvHandleReq(SSvrConn* pConn) { static void uvHandleReq(SSvrConn* pConn) {
STransMsgHead* msg = NULL; STrans* pTransInst = pConn->pTransInst;
int msgLen = 0;
msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); STransMsgHead* msg = NULL;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
if (msgLen <= 0) {
tError("%s conn %p alread read complete packet", transLabel(pTransInst), pConn);
transUnrefSrvHandle(pConn);
return;
}
STransMsgHead* pHead = (STransMsgHead*)msg; STransMsgHead* pHead = (STransMsgHead*)msg;
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
@ -220,7 +225,6 @@ static void uvHandleReq(SSvrConn* pConn) {
tDebug("conn %p acquired by server app", pConn); tDebug("conn %p acquired by server app", pConn);
} }
} }
STrans* pTransInst = pConn->pTransInst;
STraceId* trace = &pHead->traceId; STraceId* trace = &pHead->traceId;
if (pConn->status == ConnNormal && pHead->noResp == 0) { if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
@ -268,11 +272,17 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread);
if (pBuf->len <= TRANS_PACKET_LIMIT) {
while (transReadComplete(pBuf)) { while (transReadComplete(pBuf)) {
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
uvHandleReq(conn); uvHandleReq(conn);
} }
return; return;
} else {
tError("%s conn %p read unexpected packet, exceed limit", transLabel(pTransInst), conn);
destroyConn(conn, true);
return;
}
} }
if (nread == 0) { if (nread == 0) {
return; return;