From 83449e4b70206c46fd1237603928b9f0945d08cf Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 22 Jul 2024 07:22:41 +0000 Subject: [PATCH] refactor transport --- source/libs/transport/src/transCli.c | 44 ++++++++++++++++------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fa7ae8c376..d0ce1c19bf 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -196,8 +196,8 @@ static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); -static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn); -static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); +static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); +static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); // process data read from server, add decompress etc later @@ -1225,6 +1225,7 @@ static void cliDestroyBatch(SCliBatch* pBatch) { taosMemoryFree(pBatch); } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { + int32_t code = 0; if (pThrd->quit == true) { cliDestroyBatch(pBatch); return; @@ -1253,15 +1254,13 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { conn->pBatch = pBatch; conn->dstAddr = taosStrdup(pList->dst); - uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip); - if (ipaddr == 0xffffffff) { + uint32_t ipaddr = 0; + if ((code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr)) != 0) { uv_timer_stop(conn->timer); conn->timer->data = NULL; taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; - - cliHandleFastFail(conn, terrno); - terrno = 0; + cliHandleFastFail(conn, code); return; } struct sockaddr_in addr; @@ -1561,23 +1560,28 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { return 0; } -static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) { + +static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) { + int32_t code = 0; uint32_t addr = 0; size_t len = strlen(fqdn); uint32_t* v = taosHashGet(cache, fqdn, len); if (v == NULL) { addr = taosGetIpv4FromFqdn(fqdn); if (addr == 0xffffffff) { - terrno = TSDB_CODE_RPC_FQDN_ERROR; - tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr()); - return addr; + code = TSDB_CODE_RPC_FQDN_ERROR; + tError("failed to get ip from fqdn:%s since %s", fqdn, tstrerror(code)); + return code; } - taosHashPut(cache, fqdn, len, &addr, sizeof(addr)); + if ((code = taosHashPut(cache, fqdn, len, &addr, sizeof(addr)) != 0)) { + return code; + } + *ip = addr; } else { - addr = *v; + *ip = *v; } - return addr; + return 0; } static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later @@ -1622,6 +1626,7 @@ static void doFreeTimeoutMsg(void* param) { doNotifyApp(pMsg, pThrd, code); taosMemoryFree(arg); } + void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; @@ -1671,15 +1676,15 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { conn->dstAddr = taosStrdup(addr); - uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); - if (ipaddr == 0xffffffff) { + uint32_t ipaddr; + int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); + if (code != 0) { uv_timer_stop(conn->timer); conn->timer->data = NULL; taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; - cliHandleExcept(conn, terrno); - terrno = 0; + cliHandleExcept(conn, code); return; } @@ -1697,12 +1702,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { errno = 0; return; } + int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); if (ret != 0) { tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); cliHandleExcept(conn, -1); return; } + ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle); if (ret != 0) { tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); @@ -2926,6 +2933,7 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { code = TSDB_CODE_OUT_OF_MEMORY; break; } + cliMsg->ctx = pCtx; cliMsg->type = Update; cliMsg->refId = (int64_t)shandle;