Merge pull request #16231 from taosdata/refactor/rpcPacket
refactor rpc code
This commit is contained in:
commit
22e599739b
|
@ -20,7 +20,7 @@
|
|||
#include "tmsg.h"
|
||||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
|
||||
// clang-format off
|
||||
int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
|
||||
int32_t lastMsgType = pTask->lastMsgType;
|
||||
int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
|
||||
|
@ -1104,7 +1104,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
|
||||
#if 1
|
||||
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
||||
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
msg = NULL;
|
||||
SCH_ERR_JRET(code);
|
||||
|
||||
|
@ -1114,7 +1114,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
#else
|
||||
if (TDMT_VND_SUBMIT != msgType) {
|
||||
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
||||
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
msg = NULL;
|
||||
SCH_ERR_JRET(code);
|
||||
|
||||
|
@ -1136,3 +1136,4 @@ _return:
|
|||
taosMemoryFreeClear(msg);
|
||||
SCH_RET(code);
|
||||
}
|
||||
// clang-format on
|
||||
|
|
|
@ -1432,7 +1432,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
|
|||
if (pThrd == NULL && valid == false) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return -1;
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
}
|
||||
|
||||
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
|
||||
|
@ -1477,7 +1477,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
|
|||
if (pThrd == NULL && valid == false) {
|
||||
transFreeMsg(pReq->pCont);
|
||||
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||
return -1;
|
||||
return TSDB_CODE_RPC_BROKEN_LINK;
|
||||
}
|
||||
|
||||
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
|
||||
|
|
|
@ -275,16 +275,15 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
|||
if (pBuf->len <= TRANS_PACKET_LIMIT) {
|
||||
while (transReadComplete(pBuf)) {
|
||||
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
|
||||
if (pBuf->invalid) {
|
||||
tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn);
|
||||
if (true == pBuf->invalid || false == uvHandleReq(conn)) {
|
||||
tError("%s conn %p read invalid packet", transLabel(pTransInst), conn);
|
||||
destroyConn(conn, true);
|
||||
return;
|
||||
} else {
|
||||
if (false == uvHandleReq(conn)) break;
|
||||
}
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
tError("%s conn %p read invalid packet, exceed limit", transLabel(pTransInst), conn);
|
||||
destroyConn(conn, true);
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue