Merge pull request #2195 from taosdata/hotfix/rpcFail
failed to open TCP server socket, return error for RPC initialization
This commit is contained in:
commit
4e469358d9
|
@ -92,6 +92,7 @@ int32_t main(int32_t argc, char *argv[]) {
|
||||||
// Initialize the system
|
// Initialize the system
|
||||||
if (dnodeInitSystem() < 0) {
|
if (dnodeInitSystem() < 0) {
|
||||||
syslog(LOG_ERR, "Error initialize TDengine system");
|
syslog(LOG_ERR, "Error initialize TDengine system");
|
||||||
|
dPrint("Failed to start TDengine, please check the log at:%s", tsLogDir);
|
||||||
closelog();
|
closelog();
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ static void *taosProcessTcpData(void *param);
|
||||||
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
|
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
|
||||||
static void taosFreeFdObj(SFdObj *pFdObj);
|
static void taosFreeFdObj(SFdObj *pFdObj);
|
||||||
static void taosReportBrokenLink(SFdObj *pFdObj);
|
static void taosReportBrokenLink(SFdObj *pFdObj);
|
||||||
static void* taosAcceptTcpConnection(void *arg);
|
static void *taosAcceptTcpConnection(void *arg);
|
||||||
|
|
||||||
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
|
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
|
||||||
SServerObj *pServerObj;
|
SServerObj *pServerObj;
|
||||||
|
@ -80,6 +80,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pServerObj->fd = -1;
|
||||||
pServerObj->thread = 0;
|
pServerObj->thread = 0;
|
||||||
pServerObj->ip = ip;
|
pServerObj->ip = ip;
|
||||||
pServerObj->port = port;
|
pServerObj->port = port;
|
||||||
|
@ -99,6 +100,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
// initialize parameters in case it may encounter error later
|
||||||
pThreadObj = pServerObj->pThreadObj;
|
pThreadObj = pServerObj->pThreadObj;
|
||||||
for (int i = 0; i < numOfThreads; ++i) {
|
for (int i = 0; i < numOfThreads; ++i) {
|
||||||
pThreadObj->pollFd = -1;
|
pThreadObj->pollFd = -1;
|
||||||
|
@ -106,18 +108,21 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
pThreadObj->processData = fp;
|
pThreadObj->processData = fp;
|
||||||
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
||||||
pThreadObj->shandle = shandle;
|
pThreadObj->shandle = shandle;
|
||||||
|
pThreadObj++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// initialize mutex, thread, fd which may fail
|
||||||
|
pThreadObj = pServerObj->pThreadObj;
|
||||||
|
for (int i = 0; i < numOfThreads; ++i) {
|
||||||
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
break;;
|
break;;
|
||||||
}
|
}
|
||||||
|
|
||||||
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
||||||
if (pThreadObj->pollFd < 0) {
|
if (pThreadObj->pollFd < 0) {
|
||||||
tError("%s failed to create TCP epoll", label);
|
tError("%s failed to create TCP epoll", label);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
code = -1;
|
code = -1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -125,7 +130,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
|
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,15 +137,18 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
pThreadObj++;
|
pThreadObj++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
||||||
|
if (pServerObj->fd < 0) code = -1;
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
|
code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
|
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
taosCleanUpTcpServer(pServerObj);
|
taosCleanUpTcpServer(pServerObj);
|
||||||
pServerObj = NULL;
|
pServerObj = NULL;
|
||||||
} else {
|
} else {
|
||||||
|
@ -204,7 +211,7 @@ void taosCleanUpTcpServer(void *handle) {
|
||||||
tfree(pServerObj);
|
tfree(pServerObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* taosAcceptTcpConnection(void *arg) {
|
static void *taosAcceptTcpConnection(void *arg) {
|
||||||
int connFd = -1;
|
int connFd = -1;
|
||||||
struct sockaddr_in caddr;
|
struct sockaddr_in caddr;
|
||||||
int threadId = 0;
|
int threadId = 0;
|
||||||
|
@ -212,10 +219,6 @@ static void* taosAcceptTcpConnection(void *arg) {
|
||||||
SServerObj *pServerObj;
|
SServerObj *pServerObj;
|
||||||
|
|
||||||
pServerObj = (SServerObj *)arg;
|
pServerObj = (SServerObj *)arg;
|
||||||
|
|
||||||
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
|
||||||
if (pServerObj->fd < 0) return NULL;
|
|
||||||
|
|
||||||
tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
#include "taoserror.h"
|
||||||
#include "rpcLog.h"
|
#include "rpcLog.h"
|
||||||
#include "rpcUdp.h"
|
#include "rpcUdp.h"
|
||||||
#include "rpcHead.h"
|
#include "rpcHead.h"
|
||||||
|
@ -65,6 +66,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
||||||
pSet = (SUdpConnSet *)malloc((size_t)size);
|
pSet = (SUdpConnSet *)malloc((size_t)size);
|
||||||
if (pSet == NULL) {
|
if (pSet == NULL) {
|
||||||
tError("%s failed to allocate UdpConn", label);
|
tError("%s failed to allocate UdpConn", label);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,30 +75,34 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
||||||
pSet->port = port;
|
pSet->port = port;
|
||||||
pSet->shandle = shandle;
|
pSet->shandle = shandle;
|
||||||
pSet->fp = fp;
|
pSet->fp = fp;
|
||||||
|
pSet->threads = threads;
|
||||||
tstrncpy(pSet->label, label, sizeof(pSet->label));
|
tstrncpy(pSet->label, label, sizeof(pSet->label));
|
||||||
|
|
||||||
|
pthread_attr_t thAttr;
|
||||||
|
pthread_attr_init(&thAttr);
|
||||||
|
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
int i;
|
||||||
uint16_t ownPort;
|
uint16_t ownPort;
|
||||||
for (int i = 0; i < threads; ++i) {
|
for (i = 0; i < threads; ++i) {
|
||||||
pConn = pSet->udpConn + i;
|
pConn = pSet->udpConn + i;
|
||||||
ownPort = (port ? port + i : 0);
|
ownPort = (port ? port + i : 0);
|
||||||
pConn->fd = taosOpenUdpSocket(ip, ownPort);
|
pConn->fd = taosOpenUdpSocket(ip, ownPort);
|
||||||
if (pConn->fd < 0) {
|
if (pConn->fd < 0) {
|
||||||
tError("%s failed to open UDP socket %x:%hu", label, ip, port);
|
tError("%s failed to open UDP socket %x:%hu", label, ip, port);
|
||||||
taosCleanUpUdpConnection(pSet);
|
break;
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pConn->buffer = malloc(RPC_MAX_UDP_SIZE);
|
pConn->buffer = malloc(RPC_MAX_UDP_SIZE);
|
||||||
if (NULL == pConn->buffer) {
|
if (NULL == pConn->buffer) {
|
||||||
tError("%s failed to malloc recv buffer", label);
|
tError("%s failed to malloc recv buffer", label);
|
||||||
taosCleanUpUdpConnection(pSet);
|
break;
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct sockaddr_in sin;
|
struct sockaddr_in sin;
|
||||||
unsigned int addrlen = sizeof(sin);
|
unsigned int addrlen = sizeof(sin);
|
||||||
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
|
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 &&
|
||||||
addrlen == sizeof(sin)) {
|
sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
|
||||||
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
|
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,23 +113,22 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
||||||
pConn->pSet = pSet;
|
pConn->pSet = pSet;
|
||||||
pConn->signature = pConn;
|
pConn->signature = pConn;
|
||||||
|
|
||||||
pthread_attr_t thAttr;
|
|
||||||
pthread_attr_init(&thAttr);
|
|
||||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
|
||||||
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
|
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
|
||||||
pthread_attr_destroy(&thAttr);
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
|
tError("%s failed to create thread to process UDP data(%s)", label, strerror(errno));
|
||||||
taosCloseSocket(pConn->fd);
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_attr_destroy(&thAttr);
|
||||||
|
|
||||||
|
if (i != threads) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
taosCleanUpUdpConnection(pSet);
|
taosCleanUpUdpConnection(pSet);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
++pSet->threads;
|
tTrace("%s UDP connection is initialized, ip:%x:%hu threads:%d", label, ip, port, threads);
|
||||||
}
|
|
||||||
|
|
||||||
tTrace("%s UDP connection is initialized, ip:%x port:%hu threads:%d", label, ip, port, threads);
|
|
||||||
|
|
||||||
return pSet;
|
return pSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,16 +141,17 @@ void taosCleanUpUdpConnection(void *handle) {
|
||||||
for (int i = 0; i < pSet->threads; ++i) {
|
for (int i = 0; i < pSet->threads; ++i) {
|
||||||
pConn = pSet->udpConn + i;
|
pConn = pSet->udpConn + i;
|
||||||
pConn->signature = NULL;
|
pConn->signature = NULL;
|
||||||
|
|
||||||
// shutdown to signal the thread to exit
|
// shutdown to signal the thread to exit
|
||||||
shutdown(pConn->fd, SHUT_RD);
|
if ( pConn->fd >=0) shutdown(pConn->fd, SHUT_RD);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pSet->threads; ++i) {
|
for (int i = 0; i < pSet->threads; ++i) {
|
||||||
pConn = pSet->udpConn + i;
|
pConn = pSet->udpConn + i;
|
||||||
pthread_join(pConn->thread, NULL);
|
if (pConn->thread) pthread_join(pConn->thread, NULL);
|
||||||
free(pConn->buffer);
|
if (pConn->fd >=0) taosCloseSocket(pConn->fd);
|
||||||
taosCloseSocket(pConn->fd);
|
tfree(pConn->buffer);
|
||||||
tTrace("chandle:%p is closed", pConn);
|
tTrace("UDP chandle:%p is closed", pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pSet);
|
tfree(pSet);
|
||||||
|
@ -159,7 +165,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t
|
||||||
SUdpConn *pConn = pSet->udpConn + pSet->index;
|
SUdpConn *pConn = pSet->udpConn + pSet->index;
|
||||||
pConn->port = port;
|
pConn->port = port;
|
||||||
|
|
||||||
tTrace("%s UDP connection is setup, ip:%x:%hu, local:%x:%d", pConn->label, ip, port, pSet->ip, pConn->localPort);
|
tTrace("%s UDP connection is setup, ip:%x:%hu", pConn->label, ip, port);
|
||||||
|
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue