From dde3dfd7bd9fc58075f58ecce04ab6311e31c502 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Jan 2022 23:53:18 +0800 Subject: [PATCH] add libuv --- source/libs/transport/src/rpcMain.c | 158 +++++++++++++++++++--------- 1 file changed, 109 insertions(+), 49 deletions(-) diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 9bba0dca0a..802c5ae4ea 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -33,28 +33,40 @@ #include "ttimer.h" #include "tutil.h" +typedef struct { + int sessions; // number of sessions allowed + int numOfThreads; // number of threads to process incoming messages + int idleTime; // milliseconds; + uint16_t localPort; + int8_t connType; + int index; // for UDP server only, round robin for multiple threads + char label[TSDB_LABEL_LEN]; + + char user[TSDB_UNI_LEN]; // meter ID + char spi; // security parameter index + char encrypt; // encrypt algorithm + char secret[TSDB_PASSWORD_LEN]; // secret for the link + char ckey[TSDB_PASSWORD_LEN]; // ciphering key + + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); + int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); + + int32_t refCount; + void* parent; + void* idPool; // handle to ID pool + void* tmrCtrl; // handle to timer + SHashObj* hash; // handle returned by hash utility + void* tcphandle; // returned handle from TCP initialization + void* udphandle; // returned handle from UDP initialization + void* pCache; // connection cache + pthread_mutex_t mutex; + struct SRpcConn* connList; // connection list +} SRpcInfo; + #ifdef USE_UV #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) -int32_t rpcInit() { return -1; } -void rpcCleanup() { return; }; -void* rpcOpen(const SRpcInit* pRpc) { return NULL; } -void rpcClose(void* arg) { return; } -void* rpcMallocCont(int contLen) { return NULL; } -void rpcFreeCont(void* cont) { return; } -void* rpcReallocCont(void* ptr, int contLen) { return NULL; } - -void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; } - -void rpcSendResponse(const SRpcMsg* pMsg) {} - -void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} -int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } -void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } -int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } -void rpcCancelRequest(int64_t rid) { return; } - typedef struct SThreadObj { pthread_t thread; uv_pipe_t* pipe; @@ -79,6 +91,84 @@ typedef struct SConnCtx { int ref; } SConnCtx; +static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void onTimeout(uv_timer_t* handle); +static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void onWrite(uv_write_t* req, int status); +static void onAccept(uv_stream_t* stream, int status); +void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); +static void workerAsyncCB(uv_async_t* handle); +static void* workerThread(void* arg); + +int32_t rpcInit() { return -1; } +void rpcCleanup() { return; }; +void* rpcOpen(const SRpcInit* pInit) { + SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); + if (pRpc == NULL) { + return NULL; + } + if (pInit->label) { + tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); + } + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + + SServerObj* srv = calloc(1, sizeof(SServerObj)); + srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + srv->numOfThread = pRpc->numOfThreads; + srv->workerIdx = 0; + srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*)); + srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*)); + uv_loop_init(srv->loop); + + for (int i = 0; i < srv->numOfThread; i++) { + srv->pThreadObj[i] = (SThreadObj*)calloc(1, sizeof(SThreadObj)); + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); + int fds[2]; + if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + return NULL; + } + uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write + + srv->pThreadObj[i]->fd = fds[0]; + srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]); // init read + int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i])); + if (err == 0) { + tError("sucess to create worker thread %d", i); + // printf("thread %d create\n", i); + } else { + tError("failed to create worker thread %d", i); + return NULL; + } + } + uv_tcp_init(srv->loop, &srv->server); + struct sockaddr_in bind_addr; + uv_ip4_addr("0.0.0.0", pInit->localPort, &bind_addr); + uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); + int err = 0; + if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) { + tError("Listen error %s\n", uv_err_name(err)); + return NULL; + } + uv_run(srv->loop, UV_RUN_DEFAULT); + + return pRpc; +} +void rpcClose(void* arg) { return; } +void* rpcMallocCont(int contLen) { return NULL; } +void rpcFreeCont(void* cont) { return; } +void* rpcReallocCont(void* ptr, int contLen) { return NULL; } + +void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; } + +void rpcSendResponse(const SRpcMsg* pMsg) {} + +void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} +int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } +void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } +int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } +void rpcCancelRequest(int64_t rid) { return; } + void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { buf->base = malloc(suggested_size); buf->len = suggested_size; @@ -98,7 +188,7 @@ void onWrite(uv_write_t* req, int status) { if (req) tDebug("data already was written on stream"); } -static void workerAsyncCB(uv_async_t* handle) { +void workerAsyncCB(uv_async_t* handle) { // opt SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync); } @@ -187,36 +277,6 @@ void* workerThread(void* arg) { #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) #define rpcIsReq(type) (type & 1U) -typedef struct { - int sessions; // number of sessions allowed - int numOfThreads; // number of threads to process incoming messages - int idleTime; // milliseconds; - uint16_t localPort; - int8_t connType; - int index; // for UDP server only, round robin for multiple threads - char label[TSDB_LABEL_LEN]; - - char user[TSDB_UNI_LEN]; // meter ID - char spi; // security parameter index - char encrypt; // encrypt algorithm - char secret[TSDB_PASSWORD_LEN]; // secret for the link - char ckey[TSDB_PASSWORD_LEN]; // ciphering key - - void (*cfp)(void *parent, SRpcMsg *, SEpSet *); - int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); - - int32_t refCount; - void * parent; - void * idPool; // handle to ID pool - void * tmrCtrl; // handle to timer - SHashObj * hash; // handle returned by hash utility - void * tcphandle; // returned handle from TCP initialization - void * udphandle; // returned handle from UDP initialization - void * pCache; // connection cache - pthread_mutex_t mutex; - struct SRpcConn *connList; // connection list -} SRpcInfo; - typedef struct { SRpcInfo * pRpc; // associated SRpcInfo SEpSet epSet; // ip list provided by app