Merge pull request #16191 from taosdata/rpc/fixInvalidPacket
fix invalid packet
This commit is contained in:
commit
46d34959f6
|
@ -98,6 +98,11 @@ typedef void* queue[2];
|
||||||
#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
|
#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
|
||||||
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
|
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
|
||||||
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
|
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
|
||||||
|
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
|
||||||
|
|
||||||
|
#define TRANS_MAGIC_NUM 0x5f375a86
|
||||||
|
|
||||||
|
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
||||||
|
|
||||||
typedef SRpcMsg STransMsg;
|
typedef SRpcMsg STransMsg;
|
||||||
typedef SRpcCtx STransCtx;
|
typedef SRpcCtx STransCtx;
|
||||||
|
@ -151,6 +156,7 @@ typedef struct {
|
||||||
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
|
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
|
||||||
|
|
||||||
char user[TSDB_UNI_LEN];
|
char user[TSDB_UNI_LEN];
|
||||||
|
uint32_t magicNum;
|
||||||
STraceId traceId;
|
STraceId traceId;
|
||||||
uint64_t ahandle; // ahandle assigned by client
|
uint64_t ahandle; // ahandle assigned by client
|
||||||
uint32_t code; // del later
|
uint32_t code; // del later
|
||||||
|
@ -203,6 +209,7 @@ typedef struct SConnBuffer {
|
||||||
int cap;
|
int cap;
|
||||||
int left;
|
int left;
|
||||||
int total;
|
int total;
|
||||||
|
int invalid;
|
||||||
} SConnBuffer;
|
} SConnBuffer;
|
||||||
|
|
||||||
typedef void (*AsyncCB)(uv_async_t* handle);
|
typedef void (*AsyncCB)(uv_async_t* handle);
|
||||||
|
|
|
@ -221,7 +221,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
|
||||||
uError("failed to receive response from %s:%u since %s", server, port, terrstr());
|
uError("failed to receive response from %s:%u since %s", server, port, terrstr());
|
||||||
goto SEND_OVER;
|
goto SEND_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
SEND_OVER:
|
SEND_OVER:
|
||||||
|
|
|
@ -759,6 +759,7 @@ void cliSend(SCliConn* pConn) {
|
||||||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
||||||
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
|
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
|
||||||
pHead->traceId = pMsg->info.traceId;
|
pHead->traceId = pMsg->info.traceId;
|
||||||
|
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
|
|
||||||
|
|
|
@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) {
|
||||||
buf->left = -1;
|
buf->left = -1;
|
||||||
buf->len = 0;
|
buf->len = 0;
|
||||||
buf->total = 0;
|
buf->total = 0;
|
||||||
|
buf->invalid = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int transDestroyBuffer(SConnBuffer* p) {
|
int transDestroyBuffer(SConnBuffer* p) {
|
||||||
|
@ -108,19 +109,25 @@ int transClearBuffer(SConnBuffer* buf) {
|
||||||
p->left = -1;
|
p->left = -1;
|
||||||
p->len = 0;
|
p->len = 0;
|
||||||
p->total = 0;
|
p->total = 0;
|
||||||
|
p->invalid = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
|
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
|
||||||
|
static const int HEADSIZE = sizeof(STransMsgHead);
|
||||||
|
|
||||||
SConnBuffer* p = connBuf;
|
SConnBuffer* p = connBuf;
|
||||||
if (p->left != 0) {
|
if (p->left != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int total = connBuf->total;
|
int total = connBuf->total;
|
||||||
|
if (total >= HEADSIZE && !p->invalid) {
|
||||||
*buf = taosMemoryCalloc(1, total);
|
*buf = taosMemoryCalloc(1, total);
|
||||||
memcpy(*buf, p->buf, total);
|
memcpy(*buf, p->buf, total);
|
||||||
|
|
||||||
transResetBuffer(connBuf);
|
transResetBuffer(connBuf);
|
||||||
|
} else {
|
||||||
|
total = -1;
|
||||||
|
}
|
||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -173,6 +180,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
|
||||||
memcpy((char*)&head, connBuf->buf, sizeof(head));
|
memcpy((char*)&head, connBuf->buf, sizeof(head));
|
||||||
int32_t msgLen = (int32_t)htonl(head.msgLen);
|
int32_t msgLen = (int32_t)htonl(head.msgLen);
|
||||||
p->total = msgLen;
|
p->total = msgLen;
|
||||||
|
p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum));
|
||||||
}
|
}
|
||||||
if (p->total >= p->len) {
|
if (p->total >= p->len) {
|
||||||
p->left = p->total - p->len;
|
p->left = p->total - p->len;
|
||||||
|
@ -180,7 +188,8 @@ bool transReadComplete(SConnBuffer* connBuf) {
|
||||||
p->left = 0;
|
p->left = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return p->left == 0 ? true : false;
|
|
||||||
|
return (p->left == 0 || p->invalid) ? true : false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transSetConnOption(uv_tcp_t* stream) {
|
int transSetConnOption(uv_tcp_t* stream) {
|
||||||
|
|
|
@ -183,11 +183,15 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
|
||||||
tDebug("%p timeout since no activity", conn);
|
tDebug("%p timeout since no activity", conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void uvHandleReq(SSvrConn* pConn) {
|
static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
STransMsgHead* msg = NULL;
|
STrans* pTransInst = pConn->pTransInst;
|
||||||
int msgLen = 0;
|
|
||||||
|
|
||||||
msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
|
STransMsgHead* msg = NULL;
|
||||||
|
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
|
||||||
|
if (msgLen <= 0) {
|
||||||
|
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
STransMsgHead* pHead = (STransMsgHead*)msg;
|
STransMsgHead* pHead = (STransMsgHead*)msg;
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
|
@ -200,9 +204,8 @@ static void uvHandleReq(SSvrConn* pConn) {
|
||||||
// uv_read_stop((uv_stream_t*)pConn->pTcp);
|
// uv_read_stop((uv_stream_t*)pConn->pTcp);
|
||||||
// transRefSrvHandle(pConn);
|
// transRefSrvHandle(pConn);
|
||||||
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
|
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
|
||||||
|
|
||||||
if (uvRecvReleaseReq(pConn, pHead)) {
|
if (uvRecvReleaseReq(pConn, pHead)) {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
STransMsg transMsg;
|
STransMsg transMsg;
|
||||||
|
@ -220,7 +223,6 @@ static void uvHandleReq(SSvrConn* pConn) {
|
||||||
tDebug("conn %p acquired by server app", pConn);
|
tDebug("conn %p acquired by server app", pConn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
STrans* pTransInst = pConn->pTransInst;
|
|
||||||
STraceId* trace = &pHead->traceId;
|
STraceId* trace = &pHead->traceId;
|
||||||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
|
@ -258,21 +260,31 @@ static void uvHandleReq(SSvrConn* pConn) {
|
||||||
transReleaseExHandle(transGetRefMgt(), pConn->refId);
|
transReleaseExHandle(transGetRefMgt(), pConn->refId);
|
||||||
|
|
||||||
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||||
// opt
|
|
||||||
SSvrConn* conn = cli->data;
|
SSvrConn* conn = cli->data;
|
||||||
SConnBuffer* pBuf = &conn->readBuf;
|
|
||||||
STrans* pTransInst = conn->pTransInst;
|
STrans* pTransInst = conn->pTransInst;
|
||||||
|
|
||||||
|
SConnBuffer* pBuf = &conn->readBuf;
|
||||||
if (nread > 0) {
|
if (nread > 0) {
|
||||||
pBuf->len += nread;
|
pBuf->len += nread;
|
||||||
tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread);
|
tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread);
|
||||||
|
if (pBuf->len <= TRANS_PACKET_LIMIT) {
|
||||||
while (transReadComplete(pBuf)) {
|
while (transReadComplete(pBuf)) {
|
||||||
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
|
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
|
||||||
uvHandleReq(conn);
|
if (uvHandleReq(conn) == false) {
|
||||||
|
destroyConn(conn, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
} else {
|
||||||
|
tError("%s conn %p read unexpected packet, exceed limit", transLabel(pTransInst), conn);
|
||||||
|
destroyConn(conn, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (nread == 0) {
|
if (nread == 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -364,6 +376,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
pHead->ahandle = (uint64_t)pMsg->info.ahandle;
|
pHead->ahandle = (uint64_t)pMsg->info.ahandle;
|
||||||
pHead->traceId = pMsg->info.traceId;
|
pHead->traceId = pMsg->info.traceId;
|
||||||
pHead->hasEpSet = pMsg->info.hasEpSet;
|
pHead->hasEpSet = pMsg->info.hasEpSet;
|
||||||
|
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||||
|
|
||||||
if (pConn->status == ConnNormal) {
|
if (pConn->status == ConnNormal) {
|
||||||
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
|
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
|
||||||
|
|
Loading…
Reference in New Issue