From 70a07503a428714b316bd1a6ffef757c1c6457fb Mon Sep 17 00:00:00 2001 From: Hui Li Date: Wed, 5 Aug 2020 19:29:49 +0800 Subject: [PATCH 1/7] [fix bug] --- src/common/src/tglobal.c | 2 +- src/kit/taosnetwork/client.c | 92 +++++++++++++++++++----------------- src/kit/taosnetwork/server.c | 75 +++++++++++++---------------- src/mnode/src/mnodePeer.c | 5 ++ src/mnode/src/mnodeVgroup.c | 4 +- 5 files changed, 90 insertions(+), 88 deletions(-) diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 0cd3196ad0..c20061d11b 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -140,7 +140,7 @@ char tsMqttBrokerAddress[128] = {0}; char tsMqttBrokerClientId[128] = {0}; // monitor -int32_t tsEnableMonitorModule = 0; +int32_t tsEnableMonitorModule = 1; char tsMonitorDbName[TSDB_DB_NAME_LEN] = "log"; char tsInternalPass[] = "secretkey"; int32_t tsMonitorInterval = 30; // seconds diff --git a/src/kit/taosnetwork/client.c b/src/kit/taosnetwork/client.c index 706359ec20..c51c1ab4ba 100644 --- a/src/kit/taosnetwork/client.c +++ b/src/kit/taosnetwork/client.c @@ -32,8 +32,8 @@ typedef struct { int port; - char *host[15]; -} info; + char *host; +} info_s; typedef struct Arguments { char * host; @@ -65,10 +65,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { static struct argp argp = {options, parse_opt, 0, 0}; -void *checkPort(void *sarg) { - info *pinfo = (info *)sarg; - int port = pinfo->port; - char *host = *pinfo->host; +int checkTcpPort(info_s *info) { + int port = info->port; + char *host = info->host; int clientSocket; struct sockaddr_in serverAddr; @@ -77,38 +76,37 @@ void *checkPort(void *sarg) { int iDataNum; if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("socket"); - return NULL; + return -1; } serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(port); serverAddr.sin_addr.s_addr = inet_addr(host); - printf("=================================\n"); + //printf("=================================\n"); if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { perror("connect"); - return NULL; + return -1; } - printf("Connect to: %s:%d...success\n", host, port); + //printf("Connect to: %s:%d...success\n", host, port); sprintf(sendbuf, "send port_%d", port); send(clientSocket, sendbuf, strlen(sendbuf), 0); - printf("Send msg_%d: %s\n", port, sendbuf); + //printf("Send msg_%d: %s\n", port, sendbuf); recvbuf[0] = '\0'; iDataNum = recv(clientSocket, recvbuf, BUFFER_SIZE, 0); recvbuf[iDataNum] = '\0'; - printf("Read ack msg_%d: %s\n", port, recvbuf); + //printf("Read ack msg_%d: %s\n", port, recvbuf); - printf("=================================\n"); + //printf("=================================\n"); close(clientSocket); - return NULL; + return 0; } -void *checkUPort(void *sarg) { - info *pinfo = (info *)sarg; - int port = pinfo->port; - char *host = *pinfo->host; +void *checkUdpPort(info_s *info) { + int port = info->port; + char *host = info->host; int clientSocket; struct sockaddr_in serverAddr; @@ -117,56 +115,62 @@ void *checkUPort(void *sarg) { int iDataNum; if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { perror("socket"); - return NULL; + return -1; } serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(port); - serverAddr.sin_addr.s_addr = inet_addr(host); - printf("=================================\n"); - sprintf(sendbuf, "send msg port_%d by udp", port); socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr); - sendto(clientSocket, sendbuf, strlen(sendbuf), 0, (struct sockaddr *)&serverAddr, (int)sin_size); + int code = sendto(clientSocket, sendbuf, strlen(sendbuf), 0, (struct sockaddr *)&serverAddr, (int)sin_size); + if (code < 0) { + perror("sendto"); + return -1; + } - printf("Send msg_%d by udp: %s\n", port, sendbuf); + //printf("Send msg_%d by udp: %s\n", port, sendbuf); recvbuf[0] = '\0'; iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size); recvbuf[iDataNum] = '\0'; - printf("Read ack msg_%d from udp: %s\n", port, recvbuf); + //printf("Read ack msg_%d from udp: %s\n", port, recvbuf); - printf("=================================\n"); close(clientSocket); - return NULL; + return 0; } int main(int argc, char *argv[]) { - SArguments arguments = {"127.0.0.1", 6041, 6050}; - + SArguments arguments = {"127.0.0.1", 6030, 6060}; + info_s info; + int ret; + argp_parse(&argp, argc, argv, 0, 0, &arguments); - printf("host: %s\tport: %d\tmax_port: %d\n", arguments.host, arguments.port, arguments.max_port); + printf("host: %s\tport: %d\tmax_port: %d\n\n", arguments.host, arguments.port, arguments.max_port); int port = arguments.port; - char *host = arguments.host; - info *tinfo = malloc(sizeof(info)); - info *uinfo = malloc(sizeof(info)); + + info.host = arguments.host; for (; port < arguments.max_port; port++) { - printf("For test: %s:%d\n", host, port); + printf("test: %s:%d\n", info.host, port); - *tinfo->host = host; - tinfo->port = port; - checkPort(tinfo); - - *uinfo->host = host; - uinfo->port = port; - checkUPort(uinfo); + info.port = port; + ret = checkTcpPort(&info); + if (ret != 0) { + printf("tcp port:%d test fail.", port); + } else { + printf("tcp port:%d test ok.", port); + } + + checkUdpPort(&info); + if (ret != 0) { + printf("udp port:%d test fail.", port); + } else { + printf("udp port:%d test ok.", port); + } } - free(tinfo); - free(uinfo); -} \ No newline at end of file +} diff --git a/src/kit/taosnetwork/server.c b/src/kit/taosnetwork/server.c index c967828f0b..f77d23f0a7 100644 --- a/src/kit/taosnetwork/server.c +++ b/src/kit/taosnetwork/server.c @@ -27,13 +27,13 @@ #include #include #include +#include #define BUFFER_SIZE 200 typedef struct { int port; - int type; // 0: tcp, 1: udo, default: 0 -} info; +} info_s; typedef struct Arguments { char * host; @@ -43,7 +43,7 @@ typedef struct Arguments { static struct argp_option options[] = { {0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0}, - {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6020.", 1}, + {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6041.", 1}, {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6050.", 2}}; static error_t parse_opt(int key, char *arg, struct argp_state *state) { @@ -65,10 +65,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { static struct argp argp = {options, parse_opt, 0, 0}; -static void *bindPort(void *sarg) { - info *pinfo = (info *)sarg; +static void *bindTcpPort(void *sarg) { + info_s *pinfo = (info_s *)sarg; int port = pinfo->port; - int type = pinfo->type; int serverSocket; struct sockaddr_in server_addr; @@ -98,14 +97,14 @@ static void *bindPort(void *sarg) { return NULL; } - printf("Bind port: %d success\n", port); + //printf("Bind port: %d success\n", port); while (1) { client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); if (client < 0) { perror("accept"); continue; } - printf("=================================\n"); + //printf("=================================\n"); printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port); while (1) { @@ -118,33 +117,30 @@ static void *bindPort(void *sarg) { } if (iDataNum > 0) { buffer[iDataNum] = '\0'; - printf("read msg:%s\n", buffer); + //printf("read msg:%s\n", buffer); if (strcmp(buffer, "quit") == 0) break; buffer[0] = '\0'; sprintf(buffer, "ack port_%d", port); - printf("send ack msg:%s\n", buffer); + //printf("send ack msg:%s\n", buffer); send(client, buffer, strlen(buffer), 0); break; } } - printf("=================================\n"); + //printf("=================================\n"); } close(serverSocket); return NULL; } -static void *bindUPort(void *sarg) { - info *pinfo = (info *)sarg; +static void *bindUdpPort(void *sarg) { + info_s *pinfo = (info_s *)sarg; int port = pinfo->port; - int type = pinfo->type; int serverSocket; struct sockaddr_in server_addr; struct sockaddr_in clientAddr; - int addr_len = sizeof(clientAddr); - int client; char buffer[BUFFER_SIZE]; int iDataNum; @@ -164,7 +160,7 @@ static void *bindUPort(void *sarg) { } socklen_t sin_size; - printf("Bind port: %d success\n", port); + //printf("Bind port: %d success\n", port); while (1) { buffer[0] = '\0'; @@ -178,21 +174,19 @@ static void *bindUPort(void *sarg) { continue; } if (iDataNum > 0) { - printf("=================================\n"); + //printf("=================================\n"); printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port); buffer[iDataNum] = '\0'; - printf("Read msg from udp:%s\n", buffer); + //printf("Read msg from udp:%s\n", buffer); if (strcmp(buffer, "quit") == 0) break; buffer[0] = '\0'; sprintf(buffer, "ack port_%d by udp", port); - printf("Send ack msg by udp:%s\n", buffer); + //printf("Send ack msg by udp:%s\n", buffer); sendto(serverSocket, buffer, strlen(buffer), 0, (struct sockaddr *)&clientAddr, (int)sin_size); - - send(client, buffer, strlen(buffer), 0); - printf("=================================\n"); + //printf("=================================\n"); } } @@ -202,39 +196,38 @@ static void *bindUPort(void *sarg) { int main(int argc, char *argv[]) { - SArguments arguments = {"127.0.0.1", 6020, 6050}; + SArguments arguments = {"127.0.0.1", 6030, 6060}; argp_parse(&argp, argc, argv, 0, 0, &arguments); int port = arguments.port; - int num = arguments.max_port - arguments.port; + int num = arguments.max_port - arguments.port + 1; if (num < 0) { num = 1; } pthread_t *pids = malloc(2 * num * sizeof(pthread_t)); - info * infos = malloc(num * sizeof(info)); - info * uinfos = malloc(num * sizeof(info)); + info_s * tinfos = malloc(num * sizeof(info_s)); + info_s * uinfos = malloc(num * sizeof(info_s)); for (size_t i = 0; i < num; i++) { - info *pinfo = infos++; - pinfo->port = port; + info_s *tcpInfo = tinfos + i; + tcpInfo->port = port + i; - if (pthread_create(pids + i, NULL, bindPort, pinfo) != 0) //创建线程 - { //创建线程失败 - printf("创建线程失败: %d.\n", port); - exit(0); + if (pthread_create(pids + i, NULL, bindTcpPort, tcpInfo) != 0) + { + printf("create thread fail, port:%d.\n", port); + exit(-1); } - info *uinfo = uinfos++; - uinfo->port = port; - uinfo->type = 1; - port++; - if (pthread_create(pids + num + i, NULL, bindUPort, uinfo) != 0) //创建线程 - { //创建线程失败 - printf("创建线程失败: %d.\n", port); - exit(0); + info_s *udpInfo = uinfos + i; + udpInfo->port = port + i; + if (pthread_create(pids + num + i, NULL, bindUdpPort, udpInfo) != 0) + { + printf("create thread fail, port:%d.\n", port); + exit(-1); } } + for (int i = 0; i < num; i++) { pthread_join(pids[i], NULL); pthread_join(pids[(num + i)], NULL); diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index 71b8b1ea84..7b4b4e4343 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -75,6 +75,11 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { } void mnodeProcessPeerRsp(SRpcMsg *pMsg) { + if (!sdbIsMaster()) { + mError("%p, msg:%s is not processed for it is not master", pMsg->ahandle, taosMsg[pMsg->msgType]); + return; + } + if (tsMnodeProcessPeerRspFp[pMsg->msgType]) { (*tsMnodeProcessPeerRspFp[pMsg->msgType])(pMsg); } else { diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index bfeff1f4cb..4fc71affe9 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -164,9 +164,9 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) { // reset vgid status on vgroup changed - mDebug("vgId:%d, reset sync status to unsynced", pVgroup->vgId); + mDebug("vgId:%d, reset sync status to offline", pVgroup->vgId); for (int32_t v = 0; v < pVgroup->numOfVnodes; ++v) { - pVgroup->vnodeGid[v].role = TAOS_SYNC_ROLE_UNSYNCED; + pVgroup->vnodeGid[v].role = TAOS_SYNC_ROLE_OFFLINE; } mnodeDecVgroupRef(pVgroup); From f2e9e5bf61b210d1c0bbb4989c16e9e97a46a016 Mon Sep 17 00:00:00 2001 From: Hui Li Date: Thu, 6 Aug 2020 10:47:34 +0800 Subject: [PATCH 2/7] [TD-1076] --- src/kit/CMakeLists.txt | 4 +- src/kit/taosnetwork/CMakeLists.txt | 10 +++ src/kit/taosnetwork/client.c | 86 +++++++++++++++++--------- src/kit/taosnetwork/server.c | 98 ++++++++++++++++-------------- 4 files changed, 124 insertions(+), 74 deletions(-) create mode 100644 src/kit/taosnetwork/CMakeLists.txt diff --git a/src/kit/CMakeLists.txt b/src/kit/CMakeLists.txt index df3ce10001..7095d79755 100644 --- a/src/kit/CMakeLists.txt +++ b/src/kit/CMakeLists.txt @@ -4,4 +4,6 @@ PROJECT(TDengine) ADD_SUBDIRECTORY(shell) ADD_SUBDIRECTORY(taosdemo) ADD_SUBDIRECTORY(taosdump) -ADD_SUBDIRECTORY(taosmigrate) \ No newline at end of file +ADD_SUBDIRECTORY(taosmigrate) +#ADD_SUBDIRECTORY(taosClusterTest) +ADD_SUBDIRECTORY(taosnetwork) diff --git a/src/kit/taosnetwork/CMakeLists.txt b/src/kit/taosnetwork/CMakeLists.txt new file mode 100644 index 0000000000..a7412b196d --- /dev/null +++ b/src/kit/taosnetwork/CMakeLists.txt @@ -0,0 +1,10 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + AUX_SOURCE_DIRECTORY(. SRC) + ADD_EXECUTABLE(taosClient client.c) + ADD_EXECUTABLE(taosServer server.c) + TARGET_LINK_LIBRARIES( taosServer -lpthread -lm -lrt ) + TARGET_LINK_LIBRARIES( taosClient -lpthread -lm -lrt ) +ENDIF () diff --git a/src/kit/taosnetwork/client.c b/src/kit/taosnetwork/client.c index c51c1ab4ba..a4fc26f22f 100644 --- a/src/kit/taosnetwork/client.c +++ b/src/kit/taosnetwork/client.c @@ -28,23 +28,27 @@ #include #include -#define BUFFER_SIZE 200 +#define MAX_PKG_LEN (16*1000) +#define BUFFER_SIZE (MAX_PKG_LEN + 1024) typedef struct { int port; char *host; + uint16_t pktLen; } info_s; typedef struct Arguments { char * host; uint16_t port; uint16_t max_port; + uint16_t pktLen; } SArguments; static struct argp_option options[] = { {0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0}, - {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6041.", 1}, - {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6050.", 2}}; + {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6030.", 1}, + {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6060.", 2}, + {0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 16k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}}; static error_t parse_opt(int key, char *arg, struct argp_state *state) { @@ -59,6 +63,12 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'm': arguments->max_port = atoi(arg); break; + case 'l': + arguments->pktLen = atoi(arg); + break; + default: + printf("unknow parameter!\n"); + break; } return 0; } @@ -75,7 +85,7 @@ int checkTcpPort(info_s *info) { char recvbuf[BUFFER_SIZE]; int iDataNum; if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("socket"); + printf("socket() fail: %s\n", strerror(errno)); return -1; } serverAddr.sin_family = AF_INET; @@ -85,26 +95,30 @@ int checkTcpPort(info_s *info) { //printf("=================================\n"); if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { - perror("connect"); + printf("connect() fail: %s\n", strerror(errno)); return -1; } //printf("Connect to: %s:%d...success\n", host, port); + memset(sendbuf, 0, BUFFER_SIZE); + memset(recvbuf, 0, BUFFER_SIZE); - sprintf(sendbuf, "send port_%d", port); - send(clientSocket, sendbuf, strlen(sendbuf), 0); - //printf("Send msg_%d: %s\n", port, sendbuf); + sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", host, port); + sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); + + send(clientSocket, sendbuf, info->pktLen, 0); - recvbuf[0] = '\0'; iDataNum = recv(clientSocket, recvbuf, BUFFER_SIZE, 0); - recvbuf[iDataNum] = '\0'; - //printf("Read ack msg_%d: %s\n", port, recvbuf); + if (iDataNum < info->pktLen) { + printf("Read ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, port); + return -1; + } + //printf("Read ack pkg len:%d from tcp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8); - //printf("=================================\n"); close(clientSocket); return 0; } -void *checkUdpPort(info_s *info) { +int checkUdpPort(info_s *info) { int port = info->port; char *host = info->host; int clientSocket; @@ -117,60 +131,74 @@ void *checkUdpPort(info_s *info) { perror("socket"); return -1; } + serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(port); serverAddr.sin_addr.s_addr = inet_addr(host); + + memset(sendbuf, 0, BUFFER_SIZE); + memset(recvbuf, 0, BUFFER_SIZE); - sprintf(sendbuf, "send msg port_%d by udp", port); + sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", host, port); + sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr); - int code = sendto(clientSocket, sendbuf, strlen(sendbuf), 0, (struct sockaddr *)&serverAddr, (int)sin_size); + int code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int)sin_size); if (code < 0) { perror("sendto"); return -1; } - //printf("Send msg_%d by udp: %s\n", port, sendbuf); - - recvbuf[0] = '\0'; iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size); - recvbuf[iDataNum] = '\0'; - //printf("Read ack msg_%d from udp: %s\n", port, recvbuf); + if (iDataNum < info->pktLen) { + printf("Read ack pkg len: %d, less than req pkg len: %d from udp port: %d\n", iDataNum, info->pktLen, port); + return -1; + } + + //printf("Read ack pkg len:%d from udp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8); close(clientSocket); return 0; } int main(int argc, char *argv[]) { - SArguments arguments = {"127.0.0.1", 6030, 6060}; + SArguments arguments = {"127.0.0.1", 6030, 6060, 1000}; info_s info; int ret; argp_parse(&argp, argc, argv, 0, 0, &arguments); + if (arguments.pktLen > MAX_PKG_LEN) { + printf("test pkg len overflow: %d, max len not greater than %d bytes\n", arguments.pktLen, MAX_PKG_LEN); + exit(0); + } - printf("host: %s\tport: %d\tmax_port: %d\n\n", arguments.host, arguments.port, arguments.max_port); + printf("host: %s\tport: %d\tmax_port: %d\tpkgLen: %d\n", arguments.host, arguments.port, arguments.max_port, arguments.pktLen); int port = arguments.port; info.host = arguments.host; + info.pktLen = arguments.pktLen; - for (; port < arguments.max_port; port++) { - printf("test: %s:%d\n", info.host, port); + for (; port <= arguments.max_port; port++) { + //printf("test: %s:%d\n", info.host, port); + printf("\n"); info.port = port; ret = checkTcpPort(&info); if (ret != 0) { - printf("tcp port:%d test fail.", port); + printf("tcp port:%d test fail.\t\t", port); } else { - printf("tcp port:%d test ok.", port); + printf("tcp port:%d test ok.\t\t", port); } - checkUdpPort(&info); + ret = checkUdpPort(&info); if (ret != 0) { - printf("udp port:%d test fail.", port); + printf("udp port:%d test fail.\t\t", port); } else { - printf("udp port:%d test ok.", port); + printf("udp port:%d test ok.\t\t", port); } } + printf("\n"); + return 0; } diff --git a/src/kit/taosnetwork/server.c b/src/kit/taosnetwork/server.c index f77d23f0a7..18565edb3f 100644 --- a/src/kit/taosnetwork/server.c +++ b/src/kit/taosnetwork/server.c @@ -29,22 +29,26 @@ #include #include -#define BUFFER_SIZE 200 +#define MAX_PKG_LEN (16*1000) +#define BUFFER_SIZE (MAX_PKG_LEN + 1024) typedef struct { int port; + uint16_t pktLen; } info_s; typedef struct Arguments { char * host; uint16_t port; uint16_t max_port; + uint16_t pktLen; } SArguments; static struct argp_option options[] = { {0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0}, {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6041.", 1}, - {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6050.", 2}}; + {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6050.", 2}, + {0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 16k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}}; static error_t parse_opt(int key, char *arg, struct argp_state *state) { @@ -59,6 +63,11 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'm': arguments->max_port = atoi(arg); break; + case 'l': + arguments->pktLen = atoi(arg); + break; + default: + break; } return 0; } @@ -75,10 +84,10 @@ static void *bindTcpPort(void *sarg) { int addr_len = sizeof(clientAddr); int client; char buffer[BUFFER_SIZE]; - int iDataNum; + int iDataNum = 0; if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { - perror("socket"); + printf("socket() fail: %s", strerror(errno)); return NULL; } @@ -88,12 +97,12 @@ static void *bindTcpPort(void *sarg) { server_addr.sin_addr.s_addr = htonl(INADDR_ANY); if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { - perror("connect"); + printf("port:%d bind() fail: %s", port, strerror(errno)); return NULL; } if (listen(serverSocket, 5) < 0) { - perror("listen"); + printf("listen() fail: %s", strerror(errno)); return NULL; } @@ -101,34 +110,39 @@ static void *bindTcpPort(void *sarg) { while (1) { client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); if (client < 0) { - perror("accept"); + printf("accept() fail: %s", strerror(errno)); continue; } - //printf("=================================\n"); - printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port); - while (1) { - buffer[0] = '\0'; - iDataNum = recv(client, buffer, BUFFER_SIZE, 0); + memset(buffer, 0, BUFFER_SIZE); + int nleft, nread; + char *ptr = buffer; + nleft = pinfo->pktLen; + while (nleft > 0) { + nread = recv(client, buffer, BUFFER_SIZE, 0); - if (iDataNum < 0) { - perror("recv null"); - continue; - } - if (iDataNum > 0) { - buffer[iDataNum] = '\0'; - //printf("read msg:%s\n", buffer); - if (strcmp(buffer, "quit") == 0) break; - buffer[0] = '\0'; - - sprintf(buffer, "ack port_%d", port); - //printf("send ack msg:%s\n", buffer); - - send(client, buffer, strlen(buffer), 0); + if (nread == 0) { break; - } + } else if (nread < 0) { + if (errno == EINTR) { + continue; + } else { + printf("recv Client: %s pkg from TCP port: %d fail:%s.\n", inet_ntoa(clientAddr.sin_addr), port, strerror(errno)); + close(serverSocket); + return NULL; + } + } else { + nleft -= nread; + ptr += nread; + iDataNum += nread; + } + } + + printf("recv Client: %s pkg from TCP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum); + if (iDataNum > 0) { + send(client, buffer, iDataNum, 0); + break; } - //printf("=================================\n"); } close(serverSocket); return NULL; @@ -143,7 +157,7 @@ static void *bindUdpPort(void *sarg) { struct sockaddr_in clientAddr; char buffer[BUFFER_SIZE]; int iDataNum; - + if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { perror("socket"); return NULL; @@ -160,10 +174,9 @@ static void *bindUdpPort(void *sarg) { } socklen_t sin_size; - //printf("Bind port: %d success\n", port); while (1) { - buffer[0] = '\0'; + memset(buffer, 0, BUFFER_SIZE); sin_size = sizeof(*(struct sockaddr *)&server_addr); @@ -174,19 +187,10 @@ static void *bindUdpPort(void *sarg) { continue; } if (iDataNum > 0) { - //printf("=================================\n"); + printf("recv Client: %s pkg from UDP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum); + //printf("Read msg from udp:%s ... %s\n", buffer, buffer+iDataNum-16); - printf("Client ip is %s, Server port is %d\n", inet_ntoa(clientAddr.sin_addr), port); - buffer[iDataNum] = '\0'; - //printf("Read msg from udp:%s\n", buffer); - if (strcmp(buffer, "quit") == 0) break; - buffer[0] = '\0'; - - sprintf(buffer, "ack port_%d by udp", port); - //printf("Send ack msg by udp:%s\n", buffer); - - sendto(serverSocket, buffer, strlen(buffer), 0, (struct sockaddr *)&clientAddr, (int)sin_size); - //printf("=================================\n"); + sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int)sin_size); } } @@ -196,8 +200,13 @@ static void *bindUdpPort(void *sarg) { int main(int argc, char *argv[]) { - SArguments arguments = {"127.0.0.1", 6030, 6060}; + SArguments arguments = {"127.0.0.1", 6030, 6060, 1000}; argp_parse(&argp, argc, argv, 0, 0, &arguments); + if (arguments.pktLen > MAX_PKG_LEN) { + printf("test pkg len overflow: %d, max len not greater than %d bytes\n", arguments.pktLen, MAX_PKG_LEN); + exit(0); + } + int port = arguments.port; int num = arguments.max_port - arguments.port + 1; @@ -212,6 +221,7 @@ int main(int argc, char *argv[]) { for (size_t i = 0; i < num; i++) { info_s *tcpInfo = tinfos + i; tcpInfo->port = port + i; + tcpInfo->pktLen = arguments.pktLen; if (pthread_create(pids + i, NULL, bindTcpPort, tcpInfo) != 0) { From c755056d75a69c48a3148260b99cf468570500e6 Mon Sep 17 00:00:00 2001 From: Hui Li Date: Thu, 6 Aug 2020 10:58:01 +0800 Subject: [PATCH 3/7] [TD-1076] --- src/kit/taosnetwork/client.c | 29 ++++++++++++++++++++++++++--- src/kit/taosnetwork/server.c | 4 ++-- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/kit/taosnetwork/client.c b/src/kit/taosnetwork/client.c index a4fc26f22f..26e292a1be 100644 --- a/src/kit/taosnetwork/client.c +++ b/src/kit/taosnetwork/client.c @@ -28,7 +28,7 @@ #include #include -#define MAX_PKG_LEN (16*1000) +#define MAX_PKG_LEN (64*1000) #define BUFFER_SIZE (MAX_PKG_LEN + 1024) typedef struct { @@ -107,9 +107,32 @@ int checkTcpPort(info_s *info) { send(clientSocket, sendbuf, info->pktLen, 0); - iDataNum = recv(clientSocket, recvbuf, BUFFER_SIZE, 0); + memset(recvbuf, 0, BUFFER_SIZE); + int nleft, nread; + char *ptr = recvbuf; + nleft = info->pktLen; + while (nleft > 0) { + nread = recv(clientSocket, ptr, BUFFER_SIZE, 0);; + + if (nread == 0) { + break; + } else if (nread < 0) { + if (errno == EINTR) { + continue; + } else { + printf("recv ack pkg from TCP port: %d fail:%s.\n", port, strerror(errno)); + close(clientSocket); + return -1; + } + } else { + nleft -= nread; + ptr += nread; + iDataNum += nread; + } + } + if (iDataNum < info->pktLen) { - printf("Read ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, port); + printf("recv ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, port); return -1; } //printf("Read ack pkg len:%d from tcp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8); diff --git a/src/kit/taosnetwork/server.c b/src/kit/taosnetwork/server.c index 18565edb3f..b18fbde46e 100644 --- a/src/kit/taosnetwork/server.c +++ b/src/kit/taosnetwork/server.c @@ -29,7 +29,7 @@ #include #include -#define MAX_PKG_LEN (16*1000) +#define MAX_PKG_LEN (64*1000) #define BUFFER_SIZE (MAX_PKG_LEN + 1024) typedef struct { @@ -119,7 +119,7 @@ static void *bindTcpPort(void *sarg) { char *ptr = buffer; nleft = pinfo->pktLen; while (nleft > 0) { - nread = recv(client, buffer, BUFFER_SIZE, 0); + nread = recv(client, ptr, BUFFER_SIZE, 0); if (nread == 0) { break; From 012c2730137289591d73ab03a2ce19e365086673 Mon Sep 17 00:00:00 2001 From: Hui Li Date: Thu, 6 Aug 2020 10:59:44 +0800 Subject: [PATCH 4/7] [TD-1076] --- src/kit/taosnetwork/client.c | 2 +- src/kit/taosnetwork/server.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/kit/taosnetwork/client.c b/src/kit/taosnetwork/client.c index 26e292a1be..9f51f55d93 100644 --- a/src/kit/taosnetwork/client.c +++ b/src/kit/taosnetwork/client.c @@ -48,7 +48,7 @@ static struct argp_option options[] = { {0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0}, {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6030.", 1}, {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6060.", 2}, - {0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 16k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}}; + {0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 64k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}}; static error_t parse_opt(int key, char *arg, struct argp_state *state) { diff --git a/src/kit/taosnetwork/server.c b/src/kit/taosnetwork/server.c index b18fbde46e..403faec91b 100644 --- a/src/kit/taosnetwork/server.c +++ b/src/kit/taosnetwork/server.c @@ -47,8 +47,8 @@ typedef struct Arguments { static struct argp_option options[] = { {0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0}, {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6041.", 1}, - {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6050.", 2}, - {0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 16k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}}; + {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6060.", 2}, + {0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 64k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}}; static error_t parse_opt(int key, char *arg, struct argp_state *state) { From 25998a7d92a521322dc9b384ba749d0ad6b8cd00 Mon Sep 17 00:00:00 2001 From: Hui Li Date: Thu, 6 Aug 2020 11:11:44 +0800 Subject: [PATCH 5/7] [TD-1076] --- src/kit/taosnetwork/client.c | 4 ++-- src/kit/taosnetwork/server.c | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/kit/taosnetwork/client.c b/src/kit/taosnetwork/client.c index 9f51f55d93..4b72c5c859 100644 --- a/src/kit/taosnetwork/client.c +++ b/src/kit/taosnetwork/client.c @@ -66,9 +66,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'l': arguments->pktLen = atoi(arg); break; + default: - printf("unknow parameter!\n"); - break; + return ARGP_ERR_UNKNOWN; } return 0; } diff --git a/src/kit/taosnetwork/server.c b/src/kit/taosnetwork/server.c index 403faec91b..1c3bc6fa09 100644 --- a/src/kit/taosnetwork/server.c +++ b/src/kit/taosnetwork/server.c @@ -66,8 +66,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'l': arguments->pktLen = atoi(arg); break; + default: - break; + return ARGP_ERR_UNKNOWN; } return 0; } From 9ecfffd9b215101936418545d28446ce45c2ef91 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Thu, 6 Aug 2020 11:23:40 +0800 Subject: [PATCH 6/7] TD-1082: fix jdbc failed tests --- .../test/java/com/taosdata/jdbc/AsyncSubscribeTest.java | 3 ++- .../src/test/java/com/taosdata/jdbc/ResultSetTest.java | 9 ++++----- .../src/test/java/com/taosdata/jdbc/SubscribeTest.java | 3 ++- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/AsyncSubscribeTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/AsyncSubscribeTest.java index 7f2314d295..c14624e683 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/AsyncSubscribeTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/AsyncSubscribeTest.java @@ -42,7 +42,8 @@ public class AsyncSubscribeTest extends BaseTest { long ts = System.currentTimeMillis(); for (int i = 0; i < 2; i++) { ts += i; - statement.executeUpdate("insert into \" + dbName + \".\" + tName + \" values (" + ts + ", " + (100 + i) + ", " + i + ")"); + String sql = "insert into " + dbName + "." + tName + " values (" + ts + ", " + (100 + i) + ", " + i + ")"; + statement.executeUpdate(sql); } } diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/ResultSetTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/ResultSetTest.java index 5d86840ec3..a0b9c051c6 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/ResultSetTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/ResultSetTest.java @@ -71,7 +71,7 @@ public class ResultSetTest extends BaseTest { } try { - statement.executeQuery("select * from " + dbName + "." + tName); + statement.executeQuery("select * from " + dbName + "." + tName + " where ts = " + ts); resSet = statement.getResultSet(); System.out.println(((TSDBResultSet) resSet).getRowData()); while (resSet.next()) { @@ -806,9 +806,9 @@ public class ResultSetTest extends BaseTest { @Test public void testBatch() throws SQLException { - String[] sqls = new String[]{"insert into test.t0 values (1496732686001,2147483600,1496732687000,3.1415925,3.1415926\n" + - "535897,\"涛思数据,强~!\",12,12,\"TDengine is powerful\")", "insert into test.t0 values (1496732686002,2147483600,1496732687000,3.1415925,3.1415926\n" + - "535897,\"涛思数据,强~!\",12,12,\"TDengine is powerful\")"}; + String[] sqls = new String[]{"insert into test.t0 values (1496732686001,2147483600,1496732687000,3.1415925,3.1415926535897," + + "'涛思数据,强~',12,0,'TDengine is powerful')", "insert into test.t0 values (1496732686002,2147483600,1496732687000,3.1415925,3.1415926535897," + + "'涛思数据,强~',12,1,'TDengine is powerful')"}; for (String sql : sqls) { statement.addBatch(sql); } @@ -816,7 +816,6 @@ public class ResultSetTest extends BaseTest { assertEquals(res.length, 2); statement.clearBatch(); } - @AfterClass public static void close() throws Exception { statement.executeUpdate("drop database " + dbName); diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java index 58e93fbc7f..d7f56ac468 100644 --- a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/SubscribeTest.java @@ -41,7 +41,8 @@ public class SubscribeTest extends BaseTest { long ts = System.currentTimeMillis(); for (int i = 0; i < 2; i++) { ts += i; - statement.executeUpdate("insert into \" + dbName + \".\" + tName + \" values (" + ts + ", " + (100 + i) + ", " + i + ")"); + String sql = "insert into " + dbName + "." + tName + " values (" + ts + ", " + (100 + i) + ", " + i + ")"; + statement.executeUpdate(sql); } } From 0aecda2347b79ea434cdf366d6c246c37fac1e7b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 6 Aug 2020 12:50:45 +0800 Subject: [PATCH 7/7] revert code --- src/client/src/tscServer.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a040b3ef55..3fd0aa79a6 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1999,8 +1999,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) { return 0; } -int tscProcessDropDbRsp(SSqlObj *pSql) { - pSql->pTscObj->db[0] = 0; +int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) { taosCacheEmpty(tscCacheHandle); return 0; }