From de4efca56f283e7ce0488410eed590649e2f0ff8 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 18 Jul 2023 12:12:56 +0000 Subject: [PATCH] update ip --- source/libs/transport/src/transCli.c | 53 ++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8062a0618b..01223a2be9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -73,7 +73,7 @@ typedef struct SCliConn { SDelayTask* task; - char* ip; + char* dstAddr; char src[32]; char dst[32]; @@ -196,6 +196,7 @@ static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); +static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); // process data read from server, add decompress etc later static void cliHandleResp(SCliConn* conn); // handle except about conn @@ -543,6 +544,7 @@ void cliConnTimeout(uv_timer_t* handle) { taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, UV_ECANCELED); } void cliReadTimeoutCb(uv_timer_t* handle) { @@ -719,7 +721,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { cliDestroyConnMsgs(conn, false); if (conn->list == NULL) { - conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip) + 1); + conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr) + 1); } SConnList* pList = conn->list; @@ -878,7 +880,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { connList->list->numOfConn--; connList->size--; } else { - SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip) + 1); + SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr) + 1); if (connList != NULL) connList->list->numOfConn--; } conn->list = NULL; @@ -923,7 +925,7 @@ static void cliDestroy(uv_handle_t* handle) { transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); - taosMemoryFree(conn->ip); + taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->stream); cliDestroyConnMsgs(conn, true); @@ -1168,7 +1170,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { if (conn == NULL) { conn = cliCreateConn(pThrd); conn->pBatch = pBatch; - conn->ip = taosStrdup(pList->dst); + conn->dstAddr = taosStrdup(pList->dst); uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip); if (ipaddr == 0xffffffff) { @@ -1213,6 +1215,8 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { conn->timer->data = NULL; taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; + + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, -1); return; } @@ -1271,11 +1275,11 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { STraceId* trace = &pMsg->msg.info.traceId; tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status)); + TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status)); if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1); + SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1); int64_t cTimestamp = taosGetTimestampMs(); if (item != NULL) { int32_t elapse = cTimestamp - item->timestamp; @@ -1287,12 +1291,12 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { } } else { SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1, &item, sizeof(SFailFastItem)); + taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1, &item, sizeof(SFailFastItem)); } } } else { tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - pConn, pConn->ip, uv_strerror(status)); + pConn, pConn->dstAddr, uv_strerror(status)); cliDestroyBatch(pConn->pBatch); pConn->pBatch = NULL; } @@ -1314,6 +1318,7 @@ void cliConnCb(uv_connect_t* req, int status) { } if (status != 0) { + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr); if (timeout == false) { cliHandleFastFail(pConn, status); } else if (timeout == true) { @@ -1483,9 +1488,34 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) } static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later + uint32_t addr = taosGetIpv4FromFqdn(fqdn); + if (addr != 0xffffffff) { + uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1); + if (addr != *v) { + char old[64] = {0}, new[64] = {0}; + tinet_ntoa(old, *v); + tinet_ntoa(new, addr); + tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new); + taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr)); + } + } return; } +static void cliMayUpdateFqdnCache(SHashObj* cache, char* dst) { + if (dst == NULL) return; + + int16_t i = 0, len = strlen(dst); + for (i = len - 1; i >= 0; i--) { + if (dst[i] == ':') break; + } + if (i > 0) { + char fqdn[TSDB_FQDN_LEN + 1] = {0}; + memcpy(fqdn, dst, i); + cliUpdateFqdnCache(cache, fqdn); + } +} + static void doFreeTimeoutMsg(void* param) { STaskArg* arg = param; SCliMsg* pMsg = arg->param1; @@ -1560,7 +1590,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); - conn->ip = taosStrdup(addr); + conn->dstAddr = taosStrdup(addr); uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); if (ipaddr == 0xffffffff) { @@ -1578,7 +1608,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { addr.sin_addr.s_addr = ipaddr; addr.sin_port = (uint16_t)htons(port); - tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip); + tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr); pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); if (fd == -1) { @@ -1608,6 +1638,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, ret); return; }