diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 6715290acf..7dffb8d192 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -45,6 +45,8 @@ typedef struct SHttpMsg { static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; static SHttpModule* http = NULL; +static void httpHandleReq(SHttpMsg* msg); + static void* httpThread(void* arg) { SHttpModule* http = (SHttpModule*)arg; setThreadName("http-cli-send-thread"); @@ -52,6 +54,13 @@ static void* httpThread(void* arg) { return NULL; } +static void httpDestroyMsg(SHttpMsg* msg) { + if (msg == NULL) return; + + taosMemoryFree(msg->server); + taosMemoryFree(msg->cont); + taosMemoryFree(msg); +} static void httpAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SHttpModule* http = item->pThrd; @@ -68,6 +77,7 @@ static void httpAsyncCb(uv_async_t* handle) { queue* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); msg = QUEUE_DATA(h, SHttpMsg, q); + httpHandleReq(msg); } } @@ -355,7 +365,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 } int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); - msg->server = strdup(server); msg->port = port; msg->cont = taosMemoryMalloc(contLen); @@ -365,3 +374,57 @@ int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, i transAsyncSend(http->asyncPool, &(msg->q)); return 0; } +static void httpHandleReq(SHttpMsg* msg) { + struct sockaddr_in dest = {0}; + if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { + httpDestroyMsg(msg); + return; + } + if (msg->flag == HTTP_GZIP) { + int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len); + if (dstLen > 0) { + msg->len = dstLen; + } else { + msg->flag = HTTP_FLAT; + } + } + + terrno = 0; + + int32_t len = 2048; + char* header = taosMemoryCalloc(1, len); + int32_t headLen = taosBuildHttpHeader(msg->server, msg->len, header, len, msg->flag); + + uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); + wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var + wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var + + SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); + cli->conn.data = cli; + cli->tcp.data = cli; + cli->req.data = cli; + cli->wbuf = wb; + cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); + cli->addr = msg->server; + cli->port = msg->port; + + taosMemoryFree(msg); + + uv_tcp_init(http->loop, &cli->tcp); + // set up timeout to avoid stuck; + int32_t fd = taosCreateSocketWithTimeout(5); + + int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + if (ret != 0) { + uError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + destroyHttpClient(cli); + return; + } + + ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); + if (ret != 0) { + uError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, + cli->port); + destroyHttpClient(cli); + } +}