From 4fdc98d3fb4987fc87e67cb34fff943ca70589ef Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 28 Oct 2022 22:15:06 +0800 Subject: [PATCH] opt http module --- source/libs/transport/src/thttp.c | 92 ++++++++++++++++++++++++++-- source/libs/transport/src/transCli.c | 30 +-------- 2 files changed, 91 insertions(+), 31 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 92989a45f5..6715290acf 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -20,11 +20,80 @@ #include "thttp.h" #include "taoserror.h" #include "tlog.h" +#include "transComm.h" // clang-format on #define HTTP_RECV_BUF_SIZE 1024 +typedef struct SHttpModule { + uv_loop_t* loop; + SAsyncPool* asyncPool; + TdThread thread; +} SHttpModule; + +typedef struct SHttpMsg { + queue q; + char* server; + int32_t port; + char* cont; + int32_t len; + EHttpCompFlag flag; + +} SHttpMsg; + +static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; +static SHttpModule* http = NULL; + +static void* httpThread(void* arg) { + SHttpModule* http = (SHttpModule*)arg; + setThreadName("http-cli-send-thread"); + uv_run(http->loop, UV_RUN_DEFAULT); + return NULL; +} + +static void httpAsyncCb(uv_async_t* handle) { + SAsyncItem* item = handle->data; + SHttpModule* http = item->pThrd; + + SHttpMsg* msg = NULL; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + msg = QUEUE_DATA(h, SHttpMsg, q); + } +} + +static void transHttpEnvInit() { + http = taosMemoryMalloc(sizeof(SHttpModule)); + + http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); + uv_loop_init(http->loop); + + http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); + + int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); + if (err != 0) { + taosMemoryFree(http->loop); + taosMemoryFree(http); + http = NULL; + } +} +static void transHttpEnvDestroy() { + if (http == NULL) return; + + transAsyncPoolDestroy(http->asyncPool); + taosMemoryFree(http->loop); + taosMemoryFree(http); +} + typedef struct SHttpClient { uv_connect_t conn; uv_tcp_t tcp; @@ -127,6 +196,8 @@ _OVER: } static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) { + taosMemoryFree(cli->wbuf[0].base); + taosMemoryFree(cli->wbuf[1].base); taosMemoryFree(cli->wbuf); taosMemoryFree(cli->rbuf); taosMemoryFree(cli->addr); @@ -230,12 +301,13 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 } terrno = 0; - char header[2048] = {0}; - int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); + int32_t len = 2048; + char* header = taosMemoryCalloc(1, len); + int32_t headLen = taosBuildHttpHeader(server, contLen, header, len, flag); uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); - wb[0] = uv_buf_init((char*)header, headLen); // stack var - wb[1] = uv_buf_init((char*)pCont, contLen); // heap var + wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var + wb[1] = uv_buf_init((char*)pCont, contLen); // heap var SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); cli->conn.data = cli; @@ -281,3 +353,15 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 taosMemoryFree(loop); return terrno; } +int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { + SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); + + msg->server = strdup(server); + msg->port = port; + msg->cont = taosMemoryMalloc(contLen); + memcpy(msg->cont, pCont, contLen); + msg->flag = flag; + + transAsyncSend(http->asyncPool, &(msg->q)); + return 0; +} diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 28660934f8..97915e1ded 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -187,18 +187,8 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ } while (0) -#define CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd) \ - do { \ - if (exh == NULL) { \ - idx = -1; \ - } else { \ - ASYNC_CHECK_HANDLE((exh), refId); \ - pThrd = (SCliThrd*)(exh)->pThrd; \ - } \ - } while (0) -#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) -#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) -#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) +#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ do { \ @@ -217,6 +207,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { tDebug("msg found, %" PRIu64 "", ahandle); \ } \ } while (0) + #define CONN_GET_NEXT_SENDMSG(conn) \ do { \ int i = 0; \ @@ -231,21 +222,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { } \ } while (0) -#define CONN_HANDLE_THREAD_QUIT(thrd) \ - do { \ - if (thrd->quit) { \ - return; \ - } \ - } while (0) - -#define CONN_HANDLE_BROKEN(conn) \ - do { \ - if (conn->broken) { \ - cliHandleExcept(conn); \ - return; \ - } \ - } while (0) - #define CONN_SET_PERSIST_BY_APP(conn) \ do { \ if (conn->status == ConnNormal) { \