This commit is contained in:
Shengliang Guan 2020-12-02 17:50:44 +08:00
parent 98e3defba8
commit 8b4e85dd09
14 changed files with 343 additions and 326 deletions

30
src/dnode/inc/dnodeStep.h Normal file
View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_STEP_H
#define TDENGINE_DNODE_STEP_H
#ifdef __cplusplus
extern "C" {
#endif
void dnodeReportStep(char *name, char *desc, int8_t finished);
void dnodeSendStartupStep(SRpcMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -30,6 +30,7 @@
#include "dnodeVWrite.h"
#include "dnodeMPeer.h"
#include "dnodeMInfos.h"
#include "dnodeStep.h"
static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *);
@ -56,6 +57,8 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));

View File

@ -29,6 +29,7 @@
#include "dnodeMRead.h"
#include "dnodeMWrite.h"
#include "dnodeShell.h"
#include "dnodeStep.h"
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *);
@ -74,6 +75,8 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
if (numOfThreads < 1) {
@ -142,7 +145,23 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
}
static int32_t dnodeAuthNettestUser(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, "nettestinternal") == 0) {
char pass[32] = {0};
taosEncryptPass((uint8_t *)user, strlen(user), pass);
*spi = 0;
*encrypt = 0;
*ckey = 0;
memcpy(secret, pass, TSDB_KEY_LEN);
dTrace("nettest user is authorized");
return 0;
}
return -1;
}
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (dnodeAuthNettestUser(user, spi, encrypt, secret, ckey) == 0) return 0;
int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_APP_NOT_READY) return code;

45
src/dnode/src/dnodeStep.c Normal file
View File

@ -0,0 +1,45 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "dnodeInt.h"
static SStartupStep tsStartupStep;
void dnodeReportStep(char *name, char *desc, int8_t finished) {
tstrncpy(tsStartupStep.name, name, sizeof(tsStartupStep.name));
tstrncpy(tsStartupStep.desc, desc, sizeof(tsStartupStep.desc));
tsStartupStep.finished = finished;
}
void dnodeSendStartupStep(SRpcMsg *pMsg) {
dInfo("nettest msg is received, cont:%s", (char *)pMsg->pCont);
SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
#if 1
memcpy(pStep, &tsStartupStep, sizeof(SStartupStep));
#else
static int32_t step = 0;
sprintf(pStep->name, "module:%d", step++);
sprintf(pStep->desc, "step:%d", step++);
if (step > 10) pStep->finished = 1;
#endif
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
}

View File

@ -71,6 +71,8 @@ void dnodeDelayReprocessMWriteMsg(void *pMsg);
void dnodeSendStatusMsgToMnode();
void dnodeReportStep(char *name, char *desc, int8_t finished);
#ifdef __cplusplus
}
#endif

View File

@ -286,6 +286,9 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_SHOW_SQL_LEN 512
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128
#define TSDB_MQTT_HOSTNAME_LEN 64
#define TSDB_MQTT_PORT_LEN 8
#define TSDB_MQTT_USER_LEN 24

View File

@ -105,10 +105,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "network-test" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
#ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 105
@ -838,6 +835,14 @@ typedef struct {
char ckey[TSDB_KEY_LEN];
} SAuthMsg, SAuthRsp;
typedef struct {
int8_t finished;
int8_t reserved1[7];
char name[TSDB_STEP_NAME_LEN];
char desc[TSDB_STEP_DESC_LEN];
char reserved2[64];
} SStartupStep;
#pragma pack(pop)
#ifdef __cplusplus

View File

@ -51,7 +51,6 @@ typedef struct SShellArguments {
char* commands;
int abort;
int port;
int endPort;
int pktLen;
char* netTestRole;
} SShellArguments;

View File

@ -32,14 +32,14 @@
/**************** Global variables ****************/
#ifdef _TD_POWER_
char CLIENT_VERSION[] = "Welcome to the PowerDB shell from %s, Client Version:%s\n"
"Copyright (c) 2017 by PowerDB, Inc. All rights reserved.\n\n";
"Copyright (c) 2020 by PowerDB, Inc. All rights reserved.\n\n";
char PROMPT_HEADER[] = "power> ";
char CONTINUE_PROMPT[] = " -> ";
int prompt_size = 7;
#else
char CLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
"Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n";
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
char PROMPT_HEADER[] = "taos> ";
char CONTINUE_PROMPT[] = " -> ";

View File

@ -46,8 +46,7 @@ static struct argp_option options[] = {
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, options: client|clients|server."},
{"endport", 'e', "ENDPORT", 0, "Net test end port, default is 6042."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is NULL, options: client|server|rpc|startup."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{0}};
@ -130,20 +129,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case 'd':
arguments->database = arg;
break;
case 'n':
arguments->netTestRole = arg;
break;
case 'e':
if (arg) {
arguments->endPort = atoi(arg);
} else {
fprintf(stderr, "Invalid end port\n");
return -1;
}
break;
case 'l':
if (arg) {
arguments->pktLen = atoi(arg);
@ -152,7 +140,6 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
return -1;
}
break;
case OPT_ABORT:
arguments->abort = 1;
break;

View File

@ -61,8 +61,7 @@ SShellArguments args = {
.file = "\0",
.dir = "\0",
.threadNum = 5,
.commands = NULL,
.endPort = 6042,
.commands = NULL,
.pktLen = 1000,
.netTestRole = NULL
};
@ -81,9 +80,7 @@ int main(int argc, char* argv[]) {
if (args.netTestRole && args.netTestRole[0] != 0) {
taos_init();
CmdArguments cmdArgs;
memcpy(&cmdArgs, &args, sizeof(SShellArguments));
taosNetTest(&cmdArgs);
taosNetTest(args.netTestRole, args.host, args.port, args.pktLen);
exit(0);
}

View File

@ -1086,13 +1086,6 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
}
} else { // msg is passed to app only parsing is ok
if (pHead->msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
rpcSendQuickRsp(pConn, TSDB_CODE_SUCCESS);
rpcFreeMsg(pRecv->msg);
return pConn;
}
rpcProcessIncomingMsg(pConn, pHead, pContext);
}
}

View File

@ -20,27 +20,7 @@
extern "C" {
#endif
typedef struct CmdArguments {
char* host;
char* password;
char* user;
char* auth;
char* database;
char* timezone;
bool is_raw_time;
bool is_use_passwd;
char file[TSDB_FILENAME_LEN];
char dir[TSDB_FILENAME_LEN];
int threadNum;
char* commands;
int abort;
int port;
int endPort;
int pktLen;
char* netTestRole;
} CmdArguments;
void taosNetTest(CmdArguments* args);
void taosNetTest(char *role, char *host, int port, int pkgLen);
#ifdef __cplusplus
}

View File

@ -13,50 +13,42 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tulog.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tsocket.h"
#include "trpc.h"
#include "rpcHead.h"
#include "tutil.h"
#include "tnettest.h"
#define MAX_PKG_LEN (64*1000)
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
#define MAX_PKG_LEN (64 * 1000)
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
extern int32_t tsRpcMaxUdpSize;
typedef struct {
char * hostFqdn;
uint32_t hostIp;
uint16_t port;
uint16_t pktLen;
} info_s;
int32_t port;
int32_t pktLen;
} STestInfo;
extern int tsRpcMaxUdpSize;
static char g_user[TSDB_USER_LEN+1] = {0};
static char g_pass[TSDB_PASSWORD_LEN+1] = {0};
static char g_serverFqdn[TSDB_FQDN_LEN] = {0};
static uint16_t g_startPort = 0;
static uint16_t g_endPort = 6042;
static uint32_t g_pktLen = 0;
static void *bindUdpPort(void *sarg) {
info_s *pinfo = (info_s *)sarg;
int port = pinfo->port;
SOCKET serverSocket;
static void *taosNetBindUdpPort(void *sarg) {
STestInfo *pinfo = (STestInfo *)sarg;
int32_t port = pinfo->port;
SOCKET serverSocket;
char buffer[BUFFER_SIZE];
int32_t iDataNum;
socklen_t sin_size;
struct sockaddr_in server_addr;
struct sockaddr_in clientAddr;
char buffer[BUFFER_SIZE];
int iDataNum;
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
perror("socket");
uError("failed to create udp socket since %s", strerror(errno));
return NULL;
}
@ -66,28 +58,25 @@ static void *bindUdpPort(void *sarg) {
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("connect");
uError("failed to bind udp port:%d since %s", port, strerror(errno));
return NULL;
}
socklen_t sin_size;
uInfo("udp server at port:%d is listening", port);
while (1) {
memset(buffer, 0, BUFFER_SIZE);
sin_size = sizeof(*(struct sockaddr *)&server_addr);
iDataNum = recvfrom(serverSocket, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&clientAddr, &sin_size);
if (iDataNum < 0) {
perror("recvfrom null");
uDebug("failed to perform recvfrom func at %d since %s", port, strerror(errno));
continue;
}
if (iDataNum > 0) {
printf("recv Client: %s pkg from UDP port: %d, pkg len: %d\n", taosInetNtoa(clientAddr.sin_addr), port, iDataNum);
//printf("Read msg from udp:%s ... %s\n", buffer, buffer+iDataNum-16);
sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int)sin_size);
if (iDataNum > 0) {
uInfo("UDP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
sendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size);
}
}
@ -95,20 +84,20 @@ static void *bindUdpPort(void *sarg) {
return NULL;
}
static void *bindTcpPort(void *sarg) {
info_s *pinfo = (info_s *)sarg;
int port = pinfo->port;
SOCKET serverSocket;
static void *taosNetBindTcpPort(void *sarg) {
struct sockaddr_in server_addr;
struct sockaddr_in clientAddr;
int addr_len = sizeof(clientAddr);
SOCKET client;
char buffer[BUFFER_SIZE];
int iDataNum = 0;
STestInfo *pinfo = sarg;
int32_t port = pinfo->port;
SOCKET serverSocket;
int32_t addr_len = sizeof(clientAddr);
SOCKET client;
char buffer[BUFFER_SIZE];
int32_t iDataNum = 0;
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
printf("socket() fail: %s", strerror(errno));
uError("failed to create tcp socket since %s", strerror(errno));
return NULL;
}
@ -118,28 +107,30 @@ static void *bindTcpPort(void *sarg) {
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
printf("port:%d bind() fail: %s", port, strerror(errno));
uError("failed to bind tcp port:%d since %s", port, strerror(errno));
return NULL;
}
if (listen(serverSocket, 5) < 0) {
printf("listen() fail: %s", strerror(errno));
uError("failed to listen tcp port:%d since %s", port, strerror(errno));
return NULL;
}
//printf("Bind port: %d success\n", port);
uInfo("tcp server at port:%d is listening", port);
while (1) {
client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len);
if (client < 0) {
printf("accept() fail: %s", strerror(errno));
uDebug("failed to accept from tcp port:%d since %s", port, strerror(errno));
continue;
}
iDataNum = 0;
memset(buffer, 0, BUFFER_SIZE);
int nleft, nread;
char *ptr = buffer;
int32_t nleft, nread;
char * ptr = buffer;
nleft = pinfo->pktLen;
while (nleft > 0) {
nread = recv(client, ptr, BUFFER_SIZE, 0);
@ -149,7 +140,7 @@ static void *bindTcpPort(void *sarg) {
if (errno == EINTR) {
continue;
} else {
printf("recv Client: %s pkg from TCP port: %d fail:%s.\n", taosInetNtoa(clientAddr.sin_addr), port, strerror(errno));
uError("failed to perform recv func at %d since %s", port, strerror(errno));
taosCloseSocket(serverSocket);
return NULL;
}
@ -157,11 +148,11 @@ static void *bindTcpPort(void *sarg) {
nleft -= nread;
ptr += nread;
iDataNum += nread;
}
}
}
printf("recv Client: %s pkg from TCP port: %d, pkg len: %d\n", taosInetNtoa(clientAddr.sin_addr), port, iDataNum);
if (iDataNum > 0) {
uInfo("TCP: recv:%d bytes from %s:%d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port);
send(client, buffer, iDataNum, 0);
}
}
@ -170,39 +161,38 @@ static void *bindTcpPort(void *sarg) {
return NULL;
}
static int checkTcpPort(info_s *info) {
static int32_t taosNetCheckTcpPort(STestInfo *info) {
SOCKET clientSocket;
char sendbuf[BUFFER_SIZE];
char recvbuf[BUFFER_SIZE];
int32_t iDataNum = 0;
struct sockaddr_in serverAddr;
SOCKET clientSocket;
char sendbuf[BUFFER_SIZE];
char recvbuf[BUFFER_SIZE];
int iDataNum = 0;
if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
printf("socket() fail: %s\n", strerror(errno));
uError("failed to create tcp client socket since %s", strerror(errno));
return -1;
}
// set send and recv overtime
struct timeval timeout;
timeout.tv_sec = 2; //s
timeout.tv_usec = 0; //us
if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt send timer failed:");
struct timeval timeout;
timeout.tv_sec = 2; // s
timeout.tv_usec = 0; // us
if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
uError("failed to setsockopt send timer since %s", strerror(errno));
}
if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt recv timer failed:");
if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
uError("failed to setsockopt recv timer since %s", strerror(errno));
}
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(info->port);
serverAddr.sin_addr.s_addr = info->hostIp;
//printf("=================================\n");
if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
printf("connect() fail: %s\t", strerror(errno));
uError("failed to connect port:%d since %s", info->port, strerror(errno));
return -1;
}
//printf("Connect to: %s:%d...success\n", host, port);
memset(sendbuf, 0, BUFFER_SIZE);
memset(recvbuf, 0, BUFFER_SIZE);
@ -214,9 +204,10 @@ static int checkTcpPort(info_s *info) {
send(clientSocket, sendbuf, info->pktLen, 0);
memset(recvbuf, 0, BUFFER_SIZE);
int nleft, nread;
char *ptr = recvbuf;
int32_t nleft, nread;
char * ptr = recvbuf;
nleft = info->pktLen;
while (nleft > 0) {
nread = recv(clientSocket, ptr, BUFFER_SIZE, 0);;
@ -226,7 +217,7 @@ static int checkTcpPort(info_s *info) {
if (errno == EINTR) {
continue;
} else {
printf("recv ack pkg from TCP port: %d fail:%s.\n", info->port, strerror(errno));
uError("faild to recv pkg from TCP port:%d since %s", info->port, strerror(errno));
taosCloseSocket(clientSocket);
return -1;
}
@ -234,45 +225,46 @@ static int checkTcpPort(info_s *info) {
nleft -= nread;
ptr += nread;
iDataNum += nread;
}
}
}
if (iDataNum < info->pktLen) {
printf("recv ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, info->port);
uError("TCP: received ack:%d bytes, less than send:%d bytes from port:%d", iDataNum, info->pktLen, info->port);
return -1;
}
//printf("Read ack pkg len:%d from tcp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8);
taosCloseSocket(clientSocket);
return 0;
}
static int checkUdpPort(info_s *info) {
static int32_t taosNetCheckUdpPort(STestInfo *info) {
SOCKET clientSocket;
char sendbuf[BUFFER_SIZE];
char recvbuf[BUFFER_SIZE];
int32_t iDataNum = 0;
struct sockaddr_in serverAddr;
SOCKET clientSocket;
char sendbuf[BUFFER_SIZE];
char recvbuf[BUFFER_SIZE];
int iDataNum = 0;
if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
perror("socket");
uError("failed to create udp client socket since %s", strerror(errno));
return -1;
}
// set overtime
// set overtime
struct timeval timeout;
timeout.tv_sec = 2; //s
timeout.tv_usec = 0; //us
if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt send timer failed:");
timeout.tv_sec = 2; // s
timeout.tv_usec = 0; // us
if (setsockopt(clientSocket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
uError("failed to setsockopt send timer since %s", strerror(errno));
}
if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
perror("setsockopt recv timer failed:");
if (setsockopt(clientSocket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) {
uError("failed to setsockopt recv timer since %s", strerror(errno));
}
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(info->port);
serverAddr.sin_addr.s_addr = info->hostIp;
memset(sendbuf, 0, BUFFER_SIZE);
memset(recvbuf, 0, BUFFER_SIZE);
@ -283,69 +275,66 @@ static int checkUdpPort(info_s *info) {
socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr);
int code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int)sin_size);
int32_t code = sendto(clientSocket, sendbuf, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size);
if (code < 0) {
perror("sendto");
uError("failed to perform sendto func since %s", strerror(errno));
return -1;
}
iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size);
if (iDataNum < info->pktLen) {
printf("Read ack pkg len: %d, less than req pkg len: %d from udp port: %d\t\t", iDataNum, info->pktLen, info->port);
uError("UDP: received ack:%d bytes, less than send:%d bytes from port:%d", iDataNum, info->pktLen, info->port);
return -1;
}
//printf("Read ack pkg len:%d from udp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8);
taosCloseSocket(clientSocket);
return 0;
}
static void checkPort(uint32_t hostIp, uint16_t startPort, uint16_t maxPort, uint16_t pktLen) {
int ret;
info_s info;
memset(&info, 0, sizeof(info_s));
info.hostIp = hostIp;
info.pktLen = pktLen;
static void taosNetCheckPort(uint32_t hostIp, int32_t startPort, int32_t endPort, int32_t pktLen) {
int32_t ret;
STestInfo info;
for (uint16_t port = startPort; port <= maxPort; port++) {
//printf("test: %s:%d\n", info.host, port);
printf("\n");
memset(&info, 0, sizeof(STestInfo));
info.hostIp = hostIp;
info.pktLen = pktLen;
for (int32_t port = startPort; port <= endPort; port++) {
info.port = port;
ret = checkTcpPort(&info);
ret = taosNetCheckTcpPort(&info);
if (ret != 0) {
printf("tcp port:%d test fail.\t\n", port);
uError("failed to test tcp port:%d", port);
} else {
printf("tcp port:%d test ok.\t\t", port);
uInfo("successed to test tcp port:%d", port);
}
ret = checkUdpPort(&info);
ret = taosNetCheckUdpPort(&info);
if (ret != 0) {
printf("udp port:%d test fail.\t\n", port);
uError("failed to test udp port:%d", port);
} else {
printf("udp port:%d test ok.\t\t", port);
uInfo("successed to test udp port:%d", port);
}
}
printf("\n");
return ;
return;
}
void* tnetInitRpc(char* secretEncrypt, char spi) {
void *taosNetInitRpc(char *secretEncrypt, char spi) {
SRpcInit rpcInit;
void* pRpcConn = NULL;
void * pRpcConn = NULL;
char user[] = "nettestinternal";
char pass[] = "nettestinternal";
taosEncryptPass((uint8_t *)pass, strlen(pass), secretEncrypt);
taosEncryptPass((uint8_t *)g_pass, strlen(g_pass), secretEncrypt);
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "NET-TEST";
rpcInit.label = "NT";
rpcInit.numOfThreads = 1; // every DB connection has only one thread
rpcInit.cfp = NULL;
rpcInit.sessions = 16;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = g_user;
rpcInit.user = user;
rpcInit.idleTime = 2000;
rpcInit.ckey = "key";
rpcInit.spi = spi;
@ -355,16 +344,17 @@ void* tnetInitRpc(char* secretEncrypt, char spi) {
return pRpcConn;
}
static int rpcCheckPortImpl(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi) {
static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi, SStartupStep *pStep) {
SRpcEpSet epSet;
SRpcMsg reqMsg;
SRpcMsg rspMsg;
void* pRpcConn;
void * pRpcConn;
char secretEncrypt[32] = {0};
pRpcConn = tnetInitRpc(secretEncrypt, spi);
pRpcConn = taosNetInitRpc(secretEncrypt, spi);
if (NULL == pRpcConn) {
uError("failed to init client rpc");
return -1;
}
@ -373,205 +363,169 @@ static int rpcCheckPortImpl(const char* serverFqdn, uint16_t port, uint16_t pktL
epSet.numOfEps = 1;
epSet.port[0] = port;
strcpy(epSet.fqdn[0], serverFqdn);
reqMsg.msgType = TSDB_MSG_TYPE_NETWORK_TEST;
reqMsg.pCont = rpcMallocCont(pktLen);
reqMsg.contLen = pktLen;
reqMsg.code = 0;
reqMsg.handle = NULL; // rpc handle returned to app
reqMsg.ahandle = NULL; // app handle set by client
reqMsg.ahandle = NULL; // app handle set by client
strcpy(reqMsg.pCont, "nettest");
rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);
// handle response
if ((rspMsg.code != 0) || (rspMsg.msgType != TSDB_MSG_TYPE_NETWORK_TEST + 1)) {
//printf("code:%d[%s]\n", rspMsg.code, tstrerror(rspMsg.code));
uDebug("ret code 0x%x %s", rspMsg.code, tstrerror(rspMsg.code));
return -1;
}
int32_t code = 0;
if (pStep != NULL && rspMsg.pCont != NULL && rspMsg.contLen > 0 && rspMsg.contLen <= sizeof(SStartupStep)) {
memcpy(pStep, rspMsg.pCont, rspMsg.contLen);
code = 1;
}
rpcFreeCont(rspMsg.pCont);
rpcClose(pRpcConn);
return 0;
return code;
}
static void rpcCheckPort(uint32_t hostIp) {
int ret;
char spi;
static int32_t taosNetParseStartup(SStartupStep *pCont) {
SStartupStep *pStep = pCont;
uInfo("step:%s desc:%s", pStep->name, pStep->desc);
for (uint16_t port = g_startPort; port <= g_endPort; port++) {
//printf("test: %s:%d\n", info.host, port);
printf("\n");
if (pStep->finished) {
uInfo("check startup finished");
}
//================ check tcp port ================
int32_t pktLen;
if (g_pktLen <= tsRpcMaxUdpSize) {
pktLen = tsRpcMaxUdpSize + 1000;
} else {
pktLen = g_pktLen;
return pStep->finished ? 0 : 1;
}
static void taosNetTestStartup(char *host, int32_t port) {
uInfo("check startup, host:%s port:%d\n", host, port);
SStartupStep *pStep = malloc(sizeof(SStartupStep));
while (1) {
int32_t code = taosNetCheckRpc(host, port, 20, 0, pStep);
if (code > 0) {
code = taosNetParseStartup(pStep);
}
spi = 1;
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
if (ret != 0) {
spi = 0;
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
if (ret != 0) {
printf("TCP port:%d test fail.\t\t", port);
} else {
//printf("tcp port:%d test ok.\t\t", port);
printf("TCP port:\033[32m%d test OK\033[0m\t\t", port);
}
if (code > 0) {
uDebug("continue check startup step");
} else {
//printf("tcp port:%d test ok.\t\t", port);
printf("TCP port:\033[32m%d test OK\033[0m\t\t", port);
break;
}
taosMsleep(500);
}
//================ check udp port ================
if (g_pktLen >= tsRpcMaxUdpSize) {
pktLen = tsRpcMaxUdpSize - 1000;
} else {
pktLen = g_pktLen;
}
free(pStep);
}
static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
int32_t endPort = startPort + 9;
char spi = 0;
uInfo("check rpc, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);
spi = 0;
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
if (ret != 0) {
spi = 1;
ret = rpcCheckPortImpl(g_serverFqdn, port, pktLen, spi);
if (ret != 0) {
printf("udp port:%d test fail.\t\n", port);
} else {
//printf("udp port:%d test ok.\t\n", port);
printf("UDP port:\033[32m%d test OK\033[0m\t\n", port);
}
for (uint16_t port = startPort; port <= endPort; port++) {
int32_t sendpkgLen;
if (pkgLen <= tsRpcMaxUdpSize) {
sendpkgLen = tsRpcMaxUdpSize + 1000;
} else {
//printf("udp port:%d test ok.\t\n", port);
printf("UDP port:\033[32m%d test OK\033[0m\t\n", port);
sendpkgLen = pkgLen;
}
int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL);
if (ret < 0) {
uError("failed to test tcp port:%d", port);
} else {
uInfo("successed to test tcp port:%d", port);
}
if (pkgLen >= tsRpcMaxUdpSize) {
sendpkgLen = tsRpcMaxUdpSize - 1000;
} else {
sendpkgLen = pkgLen;
}
ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL);
if (ret < 0) {
uError("failed to test udp port:%d", port);
} else {
uInfo("successed to test udp port:%d", port);
}
}
printf("\n");
return ;
}
static void taosNetTestClient(int flag) {
uint32_t serverIp = taosGetIpFromFqdn(g_serverFqdn);
static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
int32_t endPort = startPort + 11;
uInfo("work as client, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);
uint32_t serverIp = taosGetIpFromFqdn(host);
if (serverIp == 0xFFFFFFFF) {
printf("Failed to resolve FQDN:%s", g_serverFqdn);
uError("failed to resolve fqdn:%s", host);
exit(-1);
}
if (0 == flag) {
checkPort(serverIp, g_startPort, g_endPort, g_pktLen);
} else {
rpcCheckPort(serverIp);
}
return;
uInfo("server ip:%s is resolved from host:%s", taosIpStr(serverIp), host);
taosNetCheckPort(serverIp, startPort, endPort, pkgLen);
}
static void taosNetTestServer(uint16_t startPort, uint16_t endPort, int pktLen) {
static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
int32_t endPort = startPort + 11;
uInfo("work as server, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);
int port = startPort;
int num = endPort - startPort + 1;
int32_t port = startPort;
int32_t num = endPort - startPort + 1;
if (num < 0) num = 1;
if (num < 0) {
num = 1;
}
pthread_t *pids = malloc(2 * num * sizeof(pthread_t));
info_s * tinfos = malloc(num * sizeof(info_s));
info_s * uinfos = malloc(num * sizeof(info_s));
STestInfo *tinfos = malloc(num * sizeof(STestInfo));
STestInfo *uinfos = malloc(num * sizeof(STestInfo));
for (size_t i = 0; i < num; i++) {
info_s *tcpInfo = tinfos + i;
tcpInfo->port = (uint16_t)(port + i);
tcpInfo->pktLen = pktLen;
for (int32_t i = 0; i < num; i++) {
STestInfo *tcpInfo = tinfos + i;
tcpInfo->port = port + i;
tcpInfo->pktLen = pkgLen;
if (pthread_create(pids + i, NULL, bindTcpPort, tcpInfo) != 0)
{
printf("create thread fail, port:%d.\n", port);
if (pthread_create(pids + i, NULL, taosNetBindTcpPort, tcpInfo) != 0) {
uInfo("failed to create tcp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
exit(-1);
}
info_s *udpInfo = uinfos + i;
STestInfo *udpInfo = uinfos + i;
udpInfo->port = (uint16_t)(port + i);
if (pthread_create(pids + num + i, NULL, bindUdpPort, udpInfo) != 0)
{
printf("create thread fail, port:%d.\n", port);
if (pthread_create(pids + num + i, NULL, taosNetBindUdpPort, udpInfo) != 0) {
uInfo("failed to create udp test thread, %s:%d", tcpInfo->hostFqdn, tcpInfo->port);
exit(-1);
}
}
for (int i = 0; i < num; i++) {
for (int32_t i = 0; i < num; i++) {
pthread_join(pids[i], NULL);
pthread_join(pids[(num + i)], NULL);
}
}
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) {
tscEmbedded = 1;
if (host == NULL) host = tsLocalFqdn;
if (port == 0) port = tsServerPort;
if (pkgLen <= 10) pkgLen = 1000;
if (pkgLen > MAX_PKG_LEN) pkgLen = MAX_PKG_LEN;
void taosNetTest(CmdArguments *args) {
if (0 == args->pktLen) {
g_pktLen = 1000;
if (0 == strcmp("client", role)) {
taosNetTestClient(host, port, pkgLen);
} else if (0 == strcmp("server", role)) {
taosNetTestServer(host, port, pkgLen);
} else if (0 == strcmp("rpc", role)) {
taosNetTestRpc(host, port, pkgLen);
} else if (0 == strcmp("startup", role)) {
taosNetTestStartup(host, port);
} else {
g_pktLen = args->pktLen;
}
if (args->port && args->endPort) {
if (args->port > args->endPort) {
printf("endPort[%d] must not lesss port[%d]\n", args->endPort, args->port);
exit(-1);
}
}
if (args->host && args->host[0] != 0) {
if (strlen(args->host) >= TSDB_EP_LEN) {
printf("host invalid: %s\n", args->host);
exit(-1);
}
taosGetFqdnPortFromEp(args->host, g_serverFqdn, &g_startPort);
} else {
tstrncpy(g_serverFqdn, "127.0.0.1", TSDB_IPv4ADDR_LEN);
g_startPort = tsServerPort;
}
if (args->port) {
g_startPort = args->port;
}
if (args->endPort) {
g_endPort = args->endPort;
}
if (g_startPort > g_endPort) {
printf("endPort[%d] must not lesss port[%d]\n", g_endPort, g_startPort);
exit(-1);
taosNetTestStartup(host, port);
}
if (args->is_use_passwd) {
if (args->password == NULL) args->password = getpass("Enter password: ");
} else {
args->password = TSDB_DEFAULT_PASS;
}
tstrncpy(g_pass, args->password, TSDB_PASSWORD_LEN);
if (args->user == NULL) {
args->user = TSDB_DEFAULT_USER;
}
tstrncpy(g_user, args->user, TSDB_USER_LEN);
if (0 == strcmp("client", args->netTestRole)) {
printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", g_serverFqdn, g_startPort, g_endPort, g_pktLen);
taosNetTestClient(0);
} else if (0 == strcmp("clients", args->netTestRole)) {
printf("host: %s\tstart port: %d\tend port: %d\tpacket len: %d\n", g_serverFqdn, g_startPort, g_endPort, g_pktLen);
taosNetTestClient(1);
} else if (0 == strcmp("server", args->netTestRole)) {
taosNetTestServer(g_startPort, g_endPort, g_pktLen);
}
tscEmbedded = 0;
}