Merge pull request #10587 from taosdata/fix/ZhiqiangWang/TD-13760-libuv-replace-socket-error

[TD-13760]<fix>: libuv replace socket error.
This commit is contained in:
Zhiqiang Wang 2022-03-07 22:45:25 +08:00 committed by GitHub
commit cc68f2e2df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 134 additions and 143 deletions

View File

@ -34,16 +34,10 @@
#include <sys/epoll.h>
#endif
#ifdef USE_UV
#include <uv.h>
#endif
#ifdef __cplusplus
extern "C" {
#endif
#ifndef USE_UV
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
typedef SOCKET EpollFd;
@ -63,8 +57,6 @@ int32_t taosCloseSocket(SocketFd fd);
void taosShutDownSocketRD(SOCKET fd);
void taosShutDownSocketWR(SOCKET fd);
int32_t taosSetNonblocking(SOCKET sock, int32_t on);
void taosIgnSIGPIPE();
void taosSetMaskSIGPIPE();
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);
int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen);
@ -94,14 +86,13 @@ SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int32_t taosKeepTcpAlive(SOCKET sockFd);
int32_t taosGetFqdn(char *);
void taosBlockSIGPIPE();
uint32_t taosGetIpv4FromFqdn(const char *);
int32_t taosGetFqdn(char *);
void tinet_ntoa(char *ipstr, uint32_t ip);
uint32_t ip2uint(const char *const ip_addr);
#endif
void taosBlockSIGPIPE();
void taosIgnSIGPIPE();
void taosSetMaskSIGPIPE();
#ifdef __cplusplus
}

View File

@ -17,7 +17,7 @@ TARGET_INCLUDE_DIRECTORIES(
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/catalog/inc"
)
add_test(
NAME catalogTest
COMMAND catalogTest
)
# add_test(
# NAME catalogTest
# COMMAND catalogTest
# )

View File

@ -3,10 +3,11 @@ add_library(monitor STATIC ${MONITOR_SRC})
target_include_directories(
monitor
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/monitor"
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/transport"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(monitor os util common)
target_link_libraries(monitor os util common transport)
if(${BUILD_TEST})
add_subdirectory(test)

View File

@ -111,24 +111,22 @@ _OVER:
}
#ifdef USE_UV
#include <uv.h>
static void clientConnCb(uv_connect_t* req, int32_t status) {
if (status < 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("Connection error %s\n", uv_strerror(status));
uv_close((uv_handle_t*)req->handle, NULL);
return;
}
// impl later
uv_buf_t* wb = req->data;
if (wb == NULL) {
uv_close((uv_handle_t*)req->handle, NULL);
}
assert(wb != NULL);
uv_write_t write_req;
uv_write(&write_req, req->handle, wb, 2, NULL);
uv_close((uv_handle_t*)req->handle, NULL);
}
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) {
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
uint32_t ipv4 = taosGetIpv4FromFqdn(server);
if (ipv4 == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno);

View File

@ -7,4 +7,4 @@ target_include_directories(
)
target_link_libraries(
os pthread dl rt m
)
)

View File

@ -34,8 +34,6 @@
#include <unistd.h>
#endif
#ifndef USE_UV
// typedef struct TdSocketServer {
// #if SOCKET_WITH_LOCK
// pthread_rwlock_t rwlock;
@ -131,18 +129,8 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
return 0;
}
void taosIgnSIGPIPE() { signal(SIGPIPE, SIG_IGN); }
void taosSetMaskSIGPIPE() {
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGPIPE);
int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL);
if (rc != 0) {
//printf("failed to setmask SIGPIPE");
}
}
#endif
@ -223,9 +211,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
return 0;
}
void taosIgnSIGPIPE() {}
void taosSetMaskSIGPIPE() {}
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
return 0;
@ -282,98 +267,6 @@ uint64_t htonll(uint64_t val) { return (((uint64_t)htonl(val)) << 32) + htonl(va
#define TCP_CONN_TIMEOUT 3000 // conn timeout
int32_t taosGetFqdn(char *fqdn) {
char hostname[1024];
hostname[1023] = '\0';
if (gethostname(hostname, 1023) == -1) {
//printf("failed to get hostname, reason:%s", strerror(errno));
return -1;
}
struct addrinfo hints = {0};
struct addrinfo *result = NULL;
#ifdef __APPLE__
// on macosx, hostname -f has the form of xxx.local
// which will block getaddrinfo for a few seconds if AI_CANONNAME is set
// thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return
// immediately
hints.ai_family = AF_INET;
#else // __APPLE__
hints.ai_flags = AI_CANONNAME;
#endif // __APPLE__
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) {
//printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1;
}
#ifdef __APPLE__
// refer to comments above
strcpy(fqdn, hostname);
#else // __APPLE__
strcpy(fqdn, result->ai_canonname);
#endif // __APPLE__
freeaddrinfo(result);
return 0;
}
uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
struct addrinfo hints = {0};
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
struct addrinfo *result = NULL;
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
if (result) {
struct sockaddr * sa = result->ai_addr;
struct sockaddr_in *si = (struct sockaddr_in *)sa;
struct in_addr ia = si->sin_addr;
uint32_t ip = ia.s_addr;
freeaddrinfo(result);
return ip;
} else {
#ifdef EAI_SYSTEM
if (ret == EAI_SYSTEM) {
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno));
} else {
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
}
#else
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
#endif
return 0xFFFFFFFF;
}
}
// Function converting an IP address string to an uint32_t.
uint32_t ip2uint(const char *const ip_addr) {
char ip_addr_cpy[20];
char ip[5];
tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
char *s_start, *s_end;
s_start = ip_addr_cpy;
s_end = ip_addr_cpy;
int32_t k;
for (k = 0; *s_start != '\0'; s_start = s_end) {
for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
}
if (*s_end == '.') {
*s_end = '\0';
s_end++;
}
ip[k++] = (char)atoi(s_start);
}
ip[k] = '\0';
return *((uint32_t *)ip);
}
int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
int32_t nleft, nwritten;
char * ptr = (char *)buf;
@ -754,10 +647,6 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
return sockFd;
}
void tinet_ntoa(char *ipstr, uint32_t ip) {
sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
}
#define COPY_SIZE 32768
// sendfile shall be used
@ -795,12 +684,9 @@ int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) {
return len;
}
#endif
#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
void taosBlockSIGPIPE() {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGPIPE);
@ -808,7 +694,122 @@ void taosBlockSIGPIPE() {
if (rc != 0) {
//printf("failed to block SIGPIPE");
}
#endif
}
uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
struct addrinfo hints = {0};
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
struct addrinfo *result = NULL;
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
if (result) {
struct sockaddr * sa = result->ai_addr;
struct sockaddr_in *si = (struct sockaddr_in *)sa;
struct in_addr ia = si->sin_addr;
uint32_t ip = ia.s_addr;
freeaddrinfo(result);
return ip;
} else {
#ifdef EAI_SYSTEM
if (ret == EAI_SYSTEM) {
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno));
} else {
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
}
#else
void taosBlockSIGPIPE() {}
#endif
//printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
#endif
return 0xFFFFFFFF;
}
}
int32_t taosGetFqdn(char *fqdn) {
char hostname[1024];
hostname[1023] = '\0';
if (gethostname(hostname, 1023) == -1) {
//printf("failed to get hostname, reason:%s", strerror(errno));
return -1;
}
struct addrinfo hints = {0};
struct addrinfo *result = NULL;
#ifdef __APPLE__
// on macosx, hostname -f has the form of xxx.local
// which will block getaddrinfo for a few seconds if AI_CANONNAME is set
// thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return
// immediately
hints.ai_family = AF_INET;
#else // __APPLE__
hints.ai_flags = AI_CANONNAME;
#endif // __APPLE__
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) {
//printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1;
}
#ifdef __APPLE__
// refer to comments above
strcpy(fqdn, hostname);
#else // __APPLE__
strcpy(fqdn, result->ai_canonname);
#endif // __APPLE__
freeaddrinfo(result);
return 0;
}
// Function converting an IP address string to an uint32_t.
uint32_t ip2uint(const char *const ip_addr) {
char ip_addr_cpy[20];
char ip[5];
tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
char *s_start, *s_end;
s_start = ip_addr_cpy;
s_end = ip_addr_cpy;
int32_t k;
for (k = 0; *s_start != '\0'; s_start = s_end) {
for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
}
if (*s_end == '.') {
*s_end = '\0';
s_end++;
}
ip[k++] = (char)atoi(s_start);
}
ip[k] = '\0';
return *((uint32_t *)ip);
}
void tinet_ntoa(char *ipstr, uint32_t ip) {
sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
}
void taosIgnSIGPIPE() {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
signal(SIGPIPE, SIG_IGN);
#endif
}
void taosSetMaskSIGPIPE() {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#else
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGPIPE);
int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL);
if (rc != 0) {
//printf("failed to setmask SIGPIPE");
}
#endif
}