Merge branch 'develop' into with-getaddrinfo
This commit is contained in:
commit
fe22eb50f6
|
@ -113,6 +113,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dnodeInitClient() {
|
int32_t dnodeInitClient() {
|
||||||
|
char secret[TSDB_KEY_LEN] = "secret";
|
||||||
SRpcInit rpcInit;
|
SRpcInit rpcInit;
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
rpcInit.label = "DND-C";
|
rpcInit.label = "DND-C";
|
||||||
|
@ -123,7 +124,7 @@ int32_t dnodeInitClient() {
|
||||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
rpcInit.user = "t";
|
rpcInit.user = "t";
|
||||||
rpcInit.ckey = "key";
|
rpcInit.ckey = "key";
|
||||||
rpcInit.secret = "secret";
|
rpcInit.secret = secret;
|
||||||
|
|
||||||
tsDnodeClientRpc = rpcOpen(&rpcInit);
|
tsDnodeClientRpc = rpcOpen(&rpcInit);
|
||||||
if (tsDnodeClientRpc == NULL) {
|
if (tsDnodeClientRpc == NULL) {
|
||||||
|
|
|
@ -221,6 +221,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
||||||
#define TSDB_COUNTRY_LEN 20
|
#define TSDB_COUNTRY_LEN 20
|
||||||
#define TSDB_LOCALE_LEN 64
|
#define TSDB_LOCALE_LEN 64
|
||||||
#define TSDB_TIMEZONE_LEN 64
|
#define TSDB_TIMEZONE_LEN 64
|
||||||
|
#define TSDB_LABEL_LEN 8
|
||||||
|
|
||||||
#define TSDB_FQDN_LEN 128
|
#define TSDB_FQDN_LEN 128
|
||||||
#define TSDB_EP_LEN (TSDB_FQDN_LEN+6)
|
#define TSDB_EP_LEN (TSDB_FQDN_LEN+6)
|
||||||
|
|
|
@ -47,7 +47,7 @@ typedef struct {
|
||||||
uint16_t localPort;
|
uint16_t localPort;
|
||||||
int8_t connType;
|
int8_t connType;
|
||||||
int index; // for UDP server only, round robin for multiple threads
|
int index; // for UDP server only, round robin for multiple threads
|
||||||
char label[12];
|
char label[TSDB_LABEL_LEN];
|
||||||
|
|
||||||
char user[TSDB_UNI_LEN]; // meter ID
|
char user[TSDB_UNI_LEN]; // meter ID
|
||||||
char spi; // security parameter index
|
char spi; // security parameter index
|
||||||
|
@ -88,7 +88,7 @@ typedef struct {
|
||||||
} SRpcReqContext;
|
} SRpcReqContext;
|
||||||
|
|
||||||
typedef struct SRpcConn {
|
typedef struct SRpcConn {
|
||||||
char info[50];// debug info: label + pConn + ahandle
|
char info[48];// debug info: label + pConn + ahandle
|
||||||
int sid; // session ID
|
int sid; // session ID
|
||||||
uint32_t ownId; // own link ID
|
uint32_t ownId; // own link ID
|
||||||
uint32_t peerId; // peer link ID
|
uint32_t peerId; // peer link ID
|
||||||
|
@ -805,16 +805,16 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
|
tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
}
|
||||||
if (rpcIsReq(pHead->msgType)) {
|
|
||||||
pConn->ahandle = (void *)pHead->ahandle;
|
|
||||||
sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rpcLockConn(pConn);
|
rpcLockConn(pConn);
|
||||||
sid = pConn->sid;
|
|
||||||
|
|
||||||
|
if (rpcIsReq(pHead->msgType)) {
|
||||||
|
pConn->ahandle = (void *)pHead->ahandle;
|
||||||
|
sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle);
|
||||||
|
}
|
||||||
|
|
||||||
|
sid = pConn->sid;
|
||||||
pConn->chandle = pRecv->chandle;
|
pConn->chandle = pRecv->chandle;
|
||||||
pConn->peerIp = pRecv->ip;
|
pConn->peerIp = pRecv->ip;
|
||||||
pConn->peerPort = pRecv->port;
|
pConn->peerPort = pRecv->port;
|
||||||
|
@ -847,10 +847,11 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcProcessBrokenLink(SRpcConn *pConn) {
|
static void rpcProcessBrokenLink(SRpcConn *pConn) {
|
||||||
|
if (pConn == NULL) return;
|
||||||
SRpcInfo *pRpc = pConn->pRpc;
|
SRpcInfo *pRpc = pConn->pRpc;
|
||||||
|
|
||||||
tTrace("%s, link is broken", pConn->info);
|
tTrace("%s, link is broken", pConn->info);
|
||||||
// pConn->chandle = NULL;
|
|
||||||
|
rpcLockConn(pConn);
|
||||||
|
|
||||||
if (pConn->outType) {
|
if (pConn->outType) {
|
||||||
SRpcReqContext *pContext = pConn->pContext;
|
SRpcReqContext *pContext = pConn->pContext;
|
||||||
|
@ -871,7 +872,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
|
||||||
(*(pRpc->cfp))(&rpcMsg);
|
(*(pRpc->cfp))(&rpcMsg);
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rpcUnlockConn(pConn);
|
||||||
rpcCloseConn(pConn);
|
rpcCloseConn(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -885,7 +887,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
// underlying UDP layer does not know it is server or client
|
// underlying UDP layer does not know it is server or client
|
||||||
pRecv->connType = pRecv->connType | pRpc->connType;
|
pRecv->connType = pRecv->connType | pRpc->connType;
|
||||||
|
|
||||||
if (pRecv->ip == 0 && pConn) {
|
if (pRecv->ip == 0) {
|
||||||
rpcProcessBrokenLink(pConn);
|
rpcProcessBrokenLink(pConn);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tsocket.h"
|
#include "tsocket.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "taosdef.h"
|
||||||
|
#include "taoserror.h"
|
||||||
#include "rpcLog.h"
|
#include "rpcLog.h"
|
||||||
#include "rpcHead.h"
|
#include "rpcHead.h"
|
||||||
#include "rpcTcp.h"
|
#include "rpcTcp.h"
|
||||||
|
@ -26,8 +28,9 @@
|
||||||
|
|
||||||
typedef struct SFdObj {
|
typedef struct SFdObj {
|
||||||
void *signature;
|
void *signature;
|
||||||
int fd; // TCP socket FD
|
int fd; // TCP socket FD
|
||||||
void *thandle; // handle from upper layer, like TAOS
|
int closedByApp; // 1: already closed by App
|
||||||
|
void *thandle; // handle from upper layer, like TAOS
|
||||||
uint32_t ip;
|
uint32_t ip;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
struct SThreadObj *pThreadObj;
|
struct SThreadObj *pThreadObj;
|
||||||
|
@ -44,7 +47,7 @@ typedef struct SThreadObj {
|
||||||
int pollFd;
|
int pollFd;
|
||||||
int numOfFds;
|
int numOfFds;
|
||||||
int threadId;
|
int threadId;
|
||||||
char label[12];
|
char label[TSDB_LABEL_LEN];
|
||||||
void *shandle; // handle passed by upper layer during server initialization
|
void *shandle; // handle passed by upper layer during server initialization
|
||||||
void *(*processData)(SRecvInfo *pPacket);
|
void *(*processData)(SRecvInfo *pPacket);
|
||||||
} SThreadObj;
|
} SThreadObj;
|
||||||
|
@ -53,7 +56,7 @@ typedef struct {
|
||||||
int fd;
|
int fd;
|
||||||
uint32_t ip;
|
uint32_t ip;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
char label[12];
|
char label[TSDB_LABEL_LEN];
|
||||||
int numOfThreads;
|
int numOfThreads;
|
||||||
void * shandle;
|
void * shandle;
|
||||||
SThreadObj *pThreadObj;
|
SThreadObj *pThreadObj;
|
||||||
|
@ -71,6 +74,13 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
SThreadObj *pThreadObj;
|
SThreadObj *pThreadObj;
|
||||||
|
|
||||||
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
|
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
|
||||||
|
if (pServerObj == NULL) {
|
||||||
|
tError("TCP:%s no enough memory", label);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pServerObj->thread = 0;
|
||||||
pServerObj->ip = ip;
|
pServerObj->ip = ip;
|
||||||
pServerObj->port = port;
|
pServerObj->port = port;
|
||||||
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
|
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
|
||||||
|
@ -79,13 +89,20 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
|
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
|
||||||
if (pServerObj->pThreadObj == NULL) {
|
if (pServerObj->pThreadObj == NULL) {
|
||||||
tError("TCP:%s no enough memory", label);
|
tError("TCP:%s no enough memory", label);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
free(pServerObj);
|
free(pServerObj);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
pthread_attr_t thattr;
|
||||||
|
pthread_attr_init(&thattr);
|
||||||
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
pThreadObj = pServerObj->pThreadObj;
|
pThreadObj = pServerObj->pThreadObj;
|
||||||
for (int i = 0; i < numOfThreads; ++i) {
|
for (int i = 0; i < numOfThreads; ++i) {
|
||||||
|
pThreadObj->pollFd = -1;
|
||||||
|
pThreadObj->thread = 0;
|
||||||
pThreadObj->processData = fp;
|
pThreadObj->processData = fp;
|
||||||
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
||||||
pThreadObj->shandle = shandle;
|
pThreadObj->shandle = shandle;
|
||||||
|
@ -93,23 +110,22 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
break;;
|
break;;
|
||||||
}
|
}
|
||||||
|
|
||||||
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
||||||
if (pThreadObj->pollFd < 0) {
|
if (pThreadObj->pollFd < 0) {
|
||||||
tError("%s failed to create TCP epoll", label);
|
tError("%s failed to create TCP epoll", label);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
code = -1;
|
code = -1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_attr_t thattr;
|
|
||||||
pthread_attr_init(&thattr);
|
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
|
||||||
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
||||||
pthread_attr_destroy(&thattr);
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
|
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,47 +134,47 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pthread_attr_t thattr;
|
|
||||||
pthread_attr_init(&thattr);
|
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
|
||||||
code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
|
code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
|
||||||
pthread_attr_destroy(&thattr);
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
|
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
free(pServerObj->pThreadObj);
|
taosCleanUpTcpServer(pServerObj);
|
||||||
free(pServerObj);
|
|
||||||
pServerObj = NULL;
|
pServerObj = NULL;
|
||||||
} else {
|
} else {
|
||||||
tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
|
tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_attr_destroy(&thattr);
|
||||||
return (void *)pServerObj;
|
return (void *)pServerObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosStopTcpThread(SThreadObj* pThreadObj) {
|
static void taosStopTcpThread(SThreadObj* pThreadObj) {
|
||||||
pThreadObj->stop = true;
|
pThreadObj->stop = true;
|
||||||
|
eventfd_t fd = -1;
|
||||||
|
|
||||||
// signal the thread to stop, try graceful method first,
|
if (pThreadObj->thread && pThreadObj->pollFd >=0) {
|
||||||
// and use pthread_cancel when failed
|
// signal the thread to stop, try graceful method first,
|
||||||
struct epoll_event event = { .events = EPOLLIN };
|
// and use pthread_cancel when failed
|
||||||
eventfd_t fd = eventfd(1, 0);
|
struct epoll_event event = { .events = EPOLLIN };
|
||||||
if (fd == -1) {
|
fd = eventfd(1, 0);
|
||||||
tError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno));
|
if (fd == -1) {
|
||||||
pthread_cancel(pThreadObj->thread);
|
// failed to create eventfd, call pthread_cancel instead, which may result in data corruption:
|
||||||
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
|
tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno));
|
||||||
tError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno));
|
pthread_cancel(pThreadObj->thread);
|
||||||
pthread_cancel(pThreadObj->thread);
|
} else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
|
||||||
|
// failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption:
|
||||||
|
tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno));
|
||||||
|
pthread_cancel(pThreadObj->thread);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_join(pThreadObj->thread, NULL);
|
if (pThreadObj->thread) pthread_join(pThreadObj->thread, NULL);
|
||||||
close(pThreadObj->pollFd);
|
if (pThreadObj->pollFd >=0) close(pThreadObj->pollFd);
|
||||||
if (fd != -1) {
|
if (fd != -1) close(fd);
|
||||||
close(fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (pThreadObj->pHead) {
|
while (pThreadObj->pHead) {
|
||||||
SFdObj *pFdObj = pThreadObj->pHead;
|
SFdObj *pFdObj = pThreadObj->pHead;
|
||||||
|
@ -173,9 +189,8 @@ void taosCleanUpTcpServer(void *handle) {
|
||||||
SThreadObj *pThreadObj;
|
SThreadObj *pThreadObj;
|
||||||
|
|
||||||
if (pServerObj == NULL) return;
|
if (pServerObj == NULL) return;
|
||||||
|
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
|
||||||
shutdown(pServerObj->fd, SHUT_RD);
|
if(pServerObj->thread) pthread_join(pServerObj->thread, NULL);
|
||||||
pthread_join(pServerObj->thread, NULL);
|
|
||||||
|
|
||||||
for (int i = 0; i < pServerObj->numOfThreads; ++i) {
|
for (int i = 0; i < pServerObj->numOfThreads; ++i) {
|
||||||
pThreadObj = pServerObj->pThreadObj + i;
|
pThreadObj = pServerObj->pThreadObj + i;
|
||||||
|
@ -211,6 +226,7 @@ static void* taosAcceptTcpConnection(void *arg) {
|
||||||
tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
|
tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno));
|
tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -254,6 +270,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
|
||||||
if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
|
if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
|
||||||
tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
|
tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
|
||||||
free(pThreadObj);
|
free(pThreadObj);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,6 +278,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
|
||||||
if (pThreadObj->pollFd < 0) {
|
if (pThreadObj->pollFd < 0) {
|
||||||
tError("%s failed to create TCP client epoll", label);
|
tError("%s failed to create TCP client epoll", label);
|
||||||
free(pThreadObj);
|
free(pThreadObj);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,6 +291,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
close(pThreadObj->pollFd);
|
close(pThreadObj->pollFd);
|
||||||
free(pThreadObj);
|
free(pThreadObj);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
|
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -287,7 +306,7 @@ void taosCleanUpTcpClient(void *chandle) {
|
||||||
if (pThreadObj == NULL) return;
|
if (pThreadObj == NULL) return;
|
||||||
|
|
||||||
taosStopTcpThread(pThreadObj);
|
taosStopTcpThread(pThreadObj);
|
||||||
tTrace (":%s, all connections are cleaned up", pThreadObj->label);
|
tTrace ("%s, all connections are cleaned up", pThreadObj->label);
|
||||||
|
|
||||||
tfree(pThreadObj);
|
tfree(pThreadObj);
|
||||||
}
|
}
|
||||||
|
@ -318,7 +337,9 @@ void taosCloseTcpConnection(void *chandle) {
|
||||||
SFdObj *pFdObj = chandle;
|
SFdObj *pFdObj = chandle;
|
||||||
if (pFdObj == NULL) return;
|
if (pFdObj == NULL) return;
|
||||||
|
|
||||||
taosFreeFdObj(pFdObj);
|
pFdObj->thandle = NULL;
|
||||||
|
pFdObj->closedByApp = 1;
|
||||||
|
shutdown(pFdObj->fd, SHUT_WR);
|
||||||
}
|
}
|
||||||
|
|
||||||
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
|
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
|
||||||
|
@ -334,7 +355,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
|
||||||
SThreadObj *pThreadObj = pFdObj->pThreadObj;
|
SThreadObj *pThreadObj = pFdObj->pThreadObj;
|
||||||
|
|
||||||
// notify the upper layer, so it will clean the associated context
|
// notify the upper layer, so it will clean the associated context
|
||||||
if (pFdObj->thandle) {
|
if (pFdObj->closedByApp == 0) {
|
||||||
|
shutdown(pFdObj->fd, SHUT_WR);
|
||||||
|
|
||||||
SRecvInfo recvInfo;
|
SRecvInfo recvInfo;
|
||||||
recvInfo.msg = NULL;
|
recvInfo.msg = NULL;
|
||||||
recvInfo.msgLen = 0;
|
recvInfo.msgLen = 0;
|
||||||
|
@ -345,9 +368,59 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
|
||||||
recvInfo.chandle = NULL;
|
recvInfo.chandle = NULL;
|
||||||
recvInfo.connType = RPC_CONN_TCP;
|
recvInfo.connType = RPC_CONN_TCP;
|
||||||
(*(pThreadObj->processData))(&recvInfo);
|
(*(pThreadObj->processData))(&recvInfo);
|
||||||
} else {
|
}
|
||||||
taosFreeFdObj(pFdObj);
|
|
||||||
|
taosFreeFdObj(pFdObj);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
|
||||||
|
SRpcHead rpcHead;
|
||||||
|
int32_t msgLen, leftLen, retLen, headLen;
|
||||||
|
char *buffer, *msg;
|
||||||
|
|
||||||
|
SThreadObj *pThreadObj = pFdObj->pThreadObj;
|
||||||
|
|
||||||
|
headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
|
||||||
|
if (headLen != sizeof(SRpcHead)) {
|
||||||
|
tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
|
||||||
|
buffer = malloc(msgLen + tsRpcOverhead);
|
||||||
|
if ( NULL == buffer) {
|
||||||
|
tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = buffer + tsRpcOverhead;
|
||||||
|
leftLen = msgLen - headLen;
|
||||||
|
retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
|
||||||
|
|
||||||
|
if (leftLen != retLen) {
|
||||||
|
tError("%s %p, read error, leftLen:%d retLen:%d",
|
||||||
|
pThreadObj->label, pFdObj->thandle, leftLen, retLen);
|
||||||
|
free(buffer);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(msg, &rpcHead, sizeof(SRpcHead));
|
||||||
|
|
||||||
|
pInfo->msg = msg;
|
||||||
|
pInfo->msgLen = msgLen;
|
||||||
|
pInfo->ip = pFdObj->ip;
|
||||||
|
pInfo->port = pFdObj->port;
|
||||||
|
pInfo->shandle = pThreadObj->shandle;
|
||||||
|
pInfo->thandle = pFdObj->thandle;;
|
||||||
|
pInfo->chandle = pFdObj;
|
||||||
|
pInfo->connType = RPC_CONN_TCP;
|
||||||
|
|
||||||
|
if (pFdObj->closedByApp) {
|
||||||
|
free(buffer);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define maxEvents 10
|
#define maxEvents 10
|
||||||
|
@ -357,7 +430,6 @@ static void *taosProcessTcpData(void *param) {
|
||||||
SFdObj *pFdObj;
|
SFdObj *pFdObj;
|
||||||
struct epoll_event events[maxEvents];
|
struct epoll_event events[maxEvents];
|
||||||
SRecvInfo recvInfo;
|
SRecvInfo recvInfo;
|
||||||
SRpcHead rpcHead;
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
|
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
|
||||||
|
@ -376,51 +448,23 @@ static void *taosProcessTcpData(void *param) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (events[i].events & EPOLLRDHUP) {
|
||||||
|
tTrace("%s %p, FD RD hang up", pThreadObj->label, pFdObj->thandle);
|
||||||
|
taosReportBrokenLink(pFdObj);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (events[i].events & EPOLLHUP) {
|
if (events[i].events & EPOLLHUP) {
|
||||||
tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
|
tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
|
||||||
taosReportBrokenLink(pFdObj);
|
taosReportBrokenLink(pFdObj);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
|
if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
|
||||||
if (headLen != sizeof(SRpcHead)) {
|
shutdown(pFdObj->fd, SHUT_WR);
|
||||||
tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
|
|
||||||
taosReportBrokenLink(pFdObj);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
|
|
||||||
char *buffer = malloc(msgLen + tsRpcOverhead);
|
|
||||||
if ( NULL == buffer) {
|
|
||||||
tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
|
|
||||||
taosReportBrokenLink(pFdObj);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
char *msg = buffer + tsRpcOverhead;
|
|
||||||
int32_t leftLen = msgLen - headLen;
|
|
||||||
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
|
|
||||||
|
|
||||||
if (leftLen != retLen) {
|
|
||||||
tError("%s %p, read error, leftLen:%d retLen:%d",
|
|
||||||
pThreadObj->label, pFdObj->thandle, leftLen, retLen);
|
|
||||||
taosReportBrokenLink(pFdObj);
|
|
||||||
tfree(buffer);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// tTrace("%s TCP data is received, ip:0x%x:%u len:%d", pThreadObj->label, pFdObj->ip, pFdObj->port, msgLen);
|
|
||||||
|
|
||||||
memcpy(msg, &rpcHead, sizeof(SRpcHead));
|
|
||||||
recvInfo.msg = msg;
|
|
||||||
recvInfo.msgLen = msgLen;
|
|
||||||
recvInfo.ip = pFdObj->ip;
|
|
||||||
recvInfo.port = pFdObj->port;
|
|
||||||
recvInfo.shandle = pThreadObj->shandle;
|
|
||||||
recvInfo.thandle = pFdObj->thandle;;
|
|
||||||
recvInfo.chandle = pFdObj;
|
|
||||||
recvInfo.connType = RPC_CONN_TCP;
|
|
||||||
|
|
||||||
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
|
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
|
||||||
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
|
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
|
||||||
}
|
}
|
||||||
|
@ -433,16 +477,20 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
|
||||||
struct epoll_event event;
|
struct epoll_event event;
|
||||||
|
|
||||||
SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1);
|
SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1);
|
||||||
if (pFdObj == NULL) return NULL;
|
if (pFdObj == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pFdObj->closedByApp = 0;
|
||||||
pFdObj->fd = fd;
|
pFdObj->fd = fd;
|
||||||
pFdObj->pThreadObj = pThreadObj;
|
pFdObj->pThreadObj = pThreadObj;
|
||||||
pFdObj->signature = pFdObj;
|
pFdObj->signature = pFdObj;
|
||||||
|
|
||||||
event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
|
event.events = EPOLLIN | EPOLLRDHUP;
|
||||||
event.data.ptr = pFdObj;
|
event.data.ptr = pFdObj;
|
||||||
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
|
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
|
||||||
tfree(pFdObj);
|
tfree(pFdObj);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -475,13 +523,10 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
|
||||||
taosCloseSocket(pFdObj->fd);
|
taosCloseSocket(pFdObj->fd);
|
||||||
|
|
||||||
pThreadObj->numOfFds--;
|
pThreadObj->numOfFds--;
|
||||||
|
|
||||||
if (pThreadObj->numOfFds < 0)
|
if (pThreadObj->numOfFds < 0)
|
||||||
tError("%s %p, TCP thread:%d, number of FDs is negative!!!",
|
tError("%s %p, TCP thread:%d, number of FDs is negative!!!",
|
||||||
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
|
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
|
||||||
|
|
||||||
// remove from the FdObject list
|
|
||||||
|
|
||||||
if (pFdObj->prev) {
|
if (pFdObj->prev) {
|
||||||
(pFdObj->prev)->next = pFdObj->next;
|
(pFdObj->prev)->next = pFdObj->next;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "tsystem.h"
|
#include "tsystem.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "taosdef.h"
|
||||||
#include "rpcLog.h"
|
#include "rpcLog.h"
|
||||||
#include "rpcUdp.h"
|
#include "rpcUdp.h"
|
||||||
#include "rpcHead.h"
|
#include "rpcHead.h"
|
||||||
|
@ -33,7 +34,7 @@ typedef struct {
|
||||||
int fd;
|
int fd;
|
||||||
uint16_t port; // peer port
|
uint16_t port; // peer port
|
||||||
uint16_t localPort; // local port
|
uint16_t localPort; // local port
|
||||||
char label[12]; // copy from udpConnSet;
|
char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
void *hash;
|
void *hash;
|
||||||
void *shandle; // handle passed by upper layer during server initialization
|
void *shandle; // handle passed by upper layer during server initialization
|
||||||
|
@ -49,7 +50,7 @@ typedef struct {
|
||||||
uint16_t port; // local Port
|
uint16_t port; // local Port
|
||||||
void *shandle; // handle passed by upper layer during server initialization
|
void *shandle; // handle passed by upper layer during server initialization
|
||||||
int threads;
|
int threads;
|
||||||
char label[12];
|
char label[TSDB_LABEL_LEN];
|
||||||
void *(*fp)(SRecvInfo *pPacket);
|
void *(*fp)(SRecvInfo *pPacket);
|
||||||
SUdpConn udpConn[];
|
SUdpConn udpConn[];
|
||||||
} SUdpConnSet;
|
} SUdpConnSet;
|
||||||
|
@ -93,7 +94,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
||||||
}
|
}
|
||||||
|
|
||||||
struct sockaddr_in sin;
|
struct sockaddr_in sin;
|
||||||
unsigned int addrlen = sizeof(sin);
|
unsigned int addrlen = sizeof(sin);
|
||||||
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
|
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
|
||||||
addrlen == sizeof(sin)) {
|
addrlen == sizeof(sin)) {
|
||||||
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
|
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "taosdef.h"
|
||||||
#include "tulog.h"
|
#include "tulog.h"
|
||||||
#include "tsched.h"
|
#include "tsched.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
@ -21,7 +22,7 @@
|
||||||
#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue.
|
#define DUMP_SCHEDULER_TIME_WINDOW 30000 //every 30sec, take a snap shot of task queue.
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char label[16];
|
char label[TSDB_LABEL_LEN];
|
||||||
tsem_t emptySem;
|
tsem_t emptySem;
|
||||||
tsem_t fullSem;
|
tsem_t fullSem;
|
||||||
pthread_mutex_t queueMutex;
|
pthread_mutex_t queueMutex;
|
||||||
|
|
Loading…
Reference in New Issue