From 01bb110b45df734ba16e4258580936a4ae7b52ae Mon Sep 17 00:00:00 2001 From: localvar Date: Sun, 18 Aug 2019 11:28:38 +0800 Subject: [PATCH 1/4] fix & enhancement for udp connection handling 1. potential data race in `taosProcessMonitorTimer`: the issue does not exist at present because there's only one scheduler thread, which means there's no cocurrent calls to this function for a same `pMonitor`. but if more scheduler threads are created later, there's a data race issue in rare case. as threads number can be easily increased by increase the value of `taosTmrThreads`, it is very unlikely that the developer could realize this function need to be revised together. that's why i say it is a 'potential' issue. this issue happens in below scenario: a. scheduler thread1: `if (pSet)` is true and new timer is installed by `taosTmrReset`; b. scheduler thread1: `if (pMonitor->pSet == NULL)` is true but `taosTmrStopA` is blocked, either by the mutex of the timer or os thread scheduler. c. timer thread: 200ms elapse, new call to `taosProcessMonitorTimer` is initialized in scheduler thread2; d. scheduler thread2: `if (pMonitor->pTimer != tmrId)` is false; e. scheduler thread1: unblocked, stops timer and frees `pMonitor`; f. scheduler thread2: unexpected behavior because `pMonitor` is not valid any more. because the result of this issue is crash or worse, i suggest to fix it though the possibility of all the conditions are met is very very low. 2. `pthread_attr_t` related issues: per manual, an initialized `pthread_attr_t` can be reused, should be destroyed, and the behavior of re-init it is undefined. this issue exist in other modules also. 3. memory leaks; 4. improve failure case handling of `taosInitUdpConnection`; 5. typo --- src/rpc/src/tudp.c | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/src/rpc/src/tudp.c b/src/rpc/src/tudp.c index 4b7d4c97fb..b79f40019e 100644 --- a/src/rpc/src/tudp.c +++ b/src/rpc/src/tudp.c @@ -137,9 +137,7 @@ void taosProcessMonitorTimer(void *param, void *tmrId) { tTrace("%s monitor timer is expired, update the link status", pSet->label); (*pSet->fp)(data, pMonitor->dataLen, pMonitor->ip, 0, pSet->shandle, NULL, NULL); taosTmrReset(taosProcessMonitorTimer, 200, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer); - } - - if (pMonitor->pSet == NULL) { + } else { taosTmrStopA(&pMonitor->pTimer); free(pMonitor); } @@ -181,6 +179,7 @@ void *taosReadTcpData(void *argv) { if (retLen != pInfo->msgLen) { tError("%s failed to read data from server, msgLen:%d retLen:%d", pSet->label, pInfo->msgLen, retLen); + free(buffer); } else { (*pSet->fp)(buffer, pInfo->msgLen, pMonitor->ip, (int16_t)pInfo->port, pSet->shandle, NULL, pMonitor->pConn); } @@ -214,9 +213,11 @@ int taosReceivePacketViaTcp(uint32_t ip, STaosHeader *pHead, SUdpConn *pConn) { pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); code = pthread_create(&(thread), &thattr, taosReadTcpData, (void *)pMonitor); if (code < 0) { - tTrace("%s faile to create thread to read tcp data, reason:%s", pSet->label, strerror(errno)); + tTrace("%s failed to create thread to read tcp data, reason:%s", pSet->label, strerror(errno)); + pMonitor->pSet = NULL; } + pthread_attr_destroy(&thattr); return code; } @@ -402,6 +403,9 @@ void *taosUdpTcpConnection(void *argv) { tTrace("%s UDP server is created, ip:%s:%d", pSet->label, pSet->ip, pSet->port); + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); + while (1) { if (pSet->tcpFd < 0) break; socklen_t addrlen = sizeof(clientAddr); @@ -422,14 +426,14 @@ void *taosUdpTcpConnection(void *argv) { pTransfer->port = clientAddr.sin_port; pTransfer->pSet = pSet; - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); if (pthread_create(&(thread), &thattr, taosTransferDataViaTcp, (void *)pTransfer) < 0) { - tTrace("%s faile to create thread for UDP server, reason:%s", pSet->label, strerror(errno)); + tTrace("%s failed to create thread for UDP server, reason:%s", pSet->label, strerror(errno)); + free(pTransfer); taosCloseSocket(connFd); } } + pthread_attr_destroy(&thattr); return NULL; } @@ -448,7 +452,6 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void memset(pSet, 0, (size_t)size); strcpy(pSet->ip, ip); pSet->port = port; - pSet->threads = threads; pSet->shandle = shandle; pSet->fp = fp; pSet->tcpFd = -1; @@ -458,8 +461,16 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void char udplabel[12]; sprintf(udplabel, "%s.b", label); pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel); + if (pSet->tmrCtrl == NULL) { + tError("%s failed to initialize tmrCtrl") + taosCleanUpUdpConnection(pSet); + return NULL; + } // } + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + short ownPort; for (int i = 0; i < threads; ++i) { pConn = pSet->udpConn + i; @@ -467,6 +478,7 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void pConn->fd = taosOpenUdpSocket(ip, ownPort); if (pConn->fd < 0) { tError("%s failed to open UDP socket %s:%d", label, ip, port); + taosCleanUpUdpConnection(pSet); return NULL; } @@ -477,11 +489,10 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void pConn->localPort = (int16_t)ntohs(sin.sin_port); } - pthread_attr_init(&thAttr); - pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) { - close(pConn->fd); tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno)); + taosCloseSocket(pConn->fd); + taosCleanUpUdpConnection(pSet); return NULL; } @@ -496,6 +507,7 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void pthread_mutex_init(&pConn->mutex, NULL); pConn->tmrCtrl = pSet->tmrCtrl; } + ++pSet->threads; } pthread_attr_destroy(&thAttr); @@ -520,6 +532,7 @@ void *taosInitUdpServer(char *ip, short port, char *label, int threads, void *fp // pthread_t thread; // pSet->tcpThread = pthread_create(&(thread), &thattr, taosUdpTcpConnection, pSet); pthread_create(&(pSet->tcpThread), &thattr, taosUdpTcpConnection, pSet); + pthread_attr_destroy(&thattr); return pSet; } @@ -540,13 +553,16 @@ void taosCleanUpUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; pConn->signature = NULL; + pthread_cancel(pConn->thread); taosCloseSocket(pConn->fd); if (pConn->hash) { taosCloseIpHash(pConn->hash); pthread_mutex_destroy(&pConn->mutex); } + } - pthread_cancel(pConn->thread); + for (int i = 0; i < pSet->threads; ++i) { + pConn = pSet->udpConn + i; pthread_join(pConn->thread, NULL); tTrace("chandle:%p is closed", pConn); } From 36d85a600fd760f6387858ec105563451eeafd99 Mon Sep 17 00:00:00 2001 From: johnnyhou327 <31071594+johnnyhou327@users.noreply.github.com> Date: Sat, 24 Aug 2019 17:45:21 +0800 Subject: [PATCH 2/4] Update issue templates --- .github/ISSUE_TEMPLATE/bug-report.md | 2 +- .github/ISSUE_TEMPLATE/feature-request.md | 2 +- .github/ISSUE_TEMPLATE/general-questions.md | 2 +- .github/ISSUE_TEMPLATE/performance-related-questions.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/ISSUE_TEMPLATE/bug-report.md b/.github/ISSUE_TEMPLATE/bug-report.md index e39a1b9da7..52f988c2cf 100644 --- a/.github/ISSUE_TEMPLATE/bug-report.md +++ b/.github/ISSUE_TEMPLATE/bug-report.md @@ -2,7 +2,7 @@ name: Bug Report about: Create a report to help us improve title: '' -labels: '' +labels: bug assignees: '' --- diff --git a/.github/ISSUE_TEMPLATE/feature-request.md b/.github/ISSUE_TEMPLATE/feature-request.md index 36154b7dd6..b004aac4a9 100644 --- a/.github/ISSUE_TEMPLATE/feature-request.md +++ b/.github/ISSUE_TEMPLATE/feature-request.md @@ -2,7 +2,7 @@ name: Feature Request about: Suggest an idea for this project title: '' -labels: '' +labels: enhancement assignees: '' --- diff --git a/.github/ISSUE_TEMPLATE/general-questions.md b/.github/ISSUE_TEMPLATE/general-questions.md index 9992883c4b..314bb47483 100644 --- a/.github/ISSUE_TEMPLATE/general-questions.md +++ b/.github/ISSUE_TEMPLATE/general-questions.md @@ -2,7 +2,7 @@ name: General Questions about: General questions about TDengine's usage, user experiences, milestones etc. title: '' -labels: '' +labels: help wanted, question assignees: '' --- diff --git a/.github/ISSUE_TEMPLATE/performance-related-questions.md b/.github/ISSUE_TEMPLATE/performance-related-questions.md index ed100d4ea0..b36f7c6dc3 100644 --- a/.github/ISSUE_TEMPLATE/performance-related-questions.md +++ b/.github/ISSUE_TEMPLATE/performance-related-questions.md @@ -2,7 +2,7 @@ name: Performance-related Questions about: Any questions related to TDengine's performance. title: '' -labels: '' +labels: performance assignees: '' --- From a9ad56780f9957ff1f4dc58fc4cbbb45974f4f97 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 24 Aug 2019 18:02:11 +0800 Subject: [PATCH 3/4] fix issue #418 --- src/rpc/src/trpc.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index b380ead968..3798718043 100644 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -118,6 +118,7 @@ typedef struct rpc_server { int taosDebugFlag = 131; int tsRpcTimer = 300; int tsRpcMaxTime = 600; // seconds; +int tsRpcProgressTime = 10; // milliseocnds // not configurable int tsRpcMaxRetry; @@ -294,7 +295,7 @@ int taosSendQuickRsp(void *thandle, char rsptype, char code) { void *taosOpenRpc(SRpcInit *pRpc) { STaosRpc *pServer; - tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcTimer; + tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime; tsRpcHeadSize = sizeof(STaosHeader) + sizeof(SMsgNode); pServer = (STaosRpc *)malloc(sizeof(STaosRpc)); @@ -896,7 +897,7 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer tTrace("%s cid:%d sid:%d id:%s, peer is still processing the transaction, pConn:%p", pServer->label, chann, sid, pHeader->meterId, pConn); pConn->tretry++; - taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer); + taosTmrReset(taosProcessTaosTimer, tsRpcProgressTime, pConn, pChann->tmrCtrl, &pConn->pTimer); code = TSDB_CODE_ALREADY_PROCESSED; goto _exit; } else { From b61efa3e2b230fb437cd7c0ad1e8513d1e0d298f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 26 Aug 2019 09:49:31 +0800 Subject: [PATCH 4/4] fix twrite problem --- src/os/linux/src/os.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/os/linux/src/os.c b/src/os/linux/src/os.c index e76f15e9ff..41dee25659 100644 --- a/src/os/linux/src/os.c +++ b/src/os/linux/src/os.c @@ -276,12 +276,12 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) { } ssize_t twrite(int fd, void *buf, size_t n) { - size_t nleft, nwritten; - - nleft = n; + size_t nleft = n; + ssize_t nwritten = 0; + char *tbuf = (char *)buf while (nleft > 0) { - nwritten = write(fd, buf, nleft); + nwritten = write(fd, (void *)tbuf, nleft); if (nwritten < 0) { if (errno == EINTR) { continue; @@ -289,7 +289,7 @@ ssize_t twrite(int fd, void *buf, size_t n) { return -1; } nleft -= nwritten; - buf += nwritten; + tbuf += nwritten; } return n;