From c7e778f9a842e7ce5d7066aa129eda71d789b249 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 13:30:37 +0800 Subject: [PATCH 01/15] refactor code --- source/libs/transport/src/thttp.c | 164 +++++++++++++----------------- 1 file changed, 70 insertions(+), 94 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 62277a7569..52c692d22a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -14,14 +14,21 @@ */ #define _DEFAULT_SOURCE -#ifdef USE_UV -#include -#endif // clang-format off -#include "zlib.h" +#include #include "thttp.h" #include "taoserror.h" #include "tlog.h" +#include "zlib.h" + +typedef struct SHttpClient { + uv_connect_t conn; + uv_tcp_t tcp; + uv_write_t req; + uv_buf_t* buf; + char* addr; + uint16_t port; +} SHttpClient; static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { @@ -45,7 +52,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH } } -int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { +static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { int32_t code = -1; int32_t destLen = srcLen; void* pDest = taosMemoryMalloc(destLen); @@ -114,40 +121,53 @@ _OVER: return code; } -#ifdef USE_UV -static void clientConnCb(uv_connect_t* req, int32_t status) { - if (status < 0) { +static void destroyHttpClient(SHttpClient* cli) { + taosMemoryFree(cli->buf); + taosMemoryFree(cli->addr); + taosMemoryFree(cli); +} +static void clientCloseCb(uv_handle_t* handle) { + SHttpClient* cli = handle->data; + destroyHttpClient(cli); +} +static void clientSentCb(uv_write_t* req, int32_t status) { + SHttpClient* cli = req->data; + if (status != 0) { terrno = TAOS_SYSTEM_ERROR(status); - uError("connection error %s", uv_strerror(status)); - uv_close((uv_handle_t*)req->handle, NULL); + uError("http-report failed to send data %s", uv_strerror(status)); + } else { + uInfo("http-report succ to send data"); + } + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); +} +static void clientConnCb(uv_connect_t* req, int32_t status) { + SHttpClient* cli = req->data; + if (status != 0) { + terrno = TAOS_SYSTEM_ERROR(status); + uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); return; } - uv_buf_t* wb = req->data; - assert(wb != NULL); - uv_write_t write_req; - uv_write(&write_req, req->handle, wb, 2, NULL); - uv_close((uv_handle_t*)req->handle, NULL); + uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->buf, 2, clientSentCb); } -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { +static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { uint32_t ipv4 = taosGetIpv4FromFqdn(server); if (ipv4 == 0xffffffff) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to get http server:%s ip since %s", server, terrstr()); + uError("http-report failed to get http server:%s ip since %s", server, terrstr()); return -1; } - char ipv4Buf[128] = {0}; tinet_ntoa(ipv4Buf, ipv4); - + uv_ip4_addr(ipv4Buf, port, dest); + return 0; +} +int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { struct sockaddr_in dest = {0}; - uv_ip4_addr(ipv4Buf, port, &dest); - - uv_tcp_t socket_tcp = {0}; - uv_loop_t* loop = uv_default_loop(); - uv_tcp_init(loop, &socket_tcp); - uv_connect_t* connect = (uv_connect_t*)taosMemoryMalloc(sizeof(uv_connect_t)); - + if (taosBuildDstAddr(server, port, &dest) < 0) { + return -1; + } if (flag == HTTP_GZIP) { int32_t dstLen = taosCompressHttpRport(pCont, contLen); if (dstLen > 0) { @@ -156,81 +176,37 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 flag = HTTP_FLAT; } } + terrno = 0; char header[1024] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); - uv_buf_t wb[2]; - wb[0] = uv_buf_init((char*)header, headLen); - wb[1] = uv_buf_init((char*)pCont, contLen); + uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); + wb[0] = uv_buf_init((char*)header, headLen); // stack var + wb[1] = uv_buf_init((char*)pCont, contLen); // heap var + + SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); + cli->conn.data = cli; + cli->tcp.data = cli; + cli->req.data = cli; + cli->buf = wb; + cli->addr = tstrdup(server); + cli->port = port; + + uv_loop_t* loop = uv_default_loop(); + uv_tcp_init(loop, &cli->tcp); + // set up timeout to avoid stuck; + int32_t fd = taosCreateSocketWithTimeout(5); + uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + + int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); + if (ret != 0) { + uError("http-report failed to connect to server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + destroyHttpClient(cli); + } - connect->data = wb; - terrno = 0; - uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb); uv_run(loop, UV_RUN_DEFAULT); uv_loop_close(loop); - taosMemoryFree(connect); return terrno; } - -#else -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { - int32_t code = -1; - TdSocketPtr pSocket = NULL; - - uint32_t ip = taosGetIpv4FromFqdn(server); - if (ip == 0xffffffff) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to get http server:%s ip since %s", server, terrstr()); - goto SEND_OVER; - } - - pSocket = taosOpenTcpClientSocket(ip, port, 0); - if (pSocket == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create http socket to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - if (flag == HTTP_GZIP) { - int32_t dstLen = taosCompressHttpRport(pCont, contLen); - if (dstLen > 0) { - contLen = dstLen; - } else { - flag = HTTP_FLAT; - } - } - - char header[1024] = {0}; - int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); - if (taosWriteMsg(pSocket, header, headLen) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to send http header to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - if (taosWriteMsg(pSocket, (void*)pCont, contLen) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to send http content to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - // read something to avoid nginx error 499 - if (taosWriteMsg(pSocket, header, 10) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to receive response from %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - code = 0; - -SEND_OVER: - if (pSocket != NULL) { - taosCloseSocket(&pSocket); - } - - return code; -} - // clang-format on -#endif From 4e9ead32aaf3693fc46cf4d2a40d7102951a8046 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 15:56:14 +0800 Subject: [PATCH 02/15] refactor code --- source/libs/transport/src/thttp.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 52c692d22a..806a786b00 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -196,8 +196,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 uv_loop_t* loop = uv_default_loop(); uv_tcp_init(loop, &cli->tcp); // set up timeout to avoid stuck; - int32_t fd = taosCreateSocketWithTimeout(5); - uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + //int32_t fd = taosCreateSocketWithTimeout(5); + //uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); if (ret != 0) { From 9eb99615fe58b552ba0ad0222d9e8249b05eb99c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 17:58:46 +0800 Subject: [PATCH 03/15] fix invalid packet --- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/transComm.c | 13 +++++++++---- source/libs/transport/src/transSvr.c | 26 ++++++++++++++++++-------- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 6b52c74271..1efb0bb316 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -98,6 +98,7 @@ typedef void* queue[2]; #define TRANS_RETRY_INTERVAL 15 // retry interval (ms) #define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) +#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 4272ec0b1c..af49e5845b 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -112,15 +112,20 @@ int transClearBuffer(SConnBuffer* buf) { } int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { + static const int HEADSIZE = sizeof(STransMsgHead); + SConnBuffer* p = connBuf; if (p->left != 0) { return -1; } int total = connBuf->total; - *buf = taosMemoryCalloc(1, total); - memcpy(*buf, p->buf, total); - - transResetBuffer(connBuf); + if (total >= HEADSIZE) { + *buf = taosMemoryCalloc(1, total); + memcpy(*buf, p->buf, total); + transResetBuffer(connBuf); + } else { + total = -1; + } return total; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 3512b27bf8..a94eb01beb 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -184,10 +184,15 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { } static void uvHandleReq(SSvrConn* pConn) { - STransMsgHead* msg = NULL; - int msgLen = 0; + STrans* pTransInst = pConn->pTransInst; - msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + STransMsgHead* msg = NULL; + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + if (msgLen <= 0) { + tError("%s conn %p alread read complete packet", transLabel(pTransInst), pConn); + transUnrefSrvHandle(pConn); + return; + } STransMsgHead* pHead = (STransMsgHead*)msg; pHead->code = htonl(pHead->code); @@ -220,7 +225,6 @@ static void uvHandleReq(SSvrConn* pConn) { tDebug("conn %p acquired by server app", pConn); } } - STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pHead->traceId; if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); @@ -268,11 +272,17 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (nread > 0) { pBuf->len += nread; tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); - while (transReadComplete(pBuf)) { - tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - uvHandleReq(conn); + if (pBuf->len <= TRANS_PACKET_LIMIT) { + while (transReadComplete(pBuf)) { + tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); + uvHandleReq(conn); + } + return; + } else { + tError("%s conn %p read unexpected packet, exceed limit", transLabel(pTransInst), conn); + destroyConn(conn, true); + return; } - return; } if (nread == 0) { return; From e016c8f729c7d3677c66616257e2a15c856118d6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 20:04:04 +0800 Subject: [PATCH 04/15] fix invalid packet --- source/libs/transport/inc/transComm.h | 6 ++++++ source/libs/transport/src/transCli.c | 1 + source/libs/transport/src/transComm.c | 8 ++++++-- source/libs/transport/src/transSvr.c | 16 ++++++++++------ 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 1efb0bb316..bc1c6386f6 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -100,6 +100,10 @@ typedef void* queue[2]; #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_PACKET_LIMIT 1024 * 1024 * 512 +#define TRANS_MAGIC_NUM 0x5f375a86 + +#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) + typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; typedef SRpcCtxVal STransCtxVal; @@ -152,6 +156,7 @@ typedef struct { char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset char user[TSDB_UNI_LEN]; + uint32_t magicNum; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later @@ -204,6 +209,7 @@ typedef struct SConnBuffer { int cap; int left; int total; + int invalid; } SConnBuffer; typedef void (*AsyncCB)(uv_async_t* handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index add007e14d..8fdfcd5309 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -759,6 +759,7 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index af49e5845b..3ba8186e9d 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) { buf->left = -1; buf->len = 0; buf->total = 0; + buf->invalid = 0; return 0; } int transDestroyBuffer(SConnBuffer* p) { @@ -108,6 +109,7 @@ int transClearBuffer(SConnBuffer* buf) { p->left = -1; p->len = 0; p->total = 0; + p->invalid = 0; return 0; } @@ -119,7 +121,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { return -1; } int total = connBuf->total; - if (total >= HEADSIZE) { + if (total >= HEADSIZE && !p->invalid) { *buf = taosMemoryCalloc(1, total); memcpy(*buf, p->buf, total); transResetBuffer(connBuf); @@ -178,6 +180,7 @@ bool transReadComplete(SConnBuffer* connBuf) { memcpy((char*)&head, connBuf->buf, sizeof(head)); int32_t msgLen = (int32_t)htonl(head.msgLen); p->total = msgLen; + p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)); } if (p->total >= p->len) { p->left = p->total - p->len; @@ -185,7 +188,8 @@ bool transReadComplete(SConnBuffer* connBuf) { p->left = 0; } } - return p->left == 0 ? true : false; + + return (p->left == 0 || p->invalid) ? true : false; } int transSetConnOption(uv_tcp_t* stream) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a94eb01beb..be6eafba97 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -183,15 +183,14 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { tDebug("%p timeout since no activity", conn); } -static void uvHandleReq(SSvrConn* pConn) { +static bool uvHandleReq(SSvrConn* pConn) { STrans* pTransInst = pConn->pTransInst; STransMsgHead* msg = NULL; int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); if (msgLen <= 0) { - tError("%s conn %p alread read complete packet", transLabel(pTransInst), pConn); - transUnrefSrvHandle(pConn); - return; + tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); + return false; } STransMsgHead* pHead = (STransMsgHead*)msg; @@ -207,7 +206,7 @@ static void uvHandleReq(SSvrConn* pConn) { // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); if (uvRecvReleaseReq(pConn, pHead)) { - return; + return true; } STransMsg transMsg; @@ -262,6 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) { transReleaseExHandle(transGetRefMgt(), pConn->refId); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + return true; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -275,7 +275,10 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (pBuf->len <= TRANS_PACKET_LIMIT) { while (transReadComplete(pBuf)) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - uvHandleReq(conn); + if (uvHandleReq(conn) == false) { + destroyConn(conn, true); + break; + } } return; } else { @@ -374,6 +377,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); if (pConn->status == ConnNormal) { pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); From 4bc57e96ba4b2c4b8d0861543e84a9a0d86af8e6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 09:40:58 +0800 Subject: [PATCH 05/15] fix invalid packet --- source/libs/transport/src/thttp.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 806a786b00..76a7611c2e 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -16,10 +16,10 @@ #define _DEFAULT_SOURCE // clang-format off #include +#include "zlib.h" #include "thttp.h" #include "taoserror.h" #include "tlog.h" -#include "zlib.h" typedef struct SHttpClient { uv_connect_t conn; @@ -196,8 +196,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 uv_loop_t* loop = uv_default_loop(); uv_tcp_init(loop, &cli->tcp); // set up timeout to avoid stuck; - //int32_t fd = taosCreateSocketWithTimeout(5); - //uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + int32_t fd = taosCreateSocketWithTimeout(5); + uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); if (ret != 0) { From 4dc5df8453da96264097d857e88211212aa68197 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 10:34:55 +0800 Subject: [PATCH 06/15] fix invalid packet --- source/libs/transport/src/transSvr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index be6eafba97..10063df329 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -277,7 +277,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); if (uvHandleReq(conn) == false) { destroyConn(conn, true); - break; + return; } } return; From 1a449a51a1215c428e818d54e506487e194de966 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 12:05:56 +0800 Subject: [PATCH 07/15] fix invalid packet --- source/libs/transport/src/thttp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 76a7611c2e..b902fbd13a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -178,7 +178,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 } terrno = 0; - char header[1024] = {0}; + char header[2048] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); From f3ec1593bcd85eadff34d141e4825e58314c3b86 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 13:55:56 +0800 Subject: [PATCH 08/15] fix invalid packet --- source/libs/transport/src/transSvr.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 10063df329..753ea00a45 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -265,10 +265,10 @@ static bool uvHandleReq(SSvrConn* pConn) { } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { - // opt - SSvrConn* conn = cli->data; + SSvrConn* conn = cli->data; + STrans* pTransInst = conn->pTransInst; + SConnBuffer* pBuf = &conn->readBuf; - STrans* pTransInst = conn->pTransInst; if (nread > 0) { pBuf->len += nread; tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); From e1ae5bc7f8884898e96c0a661614facf6cd5f350 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 13:59:23 +0800 Subject: [PATCH 09/15] fix invalid packet --- source/common/src/tglobal.c | 6 +++--- source/libs/transport/src/thttp.c | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c763bbed9c..adc5af1a17 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -75,7 +75,7 @@ int32_t tsMonitorMaxLogs = 100; bool tsMonitorComp = false; // telem -bool tsEnableTelem = false; +bool tsEnableTelem = true; int32_t tsTelemInterval = 86400; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; uint16_t tsTelemPort = 80; @@ -166,7 +166,7 @@ int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; #ifndef _STORAGE -int32_t taosSetTfsCfg(SConfig *pCfg) { +int32_t taosSetTfsCfg(SConfig *pCfg) { SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); memset(tsDataDir, 0, PATH_MAX); @@ -180,7 +180,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) { uError("failed to create dataDir:%s", tsDataDir); return -1; } - return 0; + return 0; } #else int32_t taosSetTfsCfg(SConfig *pCfg); diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index b902fbd13a..2b5c56965f 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -152,15 +152,15 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { } static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { - uint32_t ipv4 = taosGetIpv4FromFqdn(server); - if (ipv4 == 0xffffffff) { + uint32_t ip = taosGetIpv4FromFqdn(server); + if (ip == 0xffffffff) { terrno = TAOS_SYSTEM_ERROR(errno); uError("http-report failed to get http server:%s ip since %s", server, terrstr()); return -1; } - char ipv4Buf[128] = {0}; - tinet_ntoa(ipv4Buf, ipv4); - uv_ip4_addr(ipv4Buf, port, dest); + char buf[128] = {0}; + tinet_ntoa(buf, ip); + uv_ip4_addr(buf, port, dest); return 0; } int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { From 15110480bc3839889c851b45353f678b2b0ad3bf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 14:22:07 +0800 Subject: [PATCH 10/15] fix invalid packet --- source/libs/transport/src/transSvr.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 753ea00a45..c980b70abd 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -204,7 +204,6 @@ static bool uvHandleReq(SSvrConn* pConn) { // uv_read_stop((uv_stream_t*)pConn->pTcp); // transRefSrvHandle(pConn); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - if (uvRecvReleaseReq(pConn, pHead)) { return true; } From 9ef2d736897bb0c0feb81980cb259a31a43ef957 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 18 Aug 2022 16:06:49 +0800 Subject: [PATCH 11/15] fix(query): fix elapsed order by ts desc result inconsist after flush to disk TD-18453 --- source/libs/function/src/builtinsimpl.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5aeaecd598..bf4a07f8e2 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3970,16 +3970,16 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0); if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->start.key == INT64_MIN) { - pInfo->max = - (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max; + pInfo->max = (pInfo->max < ptsList[start]) ? ptsList[start] : pInfo->max; } else { pInfo->max = pCtx->start.key + 1; } - if (pCtx->end.key != INT64_MIN) { - pInfo->min = pCtx->end.key; + if (pCtx->end.key == INT64_MIN) { + pInfo->min = (pInfo->min > ptsList[start + pInput->numOfRows - 1]) ? + ptsList[start + pInput->numOfRows - 1] : pInfo->min; } else { - pInfo->min = ptsList[start]; + pInfo->min = pCtx->end.key; } } else { if (pCtx->start.key == INT64_MIN) { @@ -3988,10 +3988,11 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { pInfo->min = pCtx->start.key; } - if (pCtx->end.key != INT64_MIN) { - pInfo->max = pCtx->end.key + 1; + if (pCtx->end.key == INT64_MIN) { + pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? + ptsList[start + pInput->numOfRows - 1] : pInfo->max; } else { - pInfo->max = ptsList[start + pInput->numOfRows - 1]; + pInfo->max = pCtx->end.key + 1; } } } From 13a93d68ebdddc3d27ce39fa1962586f044b70aa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 16:09:46 +0800 Subject: [PATCH 12/15] fix invalid packet --- source/libs/transport/src/thttp.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 62277a7569..dc0f9128b0 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -221,7 +221,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 uError("failed to receive response from %s:%u since %s", server, port, terrstr()); goto SEND_OVER; } - code = 0; SEND_OVER: From 7c2d6571f043680ad32bea7d477a741c27e7f716 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 18 Aug 2022 16:30:54 +0800 Subject: [PATCH 13/15] refactor(sync): adjust SYNC_MAX_RECV_TIME_RANGE_MS --- include/libs/sync/sync.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 790cbf906d..e6a4dd1d49 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -33,7 +33,7 @@ extern bool gRaftDetailLog; #define SYNC_MAX_READ_RANGE 2 #define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) -#define SYNC_MAX_RECV_TIME_RANGE_MS 1000 +#define SYNC_MAX_RECV_TIME_RANGE_MS 1200 #define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_MAX_BATCH_SIZE 1 From 0bfadd871931dc18f90deb6b843e8ad35feb9a14 Mon Sep 17 00:00:00 2001 From: BoDing Date: Thu, 18 Aug 2022 18:22:17 +0800 Subject: [PATCH 14/15] fix broken link to taosx --- docs/zh/17-operation/03-tolerance.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/17-operation/03-tolerance.md b/docs/zh/17-operation/03-tolerance.md index 2cfd4b6484..6ee6aa6373 100644 --- a/docs/zh/17-operation/03-tolerance.md +++ b/docs/zh/17-operation/03-tolerance.md @@ -27,4 +27,4 @@ TDengine 集群的节点数必须大于等于副本数,否则创建表时将 当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。 -另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosX) +另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosx) From 98fd9aca109e9d791e87dd05f9a72d6b0c22bbf6 Mon Sep 17 00:00:00 2001 From: BoDing Date: Thu, 18 Aug 2022 18:25:01 +0800 Subject: [PATCH 15/15] remove link to taosx --- docs/zh/17-operation/03-tolerance.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/zh/17-operation/03-tolerance.md b/docs/zh/17-operation/03-tolerance.md index 6ee6aa6373..1ce485b042 100644 --- a/docs/zh/17-operation/03-tolerance.md +++ b/docs/zh/17-operation/03-tolerance.md @@ -26,5 +26,3 @@ TDengine 集群中的时序数据的副本数是与数据库关联的,一个 TDengine 集群的节点数必须大于等于副本数,否则创建表时将报错。 当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。 - -另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosx)