From e7a72d5a284b6a204bcb3165e250a321917fbf8b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 3 Feb 2023 13:57:26 +0800 Subject: [PATCH 1/2] fix: no resp when host machine shutdown --- source/libs/transport/src/transCli.c | 29 ++++++++++++++++++++++----- source/libs/transport/src/transComm.c | 4 ++++ source/os/src/osSocket.c | 8 ++++---- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1a99db5f99..01929b237f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -671,7 +671,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - transSetConnOption((uv_tcp_t*)conn->stream); + // transSetConnOption((uv_tcp_t*)conn->stream); uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { @@ -1061,9 +1061,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STransConnCtx* pCtx = pMsg->ctx; cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); + STraceId* trace = &pMsg->msg.info.traceId; if (!EPSET_IS_VALID(&pCtx->epSet)) { - tError("invalid epset"); + tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); destroyCmsg(pMsg); return; } @@ -1121,10 +1122,29 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); addr.sin_port = (uint16_t)htons((uint16_t)conn->port); - STraceId* trace = &(pMsg->msg.info.traceId); tGTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); + int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); + if (fd == -1) { + tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + cliHandleExcept(conn); + errno = 0; + return; + } + int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); + if (ret != 0) { + tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleExcept(conn); + return; + } + ret = transSetConnOption((uv_tcp_t*)conn->stream); + if (ret != 0) { + tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleExcept(conn); + return; + } - int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, uv_err_name(ret)); @@ -1139,7 +1159,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { } uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } - STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s conn %p ready", pTransInst->label, conn); } static void cliAsyncCb(uv_async_t* handle) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 1161ed7c00..41233a6526 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -205,6 +205,10 @@ bool transReadComplete(SConnBuffer* connBuf) { } int transSetConnOption(uv_tcp_t* stream) { +#if defined(WINDOWS) || defined(DARWIN) +#else + uv_tcp_keepalive(stream, 1, 20); +#endif return uv_tcp_nodelay(stream, 1); // int ret = uv_tcp_keepalive(stream, 5, 60); } diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 6611a937f2..3c7a1e64ee 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -55,7 +55,7 @@ typedef struct TdSocket { #endif int refId; SocketFd fd; -} * TdSocketPtr, TdSocket; +} *TdSocketPtr, TdSocket; typedef struct TdSocketServer { #if SOCKET_WITH_LOCK @@ -63,7 +63,7 @@ typedef struct TdSocketServer { #endif int refId; SocketFd fd; -} * TdSocketServerPtr, TdSocketServer; +} *TdSocketServerPtr, TdSocketServer; typedef struct TdEpoll { #if SOCKET_WITH_LOCK @@ -71,7 +71,7 @@ typedef struct TdEpoll { #endif int refId; EpollFd fd; -} * TdEpollPtr, TdEpoll; +} *TdEpollPtr, TdEpoll; #if 0 int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr, @@ -1005,7 +1005,7 @@ int32_t taosGetFqdn(char *fqdn) { // immediately // hints.ai_family = AF_INET; strcpy(fqdn, hostname); - strcpy(fqdn+strlen(hostname), ".local"); + strcpy(fqdn + strlen(hostname), ".local"); #else // __APPLE__ struct addrinfo hints = {0}; struct addrinfo *result = NULL; From 121afc60470fc3005e9ad4a1da6c36418731e6e5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 6 Feb 2023 10:23:25 +0800 Subject: [PATCH 2/2] rm invalid param on macos --- source/os/src/osSocket.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 3c7a1e64ee..66cddd4dbe 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -1060,7 +1060,7 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) { #if defined(WINDOWS) SOCKET fd; #else - int fd; + int fd; #endif if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) { return -1; @@ -1071,11 +1071,12 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) { return -1; } #elif defined(_TD_DARWIN_64) - uint32_t conn_timeout_ms = timeout * 1000; - if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) { - taosCloseSocketNoCheck1(fd); - return -1; - } + // invalid config + // uint32_t conn_timeout_ms = timeout * 1000; + // if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) { + // taosCloseSocketNoCheck1(fd); + // return -1; + //} #else // Linux like systems uint32_t conn_timeout_ms = timeout * 1000; if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {