commit f639851ef1
@@ -27,4 +27,11 @@ if (${BUILD_WITH_UV})
     add_definitions(-DUSE_UV)
 endif(${BUILD_WITH_UV})
 
+if (${BUILD_TEST})
+    add_subdirectory(test)
+endif(${BUILD_TEST})
+
@@ -22,6 +22,27 @@ extern "C" {
 #endif
 
 #ifdef USE_UV
+typedef struct {
+  char     version : 4;  // RPC version
+  char     comp : 4;     // compression algorithm, 0: no compression, 1: lz4
+  char     resflag : 2;  // reserved bits
+  char     spi : 3;      // security parameter index
+  char     encrypt : 3;  // encryption algorithm, 0: no encryption
+  uint16_t tranId;       // transaction ID
+  uint32_t linkUid;      // unique connection ID assigned by the client
+  uint64_t ahandle;      // ahandle assigned by the client
+  uint32_t sourceId;     // source ID, an index into the connection list
+  uint32_t destId;       // destination ID, an index into the connection list
+  uint32_t destIp;       // destination IP address, for NAT scenarios
+  char     user[TSDB_UNI_LEN];  // user ID
+  uint16_t port;         // for UDP only, the port may change
+  char     empty[1];     // reserved
+  uint16_t msgType;      // message type
+  int32_t  msgLen;       // message length, including the header itself
+  uint32_t msgVer;
+  int32_t  code;         // code in the response message
+  uint8_t  content[0];   // message body starts from here
+} SRpcHead;
 
 #else
 
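For reference, a minimal sender-side sketch (not part of the commit) of how a message could be framed with the SRpcHead layout above. TSDB_UNI_LEN and the buildMsg helper are assumptions introduced only for this example; msgLen is written in network byte order, matching the htonl conversion the reader added later in this commit performs.

#include <arpa/inet.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define TSDB_UNI_LEN 24  // assumption made for this sketch only

typedef struct {
  char     version : 4;
  char     comp : 4;
  char     resflag : 2;
  char     spi : 3;
  char     encrypt : 3;
  uint16_t tranId;
  uint32_t linkUid;
  uint64_t ahandle;
  uint32_t sourceId;
  uint32_t destId;
  uint32_t destIp;
  char     user[TSDB_UNI_LEN];
  uint16_t port;
  char     empty[1];
  uint16_t msgType;
  int32_t  msgLen;
  uint32_t msgVer;
  int32_t  code;
  uint8_t  content[0];
} SRpcHead;

// Build a header + body buffer; msgLen is taken to cover the header plus the
// body, per the field comment above, and is stored in network byte order.
static SRpcHead* buildMsg(uint16_t msgType, const void* body, int32_t bodyLen) {
  int32_t   total = (int32_t)sizeof(SRpcHead) + bodyLen;
  SRpcHead* pHead = calloc(1, total);
  if (pHead == NULL) return NULL;
  pHead->version = 1;
  pHead->msgType = msgType;
  pHead->msgLen  = (int32_t)htonl((uint32_t)total);
  memcpy(pHead->content, body, bodyLen);
  return pHead;
}

int main() {
  const char req[] = "hello";
  SRpcHead*  msg = buildMsg(1, req, sizeof(req));
  free(msg);
  return 0;
}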
@@ -22,9 +22,58 @@ extern "C" {
 
 #ifdef USE_UV
 
-#else
+#include <stddef.h>
+typedef void *queue[2];
+
+/* Private macros. */
+#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0]))
+#define QUEUE_PREV(q) (*(queue **)&((*(q))[1]))
+
+#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
+#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
+
+/* Initialize an empty queue. */
+#define QUEUE_INIT(q)    \
+  {                      \
+    QUEUE_NEXT(q) = (q); \
+    QUEUE_PREV(q) = (q); \
+  }
+
+/* Return true if the queue has no element. */
+#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q))
+
+/* Insert an element at the back of a queue. */
+#define QUEUE_PUSH(q, e)           \
+  {                                \
+    QUEUE_NEXT(e) = (q);           \
+    QUEUE_PREV(e) = QUEUE_PREV(q); \
+    QUEUE_PREV_NEXT(e) = (e);      \
+    QUEUE_PREV(q) = (e);           \
+  }
+
+/* Remove the given element from the queue. Any element can be removed at any
+ * time. */
+#define QUEUE_REMOVE(e)                 \
+  {                                     \
+    QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
+    QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
+  }
+
+/* Return the element at the front of the queue. */
+#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
+
+/* Return the element at the back of the queue. */
+#define QUEUE_TAIL(q) (QUEUE_PREV(q))
+
+/* Iterate over the element of a queue. Mutating the queue while iterating
+ * results in undefined behavior. */
+#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q))
+
+/* Return the structure holding the given element. */
+#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field))))
+
+#endif  // USE_LIBUV
 
-#endif
 
 #ifdef __cplusplus
 }
 #endif
 
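As a quick illustration (not part of the commit), this is how the intrusive queue above is typically used: the link field is embedded in each element and QUEUE_DATA recovers the enclosing struct from a node pointer. The Conn type and field names below are made up for the example; the macros are copied from the hunk above.

#include <stddef.h>
#include <stdio.h>

typedef void *queue[2];

#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0]))
#define QUEUE_PREV(q) (*(queue **)&((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
#define QUEUE_INIT(q)    \
  {                      \
    QUEUE_NEXT(q) = (q); \
    QUEUE_PREV(q) = (q); \
  }
#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q))
#define QUEUE_PUSH(q, e)           \
  {                                \
    QUEUE_NEXT(e) = (q);           \
    QUEUE_PREV(e) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(e) = (e);      \
    QUEUE_PREV(q) = (e);           \
  }
#define QUEUE_REMOVE(e)                 \
  {                                     \
    QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
    QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
  }
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field))))

typedef struct {
  int   fd;
  queue node;  // links this element into a list
} Conn;

int main() {
  queue head;
  QUEUE_INIT(&head);

  Conn c1 = {.fd = 3};
  Conn c2 = {.fd = 4};
  QUEUE_PUSH(&head, &c1.node);
  QUEUE_PUSH(&head, &c2.node);

  // Drain in FIFO order, recovering each Conn from its embedded node.
  while (!QUEUE_IS_EMPTY(&head)) {
    queue *h = QUEUE_HEAD(&head);
    Conn  *c = QUEUE_DATA(h, Conn, node);
    QUEUE_REMOVE(h);
    printf("fd=%d\n", c->fd);
  }
  return 0;
}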
@@ -30,6 +30,7 @@
 #include "tmd5.h"
 #include "tmempool.h"
 #include "tmsg.h"
+#include "transportInt.h"
 #include "tref.h"
 #include "trpc.h"
 #include "ttimer.h"
@@ -69,61 +70,87 @@ typedef struct {
 
 #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
 
+static const char* notify = "a";
+
 typedef struct SThreadObj {
   pthread_t       thread;
   uv_pipe_t*      pipe;
   uv_loop_t*      loop;
   uv_async_t*     workerAsync;  //
   int             fd;
+  queue           conn;
+  pthread_mutex_t connMtx;
 } SThreadObj;
 
 typedef struct SServerObj {
+  pthread_t    thread;
   uv_tcp_t     server;
   uv_loop_t*   loop;
   int          workerIdx;
   int          numOfThread;
   SThreadObj** pThreadObj;
   uv_pipe_t**  pipe;
+  uint32_t     ip;
+  uint32_t     port;
 } SServerObj;
 
+typedef struct SContent {
+  char* buf;
+  int   len;
+  int   cap;
+  int   toRead;
+} SContent;
+
 typedef struct SConnCtx {
-  uv_tcp_t*   pClient;
+  uv_tcp_t*   pTcp;
+  uv_write_t* pWriter;
   uv_timer_t* pTimer;
 
   uv_async_t* pWorkerAsync;
+  queue       queue;
   int         ref;
+  int         persist;  // persist connection or not
+  SContent    pCont;
+  int         count;
 } SConnCtx;
 
-static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
-static void onTimeout(uv_timer_t* handle);
-static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
-static void onWrite(uv_write_t* req, int status);
-static void onAccept(uv_stream_t* stream, int status);
-void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
-static void workerAsyncCB(uv_async_t* handle);
+static void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
+static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
+static void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
+static void onTimeout(uv_timer_t* handle);
+static void onWrite(uv_write_t* req, int status);
+static void onAccept(uv_stream_t* stream, int status);
+static void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
+static void workerAsyncCB(uv_async_t* handle);
+
+static SConnCtx* connCtxCreate();
+static void      connCtxDestroy(SConnCtx* ctx);
+static void      uvConnCtxDestroy(uv_handle_t* handle);
 
 static void* workerThread(void* arg);
+static void* acceptThread(void* arg);
+
+void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
 
 int32_t rpcInit() { return -1; }
 void rpcCleanup() { return; };
-void* rpcOpen(const SRpcInit* pInit) {
-  SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
-  if (pRpc == NULL) {
-    return NULL;
-  }
-  if (pInit->label) {
-    tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
-  }
-  pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
 
+void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
+  // opt
+}
+void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
   SServerObj* srv = calloc(1, sizeof(SServerObj));
   srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
-  srv->numOfThread = pRpc->numOfThreads;
+  srv->numOfThread = numOfThreads;
   srv->workerIdx = 0;
   srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*));
   srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*));
+  srv->ip = ip;
+  srv->port = port;
   uv_loop_init(srv->loop);
 
   for (int i = 0; i < srv->numOfThread; i++) {
-    srv->pThreadObj[i] = (SThreadObj*)calloc(1, sizeof(SThreadObj));
+    SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj));
     srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
     int fds[2];
     if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
@@ -132,28 +159,38 @@ void* rpcOpen(const SRpcInit* pInit) {
     uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
     uv_pipe_open(&(srv->pipe[i][0]), fds[1]);  // init write
 
-    srv->pThreadObj[i]->fd = fds[0];
-    srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]);  // init read
-    int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i]));
+    thrd->fd = fds[0];
+    thrd->pipe = &(srv->pipe[i][1]);  // init read
+    int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
     if (err == 0) {
-      tError("sucess to create worker thread %d", i);
+      tDebug("success to create worker-thread %d", i);
       // printf("thread %d create\n", i);
     } else {
-      tError("failed to create worker thread %d", i);
-      return NULL;
+      // TODO: clear all other resources later
+      tError("failed to create worker-thread %d", i);
     }
+    srv->pThreadObj[i] = thrd;
   }
-  uv_tcp_init(srv->loop, &srv->server);
-  struct sockaddr_in bind_addr;
-  uv_ip4_addr("0.0.0.0", pInit->localPort, &bind_addr);
-  uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
-  int err = 0;
-  if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) {
-    tError("Listen error %s\n", uv_err_name(err));
+
+  int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
+  if (err == 0) {
+    tDebug("success to create accept-thread");
+  } else {
+    // clear all resources later
+  }
+
+  return srv;
+}
+
+void* rpcOpen(const SRpcInit* pInit) {
+  SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
+  if (pRpc == NULL) {
     return NULL;
   }
-  uv_run(srv->loop, UV_RUN_DEFAULT);
+  if (pInit->label) {
+    tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
+  }
+  pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
+  pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
   return pRpc;
 }
 void rpcClose(void* arg) { return; }
@@ -171,50 +208,150 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
 int  rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
 void rpcCancelRequest(int64_t rid) { return; }
 
-void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
-  buf->base = malloc(suggested_size);
-  buf->len = suggested_size;
+void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
+  static const int CAPACITY = 1024;
+  tDebug("pre alloc buffer for read");
+  SConnCtx* ctx = handle->data;
+  SContent* pCont = &ctx->pCont;
+  if (pCont->cap == 0) {
+    pCont->buf = (char*)calloc(CAPACITY, sizeof(char));
+    pCont->len = 0;
+    pCont->cap = CAPACITY;
+    pCont->toRead = -1;
+
+    buf->base = pCont->buf;
+    buf->len = CAPACITY;
+  } else {
+    if (pCont->len >= pCont->cap) {
+      if (pCont->toRead == -1) {
+        pCont->cap *= 2;
+        pCont->buf = realloc(pCont->buf, pCont->cap);
+      } else if (pCont->len + pCont->toRead > pCont->cap) {
+        pCont->cap = pCont->len + pCont->toRead;
+        pCont->buf = realloc(pCont->buf, pCont->len + pCont->toRead);
+      }
+    }
+    buf->base = pCont->buf + pCont->len;
+    buf->len = pCont->cap - pCont->len;
+  }
+
+  // if (ctx->pCont.cap == 0) {
+  //   ctx->pCont.buf = (char*)calloc(64, sizeof(char));
+  //   ctx->pCont.len = 0;
+  //   ctx->pCont.cap = 64;
+  //   //
+  //   buf->base = ctx->pCont.buf;
+  //   buf->len = sz;
+  // } else {
+  //   if (ctx->pCont.len + sz > ctx->pCont.cap) {
+  //     ctx->pCont.cap *= 2;
+  //     ctx->pCont.buf = realloc(ctx->pCont.buf, ctx->pCont.cap);
+  //   }
+  //   buf->base = ctx->pCont.buf + ctx->pCont.len;
+  //   buf->len = sz;
+  // }
+}
+
+// change later
+static bool handleUserData(SContent* data) {
+  SRpcHead rpcHead;
+
+  bool    finish = false;
+  int32_t msgLen, leftLen, retLen;
+  int32_t headLen = sizeof(rpcHead);
+  if (data->len >= headLen) {
+    memcpy((char*)&rpcHead, data->buf, headLen);
+    msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
+    if (msgLen + headLen <= data->len) {
+      return true;
+    } else {
+      return false;
+    }
+  } else {
+    return false;
+  }
+}
+
+void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
+  // opt
+  SConnCtx* ctx = cli->data;
+  SContent* pCont = &ctx->pCont;
+  if (nread > 0) {
+    pCont->len += nread;
+    bool finish = handleUserData(pCont);
+    if (finish == false) {
+      tDebug("continue read");
+    } else {
+      tDebug("read completely");
+    }
+    return;
+  }
+
+  if (nread != UV_EOF) {
+    tDebug("Read error %s\n", uv_err_name(nread));
+  }
+  uv_close((uv_handle_t*)cli, uvConnCtxDestroy);
+}
+
+void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
+  buf->base = malloc(sizeof(char));
+  buf->len = 2;
 }
 
 void onTimeout(uv_timer_t* handle) {
   // opt
   tDebug("time out");
 }
-void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
-  // opt
-  tDebug("data already was read on a stream");
-}
 
 void onWrite(uv_write_t* req, int status) {
+  SConnCtx* ctx = req->data;
+  if (status == 0) {
+    tDebug("data already was written on stream");
+  } else {
+    connCtxDestroy(ctx);
+  }
   // opt
-  if (req) tDebug("data already was written on stream");
 }
 
 void workerAsyncCB(uv_async_t* handle) {
-  // opt
   SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync);
+  SConnCtx*   conn = NULL;
+
+  // opt later
+  pthread_mutex_lock(&pObj->connMtx);
+  if (!QUEUE_IS_EMPTY(&pObj->conn)) {
+    queue* head = QUEUE_HEAD(&pObj->conn);
+    conn = QUEUE_DATA(head, SConnCtx, queue);
+    QUEUE_REMOVE(&conn->queue);
+  }
+  pthread_mutex_unlock(&pObj->connMtx);
+  if (conn == NULL) {
+    tError("except occurred, do nothing");
+    return;
+  }
 }
 
 void onAccept(uv_stream_t* stream, int status) {
   if (status == -1) {
     return;
   }
   SServerObj* pObj = container_of(stream, SServerObj, server);
-  tDebug("new conntion accepted by main server, dispatch to one worker thread");
 
   uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
   uv_tcp_init(pObj->loop, cli);
 
   if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
     uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));
 
-    uv_buf_t buf = uv_buf_init("a", 1);
-    // despatch to worker thread
+    uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
     pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
+    tDebug("new connection accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
     uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite);
   } else {
     uv_close((uv_handle_t*)cli, NULL);
   }
 }
 void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
+  tDebug("connection coming");
   if (nread < 0) {
     if (nread != UV_EOF) {
       tError("read error %s", uv_err_name(nread));
@@ -223,6 +360,11 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
     uv_close((uv_handle_t*)q, NULL);
     return;
   }
+  // free memory allocated by allocConnBuffer
+  assert(nread == strlen(notify));
+  assert(buf->base[0] == notify[0]);
+  free(buf->base);
+
   SThreadObj* pObj = (SThreadObj*)container_of(q, struct SThreadObj, pipe);
 
   uv_pipe_t* pipe = (uv_pipe_t*)q;
@@ -230,46 +372,90 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
     tError("No pending count");
     return;
   }
 
   uv_handle_type pending = uv_pipe_pending_type(pipe);
   assert(pending == UV_TCP);
 
-  SConnCtx* pConn = malloc(sizeof(SConnCtx));
+  SConnCtx* pConn = connCtxCreate();
   /* init conn timer*/
   pConn->pTimer = malloc(sizeof(uv_timer_t));
   uv_timer_init(pObj->loop, pConn->pTimer);
 
-  pConn->pClient = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
   pConn->pWorkerAsync = pObj->workerAsync;  // thread safety
-  uv_tcp_init(pObj->loop, pConn->pClient);
 
-  if (uv_accept(q, (uv_stream_t*)(pConn->pClient)) == 0) {
+  // init client handle
+  pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
+  uv_tcp_init(pObj->loop, pConn->pTcp);
+  pConn->pTcp->data = pConn;
+
+  // init write request, just
+  pConn->pWriter = calloc(1, sizeof(uv_write_t));
+  pConn->pWriter->data = pConn;
+
+  if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
     uv_os_fd_t fd;
-    uv_fileno((const uv_handle_t*)pConn->pClient, &fd);
+    uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
     tDebug("new connection created: %d", fd);
-    uv_timer_start(pConn->pTimer, onTimeout, 10, 0);
-    uv_read_start((uv_stream_t*)(pConn->pClient), allocBuffer, onRead);
+    uv_read_start((uv_stream_t*)(pConn->pTcp), allocReadBuffer, onRead);
   } else {
-    uv_timer_stop(pConn->pTimer);
-    free(pConn->pTimer);
-    uv_close((uv_handle_t*)pConn->pClient, NULL);
-    free(pConn->pClient);
-    free(pConn);
+    connCtxDestroy(pConn);
   }
 }
 
+void* acceptThread(void* arg) {
+  // opt
+  SServerObj* srv = (SServerObj*)arg;
+  uv_tcp_init(srv->loop, &srv->server);
+
+  struct sockaddr_in bind_addr;
+
+  uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
+  uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
+  int err = 0;
+  if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) {
+    tError("Listen error %s\n", uv_err_name(err));
+    return NULL;
+  }
+  uv_run(srv->loop, UV_RUN_DEFAULT);
+}
 void* workerThread(void* arg) {
   SThreadObj* pObj = (SThreadObj*)arg;
-  int fd = pObj->fd;
   pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
   uv_loop_init(pObj->loop);
 
   uv_pipe_init(pObj->loop, pObj->pipe, 1);
-  uv_pipe_open(pObj->pipe, fd);
+  uv_pipe_open(pObj->pipe, pObj->fd);
+
+  QUEUE_INIT(&pObj->conn);
 
   pObj->workerAsync = malloc(sizeof(uv_async_t));
   uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB);
-  uv_read_start((uv_stream_t*)pObj->pipe, allocBuffer, onConnection);
+
+  uv_read_start((uv_stream_t*)pObj->pipe, allocConnBuffer, onConnection);
+  uv_run(pObj->loop, UV_RUN_DEFAULT);
 }
+static SConnCtx* connCtxCreate() {
+  SConnCtx* pConn = (SConnCtx*)calloc(1, sizeof(SConnCtx));
+  return pConn;
+}
+static void connCtxDestroy(SConnCtx* ctx) {
+  if (ctx == NULL) {
+    return;
+  }
+  uv_timer_stop(ctx->pTimer);
+  free(ctx->pTimer);
+  uv_close((uv_handle_t*)ctx->pTcp, NULL);
+  free(ctx->pTcp);
+  free(ctx->pWriter);
+  free(ctx);
+  // handle
+}
+static void uvConnCtxDestroy(uv_handle_t* handle) {
+  SConnCtx* ctx = handle->data;
+  connCtxDestroy(ctx);
+}
+
 #else
 
 #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
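The accept/worker split above relies on libuv handle passing over an IPC pipe: the acceptor writes a one-byte notification with uv_write2() and attaches the client's uv_tcp_t, and the worker adopts it in its pipe read callback via uv_pipe_pending_count()/uv_pipe_pending_type() and uv_accept(). Below is a self-contained sketch of just that mechanism, single loop and no threads; it is not part of the commit and assumes libuv >= 1.41 for uv_socketpair and UV_NONBLOCK_PIPE, with a listening socket standing in for the accepted client.

#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <uv.h>

static uv_pipe_t sender, receiver;
static uv_tcp_t  listener, adopted;

static void allocCb(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
  (void)handle; (void)size;
  buf->base = malloc(1);
  buf->len = 1;
}

static void onNewConn(uv_stream_t* server, int status) {
  // never reached in this sketch; only needed to satisfy uv_listen
  (void)server; (void)status;
}

static void onPipeRead(uv_stream_t* p, ssize_t nread, const uv_buf_t* buf) {
  if (nread > 0 && uv_pipe_pending_count((uv_pipe_t*)p) > 0 &&
      uv_pipe_pending_type((uv_pipe_t*)p) == UV_TCP) {
    // adopt the tcp handle that traveled over the IPC pipe
    uv_tcp_init(p->loop, &adopted);
    if (uv_accept(p, (uv_stream_t*)&adopted) == 0) printf("tcp handle adopted\n");
    uv_close((uv_handle_t*)&adopted, NULL);
    uv_close((uv_handle_t*)&sender, NULL);
    uv_close((uv_handle_t*)&receiver, NULL);
    uv_close((uv_handle_t*)&listener, NULL);
  }
  if (buf->base != NULL) free(buf->base);
}

static void onSent(uv_write_t* req, int status) {
  (void)status;
  free(req);
}

int main() {
  uv_loop_t*   loop = uv_default_loop();
  uv_os_sock_t fds[2];
  // documented signature: uv_socketpair(type, protocol, fds, flags0, flags1)
  uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE);

  uv_pipe_init(loop, &sender, 1);  // ipc = 1 so handles can be attached
  uv_pipe_open(&sender, fds[0]);
  uv_pipe_init(loop, &receiver, 1);
  uv_pipe_open(&receiver, fds[1]);

  // a tcp handle with a real fd to pass along
  struct sockaddr_in addr;
  uv_ip4_addr("127.0.0.1", 0, &addr);
  uv_tcp_init(loop, &listener);
  uv_tcp_bind(&listener, (const struct sockaddr*)&addr, 0);
  uv_listen((uv_stream_t*)&listener, 16, onNewConn);

  uv_read_start((uv_stream_t*)&receiver, allocCb, onPipeRead);

  uv_write_t* wr = malloc(sizeof(uv_write_t));
  uv_buf_t    buf = uv_buf_init("a", 1);
  uv_write2(wr, (uv_stream_t*)&sender, &buf, 1, (uv_stream_t*)&listener, onSent);

  uv_run(loop, UV_RUN_DEFAULT);
  return 0;
}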
@@ -442,7 +628,8 @@ void *rpcOpen(const SRpcInit *pInit) {
   pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
   if (pRpc == NULL) return NULL;
 
-  if (pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
+  if (pInit->label) tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
+
   pRpc->connType = pInit->connType;
   if (pRpc->connType == TAOS_CONN_CLIENT) {
     pRpc->numOfThreads = pInit->numOfThreads;
@@ -0,0 +1,21 @@
+add_executable(transportTest "")
+target_sources(transportTest
+    PRIVATE
+    "transportTests.cc"
+)
+
+target_include_directories(transportTest
+    PUBLIC
+    "${CMAKE_SOURCE_DIR}/include/libs/transport"
+    "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
+)
+
+target_link_libraries (transportTest
+    os
+    util
+    common
+    gtest_main
+    transport
+)
+
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <gtest/gtest.h>
+#include <chrono>
+#include <iostream>
+#include <string>
+#include <thread>
+
+#include "transportInt.h"
+#include "trpc.h"
+
+using namespace std;
+
+int main() {
+  SRpcInit init = {.localPort = 6030, .label = "rpc", .numOfThreads = 5};
+  void* p = rpcOpen(&init);
+
+  while (1) {
+    std::cout << "cron task" << std::endl;
+    std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000));
+  }
+}