commit
0ef402dcf3
|
@ -47,7 +47,7 @@ static struct argp_option options[] = {
|
||||||
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
|
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
|
||||||
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
|
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
|
||||||
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
|
{"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."},
|
||||||
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup."},
|
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync."},
|
||||||
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
|
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
|
||||||
{0}};
|
{0}};
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,7 @@ void printHelp() {
|
||||||
printf("%s%s\n", indent, "-t");
|
printf("%s%s\n", indent, "-t");
|
||||||
printf("%s%s%s\n", indent, indent, "Time zone of the shell, default is local.");
|
printf("%s%s%s\n", indent, indent, "Time zone of the shell, default is local.");
|
||||||
printf("%s%s\n", indent, "-n");
|
printf("%s%s\n", indent, "-n");
|
||||||
printf("%s%s%s\n", indent, indent, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup.");
|
printf("%s%s%s\n", indent, indent, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync.");
|
||||||
printf("%s%s\n", indent, "-l");
|
printf("%s%s\n", indent, "-l");
|
||||||
printf("%s%s%s\n", indent, indent, "Packet length used for net test, default is 1000 bytes.");
|
printf("%s%s%s\n", indent, indent, "Packet length used for net test, default is 1000 bytes.");
|
||||||
printf("%s%s\n", indent, "-V");
|
printf("%s%s\n", indent, "-V");
|
||||||
|
|
|
@ -37,7 +37,8 @@ typedef enum {
|
||||||
TAOS_SMSG_SETUP_RSP = 12,
|
TAOS_SMSG_SETUP_RSP = 12,
|
||||||
TAOS_SMSG_SYNC_FILE = 13,
|
TAOS_SMSG_SYNC_FILE = 13,
|
||||||
TAOS_SMSG_SYNC_FILE_RSP = 14,
|
TAOS_SMSG_SYNC_FILE_RSP = 14,
|
||||||
TAOS_SMSG_END = 15,
|
TAOS_SMSG_TEST = 15,
|
||||||
|
TAOS_SMSG_END = 16
|
||||||
} ESyncMsgType;
|
} ESyncMsgType;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -132,6 +133,7 @@ void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId);
|
||||||
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId);
|
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId);
|
||||||
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId);
|
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId);
|
||||||
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId);
|
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId);
|
||||||
|
void syncBuildSyncTestMsg(SSyncMsg *pMsg, int32_t vgId);
|
||||||
|
|
||||||
void syncBuildFileAck(SFileAck *pMsg, int32_t vgId);
|
void syncBuildFileAck(SFileAck *pMsg, int32_t vgId);
|
||||||
void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId);
|
void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId);
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
#include "syncTcp.h"
|
#include "syncTcp.h"
|
||||||
|
|
||||||
|
extern void syncProcessTestMsg(SSyncMsg *pMsg, SOCKET connFd);
|
||||||
static void arbSignalHandler(int32_t signum, void *sigInfo, void *context);
|
static void arbSignalHandler(int32_t signum, void *sigInfo, void *context);
|
||||||
static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp);
|
static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp);
|
||||||
static void arbProcessBrokenLink(int64_t rid);
|
static void arbProcessBrokenLink(int64_t rid);
|
||||||
|
@ -118,6 +119,11 @@ static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (msg.head.type == TAOS_SMSG_TEST) {
|
||||||
|
syncProcessTestMsg(&msg, connFd);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SNodeConn *pNode = calloc(sizeof(SNodeConn), 1);
|
SNodeConn *pNode = calloc(sizeof(SNodeConn), 1);
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
sError("failed to allocate memory since %s", strerror(errno));
|
sError("failed to allocate memory since %s", strerror(errno));
|
||||||
|
|
|
@ -1179,6 +1179,20 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncProcessTestMsg(SSyncMsg *pMsg, SOCKET connFd) {
|
||||||
|
sInfo("recv sync test msg");
|
||||||
|
|
||||||
|
SSyncMsg rsp;
|
||||||
|
syncBuildSyncTestMsg(&rsp, -1);
|
||||||
|
if (taosWriteMsg(connFd, &rsp, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
|
||||||
|
sInfo("failed to send sync test rsp since %s", strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
sInfo("send sync test rsp");
|
||||||
|
taosMsleep(1000);
|
||||||
|
taosCloseSocket(connFd);
|
||||||
|
}
|
||||||
|
|
||||||
static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
|
static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
|
||||||
char ipstr[24];
|
char ipstr[24];
|
||||||
int32_t i;
|
int32_t i;
|
||||||
|
@ -1200,6 +1214,11 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (msg.head.type == TAOS_SMSG_TEST) {
|
||||||
|
syncProcessTestMsg(&msg, connFd);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vgId = msg.head.vgId;
|
int32_t vgId = msg.head.vgId;
|
||||||
SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t));
|
SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t));
|
||||||
if (ppNode == NULL || *ppNode == NULL) {
|
if (ppNode == NULL || *ppNode == NULL) {
|
||||||
|
|
|
@ -86,6 +86,7 @@ static void syncBuildMsg(SSyncMsg *pMsg, int32_t vgId, ESyncMsgType type) {
|
||||||
void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); }
|
void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); }
|
||||||
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); }
|
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); }
|
||||||
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); }
|
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); }
|
||||||
|
void syncBuildSyncTestMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_TEST); }
|
||||||
|
|
||||||
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) {
|
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) {
|
||||||
pMsg->head.type = TAOS_SMSG_STATUS;
|
pMsg->head.type = TAOS_SMSG_STATUS;
|
||||||
|
|
|
@ -2,6 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.5)
|
||||||
PROJECT(TDengine)
|
PROJECT(TDengine)
|
||||||
|
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc)
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc)
|
||||||
AUX_SOURCE_DIRECTORY(src SRC)
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
ADD_LIBRARY(tutil ${SRC})
|
ADD_LIBRARY(tutil ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z)
|
TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4 z)
|
||||||
|
|
|
@ -23,6 +23,8 @@
|
||||||
#include "tsocket.h"
|
#include "tsocket.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "rpcHead.h"
|
#include "rpcHead.h"
|
||||||
|
#include "tchecksum.h"
|
||||||
|
#include "syncMsg.h"
|
||||||
|
|
||||||
#define MAX_PKG_LEN (64 * 1000)
|
#define MAX_PKG_LEN (64 * 1000)
|
||||||
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
|
#define BUFFER_SIZE (MAX_PKG_LEN + 1024)
|
||||||
|
@ -408,13 +410,51 @@ static void taosNetTestStartup(char *host, int32_t port) {
|
||||||
free(pStep);
|
free(pStep);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void taosNetCheckSync(char *host, int32_t port) {
|
||||||
|
uint32_t ip = taosGetIpv4FromFqdn(host);
|
||||||
|
if (ip == 0xffffffff) {
|
||||||
|
uError("failed to get IP address from %s since %s", host, strerror(errno));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOCKET connFd = taosOpenTcpClientSocket(ip, (uint16_t)port, 0);
|
||||||
|
if (connFd < 0) {
|
||||||
|
uError("failed to create socket while test port:%d since %s", port, strerror(errno));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncMsg msg;
|
||||||
|
memset(&msg, 0, sizeof(SSyncMsg));
|
||||||
|
SSyncHead *pHead = &msg.head;
|
||||||
|
pHead->type = TAOS_SMSG_TEST;
|
||||||
|
pHead->protocol = SYNC_PROTOCOL_VERSION;
|
||||||
|
pHead->signature = SYNC_SIGNATURE;
|
||||||
|
pHead->code = 0;
|
||||||
|
pHead->cId = 0;
|
||||||
|
pHead->vgId = -1;
|
||||||
|
pHead->len = sizeof(SSyncMsg) - sizeof(SSyncHead);
|
||||||
|
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead));
|
||||||
|
|
||||||
|
if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
|
||||||
|
uError("failed to test port:%d while send msg since %s", port, strerror(errno));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
|
||||||
|
uError("failed to test port:%d while recv msg since %s", port, strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
uInfo("successed to test TCP port:%d", port);
|
||||||
|
taosCloseSocket(connFd);
|
||||||
|
}
|
||||||
|
|
||||||
static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
|
static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
|
||||||
int32_t endPort = startPort + 9;
|
int32_t endPort = startPort + TSDB_PORT_SYNC;
|
||||||
char spi = 0;
|
char spi = 0;
|
||||||
|
|
||||||
uInfo("check rpc, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);
|
uInfo("check rpc, host:%s startPort:%d endPort:%d pkgLen:%d\n", host, startPort, endPort, pkgLen);
|
||||||
|
|
||||||
for (uint16_t port = startPort; port <= endPort; port++) {
|
for (uint16_t port = startPort; port < endPort; port++) {
|
||||||
int32_t sendpkgLen;
|
int32_t sendpkgLen;
|
||||||
if (pkgLen <= tsRpcMaxUdpSize) {
|
if (pkgLen <= tsRpcMaxUdpSize) {
|
||||||
sendpkgLen = tsRpcMaxUdpSize + 1000;
|
sendpkgLen = tsRpcMaxUdpSize + 1000;
|
||||||
|
@ -442,6 +482,9 @@ static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
|
||||||
uInfo("successed to test UDP port:%d", port);
|
uInfo("successed to test UDP port:%d", port);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosNetCheckSync(host, startPort + TSDB_PORT_SYNC);
|
||||||
|
taosNetCheckSync(host, startPort + TSDB_PORT_ARBITRATOR);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
|
static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
|
||||||
|
@ -508,6 +551,8 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) {
|
||||||
taosNetTestServer(host, port, pkgLen);
|
taosNetTestServer(host, port, pkgLen);
|
||||||
} else if (0 == strcmp("rpc", role)) {
|
} else if (0 == strcmp("rpc", role)) {
|
||||||
taosNetTestRpc(host, port, pkgLen);
|
taosNetTestRpc(host, port, pkgLen);
|
||||||
|
} else if (0 == strcmp("sync", role)) {
|
||||||
|
taosNetCheckSync(host, port);
|
||||||
} else if (0 == strcmp("startup", role)) {
|
} else if (0 == strcmp("startup", role)) {
|
||||||
taosNetTestStartup(host, port);
|
taosNetTestStartup(host, port);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue