enable monitor gzip

This commit is contained in:
Shengliang Guan 2022-03-07 13:52:49 +08:00
parent e6a6887864
commit bef117d7db
9 changed files with 44 additions and 22 deletions

View File

@ -58,6 +58,7 @@ extern int32_t tsMonitorInterval;
extern char tsMonitorFqdn[]; extern char tsMonitorFqdn[];
extern uint16_t tsMonitorPort; extern uint16_t tsMonitorPort;
extern int32_t tsMonitorMaxLogs; extern int32_t tsMonitorMaxLogs;
extern bool tsMonitorComp;
// query buffer management // query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing

View File

@ -130,6 +130,7 @@ typedef struct {
const char *server; const char *server;
uint16_t port; uint16_t port;
int32_t maxLogs; int32_t maxLogs;
bool comp;
} SMonCfg; } SMonCfg;
int32_t monInit(const SMonCfg *pCfg); int32_t monInit(const SMonCfg *pCfg);

View File

@ -22,7 +22,9 @@
extern "C" { extern "C" {
#endif #endif
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen); typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag;
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -52,6 +52,7 @@ int32_t tsMonitorInterval = 5;
char tsMonitorFqdn[TSDB_FQDN_LEN] = {0}; char tsMonitorFqdn[TSDB_FQDN_LEN] = {0};
uint16_t tsMonitorPort = 6043; uint16_t tsMonitorPort = 6043;
int32_t tsMonitorMaxLogs = 100; int32_t tsMonitorMaxLogs = 100;
bool tsMonitorComp = false;
/* /*
* denote if the server needs to compress response message at the application layer to client, including query rsp, * denote if the server needs to compress response message at the application layer to client, including query rsp,
@ -346,6 +347,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1; if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, 0) != 0) return -1;
return 0; return 0;
} }
@ -462,6 +464,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN); tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN);
tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32; tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32;
tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32; tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32;
tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval;
if (tsQueryBufferSize >= 0) { if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;

View File

@ -298,7 +298,7 @@ int32_t dndInit() {
return -1; return -1;
} }
SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn}; SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn, .comp = tsMonitorComp};
if (monInit(&monCfg) != 0) { if (monInit(&monCfg) != 0) {
dError("failed to init monitor since %s", terrstr()); dError("failed to init monitor since %s", terrstr());
dndCleanup(); dndCleanup();

View File

@ -87,7 +87,7 @@ static int32_t mndProcessTelemTimer(SMnodeMsg* pReq) {
taosWLockLatch(&pMgmt->lock); taosWLockLatch(&pMgmt->lock);
char* pCont = mndBuildTelemetryReport(pMnode); char* pCont = mndBuildTelemetryReport(pMnode);
if (pCont != NULL) { if (pCont != NULL) {
taosSendHttpReport(TELEMETRY_SERVER, TELEMETRY_PORT, pCont, strlen(pCont)); taosSendHttpReport(TELEMETRY_SERVER, TELEMETRY_PORT, pCont, strlen(pCont), HTTP_FLAT);
free(pCont); free(pCont);
} }
taosWUnLockLatch(&pMgmt->lock); taosWUnLockLatch(&pMgmt->lock);

View File

@ -54,6 +54,7 @@ typedef struct {
int32_t maxLogs; int32_t maxLogs;
const char *server; const char *server;
uint16_t port; uint16_t port;
bool comp;
SMonState state; SMonState state;
} SMonitor; } SMonitor;

View File

@ -45,6 +45,7 @@ int32_t monInit(const SMonCfg *pCfg) {
tsMonitor.maxLogs = pCfg->maxLogs; tsMonitor.maxLogs = pCfg->maxLogs;
tsMonitor.server = pCfg->server; tsMonitor.server = pCfg->server;
tsMonitor.port = pCfg->port; tsMonitor.port = pCfg->port;
tsMonitor.comp = pCfg->comp;
tsLogFp = monRecordLog; tsLogFp = monRecordLog;
tsMonitor.state.time = taosGetTimestampMs(); tsMonitor.state.time = taosGetTimestampMs();
pthread_mutex_init(&tsMonitor.lock, NULL); pthread_mutex_init(&tsMonitor.lock, NULL);
@ -375,7 +376,7 @@ void monSendReport(SMonInfo *pMonitor) {
char *pCont = tjsonToString(pMonitor->pJson); char *pCont = tjsonToString(pMonitor->pJson);
if (pCont != NULL) { if (pCont != NULL) {
taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont)); taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont), tsMonitor.comp);
free(pCont); free(pCont);
} }
} }

View File

@ -18,6 +18,28 @@
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
EHttpCompFlag flag) {
if (flag == HTTP_FLAT) {
return snprintf(pHead, headLen,
"POST /report HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Length: %d\n\n",
server, contLen);
} else if (flag == HTTP_GZIP) {
return snprintf(pHead, headLen,
"POST /report HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Encoding: gzip\n"
"Content-Length: %d\n\n",
server, contLen);
} else {
return -1;
}
}
#ifdef USE_UV #ifdef USE_UV
static void clientConnCb(uv_connect_t* req, int32_t status) { static void clientConnCb(uv_connect_t* req, int32_t status) {
if(status < 0) { if(status < 0) {
@ -36,7 +58,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
uv_close((uv_handle_t *)req->handle,NULL); uv_close((uv_handle_t *)req->handle,NULL);
} }
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) { int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) {
uint32_t ipv4 = taosGetIpv4FromFqdn(server); uint32_t ipv4 = taosGetIpv4FromFqdn(server);
if (ipv4 == 0xffffffff) { if (ipv4 == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -51,17 +73,13 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont,
uv_ip4_addr(ipv4Buf, port, &dest); uv_ip4_addr(ipv4Buf, port, &dest);
uv_tcp_t socket_tcp = {0}; uv_tcp_t socket_tcp = {0};
uv_loop_t *loop = uv_default_loop(); uv_loop_t* loop = uv_default_loop();
uv_tcp_init(loop, &socket_tcp); uv_tcp_init(loop, &socket_tcp);
uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t)); uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t));
char header[1024] = {0}; char header[1024] = {0};
int32_t headLen = snprintf(header, sizeof(header), int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
"POST /report HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Length: %d\n\n",
server, contLen);
uv_buf_t wb[2]; uv_buf_t wb[2];
wb[0] = uv_buf_init((char*)header, headLen); wb[0] = uv_buf_init((char*)header, headLen);
wb[1] = uv_buf_init((char*)pCont, contLen); wb[1] = uv_buf_init((char*)pCont, contLen);
@ -76,7 +94,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont,
} }
#else #else
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) { int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen, EHttpCompFlag flag) {
int32_t code = -1; int32_t code = -1;
SOCKET fd = 0; SOCKET fd = 0;
@ -94,15 +112,10 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont,
goto SEND_OVER; goto SEND_OVER;
} }
char header[4096] = {0}; char header[1024] = {0};
int32_t headLen = snprintf(header, sizeof(header), int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
"POST /report HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Length: %d\n\n",
server, contLen);
if (taosWriteSocket(fd, (void*)header, headLen) < 0) { if (taosWriteSocket(fd, header, headLen) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to send http header to %s:%u since %s", server, port, terrstr()); uError("failed to send http header to %s:%u since %s", server, port, terrstr());
goto SEND_OVER; goto SEND_OVER;