Merge pull request #9748 from taosdata/feature/rpc

add libuv
This commit is contained in:
Shengliang Guan 2022-01-12 11:57:12 +08:00 committed by GitHub
commit afc4b3b760
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 795 additions and 557 deletions

View File

@ -4,9 +4,9 @@ ExternalProject_Add(libuv
GIT_REPOSITORY https://github.com/libuv/libuv.git GIT_REPOSITORY https://github.com/libuv/libuv.git
GIT_TAG v1.42.0 GIT_TAG v1.42.0
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/libuv" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/libuv"
BINARY_DIR "" BINARY_DIR "${CMAKE_CONTRIB_DIR}/libuv"
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
BUILD_COMMAND "" BUILD_COMMAND ""
INSTALL_COMMAND "" INSTALL_COMMAND ""
TEST_COMMAND "" TEST_COMMAND ""
) )

View File

@ -12,4 +12,19 @@ target_link_libraries(
PUBLIC os PUBLIC os
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
) )
if (${BUILD_WITH_UV})
target_include_directories(
transport
PUBLIC "${CMAKE_SOURCE_DIR}/contrib/libuv/include"
)
#LINK_DIRECTORIES("${CMAKE_SOURCE_DIR}/debug/contrib/libuv")
target_link_libraries(
transport
PUBLIC uv_a
)
add_definitions(-DUSE_UV)
endif(${BUILD_WITH_UV})

View File

@ -16,6 +16,8 @@
#ifndef TDENGINE_RPC_CACHE_H #ifndef TDENGINE_RPC_CACHE_H
#define TDENGINE_RPC_CACHE_H #define TDENGINE_RPC_CACHE_H
#include <stdint.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif

View File

@ -16,52 +16,57 @@
#ifndef TDENGINE_RPCHEAD_H #ifndef TDENGINE_RPCHEAD_H
#define TDENGINE_RPCHEAD_H #define TDENGINE_RPCHEAD_H
#include <tdef.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#define RPC_CONN_TCP 2 #ifdef USE_UV
#else
#define RPC_CONN_TCP 2
extern int tsRpcOverhead; extern int tsRpcOverhead;
typedef struct { typedef struct {
void *msg; void* msg;
int msgLen; int msgLen;
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int connType; int connType;
void *shandle; void* shandle;
void *thandle; void* thandle;
void *chandle; void* chandle;
} SRecvInfo; } SRecvInfo;
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct { typedef struct {
char version:4; // RPC version char version : 4; // RPC version
char comp:4; // compression algorithm, 0:no compression 1:lz4 char comp : 4; // compression algorithm, 0:no compression 1:lz4
char resflag:2; // reserved bits char resflag : 2; // reserved bits
char spi:3; // security parameter index char spi : 3; // security parameter index
char encrypt:3; // encrypt algorithm, 0: no encryption char encrypt : 3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID uint16_t tranId; // transcation ID
uint32_t linkUid; // for unique connection ID assigned by client uint32_t linkUid; // for unique connection ID assigned by client
uint64_t ahandle; // ahandle assigned by client uint64_t ahandle; // ahandle assigned by client
uint32_t sourceId; // source ID, an index for connection list uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list uint32_t destId; // destination ID, an index for connection list
uint32_t destIp; // destination IP address, for NAT scenario uint32_t destIp; // destination IP address, for NAT scenario
char user[TSDB_UNI_LEN]; // user ID char user[TSDB_UNI_LEN]; // user ID
uint16_t port; // for UDP only, port may be changed uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved char empty[1]; // reserved
uint16_t msgType; // message type uint16_t msgType; // message type
int32_t msgLen; // message length including the header iteslf int32_t msgLen; // message length including the header iteslf
uint32_t msgVer; uint32_t msgVer;
int32_t code; // code in response message int32_t code; // code in response message
uint8_t content[0]; // message body starts from here uint8_t content[0]; // message body starts from here
} SRpcHead; } SRpcHead;
typedef struct { typedef struct {
int32_t reserved; int32_t reserved;
int32_t contLen; int32_t contLen;
} SRpcComp; } SRpcComp;
typedef struct { typedef struct {
@ -70,11 +75,10 @@ typedef struct {
} SRpcDigest; } SRpcDigest;
#pragma pack(pop) #pragma pack(pop)
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif // TDENGINE_RPCHEAD_H #endif // TDENGINE_RPCHEAD_H

View File

@ -15,23 +15,28 @@
#ifndef _rpc_tcp_header_ #ifndef _rpc_tcp_header_
#define _rpc_tcp_header_ #define _rpc_tcp_header_
#include <stdint.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#ifdef USE_UV
#else
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosStopTcpServer(void *param); void taosStopTcpServer(void *param);
void taosCleanUpTcpServer(void *param); void taosCleanUpTcpServer(void *param);
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle); void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle);
void taosStopTcpClient(void *chandle); void taosStopTcpClient(void *chandle);
void taosCleanUpTcpClient(void *chandle); void taosCleanUpTcpClient(void *chandle);
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port); void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port);
void taosCloseTcpConnection(void *chandle); void taosCloseTcpConnection(void *chandle);
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -20,8 +20,13 @@
extern "C" { extern "C" {
#endif #endif
#ifdef USE_UV
#else
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_TRANSPORT_INT_H_*/ #endif /*_TD_TRANSPORT_INT_H_*/

View File

@ -13,37 +13,40 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "rpcCache.h"
#include "os.h" #include "os.h"
#include "rpcLog.h"
#include "taosdef.h" #include "taosdef.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmempool.h" #include "tmempool.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "rpcLog.h"
#include "rpcCache.h"
#ifdef USE_UV
#else
typedef struct SConnHash { typedef struct SConnHash {
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
uint16_t port; uint16_t port;
char connType; char connType;
struct SConnHash *prev; struct SConnHash *prev;
struct SConnHash *next; struct SConnHash *next;
void *data; void * data;
uint64_t time; uint64_t time;
} SConnHash; } SConnHash;
typedef struct { typedef struct {
SConnHash **connHashList; SConnHash ** connHashList;
mpool_h connHashMemPool; mpool_h connHashMemPool;
int maxSessions; int maxSessions;
int total; int total;
int * count; int * count;
int64_t keepTimer; int64_t keepTimer;
pthread_mutex_t mutex; pthread_mutex_t mutex;
void (*cleanFp)(void *); void (*cleanFp)(void *);
void *tmrCtrl; void * tmrCtrl;
void *pTimer; void * pTimer;
int64_t *lockedBy; int64_t *lockedBy;
} SConnCache; } SConnCache;
static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType); static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType);
@ -122,7 +125,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
pCache = (SConnCache *)handle; pCache = (SConnCache *)handle;
assert(pCache); assert(pCache);
assert(data); assert(data);
hash = rpcHashConn(pCache, fqdn, port, connType); hash = rpcHashConn(pCache, fqdn, port, connType);
@ -134,7 +137,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in
pNode->prev = NULL; pNode->prev = NULL;
pNode->time = time; pNode->time = time;
rpcLockCache(pCache->lockedBy+hash); rpcLockCache(pCache->lockedBy + hash);
pNode->next = pCache->connHashList[hash]; pNode->next = pCache->connHashList[hash];
if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode; if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode;
@ -143,10 +146,11 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in
pCache->count[hash]++; pCache->count[hash]++;
rpcRemoveExpiredNodes(pCache, pNode->next, hash, time); rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
rpcUnlockCache(pCache->lockedBy+hash); rpcUnlockCache(pCache->lockedBy + hash);
pCache->total++; pCache->total++;
// tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]); // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode,
// pCache->count[hash]);
return; return;
} }
@ -158,12 +162,12 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy
void * pData = NULL; void * pData = NULL;
pCache = (SConnCache *)handle; pCache = (SConnCache *)handle;
assert(pCache); assert(pCache);
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
hash = rpcHashConn(pCache, fqdn, port, connType); hash = rpcHashConn(pCache, fqdn, port, connType);
rpcLockCache(pCache->lockedBy+hash); rpcLockCache(pCache->lockedBy + hash);
pNode = pCache->connHashList[hash]; pNode = pCache->connHashList[hash];
while (pNode) { while (pNode) {
@ -197,12 +201,14 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy
pCache->count[hash]--; pCache->count[hash]--;
} }
rpcUnlockCache(pCache->lockedBy+hash); rpcUnlockCache(pCache->lockedBy + hash);
if (pData) { if (pData) {
//tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]); // tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode,
// pCache->count[hash]);
} else { } else {
//tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]); // tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash,
// pCache->count[hash]);
} }
return pData; return pData;
@ -221,10 +227,10 @@ static void rpcCleanConnCache(void *handle, void *tmrId) {
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
for (hash = 0; hash < pCache->maxSessions; ++hash) { for (hash = 0; hash < pCache->maxSessions; ++hash) {
rpcLockCache(pCache->lockedBy+hash); rpcLockCache(pCache->lockedBy + hash);
pNode = pCache->connHashList[hash]; pNode = pCache->connHashList[hash];
rpcRemoveExpiredNodes(pCache, pNode, hash, time); rpcRemoveExpiredNodes(pCache, pNode, hash, time);
rpcUnlockCache(pCache->lockedBy+hash); rpcUnlockCache(pCache->lockedBy + hash);
} }
// tTrace("timer, total connections in cache:%d", pCache->total); // tTrace("timer, total connections in cache:%d", pCache->total);
@ -233,7 +239,7 @@ static void rpcCleanConnCache(void *handle, void *tmrId) {
} }
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) { static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return; if (pNode == NULL || (time < pCache->keepTimer + pNode->time)) return;
SConnHash *pPrev = pNode->prev, *pNext; SConnHash *pPrev = pNode->prev, *pNext;
@ -242,7 +248,8 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash
pNext = pNode->next; pNext = pNode->next;
pCache->total--; pCache->total--;
pCache->count[hash]--; pCache->count[hash]--;
//tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode, // tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port,
// pNode->connType, hash, pNode,
// pCache->count[hash]); // pCache->count[hash]);
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pNode = pNext; pNode = pNext;
@ -257,7 +264,7 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash
static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) { static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) {
SConnCache *pCache = (SConnCache *)handle; SConnCache *pCache = (SConnCache *)handle;
int hash = 0; int hash = 0;
char *temp = fqdn; char * temp = fqdn;
while (*temp) { while (*temp) {
hash += *temp; hash += *temp;
@ -288,4 +295,4 @@ static void rpcUnlockCache(int64_t *lockedBy) {
assert(false); assert(false);
} }
} }
#endif

