add libuv test

This commit is contained in:
yihaoDeng 2022-01-13 20:59:41 +08:00
parent 5c6924e9e6
commit a00a8dd90d
6 changed files with 174 additions and 30 deletions

View File

@ -27,4 +27,11 @@ if (${BUILD_WITH_UV})
add_definitions(-DUSE_UV) add_definitions(-DUSE_UV)
endif(${BUILD_WITH_UV}) endif(${BUILD_WITH_UV})
if (${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})

View File

@ -22,9 +22,58 @@ extern "C" {
#ifdef USE_UV #ifdef USE_UV
#else #include <stddef.h>
/* Intrusive circular doubly-linked list, modeled on libuv's queue.h.
 * A queue node is two void* slots: [0] = next, [1] = prev. An empty
 * queue is a node linked to itself. Nodes are embedded in user structs
 * and recovered with QUEUE_DATA (offsetof-based container lookup). */
typedef void *queue[2];

/* Private link accessors: lvalue access to the next/prev slots. */
#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0]))
#define QUEUE_PREV(q) (*(queue **)&((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))

/* Initialize an empty queue (node points to itself).
 * do/while(0) makes the macro a single statement, safe in if/else. */
#define QUEUE_INIT(q)    \
  do {                   \
    QUEUE_NEXT(q) = (q); \
    QUEUE_PREV(q) = (q); \
  } while (0)

/* Return true if the queue has no element. */
#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q))

/* Insert element e at the back of queue q. */
#define QUEUE_PUSH(q, e)           \
  do {                             \
    QUEUE_NEXT(e) = (q);           \
    QUEUE_PREV(e) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(e) = (e);      \
    QUEUE_PREV(q) = (e);           \
  } while (0)

/* Remove the given element from its queue. Any element can be removed
 * at any time; the element's own links are left stale. */
#define QUEUE_REMOVE(e)                  \
  do {                                   \
    QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e);  \
    QUEUE_NEXT_PREV(e) = QUEUE_PREV(e);  \
  } while (0)

/* Return the element at the front of the queue. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))

/* Return the element at the back of the queue. */
#define QUEUE_TAIL(q) (QUEUE_PREV(q))

/* Iterate q over the elements of e. Mutating the queue while
 * iterating results in undefined behavior. */
#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q))

/* Return the structure holding the given queue element. */
#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field))))
#endif // USE_LIBUV
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -28,6 +28,7 @@
#include "tmd5.h" #include "tmd5.h"
#include "tmempool.h" #include "tmempool.h"
#include "tmsg.h" #include "tmsg.h"
#include "transportInt.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
@ -73,6 +74,8 @@ typedef struct SThreadObj {
uv_loop_t* loop; uv_loop_t* loop;
uv_async_t* workerAsync; // uv_async_t* workerAsync; //
int fd; int fd;
queue conn;
pthread_mutex_t connMtx;
} SThreadObj; } SThreadObj;
typedef struct SServerObj { typedef struct SServerObj {
@ -88,10 +91,12 @@ typedef struct SServerObj {
} SServerObj; } SServerObj;
typedef struct SConnCtx { typedef struct SConnCtx {
uv_tcp_t* pClient; uv_tcp_t* pTcp;
uv_timer_t* pTimer; uv_timer_t* pTimer;
uv_async_t* pWorkerAsync; uv_async_t* pWorkerAsync;
queue queue;
int ref; int ref;
int persist; // persist connection or not
} SConnCtx; } SConnCtx;
static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
@ -110,6 +115,9 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
int32_t rpcInit() { return -1; } int32_t rpcInit() { return -1; }
void rpcCleanup() { return; }; void rpcCleanup() { return; };
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
// opte
}
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SServerObj* srv = calloc(1, sizeof(SServerObj)); SServerObj* srv = calloc(1, sizeof(SServerObj));
srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
@ -122,30 +130,32 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
uv_loop_init(srv->loop); uv_loop_init(srv->loop);
for (int i = 0; i < srv->numOfThread; i++) { for (int i = 0; i < srv->numOfThread; i++) {
srv->pThreadObj[i] = (SThreadObj*)calloc(1, sizeof(SThreadObj)); SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj));
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
int fds[2]; int fds[2];
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
return NULL; return NULL;
} }
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
srv->pThreadObj[i]->fd = fds[0]; thrd->fd = fds[0];
srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]); // init read thrd->pipe = &(srv->pipe[i][1]); // init read
int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i])); int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
if (err == 0) { if (err == 0) {
tDebug("sucess to create worker thread %d", i); tDebug("sucess to create worker-thread %d", i);
// printf("thread %d create\n", i); // printf("thread %d create\n", i);
} else { } else {
// clear all resource later // clear all resource later
tError("failed to create worker thread %d", i); tError("failed to create worker-thread %d", i);
} }
srv->pThreadObj[i] = thrd;
} }
int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
if (err == 0) { if (err == 0) {
tDebug("success to create accept thread"); tDebug("success to create accept-thread");
} else { } else {
// clear all resource later // clear all resource later
} }
@ -158,7 +168,7 @@ void* rpcOpen(const SRpcInit* pInit) {
return NULL; return NULL;
} }
if (pInit->label) { if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
} }
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
@ -198,29 +208,45 @@ void onWrite(uv_write_t* req, int status) {
if (status == 0) { if (status == 0) {
tDebug("data already was written on stream"); tDebug("data already was written on stream");
} }
free(req);
// opt // opt
} }
void workerAsyncCB(uv_async_t* handle) { void workerAsyncCB(uv_async_t* handle) {
// opt
SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync); SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync);
SConnCtx* conn = NULL;
// opt later
pthread_mutex_lock(&pObj->connMtx);
if (!QUEUE_IS_EMPTY(&pObj->conn)) {
queue* head = QUEUE_HEAD(&pObj->conn);
conn = QUEUE_DATA(head, SConnCtx, queue);
QUEUE_REMOVE(&conn->queue);
} }
pthread_mutex_unlock(&pObj->connMtx);
if (conn == NULL) {
tError("except occurred, do nothing");
return;
}
}
void onAccept(uv_stream_t* stream, int status) { void onAccept(uv_stream_t* stream, int status) {
if (status == -1) { if (status == -1) {
return; return;
} }
SServerObj* pObj = container_of(stream, SServerObj, server); SServerObj* pObj = container_of(stream, SServerObj, server);
tDebug("new conntion accepted by main server, dispatch to one worker thread");
uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pObj->loop, cli); uv_tcp_init(pObj->loop, cli);
if (uv_accept(stream, (uv_stream_t*)cli) == 0) { if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));
uv_buf_t buf = uv_buf_init("a", 1); uv_buf_t buf = uv_buf_init("a", 1);
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite);
} else { } else {
uv_close((uv_handle_t*)cli, NULL); uv_close((uv_handle_t*)cli, NULL);
@ -250,21 +276,21 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
pConn->pTimer = malloc(sizeof(uv_timer_t)); pConn->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pObj->loop, pConn->pTimer); uv_timer_init(pObj->loop, pConn->pTimer);
pConn->pClient = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
pConn->pWorkerAsync = pObj->workerAsync; // thread safty pConn->pWorkerAsync = pObj->workerAsync; // thread safty
uv_tcp_init(pObj->loop, pConn->pClient); uv_tcp_init(pObj->loop, pConn->pTcp);
if (uv_accept(q, (uv_stream_t*)(pConn->pClient)) == 0) { if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
uv_os_fd_t fd; uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pClient, &fd); uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
tDebug("new connection created: %d", fd); tDebug("new connection created: %d", fd);
uv_timer_start(pConn->pTimer, onTimeout, 10, 0); uv_timer_start(pConn->pTimer, onTimeout, 10, 0);
uv_read_start((uv_stream_t*)(pConn->pClient), allocBuffer, onRead); uv_read_start((uv_stream_t*)(pConn->pTcp), allocBuffer, onRead);
} else { } else {
uv_timer_stop(pConn->pTimer); uv_timer_stop(pConn->pTimer);
free(pConn->pTimer); free(pConn->pTimer);
uv_close((uv_handle_t*)pConn->pClient, NULL); uv_close((uv_handle_t*)pConn->pTcp, NULL);
free(pConn->pClient); free(pConn->pTcp);
free(pConn); free(pConn);
} }
} }
@ -276,7 +302,6 @@ void* acceptThread(void* arg) {
struct sockaddr_in bind_addr; struct sockaddr_in bind_addr;
int port = 6030;
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
int err = 0; int err = 0;
@ -288,16 +313,22 @@ void* acceptThread(void* arg) {
} }
void* workerThread(void* arg) { void* workerThread(void* arg) {
SThreadObj* pObj = (SThreadObj*)arg; SThreadObj* pObj = (SThreadObj*)arg;
int fd = pObj->fd;
pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pObj->loop); uv_loop_init(pObj->loop);
uv_pipe_init(pObj->loop, pObj->pipe, 1); uv_pipe_init(pObj->loop, pObj->pipe, 1);
uv_pipe_open(pObj->pipe, fd); uv_pipe_open(pObj->pipe, pObj->fd);
QUEUE_INIT(&pObj->conn);
pObj->workerAsync = malloc(sizeof(uv_async_t)); pObj->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB); uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB);
// pObj->workerAsync->data = (void*)pObj;
uv_read_start((uv_stream_t*)pObj->pipe, allocBuffer, onConnection); uv_read_start((uv_stream_t*)pObj->pipe, allocBuffer, onConnection);
uv_run(pObj->loop, UV_RUN_DEFAULT);
} }
#else #else
@ -471,7 +502,8 @@ void *rpcOpen(const SRpcInit *pInit) {
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) return NULL; if (pRpc == NULL) return NULL;
if (pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); if (pInit->label) tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
if (pRpc->connType == TAOS_CONN_CLIENT) { if (pRpc->connType == TAOS_CONN_CLIENT) {
pRpc->numOfThreads = pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads;

View File

@ -0,0 +1,21 @@
# Unit-test executable for the transport (rpc) module.
add_executable(transportTest "")
# Single gtest-based source file.
target_sources(transportTest
    PRIVATE
        "transportTests.cc"
)
# Public transport headers plus the module's private headers one level up.
target_include_directories(transportTest
    PUBLIC
        "${CMAKE_SOURCE_DIR}/include/libs/transport"
        "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
# Link project libraries, gtest's bundled main, and the library under test.
target_link_libraries (transportTest
    os
    util
    common
    gtest_main
    transport
)

View File

@ -0,0 +1,35 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include "transportInt.h"
#include "trpc.h"
using namespace std;
// Ad-hoc smoke test: opens an RPC endpoint on port 6030 with 5 worker
// threads and keeps the process alive so the event loop can serve
// connections.
// NOTE(review): gtest is included and gtest_main is linked, but this
// file defines its own main() with no TEST() cases — presumably a
// temporary harness; confirm whether it should become real gtest cases.
// NOTE(review): the handle 'p' is never checked for NULL nor closed,
// and the loop never terminates — verify shutdown/cleanup intent.
int main() {
SRpcInit init = {.localPort = 6030, .label = "rpc", .numOfThreads = 5};
void* p = rpcOpen(&init);
while (1) {
// Heartbeat log every 10 seconds while the server runs.
std::cout << "cron task" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000));
}
}