From 45bb65336c63c841eaf001231aec3a29f8afed45 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 14 Sep 2024 09:47:31 +0800 Subject: [PATCH] opt transport --- source/common/src/tglobal.c | 2 +- source/libs/transport/src/transCli.c | 60 +++++++++++++++------------- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 69171892cb..c69088bb24 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -601,7 +601,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { cfgAddInt64(pCfg, "randErrorDivisor", tsRandErrDivisor, 1, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "randErrorScope", tsRandErrScope, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_BOTH)); - // tsNumOfRpcThreads = tsNumOfCores / 2; + tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, CFG_SCOPE_BOTH, CFG_DYN_NONE)); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c38cca9f2c..d7aab8357d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -231,9 +231,7 @@ static void cliDestroyBatch(SCliBatch* pBatch); static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx); static FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr); -static FORCE_INLINE int32_t cliBuildExceptResp(SCliReq* pReq, STransMsg* resp); - -// static FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code); +static FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* thrd, SCliReq* pReq, STransMsg* resp); static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn); @@ -519,6 +517,7 @@ void cliHandleResp(SCliConn* conn) { taosMemoryFree(pHead); tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); // TODO: notify cb + ASSERT(0); pThrd->notifyExceptCb(pThrd, NULL, NULL); return; } @@ -1411,7 +1410,12 @@ static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq) { (void)uv_walk(pThrd->loop, cliWalkCb, NULL); } static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { return; } -static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { return; } +static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { + SReqCtx* pCtx = pReq->ctx; + pThrd->cvtAddr = pCtx->cvtAddr; + destroyReq(pReq); + return; +} FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) { if (pCvtAddr->cvt == false) { @@ -1437,33 +1441,33 @@ FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, SReqCtx* pCtx) { return transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet) ? false : true; } -FORCE_INLINE int32_t cliBuildExceptResp(SCliReq* pReq, STransMsg* pResp) { +FORCE_INLINE int32_t cliBuildExceptResp(SCliThrd* pThrd, SCliReq* pReq, STransMsg* pResp) { if (pReq == NULL) return -1; + STrans* pInst = pThrd->pInst; + + SReqCtx* pCtx = pReq ? pReq->ctx : NULL; + STransMsg resp = {0}; + // resp.code = (conn->connnected ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL); + pResp->msgType = pReq ? pReq->msg.msgType + 1 : 0; + pResp->info.cliVer = pInst->compatibilityVer; + pResp->info.ahandle = pCtx ? pCtx->ahandle : 0; + if (pReq) { + pResp->info.traceId = pReq->msg.info.traceId; + } + + // handle noresp and inter manage msg + if (pCtx == NULL || REQUEST_NO_RESP(&pReq->msg)) { + destroyReq(pReq); + return 0; + } if (pResp->code == 0) { pResp->code = TSDB_CODE_RPC_BROKEN_LINK; } - pResp->msgType = pReq->msg.msgType + 1; - pResp->info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL; - pResp->info.traceId = pReq->msg.info.traceId; return 0; } -// FORCE_INLINE int32_t cliBuildExceptRespAndNotifyCb(SCliThrd* pThrd, SCliReq* pReq, int32_t code) { -// STrans* pInst = pThrd->pInst; - -// STransMsg resp = {.code = code}; -// code = cliBuildExceptResp(pReq, &resp); -// if (code != 0) { -// return code; -// } -// resp.info.cliVer = pInst->compatibilityVer; -// pInst->cfp(pInst->parent, &resp, NULL); - -// return 0; -// } - static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) { int32_t code = 0; uint32_t addr = 0; @@ -1641,6 +1645,8 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { _exception: resp.code = code; + STraceId* trace = &pReq->msg.info.traceId; + tGWarn("%s failed to process req, reason:%s", pInst->label, tstrerror(code)); (void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp); return; } @@ -1923,14 +1929,14 @@ int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { return cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); } int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { - STrans* pInst = ((SCliThrd*)thrd)->pInst; - int32_t code = cliBuildExceptResp(pReq, pResp); - + SCliThrd* pThrd = thrd; + STrans* pInst = pThrd->pInst; + int32_t code = cliBuildExceptResp(pThrd, pReq, pResp); if (code != 0) { return code; } - pResp->info.cliVer = pInst->compatibilityVer; pInst->cfp(pInst->parent, pResp, NULL); + destroyReq(pReq); return code; } @@ -3122,7 +3128,7 @@ static int32_t getOrCreateHeap(SHashObj* pConnHeapCache, char* key, SHeap** pHea static FORCE_INLINE int8_t shouldSWitchToOtherConn(int32_t reqNum, int32_t sentNum, int32_t stateNum) { int32_t total = reqNum + sentNum + stateNum; - if (total >= 16) { + if (total >= 4) { return 1; }