add libuv

This commit is contained in:
yihaoDeng 2022-01-11 23:06:36 +08:00
parent 3077a2449f
commit b945000779
11 changed files with 720 additions and 546 deletions

View File

@ -4,7 +4,7 @@ ExternalProject_Add(libuv
GIT_REPOSITORY https://github.com/libuv/libuv.git GIT_REPOSITORY https://github.com/libuv/libuv.git
GIT_TAG v1.42.0 GIT_TAG v1.42.0
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/libuv" SOURCE_DIR "${CMAKE_CONTRIB_DIR}/libuv"
BINARY_DIR "" BINARY_DIR "${CMAKE_CONTRIB_DIR}/libuv"
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
BUILD_COMMAND "" BUILD_COMMAND ""
INSTALL_COMMAND "" INSTALL_COMMAND ""

View File

@ -13,3 +13,18 @@ target_link_libraries(
PUBLIC util PUBLIC util
PUBLIC common PUBLIC common
) )
if (${BUILD_WITH_UV})
target_include_directories(
transport
PUBLIC "${CMAKE_SOURCE_DIR}/contrib/libuv/include"
)
#LINK_DIRECTORIES("${CMAKE_SOURCE_DIR}/debug/contrib/libuv")
target_link_libraries(
transport
PUBLIC uv_a
)
add_definitions(-DUSE_UV)
endif(${BUILD_WITH_UV})

View File

@ -16,6 +16,8 @@
#ifndef TDENGINE_RPC_CACHE_H #ifndef TDENGINE_RPC_CACHE_H
#define TDENGINE_RPC_CACHE_H #define TDENGINE_RPC_CACHE_H
#include <stdint.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif

View File

