Merge pull request #16180 from taosdata/feature/testWindows

refactor code
This commit is contained in:
Hui Li 2022-08-18 19:04:38 +08:00 committed by GitHub
commit c742747428
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 80 additions and 102 deletions

View File

@ -75,7 +75,7 @@ int32_t tsMonitorMaxLogs = 100;
bool tsMonitorComp = false; bool tsMonitorComp = false;
// telem // telem
bool tsEnableTelem = false; bool tsEnableTelem = true;
int32_t tsTelemInterval = 86400; int32_t tsTelemInterval = 86400;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
uint16_t tsTelemPort = 80; uint16_t tsTelemPort = 80;
@ -166,7 +166,7 @@ int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60; int32_t tsGrantHBInterval = 60;
#ifndef _STORAGE #ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg) {
SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); SConfigItem *pItem = cfgGetItem(pCfg, "dataDir");
memset(tsDataDir, 0, PATH_MAX); memset(tsDataDir, 0, PATH_MAX);
@ -180,7 +180,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) {
uError("failed to create dataDir:%s", tsDataDir); uError("failed to create dataDir:%s", tsDataDir);
return -1; return -1;
} }
return 0; return 0;
} }
#else #else
int32_t taosSetTfsCfg(SConfig *pCfg); int32_t taosSetTfsCfg(SConfig *pCfg);

View File

