reset tcp
This commit is contained in:
parent
35d220c852
commit
35c7506dd0
|
@ -21,13 +21,6 @@
|
|||
#include "rpcLog.h"
|
||||
#include "rpcHead.h"
|
||||
#include "rpcTcp.h"
|
||||
#include "tlist.h"
|
||||
|
||||
typedef struct SConnItem {
|
||||
SOCKET fd;
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
} SConnItem;
|
||||
|
||||
typedef struct SFdObj {
|
||||
void *signature;
|
||||
|
@ -45,12 +38,6 @@ typedef struct SThreadObj {
|
|||
pthread_t thread;
|
||||
SFdObj * pHead;
|
||||
pthread_mutex_t mutex;
|
||||
// receive the notify from dispatch thread
|
||||
|
||||
int notifyReceiveFd;
|
||||
int notifySendFd;
|
||||
SList *connQueue;
|
||||
|
||||
uint32_t ip;
|
||||
bool stop;
|
||||
EpollFd pollFd;
|
||||
|
@ -82,7 +69,6 @@ typedef struct {
|
|||
} SServerObj;
|
||||
|
||||
static void *taosProcessTcpData(void *param);
|
||||
static void *taosProcessServerTcpData(void *param);
|
||||
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd);
|
||||
static void taosFreeFdObj(SFdObj *pFdObj);
|
||||
static void taosReportBrokenLink(SFdObj *pFdObj);
|
||||
|
@ -138,7 +124,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
|||
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
||||
pThreadObj->shandle = shandle;
|
||||
pThreadObj->stop = false;
|
||||
pThreadObj->connQueue = tdListNew(sizeof(SConnItem));
|
||||
}
|
||||
|
||||
// initialize mutex, thread, fd which may fail
|
||||
|
@ -157,25 +142,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
|||
break;
|
||||
}
|
||||
|
||||
int fds[2];
|
||||
if (pipe(fds)) {
|
||||
tError("%s failed to create pipe", label);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
pThreadObj->notifyReceiveFd = fds[0];
|
||||
pThreadObj->notifySendFd = fds[1];
|
||||
struct epoll_event event;
|
||||
event.events = EPOLLIN | EPOLLRDHUP;
|
||||
event.data.fd = pThreadObj->notifyReceiveFd;
|
||||
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, pThreadObj->notifyReceiveFd , &event) < 0) {
|
||||
tError("%s failed to create pipe", label);
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessServerTcpData, (void *)(pThreadObj));
|
||||
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
||||
if (code != 0) {
|
||||
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
|
||||
break;
|
||||
|
@ -308,12 +275,17 @@ static void *taosAcceptTcpConnection(void *arg) {
|
|||
// pick up the thread to handle this connection
|
||||
pThreadObj = pServerObj->pThreadObj[threadId];
|
||||
|
||||
pthread_mutex_lock(&(pThreadObj->mutex));
|
||||
SConnItem item = {.fd = connFd, .ip = caddr.sin_addr.s_addr, .port = htons(caddr.sin_port)};
|
||||
tdListAppend(pThreadObj->connQueue, &item);
|
||||
pthread_mutex_unlock(&(pThreadObj->mutex));
|
||||
|
||||
write(pThreadObj->notifySendFd, "", 1);
|
||||
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
|
||||
if (pFdObj) {
|
||||
pFdObj->ip = caddr.sin_addr.s_addr;
|
||||
pFdObj->port = htons(caddr.sin_port);
|
||||
tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
|
||||
taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
|
||||
} else {
|
||||
taosCloseSocket(connFd);
|
||||
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
|
||||
taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
|
||||
}
|
||||
|
||||
// pick up next thread for next connection
|
||||
threadId++;
|
||||
|
@ -619,109 +591,6 @@ static void *taosProcessTcpData(void *param) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void *taosProcessServerTcpData(void *param) {
|
||||
SThreadObj *pThreadObj = param;
|
||||
SFdObj *pFdObj;
|
||||
struct epoll_event events[maxEvents];
|
||||
SRecvInfo recvInfo;
|
||||
|
||||
char bb[1];
|
||||
#ifdef __APPLE__
|
||||
taos_block_sigalrm();
|
||||
#endif // __APPLE__
|
||||
while (1) {
|
||||
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
|
||||
if (pThreadObj->stop) {
|
||||
tDebug("%s TCP thread get stop event, exiting...", pThreadObj->label);
|
||||
break;
|
||||
}
|
||||
if (fdNum < 0) continue;
|
||||
|
||||
for (int i = 0; i < fdNum; ++i) {
|
||||
if (events[i].data.fd == pThreadObj->notifyReceiveFd) {
|
||||
if (events[i].events & EPOLLIN) {
|
||||
read(pThreadObj->notifyReceiveFd, bb, 1);
|
||||
|
||||
pthread_mutex_lock(&(pThreadObj->mutex));
|
||||
SListNode *head = tdListPopHead(pThreadObj->connQueue);
|
||||
pthread_mutex_unlock(&(pThreadObj->mutex));
|
||||
|
||||
SConnItem item = {0};
|
||||
tdListNodeGetData(pThreadObj->connQueue, head, &item);
|
||||
tfree(head);
|
||||
|
||||
// register fd on epoll
|
||||
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, item.fd);
|
||||
if (pFdObj) {
|
||||
pFdObj->ip = item.ip;
|
||||
pFdObj->port = item.port;
|
||||
tDebug("%s new TCP connection from %u:%hu, fd:%d FD:%p numOfFds:%d", pThreadObj->label,
|
||||
pFdObj->ip, pFdObj->port, item.fd, pFdObj, pThreadObj->numOfFds);
|
||||
} else {
|
||||
taosCloseSocket(item.fd);
|
||||
tError("%s failed to malloc FdObj(%s) for connection from:%u:%hu", pThreadObj->label, strerror(errno),
|
||||
pFdObj->ip, pFdObj->port);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
pFdObj = events[i].data.ptr;
|
||||
|
||||
if (events[i].events & EPOLLERR) {
|
||||
tDebug("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (events[i].events & EPOLLRDHUP) {
|
||||
tDebug("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (events[i].events & EPOLLHUP) {
|
||||
tDebug("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
|
||||
shutdown(pFdObj->fd, SHUT_WR);
|
||||
continue;
|
||||
}
|
||||
|
||||
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
|
||||
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
|
||||
}
|
||||
|
||||
if (pThreadObj->stop) break;
|
||||
}
|
||||
|
||||
if (pThreadObj->connQueue) {
|
||||
pThreadObj->connQueue = tdListFree(pThreadObj->connQueue);
|
||||
}
|
||||
// close pipe
|
||||
close(pThreadObj->notifySendFd);
|
||||
close(pThreadObj->notifyReceiveFd);
|
||||
|
||||
if (pThreadObj->pollFd >=0) {
|
||||
EpollClose(pThreadObj->pollFd);
|
||||
pThreadObj->pollFd = -1;
|
||||
}
|
||||
|
||||
while (pThreadObj->pHead) {
|
||||
SFdObj *pFdObj = pThreadObj->pHead;
|
||||
pThreadObj->pHead = pFdObj->next;
|
||||
taosReportBrokenLink(pFdObj);
|
||||
}
|
||||
|
||||
pthread_mutex_destroy(&(pThreadObj->mutex));
|
||||
tDebug("%s TCP thread exits ...", pThreadObj->label);
|
||||
tfree(pThreadObj);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
|
||||
struct epoll_event event;
|
||||
|
||||
|
|
Loading…
Reference in New Issue