@ -16,33 +16,38 @@
#ifndef TDENGINE_RPCHEAD_H #ifndef TDENGINE_RPCHEAD_H
#define TDENGINE_RPCHEAD_H #define TDENGINE_RPCHEAD_H
#include <tdef.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#ifdef USE_UV
#else
#define RPC_CONN_TCP 2 #define RPC_CONN_TCP 2
extern int tsRpcOverhead; extern int tsRpcOverhead;
typedef struct { typedef struct {
void *msg; void* msg;
int msgLen; int msgLen;
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int connType; int connType;
void *shandle; void* shandle;
void *thandle; void* thandle;
void *chandle; void* chandle;
} SRecvInfo; } SRecvInfo;
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct { typedef struct {
char version:4; // RPC version char version : 4; // RPC version
char comp:4; // compression algorithm, 0:no compression 1:lz4 char comp : 4; // compression algorithm, 0:no compression 1:lz4
char resflag:2; // reserved bits char resflag : 2; // reserved bits
char spi:3; // security parameter index char spi : 3; // security parameter index
char encrypt:3; // encrypt algorithm, 0: no encryption char encrypt : 3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID uint16_t tranId; // transcation ID
uint32_t linkUid; // for unique connection ID assigned by client uint32_t linkUid; // for unique connection ID assigned by client
uint64_t ahandle; // ahandle assigned by client uint64_t ahandle; // ahandle assigned by client
@ -70,11 +75,10 @@ typedef struct {
} SRpcDigest; } SRpcDigest;
#pragma pack(pop) #pragma pack(pop)
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif // TDENGINE_RPCHEAD_H #endif // TDENGINE_RPCHEAD_H

View File

@ -15,11 +15,14 @@
#ifndef _rpc_tcp_header_ #ifndef _rpc_tcp_header_
#define _rpc_tcp_header_ #define _rpc_tcp_header_
#include <stdint.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#ifdef USE_UV
#else
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosStopTcpServer(void *param); void taosStopTcpServer(void *param);
void taosCleanUpTcpServer(void *param); void taosCleanUpTcpServer(void *param);
@ -32,6 +35,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
void taosCloseTcpConnection(void *chandle); void taosCloseTcpConnection(void *chandle);
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -20,6 +20,11 @@
extern "C" { extern "C" {
#endif #endif
#ifdef USE_UV
#else
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -13,27 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "rpcCache.h"
#include "os.h" #include "os.h"
#include "rpcLog.h"
#include "taosdef.h" #include "taosdef.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmempool.h" #include "tmempool.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "rpcLog.h"
#include "rpcCache.h"
#ifdef USE_UV
#else
typedef struct SConnHash { typedef struct SConnHash {
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
uint16_t port; uint16_t port;
char connType; char connType;
struct SConnHash *prev; struct SConnHash *prev;
struct SConnHash *next; struct SConnHash *next;
void *data; void * data;
uint64_t time; uint64_t time;
} SConnHash; } SConnHash;
typedef struct { typedef struct {
SConnHash **connHashList; SConnHash ** connHashList;
mpool_h connHashMemPool; mpool_h connHashMemPool;
int maxSessions; int maxSessions;
int total; int total;
@ -41,8 +44,8 @@ typedef struct {
int64_t keepTimer; int64_t keepTimer;
pthread_mutex_t mutex; pthread_mutex_t mutex;
void (*cleanFp)(void *); void (*cleanFp)(void *);
void *tmrCtrl; void * tmrCtrl;
void *pTimer; void * pTimer;
int64_t *lockedBy; int64_t *lockedBy;
} SConnCache; } SConnCache;
@ -134,7 +137,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in
pNode->prev = NULL; pNode->prev = NULL;
pNode->time = time; pNode->time = time;
rpcLockCache(pCache->lockedBy+hash); rpcLockCache(pCache->lockedBy + hash);
pNode->next = pCache->connHashList[hash]; pNode->next = pCache->connHashList[hash];
if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode; if (pCache->connHashList[hash] != NULL) (pCache->connHashList[hash])->prev = pNode;
@ -143,10 +146,11 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in
pCache->count[hash]++; pCache->count[hash]++;
rpcRemoveExpiredNodes(pCache, pNode->next, hash, time); rpcRemoveExpiredNodes(pCache, pNode->next, hash, time);
rpcUnlockCache(pCache->lockedBy+hash); rpcUnlockCache(pCache->lockedBy + hash);
pCache->total++; pCache->total++;
// tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]); // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode,
// pCache->count[hash]);
return; return;
} }
@ -163,7 +167,7 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
hash = rpcHashConn(pCache, fqdn, port, connType); hash = rpcHashConn(pCache, fqdn, port, connType);
rpcLockCache(pCache->lockedBy+hash); rpcLockCache(pCache->lockedBy + hash);
pNode = pCache->connHashList[hash]; pNode = pCache->connHashList[hash];
while (pNode) { while (pNode) {
@ -197,12 +201,14 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy
pCache->count[hash]--; pCache->count[hash]--;
} }
rpcUnlockCache(pCache->lockedBy+hash); rpcUnlockCache(pCache->lockedBy + hash);
if (pData) { if (pData) {
//tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]); // tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode,
// pCache->count[hash]);
} else { } else {
//tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]); // tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash,
// pCache->count[hash]);
} }
return pData; return pData;
@ -221,10 +227,10 @@ static void rpcCleanConnCache(void *handle, void *tmrId) {
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
for (hash = 0; hash < pCache->maxSessions; ++hash) { for (hash = 0; hash < pCache->maxSessions; ++hash) {
rpcLockCache(pCache->lockedBy+hash); rpcLockCache(pCache->lockedBy + hash);
pNode = pCache->connHashList[hash]; pNode = pCache->connHashList[hash];
rpcRemoveExpiredNodes(pCache, pNode, hash, time); rpcRemoveExpiredNodes(pCache, pNode, hash, time);
rpcUnlockCache(pCache->lockedBy+hash); rpcUnlockCache(pCache->lockedBy + hash);
} }
// tTrace("timer, total connections in cache:%d", pCache->total); // tTrace("timer, total connections in cache:%d", pCache->total);
@ -233,7 +239,7 @@ static void rpcCleanConnCache(void *handle, void *tmrId) {
} }
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) { static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
if (pNode == NULL || (time < pCache->keepTimer + pNode->time) ) return; if (pNode == NULL || (time < pCache->keepTimer + pNode->time)) return;
SConnHash *pPrev = pNode->prev, *pNext; SConnHash *pPrev = pNode->prev, *pNext;
@ -242,7 +248,8 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash
pNext = pNode->next; pNext = pNode->next;
pCache->total--; pCache->total--;
pCache->count[hash]--; pCache->count[hash]--;
//tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode, // tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port,
// pNode->connType, hash, pNode,
// pCache->count[hash]); // pCache->count[hash]);
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pNode = pNext; pNode = pNext;
@ -257,7 +264,7 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash
static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) { static int rpcHashConn(void *handle, char *fqdn, uint16_t port, int8_t connType) {
SConnCache *pCache = (SConnCache *)handle; SConnCache *pCache = (SConnCache *)handle;
int hash = 0; int hash = 0;
char *temp = fqdn; char * temp = fqdn;
while (*temp) { while (*temp) {
hash += *temp; hash += *temp;
@ -288,4 +295,4 @@ static void rpcUnlockCache(int64_t *lockedBy) {
assert(false); assert(false);
} }
} }
#endif

View File

@ -13,27 +13,175 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <uv.h>
#include "lz4.h"
#include "os.h" #include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h" #include "tidpool.h"
#include "tmd5.h" #include "tmd5.h"
#include "tmempool.h" #include "tmempool.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "lz4.h"
#include "tref.h" #ifdef USE_UV
#include "taoserror.h"
#include "tglobal.h" #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#include "tmsg.h"
#include "trpc.h" int32_t rpcInit() { return -1; }
#include "thash.h" void rpcCleanup() { return; };
#include "rpcLog.h" void* rpcOpen(const SRpcInit* pRpc) { return NULL; }
#include "rpcUdp.h" void rpcClose(void* arg) { return; }
#include "rpcCache.h" void* rpcMallocCont(int contLen) { return NULL; }
#include "rpcTcp.h" void rpcFreeCont(void* cont) { return; }
#include "rpcHead.h" void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; }
void rpcSendResponse(const SRpcMsg* pMsg) {}
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
typedef struct SThreadObj {
pthread_t thread;
uv_pipe_t* pipe;
uv_loop_t* loop;
uv_async_t* workerAsync; //
int fd;
} SThreadObj;
typedef struct SServerObj {
uv_tcp_t server;
uv_loop_t* loop;
int workerIdx;
int numOfThread;
SThreadObj** pThreadObj;
uv_pipe_t** pipe;
} SServerObj;
typedef struct SConnCtx {
uv_tcp_t* pClient;
uv_timer_t* pTimer;
uv_async_t* pWorkerAsync;
int ref;
} SConnCtx;
void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = malloc(suggested_size);
buf->len = suggested_size;
}
void onTimeout(uv_timer_t* handle) {
// opt
tDebug("time out");
}
void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt
tDebug("data already was read on a stream");
}
void onWrite(uv_write_t* req, int status) {
// opt
if (req) tDebug("data already was written on stream");
}
static void workerAsyncCB(uv_async_t* handle) {
// opt
SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync);
}
void onAccept(uv_stream_t* stream, int status) {
if (status == -1) {
return;
}
SServerObj* pObj = container_of(stream, SServerObj, server);
tDebug("new conntion accepted by main server, dispatch to one worker thread");
uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pObj->loop, cli);
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));
uv_buf_t buf = uv_buf_init("a", 1);
// despatch to worker thread
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite);
} else {
uv_close((uv_handle_t*)cli, NULL);
}
}
void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
if (nread < 0) {
if (nread != UV_EOF) {
tError("read error %s", uv_err_name(nread));
}
// TODO(log other failure reason)
uv_close((uv_handle_t*)q, NULL);
return;
}
SThreadObj* pObj = (SThreadObj*)container_of(q, struct SThreadObj, pipe);
uv_pipe_t* pipe = (uv_pipe_t*)q;
if (!uv_pipe_pending_count(pipe)) {
tError("No pending count");
return;
}
uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP);
SConnCtx* pConn = malloc(sizeof(SConnCtx));
/* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pObj->loop, pConn->pTimer);
pConn->pClient = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
pConn->pWorkerAsync = pObj->workerAsync; // thread safty
uv_tcp_init(pObj->loop, pConn->pClient);
if (uv_accept(q, (uv_stream_t*)(pConn->pClient)) == 0) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pClient, &fd);
tDebug("new connection created: %d", fd);
uv_timer_start(pConn->pTimer, onTimeout, 10, 0);
uv_read_start((uv_stream_t*)(pConn->pClient), allocBuffer, onRead);
} else {
uv_timer_stop(pConn->pTimer);
free(pConn->pTimer);
uv_close((uv_handle_t*)pConn->pClient, NULL);
free(pConn->pClient);
free(pConn);
}
}
void* workerThread(void* arg) {
SThreadObj* pObj = (SThreadObj*)arg;
int fd = pObj->fd;
pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pObj->loop);
uv_pipe_init(pObj->loop, pObj->pipe, 1);
uv_pipe_open(pObj->pipe, fd);
pObj->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB);
uv_read_start((uv_stream_t*)pObj->pipe, allocBuffer, onConnection);
}
#else
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead *) ((char*)cont - sizeof(SRpcHead))) #define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) #define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) #define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
@ -58,24 +206,24 @@ typedef struct {
int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t refCount; int32_t refCount;
void *parent; void * parent;
void *idPool; // handle to ID pool void * idPool; // handle to ID pool
void *tmrCtrl; // handle to timer void * tmrCtrl; // handle to timer
SHashObj *hash; // handle returned by hash utility SHashObj * hash; // handle returned by hash utility
void *tcphandle;// returned handle from TCP initialization void * tcphandle; // returned handle from TCP initialization
void *udphandle;// returned handle from UDP initialization void * udphandle; // returned handle from UDP initialization
void *pCache; // connection cache void * pCache; // connection cache
pthread_mutex_t mutex; pthread_mutex_t mutex;
struct SRpcConn *connList; // connection list struct SRpcConn *connList; // connection list
} SRpcInfo; } SRpcInfo;
typedef struct { typedef struct {
SRpcInfo *pRpc; // associated SRpcInfo SRpcInfo * pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app SEpSet epSet; // ip list provided by app
void *ahandle; // handle provided by app void * ahandle; // handle provided by app
struct SRpcConn *pConn; // pConn allocated struct SRpcConn *pConn; // pConn allocated
tmsg_t msgType; // message type tmsg_t msgType; // message type
uint8_t *pCont; // content provided by app uint8_t * pCont; // content provided by app
int32_t contLen; // content length int32_t contLen; // content length
int32_t code; // error code int32_t code; // error code
int16_t numOfTry; // number of try for different servers int16_t numOfTry; // number of try for different servers
@ -83,14 +231,14 @@ typedef struct {
int8_t redirect; // flag to indicate redirect int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef int64_t rid; // refId returned by taosAddRef
SRpcMsg *pRsp; // for synchronous API SRpcMsg * pRsp; // for synchronous API
tsem_t *pSem; // for synchronous API tsem_t * pSem; // for synchronous API
SEpSet *pSet; // for synchronous API SEpSet * pSet; // for synchronous API
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
} SRpcReqContext; } SRpcReqContext;
typedef struct SRpcConn { typedef struct SRpcConn {
char info[48];// debug info: label + pConn + ahandle char info[48]; // debug info: label + pConn + ahandle
int sid; // session ID int sid; // session ID
uint32_t ownId; // own link ID uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID uint32_t peerId; // peer link ID
@ -110,17 +258,17 @@ typedef struct SRpcConn {
uint16_t inTranId; // transcation ID for incoming msg uint16_t inTranId; // transcation ID for incoming msg
tmsg_t outType; // message type for outgoing request tmsg_t outType; // message type for outgoing request
tmsg_t inType; // message type for incoming request tmsg_t inType; // message type for incoming request
void *chandle; // handle passed by TCP/UDP connection layer void * chandle; // handle passed by TCP/UDP connection layer
void *ahandle; // handle provided by upper app layter void * ahandle; // handle provided by upper app layter
int retry; // number of retry for sending request int retry; // number of retry for sending request
int tretry; // total retry int tretry; // total retry
void *pTimer; // retry timer to monitor the response void * pTimer; // retry timer to monitor the response
void *pIdleTimer; // idle timer void * pIdleTimer; // idle timer
char *pRspMsg; // response message including header char * pRspMsg; // response message including header
int rspMsgLen; // response messag length int rspMsgLen; // response messag length
char *pReqMsg; // request message including header char * pReqMsg; // request message including header
int reqMsgLen; // request message length int reqMsgLen; // request message length
SRpcInfo *pRpc; // the associated SRpcInfo SRpcInfo * pRpc; // the associated SRpcInfo
int8_t connType; // connection type int8_t connType; // connection type
int64_t lockedBy; // lock for connection int64_t lockedBy; // lock for connection
SRpcReqContext *pContext; // request context SRpcReqContext *pContext; // request context
@ -137,7 +285,7 @@ int tsRpcOverhead;
static int tsRpcRefId = -1; static int tsRpcRefId = -1;
static int32_t tsRpcNum = 0; static int32_t tsRpcNum = 0;
//static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT; // static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
// server:0 client:1 tcp:2 udp:0 // server:0 client:1 tcp:2 udp:0
#define RPC_CONN_UDPS 0 #define RPC_CONN_UDPS 0
@ -146,18 +294,10 @@ static int32_t tsRpcNum = 0;
#define RPC_CONN_TCPC 3 #define RPC_CONN_TCPC 3
void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = { void *(*taosInitConn[])(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) = {
taosInitUdpConnection, taosInitUdpConnection, taosInitUdpConnection, taosInitTcpServer, taosInitTcpClient};
taosInitUdpConnection,
taosInitTcpServer,
taosInitTcpClient
};
void (*taosCleanUpConn[])(void *thandle) = { void (*taosCleanUpConn[])(void *thandle) = {taosCleanUpUdpConnection, taosCleanUpUdpConnection, taosCleanUpTcpServer,
taosCleanUpUdpConnection, taosCleanUpTcpClient};
taosCleanUpUdpConnection,
taosCleanUpTcpServer,
taosCleanUpTcpClient
};
void (*taosStopConn[])(void *thandle) = { void (*taosStopConn[])(void *thandle) = {
taosStopUdpConnection, taosStopUdpConnection,
@ -167,11 +307,7 @@ void (*taosStopConn[])(void *thandle) = {
}; };
int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = { int (*taosSendData[])(uint32_t ip, uint16_t port, void *data, int len, void *chandle) = {
taosSendUdpData, taosSendUdpData, taosSendUdpData, taosSendTcpData, taosSendTcpData};
taosSendUdpData,
taosSendTcpData,
taosSendTcpData
};
void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = { void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port) = {
taosOpenUdpConnection, taosOpenUdpConnection,
@ -180,12 +316,7 @@ void *(*taosOpenConn[])(void *shandle, void *thandle, uint32_t ip, uint16_t port
taosOpenTcpClientConnection, taosOpenTcpClientConnection,
}; };
void (*taosCloseConn[])(void *chandle) = { void (*taosCloseConn[])(void *chandle) = {NULL, NULL, taosCloseTcpConnection, taosCloseTcpConnection};
NULL,
NULL,
taosCloseTcpConnection,
taosCloseTcpConnection
};
static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType); static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, int8_t connType);
static void rpcCloseConn(void *thandle); static void rpcCloseConn(void *thandle);
@ -208,7 +339,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId);
static void rpcProcessProgressTimer(void *param, void *tmrId); static void rpcProcessProgressTimer(void *param, void *tmrId);
static void rpcFreeMsg(void *msg); static void rpcFreeMsg(void *msg);
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen);
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead); static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead);
static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
@ -244,22 +375,22 @@ void rpcCleanup(void) {
void *rpcOpen(const SRpcInit *pInit) { void *rpcOpen(const SRpcInit *pInit) {
SRpcInfo *pRpc; SRpcInfo *pRpc;
//pthread_once(&tsRpcInit, rpcInit); // pthread_once(&tsRpcInit, rpcInit);
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL; if (pRpc == NULL) return NULL;
if(pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); if (pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
if (pRpc->connType == TAOS_CONN_CLIENT) { if (pRpc->connType == TAOS_CONN_CLIENT) {
pRpc->numOfThreads = pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads;
} else { } else {
pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
} }
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->localPort = pInit->localPort; pRpc->localPort = pInit->localPort;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->sessions = pInit->sessions+1; pRpc->sessions = pInit->sessions + 1;
if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user)); if (pInit->user) tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret)); if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret));
if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey)); if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey));
@ -279,14 +410,14 @@ void *rpcOpen(const SRpcInit *pInit) {
return NULL; return NULL;
} }
pRpc->idPool = taosInitIdPool(pRpc->sessions-1); pRpc->idPool = taosInitIdPool(pRpc->sessions - 1);
if (pRpc->idPool == NULL) { if (pRpc->idPool == NULL) {
tError("%s failed to init ID pool", pRpc->label); tError("%s failed to init ID pool", pRpc->label);
rpcClose(pRpc); rpcClose(pRpc);
return NULL; return NULL;
} }
pRpc->tmrCtrl = taosTmrInit(pRpc->sessions*2 + 1, 50, 10000, pRpc->label); pRpc->tmrCtrl = taosTmrInit(pRpc->sessions * 2 + 1, 50, 10000, pRpc->label);
if (pRpc->tmrCtrl == NULL) { if (pRpc->tmrCtrl == NULL) {
tError("%s failed to init timers", pRpc->label); tError("%s failed to init timers", pRpc->label);
rpcClose(pRpc); rpcClose(pRpc);
@ -302,7 +433,7 @@ void *rpcOpen(const SRpcInit *pInit) {
} }
} else { } else {
pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20); pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime * 20);
if ( pRpc->pCache == NULL ) { if (pRpc->pCache == NULL) {
tError("%s failed to init connection cache", pRpc->label); tError("%s failed to init connection cache", pRpc->label);
rpcClose(pRpc); rpcClose(pRpc);
return NULL; return NULL;
@ -311,10 +442,10 @@ void *rpcOpen(const SRpcInit *pInit) {
pthread_mutex_init(&pRpc->mutex, NULL); pthread_mutex_init(&pRpc->mutex, NULL);
pRpc->tcphandle = (*taosInitConn[pRpc->connType|RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->tcphandle = (*taosInitConn[pRpc->connType | RPC_CONN_TCP])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads,
pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); rpcProcessMsgFromPeer, pRpc);
pRpc->udphandle = (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->udphandle =
pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc); (*taosInitConn[pRpc->connType])(0, pRpc->localPort, pRpc->label, pRpc->numOfThreads, rpcProcessMsgFromPeer, pRpc);
if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) { if (pRpc->tcphandle == NULL || pRpc->udphandle == NULL) {
tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort); tError("%s failed to init network, port:%d", pRpc->label, pRpc->localPort);
@ -375,7 +506,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
if (ptr == NULL) return rpcMallocCont(contLen); if (ptr == NULL) return rpcMallocCont(contLen);
char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead); char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);
if (contLen == 0 ) { if (contLen == 0) {
free(start); free(start);
return NULL; return NULL;
} }
@ -391,11 +522,11 @@ void *rpcReallocCont(void *ptr, int contLen) {
} }
void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo * pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
pContext->ahandle = pMsg->ahandle; pContext->ahandle = pMsg->ahandle;
pContext->pRpc = (SRpcInfo *)shandle; pContext->pRpc = (SRpcInfo *)shandle;
pContext->epSet = *pEpSet; pContext->epSet = *pEpSet;
@ -405,15 +536,14 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t
pContext->oldInUse = pEpSet->inUse; pContext->oldInUse = pEpSet->inUse;
pContext->connType = RPC_CONN_UDPC; pContext->connType = RPC_CONN_UDPC;
if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp ) pContext->connType = RPC_CONN_TCPC; if (contLen > tsRpcMaxUdpSize || tsRpcForceTcp) pContext->connType = RPC_CONN_TCPC;
// connection type is application specific. // connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection // for TDengine, all the query, show commands shall have TCP connection
tmsg_t type = pMsg->msgType; tmsg_t type = pMsg->msgType;
if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE || type == TDMT_VND_FETCH ||
|| type == TDMT_VND_FETCH || type == TDMT_MND_VGROUP_LIST type == TDMT_MND_VGROUP_LIST || type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META ||
|| type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META type == TDMT_MND_SHOW || type == TDMT_MND_STATUS || type == TDMT_VND_ALTER_TABLE)
|| type == TDMT_MND_SHOW || type == TDMT_MND_STATUS || type == TDMT_VND_ALTER_TABLE)
pContext->connType = RPC_CONN_TCPC; pContext->connType = RPC_CONN_TCPC;
pContext->rid = taosAddRef(tsRpcRefId, pContext); pContext->rid = taosAddRef(tsRpcRefId, pContext);
@ -428,23 +558,23 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
int msgLen = 0; int msgLen = 0;
SRpcConn *pConn = (SRpcConn *)pRsp->handle; SRpcConn *pConn = (SRpcConn *)pRsp->handle;
SRpcMsg rpcMsg = *pRsp; SRpcMsg rpcMsg = *pRsp;
SRpcMsg *pMsg = &rpcMsg; SRpcMsg * pMsg = &rpcMsg;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
if ( pMsg->pCont == NULL ) { if (pMsg->pCont == NULL) {
pMsg->pCont = rpcMallocCont(0); pMsg->pCont = rpcMallocCont(0);
pMsg->contLen = 0; pMsg->contLen = 0;
} }
SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont); SRpcHead *pHead = rpcHeadFromCont(pMsg->pCont);
char *msg = (char *)pHead; char * msg = (char *)pHead;
pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
msgLen = rpcMsgLenFromCont(pMsg->contLen); msgLen = rpcMsgLenFromCont(pMsg->contLen);
rpcLockConn(pConn); rpcLockConn(pConn);
if ( pConn->inType == 0 || pConn->user[0] == 0 ) { if (pConn->inType == 0 || pConn->user[0] == 0) {
tError("%s, connection is already released, rsp wont be sent", pConn->info); tError("%s, connection is already released, rsp wont be sent", pConn->info);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
@ -454,7 +584,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
// set msg header // set msg header
pHead->version = 1; pHead->version = 1;
pHead->msgType = pConn->inType+1; pHead->msgType = pConn->inType + 1;
pHead->spi = pConn->spi; pHead->spi = pConn->spi;
pHead->encrypt = pConn->encrypt; pHead->encrypt = pConn->encrypt;
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
@ -463,7 +593,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
pHead->linkUid = pConn->linkUid; pHead->linkUid = pConn->linkUid;
pHead->port = htons(pConn->localPort); pHead->port = htons(pConn->localPort);
pHead->code = htonl(pMsg->code); pHead->code = htonl(pMsg->code);
pHead->ahandle = (uint64_t) pConn->ahandle; pHead->ahandle = (uint64_t)pConn->ahandle;
// set pConn parameters // set pConn parameters
pConn->inType = 0; pConn->inType = 0;
@ -482,8 +612,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
rpcSendMsgToPeer(pConn, msg, msgLen); rpcSendMsgToPeer(pConn, msg, msgLen);
// if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured // if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured
if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) if (pConn->secured == 0 && pMsg->code != TSDB_CODE_RPC_NOT_READY) pConn->secured = 1; // connection shall be secured
pConn->secured = 1; // connection shall be secured
if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
@ -527,7 +656,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SRpcReqContext *pContext; SRpcReqContext *pContext;
pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext = (SRpcReqContext *)((char *)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
memset(pRsp, 0, sizeof(SRpcMsg)); memset(pRsp, 0, sizeof(SRpcMsg));
@ -567,7 +696,6 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
} }
void rpcCancelRequest(int64_t rid) { void rpcCancelRequest(int64_t rid) {
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid); SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid);
if (pContext == NULL) return; if (pContext == NULL) return;
@ -577,7 +705,7 @@ void rpcCancelRequest(int64_t rid) {
} }
static void rpcFreeMsg(void *msg) { static void rpcFreeMsg(void *msg) {
if ( msg ) { if (msg) {
char *temp = (char *)msg - sizeof(SRpcReqContext); char *temp = (char *)msg - sizeof(SRpcReqContext);
free(temp); free(temp);
tTrace("free mem: %p", temp); tTrace("free mem: %p", temp);
@ -604,7 +732,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
pConn->connType = connType; pConn->connType = connType;
if (taosOpenConn[connType]) { if (taosOpenConn[connType]) {
void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; void *shandle = (connType & RPC_CONN_TCP) ? pRpc->tcphandle : pRpc->udphandle;
pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
if (pConn->chandle == NULL) { if (pConn->chandle == NULL) {
tError("failed to connect to:%s:%d", taosIpStr(pConn->peerIp), pConn->peerPort); tError("failed to connect to:%s:%d", taosIpStr(pConn->peerIp), pConn->peerPort);
@ -629,9 +757,10 @@ static void rpcReleaseConn(SRpcConn *pConn) {
taosTmrStopA(&pConn->pTimer); taosTmrStopA(&pConn->pTimer);
taosTmrStopA(&pConn->pIdleTimer); taosTmrStopA(&pConn->pIdleTimer);
if ( pRpc->connType == TAOS_CONN_SERVER) { if (pRpc->connType == TAOS_CONN_SERVER) {
char hashstr[40] = {0}; char hashstr[40] = {0};
size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId,
pConn->connType);
taosHashRemove(pRpc->hash, hashstr, size); taosHashRemove(pRpc->hash, hashstr, size);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL; pConn->pRspMsg = NULL;
@ -682,8 +811,7 @@ static void rpcCloseConn(void *thandle) {
rpcLockConn(pConn); rpcLockConn(pConn);
if (pConn->user[0]) if (pConn->user[0]) rpcReleaseConn(pConn);
rpcReleaseConn(pConn);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
} }
@ -717,7 +845,8 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
char hashstr[40] = {0}; char hashstr[40] = {0};
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
size_t size = snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); size_t size =
snprintf(hashstr, sizeof(hashstr), "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
// check if it is already allocated // check if it is already allocated
SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
@ -767,7 +896,8 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
} }
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s", pRpc->label, pConn, pConn->linkUid, sid, hashstr); tDebug("%s %p server connection is allocated, uid:0x%x sid:%d key:%s", pRpc->label, pConn, pConn->linkUid, sid,
hashstr);
} }
return pConn; return pConn;
@ -805,10 +935,11 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
SRpcConn *pConn; SRpcConn *pConn;
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
SEpSet *pEpSet = &pContext->epSet; SEpSet * pEpSet = &pContext->epSet;
pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); pConn =
if ( pConn == NULL || pConn->user[0] == 0) { rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
if (pConn == NULL || pConn->user[0] == 0) {
pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); pConn = rpcOpenConn(pRpc, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
} }
@ -825,13 +956,11 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
} }
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
if (pConn->peerId == 0) { if (pConn->peerId == 0) {
pConn->peerId = pHead->sourceId; pConn->peerId = pHead->sourceId;
} else { } else {
if (pConn->peerId != pHead->sourceId) { if (pConn->peerId != pHead->sourceId) {
tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, tDebug("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId);
pConn->peerId, pHead->sourceId);
return TSDB_CODE_RPC_INVALID_VALUE; return TSDB_CODE_RPC_INVALID_VALUE;
} }
} }
@ -856,8 +985,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
} }
if (pConn->inType != 0) { if (pConn->inType != 0) {
tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, tDebug("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId);
pConn->inTranId, pHead->tranId);
return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED; return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
} }
@ -961,18 +1089,22 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
if (TMSG_INDEX(pHead->msgType) >= TDMT_MAX || TMSG_INDEX(pHead->msgType) <= 0) { if (TMSG_INDEX(pHead->msgType) >= TDMT_MAX || TMSG_INDEX(pHead->msgType) <= 0) {
tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE; return NULL; terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE;
return NULL;
} }
if (sid < 0 || sid >= pRpc->sessions) { if (sid < 0 || sid >= pRpc->sessions) {
tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, tDebug("%s sid:%d, sid is out of range, max sid:%d, %s discarded", pRpc->label, sid, pRpc->sessions,
pRpc->sessions, TMSG_INFO(pHead->msgType)); TMSG_INFO(pHead->msgType));
terrno = TSDB_CODE_RPC_INVALID_SESSION_ID; return NULL; terrno = TSDB_CODE_RPC_INVALID_SESSION_ID;
return NULL;
} }
if (rpcIsReq(pHead->msgType) && htonl(pHead->msgVer) != tsVersion >> 8) { if (rpcIsReq(pHead->msgType) && htonl(pHead->msgVer) != tsVersion >> 8) {
tDebug("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion, TMSG_INFO(pHead->msgType)); tDebug("%s sid:%d, invalid client version:%x/%x %s", pRpc->label, sid, htonl(pHead->msgVer), tsVersion,
terrno = TSDB_CODE_RPC_INVALID_VERSION; return NULL; TMSG_INFO(pHead->msgType));
terrno = TSDB_CODE_RPC_INVALID_VERSION;
return NULL;
} }
pConn = rpcGetConnObj(pRpc, sid, pRecv); pConn = rpcGetConnObj(pRpc, sid, pRecv);
@ -1004,7 +1136,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
// decrypt here // decrypt here
} }
if ( rpcIsReq(pHead->msgType) ) { if (rpcIsReq(pHead->msgType)) {
pConn->connType = pRecv->connType; pConn->connType = pRecv->connType;
terrno = rpcProcessReqHead(pConn, pHead); terrno = rpcProcessReqHead(pConn, pHead);
@ -1013,7 +1145,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
// client shall send the request within tsRpcTime again for UDP, double it // client shall send the request within tsRpcTime again for UDP, double it
if (pConn->connType != RPC_CONN_TCPS) if (pConn->connType != RPC_CONN_TCPS)
pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer * 2, pConn, pRpc->tmrCtrl);
} else { } else {
terrno = rpcProcessRspHead(pConn, pHead); terrno = rpcProcessRspHead(pConn, pHead);
*ppContext = pConn->pContext; *ppContext = pConn->pContext;
@ -1026,7 +1158,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
} }
static void doRpcReportBrokenLinkToServer(void *param, void *id) { static void doRpcReportBrokenLinkToServer(void *param, void *id) {
SRpcMsg *pRpcMsg = (SRpcMsg *)(param); SRpcMsg * pRpcMsg = (SRpcMsg *)(param);
SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle); SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle);
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
(*(pRpc->cfp))(pRpc->parent, pRpcMsg, NULL); (*(pRpc->cfp))(pRpc->parent, pRpcMsg, NULL);
@ -1100,13 +1232,13 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
taosIpPort2String(pRecv->ip, pRecv->port, ipstr); taosIpPort2String(pRecv->ip, pRecv->port, ipstr);
if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) { if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) {
tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn,
pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen, pHead->sourceId,
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); pHead->destId, pHead->tranId, pHead->code);
} else { } else {
tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn,
pConn, (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen, (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId,
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); pHead->tranId, pHead->code);
} }
int32_t code = terrno; int32_t code = terrno;
@ -1118,9 +1250,11 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
if (TMSG_INDEX(pHead->msgType) + 1 > 1 && TMSG_INDEX(pHead->msgType) + 1 < TDMT_MAX) { if (TMSG_INDEX(pHead->msgType) + 1 > 1 && TMSG_INDEX(pHead->msgType) + 1 < TDMT_MAX) {
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType+1), code); tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
TMSG_INFO(pHead->msgType + 1), code);
} else { } else {
tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), code); tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle,
TMSG_INFO(pHead->msgType), code);
} }
} }
} else { // msg is passed to app only parsing is ok } else { // msg is passed to app only parsing is ok
@ -1144,8 +1278,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
} else { } else {
// for asynchronous API // for asynchronous API
SEpSet *pEpSet = NULL; SEpSet *pEpSet = NULL;
if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) pEpSet = &pContext->epSet;
pEpSet = &pContext->epSet;
(*pRpc->cfp)(pRpc->parent, pMsg, pEpSet); (*pRpc->cfp)(pRpc->parent, pMsg, pEpSet);
} }
@ -1155,7 +1288,6 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
} }
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
@ -1180,14 +1312,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
// for UDP, port may be changed by server, the port in epSet shall be used for cache // for UDP, port may be changed by server, the port in epSet shall be used for cache
if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse],
pConn->connType);
} else { } else {
rpcCloseConn(pConn); rpcCloseConn(pConn);
} }
if (pHead->code == TSDB_CODE_RPC_REDIRECT) { if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
pContext->numOfTry = 0; pContext->numOfTry = 0;
SEpSet *pEpSet = (SEpSet*)pHead->content; SEpSet *pEpSet = (SEpSet *)pHead->content;
if (pEpSet->numOfEps > 0) { if (pEpSet->numOfEps > 0) {
memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps, tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
@ -1200,7 +1333,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
} }
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
} else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_OFFLINE) { } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY ||
pHead->code == TSDB_CODE_DND_OFFLINE) {
pContext->code = pHead->code; pContext->code = pHead->code;
rpcProcessConnError(pContext, NULL); rpcProcessConnError(pContext, NULL);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
@ -1218,7 +1352,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
memset(msg, 0, sizeof(SRpcHead)); memset(msg, 0, sizeof(SRpcHead));
pHead = (SRpcHead *)msg; pHead = (SRpcHead *)msg;
pHead->version = 1; pHead->version = 1;
pHead->msgType = pConn->inType+1; pHead->msgType = pConn->inType + 1;
pHead->spi = pConn->spi; pHead->spi = pConn->spi;
pHead->encrypt = 0; pHead->encrypt = 0;
pHead->tranId = pConn->inTranId; pHead->tranId = pConn->inTranId;
@ -1258,7 +1392,7 @@ static void rpcSendReqHead(SRpcConn *pConn) {
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
SRpcHead *pRecvHead, *pReplyHead; SRpcHead *pRecvHead, *pReplyHead;
char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ]; char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t)];
uint32_t timeStamp; uint32_t timeStamp;
int msgLen; int msgLen;
@ -1295,7 +1429,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); SRpcHead *pHead = rpcHeadFromCont(pContext->pCont);
char *msg = (char *)pHead; char * msg = (char *)pHead;
int msgLen = rpcMsgLenFromCont(pContext->contLen); int msgLen = rpcMsgLenFromCont(pContext->contLen);
tmsg_t msgType = pContext->msgType; tmsg_t msgType = pContext->msgType;
@ -1317,7 +1451,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pHead->msgType = msgType; pHead->msgType = msgType;
pHead->encrypt = 0; pHead->encrypt = 0;
pConn->tranId++; pConn->tranId++;
if ( pConn->tranId == 0 ) pConn->tranId++; if (pConn->tranId == 0) pConn->tranId++;
pHead->tranId = pConn->tranId; pHead->tranId = pConn->tranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
@ -1346,18 +1480,16 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
msgLen = rpcAddAuthPart(pConn, msg, msgLen); msgLen = rpcAddAuthPart(pConn, msg, msgLen);
if ( rpcIsReq(pHead->msgType)) { if (rpcIsReq(pHead->msgType)) {
tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
pConn->info, TMSG_INFO(pHead->msgType), pConn->peerFqdn, pConn->peerPort, pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} else { } else {
if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured
tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType),
pConn->info, TMSG_INFO(pHead->msgType), pConn->peerIp, pConn->peerPort, pConn->peerIp, pConn->peerPort, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
//tTrace("connection type is: %d", pConn->connType); // tTrace("connection type is: %d", pConn->connType);
writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
if (writtenLen != msgLen) { if (writtenLen != msgLen) {
@ -1369,7 +1501,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
static void rpcProcessConnError(void *param, void *id) { static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcReqContext *)param; SRpcReqContext *pContext = (SRpcReqContext *)param;
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo * pRpc = pContext->pRpc;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
if (pRpc == NULL) { if (pRpc == NULL) {
@ -1379,7 +1511,7 @@ static void rpcProcessConnError(void *param, void *id) {
tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TDMT_VND_FETCH) { if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TDMT_VND_FETCH) {
rpcMsg.msgType = pContext->msgType+1; rpcMsg.msgType = pContext->msgType + 1;
rpcMsg.ahandle = pContext->ahandle; rpcMsg.ahandle = pContext->ahandle;
rpcMsg.code = pContext->code; rpcMsg.code = pContext->code;
rpcMsg.pCont = NULL; rpcMsg.pCont = NULL;
@ -1411,7 +1543,8 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl); pConn->pTimer = taosTmrStart(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl);
} else { } else {
// close the connection // close the connection
tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn, pConn->peerPort); tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, TMSG_INFO(pConn->outType), pConn->peerFqdn,
pConn->peerPort);
if (pConn->pContext) { if (pConn->pContext) {
pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pConn->pContext->pConn = NULL; pConn->pContext->pConn = NULL;
@ -1460,7 +1593,7 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
} }
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { static int32_t rpcCompressRpcMsg(char *pCont, int32_t contLen) {
SRpcHead *pHead = rpcHeadFromCont(pCont); SRpcHead *pHead = rpcHeadFromCont(pCont);
int32_t finalLen = 0; int32_t finalLen = 0;
int overhead = sizeof(SRpcComp); int overhead = sizeof(SRpcComp);
@ -1469,7 +1602,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
return contLen; return contLen;
} }
char *buf = malloc (contLen + overhead + 8); // 8 extra bytes char *buf = malloc(contLen + overhead + 8); // 8 extra bytes
if (buf == NULL) { if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
return contLen; return contLen;
@ -1502,7 +1635,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
int overhead = sizeof(SRpcComp); int overhead = sizeof(SRpcComp);
SRpcHead *pNewHead = NULL; SRpcHead *pNewHead = NULL;
uint8_t *pCont = pHead->content; uint8_t * pCont = pHead->content;
SRpcComp *pComp = (SRpcComp *)pHead->content; SRpcComp *pComp = (SRpcComp *)pHead->content;
if (pHead->comp) { if (pHead->comp) {
@ -1516,7 +1649,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
if (pNewHead) { if (pNewHead) {
int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char *)pNewHead->content, compLen, contLen); int origLen = LZ4_decompress_safe((char *)(pCont + overhead), (char *)pNewHead->content, compLen, contLen);
assert(origLen == contLen); assert(origLen == contLen);
memcpy(pNewHead, pHead, sizeof(SRpcHead)); memcpy(pNewHead, pHead, sizeof(SRpcHead));
@ -1582,19 +1715,19 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHead *pHead = (SRpcHead *)msg; SRpcHead *pHead = (SRpcHead *)msg;
int code = 0; int code = 0;
if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){ if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
// secured link, or no authentication // secured link, or no authentication
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
// tTrace("%s, secured link, no auth is required", pConn->info); // tTrace("%s, secured link, no auth is required", pConn->info);
return 0; return 0;
} }
if ( !rpcIsReq(pHead->msgType) ) { if (!rpcIsReq(pHead->msgType)) {
// for response, if code is auth failure, it shall bypass the auth process // for response, if code is auth failure, it shall bypass the auth process
code = htonl(pHead->code); code = htonl(pHead->code);
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
return 0; return 0;
@ -1613,12 +1746,12 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
code = TSDB_CODE_RPC_INVALID_TIME_STAMP; code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
} else { } else {
if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { if (rpcAuthenticateMsg(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
tDebug("%s, authentication failed, msg discarded", pConn->info); tDebug("%s, authentication failed, msg discarded", pConn->info);
code = TSDB_CODE_RPC_AUTH_FAILURE; code = TSDB_CODE_RPC_AUTH_FAILURE;
} else { } else {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client
// tTrace("%s, message is authenticated", pConn->info); // tTrace("%s, message is authenticated", pConn->info);
} }
} }
@ -1647,13 +1780,9 @@ static void rpcUnlockConn(SRpcConn *pConn) {
} }
} }
static void rpcAddRef(SRpcInfo *pRpc) static void rpcAddRef(SRpcInfo *pRpc) { atomic_add_fetch_32(&pRpc->refCount, 1); }
{
atomic_add_fetch_32(&pRpc->refCount, 1);
}
static void rpcDecRef(SRpcInfo *pRpc) static void rpcDecRef(SRpcInfo *pRpc) {
{
if (atomic_sub_fetch_32(&pRpc->refCount, 1) == 0) { if (atomic_sub_fetch_32(&pRpc->refCount, 1) == 0) {
rpcCloseConnCache(pRpc->pCache); rpcCloseConnCache(pRpc->pCache);
taosHashCleanup(pRpc->hash); taosHashCleanup(pRpc->hash);
@ -1668,4 +1797,4 @@ static void rpcDecRef(SRpcInfo *pRpc)
atomic_sub_fetch_32(&tsRpcNum, 1); atomic_sub_fetch_32(&tsRpcNum, 1);
} }
} }
#endif

View File

@ -13,24 +13,28 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "rpcTcp.h"
#include <uv.h>
#include "os.h" #include "os.h"
#include "tutil.h" #include "rpcHead.h"
#include "rpcLog.h"
#include "taosdef.h" #include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "rpcLog.h" #include "tutil.h"
#include "rpcHead.h"
#include "rpcTcp.h"
#ifdef USE_UV
#else
typedef struct SFdObj { typedef struct SFdObj {
void *signature; void * signature;
SOCKET fd; // TCP socket FD SOCKET fd; // TCP socket FD
void *thandle; // handle from upper layer, like TAOS void * thandle; // handle from upper layer, like TAOS
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int16_t closedByApp; // 1: already closed by App int16_t closedByApp; // 1: already closed by App
struct SThreadObj *pThreadObj; struct SThreadObj *pThreadObj;
struct SFdObj *prev; struct SFdObj * prev;
struct SFdObj *next; struct SFdObj * next;
} SFdObj; } SFdObj;
typedef struct SThreadObj { typedef struct SThreadObj {
@ -43,7 +47,7 @@ typedef struct SThreadObj {
int numOfFds; int numOfFds;
int threadId; int threadId;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
void *shandle; // handle passed by upper layer during server initialization void * shandle; // handle passed by upper layer during server initialization
void *(*processData)(SRecvInfo *pPacket); void *(*processData)(SRecvInfo *pPacket);
} SThreadObj; } SThreadObj;
@ -67,11 +71,11 @@ typedef struct {
pthread_t thread; pthread_t thread;
} SServerObj; } SServerObj;
static void *taosProcessTcpData(void *param); static void * taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd); static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd);
static void taosFreeFdObj(SFdObj *pFdObj); static void taosFreeFdObj(SFdObj *pFdObj);
static void taosReportBrokenLink(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj);
static void *taosAcceptTcpConnection(void *arg); static void * taosAcceptTcpConnection(void *arg);
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) { void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
SServerObj *pServerObj; SServerObj *pServerObj;
@ -110,7 +114,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
if (pThreadObj == NULL) { if (pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
for (int j=0; j<i; ++j) free(pServerObj->pThreadObj[j]); for (int j = 0; j < i; ++j) free(pServerObj->pThreadObj[j]);
free(pServerObj->pThreadObj); free(pServerObj->pThreadObj);
free(pServerObj); free(pServerObj);
return NULL; return NULL;
@ -172,8 +176,10 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
return (void *)pServerObj; return (void *)pServerObj;
} }
static void taosStopTcpThread(SThreadObj* pThreadObj) { static void taosStopTcpThread(SThreadObj *pThreadObj) {
if (pThreadObj == NULL) { return;} if (pThreadObj == NULL) {
return;
}
// save thread into local variable and signal thread to stop // save thread into local variable and signal thread to stop
pthread_t thread = pThreadObj->thread; pthread_t thread = pThreadObj->thread;
if (!taosCheckPthreadValid(thread)) { if (!taosCheckPthreadValid(thread)) {
@ -227,8 +233,8 @@ static void *taosAcceptTcpConnection(void *arg) {
SOCKET connFd = -1; SOCKET connFd = -1;
struct sockaddr_in caddr; struct sockaddr_in caddr;
int threadId = 0; int threadId = 0;
SThreadObj *pThreadObj; SThreadObj * pThreadObj;
SServerObj *pServerObj; SServerObj * pServerObj;
pServerObj = (SServerObj *)arg; pServerObj = (SServerObj *)arg;
tDebug("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); tDebug("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
@ -253,7 +259,7 @@ static void *taosAcceptTcpConnection(void *arg) {
} }
taosKeepTcpAlive(connFd); taosKeepTcpAlive(connFd);
struct timeval to={5, 0}; struct timeval to = {5, 0};
int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
if (ret != 0) { if (ret != 0) {
taosCloseSocket(connFd); taosCloseSocket(connFd);
@ -262,7 +268,6 @@ static void *taosAcceptTcpConnection(void *arg) {
continue; continue;
} }
// pick up the thread to handle this connection // pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj[threadId]; pThreadObj = pServerObj->pThreadObj[threadId];
@ -297,7 +302,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread
tstrncpy(pClientObj->label, label, sizeof(pClientObj->label)); tstrncpy(pClientObj->label, label, sizeof(pClientObj->label));
pClientObj->numOfThreads = numOfThreads; pClientObj->numOfThreads = numOfThreads;
pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj*)); pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj *));
if (pClientObj->pThreadObj == NULL) { if (pClientObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
tfree(pClientObj); tfree(pClientObj);
@ -314,7 +319,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread
if (pThreadObj == NULL) { if (pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
for (int j=0; j<i; ++j) free(pClientObj->pThreadObj[j]); for (int j = 0; j < i; ++j) free(pClientObj->pThreadObj[j]);
free(pClientObj); free(pClientObj);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
return NULL; return NULL;
@ -364,14 +369,14 @@ void taosStopTcpClient(void *chandle) {
if (pClientObj == NULL) return; if (pClientObj == NULL) return;
tDebug ("%s TCP client is stopped", pClientObj->label); tDebug("%s TCP client is stopped", pClientObj->label);
} }
void taosCleanUpTcpClient(void *chandle) { void taosCleanUpTcpClient(void *chandle) {
SClientObj *pClientObj = chandle; SClientObj *pClientObj = chandle;
if (pClientObj == NULL) return; if (pClientObj == NULL) return;
for (int i = 0; i < pClientObj->numOfThreads; ++i) { for (int i = 0; i < pClientObj->numOfThreads; ++i) {
SThreadObj *pThreadObj= pClientObj->pThreadObj[i]; SThreadObj *pThreadObj = pClientObj->pThreadObj[i];
taosStopTcpThread(pThreadObj); taosStopTcpThread(pThreadObj);
} }
@ -381,7 +386,7 @@ void taosCleanUpTcpClient(void *chandle) {
} }
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
SClientObj * pClientObj = shandle; SClientObj *pClientObj = shandle;
int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads; int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads;
atomic_store_32(&pClientObj->index, index + 1); atomic_store_32(&pClientObj->index, index + 1);
SThreadObj *pThreadObj = pClientObj->pThreadObj[index]; SThreadObj *pThreadObj = pClientObj->pThreadObj[index];
@ -396,8 +401,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
struct sockaddr_in sin; struct sockaddr_in sin;
uint16_t localPort = 0; uint16_t localPort = 0;
unsigned int addrlen = sizeof(sin); unsigned int addrlen = sizeof(sin);
if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 && if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
sin.sin_family == AF_INET && addrlen == sizeof(sin)) {
localPort = (uint16_t)ntohs(sin.sin_port); localPort = (uint16_t)ntohs(sin.sin_port);
} }
@ -407,8 +411,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
pFdObj->thandle = thandle; pFdObj->thandle = thandle;
pFdObj->port = port; pFdObj->port = port;
pFdObj->ip = ip; pFdObj->ip = ip;
tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", pThreadObj->label, thandle,
pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds); ip, port, localPort, pFdObj, pThreadObj->numOfFds);
} else { } else {
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
taosCloseSocket(fd); taosCloseSocket(fd);
@ -441,7 +445,6 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand
} }
static void taosReportBrokenLink(SFdObj *pFdObj) { static void taosReportBrokenLink(SFdObj *pFdObj) {
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
// notify the upper layer, so it will clean the associated context // notify the upper layer, so it will clean the associated context
@ -466,7 +469,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
SRpcHead rpcHead; SRpcHead rpcHead;
int32_t msgLen, leftLen, retLen, headLen; int32_t msgLen, leftLen, retLen, headLen;
char *buffer, *msg; char * buffer, *msg;
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
@ -483,7 +486,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1; return -1;
} else { } else {
tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, buffer); tTrace("%s %p read data, FD:%p fd:%d TCP malloc mem:%p", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd,
buffer);
} }
msg = buffer + tsRpcOverhead; msg = buffer + tsRpcOverhead;
@ -491,8 +495,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) { if (leftLen != retLen) {
tError("%s %p read error, leftLen:%d retLen:%d FD:%p", tError("%s %p read error, leftLen:%d retLen:%d FD:%p", pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj);
pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj);
free(buffer); free(buffer);
return -1; return -1;
} }
@ -519,8 +522,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
#define maxEvents 10 #define maxEvents 10
static void *taosProcessTcpData(void *param) { static void *taosProcessTcpData(void *param) {
SThreadObj *pThreadObj = param; SThreadObj * pThreadObj = param;
SFdObj *pFdObj; SFdObj * pFdObj;
struct epoll_event events[maxEvents]; struct epoll_event events[maxEvents];
SRecvInfo recvInfo; SRecvInfo recvInfo;
@ -569,7 +572,7 @@ static void *taosProcessTcpData(void *param) {
if (pThreadObj->stop) break; if (pThreadObj->stop) break;
} }
if (pThreadObj->pollFd >=0) { if (pThreadObj->pollFd >= 0) {
EpollClose(pThreadObj->pollFd); EpollClose(pThreadObj->pollFd);
pThreadObj->pollFd = -1; pThreadObj->pollFd = -1;
} }
@ -620,7 +623,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
} }
static void taosFreeFdObj(SFdObj *pFdObj) { static void taosFreeFdObj(SFdObj *pFdObj) {
if (pFdObj == NULL) return; if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return; if (pFdObj->signature != pFdObj) return;
@ -638,8 +640,8 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pThreadObj->numOfFds--; pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0) if (pThreadObj->numOfFds < 0)
tError("%s %p TCP thread:%d, number of FDs is negative!!!", tError("%s %p TCP thread:%d, number of FDs is negative!!!", pThreadObj->label, pFdObj->thandle,
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); pThreadObj->threadId);
if (pFdObj->prev) { if (pFdObj->prev) {
(pFdObj->prev)->next = pFdObj->next; (pFdObj->prev)->next = pFdObj->next;
@ -653,8 +655,10 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pthread_mutex_unlock(&pThreadObj->mutex); pthread_mutex_unlock(&pThreadObj->mutex);
tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj,
pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pThreadObj->numOfFds); pFdObj->fd, pThreadObj->numOfFds);
tfree(pFdObj); tfree(pFdObj);
} }
#endif

View File

@ -13,15 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "rpcUdp.h"
#include "os.h" #include "os.h"
#include "ttimer.h" #include "rpcHead.h"
#include "tutil.h" #include "rpcLog.h"
#include "taosdef.h" #include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "rpcLog.h" #include "ttimer.h"
#include "rpcUdp.h" #include "tutil.h"
#include "rpcHead.h"
#ifdef USE_UV
// no support upd currently
#else
#define RPC_MAX_UDP_CONNS 256 #define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000 #define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5 // mseconds #define RPC_UDP_BUF_TIME 5 // mseconds
@ -34,9 +37,9 @@ typedef struct {
uint16_t localPort; // local port uint16_t localPort; // local port
char label[TSDB_LABEL_LEN]; // copy from udpConnSet; char label[TSDB_LABEL_LEN]; // copy from udpConnSet;
pthread_t thread; pthread_t thread;
void *hash; void * hash;
void *shandle; // handle passed by upper layer during server initialization void * shandle; // handle passed by upper layer during server initialization
void *pSet; void * pSet;
void *(*processData)(SRecvInfo *pRecv); void *(*processData)(SRecvInfo *pRecv);
char *buffer; // buffer to receive data char *buffer; // buffer to receive data
} SUdpConn; } SUdpConn;
@ -46,7 +49,7 @@ typedef struct {
int server; int server;
uint32_t ip; // local IP uint32_t ip; // local IP
uint16_t port; // local Port uint16_t port; // local Port
void *shandle; // handle passed by upper layer during server initialization void * shandle; // handle passed by upper layer during server initialization
int threads; int threads;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
void *(*fp)(SRecvInfo *pPacket); void *(*fp)(SRecvInfo *pPacket);
@ -56,7 +59,7 @@ typedef struct {
static void *taosRecvUdpData(void *param); static void *taosRecvUdpData(void *param);
void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) { void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
SUdpConn *pConn; SUdpConn * pConn;
SUdpConnSet *pSet; SUdpConnSet *pSet;
int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn); int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn);
@ -98,8 +101,8 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
struct sockaddr_in sin; struct sockaddr_in sin;
unsigned int addrlen = sizeof(sin); unsigned int addrlen = sizeof(sin);
if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
sin.sin_family == AF_INET && addrlen == sizeof(sin)) { addrlen == sizeof(sin)) {
pConn->localPort = (uint16_t)ntohs(sin.sin_port); pConn->localPort = (uint16_t)ntohs(sin.sin_port);
} }
@ -130,14 +133,14 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
void taosStopUdpConnection(void *handle) { void taosStopUdpConnection(void *handle) {
SUdpConnSet *pSet = (SUdpConnSet *)handle; SUdpConnSet *pSet = (SUdpConnSet *)handle;
SUdpConn *pConn; SUdpConn * pConn;
if (pSet == NULL) return; if (pSet == NULL) return;
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR); if (pConn->fd >= 0) shutdown(pConn->fd, SHUT_RDWR);
if (pConn->fd >=0) taosCloseSocket(pConn->fd); if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
pConn->fd = -1; pConn->fd = -1;
} }
@ -155,13 +158,13 @@ void taosStopUdpConnection(void *handle) {
void taosCleanUpUdpConnection(void *handle) { void taosCleanUpUdpConnection(void *handle) {
SUdpConnSet *pSet = (SUdpConnSet *)handle; SUdpConnSet *pSet = (SUdpConnSet *)handle;
SUdpConn *pConn; SUdpConn * pConn;
if (pSet == NULL) return; if (pSet == NULL) return;
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
if (pConn->fd >=0) taosCloseSocket(pConn->fd); if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
} }
tDebug("%s UDP is cleaned up", pSet->label); tDebug("%s UDP is cleaned up", pSet->label);
@ -182,7 +185,7 @@ void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t
} }
static void *taosRecvUdpData(void *param) { static void *taosRecvUdpData(void *param) {
SUdpConn *pConn = param; SUdpConn * pConn = param;
struct sockaddr_in sourceAdd; struct sockaddr_in sourceAdd;
ssize_t dataLen; ssize_t dataLen;
unsigned int addLen; unsigned int addLen;
@ -218,7 +221,7 @@ static void *taosRecvUdpData(void *param) {
} }
int32_t size = dataLen + tsRpcOverhead; int32_t size = dataLen + tsRpcOverhead;
char *tmsg = malloc(size); char * tmsg = malloc(size);
if (NULL == tmsg) { if (NULL == tmsg) {
tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen); tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
continue; continue;
@ -257,4 +260,4 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c
return ret; return ret;
} }
#endif