From b888c90054fb5f82479f93c6fafe9b577f8598e1 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 17 Aug 2022 10:38:40 +0800 Subject: [PATCH 01/29] fix: change the interval data load status when scan --- source/libs/executor/src/scanoperator.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 454a0b0070..12df378a42 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -139,7 +139,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn } assert(w.ekey > pBlockInfo->window.ekey); - if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) { + if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) { return true; } } @@ -147,7 +147,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey); assert(w.skey <= pBlockInfo->window.ekey); - if (w.skey > pBlockInfo->window.skey) { + if (TMAX(w.skey, pBlockInfo->window.skey) <= TMIN(w.ekey, pBlockInfo->window.ekey)) { return true; } @@ -158,7 +158,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn } assert(w.skey < pBlockInfo->window.skey); - if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) { + if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) { return true; } } From c7e778f9a842e7ce5d7066aa129eda71d789b249 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 13:30:37 +0800 Subject: [PATCH 02/29] refactor code --- source/libs/transport/src/thttp.c | 164 +++++++++++++----------------- 1 file changed, 70 insertions(+), 94 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 62277a7569..52c692d22a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -14,14 +14,21 @@ */ #define _DEFAULT_SOURCE -#ifdef USE_UV -#include -#endif // clang-format off -#include "zlib.h" +#include #include "thttp.h" #include "taoserror.h" #include "tlog.h" +#include "zlib.h" + +typedef struct SHttpClient { + uv_connect_t conn; + uv_tcp_t tcp; + uv_write_t req; + uv_buf_t* buf; + char* addr; + uint16_t port; +} SHttpClient; static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { @@ -45,7 +52,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH } } -int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { +static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { int32_t code = -1; int32_t destLen = srcLen; void* pDest = taosMemoryMalloc(destLen); @@ -114,40 +121,53 @@ _OVER: return code; } -#ifdef USE_UV -static void clientConnCb(uv_connect_t* req, int32_t status) { - if (status < 0) { +static void destroyHttpClient(SHttpClient* cli) { + taosMemoryFree(cli->buf); + taosMemoryFree(cli->addr); + taosMemoryFree(cli); +} +static void clientCloseCb(uv_handle_t* handle) { + SHttpClient* cli = handle->data; + destroyHttpClient(cli); +} +static void clientSentCb(uv_write_t* req, int32_t status) { + SHttpClient* cli = req->data; + if (status != 0) { terrno = TAOS_SYSTEM_ERROR(status); - uError("connection error %s", uv_strerror(status)); - uv_close((uv_handle_t*)req->handle, NULL); + uError("http-report failed to send data %s", uv_strerror(status)); + } else { + uInfo("http-report succ to send data"); + } + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); +} +static void clientConnCb(uv_connect_t* req, int32_t status) { + SHttpClient* cli = req->data; + if (status != 0) { + terrno = TAOS_SYSTEM_ERROR(status); + uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); return; } - uv_buf_t* wb = req->data; - assert(wb != NULL); - uv_write_t write_req; - uv_write(&write_req, req->handle, wb, 2, NULL); - uv_close((uv_handle_t*)req->handle, NULL); + uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->buf, 2, clientSentCb); } -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { +static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { uint32_t ipv4 = taosGetIpv4FromFqdn(server); if (ipv4 == 0xffffffff) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to get http server:%s ip since %s", server, terrstr()); + uError("http-report failed to get http server:%s ip since %s", server, terrstr()); return -1; } - char ipv4Buf[128] = {0}; tinet_ntoa(ipv4Buf, ipv4); - + uv_ip4_addr(ipv4Buf, port, dest); + return 0; +} +int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { struct sockaddr_in dest = {0}; - uv_ip4_addr(ipv4Buf, port, &dest); - - uv_tcp_t socket_tcp = {0}; - uv_loop_t* loop = uv_default_loop(); - uv_tcp_init(loop, &socket_tcp); - uv_connect_t* connect = (uv_connect_t*)taosMemoryMalloc(sizeof(uv_connect_t)); - + if (taosBuildDstAddr(server, port, &dest) < 0) { + return -1; + } if (flag == HTTP_GZIP) { int32_t dstLen = taosCompressHttpRport(pCont, contLen); if (dstLen > 0) { @@ -156,81 +176,37 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 flag = HTTP_FLAT; } } + terrno = 0; char header[1024] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); - uv_buf_t wb[2]; - wb[0] = uv_buf_init((char*)header, headLen); - wb[1] = uv_buf_init((char*)pCont, contLen); + uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); + wb[0] = uv_buf_init((char*)header, headLen); // stack var + wb[1] = uv_buf_init((char*)pCont, contLen); // heap var + + SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); + cli->conn.data = cli; + cli->tcp.data = cli; + cli->req.data = cli; + cli->buf = wb; + cli->addr = tstrdup(server); + cli->port = port; + + uv_loop_t* loop = uv_default_loop(); + uv_tcp_init(loop, &cli->tcp); + // set up timeout to avoid stuck; + int32_t fd = taosCreateSocketWithTimeout(5); + uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + + int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); + if (ret != 0) { + uError("http-report failed to connect to server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + destroyHttpClient(cli); + } - connect->data = wb; - terrno = 0; - uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb); uv_run(loop, UV_RUN_DEFAULT); uv_loop_close(loop); - taosMemoryFree(connect); return terrno; } - -#else -int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { - int32_t code = -1; - TdSocketPtr pSocket = NULL; - - uint32_t ip = taosGetIpv4FromFqdn(server); - if (ip == 0xffffffff) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to get http server:%s ip since %s", server, terrstr()); - goto SEND_OVER; - } - - pSocket = taosOpenTcpClientSocket(ip, port, 0); - if (pSocket == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create http socket to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - if (flag == HTTP_GZIP) { - int32_t dstLen = taosCompressHttpRport(pCont, contLen); - if (dstLen > 0) { - contLen = dstLen; - } else { - flag = HTTP_FLAT; - } - } - - char header[1024] = {0}; - int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); - if (taosWriteMsg(pSocket, header, headLen) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to send http header to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - if (taosWriteMsg(pSocket, (void*)pCont, contLen) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to send http content to %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - // read something to avoid nginx error 499 - if (taosWriteMsg(pSocket, header, 10) < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to receive response from %s:%u since %s", server, port, terrstr()); - goto SEND_OVER; - } - - code = 0; - -SEND_OVER: - if (pSocket != NULL) { - taosCloseSocket(&pSocket); - } - - return code; -} - // clang-format on -#endif From 4e9ead32aaf3693fc46cf4d2a40d7102951a8046 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 15:56:14 +0800 Subject: [PATCH 03/29] refactor code --- source/libs/transport/src/thttp.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 52c692d22a..806a786b00 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -196,8 +196,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 uv_loop_t* loop = uv_default_loop(); uv_tcp_init(loop, &cli->tcp); // set up timeout to avoid stuck; - int32_t fd = taosCreateSocketWithTimeout(5); - uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + //int32_t fd = taosCreateSocketWithTimeout(5); + //uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); if (ret != 0) { From 9eb99615fe58b552ba0ad0222d9e8249b05eb99c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 17:58:46 +0800 Subject: [PATCH 04/29] fix invalid packet --- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/transComm.c | 13 +++++++++---- source/libs/transport/src/transSvr.c | 26 ++++++++++++++++++-------- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 6b52c74271..1efb0bb316 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -98,6 +98,7 @@ typedef void* queue[2]; #define TRANS_RETRY_INTERVAL 15 // retry interval (ms) #define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) +#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 4272ec0b1c..af49e5845b 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -112,15 +112,20 @@ int transClearBuffer(SConnBuffer* buf) { } int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { + static const int HEADSIZE = sizeof(STransMsgHead); + SConnBuffer* p = connBuf; if (p->left != 0) { return -1; } int total = connBuf->total; - *buf = taosMemoryCalloc(1, total); - memcpy(*buf, p->buf, total); - - transResetBuffer(connBuf); + if (total >= HEADSIZE) { + *buf = taosMemoryCalloc(1, total); + memcpy(*buf, p->buf, total); + transResetBuffer(connBuf); + } else { + total = -1; + } return total; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 3512b27bf8..a94eb01beb 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -184,10 +184,15 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { } static void uvHandleReq(SSvrConn* pConn) { - STransMsgHead* msg = NULL; - int msgLen = 0; + STrans* pTransInst = pConn->pTransInst; - msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + STransMsgHead* msg = NULL; + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + if (msgLen <= 0) { + tError("%s conn %p alread read complete packet", transLabel(pTransInst), pConn); + transUnrefSrvHandle(pConn); + return; + } STransMsgHead* pHead = (STransMsgHead*)msg; pHead->code = htonl(pHead->code); @@ -220,7 +225,6 @@ static void uvHandleReq(SSvrConn* pConn) { tDebug("conn %p acquired by server app", pConn); } } - STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pHead->traceId; if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); @@ -268,11 +272,17 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (nread > 0) { pBuf->len += nread; tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); - while (transReadComplete(pBuf)) { - tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - uvHandleReq(conn); + if (pBuf->len <= TRANS_PACKET_LIMIT) { + while (transReadComplete(pBuf)) { + tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); + uvHandleReq(conn); + } + return; + } else { + tError("%s conn %p read unexpected packet, exceed limit", transLabel(pTransInst), conn); + destroyConn(conn, true); + return; } - return; } if (nread == 0) { return; From e016c8f729c7d3677c66616257e2a15c856118d6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 20:04:04 +0800 Subject: [PATCH 05/29] fix invalid packet --- source/libs/transport/inc/transComm.h | 6 ++++++ source/libs/transport/src/transCli.c | 1 + source/libs/transport/src/transComm.c | 8 ++++++-- source/libs/transport/src/transSvr.c | 16 ++++++++++------ 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 1efb0bb316..bc1c6386f6 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -100,6 +100,10 @@ typedef void* queue[2]; #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_PACKET_LIMIT 1024 * 1024 * 512 +#define TRANS_MAGIC_NUM 0x5f375a86 + +#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) + typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; typedef SRpcCtxVal STransCtxVal; @@ -152,6 +156,7 @@ typedef struct { char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset char user[TSDB_UNI_LEN]; + uint32_t magicNum; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later @@ -204,6 +209,7 @@ typedef struct SConnBuffer { int cap; int left; int total; + int invalid; } SConnBuffer; typedef void (*AsyncCB)(uv_async_t* handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index add007e14d..8fdfcd5309 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -759,6 +759,7 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index af49e5845b..3ba8186e9d 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) { buf->left = -1; buf->len = 0; buf->total = 0; + buf->invalid = 0; return 0; } int transDestroyBuffer(SConnBuffer* p) { @@ -108,6 +109,7 @@ int transClearBuffer(SConnBuffer* buf) { p->left = -1; p->len = 0; p->total = 0; + p->invalid = 0; return 0; } @@ -119,7 +121,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { return -1; } int total = connBuf->total; - if (total >= HEADSIZE) { + if (total >= HEADSIZE && !p->invalid) { *buf = taosMemoryCalloc(1, total); memcpy(*buf, p->buf, total); transResetBuffer(connBuf); @@ -178,6 +180,7 @@ bool transReadComplete(SConnBuffer* connBuf) { memcpy((char*)&head, connBuf->buf, sizeof(head)); int32_t msgLen = (int32_t)htonl(head.msgLen); p->total = msgLen; + p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)); } if (p->total >= p->len) { p->left = p->total - p->len; @@ -185,7 +188,8 @@ bool transReadComplete(SConnBuffer* connBuf) { p->left = 0; } } - return p->left == 0 ? true : false; + + return (p->left == 0 || p->invalid) ? true : false; } int transSetConnOption(uv_tcp_t* stream) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a94eb01beb..be6eafba97 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -183,15 +183,14 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { tDebug("%p timeout since no activity", conn); } -static void uvHandleReq(SSvrConn* pConn) { +static bool uvHandleReq(SSvrConn* pConn) { STrans* pTransInst = pConn->pTransInst; STransMsgHead* msg = NULL; int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); if (msgLen <= 0) { - tError("%s conn %p alread read complete packet", transLabel(pTransInst), pConn); - transUnrefSrvHandle(pConn); - return; + tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); + return false; } STransMsgHead* pHead = (STransMsgHead*)msg; @@ -207,7 +206,7 @@ static void uvHandleReq(SSvrConn* pConn) { // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); if (uvRecvReleaseReq(pConn, pHead)) { - return; + return true; } STransMsg transMsg; @@ -262,6 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) { transReleaseExHandle(transGetRefMgt(), pConn->refId); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + return true; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -275,7 +275,10 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (pBuf->len <= TRANS_PACKET_LIMIT) { while (transReadComplete(pBuf)) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - uvHandleReq(conn); + if (uvHandleReq(conn) == false) { + destroyConn(conn, true); + break; + } } return; } else { @@ -374,6 +377,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); if (pConn->status == ConnNormal) { pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); From c6f60ba3d78e92ef494c5d1129e998a40f3f4d2d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 20:49:59 +0800 Subject: [PATCH 06/29] fix invalid packet --- source/libs/transport/inc/transComm.h | 8 +++ source/libs/transport/src/transCli.c | 66 +++++++++++------- source/libs/transport/src/transComm.c | 19 +++-- source/libs/transport/src/transSvr.c | 99 +++++++++++++++------------ 4 files changed, 119 insertions(+), 73 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 04b58da570..117455f722 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -99,6 +99,12 @@ typedef void* queue[2]; #define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) +#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 + +#define TRANS_MAGIC_NUM 0x5f375a86 + +#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) + typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; typedef SRpcCtxVal STransCtxVal; @@ -151,6 +157,7 @@ typedef struct { char hasEpSet : 2; // contain epset or not, 0(default): no epset, 1: contain epset char user[TSDB_UNI_LEN]; + uint32_t magicNum; STraceId traceId; uint64_t ahandle; // ahandle assigned by client uint32_t code; // del later @@ -203,6 +210,7 @@ typedef struct SConnBuffer { int cap; int left; int total; + int invalid; } SConnBuffer; typedef void (*AsyncCB)(uv_async_t* handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9eea43be23..5428b8acf6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -127,6 +127,8 @@ static void cliAsyncCb(uv_async_t* handle); static void cliIdleCb(uv_idle_t* handle); static void cliPrepareCb(uv_prepare_t* handle); +static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); + static int32_t allocConnRef(SCliConn* conn, bool update); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); @@ -211,28 +213,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - uint64_t ahandle = head->ahandle; \ - CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \ - SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \ - if (cliMsg->type == Release) return; \ - } \ - tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); \ - if (T_REF_VAL_GET(conn) > 1) { \ - transUnrefCliHandle(conn); \ - } \ - destroyCmsg(pMsg); \ - cliReleaseUnfinishedMsg(conn); \ - transQueueClear(&conn->cliMsgs); \ - addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \ - return; \ - } \ - } while (0) #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ do { \ @@ -346,10 +326,17 @@ void cliHandleResp(SCliConn* conn) { } STransMsgHead* pHead = NULL; - transDumpFromBuffer(&conn->readBuf, (char**)&pHead); + if (transDumpFromBuffer(&conn->readBuf, (char**)&pHead) <= 0) { + tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); + return; + } pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); + if (cliRecvReleaseReq(conn, pHead)) { + return; + } + STransMsg transMsg = {0}; transMsg.contLen = transContLenFromMsg(pHead->msgLen); transMsg.pCont = transContFromHead((char*)pHead); @@ -361,7 +348,6 @@ void cliHandleResp(SCliConn* conn) { SCliMsg* pMsg = NULL; STransConnCtx* pCtx = NULL; - CONN_SHOULD_RELEASE(conn, pHead); if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); @@ -625,7 +611,12 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { pBuf->len += nread; while (transReadComplete(pBuf)) { tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); - cliHandleResp(conn); + if (pBuf->invalid) { + cliHandleExcept(conn); + break; + } else { + cliHandleResp(conn); + } } return; } @@ -786,6 +777,7 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); pHead->traceId = pMsg->info.traceId; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); @@ -1053,6 +1045,30 @@ static void cliPrepareCb(uv_prepare_t* handle) { if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); } +bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { + if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { + uint64_t ahandle = pHead->ahandle; + SCliMsg* pMsg = NULL; + CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); + transClearBuffer(&conn->readBuf); + transFreeMsg(transContFromHead((char*)pHead)); + if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { + SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); + if (cliMsg->type == Release) return true; + } + tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); + if (T_REF_VAL_GET(conn) > 1) { + transUnrefCliHandle(conn); + } + destroyCmsg(pMsg); + cliReleaseUnfinishedMsg(conn); + transQueueClear(&conn->cliMsgs); + addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); + return true; + } + return false; +} + static void* cliWorkThread(void* arg) { SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index b568163e23..c50d0d3e5c 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -91,6 +91,7 @@ int transInitBuffer(SConnBuffer* buf) { buf->left = -1; buf->len = 0; buf->total = 0; + buf->invalid = 0; return 0; } int transDestroyBuffer(SConnBuffer* p) { @@ -108,19 +109,24 @@ int transClearBuffer(SConnBuffer* buf) { p->left = -1; p->len = 0; p->total = 0; + p->invalid = 0; return 0; } int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { - SConnBuffer* p = connBuf; + static const int HEADSIZE = sizeof(STransMsgHead); + SConnBuffer* p = connBuf; if (p->left != 0) { return -1; } int total = connBuf->total; - *buf = taosMemoryCalloc(1, total); - memcpy(*buf, p->buf, total); - - transResetBuffer(connBuf); + if (total >= HEADSIZE && !p->invalid) { + *buf = taosMemoryCalloc(1, total); + memcpy(*buf, p->buf, total); + transResetBuffer(connBuf); + } else { + total = -1; + } return total; } @@ -173,6 +179,7 @@ bool transReadComplete(SConnBuffer* connBuf) { memcpy((char*)&head, connBuf->buf, sizeof(head)); int32_t msgLen = (int32_t)htonl(head.msgLen); p->total = msgLen; + p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)); } if (p->total >= p->len) { p->left = p->total - p->len; @@ -180,7 +187,7 @@ bool transReadComplete(SConnBuffer* connBuf) { p->left = 0; } } - return p->left == 0 ? true : false; + return (p->left == 0 || p->invalid) ? true : false; } int transSetConnOption(uv_tcp_t* stream) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 4d35e346b1..f8f55ab1d8 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -114,6 +114,8 @@ static void uvAcceptAsyncCb(uv_async_t* handle); static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvPrepareCb(uv_prepare_t* handle); +static bool uvRecvReleaseReq(SSvrConn* conn, STransMsgHead* pHead); + /* * time-consuming task throwed into BG work thread */ @@ -154,37 +156,6 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - reallocConnRef(conn); \ - tTrace("conn %p received release request", conn); \ - \ - STraceId traceId = head->traceId; \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - \ - STransMsg tmsg = { \ - .code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; \ - SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - if (conn->regArg.init) { \ - tTrace("conn %p release, notify server app", conn); \ - STrans* pTransInst = conn->pTransInst; \ - (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \ - memset(&conn->regArg, 0, sizeof(conn->regArg)); \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ - } while (0) - #define SRV_RELEASE_UV(loop) \ do { \ uv_walk(loop, uvWalkCb, NULL); \ @@ -212,17 +183,25 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { tDebug("%p timeout since no activity", conn); } -static void uvHandleReq(SSvrConn* pConn) { - STransMsgHead* msg = NULL; - int msgLen = 0; +static bool uvHandleReq(SSvrConn* pConn) { + STrans* pTransInst = pConn->pTransInst; - msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + STransMsgHead* msg = NULL; + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); + if (msgLen <= 0) { + tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); + return false; + } STransMsgHead* pHead = (STransMsgHead*)msg; pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); memcpy(pConn->user, pHead->user, strlen(pHead->user)); + if (uvRecvReleaseReq(pConn, pHead)) { + return true; + } + // TODO(dengyihao): time-consuming task throwed into BG Thread // uv_work_t* wreq = taosMemoryMalloc(sizeof(uv_work_t)); // wreq->data = pConn; @@ -230,8 +209,6 @@ static void uvHandleReq(SSvrConn* pConn) { // transRefSrvHandle(pConn); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - CONN_SHOULD_RELEASE(pConn, pHead); - STransMsg transMsg; memset(&transMsg, 0, sizeof(transMsg)); transMsg.contLen = transContLenFromMsg(pHead->msgLen); @@ -247,7 +224,6 @@ static void uvHandleReq(SSvrConn* pConn) { tDebug("conn %p acquired by server app", pConn); } } - STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pHead->traceId; if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); @@ -285,6 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) { transReleaseExHandle(transGetRefMgt(), pConn->refId); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); + return true; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -295,11 +272,18 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (nread > 0) { pBuf->len += nread; tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); - while (transReadComplete(pBuf)) { - tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); - uvHandleReq(conn); + if (pBuf->len <= TRANS_PACKET_LIMIT) { + while (transReadComplete(pBuf)) { + tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); + if (pBuf->invalid) { + destroyConn(conn, true); + break; + } else { + if (false == uvHandleReq(conn)) break; + } + } + return; } - return; } if (nread == 0) { return; @@ -391,6 +375,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { pHead->ahandle = (uint64_t)pMsg->info.ahandle; pHead->traceId = pMsg->info.traceId; pHead->hasEpSet = pMsg->info.hasEpSet; + pHead->magicNum = htonl(TRANS_MAGIC_NUM); if (pConn->status == ConnNormal) { pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); @@ -591,6 +576,36 @@ static void uvPrepareCb(uv_prepare_t* handle) { } } +static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { + if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { + reallocConnRef(pConn); + tTrace("conn %p received release request", pConn); + + STraceId traceId = pHead->traceId; + pConn->status = ConnRelease; + transClearBuffer(&pConn->readBuf); + transFreeMsg(transContFromHead((char*)pHead)); + + STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; + SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + srvMsg->msg = tmsg; + srvMsg->type = Release; + srvMsg->pConn = pConn; + if (!transQueuePush(&pConn->srvMsgs, srvMsg)) { + return true; + } + if (pConn->regArg.init) { + tTrace("conn %p release, notify server app", pConn); + STrans* pTransInst = pConn->pTransInst; + (*pTransInst->cfp)(pTransInst->parent, &(pConn->regArg.msg), NULL); + memset(&pConn->regArg, 0, sizeof(pConn->regArg)); + } + uvStartSendResp(srvMsg); + return true; + } + return false; +} + static void uvWorkDoTask(uv_work_t* req) { // doing time-consumeing task // only auth conn currently, add more func later From 66f6dcddb85206630df1b1d339ee7915ec7269e3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 17 Aug 2022 23:02:22 +0800 Subject: [PATCH 07/29] fix invalid packet --- source/libs/transport/src/transSvr.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f8f55ab1d8..f84b87a43d 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -276,6 +276,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { while (transReadComplete(pBuf)) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); if (pBuf->invalid) { + tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn); destroyConn(conn, true); break; } else { @@ -600,7 +601,7 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { (*pTransInst->cfp)(pTransInst->parent, &(pConn->regArg.msg), NULL); memset(&pConn->regArg, 0, sizeof(pConn->regArg)); } - uvStartSendResp(srvMsg); + uvStartSendRespInternal(srvMsg); return true; } return false; @@ -872,6 +873,7 @@ static int reallocConnRef(SSvrConn* conn) { } static void uvDestroyConn(uv_handle_t* handle) { SSvrConn* conn = handle->data; + if (conn == NULL) { return; } @@ -887,9 +889,8 @@ static void uvDestroyConn(uv_handle_t* handle) { SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); destroySmsg(msg); } - - transReqQueueClear(&conn->wreqQueue); transQueueDestroy(&conn->srvMsgs); + transReqQueueClear(&conn->wreqQueue); QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp); From e7e6a4c7e4669f45ae2eca690c7bda57d115a398 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 09:22:00 +0800 Subject: [PATCH 08/29] fix invalid packet --- source/libs/transport/src/transSvr.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f84b87a43d..f50711c59c 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -284,6 +284,9 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } } return; + } else { + destroyConn(conn, true); + return; } } if (nread == 0) { From 4bc57e96ba4b2c4b8d0861543e84a9a0d86af8e6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 09:40:58 +0800 Subject: [PATCH 09/29] fix invalid packet --- source/libs/transport/src/thttp.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 806a786b00..76a7611c2e 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -16,10 +16,10 @@ #define _DEFAULT_SOURCE // clang-format off #include +#include "zlib.h" #include "thttp.h" #include "taoserror.h" #include "tlog.h" -#include "zlib.h" typedef struct SHttpClient { uv_connect_t conn; @@ -196,8 +196,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 uv_loop_t* loop = uv_default_loop(); uv_tcp_init(loop, &cli->tcp); // set up timeout to avoid stuck; - //int32_t fd = taosCreateSocketWithTimeout(5); - //uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); + int32_t fd = taosCreateSocketWithTimeout(5); + uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); int32_t ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&dest, clientConnCb); if (ret != 0) { From 4dc5df8453da96264097d857e88211212aa68197 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 10:34:55 +0800 Subject: [PATCH 10/29] fix invalid packet --- source/libs/transport/src/transSvr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index be6eafba97..10063df329 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -277,7 +277,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); if (uvHandleReq(conn) == false) { destroyConn(conn, true); - break; + return; } } return; From 1a449a51a1215c428e818d54e506487e194de966 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 12:05:56 +0800 Subject: [PATCH 11/29] fix invalid packet --- source/libs/transport/src/thttp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 76a7611c2e..b902fbd13a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -178,7 +178,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 } terrno = 0; - char header[1024] = {0}; + char header[2048] = {0}; int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag); uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); From f3ec1593bcd85eadff34d141e4825e58314c3b86 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 13:55:56 +0800 Subject: [PATCH 12/29] fix invalid packet --- source/libs/transport/src/transSvr.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 10063df329..753ea00a45 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -265,10 +265,10 @@ static bool uvHandleReq(SSvrConn* pConn) { } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { - // opt - SSvrConn* conn = cli->data; + SSvrConn* conn = cli->data; + STrans* pTransInst = conn->pTransInst; + SConnBuffer* pBuf = &conn->readBuf; - STrans* pTransInst = conn->pTransInst; if (nread > 0) { pBuf->len += nread; tTrace("%s conn %p total read:%d, current read:%d", transLabel(pTransInst), conn, pBuf->len, (int)nread); From e1ae5bc7f8884898e96c0a661614facf6cd5f350 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 13:59:23 +0800 Subject: [PATCH 13/29] fix invalid packet --- source/common/src/tglobal.c | 6 +++--- source/libs/transport/src/thttp.c | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c763bbed9c..adc5af1a17 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -75,7 +75,7 @@ int32_t tsMonitorMaxLogs = 100; bool tsMonitorComp = false; // telem -bool tsEnableTelem = false; +bool tsEnableTelem = true; int32_t tsTelemInterval = 86400; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; uint16_t tsTelemPort = 80; @@ -166,7 +166,7 @@ int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; #ifndef _STORAGE -int32_t taosSetTfsCfg(SConfig *pCfg) { +int32_t taosSetTfsCfg(SConfig *pCfg) { SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); memset(tsDataDir, 0, PATH_MAX); @@ -180,7 +180,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) { uError("failed to create dataDir:%s", tsDataDir); return -1; } - return 0; + return 0; } #else int32_t taosSetTfsCfg(SConfig *pCfg); diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index b902fbd13a..2b5c56965f 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -152,15 +152,15 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { } static int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { - uint32_t ipv4 = taosGetIpv4FromFqdn(server); - if (ipv4 == 0xffffffff) { + uint32_t ip = taosGetIpv4FromFqdn(server); + if (ip == 0xffffffff) { terrno = TAOS_SYSTEM_ERROR(errno); uError("http-report failed to get http server:%s ip since %s", server, terrstr()); return -1; } - char ipv4Buf[128] = {0}; - tinet_ntoa(ipv4Buf, ipv4); - uv_ip4_addr(ipv4Buf, port, dest); + char buf[128] = {0}; + tinet_ntoa(buf, ip); + uv_ip4_addr(buf, port, dest); return 0; } int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { From 15110480bc3839889c851b45353f678b2b0ad3bf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 14:22:07 +0800 Subject: [PATCH 14/29] fix invalid packet --- source/libs/transport/src/transSvr.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 753ea00a45..c980b70abd 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -204,7 +204,6 @@ static bool uvHandleReq(SSvrConn* pConn) { // uv_read_stop((uv_stream_t*)pConn->pTcp); // transRefSrvHandle(pConn); // uv_queue_work(((SWorkThrd*)pConn->hostThrd)->loop, wreq, uvWorkDoTask, uvWorkAfterTask); - if (uvRecvReleaseReq(pConn, pHead)) { return true; } From 45dc2ec1f20f9e1df21b6f044bfb4cc116b6bdb0 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 18 Aug 2022 14:32:11 +0800 Subject: [PATCH 15/29] fix(tsdb/cache): add DCLP to tsdbCache --- source/dnode/vnode/src/inc/tsdb.h | 3 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 53 ++++++++++++++++--------- source/dnode/vnode/src/tsdb/tsdbOpen.c | 2 +- 3 files changed, 38 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f1e980c026..a30f308ecd 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -260,7 +260,7 @@ void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap); // tsdbCache int32_t tsdbOpenCache(STsdb *pTsdb); -void tsdbCloseCache(SLRUCache *pCache); +void tsdbCloseCache(STsdb *pTsdb); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); @@ -298,6 +298,7 @@ struct STsdb { SMemTable *imem; STsdbFS fs; SLRUCache *lruCache; + TdThreadMutex lruMutex; }; struct TSDBKEY { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index f03b02af27..1d2c5c3b32 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -33,16 +33,21 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { taosLRUCacheSetStrictCapacity(pCache, true); + taosThreadMutexInit(&pTsdb->lruMutex, NULL); + _err: pTsdb->lruCache = pCache; return code; } -void tsdbCloseCache(SLRUCache *pCache) { +void tsdbCloseCache(STsdb *pTsdb) { + SLRUCache *pCache = pTsdb->lruCache; if (pCache) { taosLRUCacheEraseUnrefEntries(pCache); taosLRUCacheCleanup(pCache); + + taosThreadMutexDestroy(&pTsdb->lruMutex); } } @@ -1100,26 +1105,38 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH // getTableCacheKeyS(uid, "lr", key, &keyLen); getTableCacheKey(uid, 0, key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - } else { - STSRow *pRow = NULL; - bool dup = false; // which is always false for now - code = mergeLastRow(uid, pTsdb, &dup, &pRow); - // if table's empty or error, return code of -1 - if (code < 0 || pRow == NULL) { - if (!dup && pRow) { - taosMemoryFree(pRow); + if (!h) { + taosThreadMutexLock(&pTsdb->lruMutex); + + h = taosLRUCacheLookup(pCache, key, keyLen); + if (!h) { + STSRow *pRow = NULL; + bool dup = false; // which is always false for now + code = mergeLastRow(uid, pTsdb, &dup, &pRow); + // if table's empty or error, return code of -1 + if (code < 0 || pRow == NULL) { + if (!dup && pRow) { + taosMemoryFree(pRow); + } + + *handle = NULL; + return 0; } - *handle = NULL; - return 0; - } + _taos_lru_deleter_t deleter = deleteTableCacheLastrow; + LRUStatus status = + taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW); + if (status != TAOS_LRU_STATUS_OK) { + code = -1; + } - _taos_lru_deleter_t deleter = deleteTableCacheLastrow; - LRUStatus status = - taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; + taosThreadMutexUnlock(&pTsdb->lruMutex); + } else { + taosThreadMutexUnlock(&pTsdb->lruMutex); + + *handle = h; + + return code; } h = taosLRUCacheLookup(pCache, key, keyLen); diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index be2828d187..ec760e3c57 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -86,7 +86,7 @@ int tsdbClose(STsdb **pTsdb) { if (*pTsdb) { taosThreadRwlockDestroy(&(*pTsdb)->rwLock); tsdbFSClose(*pTsdb); - tsdbCloseCache((*pTsdb)->lruCache); + tsdbCloseCache(*pTsdb); taosMemoryFreeClear(*pTsdb); } return 0; From 9ef2d736897bb0c0feb81980cb259a31a43ef957 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 18 Aug 2022 16:06:49 +0800 Subject: [PATCH 16/29] fix(query): fix elapsed order by ts desc result inconsist after flush to disk TD-18453 --- source/libs/function/src/builtinsimpl.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5aeaecd598..bf4a07f8e2 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3970,16 +3970,16 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { TSKEY* ptsList = (int64_t*)colDataGetData(pCol, 0); if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->start.key == INT64_MIN) { - pInfo->max = - (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max; + pInfo->max = (pInfo->max < ptsList[start]) ? ptsList[start] : pInfo->max; } else { pInfo->max = pCtx->start.key + 1; } - if (pCtx->end.key != INT64_MIN) { - pInfo->min = pCtx->end.key; + if (pCtx->end.key == INT64_MIN) { + pInfo->min = (pInfo->min > ptsList[start + pInput->numOfRows - 1]) ? + ptsList[start + pInput->numOfRows - 1] : pInfo->min; } else { - pInfo->min = ptsList[start]; + pInfo->min = pCtx->end.key; } } else { if (pCtx->start.key == INT64_MIN) { @@ -3988,10 +3988,11 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { pInfo->min = pCtx->start.key; } - if (pCtx->end.key != INT64_MIN) { - pInfo->max = pCtx->end.key + 1; + if (pCtx->end.key == INT64_MIN) { + pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? + ptsList[start + pInput->numOfRows - 1] : pInfo->max; } else { - pInfo->max = ptsList[start + pInput->numOfRows - 1]; + pInfo->max = pCtx->end.key + 1; } } } From 13a93d68ebdddc3d27ce39fa1962586f044b70aa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 16:09:46 +0800 Subject: [PATCH 17/29] fix invalid packet --- source/libs/transport/src/thttp.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 62277a7569..dc0f9128b0 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -221,7 +221,6 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 uError("failed to receive response from %s:%u since %s", server, port, terrstr()); goto SEND_OVER; } - code = 0; SEND_OVER: From 48bf21d321b923e3f1331b2f984189f1a1785c39 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Thu, 18 Aug 2022 17:32:08 +0800 Subject: [PATCH 18/29] build: move check into shell file --- cmake/cmake.install | 32 +----------------------- contrib/CMakeLists.txt | 18 -------------- packaging/tools/make_install.bat | 42 +++++++++++++++++++++++++++++++- packaging/tools/make_install.sh | 4 ++- 4 files changed, 45 insertions(+), 51 deletions(-) diff --git a/cmake/cmake.install b/cmake/cmake.install index 4e3d0b166a..6dc6864975 100644 --- a/cmake/cmake.install +++ b/cmake/cmake.install @@ -1,38 +1,8 @@ -IF (EXISTS /var/lib/taos/dnode/dnodeCfg.json) - INSTALL(CODE "MESSAGE(\"The default data directory /var/lib/taos contains old data of tdengine 2.x, please clear it before installing!\")") -ELSEIF (EXISTS C:/TDengine/data/dnode/dnodeCfg.json) - INSTALL(CODE "MESSAGE(\"The default data directory C:/TDengine/data contains old data of tdengine 2.x, please clear it before installing!\")") -ELSEIF (TD_LINUX) +IF (TD_LINUX) SET(TD_MAKE_INSTALL_SH "${TD_SOURCE_DIR}/packaging/tools/make_install.sh") INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")") INSTALL(CODE "execute_process(COMMAND bash ${TD_MAKE_INSTALL_SH} ${TD_SOURCE_DIR} ${PROJECT_BINARY_DIR} Linux ${TD_VER_NUMBER})") ELSEIF (TD_WINDOWS) - SET(CMAKE_INSTALL_PREFIX C:/TDengine) - - # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/go DESTINATION connector) - # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/nodejs DESTINATION connector) - # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/python DESTINATION connector) - # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/C\# DESTINATION connector) - # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/examples DESTINATION .) - INSTALL(CODE "IF (NOT EXISTS ${CMAKE_INSTALL_PREFIX}/cfg/taos.cfg) - execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${TD_SOURCE_DIR}/packaging/cfg/taos.cfg ${CMAKE_INSTALL_PREFIX}/cfg/taos.cfg) - ENDIF ()") - INSTALL(FILES ${TD_SOURCE_DIR}/include/client/taos.h DESTINATION include) - INSTALL(FILES ${TD_SOURCE_DIR}/include/util/taoserror.h DESTINATION include) - INSTALL(FILES ${TD_SOURCE_DIR}/include/libs/function/taosudf.h DESTINATION include) - INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.lib DESTINATION driver) - INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos_static.lib DESTINATION driver) - INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos.dll DESTINATION driver) - INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taos.exe DESTINATION .) - INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosd.exe DESTINATION .) - INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/udfd.exe DESTINATION .) - IF (BUILD_TOOLS) - INSTALL(FILES ${EXECUTABLE_OUTPUT_PATH}/taosBenchmark.exe DESTINATION .) - ENDIF () - - IF (TD_MVN_INSTALLED) - INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.38-dist.jar DESTINATION connector/jdbc) - ENDIF () SET(TD_MAKE_INSTALL_SH "${TD_SOURCE_DIR}/packaging/tools/make_install.bat") INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")") INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} :needAdmin ${TD_SOURCE_DIR} ${PROJECT_BINARY_DIR} Windows ${TD_VER_NUMBER})") diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index a1eec81ee0..494d1577fc 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -135,24 +135,6 @@ execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download") execute_process(COMMAND "${CMAKE_COMMAND}" --build . WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download") - -# clear submodule -execute_process(COMMAND git submodule deinit -f tools/taos-tools - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git rm --cached tools/taos-tools - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git submodule deinit -f tools/taosadapter - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git rm --cached tools/taosadapter - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git submodule deinit -f tools/taosws-rs - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git rm --cached tools/taosws-rs - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git submodule deinit -f examples/rust - WORKING_DIRECTORY "${TD_SOURCE_DIR}") -execute_process(COMMAND git rm --cached examples/rust - WORKING_DIRECTORY "${TD_SOURCE_DIR}") # ================================================================================================ # Build diff --git a/packaging/tools/make_install.bat b/packaging/tools/make_install.bat index 0f9e836ae2..3c27c1beca 100644 --- a/packaging/tools/make_install.bat +++ b/packaging/tools/make_install.bat @@ -1,7 +1,47 @@ @echo off goto %1 :needAdmin + +if exist C:\\TDengine\\data\\dnode\\dnodeCfg.json ( + echo The default data directory C:/TDengine/data contains old data of tdengine 2.x, please clear it before installing! +) +set source_dir=%2 +set source_dir=%source_dir:/=\\% +set binary_dir=%3 +set binary_dir=%binary_dir:/=\\% +set osType=%4 +set verNumber=%5 +set tagert_dir=C:\\TDengine + +if not exist %tagert_dir% ( + mkdir %tagert_dir% +) +if not exist %tagert_dir%\\cfg ( + mkdir %tagert_dir%\\cfg +) +if not exist %tagert_dir%\\include ( + mkdir %tagert_dir%\\include +) +if not exist %tagert_dir%\\driver ( + mkdir %tagert_dir%\\driver +) +if not exist C:\\TDengine\\cfg\\taos.cfg ( + copy %source_dir%\\packaging\\cfg\\taos.cfg %tagert_dir%\\cfg\\taos.cfg > nul +) +copy %source_dir%\\include\\client\\taos.h %tagert_dir%\\include > nul +copy %source_dir%\\include\\util\\taoserror.h %tagert_dir%\\include > nul +copy %source_dir%\\include\\libs\\function\\taosudf.h %tagert_dir%\\include > nul +copy %binary_dir%\\build\\lib\\taos.lib %tagert_dir%\\driver > nul +copy %binary_dir%\\build\\lib\\taos_static.lib %tagert_dir%\\driver > nul +copy %binary_dir%\\build\\lib\\taos.dll %tagert_dir%\\driver > nul +copy %binary_dir%\\build\\bin\\taos.exe %tagert_dir% > nul +copy %binary_dir%\\build\\bin\\taosd.exe %tagert_dir% > nul +copy %binary_dir%\\build\\bin\\udfd.exe %tagert_dir% > nul +if exist %binary_dir%\\build\\bin\\taosBenchmark.exe ( + copy %binary_dir%\\build\\bin\\taosBenchmark.exe %tagert_dir% > nul +) + mshta vbscript:createobject("shell.application").shellexecute("%~s0",":hasAdmin","","runas",1)(window.close)&& echo To start/stop TDengine with administrator privileges: sc start/stop taosd &goto :eof :hasAdmin -cp -f C:\\TDengine\\driver\\taos.dll C:\\Windows\\System32 +copy /y C:\\TDengine\\driver\\taos.dll C:\\Windows\\System32 > nul sc query "taosd" >nul || sc create "taosd" binPath= "C:\\TDengine\\taosd.exe --win_service" start= DEMAND diff --git a/packaging/tools/make_install.sh b/packaging/tools/make_install.sh index d8d4c5bf2a..6a95ace99e 100755 --- a/packaging/tools/make_install.sh +++ b/packaging/tools/make_install.sh @@ -664,7 +664,9 @@ function install_TDengine() { ## ==============================Main program starts from here============================ echo source directory: $1 echo binary directory: $2 -if [ "$osType" != "Darwin" ]; then +if [ -x ${data_dir}/dnode/dnodeCfg.json ]; then + echo -e "\033[44;31;5mThe default data directory ${data_dir} contains old data of tdengine 2.x, please clear it before installing!\033[0m" +elif [ "$osType" != "Darwin" ]; then if [ -x ${bin_dir}/${clientName} ]; then update_TDengine else From 3e6ccb83570052d781d85e491dcf9695e3be5478 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 18 Aug 2022 18:00:03 +0800 Subject: [PATCH 19/29] fix: filter producing zero rows will not be treated as sys table scan operator completion --- source/libs/executor/src/scanoperator.c | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 367fae66a6..7da290240f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2213,9 +2213,18 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { colDataAppend(pColInfoData, numOfRows, n, false); if (++numOfRows >= pOperator->resultInfo.capacity) { - break; + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + if (pInfo->pRes->info.rows > 0) + break; } } + blockDataDestroy(p); // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found if (ret != 0) { @@ -2224,14 +2233,6 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - p->info.rows = numOfRows; - pInfo->pRes->info.rows = numOfRows; - - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); - doFilterResult(pInfo); - - blockDataDestroy(p); - pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } From 0bfadd871931dc18f90deb6b843e8ad35feb9a14 Mon Sep 17 00:00:00 2001 From: BoDing Date: Thu, 18 Aug 2022 18:22:17 +0800 Subject: [PATCH 20/29] fix broken link to taosx --- docs/zh/17-operation/03-tolerance.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/17-operation/03-tolerance.md b/docs/zh/17-operation/03-tolerance.md index 2cfd4b6484..6ee6aa6373 100644 --- a/docs/zh/17-operation/03-tolerance.md +++ b/docs/zh/17-operation/03-tolerance.md @@ -27,4 +27,4 @@ TDengine 集群的节点数必须大于等于副本数,否则创建表时将 当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。 -另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosX) +另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosx) From 98fd9aca109e9d791e87dd05f9a72d6b0c22bbf6 Mon Sep 17 00:00:00 2001 From: BoDing Date: Thu, 18 Aug 2022 18:25:01 +0800 Subject: [PATCH 21/29] remove link to taosx --- docs/zh/17-operation/03-tolerance.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/zh/17-operation/03-tolerance.md b/docs/zh/17-operation/03-tolerance.md index 6ee6aa6373..1ce485b042 100644 --- a/docs/zh/17-operation/03-tolerance.md +++ b/docs/zh/17-operation/03-tolerance.md @@ -26,5 +26,3 @@ TDengine 集群中的时序数据的副本数是与数据库关联的,一个 TDengine 集群的节点数必须大于等于副本数,否则创建表时将报错。 当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。 - -另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosx) From 1a8fb961823be1ed14f05852155700fe2f002040 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Thu, 18 Aug 2022 18:30:03 +0800 Subject: [PATCH 22/29] fix: reset row number when filter produces zero row --- source/libs/executor/src/scanoperator.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f2c4a8e3b6..e578e34b82 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2220,8 +2220,12 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { doFilterResult(pInfo); blockDataCleanup(p); - if (pInfo->pRes->info.rows > 0) + if (pInfo->pRes->info.rows > 0) { break; + } else { + numOfRows = 0; + continue; + } } } blockDataDestroy(p); From 2b092f6551988aebd4aaf6938363593b999979ed Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 18 Aug 2022 18:39:21 +0800 Subject: [PATCH 23/29] fix(query): memory leak --- source/dnode/mnode/impl/src/mndSma.c | 2 +- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/libs/executor/src/executorimpl.c | 36 +++++++++---------- source/libs/executor/src/scanoperator.c | 2 ++ source/libs/executor/src/timewindowoperator.c | 6 +++- source/libs/stream/src/streamTask.c | 3 +- 6 files changed, 29 insertions(+), 22 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 006d9e749c..6db9b59c69 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); ASSERT(smaObj.uid != 0); char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0}; - snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb",pCreate->name); + snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreate->name); memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN); smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN); smaObj.stbUid = pStb->uid; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index b8803b20fc..e6a331f20e 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -341,7 +341,7 @@ FAIL: return -1; } -void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } +void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; } int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) { if (pReader->tbIdHash) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d15dc99122..9ff5b5d759 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3217,8 +3217,8 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa } static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); -static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, - SExecTaskInfo* pTaskInfo) { +static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, + SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; SSDataBlock* pResBlock = pInfo->pFinalRes; @@ -3242,8 +3242,8 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOp pInfo->existNewGroupBlock = NULL; } -static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, - SExecTaskInfo* pTaskInfo) { +static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, + SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows; taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows); @@ -3259,8 +3259,8 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) { SFillOperatorInfo* pInfo = pOperator->info; - SExprSupp* pSup = &pOperator->exprSupp; - SSDataBlock* pResBlock = pInfo->pFinalRes; + SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pResBlock = pInfo->pFinalRes; setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false); projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL); @@ -3270,13 +3270,13 @@ static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlo SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId); colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info); - for(int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) { + for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) { SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr]; ASSERT(pCol->notFillCol); SExprInfo* pExpr = pCol->pExpr; - int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId; - int32_t dstSlotId = pExpr->base.resSchema.slotId; + int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId; + int32_t dstSlotId = pExpr->base.resSchema.slotId; SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId); SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId); @@ -3664,7 +3664,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { taosRemoveRef(exchangeObjRefPool, pExInfo->self); } -void freeSourceDataInfo(void *p) { +void freeSourceDataInfo(void* p) { SSourceDataInfo* pInfo = (SSourceDataInfo*)p; taosMemoryFreeClear(pInfo->pRsp); } @@ -3694,8 +3694,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); - pInfo->pFillInfo = - taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id); + pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, + pInfo->primaryTsCol, order, id); pInfo->win = win; pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); @@ -3721,10 +3721,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr); - SInterval* pInterval = + SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType - ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval - : &((SIntervalAggOperatorInfo*)downstream->info)->interval; + ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval + : &((SIntervalAggOperatorInfo*)downstream->info)->interval; int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t type = convertFillType(pPhyFillNode->mode); @@ -3741,9 +3741,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); - int32_t code = - initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, (SNodeListNode*)pPhyFillNode->pValues, - pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order); + int32_t code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, + (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, + pTaskInfo->id.str, pInterval, type, order); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index aca0078592..d116533fe5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1649,6 +1649,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } taosArrayDestroy(tableIdList); memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond)); + } else { + taosArrayDestroy(pColIds); } // create the pseduo columns info diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 757ab09d1a..02be01cdb8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1706,14 +1706,19 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pInfo->pPullDataRes); taosArrayDestroy(pInfo->pRecycledPages); blockDataDestroy(pInfo->pUpdateRes); + taosArrayDestroy(pInfo->pDelWins); + blockDataDestroy(pInfo->pDelRes); if (pInfo->pChildren) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput); + taosMemoryFree(pChildOp->pDownstream); + cleanupExprSupp(&pChildOp->exprSupp); taosMemoryFreeClear(pChildOp); } + taosArrayDestroy(pInfo->pChildren); } nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); @@ -2644,7 +2649,6 @@ void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } - SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index d588d90543..4009a47c65 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -152,11 +152,12 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } void tFreeSStreamTask(SStreamTask* pTask) { + qDebug("free stream task %d", pTask->taskId); if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); if (pTask->exec.executor) qDestroyTask(pTask->exec.executor); - taosArrayDestroy(pTask->childEpInfo); + taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree); if (pTask->outputType == TASK_OUTPUT__TABLE) { tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema); From 5a79aa1978b2633f84c86fc3101722b2e7f6745b Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 18 Aug 2022 18:48:17 +0800 Subject: [PATCH 24/29] fix(sma): memory leak --- source/dnode/mnode/impl/src/mndSma.c | 4 +++- source/dnode/mnode/impl/src/mndVgroup.c | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 6db9b59c69..2fb934aaad 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -530,7 +530,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.sourceDbUid = pDb->uid; streamObj.targetDbUid = pDb->uid; streamObj.version = 1; - streamObj.sql = pCreate->sql; + streamObj.sql = strdup(pCreate->sql); streamObj.smaId = smaObj.uid; streamObj.watermark = pCreate->watermark; streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE; @@ -585,6 +585,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea return -1; } if (pAst != NULL) nodesDestroyNode(pAst); + nodesDestroyNode((SNode *)pPlan); int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq); @@ -609,6 +610,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea code = 0; _OVER: + tFreeStreamObj(&streamObj); mndDestroySmaObj(&smaObj); mndTransDrop(pTrans); return code; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 0567ec4e14..09eed7fb32 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -509,6 +509,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) { pVgroup->replica = 1; if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) return -1; + taosArrayDestroy(pArray); mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId); return 0; @@ -1862,4 +1863,4 @@ _OVER: #endif } -bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } \ No newline at end of file +bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } From 8f5fa8b98270cbb47b909797054e7842008cd404 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 18 Aug 2022 19:36:40 +0800 Subject: [PATCH 25/29] fix invalid packet --- source/libs/transport/src/transSvr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f50711c59c..68b911f553 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -278,7 +278,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (pBuf->invalid) { tTrace("%s conn %p alread read invalid packet", transLabel(pTransInst), conn); destroyConn(conn, true); - break; + return; } else { if (false == uvHandleReq(conn)) break; } From 65903a7c0d3d65be07c3137f2ee84f71d03f222e Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 18 Aug 2022 18:57:49 +0800 Subject: [PATCH 26/29] fix: refact mutex locking --- source/dnode/vnode/src/tsdb/tsdbCache.c | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 1d2c5c3b32..bb367ff8b1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1119,7 +1119,10 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH taosMemoryFree(pRow); } + taosThreadMutexUnlock(&pTsdb->lruMutex); + *handle = NULL; + return 0; } @@ -1131,15 +1134,11 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH } taosThreadMutexUnlock(&pTsdb->lruMutex); + + h = taosLRUCacheLookup(pCache, key, keyLen); } else { taosThreadMutexUnlock(&pTsdb->lruMutex); - - *handle = h; - - return code; } - - h = taosLRUCacheLookup(pCache, key, keyLen); } *handle = h; From 4e5315f1ac4ca551fdf49aacb898b61b5cbaa0c6 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 18 Aug 2022 20:41:25 +0800 Subject: [PATCH 27/29] fix: process remaining rows out of loop --- source/libs/executor/src/scanoperator.c | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e578e34b82..7d188dadc7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2220,14 +2220,25 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { doFilterResult(pInfo); blockDataCleanup(p); + numOfRows = 0; + if (pInfo->pRes->info.rows > 0) { break; - } else { - numOfRows = 0; - continue; } } } + + if (numOfRows > 0) { + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + numOfRows = 0; + } + blockDataDestroy(p); // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found From 24bfe2e2ebaef14a8d199f213ce996371183b5fa Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 18 Aug 2022 20:51:28 +0800 Subject: [PATCH 28/29] test: valgrind case --- tests/script/tsim/stream/basic0.sim | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/tests/script/tsim/stream/basic0.sim b/tests/script/tsim/stream/basic0.sim index 9a5fb8012f..6d05f69dcf 100644 --- a/tests/script/tsim/stream/basic0.sim +++ b/tests/script/tsim/stream/basic0.sim @@ -1,7 +1,7 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/exec.sh -n dnode1 -s start -sleep 50 +system sh/cfg.sh -n dnode1 -c debugflag -v 131 +system sh/exec.sh -n dnode1 -s start -v sql connect print =============== create database @@ -137,4 +137,17 @@ if $data13 != 789 then return -1 endi +_OVER: system sh/exec.sh -n dnode1 -s stop -x SIGINT +print =============== check +$null= + +system_content sh/checkValgrind.sh -n dnode1 +print cmd return result ----> [ $system_content ] +if $system_content > 0 then + return -1 +endi + +if $system_content == $null then + return -1 +endi From ff22c97d9964790ac3aa34b7b6a3356edd0644aa Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 19 Aug 2022 08:39:19 +0800 Subject: [PATCH 29/29] fix: user tags scan filter produces zero row will not end the sys table scan operator --- source/libs/executor/src/scanoperator.c | 34 ++++++++++++++++++------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7d188dadc7..5689b70824 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2038,10 +2038,34 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { metaReaderClear(&smr); if (numOfRows >= pOperator->resultInfo.capacity) { - break; + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + numOfRows = 0; + + if (pInfo->pRes->info.rows > 0) { + break; + } } } + if (numOfRows > 0) { + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + doFilterResult(pInfo); + + blockDataCleanup(p); + numOfRows = 0; + } + + blockDataDestroy(p); + // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found if (ret != 0) { metaCloseTbCursor(pInfo->pCur); @@ -2049,14 +2073,6 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - p->info.rows = numOfRows; - pInfo->pRes->info.rows = numOfRows; - - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); - doFilterResult(pInfo); - - blockDataDestroy(p); - pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; }