remove possible memory leaks
This commit is contained in:
parent
e4956e3f03
commit
50d7e6876d
|
@ -65,79 +65,85 @@ static void taosProcessTcpData(void *param);
|
||||||
static void taosAcceptTcpConnection(void *arg);
|
static void taosAcceptTcpConnection(void *arg);
|
||||||
|
|
||||||
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
|
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
|
||||||
int i;
|
|
||||||
SServerObj *pServerObj;
|
SServerObj *pServerObj;
|
||||||
pthread_attr_t thattr;
|
|
||||||
SThreadObj *pThreadObj;
|
SThreadObj *pThreadObj;
|
||||||
|
|
||||||
pServerObj = (SServerObj *)malloc(sizeof(SServerObj));
|
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
|
||||||
strcpy(pServerObj->ip, ip);
|
strcpy(pServerObj->ip, ip);
|
||||||
pServerObj->port = port;
|
pServerObj->port = port;
|
||||||
strcpy(pServerObj->label, label);
|
strcpy(pServerObj->label, label);
|
||||||
pServerObj->numOfThreads = numOfThreads;
|
pServerObj->numOfThreads = numOfThreads;
|
||||||
|
|
||||||
pServerObj->pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj) * (size_t)numOfThreads);
|
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
|
||||||
if (pServerObj->pThreadObj == NULL) {
|
if (pServerObj->pThreadObj == NULL) {
|
||||||
tError("TCP:%s no enough memory", label);
|
tError("TCP:%s no enough memory", label);
|
||||||
|
free(pServerObj);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads);
|
|
||||||
|
|
||||||
pthread_attr_init(&thattr);
|
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
|
||||||
|
|
||||||
|
int code = 0;
|
||||||
pThreadObj = pServerObj->pThreadObj;
|
pThreadObj = pServerObj->pThreadObj;
|
||||||
for (i = 0; i < numOfThreads; ++i) {
|
for (int i = 0; i < numOfThreads; ++i) {
|
||||||
pThreadObj->processData = fp;
|
pThreadObj->processData = fp;
|
||||||
strcpy(pThreadObj->label, label);
|
strcpy(pThreadObj->label, label);
|
||||||
pThreadObj->shandle = shandle;
|
pThreadObj->shandle = shandle;
|
||||||
|
|
||||||
if (pthread_mutex_init(&(pThreadObj->threadMutex), NULL) < 0) {
|
code = pthread_mutex_init(&(pThreadObj->threadMutex), NULL);
|
||||||
tError("%s failed to init TCP process data mutex, reason:%s", label, strerror(errno));
|
if (code < 0) {
|
||||||
return NULL;
|
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
||||||
|
break;;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
|
code = pthread_cond_init(&(pThreadObj->fdReady), NULL);
|
||||||
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
|
if (code != 0) {
|
||||||
return NULL;
|
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
||||||
if (pThreadObj->pollFd < 0) {
|
if (pThreadObj->pollFd < 0) {
|
||||||
tError("%s failed to create TCP epoll", label);
|
tError("%s failed to create TCP epoll", label);
|
||||||
return NULL;
|
code = -1;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) {
|
pthread_attr_t thattr;
|
||||||
tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno));
|
pthread_attr_init(&thattr);
|
||||||
return NULL;
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
code = pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj));
|
||||||
|
pthread_attr_destroy(&thattr);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pThreadObj->threadId = i;
|
pThreadObj->threadId = i;
|
||||||
pThreadObj++;
|
pThreadObj++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) {
|
if (code == 0) {
|
||||||
tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno));
|
pthread_attr_t thattr;
|
||||||
return NULL;
|
pthread_attr_init(&thattr);
|
||||||
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
|
||||||
|
pthread_attr_destroy(&thattr);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
if (code != 0) {
|
||||||
if ( pthread_create(&(pServerObj->thread), &thattr,
|
free(pServerObj->pThreadObj);
|
||||||
(void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) {
|
free(pServerObj);
|
||||||
tError("%s failed to create UD accept thread, reason:%s", label,
|
pServerObj = NULL;
|
||||||
strerror(errno));
|
} else {
|
||||||
return NULL;
|
tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
pthread_attr_destroy(&thattr);
|
|
||||||
tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
|
|
||||||
|
|
||||||
return (void *)pServerObj;
|
return (void *)pServerObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCleanUpTcpServer(void *handle) {
|
void taosCleanUpTcpServer(void *handle) {
|
||||||
int i;
|
|
||||||
SThreadObj *pThreadObj;
|
SThreadObj *pThreadObj;
|
||||||
SServerObj *pServerObj = (SServerObj *)handle;
|
SServerObj *pServerObj = (SServerObj *)handle;
|
||||||
|
|
||||||
|
@ -146,7 +152,7 @@ void taosCleanUpTcpServer(void *handle) {
|
||||||
pthread_cancel(pServerObj->thread);
|
pthread_cancel(pServerObj->thread);
|
||||||
pthread_join(pServerObj->thread, NULL);
|
pthread_join(pServerObj->thread, NULL);
|
||||||
|
|
||||||
for (i = 0; i < pServerObj->numOfThreads; ++i) {
|
for (int i = 0; i < pServerObj->numOfThreads; ++i) {
|
||||||
pThreadObj = pServerObj->pThreadObj + i;
|
pThreadObj = pServerObj->pThreadObj + i;
|
||||||
|
|
||||||
while (pThreadObj->pHead) {
|
while (pThreadObj->pHead) {
|
||||||
|
@ -161,9 +167,9 @@ void taosCleanUpTcpServer(void *handle) {
|
||||||
pthread_mutex_destroy(&(pThreadObj->threadMutex));
|
pthread_mutex_destroy(&(pThreadObj->threadMutex));
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pServerObj->pThreadObj);
|
|
||||||
tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);
|
tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);
|
||||||
|
|
||||||
|
tfree(pServerObj->pThreadObj);
|
||||||
tfree(pServerObj);
|
tfree(pServerObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -278,10 +284,10 @@ static void taosAcceptTcpConnection(void *arg) {
|
||||||
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
||||||
|
|
||||||
if (sockFd < 0) {
|
if (sockFd < 0) {
|
||||||
tError("%s failed to open TCP socket, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
tError("%s failed to open TCP socket, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
tTrace("%s TCP server is ready, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -289,11 +295,11 @@ static void taosAcceptTcpConnection(void *arg) {
|
||||||
connFd = accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen);
|
connFd = accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen);
|
||||||
|
|
||||||
if (connFd < 0) {
|
if (connFd < 0) {
|
||||||
tError("%s TCP accept failure, errno:%d, reason:%s", pServerObj->label, errno, strerror(errno));
|
tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tTrace("%s TCP connection from ip:%s port:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr),
|
tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr),
|
||||||
htons(clientAddr.sin_port));
|
htons(clientAddr.sin_port));
|
||||||
taosKeepTcpAlive(connFd);
|
taosKeepTcpAlive(connFd);
|
||||||
|
|
||||||
|
@ -318,7 +324,7 @@ static void taosAcceptTcpConnection(void *arg) {
|
||||||
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
|
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
|
||||||
event.data.ptr = pFdObj;
|
event.data.ptr = pFdObj;
|
||||||
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
|
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
|
||||||
tError("%s failed to add TCP FD for epoll, error:%s", pServerObj->label, strerror(errno));
|
tError("%s failed to add TCP FD for epoll(%s)", pServerObj->label, strerror(errno));
|
||||||
tfree(pFdObj);
|
tfree(pFdObj);
|
||||||
close(connFd);
|
close(connFd);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -78,7 +78,6 @@ static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port);
|
||||||
static void taosProcessUdpBufTimer(void *param, void *tmrId);
|
static void taosProcessUdpBufTimer(void *param, void *tmrId);
|
||||||
|
|
||||||
void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
|
void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
|
||||||
pthread_attr_t thAttr;
|
|
||||||
SUdpConn * pConn;
|
SUdpConn * pConn;
|
||||||
SUdpConnSet * pSet;
|
SUdpConnSet * pSet;
|
||||||
|
|
||||||
|
@ -106,9 +105,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_attr_init(&thAttr);
|
|
||||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
|
||||||
|
|
||||||
uint16_t ownPort;
|
uint16_t ownPort;
|
||||||
for (int i = 0; i < threads; ++i) {
|
for (int i = 0; i < threads; ++i) {
|
||||||
pConn = pSet->udpConn + i;
|
pConn = pSet->udpConn + i;
|
||||||
|
@ -146,19 +142,21 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
|
||||||
pConn->tmrCtrl = pSet->tmrCtrl;
|
pConn->tmrCtrl = pSet->tmrCtrl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_attr_t thAttr;
|
||||||
|
pthread_attr_init(&thAttr);
|
||||||
|
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
|
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
|
||||||
|
pthread_attr_destroy(&thAttr);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
|
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
|
||||||
taosCloseSocket(pConn->fd);
|
taosCloseSocket(pConn->fd);
|
||||||
taosCleanUpUdpConnection(pSet);
|
taosCleanUpUdpConnection(pSet);
|
||||||
pthread_attr_destroy(&thAttr);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
++pSet->threads;
|
++pSet->threads;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_attr_destroy(&thAttr);
|
|
||||||
tTrace("%s UDP connection is initialized, ip:%s port:%hu threads:%d", label, ip, port, threads);
|
tTrace("%s UDP connection is initialized, ip:%s port:%hu threads:%d", label, ip, port, threads);
|
||||||
|
|
||||||
return pSet;
|
return pSet;
|
||||||
|
|
Loading…
Reference in New Issue