commit
d855e6cc03
|
@ -22,6 +22,7 @@
|
|||
#include "dnodeMain.h"
|
||||
|
||||
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
|
||||
static sem_t exitSem;
|
||||
|
||||
int32_t main(int32_t argc, char *argv[]) {
|
||||
// Set global configuration file
|
||||
|
@ -65,6 +66,11 @@ int32_t main(int32_t argc, char *argv[]) {
|
|||
#endif
|
||||
}
|
||||
|
||||
if (sem_init(&exitSem, 0, 0) != 0) {
|
||||
printf("failed to create exit semphore\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
/* Set termination handler. */
|
||||
struct sigaction act = {{0}};
|
||||
act.sa_flags = SA_SIGINFO;
|
||||
|
@ -90,9 +96,19 @@ int32_t main(int32_t argc, char *argv[]) {
|
|||
|
||||
syslog(LOG_INFO, "Started TDengine service successfully.");
|
||||
|
||||
while (1) {
|
||||
sleep(1000);
|
||||
for (int res = sem_wait(&exitSem); res != 0; res = sem_wait(&exitSem)) {
|
||||
if (res != EINTR) {
|
||||
syslog(LOG_ERR, "failed to wait exit semphore: %d", res);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
dnodeCleanUpSystem();
|
||||
// close the syslog
|
||||
syslog(LOG_INFO, "Shut down TDengine service successfully");
|
||||
dPrint("TDengine is shut down!");
|
||||
closelog();
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
|
||||
|
@ -104,14 +120,21 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
|
|||
taosCfgDynamicOptions("resetlog");
|
||||
return;
|
||||
}
|
||||
|
||||
syslog(LOG_INFO, "Shut down signal is %d", signum);
|
||||
syslog(LOG_INFO, "Shutting down TDengine service...");
|
||||
// clean the system.
|
||||
dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
|
||||
dnodeCleanUpSystem();
|
||||
// close the syslog
|
||||
syslog(LOG_INFO, "Shut down TDengine service successfully");
|
||||
dPrint("TDengine is shut down!");
|
||||
closelog();
|
||||
exit(EXIT_SUCCESS);
|
||||
|
||||
// protect the application from receive another signal
|
||||
struct sigaction act = {{0}};
|
||||
act.sa_handler = SIG_IGN;
|
||||
sigaction(SIGTERM, &act, NULL);
|
||||
sigaction(SIGHUP, &act, NULL);
|
||||
sigaction(SIGINT, &act, NULL);
|
||||
sigaction(SIGUSR1, &act, NULL);
|
||||
sigaction(SIGUSR2, &act, NULL);
|
||||
|
||||
// inform main thread to exit
|
||||
sem_post(&exitSem);
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tutil.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
#include "twal.h"
|
||||
|
@ -71,11 +72,16 @@ int32_t dnodeInitRead() {
|
|||
}
|
||||
|
||||
void dnodeCleanupRead() {
|
||||
for (int i=0; i < readPool.max; ++i) {
|
||||
SReadWorker *pWorker = readPool.readWorker + i;
|
||||
if (pWorker->thread) {
|
||||
taosQsetThreadResume(readQset);
|
||||
}
|
||||
}
|
||||
|
||||
for (int i=0; i < readPool.max; ++i) {
|
||||
SReadWorker *pWorker = readPool.readWorker + i;
|
||||
if (pWorker->thread) {
|
||||
pthread_cancel(pWorker->thread);
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
}
|
||||
|
@ -201,15 +207,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
|
|||
}
|
||||
|
||||
static void *dnodeProcessReadQueue(void *param) {
|
||||
SReadWorker *pWorker = param;
|
||||
SReadMsg *pReadMsg;
|
||||
int type;
|
||||
void *pVnode;
|
||||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
|
||||
dnodeHandleIdleReadWorker(pWorker);
|
||||
continue;
|
||||
dTrace("dnodeProcessReadQueee: got no message from qset, exiting...");
|
||||
break;
|
||||
}
|
||||
|
||||
dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
|
||||
|
@ -221,6 +226,8 @@ static void *dnodeProcessReadQueue(void *param) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
UNUSED_FUNC
|
||||
static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) {
|
||||
int32_t num = taosGetQueueNumber(readQset);
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "os.h"
|
||||
#include "taosmsg.h"
|
||||
#include "taoserror.h"
|
||||
#include "tutil.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
#include "tsdb.h"
|
||||
|
@ -67,11 +68,16 @@ int32_t dnodeInitWrite() {
|
|||
}
|
||||
|
||||
void dnodeCleanupWrite() {
|
||||
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
|
||||
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
|
||||
if (pWorker->thread) {
|
||||
taosQsetThreadResume(pWorker->qset);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
|
||||
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
|
||||
if (pWorker->thread) {
|
||||
pthread_cancel(pWorker->thread);
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
taosFreeQall(pWorker->qall);
|
||||
taosCloseQset(pWorker->qset);
|
||||
|
@ -186,9 +192,9 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
|
||||
while (1) {
|
||||
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
|
||||
if (numOfMsgs <=0) {
|
||||
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
|
||||
continue;
|
||||
if (numOfMsgs ==0) {
|
||||
dTrace("dnodeProcessWriteQueee: got no message from qset, exiting...");
|
||||
break;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
|
@ -228,6 +234,7 @@ static void *dnodeProcessWriteQueue(void *param) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
UNUSED_FUNC
|
||||
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
||||
int32_t num = taosGetQueueNumber(pWorker->qset);
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ extern "C" {
|
|||
#include <string.h>
|
||||
#include <strings.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/eventfd.h>
|
||||
#include <sys/file.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/mman.h>
|
||||
|
|
|
@ -199,7 +199,7 @@ typedef struct HttpThread {
|
|||
pthread_t thread;
|
||||
HttpContext * pHead;
|
||||
pthread_mutex_t threadMutex;
|
||||
pthread_cond_t fdReady;
|
||||
bool stop;
|
||||
int pollFd;
|
||||
int numOfFds;
|
||||
int threadId;
|
||||
|
@ -212,6 +212,8 @@ typedef struct HttpServer {
|
|||
char label[HTTP_LABEL_SIZE];
|
||||
uint32_t serverIp;
|
||||
uint16_t serverPort;
|
||||
bool online;
|
||||
int fd;
|
||||
int cacheContext;
|
||||
int sessionExpire;
|
||||
int numOfThreads;
|
||||
|
@ -226,7 +228,6 @@ typedef struct HttpServer {
|
|||
bool (*processData)(HttpContext *pContext);
|
||||
int requestNum;
|
||||
void *timerHandle;
|
||||
bool online;
|
||||
} HttpServer;
|
||||
|
||||
// http util method
|
||||
|
|
|
@ -258,28 +258,45 @@ void httpCloseContextByServerForExpired(void *param, void *tmrId) {
|
|||
httpCloseContextByServer(pContext->pThread, pContext);
|
||||
}
|
||||
|
||||
void httpCleanUpConnect(HttpServer *pServer) {
|
||||
int i;
|
||||
HttpThread *pThread;
|
||||
|
||||
static void httpStopThread(HttpThread* pThread) {
|
||||
pThread->stop = true;
|
||||
|
||||
// signal the thread to stop, try graceful method first,
|
||||
// and use pthread_cancel when failed
|
||||
struct epoll_event event = { .events = EPOLLIN };
|
||||
eventfd_t fd = eventfd(1, 0);
|
||||
if (fd == -1) {
|
||||
pthread_cancel(pThread->thread);
|
||||
} else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
|
||||
pthread_cancel(pThread->thread);
|
||||
}
|
||||
|
||||
pthread_join(pThread->thread, NULL);
|
||||
if (fd != -1) {
|
||||
close(fd);
|
||||
}
|
||||
|
||||
close(pThread->pollFd);
|
||||
pthread_mutex_destroy(&(pThread->threadMutex));
|
||||
|
||||
//while (pThread->pHead) {
|
||||
// httpCleanUpContext(pThread->pHead, 0);
|
||||
//}
|
||||
}
|
||||
|
||||
|
||||
void httpCleanUpConnect(HttpServer *pServer) {
|
||||
if (pServer == NULL) return;
|
||||
|
||||
pthread_cancel(pServer->thread);
|
||||
shutdown(pServer->fd, SHUT_RD);
|
||||
pthread_join(pServer->thread, NULL);
|
||||
|
||||
for (i = 0; i < pServer->numOfThreads; ++i) {
|
||||
pThread = pServer->pThreads + i;
|
||||
if (pThread == NULL) continue;
|
||||
//taosCloseSocket(pThread->pollFd);
|
||||
|
||||
//while (pThread->pHead) {
|
||||
// httpCleanUpContext(pThread->pHead, 0);
|
||||
//}
|
||||
|
||||
pthread_cancel(pThread->thread);
|
||||
pthread_join(pThread->thread, NULL);
|
||||
pthread_cond_destroy(&(pThread->fdReady));
|
||||
pthread_mutex_destroy(&(pThread->threadMutex));
|
||||
for (int i = 0; i < pServer->numOfThreads; ++i) {
|
||||
HttpThread* pThread = pServer->pThreads + i;
|
||||
if (pThread != NULL) {
|
||||
httpStopThread(pThread);
|
||||
}
|
||||
}
|
||||
|
||||
tfree(pServer->pThreads);
|
||||
|
@ -412,15 +429,13 @@ void httpProcessHttpData(void *param) {
|
|||
pthread_sigmask(SIG_SETMASK, &set, NULL);
|
||||
|
||||
while (1) {
|
||||
pthread_mutex_lock(&pThread->threadMutex);
|
||||
if (pThread->numOfFds < 1) {
|
||||
pthread_cond_wait(&pThread->fdReady, &pThread->threadMutex);
|
||||
}
|
||||
pthread_mutex_unlock(&pThread->threadMutex);
|
||||
|
||||
struct epoll_event events[HTTP_MAX_EVENTS];
|
||||
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
|
||||
fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1);
|
||||
if (pThread->stop) {
|
||||
httpTrace("%p, http thread get stop event, exiting...", pThread);
|
||||
break;
|
||||
}
|
||||
if (fdNum <= 0) continue;
|
||||
|
||||
for (int i = 0; i < fdNum; ++i) {
|
||||
|
@ -485,10 +500,9 @@ void httpProcessHttpData(void *param) {
|
|||
}
|
||||
}
|
||||
|
||||
void httpAcceptHttpConnection(void *arg) {
|
||||
void* httpAcceptHttpConnection(void *arg) {
|
||||
int connFd = -1;
|
||||
struct sockaddr_in clientAddr;
|
||||
int sockFd;
|
||||
int threadId = 0;
|
||||
HttpThread * pThread;
|
||||
HttpServer * pServer;
|
||||
|
@ -502,12 +516,12 @@ void httpAcceptHttpConnection(void *arg) {
|
|||
sigaddset(&set, SIGPIPE);
|
||||
pthread_sigmask(SIG_SETMASK, &set, NULL);
|
||||
|
||||
sockFd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
|
||||
pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
|
||||
|
||||
if (sockFd < 0) {
|
||||
if (pServer->fd < 0) {
|
||||
httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label, taosIpStr(pServer->serverIp),
|
||||
pServer->serverPort, strerror(errno));
|
||||
return;
|
||||
return NULL;
|
||||
} else {
|
||||
httpPrint("http service init success at %u", pServer->serverPort);
|
||||
pServer->online = true;
|
||||
|
@ -515,9 +529,12 @@ void httpAcceptHttpConnection(void *arg) {
|
|||
|
||||
while (1) {
|
||||
socklen_t addrlen = sizeof(clientAddr);
|
||||
connFd = (int)accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen);
|
||||
|
||||
if (connFd < 3) {
|
||||
connFd = (int)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen);
|
||||
if (connFd == -1) {
|
||||
if (errno == EINVAL) {
|
||||
httpTrace("%s HTTP server socket was shutdown, exiting...", pServer->label);
|
||||
break;
|
||||
}
|
||||
httpError("http server:%s, accept connect failure, errno:%d, reason:%s", pServer->label, errno, strerror(errno));
|
||||
continue;
|
||||
}
|
||||
|
@ -579,7 +596,6 @@ void httpAcceptHttpConnection(void *arg) {
|
|||
pThread->pHead = pContext;
|
||||
|
||||
pThread->numOfFds++;
|
||||
pthread_cond_signal(&pThread->fdReady);
|
||||
|
||||
pthread_mutex_unlock(&(pThread->threadMutex));
|
||||
|
||||
|
@ -587,6 +603,9 @@ void httpAcceptHttpConnection(void *arg) {
|
|||
threadId++;
|
||||
threadId = threadId % pServer->numOfThreads;
|
||||
}
|
||||
|
||||
close(pServer->fd);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
bool httpInitConnect(HttpServer *pServer) {
|
||||
|
@ -612,11 +631,6 @@ bool httpInitConnect(HttpServer *pServer) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (pthread_cond_init(&(pThread->fdReady), NULL) != 0) {
|
||||
httpError("http thread:%s, init HTTP condition variable failed, reason:%s", pThread->label, strerror(errno));
|
||||
return false;
|
||||
}
|
||||
|
||||
pThread->pollFd = epoll_create(HTTP_MAX_EVENTS); // size does not matter
|
||||
if (pThread->pollFd < 0) {
|
||||
httpError("http thread:%s, failed to create HTTP epoll", pThread->label);
|
||||
|
|
|
@ -39,8 +39,8 @@ typedef struct SThreadObj {
|
|||
pthread_t thread;
|
||||
SFdObj * pHead;
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t fdReady;
|
||||
uint32_t ip;
|
||||
bool stop;
|
||||
int pollFd;
|
||||
int numOfFds;
|
||||
int threadId;
|
||||
|
@ -50,6 +50,7 @@ typedef struct SThreadObj {
|
|||
} SThreadObj;
|
||||
|
||||
typedef struct {
|
||||
int fd;
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
char label[12];
|
||||
|
@ -63,7 +64,7 @@ static void *taosProcessTcpData(void *param);
|
|||
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
|
||||
static void taosFreeFdObj(SFdObj *pFdObj);
|
||||
static void taosReportBrokenLink(SFdObj *pFdObj);
|
||||
static void taosAcceptTcpConnection(void *arg);
|
||||
static void* taosAcceptTcpConnection(void *arg);
|
||||
|
||||
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
|
||||
SServerObj *pServerObj;
|
||||
|
@ -95,12 +96,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
|||
break;;
|
||||
}
|
||||
|
||||
code = pthread_cond_init(&(pThreadObj->fdReady), NULL);
|
||||
if (code != 0) {
|
||||
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
||||
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
||||
if (pThreadObj->pollFd < 0) {
|
||||
tError("%s failed to create TCP epoll", label);
|
||||
|
@ -144,28 +139,45 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
|||
return (void *)pServerObj;
|
||||
}
|
||||
|
||||
static void taosStopTcpThread(SThreadObj* pThreadObj) {
|
||||
pThreadObj->stop = true;
|
||||
|
||||
// signal the thread to stop, try graceful method first,
|
||||
// and use pthread_cancel when failed
|
||||
struct epoll_event event = { .events = EPOLLIN };
|
||||
eventfd_t fd = eventfd(1, 0);
|
||||
if (fd == -1) {
|
||||
pthread_cancel(pThreadObj->thread);
|
||||
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
|
||||
pthread_cancel(pThreadObj->thread);
|
||||
}
|
||||
|
||||
pthread_join(pThreadObj->thread, NULL);
|
||||
close(pThreadObj->pollFd);
|
||||
if (fd != -1) {
|
||||
close(fd);
|
||||
}
|
||||
|
||||
while (pThreadObj->pHead) {
|
||||
SFdObj *pFdObj = pThreadObj->pHead;
|
||||
pThreadObj->pHead = pFdObj->next;
|
||||
taosFreeFdObj(pFdObj);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void taosCleanUpTcpServer(void *handle) {
|
||||
SServerObj *pServerObj = handle;
|
||||
SThreadObj *pThreadObj;
|
||||
|
||||
if (pServerObj == NULL) return;
|
||||
|
||||
pthread_cancel(pServerObj->thread);
|
||||
shutdown(pServerObj->fd, SHUT_RD);
|
||||
pthread_join(pServerObj->thread, NULL);
|
||||
|
||||
for (int i = 0; i < pServerObj->numOfThreads; ++i) {
|
||||
pThreadObj = pServerObj->pThreadObj + i;
|
||||
|
||||
while (pThreadObj->pHead) {
|
||||
SFdObj *pFdObj = pThreadObj->pHead;
|
||||
pThreadObj->pHead = pFdObj->next;
|
||||
taosFreeFdObj(pFdObj);
|
||||
}
|
||||
|
||||
close(pThreadObj->pollFd);
|
||||
pthread_cancel(pThreadObj->thread);
|
||||
pthread_join(pThreadObj->thread, NULL);
|
||||
pthread_cond_destroy(&(pThreadObj->fdReady));
|
||||
taosStopTcpThread(pThreadObj);
|
||||
pthread_mutex_destroy(&(pThreadObj->mutex));
|
||||
}
|
||||
|
||||
|
@ -175,26 +187,28 @@ void taosCleanUpTcpServer(void *handle) {
|
|||
tfree(pServerObj);
|
||||
}
|
||||
|
||||
static void taosAcceptTcpConnection(void *arg) {
|
||||
static void* taosAcceptTcpConnection(void *arg) {
|
||||
int connFd = -1;
|
||||
struct sockaddr_in caddr;
|
||||
int sockFd;
|
||||
int threadId = 0;
|
||||
SThreadObj *pThreadObj;
|
||||
SServerObj *pServerObj;
|
||||
|
||||
pServerObj = (SServerObj *)arg;
|
||||
|
||||
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
||||
if (sockFd < 0) return;
|
||||
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
||||
if (pServerObj->fd < 0) return NULL;
|
||||
|
||||
tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
||||
|
||||
while (1) {
|
||||
socklen_t addrlen = sizeof(caddr);
|
||||
connFd = accept(sockFd, (struct sockaddr *)&caddr, &addrlen);
|
||||
|
||||
if (connFd < 0) {
|
||||
connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
|
||||
if (connFd == -1) {
|
||||
if (errno == EINVAL) {
|
||||
tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
|
||||
break;
|
||||
}
|
||||
tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno));
|
||||
continue;
|
||||
}
|
||||
|
@ -220,6 +234,9 @@ static void taosAcceptTcpConnection(void *arg) {
|
|||
threadId++;
|
||||
threadId = threadId % pServerObj->numOfThreads;
|
||||
}
|
||||
|
||||
close(pServerObj->fd);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
|
||||
|
@ -237,11 +254,6 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
|
||||
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
||||
if (pThreadObj->pollFd < 0) {
|
||||
tError("%s failed to create TCP client epoll", label);
|
||||
|
@ -268,17 +280,7 @@ void taosCleanUpTcpClient(void *chandle) {
|
|||
SThreadObj *pThreadObj = chandle;
|
||||
if (pThreadObj == NULL) return;
|
||||
|
||||
while (pThreadObj->pHead) {
|
||||
SFdObj *pFdObj = pThreadObj->pHead;
|
||||
pThreadObj->pHead = pFdObj->next;
|
||||
taosFreeFdObj(pFdObj);
|
||||
}
|
||||
|
||||
close(pThreadObj->pollFd);
|
||||
|
||||
pthread_cancel(pThreadObj->thread);
|
||||
pthread_join(pThreadObj->thread, NULL);
|
||||
|
||||
taosStopTcpThread(pThreadObj);
|
||||
tTrace (":%s, all connections are cleaned up", pThreadObj->label);
|
||||
|
||||
tfree(pThreadObj);
|
||||
|
@ -350,13 +352,11 @@ static void *taosProcessTcpData(void *param) {
|
|||
SRpcHead rpcHead;
|
||||
|
||||
while (1) {
|
||||
pthread_mutex_lock(&pThreadObj->mutex);
|
||||
if (pThreadObj->numOfFds < 1) {
|
||||
pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->mutex);
|
||||
}
|
||||
pthread_mutex_unlock(&pThreadObj->mutex);
|
||||
|
||||
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
|
||||
if (pThreadObj->stop) {
|
||||
tTrace("%s, tcp thread get stop event, exiting...", pThreadObj->label);
|
||||
break;
|
||||
}
|
||||
if (fdNum < 0) continue;
|
||||
|
||||
for (int i = 0; i < fdNum; ++i) {
|
||||
|
@ -444,7 +444,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
|
|||
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
|
||||
pThreadObj->pHead = pFdObj;
|
||||
pThreadObj->numOfFds++;
|
||||
pthread_cond_signal(&pThreadObj->fdReady);
|
||||
pthread_mutex_unlock(&(pThreadObj->mutex));
|
||||
|
||||
return pFdObj;
|
||||
|
@ -492,5 +491,3 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
|
|||
|
||||
tfree(pFdObj);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -135,14 +135,15 @@ void taosCleanUpUdpConnection(void *handle) {
|
|||
for (int i = 0; i < pSet->threads; ++i) {
|
||||
pConn = pSet->udpConn + i;
|
||||
pConn->signature = NULL;
|
||||
free(pConn->buffer);
|
||||
pthread_cancel(pConn->thread);
|
||||
taosCloseSocket(pConn->fd);
|
||||
// shutdown to signal the thread to exit
|
||||
shutdown(pConn->fd, SHUT_RD);
|
||||
}
|
||||
|
||||
for (int i = 0; i < pSet->threads; ++i) {
|
||||
pConn = pSet->udpConn + i;
|
||||
pthread_join(pConn->thread, NULL);
|
||||
free(pConn->buffer);
|
||||
taosCloseSocket(pConn->fd);
|
||||
tTrace("chandle:%p is closed", pConn);
|
||||
}
|
||||
|
||||
|
@ -177,6 +178,11 @@ static void *taosRecvUdpData(void *param) {
|
|||
|
||||
while (1) {
|
||||
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
|
||||
if(dataLen == 0) {
|
||||
tTrace("data length is 0, socket was closed, exiting");
|
||||
break;
|
||||
}
|
||||
|
||||
port = ntohs(sourceAdd.sin_port);
|
||||
|
||||
if (dataLen < sizeof(SRpcHead)) {
|
||||
|
|
|
@ -39,6 +39,7 @@ void taosResetQitems(taos_qall);
|
|||
|
||||
taos_qset taosOpenQset();
|
||||
void taosCloseQset();
|
||||
void taosQsetThreadResume(taos_qset param);
|
||||
int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
|
||||
void taosRemoveFromQset(taos_qset, taos_queue);
|
||||
int taosGetQueueNumber(taos_qset);
|
||||
|
|
|
@ -230,6 +230,14 @@ void taosCloseQset(taos_qset param) {
|
|||
free(qset);
|
||||
}
|
||||
|
||||
// tsem_post 'qset->sem', so that reader threads waiting for it
|
||||
// resumes execution and return, should only be used to signal the
|
||||
// thread to exit.
|
||||
void taosQsetThreadResume(taos_qset param) {
|
||||
STaosQset *qset = (STaosQset *)param;
|
||||
tsem_post(&qset->sem);
|
||||
}
|
||||
|
||||
int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
|
||||
STaosQueue *queue = (STaosQueue *)p2;
|
||||
STaosQset *qset = (STaosQset *)p1;
|
||||
|
|
|
@ -31,7 +31,7 @@ typedef struct {
|
|||
int numOfThreads;
|
||||
pthread_t * qthread;
|
||||
SSchedMsg * queue;
|
||||
|
||||
bool stop;
|
||||
void* pTmrCtrl;
|
||||
void* pTimer;
|
||||
} SSchedQueue;
|
||||
|
@ -85,6 +85,7 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pSched->stop = false;
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
pthread_attr_t attr;
|
||||
pthread_attr_init(&attr);
|
||||
|
@ -128,6 +129,9 @@ void *taosProcessSchedQueue(void *param) {
|
|||
}
|
||||
uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
|
||||
}
|
||||
if (pSched->stop) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
|
||||
uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
|
||||
|
@ -185,13 +189,16 @@ void taosCleanUpScheduler(void *param) {
|
|||
SSchedQueue *pSched = (SSchedQueue *)param;
|
||||
if (pSched == NULL) return;
|
||||
|
||||
pSched->stop = true;
|
||||
for (int i = 0; i < pSched->numOfThreads; ++i) {
|
||||
if (pSched->qthread[i])
|
||||
pthread_cancel(pSched->qthread[i]);
|
||||
if (pSched->qthread[i]) {
|
||||
tsem_post(&pSched->fullSem);
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < pSched->numOfThreads; ++i) {
|
||||
if (pSched->qthread[i])
|
||||
if (pSched->qthread[i]) {
|
||||
pthread_join(pSched->qthread[i], NULL);
|
||||
}
|
||||
}
|
||||
|
||||
tsem_destroy(&pSched->emptySem);
|
||||
|
|
Loading…
Reference in New Issue