File diff suppressed because it is too large Load Diff

View File

@ -13,24 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "rpcTcp.h"
#ifdef USE_UV
#include <uv.h>
#endif
#include "os.h" #include "os.h"
#include "tutil.h" #include "rpcHead.h"
#include "rpcLog.h"
#include "taosdef.h" #include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "rpcLog.h" #include "tutil.h"
#include "rpcHead.h"
#include "rpcTcp.h"
#ifdef USE_UV
#else
typedef struct SFdObj { typedef struct SFdObj {
void *signature; void * signature;
SOCKET fd; // TCP socket FD SOCKET fd; // TCP socket FD
void *thandle; // handle from upper layer, like TAOS void * thandle; // handle from upper layer, like TAOS
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int16_t closedByApp; // 1: already closed by App int16_t closedByApp; // 1: already closed by App
struct SThreadObj *pThreadObj; struct SThreadObj *pThreadObj;
struct SFdObj *prev; struct SFdObj * prev;
struct SFdObj *next; struct SFdObj * next;
} SFdObj; } SFdObj;
typedef struct SThreadObj { typedef struct SThreadObj {
@ -43,35 +49,35 @@ typedef struct SThreadObj {
int numOfFds; int numOfFds;
int threadId; int threadId;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
void *shandle; // handle passed by upper layer during server initialization void * shandle; // handle passed by upper layer during server initialization
void *(*processData)(SRecvInfo *pPacket); void *(*processData)(SRecvInfo *pPacket);
} SThreadObj; } SThreadObj;
typedef struct { typedef struct {
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
int32_t index; int32_t index;
int numOfThreads; int numOfThreads;
SThreadObj **pThreadObj; SThreadObj **pThreadObj;
} SClientObj; } SClientObj;
typedef struct { typedef struct {
SOCKET fd; SOCKET fd;
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int8_t stop; int8_t stop;
int8_t reserve; int8_t reserve;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
int numOfThreads; int numOfThreads;
void * shandle; void * shandle;
SThreadObj **pThreadObj; SThreadObj **pThreadObj;
pthread_t thread; pthread_t thread;
} SServerObj; } SServerObj;
static void *taosProcessTcpData(void *param); static void * taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd); static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd);
static void taosFreeFdObj(SFdObj *pFdObj); static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj);
static void *taosAcceptTcpConnection(void *arg); static void * taosAcceptTcpConnection(void *arg);
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
SServerObj *pServerObj; SServerObj *pServerObj;
@ -99,7 +105,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return NULL; return NULL;
} }
int code = 0; int code = 0;
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
@ -110,7 +116,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
if (pThreadObj == NULL) { if (pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
for (int j=0; j<i; ++j) free(pServerObj->pThreadObj[j]); for (int j = 0; j < i; ++j) free(pServerObj->pThreadObj[j]);
free(pServerObj->pThreadObj); free(pServerObj->pThreadObj);
free(pServerObj); free(pServerObj);
return NULL; return NULL;
@ -172,8 +178,10 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return (void *)pServerObj; return (void *)pServerObj;
} }
static void taosStopTcpThread(SThreadObj* pThreadObj) { static void taosStopTcpThread(SThreadObj *pThreadObj) {
if (pThreadObj == NULL) { return;} if (pThreadObj == NULL) {
return;
}
// save thread into local variable and signal thread to stop // save thread into local variable and signal thread to stop
pthread_t thread = pThreadObj->thread; pthread_t thread = pThreadObj->thread;
if (!taosCheckPthreadValid(thread)) { if (!taosCheckPthreadValid(thread)) {
@ -194,7 +202,7 @@ void taosStopTcpServer(void *handle) {
pServerObj->stop = 1; pServerObj->stop = 1;
if (pServerObj->fd >= 0) { if (pServerObj->fd >= 0) {
taosShutDownSocketRD(pServerObj->fd); taosShutDownSocketRD(pServerObj->fd);
} }
if (taosCheckPthreadValid(pServerObj->thread)) { if (taosCheckPthreadValid(pServerObj->thread)) {
if (taosComparePthread(pServerObj->thread, pthread_self())) { if (taosComparePthread(pServerObj->thread, pthread_self())) {
@ -227,8 +235,8 @@ static void *taosAcceptTcpConnection(void *arg) {
SOCKET connFd = -1; SOCKET connFd = -1;
struct sockaddr_in caddr; struct sockaddr_in caddr;
int threadId = 0; int threadId = 0;
SThreadObj *pThreadObj; SThreadObj * pThreadObj;
SServerObj *pServerObj; SServerObj * pServerObj;
pServerObj = (SServerObj *)arg; pServerObj = (SServerObj *)arg;
tDebug("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); tDebug("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
@ -253,8 +261,8 @@ static void *taosAcceptTcpConnection(void *arg) {
} }
taosKeepTcpAlive(connFd); taosKeepTcpAlive(connFd);
struct timeval to={5, 0}; struct timeval to = {5, 0};
int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
if (ret != 0) { if (ret != 0) {
taosCloseSocket(connFd); taosCloseSocket(connFd);
tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno), tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno),
@ -262,7 +270,6 @@ static void *taosAcceptTcpConnection(void *arg) {
continue; continue;
} }
// pick up the thread to handle this connection // pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj[threadId]; pThreadObj = pServerObj->pThreadObj[threadId];
@ -271,7 +278,7 @@ static void *taosAcceptTcpConnection(void *arg) {
pFdObj->ip = caddr.sin_addr.s_addr; pFdObj->ip = caddr.sin_addr.s_addr;
pFdObj->port = htons(caddr.sin_port); pFdObj->port = htons(caddr.sin_port);
tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
} else { } else {
taosCloseSocket(connFd); taosCloseSocket(connFd);
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
@ -297,14 +304,14 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread
tstrncpy(pClientObj->label, label, sizeof(pClientObj->label)); tstrncpy(pClientObj->label, label, sizeof(pClientObj->label));
pClientObj->numOfThreads = numOfThreads; pClientObj->numOfThreads = numOfThreads;
pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj*)); pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj *));
if (pClientObj->pThreadObj == NULL) { if (pClientObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
tfree(pClientObj); tfree(pClientObj);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
} }
int code = 0; int code = 0;
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
@ -314,15 +321,15 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread
if (pThreadObj == NULL) { if (pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
for (int j=0; j<i; ++j) free(pClientObj->pThreadObj[j]); for (int j = 0; j < i; ++j) free(pClientObj->pThreadObj[j]);
free(pClientObj); free(pClientObj);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
return NULL; return NULL;
} }
pClientObj->pThreadObj[i] = pThreadObj; pClientObj->pThreadObj[i] = pThreadObj;
taosResetPthread(&pThreadObj->thread); taosResetPthread(&pThreadObj->thread);
pThreadObj->ip = ip; pThreadObj->ip = ip;
pThreadObj->stop = false; pThreadObj->stop = false;
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->shandle = shandle; pThreadObj->shandle = shandle;
pThreadObj->processData = fp; pThreadObj->processData = fp;
@ -364,14 +371,14 @@ void taosStopTcpClient(void *chandle) {
if (pClientObj == NULL) return; if (pClientObj == NULL) return;
tDebug ("%s TCP client is stopped", pClientObj->label); tDebug("%s TCP client is stopped", pClientObj->label);
} }
void taosCleanUpTcpClient(void *chandle) { void taosCleanUpTcpClient(void *chandle) {
SClientObj *pClientObj = chandle; SClientObj *pClientObj = chandle;
if (pClientObj == NULL) return; if (pClientObj == NULL) return;
for (int i = 0; i < pClientObj->numOfThreads; ++i) { for (int i = 0; i < pClientObj->numOfThreads; ++i) {
SThreadObj *pThreadObj= pClientObj->pThreadObj[i]; SThreadObj *pThreadObj = pClientObj->pThreadObj[i];
taosStopTcpThread(pThreadObj); taosStopTcpThread(pThreadObj);
} }
@ -381,9 +388,9 @@ void taosCleanUpTcpClient(void *chandle) {
} }
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
SClientObj * pClientObj = shandle; SClientObj *pClientObj = shandle;
int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads; int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads;
atomic_store_32(&pClientObj->index, index + 1); atomic_store_32(&pClientObj->index, index + 1);
SThreadObj *pThreadObj = pClientObj->pThreadObj[index]; SThreadObj *pThreadObj = pClientObj->pThreadObj[index];
SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
@ -394,10 +401,9 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
#endif #endif
struct sockaddr_in sin; struct sockaddr_in sin;
uint16_t localPort = 0; uint16_t localPort = 0;
unsigned int addrlen = sizeof(sin); unsigned int addrlen = sizeof(sin);
if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 && if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
localPort = (uint16_t)ntohs(sin.sin_port); localPort = (uint16_t)ntohs(sin.sin_port);
} }
@ -407,8 +413,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
pFdObj->thandle = thandle; pFdObj->thandle = thandle;
pFdObj->port = port; pFdObj->port = port;
pFdObj->ip = ip; pFdObj->ip = ip;
tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", pThreadObj->label, thandle,
pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds); ip, port, localPort, pFdObj, pThreadObj->numOfFds);
} else { } else {
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
taosCloseSocket(fd); taosCloseSocket(fd);
@ -441,7 +447,6 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand
} }
static void taosReportBrokenLink(SFdObj *pFdObj) { static void taosReportBrokenLink(SFdObj *pFdObj) {
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
// notify the upper layer, so it will clean the associated context // notify the upper layer, so it will clean the associated context
@ -464,9 +469,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
} }
static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
SRpcHead rpcHead; SRpcHead rpcHead;
int32_t msgLen, leftLen, retLen, headLen; int32_t msgLen, leftLen, retLen, headLen;
char *buffer, *msg; char * buffer, *msg;
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
@ -483,7 +488,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1; return -1;
} else { } else {
tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, buffer); tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd,
buffer);
} }
msg = buffer + tsRpcOverhead; msg = buffer + tsRpcOverhead;
@ -491,8 +497,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) { if (leftLen != retLen) {
tError("%s %p read error, leftLen:%d retLen:%d FD:%p", tError("%s %p read error, leftLen:%d retLen:%d FD:%p", pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj);
pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj);
free(buffer); free(buffer);
return -1; return -1;
} }
@ -519,8 +524,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
#define maxEvents 10 #define maxEvents 10
static void *taosProcessTcpData(void *param) { static void *taosProcessTcpData(void *param) {
SThreadObj *pThreadObj = param; SThreadObj * pThreadObj = param;
SFdObj *pFdObj; SFdObj * pFdObj;
struct epoll_event events[maxEvents]; struct epoll_event events[maxEvents];
SRecvInfo recvInfo; SRecvInfo recvInfo;
@ -569,7 +574,7 @@ static void *taosProcessTcpData(void *param) {
if (pThreadObj->stop) break; if (pThreadObj->stop) break;
} }
if (pThreadObj->pollFd >=0) { if (pThreadObj->pollFd >= 0) {
EpollClose(pThreadObj->pollFd); EpollClose(pThreadObj->pollFd);
pThreadObj->pollFd = -1; pThreadObj->pollFd = -1;
} }
@ -620,7 +625,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
} }
static void taosFreeFdObj(SFdObj *pFdObj) { static void taosFreeFdObj(SFdObj *pFdObj) {
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return; if (pFdObj->signature != pFdObj) return;
@ -638,8 +642,8 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pThreadObj->numOfFds--; pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0) if (pThreadObj->numOfFds < 0)
tError("%s %p TCP thread:%d, number of FDs is negative!!!", tError("%s %p TCP thread:%d, number of FDs is negative!!!", pThreadObj->label, pFdObj->thandle,
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); pThreadObj->threadId);
if (pFdObj->prev) { if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next; (pFdObj->prev)->next = pFdObj->next;
@ -653,8 +657,10 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pthread_mutex_unlock(&pThreadObj->mutex); pthread_mutex_unlock(&pThreadObj->mutex);
tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj,
pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pThreadObj->numOfFds); pFdObj->fd, pThreadObj->numOfFds);
tfree(pFdObj); tfree(pFdObj);
} }
#endif

