Merge branch 'develop' into hotfix/taos-tools

* develop:
  free is not right
  TD-337, #1827: thread exit gracefully
  ctrl+c handling in taosd
  version in sync is not updated correctly
  change max cache value from 512 to 128.
  [TD-375]
This commit is contained in:
Shuaiqiang Chang 2020-05-21 10:48:42 +08:00
commit fa1ef5cf6f
35 changed files with 214 additions and 143 deletions

View File

@ -22,6 +22,7 @@
#include "dnodeMain.h" #include "dnodeMain.h"
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context); static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
static sem_t exitSem;
int32_t main(int32_t argc, char *argv[]) { int32_t main(int32_t argc, char *argv[]) {
// Set global configuration file // Set global configuration file
@ -65,6 +66,11 @@ int32_t main(int32_t argc, char *argv[]) {
#endif #endif
} }
if (sem_init(&exitSem, 0, 0) != 0) {
printf("failed to create exit semphore\n");
exit(EXIT_FAILURE);
}
/* Set termination handler. */ /* Set termination handler. */
struct sigaction act = {{0}}; struct sigaction act = {{0}};
act.sa_flags = SA_SIGINFO; act.sa_flags = SA_SIGINFO;
@ -90,9 +96,19 @@ int32_t main(int32_t argc, char *argv[]) {
syslog(LOG_INFO, "Started TDengine service successfully."); syslog(LOG_INFO, "Started TDengine service successfully.");
while (1) { for (int res = sem_wait(&exitSem); res != 0; res = sem_wait(&exitSem)) {
sleep(1000); if (res != EINTR) {
syslog(LOG_ERR, "failed to wait exit semphore: %d", res);
break;
}
} }
dnodeCleanUpSystem();
// close the syslog
syslog(LOG_INFO, "Shut down TDengine service successfully");
dPrint("TDengine is shut down!");
closelog();
return EXIT_SUCCESS;
} }
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
@ -104,14 +120,21 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
taosCfgDynamicOptions("resetlog"); taosCfgDynamicOptions("resetlog");
return; return;
} }
syslog(LOG_INFO, "Shut down signal is %d", signum); syslog(LOG_INFO, "Shut down signal is %d", signum);
syslog(LOG_INFO, "Shutting down TDengine service..."); syslog(LOG_INFO, "Shutting down TDengine service...");
// clean the system. // clean the system.
dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
dnodeCleanUpSystem();
// close the syslog // protect the application from receive another signal
syslog(LOG_INFO, "Shut down TDengine service successfully"); struct sigaction act = {{0}};
dPrint("TDengine is shut down!"); act.sa_handler = SIG_IGN;
closelog(); sigaction(SIGTERM, &act, NULL);
exit(EXIT_SUCCESS); sigaction(SIGHUP, &act, NULL);
sigaction(SIGINT, &act, NULL);
sigaction(SIGUSR1, &act, NULL);
sigaction(SIGUSR2, &act, NULL);
// inform main thread to exit
sem_post(&exitSem);
} }

View File

@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "twal.h" #include "twal.h"
@ -71,11 +72,16 @@ int32_t dnodeInitRead() {
} }
void dnodeCleanupRead() { void dnodeCleanupRead() {
for (int i=0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(readQset);
}
}
for (int i=0; i < readPool.max; ++i) { for (int i=0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_cancel(pWorker->thread);
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
} }
@ -201,15 +207,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
} }
static void *dnodeProcessReadQueue(void *param) { static void *dnodeProcessReadQueue(void *param) {
SReadWorker *pWorker = param;
SReadMsg *pReadMsg; SReadMsg *pReadMsg;
int type; int type;
void *pVnode; void *pVnode;
while (1) { while (1) {
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) { if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
dnodeHandleIdleReadWorker(pWorker); dTrace("dnodeProcessReadQueee: got no message from qset, exiting...");
continue; break;
} }
dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
@ -221,6 +226,8 @@ static void *dnodeProcessReadQueue(void *param) {
return NULL; return NULL;
} }
UNUSED_FUNC
static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) { static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) {
int32_t num = taosGetQueueNumber(readQset); int32_t num = taosGetQueueNumber(readQset);

View File

@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "taoserror.h" #include "taoserror.h"
#include "tutil.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "tsdb.h" #include "tsdb.h"
@ -67,11 +68,16 @@ int32_t dnodeInitWrite() {
} }
void dnodeCleanupWrite() { void dnodeCleanupWrite() {
for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(pWorker->qset);
}
}
for (int32_t i = 0; i < wWorkerPool.max; ++i) { for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i; SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_cancel(pWorker->thread);
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
@ -186,9 +192,9 @@ static void *dnodeProcessWriteQueue(void *param) {
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs <=0) { if (numOfMsgs ==0) {
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore dTrace("dnodeProcessWriteQueee: got no message from qset, exiting...");
continue; break;
} }
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
@ -228,6 +234,7 @@ static void *dnodeProcessWriteQueue(void *param) {
return NULL; return NULL;
} }
UNUSED_FUNC
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
int32_t num = taosGetQueueNumber(pWorker->qset); int32_t num = taosGetQueueNumber(pWorker->qset);

View File

@ -95,7 +95,7 @@ typedef void* tsync_h;
tsync_h syncStart(const SSyncInfo *); tsync_h syncStart(const SSyncInfo *);
void syncStop(tsync_h shandle); void syncStop(tsync_h shandle);
int syncReconfig(tsync_h shandle, const SSyncCfg *); int syncReconfig(tsync_h shandle, const SSyncCfg *);
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle); int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype);
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code);
void syncRecover(tsync_h shandle); // recover from other nodes: void syncRecover(tsync_h shandle); // recover from other nodes:
int syncGetNodesRole(tsync_h shandle, SNodesRole *); int syncGetNodesRole(tsync_h shandle, SNodesRole *);

View File

