Merge branch 'develop' of https://github.com/taosdata/TDengine into develop
This commit is contained in:
commit
998ad42399
|
@ -2169,7 +2169,7 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) {
|
|||
tscMgmtIpSet.inUse = 0;
|
||||
|
||||
if (first && first[0] != 0) {
|
||||
if (strlen(first) >= TSDB_FQDN_LEN) {
|
||||
if (strlen(first) >= TSDB_EP_LEN) {
|
||||
terrno = TSDB_CODE_INVALID_FQDN;
|
||||
return -1;
|
||||
}
|
||||
|
@ -2178,7 +2178,7 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) {
|
|||
}
|
||||
|
||||
if (second && second[0] != 0) {
|
||||
if (strlen(second) >= TSDB_FQDN_LEN) {
|
||||
if (strlen(second) >= TSDB_EP_LEN) {
|
||||
terrno = TSDB_CODE_INVALID_FQDN;
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -61,10 +61,10 @@ int32_t tscEmbedded = 0;
|
|||
*/
|
||||
int64_t tsMsPerDay[] = {86400000L, 86400000000L};
|
||||
|
||||
char tsFirst[TSDB_FQDN_LEN] = {0};
|
||||
char tsSecond[TSDB_FQDN_LEN] = {0};
|
||||
char tsArbitrator[TSDB_FQDN_LEN] = {0};
|
||||
char tsLocalEp[TSDB_FQDN_LEN] = {0}; // Local End Point, hostname:port
|
||||
char tsFirst[TSDB_EP_LEN] = {0};
|
||||
char tsSecond[TSDB_EP_LEN] = {0};
|
||||
char tsArbitrator[TSDB_EP_LEN] = {0};
|
||||
char tsLocalEp[TSDB_EP_LEN] = {0}; // Local End Point, hostname:port
|
||||
uint16_t tsServerPort = 6030;
|
||||
uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035]
|
||||
uint16_t tsDnodeDnodePort = 6035; // udp/tcp
|
||||
|
@ -284,7 +284,7 @@ static void doInitGlobalConfig() {
|
|||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.minValue = 0;
|
||||
cfg.maxValue = 0;
|
||||
cfg.ptrLength = TSDB_FQDN_LEN;
|
||||
cfg.ptrLength = TSDB_EP_LEN;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
|
@ -294,7 +294,7 @@ static void doInitGlobalConfig() {
|
|||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.minValue = 0;
|
||||
cfg.maxValue = 0;
|
||||
cfg.ptrLength = TSDB_FQDN_LEN;
|
||||
cfg.ptrLength = TSDB_EP_LEN;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
|
@ -356,7 +356,7 @@ static void doInitGlobalConfig() {
|
|||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
|
||||
cfg.minValue = 0;
|
||||
cfg.maxValue = 0;
|
||||
cfg.ptrLength = TSDB_FQDN_LEN;
|
||||
cfg.ptrLength = TSDB_EP_LEN;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
|
|
|
@ -411,7 +411,7 @@ static bool dnodeReadMnodeInfos() {
|
|||
dError("failed to read mnode mgmtIpList.json, nodeName not found");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
strncpy(tsMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_FQDN_LEN);
|
||||
strncpy(tsMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN);
|
||||
}
|
||||
|
||||
ret = true;
|
||||
|
|
|
@ -218,7 +218,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
|||
#define TSDB_LOCALE_LEN 64
|
||||
#define TSDB_TIMEZONE_LEN 64
|
||||
|
||||
#define TSDB_FQDN_LEN 256
|
||||
#define TSDB_FQDN_LEN 128
|
||||
#define TSDB_EP_LEN (TSDB_FQDN_LEN+6)
|
||||
#define TSDB_IPv4ADDR_LEN 16
|
||||
#define TSDB_FILENAME_LEN 128
|
||||
#define TSDB_METER_VNODE_BITS 20
|
||||
|
|
|
@ -530,7 +530,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t nodeId;
|
||||
char nodeEp[TSDB_FQDN_LEN];
|
||||
char nodeEp[TSDB_EP_LEN];
|
||||
} SDMMnodeInfo;
|
||||
|
||||
typedef struct {
|
||||
|
@ -542,7 +542,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
uint32_t version;
|
||||
int32_t dnodeId;
|
||||
char dnodeEp[TSDB_FQDN_LEN];
|
||||
char dnodeEp[TSDB_EP_LEN];
|
||||
uint32_t moduleStatus;
|
||||
uint32_t lastReboot; // time stamp for last reboot
|
||||
uint16_t numOfTotalVnodes; // from config file
|
||||
|
@ -584,7 +584,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t nodeId;
|
||||
char nodeEp[TSDB_FQDN_LEN];
|
||||
char nodeEp[TSDB_EP_LEN];
|
||||
} SMDVnodeDesc;
|
||||
|
||||
typedef struct {
|
||||
|
@ -669,7 +669,7 @@ typedef struct SCMShowRsp {
|
|||
} SCMShowRsp;
|
||||
|
||||
typedef struct {
|
||||
char ep[TSDB_FQDN_LEN]; // end point, hostname:port
|
||||
char ep[TSDB_EP_LEN]; // end point, hostname:port
|
||||
} SCMCreateDnodeMsg, SCMDropDnodeMsg;
|
||||
|
||||
typedef struct {
|
||||
|
@ -684,7 +684,7 @@ typedef struct {
|
|||
} SDMConfigVnodeMsg;
|
||||
|
||||
typedef struct {
|
||||
char ep[TSDB_FQDN_LEN]; // end point, hostname:port
|
||||
char ep[TSDB_EP_LEN]; // end point, hostname:port
|
||||
char config[64];
|
||||
} SMDCfgDnodeMsg, SCMCfgDnodeMsg;
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ typedef struct SDnodeObj {
|
|||
int32_t dnodeId;
|
||||
uint16_t dnodePort;
|
||||
char dnodeFqdn[TSDB_FQDN_LEN + 1];
|
||||
char dnodeEp[TSDB_FQDN_LEN + 1];
|
||||
char dnodeEp[TSDB_EP_LEN + 1];
|
||||
int64_t createdTime;
|
||||
uint32_t lastAccess;
|
||||
int32_t openVnodes;
|
||||
|
|
|
@ -68,7 +68,7 @@ typedef enum {
|
|||
typedef struct {
|
||||
void * conn;
|
||||
void * timer;
|
||||
char ep[TSDB_FQDN_LEN];
|
||||
char ep[TSDB_EP_LEN];
|
||||
int8_t cmdIndex;
|
||||
int8_t state;
|
||||
char sql[SQL_LENGTH];
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _rpc_hash_ip_header_
|
||||
#define _rpc_hash_ip_header_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
void *rpcOpenIpHash(int maxSessions);
|
||||
void rpcCloseIpHash(void *handle);
|
||||
void *rpcAddIpHash(void *handle, void *pData, uint32_t ip, uint16_t port);
|
||||
void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port);
|
||||
void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -1,167 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "tmempool.h"
|
||||
#include "rpcLog.h"
|
||||
|
||||
typedef struct SIpHash {
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
int hash;
|
||||
struct SIpHash *prev;
|
||||
struct SIpHash *next;
|
||||
void *data;
|
||||
} SIpHash;
|
||||
|
||||
typedef struct {
|
||||
SIpHash **ipHashList;
|
||||
mpool_h ipHashMemPool;
|
||||
int maxSessions;
|
||||
} SHashObj;
|
||||
|
||||
int rpcHashIp(void *handle, uint32_t ip, uint16_t port) {
|
||||
SHashObj *pObj = (SHashObj *)handle;
|
||||
int hash = 0;
|
||||
|
||||
hash = (int)(ip >> 16);
|
||||
hash += (unsigned short)(ip & 0xFFFF);
|
||||
hash += port;
|
||||
|
||||
hash = hash % pObj->maxSessions;
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
void *rpcAddIpHash(void *handle, void *data, uint32_t ip, uint16_t port) {
|
||||
int hash;
|
||||
SIpHash *pNode;
|
||||
SHashObj *pObj;
|
||||
|
||||
pObj = (SHashObj *)handle;
|
||||
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
|
||||
|
||||
hash = rpcHashIp(pObj, ip, port);
|
||||
pNode = (SIpHash *)taosMemPoolMalloc(pObj->ipHashMemPool);
|
||||
pNode->ip = ip;
|
||||
pNode->port = port;
|
||||
pNode->data = data;
|
||||
pNode->prev = 0;
|
||||
pNode->next = pObj->ipHashList[hash];
|
||||
pNode->hash = hash;
|
||||
|
||||
if (pObj->ipHashList[hash] != 0) (pObj->ipHashList[hash])->prev = pNode;
|
||||
pObj->ipHashList[hash] = pNode;
|
||||
|
||||
return pObj;
|
||||
}
|
||||
|
||||
void rpcDeleteIpHash(void *handle, uint32_t ip, uint16_t port) {
|
||||
int hash;
|
||||
SIpHash *pNode;
|
||||
SHashObj *pObj;
|
||||
|
||||
pObj = (SHashObj *)handle;
|
||||
if (pObj == NULL || pObj->maxSessions == 0) return;
|
||||
|
||||
hash = rpcHashIp(pObj, ip, port);
|
||||
|
||||
pNode = pObj->ipHashList[hash];
|
||||
while (pNode) {
|
||||
if (pNode->ip == ip && pNode->port == port) break;
|
||||
|
||||
pNode = pNode->next;
|
||||
}
|
||||
|
||||
if (pNode) {
|
||||
if (pNode->prev) {
|
||||
pNode->prev->next = pNode->next;
|
||||
} else {
|
||||
pObj->ipHashList[hash] = pNode->next;
|
||||
}
|
||||
|
||||
if (pNode->next) {
|
||||
pNode->next->prev = pNode->prev;
|
||||
}
|
||||
|
||||
taosMemPoolFree(pObj->ipHashMemPool, (char *)pNode);
|
||||
}
|
||||
}
|
||||
|
||||
void *rpcGetIpHash(void *handle, uint32_t ip, uint16_t port) {
|
||||
int hash;
|
||||
SIpHash *pNode;
|
||||
SHashObj *pObj;
|
||||
|
||||
pObj = (SHashObj *)handle;
|
||||
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
|
||||
|
||||
hash = rpcHashIp(pObj, ip, port);
|
||||
pNode = pObj->ipHashList[hash];
|
||||
|
||||
while (pNode) {
|
||||
if (pNode->ip == ip && pNode->port == port) {
|
||||
break;
|
||||
}
|
||||
pNode = pNode->next;
|
||||
}
|
||||
|
||||
if (pNode) {
|
||||
return pNode->data;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *rpcOpenIpHash(int maxSessions) {
|
||||
SIpHash **ipHashList;
|
||||
mpool_h ipHashMemPool;
|
||||
SHashObj *pObj;
|
||||
|
||||
ipHashMemPool = taosMemPoolInit(maxSessions, sizeof(SIpHash));
|
||||
if (ipHashMemPool == 0) return NULL;
|
||||
|
||||
ipHashList = calloc(sizeof(SIpHash *), (size_t)maxSessions);
|
||||
if (ipHashList == 0) {
|
||||
taosMemPoolCleanUp(ipHashMemPool);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pObj = malloc(sizeof(SHashObj));
|
||||
if (pObj == NULL) {
|
||||
taosMemPoolCleanUp(ipHashMemPool);
|
||||
free(ipHashList);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pObj->maxSessions = maxSessions;
|
||||
pObj->ipHashMemPool = ipHashMemPool;
|
||||
pObj->ipHashList = ipHashList;
|
||||
|
||||
return pObj;
|
||||
}
|
||||
|
||||
void rpcCloseIpHash(void *handle) {
|
||||
SHashObj *pObj;
|
||||
|
||||
pObj = (SHashObj *)handle;
|
||||
if (pObj == NULL || pObj->maxSessions == 0) return;
|
||||
|
||||
if (pObj->ipHashMemPool) taosMemPoolCleanUp(pObj->ipHashMemPool);
|
||||
|
||||
if (pObj->ipHashList) free(pObj->ipHashList);
|
||||
|
||||
memset(pObj, 0, sizeof(SHashObj));
|
||||
free(pObj);
|
||||
}
|
|
@ -19,7 +19,6 @@
|
|||
#include "ttimer.h"
|
||||
#include "tutil.h"
|
||||
#include "rpcLog.h"
|
||||
#include "rpcHaship.h"
|
||||
#include "rpcUdp.h"
|
||||
#include "rpcHead.h"
|
||||
|
||||
|
@ -28,8 +27,6 @@
|
|||
#define RPC_UDP_BUF_TIME 5 // mseconds
|
||||
#define RPC_MAX_UDP_SIZE 65480
|
||||
|
||||
int tsUdpDelay = 0;
|
||||
|
||||
typedef struct {
|
||||
void *signature;
|
||||
int index;
|
||||
|
@ -38,8 +35,6 @@ typedef struct {
|
|||
uint16_t localPort; // local port
|
||||
char label[12]; // copy from udpConnSet;
|
||||
pthread_t thread;
|
||||
pthread_mutex_t mutex;
|
||||
void *tmrCtrl; // copy from UdpConnSet;
|
||||
void *hash;
|
||||
void *shandle; // handle passed by upper layer during server initialization
|
||||
void *pSet;
|
||||
|
@ -55,26 +50,11 @@ typedef struct {
|
|||
void *shandle; // handle passed by upper layer during server initialization
|
||||
int threads;
|
||||
char label[12];
|
||||
void *tmrCtrl;
|
||||
void *(*fp)(SRecvInfo *pPacket);
|
||||
SUdpConn udpConn[];
|
||||
} SUdpConnSet;
|
||||
|
||||
typedef struct {
|
||||
void *signature;
|
||||
uint32_t ip; // dest IP
|
||||
uint16_t port; // dest Port
|
||||
SUdpConn *pConn;
|
||||
struct sockaddr_in destAdd;
|
||||
void *msgHdr;
|
||||
int totalLen;
|
||||
void *timer;
|
||||
int emptyNum;
|
||||
} SUdpBuf;
|
||||
|
||||
static void *taosRecvUdpData(void *param);
|
||||
static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port);
|
||||
static void taosProcessUdpBufTimer(void *param, void *tmrId);
|
||||
|
||||
void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
|
||||
SUdpConn *pConn;
|
||||
|
@ -94,16 +74,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
|||
pSet->fp = fp;
|
||||
strcpy(pSet->label, label);
|
||||
|
||||
if ( tsUdpDelay ) {
|
||||
char udplabel[12];
|
||||
sprintf(udplabel, "%s.b", label);
|
||||
pSet->tmrCtrl = taosTmrInit(RPC_MAX_UDP_CONNS * threads, 5, 5000, udplabel);
|
||||
if (pSet->tmrCtrl == NULL) {
|
||||
tError("%s failed to initialize tmrCtrl") taosCleanUpUdpConnection(pSet);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
uint16_t ownPort;
|
||||
for (int i = 0; i < threads; ++i) {
|
||||
pConn = pSet->udpConn + i;
|
||||
|
@ -135,11 +105,6 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
|||
pConn->index = i;
|
||||
pConn->pSet = pSet;
|
||||
pConn->signature = pConn;
|
||||
if (tsUdpDelay) {
|
||||
pConn->hash = rpcOpenIpHash(RPC_MAX_UDP_CONNS);
|
||||
pthread_mutex_init(&pConn->mutex, NULL);
|
||||
pConn->tmrCtrl = pSet->tmrCtrl;
|
||||
}
|
||||
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
|
@ -173,10 +138,6 @@ void taosCleanUpUdpConnection(void *handle) {
|
|||
free(pConn->buffer);
|
||||
pthread_cancel(pConn->thread);
|
||||
taosCloseSocket(pConn->fd);
|
||||
if (pConn->hash) {
|
||||
rpcCloseIpHash(pConn->hash);
|
||||
pthread_mutex_destroy(&pConn->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < pSet->threads; ++i) {
|
||||
|
@ -185,7 +146,6 @@ void taosCleanUpUdpConnection(void *handle) {
|
|||
tTrace("chandle:%p is closed", pConn);
|
||||
}
|
||||
|
||||
taosTmrCleanUp(pSet->tmrCtrl);
|
||||
tfree(pSet);
|
||||
}
|
||||
|
||||
|
@ -205,64 +165,42 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t
|
|||
static void *taosRecvUdpData(void *param) {
|
||||
SUdpConn *pConn = param;
|
||||
struct sockaddr_in sourceAdd;
|
||||
int dataLen;
|
||||
ssize_t dataLen;
|
||||
unsigned int addLen;
|
||||
uint16_t port;
|
||||
int minSize = sizeof(SRpcHead);
|
||||
SRecvInfo recvInfo;
|
||||
|
||||
memset(&sourceAdd, 0, sizeof(sourceAdd));
|
||||
addLen = sizeof(sourceAdd);
|
||||
tTrace("%s UDP thread is created, index:%d", pConn->label, pConn->index);
|
||||
char *msg = pConn->buffer;
|
||||
|
||||
while (1) {
|
||||
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
|
||||
port = ntohs(sourceAdd.sin_port);
|
||||
//tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen);
|
||||
|
||||
if (dataLen < sizeof(SRpcHead)) {
|
||||
tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno));
|
||||
continue;
|
||||
}
|
||||
|
||||
int processedLen = 0, leftLen = 0;
|
||||
int msgLen = 0;
|
||||
int count = 0;
|
||||
char *msg = pConn->buffer;
|
||||
while (processedLen < dataLen) {
|
||||
leftLen = dataLen - processedLen;
|
||||
SRpcHead *pHead = (SRpcHead *)msg;
|
||||
msgLen = htonl((uint32_t)pHead->msgLen);
|
||||
if (leftLen < minSize || msgLen > leftLen || msgLen < minSize) {
|
||||
tError("%s msg is messed up, dataLen:%d processedLen:%d count:%d msgLen:%d", pConn->label, dataLen,
|
||||
processedLen, count, msgLen);
|
||||
break;
|
||||
}
|
||||
|
||||
char *tmsg = malloc((size_t)msgLen + tsRpcOverhead);
|
||||
if (NULL == tmsg) {
|
||||
tError("%s failed to allocate memory, size:%d", pConn->label, msgLen);
|
||||
break;
|
||||
}
|
||||
|
||||
tmsg += tsRpcOverhead; // overhead for SRpcReqContext
|
||||
memcpy(tmsg, msg, (size_t)msgLen);
|
||||
recvInfo.msg = tmsg;
|
||||
recvInfo.msgLen = msgLen;
|
||||
recvInfo.ip = sourceAdd.sin_addr.s_addr;
|
||||
recvInfo.port = port;
|
||||
recvInfo.shandle = pConn->shandle;
|
||||
recvInfo.thandle = NULL;
|
||||
recvInfo.chandle = pConn;
|
||||
recvInfo.connType = 0;
|
||||
(*(pConn->processData))(&recvInfo);
|
||||
|
||||
processedLen += msgLen;
|
||||
msg += msgLen;
|
||||
count++;
|
||||
char *tmsg = malloc(dataLen + tsRpcOverhead);
|
||||
if (NULL == tmsg) {
|
||||
tError("%s failed to allocate memory, size:%d", pConn->label, dataLen);
|
||||
continue;
|
||||
}
|
||||
|
||||
// tTrace("%s %d UDP packets are received together", pConn->label, count);
|
||||
tmsg += tsRpcOverhead; // overhead for SRpcReqContext
|
||||
memcpy(tmsg, msg, dataLen);
|
||||
recvInfo.msg = tmsg;
|
||||
recvInfo.msgLen = dataLen;
|
||||
recvInfo.ip = sourceAdd.sin_addr.s_addr;
|
||||
recvInfo.port = port;
|
||||
recvInfo.shandle = pConn->shandle;
|
||||
recvInfo.thandle = NULL;
|
||||
recvInfo.chandle = pConn;
|
||||
recvInfo.connType = 0;
|
||||
(*(pConn->processData))(&recvInfo);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -270,141 +208,17 @@ static void *taosRecvUdpData(void *param) {
|
|||
|
||||
int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) {
|
||||
SUdpConn *pConn = (SUdpConn *)chandle;
|
||||
SUdpBuf *pBuf;
|
||||
|
||||
if (pConn == NULL || pConn->signature != pConn) return -1;
|
||||
|
||||
if (pConn->hash == NULL) {
|
||||
struct sockaddr_in destAdd;
|
||||
memset(&destAdd, 0, sizeof(destAdd));
|
||||
destAdd.sin_family = AF_INET;
|
||||
destAdd.sin_addr.s_addr = ip;
|
||||
destAdd.sin_port = htons(port);
|
||||
struct sockaddr_in destAdd;
|
||||
memset(&destAdd, 0, sizeof(destAdd));
|
||||
destAdd.sin_family = AF_INET;
|
||||
destAdd.sin_addr.s_addr = ip;
|
||||
destAdd.sin_port = htons(port);
|
||||
|
||||
//tTrace("%s msg is sent to 0x%x:%hu len:%d ret:%d localPort:%hu chandle:0x%x", pConn->label, destAdd.sin_addr.s_addr,
|
||||
// port, dataLen, ret, pConn->localPort, chandle);
|
||||
int ret = (int)sendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
|
||||
int ret = (int)sendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&pConn->mutex);
|
||||
|
||||
pBuf = (SUdpBuf *)rpcGetIpHash(pConn->hash, ip, port);
|
||||
if (pBuf == NULL) {
|
||||
pBuf = taosCreateUdpBuf(pConn, ip, port);
|
||||
rpcAddIpHash(pConn->hash, pBuf, ip, port);
|
||||
}
|
||||
|
||||
if ((pBuf->totalLen + dataLen > RPC_MAX_UDP_SIZE) || (taosMsgHdrSize(pBuf->msgHdr) >= RPC_MAX_UDP_PKTS)) {
|
||||
taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer);
|
||||
|
||||
taosSendMsgHdr(pBuf->msgHdr, pConn->fd);
|
||||
pBuf->totalLen = 0;
|
||||
}
|
||||
|
||||
taosSetMsgHdrData(pBuf->msgHdr, data, dataLen);
|
||||
|
||||
pBuf->totalLen += dataLen;
|
||||
|
||||
pthread_mutex_unlock(&pConn->mutex);
|
||||
|
||||
return dataLen;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void taosFreeMsgHdr(void *hdr) {
|
||||
struct msghdr *msgHdr = (struct msghdr *)hdr;
|
||||
free(msgHdr->msg_iov);
|
||||
}
|
||||
|
||||
int taosMsgHdrSize(void *hdr) {
|
||||
struct msghdr *msgHdr = (struct msghdr *)hdr;
|
||||
return (int)msgHdr->msg_iovlen;
|
||||
}
|
||||
|
||||
void taosSendMsgHdr(void *hdr, int fd) {
|
||||
struct msghdr *msgHdr = (struct msghdr *)hdr;
|
||||
sendmsg(fd, msgHdr, 0);
|
||||
msgHdr->msg_iovlen = 0;
|
||||
}
|
||||
|
||||
void taosInitMsgHdr(void **hdr, void *dest, int maxPkts) {
|
||||
struct msghdr *msgHdr = (struct msghdr *)malloc(sizeof(struct msghdr));
|
||||
memset(msgHdr, 0, sizeof(struct msghdr));
|
||||
*hdr = msgHdr;
|
||||
struct sockaddr_in *destAdd = (struct sockaddr_in *)dest;
|
||||
|
||||
msgHdr->msg_name = destAdd;
|
||||
msgHdr->msg_namelen = sizeof(struct sockaddr_in);
|
||||
int size = (int)sizeof(struct iovec) * maxPkts;
|
||||
msgHdr->msg_iov = (struct iovec *)malloc((size_t)size);
|
||||
memset(msgHdr->msg_iov, 0, (size_t)size);
|
||||
}
|
||||
|
||||
void taosSetMsgHdrData(void *hdr, char *data, int dataLen) {
|
||||
struct msghdr *msgHdr = (struct msghdr *)hdr;
|
||||
msgHdr->msg_iov[msgHdr->msg_iovlen].iov_base = data;
|
||||
msgHdr->msg_iov[msgHdr->msg_iovlen].iov_len = (size_t)dataLen;
|
||||
msgHdr->msg_iovlen++;
|
||||
}
|
||||
|
||||
void taosRemoveUdpBuf(SUdpBuf *pBuf) {
|
||||
taosTmrStopA(&pBuf->timer);
|
||||
rpcDeleteIpHash(pBuf->pConn->hash, pBuf->ip, pBuf->port);
|
||||
|
||||
// tTrace("%s UDP buffer to:0x%lld:%d is removed", pBuf->pConn->label,
|
||||
// pBuf->ip, pBuf->port);
|
||||
|
||||
pBuf->signature = NULL;
|
||||
taosFreeMsgHdr(pBuf->msgHdr);
|
||||
free(pBuf);
|
||||
}
|
||||
|
||||
void taosProcessUdpBufTimer(void *param, void *tmrId) {
|
||||
SUdpBuf *pBuf = (SUdpBuf *)param;
|
||||
if (pBuf->signature != param) return;
|
||||
if (pBuf->timer != tmrId) return;
|
||||
|
||||
SUdpConn *pConn = pBuf->pConn;
|
||||
|
||||
pthread_mutex_lock(&pConn->mutex);
|
||||
|
||||
if (taosMsgHdrSize(pBuf->msgHdr) > 0) {
|
||||
taosSendMsgHdr(pBuf->msgHdr, pConn->fd);
|
||||
pBuf->totalLen = 0;
|
||||
pBuf->emptyNum = 0;
|
||||
} else {
|
||||
pBuf->emptyNum++;
|
||||
if (pBuf->emptyNum > 200) {
|
||||
taosRemoveUdpBuf(pBuf);
|
||||
pBuf = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pConn->mutex);
|
||||
|
||||
if (pBuf) taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer);
|
||||
}
|
||||
|
||||
static SUdpBuf *taosCreateUdpBuf(SUdpConn *pConn, uint32_t ip, uint16_t port) {
|
||||
SUdpBuf *pBuf = (SUdpBuf *)malloc(sizeof(SUdpBuf));
|
||||
memset(pBuf, 0, sizeof(SUdpBuf));
|
||||
|
||||
pBuf->ip = ip;
|
||||
pBuf->port = port;
|
||||
pBuf->pConn = pConn;
|
||||
|
||||
pBuf->destAdd.sin_family = AF_INET;
|
||||
pBuf->destAdd.sin_addr.s_addr = ip;
|
||||
pBuf->destAdd.sin_port = (uint16_t)htons(port);
|
||||
taosInitMsgHdr(&(pBuf->msgHdr), &(pBuf->destAdd), RPC_MAX_UDP_PKTS);
|
||||
pBuf->signature = pBuf;
|
||||
taosTmrReset(taosProcessUdpBufTimer, RPC_UDP_BUF_TIME, pBuf, pConn->tmrCtrl, &pBuf->timer);
|
||||
|
||||
// tTrace("%s UDP buffer to:0x%lld:%d is created", pBuf->pConn->label,
|
||||
// pBuf->ip, pBuf->port);
|
||||
|
||||
return pBuf;
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue