Merge pull request #16226 from taosdata/feat/mergeMain
feat: merge main branch
This commit is contained in:
commit
b3351a6d22
|
@ -104,6 +104,12 @@ typedef void* queue[2];
|
||||||
|
|
||||||
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
||||||
|
|
||||||
|
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
|
||||||
|
|
||||||
|
#define TRANS_MAGIC_NUM 0x5f375a86
|
||||||
|
|
||||||
|
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
||||||
|
|
||||||
typedef SRpcMsg STransMsg;
|
typedef SRpcMsg STransMsg;
|
||||||
typedef SRpcCtx STransCtx;
|
typedef SRpcCtx STransCtx;
|
||||||
typedef SRpcCtxVal STransCtxVal;
|
typedef SRpcCtxVal STransCtxVal;
|
||||||
|
|
|
@ -318,10 +318,17 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STransMsgHead* pHead = NULL;
|
STransMsgHead* pHead = NULL;
|
||||||
transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
|
if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) {
|
||||||
|
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
pHead->code = htonl(pHead->code);
|
pHead->code = htonl(pHead->code);
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
pHead->msgLen = htonl(pHead->msgLen);
|
||||||
|
|
||||||
|
if (cliRecvReleaseReq(conn, pHead)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
STransMsg transMsg = {0};
|
STransMsg transMsg = {0};
|
||||||
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
|
||||||
transMsg.pCont = transContFromHead((char*)pHead);
|
transMsg.pCont = transContFromHead((char*)pHead);
|
||||||
|
@ -333,10 +340,6 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
SCliMsg* pMsg = NULL;
|
SCliMsg* pMsg = NULL;
|
||||||
STransConnCtx* pCtx = NULL;
|
STransConnCtx* pCtx = NULL;
|
||||||
if (cliRecvReleaseReq(conn, pHead)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
if (CONN_NO_PERSIST_BY_APP(conn)) {
|
||||||
pMsg = transQueuePop(&conn->cliMsgs);
|
pMsg = transQueuePop(&conn->cliMsgs);
|
||||||
|
|
||||||
|
@ -598,8 +601,13 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
pBuf->len += nread;
|
pBuf->len += nread;
|
||||||
while (transReadComplete(pBuf)) {
|
while (transReadComplete(pBuf)) {
|
||||||
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
if (pBuf->invalid) {
|
||||||
|
cliHandleExcept(conn);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
cliHandleResp(conn);
|
cliHandleResp(conn);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -188,7 +188,6 @@ bool transReadComplete(SConnBuffer* connBuf) {
|
||||||
p->left = 0;
|
p->left = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return (p->left == 0 || p->invalid) ? true : false;
|
return (p->left == 0 || p->invalid) ? true : false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -198,15 +198,16 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
pHead->msgLen = htonl(pHead->msgLen);
|
pHead->msgLen = htonl(pHead->msgLen);
|
||||||
memcpy(pConn->user, pHead->user, strlen(pHead->user));
|
memcpy(pConn->user, pHead->user, strlen(pHead->user));
|
||||||
|
|
||||||
|
if (uvRecvReleaseReq(pConn, pHead)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
// TODO(dengyihao): time-consuming task throwed into BG Thread
|
// TODO(dengyihao): time-consuming task throwed into BG Thread
|
||||||
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
|
// uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t));
|
||||||
// wreq->data = pConn;
|
// wreq->data = pConn;
|
||||||
// uv_read_stop((uv_stream_t*)pConn->pTcp);
|
// uv_read_stop((uv_stream_t*)pConn->pTcp);
|
||||||
// transRefSrvHandle(pConn);
|
// transRefSrvHandle(pConn);
|
||||||
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
|
// uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask);
|
||||||
if (uvRecvReleaseReq(pConn, pHead)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
STransMsg transMsg;
|
STransMsg transMsg;
|
||||||
memset(&transMsg, 0, sizeof(transMsg));
|
memset(&transMsg, 0, sizeof(transMsg));
|
||||||
|
@ -274,14 +275,16 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||||
if (pBuf->len <= TRANS_PACKET_LIMIT) {
|
if (pBuf->len <= TRANS_PACKET_LIMIT) {
|
||||||
while (transReadComplete(pBuf)) {
|
while (transReadComplete(pBuf)) {
|
||||||
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
|
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
|
||||||
if (uvHandleReq(conn) == false) {
|
if (pBuf->invalid) {
|
||||||
|
tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn);
|
||||||
destroyConn(conn, true);
|
destroyConn(conn, true);
|
||||||
return;
|
return;
|
||||||
|
} else {
|
||||||
|
if (false == uvHandleReq(conn)) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
tError("%s conn %p read unexpected packet, exceed limit", transLabel(pTransInst), conn);
|
|
||||||
destroyConn(conn, true);
|
destroyConn(conn, true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -872,6 +875,7 @@ static int reallocConnRef(SSvrConn* conn) {
|
||||||
}
|
}
|
||||||
static void uvDestroyConn(uv_handle_t* handle) {
|
static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
SSvrConn* conn = handle->data;
|
SSvrConn* conn = handle->data;
|
||||||
|
|
||||||
if (conn == NULL) {
|
if (conn == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -887,9 +891,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i);
|
SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i);
|
||||||
destroySmsg(msg);
|
destroySmsg(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
transReqQueueClear(&conn->wreqQueue);
|
|
||||||
transQueueDestroy(&conn->srvMsgs);
|
transQueueDestroy(&conn->srvMsgs);
|
||||||
|
transReqQueueClear(&conn->wreqQueue);
|
||||||
|
|
||||||
QUEUE_REMOVE(&conn->queue);
|
QUEUE_REMOVE(&conn->queue);
|
||||||
taosMemoryFree(conn->pTcp);
|
taosMemoryFree(conn->pTcp);
|
||||||
|
|
Loading…
Reference in New Issue