Merge pull request #11946 from taosdata/feature/rpc_timeout
feature(rpc): add connect timeout
This commit is contained in:
commit
d1d41dfdc3
|
@ -137,7 +137,8 @@ typedef struct TdEpoll *TdEpollPtr;
|
||||||
int32_t taosSendto(TdSocketPtr pSocket, void *msg, int len, unsigned int flags, const struct sockaddr *to, int tolen);
|
int32_t taosSendto(TdSocketPtr pSocket, void *msg, int len, unsigned int flags, const struct sockaddr *to, int tolen);
|
||||||
int32_t taosWriteSocket(TdSocketPtr pSocket, void *msg, int len);
|
int32_t taosWriteSocket(TdSocketPtr pSocket, void *msg, int len);
|
||||||
int32_t taosReadSocket(TdSocketPtr pSocket, void *msg, int len);
|
int32_t taosReadSocket(TdSocketPtr pSocket, void *msg, int len);
|
||||||
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, int *addrLen);
|
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr,
|
||||||
|
int *addrLen);
|
||||||
int32_t taosCloseSocketNoCheck1(SocketFd fd);
|
int32_t taosCloseSocketNoCheck1(SocketFd fd);
|
||||||
int32_t taosCloseSocket(TdSocketPtr *ppSocket);
|
int32_t taosCloseSocket(TdSocketPtr *ppSocket);
|
||||||
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer);
|
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer);
|
||||||
|
@ -156,6 +157,8 @@ int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes);
|
||||||
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len);
|
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len);
|
||||||
void taosWinSocketInit();
|
void taosWinSocketInit();
|
||||||
|
|
||||||
|
int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec);
|
||||||
|
|
||||||
TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
|
TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
|
||||||
TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
|
TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
|
||||||
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
|
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
|
||||||
|
|
|
@ -21,6 +21,7 @@ extern "C" {
|
||||||
#include <uv.h>
|
#include <uv.h>
|
||||||
#include "lz4.h"
|
#include "lz4.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "osSocket.h"
|
||||||
#include "rpcCache.h"
|
#include "rpcCache.h"
|
||||||
#include "rpcHead.h"
|
#include "rpcHead.h"
|
||||||
#include "rpcLog.h"
|
#include "rpcLog.h"
|
||||||
|
@ -105,6 +106,7 @@ typedef void* queue[2];
|
||||||
|
|
||||||
#define TRANS_RETRY_COUNT_LIMIT 10 // retry count limit
|
#define TRANS_RETRY_COUNT_LIMIT 10 // retry count limit
|
||||||
#define TRANS_RETRY_INTERVAL 5 // ms retry interval
|
#define TRANS_RETRY_INTERVAL 5 // ms retry interval
|
||||||
|
#define TRANS_CONN_TIMEOUT 3 // connect timeout
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SRpcInfo* pRpc; // associated SRpcInfo
|
SRpcInfo* pRpc; // associated SRpcInfo
|
||||||
|
|
|
@ -103,6 +103,10 @@ static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle o
|
||||||
static void cliDestroy(uv_handle_t* handle);
|
static void cliDestroy(uv_handle_t* handle);
|
||||||
static void cliSend(SCliConn* pConn);
|
static void cliSend(SCliConn* pConn);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* set TCP connection timeout per-socket level
|
||||||
|
*/
|
||||||
|
static int cliCreateSocket();
|
||||||
// process data read from server, add decompress etc later
|
// process data read from server, add decompress etc later
|
||||||
static void cliHandleResp(SCliConn* conn);
|
static void cliHandleResp(SCliConn* conn);
|
||||||
// handle except about conn
|
// handle except about conn
|
||||||
|
@ -729,9 +733,15 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
if (ret) {
|
if (ret) {
|
||||||
tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
|
tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
|
||||||
}
|
}
|
||||||
|
int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT);
|
||||||
|
if (fd == -1) {
|
||||||
|
tTrace("%s cli conn %p failed to create socket", pTransInst->label, conn);
|
||||||
|
cliHandleExcept(conn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
uv_tcp_open((uv_tcp_t*)conn->stream, fd);
|
||||||
|
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
|
|
||||||
addr.sin_family = AF_INET;
|
addr.sin_family = AF_INET;
|
||||||
addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
|
addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
|
||||||
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
||||||
|
|
|
@ -44,6 +44,10 @@
|
||||||
#endif
|
#endif
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#ifndef INVALID_SOCKET
|
||||||
|
#define INVALID_SOCKET -1
|
||||||
|
#endif
|
||||||
|
|
||||||
typedef struct TdSocketServer {
|
typedef struct TdSocketServer {
|
||||||
#if SOCKET_WITH_LOCK
|
#if SOCKET_WITH_LOCK
|
||||||
TdThreadRwlock rwlock;
|
TdThreadRwlock rwlock;
|
||||||
|
@ -94,7 +98,8 @@ int32_t taosReadSocket(TdSocketPtr pSocket, void *buf, int len) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, int *addrLen) {
|
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr,
|
||||||
|
int *addrLen) {
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
if (pSocket == NULL || pSocket->fd < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -689,8 +694,7 @@ TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
|
||||||
return (TdSocketServerPtr)pSocket;
|
return (TdSocketServerPtr)pSocket;
|
||||||
}
|
}
|
||||||
|
|
||||||
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr,
|
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, int *addrLen) {
|
||||||
int *addrLen) {
|
|
||||||
if (pServerSocket == NULL || pServerSocket->fd < 0) {
|
if (pServerSocket == NULL || pServerSocket->fd < 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -887,7 +891,6 @@ int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *a
|
||||||
return getsockname(pSocket->fd, destAddr, addrLen);
|
return getsockname(pSocket->fd, destAddr, addrLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
TdEpollPtr taosCreateEpoll(int32_t size) {
|
TdEpollPtr taosCreateEpoll(int32_t size) {
|
||||||
EpollFd fd = -1;
|
EpollFd fd = -1;
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
|
@ -939,3 +942,28 @@ int32_t taosCloseEpoll(TdEpollPtr *ppEpoll) {
|
||||||
taosMemoryFree(*ppEpoll);
|
taosMemoryFree(*ppEpoll);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
* Set TCP connection timeout per-socket level.
|
||||||
|
* ref [https://github.com/libuv/help/issues/54]
|
||||||
|
*/
|
||||||
|
int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec) {
|
||||||
|
#if defined(WINDOWS)
|
||||||
|
SOCKET fd;
|
||||||
|
#else
|
||||||
|
int fd;
|
||||||
|
#endif
|
||||||
|
if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#if defined(WINDOWS)
|
||||||
|
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&conn_timeout_sec, sizeof(conn_timeout_sec))) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#else // Linux like systems
|
||||||
|
uint32_t conn_timeout_ms = conn_timeout_sec * 1000;
|
||||||
|
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
return (int)fd;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue