diff --git a/include/common/taosdef.h b/include/common/taosdef.h index f252143fa6..29d711d6d6 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -75,6 +75,8 @@ typedef enum { extern char *qtypeStr[]; +#define TSDB_PORT_DNODEDNODE 5 +#define TSDB_PORT_SYNC 10 #define TSDB_PORT_HTTP 11 #ifdef __cplusplus diff --git a/source/libs/transport/inc/rpcHead.h b/include/libs/transport/rpcHead.h similarity index 100% rename from source/libs/transport/inc/rpcHead.h rename to include/libs/transport/rpcHead.h diff --git a/include/os/osSocket.h b/include/os/osSocket.h index 3c8cc8abd1..7af8dd37bf 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -51,10 +51,31 @@ extern "C" { #endif -#if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) +#if defined(WINDOWS) #define htobe64 htonll #endif +#if defined(WINDOWS) + #define TAOS_EPOLL_WAIT_TIME 100 + typedef SOCKET eventfd_t; + #define eventfd(a, b) -1 + typedef SOCKET EpollFd; + #define EpollClose(pollFd) epoll_close(pollFd) + #ifndef EPOLLWAKEUP + #define EPOLLWAKEUP (1u << 29) + #endif +#elif defined(_TD_DARWIN_64) + #define TAOS_EPOLL_WAIT_TIME 500 + typedef int32_t SOCKET; + typedef SOCKET EpollFd; + #define EpollClose(pollFd) epoll_close(pollFd) +#else + #define TAOS_EPOLL_WAIT_TIME 500 + typedef int32_t SOCKET; + typedef SOCKET EpollFd; + #define EpollClose(pollFd) taosCloseSocket(pollFd) +#endif + #if defined(_TD_DARWIN_64) // #define htobe64 htonll @@ -64,12 +85,12 @@ extern "C" { # define htole16(x) OSSwapHostToLittleInt16(x) # define be16toh(x) OSSwapBigToHostInt16(x) # define le16toh(x) OSSwapLittleToHostInt16(x) - + # define htobe32(x) OSSwapHostToBigInt32(x) # define htole32(x) OSSwapHostToLittleInt32(x) # define be32toh(x) OSSwapBigToHostInt32(x) # define le32toh(x) OSSwapLittleToHostInt32(x) - + # define htobe64(x) OSSwapHostToBigInt64(x) # define htole64(x) OSSwapHostToLittleInt64(x) # define be64toh(x) OSSwapBigToHostInt64(x) @@ -83,6 +104,17 @@ extern "C" { #define TAOS_EPOLL_WAIT_TIME 500 +typedef int32_t SocketFd; +typedef SocketFd EpollFd; + +typedef struct TdSocket { +#if SOCKET_WITH_LOCK + TdThreadRwlock rwlock; +#endif + int refId; + SocketFd fd; +} *TdSocketPtr, TdSocket; + typedef struct TdSocketServer *TdSocketServerPtr; typedef struct TdSocket *TdSocketPtr; typedef struct TdEpoll *TdEpollPtr; @@ -91,6 +123,7 @@ int32_t taosSendto(TdSocketPtr pSocket, void * msg, int len, unsigned int flags, int32_t taosWriteSocket(TdSocketPtr pSocket, void *msg, int len); int32_t taosReadSocket(TdSocketPtr pSocket, void *msg, int len); int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr, socklen_t *addrLen); +int32_t taosCloseSocketNoCheck1(SocketFd fd); int32_t taosCloseSocket(TdSocketPtr *ppSocket); int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer); int32_t taosShutDownSocketRD(TdSocketPtr pSocket); diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 0e6abb5785..f693ad74de 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -47,9 +47,6 @@ #endif #endif -typedef int32_t SocketFd; -typedef SocketFd EpollFd; - typedef struct TdSocketServer { #if SOCKET_WITH_LOCK TdThreadRwlock rwlock; @@ -58,14 +55,6 @@ typedef struct TdSocketServer { SocketFd fd; } *TdSocketServerPtr, TdSocketServer; -typedef struct TdSocket { -#if SOCKET_WITH_LOCK - TdThreadRwlock rwlock; -#endif - int refId; - SocketFd fd; -} *TdSocketPtr, TdSocket; - typedef struct TdEpoll { #if SOCKET_WITH_LOCK TdThreadRwlock rwlock; diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index dae0a1c840..f064833691 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -4,6 +4,7 @@ IF (TD_TAOS_TOOLS) INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include/common) INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include/util) INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include/os) + INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/include/libs/transport) ADD_SUBDIRECTORY(taos-tools) ENDIF () diff --git a/tools/shell/CMakeLists.txt b/tools/shell/CMakeLists.txt index b351675e47..284693795e 100644 --- a/tools/shell/CMakeLists.txt +++ b/tools/shell/CMakeLists.txt @@ -4,9 +4,7 @@ add_executable(shell ${SHELL_SRC}) target_link_libraries( shell PUBLIC taos - PUBLIC util - PUBLIC common - PUBLIC os + PRIVATE os common transport util ) target_include_directories( shell diff --git a/tools/shell/inc/syncMsg.h b/tools/shell/inc/syncMsg.h new file mode 100644 index 0000000000..85ac9c78af --- /dev/null +++ b/tools/shell/inc/syncMsg.h @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_SYNC_MSG_H +#define TDENGINE_SYNC_MSG_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "tsync.h" + +typedef enum { + TAOS_SMSG_START = 0, + TAOS_SMSG_SYNC_DATA = 1, + TAOS_SMSG_SYNC_DATA_RSP = 2, + TAOS_SMSG_SYNC_FWD = 3, + TAOS_SMSG_SYNC_FWD_RSP = 4, + TAOS_SMSG_SYNC_REQ = 5, + TAOS_SMSG_SYNC_REQ_RSP = 6, + TAOS_SMSG_SYNC_MUST = 7, + TAOS_SMSG_SYNC_MUST_RSP = 8, + TAOS_SMSG_STATUS = 9, + TAOS_SMSG_STATUS_RSP = 10, + TAOS_SMSG_SETUP = 11, + TAOS_SMSG_SETUP_RSP = 12, + TAOS_SMSG_SYNC_FILE = 13, + TAOS_SMSG_SYNC_FILE_RSP = 14, + TAOS_SMSG_TEST = 15, + TAOS_SMSG_END = 16 +} ESyncMsgType; + +typedef enum { + SYNC_STATUS_BROADCAST, + SYNC_STATUS_BROADCAST_RSP, + SYNC_STATUS_SETUP_CONN, + SYNC_STATUS_SETUP_CONN_RSP, + SYNC_STATUS_EXCHANGE_DATA, + SYNC_STATUS_EXCHANGE_DATA_RSP, + SYNC_STATUS_CHECK_ROLE, + SYNC_STATUS_CHECK_ROLE_RSP +} ESyncStatusType; + +#pragma pack(push, 1) + +typedef struct { + int8_t type; // msg type + int8_t protocol; // protocol version + uint16_t signature; // fixed value + int32_t code; // + int32_t cId; // cluster Id + int32_t vgId; // vg ID + int32_t len; // content length, does not include head + uint32_t cksum; +} SSyncHead; + +typedef struct { + SSyncHead head; + uint16_t port; + uint16_t tranId; + int32_t sourceId; // only for arbitrator + char fqdn[TSDB_FQDN_LEN]; +} SSyncMsg; + +typedef struct { + SSyncHead head; + int8_t sync; + int8_t reserved; + uint16_t tranId; + int8_t reserverd[4]; +} SSyncRsp; + +typedef struct { + int8_t role; + uint64_t version; +} SPeerStatus; + +typedef struct { + SSyncHead head; + int8_t role; + int8_t ack; + int8_t type; + int8_t reserved[3]; + uint16_t tranId; + uint64_t version; + SPeerStatus peersStatus[TAOS_SYNC_MAX_REPLICA]; +} SPeersStatus; + +typedef struct { + SSyncHead head; + uint64_t fversion; +} SFileVersion; + +typedef struct { + SSyncHead head; + int8_t ack; +} SFileAck; + +typedef struct { + SSyncHead head; + uint64_t version; + int32_t code; +} SFwdRsp; + +#pragma pack(pop) + +#define SYNC_PROTOCOL_VERSION 1 +#define SYNC_SIGNATURE ((uint16_t)(0xCDEF)) + +extern char *statusType[]; + +uint16_t syncGenTranId(); +int32_t syncCheckHead(SSyncHead *pHead); + +void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len); +void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code); +void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId); +void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId); +void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId); +void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId); +void syncBuildSyncTestMsg(SSyncMsg *pMsg, int32_t vgId); + +void syncBuildFileAck(SFileAck *pMsg, int32_t vgId); +void syncBuildFileVersion(SFileVersion *pMsg, int32_t vgId); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_VNODEPEER_H diff --git a/tools/shell/inc/tsync.h b/tools/shell/inc/tsync.h new file mode 100644 index 0000000000..d1b68e3f5a --- /dev/null +++ b/tools/shell/inc/tsync.h @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_SYNC_H +#define TDENGINE_SYNC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define TAOS_SYNC_MAX_REPLICA 5 +#define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF + +typedef enum { + TAOS_SYNC_ROLE_OFFLINE = 0, + TAOS_SYNC_ROLE_UNSYNCED = 1, + TAOS_SYNC_ROLE_SYNCING = 2, + TAOS_SYNC_ROLE_SLAVE = 3, + TAOS_SYNC_ROLE_MASTER = 4 +} ESyncRole; + +typedef enum { + TAOS_SYNC_STATUS_INIT = 0, + TAOS_SYNC_STATUS_START = 1, + TAOS_SYNC_STATUS_FILE = 2, + TAOS_SYNC_STATUS_CACHE = 3 +} ESyncStatus; + +typedef struct { + uint32_t nodeId; // node ID assigned by TDengine + uint16_t nodePort; // node sync Port + char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN +} SNodeInfo; + +typedef struct { + int8_t quorum; // number of confirms required, >=1 + int8_t replica; // number of replications, >=1 + SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA]; +} SSyncCfg; + +typedef struct { + int32_t selfIndex; + uint32_t nodeId[TAOS_SYNC_MAX_REPLICA]; + int32_t role[TAOS_SYNC_MAX_REPLICA]; +} SNodesRole; + +// get the wal file from index or after +// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file +typedef int32_t (*FGetWalInfo)(int32_t vgId, char *fileName, int64_t *fileId); + +// when a forward pkt is received, call this to handle data +typedef int32_t (*FWriteToCache)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg); + +// when forward is confirmed by peer, master call this API to notify app +typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code); + +// when role is changed, call this to notify app +typedef void (*FNotifyRole)(int32_t vgId, int8_t role); + +// if a number of retrieving data failed, call this to start flow control +typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level); + +// when data file is synced successfully, notity app +typedef void (*FStartSyncFile)(int32_t vgId); +typedef void (*FStopSyncFile)(int32_t vgId, uint64_t fversion); + +// get file version +typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver); + +typedef int32_t (*FSendFile)(void *tsdb, SOCKET socketFd); +typedef int32_t (*FRecvFile)(void *tsdb, SOCKET socketFd); + +typedef struct { + int32_t vgId; // vgroup ID + uint64_t version; // initial version + SSyncCfg syncCfg; // configuration from mgmt + char path[TSDB_FILENAME_LEN]; // path to the file + void * pTsdb; + FGetWalInfo getWalInfoFp; + FWriteToCache writeToCacheFp; + FConfirmForward confirmForward; + FNotifyRole notifyRoleFp; + FNotifyFlowCtrl notifyFlowCtrlFp; + FStartSyncFile startSyncFileFp; + FStopSyncFile stopSyncFileFp; + FGetVersion getVersionFp; + FSendFile sendFileFp; + FRecvFile recvFileFp; +} SSyncInfo; + +typedef void *tsync_h; + +int32_t syncInit(); +void syncCleanUp(); + +int64_t syncStart(const SSyncInfo *); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg *); +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype, bool force); +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force); +void syncRecover(int64_t rid); // recover from other nodes: +int32_t syncGetNodesRole(int64_t rid, SNodesRole *); + +extern char *syncRole[]; + +//global configurable parameters +extern int32_t sDebugFlag; +extern char tsArbitrator[]; +extern uint16_t tsSyncPort; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_SYNC_H diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c index 78d6f74df1..dd5fa2e2a5 100644 --- a/tools/shell/src/shellMain.c +++ b/tools/shell/src/shellMain.c @@ -32,6 +32,8 @@ int indicator = 1; void insertChar(Command *cmd, char *c, int size); +void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, + int32_t pkgNum, char *pkgType); const char *argp_program_version = version; const char *argp_program_bug_address = ""; static char doc[] = ""; @@ -59,7 +61,8 @@ static struct argp_option options[] = { {"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speen|fqdn."}, {"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."}, {"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."}, - {"pkttype", 'S', "PKTTYPE", 0, "Packet type used for net test, default is TCP."}, +// Shuduo: 3.0 does not support UDP any more +// {"pkttype", 'S', "PKTTYPE", 0, "Packet type used for net test, default is TCP."}, {0}}; static error_t parse_opt(int key, char *arg, struct argp_state *state) { @@ -629,16 +632,25 @@ int main(int argc, char *argv[]) { taosDumpGlobalCfg(); exit(0); } +#endif if (args.netTestRole && args.netTestRole[0] != 0) { - if (taos_init()) { + TAOS *con = NULL; + if (args.auth == NULL) { + con = taos_connect(args.host, args.user, args.password, args.database, args.port); + } else { + con = taos_connect_auth(args.host, args.user, args.auth, args.database, args.port); + } + +/* if (taos_init()) { printf("Failed to init taos"); exit(EXIT_FAILURE); } + */ taosNetTest(args.netTestRole, args.host, args.port, args.pktLen, args.pktNum, args.pktType); + taos_close(con); exit(0); } -#endif /* Initialize the shell */ TAOS *con = shellInit(&args); diff --git a/tools/shell/src/backup/tnettest.c b/tools/shell/src/tnettest.c similarity index 85% rename from tools/shell/src/backup/tnettest.c rename to tools/shell/src/tnettest.c index ee32bfb6be..d0b5e5f25c 100644 --- a/tools/shell/src/backup/tnettest.c +++ b/tools/shell/src/tnettest.c @@ -14,18 +14,20 @@ */ #define _DEFAULT_SOURCE +#define ALLOW_FORBID_FUNC #include "os.h" #include "taosdef.h" #include "tmsg.h" #include "taoserror.h" #include "tlog.h" #include "tglobal.h" -#include "tsocket.h" #include "trpc.h" #include "rpcHead.h" #include "tchecksum.h" #include "syncMsg.h" +#include "osSocket.h" + #define MAX_PKG_LEN (64 * 1000) #define MAX_SPEED_PKG_LEN (1024 * 1024 * 1024) #define MIN_SPEED_PKG_LEN 1024 @@ -33,7 +35,7 @@ #define MIN_SPEED_PKG_NUM 1 #define BUFFER_SIZE (MAX_PKG_LEN + 1024) -extern int32_t tsRpcMaxUdpSize; +extern int tsRpcMaxUdpSize; typedef struct { char * hostFqdn; @@ -71,15 +73,23 @@ static void *taosNetBindUdpPort(void *sarg) { return NULL; } - if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket)); + if (pSocket == NULL) { + taosCloseSocketNoCheck1(serverSocket); + return NULL; + } + pSocket->fd = serverSocket; + pSocket->refId = 0; + + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { uError("failed to set the send buffer size for UDP socket\n"); - taosCloseSocket(serverSocket); + taosCloseSocket(&pSocket); return NULL; } - if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { uError("failed to set the receive buffer size for UDP socket\n"); - taosCloseSocket(serverSocket); + taosCloseSocket(&pSocket); return NULL; } @@ -98,13 +108,13 @@ static void *taosNetBindUdpPort(void *sarg) { uInfo("UDP: recv:%d bytes from %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port); if (iDataNum > 0) { - iDataNum = taosSendto(serverSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size); + iDataNum = taosSendto(pSocket, buffer, iDataNum, 0, (struct sockaddr *)&clientAddr, (int32_t)sin_size); } uInfo("UDP: send:%d bytes to %s at %d", iDataNum, taosInetNtoa(clientAddr.sin_addr), port); } - taosCloseSocket(serverSocket); + taosCloseSocket(&pSocket); return NULL; } @@ -132,25 +142,35 @@ static void *taosNetBindTcpPort(void *sarg) { server_addr.sin_addr.s_addr = htonl(INADDR_ANY); int32_t reuse = 1; - if (taosSetSockOpt(serverSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket)); + if (pSocket == NULL) { + taosCloseSocketNoCheck1(serverSocket); + return NULL; + } + pSocket->fd = serverSocket; + pSocket->refId = 0; + + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); - taosCloseSocket(serverSocket); + taosCloseSocket(&pSocket); return NULL; } if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { uError("failed to bind TCP port:%d since %s", port, strerror(errno)); + taosCloseSocket(&pSocket); return NULL; } - if (taosKeepTcpAlive(serverSocket) < 0) { + if (taosKeepTcpAlive(pSocket) < 0) { uError("failed to set tcp server keep-alive option since %s", strerror(errno)); - taosCloseSocket(serverSocket); + taosCloseSocket(&pSocket); return NULL; } if (listen(serverSocket, 10) < 0) { uError("failed to listen TCP port:%d since %s", port, strerror(errno)); + taosCloseSocket(&pSocket); return NULL; } @@ -163,26 +183,26 @@ static void *taosNetBindTcpPort(void *sarg) { continue; } - int32_t ret = taosReadMsg(client, buffer, pinfo->pktLen); + int32_t ret = taosReadMsg(pSocket, buffer, pinfo->pktLen); if (ret < 0 || ret != pinfo->pktLen) { uError("TCP: failed to read %d bytes at port:%d since %s", pinfo->pktLen, port, strerror(errno)); - taosCloseSocket(serverSocket); + taosCloseSocket(&pSocket); return NULL; } uInfo("TCP: read:%d bytes from %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port); - ret = taosWriteMsg(client, buffer, pinfo->pktLen); + ret = taosWriteMsg(pSocket, buffer, pinfo->pktLen); if (ret < 0) { uError("TCP: failed to write %d bytes at %d since %s", pinfo->pktLen, port, strerror(errno)); - taosCloseSocket(serverSocket); + taosCloseSocket(&pSocket); return NULL; } uInfo("TCP: write:%d bytes to %s at %d", pinfo->pktLen, taosInetNtoa(clientAddr.sin_addr), port); } - taosCloseSocket(serverSocket); + taosCloseSocket(&pSocket); return NULL; } @@ -196,9 +216,17 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) { } int32_t reuse = 1; - if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket)); + if (pSocket == NULL) { + taosCloseSocketNoCheck1(clientSocket); + return -1; + } + pSocket->fd = clientSocket; + pSocket->refId = 0; + + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); - taosCloseSocket(clientSocket); + taosCloseSocket(&pSocket); return -1; } @@ -210,27 +238,30 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) { if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { uError("TCP: failed to connect port %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno)); + taosCloseSocket(&pSocket); return -1; } - taosKeepTcpAlive(clientSocket); + taosKeepTcpAlive(pSocket); sprintf(buffer, "client send TCP pkg to %s:%d, content: 1122334455", taosIpStr(info->hostIp), info->port); sprintf(buffer + info->pktLen - 16, "1122334455667788"); - int32_t ret = taosWriteMsg(clientSocket, buffer, info->pktLen); + int32_t ret = taosWriteMsg(pSocket, buffer, info->pktLen); if (ret < 0) { uError("TCP: failed to write msg to %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno)); + taosCloseSocket(&pSocket); return -1; } - ret = taosReadMsg(clientSocket, buffer, info->pktLen); + ret = taosReadMsg(pSocket, buffer, info->pktLen); if (ret < 0) { uError("TCP: failed to read msg from %s:%d since %s", taosIpStr(info->hostIp), info->port, strerror(errno)); + taosCloseSocket(&pSocket); return -1; } - taosCloseSocket(clientSocket); + taosCloseSocket(&pSocket); return 0; } @@ -247,13 +278,23 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) { return -1; } - if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + TdSocketPtr pSocket = (TdSocketPtr)malloc(sizeof(TdSocket)); + if (pSocket == NULL) { + taosCloseSocketNoCheck1(clientSocket); + return -1; + } + pSocket->fd = clientSocket; + pSocket->refId = 0; + + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { uError("failed to set the send buffer size for UDP socket\n"); + taosCloseSocket(&pSocket); return -1; } - if (taosSetSockOpt(clientSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { uError("failed to set the receive buffer size for UDP socket\n"); + taosCloseSocket(&pSocket); return -1; } @@ -268,9 +309,10 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) { socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr); - iDataNum = taosSendto(clientSocket, buffer, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size); + iDataNum = taosSendto(pSocket, buffer, info->pktLen, 0, (struct sockaddr *)&serverAddr, (int32_t)sin_size); if (iDataNum < 0 || iDataNum != info->pktLen) { uError("UDP: failed to perform sendto func since %s", strerror(errno)); + taosCloseSocket(&pSocket); return -1; } @@ -280,10 +322,11 @@ static int32_t taosNetCheckUdpPort(STestInfo *info) { if (iDataNum < 0 || iDataNum != info->pktLen) { uError("UDP: received ack:%d bytes(expect:%d) from port:%d since %s", iDataNum, info->pktLen, info->port, strerror(errno)); + taosCloseSocket(&pSocket); return -1; } - taosCloseSocket(clientSocket); + taosCloseSocket(&pSocket); return 0; } @@ -339,7 +382,7 @@ void *taosNetInitRpc(char *secretEncrypt, char spi) { } static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi, SStartupReq *pStep) { - SRpcEpSet epSet; + SEpSet epSet; SRpcMsg reqMsg; SRpcMsg rspMsg; void * pRpcConn; @@ -352,11 +395,10 @@ static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t p return TSDB_CODE_RPC_NETWORK_UNAVAIL; } - memset(&epSet, 0, sizeof(SRpcEpSet)); - epSet.inUse = 0; + memset(&epSet, 0, sizeof(SEpSet)); + strcpy(epSet.eps[0].fqdn, serverFqdn); + epSet.eps[0].port = port; epSet.numOfEps = 1; - epSet.port[0] = port; - strcpy(epSet.fqdn[0], serverFqdn); reqMsg.msgType = TDMT_DND_NETWORK_TEST; reqMsg.pCont = rpcMallocCont(pktLen); @@ -425,8 +467,8 @@ static void taosNetCheckSync(char *host, int32_t port) { return; } - SOCKET connFd = taosOpenTcpClientSocket(ip, (uint16_t)port, 0); - if (connFd < 0) { + TdSocketPtr pSocket = taosOpenTcpClientSocket(ip, (uint16_t)port, 0); + if (pSocket == NULL) { uError("failed to create socket while test port:%d since %s", port, strerror(errno)); return; } @@ -443,17 +485,17 @@ static void taosNetCheckSync(char *host, int32_t port) { pHead->len = sizeof(SSyncMsg) - sizeof(SSyncHead); taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead)); - if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + if (taosWriteMsg(pSocket, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { uError("failed to test port:%d while send msg since %s", port, strerror(errno)); return; } - if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { + if (taosReadMsg(pSocket, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { uError("failed to test port:%d while recv msg since %s", port, strerror(errno)); } uInfo("successed to test TCP port:%d", port); - taosCloseSocket(connFd); + taosCloseSocket(&pSocket); } static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { @@ -494,7 +536,6 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) { } taosNetCheckSync(host, startPort + TSDB_PORT_SYNC); - taosNetCheckSync(host, startPort + TSDB_PORT_ARBITRATOR); } static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) { @@ -578,7 +619,7 @@ static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen, } tsCompressMsgSize = -1; - SRpcEpSet epSet; + SEpSet epSet; SRpcMsg reqMsg; SRpcMsg rspMsg; void * pRpcConn; @@ -596,11 +637,10 @@ static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen, for (int32_t i = 1; i <= pkgNum; i++) { uint64_t startTime = taosGetTimestampUs(); - memset(&epSet, 0, sizeof(SRpcEpSet)); - epSet.inUse = 0; + memset(&epSet, 0, sizeof(SEpSet)); + strcpy(epSet.eps[0].fqdn, host); + epSet.eps[0].port = port; epSet.numOfEps = 1; - epSet.port[0] = port; - strcpy(epSet.fqdn[0], host); reqMsg.msgType = TDMT_DND_NETWORK_TEST; reqMsg.pCont = rpcMallocCont(pkgLen); @@ -641,7 +681,7 @@ static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen, void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, int32_t pkgNum, char *pkgType) { - tscEmbedded = 1; + tsLogEmbedded = 1; if (host == NULL) host = tsLocalFqdn; if (port == 0) port = tsServerPort; if (0 == strcmp("speed", role)){ @@ -659,14 +699,14 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, } else if (0 == strcmp("server", role)) { taosNetTestServer(host, port, pkgLen); } else if (0 == strcmp("rpc", role)) { - tscEmbedded = 0; + tsLogEmbedded = 0; taosNetTestRpc(host, port, pkgLen); } else if (0 == strcmp("sync", role)) { taosNetCheckSync(host, port); } else if (0 == strcmp("startup", role)) { taosNetTestStartup(host, port); } else if (0 == strcmp("speed", role)) { - tscEmbedded = 0; + tsLogEmbedded = 0; char type[10] = {0}; taosNetCheckSpeed(host, port, pkgLen, pkgNum, strtolower(type, pkgType)); }else if (0 == strcmp("fqdn", role)) { @@ -675,5 +715,5 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, taosNetTestStartup(host, port); } - tscEmbedded = 0; + tsLogEmbedded = 0; }