diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index 734a2b6aeb..eb017c335e 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -113,6 +113,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { } int32_t dnodeInitClient() { + char secret[TSDB_KEY_LEN] = "secret"; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.label = "DND-C"; @@ -123,7 +124,7 @@ int32_t dnodeInitClient() { rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.user = "t"; rpcInit.ckey = "key"; - rpcInit.secret = "secret"; + rpcInit.secret = secret; tsDnodeClientRpc = rpcOpen(&rpcInit); if (tsDnodeClientRpc == NULL) { diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 29181ed78f..c75fc70d75 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -221,6 +221,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_COUNTRY_LEN 20 #define TSDB_LOCALE_LEN 64 #define TSDB_TIMEZONE_LEN 64 +#define TSDB_LABEL_LEN 8 #define TSDB_FQDN_LEN 128 #define TSDB_EP_LEN (TSDB_FQDN_LEN+6) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 7505a22c9d..b0a5a0bfc0 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -47,7 +47,7 @@ typedef struct { uint16_t localPort; int8_t connType; int index; // for UDP server only, round robin for multiple threads - char label[12]; + char label[TSDB_LABEL_LEN]; char user[TSDB_UNI_LEN]; // meter ID char spi; // security parameter index @@ -88,7 +88,7 @@ typedef struct { } SRpcReqContext; typedef struct SRpcConn { - char info[50];// debug info: label + pConn + ahandle + char info[48];// debug info: label + pConn + ahandle int sid; // session ID uint32_t ownId; // own link ID uint32_t peerId; // peer link ID @@ -805,16 +805,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { if (pConn == NULL) { tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno)); return NULL; - } else { - if (rpcIsReq(pHead->msgType)) { - pConn->ahandle = (void *)pHead->ahandle; - sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle); - } - } + } rpcLockConn(pConn); - sid = pConn->sid; + if (rpcIsReq(pHead->msgType)) { + pConn->ahandle = (void *)pHead->ahandle; + sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle); + } + + sid = pConn->sid; pConn->chandle = pRecv->chandle; pConn->peerIp = pRecv->ip; pConn->peerPort = pRecv->port; @@ -847,10 +847,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { } static void rpcProcessBrokenLink(SRpcConn *pConn) { + if (pConn == NULL) return; SRpcInfo *pRpc = pConn->pRpc; - tTrace("%s, link is broken", pConn->info); - // pConn->chandle = NULL; + + rpcLockConn(pConn); if (pConn->outType) { SRpcReqContext *pContext = pConn->pContext; @@ -871,7 +872,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { (*(pRpc->cfp))(&rpcMsg); */ } - + + rpcUnlockConn(pConn); rpcCloseConn(pConn); } @@ -885,7 +887,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { // underlying UDP layer does not know it is server or client pRecv->connType = pRecv->connType | pRpc->connType; - if (pRecv->ip == 0 && pConn) { + if (pRecv->ip == 0) { rpcProcessBrokenLink(pConn); return NULL; } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 511a57f3fe..04a269502e 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -16,6 +16,8 @@ #include "os.h" #include "tsocket.h" #include "tutil.h" +#include "taosdef.h" +#include "taoserror.h" #include "rpcLog.h" #include "rpcHead.h" #include "rpcTcp.h" @@ -26,8 +28,9 @@ typedef struct SFdObj { void *signature; - int fd; // TCP socket FD - void *thandle; // handle from upper layer, like TAOS + int fd; // TCP socket FD + int closedByApp; // 1: already closed by App + void *thandle; // handle from upper layer, like TAOS uint32_t ip; uint16_t port; struct SThreadObj *pThreadObj; @@ -44,7 +47,7 @@ typedef struct SThreadObj { int pollFd; int numOfFds; int threadId; - char label[12]; + char label[TSDB_LABEL_LEN]; void *shandle; // handle passed by upper layer during server initialization void *(*processData)(SRecvInfo *pPacket); } SThreadObj; @@ -53,7 +56,7 @@ typedef struct { int fd; uint32_t ip; uint16_t port; - char label[12]; + char label[TSDB_LABEL_LEN]; int numOfThreads; void * shandle; SThreadObj *pThreadObj; @@ -71,6 +74,13 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread SThreadObj *pThreadObj; pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1); + if (pServerObj == NULL) { + tError("TCP:%s no enough memory", label); + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } + + pServerObj->thread = 0; pServerObj->ip = ip; pServerObj->port = port; tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); @@ -79,13 +89,20 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads); if (pServerObj->pThreadObj == NULL) { tError("TCP:%s no enough memory", label); + terrno = TAOS_SYSTEM_ERROR(errno); free(pServerObj); return NULL; } int code = 0; + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj->pollFd = -1; + pThreadObj->thread = 0; pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; @@ -93,23 +110,22 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread code = pthread_mutex_init(&(pThreadObj->mutex), NULL); if (code < 0) { tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); break;; } pThreadObj->pollFd = epoll_create(10); // size does not matter if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP epoll", label); + terrno = TAOS_SYSTEM_ERROR(errno); code = -1; break; } - pthread_attr_t thattr; - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); - pthread_attr_destroy(&thattr); if (code != 0) { tError("%s failed to create TCP process data thread(%s)", label, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); break; } @@ -118,47 +134,47 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } if (code == 0) { - pthread_attr_t thattr; - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj)); - pthread_attr_destroy(&thattr); if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); tError("%s failed to create TCP accept thread(%s)", label, strerror(errno)); } } if (code != 0) { - free(pServerObj->pThreadObj); - free(pServerObj); + taosCleanUpTcpServer(pServerObj); pServerObj = NULL; } else { tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads); } + pthread_attr_destroy(&thattr); return (void *)pServerObj; } static void taosStopTcpThread(SThreadObj* pThreadObj) { pThreadObj->stop = true; + eventfd_t fd = -1; - // signal the thread to stop, try graceful method first, - // and use pthread_cancel when failed - struct epoll_event event = { .events = EPOLLIN }; - eventfd_t fd = eventfd(1, 0); - if (fd == -1) { - tError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno)); - pthread_cancel(pThreadObj->thread); - } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { - tError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno)); - pthread_cancel(pThreadObj->thread); + if (pThreadObj->thread && pThreadObj->pollFd >=0) { + // signal the thread to stop, try graceful method first, + // and use pthread_cancel when failed + struct epoll_event event = { .events = EPOLLIN }; + fd = eventfd(1, 0); + if (fd == -1) { + // failed to create eventfd, call pthread_cancel instead, which may result in data corruption: + tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno)); + pthread_cancel(pThreadObj->thread); + } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { + // failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption: + tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno)); + pthread_cancel(pThreadObj->thread); + } } - pthread_join(pThreadObj->thread, NULL); - close(pThreadObj->pollFd); - if (fd != -1) { - close(fd); - } + if (pThreadObj->thread) pthread_join(pThreadObj->thread, NULL); + if (pThreadObj->pollFd >=0) close(pThreadObj->pollFd); + if (fd != -1) close(fd); while (pThreadObj->pHead) { SFdObj *pFdObj = pThreadObj->pHead; @@ -173,9 +189,8 @@ void taosCleanUpTcpServer(void *handle) { SThreadObj *pThreadObj; if (pServerObj == NULL) return; - - shutdown(pServerObj->fd, SHUT_RD); - pthread_join(pServerObj->thread, NULL); + if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); + if(pServerObj->thread) pthread_join(pServerObj->thread, NULL); for (int i = 0; i < pServerObj->numOfThreads; ++i) { pThreadObj = pServerObj->pThreadObj + i; @@ -211,6 +226,7 @@ static void* taosAcceptTcpConnection(void *arg) { tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label); break; } + tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno)); continue; } @@ -254,6 +270,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) { tError("%s failed to init TCP client mutex(%s)", label, strerror(errno)); free(pThreadObj); + terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } @@ -261,6 +278,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP client epoll", label); free(pThreadObj); + terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } @@ -273,6 +291,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * if (code != 0) { close(pThreadObj->pollFd); free(pThreadObj); + terrno = TAOS_SYSTEM_ERROR(errno); tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); return NULL; } @@ -287,7 +306,7 @@ void taosCleanUpTcpClient(void *chandle) { if (pThreadObj == NULL) return; taosStopTcpThread(pThreadObj); - tTrace (":%s, all connections are cleaned up", pThreadObj->label); + tTrace ("%s, all connections are cleaned up", pThreadObj->label); tfree(pThreadObj); } @@ -318,7 +337,9 @@ void taosCloseTcpConnection(void *chandle) { SFdObj *pFdObj = chandle; if (pFdObj == NULL) return; - taosFreeFdObj(pFdObj); + pFdObj->thandle = NULL; + pFdObj->closedByApp = 1; + shutdown(pFdObj->fd, SHUT_WR); } int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { @@ -334,7 +355,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { SThreadObj *pThreadObj = pFdObj->pThreadObj; // notify the upper layer, so it will clean the associated context - if (pFdObj->thandle) { + if (pFdObj->closedByApp == 0) { + shutdown(pFdObj->fd, SHUT_WR); + SRecvInfo recvInfo; recvInfo.msg = NULL; recvInfo.msgLen = 0; @@ -345,9 +368,59 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { recvInfo.chandle = NULL; recvInfo.connType = RPC_CONN_TCP; (*(pThreadObj->processData))(&recvInfo); - } else { - taosFreeFdObj(pFdObj); + } + + taosFreeFdObj(pFdObj); +} + +static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { + SRpcHead rpcHead; + int32_t msgLen, leftLen, retLen, headLen; + char *buffer, *msg; + + SThreadObj *pThreadObj = pFdObj->pThreadObj; + + headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); + if (headLen != sizeof(SRpcHead)) { + tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen); + return -1; } + + msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); + buffer = malloc(msgLen + tsRpcOverhead); + if ( NULL == buffer) { + tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); + return -1; + } + + msg = buffer + tsRpcOverhead; + leftLen = msgLen - headLen; + retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); + + if (leftLen != retLen) { + tError("%s %p, read error, leftLen:%d retLen:%d", + pThreadObj->label, pFdObj->thandle, leftLen, retLen); + free(buffer); + return -1; + } + + memcpy(msg, &rpcHead, sizeof(SRpcHead)); + + pInfo->msg = msg; + pInfo->msgLen = msgLen; + pInfo->ip = pFdObj->ip; + pInfo->port = pFdObj->port; + pInfo->shandle = pThreadObj->shandle; + pInfo->thandle = pFdObj->thandle;; + pInfo->chandle = pFdObj; + pInfo->connType = RPC_CONN_TCP; + + if (pFdObj->closedByApp) { + free(buffer); + return -1; + } + + return 0; } #define maxEvents 10 @@ -357,7 +430,6 @@ static void *taosProcessTcpData(void *param) { SFdObj *pFdObj; struct epoll_event events[maxEvents]; SRecvInfo recvInfo; - SRpcHead rpcHead; while (1) { int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); @@ -376,51 +448,23 @@ static void *taosProcessTcpData(void *param) { continue; } + if (events[i].events & EPOLLRDHUP) { + tTrace("%s %p, FD RD hang up", pThreadObj->label, pFdObj->thandle); + taosReportBrokenLink(pFdObj); + continue; + } + if (events[i].events & EPOLLHUP) { tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle); taosReportBrokenLink(pFdObj); continue; } - int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); - if (headLen != sizeof(SRpcHead)) { - tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen); - taosReportBrokenLink(pFdObj); + if (taosReadTcpData(pFdObj, &recvInfo) < 0) { + shutdown(pFdObj->fd, SHUT_WR); continue; } - int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); - char *buffer = malloc(msgLen + tsRpcOverhead); - if ( NULL == buffer) { - tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); - taosReportBrokenLink(pFdObj); - continue; - } - - char *msg = buffer + tsRpcOverhead; - int32_t leftLen = msgLen - headLen; - int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); - - if (leftLen != retLen) { - tError("%s %p, read error, leftLen:%d retLen:%d", - pThreadObj->label, pFdObj->thandle, leftLen, retLen); - taosReportBrokenLink(pFdObj); - tfree(buffer); - continue; - } - - // tTrace("%s TCP data is received, ip:0x%x:%u len:%d", pThreadObj->label, pFdObj->ip, pFdObj->port, msgLen); - - memcpy(msg, &rpcHead, sizeof(SRpcHead)); - recvInfo.msg = msg; - recvInfo.msgLen = msgLen; - recvInfo.ip = pFdObj->ip; - recvInfo.port = pFdObj->port; - recvInfo.shandle = pThreadObj->shandle; - recvInfo.thandle = pFdObj->thandle;; - recvInfo.chandle = pFdObj; - recvInfo.connType = RPC_CONN_TCP; - pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); } @@ -433,16 +477,20 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) { struct epoll_event event; SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1); - if (pFdObj == NULL) return NULL; + if (pFdObj == NULL) { + return NULL; + } + pFdObj->closedByApp = 0; pFdObj->fd = fd; pFdObj->pThreadObj = pThreadObj; pFdObj->signature = pFdObj; - event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP; + event.events = EPOLLIN | EPOLLRDHUP; event.data.ptr = pFdObj; if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { tfree(pFdObj); + terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } @@ -475,13 +523,10 @@ static void taosFreeFdObj(SFdObj *pFdObj) { taosCloseSocket(pFdObj->fd); pThreadObj->numOfFds--; - if (pThreadObj->numOfFds < 0) tError("%s %p, TCP thread:%d, number of FDs is negative!!!", pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); - // remove from the FdObject list - if (pFdObj->prev) { (pFdObj->prev)->next = pFdObj->next; } else { diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index a8811f4136..7e2fe0db61 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -18,6 +18,7 @@ #include "tsystem.h" #include "ttimer.h" #include "tutil.h" +#include "taosdef.h" #include "rpcLog.h" #include "rpcUdp.h" #include "rpcHead.h" @@ -33,7 +34,7 @@ typedef struct { int fd; uint16_t port; // peer port uint16_t localPort; // local port - char label[12]; // copy from udpConnSet; + char label[TSDB_LABEL_LEN]; // copy from udpConnSet; pthread_t thread; void *hash; void *shandle; // handle passed by upper layer during server initialization @@ -49,7 +50,7 @@ typedef struct { uint16_t port; // local Port void *shandle; // handle passed by upper layer during server initialization int threads; - char label[12]; + char label[TSDB_LABEL_LEN]; void *(*fp)(SRecvInfo *pPacket); SUdpConn udpConn[]; } SUdpConnSet; @@ -93,7 +94,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads } struct sockaddr_in sin; - unsigned int addrlen = sizeof(sin); + unsigned int addrlen = sizeof(sin); if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) { pConn->localPort = (uint16_t)ntohs(sin.sin_port); diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index 25893969e4..898ab70876 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -14,6 +14,7 @@ */ #include "os.h" +#include "taosdef.h" #include "tulog.h" #include "tsched.h" #include "ttimer.h" @@ -21,7 +22,7 @@ #define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue. typedef struct { - char label[16]; + char label[TSDB_LABEL_LEN]; tsem_t emptySem; tsem_t fullSem; pthread_mutex_t queueMutex;