add libuv
This commit is contained in:
parent
6d10ef5e69
commit
5c6924e9e6
|
@ -13,9 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifdef USE_UV
|
|
||||||
#include <uv.h>
|
#include <uv.h>
|
||||||
#endif
|
|
||||||
#include "lz4.h"
|
#include "lz4.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "rpcCache.h"
|
#include "rpcCache.h"
|
||||||
|
@ -78,12 +76,15 @@ typedef struct SThreadObj {
|
||||||
} SThreadObj;
|
} SThreadObj;
|
||||||
|
|
||||||
typedef struct SServerObj {
|
typedef struct SServerObj {
|
||||||
|
pthread_t thread;
|
||||||
uv_tcp_t server;
|
uv_tcp_t server;
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
int workerIdx;
|
int workerIdx;
|
||||||
int numOfThread;
|
int numOfThread;
|
||||||
SThreadObj** pThreadObj;
|
SThreadObj** pThreadObj;
|
||||||
uv_pipe_t** pipe;
|
uv_pipe_t** pipe;
|
||||||
|
uint32_t ip;
|
||||||
|
uint32_t port;
|
||||||
} SServerObj;
|
} SServerObj;
|
||||||
|
|
||||||
typedef struct SConnCtx {
|
typedef struct SConnCtx {
|
||||||
|
@ -93,33 +94,31 @@ typedef struct SConnCtx {
|
||||||
int ref;
|
int ref;
|
||||||
} SConnCtx;
|
} SConnCtx;
|
||||||
|
|
||||||
static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
|
||||||
static void onTimeout(uv_timer_t* handle);
|
static void onTimeout(uv_timer_t* handle);
|
||||||
static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
|
||||||
static void onWrite(uv_write_t* req, int status);
|
static void onWrite(uv_write_t* req, int status);
|
||||||
static void onAccept(uv_stream_t* stream, int status);
|
static void onAccept(uv_stream_t* stream, int status);
|
||||||
void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
|
static void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
|
||||||
static void workerAsyncCB(uv_async_t* handle);
|
static void workerAsyncCB(uv_async_t* handle);
|
||||||
|
|
||||||
static void* workerThread(void* arg);
|
static void* workerThread(void* arg);
|
||||||
|
static void* acceptThread(void* arg);
|
||||||
|
|
||||||
|
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||||
|
|
||||||
int32_t rpcInit() { return -1; }
|
int32_t rpcInit() { return -1; }
|
||||||
void rpcCleanup() { return; };
|
void rpcCleanup() { return; };
|
||||||
void* rpcOpen(const SRpcInit* pInit) {
|
|
||||||
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
|
|
||||||
if (pRpc == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
if (pInit->label) {
|
|
||||||
tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
|
|
||||||
}
|
|
||||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
|
||||||
|
|
||||||
|
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
|
||||||
SServerObj* srv = calloc(1, sizeof(SServerObj));
|
SServerObj* srv = calloc(1, sizeof(SServerObj));
|
||||||
srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
|
||||||
srv->numOfThread = pRpc->numOfThreads;
|
srv->numOfThread = numOfThreads;
|
||||||
srv->workerIdx = 0;
|
srv->workerIdx = 0;
|
||||||
srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*));
|
srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*));
|
||||||
srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*));
|
srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*));
|
||||||
|
srv->ip = ip;
|
||||||
|
srv->port = port;
|
||||||
uv_loop_init(srv->loop);
|
uv_loop_init(srv->loop);
|
||||||
|
|
||||||
for (int i = 0; i < srv->numOfThread; i++) {
|
for (int i = 0; i < srv->numOfThread; i++) {
|
||||||
|
@ -136,24 +135,34 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]); // init read
|
srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]); // init read
|
||||||
int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i]));
|
int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i]));
|
||||||
if (err == 0) {
|
if (err == 0) {
|
||||||
tError("sucess to create worker thread %d", i);
|
tDebug("sucess to create worker thread %d", i);
|
||||||
// printf("thread %d create\n", i);
|
// printf("thread %d create\n", i);
|
||||||
} else {
|
} else {
|
||||||
|
// clear all resource later
|
||||||
tError("failed to create worker thread %d", i);
|
tError("failed to create worker thread %d", i);
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
uv_tcp_init(srv->loop, &srv->server);
|
|
||||||
struct sockaddr_in bind_addr;
|
int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
|
||||||
uv_ip4_addr("0.0.0.0", pInit->localPort, &bind_addr);
|
if (err == 0) {
|
||||||
uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
|
tDebug("success to create accept thread");
|
||||||
int err = 0;
|
} else {
|
||||||
if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) {
|
// clear all resource later
|
||||||
tError("Listen error %s\n", uv_err_name(err));
|
}
|
||||||
|
|
||||||
|
return srv;
|
||||||
|
}
|
||||||
|
void* rpcOpen(const SRpcInit* pInit) {
|
||||||
|
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
|
||||||
|
if (pRpc == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
uv_run(srv->loop, UV_RUN_DEFAULT);
|
if (pInit->label) {
|
||||||
|
tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
|
||||||
|
}
|
||||||
|
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||||
|
|
||||||
|
pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||||
return pRpc;
|
return pRpc;
|
||||||
}
|
}
|
||||||
void rpcClose(void* arg) { return; }
|
void rpcClose(void* arg) { return; }
|
||||||
|
@ -186,8 +195,11 @@ void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void onWrite(uv_write_t* req, int status) {
|
void onWrite(uv_write_t* req, int status) {
|
||||||
|
if (status == 0) {
|
||||||
|
tDebug("data already was written on stream");
|
||||||
|
}
|
||||||
|
|
||||||
// opt
|
// opt
|
||||||
if (req) tDebug("data already was written on stream");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void workerAsyncCB(uv_async_t* handle) {
|
void workerAsyncCB(uv_async_t* handle) {
|
||||||
|
@ -207,7 +219,7 @@ void onAccept(uv_stream_t* stream, int status) {
|
||||||
uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));
|
uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));
|
||||||
|
|
||||||
uv_buf_t buf = uv_buf_init("a", 1);
|
uv_buf_t buf = uv_buf_init("a", 1);
|
||||||
// despatch to worker thread
|
|
||||||
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
|
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
|
||||||
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite);
|
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite);
|
||||||
} else {
|
} else {
|
||||||
|
@ -257,6 +269,23 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void* acceptThread(void* arg) {
|
||||||
|
// opt
|
||||||
|
SServerObj* srv = (SServerObj*)arg;
|
||||||
|
uv_tcp_init(srv->loop, &srv->server);
|
||||||
|
|
||||||
|
struct sockaddr_in bind_addr;
|
||||||
|
|
||||||
|
int port = 6030;
|
||||||
|
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
|
||||||
|
uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
|
||||||
|
int err = 0;
|
||||||
|
if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) {
|
||||||
|
tError("Listen error %s\n", uv_err_name(err));
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
uv_run(srv->loop, UV_RUN_DEFAULT);
|
||||||
|
}
|
||||||
void* workerThread(void* arg) {
|
void* workerThread(void* arg) {
|
||||||
SThreadObj* pObj = (SThreadObj*)arg;
|
SThreadObj* pObj = (SThreadObj*)arg;
|
||||||
int fd = pObj->fd;
|
int fd = pObj->fd;
|
||||||
|
|
Loading…
Reference in New Issue