diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 3095ddb9d2..542bde37b9 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -13,9 +13,7 @@ * along with this program. If not, see . */ -#ifdef USE_UV #include -#endif #include "lz4.h" #include "os.h" #include "rpcCache.h" @@ -78,12 +76,15 @@ typedef struct SThreadObj { } SThreadObj; typedef struct SServerObj { + pthread_t thread; uv_tcp_t server; uv_loop_t* loop; int workerIdx; int numOfThread; SThreadObj** pThreadObj; uv_pipe_t** pipe; + uint32_t ip; + uint32_t port; } SServerObj; typedef struct SConnCtx { @@ -93,33 +94,31 @@ typedef struct SConnCtx { int ref; } SConnCtx; -static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void onTimeout(uv_timer_t* handle); -static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); -static void onWrite(uv_write_t* req, int status); -static void onAccept(uv_stream_t* stream, int status); -void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); -static void workerAsyncCB(uv_async_t* handle); +static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void onTimeout(uv_timer_t* handle); +static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void onWrite(uv_write_t* req, int status); +static void onAccept(uv_stream_t* stream, int status); +static void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); +static void workerAsyncCB(uv_async_t* handle); + static void* workerThread(void* arg); +static void* acceptThread(void* arg); + +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); int32_t rpcInit() { return -1; } void rpcCleanup() { return; }; -void* rpcOpen(const SRpcInit* pInit) { - SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); - if (pRpc == NULL) { - return NULL; - } - if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); - } - pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SServerObj* srv = calloc(1, sizeof(SServerObj)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - srv->numOfThread = pRpc->numOfThreads; + srv->numOfThread = numOfThreads; srv->workerIdx = 0; srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*)); srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*)); + srv->ip = ip; + srv->port = port; uv_loop_init(srv->loop); for (int i = 0; i < srv->numOfThread; i++) { @@ -136,24 +135,34 @@ void* rpcOpen(const SRpcInit* pInit) { srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]); // init read int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i])); if (err == 0) { - tError("sucess to create worker thread %d", i); + tDebug("sucess to create worker thread %d", i); // printf("thread %d create\n", i); } else { + // clear all resource later tError("failed to create worker thread %d", i); - return NULL; } } - uv_tcp_init(srv->loop, &srv->server); - struct sockaddr_in bind_addr; - uv_ip4_addr("0.0.0.0", pInit->localPort, &bind_addr); - uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); - int err = 0; - if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) { - tError("Listen error %s\n", uv_err_name(err)); + + int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); + if (err == 0) { + tDebug("success to create accept thread"); + } else { + // clear all resource later + } + + return srv; +} +void* rpcOpen(const SRpcInit* pInit) { + SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); + if (pRpc == NULL) { return NULL; } - uv_run(srv->loop, UV_RUN_DEFAULT); + if (pInit->label) { + tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); + } + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); return pRpc; } void rpcClose(void* arg) { return; } @@ -186,8 +195,11 @@ void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } void onWrite(uv_write_t* req, int status) { + if (status == 0) { + tDebug("data already was written on stream"); + } + // opt - if (req) tDebug("data already was written on stream"); } void workerAsyncCB(uv_async_t* handle) { @@ -207,7 +219,7 @@ void onAccept(uv_stream_t* stream, int status) { uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); uv_buf_t buf = uv_buf_init("a", 1); - // despatch to worker thread + pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite); } else { @@ -257,6 +269,23 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { } } +void* acceptThread(void* arg) { + // opt + SServerObj* srv = (SServerObj*)arg; + uv_tcp_init(srv->loop, &srv->server); + + struct sockaddr_in bind_addr; + + int port = 6030; + uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); + uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); + int err = 0; + if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) { + tError("Listen error %s\n", uv_err_name(err)); + return NULL; + } + uv_run(srv->loop, UV_RUN_DEFAULT); +} void* workerThread(void* arg) { SThreadObj* pObj = (SThreadObj*)arg; int fd = pObj->fd;