feature(rpc): add connect timeout
This commit is contained in:
parent
15a0befa88
commit
9c510f331c
|
@ -19,53 +19,53 @@
|
|||
// If the error is in a third-party library, place this header file under the third-party library header file.
|
||||
// When you want to use this feature, you should find or add the same function in the following section.
|
||||
#ifndef ALLOW_FORBID_FUNC
|
||||
#define socket SOCKET_FUNC_TAOS_FORBID
|
||||
#define bind BIND_FUNC_TAOS_FORBID
|
||||
#define listen LISTEN_FUNC_TAOS_FORBID
|
||||
#define accept ACCEPT_FUNC_TAOS_FORBID
|
||||
#define epoll_create EPOLL_CREATE_FUNC_TAOS_FORBID
|
||||
#define epoll_ctl EPOLL_CTL_FUNC_TAOS_FORBID
|
||||
#define epoll_wait EPOLL_WAIT_FUNC_TAOS_FORBID
|
||||
#define inet_addr INET_ADDR_FUNC_TAOS_FORBID
|
||||
#define inet_ntoa INET_NTOA_FUNC_TAOS_FORBID
|
||||
#define socket SOCKET_FUNC_TAOS_FORBID
|
||||
#define bind BIND_FUNC_TAOS_FORBID
|
||||
#define listen LISTEN_FUNC_TAOS_FORBID
|
||||
#define accept ACCEPT_FUNC_TAOS_FORBID
|
||||
#define epoll_create EPOLL_CREATE_FUNC_TAOS_FORBID
|
||||
#define epoll_ctl EPOLL_CTL_FUNC_TAOS_FORBID
|
||||
#define epoll_wait EPOLL_WAIT_FUNC_TAOS_FORBID
|
||||
#define inet_addr INET_ADDR_FUNC_TAOS_FORBID
|
||||
#define inet_ntoa INET_NTOA_FUNC_TAOS_FORBID
|
||||
#endif
|
||||
|
||||
#if defined(WINDOWS)
|
||||
#if BYTE_ORDER == LITTLE_ENDIAN
|
||||
#include <stdlib.h>
|
||||
#define htobe16(x) _byteswap_ushort(x)
|
||||
#define htole16(x) (x)
|
||||
#define be16toh(x) _byteswap_ushort(x)
|
||||
#define le16toh(x) (x)
|
||||
#if BYTE_ORDER == LITTLE_ENDIAN
|
||||
#include <stdlib.h>
|
||||
#define htobe16(x) _byteswap_ushort(x)
|
||||
#define htole16(x) (x)
|
||||
#define be16toh(x) _byteswap_ushort(x)
|
||||
#define le16toh(x) (x)
|
||||
|
||||
#define htobe32(x) _byteswap_ulong(x)
|
||||
#define htole32(x) (x)
|
||||
#define be32toh(x) _byteswap_ulong(x)
|
||||
#define le32toh(x) (x)
|
||||
#define htobe32(x) _byteswap_ulong(x)
|
||||
#define htole32(x) (x)
|
||||
#define be32toh(x) _byteswap_ulong(x)
|
||||
#define le32toh(x) (x)
|
||||
|
||||
#define htobe64(x) _byteswap_uint64(x)
|
||||
#define htole64(x) (x)
|
||||
#define be64toh(x) _byteswap_uint64(x)
|
||||
#define le64toh(x) (x)
|
||||
#else
|
||||
#error byte order not supported
|
||||
#endif
|
||||
#define htobe64(x) _byteswap_uint64(x)
|
||||
#define htole64(x) (x)
|
||||
#define be64toh(x) _byteswap_uint64(x)
|
||||
#define le64toh(x) (x)
|
||||
#else
|
||||
#error byte order not supported
|
||||
#endif
|
||||
|
||||
#define __BYTE_ORDER BYTE_ORDER
|
||||
#define __BIG_ENDIAN BIG_ENDIAN
|
||||
#define __LITTLE_ENDIAN LITTLE_ENDIAN
|
||||
#define __PDP_ENDIAN PDP_ENDIAN
|
||||
#define __BYTE_ORDER BYTE_ORDER
|
||||
#define __BIG_ENDIAN BIG_ENDIAN
|
||||
#define __LITTLE_ENDIAN LITTLE_ENDIAN
|
||||
#define __PDP_ENDIAN PDP_ENDIAN
|
||||
|
||||
#else
|
||||
#include <netinet/in.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
#if defined(_TD_DARWIN_64)
|
||||
#include <osEok.h>
|
||||
#else
|
||||
#include <netinet/in.h>
|
||||
#include <sys/epoll.h>
|
||||
#endif
|
||||
#if defined(_TD_DARWIN_64)
|
||||
#include <osEok.h>
|
||||
#else
|
||||
#include <netinet/in.h>
|
||||
#include <sys/epoll.h>
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -73,24 +73,24 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
#if defined(WINDOWS)
|
||||
typedef int socklen_t;
|
||||
#define TAOS_EPOLL_WAIT_TIME 100
|
||||
typedef SOCKET eventfd_t;
|
||||
#define eventfd(a, b) -1
|
||||
#define EpollClose(pollFd) epoll_close(pollFd)
|
||||
#ifndef EPOLLWAKEUP
|
||||
#define EPOLLWAKEUP (1u << 29)
|
||||
#endif
|
||||
typedef int socklen_t;
|
||||
#define TAOS_EPOLL_WAIT_TIME 100
|
||||
typedef SOCKET eventfd_t;
|
||||
#define eventfd(a, b) -1
|
||||
#define EpollClose(pollFd) epoll_close(pollFd)
|
||||
#ifndef EPOLLWAKEUP
|
||||
#define EPOLLWAKEUP (1u << 29)
|
||||
#endif
|
||||
#elif defined(_TD_DARWIN_64)
|
||||
#define TAOS_EPOLL_WAIT_TIME 500
|
||||
typedef int32_t SOCKET;
|
||||
typedef SOCKET EpollFd;
|
||||
#define EpollClose(pollFd) epoll_close(pollFd)
|
||||
#define TAOS_EPOLL_WAIT_TIME 500
|
||||
typedef int32_t SOCKET;
|
||||
typedef SOCKET EpollFd;
|
||||
#define EpollClose(pollFd) epoll_close(pollFd)
|
||||
#else
|
||||
#define TAOS_EPOLL_WAIT_TIME 500
|
||||
typedef int32_t SOCKET;
|
||||
typedef SOCKET EpollFd;
|
||||
#define EpollClose(pollFd) taosCloseSocket(pollFd)
|
||||
#define TAOS_EPOLL_WAIT_TIME 500
|
||||
typedef int32_t SOCKET;
|
||||
typedef SOCKET EpollFd;
|
||||
#define EpollClose(pollFd) taosCloseSocket(pollFd)
|
||||
#endif
|
||||
|
||||
#if defined(_TD_DARWIN_64)
|
||||
|
@ -128,16 +128,17 @@ typedef struct TdSocket {
|
|||
#endif
|
||||
int refId;
|
||||
SocketFd fd;
|
||||
} *TdSocketPtr, TdSocket;
|
||||
} * TdSocketPtr, TdSocket;
|
||||
|
||||
typedef struct TdSocketServer *TdSocketServerPtr;
|
||||
typedef struct TdSocket *TdSocketPtr;
|
||||
typedef struct TdEpoll *TdEpollPtr;
|
||||
typedef struct TdSocket * TdSocketPtr;
|
||||
typedef struct TdEpoll * TdEpollPtr;
|
||||
|
||||
int32_t taosSendto(TdSocketPtr pSocket, void * msg, int len, unsigned int flags, const struct sockaddr * to, int tolen);
|
||||
int32_t taosSendto(TdSocketPtr pSocket, void *msg, int len, unsigned int flags, const struct sockaddr *to, int tolen);
|
||||
int32_t taosWriteSocket(TdSocketPtr pSocket, void *msg, int len);
|
||||
int32_t taosReadSocket(TdSocketPtr pSocket, void *msg, int len);
|
||||
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, int *addrLen);
|
||||
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr,
|
||||
int *addrLen);
|
||||
int32_t taosCloseSocketNoCheck1(SocketFd fd);
|
||||
int32_t taosCloseSocket(TdSocketPtr *ppSocket);
|
||||
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer);
|
||||
|
@ -156,13 +157,15 @@ int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes);
|
|||
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len);
|
||||
void taosWinSocketInit();
|
||||
|
||||
int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec);
|
||||
|
||||
TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
|
||||
TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
|
||||
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
|
||||
int32_t taosKeepTcpAlive(TdSocketPtr pSocket);
|
||||
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, int *addrLen);
|
||||
|
||||
int32_t taosGetSocketName(TdSocketPtr pSocket,struct sockaddr *destAddr, int *addrLen);
|
||||
int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *addrLen);
|
||||
|
||||
void taosBlockSIGPIPE();
|
||||
uint32_t taosGetIpv4FromFqdn(const char *);
|
||||
|
|
|
@ -21,6 +21,7 @@ extern "C" {
|
|||
#include <uv.h>
|
||||
#include "lz4.h"
|
||||
#include "os.h"
|
||||
#include "osSocket.h"
|
||||
#include "rpcCache.h"
|
||||
#include "rpcHead.h"
|
||||
#include "rpcLog.h"
|
||||
|
@ -105,6 +106,7 @@ typedef void* queue[2];
|
|||
|
||||
#define TRANS_RETRY_COUNT_LIMIT 10 // retry count limit
|
||||
#define TRANS_RETRY_INTERVAL 5 // ms retry interval
|
||||
#define TRANS_CONN_TIMEOUT 3 // connect timeout
|
||||
|
||||
typedef struct {
|
||||
SRpcInfo* pRpc; // associated SRpcInfo
|
||||
|
|
|
@ -103,6 +103,10 @@ static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle o
|
|||
static void cliDestroy(uv_handle_t* handle);
|
||||
static void cliSend(SCliConn* pConn);
|
||||
|
||||
/*
|
||||
* set TCP connection timeout per-socket level
|
||||
*/
|
||||
static int cliCreateSocket();
|
||||
// process data read from server, add decompress etc later
|
||||
static void cliHandleResp(SCliConn* conn);
|
||||
// handle except about conn
|
||||
|
@ -729,9 +733,15 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
if (ret) {
|
||||
tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
|
||||
}
|
||||
int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT);
|
||||
if (fd == -1) {
|
||||
tTrace("%s cli conn %p failed to create socket", pTransInst->label, conn);
|
||||
cliHandleExcept(conn);
|
||||
return;
|
||||
}
|
||||
uv_tcp_open((uv_tcp_t*)conn->stream, fd);
|
||||
|
||||
struct sockaddr_in addr;
|
||||
|
||||
addr.sin_family = AF_INET;
|
||||
addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
|
||||
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
||||
|
|
|
@ -37,20 +37,24 @@
|
|||
#include <unistd.h>
|
||||
|
||||
#if defined(DARWIN)
|
||||
#include <dispatch/dispatch.h>
|
||||
#include "osEok.h"
|
||||
#include <dispatch/dispatch.h>
|
||||
#include "osEok.h"
|
||||
#else
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/epoll.h>
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#ifndef INVALID_SOCKET
|
||||
#define INVALID_SOCKET -1
|
||||
#endif
|
||||
|
||||
typedef struct TdSocketServer {
|
||||
#if SOCKET_WITH_LOCK
|
||||
TdThreadRwlock rwlock;
|
||||
#endif
|
||||
int refId;
|
||||
SocketFd fd;
|
||||
} *TdSocketServerPtr, TdSocketServer;
|
||||
} * TdSocketServerPtr, TdSocketServer;
|
||||
|
||||
typedef struct TdEpoll {
|
||||
#if SOCKET_WITH_LOCK
|
||||
|
@ -58,7 +62,7 @@ typedef struct TdEpoll {
|
|||
#endif
|
||||
int refId;
|
||||
EpollFd fd;
|
||||
} *TdEpollPtr, TdEpoll;
|
||||
} * TdEpollPtr, TdEpoll;
|
||||
|
||||
int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr,
|
||||
int addrlen) {
|
||||
|
@ -94,7 +98,8 @@ int32_t taosReadSocket(TdSocketPtr pSocket, void *buf, int len) {
|
|||
#endif
|
||||
}
|
||||
|
||||
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, int *addrLen) {
|
||||
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr,
|
||||
int *addrLen) {
|
||||
if (pSocket == NULL || pSocket->fd < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -318,7 +323,7 @@ int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
|
|||
return -1;
|
||||
}
|
||||
int32_t nleft, nwritten;
|
||||
char *ptr = (char *)buf;
|
||||
char * ptr = (char *)buf;
|
||||
|
||||
nleft = nbytes;
|
||||
|
||||
|
@ -347,7 +352,7 @@ int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
|
|||
return -1;
|
||||
}
|
||||
int32_t nleft, nread;
|
||||
char *ptr = (char *)buf;
|
||||
char * ptr = (char *)buf;
|
||||
|
||||
nleft = nbytes;
|
||||
|
||||
|
@ -689,8 +694,7 @@ TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
|
|||
return (TdSocketServerPtr)pSocket;
|
||||
}
|
||||
|
||||
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr,
|
||||
int *addrLen) {
|
||||
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, int *addrLen) {
|
||||
if (pServerSocket == NULL || pServerSocket->fd < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -771,7 +775,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
|
|||
|
||||
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
|
||||
if (result) {
|
||||
struct sockaddr *sa = result->ai_addr;
|
||||
struct sockaddr * sa = result->ai_addr;
|
||||
struct sockaddr_in *si = (struct sockaddr_in *)sa;
|
||||
struct in_addr ia = si->sin_addr;
|
||||
uint32_t ip = ia.s_addr;
|
||||
|
@ -887,7 +891,6 @@ int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *a
|
|||
return getsockname(pSocket->fd, destAddr, addrLen);
|
||||
}
|
||||
|
||||
|
||||
TdEpollPtr taosCreateEpoll(int32_t size) {
|
||||
EpollFd fd = -1;
|
||||
#ifdef WINDOWS
|
||||
|
@ -939,3 +942,28 @@ int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) {
|
|||
taosMemoryFree(*ppEpoll);
|
||||
return code;
|
||||
}
|
||||
/*
|
||||
* Set TCP connection timeout per-socket level.
|
||||
* ref [https://github.com/libuv/help/issues/54]
|
||||
*/
|
||||
int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec) {
|
||||
#if defined(WINDOWS)
|
||||
SOCKET fd;
|
||||
#else
|
||||
int fd;
|
||||
#endif
|
||||
if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) {
|
||||
return -1;
|
||||
}
|
||||
#if defined(WINDOWS)
|
||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&conn_timeout_sec, sizeof(conn_timeout_sec))) {
|
||||
return -1;
|
||||
}
|
||||
#else // Linux like systems
|
||||
uint32_t conn_timeout_ms = conn_timeout_sec * 1000;
|
||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
return (int)fd;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue