fix & enhancement for udp connection handling
1. potential data race in `taosProcessMonitorTimer`: the issue does not exist at present because there's only one scheduler thread, which means there's no cocurrent calls to this function for a same `pMonitor`. but if more scheduler threads are created later, there's a data race issue in rare case. as threads number can be easily increased by increase the value of `taosTmrThreads`, it is very unlikely that the developer could realize this function need to be revised together. that's why i say it is a 'potential' issue. this issue happens in below scenario: a. scheduler thread1: `if (pSet)` is true and new timer is installed by `taosTmrReset`; b. scheduler thread1: `if (pMonitor->pSet == NULL)` is true but `taosTmrStopA` is blocked, either by the mutex of the timer or os thread scheduler. c. timer thread: 200ms elapse, new call to `taosProcessMonitorTimer` is initialized in scheduler thread2; d. scheduler thread2: `if (pMonitor->pTimer != tmrId)` is false; e. scheduler thread1: unblocked, stops timer and frees `pMonitor`; f. scheduler thread2: unexpected behavior because `pMonitor` is not valid any more. because the result of this issue is crash or worse, i suggest to fix it though the possibility of all the conditions are met is very very low. 2. `pthread_attr_t` related issues: per manual, an initialized `pthread_attr_t` can be reused, should be destroyed, and the behavior of re-init it is undefined. this issue exist in other modules also. 3. memory leaks; 4. improve failure case handling of `taosInitUdpConnection`; 5. typo
This commit is contained in:
parent
22aa8b08a7
commit
01bb110b45
|
@ -137,9 +137,7 @@ void taosProcessMonitorTimer(void *param, void *tmrId) {
|
||||||
tTrace("%s monitor timer is expired, update the link status", pSet->label);
|
tTrace("%s monitor timer is expired, update the link status", pSet->label);
|
||||||
(*pSet->fp)(data, pMonitor->dataLen, pMonitor->ip, 0, pSet->shandle, NULL, NULL);
|
(*pSet->fp)(data, pMonitor->dataLen, pMonitor->ip, 0, pSet->shandle, NULL, NULL);
|
||||||
taosTmrReset(taosProcessMonitorTimer, 200, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer);
|
taosTmrReset(taosProcessMonitorTimer, 200, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer);
|
||||||
}
|
} else {
|
||||||
|
|
||||||
if (pMonitor->pSet == NULL) {
|
|
||||||
taosTmrStopA(&pMonitor->pTimer);
|
taosTmrStopA(&pMonitor->pTimer);
|
||||||
free(pMonitor);
|
free(pMonitor);
|
||||||
}
|
}
|
||||||
|
@ -181,6 +179,7 @@ void *taosReadTcpData(void *argv) {
|
||||||
|
|
||||||
if (retLen != pInfo->msgLen) {
|
if (retLen != pInfo->msgLen) {
|
||||||
tError("%s failed to read data from server, msgLen:%d retLen:%d", pSet->label, pInfo->msgLen, retLen);
|
tError("%s failed to read data from server, msgLen:%d retLen:%d", pSet->label, pInfo->msgLen, retLen);
|
||||||
|
free(buffer);
|
||||||
} else {
|
} else {
|
||||||
(*pSet->fp)(buffer, pInfo->msgLen, pMonitor->ip, (int16_t)pInfo->port, pSet->shandle, NULL, pMonitor->pConn);
|
(*pSet->fp)(buffer, pInfo->msgLen, pMonitor->ip, (int16_t)pInfo->port, pSet->shandle, NULL, pMonitor->pConn);
|
||||||
}
|
}
|
||||||
|
@ -214,9 +213,11 @@ int taosReceivePacketViaTcp(uint32_t ip, STaosHeader *pHead, SUdpConn *pConn) {
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
|
||||||
code = pthread_create(&(thread), &thattr, taosReadTcpData, (void *)pMonitor);
|
code = pthread_create(&(thread), &thattr, taosReadTcpData, (void *)pMonitor);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tTrace("%s faile to create thread to read tcp data, reason:%s", pSet->label, strerror(errno));
|
tTrace("%s failed to create thread to read tcp data, reason:%s", pSet->label, strerror(errno));
|
||||||
|
pMonitor->pSet = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_attr_destroy(&thattr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -402,6 +403,9 @@ void *taosUdpTcpConnection(void *argv) {
|
||||||
|
|
||||||
tTrace("%s UDP server is created, ip:%s:%d", pSet->label, pSet->ip, pSet->port);
|
tTrace("%s UDP server is created, ip:%s:%d", pSet->label, pSet->ip, pSet->port);
|
||||||
|
|
||||||
|
pthread_attr_init(&thattr);
|
||||||
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pSet->tcpFd < 0) break;
|
if (pSet->tcpFd < 0) break;
|
||||||
socklen_t addrlen = sizeof(clientAddr);
|
socklen_t addrlen = sizeof(clientAddr);
|
||||||
|
@ -422,14 +426,14 @@ void *taosUdpTcpConnection(void *argv) {
|
||||||
pTransfer->port = clientAddr.sin_port;
|
pTransfer->port = clientAddr.sin_port;
|
||||||
pTransfer->pSet = pSet;
|
pTransfer->pSet = pSet;
|
||||||
|
|
||||||
pthread_attr_init(&thattr);
|
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
|
|
||||||
if (pthread_create(&(thread), &thattr, taosTransferDataViaTcp, (void *)pTransfer) < 0) {
|
if (pthread_create(&(thread), &thattr, taosTransferDataViaTcp, (void *)pTransfer) < 0) {
|
||||||
tTrace("%s faile to create thread for UDP server, reason:%s", pSet->label, strerror(errno));
|
tTrace("%s failed to create thread for UDP server, reason:%s", pSet->label, strerror(errno));
|
||||||
|
free(pTransfer);
|
||||||
taosCloseSocket(connFd);
|
taosCloseSocket(connFd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_attr_destroy(&thattr);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -448,7 +452,6 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
|
||||||
memset(pSet, 0, (size_t)size);
|
memset(pSet, 0, (size_t)size);
|
||||||
strcpy(pSet->ip, ip);
|
strcpy(pSet->ip, ip);
|
||||||
pSet->port = port;
|
pSet->port = port;
|
||||||
pSet->threads = threads;
|
|
||||||
pSet->shandle = shandle;
|
pSet->shandle = shandle;
|
||||||
pSet->fp = fp;
|
pSet->fp = fp;
|
||||||
pSet->tcpFd = -1;
|
pSet->tcpFd = -1;
|
||||||
|
@ -458,8 +461,16 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
|
||||||
char udplabel[12];
|
char udplabel[12];
|
||||||
sprintf(udplabel, "%s.b", label);
|
sprintf(udplabel, "%s.b", label);
|
||||||
pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel);
|
pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel);
|
||||||
|
if (pSet->tmrCtrl == NULL) {
|
||||||
|
tError("%s failed to initialize tmrCtrl")
|
||||||
|
taosCleanUpUdpConnection(pSet);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
pthread_attr_init(&thAttr);
|
||||||
|
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
short ownPort;
|
short ownPort;
|
||||||
for (int i = 0; i < threads; ++i) {
|
for (int i = 0; i < threads; ++i) {
|
||||||
pConn = pSet->udpConn + i;
|
pConn = pSet->udpConn + i;
|
||||||
|
@ -467,6 +478,7 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
|
||||||
pConn->fd = taosOpenUdpSocket(ip, ownPort);
|
pConn->fd = taosOpenUdpSocket(ip, ownPort);
|
||||||
if (pConn->fd < 0) {
|
if (pConn->fd < 0) {
|
||||||
tError("%s failed to open UDP socket %s:%d", label, ip, port);
|
tError("%s failed to open UDP socket %s:%d", label, ip, port);
|
||||||
|
taosCleanUpUdpConnection(pSet);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -477,11 +489,10 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
|
||||||
pConn->localPort = (int16_t)ntohs(sin.sin_port);
|
pConn->localPort = (int16_t)ntohs(sin.sin_port);
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_attr_init(&thAttr);
|
|
||||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
|
||||||
if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) {
|
if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) {
|
||||||
close(pConn->fd);
|
|
||||||
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
|
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
|
||||||
|
taosCloseSocket(pConn->fd);
|
||||||
|
taosCleanUpUdpConnection(pSet);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -496,6 +507,7 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
|
||||||
pthread_mutex_init(&pConn->mutex, NULL);
|
pthread_mutex_init(&pConn->mutex, NULL);
|
||||||
pConn->tmrCtrl = pSet->tmrCtrl;
|
pConn->tmrCtrl = pSet->tmrCtrl;
|
||||||
}
|
}
|
||||||
|
++pSet->threads;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_attr_destroy(&thAttr);
|
pthread_attr_destroy(&thAttr);
|
||||||
|
@ -520,6 +532,7 @@ void *taosInitUdpServer(char *ip, short port, char *label, int threads, void *fp
|
||||||
// pthread_t thread;
|
// pthread_t thread;
|
||||||
// pSet->tcpThread = pthread_create(&(thread), &thattr, taosUdpTcpConnection, pSet);
|
// pSet->tcpThread = pthread_create(&(thread), &thattr, taosUdpTcpConnection, pSet);
|
||||||
pthread_create(&(pSet->tcpThread), &thattr, taosUdpTcpConnection, pSet);
|
pthread_create(&(pSet->tcpThread), &thattr, taosUdpTcpConnection, pSet);
|
||||||
|
pthread_attr_destroy(&thattr);
|
||||||
|
|
||||||
return pSet;
|
return pSet;
|
||||||
}
|
}
|
||||||
|
@ -540,13 +553,16 @@ void taosCleanUpUdpConnection(void *handle) {
|
||||||
for (int i = 0; i < pSet->threads; ++i) {
|
for (int i = 0; i < pSet->threads; ++i) {
|
||||||
pConn = pSet->udpConn + i;
|
pConn = pSet->udpConn + i;
|
||||||
pConn->signature = NULL;
|
pConn->signature = NULL;
|
||||||
|
pthread_cancel(pConn->thread);
|
||||||
taosCloseSocket(pConn->fd);
|
taosCloseSocket(pConn->fd);
|
||||||
if (pConn->hash) {
|
if (pConn->hash) {
|
||||||
taosCloseIpHash(pConn->hash);
|
taosCloseIpHash(pConn->hash);
|
||||||
pthread_mutex_destroy(&pConn->mutex);
|
pthread_mutex_destroy(&pConn->mutex);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pthread_cancel(pConn->thread);
|
for (int i = 0; i < pSet->threads; ++i) {
|
||||||
|
pConn = pSet->udpConn + i;
|
||||||
pthread_join(pConn->thread, NULL);
|
pthread_join(pConn->thread, NULL);
|
||||||
tTrace("chandle:%p is closed", pConn);
|
tTrace("chandle:%p is closed", pConn);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue