Merge pull request #2200 from taosdata/develop
merge code to coverity scan
This commit is contained in:
commit
94c445cda5
|
@ -201,7 +201,7 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
|
||||||
tscPrint("set shellActivityTimer:%d", tsShellActivityTimer);
|
tscPrint("set shellActivityTimer:%d", tsShellActivityTimer);
|
||||||
} else {
|
} else {
|
||||||
tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, pStr,
|
tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, pStr,
|
||||||
tsCfgStatusStr[cfg->cfgStatus], (int32_t *)cfg->ptr);
|
tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
|
|
@ -137,7 +137,7 @@ class TDengineCursor(object):
|
||||||
else:
|
else:
|
||||||
raise ProgrammingError(
|
raise ProgrammingError(
|
||||||
CTaosInterface.errStr(
|
CTaosInterface.errStr(
|
||||||
self._result ))
|
self._result ), errno)
|
||||||
|
|
||||||
def executemany(self, operation, seq_of_parameters):
|
def executemany(self, operation, seq_of_parameters):
|
||||||
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
||||||
|
|
|
@ -139,7 +139,7 @@ class TDengineCursor(object):
|
||||||
else:
|
else:
|
||||||
raise ProgrammingError(
|
raise ProgrammingError(
|
||||||
CTaosInterface.errStr(
|
CTaosInterface.errStr(
|
||||||
self._result ))
|
self._result), errno)
|
||||||
|
|
||||||
def executemany(self, operation, seq_of_parameters):
|
def executemany(self, operation, seq_of_parameters):
|
||||||
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
||||||
|
|
|
@ -117,7 +117,7 @@ class TDengineCursor(object):
|
||||||
self._fields = CTaosInterface.useResult(self._result)
|
self._fields = CTaosInterface.useResult(self._result)
|
||||||
return self._handle_result()
|
return self._handle_result()
|
||||||
else:
|
else:
|
||||||
raise ProgrammingError(CTaosInterface.errStr(self._result))
|
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
|
||||||
|
|
||||||
def executemany(self, operation, seq_of_parameters):
|
def executemany(self, operation, seq_of_parameters):
|
||||||
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
||||||
|
|
|
@ -117,7 +117,7 @@ class TDengineCursor(object):
|
||||||
self._fields = CTaosInterface.useResult(self._result )
|
self._fields = CTaosInterface.useResult(self._result )
|
||||||
return self._handle_result()
|
return self._handle_result()
|
||||||
else:
|
else:
|
||||||
raise ProgrammingError(CTaosInterface.errStr(self._result ))
|
raise ProgrammingError(CTaosInterface.errStr(self._result ), errno)
|
||||||
|
|
||||||
def executemany(self, operation, seq_of_parameters):
|
def executemany(self, operation, seq_of_parameters):
|
||||||
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
|
||||||
|
|
|
@ -92,6 +92,7 @@ int32_t main(int32_t argc, char *argv[]) {
|
||||||
// Initialize the system
|
// Initialize the system
|
||||||
if (dnodeInitSystem() < 0) {
|
if (dnodeInitSystem() < 0) {
|
||||||
syslog(LOG_ERR, "Error initialize TDengine system");
|
syslog(LOG_ERR, "Error initialize TDengine system");
|
||||||
|
dPrint("Failed to start TDengine, please check the log at:%s", tsLogDir);
|
||||||
closelog();
|
closelog();
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -331,66 +331,7 @@ bool taosGetDisk() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool taosGetCardName(char *ip, char *name) {
|
|
||||||
struct ifaddrs *ifaddr, *ifa;
|
|
||||||
int family, s;
|
|
||||||
char host[NI_MAXHOST];
|
|
||||||
bool ret = false;
|
|
||||||
|
|
||||||
if (getifaddrs(&ifaddr) == -1) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Walk through linked list, maintaining head pointer so we can free list
|
|
||||||
* later */
|
|
||||||
for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
|
|
||||||
if (ifa->ifa_addr == NULL) continue;
|
|
||||||
|
|
||||||
family = ifa->ifa_addr->sa_family;
|
|
||||||
if (family != AF_INET) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
s = getnameinfo(ifa->ifa_addr, (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6), host,
|
|
||||||
NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
|
|
||||||
if (s != 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (strcmp(host, "127.0.0.1") == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: the ip not config
|
|
||||||
// if (strcmp(host, ip) == 0) {
|
|
||||||
strcpy(name, ifa->ifa_name);
|
|
||||||
ret = true;
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
freeifaddrs(ifaddr);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool taosGetCardInfo(int64_t *bytes) {
|
static bool taosGetCardInfo(int64_t *bytes) {
|
||||||
static char tsPublicCard[1000] = {0};
|
|
||||||
static char tsPrivateIp[40];
|
|
||||||
|
|
||||||
if (tsPublicCard[0] == 0) {
|
|
||||||
if (!taosGetCardName(tsPrivateIp, tsPublicCard)) {
|
|
||||||
uError("can't get card name from ip:%s", tsPrivateIp);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
int cardNameLen = (int)strlen(tsPublicCard);
|
|
||||||
for (int i = 0; i < cardNameLen; ++i) {
|
|
||||||
if (tsPublicCard[i] == ':') {
|
|
||||||
tsPublicCard[i] = 0;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// uTrace("card name of public ip:%s is %s", tsPublicIp, tsPublicCard);
|
|
||||||
}
|
|
||||||
|
|
||||||
FILE *fp = fopen(tsSysNetFile, "r");
|
FILE *fp = fopen(tsSysNetFile, "r");
|
||||||
if (fp == NULL) {
|
if (fp == NULL) {
|
||||||
uError("open file:%s failed", tsSysNetFile);
|
uError("open file:%s failed", tsSysNetFile);
|
||||||
|
@ -403,6 +344,7 @@ static bool taosGetCardInfo(int64_t *bytes) {
|
||||||
|
|
||||||
size_t len;
|
size_t len;
|
||||||
char * line = NULL;
|
char * line = NULL;
|
||||||
|
*bytes = 0;
|
||||||
|
|
||||||
while (!feof(fp)) {
|
while (!feof(fp)) {
|
||||||
tfree(line);
|
tfree(line);
|
||||||
|
@ -411,23 +353,20 @@ static bool taosGetCardInfo(int64_t *bytes) {
|
||||||
if (line == NULL) {
|
if (line == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (strstr(line, tsPublicCard) != NULL) {
|
if (strstr(line, "lo:") != NULL) {
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sscanf(line,
|
||||||
|
"%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
|
||||||
|
nouse0, &rbytes, &rpackts, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &tbytes, &tpackets);
|
||||||
|
*bytes += (rbytes + tbytes);
|
||||||
}
|
}
|
||||||
if (line != NULL) {
|
|
||||||
sscanf(line, "%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, nouse0, &rbytes, &rpackts, &nouse1, &nouse2, &nouse3,
|
tfree(line);
|
||||||
&nouse4, &nouse5, &nouse6, &tbytes, &tpackets);
|
fclose(fp);
|
||||||
*bytes = rbytes + tbytes;
|
|
||||||
tfree(line);
|
return true;
|
||||||
fclose(fp);
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
uWarn("can't get card:%s info from device:%s", tsPublicCard, tsSysNetFile);
|
|
||||||
*bytes = 0;
|
|
||||||
fclose(fp);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool taosGetBandSpeed(float *bandSpeedKb) {
|
bool taosGetBandSpeed(float *bandSpeedKb) {
|
||||||
|
@ -443,13 +382,15 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
|
||||||
if (lastTime == 0 || lastBytes == 0) {
|
if (lastTime == 0 || lastBytes == 0) {
|
||||||
lastTime = curTime;
|
lastTime = curTime;
|
||||||
lastBytes = curBytes;
|
lastBytes = curBytes;
|
||||||
return false;
|
*bandSpeedKb = 0;
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lastTime >= curTime || lastBytes > curBytes) {
|
if (lastTime >= curTime || lastBytes > curBytes) {
|
||||||
lastTime = curTime;
|
lastTime = curTime;
|
||||||
lastBytes = curBytes;
|
lastBytes = curBytes;
|
||||||
return false;
|
*bandSpeedKb = 0;
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
double totalBytes = (double)(curBytes - lastBytes) / 1024 * 8; // Kb
|
double totalBytes = (double)(curBytes - lastBytes) / 1024 * 8; // Kb
|
||||||
|
|
|
@ -67,7 +67,7 @@ static void *taosProcessTcpData(void *param);
|
||||||
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
|
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
|
||||||
static void taosFreeFdObj(SFdObj *pFdObj);
|
static void taosFreeFdObj(SFdObj *pFdObj);
|
||||||
static void taosReportBrokenLink(SFdObj *pFdObj);
|
static void taosReportBrokenLink(SFdObj *pFdObj);
|
||||||
static void* taosAcceptTcpConnection(void *arg);
|
static void *taosAcceptTcpConnection(void *arg);
|
||||||
|
|
||||||
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
|
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
|
||||||
SServerObj *pServerObj;
|
SServerObj *pServerObj;
|
||||||
|
@ -80,6 +80,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pServerObj->fd = -1;
|
||||||
pServerObj->thread = 0;
|
pServerObj->thread = 0;
|
||||||
pServerObj->ip = ip;
|
pServerObj->ip = ip;
|
||||||
pServerObj->port = port;
|
pServerObj->port = port;
|
||||||
|
@ -99,6 +100,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
// initialize parameters in case it may encounter error later
|
||||||
pThreadObj = pServerObj->pThreadObj;
|
pThreadObj = pServerObj->pThreadObj;
|
||||||
for (int i = 0; i < numOfThreads; ++i) {
|
for (int i = 0; i < numOfThreads; ++i) {
|
||||||
pThreadObj->pollFd = -1;
|
pThreadObj->pollFd = -1;
|
||||||
|
@ -106,18 +108,21 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
pThreadObj->processData = fp;
|
pThreadObj->processData = fp;
|
||||||
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
||||||
pThreadObj->shandle = shandle;
|
pThreadObj->shandle = shandle;
|
||||||
|
pThreadObj++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// initialize mutex, thread, fd which may fail
|
||||||
|
pThreadObj = pServerObj->pThreadObj;
|
||||||
|
for (int i = 0; i < numOfThreads; ++i) {
|
||||||
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
break;;
|
break;;
|
||||||
}
|
}
|
||||||
|
|
||||||
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
||||||
if (pThreadObj->pollFd < 0) {
|
if (pThreadObj->pollFd < 0) {
|
||||||
tError("%s failed to create TCP epoll", label);
|
tError("%s failed to create TCP epoll", label);
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
code = -1;
|
code = -1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -125,7 +130,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
|
tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,15 +137,18 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
pThreadObj++;
|
pThreadObj++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
||||||
|
if (pServerObj->fd < 0) code = -1;
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
|
code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
|
tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
taosCleanUpTcpServer(pServerObj);
|
taosCleanUpTcpServer(pServerObj);
|
||||||
pServerObj = NULL;
|
pServerObj = NULL;
|
||||||
} else {
|
} else {
|
||||||
|
@ -204,7 +211,7 @@ void taosCleanUpTcpServer(void *handle) {
|
||||||
tfree(pServerObj);
|
tfree(pServerObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* taosAcceptTcpConnection(void *arg) {
|
static void *taosAcceptTcpConnection(void *arg) {
|
||||||
int connFd = -1;
|
int connFd = -1;
|
||||||
struct sockaddr_in caddr;
|
struct sockaddr_in caddr;
|
||||||
int threadId = 0;
|
int threadId = 0;
|
||||||
|
@ -212,10 +219,6 @@ static void* taosAcceptTcpConnection(void *arg) {
|
||||||
SServerObj *pServerObj;
|
SServerObj *pServerObj;
|
||||||
|
|
||||||
pServerObj = (SServerObj *)arg;
|
pServerObj = (SServerObj *)arg;
|
||||||
|
|
||||||
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
|
|
||||||
if (pServerObj->fd < 0) return NULL;
|
|
||||||
|
|
||||||
tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
#include "taoserror.h"
|
||||||
#include "rpcLog.h"
|
#include "rpcLog.h"
|
||||||
#include "rpcUdp.h"
|
#include "rpcUdp.h"
|
||||||
#include "rpcHead.h"
|
#include "rpcHead.h"
|
||||||
|
@ -65,6 +66,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
||||||
pSet = (SUdpConnSet *)malloc((size_t)size);
|
pSet = (SUdpConnSet *)malloc((size_t)size);
|
||||||
if (pSet == NULL) {
|
if (pSet == NULL) {
|
||||||
tError("%s failed to allocate UdpConn", label);
|
tError("%s failed to allocate UdpConn", label);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,30 +75,34 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
||||||
pSet->port = port;
|
pSet->port = port;
|
||||||
pSet->shandle = shandle;
|
pSet->shandle = shandle;
|
||||||
pSet->fp = fp;
|
pSet->fp = fp;
|
||||||
|
pSet->threads = threads;
|
||||||
tstrncpy(pSet->label, label, sizeof(pSet->label));
|
tstrncpy(pSet->label, label, sizeof(pSet->label));
|
||||||
|
|
||||||
|
pthread_attr_t thAttr;
|
||||||
|
pthread_attr_init(&thAttr);
|
||||||
|
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
int i;
|
||||||
uint16_t ownPort;
|
uint16_t ownPort;
|
||||||
for (int i = 0; i < threads; ++i) {
|
for (i = 0; i < threads; ++i) {
|
||||||
pConn = pSet->udpConn + i;
|
pConn = pSet->udpConn + i;
|
||||||
ownPort = (port ? port + i : 0);
|
ownPort = (port ? port + i : 0);
|
||||||
pConn->fd = taosOpenUdpSocket(ip, ownPort);
|
pConn->fd = taosOpenUdpSocket(ip, ownPort);
|
||||||
if (pConn->fd < 0) {
|
if (pConn->fd < 0) {
|
||||||
tError("%s failed to open UDP socket %x:%hu", label, ip, port);
|
tError("%s failed to open UDP socket %x:%hu", label, ip, port);
|
||||||
taosCleanUpUdpConnection(pSet);
|
break;
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pConn->buffer = malloc(RPC_MAX_UDP_SIZE);
|
pConn->buffer = malloc(RPC_MAX_UDP_SIZE);
|
||||||
if (NULL == pConn->buffer) {
|
if (NULL == pConn->buffer) {
|
||||||
tError("%s failed to malloc recv buffer", label);
|
tError("%s failed to malloc recv buffer", label);
|
||||||
taosCleanUpUdpConnection(pSet);
|
break;
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct sockaddr_in sin;
|
struct sockaddr_in sin;
|
||||||
unsigned int addrlen = sizeof(sin);
|
unsigned int addrlen = sizeof(sin);
|
||||||
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
|
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 &&
|
||||||
addrlen == sizeof(sin)) {
|
sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
|
||||||
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
|
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,23 +113,22 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
||||||
pConn->pSet = pSet;
|
pConn->pSet = pSet;
|
||||||
pConn->signature = pConn;
|
pConn->signature = pConn;
|
||||||
|
|
||||||
pthread_attr_t thAttr;
|
|
||||||
pthread_attr_init(&thAttr);
|
|
||||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
|
||||||
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
|
int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
|
||||||
pthread_attr_destroy(&thAttr);
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tError("%s failed to create thread to process UDP data, reason:%s", label, strerror(errno));
|
tError("%s failed to create thread to process UDP data(%s)", label, strerror(errno));
|
||||||
taosCloseSocket(pConn->fd);
|
break;
|
||||||
taosCleanUpUdpConnection(pSet);
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
++pSet->threads;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tTrace("%s UDP connection is initialized, ip:%x port:%hu threads:%d", label, ip, port, threads);
|
pthread_attr_destroy(&thAttr);
|
||||||
|
|
||||||
|
if (i != threads) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
taosCleanUpUdpConnection(pSet);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tTrace("%s UDP connection is initialized, ip:%x:%hu threads:%d", label, ip, port, threads);
|
||||||
return pSet;
|
return pSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,16 +141,17 @@ void taosCleanUpUdpConnection(void *handle) {
|
||||||
for (int i = 0; i < pSet->threads; ++i) {
|
for (int i = 0; i < pSet->threads; ++i) {
|
||||||
pConn = pSet->udpConn + i;
|
pConn = pSet->udpConn + i;
|
||||||
pConn->signature = NULL;
|
pConn->signature = NULL;
|
||||||
|
|
||||||
// shutdown to signal the thread to exit
|
// shutdown to signal the thread to exit
|
||||||
shutdown(pConn->fd, SHUT_RD);
|
if ( pConn->fd >=0) shutdown(pConn->fd, SHUT_RD);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pSet->threads; ++i) {
|
for (int i = 0; i < pSet->threads; ++i) {
|
||||||
pConn = pSet->udpConn + i;
|
pConn = pSet->udpConn + i;
|
||||||
pthread_join(pConn->thread, NULL);
|
if (pConn->thread) pthread_join(pConn->thread, NULL);
|
||||||
free(pConn->buffer);
|
if (pConn->fd >=0) taosCloseSocket(pConn->fd);
|
||||||
taosCloseSocket(pConn->fd);
|
tfree(pConn->buffer);
|
||||||
tTrace("chandle:%p is closed", pConn);
|
tTrace("UDP chandle:%p is closed", pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pSet);
|
tfree(pSet);
|
||||||
|
@ -159,7 +165,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t
|
||||||
SUdpConn *pConn = pSet->udpConn + pSet->index;
|
SUdpConn *pConn = pSet->udpConn + pSet->index;
|
||||||
pConn->port = port;
|
pConn->port = port;
|
||||||
|
|
||||||
tTrace("%s UDP connection is setup, ip:%x:%hu, local:%x:%d", pConn->label, ip, port, pSet->ip, pConn->localPort);
|
tTrace("%s UDP connection is setup, ip:%x:%hu", pConn->label, ip, port);
|
||||||
|
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
|
@ -730,13 +730,15 @@ class DbState():
|
||||||
# when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
|
# when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
|
||||||
# by a factor of 500.
|
# by a factor of 500.
|
||||||
# TODO: what if it goes beyond 10 years into the future
|
# TODO: what if it goes beyond 10 years into the future
|
||||||
|
# TODO: fix the error as result of above: "tsdb timestamp is out of range"
|
||||||
def setupLastTick(self):
|
def setupLastTick(self):
|
||||||
t1 = datetime.datetime(2020, 5, 30)
|
t1 = datetime.datetime(2020, 6, 1)
|
||||||
t2 = datetime.datetime.now()
|
t2 = datetime.datetime.now()
|
||||||
elSec = t2.timestamp() - t1.timestamp()
|
elSec = int(t2.timestamp() - t1.timestamp()) # maybe a very large number, takes 69 years to exceed Python int range
|
||||||
|
elSec2 = ( elSec % (8 * 12 * 30 * 24 * 60 * 60 / 500 ) ) * 500 # a number representing seconds within 10 years
|
||||||
# print("elSec = {}".format(elSec))
|
# print("elSec = {}".format(elSec))
|
||||||
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
|
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
|
||||||
t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec * 500) # see explanation above
|
t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec2) # see explanation above
|
||||||
logger.info("Setting up TICKS to start from: {}".format(t4))
|
logger.info("Setting up TICKS to start from: {}".format(t4))
|
||||||
return t4
|
return t4
|
||||||
|
|
||||||
|
@ -963,7 +965,7 @@ class Task():
|
||||||
try:
|
try:
|
||||||
self._executeInternal(te, wt) # TODO: no return value?
|
self._executeInternal(te, wt) # TODO: no return value?
|
||||||
except taos.error.ProgrammingError as err:
|
except taos.error.ProgrammingError as err:
|
||||||
self.logDebug("[=] Taos library exception: errno={}, msg: {}".format(err.errno, err))
|
self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err))
|
||||||
self._err = err
|
self._err = err
|
||||||
except:
|
except:
|
||||||
self.logDebug("[=] Unexpected exception")
|
self.logDebug("[=] Unexpected exception")
|
||||||
|
|
|
@ -205,7 +205,7 @@ class Test (Thread):
|
||||||
global written
|
global written
|
||||||
|
|
||||||
dnodesDir = tdDnodes.getDnodesRootDir()
|
dnodesDir = tdDnodes.getDnodesRootDir()
|
||||||
dataDir = dnodesDir + '/dnode1/*'
|
dataDir = dnodesDir + '/dnode1/data/*'
|
||||||
deleteCmd = 'rm -rf %s' % dataDir
|
deleteCmd = 'rm -rf %s' % dataDir
|
||||||
os.system(deleteCmd)
|
os.system(deleteCmd)
|
||||||
|
|
||||||
|
|
|
@ -208,7 +208,7 @@ class Test (threading.Thread):
|
||||||
global written
|
global written
|
||||||
|
|
||||||
dnodesDir = tdDnodes.getDnodesRootDir()
|
dnodesDir = tdDnodes.getDnodesRootDir()
|
||||||
dataDir = dnodesDir + '/dnode1/*'
|
dataDir = dnodesDir + '/dnode1/data/*'
|
||||||
deleteCmd = 'rm -rf %s' % dataDir
|
deleteCmd = 'rm -rf %s' % dataDir
|
||||||
os.system(deleteCmd)
|
os.system(deleteCmd)
|
||||||
|
|
||||||
|
|
|
@ -31,21 +31,26 @@ class TDTestCase:
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
|
|
||||||
tdLog.info("===== step1 =====")
|
tdLog.info("===== step1 =====")
|
||||||
tdSql.execute("create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
|
tdSql.execute(
|
||||||
|
"create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
|
||||||
for i in range(tbNum):
|
for i in range(tbNum):
|
||||||
tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i))
|
tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i))
|
||||||
for j in range(rowNum):
|
for j in range(rowNum):
|
||||||
tdSql.execute("insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j))
|
tdSql.execute(
|
||||||
|
"insert into tb%d values (now - %dm, %d, %d)" %
|
||||||
|
(i, 1440 - j, j, j))
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
tdLog.info("===== step2 =====")
|
tdLog.info("===== step2 =====")
|
||||||
tdSql.query("select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
tdSql.query(
|
||||||
|
"select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||||
tdSql.checkData(0, 1, rowNum)
|
tdSql.checkData(0, 1, rowNum)
|
||||||
tdSql.checkData(0, 2, rowNum)
|
tdSql.checkData(0, 2, rowNum)
|
||||||
tdSql.checkData(0, 3, rowNum)
|
tdSql.checkData(0, 3, rowNum)
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum)
|
tdSql.checkRows(tbNum)
|
||||||
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
tdSql.execute(
|
||||||
|
"create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum + 1)
|
tdSql.checkRows(tbNum + 1)
|
||||||
|
|
||||||
|
@ -67,7 +72,8 @@ class TDTestCase:
|
||||||
|
|
||||||
tdLog.info("===== step6 =====")
|
tdLog.info("===== step6 =====")
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
tdSql.execute(
|
||||||
|
"create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum + 1)
|
tdSql.checkRows(tbNum + 1)
|
||||||
|
|
||||||
|
@ -81,14 +87,16 @@ class TDTestCase:
|
||||||
tdSql.checkData(0, 3, rowNum)
|
tdSql.checkData(0, 3, rowNum)
|
||||||
|
|
||||||
tdLog.info("===== step8 =====")
|
tdLog.info("===== step8 =====")
|
||||||
tdSql.query("select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
tdSql.query(
|
||||||
|
"select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||||
tdSql.checkData(0, 1, rowNum * tbNum)
|
tdSql.checkData(0, 1, rowNum * tbNum)
|
||||||
tdSql.checkData(0, 2, rowNum * tbNum)
|
tdSql.checkData(0, 2, rowNum * tbNum)
|
||||||
tdSql.checkData(0, 3, rowNum * tbNum)
|
tdSql.checkData(0, 3, rowNum * tbNum)
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum + 1)
|
tdSql.checkRows(tbNum + 1)
|
||||||
|
|
||||||
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
tdSql.execute(
|
||||||
|
"create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum + 2)
|
tdSql.checkRows(tbNum + 2)
|
||||||
|
|
||||||
|
@ -110,7 +118,8 @@ class TDTestCase:
|
||||||
tdSql.error("select * from s1")
|
tdSql.error("select * from s1")
|
||||||
|
|
||||||
tdLog.info("===== step12 =====")
|
tdLog.info("===== step12 =====")
|
||||||
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
tdSql.execute(
|
||||||
|
"create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum + 2)
|
tdSql.checkRows(tbNum + 2)
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,6 @@ class TDTestCase:
|
||||||
tdLog.debug("start to execute %s" % __file__)
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
tdSql.init(conn.cursor(), logSql)
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
tbNum = 10
|
tbNum = 10
|
||||||
rowNum = 20
|
rowNum = 20
|
||||||
|
@ -33,11 +32,14 @@ class TDTestCase:
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
|
|
||||||
tdLog.info("===== step1 =====")
|
tdLog.info("===== step1 =====")
|
||||||
tdSql.execute("create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
|
tdSql.execute(
|
||||||
|
"create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)")
|
||||||
for i in range(tbNum):
|
for i in range(tbNum):
|
||||||
tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i))
|
tdSql.execute("create table tb%d using stb0 tags(%d)" % (i, i))
|
||||||
for j in range(rowNum):
|
for j in range(rowNum):
|
||||||
tdSql.execute("insert into tb%d values (now - %dm, %d, %d)" % (i, 1440 - j, j, j))
|
tdSql.execute(
|
||||||
|
"insert into tb%d values (now - %dm, %d, %d)" %
|
||||||
|
(i, 1440 - j, j, j))
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
tdLog.info("===== step2 =====")
|
tdLog.info("===== step2 =====")
|
||||||
|
@ -45,7 +47,8 @@ class TDTestCase:
|
||||||
tdSql.checkData(0, 1, rowNum)
|
tdSql.checkData(0, 1, rowNum)
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum)
|
tdSql.checkRows(tbNum)
|
||||||
tdSql.execute("create table s0 as select count(col1) from tb0 interval(1d)")
|
tdSql.execute(
|
||||||
|
"create table s0 as select count(col1) from tb0 interval(1d)")
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum + 1)
|
tdSql.checkRows(tbNum + 1)
|
||||||
|
|
||||||
|
@ -63,7 +66,8 @@ class TDTestCase:
|
||||||
tdSql.error("select * from s0")
|
tdSql.error("select * from s0")
|
||||||
|
|
||||||
tdLog.info("===== step6 =====")
|
tdLog.info("===== step6 =====")
|
||||||
tdSql.execute("create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
tdSql.execute(
|
||||||
|
"create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)")
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum + 1)
|
tdSql.checkRows(tbNum + 1)
|
||||||
|
|
||||||
|
@ -75,13 +79,15 @@ class TDTestCase:
|
||||||
tdSql.checkData(0, 3, rowNum)
|
tdSql.checkData(0, 3, rowNum)
|
||||||
|
|
||||||
tdLog.info("===== step8 =====")
|
tdLog.info("===== step8 =====")
|
||||||
tdSql.query("select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
tdSql.query(
|
||||||
|
"select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||||
tdSql.checkData(0, 1, totalNum)
|
tdSql.checkData(0, 1, totalNum)
|
||||||
tdSql.checkData(0, 2, totalNum)
|
tdSql.checkData(0, 2, totalNum)
|
||||||
tdSql.checkData(0, 3, totalNum)
|
tdSql.checkData(0, 3, totalNum)
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum + 1)
|
tdSql.checkRows(tbNum + 1)
|
||||||
tdSql.execute("create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
tdSql.execute(
|
||||||
|
"create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)")
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum + 2)
|
tdSql.checkRows(tbNum + 2)
|
||||||
|
|
||||||
|
@ -101,7 +107,8 @@ class TDTestCase:
|
||||||
tdSql.error("select * from s1")
|
tdSql.error("select * from s1")
|
||||||
|
|
||||||
tdLog.info("===== step12 =====")
|
tdLog.info("===== step12 =====")
|
||||||
tdSql.execute("create table s1 as select count(col1) from stb0 interval(1d)")
|
tdSql.execute(
|
||||||
|
"create table s1 as select count(col1) from stb0 interval(1d)")
|
||||||
tdSql.query("show tables")
|
tdSql.query("show tables")
|
||||||
tdSql.checkRows(tbNum + 2)
|
tdSql.checkRows(tbNum + 2)
|
||||||
|
|
||||||
|
@ -112,7 +119,6 @@ class TDTestCase:
|
||||||
#tdSql.checkData(0, 2, None)
|
#tdSql.checkData(0, 2, None)
|
||||||
#tdSql.checkData(0, 3, None)
|
#tdSql.checkData(0, 3, None)
|
||||||
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success("%s successfully executed" % __file__)
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
|
@ -319,6 +319,7 @@ class TDDnodes:
|
||||||
self.dnodes.append(TDDnode(8))
|
self.dnodes.append(TDDnode(8))
|
||||||
self.dnodes.append(TDDnode(9))
|
self.dnodes.append(TDDnode(9))
|
||||||
self.dnodes.append(TDDnode(10))
|
self.dnodes.append(TDDnode(10))
|
||||||
|
self.simDeployed = False
|
||||||
|
|
||||||
def init(self, path):
|
def init(self, path):
|
||||||
psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"
|
psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"
|
||||||
|
@ -378,7 +379,10 @@ class TDDnodes:
|
||||||
self.sim = TDSimClient()
|
self.sim = TDSimClient()
|
||||||
self.sim.init(self.path)
|
self.sim.init(self.path)
|
||||||
self.sim.setTestCluster(self.testCluster)
|
self.sim.setTestCluster(self.testCluster)
|
||||||
self.sim.deploy()
|
|
||||||
|
if (self.simDeployed == False):
|
||||||
|
self.sim.deploy()
|
||||||
|
self.simDeployed = True
|
||||||
|
|
||||||
self.check(index)
|
self.check(index)
|
||||||
self.dnodes[index - 1].setTestCluster(self.testCluster)
|
self.dnodes[index - 1].setTestCluster(self.testCluster)
|
||||||
|
|
Loading…
Reference in New Issue