From 5c6924e9e66f2f62ace4e57c731c77047974f0f1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 12 Jan 2022 23:36:15 +0800 Subject: [PATCH 1/3] add libuv --- source/libs/transport/src/rpcMain.c | 91 +++++++++++++++++++---------- 1 file changed, 60 insertions(+), 31 deletions(-) diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 3095ddb9d2..542bde37b9 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -13,9 +13,7 @@ * along with this program. If not, see . */ -#ifdef USE_UV #include -#endif #include "lz4.h" #include "os.h" #include "rpcCache.h" @@ -78,12 +76,15 @@ typedef struct SThreadObj { } SThreadObj; typedef struct SServerObj { + pthread_t thread; uv_tcp_t server; uv_loop_t* loop; int workerIdx; int numOfThread; SThreadObj** pThreadObj; uv_pipe_t** pipe; + uint32_t ip; + uint32_t port; } SServerObj; typedef struct SConnCtx { @@ -93,33 +94,31 @@ typedef struct SConnCtx { int ref; } SConnCtx; -static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void onTimeout(uv_timer_t* handle); -static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); -static void onWrite(uv_write_t* req, int status); -static void onAccept(uv_stream_t* stream, int status); -void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); -static void workerAsyncCB(uv_async_t* handle); +static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void onTimeout(uv_timer_t* handle); +static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void onWrite(uv_write_t* req, int status); +static void onAccept(uv_stream_t* stream, int status); +static void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); +static void workerAsyncCB(uv_async_t* handle); + static void* workerThread(void* arg); +static void* acceptThread(void* arg); + +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); int32_t rpcInit() { return -1; } void rpcCleanup() { return; }; -void* rpcOpen(const SRpcInit* pInit) { - SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); - if (pRpc == NULL) { - return NULL; - } - if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); - } - pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? 
TSDB_MAX_RPC_THREADS : pInit->numOfThreads; +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SServerObj* srv = calloc(1, sizeof(SServerObj)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - srv->numOfThread = pRpc->numOfThreads; + srv->numOfThread = numOfThreads; srv->workerIdx = 0; srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*)); srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*)); + srv->ip = ip; + srv->port = port; uv_loop_init(srv->loop); for (int i = 0; i < srv->numOfThread; i++) { @@ -136,24 +135,34 @@ void* rpcOpen(const SRpcInit* pInit) { srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]); // init read int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i])); if (err == 0) { - tError("sucess to create worker thread %d", i); + tDebug("sucess to create worker thread %d", i); // printf("thread %d create\n", i); } else { + // clear all resource later tError("failed to create worker thread %d", i); - return NULL; } } - uv_tcp_init(srv->loop, &srv->server); - struct sockaddr_in bind_addr; - uv_ip4_addr("0.0.0.0", pInit->localPort, &bind_addr); - uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); - int err = 0; - if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) { - tError("Listen error %s\n", uv_err_name(err)); + + int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); + if (err == 0) { + tDebug("success to create accept thread"); + } else { + // clear all resource later + } + + return srv; +} +void* rpcOpen(const SRpcInit* pInit) { + SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); + if (pRpc == NULL) { return NULL; } - uv_run(srv->loop, UV_RUN_DEFAULT); + if (pInit->label) { + tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); + } + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? 
TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); return pRpc; } void rpcClose(void* arg) { return; } @@ -186,8 +195,11 @@ void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } void onWrite(uv_write_t* req, int status) { + if (status == 0) { + tDebug("data already was written on stream"); + } + // opt - if (req) tDebug("data already was written on stream"); } void workerAsyncCB(uv_async_t* handle) { @@ -207,7 +219,7 @@ void onAccept(uv_stream_t* stream, int status) { uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); uv_buf_t buf = uv_buf_init("a", 1); - // despatch to worker thread + pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite); } else { @@ -257,6 +269,23 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { } } +void* acceptThread(void* arg) { + // opt + SServerObj* srv = (SServerObj*)arg; + uv_tcp_init(srv->loop, &srv->server); + + struct sockaddr_in bind_addr; + + int port = 6030; + uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); + uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); + int err = 0; + if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) { + tError("Listen error %s\n", uv_err_name(err)); + return NULL; + } + uv_run(srv->loop, UV_RUN_DEFAULT); +} void* workerThread(void* arg) { SThreadObj* pObj = (SThreadObj*)arg; int fd = pObj->fd; From a00a8dd90d1cb19453b4b0ee7b06deed682d8407 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 13 Jan 2022 20:59:41 +0800 Subject: [PATCH 2/3] add libuv test --- source/libs/transport/CMakeLists.txt | 7 ++ source/libs/transport/inc/transportInt.h | 53 ++++++++++- source/libs/transport/src/rpcMain.c | 88 +++++++++++++------ source/libs/transport/test/CMakeLists.txt | 21 +++++ source/libs/transport/test/transportTests.cc | 35 ++++++++ source/libs/transport/test/transportTests.cpp | 0 6 files changed, 174 insertions(+), 30 deletions(-) create mode 100644 source/libs/transport/test/CMakeLists.txt create mode 100644 source/libs/transport/test/transportTests.cc delete mode 100644 source/libs/transport/test/transportTests.cpp diff --git a/source/libs/transport/CMakeLists.txt b/source/libs/transport/CMakeLists.txt index c4eeef5df2..61d781210c 100644 --- a/source/libs/transport/CMakeLists.txt +++ b/source/libs/transport/CMakeLists.txt @@ -27,4 +27,11 @@ if (${BUILD_WITH_UV}) add_definitions(-DUSE_UV) endif(${BUILD_WITH_UV}) +if (${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) + + + + diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 9809f7ee1a..067b371b84 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -22,9 +22,58 @@ extern "C" { #ifdef USE_UV -#else +#include +typedef void *queue[2]; + +/* Private macros. */ +#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0])) +#define QUEUE_PREV(q) (*(queue **)&((*(q))[1])) + +#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) +#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) + +/* Initialize an empty queue. */ +#define QUEUE_INIT(q) \ + { \ + QUEUE_NEXT(q) = (q); \ + QUEUE_PREV(q) = (q); \ + } + +/* Return true if the queue has no element. */ +#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q)) + +/* Insert an element at the back of a queue. 
*/ +#define QUEUE_PUSH(q, e) \ + { \ + QUEUE_NEXT(e) = (q); \ + QUEUE_PREV(e) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(e) = (e); \ + QUEUE_PREV(q) = (e); \ + } + +/* Remove the given element from the queue. Any element can be removed at any * + * time. */ +#define QUEUE_REMOVE(e) \ + { \ + QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ + QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ + } + +/* Return the element at the front of the queue. */ +#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) + +/* Return the element at the back of the queue. */ +#define QUEUE_TAIL(q) (QUEUE_PREV(q)) + +/* Iterate over the element of a queue. * Mutating the queue while iterating + * results in undefined behavior. */ +#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q)) + +/* Return the structure holding the given element. */ +#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field)))) + +#endif // USE_LIBUV -#endif #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 542bde37b9..818d129032 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -28,6 +28,7 @@ #include "tmd5.h" #include "tmempool.h" #include "tmsg.h" +#include "transportInt.h" #include "tref.h" #include "trpc.h" #include "ttimer.h" @@ -68,11 +69,13 @@ typedef struct { #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) typedef struct SThreadObj { - pthread_t thread; - uv_pipe_t* pipe; - uv_loop_t* loop; - uv_async_t* workerAsync; // - int fd; + pthread_t thread; + uv_pipe_t* pipe; + uv_loop_t* loop; + uv_async_t* workerAsync; // + int fd; + queue conn; + pthread_mutex_t connMtx; } SThreadObj; typedef struct SServerObj { @@ -88,10 +91,12 @@ typedef struct SServerObj { } SServerObj; typedef struct SConnCtx { - uv_tcp_t* pClient; + uv_tcp_t* pTcp; uv_timer_t* pTimer; uv_async_t* pWorkerAsync; + queue queue; int ref; + int persist; // persist connection or not } SConnCtx; static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); @@ -110,6 +115,9 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, int32_t rpcInit() { return -1; } void rpcCleanup() { return; }; +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + // opte +} void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SServerObj* srv = calloc(1, sizeof(SServerObj)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); @@ -122,30 +130,32 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, uv_loop_init(srv->loop); for (int i = 0; i < srv->numOfThread; i++) { - srv->pThreadObj[i] = (SThreadObj*)calloc(1, sizeof(SThreadObj)); - srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); + SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); + int fds[2]; if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { return NULL; } + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - srv->pThreadObj[i]->fd = fds[0]; - srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]); // init read - int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i])); + thrd->fd = fds[0]; + thrd->pipe = &(srv->pipe[i][1]); // init read + int err = 
pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); if (err == 0) { - tDebug("sucess to create worker thread %d", i); + tDebug("sucess to create worker-thread %d", i); // printf("thread %d create\n", i); } else { // clear all resource later - tError("failed to create worker thread %d", i); + tError("failed to create worker-thread %d", i); } + srv->pThreadObj[i] = thrd; } int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); if (err == 0) { - tDebug("success to create accept thread"); + tDebug("success to create accept-thread"); } else { // clear all resource later } @@ -158,7 +168,7 @@ void* rpcOpen(const SRpcInit* pInit) { return NULL; } if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); + tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); } pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; @@ -198,29 +208,45 @@ void onWrite(uv_write_t* req, int status) { if (status == 0) { tDebug("data already was written on stream"); } + free(req); // opt } void workerAsyncCB(uv_async_t* handle) { - // opt SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync); + SConnCtx* conn = NULL; + + // opt later + pthread_mutex_lock(&pObj->connMtx); + if (!QUEUE_IS_EMPTY(&pObj->conn)) { + queue* head = QUEUE_HEAD(&pObj->conn); + conn = QUEUE_DATA(head, SConnCtx, queue); + QUEUE_REMOVE(&conn->queue); + } + pthread_mutex_unlock(&pObj->connMtx); + if (conn == NULL) { + tError("except occurred, do nothing"); + return; + } } + void onAccept(uv_stream_t* stream, int status) { if (status == -1) { return; } SServerObj* pObj = container_of(stream, SServerObj, server); - tDebug("new conntion accepted by main server, dispatch to one worker thread"); uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pObj->loop, cli); + if (uv_accept(stream, (uv_stream_t*)cli) == 0) { uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); uv_buf_t buf = uv_buf_init("a", 1); pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; + tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite); } else { uv_close((uv_handle_t*)cli, NULL); @@ -250,21 +276,21 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { pConn->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pObj->loop, pConn->pTimer); - pConn->pClient = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); pConn->pWorkerAsync = pObj->workerAsync; // thread safty - uv_tcp_init(pObj->loop, pConn->pClient); + uv_tcp_init(pObj->loop, pConn->pTcp); - if (uv_accept(q, (uv_stream_t*)(pConn->pClient)) == 0) { + if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; - uv_fileno((const uv_handle_t*)pConn->pClient, &fd); + uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); tDebug("new connection created: %d", fd); uv_timer_start(pConn->pTimer, onTimeout, 10, 0); - uv_read_start((uv_stream_t*)(pConn->pClient), allocBuffer, onRead); + uv_read_start((uv_stream_t*)(pConn->pTcp), allocBuffer, onRead); } else { uv_timer_stop(pConn->pTimer); free(pConn->pTimer); - uv_close((uv_handle_t*)pConn->pClient, NULL); - free(pConn->pClient); + uv_close((uv_handle_t*)pConn->pTcp, NULL); + free(pConn->pTcp); free(pConn); } } @@ -276,7 +302,6 @@ void* acceptThread(void* arg) { struct sockaddr_in bind_addr; - int port = 6030; uv_ip4_addr("0.0.0.0", 
srv->port, &bind_addr); uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); int err = 0; @@ -288,16 +313,22 @@ void* acceptThread(void* arg) { } void* workerThread(void* arg) { SThreadObj* pObj = (SThreadObj*)arg; - int fd = pObj->fd; + pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pObj->loop); uv_pipe_init(pObj->loop, pObj->pipe, 1); - uv_pipe_open(pObj->pipe, fd); + uv_pipe_open(pObj->pipe, pObj->fd); + + QUEUE_INIT(&pObj->conn); pObj->workerAsync = malloc(sizeof(uv_async_t)); uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB); + + // pObj->workerAsync->data = (void*)pObj; + uv_read_start((uv_stream_t*)pObj->pipe, allocBuffer, onConnection); + uv_run(pObj->loop, UV_RUN_DEFAULT); } #else @@ -471,7 +502,8 @@ void *rpcOpen(const SRpcInit *pInit) { pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) return NULL; - if (pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); + if (pInit->label) tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); + pRpc->connType = pInit->connType; if (pRpc->connType == TAOS_CONN_CLIENT) { pRpc->numOfThreads = pInit->numOfThreads; diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt new file mode 100644 index 0000000000..9e58bf08cd --- /dev/null +++ b/source/libs/transport/test/CMakeLists.txt @@ -0,0 +1,21 @@ +add_executable(transportTest "") +target_sources(transportTest + PRIVATE + "transportTests.cc" +) + +target_include_directories(transportTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/transport" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + +target_link_libraries (transportTest + os + util + common + gtest_main + transport +) + + diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc new file mode 100644 index 0000000000..468aeba8a9 --- /dev/null +++ b/source/libs/transport/test/transportTests.cc @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +#include +#include +#include +#include +#include + +#include "transportInt.h" +#include "trpc.h" + +using namespace std; + +int main() { + SRpcInit init = {.localPort = 6030, .label = "rpc", .numOfThreads = 5}; + void* p = rpcOpen(&init); + + while (1) { + std::cout << "cron task" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); + } +} diff --git a/source/libs/transport/test/transportTests.cpp b/source/libs/transport/test/transportTests.cpp deleted file mode 100644 index e69de29bb2..0000000000 From 41f600698c955dedcbc9dcd26acaf7b2ca044b1d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Jan 2022 22:49:05 +0800 Subject: [PATCH 3/3] add libuv --- source/libs/transport/inc/rpcHead.h | 21 ++++ source/libs/transport/src/rpcMain.c | 182 +++++++++++++++++++++++----- 2 files changed, 175 insertions(+), 28 deletions(-) diff --git a/source/libs/transport/inc/rpcHead.h b/source/libs/transport/inc/rpcHead.h index 7317d84af1..66821db133 100644 --- a/source/libs/transport/inc/rpcHead.h +++ b/source/libs/transport/inc/rpcHead.h @@ -22,6 +22,27 @@ extern "C" { #endif #ifdef USE_UV +typedef struct { + char version : 4; // RPC version + char comp : 4; // compression algorithm, 0:no compression 1:lz4 + char resflag : 2; // reserved bits + char spi : 3; // security parameter index + char encrypt : 3; // encrypt algorithm, 0: no encryption + uint16_t tranId; // transcation ID + uint32_t linkUid; // for unique connection ID assigned by client + uint64_t ahandle; // ahandle assigned by client + uint32_t sourceId; // source ID, an index for connection list + uint32_t destId; // destination ID, an index for connection list + uint32_t destIp; // destination IP address, for NAT scenario + char user[TSDB_UNI_LEN]; // user ID + uint16_t port; // for UDP only, port may be changed + char empty[1]; // reserved + uint16_t msgType; // message type + int32_t msgLen; // message length including the header iteslf + uint32_t msgVer; + int32_t code; // code in response message + uint8_t content[0]; // message body starts from here +} SRpcHead; #else diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 818d129032..a1c0c05fc3 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -13,7 +13,9 @@ * along with this program. If not, see . 
*/ +#ifdef USE_UV #include +#endif #include "lz4.h" #include "os.h" #include "rpcCache.h" @@ -68,6 +70,8 @@ typedef struct { #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) +static const char* notify = "a"; + typedef struct SThreadObj { pthread_t thread; uv_pipe_t* pipe; @@ -90,23 +94,39 @@ typedef struct SServerObj { uint32_t port; } SServerObj; +typedef struct SContent { + char* buf; + int len; + int cap; + int toRead; +} SContent; + typedef struct SConnCtx { uv_tcp_t* pTcp; + uv_write_t* pWriter; uv_timer_t* pTimer; + uv_async_t* pWorkerAsync; queue queue; int ref; int persist; // persist connection or not + SContent pCont; + int count; } SConnCtx; -static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void onTimeout(uv_timer_t* handle); +static void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void onTimeout(uv_timer_t* handle); static void onWrite(uv_write_t* req, int status); static void onAccept(uv_stream_t* stream, int status); static void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void workerAsyncCB(uv_async_t* handle); +static SConnCtx* connCtxCreate(); +static void connCtxDestroy(SConnCtx* ctx); +static void uvConnCtxDestroy(uv_handle_t* handle); + static void* workerThread(void* arg); static void* acceptThread(void* arg); @@ -131,12 +151,11 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThread; i++) { SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); - + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); int fds[2]; if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { return NULL; } - srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write @@ -147,7 +166,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, tDebug("sucess to create worker-thread %d", i); // printf("thread %d create\n", i); } else { - // clear all resource later + // TODO: clear all other resource later tError("failed to create worker-thread %d", i); } srv->pThreadObj[i] = thrd; @@ -171,7 +190,6 @@ void* rpcOpen(const SRpcInit* pInit) { tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); } pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? 
TSDB_MAX_RPC_THREADS : pInit->numOfThreads; - pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); return pRpc; } @@ -190,26 +208,106 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } void rpcCancelRequest(int64_t rid) { return; } -void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - buf->base = malloc(suggested_size); - buf->len = suggested_size; +void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + static const int CAPACITY = 1024; + tDebug("pre alloc buffer for read "); + SConnCtx* ctx = handle->data; + SContent* pCont = &ctx->pCont; + if (pCont->cap == 0) { + pCont->buf = (char*)calloc(CAPACITY, sizeof(char)); + pCont->len = 0; + pCont->cap = CAPACITY; + pCont->toRead = -1; + + buf->base = pCont->buf; + buf->len = CAPACITY; + } else { + if (pCont->len >= pCont->cap) { + if (pCont->toRead == -1) { + pCont->cap *= 2; + pCont->buf = realloc(pCont->buf, pCont->cap); + } else if (pCont->len + pCont->toRead > pCont->cap) { + pCont->cap = pCont->len + pCont->toRead; + pCont->buf = realloc(pCont->buf, pCont->len + pCont->toRead); + } + } + buf->base = pCont->buf + pCont->len; + buf->len = pCont->cap - pCont->len; + } + + // if (ctx->pCont.cap == 0) { + // ctx->pCont.buf = (char*)calloc(64, sizeof(char)); + // ctx->pCont.len = 0; + // ctx->pCont.cap = 64; + // // + // buf->base = ctx->pCont.buf; + // buf->len = sz; + //} else { + // if (ctx->pCont.len + sz > ctx->pCont.cap) { + // ctx->pCont.cap *= 2; + // ctx->pCont.buf = realloc(ctx->pCont.buf, ctx->pCont.cap); + // } + // buf->base = ctx->pCont.buf + ctx->pCont.len; + // buf->len = sz; + //} +} +// change later +static bool handleUserData(SContent* data) { + SRpcHead rpcHead; + + bool finish = false; + int32_t msgLen, leftLen, retLen; + int32_t headLen = sizeof(rpcHead); + if (data->len >= headLen) { + memcpy((char*)&rpcHead, data->buf, headLen); + msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); + if (msgLen + headLen <= data->len) { + return true; + } else { + return false; + } + } else { + return false; + } +} + +void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { + // opt + SConnCtx* ctx = cli->data; + SContent* pCont = &ctx->pCont; + if (nread > 0) { + pCont->len += nread; + bool finish = handleUserData(pCont); + if (finish == false) { + tDebug("continue read"); + } else { + tDebug("read completely"); + } + return; + } + + if (nread != UV_EOF) { + tDebug("Read error %s\n", uv_err_name(nread)); + } + uv_close((uv_handle_t*)cli, uvConnCtxDestroy); +} +void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + buf->base = malloc(sizeof(char)); + buf->len = 2; } void onTimeout(uv_timer_t* handle) { // opt tDebug("time out"); } -void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { - // opt - tDebug("data already was read on a stream"); -} void onWrite(uv_write_t* req, int status) { + SConnCtx* ctx = req->data; if (status == 0) { tDebug("data already was written on stream"); + } else { + connCtxDestroy(ctx); } - free(req); - // opt } @@ -243,7 +341,7 @@ void onAccept(uv_stream_t* stream, int status) { if (uv_accept(stream, (uv_stream_t*)cli) == 0) { uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); - uv_buf_t buf = uv_buf_init("a", 1); + uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; tDebug("new 
conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); @@ -253,6 +351,7 @@ void onAccept(uv_stream_t* stream, int status) { } } void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { + tDebug("connection coming"); if (nread < 0) { if (nread != UV_EOF) { tError("read error %s", uv_err_name(nread)); @@ -261,6 +360,11 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_close((uv_handle_t*)q, NULL); return; } + // free memory allocated by + assert(nread == strlen(notify)); + assert(buf->base[0] == notify[0]); + free(buf->base); + SThreadObj* pObj = (SThreadObj*)container_of(q, struct SThreadObj, pipe); uv_pipe_t* pipe = (uv_pipe_t*)q; @@ -268,30 +372,33 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tError("No pending count"); return; } + uv_handle_type pending = uv_pipe_pending_type(pipe); assert(pending == UV_TCP); - SConnCtx* pConn = malloc(sizeof(SConnCtx)); + SConnCtx* pConn = connCtxCreate(); /* init conn timer*/ pConn->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pObj->loop, pConn->pTimer); - pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); pConn->pWorkerAsync = pObj->workerAsync; // thread safty + + // init client handle + pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pObj->loop, pConn->pTcp); + pConn->pTcp->data = pConn; + + // init write request, just + pConn->pWriter = calloc(1, sizeof(uv_write_t)); + pConn->pWriter->data = pConn; if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); tDebug("new connection created: %d", fd); - uv_timer_start(pConn->pTimer, onTimeout, 10, 0); - uv_read_start((uv_stream_t*)(pConn->pTcp), allocBuffer, onRead); + uv_read_start((uv_stream_t*)(pConn->pTcp), allocReadBuffer, onRead); } else { - uv_timer_stop(pConn->pTimer); - free(pConn->pTimer); - uv_close((uv_handle_t*)pConn->pTcp, NULL); - free(pConn->pTcp); - free(pConn); + connCtxDestroy(pConn); } } @@ -325,11 +432,30 @@ void* workerThread(void* arg) { pObj->workerAsync = malloc(sizeof(uv_async_t)); uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB); - // pObj->workerAsync->data = (void*)pObj; - - uv_read_start((uv_stream_t*)pObj->pipe, allocBuffer, onConnection); + uv_read_start((uv_stream_t*)pObj->pipe, allocConnBuffer, onConnection); uv_run(pObj->loop, UV_RUN_DEFAULT); } +static SConnCtx* connCtxCreate() { + SConnCtx* pConn = (SConnCtx*)calloc(1, sizeof(SConnCtx)); + return pConn; +} +static void connCtxDestroy(SConnCtx* ctx) { + if (ctx == NULL) { + return; + } + uv_timer_stop(ctx->pTimer); + free(ctx->pTimer); + uv_close((uv_handle_t*)ctx->pTcp, NULL); + free(ctx->pTcp); + free(ctx->pWriter); + free(ctx); + // handle +} +static void uvConnCtxDestroy(uv_handle_t* handle) { + SConnCtx* ctx = handle->data; + connCtxDestroy(ctx); +} + #else #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
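
For reference, below is a minimal, self-contained sketch (not part of the patches above) of how the QUEUE_* intrusive-list macros introduced in transportInt.h are meant to be used with a container struct. It mirrors the SThreadObj::conn list and the SConnCtx::queue field that workerAsyncCB drains; the macro bodies are copied from the patch, while DemoConn and every other name in the sketch are illustrative only.

/* Standalone sketch: intrusive queue usage as in transportInt.h / workerAsyncCB.
 * Macro bodies mirror the patch; DemoConn is a hypothetical stand-in for SConnCtx. */
#include <stddef.h>
#include <stdio.h>

typedef void *queue[2];

#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0]))
#define QUEUE_PREV(q) (*(queue **)&((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))

#define QUEUE_INIT(q)    \
  {                      \
    QUEUE_NEXT(q) = (q); \
    QUEUE_PREV(q) = (q); \
  }

#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q))

#define QUEUE_PUSH(q, e)           \
  {                                \
    QUEUE_NEXT(e) = (q);           \
    QUEUE_PREV(e) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(e) = (e);      \
    QUEUE_PREV(q) = (e);           \
  }

#define QUEUE_REMOVE(e)                 \
  {                                     \
    QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
    QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
  }

#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field))))

/* Illustrative stand-in for SConnCtx: the list node is embedded in the struct,
 * so no per-element allocation is needed and removal is O(1). */
typedef struct DemoConn {
  int   fd;
  queue node; /* plays the role of SConnCtx::queue */
} DemoConn;

int main() {
  queue pending; /* plays the role of SThreadObj::conn */
  QUEUE_INIT(&pending);

  DemoConn c1 = {.fd = 100};
  DemoConn c2 = {.fd = 101};

  /* producer side (e.g. while holding SThreadObj::connMtx) */
  QUEUE_PUSH(&pending, &c1.node);
  QUEUE_PUSH(&pending, &c2.node);

  /* consumer side, as in workerAsyncCB: take the head, recover the container,
   * then unlink it from the list */
  while (!QUEUE_IS_EMPTY(&pending)) {
    queue*    head = QUEUE_HEAD(&pending);
    DemoConn* conn = QUEUE_DATA(head, DemoConn, node);
    QUEUE_REMOVE(head);
    printf("dequeued conn fd=%d\n", conn->fd);
  }
  return 0;
}

The intrusive layout is why SConnCtx gains an embedded `queue queue` member in patch 2: the worker thread can pop a pending connection in workerAsyncCB with QUEUE_HEAD/QUEUE_DATA and unlink it with QUEUE_REMOVE, all without any extra allocation or traversal.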