Merge pull request #12141 from taosdata/fix/invalid-fqdn
enh(rpc): taosd exited when fqdn is configed to invalid
This commit is contained in:
commit
fe66836ca3
|
@ -68,6 +68,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha
|
|||
typedef bool (*RpcRfp)(int32_t code);
|
||||
|
||||
typedef struct SRpcInit {
|
||||
char localFqdn[TSDB_FQDN_LEN];
|
||||
uint16_t localPort; // local port
|
||||
char * label; // for debug purpose
|
||||
int numOfThreads; // number of threads to handle connections
|
||||
|
|
|
@ -161,6 +161,7 @@ int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec);
|
|||
|
||||
TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
|
||||
TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
|
||||
bool taosValidIpAndPort(uint32_t ip, uint16_t port);
|
||||
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
|
||||
int32_t taosKeepTcpAlive(TdSocketPtr pSocket);
|
||||
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, int *addrLen);
|
||||
|
|
|
@ -16,8 +16,8 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "dmImp.h"
|
||||
|
||||
#define INTERNAL_USER "_dnd"
|
||||
#define INTERNAL_CKEY "_key"
|
||||
#define INTERNAL_USER "_dnd"
|
||||
#define INTERNAL_CKEY "_key"
|
||||
#define INTERNAL_SECRET "_pwd"
|
||||
|
||||
static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
|
||||
|
@ -130,10 +130,10 @@ _OVER:
|
|||
}
|
||||
|
||||
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SDnodeTrans *pTrans = &pDnode->trans;
|
||||
SDnodeTrans * pTrans = &pDnode->trans;
|
||||
tmsg_t msgType = pMsg->msgType;
|
||||
bool isReq = msgType & 1u;
|
||||
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
|
||||
SMsgHandle * pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
|
||||
SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
|
||||
|
||||
if (msgType == TDMT_DND_SERVER_STATUS) {
|
||||
|
@ -517,7 +517,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
|
|||
SAuthReq authReq = {0};
|
||||
tstrncpy(authReq.user, user, TSDB_USER_LEN);
|
||||
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
|
||||
void *pReq = rpcMallocCont(contLen);
|
||||
void * pReq = rpcMallocCont(contLen);
|
||||
tSerializeSAuthReq(pReq, contLen, &authReq);
|
||||
|
||||
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
|
||||
|
@ -547,6 +547,8 @@ static int32_t dmInitServer(SDnode *pDnode) {
|
|||
SDnodeTrans *pTrans = &pDnode->trans;
|
||||
|
||||
SRpcInit rpcInit = {0};
|
||||
|
||||
strncpy(rpcInit.localFqdn, pDnode->data.localFqdn, strlen(pDnode->data.localFqdn));
|
||||
rpcInit.localPort = pDnode->data.serverPort;
|
||||
rpcInit.label = "DND";
|
||||
rpcInit.numOfThreads = tsNumOfRpcThreads;
|
||||
|
|
|
@ -46,9 +46,21 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||
}
|
||||
|
||||
uint32_t ip = 0;
|
||||
if (pInit->connType == TAOS_CONN_SERVER) {
|
||||
ip = taosGetIpv4FromFqdn(pInit->localFqdn);
|
||||
if (ip == 0xFFFFFFFF) {
|
||||
tError("invalid fqdn: %s", pInit->localFqdn);
|
||||
terrno = TSDB_CODE_RPC_FQDN_ERROR;
|
||||
taosMemoryFree(pRpc);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
pRpc->connType = pInit->connType;
|
||||
pRpc->idleTime = pInit->idleTime;
|
||||
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
pRpc->tcphandle =
|
||||
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
if (pRpc->tcphandle == NULL) {
|
||||
taosMemoryFree(pRpc);
|
||||
return NULL;
|
||||
|
|
|
@ -817,7 +817,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
|
||||
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
|
||||
|
||||
|
||||
uv_os_sock_t fds[2];
|
||||
if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
|
||||
goto End;
|
||||
|
@ -841,6 +840,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
|||
goto End;
|
||||
}
|
||||
}
|
||||
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
|
||||
tError("failed to bind, reason: %s", terrstr());
|
||||
goto End;
|
||||
}
|
||||
if (false == addHandleToAcceptloop(srv)) {
|
||||
goto End;
|
||||
}
|
||||
|
|
|
@ -638,6 +638,48 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
|
||||
struct sockaddr_in serverAdd;
|
||||
SocketFd fd;
|
||||
int32_t reuse;
|
||||
|
||||
// printf("open tcp server socket:0x%x:%hu", ip, port);
|
||||
|
||||
bzero((char *)&serverAdd, sizeof(serverAdd));
|
||||
serverAdd.sin_family = AF_INET;
|
||||
serverAdd.sin_addr.s_addr = ip;
|
||||
serverAdd.sin_port = (uint16_t)htons(port);
|
||||
|
||||
if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
|
||||
// printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
|
||||
taosCloseSocketNoCheck1(fd);
|
||||
return false;
|
||||
}
|
||||
|
||||
TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
|
||||
if (pSocket == NULL) {
|
||||
taosCloseSocketNoCheck1(fd);
|
||||
return false;
|
||||
}
|
||||
pSocket->refId = 0;
|
||||
pSocket->fd = fd;
|
||||
|
||||
/* set REUSEADDR option, so the portnumber can be re-used */
|
||||
reuse = 1;
|
||||
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
|
||||
// printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
|
||||
taosCloseSocket(&pSocket);
|
||||
return NULL;
|
||||
}
|
||||
/* bind socket to server address */
|
||||
if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
|
||||
// printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
|
||||
taosCloseSocket(&pSocket);
|
||||
return false;
|
||||
}
|
||||
taosCloseSocket(&pSocket);
|
||||
return true;
|
||||
}
|
||||
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
|
||||
struct sockaddr_in serverAdd;
|
||||
SocketFd fd;
|
||||
|
|
|
@ -21,7 +21,7 @@ static void shellWorkAsClient() {
|
|||
SRpcInit rpcInit = {0};
|
||||
SEpSet epSet = {.inUse = 0, .numOfEps = 1};
|
||||
SRpcMsg rpcRsp = {0};
|
||||
void *clientRpc = NULL;
|
||||
void * clientRpc = NULL;
|
||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||
|
||||
taosEncryptPass_c((uint8_t *)("_pwd"), strlen("_pwd"), pass);
|
||||
|
@ -116,6 +116,7 @@ static void shellWorkAsServer() {
|
|||
}
|
||||
|
||||
SRpcInit rpcInit = {0};
|
||||
memcpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn));
|
||||
rpcInit.localPort = pArgs->port;
|
||||
rpcInit.label = "CHK";
|
||||
rpcInit.numOfThreads = tsNumOfRpcThreads;
|
||||
|
@ -126,7 +127,7 @@ static void shellWorkAsServer() {
|
|||
|
||||
void *serverRpc = rpcOpen(&rpcInit);
|
||||
if (serverRpc == NULL) {
|
||||
printf("failed to init net test server since %s", terrstr());
|
||||
printf("failed to init net test server since %s\n", terrstr());
|
||||
} else {
|
||||
printf("network test server is initialized, port:%u\n", pArgs->port);
|
||||
taosSetSignal(SIGTERM, shellNettestHandler);
|
||||
|
|
Loading…
Reference in New Issue