View File

@ -13,50 +13,53 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "rpcUdp.h"
#include "os.h" #include "os.h"
#include "ttimer.h" #include "rpcHead.h"
#include "tutil.h" #include "rpcLog.h"
#include "taosdef.h" #include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "rpcLog.h" #include "ttimer.h"
#include "rpcUdp.h" #include "tutil.h"
#include "rpcHead.h"
#ifdef USE_UV
// no support upd currently
#else
#define RPC_MAX_UDP_CONNS 256 #define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000 #define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5 // mseconds #define RPC_UDP_BUF_TIME 5 // mseconds
#define RPC_MAX_UDP_SIZE 65480 #define RPC_MAX_UDP_SIZE 65480
typedef struct { typedef struct {
int index; int index;
SOCKET fd; SOCKET fd;
uint16_t port; // peer port uint16_t port; // peer port
uint16_t localPort; // local port uint16_t localPort; // local port
char label[TSDB_LABEL_LEN]; // copy from udpConnSet; char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
pthread_t thread; pthread_t thread;
void *hash; void * hash;
void *shandle; // handle passed by upper layer during server initialization void * shandle; // handle passed by upper layer during server initialization
void *pSet; void * pSet;
void *(*processData)(SRecvInfo *pRecv); void *(*processData)(SRecvInfo *pRecv);
char *buffer; // buffer to receive data char *buffer; // buffer to receive data
} SUdpConn; } SUdpConn;
typedef struct { typedef struct {
int index; int index;
int server; int server;
uint32_t ip; // local IP uint32_t ip; // local IP
uint16_t port; // local Port uint16_t port; // local Port
void *shandle; // handle passed by upper layer during server initialization void * shandle; // handle passed by upper layer during server initialization
int threads; int threads;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
void *(*fp)(SRecvInfo *pPacket); void *(*fp)(SRecvInfo *pPacket);
SUdpConn udpConn[]; SUdpConn udpConn[];
} SUdpConnSet; } SUdpConnSet;
static void *taosRecvUdpData(void *param); static void *taosRecvUdpData(void *param);
void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
SUdpConn *pConn; SUdpConn * pConn;
SUdpConnSet *pSet; SUdpConnSet *pSet;
int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn); int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn);
@ -79,7 +82,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
int i; int i;
uint16_t ownPort; uint16_t ownPort;
for (i = 0; i < threads; ++i) { for (i = 0; i < threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
@ -97,9 +100,9 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
} }
struct sockaddr_in sin; struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin); unsigned int addrlen = sizeof(sin);
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
sin.sin_family == AF_INET && addrlen == sizeof(sin)) { addrlen == sizeof(sin)) {
pConn->localPort = (uint16_t)ntohs(sin.sin_port); pConn->localPort = (uint16_t)ntohs(sin.sin_port);
} }
@ -118,7 +121,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
if (i != threads) { if (i != threads) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
taosCleanUpUdpConnection(pSet); taosCleanUpUdpConnection(pSet);
return NULL; return NULL;
@ -130,14 +133,14 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
void taosStopUdpConnection(void *handle) { void taosStopUdpConnection(void *handle) {
SUdpConnSet *pSet = (SUdpConnSet *)handle; SUdpConnSet *pSet = (SUdpConnSet *)handle;
SUdpConn *pConn; SUdpConn * pConn;
if (pSet == NULL) return; if (pSet == NULL) return;
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); if (pConn->fd >= 0) shutdown(pConn->fd, SHUT_RDWR);
if (pConn->fd >=0) taosCloseSocket(pConn->fd); if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
pConn->fd = -1; pConn->fd = -1;
} }
@ -155,13 +158,13 @@ void taosStopUdpConnection(void *handle) {
void taosCleanUpUdpConnection(void *handle) { void taosCleanUpUdpConnection(void *handle) {
SUdpConnSet *pSet = (SUdpConnSet *)handle; SUdpConnSet *pSet = (SUdpConnSet *)handle;
SUdpConn *pConn; SUdpConn * pConn;
if (pSet == NULL) return; if (pSet == NULL) return;
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
if (pConn->fd >=0) taosCloseSocket(pConn->fd); if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
} }
tDebug("%s UDP is cleaned up", pSet->label); tDebug("%s UDP is cleaned up", pSet->label);
@ -182,7 +185,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t
} }
static void *taosRecvUdpData(void *param) { static void *taosRecvUdpData(void *param) {
SUdpConn *pConn = param; SUdpConn * pConn = param;
struct sockaddr_in sourceAdd; struct sockaddr_in sourceAdd;
ssize_t dataLen; ssize_t dataLen;
unsigned int addLen; unsigned int addLen;
@ -218,7 +221,7 @@ static void *taosRecvUdpData(void *param) {
} }
int32_t size = dataLen + tsRpcOverhead; int32_t size = dataLen + tsRpcOverhead;
char *tmsg = malloc(size); char * tmsg = malloc(size);
if (NULL == tmsg) { if (NULL == tmsg) {
tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen); tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
continue; continue;
@ -257,4 +260,4 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c
return ret; return ret;
} }
#endif

View File

@ -11,4 +11,4 @@
* *
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */