Merge branch 'patch/TD-1632' of https://github.com/taosdata/TDengine into patch/TD-1632
This commit is contained in:
commit
62fe32678f
|
@ -62,7 +62,7 @@ typedef struct {
|
||||||
char label[TSDB_LABEL_LEN];
|
char label[TSDB_LABEL_LEN];
|
||||||
int numOfThreads;
|
int numOfThreads;
|
||||||
void * shandle;
|
void * shandle;
|
||||||
SThreadObj *pThreadObj;
|
SThreadObj **pThreadObj;
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
} SServerObj;
|
} SServerObj;
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
|
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
|
||||||
pServerObj->numOfThreads = numOfThreads;
|
pServerObj->numOfThreads = numOfThreads;
|
||||||
|
|
||||||
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
|
pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads);
|
||||||
if (pServerObj->pThreadObj == NULL) {
|
if (pServerObj->pThreadObj == NULL) {
|
||||||
tError("TCP:%s no enough memory", label);
|
tError("TCP:%s no enough memory", label);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
// initialize parameters in case it may encounter error later
|
// initialize parameters in case it may encounter error later
|
||||||
pThreadObj = pServerObj->pThreadObj;
|
|
||||||
for (int i = 0; i < numOfThreads; ++i) {
|
for (int i = 0; i < numOfThreads; ++i) {
|
||||||
|
pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1);
|
||||||
|
if (pThreadObj == NULL) {
|
||||||
|
tError("TCP:%s no enough memory", label);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
for (int j=0; j<i; ++j) free(pServerObj->pThreadObj[j]);
|
||||||
|
free(pServerObj->pThreadObj);
|
||||||
|
free(pServerObj);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pServerObj->pThreadObj[i] = pThreadObj;
|
||||||
pThreadObj->pollFd = -1;
|
pThreadObj->pollFd = -1;
|
||||||
taosResetPthread(&pThreadObj->thread);
|
taosResetPthread(&pThreadObj->thread);
|
||||||
pThreadObj->processData = fp;
|
pThreadObj->processData = fp;
|
||||||
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
||||||
pThreadObj->shandle = shandle;
|
pThreadObj->shandle = shandle;
|
||||||
pThreadObj++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize mutex, thread, fd which may fail
|
// initialize mutex, thread, fd which may fail
|
||||||
pThreadObj = pServerObj->pThreadObj;
|
|
||||||
for (int i = 0; i < numOfThreads; ++i) {
|
for (int i = 0; i < numOfThreads; ++i) {
|
||||||
|
pThreadObj = pServerObj->pThreadObj[i];
|
||||||
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
||||||
|
@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
}
|
}
|
||||||
|
|
||||||
pThreadObj->threadId = i;
|
pThreadObj->threadId = i;
|
||||||
pThreadObj++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
||||||
|
@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
|
||||||
pThreadObj->stop = true;
|
pThreadObj->stop = true;
|
||||||
eventfd_t fd = -1;
|
eventfd_t fd = -1;
|
||||||
|
|
||||||
|
if (taosComparePthread(pThreadObj->thread, pthread_self())) {
|
||||||
|
pthread_detach(pthread_self());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
|
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
|
||||||
// signal the thread to stop, try graceful method first,
|
// signal the thread to stop, try graceful method first,
|
||||||
// and use pthread_cancel when failed
|
// and use pthread_cancel when failed
|
||||||
|
@ -183,15 +196,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL);
|
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
|
||||||
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
|
pthread_join(pThreadObj->thread, NULL);
|
||||||
if (fd != -1) taosCloseSocket(fd);
|
|
||||||
|
|
||||||
while (pThreadObj->pHead) {
|
|
||||||
SFdObj *pFdObj = pThreadObj->pHead;
|
|
||||||
pThreadObj->pHead = pFdObj->next;
|
|
||||||
taosFreeFdObj(pFdObj);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (fd != -1) taosCloseSocket(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosStopTcpServer(void *handle) {
|
void taosStopTcpServer(void *handle) {
|
||||||
|
@ -199,7 +208,14 @@ void taosStopTcpServer(void *handle) {
|
||||||
|
|
||||||
if (pServerObj == NULL) return;
|
if (pServerObj == NULL) return;
|
||||||
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
|
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
|
||||||
if (taosCheckPthreadValid(pServerObj->thread)) pthread_join(pServerObj->thread, NULL);
|
|
||||||
|
if (taosCheckPthreadValid(pServerObj->thread)) {
|
||||||
|
if (taosComparePthread(pServerObj->thread, pthread_self())) {
|
||||||
|
pthread_detach(pthread_self());
|
||||||
|
} else {
|
||||||
|
pthread_join(pServerObj->thread, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tDebug("%s TCP server is stopped", pServerObj->label);
|
tDebug("%s TCP server is stopped", pServerObj->label);
|
||||||
}
|
}
|
||||||
|
@ -210,9 +226,8 @@ void taosCleanUpTcpServer(void *handle) {
|
||||||
if (pServerObj == NULL) return;
|
if (pServerObj == NULL) return;
|
||||||
|
|
||||||
for (int i = 0; i < pServerObj->numOfThreads; ++i) {
|
for (int i = 0; i < pServerObj->numOfThreads; ++i) {
|
||||||
pThreadObj = pServerObj->pThreadObj + i;
|
pThreadObj = pServerObj->pThreadObj[i];
|
||||||
taosStopTcpThread(pThreadObj);
|
taosStopTcpThread(pThreadObj);
|
||||||
pthread_mutex_destroy(&(pThreadObj->mutex));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tDebug("%s TCP server is cleaned up", pServerObj->label);
|
tDebug("%s TCP server is cleaned up", pServerObj->label);
|
||||||
|
@ -249,7 +264,7 @@ static void *taosAcceptTcpConnection(void *arg) {
|
||||||
taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
|
taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
|
||||||
|
|
||||||
// pick up the thread to handle this connection
|
// pick up the thread to handle this connection
|
||||||
pThreadObj = pServerObj->pThreadObj + threadId;
|
pThreadObj = pServerObj->pThreadObj[threadId];
|
||||||
|
|
||||||
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
|
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
|
||||||
if (pFdObj) {
|
if (pFdObj) {
|
||||||
|
@ -329,8 +344,6 @@ void taosCleanUpTcpClient(void *chandle) {
|
||||||
|
|
||||||
taosStopTcpThread(pThreadObj);
|
taosStopTcpThread(pThreadObj);
|
||||||
tDebug ("%s TCP client is cleaned up", pThreadObj->label);
|
tDebug ("%s TCP client is cleaned up", pThreadObj->label);
|
||||||
|
|
||||||
taosTFree(pThreadObj);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
|
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
|
||||||
|
@ -503,8 +516,22 @@ static void *taosProcessTcpData(void *param) {
|
||||||
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
|
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
|
||||||
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
|
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pThreadObj->stop) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
|
||||||
|
|
||||||
|
while (pThreadObj->pHead) {
|
||||||
|
SFdObj *pFdObj = pThreadObj->pHead;
|
||||||
|
pThreadObj->pHead = pFdObj->next;
|
||||||
|
taosFreeFdObj(pFdObj);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_destroy(&(pThreadObj->mutex));
|
||||||
|
tDebug("%s TCP thread exits ...", pThreadObj->label);
|
||||||
|
taosTFree(pThreadObj);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pThread->stop) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("%p TCP epoll thread exits", pThread);
|
uDebug("%p TCP epoll thread exits", pThread);
|
||||||
|
@ -321,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_join(thread, NULL);
|
pthread_join(thread, NULL);
|
||||||
taosClose(fd);
|
if (fd >= 0) taosClose(fd);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue