From e6a6887864e027fde2bcbff2b5c9d3b5304276f1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Mar 2022 13:40:14 +0800 Subject: [PATCH 1/3] minor changes --- include/os/osSocket.h | 4 ++++ source/util/src/thttp.c | 15 ++++++--------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/include/os/osSocket.h b/include/os/osSocket.h index cbecb380e2..57baabef03 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -34,6 +34,10 @@ #include #endif +#ifdef USE_UV + #include +#endif + #ifdef __cplusplus extern "C" { #endif diff --git a/source/util/src/thttp.c b/source/util/src/thttp.c index 593f3c43c2..b8d73478ea 100644 --- a/source/util/src/thttp.c +++ b/source/util/src/thttp.c @@ -19,10 +19,7 @@ #include "tlog.h" #ifdef USE_UV - -#include - -void clientConnCb(uv_connect_t* req, int status) { +static void clientConnCb(uv_connect_t* req, int32_t status) { if(status < 0) { terrno = TAOS_SYSTEM_ERROR(status); uError("Connection error %s\n",uv_strerror(status)); @@ -45,20 +42,20 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to get http server:%s ip since %s", server, terrstr()); return -1; - // goto SEND_OVER; } - char ipv4Buf[128]; + + char ipv4Buf[128] = {0}; tinet_ntoa(ipv4Buf, ipv4); - struct sockaddr_in dest; + struct sockaddr_in dest = {0}; uv_ip4_addr(ipv4Buf, port, &dest); - uv_tcp_t socket_tcp; + uv_tcp_t socket_tcp = {0}; uv_loop_t *loop = uv_default_loop(); uv_tcp_init(loop, &socket_tcp); uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t)); - char header[4096] = {0}; + char header[1024] = {0}; int32_t headLen = snprintf(header, sizeof(header), "POST /report HTTP/1.1\n" "Host: %s\n" From bef117d7db3485ea146eead2f55e01a1492d2f37 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Mar 2022 13:52:49 +0800 Subject: [PATCH 2/3] enable monitor gzip --- include/common/tglobal.h | 1 + include/libs/monitor/monitor.h | 1 + include/util/thttp.h | 4 ++- source/common/src/tglobal.c | 3 ++ source/dnode/mgmt/impl/src/dndEnv.c | 2 +- source/dnode/mnode/impl/src/mndTelem.c | 2 +- source/libs/monitor/inc/monInt.h | 1 + source/libs/monitor/src/monitor.c | 3 +- source/util/src/thttp.c | 49 ++++++++++++++++---------- 9 files changed, 44 insertions(+), 22 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2261170e63..ba41cb0292 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -58,6 +58,7 @@ extern int32_t tsMonitorInterval; extern char tsMonitorFqdn[]; extern uint16_t tsMonitorPort; extern int32_t tsMonitorMaxLogs; +extern bool tsMonitorComp; // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 0c832f802b..a3e049b00c 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -130,6 +130,7 @@ typedef struct { const char *server; uint16_t port; int32_t maxLogs; + bool comp; } SMonCfg; int32_t monInit(const SMonCfg *pCfg); diff --git a/include/util/thttp.h b/include/util/thttp.h index f211b2615d..31d84850e0 100644 --- a/include/util/thttp.h +++ b/include/util/thttp.h @@ -22,7 +22,9 @@ extern "C" { #endif -int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen); +typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; + +int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag); #ifdef __cplusplus } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a521e10cc7..f62f8e5d7a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -52,6 +52,7 @@ int32_t tsMonitorInterval = 5; char tsMonitorFqdn[TSDB_FQDN_LEN] = {0}; uint16_t tsMonitorPort = 6043; int32_t tsMonitorMaxLogs = 100; +bool tsMonitorComp = false; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -346,6 +347,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, 0) != 0) return -1; + if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, 0) != 0) return -1; return 0; } @@ -462,6 +464,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN); tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32; tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32; + tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval; if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; diff --git a/source/dnode/mgmt/impl/src/dndEnv.c b/source/dnode/mgmt/impl/src/dndEnv.c index 1cbdf0c1dc..84b2dca326 100644 --- a/source/dnode/mgmt/impl/src/dndEnv.c +++ b/source/dnode/mgmt/impl/src/dndEnv.c @@ -298,7 +298,7 @@ int32_t dndInit() { return -1; } - SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn}; + SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn, .comp = tsMonitorComp}; if (monInit(&monCfg) != 0) { dError("failed to init monitor since %s", terrstr()); dndCleanup(); diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index 0e2141a4d7..453535c6e7 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -87,7 +87,7 @@ static int32_t mndProcessTelemTimer(SMnodeMsg* pReq) { taosWLockLatch(&pMgmt->lock); char* pCont = mndBuildTelemetryReport(pMnode); if (pCont != NULL) { - taosSendHttpReport(TELEMETRY_SERVER, TELEMETRY_PORT, pCont, strlen(pCont)); + taosSendHttpReport(TELEMETRY_SERVER, TELEMETRY_PORT, pCont, strlen(pCont), HTTP_FLAT); free(pCont); } taosWUnLockLatch(&pMgmt->lock); diff --git a/source/libs/monitor/inc/monInt.h b/source/libs/monitor/inc/monInt.h index bfb73af034..c3b6569555 100644 --- a/source/libs/monitor/inc/monInt.h +++ b/source/libs/monitor/inc/monInt.h @@ -54,6 +54,7 @@ typedef struct { int32_t maxLogs; const char *server; uint16_t port; + bool comp; SMonState state; } SMonitor; diff --git a/source/libs/monitor/src/monitor.c b/source/libs/monitor/src/monitor.c index 354989a7a1..905ff8d6da 100644 --- a/source/libs/monitor/src/monitor.c +++ b/source/libs/monitor/src/monitor.c @@ -45,6 +45,7 @@ int32_t monInit(const SMonCfg *pCfg) { tsMonitor.maxLogs = pCfg->maxLogs; tsMonitor.server = pCfg->server; tsMonitor.port = pCfg->port; + tsMonitor.comp = pCfg->comp; tsLogFp = monRecordLog; tsMonitor.state.time = taosGetTimestampMs(); pthread_mutex_init(&tsMonitor.lock, NULL); @@ -375,7 +376,7 @@ void monSendReport(SMonInfo *pMonitor) { char *pCont = tjsonToString(pMonitor->pJson); if (pCont != NULL) { - taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont)); + taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont), tsMonitor.comp); free(pCont); } } diff --git a/source/util/src/thttp.c b/source/util/src/thttp.c index b8d73478ea..c8a52a4735 100644 --- a/source/util/src/thttp.c +++ b/source/util/src/thttp.c @@ -18,6 +18,28 @@ #include "taoserror.h" #include "tlog.h" +static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, + EHttpCompFlag flag) { + if (flag == HTTP_FLAT) { + return snprintf(pHead, headLen, + "POST /report HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Length: %d\n\n", + server, contLen); + } else if (flag == HTTP_GZIP) { + return snprintf(pHead, headLen, + "POST /report HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Encoding: gzip\n" + "Content-Length: %d\n\n", + server, contLen); + } else { + return -1; + } +} + #ifdef USE_UV static void clientConnCb(uv_connect_t* req, int32_t status) { if(status < 0) { @@ -36,7 +58,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { uv_close((uv_handle_t *)req->handle,NULL); } -int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) { +int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) { uint32_t ipv4 = taosGetIpv4FromFqdn(server); if (ipv4 == 0xffffffff) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -50,18 +72,14 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, struct sockaddr_in dest = {0}; uv_ip4_addr(ipv4Buf, port, &dest); - uv_tcp_t socket_tcp = {0}; - uv_loop_t *loop = uv_default_loop(); + uv_tcp_t socket_tcp = {0}; + uv_loop_t* loop = uv_default_loop(); uv_tcp_init(loop, &socket_tcp); uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t)); char header[1024] = {0}; - int32_t headLen = snprintf(header, sizeof(header), - "POST /report HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Length: %d\n\n", - server, contLen); + int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); + uv_buf_t wb[2]; wb[0] = uv_buf_init((char*)header, headLen); wb[1] = uv_buf_init((char*)pCont, contLen); @@ -76,7 +94,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, } #else -int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) { +int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) { int32_t code = -1; SOCKET fd = 0; @@ -94,15 +112,10 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, goto SEND_OVER; } - char header[4096] = {0}; - int32_t headLen = snprintf(header, sizeof(header), - "POST /report HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Length: %d\n\n", - server, contLen); + char header[1024] = {0}; + int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); - if (taosWriteSocket(fd, (void*)header, headLen) < 0) { + if (taosWriteSocket(fd, header, headLen) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to send http header to %s:%u since %s", server, port, terrstr()); goto SEND_OVER; From 514f9f6626de43b4657b12733ae1d2d89a52a620 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 7 Mar 2022 14:54:57 +0800 Subject: [PATCH 3/3] enable monitor gzip --- include/util/taoserror.h | 1 + include/util/thttp.h | 2 +- source/libs/monitor/src/monitor.c | 3 +- source/util/CMakeLists.txt | 2 +- source/util/src/terror.c | 1 + source/util/src/thttp.c | 101 +++++++++++++++++++++++++++--- 6 files changed, 100 insertions(+), 10 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 5fe69284a2..6ad34eb0c0 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -84,6 +84,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x0120) #define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x0121) #define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x0122) +#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0123) //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) diff --git a/include/util/thttp.h b/include/util/thttp.h index 31d84850e0..7d8c588bfc 100644 --- a/include/util/thttp.h +++ b/include/util/thttp.h @@ -24,7 +24,7 @@ extern "C" { typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; -int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag); +int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); #ifdef __cplusplus } diff --git a/source/libs/monitor/src/monitor.c b/source/libs/monitor/src/monitor.c index 905ff8d6da..0760d7ae9e 100644 --- a/source/libs/monitor/src/monitor.c +++ b/source/libs/monitor/src/monitor.c @@ -376,7 +376,8 @@ void monSendReport(SMonInfo *pMonitor) { char *pCont = tjsonToString(pMonitor->pJson); if (pCont != NULL) { - taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont), tsMonitor.comp); + EHttpCompFlag flag = tsMonitor.comp ? HTTP_GZIP : HTTP_FLAT; + taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont), flag); free(pCont); } } diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index 6effdff712..42950b2284 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -10,7 +10,7 @@ target_link_libraries( util PRIVATE os PUBLIC lz4_static - PUBLIC api cjson + PUBLIC api cjson zlib ) if(${BUILD_WITH_UV}) target_link_libraries( diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 9c2fef5b47..6cf6eeb371 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -68,6 +68,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version") +TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported") diff --git a/source/util/src/thttp.c b/source/util/src/thttp.c index c8a52a4735..9d5df20337 100644 --- a/source/util/src/thttp.c +++ b/source/util/src/thttp.c @@ -17,6 +17,7 @@ #include "thttp.h" #include "taoserror.h" #include "tlog.h" +#include "zlib.h" static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { @@ -40,22 +41,91 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH } } +int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { + int32_t code = -1; + int32_t destLen = srcLen; + void* pDest = malloc(destLen); + + if (pDest == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + z_stream gzipStream = {0}; + gzipStream.zalloc = (alloc_func)0; + gzipStream.zfree = (free_func)0; + gzipStream.opaque = (voidpf)0; + if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + gzipStream.next_in = (Bytef*)pSrc; + gzipStream.avail_in = (uLong)srcLen; + gzipStream.next_out = (Bytef*)pDest; + gzipStream.avail_out = (uLong)(destLen); + + while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) { + if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) { + terrno = TSDB_CODE_COMPRESS_ERROR; + goto _OVER; + } + } + + if (gzipStream.avail_in != 0) { + terrno = TSDB_CODE_COMPRESS_ERROR; + goto _OVER; + } + + int32_t err = 0; + while (1) { + if ((err = deflate(&gzipStream, Z_FINISH)) == Z_STREAM_END) { + break; + } + if (err != Z_OK) { + terrno = TSDB_CODE_COMPRESS_ERROR; + goto _OVER; + } + } + + if (deflateEnd(&gzipStream) != Z_OK) { + terrno = TSDB_CODE_COMPRESS_ERROR; + goto _OVER; + } + + if (gzipStream.total_out >= srcLen) { + terrno = TSDB_CODE_COMPRESS_ERROR; + goto _OVER; + } + + code = 0; + +_OVER: + if (code == 0) { + memcpy(pSrc, pDest, gzipStream.total_out); + code = gzipStream.total_out; + } + + free(pDest); + return code; +} + #ifdef USE_UV static void clientConnCb(uv_connect_t* req, int32_t status) { - if(status < 0) { + if (status < 0) { terrno = TAOS_SYSTEM_ERROR(status); - uError("Connection error %s\n",uv_strerror(status)); + uError("Connection error %s\n", uv_strerror(status)); return; } // impl later uv_buf_t* wb = req->data; if (wb == NULL) { - uv_close((uv_handle_t *)req->handle,NULL); + uv_close((uv_handle_t*)req->handle, NULL); } uv_write_t write_req; uv_write(&write_req, req->handle, wb, 2, NULL); - uv_close((uv_handle_t *)req->handle,NULL); + uv_close((uv_handle_t*)req->handle, NULL); } int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) { @@ -77,6 +147,15 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, uv_tcp_init(loop, &socket_tcp); uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t)); + if (flag == HTTP_GZIP) { + int32_t dstLen = taosCompressHttpRport(pCont, contLen); + if (dstLen > 0) { + contLen = dstLen; + } else { + flag = HTTP_FLAT; + } + } + char header[1024] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); @@ -87,14 +166,14 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, connect->data = wb; uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb); terrno = 0; - uv_run(loop,UV_RUN_DEFAULT); + uv_run(loop, UV_RUN_DEFAULT); uv_loop_close(loop); free(connect); return terrno; } #else -int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) { +int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { int32_t code = -1; SOCKET fd = 0; @@ -112,6 +191,15 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, goto SEND_OVER; } + if (flag == HTTP_GZIP) { + int32_t dstLen = taosCompressHttpRport(pCont, contLen); + if (dstLen > 0) { + contLen = dstLen; + } else { + flag = HTTP_FLAT; + } + } + char header[1024] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); @@ -134,7 +222,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, goto SEND_OVER; } - uTrace("send http to %s:%u, len:%d content: %s", server, port, contLen, pCont); code = 0; SEND_OVER: