feat: check server status
This commit is contained in:
parent
7273581072
commit
9e130aef6d
|
@ -315,6 +315,16 @@ DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const
|
||||||
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
|
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TSDB_SRV_STATUS_UNAVAILABLE = 0,
|
||||||
|
TSDB_SRV_STATUS_NETWORK_OK = 1,
|
||||||
|
TSDB_SRV_STATUS_SERVICE_OK = 2,
|
||||||
|
TSDB_SRV_STATUS_SERVICE_DEGRADED = 3,
|
||||||
|
TSDB_SRV_STATUS_EXTING = 4,
|
||||||
|
} TSDB_SERVER_STATUS;
|
||||||
|
|
||||||
|
DLL_EXPORT TSDB_SERVER_STATUS taos_check_server_status(const char *fqdn, int port, char *details, int maxlen);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1071,10 +1071,14 @@ int32_t tSerializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);
|
||||||
int32_t tDeserializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);
|
int32_t tDeserializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t finished;
|
int32_t statusCode;
|
||||||
char name[TSDB_STEP_NAME_LEN];
|
int32_t detailLen;
|
||||||
char desc[TSDB_STEP_DESC_LEN];
|
char* details;
|
||||||
} SStartupReq;
|
} SServerStatusRsp;
|
||||||
|
|
||||||
|
int32_t tSerializeSServerStatusRsp(void* buf, int32_t bufLen, SServerStatusRsp* pRsp);
|
||||||
|
int32_t tDeserializeSServerStatusRsp(void* buf, int32_t bufLen, SServerStatusRsp* pRsp);
|
||||||
|
void tFreeSServerStatusRsp(SServerStatusRsp* pRsp);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The layout of the query message payload is as following:
|
* The layout of the query message payload is as following:
|
||||||
|
|
|
@ -87,7 +87,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_DND_SYNC_VNODE, "dnode-sync-vnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_DND_SYNC_VNODE, "dnode-sync-vnode", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_DND_COMPACT_VNODE, "dnode-compact-vnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_DND_COMPACT_VNODE, "dnode-compact-vnode", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "dnode-config-dnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "dnode-config-dnode", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_DND_NETWORK_TEST, "dnode-nettest", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_DND_SERVER_STATUS, "dnode-server-status", NULL, NULL)
|
||||||
|
|
||||||
// Requests handled by MNODE
|
// Requests handled by MNODE
|
||||||
TD_NEW_MSG_SEG(TDMT_MND_MSG)
|
TD_NEW_MSG_SEG(TDMT_MND_MSG)
|
||||||
|
|
|
@ -815,3 +815,62 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
|
||||||
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
|
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
|
||||||
convertUcs4);
|
convertUcs4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
|
||||||
|
TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
|
||||||
|
void* clientRpc = NULL;
|
||||||
|
SServerStatusRsp statusRsp = {0};
|
||||||
|
SEpSet epSet = {.inUse = 0, .numOfEps = 1};
|
||||||
|
SRpcMsg rpcMsg = {.ahandle = (void*)0x9526, .msgType = TDMT_DND_SERVER_STATUS};
|
||||||
|
SRpcMsg rpcRsp = {0};
|
||||||
|
SRpcInit rpcInit = {0};
|
||||||
|
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||||
|
|
||||||
|
taosEncryptPass_c((uint8_t*)("_pwd"), strlen("_pwd"), pass);
|
||||||
|
rpcInit.label = "CHK";
|
||||||
|
rpcInit.numOfThreads = 1;
|
||||||
|
rpcInit.cfp = NULL;
|
||||||
|
rpcInit.sessions = 16;
|
||||||
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
|
rpcInit.user = "_dnd";
|
||||||
|
rpcInit.ckey = "_key";
|
||||||
|
rpcInit.spi = 1;
|
||||||
|
rpcInit.secret = pass;
|
||||||
|
|
||||||
|
clientRpc = rpcOpen(&rpcInit);
|
||||||
|
if (clientRpc == NULL) {
|
||||||
|
tscError("failed to init server status client");
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
|
||||||
|
epSet.eps[0].port = (uint16_t)port;
|
||||||
|
rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
|
||||||
|
|
||||||
|
if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
|
||||||
|
tscError("failed to send server status req since %s", terrstr());
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
|
||||||
|
tscError("failed to parse server status rsp since %s", terrstr());
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = statusRsp.statusCode;
|
||||||
|
if (details != NULL) {
|
||||||
|
tstrncpy(details, statusRsp.details, maxlen);
|
||||||
|
}
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
if (clientRpc != NULL) {
|
||||||
|
rpcClose(clientRpc);
|
||||||
|
}
|
||||||
|
if (rpcRsp.pCont != NULL) {
|
||||||
|
rpcFreeCont(rpcRsp.pCont);
|
||||||
|
}
|
||||||
|
tFreeSServerStatusRsp(&statusRsp);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
|
@ -3097,6 +3097,47 @@ int32_t tDeserializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp *pRsp) {
|
||||||
|
SCoder encoder = {0};
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->statusCode) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRsp->detailLen) < 0) return -1;
|
||||||
|
if (pRsp->detailLen > 0) {
|
||||||
|
if (tEncodeCStr(&encoder, pRsp->details) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp *pRsp) {
|
||||||
|
SCoder decoder = {0};
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->statusCode) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pRsp->detailLen) < 0) return -1;
|
||||||
|
if (pRsp->detailLen > 0) {
|
||||||
|
pRsp->details = taosMemoryCalloc(1, pRsp->detailLen);
|
||||||
|
if (pRsp->details == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (tDecodeCStrTo(&decoder, pRsp->details) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tFreeSServerStatusRsp(SServerStatusRsp *pRsp) { taosMemoryFree(pRsp->details); }
|
||||||
|
|
||||||
int32_t tEncodeSMqOffset(SCoder *encoder, const SMqOffset *pOffset) {
|
int32_t tEncodeSMqOffset(SCoder *encoder, const SMqOffset *pOffset) {
|
||||||
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
|
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
|
||||||
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
|
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
|
||||||
|
|
|
@ -256,7 +256,7 @@ static int32_t dmStartNodes(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("TDengine initialized successfully");
|
dInfo("TDengine initialized successfully");
|
||||||
dmReportStartup(pDnode, "TDengine", "initialized successfully");
|
dmReportStartup(pDnode, "TDengine", "initialized successfully", true);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -125,9 +125,9 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
|
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
|
||||||
SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
|
SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
|
||||||
|
|
||||||
if (msgType == TDMT_DND_NETWORK_TEST) {
|
if (msgType == TDMT_DND_SERVER_STATUS) {
|
||||||
dTrace("network test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
|
dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
|
||||||
dmProcessStartupReq(pDnode, pMsg);
|
dmProcessServerStatusReq(pDnode, pMsg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -132,12 +132,18 @@ typedef struct {
|
||||||
uint16_t serverPort;
|
uint16_t serverPort;
|
||||||
} SDnodeData;
|
} SDnodeData;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
bool finished;
|
||||||
|
char name[TSDB_STEP_NAME_LEN];
|
||||||
|
char desc[TSDB_STEP_DESC_LEN];
|
||||||
|
} SStartupInfo;
|
||||||
|
|
||||||
typedef struct SDnode {
|
typedef struct SDnode {
|
||||||
EDndProcType ptype;
|
EDndProcType ptype;
|
||||||
EDndNodeType ntype;
|
EDndNodeType ntype;
|
||||||
EDndRunStatus status;
|
EDndRunStatus status;
|
||||||
EDndEvent event;
|
EDndEvent event;
|
||||||
SStartupReq startup;
|
SStartupInfo startup;
|
||||||
SDnodeTrans trans;
|
SDnodeTrans trans;
|
||||||
SDnodeData data;
|
SDnodeData data;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
|
|
|
@ -34,8 +34,8 @@ const char *dmEventName(EDndEvent ev);
|
||||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus stat);
|
void dmSetStatus(SDnode *pDnode, EDndRunStatus stat);
|
||||||
void dmSetEvent(SDnode *pDnode, EDndEvent event);
|
void dmSetEvent(SDnode *pDnode, EDndEvent event);
|
||||||
void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
|
void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
|
||||||
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
|
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc, bool finished);
|
||||||
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
|
void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pMsg);
|
||||||
void dmGetMonitorSysInfo(SMonSysInfo *pInfo);
|
void dmGetMonitorSysInfo(SMonSysInfo *pInfo);
|
||||||
|
|
||||||
// dmFile.c
|
// dmFile.c
|
||||||
|
|
|
@ -136,27 +136,68 @@ void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
|
||||||
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
|
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
|
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc, bool finished) {
|
||||||
SStartupReq *pStartup = &pDnode->startup;
|
SStartupInfo *pStartup = &pDnode->startup;
|
||||||
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
||||||
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
||||||
pStartup->finished = 0;
|
pStartup->finished = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
|
static void dmGetStartup(SDnode *pDnode, SStartupInfo *pStartup) {
|
||||||
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
|
memcpy(pStartup, &pDnode->startup, sizeof(SStartupInfo));
|
||||||
pStartup->finished = (pDnode->status == DND_STAT_RUNNING);
|
pStartup->finished = (pDnode->status == DND_STAT_RUNNING);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
|
static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
|
||||||
dDebug("startup req is received");
|
if (pDnode->status == DND_STAT_INIT) {
|
||||||
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
|
pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK;
|
||||||
dmGetStartup(pDnode, pStartup);
|
} else if (pDnode->status != DND_STAT_STOPPED) {
|
||||||
|
pStatus->statusCode = TSDB_SRV_STATUS_EXTING;
|
||||||
|
} else {
|
||||||
|
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
|
if (pStatus->statusCode == TSDB_SRV_STATUS_NETWORK_OK) {
|
||||||
SRpcMsg rpcRsp = {
|
SStartupInfo *pStartup = &pDnode->startup;
|
||||||
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
|
|
||||||
rpcSendResponse(&rpcRsp);
|
int32_t len = strlen(pStartup->name) + strlen(pStartup->desc) + 24;
|
||||||
|
pStatus->details = taosMemoryCalloc(1, len);
|
||||||
|
if (pStatus->details != NULL) {
|
||||||
|
pStatus->detailLen = snprintf(pStatus->details, len - 1, "%s: %s", pStartup->name, pStartup->desc) + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStatus->statusCode == TSDB_SRV_STATUS_SERVICE_OK) {
|
||||||
|
// check the status of mnode and vnode
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
|
dDebug("server status req is received");
|
||||||
|
|
||||||
|
SServerStatusRsp statusRsp = {0};
|
||||||
|
dmGetServerStatus(pDnode, &statusRsp);
|
||||||
|
|
||||||
|
SRpcMsg rspMsg = {.handle = pReq->handle, .handle = pReq->ahandle};
|
||||||
|
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
|
||||||
|
if (rspLen < 0) {
|
||||||
|
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pRsp = rpcMallocCont(rspLen);
|
||||||
|
if (pRsp == NULL) {
|
||||||
|
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
|
||||||
|
rspMsg.pCont = pRsp;
|
||||||
|
rspMsg.contLen = rspLen;
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
rpcSendResponse(&rspMsg);
|
||||||
|
tFreeSServerStatusRsp(&statusRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmGetMonitorSysInfo(SMonSysInfo *pInfo) {
|
void dmGetMonitorSysInfo(SMonSysInfo *pInfo) {
|
||||||
|
|
|
@ -130,7 +130,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
||||||
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
||||||
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
|
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pCfg->vgId,
|
||||||
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
||||||
dmReportStartup(pDnode, "open-vnodes", stepDesc);
|
dmReportStartup(pDnode, "open-vnodes", stepDesc, false);
|
||||||
|
|
||||||
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
||||||
msgCb.pWrapper = pMgmt->pWrapper;
|
msgCb.pWrapper = pMgmt->pWrapper;
|
||||||
|
|
|
@ -356,187 +356,6 @@ static void taosNetCheckPort(uint32_t hostIp, int32_t startPort, int32_t endPort
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void *taosNetInitRpc(char *secretEncrypt, char spi) {
|
|
||||||
SRpcInit rpcInit;
|
|
||||||
void * pRpcConn = NULL;
|
|
||||||
|
|
||||||
char user[] = "nettestinternal";
|
|
||||||
char pass[] = "nettestinternal";
|
|
||||||
taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
|
|
||||||
|
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
|
||||||
rpcInit.localPort = 0;
|
|
||||||
rpcInit.label = "NT";
|
|
||||||
rpcInit.numOfThreads = 1; // every DB connection has only one thread
|
|
||||||
rpcInit.cfp = NULL;
|
|
||||||
rpcInit.sessions = 16;
|
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
|
||||||
rpcInit.user = user;
|
|
||||||
rpcInit.idleTime = 2000;
|
|
||||||
rpcInit.ckey = "key";
|
|
||||||
rpcInit.spi = spi;
|
|
||||||
rpcInit.secret = secretEncrypt;
|
|
||||||
|
|
||||||
pRpcConn = rpcOpen(&rpcInit);
|
|
||||||
return pRpcConn;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi, SStartupReq *pStep) {
|
|
||||||
SEpSet epSet;
|
|
||||||
SRpcMsg reqMsg;
|
|
||||||
SRpcMsg rspMsg;
|
|
||||||
void * pRpcConn;
|
|
||||||
|
|
||||||
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
|
|
||||||
|
|
||||||
pRpcConn = taosNetInitRpc(secretEncrypt, spi);
|
|
||||||
if (NULL == pRpcConn) {
|
|
||||||
uError("failed to init client rpc");
|
|
||||||
return TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
memset(&epSet, 0, sizeof(SEpSet));
|
|
||||||
strcpy(epSet.eps[0].fqdn, serverFqdn);
|
|
||||||
epSet.eps[0].port = port;
|
|
||||||
epSet.numOfEps = 1;
|
|
||||||
|
|
||||||
reqMsg.msgType = TDMT_DND_NETWORK_TEST;
|
|
||||||
reqMsg.pCont = rpcMallocCont(pktLen);
|
|
||||||
reqMsg.contLen = pktLen;
|
|
||||||
reqMsg.code = 0;
|
|
||||||
reqMsg.handle = NULL; // rpc handle returned to app
|
|
||||||
reqMsg.ahandle = NULL; // app handle set by client
|
|
||||||
strcpy(reqMsg.pCont, "dnode-nettest");
|
|
||||||
|
|
||||||
rpcSendRecv(pRpcConn, &epSet, &reqMsg, &rspMsg);
|
|
||||||
|
|
||||||
if ((rspMsg.code != 0) || (rspMsg.msgType != TDMT_DND_NETWORK_TEST + 1)) {
|
|
||||||
uDebug("ret code 0x%x %s", rspMsg.code, tstrerror(rspMsg.code));
|
|
||||||
return rspMsg.code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = 0;
|
|
||||||
if (pStep != NULL && rspMsg.pCont != NULL && rspMsg.contLen > 0 && rspMsg.contLen <= sizeof(SStartupReq)) {
|
|
||||||
memcpy(pStep, rspMsg.pCont, rspMsg.contLen);
|
|
||||||
code = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
rpcFreeCont(rspMsg.pCont);
|
|
||||||
rpcClose(pRpcConn);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t taosNetParseStartup(SStartupReq *pCont) {
|
|
||||||
SStartupReq *pStep = pCont;
|
|
||||||
uInfo("step:%s desc:%s", pStep->name, pStep->desc);
|
|
||||||
|
|
||||||
if (pStep->finished) {
|
|
||||||
uInfo("check startup finished");
|
|
||||||
}
|
|
||||||
|
|
||||||
return pStep->finished ? 0 : 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void taosNetTestStartup(char *host, int32_t port) {
|
|
||||||
uInfo("check startup, host:%s port:%d\n", host, port);
|
|
||||||
|
|
||||||
SStartupReq *pStep = taosMemoryMalloc(sizeof(SStartupReq));
|
|
||||||
while (1) {
|
|
||||||
int32_t code = taosNetCheckRpc(host, port, 20, 0, pStep);
|
|
||||||
if (code > 0) {
|
|
||||||
code = taosNetParseStartup(pStep);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (code > 0) {
|
|
||||||
uDebug("continue check startup step");
|
|
||||||
} else {
|
|
||||||
if (code < 0) {
|
|
||||||
uError("failed to check startup step, code:0x%x %s", code, tstrerror(code));
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(pStep);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void taosNetCheckSync(char *host, int32_t port) {
|
|
||||||
uint32_t ip = taosGetIpv4FromFqdn(host);
|
|
||||||
if (ip == 0xffffffff) {
|
|
||||||
uError("failed to get IP address from %s since %s", host, strerror(errno));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
TdSocketPtr pSocket = taosOpenTcpClientSocket(ip, (uint16_t)port, 0);
|
|
||||||
if (pSocket == NULL) {
|
|
||||||
uError("failed to create socket while test port:%d since %s", port, strerror(errno));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSyncMsg msg;
|
|
||||||
memset(&msg, 0, sizeof(SSyncMsg));
|
|
||||||
SSyncHead *pHead = &msg.head;
|
|
||||||
pHead->type = TAOS_SMSG_TEST;
|
|
||||||
pHead->protocol = SYNC_PROTOCOL_VERSION;
|
|
||||||
pHead->signature = SYNC_SIGNATURE;
|
|
||||||
pHead->code = 0;
|
|
||||||
pHead->cId = 0;
|
|
||||||
pHead->vgId = -1;
|
|
||||||
pHead->len = sizeof(SSyncMsg) - sizeof(SSyncHead);
|
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead));
|
|
||||||
|
|
||||||
if (taosWriteMsg(pSocket, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
|
|
||||||
uError("failed to test port:%d while send msg since %s", port, strerror(errno));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosReadMsg(pSocket, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
|
|
||||||
uError("failed to test port:%d while recv msg since %s", port, strerror(errno));
|
|
||||||
}
|
|
||||||
|
|
||||||
uInfo("successed to test TCP port:%d", port);
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void taosNetTestRpc(char *host, int32_t startPort, int32_t pkgLen) {
|
|
||||||
char spi = 0;
|
|
||||||
|
|
||||||
uInfo("check rpc, host:%s Port:%d pkgLen:%d\n", host, startPort, pkgLen);
|
|
||||||
|
|
||||||
uint16_t port = startPort;
|
|
||||||
int32_t sendpkgLen;
|
|
||||||
if (pkgLen <= tsRpcMaxUdpSize) {
|
|
||||||
sendpkgLen = tsRpcMaxUdpSize + 1000;
|
|
||||||
} else {
|
|
||||||
sendpkgLen = pkgLen;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsRpcForceTcp = 1;
|
|
||||||
int32_t ret = taosNetCheckRpc(host, port, sendpkgLen, spi, NULL);
|
|
||||||
if (ret < 0) {
|
|
||||||
printf("failed to test TCP port:%d\n", port);
|
|
||||||
} else {
|
|
||||||
printf("successed to test TCP port:%d\n", port);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pkgLen >= tsRpcMaxUdpSize) {
|
|
||||||
sendpkgLen = tsRpcMaxUdpSize - 1000;
|
|
||||||
} else {
|
|
||||||
sendpkgLen = pkgLen;
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
tsRpcForceTcp = 0;
|
|
||||||
ret = taosNetCheckRpc(host, port, pkgLen, spi, NULL);
|
|
||||||
if (ret < 0) {
|
|
||||||
printf("failed to test UDP port:%d\n", port);
|
|
||||||
} else {
|
|
||||||
printf("successed to test UDP port:%d\n", port);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
taosNetCheckSync(host, startPort);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
|
static void taosNetTestClient(char *host, int32_t startPort, int32_t pkgLen) {
|
||||||
uInfo("work as client, host:%s Port:%d pkgLen:%d\n", host, startPort, pkgLen);
|
uInfo("work as client, host:%s Port:%d pkgLen:%d\n", host, startPort, pkgLen);
|
||||||
|
|
||||||
|
@ -586,22 +405,10 @@ static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosNetTestFqdn(char *host) {
|
|
||||||
int code = 0;
|
|
||||||
uint64_t startTime = taosGetTimestampUs();
|
|
||||||
uint32_t ip = taosGetIpv4FromFqdn(host);
|
|
||||||
if (ip == 0xffffffff) {
|
|
||||||
uError("failed to get IP address from %s since %s", host, strerror(errno));
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
uint64_t endTime = taosGetTimestampUs();
|
|
||||||
uint64_t el = endTime - startTime;
|
|
||||||
printf("check convert fqdn spend, status: %d\tcost: %" PRIu64 " us\n", code, el);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen,
|
static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen,
|
||||||
int32_t pkgNum, char *pkgType) {
|
int32_t pkgNum, char *pkgType) {
|
||||||
|
#if 0
|
||||||
|
|
||||||
// record config
|
// record config
|
||||||
int32_t compressTmp = tsCompressMsgSize;
|
int32_t compressTmp = tsCompressMsgSize;
|
||||||
int32_t maxUdpSize = tsRpcMaxUdpSize;
|
int32_t maxUdpSize = tsRpcMaxUdpSize;
|
||||||
|
@ -674,10 +481,10 @@ static void taosNetCheckSpeed(char *host, int32_t port, int32_t pkgLen,
|
||||||
tsRpcMaxUdpSize = maxUdpSize;
|
tsRpcMaxUdpSize = maxUdpSize;
|
||||||
tsRpcForceTcp = forceTcp;
|
tsRpcForceTcp = forceTcp;
|
||||||
return;
|
return;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
|
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, int32_t pkgNum, char *pkgType) {
|
||||||
int32_t pkgNum, char *pkgType) {
|
|
||||||
tsLogEmbedded = 1;
|
tsLogEmbedded = 1;
|
||||||
if (host == NULL) host = tsLocalFqdn;
|
if (host == NULL) host = tsLocalFqdn;
|
||||||
if (port == 0) port = tsServerPort;
|
if (port == 0) port = tsServerPort;
|
||||||
|
@ -695,21 +502,12 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
|
||||||
taosNetTestClient(host, port, pkgLen);
|
taosNetTestClient(host, port, pkgLen);
|
||||||
} else if (0 == strcmp("server", role)) {
|
} else if (0 == strcmp("server", role)) {
|
||||||
taosNetTestServer(host, port, pkgLen);
|
taosNetTestServer(host, port, pkgLen);
|
||||||
} else if (0 == strcmp("rpc", role)) {
|
|
||||||
tsLogEmbedded = 0;
|
|
||||||
taosNetTestRpc(host, port, pkgLen);
|
|
||||||
} else if (0 == strcmp("sync", role)) {
|
|
||||||
taosNetCheckSync(host, port);
|
|
||||||
} else if (0 == strcmp("startup", role)) {
|
|
||||||
taosNetTestStartup(host, port);
|
|
||||||
} else if (0 == strcmp("speed", role)) {
|
} else if (0 == strcmp("speed", role)) {
|
||||||
tsLogEmbedded = 0;
|
tsLogEmbedded = 0;
|
||||||
char type[10] = {0};
|
char type[10] = {0};
|
||||||
taosNetCheckSpeed(host, port, pkgLen, pkgNum, strtolower(type, pkgType));
|
taosNetCheckSpeed(host, port, pkgLen, pkgNum, strtolower(type, pkgType));
|
||||||
}else if (0 == strcmp("fqdn", role)) {
|
|
||||||
taosNetTestFqdn(host);
|
|
||||||
} else {
|
} else {
|
||||||
taosNetTestStartup(host, port);
|
TASSERT(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsLogEmbedded = 0;
|
tsLogEmbedded = 0;
|
||||||
|
|
Loading…
Reference in New Issue