fix invalid packet
This commit is contained in:
parent
851aa0b026
commit
c6f60ba3d7
|
@ -99,6 +99,12 @@ typedef void* queue[2];
|
|||
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
|
||||
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
|
||||
|
||||
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
|
||||
|
||||
#define TRANS_MAGIC_NUM 0x5f375a86
|
||||
|
||||
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
||||
|
||||
typedef SRpcMsg STransMsg;
|
||||
typedef SRpcCtx STransCtx;
|
||||
typedef SRpcCtxVal STransCtxVal;
|
||||
|
@ -151,6 +157,7 @@ typedef struct {
|
|||
char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset
|
||||
|
||||
char user[TSDB_UNI_LEN];
|
||||
uint32_t magicNum;
|
||||
STraceId traceId;
|
||||
uint64_t ahandle; // ahandle assigned by client
|
||||
uint32_t code; // del later
|
||||
|
@ -203,6 +210,7 @@ typedef struct SConnBuffer {
|
|||
int cap;
|
||||
int left;
|
||||
int total;
|
||||
int invalid;
|
||||
} SConnBuffer;
|
||||
|
||||
typedef void (*AsyncCB)(uv_async_t* handle);
|
||||
|
|
|
@ -127,6 +127,8 @@ static void cliAsyncCb(uv_async_t* handle);
|
|||
static void cliIdleCb(uv_idle_t* handle);
|
||||
static void cliPrepareCb(uv_prepare_t* handle);
|
||||
|
||||
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
|
||||
|
||||
static int32_t allocConnRef(SCliConn* conn, bool update);
|
||||
|
||||
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
|
||||
|
@ -211,28 +213,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
|
||||
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
|
||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
||||
do { \
|
||||
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
|
||||
uint64_t ahandle = head->ahandle; \
|
||||
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
|
||||
transClearBuffer(&conn->readBuf); \
|
||||
transFreeMsg(transContFromHead((char*)head)); \
|
||||
if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \
|
||||
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \
|
||||
if (cliMsg->type == Release) return; \
|
||||
} \
|
||||
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); \
|
||||
if (T_REF_VAL_GET(conn) > 1) { \
|
||||
transUnrefCliHandle(conn); \
|
||||
} \
|
||||
destroyCmsg(pMsg); \
|
||||
cliReleaseUnfinishedMsg(conn); \
|
||||
transQueueClear(&conn->cliMsgs); \
|
||||
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \
|
||||
return; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
|
||||
do { \
|
||||
|
@ -346,10 +326,17 @@ void cliHandleResp(SCliConn* conn) {
|
|||
}
|
||||
|
||||
STransMsgHead* pHead = NULL;
|
||||
transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
|
||||
if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) {
|
||||
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
||||
return;
|
||||
}
|
||||
pHead->code = htonl(pHead->code);
|
||||
pHead->msgLen = htonl(pHead->msgLen);
|
||||
|
||||
if (cliRecvReleaseReq(conn, pHead)) {
|
||||
return;
|
||||
}
|
||||
|
||||
STransMsg transMsg = {0};
|
||||
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
||||
transMsg.pCont = transContFromHead((char*)pHead);
|
||||
|
@ -361,7 +348,6 @@ void cliHandleResp(SCliConn* conn) {
|
|||
|
||||
SCliMsg* pMsg = NULL;
|
||||
STransConnCtx* pCtx = NULL;
|
||||
CONN_SHOULD_RELEASE(conn, pHead);
|
||||
|
||||
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
pMsg = transQueuePop(&conn->cliMsgs);
|
||||
|
@ -625,7 +611,12 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
|||
pBuf->len += nread;
|
||||
while (transReadComplete(pBuf)) {
|
||||
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
||||
cliHandleResp(conn);
|
||||
if (pBuf->invalid) {
|
||||
cliHandleExcept(conn);
|
||||
break;
|
||||
} else {
|
||||
cliHandleResp(conn);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -786,6 +777,7 @@ void cliSend(SCliConn* pConn) {
|
|||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
||||
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
|
||||
pHead->traceId = pMsg->info.traceId;
|
||||
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||
|
||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||
|
||||
|
@ -1053,6 +1045,30 @@ static void cliPrepareCb(uv_prepare_t* handle) {
|
|||
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
|
||||
}
|
||||
|
||||
bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
|
||||
if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
|
||||
uint64_t ahandle = pHead->ahandle;
|
||||
SCliMsg* pMsg = NULL;
|
||||
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
|
||||
transClearBuffer(&conn->readBuf);
|
||||
transFreeMsg(transContFromHead((char*)pHead));
|
||||
if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) {
|
||||
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0);
|
||||
if (cliMsg->type == Release) return true;
|
||||
}
|
||||
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId);
|
||||
if (T_REF_VAL_GET(conn) > 1) {
|
||||
transUnrefCliHandle(conn);
|
||||
}
|
||||
destroyCmsg(pMsg);
|
||||
cliReleaseUnfinishedMsg(conn);
|
||||
transQueueClear(&conn->cliMsgs);
|
||||
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static void* cliWorkThread(void* arg) {
|
||||
SCliThrd* pThrd = (SCliThrd*)arg;
|
||||
pThrd->pid = taosGetSelfPthreadId();
|
||||
|
|
|
@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) {
|
|||
buf->left = -1;
|
||||
buf->len = 0;
|
||||
buf->total = 0;
|
||||
buf->invalid = 0;
|
||||
return 0;
|
||||
}
|
||||
int transDestroyBuffer(SConnBuffer* p) {
|
||||
|
@ -108,19 +109,24 @@ int transClearBuffer(SConnBuffer* buf) {
|
|||
p->left = -1;
|
||||
p->len = 0;
|
||||
p->total = 0;
|
||||
p->invalid = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
|
||||
SConnBuffer* p = connBuf;
|
||||
static const int HEADSIZE = sizeof(STransMsgHead);
|
||||
SConnBuffer* p = connBuf;
|
||||
if (p->left != 0) {
|
||||
return -1;
|
||||
}
|
||||
int total = connBuf->total;
|
||||
*buf = taosMemoryCalloc(1, total);
|
||||
memcpy(*buf, p->buf, total);
|
||||
|
||||
transResetBuffer(connBuf);
|
||||
if (total >= HEADSIZE && !p->invalid) {
|
||||
*buf = taosMemoryCalloc(1, total);
|
||||
memcpy(*buf, p->buf, total);
|
||||
transResetBuffer(connBuf);
|
||||
} else {
|
||||
total = -1;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
|
@ -173,6 +179,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
|
|||
memcpy((char*)&head, connBuf->buf, sizeof(head));
|
||||
int32_t msgLen = (int32_t)htonl(head.msgLen);
|
||||
p->total = msgLen;
|
||||
p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum));
|
||||
}
|
||||
if (p->total >= p->len) {
|
||||
p->left = p->total - p->len;
|
||||
|
@ -180,7 +187,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
|
|||
p->left = 0;
|
||||
}
|
||||
}
|
||||
return p->left == 0 ? true : false;
|
||||
return (p->left == 0 || p->invalid) ? true : false;
|
||||
}
|
||||
|
||||
int transSetConnOption(uv_tcp_t* stream) {
|
||||
|
|
|
@ -114,6 +114,8 @@ static void uvAcceptAsyncCb(uv_async_t* handle);
|
|||
static void uvShutDownCb(uv_shutdown_t* req, int status);
|
||||
static void uvPrepareCb(uv_prepare_t* handle);
|
||||
|
||||
static bool uvRecvReleaseReq(SSvrConn* conn, STransMsgHead* pHead);
|
||||
|
||||
/*
|
||||
* time-consuming task throwed into BG work thread
|
||||
*/
|
||||
|
@ -154,37 +156,6 @@ static void* transAcceptThread(void* arg);
|
|||
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
|
||||
static bool addHandleToAcceptloop(void* arg);
|
||||
|
||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
||||
do { \
|
||||
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
|
||||
reallocConnRef(conn); \
|
||||
tTrace("conn %p received release request", conn); \
|
||||
\
|
||||
STraceId traceId = head->traceId; \
|
||||
conn->status = ConnRelease; \
|
||||
transClearBuffer(&conn->readBuf); \
|
||||
transFreeMsg(transContFromHead((char*)head)); \
|
||||
\
|
||||
STransMsg tmsg = { \
|
||||
.code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; \
|
||||
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
|
||||
srvMsg->msg = tmsg; \
|
||||
srvMsg->type = Release; \
|
||||
srvMsg->pConn = conn; \
|
||||
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
|
||||
return; \
|
||||
} \
|
||||
if (conn->regArg.init) { \
|
||||
tTrace("conn %p release, notify server app", conn); \
|
||||
STrans* pTransInst = conn->pTransInst; \
|
||||
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
|
||||
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
|
||||
} \
|
||||
uvStartSendRespInternal(srvMsg); \
|
||||
return; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define SRV_RELEASE_UV(loop) \
|
||||
do { \
|
||||
uv_walk(loop, uvWalkCb, NULL); \
|
||||
|
@ -212,17 +183,25 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
|
|||
tDebug("%p timeout since no activity", conn);
|
||||
}
|
||||
|
||||
static void uvHandleReq(SSvrConn* pConn) {
|
||||
STransMsgHead* msg = NULL;
|
||||
int msgLen = 0;
|
||||
static bool uvHandleReq(SSvrConn* pConn) {
|
||||
STrans* pTransInst = pConn->pTransInst;
|
||||
|
||||
msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
|
||||
STransMsgHead* msg = NULL;
|
||||
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg);
|
||||
if (msgLen <= 0) {
|
||||
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
|
||||
return false;
|
||||
}
|
||||
|
||||
STransMsgHead* pHead = (STransMsgHead*)msg;
|
||||
pHead->code = htonl(pHead->code);
|
||||
pHead->msgLen = htonl(pHead->msgLen);
|
||||
memcpy(pConn->user, pHead->user, strlen(pHead->user));
|
||||
|
||||
if (uvRecvReleaseReq(pConn, pHead)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// TODO(dengyihao): time-consuming task throwed into BG Thread
|
||||
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
|
||||
// wreq->data = pConn;
|
||||
|
@ -230,8 +209,6 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
// transRefSrvHandle(pConn);
|
||||
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
|
||||
|
||||
CONN_SHOULD_RELEASE(pConn, pHead);
|
||||
|
||||
STransMsg transMsg;
|
||||
memset(&transMsg, 0, sizeof(transMsg));
|
||||
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
||||
|
@ -247,7 +224,6 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
tDebug("conn %p acquired by server app", pConn);
|
||||
}
|
||||
}
|
||||
STrans* pTransInst = pConn->pTransInst;
|
||||
STraceId* trace = &pHead->traceId;
|
||||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||
transRefSrvHandle(pConn);
|
||||
|
@ -285,6 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
transReleaseExHandle(transGetRefMgt(), pConn->refId);
|
||||
|
||||
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
||||
return true;
|
||||
}
|
||||
|
||||
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||
|
@ -295,11 +272,18 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|||
if (nread > 0) {
|
||||
pBuf->len += nread;
|
||||
tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread);
|
||||
while (transReadComplete(pBuf)) {
|
||||
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
|
||||
uvHandleReq(conn);
|
||||
if (pBuf->len <= TRANS_PACKET_LIMIT) {
|
||||
while (transReadComplete(pBuf)) {
|
||||
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
|
||||
if (pBuf->invalid) {
|
||||
destroyConn(conn, true);
|
||||
break;
|
||||
} else {
|
||||
if (false == uvHandleReq(conn)) break;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (nread == 0) {
|
||||
return;
|
||||
|
@ -391,6 +375,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
|||
pHead->ahandle = (uint64_t)pMsg->info.ahandle;
|
||||
pHead->traceId = pMsg->info.traceId;
|
||||
pHead->hasEpSet = pMsg->info.hasEpSet;
|
||||
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||
|
||||
if (pConn->status == ConnNormal) {
|
||||
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
|
||||
|
@ -591,6 +576,36 @@ static void uvPrepareCb(uv_prepare_t* handle) {
|
|||
}
|
||||
}
|
||||
|
||||
static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
|
||||
if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) {
|
||||
reallocConnRef(pConn);
|
||||
tTrace("conn %p received release request", pConn);
|
||||
|
||||
STraceId traceId = pHead->traceId;
|
||||
pConn->status = ConnRelease;
|
||||
transClearBuffer(&pConn->readBuf);
|
||||
transFreeMsg(transContFromHead((char*)pHead));
|
||||
|
||||
STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527};
|
||||
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg));
|
||||
srvMsg->msg = tmsg;
|
||||
srvMsg->type = Release;
|
||||
srvMsg->pConn = pConn;
|
||||
if (!transQueuePush(&pConn->srvMsgs, srvMsg)) {
|
||||
return true;
|
||||
}
|
||||
if (pConn->regArg.init) {
|
||||
tTrace("conn %p release, notify server app", pConn);
|
||||
STrans* pTransInst = pConn->pTransInst;
|
||||
(*pTransInst->cfp)(pTransInst->parent, &(pConn->regArg.msg), NULL);
|
||||
memset(&pConn->regArg, 0, sizeof(pConn->regArg));
|
||||
}
|
||||
uvStartSendResp(srvMsg);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static void uvWorkDoTask(uv_work_t* req) {
|
||||
// doing time-consumeing task
|
||||
// only auth conn currently, add more func later
|
||||
|
|
Loading…
Reference in New Issue