@ -227,7 +227,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
static int32_t sdbForwardToPeer(SWalHead *pHead) { static int32_t sdbForwardToPeer(SWalHead *pHead) {
if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS;
int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version); int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC);
if (code > 0) { if (code > 0) {
sdbTrace("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code); sdbTrace("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code);
sem_wait(&tsSdbObj.sem); sem_wait(&tsSdbObj.sem);

View File

@ -53,6 +53,7 @@ extern "C" {
#include <string.h> #include <string.h>
#include <strings.h> #include <strings.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/file.h> #include <sys/file.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
#include <sys/mman.h> #include <sys/mman.h>

View File

@ -199,7 +199,7 @@ typedef struct HttpThread {
pthread_t thread; pthread_t thread;
HttpContext * pHead; HttpContext * pHead;
pthread_mutex_t threadMutex; pthread_mutex_t threadMutex;
pthread_cond_t fdReady; bool stop;
int pollFd; int pollFd;
int numOfFds; int numOfFds;
int threadId; int threadId;
@ -212,6 +212,8 @@ typedef struct HttpServer {
char label[HTTP_LABEL_SIZE]; char label[HTTP_LABEL_SIZE];
uint32_t serverIp; uint32_t serverIp;
uint16_t serverPort; uint16_t serverPort;
bool online;
int fd;
int cacheContext; int cacheContext;
int sessionExpire; int sessionExpire;
int numOfThreads; int numOfThreads;
@ -226,7 +228,6 @@ typedef struct HttpServer {
bool (*processData)(HttpContext *pContext); bool (*processData)(HttpContext *pContext);
int requestNum; int requestNum;
void *timerHandle; void *timerHandle;
bool online;
} HttpServer; } HttpServer;
// http util method // http util method

View File

@ -258,28 +258,45 @@ void httpCloseContextByServerForExpired(void *param, void *tmrId) {
httpCloseContextByServer(pContext->pThread, pContext); httpCloseContextByServer(pContext->pThread, pContext);
} }
void httpCleanUpConnect(HttpServer *pServer) {
int i;
HttpThread *pThread;
static void httpStopThread(HttpThread* pThread) {
pThread->stop = true;
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN };
eventfd_t fd = eventfd(1, 0);
if (fd == -1) {
pthread_cancel(pThread->thread);
} else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
pthread_cancel(pThread->thread);
}
pthread_join(pThread->thread, NULL);
if (fd != -1) {
close(fd);
}
close(pThread->pollFd);
pthread_mutex_destroy(&(pThread->threadMutex));
//while (pThread->pHead) {
// httpCleanUpContext(pThread->pHead, 0);
//}
}
void httpCleanUpConnect(HttpServer *pServer) {
if (pServer == NULL) return; if (pServer == NULL) return;
pthread_cancel(pServer->thread); shutdown(pServer->fd, SHUT_RD);
pthread_join(pServer->thread, NULL); pthread_join(pServer->thread, NULL);
for (i = 0; i < pServer->numOfThreads; ++i) { for (int i = 0; i < pServer->numOfThreads; ++i) {
pThread = pServer->pThreads + i; HttpThread* pThread = pServer->pThreads + i;
if (pThread == NULL) continue; if (pThread != NULL) {
//taosCloseSocket(pThread->pollFd); httpStopThread(pThread);
}
//while (pThread->pHead) {
// httpCleanUpContext(pThread->pHead, 0);
//}
pthread_cancel(pThread->thread);
pthread_join(pThread->thread, NULL);
pthread_cond_destroy(&(pThread->fdReady));
pthread_mutex_destroy(&(pThread->threadMutex));
} }
tfree(pServer->pThreads); tfree(pServer->pThreads);
@ -412,15 +429,13 @@ void httpProcessHttpData(void *param) {
pthread_sigmask(SIG_SETMASK, &set, NULL); pthread_sigmask(SIG_SETMASK, &set, NULL);
while (1) { while (1) {
pthread_mutex_lock(&pThread->threadMutex);
if (pThread->numOfFds < 1) {
pthread_cond_wait(&pThread->fdReady, &pThread->threadMutex);
}
pthread_mutex_unlock(&pThread->threadMutex);
struct epoll_event events[HTTP_MAX_EVENTS]; struct epoll_event events[HTTP_MAX_EVENTS];
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1 //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1); fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1);
if (pThread->stop) {
httpTrace("%p, http thread get stop event, exiting...", pThread);
break;
}
if (fdNum <= 0) continue; if (fdNum <= 0) continue;
for (int i = 0; i < fdNum; ++i) { for (int i = 0; i < fdNum; ++i) {
@ -485,10 +500,9 @@ void httpProcessHttpData(void *param) {
} }
} }
void httpAcceptHttpConnection(void *arg) { void* httpAcceptHttpConnection(void *arg) {
int connFd = -1; int connFd = -1;
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
int sockFd;
int threadId = 0; int threadId = 0;
HttpThread * pThread; HttpThread * pThread;
HttpServer * pServer; HttpServer * pServer;
@ -502,12 +516,12 @@ void httpAcceptHttpConnection(void *arg) {
sigaddset(&set, SIGPIPE); sigaddset(&set, SIGPIPE);
pthread_sigmask(SIG_SETMASK, &set, NULL); pthread_sigmask(SIG_SETMASK, &set, NULL);
sockFd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
if (sockFd < 0) { if (pServer->fd < 0) {
httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label, taosIpStr(pServer->serverIp), httpError("http server:%s, failed to open http socket, ip:%s:%u error:%s", pServer->label, taosIpStr(pServer->serverIp),
pServer->serverPort, strerror(errno)); pServer->serverPort, strerror(errno));
return; return NULL;
} else { } else {
httpPrint("http service init success at %u", pServer->serverPort); httpPrint("http service init success at %u", pServer->serverPort);
pServer->online = true; pServer->online = true;
@ -515,9 +529,12 @@ void httpAcceptHttpConnection(void *arg) {
while (1) { while (1) {
socklen_t addrlen = sizeof(clientAddr); socklen_t addrlen = sizeof(clientAddr);
connFd = (int)accept(sockFd, (struct sockaddr *)&clientAddr, &addrlen); connFd = (int)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen);
if (connFd == -1) {
if (connFd < 3) { if (errno == EINVAL) {
httpTrace("%s HTTP server socket was shutdown, exiting...", pServer->label);
break;
}
httpError("http server:%s, accept connect failure, errno:%d, reason:%s", pServer->label, errno, strerror(errno)); httpError("http server:%s, accept connect failure, errno:%d, reason:%s", pServer->label, errno, strerror(errno));
continue; continue;
} }
@ -579,7 +596,6 @@ void httpAcceptHttpConnection(void *arg) {
pThread->pHead = pContext; pThread->pHead = pContext;
pThread->numOfFds++; pThread->numOfFds++;
pthread_cond_signal(&pThread->fdReady);
pthread_mutex_unlock(&(pThread->threadMutex)); pthread_mutex_unlock(&(pThread->threadMutex));
@ -587,6 +603,9 @@ void httpAcceptHttpConnection(void *arg) {
threadId++; threadId++;
threadId = threadId % pServer->numOfThreads; threadId = threadId % pServer->numOfThreads;
} }
close(pServer->fd);
return NULL;
} }
bool httpInitConnect(HttpServer *pServer) { bool httpInitConnect(HttpServer *pServer) {
@ -612,11 +631,6 @@ bool httpInitConnect(HttpServer *pServer) {
return false; return false;
} }
if (pthread_cond_init(&(pThread->fdReady), NULL) != 0) {
httpError("http thread:%s, init HTTP condition variable failed, reason:%s", pThread->label, strerror(errno));
return false;
}
pThread->pollFd = epoll_create(HTTP_MAX_EVENTS); // size does not matter pThread->pollFd = epoll_create(HTTP_MAX_EVENTS); // size does not matter
if (pThread->pollFd < 0) { if (pThread->pollFd < 0) {
httpError("http thread:%s, failed to create HTTP epoll", pThread->label); httpError("http thread:%s, failed to create HTTP epoll", pThread->label);

View File

@ -889,12 +889,12 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
rpcSendErrorMsgToPeer(pRecv, code); rpcSendErrorMsgToPeer(pRecv, code);
tTrace("%s %p %p, %s is sent with error code:%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); tTrace("%s %p %p, %s is sent with error code:%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
} }
} else { // parsing OK } else { // msg is passed to app only parsing is ok
rpcProcessIncomingMsg(pConn, pHead); rpcProcessIncomingMsg(pConn, pHead);
} }
} }
if (code) rpcFreeMsg(pRecv->msg); if (code) tfree(pRecv->msg); // parsing failed, msg shall be freed
return pConn; return pConn;
} }

View File

@ -39,8 +39,8 @@ typedef struct SThreadObj {
pthread_t thread; pthread_t thread;
SFdObj * pHead; SFdObj * pHead;
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t fdReady;
uint32_t ip; uint32_t ip;
bool stop;
int pollFd; int pollFd;
int numOfFds; int numOfFds;
int threadId; int threadId;
@ -50,6 +50,7 @@ typedef struct SThreadObj {
} SThreadObj; } SThreadObj;
typedef struct { typedef struct {
int fd;
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
char label[12]; char label[12];
@ -63,7 +64,7 @@ static void *taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd); static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static void taosFreeFdObj(SFdObj *pFdObj); static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj);
static void taosAcceptTcpConnection(void *arg); static void* taosAcceptTcpConnection(void *arg);
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
SServerObj *pServerObj; SServerObj *pServerObj;
@ -95,12 +96,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
break;; break;;
} }
code = pthread_cond_init(&(pThreadObj->fdReady), NULL);
if (code != 0) {
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
break;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label); tError("%s failed to create TCP epoll", label);
@ -144,28 +139,45 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return (void *)pServerObj; return (void *)pServerObj;
} }
static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true;
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
struct epoll_event event = { .events = EPOLLIN };
eventfd_t fd = eventfd(1, 0);
if (fd == -1) {
pthread_cancel(pThreadObj->thread);
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
pthread_cancel(pThreadObj->thread);
}
pthread_join(pThreadObj->thread, NULL);
close(pThreadObj->pollFd);
if (fd != -1) {
close(fd);
}
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
}
void taosCleanUpTcpServer(void *handle) { void taosCleanUpTcpServer(void *handle) {
SServerObj *pServerObj = handle; SServerObj *pServerObj = handle;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
if (pServerObj == NULL) return; if (pServerObj == NULL) return;
pthread_cancel(pServerObj->thread); shutdown(pServerObj->fd, SHUT_RD);
pthread_join(pServerObj->thread, NULL); pthread_join(pServerObj->thread, NULL);
for (int i = 0; i < pServerObj->numOfThreads; ++i) { for (int i = 0; i < pServerObj->numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj + i; pThreadObj = pServerObj->pThreadObj + i;
taosStopTcpThread(pThreadObj);
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
close(pThreadObj->pollFd);
pthread_cancel(pThreadObj->thread);
pthread_join(pThreadObj->thread, NULL);
pthread_cond_destroy(&(pThreadObj->fdReady));
pthread_mutex_destroy(&(pThreadObj->mutex)); pthread_mutex_destroy(&(pThreadObj->mutex));
} }
@ -175,26 +187,28 @@ void taosCleanUpTcpServer(void *handle) {
tfree(pServerObj); tfree(pServerObj);
} }
static void taosAcceptTcpConnection(void *arg) { static void* taosAcceptTcpConnection(void *arg) {
int connFd = -1; int connFd = -1;
struct sockaddr_in caddr; struct sockaddr_in caddr;
int sockFd;
int threadId = 0; int threadId = 0;
SThreadObj *pThreadObj; SThreadObj *pThreadObj;
SServerObj *pServerObj; SServerObj *pServerObj;
pServerObj = (SServerObj *)arg; pServerObj = (SServerObj *)arg;
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (sockFd < 0) return; if (pServerObj->fd < 0) return NULL;
tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
while (1) { while (1) {
socklen_t addrlen = sizeof(caddr); socklen_t addrlen = sizeof(caddr);
connFd = accept(sockFd, (struct sockaddr *)&caddr, &addrlen); connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
if (connFd == -1) {
if (connFd < 0) { if (errno == EINVAL) {
tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
break;
}
tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno)); tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno));
continue; continue;
} }
@ -220,6 +234,9 @@ static void taosAcceptTcpConnection(void *arg) {
threadId++; threadId++;
threadId = threadId % pServerObj->numOfThreads; threadId = threadId % pServerObj->numOfThreads;
} }
close(pServerObj->fd);
return NULL;
} }
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) { void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
@ -237,11 +254,6 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
return NULL; return NULL;
} }
if (pthread_cond_init(&(pThreadObj->fdReady), NULL) != 0) {
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
return NULL;
}
pThreadObj->pollFd = epoll_create(10); // size does not matter pThreadObj->pollFd = epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP client epoll", label); tError("%s failed to create TCP client epoll", label);
@ -268,17 +280,7 @@ void taosCleanUpTcpClient(void *chandle) {
SThreadObj *pThreadObj = chandle; SThreadObj *pThreadObj = chandle;
if (pThreadObj == NULL) return; if (pThreadObj == NULL) return;
while (pThreadObj->pHead) { taosStopTcpThread(pThreadObj);
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
close(pThreadObj->pollFd);
pthread_cancel(pThreadObj->thread);
pthread_join(pThreadObj->thread, NULL);
tTrace (":%s, all connections are cleaned up", pThreadObj->label); tTrace (":%s, all connections are cleaned up", pThreadObj->label);
tfree(pThreadObj); tfree(pThreadObj);
@ -350,13 +352,11 @@ static void *taosProcessTcpData(void *param) {
SRpcHead rpcHead; SRpcHead rpcHead;
while (1) { while (1) {
pthread_mutex_lock(&pThreadObj->mutex);
if (pThreadObj->numOfFds < 1) {
pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->mutex);
}
pthread_mutex_unlock(&pThreadObj->mutex);
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
if (pThreadObj->stop) {
tTrace("%s, tcp thread get stop event, exiting...", pThreadObj->label);
break;
}
if (fdNum < 0) continue; if (fdNum < 0) continue;
for (int i = 0; i < fdNum; ++i) { for (int i = 0; i < fdNum; ++i) {
@ -444,7 +444,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj; if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
pThreadObj->pHead = pFdObj; pThreadObj->pHead = pFdObj;
pThreadObj->numOfFds++; pThreadObj->numOfFds++;
pthread_cond_signal(&pThreadObj->fdReady);
pthread_mutex_unlock(&(pThreadObj->mutex)); pthread_mutex_unlock(&(pThreadObj->mutex));
return pFdObj; return pFdObj;
@ -492,5 +491,3 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
tfree(pFdObj); tfree(pFdObj);
} }

View File

@ -135,14 +135,15 @@ void taosCleanUpUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
pConn->signature = NULL; pConn->signature = NULL;
free(pConn->buffer); // shutdown to signal the thread to exit
pthread_cancel(pConn->thread); shutdown(pConn->fd, SHUT_RD);
taosCloseSocket(pConn->fd);
} }
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
pthread_join(pConn->thread, NULL); pthread_join(pConn->thread, NULL);
free(pConn->buffer);
taosCloseSocket(pConn->fd);
tTrace("chandle:%p is closed", pConn); tTrace("chandle:%p is closed", pConn);
} }
@ -177,6 +178,11 @@ static void *taosRecvUdpData(void *param) {
while (1) { while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
if(dataLen == 0) {
tTrace("data length is 0, socket was closed, exiting");
break;
}
port = ntohs(sourceAdd.sin_port); port = ntohs(sourceAdd.sin_port);
if (dataLen < sizeof(SRpcHead)) { if (dataLen < sizeof(SRpcHead)) {

View File

@ -39,6 +39,7 @@ void taosResetQitems(taos_qall);
taos_qset taosOpenQset(); taos_qset taosOpenQset();
void taosCloseQset(); void taosCloseQset();
void taosQsetThreadResume(taos_qset param);
int taosAddIntoQset(taos_qset, taos_queue, void *ahandle); int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
void taosRemoveFromQset(taos_qset, taos_queue); void taosRemoveFromQset(taos_qset, taos_queue);
int taosGetQueueNumber(taos_qset); int taosGetQueueNumber(taos_qset);

View File

@ -230,6 +230,14 @@ void taosCloseQset(taos_qset param) {
free(qset); free(qset);
} }
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
void taosQsetThreadResume(taos_qset param) {
STaosQset *qset = (STaosQset *)param;
tsem_post(&qset->sem);
}
int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
STaosQueue *queue = (STaosQueue *)p2; STaosQueue *queue = (STaosQueue *)p2;
STaosQset *qset = (STaosQset *)p1; STaosQset *qset = (STaosQset *)p1;

View File

@ -31,7 +31,7 @@ typedef struct {
int numOfThreads; int numOfThreads;
pthread_t * qthread; pthread_t * qthread;
SSchedMsg * queue; SSchedMsg * queue;
bool stop;
void* pTmrCtrl; void* pTmrCtrl;
void* pTimer; void* pTimer;
} SSchedQueue; } SSchedQueue;
@ -85,6 +85,7 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
return NULL; return NULL;
} }
pSched->stop = false;
for (int i = 0; i < numOfThreads; ++i) { for (int i = 0; i < numOfThreads; ++i) {
pthread_attr_t attr; pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_init(&attr);
@ -128,6 +129,9 @@ void *taosProcessSchedQueue(void *param) {
} }
uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
} }
if (pSched->stop) {
break;
}
if (pthread_mutex_lock(&pSched->queueMutex) != 0) if (pthread_mutex_lock(&pSched->queueMutex) != 0)
uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
@ -185,13 +189,16 @@ void taosCleanUpScheduler(void *param) {
SSchedQueue *pSched = (SSchedQueue *)param; SSchedQueue *pSched = (SSchedQueue *)param;
if (pSched == NULL) return; if (pSched == NULL) return;
pSched->stop = true;
for (int i = 0; i < pSched->numOfThreads; ++i) { for (int i = 0; i < pSched->numOfThreads; ++i) {
if (pSched->qthread[i]) if (pSched->qthread[i]) {
pthread_cancel(pSched->qthread[i]); tsem_post(&pSched->fullSem);
}
} }
for (int i = 0; i < pSched->numOfThreads; ++i) { for (int i = 0; i < pSched->numOfThreads; ++i) {
if (pSched->qthread[i]) if (pSched->qthread[i]) {
pthread_join(pSched->qthread[i], NULL); pthread_join(pSched->qthread[i], NULL);
}
} }
tsem_destroy(&pSched->emptySem); tsem_destroy(&pSched->emptySem);

View File

@ -416,12 +416,12 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
#else #else
char *tmpDir = "/tmp/"; char *tmpDir = "/tmp/";
#endif #endif
int64_t ts = taosGetTimestampUs();
strcpy(tmpPath, tmpDir); strcpy(tmpPath, tmpDir);
strcat(tmpPath, tdengineTmpFileNamePrefix); strcat(tmpPath, tdengineTmpFileNamePrefix);
strcat(tmpPath, fileNamePrefix); strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%llu-%u"); strcat(tmpPath, "-%d-%"PRIu64"-%u-%"PRIu64);
snprintf(dstPath, PATH_MAX, tmpPath, taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1)); snprintf(dstPath, PATH_MAX, tmpPath, getpid(), taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1), ts);
} }
int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes) { int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes) {

View File

@ -46,7 +46,7 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
#ifndef _SYNC #ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; } tsync_h syncStart(const SSyncInfo *info) { return NULL; }
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle) { return 0; } int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
void syncStop(tsync_h shandle) {} void syncStop(tsync_h shandle) {}
int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }

View File

@ -72,10 +72,9 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
code = walWrite(pVnode->wal, pHead); code = walWrite(pVnode->wal, pHead);
if (code < 0) return code; if (code < 0) return code;
// forward to peers if data is from RPC or CQ // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0; int32_t syncCode = 0;
if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_CQ) syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
syncCode = syncForwardToPeer(pVnode->sync, pHead, item);
if (syncCode < 0) return syncCode; if (syncCode < 0) return syncCode;
// write data locally // write data locally

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -34,7 +34,7 @@ class TDTestCase:
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512') tdSql.execute('create database db cache 128')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")

View File

@ -33,7 +33,7 @@ class TDTestCase:
tdDnodes.start(1) tdDnodes.start(1)
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.execute('drop database if exists db') tdSql.execute('drop database if exists db')
tdSql.execute('create database db cache 512 maxtables 10') tdSql.execute('create database db cache 128 maxtables 10')
tdSql.execute('use db') tdSql.execute('use db')
tdLog.info("================= step1") tdLog.info("================= step1")