refactor transport

This commit is contained in:
Yihao Deng 2024-06-14 13:24:58 +00:00
parent e9c358c9d4
commit 1ae87fe097
2 changed files with 13 additions and 7 deletions

View File

@ -2423,7 +2423,7 @@ static void doCloseIdleConn(void* param) {
cliDestroyConn(conn, true); cliDestroyConn(conn, true);
taosMemoryFree(arg); taosMemoryFree(arg);
} }
static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) { static void cliPerfLog_schedMsg(SCliMsg* pMsg, char* label) {
if (!(rpcDebugFlag & DEBUG_DEBUG)) { if (!(rpcDebugFlag & DEBUG_DEBUG)) {
return; return;
} }
@ -2439,7 +2439,7 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) {
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst)); cliPerfLog_schedMsg(pMsg, transLabel(pThrd->pTransInst));
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg; arg->param1 = pMsg;

View File

@ -336,7 +336,11 @@ void uvWhiteListSetConnVer(SIpWhiteListTab* pWhite, SSvrConn* pConn) {
pConn->whiteListVer = pWhite->ver; pConn->whiteListVer = pWhite->ver;
} }
static void uvPerfLog(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* pTransMsg) { static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* pTransMsg) {
if (!(rpcDebugFlag & DEBUG_DEBUG)) {
return;
}
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
STraceId* trace = &pHead->traceId; STraceId* trace = &pHead->traceId;
@ -344,7 +348,7 @@ static void uvPerfLog(SSvrConn* pConn, STransMsgHead* pHead, STransMsg* pTransMs
static int64_t EXCEPTION_LIMIT_US = 100 * 1000; static int64_t EXCEPTION_LIMIT_US = 100 * 1000;
if (pConn->status == ConnNormal && pHead->noResp == 0) { if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn); // transRefSrvHandle(pConn);
if (cost >= EXCEPTION_LIMIT_US) { if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception",
transLabel(pTransInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen, transLabel(pTransInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
@ -386,6 +390,8 @@ static void uvMaySetConnAcquired(SSvrConn* pConn, STransMsgHead* pHead) {
pConn->status = ConnAcquire; pConn->status = ConnAcquire;
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tDebug("conn %p acquired by server app", pConn); tDebug("conn %p acquired by server app", pConn);
} else if (pHead->noResp == 0) {
transRefSrvHandle(pConn);
} }
} }
} }
@ -429,8 +435,6 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code; transMsg.code = pHead->code;
uvMaySetConnAcquired(pConn, pHead);
// pHead->noResp = 1, // pHead->noResp = 1,
// 1. server application should not send resp on handle // 1. server application should not send resp on handle
// 2. once send out data, cli conn released to conn pool immediately // 2. once send out data, cli conn released to conn pool immediately
@ -444,7 +448,9 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.info.cliVer = htonl(pHead->compatibilityVer); transMsg.info.cliVer = htonl(pHead->compatibilityVer);
transMsg.info.forbiddenIp = uvValidConn(pConn); transMsg.info.forbiddenIp = uvValidConn(pConn);
uvPerfLog(pConn, pHead, &transMsg); uvMaySetConnAcquired(pConn, pHead);
uvPerfLog_receive(pConn, pHead, &transMsg);
// set up conn info // set up conn info
SRpcConnInfo* pConnInfo = &(transMsg.info.conn); SRpcConnInfo* pConnInfo = &(transMsg.info.conn);