@ -14,15 +14,22 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#ifdef USE_UV
#include <uv.h>
#endif
// clang-format off // clang-format off
#include <uv.h>
#include "zlib.h" #include "zlib.h"
#include "thttp.h" #include "thttp.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
typedef struct SHttpClient {
uv_connect_t conn;
uv_tcp_t tcp;
uv_write_t req;
uv_buf_t* buf;
char* addr;
uint16_t port;
} SHttpClient;
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
EHttpCompFlag flag) { EHttpCompFlag flag) {
if (flag == HTTP_FLAT) { if (flag == HTTP_FLAT) {
@ -45,7 +52,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH
} }
} }
int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
int32_t code = -1; int32_t code = -1;
int32_t destLen = srcLen; int32_t destLen = srcLen;
void* pDest = taosMemoryMalloc(destLen); void* pDest = taosMemoryMalloc(destLen);
@ -114,84 +121,53 @@ _OVER:
return code; return code;
} }
#ifdef USE_UV static void destroyHttpClient(SHttpClient* cli) {
static void clientConnCb(uv_connect_t* req, int32_t status) { taosMemoryFree(cli->buf);
if (status < 0) { taosMemoryFree(cli->addr);
taosMemoryFree(cli);
}
static void clientCloseCb(uv_handle_t* handle) {
SHttpClient* cli = handle->data;
destroyHttpClient(cli);
}
static void clientSentCb(uv_write_t* req, int32_t status) {
SHttpClient* cli = req->data;
if (status != 0) {
terrno = TAOS_SYSTEM_ERROR(status); terrno = TAOS_SYSTEM_ERROR(status);
uError("connection error %s", uv_strerror(status)); uError("http-report failed to send data %s", uv_strerror(status));
uv_close((uv_handle_t*)req->handle, NULL); } else {
uInfo("http-report succ to send data");
}
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
static void clientConnCb(uv_connect_t* req, int32_t status) {
SHttpClient* cli = req->data;
if (status != 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
return; return;
} }
uv_buf_t* wb = req->data; uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->buf, 2, clientSentCb);
assert(wb != NULL);
uv_write_t write_req;
uv_write(&write_req, req->handle, wb, 2, NULL);
uv_close((uv_handle_t*)req->handle, NULL);
} }
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
uint32_t ipv4 = taosGetIpv4FromFqdn(server);
if (ipv4 == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get http server:%s ip since %s", server, terrstr());
return -1;
}
char ipv4Buf[128] = {0};
tinet_ntoa(ipv4Buf, ipv4);
struct sockaddr_in dest = {0};
uv_ip4_addr(ipv4Buf, port, &dest);
uv_tcp_t socket_tcp = {0};
uv_loop_t* loop = uv_default_loop();
uv_tcp_init(loop, &socket_tcp);
uv_connect_t* connect = (uv_connect_t*)taosMemoryMalloc(sizeof(uv_connect_t));
if (flag == HTTP_GZIP) {
int32_t dstLen = taosCompressHttpRport(pCont, contLen);
if (dstLen > 0) {
contLen = dstLen;
} else {
flag = HTTP_FLAT;
}
}
char header[1024] = {0};
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
uv_buf_t wb[2];
wb[0] = uv_buf_init((char*)header, headLen);
wb[1] = uv_buf_init((char*)pCont, contLen);
connect->data = wb;
terrno = 0;
uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb);
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
taosMemoryFree(connect);
return terrno;
}
#else
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
int32_t code = -1;
TdSocketPtr pSocket = NULL;
uint32_t ip = taosGetIpv4FromFqdn(server); uint32_t ip = taosGetIpv4FromFqdn(server);
if (ip == 0xffffffff) { if (ip == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get http server:%s ip since %s", server, terrstr()); uError("http-report failed to get http server:%s ip since %s", server, terrstr());
goto SEND_OVER; return -1;
} }
char buf[128] = {0};
pSocket = taosOpenTcpClientSocket(ip, port, 0); tinet_ntoa(buf, ip);
if (pSocket == NULL) { uv_ip4_addr(buf, port, dest);
terrno = TAOS_SYSTEM_ERROR(errno); return 0;
uError("failed to create http socket to %s:%u since %s", server, port, terrstr()); }
goto SEND_OVER; int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
struct sockaddr_in dest = {0};
if (taosBuildDstAddr(server, port, &dest) < 0) {
return -1;
} }
if (flag == HTTP_GZIP) { if (flag == HTTP_GZIP) {
int32_t dstLen = taosCompressHttpRport(pCont, contLen); int32_t dstLen = taosCompressHttpRport(pCont, contLen);
if (dstLen > 0) { if (dstLen > 0) {
@ -200,36 +176,38 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
flag = HTTP_FLAT; flag = HTTP_FLAT;
} }
} }
terrno = 0;
char header[1024] = {0}; char header[2048] = {0};
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
if (taosWriteMsg(pSocket, header, headLen) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
uError("failed to send http header to %s:%u since %s", server, port, terrstr()); wb[0] = uv_buf_init((char*)header, headLen); // stack var
goto SEND_OVER; wb[1] = uv_buf_init((char*)pCont, contLen); // heap var
SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
cli->conn.data = cli;
cli->tcp.data = cli;
cli->req.data = cli;
cli->buf = wb;
cli->addr = tstrdup(server);
cli->port = port;
uv_loop_t* loop = uv_default_loop();
uv_tcp_init(loop, &cli->tcp);
// set up timeout to avoid stuck;
int32_t fd = taosCreateSocketWithTimeout(5);
uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb);
if (ret != 0) {
uError("http-report failed to connect to server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
destroyHttpClient(cli);
} }
if (taosWriteMsg(pSocket, (void*)pCont, contLen) < 0) { uv_run(loop, UV_RUN_DEFAULT);
terrno = TAOS_SYSTEM_ERROR(errno); uv_loop_close(loop);
uError("failed to send http content to %s:%u since %s", server, port, terrstr()); return terrno;
goto SEND_OVER;
}
// read something to avoid nginx error 499
if (taosWriteMsg(pSocket, header, 10) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to receive response from %s:%u since %s", server, port, terrstr());
goto SEND_OVER;
}
code = 0;
SEND_OVER:
if (pSocket != NULL) {
taosCloseSocket(&pSocket);
}
return code;
} }
// clang-format on // clang-format on
#endif