diff --git a/src/rpc/inc/rpcTcp.h b/src/rpc/inc/rpcTcp.h index 40fab00056..6ef8fc2d92 100644 --- a/src/rpc/inc/rpcTcp.h +++ b/src/rpc/inc/rpcTcp.h @@ -21,9 +21,11 @@ extern "C" { #endif void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); +void taosStopTcpServer(void *param); void taosCleanUpTcpServer(void *param); void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle); +void taosStopTcpClient(void *chandle); void taosCleanUpTcpClient(void *chandle); void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port); diff --git a/src/rpc/inc/rpcUdp.h b/src/rpc/inc/rpcUdp.h index fd60f4a089..c1da6a9240 100644 --- a/src/rpc/inc/rpcUdp.h +++ b/src/rpc/inc/rpcUdp.h @@ -23,6 +23,7 @@ extern "C" { #include "taosdef.h" void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int, void *fp, void *shandle); +void taosStopUdpConnection(void *handle); void taosCleanUpUdpConnection(void *handle); int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle); void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 007a511adf..f48d207d7b 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -153,6 +153,13 @@ void (*taosCleanUpConn[])(void *thandle) = { taosCleanUpTcpClient }; +void (*taosStopConn[])(void *thandle) = { + taosStopUdpConnection, + taosStopUdpConnection, + taosStopTcpServer, + taosStopTcpClient, +}; + int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { taosSendUdpData, taosSendUdpData, @@ -289,12 +296,18 @@ void *rpcOpen(const SRpcInit *pInit) { void rpcClose(void *param) { SRpcInfo *pRpc = (SRpcInfo *)param; + // stop connection to outside first + (*taosStopConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); + (*taosStopConn[pRpc->connType])(pRpc->udphandle); + + // close all connections for (int i = 0; i < pRpc->sessions; ++i) { if (pRpc->connList && pRpc->connList[i].user[0]) { rpcCloseConn((void *)(pRpc->connList + i)); } } + // clean up (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle); @@ -588,6 +601,7 @@ static void rpcReleaseConn(SRpcConn *pConn) { pConn->inTranId = 0; pConn->outTranId = 0; pConn->secured = 0; + pConn->peerId = 0; pConn->peerIp = 0; pConn->peerPort = 0; pConn->pReqMsg = NULL; @@ -627,6 +641,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { pConn->spi = pRpc->spi; pConn->encrypt = pRpc->encrypt; if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN); + tTrace("%s %p client connection is allocated", pRpc->label, pConn); } return pConn; @@ -681,6 +696,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); + tTrace("%s %p server connection is allocated", pRpc->label, pConn); } return pConn; diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 5d156492c7..aa94accceb 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -190,22 +190,28 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } } - -void taosCleanUpTcpServer(void *handle) { +void taosStopTcpServer(void *handle) { SServerObj *pServerObj = handle; - SThreadObj *pThreadObj; if (pServerObj == NULL) return; if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); if(pServerObj->thread) pthread_join(pServerObj->thread, NULL); + tTrace("%s TCP server is stopped", pServerObj->label); +} + +void taosCleanUpTcpServer(void *handle) { + SServerObj *pServerObj = handle; + SThreadObj *pThreadObj; + if (pServerObj == NULL) return; + for (int i = 0; i < pServerObj->numOfThreads; ++i) { pThreadObj = pServerObj->pThreadObj + i; taosStopTcpThread(pThreadObj); pthread_mutex_destroy(&(pThreadObj->mutex)); } - tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label); + tTrace("%s TCP server is cleaned up", pServerObj->label); tfree(pServerObj->pThreadObj); tfree(pServerObj); @@ -226,7 +232,7 @@ static void *taosAcceptTcpConnection(void *arg) { connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen); if (connFd == -1) { if (errno == EINVAL) { - tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label); + tTrace("%s TCP server stop accepting new connections, exiting", pServerObj->label); break; } @@ -304,12 +310,19 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * return pThreadObj; } +void taosStopTcpClient(void *chandle) { + SThreadObj *pThreadObj = chandle; + if (pThreadObj == NULL) return; + + tTrace ("%s TCP client is stopped", pThreadObj->label); +} + void taosCleanUpTcpClient(void *chandle) { SThreadObj *pThreadObj = chandle; if (pThreadObj == NULL) return; taosStopTcpThread(pThreadObj); - tTrace ("%s, all connections are cleaned up", pThreadObj->label); + tTrace ("%s TCP client is cleaned up", pThreadObj->label); tfree(pThreadObj); } diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 41446f87fb..f88db3a226 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -30,7 +30,6 @@ #define RPC_MAX_UDP_SIZE 65480 typedef struct { - void *signature; int index; int fd; uint16_t port; // peer port @@ -111,7 +110,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads pConn->processData = fp; pConn->index = i; pConn->pSet = pSet; - pConn->signature = pConn; int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn); if (code != 0) { @@ -132,7 +130,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads return pSet; } -void taosCleanUpUdpConnection(void *handle) { +void taosStopUdpConnection(void *handle) { SUdpConnSet *pSet = (SUdpConnSet *)handle; SUdpConn *pConn; @@ -140,8 +138,6 @@ void taosCleanUpUdpConnection(void *handle) { for (int i = 0; i < pSet->threads; ++i) { pConn = pSet->udpConn + i; - pConn->signature = NULL; - if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); if (pConn->fd >=0) taosCloseSocket(pConn->fd); } @@ -150,9 +146,24 @@ void taosCleanUpUdpConnection(void *handle) { pConn = pSet->udpConn + i; if (pConn->thread) pthread_join(pConn->thread, NULL); tfree(pConn->buffer); - tTrace("%s UDP thread is closed, inedx:%d", pConn->label, i); + // tTrace("%s UDP thread is closed, index:%d", pConn->label, i); } + tTrace("%s UDP is stopped", pSet->label); +} + +void taosCleanUpUdpConnection(void *handle) { + SUdpConnSet *pSet = (SUdpConnSet *)handle; + SUdpConn *pConn; + + if (pSet == NULL) return; + + for (int i = 0; i < pSet->threads; ++i) { + pConn = pSet->udpConn + i; + if (pConn->fd >=0) taosCloseSocket(pConn->fd); + } + + tTrace("%s UDP is cleaned up", pSet->label); tfree(pSet); } @@ -185,7 +196,7 @@ static void *taosRecvUdpData(void *param) { while (1) { dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); if(dataLen <= 0) { - tTrace("%s UDP socket was closed, exiting", pConn->label); + tTrace("%s UDP socket was closed, exiting(%s)", pConn->label, strerror(errno)); break; } @@ -221,7 +232,7 @@ static void *taosRecvUdpData(void *param) { int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) { SUdpConn *pConn = (SUdpConn *)chandle; - if (pConn == NULL || pConn->signature != pConn) return -1; + if (pConn == NULL) return -1; struct sockaddr_in destAdd; memset(&destAdd, 0, sizeof(destAdd));