Merge pull request #19779 from taosdata/fix/hostShutDown
fix: no resp when host machine shutdown
This commit is contained in:
commit
c4295e1b29
|
@ -671,7 +671,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
|
||||||
conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t));
|
||||||
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
|
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
|
||||||
conn->stream->data = conn;
|
conn->stream->data = conn;
|
||||||
transSetConnOption((uv_tcp_t*)conn->stream);
|
// transSetConnOption((uv_tcp_t*)conn->stream);
|
||||||
|
|
||||||
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
|
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
|
||||||
if (timer == NULL) {
|
if (timer == NULL) {
|
||||||
|
@ -1067,9 +1067,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
|
||||||
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
|
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
|
||||||
|
STraceId* trace = &pMsg->msg.info.traceId;
|
||||||
|
|
||||||
if (!EPSET_IS_VALID(&pCtx->epSet)) {
|
if (!EPSET_IS_VALID(&pCtx->epSet)) {
|
||||||
tError("invalid epset");
|
tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1138,10 +1139,29 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
addr.sin_addr.s_addr = ipaddr;
|
addr.sin_addr.s_addr = ipaddr;
|
||||||
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
||||||
|
|
||||||
STraceId* trace = &(pMsg->msg.info.traceId);
|
|
||||||
tGTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
|
tGTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
|
||||||
|
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
|
||||||
|
if (fd == -1) {
|
||||||
|
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
|
||||||
|
tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||||
|
cliHandleExcept(conn);
|
||||||
|
errno = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
|
||||||
|
if (ret != 0) {
|
||||||
|
tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
|
||||||
|
cliHandleExcept(conn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ret = transSetConnOption((uv_tcp_t*)conn->stream);
|
||||||
|
if (ret != 0) {
|
||||||
|
tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
|
||||||
|
cliHandleExcept(conn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
|
tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
|
||||||
uv_err_name(ret));
|
uv_err_name(ret));
|
||||||
|
@ -1156,7 +1176,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
}
|
}
|
||||||
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
||||||
}
|
}
|
||||||
STraceId* trace = &pMsg->msg.info.traceId;
|
|
||||||
tGTrace("%s conn %p ready", pTransInst->label, conn);
|
tGTrace("%s conn %p ready", pTransInst->label, conn);
|
||||||
}
|
}
|
||||||
static void cliAsyncCb(uv_async_t* handle) {
|
static void cliAsyncCb(uv_async_t* handle) {
|
||||||
|
|
|
@ -205,6 +205,10 @@ bool transReadComplete(SConnBuffer* connBuf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int transSetConnOption(uv_tcp_t* stream) {
|
int transSetConnOption(uv_tcp_t* stream) {
|
||||||
|
#if defined(WINDOWS) || defined(DARWIN)
|
||||||
|
#else
|
||||||
|
uv_tcp_keepalive(stream, 1, 20);
|
||||||
|
#endif
|
||||||
return uv_tcp_nodelay(stream, 1);
|
return uv_tcp_nodelay(stream, 1);
|
||||||
// int ret = uv_tcp_keepalive(stream, 5, 60);
|
// int ret = uv_tcp_keepalive(stream, 5, 60);
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,7 +55,7 @@ typedef struct TdSocket {
|
||||||
#endif
|
#endif
|
||||||
int refId;
|
int refId;
|
||||||
SocketFd fd;
|
SocketFd fd;
|
||||||
} * TdSocketPtr, TdSocket;
|
} *TdSocketPtr, TdSocket;
|
||||||
|
|
||||||
typedef struct TdSocketServer {
|
typedef struct TdSocketServer {
|
||||||
#if SOCKET_WITH_LOCK
|
#if SOCKET_WITH_LOCK
|
||||||
|
@ -63,7 +63,7 @@ typedef struct TdSocketServer {
|
||||||
#endif
|
#endif
|
||||||
int refId;
|
int refId;
|
||||||
SocketFd fd;
|
SocketFd fd;
|
||||||
} * TdSocketServerPtr, TdSocketServer;
|
} *TdSocketServerPtr, TdSocketServer;
|
||||||
|
|
||||||
typedef struct TdEpoll {
|
typedef struct TdEpoll {
|
||||||
#if SOCKET_WITH_LOCK
|
#if SOCKET_WITH_LOCK
|
||||||
|
@ -71,7 +71,7 @@ typedef struct TdEpoll {
|
||||||
#endif
|
#endif
|
||||||
int refId;
|
int refId;
|
||||||
EpollFd fd;
|
EpollFd fd;
|
||||||
} * TdEpollPtr, TdEpoll;
|
} *TdEpollPtr, TdEpoll;
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr,
|
int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr,
|
||||||
|
@ -1005,7 +1005,7 @@ int32_t taosGetFqdn(char *fqdn) {
|
||||||
// immediately
|
// immediately
|
||||||
// hints.ai_family = AF_INET;
|
// hints.ai_family = AF_INET;
|
||||||
strcpy(fqdn, hostname);
|
strcpy(fqdn, hostname);
|
||||||
strcpy(fqdn+strlen(hostname), ".local");
|
strcpy(fqdn + strlen(hostname), ".local");
|
||||||
#else // __APPLE__
|
#else // __APPLE__
|
||||||
struct addrinfo hints = {0};
|
struct addrinfo hints = {0};
|
||||||
struct addrinfo *result = NULL;
|
struct addrinfo *result = NULL;
|
||||||
|
@ -1060,7 +1060,7 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
|
||||||
#if defined(WINDOWS)
|
#if defined(WINDOWS)
|
||||||
SOCKET fd;
|
SOCKET fd;
|
||||||
#else
|
#else
|
||||||
int fd;
|
int fd;
|
||||||
#endif
|
#endif
|
||||||
if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) {
|
if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1071,11 +1071,12 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#elif defined(_TD_DARWIN_64)
|
#elif defined(_TD_DARWIN_64)
|
||||||
uint32_t conn_timeout_ms = timeout * 1000;
|
// invalid config
|
||||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
// uint32_t conn_timeout_ms = timeout * 1000;
|
||||||
taosCloseSocketNoCheck1(fd);
|
// if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
||||||
return -1;
|
// taosCloseSocketNoCheck1(fd);
|
||||||
}
|
// return -1;
|
||||||
|
//}
|
||||||
#else // Linux like systems
|
#else // Linux like systems
|
||||||
uint32_t conn_timeout_ms = timeout * 1000;
|
uint32_t conn_timeout_ms = timeout * 1000;
|
||||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
||||||
|
|
Loading…
Reference in New Issue