enable monitor gzip
This commit is contained in:
parent
bef117d7db
commit
514f9f6626
|
@ -84,6 +84,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x0120)
|
||||
#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x0121)
|
||||
#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x0122)
|
||||
#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0123)
|
||||
|
||||
//client
|
||||
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
|
||||
|
|
|
@ -24,7 +24,7 @@ extern "C" {
|
|||
|
||||
typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag;
|
||||
|
||||
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag);
|
||||
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -376,7 +376,8 @@ void monSendReport(SMonInfo *pMonitor) {
|
|||
|
||||
char *pCont = tjsonToString(pMonitor->pJson);
|
||||
if (pCont != NULL) {
|
||||
taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont), tsMonitor.comp);
|
||||
EHttpCompFlag flag = tsMonitor.comp ? HTTP_GZIP : HTTP_FLAT;
|
||||
taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont), flag);
|
||||
free(pCont);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ target_link_libraries(
|
|||
util
|
||||
PRIVATE os
|
||||
PUBLIC lz4_static
|
||||
PUBLIC api cjson
|
||||
PUBLIC api cjson zlib
|
||||
)
|
||||
if(${BUILD_WITH_UV})
|
||||
target_link_libraries(
|
||||
|
|
|
@ -68,6 +68,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg")
|
||||
|
||||
//common & util
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported")
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "thttp.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
#include "zlib.h"
|
||||
|
||||
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
|
||||
EHttpCompFlag flag) {
|
||||
|
@ -40,22 +41,91 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH
|
|||
}
|
||||
}
|
||||
|
||||
int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
|
||||
int32_t code = -1;
|
||||
int32_t destLen = srcLen;
|
||||
void* pDest = malloc(destLen);
|
||||
|
||||
if (pDest == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
z_stream gzipStream = {0};
|
||||
gzipStream.zalloc = (alloc_func)0;
|
||||
gzipStream.zfree = (free_func)0;
|
||||
gzipStream.opaque = (voidpf)0;
|
||||
if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
gzipStream.next_in = (Bytef*)pSrc;
|
||||
gzipStream.avail_in = (uLong)srcLen;
|
||||
gzipStream.next_out = (Bytef*)pDest;
|
||||
gzipStream.avail_out = (uLong)(destLen);
|
||||
|
||||
while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) {
|
||||
if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) {
|
||||
terrno = TSDB_CODE_COMPRESS_ERROR;
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
if (gzipStream.avail_in != 0) {
|
||||
terrno = TSDB_CODE_COMPRESS_ERROR;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
int32_t err = 0;
|
||||
while (1) {
|
||||
if ((err = deflate(&gzipStream, Z_FINISH)) == Z_STREAM_END) {
|
||||
break;
|
||||
}
|
||||
if (err != Z_OK) {
|
||||
terrno = TSDB_CODE_COMPRESS_ERROR;
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
if (deflateEnd(&gzipStream) != Z_OK) {
|
||||
terrno = TSDB_CODE_COMPRESS_ERROR;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (gzipStream.total_out >= srcLen) {
|
||||
terrno = TSDB_CODE_COMPRESS_ERROR;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
if (code == 0) {
|
||||
memcpy(pSrc, pDest, gzipStream.total_out);
|
||||
code = gzipStream.total_out;
|
||||
}
|
||||
|
||||
free(pDest);
|
||||
return code;
|
||||
}
|
||||
|
||||
#ifdef USE_UV
|
||||
static void clientConnCb(uv_connect_t* req, int32_t status) {
|
||||
if(status < 0) {
|
||||
if (status < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(status);
|
||||
uError("Connection error %s\n",uv_strerror(status));
|
||||
uError("Connection error %s\n", uv_strerror(status));
|
||||
return;
|
||||
}
|
||||
|
||||
// impl later
|
||||
uv_buf_t* wb = req->data;
|
||||
if (wb == NULL) {
|
||||
uv_close((uv_handle_t *)req->handle,NULL);
|
||||
uv_close((uv_handle_t*)req->handle, NULL);
|
||||
}
|
||||
uv_write_t write_req;
|
||||
uv_write(&write_req, req->handle, wb, 2, NULL);
|
||||
uv_close((uv_handle_t *)req->handle,NULL);
|
||||
uv_close((uv_handle_t*)req->handle, NULL);
|
||||
}
|
||||
|
||||
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) {
|
||||
|
@ -77,6 +147,15 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont,
|
|||
uv_tcp_init(loop, &socket_tcp);
|
||||
uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t));
|
||||
|
||||
if (flag == HTTP_GZIP) {
|
||||
int32_t dstLen = taosCompressHttpRport(pCont, contLen);
|
||||
if (dstLen > 0) {
|
||||
contLen = dstLen;
|
||||
} else {
|
||||
flag = HTTP_FLAT;
|
||||
}
|
||||
}
|
||||
|
||||
char header[1024] = {0};
|
||||
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
|
||||
|
||||
|
@ -87,14 +166,14 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont,
|
|||
connect->data = wb;
|
||||
uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb);
|
||||
terrno = 0;
|
||||
uv_run(loop,UV_RUN_DEFAULT);
|
||||
uv_run(loop, UV_RUN_DEFAULT);
|
||||
uv_loop_close(loop);
|
||||
free(connect);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
#else
|
||||
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) {
|
||||
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
|
||||
int32_t code = -1;
|
||||
SOCKET fd = 0;
|
||||
|
||||
|
@ -112,6 +191,15 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont,
|
|||
goto SEND_OVER;
|
||||
}
|
||||
|
||||
if (flag == HTTP_GZIP) {
|
||||
int32_t dstLen = taosCompressHttpRport(pCont, contLen);
|
||||
if (dstLen > 0) {
|
||||
contLen = dstLen;
|
||||
} else {
|
||||
flag = HTTP_FLAT;
|
||||
}
|
||||
}
|
||||
|
||||
char header[1024] = {0};
|
||||
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
|
||||
|
||||
|
@ -134,7 +222,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont,
|
|||
goto SEND_OVER;
|
||||
}
|
||||
|
||||
uTrace("send http to %s:%u, len:%d content: %s", server, port, contLen, pCont);
|
||||
code = 0;
|
||||
|
||||
SEND_OVER:
|
||||
|
|
Loading…
Reference in New Issue