Merge branch 'master' of https://github.com/taosdata/TDengine
This commit is contained in:
commit
24538afd89
|
@ -2,7 +2,7 @@
|
|||
name: Bug Report
|
||||
about: Create a report to help us improve
|
||||
title: ''
|
||||
labels: ''
|
||||
labels: bug
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
name: Feature Request
|
||||
about: Suggest an idea for this project
|
||||
title: ''
|
||||
labels: ''
|
||||
labels: enhancement
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
name: General Questions
|
||||
about: General questions about TDengine's usage, user experiences, milestones etc.
|
||||
title: ''
|
||||
labels: ''
|
||||
labels: help wanted, question
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
name: Performance-related Questions
|
||||
about: Any questions related to TDengine's performance.
|
||||
title: ''
|
||||
labels: ''
|
||||
labels: performance
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
|
|
@ -276,12 +276,12 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) {
|
|||
}
|
||||
|
||||
ssize_t twrite(int fd, void *buf, size_t n) {
|
||||
size_t nleft, nwritten;
|
||||
|
||||
nleft = n;
|
||||
size_t nleft = n;
|
||||
ssize_t nwritten = 0;
|
||||
char *tbuf = (char *)buf
|
||||
|
||||
while (nleft > 0) {
|
||||
nwritten = write(fd, buf, nleft);
|
||||
nwritten = write(fd, (void *)tbuf, nleft);
|
||||
if (nwritten < 0) {
|
||||
if (errno == EINTR) {
|
||||
continue;
|
||||
|
@ -289,7 +289,7 @@ ssize_t twrite(int fd, void *buf, size_t n) {
|
|||
return -1;
|
||||
}
|
||||
nleft -= nwritten;
|
||||
buf += nwritten;
|
||||
tbuf += nwritten;
|
||||
}
|
||||
|
||||
return n;
|
||||
|
|
|
@ -118,6 +118,7 @@ typedef struct rpc_server {
|
|||
int taosDebugFlag = 131;
|
||||
int tsRpcTimer = 300;
|
||||
int tsRpcMaxTime = 600; // seconds;
|
||||
int tsRpcProgressTime = 10; // milliseocnds
|
||||
|
||||
// not configurable
|
||||
int tsRpcMaxRetry;
|
||||
|
@ -294,7 +295,7 @@ int taosSendQuickRsp(void *thandle, char rsptype, char code) {
|
|||
void *taosOpenRpc(SRpcInit *pRpc) {
|
||||
STaosRpc *pServer;
|
||||
|
||||
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcTimer;
|
||||
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime;
|
||||
tsRpcHeadSize = sizeof(STaosHeader) + sizeof(SMsgNode);
|
||||
|
||||
pServer = (STaosRpc *)malloc(sizeof(STaosRpc));
|
||||
|
@ -896,7 +897,7 @@ int taosProcessMsgHeader(STaosHeader *pHeader, SRpcConn **ppConn, STaosRpc *pSer
|
|||
tTrace("%s cid:%d sid:%d id:%s, peer is still processing the transaction, pConn:%p",
|
||||
pServer->label, chann, sid, pHeader->meterId, pConn);
|
||||
pConn->tretry++;
|
||||
taosTmrReset(taosProcessTaosTimer, tsRpcTimer, pConn, pChann->tmrCtrl, &pConn->pTimer);
|
||||
taosTmrReset(taosProcessTaosTimer, tsRpcProgressTime, pConn, pChann->tmrCtrl, &pConn->pTimer);
|
||||
code = TSDB_CODE_ALREADY_PROCESSED;
|
||||
goto _exit;
|
||||
} else {
|
||||
|
|
|
@ -137,9 +137,7 @@ void taosProcessMonitorTimer(void *param, void *tmrId) {
|
|||
tTrace("%s monitor timer is expired, update the link status", pSet->label);
|
||||
(*pSet->fp)(data, pMonitor->dataLen, pMonitor->ip, 0, pSet->shandle, NULL, NULL);
|
||||
taosTmrReset(taosProcessMonitorTimer, 200, pMonitor, pSet->tmrCtrl, &pMonitor->pTimer);
|
||||
}
|
||||
|
||||
if (pMonitor->pSet == NULL) {
|
||||
} else {
|
||||
taosTmrStopA(&pMonitor->pTimer);
|
||||
free(pMonitor);
|
||||
}
|
||||
|
@ -181,6 +179,7 @@ void *taosReadTcpData(void *argv) {
|
|||
|
||||
if (retLen != pInfo->msgLen) {
|
||||
tError("%s failed to read data from server, msgLen:%d retLen:%d", pSet->label, pInfo->msgLen, retLen);
|
||||
free(buffer);
|
||||
} else {
|
||||
(*pSet->fp)(buffer, pInfo->msgLen, pMonitor->ip, (int16_t)pInfo->port, pSet->shandle, NULL, pMonitor->pConn);
|
||||
}
|
||||
|
@ -214,9 +213,11 @@ int taosReceivePacketViaTcp(uint32_t ip, STaosHeader *pHead, SUdpConn *pConn) {
|
|||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
|
||||
code = pthread_create(&(thread), &thattr, taosReadTcpData, (void *)pMonitor);
|
||||
if (code < 0) {
|
||||
tTrace("%s faile to create thread to read tcp data, reason:%s", pSet->label, strerror(errno));
|
||||
tTrace("%s failed to create thread to read tcp data, reason:%s", pSet->label, strerror(errno));
|
||||
pMonitor->pSet = NULL;
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thattr);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -402,6 +403,9 @@ void *taosUdpTcpConnection(void *argv) {
|
|||
|
||||
tTrace("%s UDP server is created, ip:%s:%d", pSet->label, pSet->ip, pSet->port);
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
|
||||
|
||||
while (1) {
|
||||
if (pSet->tcpFd < 0) break;
|
||||
socklen_t addrlen = sizeof(clientAddr);
|
||||
|
@ -422,14 +426,14 @@ void *taosUdpTcpConnection(void *argv) {
|
|||
pTransfer->port = clientAddr.sin_port;
|
||||
pTransfer->pSet = pSet;
|
||||
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
|
||||
if (pthread_create(&(thread), &thattr, taosTransferDataViaTcp, (void *)pTransfer) < 0) {
|
||||
tTrace("%s faile to create thread for UDP server, reason:%s", pSet->label, strerror(errno));
|
||||
tTrace("%s failed to create thread for UDP server, reason:%s", pSet->label, strerror(errno));
|
||||
free(pTransfer);
|
||||
taosCloseSocket(connFd);
|
||||
}
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thattr);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -448,7 +452,6 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
|
|||
memset(pSet, 0, (size_t)size);
|
||||
strcpy(pSet->ip, ip);
|
||||
pSet->port = port;
|
||||
pSet->threads = threads;
|
||||
pSet->shandle = shandle;
|
||||
pSet->fp = fp;
|
||||
pSet->tcpFd = -1;
|
||||
|
@ -458,8 +461,16 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
|
|||
char udplabel[12];
|
||||
sprintf(udplabel, "%s.b", label);
|
||||
pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel);
|
||||
if (pSet->tmrCtrl == NULL) {
|
||||
tError("%s failed to initialize tmrCtrl")
|
||||
taosCleanUpUdpConnection(pSet);
|
||||
return NULL;
|
||||
}
|
||||
// }
|
||||
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
short ownPort;
|
||||
for (int i = 0; i < threads; ++i) {
|
||||
pConn = pSet->udpConn + i;
|
||||
|
@ -467,6 +478,7 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
|
|||
pConn->fd = taosOpenUdpSocket(ip, ownPort);
|
||||
if (pConn->fd < 0) {
|
||||
tError("%s failed to open UDP socket %s:%d", label, ip, port);
|
||||
taosCleanUpUdpConnection(pSet);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -477,11 +489,10 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
|
|||
pConn->localPort = (int16_t)ntohs(sin.sin_port);
|
||||
}
|
||||
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn) != 0) {
|
||||
close(pConn->fd);
|
||||
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
|
||||
taosCloseSocket(pConn->fd);
|
||||
taosCleanUpUdpConnection(pSet);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -496,6 +507,7 @@ void *taosInitUdpConnection(char *ip, short port, char *label, int threads, void
|
|||
pthread_mutex_init(&pConn->mutex, NULL);
|
||||
pConn->tmrCtrl = pSet->tmrCtrl;
|
||||
}
|
||||
++pSet->threads;
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
|
@ -520,6 +532,7 @@ void *taosInitUdpServer(char *ip, short port, char *label, int threads, void *fp
|
|||
// pthread_t thread;
|
||||
// pSet->tcpThread = pthread_create(&(thread), &thattr, taosUdpTcpConnection, pSet);
|
||||
pthread_create(&(pSet->tcpThread), &thattr, taosUdpTcpConnection, pSet);
|
||||
pthread_attr_destroy(&thattr);
|
||||
|
||||
return pSet;
|
||||
}
|
||||
|
@ -540,13 +553,16 @@ void taosCleanUpUdpConnection(void *handle) {
|
|||
for (int i = 0; i < pSet->threads; ++i) {
|
||||
pConn = pSet->udpConn + i;
|
||||
pConn->signature = NULL;
|
||||
pthread_cancel(pConn->thread);
|
||||
taosCloseSocket(pConn->fd);
|
||||
if (pConn->hash) {
|
||||
taosCloseIpHash(pConn->hash);
|
||||
pthread_mutex_destroy(&pConn->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
pthread_cancel(pConn->thread);
|
||||
for (int i = 0; i < pSet->threads; ++i) {
|
||||
pConn = pSet->udpConn + i;
|
||||
pthread_join(pConn->thread, NULL);
|
||||
tTrace("chandle:%p is closed", pConn);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue