update ip
This commit is contained in:
parent
0d922c0d66
commit
de4efca56f
|
@ -73,7 +73,7 @@ typedef struct SCliConn {
|
|||
|
||||
SDelayTask* task;
|
||||
|
||||
char* ip;
|
||||
char* dstAddr;
|
||||
char src[32];
|
||||
char dst[32];
|
||||
|
||||
|
@ -196,6 +196,7 @@ static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp);
|
|||
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn);
|
||||
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn);
|
||||
|
||||
static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst);
|
||||
// process data read from server, add decompress etc later
|
||||
static void cliHandleResp(SCliConn* conn);
|
||||
// handle except about conn
|
||||
|
@ -543,6 +544,7 @@ void cliConnTimeout(uv_timer_t* handle) {
|
|||
taosArrayPush(pThrd->timerList, &conn->timer);
|
||||
conn->timer = NULL;
|
||||
|
||||
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
|
||||
cliHandleFastFail(conn, UV_ECANCELED);
|
||||
}
|
||||
void cliReadTimeoutCb(uv_timer_t* handle) {
|
||||
|
@ -719,7 +721,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
cliDestroyConnMsgs(conn, false);
|
||||
|
||||
if (conn->list == NULL) {
|
||||
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip) + 1);
|
||||
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr) + 1);
|
||||
}
|
||||
|
||||
SConnList* pList = conn->list;
|
||||
|
@ -878,7 +880,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
|||
connList->list->numOfConn--;
|
||||
connList->size--;
|
||||
} else {
|
||||
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip) + 1);
|
||||
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr) + 1);
|
||||
if (connList != NULL) connList->list->numOfConn--;
|
||||
}
|
||||
conn->list = NULL;
|
||||
|
@ -923,7 +925,7 @@ static void cliDestroy(uv_handle_t* handle) {
|
|||
|
||||
transReleaseExHandle(transGetRefMgt(), conn->refId);
|
||||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||
taosMemoryFree(conn->ip);
|
||||
taosMemoryFree(conn->dstAddr);
|
||||
taosMemoryFree(conn->stream);
|
||||
|
||||
cliDestroyConnMsgs(conn, true);
|
||||
|
@ -1168,7 +1170,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
|||
if (conn == NULL) {
|
||||
conn = cliCreateConn(pThrd);
|
||||
conn->pBatch = pBatch;
|
||||
conn->ip = taosStrdup(pList->dst);
|
||||
conn->dstAddr = taosStrdup(pList->dst);
|
||||
|
||||
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip);
|
||||
if (ipaddr == 0xffffffff) {
|
||||
|
@ -1213,6 +1215,8 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
|
|||
conn->timer->data = NULL;
|
||||
taosArrayPush(pThrd->timerList, &conn->timer);
|
||||
conn->timer = NULL;
|
||||
|
||||
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
|
||||
cliHandleFastFail(conn, -1);
|
||||
return;
|
||||
}
|
||||
|
@ -1271,11 +1275,11 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
|||
|
||||
STraceId* trace = &pMsg->msg.info.traceId;
|
||||
tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
|
||||
TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status));
|
||||
TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status));
|
||||
|
||||
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
|
||||
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
|
||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1);
|
||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1);
|
||||
int64_t cTimestamp = taosGetTimestampMs();
|
||||
if (item != NULL) {
|
||||
int32_t elapse = cTimestamp - item->timestamp;
|
||||
|
@ -1287,12 +1291,12 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
|||
}
|
||||
} else {
|
||||
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
||||
taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1, &item, sizeof(SFailFastItem));
|
||||
taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1, &item, sizeof(SFailFastItem));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
|
||||
pConn, pConn->ip, uv_strerror(status));
|
||||
pConn, pConn->dstAddr, uv_strerror(status));
|
||||
cliDestroyBatch(pConn->pBatch);
|
||||
pConn->pBatch = NULL;
|
||||
}
|
||||
|
@ -1314,6 +1318,7 @@ void cliConnCb(uv_connect_t* req, int status) {
|
|||
}
|
||||
|
||||
if (status != 0) {
|
||||
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr);
|
||||
if (timeout == false) {
|
||||
cliHandleFastFail(pConn, status);
|
||||
} else if (timeout == true) {
|
||||
|
@ -1483,9 +1488,34 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
|
|||
}
|
||||
static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
|
||||
// impl later
|
||||
uint32_t addr = taosGetIpv4FromFqdn(fqdn);
|
||||
if (addr != 0xffffffff) {
|
||||
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
|
||||
if (addr != *v) {
|
||||
char old[64] = {0}, new[64] = {0};
|
||||
tinet_ntoa(old, *v);
|
||||
tinet_ntoa(new, addr);
|
||||
tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new);
|
||||
taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
static void cliMayUpdateFqdnCache(SHashObj* cache, char* dst) {
|
||||
if (dst == NULL) return;
|
||||
|
||||
int16_t i = 0, len = strlen(dst);
|
||||
for (i = len - 1; i >= 0; i--) {
|
||||
if (dst[i] == ':') break;
|
||||
}
|
||||
if (i > 0) {
|
||||
char fqdn[TSDB_FQDN_LEN + 1] = {0};
|
||||
memcpy(fqdn, dst, i);
|
||||
cliUpdateFqdnCache(cache, fqdn);
|
||||
}
|
||||
}
|
||||
|
||||
static void doFreeTimeoutMsg(void* param) {
|
||||
STaskArg* arg = param;
|
||||
SCliMsg* pMsg = arg->param1;
|
||||
|
@ -1560,7 +1590,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
|
||||
transQueuePush(&conn->cliMsgs, pMsg);
|
||||
|
||||
conn->ip = taosStrdup(addr);
|
||||
conn->dstAddr = taosStrdup(addr);
|
||||
|
||||
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
|
||||
if (ipaddr == 0xffffffff) {
|
||||
|
@ -1578,7 +1608,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
addr.sin_addr.s_addr = ipaddr;
|
||||
addr.sin_port = (uint16_t)htons(port);
|
||||
|
||||
tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip);
|
||||
tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr);
|
||||
pThrd->newConnCount++;
|
||||
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
|
||||
if (fd == -1) {
|
||||
|
@ -1608,6 +1638,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
taosArrayPush(pThrd->timerList, &conn->timer);
|
||||
conn->timer = NULL;
|
||||
|
||||
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
|
||||
cliHandleFastFail(conn, ret);
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue