From 0e5cc6e46ad71bd94577bf9fe9aae9fc66c49b0b Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 1 Oct 2020 14:40:56 +0000 Subject: [PATCH 1/6] TD-1645 --- src/rpc/src/rpcTcp.c | 51 +++++++++++++++++++++++++++++--------------- src/rpc/src/rpcUdp.c | 12 ++++++++--- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 2a3facdb36..97a3dad1eb 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -62,7 +62,7 @@ typedef struct { char label[TSDB_LABEL_LEN]; int numOfThreads; void * shandle; - SThreadObj *pThreadObj; + SThreadObj **pThreadObj; pthread_t thread; } SServerObj; @@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); pServerObj->numOfThreads = numOfThreads; - pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads); + pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads); if (pServerObj->pThreadObj == NULL) { tError("TCP:%s no enough memory", label); terrno = TAOS_SYSTEM_ERROR(errno); @@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); // initialize parameters in case it may encounter error later - pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1); + if (pThreadObj == NULL) { + tError("TCP:%s no enough memory", label); + terrno = TAOS_SYSTEM_ERROR(errno); + for (int j=0; jpThreadObj[j]); + free(pServerObj->pThreadObj); + free(pServerObj); + return NULL; + } + + pServerObj->pThreadObj[i] = pThreadObj; pThreadObj->pollFd = -1; taosResetPthread(&pThreadObj->thread); pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; - pThreadObj++; } // initialize mutex, thread, fd which may fail - pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj = pServerObj->pThreadObj[i]; code = pthread_mutex_init(&(pThreadObj->mutex), NULL); if (code < 0) { tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); @@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } pThreadObj->threadId = i; - pThreadObj++; } pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); @@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { pThreadObj->stop = true; eventfd_t fd = -1; + if (pThreadObj->thread == pthread_self()) { + pthread_detach(pthread_self()); + return; + } + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) { // signal the thread to stop, try graceful method first, // and use pthread_cancel when failed @@ -184,14 +197,7 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL); - if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); if (fd != -1) taosCloseSocket(fd); - - while (pThreadObj->pHead) { - SFdObj *pFdObj = pThreadObj->pHead; - pThreadObj->pHead = pFdObj->next; - taosFreeFdObj(pFdObj); - } } void taosStopTcpServer(void *handle) { @@ -210,7 +216,7 @@ void taosCleanUpTcpServer(void *handle) { if (pServerObj == NULL) return; for (int i = 0; i < pServerObj->numOfThreads; ++i) { - pThreadObj = pServerObj->pThreadObj + i; + pThreadObj = pServerObj->pThreadObj[i]; taosStopTcpThread(pThreadObj); pthread_mutex_destroy(&(pThreadObj->mutex)); } @@ -249,7 +255,7 @@ static void *taosAcceptTcpConnection(void *arg) { taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); // pick up the thread to handle this connection - pThreadObj = pServerObj->pThreadObj + threadId; + pThreadObj = pServerObj->pThreadObj[threadId]; SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); if (pFdObj) { @@ -329,8 +335,6 @@ void taosCleanUpTcpClient(void *chandle) { taosStopTcpThread(pThreadObj); tDebug ("%s TCP client is cleaned up", pThreadObj->label); - - taosTFree(pThreadObj); } void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { @@ -503,8 +507,21 @@ static void *taosProcessTcpData(void *param) { pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); } + + if (pThreadObj->stop) break; } + if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); + + while (pThreadObj->pHead) { + SFdObj *pFdObj = pThreadObj->pHead; + pThreadObj->pHead = pFdObj->next; + taosFreeFdObj(pFdObj); + } + + tDebug("%s TCP thread exits ...", pThreadObj->label); + taosTFree(pThreadObj); + return NULL; } diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 4ea47582b9..250c4c38f2 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -140,15 +140,18 @@ void taosStopUdpConnection(void *handle) { pConn = pSet->udpConn + i; if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); if (pConn->fd >=0) taosCloseSocket(pConn->fd); + pConn->fd = -1; } for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; if (taosCheckPthreadValid(pConn->thread)) { - pthread_join(pConn->thread, NULL); + if (pConn->thread == pthread_self()) { + pthread_detach(pthread_self()); + } else { + pthread_join(pConn->thread, NULL); + } } - taosTFree(pConn->buffer); - // tTrace("%s UDP thread is closed, index:%d", pConn->label, i); } tDebug("%s UDP is stopped", pSet->label); @@ -230,6 +233,9 @@ static void *taosRecvUdpData(void *param) { (*(pConn->processData))(&recvInfo); } + taosTFree(pConn->buffer); + tDebug("%s UDP recv thread exits", pConn->label); + return NULL; } From 30284cbe59f37a234c8ad61fe7c16f49c1648bf8 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 1 Oct 2020 14:48:10 +0000 Subject: [PATCH 2/6] TD-1645 --- src/sync/src/taosTcpPool.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index eda822b1ec..539cfb64da 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) { continue; } } + } + + if (pThread->stop) break; } uDebug("%p TCP epoll thread exits", pThread); From f949b732f5051ad95893c5ea994907362e9dcb5f Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Thu, 1 Oct 2020 14:55:00 +0000 Subject: [PATCH 3/6] TD-1645 --- src/rpc/src/rpcTcp.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 97a3dad1eb..2119824b3d 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -205,7 +205,14 @@ void taosStopTcpServer(void *handle) { if (pServerObj == NULL) return; if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); - if (taosCheckPthreadValid(pServerObj->thread)) pthread_join(pServerObj->thread, NULL); + + if (taosCheckPthreadValid(pServerObj->thread)) { + if (pServerObj->thread == pthread_self()) { + pthread_detach(pthread_self()); + } else { + pthread_join(pServerObj->thread, NULL); + } + } tDebug("%s TCP server is stopped", pServerObj->label); } From 38dc4f4d161f23653a2e69b199637a6f8f4723c9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 2 Oct 2020 13:31:15 +0800 Subject: [PATCH 4/6] TD-1645 fix compile error in windotws --- src/rpc/src/rpcTcp.c | 4 ++-- src/rpc/src/rpcUdp.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 2119824b3d..aacdede543 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -174,7 +174,7 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { pThreadObj->stop = true; eventfd_t fd = -1; - if (pThreadObj->thread == pthread_self()) { + if (taosComparePthread(pThreadObj->thread, pthread_self())) { pthread_detach(pthread_self()); return; } @@ -207,7 +207,7 @@ void taosStopTcpServer(void *handle) { if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); if (taosCheckPthreadValid(pServerObj->thread)) { - if (pServerObj->thread == pthread_self()) { + if (taosComparePthread(pServerObj->thread, pthread_self())) { pthread_detach(pthread_self()); } else { pthread_join(pServerObj->thread, NULL); diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 250c4c38f2..4fd0318ae6 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -146,7 +146,7 @@ void taosStopUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; if (taosCheckPthreadValid(pConn->thread)) { - if (pConn->thread == pthread_self()) { + if (taosComparePthread(pConn->thread, pthread_self())) { pthread_detach(pthread_self()); } else { pthread_join(pConn->thread, NULL); From a8b8ecb20e38e23a59b99924e0c701adda458084 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 2 Oct 2020 14:56:54 +0000 Subject: [PATCH 5/6] TD-1645 --- src/connector/go | 2 +- src/rpc/src/rpcTcp.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector/go b/src/connector/go index 567b7b12f3..8c58c512b6 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 567b7b12f3fd2775c718d284beffc8c38dd6c219 +Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index aacdede543..46555b3647 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -196,7 +196,7 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } } - if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL); + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) pthread_join(pThreadObj->thread, NULL); if (fd != -1) taosCloseSocket(fd); } From c608fba88ed0c69d19551bb64de77994f8d971e3 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 3 Oct 2020 04:30:02 +0000 Subject: [PATCH 6/6] TD-1645 --- src/rpc/src/rpcTcp.c | 7 +++++-- src/rpc/src/rpcUdp.c | 12 +++--------- src/sync/src/taosTcpPool.c | 2 +- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 46555b3647..06b5d39d61 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -196,7 +196,10 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } } - if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) pthread_join(pThreadObj->thread, NULL); + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) { + pthread_join(pThreadObj->thread, NULL); + } + if (fd != -1) taosCloseSocket(fd); } @@ -225,7 +228,6 @@ void taosCleanUpTcpServer(void *handle) { for (int i = 0; i < pServerObj->numOfThreads; ++i) { pThreadObj = pServerObj->pThreadObj[i]; taosStopTcpThread(pThreadObj); - pthread_mutex_destroy(&(pThreadObj->mutex)); } tDebug("%s TCP server is cleaned up", pServerObj->label); @@ -526,6 +528,7 @@ static void *taosProcessTcpData(void *param) { taosFreeFdObj(pFdObj); } + pthread_mutex_destroy(&(pThreadObj->mutex)); tDebug("%s TCP thread exits ...", pThreadObj->label); taosTFree(pThreadObj); diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 4fd0318ae6..4ea47582b9 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -140,18 +140,15 @@ void taosStopUdpConnection(void *handle) { pConn = pSet->udpConn + i; if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); if (pConn->fd >=0) taosCloseSocket(pConn->fd); - pConn->fd = -1; } for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; if (taosCheckPthreadValid(pConn->thread)) { - if (taosComparePthread(pConn->thread, pthread_self())) { - pthread_detach(pthread_self()); - } else { - pthread_join(pConn->thread, NULL); - } + pthread_join(pConn->thread, NULL); } + taosTFree(pConn->buffer); + // tTrace("%s UDP thread is closed, index:%d", pConn->label, i); } tDebug("%s UDP is stopped", pSet->label); @@ -233,9 +230,6 @@ static void *taosRecvUdpData(void *param) { (*(pConn->processData))(&recvInfo); } - taosTFree(pConn->buffer); - tDebug("%s UDP recv thread exits", pConn->label); - return NULL; } diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index 539cfb64da..6a210a136f 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -324,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) { } pthread_join(thread, NULL); - taosClose(fd); + if (fd >= 0) taosClose(fd); }