From a00a8dd90d1cb19453b4b0ee7b06deed682d8407 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 13 Jan 2022 20:59:41 +0800 Subject: [PATCH] add libuv test --- source/libs/transport/CMakeLists.txt | 7 ++ source/libs/transport/inc/transportInt.h | 53 ++++++++++- source/libs/transport/src/rpcMain.c | 88 +++++++++++++------ source/libs/transport/test/CMakeLists.txt | 21 +++++ source/libs/transport/test/transportTests.cc | 35 ++++++++ source/libs/transport/test/transportTests.cpp | 0 6 files changed, 174 insertions(+), 30 deletions(-) create mode 100644 source/libs/transport/test/CMakeLists.txt create mode 100644 source/libs/transport/test/transportTests.cc delete mode 100644 source/libs/transport/test/transportTests.cpp diff --git a/source/libs/transport/CMakeLists.txt b/source/libs/transport/CMakeLists.txt index c4eeef5df2..61d781210c 100644 --- a/source/libs/transport/CMakeLists.txt +++ b/source/libs/transport/CMakeLists.txt @@ -27,4 +27,11 @@ if (${BUILD_WITH_UV}) add_definitions(-DUSE_UV) endif(${BUILD_WITH_UV}) +if (${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) + + + + diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 9809f7ee1a..067b371b84 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -22,9 +22,58 @@ extern "C" { #ifdef USE_UV -#else +#include +typedef void *queue[2]; + +/* Private macros. */ +#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0])) +#define QUEUE_PREV(q) (*(queue **)&((*(q))[1])) + +#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) +#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) + +/* Initialize an empty queue. */ +#define QUEUE_INIT(q) \ + { \ + QUEUE_NEXT(q) = (q); \ + QUEUE_PREV(q) = (q); \ + } + +/* Return true if the queue has no element. */ +#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q)) + +/* Insert an element at the back of a queue. */ +#define QUEUE_PUSH(q, e) \ + { \ + QUEUE_NEXT(e) = (q); \ + QUEUE_PREV(e) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(e) = (e); \ + QUEUE_PREV(q) = (e); \ + } + +/* Remove the given element from the queue. Any element can be removed at any * + * time. */ +#define QUEUE_REMOVE(e) \ + { \ + QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ + QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ + } + +/* Return the element at the front of the queue. */ +#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) + +/* Return the element at the back of the queue. */ +#define QUEUE_TAIL(q) (QUEUE_PREV(q)) + +/* Iterate over the element of a queue. * Mutating the queue while iterating + * results in undefined behavior. */ +#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q)) + +/* Return the structure holding the given element. */ +#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field)))) + +#endif // USE_LIBUV -#endif #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 542bde37b9..818d129032 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -28,6 +28,7 @@ #include "tmd5.h" #include "tmempool.h" #include "tmsg.h" +#include "transportInt.h" #include "tref.h" #include "trpc.h" #include "ttimer.h" @@ -68,11 +69,13 @@ typedef struct { #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) typedef struct SThreadObj { - pthread_t thread; - uv_pipe_t* pipe; - uv_loop_t* loop; - uv_async_t* workerAsync; // - int fd; + pthread_t thread; + uv_pipe_t* pipe; + uv_loop_t* loop; + uv_async_t* workerAsync; // + int fd; + queue conn; + pthread_mutex_t connMtx; } SThreadObj; typedef struct SServerObj { @@ -88,10 +91,12 @@ typedef struct SServerObj { } SServerObj; typedef struct SConnCtx { - uv_tcp_t* pClient; + uv_tcp_t* pTcp; uv_timer_t* pTimer; uv_async_t* pWorkerAsync; + queue queue; int ref; + int persist; // persist connection or not } SConnCtx; static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); @@ -110,6 +115,9 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, int32_t rpcInit() { return -1; } void rpcCleanup() { return; }; +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + // opte +} void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SServerObj* srv = calloc(1, sizeof(SServerObj)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); @@ -122,30 +130,32 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, uv_loop_init(srv->loop); for (int i = 0; i < srv->numOfThread; i++) { - srv->pThreadObj[i] = (SThreadObj*)calloc(1, sizeof(SThreadObj)); - srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); + SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); + int fds[2]; if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { return NULL; } + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - srv->pThreadObj[i]->fd = fds[0]; - srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]); // init read - int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i])); + thrd->fd = fds[0]; + thrd->pipe = &(srv->pipe[i][1]); // init read + int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); if (err == 0) { - tDebug("sucess to create worker thread %d", i); + tDebug("sucess to create worker-thread %d", i); // printf("thread %d create\n", i); } else { // clear all resource later - tError("failed to create worker thread %d", i); + tError("failed to create worker-thread %d", i); } + srv->pThreadObj[i] = thrd; } int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); if (err == 0) { - tDebug("success to create accept thread"); + tDebug("success to create accept-thread"); } else { // clear all resource later } @@ -158,7 +168,7 @@ void* rpcOpen(const SRpcInit* pInit) { return NULL; } if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); + tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); } pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; @@ -198,29 +208,45 @@ void onWrite(uv_write_t* req, int status) { if (status == 0) { tDebug("data already was written on stream"); } + free(req); // opt } void workerAsyncCB(uv_async_t* handle) { - // opt SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync); + SConnCtx* conn = NULL; + + // opt later + pthread_mutex_lock(&pObj->connMtx); + if (!QUEUE_IS_EMPTY(&pObj->conn)) { + queue* head = QUEUE_HEAD(&pObj->conn); + conn = QUEUE_DATA(head, SConnCtx, queue); + QUEUE_REMOVE(&conn->queue); + } + pthread_mutex_unlock(&pObj->connMtx); + if (conn == NULL) { + tError("except occurred, do nothing"); + return; + } } + void onAccept(uv_stream_t* stream, int status) { if (status == -1) { return; } SServerObj* pObj = container_of(stream, SServerObj, server); - tDebug("new conntion accepted by main server, dispatch to one worker thread"); uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pObj->loop, cli); + if (uv_accept(stream, (uv_stream_t*)cli) == 0) { uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); uv_buf_t buf = uv_buf_init("a", 1); pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; + tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite); } else { uv_close((uv_handle_t*)cli, NULL); @@ -250,21 +276,21 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { pConn->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pObj->loop, pConn->pTimer); - pConn->pClient = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); pConn->pWorkerAsync = pObj->workerAsync; // thread safty - uv_tcp_init(pObj->loop, pConn->pClient); + uv_tcp_init(pObj->loop, pConn->pTcp); - if (uv_accept(q, (uv_stream_t*)(pConn->pClient)) == 0) { + if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; - uv_fileno((const uv_handle_t*)pConn->pClient, &fd); + uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); tDebug("new connection created: %d", fd); uv_timer_start(pConn->pTimer, onTimeout, 10, 0); - uv_read_start((uv_stream_t*)(pConn->pClient), allocBuffer, onRead); + uv_read_start((uv_stream_t*)(pConn->pTcp), allocBuffer, onRead); } else { uv_timer_stop(pConn->pTimer); free(pConn->pTimer); - uv_close((uv_handle_t*)pConn->pClient, NULL); - free(pConn->pClient); + uv_close((uv_handle_t*)pConn->pTcp, NULL); + free(pConn->pTcp); free(pConn); } } @@ -276,7 +302,6 @@ void* acceptThread(void* arg) { struct sockaddr_in bind_addr; - int port = 6030; uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); int err = 0; @@ -288,16 +313,22 @@ void* acceptThread(void* arg) { } void* workerThread(void* arg) { SThreadObj* pObj = (SThreadObj*)arg; - int fd = pObj->fd; + pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pObj->loop); uv_pipe_init(pObj->loop, pObj->pipe, 1); - uv_pipe_open(pObj->pipe, fd); + uv_pipe_open(pObj->pipe, pObj->fd); + + QUEUE_INIT(&pObj->conn); pObj->workerAsync = malloc(sizeof(uv_async_t)); uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB); + + // pObj->workerAsync->data = (void*)pObj; + uv_read_start((uv_stream_t*)pObj->pipe, allocBuffer, onConnection); + uv_run(pObj->loop, UV_RUN_DEFAULT); } #else @@ -471,7 +502,8 @@ void *rpcOpen(const SRpcInit *pInit) { pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) return NULL; - if (pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); + if (pInit->label) tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); + pRpc->connType = pInit->connType; if (pRpc->connType == TAOS_CONN_CLIENT) { pRpc->numOfThreads = pInit->numOfThreads; diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt new file mode 100644 index 0000000000..9e58bf08cd --- /dev/null +++ b/source/libs/transport/test/CMakeLists.txt @@ -0,0 +1,21 @@ +add_executable(transportTest "") +target_sources(transportTest + PRIVATE + "transportTests.cc" +) + +target_include_directories(transportTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/transport" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + +target_link_libraries (transportTest + os + util + common + gtest_main + transport +) + + diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc new file mode 100644 index 0000000000..468aeba8a9 --- /dev/null +++ b/source/libs/transport/test/transportTests.cc @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include + +#include "transportInt.h" +#include "trpc.h" + +using namespace std; + +int main() { + SRpcInit init = {.localPort = 6030, .label = "rpc", .numOfThreads = 5}; + void* p = rpcOpen(&init); + + while (1) { + std::cout << "cron task" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); + } +} diff --git a/source/libs/transport/test/transportTests.cpp b/source/libs/transport/test/transportTests.cpp deleted file mode 100644 index e69de29bb2..0000000000