refactor transport

This commit is contained in:
Yihao Deng 2024-07-22 07:22:41 +00:00
parent ee7b67e018
commit 83449e4b70
1 changed files with 26 additions and 18 deletions

View File

@ -196,8 +196,8 @@ static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr);
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn);
static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst);
// process data read from server, add decompress etc later // process data read from server, add decompress etc later
@ -1225,6 +1225,7 @@ static void cliDestroyBatch(SCliBatch* pBatch) {
taosMemoryFree(pBatch); taosMemoryFree(pBatch);
} }
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
int32_t code = 0;
if (pThrd->quit == true) { if (pThrd->quit == true) {
cliDestroyBatch(pBatch); cliDestroyBatch(pBatch);
return; return;
@ -1253,15 +1254,13 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
conn->pBatch = pBatch; conn->pBatch = pBatch;
conn->dstAddr = taosStrdup(pList->dst); conn->dstAddr = taosStrdup(pList->dst);
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip); uint32_t ipaddr = 0;
if (ipaddr == 0xffffffff) { if ((code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr)) != 0) {
uv_timer_stop(conn->timer); uv_timer_stop(conn->timer);
conn->timer->data = NULL; conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer); taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL; conn->timer = NULL;
cliHandleFastFail(conn, code);
cliHandleFastFail(conn, terrno);
terrno = 0;
return; return;
} }
struct sockaddr_in addr; struct sockaddr_in addr;
@ -1561,23 +1560,28 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
return 0; return 0;
} }
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) {
int32_t code = 0;
uint32_t addr = 0; uint32_t addr = 0;
size_t len = strlen(fqdn); size_t len = strlen(fqdn);
uint32_t* v = taosHashGet(cache, fqdn, len); uint32_t* v = taosHashGet(cache, fqdn, len);
if (v == NULL) { if (v == NULL) {
addr = taosGetIpv4FromFqdn(fqdn); addr = taosGetIpv4FromFqdn(fqdn);
if (addr == 0xffffffff) { if (addr == 0xffffffff) {
terrno = TSDB_CODE_RPC_FQDN_ERROR; code = TSDB_CODE_RPC_FQDN_ERROR;
tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr()); tError("failed to get ip from fqdn:%s since %s", fqdn, tstrerror(code));
return addr; return code;
} }
taosHashPut(cache, fqdn, len, &addr, sizeof(addr)); if ((code = taosHashPut(cache, fqdn, len, &addr, sizeof(addr)) != 0)) {
return code;
}
*ip = addr;
} else { } else {
addr = *v; *ip = *v;
} }
return addr; return 0;
} }
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
// impl later // impl later
@ -1622,6 +1626,7 @@ static void doFreeTimeoutMsg(void* param) {
doNotifyApp(pMsg, pThrd, code); doNotifyApp(pMsg, pThrd, code);
taosMemoryFree(arg); taosMemoryFree(arg);
} }
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
@ -1671,15 +1676,15 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
conn->dstAddr = taosStrdup(addr); conn->dstAddr = taosStrdup(addr);
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); uint32_t ipaddr;
if (ipaddr == 0xffffffff) { int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr);
if (code != 0) {
uv_timer_stop(conn->timer); uv_timer_stop(conn->timer);
conn->timer->data = NULL; conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer); taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL; conn->timer = NULL;
cliHandleExcept(conn, terrno); cliHandleExcept(conn, code);
terrno = 0;
return; return;
} }
@ -1697,12 +1702,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
errno = 0; errno = 0;
return; return;
} }
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
if (ret != 0) { if (ret != 0) {
tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
cliHandleExcept(conn, -1); cliHandleExcept(conn, -1);
return; return;
} }
ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle); ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle);
if (ret != 0) { if (ret != 0) {
tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
@ -2926,6 +2933,7 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;
} }
cliMsg->ctx = pCtx; cliMsg->ctx = pCtx;
cliMsg->type = Update; cliMsg->type = Update;
cliMsg->refId = (int64_t)shandle; cliMsg->refId = (int64_t)shandle;