Merge pull request #1595 from taosdata/enhance/wworker

remove possible memory leaks
This commit is contained in:
slguan 2020-04-12 23:08:43 +08:00 committed by GitHub
commit 3c38c5f993
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 131 additions and 122 deletions

View File

@ -65,18 +65,18 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
pTcp->shandle = shandle; pTcp->shandle = shandle;
if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) { if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) {
tError("%s failed to init TCP mutex, reason:%s", label, strerror(errno)); tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
return NULL; return NULL;
} }
if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) { if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno)); tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
return NULL; return NULL;
} }
pTcp->pollFd = epoll_create(10); // size does not matter pTcp->pollFd = epoll_create(10); // size does not matter
if (pTcp->pollFd < 0) { if (pTcp->pollFd < 0) {
tError("%s failed to create TCP epoll", label); tError("%s failed to create TCP client epoll", label);
return NULL; return NULL;
} }
@ -87,11 +87,11 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp)); int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp));
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (code != 0) { if (code != 0) {
tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno)); tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
return NULL; return NULL;
} }
tTrace("%s TCP client is initialized, ip:%s port:%hu", label, ip, port); tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port);
return pTcp; return pTcp;
} }
@ -181,18 +181,30 @@ int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void
return (int)send(pFdObj->fd, data, (size_t)len, 0); return (int)send(pFdObj->fd, data, (size_t)len, 0);
} }
static void taosCleanUpTcpFdObj(STcpFd *pFdObj) { static void taosReportBrokenLink(STcpFd *pFdObj) {
STcpClient *pTcp;
SRecvInfo recvInfo; SRecvInfo recvInfo;
STcpClient *pTcp = pFdObj->pTcp;
if (pFdObj->thandle) {
recvInfo.msg = NULL;
recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pTcp->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
(*(pTcp->processData))(&recvInfo);
}
}
static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return; if (pFdObj->signature != pFdObj) return;
pTcp = pFdObj->pTcp; pFdObj->signature = NULL;
if (pTcp == NULL) { STcpClient *pTcp = pFdObj->pTcp;
tError("double free TcpFdObj!!!!");
return;
}
epoll_ctl(pTcp->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL); epoll_ctl(pTcp->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
close(pFdObj->fd); close(pFdObj->fd);
@ -202,7 +214,7 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
pTcp->numOfFds--; pTcp->numOfFds--;
if (pTcp->numOfFds < 0) if (pTcp->numOfFds < 0)
tError("%s number of TCP FDs shall never be negative, FD:%p", pTcp->label, pFdObj); tError("%s %p, number of FDs is negative!!!, FD:%p", pTcp->label, pFdObj->thandle, pFdObj);
if (pFdObj->prev) { if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next; (pFdObj->prev)->next = pFdObj->next;
@ -216,19 +228,8 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
pthread_mutex_unlock(&pTcp->mutex); pthread_mutex_unlock(&pTcp->mutex);
recvInfo.msg = NULL; tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", pTcp->label, pFdObj->thandle, pFdObj, pTcp->numOfFds);
recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pTcp->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
if (pFdObj->thandle) (*(pTcp->processData))(&recvInfo);
tTrace("%s TCP is cleaned up, FD:%p numOfFds:%d", pTcp->label, pFdObj, pTcp->numOfFds);
memset(pFdObj, 0, sizeof(STcpFd));
tfree(pFdObj); tfree(pFdObj);
} }
@ -252,29 +253,29 @@ static void *taosReadTcpData(void *param) {
pFdObj = events[i].data.ptr; pFdObj = events[i].data.ptr;
if (events[i].events & EPOLLERR) { if (events[i].events & EPOLLERR) {
tTrace("%s TCP error happened on FD\n", pTcp->label); tTrace("%s %p, TCP error happened on FD", pTcp->label, pFdObj->thandle);
taosCleanUpTcpFdObj(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
if (events[i].events & EPOLLHUP) { if (events[i].events & EPOLLHUP) {
tTrace("%s TCP FD hang up\n", pTcp->label); tTrace("%s %p, TCP FD hang up", pTcp->label, pFdObj->thandle);
taosCleanUpTcpFdObj(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
int headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); int headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) { if (headLen != sizeof(SRpcHead)) {
tError("%s read error, headLen:%d", pTcp->label, headLen); tError("%s %p, read error, headLen:%d", pTcp->label, pFdObj->thandle, headLen);
taosCleanUpTcpFdObj(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
char *buffer = (char *)malloc((size_t)msgLen + tsRpcOverhead); char *buffer = (char *)malloc((size_t)msgLen + tsRpcOverhead);
if (NULL == buffer) { if (NULL == buffer) {
tTrace("%s TCP malloc(size:%d) fail\n", pTcp->label, msgLen); tTrace("%s %p, TCP malloc(size:%d) fail", pTcp->label, pFdObj->thandle, msgLen);
taosCleanUpTcpFdObj(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
@ -283,9 +284,10 @@ static void *taosReadTcpData(void *param) {
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) { if (leftLen != retLen) {
tError("%s read error, leftLen:%d retLen:%d", pTcp->label, leftLen, retLen); tError("%s %p, read error, leftLen:%d retLen:%d",
pTcp->label, pFdObj->thandle, leftLen, retLen);
tfree(buffer); tfree(buffer);
taosCleanUpTcpFdObj(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }

View File

@ -817,7 +817,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
tTrace("%s %p, link is broken", pRpc->label, pConn); tTrace("%s %p, link is broken", pRpc->label, pConn);
pConn->chandle = NULL; // pConn->chandle = NULL;
if (pConn->outType) { if (pConn->outType) {
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;

View File

@ -65,88 +65,94 @@ static void taosProcessTcpData(void *param);
static void taosAcceptTcpConnection(void *arg); static void taosAcceptTcpConnection(void *arg);
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
int i;
SServerObj *pServerObj; SServerObj *pServerObj;
pthread_attr_t thattr;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
pServerObj = (SServerObj *)malloc(sizeof(SServerObj)); pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
strcpy(pServerObj->ip, ip); strcpy(pServerObj->ip, ip);
pServerObj->port = port; pServerObj->port = port;
strcpy(pServerObj->label, label); strcpy(pServerObj->label, label);
pServerObj->numOfThreads = numOfThreads; pServerObj->numOfThreads = numOfThreads;
pServerObj->pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj) * (size_t)numOfThreads); pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
if (pServerObj->pThreadObj == NULL) { if (pServerObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
free(pServerObj);
return NULL; return NULL;
} }
memset(pServerObj->pThreadObj, 0, sizeof(SThreadObj) * (size_t)numOfThreads);
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int code = 0;
pThreadObj = pServerObj->pThreadObj; pThreadObj = pServerObj->pThreadObj;
for (i = 0; i < numOfThreads; ++i) { for (int i = 0; i < numOfThreads; ++i) {
pThreadObj->processData = fp; pThreadObj->processData = fp;
strcpy(pThreadObj->label, label); strcpy(pThreadObj->label, label);
pThreadObj->shandle = shandle; pThreadObj->shandle = shandle;
if (pthread_mutex_init(&(pThreadObj->threadMutex), NULL) < 0) { code = pthread_mutex_init(&(pThreadObj->threadMutex), NULL);
tError("%s failed to init TCP process data mutex, reason:%s", label, strerror(errno)); if (code < 0) {
return NULL; tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
break;;
} }
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) { code = pthread_cond_init(&(pThreadObj->fdReady), NULL);
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno)); if (code != 0) {
return NULL; tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
break;
} }
pThreadObj->pollFd = epoll_create(10); // size does not matter pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label); tError("%s failed to create TCP epoll", label);
return NULL; code = -1;
break;
} }
if (pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj)) != 0) { pthread_attr_t thattr;
tError("%s failed to create TCP process data thread, reason:%s", label, strerror(errno)); pthread_attr_init(&thattr);
return NULL; pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj));
pthread_attr_destroy(&thattr);
if (code != 0) {
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
break;
} }
pThreadObj->threadId = i; pThreadObj->threadId = i;
pThreadObj++; pThreadObj++;
} }
if (pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)) != 0) { if (code == 0) {
tError("%s failed to create TCP accept thread, reason:%s", label, strerror(errno)); pthread_attr_t thattr;
return NULL; pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
pthread_attr_destroy(&thattr);
if (code != 0) {
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
}
} }
/* if (code != 0) {
if ( pthread_create(&(pServerObj->thread), &thattr, free(pServerObj->pThreadObj);
(void*)taosAcceptUDConnection, (void *)(pServerObj)) != 0 ) { free(pServerObj);
tError("%s failed to create UD accept thread, reason:%s", label, pServerObj = NULL;
strerror(errno)); } else {
return NULL; tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
} }
*/
pthread_attr_destroy(&thattr);
tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
return (void *)pServerObj; return (void *)pServerObj;
} }
void taosCleanUpTcpServer(void *handle) { void taosCleanUpTcpServer(void *handle) {
int i;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
SServerObj *pServerObj = (SServerObj *)handle; SServerObj *pServerObj = handle;
if (pServerObj == NULL) return; if (pServerObj == NULL) return;
pthread_cancel(pServerObj->thread); pthread_cancel(pServerObj->thread);
pthread_join(pServerObj->thread, NULL); pthread_join(pServerObj->thread, NULL);
for (i = 0; i < pServerObj->numOfThreads; ++i) { for (int i = 0; i < pServerObj->numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj + i; pThreadObj = pServerObj->pThreadObj + i;
while (pThreadObj->pHead) { while (pThreadObj->pHead) {
@ -161,22 +167,21 @@ void taosCleanUpTcpServer(void *handle) {
pthread_mutex_destroy(&(pThreadObj->threadMutex)); pthread_mutex_destroy(&(pThreadObj->threadMutex));
} }
tfree(pServerObj->pThreadObj);
tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label); tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);
tfree(pServerObj->pThreadObj);
tfree(pServerObj); tfree(pServerObj);
} }
void taosCloseTcpServerConnection(void *chandle) { void taosCloseTcpServerConnection(void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle; SFdObj *pFdObj = chandle;
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
taosCleanUpFdObj(pFdObj); taosCleanUpFdObj(pFdObj);
} }
int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle; SFdObj *pFdObj = chandle;
if (chandle == NULL) return -1; if (chandle == NULL) return -1;
@ -185,6 +190,25 @@ int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void
#define maxEvents 10 #define maxEvents 10
static void taosReportBrokenLink(SFdObj *pFdObj) {
SThreadObj *pThreadObj = pFdObj->pThreadObj;
// notify the upper layer, so it will clean the associated context
if (pFdObj->thandle) {
SRecvInfo recvInfo;
recvInfo.msg = NULL;
recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pThreadObj->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
(*(pThreadObj->processData))(&recvInfo);
}
}
static void taosProcessTcpData(void *param) { static void taosProcessTcpData(void *param) {
SThreadObj * pThreadObj; SThreadObj * pThreadObj;
int i, fdNum; int i, fdNum;
@ -208,29 +232,29 @@ static void taosProcessTcpData(void *param) {
pFdObj = events[i].data.ptr; pFdObj = events[i].data.ptr;
if (events[i].events & EPOLLERR) { if (events[i].events & EPOLLERR) {
tTrace("%s TCP thread:%d, error happened on FD", pThreadObj->label, pThreadObj->threadId); tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle);
taosCleanUpFdObj(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
if (events[i].events & EPOLLHUP) { if (events[i].events & EPOLLHUP) {
tTrace("%s TCP thread:%d, FD hang up", pThreadObj->label, pThreadObj->threadId); tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
taosCleanUpFdObj(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) { if (headLen != sizeof(SRpcHead)) {
tError("%s read error, headLen:%d, errno:%d", pThreadObj->label, headLen, errno); tError("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
taosCleanUpFdObj(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
char *buffer = malloc(msgLen + tsRpcOverhead); char *buffer = malloc(msgLen + tsRpcOverhead);
if ( NULL == buffer) { if ( NULL == buffer) {
tError("%s TCP malloc(size:%d) fail\n", pThreadObj->label, msgLen); tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
taosCleanUpFdObj(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
@ -239,8 +263,9 @@ static void taosProcessTcpData(void *param) {
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) { if (leftLen != retLen) {
tError("%s read error, leftLen:%d retLen:%d", pThreadObj->label, leftLen, retLen); tError("%s %p, read error, leftLen:%d retLen:%d",
taosCleanUpFdObj(pFdObj); pThreadObj->label, pFdObj->thandle, leftLen, retLen);
taosReportBrokenLink(pFdObj);
tfree(buffer); tfree(buffer);
continue; continue;
} }
@ -278,10 +303,10 @@ static void taosAcceptTcpConnection(void *arg) {
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (sockFd < 0) { if (sockFd < 0) {
tError("%s failed to open TCP socket, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); tError("%s failed to open TCP socket, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
return; return;
} else { } else {
tTrace("%s TCP server is ready, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
} }
while (1) { while (1) {
@ -289,11 +314,11 @@ static void taosAcceptTcpConnection(void *arg) {
connFd = accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen); connFd = accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen);
if (connFd < 0) { if (connFd < 0) {
tError("%s TCP accept failure, errno:%d, reason:%s", pServerObj->label, errno, strerror(errno)); tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno));
continue; continue;
} }
tTrace("%s TCP connection from ip:%s port:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr), tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr),
htons(clientAddr.sin_port)); htons(clientAddr.sin_port));
taosKeepTcpAlive(connFd); taosKeepTcpAlive(connFd);
@ -318,7 +343,7 @@ static void taosAcceptTcpConnection(void *arg) {
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
event.data.ptr = pFdObj; event.data.ptr = pFdObj;
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
tError("%s failed to add TCP FD for epoll, error:%s", pServerObj->label, strerror(errno)); tError("%s failed to add TCP FD for epoll(%s)", pServerObj->label, strerror(errno));
tfree(pFdObj); tfree(pFdObj);
close(connFd); close(connFd);
continue; continue;
@ -333,7 +358,7 @@ static void taosAcceptTcpConnection(void *arg) {
pthread_cond_signal(&pThreadObj->fdReady); pthread_cond_signal(&pThreadObj->fdReady);
pthread_mutex_unlock(&(pThreadObj->threadMutex)); pthread_mutex_unlock(&(pThreadObj->threadMutex));
tTrace("%s TCP thread:%d, a new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, tTrace("%s TCP thread:%d, new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
pThreadObj->threadId, pFdObj->ipstr, pFdObj->port, pFdObj, pThreadObj->numOfFds); pThreadObj->threadId, pFdObj->ipstr, pFdObj->port, pFdObj, pThreadObj->numOfFds);
// pick up next thread for next connection // pick up next thread for next connection
@ -343,26 +368,23 @@ static void taosAcceptTcpConnection(void *arg) {
} }
static void taosCleanUpFdObj(SFdObj *pFdObj) { static void taosCleanUpFdObj(SFdObj *pFdObj) {
SThreadObj *pThreadObj;
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return; if (pFdObj->signature != pFdObj) return;
pThreadObj = pFdObj->pThreadObj; pFdObj->signature = NULL;
if (pThreadObj == NULL) { SThreadObj *pThreadObj = pFdObj->pThreadObj;
tError("FdObj double clean up!!!");
return;
}
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
close(pFdObj->fd); close(pFdObj->fd);
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
pthread_mutex_lock(&pThreadObj->threadMutex); pthread_mutex_lock(&pThreadObj->threadMutex);
pThreadObj->numOfFds--; pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0) if (pThreadObj->numOfFds < 0)
tError("%s TCP thread:%d, number of FDs shall never be negative", pThreadObj->label, pThreadObj->threadId); tError("%s %p, TCP thread:%d, number of FDs is negative!!!",
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
// remove from the FdObject list // remove from the FdObject list
@ -378,23 +400,8 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
pthread_mutex_unlock(&pThreadObj->threadMutex); pthread_mutex_unlock(&pThreadObj->threadMutex);
// notify the upper layer, so it will clean the associated context tTrace("%s %p, FD:%p is cleaned, numOfFds:%d",
SRecvInfo recvInfo; pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);
recvInfo.msg = NULL;
recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pThreadObj->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
if (pFdObj->thandle) (*(pThreadObj->processData))(&recvInfo);
tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId,
pFdObj, pThreadObj->numOfFds);
memset(pFdObj, 0, sizeof(SFdObj));
tfree(pFdObj); tfree(pFdObj);
} }

View File

@ -78,7 +78,6 @@ static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port);
static void taosProcessUdpBufTimer(void *param, void *tmrId); static void taosProcessUdpBufTimer(void *param, void *tmrId);
void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
pthread_attr_t thAttr;
SUdpConn * pConn; SUdpConn * pConn;
SUdpConnSet * pSet; SUdpConnSet * pSet;
@ -106,9 +105,6 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
} }
} }
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
uint16_t ownPort; uint16_t ownPort;
for (int i = 0; i < threads; ++i) { for (int i = 0; i < threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
@ -146,19 +142,21 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
pConn->tmrCtrl = pSet->tmrCtrl; pConn->tmrCtrl = pSet->tmrCtrl;
} }
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn); int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
pthread_attr_destroy(&thAttr);
if (code != 0) { if (code != 0) {
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno)); tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
taosCloseSocket(pConn->fd); taosCloseSocket(pConn->fd);
taosCleanUpUdpConnection(pSet); taosCleanUpUdpConnection(pSet);
pthread_attr_destroy(&thAttr);
return NULL; return NULL;
} }
++pSet->threads; ++pSet->threads;
} }
pthread_attr_destroy(&thAttr);
tTrace("%s UDP connection is initialized, ip:%s port:%hu threads:%d", label, ip, port, threads); tTrace("%s UDP connection is initialized, ip:%s port:%hu threads:%d", label, ip, port, threads);
return pSet; return pSet;

View File

@ -204,6 +204,8 @@ int main(int argc, char *argv[]) {
tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads); tPrint("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs*appThreads);
tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
getchar();
taosCloseLogger(); taosCloseLogger();
return 0; return 0;