homework-jianmu/source/libs/transport/src/rpcUdp.c

261 lines
7.5 KiB
C

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "rpcUdp.h"
#include "os.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "taosdef.h"
#include "taoserror.h"
#include "ttimer.h"
#include "tutil.h"
#ifndef USE_UV
#define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5 // mseconds
#define RPC_MAX_UDP_SIZE 65480
typedef struct {
int index;
TdSocketPtr pSocket;
uint16_t port; // peer port
uint16_t localPort; // local port
char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
TdThread thread;
void *hash;
void *shandle; // handle passed by upper layer during server initialization
void *pSet;
void *(*processData)(SRecvInfo *pRecv);
char *buffer; // buffer to receive data
} SUdpConn;
typedef struct {
int index;
int server;
uint32_t ip; // local IP
uint16_t port; // local Port
void * shandle; // handle passed by upper layer during server initialization
int threads;
char label[TSDB_LABEL_LEN];
void *(*fp)(SRecvInfo *pPacket);
SUdpConn udpConn[];
} SUdpConnSet;
static void *taosRecvUdpData(void *param);
void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
SUdpConn * pConn;
SUdpConnSet *pSet;
int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn);
pSet = (SUdpConnSet *)taosMemoryMalloc((size_t)size);
if (pSet == NULL) {
tError("%s failed to allocate UdpConn", label);
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
memset(pSet, 0, (size_t)size);
pSet->ip = ip;
pSet->port = port;
pSet->shandle = shandle;
pSet->fp = fp;
pSet->threads = threads;
tstrncpy(pSet->label, label, sizeof(pSet->label));
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
int i;
uint16_t ownPort;
for (i = 0; i < threads; ++i) {
pConn = pSet->udpConn + i;
ownPort = (port ? port + i : 0);
pConn->pSocket = taosOpenUdpSocket(ip, ownPort);
if (pConn->pSocket == NULL) {
tError("%s failed to open UDP socket %x:%hu", label, ip, port);
break;
}
pConn->buffer = taosMemoryMalloc(RPC_MAX_UDP_SIZE);
if (NULL == pConn->buffer) {
tError("%s failed to malloc recv buffer", label);
break;
}
struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin);
if (taosGetSocketName(pConn->pSocket, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
addrlen == sizeof(sin)) {
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
}
tstrncpy(pConn->label, label, sizeof(pConn->label));
pConn->shandle = shandle;
pConn->processData = fp;
pConn->index = i;
pConn->pSet = pSet;
int code = taosThreadCreate(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
if (code != 0) {
tError("%s failed to create thread to process UDP data(%s)", label, strerror(errno));
break;
}
}
taosThreadAttrDestroy(&thAttr);
if (i != threads) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosCleanUpUdpConnection(pSet);
return NULL;
}
tDebug("%s UDP connection is initialized, ip:%x:%hu threads:%d", label, ip, port, threads);
return pSet;
}
void taosStopUdpConnection(void *handle) {
SUdpConnSet *pSet = (SUdpConnSet *)handle;
SUdpConn * pConn;
if (pSet == NULL) return;
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
if (pConn->pSocket != NULL) taosShutDownSocketRDWR(pConn->pSocket);
if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket);
pConn->pSocket = NULL;
}
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
if (taosCheckPthreadValid(pConn->thread)) {
taosThreadJoin(pConn->thread, NULL);
}
taosMemoryFreeClear(pConn->buffer);
// tTrace("%s UDP thread is closed, index:%d", pConn->label, i);
}
tDebug("%s UDP is stopped", pSet->label);
}
void taosCleanUpUdpConnection(void *handle) {
SUdpConnSet *pSet = (SUdpConnSet *)handle;
SUdpConn * pConn;
if (pSet == NULL) return;
for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i;
if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket);
}
tDebug("%s UDP is cleaned up", pSet->label);
taosMemoryFreeClear(pSet);
}
void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
SUdpConnSet *pSet = (SUdpConnSet *)shandle;
pSet->index = (pSet->index + 1) % pSet->threads;
SUdpConn *pConn = pSet->udpConn + pSet->index;
pConn->port = port;
tDebug("%s UDP connection is setup, ip:%x:%hu localPort:%hu", pConn->label, ip, port, pConn->localPort);
return pConn;
}
static void *taosRecvUdpData(void *param) {
SUdpConn * pConn = param;
struct sockaddr_in sourceAdd;
ssize_t dataLen;
unsigned int addLen;
uint16_t port;
SRecvInfo recvInfo;
memset(&sourceAdd, 0, sizeof(sourceAdd));
addLen = sizeof(sourceAdd);
tDebug("%s UDP thread is created, index:%d", pConn->label, pConn->index);
char *msg = pConn->buffer;
setThreadName("recvUdpData");
while (1) {
dataLen = taosReadFromSocket(pConn->pSocket, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
if (dataLen <= 0) {
tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d", pConn->label, strerror(errno), (int32_t)dataLen);
// for windows usage, remote shutdown also returns - 1 in windows client
if (pConn->pSocket == NULL) {
break;
} else {
continue;
}
}
port = ntohs(sourceAdd.sin_port);
if (dataLen < sizeof(SRpcHead)) {
tError("%s recvfrom failed(%s)", pConn->label, strerror(errno));
continue;
}
int32_t size = dataLen + tsRpcOverhead;
char * tmsg = taosMemoryMalloc(size);
if (NULL == tmsg) {
tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
continue;
} else {
tTrace("UDP malloc mem:%p size:%d", tmsg, size);
}
tmsg += tsRpcOverhead; // overhead for SRpcReqContext
memcpy(tmsg, msg, dataLen);
recvInfo.msg = tmsg;
recvInfo.msgLen = dataLen;
recvInfo.ip = sourceAdd.sin_addr.s_addr;
recvInfo.port = port;
recvInfo.shandle = pConn->shandle;
recvInfo.thandle = NULL;
recvInfo.chandle = pConn;
recvInfo.connType = 0;
(*(pConn->processData))(&recvInfo);
}
return NULL;
}
int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) {
SUdpConn *pConn = (SUdpConn *)chandle;
if (pConn == NULL) return -1;
struct sockaddr_in destAdd;
memset(&destAdd, 0, sizeof(destAdd));
destAdd.sin_family = AF_INET;
destAdd.sin_addr.s_addr = ip;
destAdd.sin_port = htons(port);
int ret = taosSendto(pConn->pSocket, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
return ret;
}
#endif