add libuv
This commit is contained in:
parent
b945000779
commit
dde3dfd7bd
|
@ -33,28 +33,40 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int sessions; // number of sessions allowed
|
||||||
|
int numOfThreads; // number of threads to process incoming messages
|
||||||
|
int idleTime; // milliseconds;
|
||||||
|
uint16_t localPort;
|
||||||
|
int8_t connType;
|
||||||
|
int index; // for UDP server only, round robin for multiple threads
|
||||||
|
char label[TSDB_LABEL_LEN];
|
||||||
|
|
||||||
|
char user[TSDB_UNI_LEN]; // meter ID
|
||||||
|
char spi; // security parameter index
|
||||||
|
char encrypt; // encrypt algorithm
|
||||||
|
char secret[TSDB_PASSWORD_LEN]; // secret for the link
|
||||||
|
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
|
||||||
|
|
||||||
|
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
||||||
|
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
|
||||||
|
|
||||||
|
int32_t refCount;
|
||||||
|
void* parent;
|
||||||
|
void* idPool; // handle to ID pool
|
||||||
|
void* tmrCtrl; // handle to timer
|
||||||
|
SHashObj* hash; // handle returned by hash utility
|
||||||
|
void* tcphandle; // returned handle from TCP initialization
|
||||||
|
void* udphandle; // returned handle from UDP initialization
|
||||||
|
void* pCache; // connection cache
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
struct SRpcConn* connList; // connection list
|
||||||
|
} SRpcInfo;
|
||||||
|
|
||||||
#ifdef USE_UV
|
#ifdef USE_UV
|
||||||
|
|
||||||
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
|
||||||
|
|
||||||
int32_t rpcInit() { return -1; }
|
|
||||||
void rpcCleanup() { return; };
|
|
||||||
void* rpcOpen(const SRpcInit* pRpc) { return NULL; }
|
|
||||||
void rpcClose(void* arg) { return; }
|
|
||||||
void* rpcMallocCont(int contLen) { return NULL; }
|
|
||||||
void rpcFreeCont(void* cont) { return; }
|
|
||||||
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
|
|
||||||
|
|
||||||
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; }
|
|
||||||
|
|
||||||
void rpcSendResponse(const SRpcMsg* pMsg) {}
|
|
||||||
|
|
||||||
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
|
|
||||||
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
|
|
||||||
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
|
|
||||||
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
|
|
||||||
void rpcCancelRequest(int64_t rid) { return; }
|
|
||||||
|
|
||||||
typedef struct SThreadObj {
|
typedef struct SThreadObj {
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
uv_pipe_t* pipe;
|
uv_pipe_t* pipe;
|
||||||
|
@ -79,6 +91,84 @@ typedef struct SConnCtx {
|
||||||
int ref;
|
int ref;
|
||||||
} SConnCtx;
|
} SConnCtx;
|
||||||
|
|
||||||
|
static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||||
|
static void onTimeout(uv_timer_t* handle);
|
||||||
|
static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
||||||
|
static void onWrite(uv_write_t* req, int status);
|
||||||
|
static void onAccept(uv_stream_t* stream, int status);
|
||||||
|
void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
|
||||||
|
static void workerAsyncCB(uv_async_t* handle);
|
||||||
|
static void* workerThread(void* arg);
|
||||||
|
|
||||||
|
int32_t rpcInit() { return -1; }
|
||||||
|
void rpcCleanup() { return; };
|
||||||
|
void* rpcOpen(const SRpcInit* pInit) {
|
||||||
|
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
|
||||||
|
if (pRpc == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (pInit->label) {
|
||||||
|
tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
|
||||||
|
}
|
||||||
|
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||||
|
|
||||||
|
SServerObj* srv = calloc(1, sizeof(SServerObj));
|
||||||
|
srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
||||||
|
srv->numOfThread = pRpc->numOfThreads;
|
||||||
|
srv->workerIdx = 0;
|
||||||
|
srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*));
|
||||||
|
srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*));
|
||||||
|
uv_loop_init(srv->loop);
|
||||||
|
|
||||||
|
for (int i = 0; i < srv->numOfThread; i++) {
|
||||||
|
srv->pThreadObj[i] = (SThreadObj*)calloc(1, sizeof(SThreadObj));
|
||||||
|
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
|
||||||
|
int fds[2];
|
||||||
|
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
|
||||||
|
uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
|
||||||
|
|
||||||
|
srv->pThreadObj[i]->fd = fds[0];
|
||||||
|
srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]); // init read
|
||||||
|
int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i]));
|
||||||
|
if (err == 0) {
|
||||||
|
tError("sucess to create worker thread %d", i);
|
||||||
|
// printf("thread %d create\n", i);
|
||||||
|
} else {
|
||||||
|
tError("failed to create worker thread %d", i);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
uv_tcp_init(srv->loop, &srv->server);
|
||||||
|
struct sockaddr_in bind_addr;
|
||||||
|
uv_ip4_addr("0.0.0.0", pInit->localPort, &bind_addr);
|
||||||
|
uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
|
||||||
|
int err = 0;
|
||||||
|
if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) {
|
||||||
|
tError("Listen error %s\n", uv_err_name(err));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
uv_run(srv->loop, UV_RUN_DEFAULT);
|
||||||
|
|
||||||
|
return pRpc;
|
||||||
|
}
|
||||||
|
void rpcClose(void* arg) { return; }
|
||||||
|
void* rpcMallocCont(int contLen) { return NULL; }
|
||||||
|
void rpcFreeCont(void* cont) { return; }
|
||||||
|
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
|
||||||
|
|
||||||
|
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; }
|
||||||
|
|
||||||
|
void rpcSendResponse(const SRpcMsg* pMsg) {}
|
||||||
|
|
||||||
|
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
|
||||||
|
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
|
||||||
|
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
|
||||||
|
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
|
||||||
|
void rpcCancelRequest(int64_t rid) { return; }
|
||||||
|
|
||||||
void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
buf->base = malloc(suggested_size);
|
buf->base = malloc(suggested_size);
|
||||||
buf->len = suggested_size;
|
buf->len = suggested_size;
|
||||||
|
@ -98,7 +188,7 @@ void onWrite(uv_write_t* req, int status) {
|
||||||
if (req) tDebug("data already was written on stream");
|
if (req) tDebug("data already was written on stream");
|
||||||
}
|
}
|
||||||
|
|
||||||
static void workerAsyncCB(uv_async_t* handle) {
|
void workerAsyncCB(uv_async_t* handle) {
|
||||||
// opt
|
// opt
|
||||||
SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync);
|
SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync);
|
||||||
}
|
}
|
||||||
|
@ -187,36 +277,6 @@ void* workerThread(void* arg) {
|
||||||
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
|
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
|
||||||
#define rpcIsReq(type) (type & 1U)
|
#define rpcIsReq(type) (type & 1U)
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int sessions; // number of sessions allowed
|
|
||||||
int numOfThreads; // number of threads to process incoming messages
|
|
||||||
int idleTime; // milliseconds;
|
|
||||||
uint16_t localPort;
|
|
||||||
int8_t connType;
|
|
||||||
int index; // for UDP server only, round robin for multiple threads
|
|
||||||
char label[TSDB_LABEL_LEN];
|
|
||||||
|
|
||||||
char user[TSDB_UNI_LEN]; // meter ID
|
|
||||||
char spi; // security parameter index
|
|
||||||
char encrypt; // encrypt algorithm
|
|
||||||
char secret[TSDB_PASSWORD_LEN]; // secret for the link
|
|
||||||
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
|
|
||||||
|
|
||||||
void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
|
|
||||||
int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
|
|
||||||
|
|
||||||
int32_t refCount;
|
|
||||||
void * parent;
|
|
||||||
void * idPool; // handle to ID pool
|
|
||||||
void * tmrCtrl; // handle to timer
|
|
||||||
SHashObj * hash; // handle returned by hash utility
|
|
||||||
void * tcphandle; // returned handle from TCP initialization
|
|
||||||
void * udphandle; // returned handle from UDP initialization
|
|
||||||
void * pCache; // connection cache
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
struct SRpcConn *connList; // connection list
|
|
||||||
} SRpcInfo;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SRpcInfo * pRpc; // associated SRpcInfo
|
SRpcInfo * pRpc; // associated SRpcInfo
|
||||||
SEpSet epSet; // ip list provided by app
|
SEpSet epSet; // ip list provided by app
|
||||||
|
|
Loading…
Reference in New Issue