Merge pull request #1604 from taosdata/enhance/wworker
re-organize TCP connection code
This commit is contained in:
commit
d832adb4f5
|
@ -1,34 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _rpc_server_header_
|
|
||||||
#define _rpc_server_header_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#include "taosdef.h"
|
|
||||||
|
|
||||||
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
|
|
||||||
void taosCleanUpTcpServer(void *param);
|
|
||||||
void taosCloseTcpServerConnection(void *param);
|
|
||||||
int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif
|
|
|
@ -13,20 +13,22 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef _rpc_client_header_
|
#ifndef _rpc_tcp_header_
|
||||||
#define _rpc_client_header_
|
#define _rpc_tcp_header_
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "taosdef.h"
|
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
|
||||||
|
void taosCleanUpTcpServer(void *param);
|
||||||
|
|
||||||
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle);
|
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle);
|
||||||
void taosCleanUpTcpClient(void *chandle);
|
void taosCleanUpTcpClient(void *chandle);
|
||||||
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port);
|
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port);
|
||||||
void taosCloseTcpClientConnection(void *chandle);
|
|
||||||
int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
|
void taosCloseTcpConnection(void *chandle);
|
||||||
|
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
|
@ -22,18 +22,18 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "rpcCache.h"
|
#include "rpcCache.h"
|
||||||
|
|
||||||
typedef struct _c_hash_t {
|
typedef struct SConnHash {
|
||||||
uint32_t ip;
|
uint32_t ip;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
char connType;
|
char connType;
|
||||||
struct _c_hash_t *prev;
|
struct SConnHash *prev;
|
||||||
struct _c_hash_t *next;
|
struct SConnHash *next;
|
||||||
void * data;
|
void *data;
|
||||||
uint64_t time;
|
uint64_t time;
|
||||||
} SConnHash;
|
} SConnHash;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SConnHash ** connHashList;
|
SConnHash **connHashList;
|
||||||
mpool_h connHashMemPool;
|
mpool_h connHashMemPool;
|
||||||
int maxSessions;
|
int maxSessions;
|
||||||
int total;
|
int total;
|
||||||
|
|
|
@ -1,315 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "os.h"
|
|
||||||
#include "taosmsg.h"
|
|
||||||
#include "tlog.h"
|
|
||||||
#include "tsocket.h"
|
|
||||||
#include "tutil.h"
|
|
||||||
#include "rpcClient.h"
|
|
||||||
#include "rpcHead.h"
|
|
||||||
|
|
||||||
#ifndef EPOLLWAKEUP
|
|
||||||
#define EPOLLWAKEUP (1u << 29)
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct _tcp_fd {
|
|
||||||
void *signature;
|
|
||||||
int fd; // TCP socket FD
|
|
||||||
void * thandle;
|
|
||||||
uint32_t ip;
|
|
||||||
char ipstr[20];
|
|
||||||
uint16_t port;
|
|
||||||
struct _tcp_client *pTcp;
|
|
||||||
struct _tcp_fd * prev, *next;
|
|
||||||
} STcpFd;
|
|
||||||
|
|
||||||
typedef struct _tcp_client {
|
|
||||||
pthread_t thread;
|
|
||||||
STcpFd * pHead;
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
pthread_cond_t fdReady;
|
|
||||||
int pollFd;
|
|
||||||
int numOfFds;
|
|
||||||
char label[12];
|
|
||||||
char ipstr[20];
|
|
||||||
void *shandle; // handle passed by upper layer during server initialization
|
|
||||||
void *(*processData)(SRecvInfo *pRecv);
|
|
||||||
} STcpClient;
|
|
||||||
|
|
||||||
#define maxTcpEvents 100
|
|
||||||
|
|
||||||
static void taosCleanUpTcpFdObj(STcpFd *pFdObj);
|
|
||||||
static void *taosReadTcpData(void *param);
|
|
||||||
|
|
||||||
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
|
|
||||||
STcpClient *pTcp;
|
|
||||||
pthread_attr_t thattr;
|
|
||||||
|
|
||||||
pTcp = (STcpClient *)malloc(sizeof(STcpClient));
|
|
||||||
memset(pTcp, 0, sizeof(STcpClient));
|
|
||||||
strcpy(pTcp->label, label);
|
|
||||||
strcpy(pTcp->ipstr, ip);
|
|
||||||
pTcp->shandle = shandle;
|
|
||||||
|
|
||||||
if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) {
|
|
||||||
tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) {
|
|
||||||
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTcp->pollFd = epoll_create(10); // size does not matter
|
|
||||||
if (pTcp->pollFd < 0) {
|
|
||||||
tError("%s failed to create TCP client epoll", label);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTcp->processData = fp;
|
|
||||||
|
|
||||||
pthread_attr_init(&thattr);
|
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
|
||||||
int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp));
|
|
||||||
pthread_attr_destroy(&thattr);
|
|
||||||
if (code != 0) {
|
|
||||||
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port);
|
|
||||||
|
|
||||||
return pTcp;
|
|
||||||
}
|
|
||||||
|
|
||||||
void taosCleanUpTcpClient(void *chandle) {
|
|
||||||
STcpClient *pTcp = (STcpClient *)chandle;
|
|
||||||
if (pTcp == NULL) return;
|
|
||||||
|
|
||||||
while (pTcp->pHead) {
|
|
||||||
taosCleanUpTcpFdObj(pTcp->pHead);
|
|
||||||
pTcp->pHead = pTcp->pHead->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
close(pTcp->pollFd);
|
|
||||||
|
|
||||||
pthread_cancel(pTcp->thread);
|
|
||||||
pthread_join(pTcp->thread, NULL);
|
|
||||||
|
|
||||||
// tTrace (":%s, all connections are cleaned up", pTcp->label);
|
|
||||||
|
|
||||||
tfree(pTcp);
|
|
||||||
}
|
|
||||||
|
|
||||||
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
|
|
||||||
STcpClient * pTcp = (STcpClient *)shandle;
|
|
||||||
STcpFd * pFdObj;
|
|
||||||
struct epoll_event event;
|
|
||||||
struct in_addr destIp;
|
|
||||||
int fd;
|
|
||||||
|
|
||||||
fd = taosOpenTcpClientSocket(ip, port, pTcp->ipstr);
|
|
||||||
if (fd <= 0) return NULL;
|
|
||||||
|
|
||||||
pFdObj = (STcpFd *)malloc(sizeof(STcpFd));
|
|
||||||
if (pFdObj == NULL) {
|
|
||||||
tError("%s no enough resource to allocate TCP FD IDs", pTcp->label);
|
|
||||||
tclose(fd);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
memset(pFdObj, 0, sizeof(STcpFd));
|
|
||||||
pFdObj->fd = fd;
|
|
||||||
strcpy(pFdObj->ipstr, ip);
|
|
||||||
inet_aton(ip, &destIp);
|
|
||||||
pFdObj->ip = destIp.s_addr;
|
|
||||||
pFdObj->port = port;
|
|
||||||
pFdObj->pTcp = pTcp;
|
|
||||||
pFdObj->thandle = thandle;
|
|
||||||
pFdObj->signature = pFdObj;
|
|
||||||
|
|
||||||
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
|
|
||||||
event.data.ptr = pFdObj;
|
|
||||||
if (epoll_ctl(pTcp->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
|
|
||||||
tError("%s failed to add TCP FD for epoll, error:%s", pTcp->label, strerror(errno));
|
|
||||||
tfree(pFdObj);
|
|
||||||
tclose(fd);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
// notify the data process, add into the FdObj list
|
|
||||||
pthread_mutex_lock(&(pTcp->mutex));
|
|
||||||
pFdObj->next = pTcp->pHead;
|
|
||||||
if (pTcp->pHead) (pTcp->pHead)->prev = pFdObj;
|
|
||||||
pTcp->pHead = pFdObj;
|
|
||||||
pTcp->numOfFds++;
|
|
||||||
pthread_cond_signal(&pTcp->fdReady);
|
|
||||||
pthread_mutex_unlock(&(pTcp->mutex));
|
|
||||||
|
|
||||||
tTrace("%s TCP connection to %s:%hu is created, FD:%p numOfFds:%d", pTcp->label, ip, port, pFdObj, pTcp->numOfFds);
|
|
||||||
|
|
||||||
return pFdObj;
|
|
||||||
}
|
|
||||||
|
|
||||||
void taosCloseTcpClientConnection(void *chandle) {
|
|
||||||
STcpFd *pFdObj = (STcpFd *)chandle;
|
|
||||||
|
|
||||||
if (pFdObj == NULL) return;
|
|
||||||
|
|
||||||
taosCleanUpTcpFdObj(pFdObj);
|
|
||||||
}
|
|
||||||
|
|
||||||
int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
|
|
||||||
STcpFd *pFdObj = (STcpFd *)chandle;
|
|
||||||
|
|
||||||
if (chandle == NULL) return -1;
|
|
||||||
|
|
||||||
return (int)send(pFdObj->fd, data, (size_t)len, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void taosReportBrokenLink(STcpFd *pFdObj) {
|
|
||||||
SRecvInfo recvInfo;
|
|
||||||
STcpClient *pTcp = pFdObj->pTcp;
|
|
||||||
|
|
||||||
if (pFdObj->thandle) {
|
|
||||||
recvInfo.msg = NULL;
|
|
||||||
recvInfo.msgLen = 0;
|
|
||||||
recvInfo.ip = 0;
|
|
||||||
recvInfo.port = 0;
|
|
||||||
recvInfo.shandle = pTcp->shandle;
|
|
||||||
recvInfo.thandle = pFdObj->thandle;;
|
|
||||||
recvInfo.chandle = NULL;
|
|
||||||
recvInfo.connType = RPC_CONN_TCP;
|
|
||||||
(*(pTcp->processData))(&recvInfo);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
|
|
||||||
|
|
||||||
if (pFdObj == NULL) return;
|
|
||||||
if (pFdObj->signature != pFdObj) return;
|
|
||||||
|
|
||||||
pFdObj->signature = NULL;
|
|
||||||
STcpClient *pTcp = pFdObj->pTcp;
|
|
||||||
|
|
||||||
epoll_ctl(pTcp->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
|
|
||||||
close(pFdObj->fd);
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pTcp->mutex);
|
|
||||||
|
|
||||||
pTcp->numOfFds--;
|
|
||||||
|
|
||||||
if (pTcp->numOfFds < 0)
|
|
||||||
tError("%s %p, number of FDs is negative!!!, FD:%p", pTcp->label, pFdObj->thandle, pFdObj);
|
|
||||||
|
|
||||||
if (pFdObj->prev) {
|
|
||||||
(pFdObj->prev)->next = pFdObj->next;
|
|
||||||
} else {
|
|
||||||
pTcp->pHead = pFdObj->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pFdObj->next) {
|
|
||||||
(pFdObj->next)->prev = pFdObj->prev;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pTcp->mutex);
|
|
||||||
|
|
||||||
tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", pTcp->label, pFdObj->thandle, pFdObj, pTcp->numOfFds);
|
|
||||||
|
|
||||||
tfree(pFdObj);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *taosReadTcpData(void *param) {
|
|
||||||
STcpClient *pTcp = (STcpClient *)param;
|
|
||||||
int i, fdNum;
|
|
||||||
STcpFd *pFdObj;
|
|
||||||
struct epoll_event events[maxTcpEvents];
|
|
||||||
SRecvInfo recvInfo;
|
|
||||||
SRpcHead rpcHead;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
pthread_mutex_lock(&pTcp->mutex);
|
|
||||||
if (pTcp->numOfFds < 1) pthread_cond_wait(&pTcp->fdReady, &pTcp->mutex);
|
|
||||||
pthread_mutex_unlock(&pTcp->mutex);
|
|
||||||
|
|
||||||
fdNum = epoll_wait(pTcp->pollFd, events, maxTcpEvents, -1);
|
|
||||||
if (fdNum < 0) continue;
|
|
||||||
|
|
||||||
for (i = 0; i < fdNum; ++i) {
|
|
||||||
pFdObj = events[i].data.ptr;
|
|
||||||
|
|
||||||
if (events[i].events & EPOLLERR) {
|
|
||||||
tTrace("%s %p, TCP error happened on FD", pTcp->label, pFdObj->thandle);
|
|
||||||
taosReportBrokenLink(pFdObj);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (events[i].events & EPOLLHUP) {
|
|
||||||
tTrace("%s %p, TCP FD hang up", pTcp->label, pFdObj->thandle);
|
|
||||||
taosReportBrokenLink(pFdObj);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
|
|
||||||
if (headLen != sizeof(SRpcHead)) {
|
|
||||||
tError("%s %p, read error, headLen:%d", pTcp->label, pFdObj->thandle, headLen);
|
|
||||||
taosReportBrokenLink(pFdObj);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
|
|
||||||
char *buffer = (char *)malloc((size_t)msgLen + tsRpcOverhead);
|
|
||||||
if (NULL == buffer) {
|
|
||||||
tTrace("%s %p, TCP malloc(size:%d) fail", pTcp->label, pFdObj->thandle, msgLen);
|
|
||||||
taosReportBrokenLink(pFdObj);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
char *msg = buffer + tsRpcOverhead;
|
|
||||||
int32_t leftLen = msgLen - headLen;
|
|
||||||
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
|
|
||||||
|
|
||||||
if (leftLen != retLen) {
|
|
||||||
tError("%s %p, read error, leftLen:%d retLen:%d",
|
|
||||||
pTcp->label, pFdObj->thandle, leftLen, retLen);
|
|
||||||
tfree(buffer);
|
|
||||||
taosReportBrokenLink(pFdObj);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen);
|
|
||||||
|
|
||||||
memcpy(msg, &rpcHead, sizeof(SRpcHead));
|
|
||||||
recvInfo.msg = msg;
|
|
||||||
recvInfo.msgLen = msgLen;
|
|
||||||
recvInfo.ip = pFdObj->ip;
|
|
||||||
recvInfo.port = pFdObj->port;
|
|
||||||
recvInfo.shandle = pTcp->shandle;
|
|
||||||
recvInfo.thandle = pFdObj->thandle;;
|
|
||||||
recvInfo.chandle = pFdObj;
|
|
||||||
recvInfo.connType = RPC_CONN_TCP;
|
|
||||||
|
|
||||||
pFdObj->thandle = (*(pTcp->processData))(&recvInfo);
|
|
||||||
|
|
||||||
if (pFdObj->thandle == NULL) taosCleanUpTcpFdObj(pFdObj);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
|
@ -17,13 +17,13 @@
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tmempool.h"
|
#include "tmempool.h"
|
||||||
|
|
||||||
typedef struct _ip_hash_t {
|
typedef struct SIpHash {
|
||||||
uint32_t ip;
|
uint32_t ip;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
int hash;
|
int hash;
|
||||||
struct _ip_hash_t *prev;
|
struct SIpHash *prev;
|
||||||
struct _ip_hash_t *next;
|
struct SIpHash *next;
|
||||||
void * data;
|
void *data;
|
||||||
} SIpHash;
|
} SIpHash;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -47,7 +47,7 @@ int rpcHashIp(void *handle, uint32_t ip, uint16_t port) {
|
||||||
|
|
||||||
void *rpcAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) {
|
void *rpcAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) {
|
||||||
int hash;
|
int hash;
|
||||||
SIpHash * pNode;
|
SIpHash *pNode;
|
||||||
SHashObj *pObj;
|
SHashObj *pObj;
|
||||||
|
|
||||||
pObj = (SHashObj *)handle;
|
pObj = (SHashObj *)handle;
|
||||||
|
@ -70,7 +70,7 @@ void *rpcAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) {
|
||||||
|
|
||||||
void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port) {
|
void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port) {
|
||||||
int hash;
|
int hash;
|
||||||
SIpHash * pNode;
|
SIpHash *pNode;
|
||||||
SHashObj *pObj;
|
SHashObj *pObj;
|
||||||
|
|
||||||
pObj = (SHashObj *)handle;
|
pObj = (SHashObj *)handle;
|
||||||
|
@ -102,7 +102,7 @@ void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port) {
|
||||||
|
|
||||||
void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port) {
|
void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port) {
|
||||||
int hash;
|
int hash;
|
||||||
SIpHash * pNode;
|
SIpHash *pNode;
|
||||||
SHashObj *pObj;
|
SHashObj *pObj;
|
||||||
|
|
||||||
pObj = (SHashObj *)handle;
|
pObj = (SHashObj *)handle;
|
||||||
|
|
|
@ -27,8 +27,7 @@
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "rpcUdp.h"
|
#include "rpcUdp.h"
|
||||||
#include "rpcCache.h"
|
#include "rpcCache.h"
|
||||||
#include "rpcClient.h"
|
#include "rpcTcp.h"
|
||||||
#include "rpcServer.h"
|
|
||||||
#include "rpcHead.h"
|
#include "rpcHead.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "hash.h"
|
#include "hash.h"
|
||||||
|
@ -67,7 +66,7 @@ typedef struct {
|
||||||
void *udphandle;// returned handle from UDP initialization
|
void *udphandle;// returned handle from UDP initialization
|
||||||
void *pCache; // connection cache
|
void *pCache; // connection cache
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
struct _RpcConn *connList; // connection list
|
struct SRpcConn *connList; // connection list
|
||||||
} SRpcInfo;
|
} SRpcInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -88,7 +87,7 @@ typedef struct {
|
||||||
char msg[0]; // RpcHead starts from here
|
char msg[0]; // RpcHead starts from here
|
||||||
} SRpcReqContext;
|
} SRpcReqContext;
|
||||||
|
|
||||||
typedef struct _RpcConn {
|
typedef struct SRpcConn {
|
||||||
int sid; // session ID
|
int sid; // session ID
|
||||||
uint32_t ownId; // own link ID
|
uint32_t ownId; // own link ID
|
||||||
uint32_t peerId; // peer link ID
|
uint32_t peerId; // peer link ID
|
||||||
|
@ -156,8 +155,8 @@ void (*taosCleanUpConn[])(void *thandle) = {
|
||||||
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
|
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
|
||||||
taosSendUdpData,
|
taosSendUdpData,
|
||||||
taosSendUdpData,
|
taosSendUdpData,
|
||||||
taosSendTcpServerData,
|
taosSendTcpData,
|
||||||
taosSendTcpClientData
|
taosSendTcpData
|
||||||
};
|
};
|
||||||
|
|
||||||
void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = {
|
void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) = {
|
||||||
|
@ -170,8 +169,8 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, char *ip, uint16_t port) =
|
||||||
void (*taosCloseConn[])(void *chandle) = {
|
void (*taosCloseConn[])(void *chandle) = {
|
||||||
NULL,
|
NULL,
|
||||||
NULL,
|
NULL,
|
||||||
taosCloseTcpServerConnection,
|
taosCloseTcpConnection,
|
||||||
taosCloseTcpClientConnection
|
taosCloseTcpConnection
|
||||||
};
|
};
|
||||||
|
|
||||||
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType);
|
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort, int8_t connType);
|
||||||
|
|
|
@ -18,30 +18,30 @@
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tsocket.h"
|
#include "tsocket.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "rpcServer.h"
|
|
||||||
#include "rpcHead.h"
|
#include "rpcHead.h"
|
||||||
|
#include "rpcTcp.h"
|
||||||
|
|
||||||
#define TAOS_IPv4ADDR_LEN 16
|
|
||||||
#ifndef EPOLLWAKEUP
|
#ifndef EPOLLWAKEUP
|
||||||
#define EPOLLWAKEUP (1u << 29)
|
#define EPOLLWAKEUP (1u << 29)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct _fd_obj {
|
typedef struct SFdObj {
|
||||||
void *signature;
|
void *signature;
|
||||||
int fd; // TCP socket FD
|
int fd; // TCP socket FD
|
||||||
void * thandle; // handle from upper layer, like TAOS
|
void *thandle; // handle from upper layer, like TAOS
|
||||||
char ipstr[TAOS_IPv4ADDR_LEN];
|
uint32_t ip;
|
||||||
unsigned int ip;
|
uint16_t port;
|
||||||
uint16_t port;
|
struct SThreadObj *pThreadObj;
|
||||||
struct _thread_obj *pThreadObj;
|
struct SFdObj *prev;
|
||||||
struct _fd_obj * prev, *next;
|
struct SFdObj *next;
|
||||||
} SFdObj;
|
} SFdObj;
|
||||||
|
|
||||||
typedef struct _thread_obj {
|
typedef struct SThreadObj {
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
SFdObj * pHead;
|
SFdObj * pHead;
|
||||||
pthread_mutex_t threadMutex;
|
pthread_mutex_t mutex;
|
||||||
pthread_cond_t fdReady;
|
pthread_cond_t fdReady;
|
||||||
|
char ipstr[TSDB_IPv4ADDR_LEN];
|
||||||
int pollFd;
|
int pollFd;
|
||||||
int numOfFds;
|
int numOfFds;
|
||||||
int threadId;
|
int threadId;
|
||||||
|
@ -51,7 +51,7 @@ typedef struct _thread_obj {
|
||||||
} SThreadObj;
|
} SThreadObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char ip[40];
|
char ip[TSDB_IPv4ADDR_LEN];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
char label[12];
|
char label[12];
|
||||||
int numOfThreads;
|
int numOfThreads;
|
||||||
|
@ -60,13 +60,15 @@ typedef struct {
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
} SServerObj;
|
} SServerObj;
|
||||||
|
|
||||||
static void taosCleanUpFdObj(SFdObj *pFdObj);
|
static void *taosProcessTcpData(void *param);
|
||||||
static void taosProcessTcpData(void *param);
|
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
|
||||||
static void taosAcceptTcpConnection(void *arg);
|
static void taosFreeFdObj(SFdObj *pFdObj);
|
||||||
|
static void taosReportBrokenLink(SFdObj *pFdObj);
|
||||||
|
static void taosAcceptTcpConnection(void *arg);
|
||||||
|
|
||||||
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
|
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
|
||||||
SServerObj *pServerObj;
|
SServerObj *pServerObj;
|
||||||
SThreadObj *pThreadObj;
|
SThreadObj *pThreadObj;
|
||||||
|
|
||||||
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
|
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
|
||||||
strcpy(pServerObj->ip, ip);
|
strcpy(pServerObj->ip, ip);
|
||||||
|
@ -88,7 +90,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
|
||||||
strcpy(pThreadObj->label, label);
|
strcpy(pThreadObj->label, label);
|
||||||
pThreadObj->shandle = shandle;
|
pThreadObj->shandle = shandle;
|
||||||
|
|
||||||
code = pthread_mutex_init(&(pThreadObj->threadMutex), NULL);
|
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
||||||
break;;
|
break;;
|
||||||
|
@ -110,7 +112,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
|
||||||
pthread_attr_t thattr;
|
pthread_attr_t thattr;
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
code = pthread_create(&(pThreadObj->thread), &thattr, (void *)taosProcessTcpData, (void *)(pThreadObj));
|
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
||||||
pthread_attr_destroy(&thattr);
|
pthread_attr_destroy(&thattr);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
|
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
|
||||||
|
@ -144,8 +146,8 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCleanUpTcpServer(void *handle) {
|
void taosCleanUpTcpServer(void *handle) {
|
||||||
SThreadObj *pThreadObj;
|
|
||||||
SServerObj *pServerObj = handle;
|
SServerObj *pServerObj = handle;
|
||||||
|
SThreadObj *pThreadObj;
|
||||||
|
|
||||||
if (pServerObj == NULL) return;
|
if (pServerObj == NULL) return;
|
||||||
|
|
||||||
|
@ -156,7 +158,7 @@ void taosCleanUpTcpServer(void *handle) {
|
||||||
pThreadObj = pServerObj->pThreadObj + i;
|
pThreadObj = pServerObj->pThreadObj + i;
|
||||||
|
|
||||||
while (pThreadObj->pHead) {
|
while (pThreadObj->pHead) {
|
||||||
taosCleanUpFdObj(pThreadObj->pHead);
|
taosFreeFdObj(pThreadObj->pHead);
|
||||||
pThreadObj->pHead = pThreadObj->pHead;
|
pThreadObj->pHead = pThreadObj->pHead;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,7 +166,7 @@ void taosCleanUpTcpServer(void *handle) {
|
||||||
pthread_cancel(pThreadObj->thread);
|
pthread_cancel(pThreadObj->thread);
|
||||||
pthread_join(pThreadObj->thread, NULL);
|
pthread_join(pThreadObj->thread, NULL);
|
||||||
pthread_cond_destroy(&(pThreadObj->fdReady));
|
pthread_cond_destroy(&(pThreadObj->fdReady));
|
||||||
pthread_mutex_destroy(&(pThreadObj->threadMutex));
|
pthread_mutex_destroy(&(pThreadObj->mutex));
|
||||||
}
|
}
|
||||||
|
|
||||||
tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);
|
tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);
|
||||||
|
@ -173,14 +175,146 @@ void taosCleanUpTcpServer(void *handle) {
|
||||||
tfree(pServerObj);
|
tfree(pServerObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCloseTcpServerConnection(void *chandle) {
|
static void taosAcceptTcpConnection(void *arg) {
|
||||||
|
int connFd = -1;
|
||||||
|
struct sockaddr_in caddr;
|
||||||
|
int sockFd;
|
||||||
|
int threadId = 0;
|
||||||
|
SThreadObj *pThreadObj;
|
||||||
|
SServerObj *pServerObj;
|
||||||
|
|
||||||
|
pServerObj = (SServerObj *)arg;
|
||||||
|
|
||||||
|
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
||||||
|
if (sockFd < 0) return;
|
||||||
|
|
||||||
|
tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
socklen_t addrlen = sizeof(caddr);
|
||||||
|
connFd = accept(sockFd, (struct sockaddr *)&caddr, &addrlen);
|
||||||
|
|
||||||
|
if (connFd < 0) {
|
||||||
|
tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(caddr.sin_addr), caddr.sin_port);
|
||||||
|
taosKeepTcpAlive(connFd);
|
||||||
|
|
||||||
|
// pick up the thread to handle this connection
|
||||||
|
pThreadObj = pServerObj->pThreadObj + threadId;
|
||||||
|
|
||||||
|
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
|
||||||
|
if (pFdObj) {
|
||||||
|
pFdObj->ip = caddr.sin_addr.s_addr;
|
||||||
|
pFdObj->port = caddr.sin_port;
|
||||||
|
tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
|
||||||
|
inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
|
||||||
|
} else {
|
||||||
|
close(connFd);
|
||||||
|
tError("%s failed to malloc FdObj(%s)", pServerObj->label, strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
// pick up next thread for next connection
|
||||||
|
threadId++;
|
||||||
|
threadId = threadId % pServerObj->numOfThreads;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
|
||||||
|
SThreadObj *pThreadObj;
|
||||||
|
pthread_attr_t thattr;
|
||||||
|
|
||||||
|
pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj));
|
||||||
|
memset(pThreadObj, 0, sizeof(SThreadObj));
|
||||||
|
strcpy(pThreadObj->label, label);
|
||||||
|
strcpy(pThreadObj->ipstr, ip);
|
||||||
|
pThreadObj->shandle = shandle;
|
||||||
|
|
||||||
|
if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
|
||||||
|
tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
|
||||||
|
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
||||||
|
if (pThreadObj->pollFd < 0) {
|
||||||
|
tError("%s failed to create TCP client epoll", label);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pThreadObj->processData = fp;
|
||||||
|
|
||||||
|
pthread_attr_init(&thattr);
|
||||||
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
||||||
|
pthread_attr_destroy(&thattr);
|
||||||
|
if (code != 0) {
|
||||||
|
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port);
|
||||||
|
|
||||||
|
return pThreadObj;
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosCleanUpTcpClient(void *chandle) {
|
||||||
|
SThreadObj *pThreadObj = chandle;
|
||||||
|
if (pThreadObj == NULL) return;
|
||||||
|
|
||||||
|
while (pThreadObj->pHead) {
|
||||||
|
taosFreeFdObj(pThreadObj->pHead);
|
||||||
|
pThreadObj->pHead = pThreadObj->pHead->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
close(pThreadObj->pollFd);
|
||||||
|
|
||||||
|
pthread_cancel(pThreadObj->thread);
|
||||||
|
pthread_join(pThreadObj->thread, NULL);
|
||||||
|
|
||||||
|
tTrace (":%s, all connections are cleaned up", pThreadObj->label);
|
||||||
|
|
||||||
|
tfree(pThreadObj);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
|
||||||
|
SThreadObj * pThreadObj = shandle;
|
||||||
|
struct in_addr destIp;
|
||||||
|
|
||||||
|
int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ipstr);
|
||||||
|
if (fd <= 0) return NULL;
|
||||||
|
|
||||||
|
inet_aton(ip, &destIp);
|
||||||
|
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
|
||||||
|
|
||||||
|
if (pFdObj) {
|
||||||
|
pFdObj->thandle = thandle;
|
||||||
|
pFdObj->port = port;
|
||||||
|
pFdObj->ip = destIp.s_addr;
|
||||||
|
tTrace("%s %p, TCP connection to %s:%hu is created, FD:%p numOfFds:%d",
|
||||||
|
pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds);
|
||||||
|
} else {
|
||||||
|
close(fd);
|
||||||
|
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
return pFdObj;
|
||||||
|
}
|
||||||
|
|
||||||
|
void taosCloseTcpConnection(void *chandle) {
|
||||||
SFdObj *pFdObj = chandle;
|
SFdObj *pFdObj = chandle;
|
||||||
if (pFdObj == NULL) return;
|
if (pFdObj == NULL) return;
|
||||||
|
|
||||||
taosCleanUpFdObj(pFdObj);
|
taosFreeFdObj(pFdObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
|
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
|
||||||
SFdObj *pFdObj = chandle;
|
SFdObj *pFdObj = chandle;
|
||||||
|
|
||||||
if (chandle == NULL) return -1;
|
if (chandle == NULL) return -1;
|
||||||
|
@ -188,8 +322,6 @@ int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void
|
||||||
return (int)send(pFdObj->fd, data, (size_t)len, 0);
|
return (int)send(pFdObj->fd, data, (size_t)len, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define maxEvents 10
|
|
||||||
|
|
||||||
static void taosReportBrokenLink(SFdObj *pFdObj) {
|
static void taosReportBrokenLink(SFdObj *pFdObj) {
|
||||||
|
|
||||||
SThreadObj *pThreadObj = pFdObj->pThreadObj;
|
SThreadObj *pThreadObj = pFdObj->pThreadObj;
|
||||||
|
@ -209,26 +341,26 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosProcessTcpData(void *param) {
|
#define maxEvents 10
|
||||||
SThreadObj * pThreadObj;
|
|
||||||
int i, fdNum;
|
static void *taosProcessTcpData(void *param) {
|
||||||
SFdObj * pFdObj;
|
SThreadObj *pThreadObj = param;
|
||||||
|
SFdObj *pFdObj;
|
||||||
struct epoll_event events[maxEvents];
|
struct epoll_event events[maxEvents];
|
||||||
SRecvInfo recvInfo;
|
SRecvInfo recvInfo;
|
||||||
pThreadObj = (SThreadObj *)param;
|
|
||||||
SRpcHead rpcHead;
|
SRpcHead rpcHead;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pthread_mutex_lock(&pThreadObj->threadMutex);
|
pthread_mutex_lock(&pThreadObj->mutex);
|
||||||
if (pThreadObj->numOfFds < 1) {
|
if (pThreadObj->numOfFds < 1) {
|
||||||
pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->threadMutex);
|
pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->mutex);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&pThreadObj->threadMutex);
|
pthread_mutex_unlock(&pThreadObj->mutex);
|
||||||
|
|
||||||
fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
|
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
|
||||||
if (fdNum < 0) continue;
|
if (fdNum < 0) continue;
|
||||||
|
|
||||||
for (i = 0; i < fdNum; ++i) {
|
for (int i = 0; i < fdNum; ++i) {
|
||||||
pFdObj = events[i].data.ptr;
|
pFdObj = events[i].data.ptr;
|
||||||
|
|
||||||
if (events[i].events & EPOLLERR) {
|
if (events[i].events & EPOLLERR) {
|
||||||
|
@ -270,7 +402,7 @@ static void taosProcessTcpData(void *param) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// tTrace("%s TCP data is received, ip:%s:%u len:%d", pTcp->label, pFdObj->ipstr, pFdObj->port, msgLen);
|
// tTrace("%s TCP data is received, ip:%s:%u len:%d", pThreadObj->label, pFdObj->ipstr, pFdObj->port, msgLen);
|
||||||
|
|
||||||
memcpy(msg, &rpcHead, sizeof(SRpcHead));
|
memcpy(msg, &rpcHead, sizeof(SRpcHead));
|
||||||
recvInfo.msg = msg;
|
recvInfo.msg = msg;
|
||||||
|
@ -283,91 +415,43 @@ static void taosProcessTcpData(void *param) {
|
||||||
recvInfo.connType = RPC_CONN_TCP;
|
recvInfo.connType = RPC_CONN_TCP;
|
||||||
|
|
||||||
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
|
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
|
||||||
if (pFdObj->thandle == NULL) taosCleanUpFdObj(pFdObj);
|
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosAcceptTcpConnection(void *arg) {
|
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
|
||||||
int connFd = -1;
|
|
||||||
struct sockaddr_in clientAddr;
|
|
||||||
int sockFd;
|
|
||||||
int threadId = 0;
|
|
||||||
SThreadObj * pThreadObj;
|
|
||||||
SServerObj * pServerObj;
|
|
||||||
SFdObj * pFdObj;
|
|
||||||
struct epoll_event event;
|
struct epoll_event event;
|
||||||
|
|
||||||
pServerObj = (SServerObj *)arg;
|
SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1);
|
||||||
|
if (pFdObj == NULL) return NULL;
|
||||||
|
|
||||||
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
pFdObj->fd = fd;
|
||||||
|
pFdObj->pThreadObj = pThreadObj;
|
||||||
|
pFdObj->signature = pFdObj;
|
||||||
|
|
||||||
if (sockFd < 0) {
|
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
|
||||||
tError("%s failed to open TCP socket, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
event.data.ptr = pFdObj;
|
||||||
return;
|
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
|
||||||
} else {
|
tfree(pFdObj);
|
||||||
tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
// notify the data process, add into the FdObj list
|
||||||
socklen_t addrlen = sizeof(clientAddr);
|
pthread_mutex_lock(&(pThreadObj->mutex));
|
||||||
connFd = accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen);
|
pFdObj->next = pThreadObj->pHead;
|
||||||
|
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
|
||||||
|
pThreadObj->pHead = pFdObj;
|
||||||
|
pThreadObj->numOfFds++;
|
||||||
|
pthread_cond_signal(&pThreadObj->fdReady);
|
||||||
|
pthread_mutex_unlock(&(pThreadObj->mutex));
|
||||||
|
|
||||||
if (connFd < 0) {
|
return pFdObj;
|
||||||
tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(clientAddr.sin_addr),
|
|
||||||
htons(clientAddr.sin_port));
|
|
||||||
taosKeepTcpAlive(connFd);
|
|
||||||
|
|
||||||
// pick up the thread to handle this connection
|
|
||||||
pThreadObj = pServerObj->pThreadObj + threadId;
|
|
||||||
|
|
||||||
pFdObj = (SFdObj *)malloc(sizeof(SFdObj));
|
|
||||||
if (pFdObj == NULL) {
|
|
||||||
tError("%s no enough resource to allocate TCP FD IDs", pServerObj->label);
|
|
||||||
close(connFd);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
memset(pFdObj, 0, sizeof(SFdObj));
|
|
||||||
pFdObj->fd = connFd;
|
|
||||||
strcpy(pFdObj->ipstr, inet_ntoa(clientAddr.sin_addr));
|
|
||||||
pFdObj->ip = clientAddr.sin_addr.s_addr;
|
|
||||||
pFdObj->port = htons(clientAddr.sin_port);
|
|
||||||
pFdObj->pThreadObj = pThreadObj;
|
|
||||||
pFdObj->signature = pFdObj;
|
|
||||||
|
|
||||||
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
|
|
||||||
event.data.ptr = pFdObj;
|
|
||||||
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
|
|
||||||
tError("%s failed to add TCP FD for epoll(%s)", pServerObj->label, strerror(errno));
|
|
||||||
tfree(pFdObj);
|
|
||||||
close(connFd);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// notify the data process, add into the FdObj list
|
|
||||||
pthread_mutex_lock(&(pThreadObj->threadMutex));
|
|
||||||
pFdObj->next = pThreadObj->pHead;
|
|
||||||
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
|
|
||||||
pThreadObj->pHead = pFdObj;
|
|
||||||
pThreadObj->numOfFds++;
|
|
||||||
pthread_cond_signal(&pThreadObj->fdReady);
|
|
||||||
pthread_mutex_unlock(&(pThreadObj->threadMutex));
|
|
||||||
|
|
||||||
tTrace("%s TCP thread:%d, new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
|
|
||||||
pThreadObj->threadId, pFdObj->ipstr, pFdObj->port, pFdObj, pThreadObj->numOfFds);
|
|
||||||
|
|
||||||
// pick up next thread for next connection
|
|
||||||
threadId++;
|
|
||||||
threadId = threadId % pServerObj->numOfThreads;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosCleanUpFdObj(SFdObj *pFdObj) {
|
static void taosFreeFdObj(SFdObj *pFdObj) {
|
||||||
|
|
||||||
if (pFdObj == NULL) return;
|
if (pFdObj == NULL) return;
|
||||||
if (pFdObj->signature != pFdObj) return;
|
if (pFdObj->signature != pFdObj) return;
|
||||||
|
@ -378,7 +462,7 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
|
||||||
close(pFdObj->fd);
|
close(pFdObj->fd);
|
||||||
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
|
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
|
||||||
|
|
||||||
pthread_mutex_lock(&pThreadObj->threadMutex);
|
pthread_mutex_lock(&pThreadObj->mutex);
|
||||||
|
|
||||||
pThreadObj->numOfFds--;
|
pThreadObj->numOfFds--;
|
||||||
|
|
||||||
|
@ -398,7 +482,7 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
|
||||||
(pFdObj->next)->prev = pFdObj->prev;
|
(pFdObj->next)->prev = pFdObj->prev;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_unlock(&pThreadObj->threadMutex);
|
pthread_mutex_unlock(&pThreadObj->mutex);
|
||||||
|
|
||||||
tTrace("%s %p, FD:%p is cleaned, numOfFds:%d",
|
tTrace("%s %p, FD:%p is cleaned, numOfFds:%d",
|
||||||
pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);
|
pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);
|
||||||
|
@ -406,116 +490,4 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
|
||||||
tfree(pFdObj);
|
tfree(pFdObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
static void taosAcceptUDConnection(void *arg) {
|
|
||||||
int connFd = -1;
|
|
||||||
int sockFd;
|
|
||||||
int threadId = 0;
|
|
||||||
SThreadObj * pThreadObj;
|
|
||||||
SServerObj * pServerObj;
|
|
||||||
SFdObj * pFdObj;
|
|
||||||
struct epoll_event event;
|
|
||||||
|
|
||||||
pServerObj = (SServerObj *)arg;
|
|
||||||
sockFd = taosOpenUDServerSocket(pServerObj->ip, pServerObj->port);
|
|
||||||
|
|
||||||
if (sockFd < 0) {
|
|
||||||
tError("%s failed to open UD socket, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
tTrace("%s UD server is ready, ip:%s, port:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
connFd = accept(sockFd, NULL, NULL);
|
|
||||||
|
|
||||||
if (connFd < 0) {
|
|
||||||
tError("%s UD accept failure, errno:%d, reason:%s", pServerObj->label, errno, strerror(errno));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// pick up the thread to handle this connection
|
|
||||||
pThreadObj = pServerObj->pThreadObj + threadId;
|
|
||||||
|
|
||||||
pFdObj = (SFdObj *)malloc(sizeof(SFdObj));
|
|
||||||
if (pFdObj == NULL) {
|
|
||||||
tError("%s no enough resource to allocate TCP FD IDs", pServerObj->label);
|
|
||||||
close(connFd);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
memset(pFdObj, 0, sizeof(SFdObj));
|
|
||||||
pFdObj->fd = connFd;
|
|
||||||
pFdObj->pThreadObj = pThreadObj;
|
|
||||||
|
|
||||||
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
|
|
||||||
event.data.ptr = pFdObj;
|
|
||||||
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
|
|
||||||
tError("%s failed to add UD FD for epoll, error:%s", pServerObj->label, strerror(errno));
|
|
||||||
tfree(pFdObj);
|
|
||||||
close(connFd);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// notify the data process, add into the FdObj list
|
|
||||||
pthread_mutex_lock(&(pThreadObj->threadMutex));
|
|
||||||
pFdObj->next = pThreadObj->pHead;
|
|
||||||
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
|
|
||||||
pThreadObj->pHead = pFdObj;
|
|
||||||
pThreadObj->numOfFds++;
|
|
||||||
pthread_cond_signal(&pThreadObj->fdReady);
|
|
||||||
pthread_mutex_unlock(&(pThreadObj->threadMutex));
|
|
||||||
|
|
||||||
tTrace("%s UD thread:%d, a new connection, numOfFds:%d", pServerObj->label, pThreadObj->threadId,
|
|
||||||
pThreadObj->numOfFds);
|
|
||||||
|
|
||||||
// pick up next thread for next connection
|
|
||||||
threadId++;
|
|
||||||
threadId = threadId % pServerObj->numOfThreads;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
void taosListTcpConnection(void *handle, char *buffer) {
|
|
||||||
SServerObj *pServerObj;
|
|
||||||
SThreadObj *pThreadObj;
|
|
||||||
SFdObj * pFdObj;
|
|
||||||
int i, numOfFds, numOfConns;
|
|
||||||
char * msg;
|
|
||||||
|
|
||||||
pServerObj = (SServerObj *)handle;
|
|
||||||
buffer[0] = 0;
|
|
||||||
msg = buffer;
|
|
||||||
numOfConns = 0;
|
|
||||||
|
|
||||||
pThreadObj = pServerObj->pThreadObj;
|
|
||||||
|
|
||||||
for (i = 0; i < pServerObj->numOfThreads; ++i) {
|
|
||||||
numOfFds = 0;
|
|
||||||
sprintf(msg, "TCP:%s Thread:%d number of connections:%d\n", pServerObj->label, pThreadObj->threadId,
|
|
||||||
pThreadObj->numOfFds);
|
|
||||||
msg = msg + strlen(msg);
|
|
||||||
pFdObj = pThreadObj->pHead;
|
|
||||||
while (pFdObj) {
|
|
||||||
sprintf(msg, " ip:%s port:%hu\n", pFdObj->ipstr, pFdObj->port);
|
|
||||||
msg = msg + strlen(msg);
|
|
||||||
numOfFds++;
|
|
||||||
numOfConns++;
|
|
||||||
pFdObj = pFdObj->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numOfFds != pThreadObj->numOfFds)
|
|
||||||
tError("TCP:%s thread:%d BIG error, numOfFds:%d actual numOfFds:%d", pServerObj->label, pThreadObj->threadId,
|
|
||||||
pThreadObj->numOfFds, numOfFds);
|
|
||||||
|
|
||||||
pThreadObj++;
|
|
||||||
}
|
|
||||||
|
|
||||||
sprintf(msg, "TCP:%s total connections:%d\n", pServerObj->label, numOfConns);
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
int tsUdpDelay = 0;
|
int tsUdpDelay = 0;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
void * signature;
|
void *signature;
|
||||||
int index;
|
int index;
|
||||||
int fd;
|
int fd;
|
||||||
uint16_t port; // peer port
|
uint16_t port; // peer port
|
||||||
|
@ -53,23 +53,23 @@ typedef struct {
|
||||||
int server;
|
int server;
|
||||||
char ip[16]; // local IP
|
char ip[16]; // local IP
|
||||||
uint16_t port; // local Port
|
uint16_t port; // local Port
|
||||||
void * shandle; // handle passed by upper layer during server initialization
|
void *shandle; // handle passed by upper layer during server initialization
|
||||||
int threads;
|
int threads;
|
||||||
char label[12];
|
char label[12];
|
||||||
void * tmrCtrl;
|
void *tmrCtrl;
|
||||||
void *(*fp)(SRecvInfo *pPacket);
|
void *(*fp)(SRecvInfo *pPacket);
|
||||||
SUdpConn udpConn[];
|
SUdpConn udpConn[];
|
||||||
} SUdpConnSet;
|
} SUdpConnSet;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
void * signature;
|
void *signature;
|
||||||
uint32_t ip; // dest IP
|
uint32_t ip; // dest IP
|
||||||
uint16_t port; // dest Port
|
uint16_t port; // dest Port
|
||||||
SUdpConn * pConn;
|
SUdpConn *pConn;
|
||||||
struct sockaddr_in destAdd;
|
struct sockaddr_in destAdd;
|
||||||
void * msgHdr;
|
void *msgHdr;
|
||||||
int totalLen;
|
int totalLen;
|
||||||
void * timer;
|
void *timer;
|
||||||
int emptyNum;
|
int emptyNum;
|
||||||
} SUdpBuf;
|
} SUdpBuf;
|
||||||
|
|
||||||
|
@ -78,8 +78,8 @@ static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port);
|
||||||
static void taosProcessUdpBufTimer(void *param, void *tmrId);
|
static void taosProcessUdpBufTimer(void *param, void *tmrId);
|
||||||
|
|
||||||
void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
|
void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
|
||||||
SUdpConn * pConn;
|
SUdpConn *pConn;
|
||||||
SUdpConnSet * pSet;
|
SUdpConnSet *pSet;
|
||||||
|
|
||||||
int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn);
|
int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn);
|
||||||
pSet = (SUdpConnSet *)malloc((size_t)size);
|
pSet = (SUdpConnSet *)malloc((size_t)size);
|
||||||
|
@ -164,7 +164,7 @@ void *taosInitUdpConnection(char *ip, uint16_t port, char *label, int threads, v
|
||||||
|
|
||||||
void taosCleanUpUdpConnection(void *handle) {
|
void taosCleanUpUdpConnection(void *handle) {
|
||||||
SUdpConnSet *pSet = (SUdpConnSet *)handle;
|
SUdpConnSet *pSet = (SUdpConnSet *)handle;
|
||||||
SUdpConn * pConn;
|
SUdpConn *pConn;
|
||||||
|
|
||||||
if (pSet == NULL) return;
|
if (pSet == NULL) return;
|
||||||
|
|
||||||
|
@ -205,10 +205,10 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, char *ip, uint16_t por
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *taosRecvUdpData(void *param) {
|
static void *taosRecvUdpData(void *param) {
|
||||||
|
SUdpConn *pConn = param;
|
||||||
struct sockaddr_in sourceAdd;
|
struct sockaddr_in sourceAdd;
|
||||||
int dataLen;
|
int dataLen;
|
||||||
unsigned int addLen;
|
unsigned int addLen;
|
||||||
SUdpConn * pConn = (SUdpConn *)param;
|
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
int minSize = sizeof(SRpcHead);
|
int minSize = sizeof(SRpcHead);
|
||||||
SRecvInfo recvInfo;
|
SRecvInfo recvInfo;
|
||||||
|
@ -274,7 +274,7 @@ static void *taosRecvUdpData(void *param) {
|
||||||
|
|
||||||
int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) {
|
int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) {
|
||||||
SUdpConn *pConn = (SUdpConn *)chandle;
|
SUdpConn *pConn = (SUdpConn *)chandle;
|
||||||
SUdpBuf * pBuf;
|
SUdpBuf *pBuf;
|
||||||
|
|
||||||
if (pConn == NULL || pConn->signature != pConn) return -1;
|
if (pConn == NULL || pConn->signature != pConn) return -1;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue