From 41f600698c955dedcbc9dcd26acaf7b2ca044b1d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Jan 2022 22:49:05 +0800 Subject: [PATCH] add libuv --- source/libs/transport/inc/rpcHead.h | 21 ++++ source/libs/transport/src/rpcMain.c | 182 +++++++++++++++++++++++----- 2 files changed, 175 insertions(+), 28 deletions(-) diff --git a/source/libs/transport/inc/rpcHead.h b/source/libs/transport/inc/rpcHead.h index 7317d84af1..66821db133 100644 --- a/source/libs/transport/inc/rpcHead.h +++ b/source/libs/transport/inc/rpcHead.h @@ -22,6 +22,27 @@ extern "C" { #endif #ifdef USE_UV +typedef struct { + char version : 4; // RPC version + char comp : 4; // compression algorithm, 0:no compression 1:lz4 + char resflag : 2; // reserved bits + char spi : 3; // security parameter index + char encrypt : 3; // encrypt algorithm, 0: no encryption + uint16_t tranId; // transcation ID + uint32_t linkUid; // for unique connection ID assigned by client + uint64_t ahandle; // ahandle assigned by client + uint32_t sourceId; // source ID, an index for connection list + uint32_t destId; // destination ID, an index for connection list + uint32_t destIp; // destination IP address, for NAT scenario + char user[TSDB_UNI_LEN]; // user ID + uint16_t port; // for UDP only, port may be changed + char empty[1]; // reserved + uint16_t msgType; // message type + int32_t msgLen; // message length including the header iteslf + uint32_t msgVer; + int32_t code; // code in response message + uint8_t content[0]; // message body starts from here +} SRpcHead; #else diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 818d129032..a1c0c05fc3 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -13,7 +13,9 @@ * along with this program. If not, see . */ +#ifdef USE_UV #include +#endif #include "lz4.h" #include "os.h" #include "rpcCache.h" @@ -68,6 +70,8 @@ typedef struct { #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) +static const char* notify = "a"; + typedef struct SThreadObj { pthread_t thread; uv_pipe_t* pipe; @@ -90,23 +94,39 @@ typedef struct SServerObj { uint32_t port; } SServerObj; +typedef struct SContent { + char* buf; + int len; + int cap; + int toRead; +} SContent; + typedef struct SConnCtx { uv_tcp_t* pTcp; + uv_write_t* pWriter; uv_timer_t* pTimer; + uv_async_t* pWorkerAsync; queue queue; int ref; int persist; // persist connection or not + SContent pCont; + int count; } SConnCtx; -static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void onTimeout(uv_timer_t* handle); +static void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void onTimeout(uv_timer_t* handle); static void onWrite(uv_write_t* req, int status); static void onAccept(uv_stream_t* stream, int status); static void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void workerAsyncCB(uv_async_t* handle); +static SConnCtx* connCtxCreate(); +static void connCtxDestroy(SConnCtx* ctx); +static void uvConnCtxDestroy(uv_handle_t* handle); + static void* workerThread(void* arg); static void* acceptThread(void* arg); @@ -131,12 +151,11 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThread; i++) { SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); - + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); int fds[2]; if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { return NULL; } - srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write @@ -147,7 +166,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, tDebug("sucess to create worker-thread %d", i); // printf("thread %d create\n", i); } else { - // clear all resource later + // TODO: clear all other resource later tError("failed to create worker-thread %d", i); } srv->pThreadObj[i] = thrd; @@ -171,7 +190,6 @@ void* rpcOpen(const SRpcInit* pInit) { tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); } pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; - pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); return pRpc; } @@ -190,26 +208,106 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } void rpcCancelRequest(int64_t rid) { return; } -void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - buf->base = malloc(suggested_size); - buf->len = suggested_size; +void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + static const int CAPACITY = 1024; + tDebug("pre alloc buffer for read "); + SConnCtx* ctx = handle->data; + SContent* pCont = &ctx->pCont; + if (pCont->cap == 0) { + pCont->buf = (char*)calloc(CAPACITY, sizeof(char)); + pCont->len = 0; + pCont->cap = CAPACITY; + pCont->toRead = -1; + + buf->base = pCont->buf; + buf->len = CAPACITY; + } else { + if (pCont->len >= pCont->cap) { + if (pCont->toRead == -1) { + pCont->cap *= 2; + pCont->buf = realloc(pCont->buf, pCont->cap); + } else if (pCont->len + pCont->toRead > pCont->cap) { + pCont->cap = pCont->len + pCont->toRead; + pCont->buf = realloc(pCont->buf, pCont->len + pCont->toRead); + } + } + buf->base = pCont->buf + pCont->len; + buf->len = pCont->cap - pCont->len; + } + + // if (ctx->pCont.cap == 0) { + // ctx->pCont.buf = (char*)calloc(64, sizeof(char)); + // ctx->pCont.len = 0; + // ctx->pCont.cap = 64; + // // + // buf->base = ctx->pCont.buf; + // buf->len = sz; + //} else { + // if (ctx->pCont.len + sz > ctx->pCont.cap) { + // ctx->pCont.cap *= 2; + // ctx->pCont.buf = realloc(ctx->pCont.buf, ctx->pCont.cap); + // } + // buf->base = ctx->pCont.buf + ctx->pCont.len; + // buf->len = sz; + //} +} +// change later +static bool handleUserData(SContent* data) { + SRpcHead rpcHead; + + bool finish = false; + int32_t msgLen, leftLen, retLen; + int32_t headLen = sizeof(rpcHead); + if (data->len >= headLen) { + memcpy((char*)&rpcHead, data->buf, headLen); + msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); + if (msgLen + headLen <= data->len) { + return true; + } else { + return false; + } + } else { + return false; + } +} + +void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { + // opt + SConnCtx* ctx = cli->data; + SContent* pCont = &ctx->pCont; + if (nread > 0) { + pCont->len += nread; + bool finish = handleUserData(pCont); + if (finish == false) { + tDebug("continue read"); + } else { + tDebug("read completely"); + } + return; + } + + if (nread != UV_EOF) { + tDebug("Read error %s\n", uv_err_name(nread)); + } + uv_close((uv_handle_t*)cli, uvConnCtxDestroy); +} +void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + buf->base = malloc(sizeof(char)); + buf->len = 2; } void onTimeout(uv_timer_t* handle) { // opt tDebug("time out"); } -void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { - // opt - tDebug("data already was read on a stream"); -} void onWrite(uv_write_t* req, int status) { + SConnCtx* ctx = req->data; if (status == 0) { tDebug("data already was written on stream"); + } else { + connCtxDestroy(ctx); } - free(req); - // opt } @@ -243,7 +341,7 @@ void onAccept(uv_stream_t* stream, int status) { if (uv_accept(stream, (uv_stream_t*)cli) == 0) { uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); - uv_buf_t buf = uv_buf_init("a", 1); + uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); @@ -253,6 +351,7 @@ void onAccept(uv_stream_t* stream, int status) { } } void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { + tDebug("connection coming"); if (nread < 0) { if (nread != UV_EOF) { tError("read error %s", uv_err_name(nread)); @@ -261,6 +360,11 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_close((uv_handle_t*)q, NULL); return; } + // free memory allocated by + assert(nread == strlen(notify)); + assert(buf->base[0] == notify[0]); + free(buf->base); + SThreadObj* pObj = (SThreadObj*)container_of(q, struct SThreadObj, pipe); uv_pipe_t* pipe = (uv_pipe_t*)q; @@ -268,30 +372,33 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tError("No pending count"); return; } + uv_handle_type pending = uv_pipe_pending_type(pipe); assert(pending == UV_TCP); - SConnCtx* pConn = malloc(sizeof(SConnCtx)); + SConnCtx* pConn = connCtxCreate(); /* init conn timer*/ pConn->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pObj->loop, pConn->pTimer); - pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); pConn->pWorkerAsync = pObj->workerAsync; // thread safty + + // init client handle + pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pObj->loop, pConn->pTcp); + pConn->pTcp->data = pConn; + + // init write request, just + pConn->pWriter = calloc(1, sizeof(uv_write_t)); + pConn->pWriter->data = pConn; if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); tDebug("new connection created: %d", fd); - uv_timer_start(pConn->pTimer, onTimeout, 10, 0); - uv_read_start((uv_stream_t*)(pConn->pTcp), allocBuffer, onRead); + uv_read_start((uv_stream_t*)(pConn->pTcp), allocReadBuffer, onRead); } else { - uv_timer_stop(pConn->pTimer); - free(pConn->pTimer); - uv_close((uv_handle_t*)pConn->pTcp, NULL); - free(pConn->pTcp); - free(pConn); + connCtxDestroy(pConn); } } @@ -325,11 +432,30 @@ void* workerThread(void* arg) { pObj->workerAsync = malloc(sizeof(uv_async_t)); uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB); - // pObj->workerAsync->data = (void*)pObj; - - uv_read_start((uv_stream_t*)pObj->pipe, allocBuffer, onConnection); + uv_read_start((uv_stream_t*)pObj->pipe, allocConnBuffer, onConnection); uv_run(pObj->loop, UV_RUN_DEFAULT); } +static SConnCtx* connCtxCreate() { + SConnCtx* pConn = (SConnCtx*)calloc(1, sizeof(SConnCtx)); + return pConn; +} +static void connCtxDestroy(SConnCtx* ctx) { + if (ctx == NULL) { + return; + } + uv_timer_stop(ctx->pTimer); + free(ctx->pTimer); + uv_close((uv_handle_t*)ctx->pTcp, NULL); + free(ctx->pTcp); + free(ctx->pWriter); + free(ctx); + // handle +} +static void uvConnCtxDestroy(uv_handle_t* handle) { + SConnCtx* ctx = handle->data; + connCtxDestroy(ctx); +} + #else #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))