From 315c9c37dc4251093c58b5e2903aad74937d80ca Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 5 May 2022 23:14:36 +0800 Subject: [PATCH 1/4] enh(rpc): taosd exited when fqdn is configed to invalid --- include/libs/transport/trpc.h | 1 + include/os/osSocket.h | 1 + source/dnode/mgmt/implement/src/dmTransport.c | 12 +++--- source/libs/transport/src/trans.c | 13 +++++- source/libs/transport/src/transSrv.c | 5 ++- source/os/src/osSocket.c | 42 +++++++++++++++++++ 6 files changed, 67 insertions(+), 7 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 0e7d486eab..a7d1522d12 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -68,6 +68,7 @@ typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, cha typedef bool (*RpcRfp)(int32_t code); typedef struct SRpcInit { + char localFqdn[TSDB_FQDN_LEN]; uint16_t localPort; // local port char * label; // for debug purpose int numOfThreads; // number of threads to handle connections diff --git a/include/os/osSocket.h b/include/os/osSocket.h index 62c3771669..213a6930ee 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -161,6 +161,7 @@ int taosCreateSocketWithTimeOutOpt(uint32_t conn_timeout_sec); TdSocketPtr taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); TdSocketPtr taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); +bool taosValidIpAndPort(uint32_t ip, uint16_t port); TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port); int32_t taosKeepTcpAlive(TdSocketPtr pSocket); TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, int *addrLen); diff --git a/source/dnode/mgmt/implement/src/dmTransport.c b/source/dnode/mgmt/implement/src/dmTransport.c index 446894556e..114d7b6dfc 100644 --- a/source/dnode/mgmt/implement/src/dmTransport.c +++ b/source/dnode/mgmt/implement/src/dmTransport.c @@ -16,8 +16,8 @@ #define _DEFAULT_SOURCE #include "dmImp.h" -#define INTERNAL_USER "_dnd" -#define INTERNAL_CKEY "_key" +#define INTERNAL_USER "_dnd" +#define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { @@ -130,10 +130,10 @@ _OVER: } static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SDnodeTrans *pTrans = &pDnode->trans; + SDnodeTrans * pTrans = &pDnode->trans; tmsg_t msgType = pMsg->msgType; bool isReq = msgType & 1u; - SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; + SMsgHandle * pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; SMgmtWrapper *pWrapper = pHandle->pNdWrapper; if (msgType == TDMT_DND_SERVER_STATUS) { @@ -517,7 +517,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s SAuthReq authReq = {0}; tstrncpy(authReq.user, user, TSDB_USER_LEN); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); - void *pReq = rpcMallocCont(contLen); + void * pReq = rpcMallocCont(contLen); tSerializeSAuthReq(pReq, contLen, &authReq); SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; @@ -547,6 +547,8 @@ static int32_t dmInitServer(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; + + strncpy(rpcInit.localFqdn, pDnode->data.localFqdn, strlen(pDnode->data.localFqdn)); rpcInit.localPort = pDnode->data.serverPort; rpcInit.label = "DND"; rpcInit.numOfThreads = tsNumOfRpcThreads; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index f776fb3764..f8277c575e 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -46,9 +46,20 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; } + uint32_t ip = 0; + if (pInit->connType == TAOS_CONN_SERVER) { + ip = taosGetIpv4FromFqdn(pInit->localFqdn); + if (ip == 0xFFFFFFFF) { + tError("invalid fqdn: %s", pInit->localFqdn); + taosMemoryFree(pRpc); + return NULL; + } + } + pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; - pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + pRpc->tcphandle = + (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); if (pRpc->tcphandle == NULL) { taosMemoryFree(pRpc); return NULL; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 7378ca3241..e1b0871135 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -817,7 +817,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); - uv_os_sock_t fds[2]; if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { goto End; @@ -841,6 +840,10 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } } + if (false == taosValidIpAndPort(srv->ip, srv->port)) { + tError("failed to bind, reason: %s", strerror(errno)); + goto End; + } if (false == addHandleToAcceptloop(srv)) { goto End; } diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 6aa8520082..8cac660039 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -638,6 +638,48 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) { return 0; } +bool taosValidIpAndPort(uint32_t ip, uint16_t port) { + struct sockaddr_in serverAdd; + SocketFd fd; + int32_t reuse; + + // printf("open tcp server socket:0x%x:%hu", ip, port); + + bzero((char *)&serverAdd, sizeof(serverAdd)); + serverAdd.sin_family = AF_INET; + serverAdd.sin_addr.s_addr = ip; + serverAdd.sin_port = (uint16_t)htons(port); + + if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) { + // printf("failed to open TCP socket: %d (%s)", errno, strerror(errno)); + taosCloseSocketNoCheck1(fd); + return false; + } + + TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket)); + if (pSocket == NULL) { + taosCloseSocketNoCheck1(fd); + return false; + } + pSocket->refId = 0; + pSocket->fd = fd; + + /* set REUSEADDR option, so the portnumber can be re-used */ + reuse = 1; + if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + // printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); + taosCloseSocket(&pSocket); + return NULL; + } + /* bind socket to server address */ + if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) { + // printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); + taosCloseSocket(&pSocket); + return false; + } + taosCloseSocket(&pSocket); + return true; +} TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; SocketFd fd; From f2eca15fe2f716fc8ea3a8086252db5d97c00105 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 6 May 2022 14:27:49 +0800 Subject: [PATCH 2/4] enh(rpc): validate fqdn --- tools/shell/src/shellNettest.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index c8ec31c48b..dfdb4951ad 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -21,7 +21,7 @@ static void shellWorkAsClient() { SRpcInit rpcInit = {0}; SEpSet epSet = {.inUse = 0, .numOfEps = 1}; SRpcMsg rpcRsp = {0}; - void *clientRpc = NULL; + void * clientRpc = NULL; char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)("_pwd"), strlen("_pwd"), pass); @@ -111,11 +111,16 @@ void shellNettestHandler(int32_t signum, void *sigInfo, void *context) { shellEx static void shellWorkAsServer() { SShellArgs *pArgs = &shell.args; + // char fqdn[TSDB_FQDN_LEN] = {0}; + /// tstrncpy(fqdn, pArgs->host, TSDB_FQDN_LEN); + // strtok(fqdn, ":"); + if (pArgs->port == 0) { pArgs->port = tsServerPort; } SRpcInit rpcInit = {0}; + memcpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn)); rpcInit.localPort = pArgs->port; rpcInit.label = "CHK"; rpcInit.numOfThreads = tsNumOfRpcThreads; From 78c617a9f1a4d800b591593dc0cb37447fcad2a6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 6 May 2022 14:30:07 +0800 Subject: [PATCH 3/4] enh(rpc): validate fqdn --- tools/shell/src/shellNettest.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index dfdb4951ad..9b68beb4e1 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -111,10 +111,6 @@ void shellNettestHandler(int32_t signum, void *sigInfo, void *context) { shellEx static void shellWorkAsServer() { SShellArgs *pArgs = &shell.args; - // char fqdn[TSDB_FQDN_LEN] = {0}; - /// tstrncpy(fqdn, pArgs->host, TSDB_FQDN_LEN); - // strtok(fqdn, ":"); - if (pArgs->port == 0) { pArgs->port = tsServerPort; } From 699c8a0461a89b1e343290500f51c2b9d1f8b103 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 6 May 2022 14:42:06 +0800 Subject: [PATCH 4/4] enh(rpc): validate fqdn --- source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transSrv.c | 2 +- tools/shell/src/shellNettest.c | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index f8277c575e..846cf6f967 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -51,6 +51,7 @@ void* rpcOpen(const SRpcInit* pInit) { ip = taosGetIpv4FromFqdn(pInit->localFqdn); if (ip == 0xFFFFFFFF) { tError("invalid fqdn: %s", pInit->localFqdn); + terrno = TSDB_CODE_RPC_FQDN_ERROR; taosMemoryFree(pRpc); return NULL; } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index e1b0871135..ad3f520210 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -841,7 +841,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, } } if (false == taosValidIpAndPort(srv->ip, srv->port)) { - tError("failed to bind, reason: %s", strerror(errno)); + tError("failed to bind, reason: %s", terrstr()); goto End; } if (false == addHandleToAcceptloop(srv)) { diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index 9b68beb4e1..345b85d896 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -127,7 +127,7 @@ static void shellWorkAsServer() { void *serverRpc = rpcOpen(&rpcInit); if (serverRpc == NULL) { - printf("failed to init net test server since %s", terrstr()); + printf("failed to init net test server since %s\n", terrstr()); } else { printf("network test server is initialized, port:%u\n", pArgs->port); taosSetSignal(SIGTERM, shellNettestHandler);