From c7e778f9a842e7ce5d7066aa129eda71d789b249 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 13:30:37 +0800 Subject: [PATCH 1/8] refactor code --- source/libs/transport/src/thttp.c | 164 +++++++++++++----------------- 1 file changed, 70 insertions(+), 94 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 62277a7569..52c692d22a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -14,14 +14,21 @@ */ #define _DEFAULT_SOURCE -#ifdef USE_UV -#include -#endif // clang-format off -#include "zlib.h" +#include #include "thttp.h" #include "taoserror.h" #include "tlog.h" +#include "zlib.h" + +typedef struct SHttpClient { + uv_connect_t conn; + uv_tcp_t tcp; + uv_write_t req; + uv_buf_t* buf; + char* addr; + uint16_t port; +} SHttpClient; static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { @@ -45,7 +52,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH } } -int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { +static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { int32_t code = -1; int32_t destLen = srcLen; void* pDest = taosMemoryMalloc(destLen); @@ -114,40 +121,53 @@ _OVER: return code; } -#ifdef USE_UV -static void clientConnCb(uv_connect_t* req, int32_t status) { - if (status < 0) { +static void destroyHttpClient(SHttpClient* cli) { + taosMemoryFree(cli->buf); + taosMemoryFree(cli->addr); + taosMemoryFree(cli); +} +static void clientCloseCb(uv_handle_t* handle) { + SHttpClient* cli = handle->data; + destroyHttpClient(cli); +} +static void clientSentCb(uv_write_t* req, int32_t status) { + SHttpClient* cli = req->data; + if (status != 0) { terrno = TAOS_SYSTEM_ERROR(status); - uError("connection error %s", uv_strerror(status)); - uv_close((uv_handle_t*)req->handle, NULL); + uError("http-report failed to send data %s", uv_strerror(status)); + } else { + uInfo("http-report succ to send data"); + } + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); +} +static void clientConnCb(uv_connect_t* req, int32_t status) { + SHttpClient* cli = req->data; + if (status != 0) { + terrno = TAOS_SYSTEM_ERROR(status); + uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); return; } - uv_buf_t* wb = req->data; - assert(wb != NULL); - uv_write_t write_req; - uv_write(&write_req, req->handle, wb, 2, NULL); - uv_close((uv_handle_t*)req->handle, NULL); + uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->buf, 2, clientSentCb); } -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { +static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { uint32_t ipv4 = taosGetIpv4FromFqdn(server); if (ipv4 == 0xffffffff) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to get http server:%s ip since %s", server, terrstr()); + uError("http-report failed to get http server:%s ip since %s", server, terrstr()); return -1; } - char ipv4Buf[128] = {0}; tinet_ntoa(ipv4Buf, ipv4); - + uv_ip4_addr(ipv4Buf, port, dest); + return 0; +} +int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { struct sockaddr_in dest = {0}; - uv_ip4_addr(ipv4Buf, port, &dest); - - uv_tcp_t socket_tcp = {0}; - uv_loop_t* loop = uv_default_loop(); - uv_tcp_init(loop, &socket_tcp); - uv_connect_t* connect = (uv_connect_t*)taosMemoryMalloc(sizeof(uv_connect_t)); - + if (taosBuildDstAddr(server, port, &dest) < 0) { + return -1; + } if (flag == HTTP_GZIP) { int32_t dstLen = taosCompressHttpRport(pCont, contLen); if (dstLen > 0) { @@ -156,81 +176,37 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 flag = HTTP_FLAT; } } + terrno = 0; char header[1024] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); - uv_buf_t wb[2]; - wb[0] = uv_buf_init((char*)header, headLen); - wb[1] = uv_buf_init((char*)pCont, contLen); + uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); + wb[0] = uv_buf_init((char*)header, headLen); // stack var + wb[1] = uv_buf_init((char*)pCont, contLen); // heap var + + SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); + cli->conn.data = cli; + cli->tcp.data = cli; + cli->req.data = cli; + cli->buf = wb; + cli->addr = tstrdup(server); + cli->port = port; + + uv_loop_t* loop = uv_default_loop(); + uv_tcp_init(loop, &cli->tcp); + // set up timeout to avoid stuck; + int32_t fd = taosCreateSocketWithTimeout(5); + uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + + int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); + if (ret != 0) { + uError("http-report failed to connect to server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + destroyHttpClient(cli); + } - connect->data = wb; - terrno = 0; - uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb); uv_run(loop, UV_RUN_DEFAULT); uv_loop_close(loop); - taosMemoryFree(connect); return terrno; } - -#else -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { - int32_t code = -1; - TdSocketPtr pSocket = NULL; - - uint32_t ip = taosGetIpv4FromFqdn(server); - if (ip == 0xffffffff) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to get http server:%s ip since %s", server, terrstr()); - goto SEND_OVER; - } - - pSocket = taosOpenTcpClientSocket(ip, port, 0); - if (pSocket == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create http socket to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - if (flag == HTTP_GZIP) { - int32_t dstLen = taosCompressHttpRport(pCont, contLen); - if (dstLen > 0) { - contLen = dstLen; - } else { - flag = HTTP_FLAT; - } - } - - char header[1024] = {0}; - int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); - if (taosWriteMsg(pSocket, header, headLen) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to send http header to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - if (taosWriteMsg(pSocket, (void*)pCont, contLen) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to send http content to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - // read something to avoid nginx error 499 - if (taosWriteMsg(pSocket, header, 10) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to receive response from %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - code = 0; - -SEND_OVER: - if (pSocket != NULL) { - taosCloseSocket(&pSocket); - } - - return code; -} - // clang-format on -#endif From 4e9ead32aaf3693fc46cf4d2a40d7102951a8046 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 15:56:14 +0800 Subject: [PATCH 2/8] refactor code --- source/libs/transport/src/thttp.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 52c692d22a..806a786b00 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -196,8 +196,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 uv_loop_t* loop = uv_default_loop(); uv_tcp_init(loop, &cli->tcp); // set up timeout to avoid stuck; - int32_t fd = taosCreateSocketWithTimeout(5); - uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + //int32_t fd = taosCreateSocketWithTimeout(5); + //uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); if (ret != 0) { From 4bc57e96ba4b2c4b8d0861543e84a9a0d86af8e6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 09:40:58 +0800 Subject: [PATCH 3/8] fix invalid packet --- source/libs/transport/src/thttp.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 806a786b00..76a7611c2e 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -16,10 +16,10 @@ #define _DEFAULT_SOURCE // clang-format off #include +#include "zlib.h" #include "thttp.h" #include "taoserror.h" #include "tlog.h" -#include "zlib.h" typedef struct SHttpClient { uv_connect_t conn; @@ -196,8 +196,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 uv_loop_t* loop = uv_default_loop(); uv_tcp_init(loop, &cli->tcp); // set up timeout to avoid stuck; - //int32_t fd = taosCreateSocketWithTimeout(5); - //uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + int32_t fd = taosCreateSocketWithTimeout(5); + uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); if (ret != 0) { From 1a449a51a1215c428e818d54e506487e194de966 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 12:05:56 +0800 Subject: [PATCH 4/8] fix invalid packet --- source/libs/transport/src/thttp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 76a7611c2e..b902fbd13a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -178,7 +178,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 } terrno = 0; - char header[1024] = {0}; + char header[2048] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); From e1ae5bc7f8884898e96c0a661614facf6cd5f350 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 13:59:23 +0800 Subject: [PATCH 5/8] fix invalid packet --- source/common/src/tglobal.c | 6 +++--- source/libs/transport/src/thttp.c | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c763bbed9c..adc5af1a17 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -75,7 +75,7 @@ int32_t tsMonitorMaxLogs = 100; bool tsMonitorComp = false; // telem -bool tsEnableTelem = false; +bool tsEnableTelem = true; int32_t tsTelemInterval = 86400; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; uint16_t tsTelemPort = 80; @@ -166,7 +166,7 @@ int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; #ifndef _STORAGE -int32_t taosSetTfsCfg(SConfig *pCfg) { +int32_t taosSetTfsCfg(SConfig *pCfg) { SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); memset(tsDataDir, 0, PATH_MAX); @@ -180,7 +180,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) { uError("failed to create dataDir:%s", tsDataDir); return -1; } - return 0; + return 0; } #else int32_t taosSetTfsCfg(SConfig *pCfg); diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index b902fbd13a..2b5c56965f 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -152,15 +152,15 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { } static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { - uint32_t ipv4 = taosGetIpv4FromFqdn(server); - if (ipv4 == 0xffffffff) { + uint32_t ip = taosGetIpv4FromFqdn(server); + if (ip == 0xffffffff) { terrno = TAOS_SYSTEM_ERROR(errno); uError("http-report failed to get http server:%s ip since %s", server, terrstr()); return -1; } - char ipv4Buf[128] = {0}; - tinet_ntoa(ipv4Buf, ipv4); - uv_ip4_addr(ipv4Buf, port, dest); + char buf[128] = {0}; + tinet_ntoa(buf, ip); + uv_ip4_addr(buf, port, dest); return 0; } int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { From 9ef2d736897bb0c0feb81980cb259a31a43ef957 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 18 Aug 2022 16:06:49 +0800 Subject: [PATCH 6/8] fix(query): fix elapsed order by ts desc result inconsist after flush to disk TD-18453 --- source/libs/function/src/builtinsimpl.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5aeaecd598..bf4a07f8e2 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3970,16 +3970,16 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0); if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->start.key == INT64_MIN) { - pInfo->max = - (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max; + pInfo->max = (pInfo->max < ptsList[start]) ? ptsList[start] : pInfo->max; } else { pInfo->max = pCtx->start.key + 1; } - if (pCtx->end.key != INT64_MIN) { - pInfo->min = pCtx->end.key; + if (pCtx->end.key == INT64_MIN) { + pInfo->min = (pInfo->min > ptsList[start + pInput->numOfRows - 1]) ? + ptsList[start + pInput->numOfRows - 1] : pInfo->min; } else { - pInfo->min = ptsList[start]; + pInfo->min = pCtx->end.key; } } else { if (pCtx->start.key == INT64_MIN) { @@ -3988,10 +3988,11 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { pInfo->min = pCtx->start.key; } - if (pCtx->end.key != INT64_MIN) { - pInfo->max = pCtx->end.key + 1; + if (pCtx->end.key == INT64_MIN) { + pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? + ptsList[start + pInput->numOfRows - 1] : pInfo->max; } else { - pInfo->max = ptsList[start + pInput->numOfRows - 1]; + pInfo->max = pCtx->end.key + 1; } } } From 0bfadd871931dc18f90deb6b843e8ad35feb9a14 Mon Sep 17 00:00:00 2001 From: BoDing Date: Thu, 18 Aug 2022 18:22:17 +0800 Subject: [PATCH 7/8] fix broken link to taosx --- docs/zh/17-operation/03-tolerance.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/17-operation/03-tolerance.md b/docs/zh/17-operation/03-tolerance.md index 2cfd4b6484..6ee6aa6373 100644 --- a/docs/zh/17-operation/03-tolerance.md +++ b/docs/zh/17-operation/03-tolerance.md @@ -27,4 +27,4 @@ TDengine 集群的节点数必须大于等于副本数,否则创建表时将 当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。 -另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosX) +另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosx) From 98fd9aca109e9d791e87dd05f9a72d6b0c22bbf6 Mon Sep 17 00:00:00 2001 From: BoDing Date: Thu, 18 Aug 2022 18:25:01 +0800 Subject: [PATCH 8/8] remove link to taosx --- docs/zh/17-operation/03-tolerance.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/zh/17-operation/03-tolerance.md b/docs/zh/17-operation/03-tolerance.md index 6ee6aa6373..1ce485b042 100644 --- a/docs/zh/17-operation/03-tolerance.md +++ b/docs/zh/17-operation/03-tolerance.md @@ -26,5 +26,3 @@ TDengine 集群中的时序数据的副本数是与数据库关联的,一个 TDengine 集群的节点数必须大于等于副本数,否则创建表时将报错。 当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。 - -另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosx)