diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 11f3f070c0..82c4ef8278 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -671,7 +671,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - transSetConnOption((uv_tcp_t*)conn->stream); + // transSetConnOption((uv_tcp_t*)conn->stream); uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { @@ -1067,9 +1067,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STransConnCtx* pCtx = pMsg->ctx; cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); + STraceId* trace = &pMsg->msg.info.traceId; if (!EPSET_IS_VALID(&pCtx->epSet)) { - tError("invalid epset"); + tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); destroyCmsg(pMsg); return; } @@ -1138,10 +1139,29 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { addr.sin_addr.s_addr = ipaddr; addr.sin_port = (uint16_t)htons((uint16_t)conn->port); - STraceId* trace = &(pMsg->msg.info.traceId); tGTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); + int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); + if (fd == -1) { + tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + cliHandleExcept(conn); + errno = 0; + return; + } + int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); + if (ret != 0) { + tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleExcept(conn); + return; + } + ret = transSetConnOption((uv_tcp_t*)conn->stream); + if (ret != 0) { + tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleExcept(conn); + return; + } - int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, uv_err_name(ret)); @@ -1156,7 +1176,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { } uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } - STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s conn %p ready", pTransInst->label, conn); } static void cliAsyncCb(uv_async_t* handle) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 1161ed7c00..41233a6526 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -205,6 +205,10 @@ bool transReadComplete(SConnBuffer* connBuf) { } int transSetConnOption(uv_tcp_t* stream) { +#if defined(WINDOWS) || defined(DARWIN) +#else + uv_tcp_keepalive(stream, 1, 20); +#endif return uv_tcp_nodelay(stream, 1); // int ret = uv_tcp_keepalive(stream, 5, 60); } diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index de628c7596..6b0bfb3a02 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -55,7 +55,7 @@ typedef struct TdSocket { #endif int refId; SocketFd fd; -} * TdSocketPtr, TdSocket; +} *TdSocketPtr, TdSocket; typedef struct TdSocketServer { #if SOCKET_WITH_LOCK @@ -63,7 +63,7 @@ typedef struct TdSocketServer { #endif int refId; SocketFd fd; -} * TdSocketServerPtr, TdSocketServer; +} *TdSocketServerPtr, TdSocketServer; typedef struct TdEpoll { #if SOCKET_WITH_LOCK @@ -71,7 +71,7 @@ typedef struct TdEpoll { #endif int refId; EpollFd fd; -} * TdEpollPtr, TdEpoll; +} *TdEpollPtr, TdEpoll; #if 0 int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr, @@ -1005,7 +1005,7 @@ int32_t taosGetFqdn(char *fqdn) { // immediately // hints.ai_family = AF_INET; strcpy(fqdn, hostname); - strcpy(fqdn+strlen(hostname), ".local"); + strcpy(fqdn + strlen(hostname), ".local"); #else // __APPLE__ struct addrinfo hints = {0}; struct addrinfo *result = NULL; @@ -1060,7 +1060,7 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) { #if defined(WINDOWS) SOCKET fd; #else - int fd; + int fd; #endif if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) { return -1; @@ -1071,11 +1071,12 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) { return -1; } #elif defined(_TD_DARWIN_64) - uint32_t conn_timeout_ms = timeout * 1000; - if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) { - taosCloseSocketNoCheck1(fd); - return -1; - } + // invalid config + // uint32_t conn_timeout_ms = timeout * 1000; + // if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) { + // taosCloseSocketNoCheck1(fd); + // return -1; + //} #else // Linux like systems uint32_t conn_timeout_ms = timeout * 1000; if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {