From 6d0d2ae03930e4b5d33887072088818bbdfcb53c Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Sun, 29 Sep 2024 15:29:09 +0800 Subject: [PATCH 01/12] fix: ostime --- source/os/src/osTimezone.c | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/source/os/src/osTimezone.c b/source/os/src/osTimezone.c index 89ced69f97..6e7c22c7f1 100644 --- a/source/os/src/osTimezone.c +++ b/source/os/src/osTimezone.c @@ -742,6 +742,20 @@ char *tz_win[554][2] = {{"Asia/Shanghai", "China Standard Time"}, static int isdst_now = 0; +void parseTimeStr(char *p, char to[5]) { + for (int i = 0; i < 5; ++i) { + if (strlen(p) > i) { + to[i] = p[i]; + } else { + to[i] = '0'; + } + } + if (strlen(p) == 2) { + to[1] = '0'; + to[2] = p[1]; + } +} + int32_t taosSetSystemTimezone(const char *inTimezoneStr, char *outTimezoneStr, int8_t *outDaylight, enum TdTimezone *tsTimezone) { if (inTimezoneStr == NULL || inTimezoneStr[0] == 0) { @@ -798,7 +812,9 @@ int32_t taosSetSystemTimezone(const char *inTimezoneStr, char *outTimezoneStr, i memcpy(&winStr[3], pp, ppp - pp); indexStr = ppp - pp + 3; } - sprintf(&winStr[indexStr], "%c%c%c:%c%c:00", (p[0] == '+' ? '+' : '-'), p[1], p[2], p[3], p[4]); + char to[5]; + parseTimeStr(p, to); + sprintf(&winStr[indexStr], "%c%c%c:%c%c:00", (to[0] == '+' ? '+' : '-'), to[1], to[2], to[3], to[4]); *tsTimezone = -taosStr2Int32(p, NULL, 10); } else { *tsTimezone = 0; @@ -806,7 +822,9 @@ int32_t taosSetSystemTimezone(const char *inTimezoneStr, char *outTimezoneStr, i } _putenv(winStr); _tzset(); - strcpy(outTimezoneStr, inTimezoneStr); + if (outTimezoneStr != inTimezoneStr) { + strcpy(outTimezoneStr, inTimezoneStr); + } *outDaylight = 0; #elif defined(_TD_DARWIN_64) From 5a85303c373441ef778f6a9f66de7b91b027a7cc Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Sun, 29 Sep 2024 23:40:19 +0800 Subject: [PATCH 02/12] fix: semphore leak --- source/libs/catalog/src/ctgCache.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index eafd85a504..a5ed165118 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -906,7 +906,7 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) { if (gCtgMgmt.queue.stopQueue) { ctgFreeQNode(node); CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); - CTG_RET(TSDB_CODE_CTG_EXIT); + CTG_ERR_JRET(TSDB_CODE_CTG_EXIT); } gCtgMgmt.queue.tail->next = node; @@ -924,7 +924,7 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) { code = tsem_post(&gCtgMgmt.queue.reqSem); if (TSDB_CODE_SUCCESS != code) { qError("tsem_post failed, code:%x", code); - CTG_RET(code); + CTG_ERR_JRET(code); } if (syncOp) { @@ -935,9 +935,14 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) { if (!operation->unLocked) { CTG_LOCK(CTG_READ, &gCtgMgmt.lock); } + tsem_destroy(&operation->rspSem); taosMemoryFree(operation); } +_return: + if (syncOp) { + tsem_destroy(&operation->rspSem); + } return code; } From 165875ac4f98f39a99a8ed5d01aab503bc5a8668 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Mon, 30 Sep 2024 10:31:54 +0800 Subject: [PATCH 03/12] unused pointer --- source/libs/catalog/src/ctgCache.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index a5ed165118..48b949cc28 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -935,13 +935,13 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) { if (!operation->unLocked) { CTG_LOCK(CTG_READ, &gCtgMgmt.lock); } - tsem_destroy(&operation->rspSem); + TAOS_UNUSED(tsem_destroy(&operation->rspSem)); taosMemoryFree(operation); } _return: if (syncOp) { - tsem_destroy(&operation->rspSem); + TAOS_UNUSED(tsem_destroy(&operation->rspSem)); } return code; } From 4841b687e3ad9cd3430ead3ccdaa4afe5136fc17 Mon Sep 17 00:00:00 2001 From: yanyuxing Date: Mon, 30 Sep 2024 17:09:37 +0800 Subject: [PATCH 04/12] docs: update taosx-agent InstanceId config --- docs/zh/14-reference/01-components/04-taosx.md | 2 +- docs/zh/14-reference/01-components/05-taosx-agent.md | 3 ++- docs/zh/14-reference/01-components/07-explorer.md | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/zh/14-reference/01-components/04-taosx.md b/docs/zh/14-reference/01-components/04-taosx.md index a5bf9df0c0..114a6b1ce5 100644 --- a/docs/zh/14-reference/01-components/04-taosx.md +++ b/docs/zh/14-reference/01-components/04-taosx.md @@ -239,7 +239,7 @@ d4,2017-07-14T10:40:00.006+08:00,-2.740636,10,-0.893545,7,California.LosAngles - `plugins_home`:外部数据源连接器所在目录。 - `data_dir`:数据文件存放目录。 -- `instanceId`:当前 explorer 服务的实例 ID,如果同一台机器上启动了多个 explorer 实例,必须保证各个实例的实例 ID 互不相同。 +- `instanceId`:当前 taosX 服务的实例 ID,如果同一台机器上启动了多个 taosX 实例,必须保证各个实例的实例 ID 互不相同。 - `logs_home`:日志文件存放目录,`taosX` 日志文件的前缀为 `taosx.log`,外部数据源有自己的日志文件名前缀。已弃用,请使用 `log.path` 代替。 - `log_level`:日志等级,可选级别包括 `error`、`warn`、`info`、`debug`、`trace`,默认值为 `info`。已弃用,请使用 `log.level` 代替。 - `log_keep_days`:日志的最大存储天数,`taosX` 日志将按天划分为不同的文件。已弃用,请使用 `log.keepDays` 代替。 diff --git a/docs/zh/14-reference/01-components/05-taosx-agent.md b/docs/zh/14-reference/01-components/05-taosx-agent.md index e521c8becb..bf2e6f7e78 100644 --- a/docs/zh/14-reference/01-components/05-taosx-agent.md +++ b/docs/zh/14-reference/01-components/05-taosx-agent.md @@ -11,6 +11,7 @@ sidebar_label: taosX-Agent - `endpoint`: 必填,`taosX` 的 GRPC 服务地址。 - `token`: 必填,在 `Explorer` 上创建 `Agent` 时,产生的 Token。 +- `instanceId`:当前 taosx-agent 服务的实例 ID,如果同一台机器上启动了多个 taosx-agent 实例,必须保证各个实例的实例 ID 互不相同。 - `compression`: 非必填,可配置为 `ture` 或 `false`, 默认为 `false`。配置为`true`, 则开启 `Agent` 和 `taosX` 通信数据压缩。 - `log_level`: 非必填,日志级别,默认为 `info`, 同 `taosX` 一样,支持 `error`,`warn`,`info`,`debug`,`trace` 五级。已弃用,请使用 `log.level` 代替。 - `log_keep_days`:非必填,日志保存天数,默认为 `30` 天。已弃用,请使用 `log.keepDays` 代替。 @@ -37,7 +38,7 @@ sidebar_label: taosX-Agent # server instance id # # The instanceId of each instance is unique on the host -# instanceId = 64 +# instanceId = 48 # enable communication data compression between Agent and taosX # diff --git a/docs/zh/14-reference/01-components/07-explorer.md b/docs/zh/14-reference/01-components/07-explorer.md index 5d17970b38..c63bc703e2 100644 --- a/docs/zh/14-reference/01-components/07-explorer.md +++ b/docs/zh/14-reference/01-components/07-explorer.md @@ -132,7 +132,7 @@ cors = true - `cluster`:TDengine 集群的 taosAdapter 地址。 - `cluster_native`:TDengine 集群的原生连接地址,默认关闭。 - `x_api`:taosX 的 gRPC 地址。 -- `grpc`:taosX 代理向 taosX 建立连接的 gRPC 地址. +- `grpc`:taosX 代理向 taosX 建立连接的 gRPC 地址。 - `cors`:CORS 配置开关,默认为 `false`。当为 `true` 时,允许跨域访问。 - `ssl.certificate`:SSL 证书(如果同时设置了 certificate 与 certificate_key 两个参数,则启用 HTTPS 服务,否则不启用)。 - `ssl.certificate_key`:SSL 证书密钥。 From bac78203756c2ae39388ad74a2c50ba8054aa014 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Mon, 30 Sep 2024 17:48:30 +0800 Subject: [PATCH 05/12] fix: free error --- source/libs/catalog/src/ctgCache.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 48b949cc28..ef8399d723 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -936,11 +936,12 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) { CTG_LOCK(CTG_READ, &gCtgMgmt.lock); } TAOS_UNUSED(tsem_destroy(&operation->rspSem)); - taosMemoryFree(operation); + taosMemoryFreeClear(operation); } + return code; _return: - if (syncOp) { + if (syncOp && operation) { TAOS_UNUSED(tsem_destroy(&operation->rspSem)); } return code; From c6ad2a1e21c37554b0087772e1bb3aea2ddeaed1 Mon Sep 17 00:00:00 2001 From: wangmeng Date: Mon, 30 Sep 2024 17:48:42 +0800 Subject: [PATCH 06/12] fix:[TD-32198] add a test case to verify HAVING key --- tests/army/query/test_having.py | 378 ++++++++++++++++++++++++++++++++ tests/parallel_test/cases.task | 1 + 2 files changed, 379 insertions(+) create mode 100644 tests/army/query/test_having.py diff --git a/tests/army/query/test_having.py b/tests/army/query/test_having.py new file mode 100644 index 0000000000..ff8f6a1c1d --- /dev/null +++ b/tests/army/query/test_having.py @@ -0,0 +1,378 @@ +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * +from frame.eos import * +import random +import string + +""" + TD-32198: https://jira.taosdata.com:18080/browse/TD-32198 + Having关键字的专项测试,主要覆盖以下 4 种场景: + 1、普通聚合查询 + 2、关联查询 + 3、窗口切分查询 + 4、流计算中的窗口切分查询 +""" + + +class TDTestCase(TBase): + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + def prepare_global_data(self): + tdSql.execute("DROP DATABASE IF EXISTS db_td32198;") + tdSql.execute("create database db_td32198;") + tdSql.execute("use db_td32198;") + + def prepare_agg_data(self): + # database for case TD-32198 + # super table + tdSql.execute("DROP STABLE IF EXISTS meters") + tdSql.execute("CREATE STABLE `meters` (`ts` TIMESTAMP , `current` FLOAT , `voltage` INT , `phase` FLOAT ) \ + TAGS (`groupid` TINYINT, `location` VARCHAR(16));") + + # child table + tdSql.execute("CREATE TABLE `ct_1` USING `meters` (`groupid`, `location`) TAGS (1, 'beijing');") + # tdSql.execute("CREATE TABLE `ct_2` USING `meters` (`groupid`, `location`) TAGS (2, 'shanghai');") + + data = [ + ('2020-06-01 00:00:00.000', 0.1000000, 12, 0.0200000), + ('2020-06-01 00:15:00.000', 0.3614670, 18, 0.1071560), + ('2020-06-01 00:30:00.000', 0.5209450, 18, 0.1736480), + ('2020-06-01 00:45:00.000', 0.8764570, 18, 0.2588190), + ('2020-06-01 01:00:00.000', 1.0260600, 14, 0.3620200), + ('2020-06-01 01:15:00.000', 1.3678550, 0, 0.4226180), + ('2020-06-01 01:30:00.000', 1.6000000, 12, 0.5200000), + ('2020-06-01 01:45:00.000', 1.8207290, 2, 0.5835760), + ('2020-06-01 02:00:00.000', 1.9283630, 18, 0.6527880), + ('2020-06-01 02:15:00.000', 2.1213200, 18, 0.7271070), + ('2020-06-01 02:30:00.000', 2.3981330, 12, 0.7760440), + ('2020-06-01 02:45:00.000', 2.4574561, 14, 0.8291520), + ('2020-06-01 03:00:00.000', 2.6980760, 14, 0.8760250), + ('2020-06-01 03:15:00.000', 2.8189230, 10, 0.9063080), + ('2020-06-01 03:30:00.000', 2.8190780, 6, 0.9396930), + ('2020-06-01 03:45:00.000', 2.8977780, 10, 0.9859260), + ('2020-06-01 04:00:00.000', 2.9544230, 4, 1.0048079), + ('2020-06-01 04:15:00.000', 2.9885840, 14, 1.0061949), + ('2020-06-01 04:30:00.000', 3.0999999, 6, 1.0200000), + ('2020-06-01 04:45:00.000', 3.0885839, 10, 1.0161951), + ('2020-06-01 05:00:00.000', 2.9544230, 18, 0.9848080), + ('2020-06-01 05:15:00.000', 2.9977770, 2, 0.9859260), + ('2020-06-01 05:30:00.000', 2.8190780, 0, 0.9496930), + ('2020-06-01 05:45:00.000', 2.7189231, 18, 0.9163080), + ('2020-06-01 06:00:00.000', 2.5980761, 10, 0.8860250) + ] + + sql = "insert into ct_1 values"; + for t in data: + sql += "('{}', {}, {}, {}),".format(t[0], t[1], t[2], t[3]) + sql += ";" + tdSql.execute(sql) + tdLog.debug("sql: %s" % sql) + + def test_agg_having(self): + tdSql.query("select voltage, sum(voltage), count(*) from ct_1 group by voltage;") + tdSql.checkRows(8) + tdSql.checkData(7, 2, 7) + tdSql.checkData(7, 1, 126) + + tdSql.query("select voltage, sum(voltage), count(*) from ct_1 group by voltage having count(voltage)>=4;"); + tdSql.checkRows(3) + tdSql.checkData(2, 2, 7) + tdSql.checkData(2, 1, 126) + + tdSql.query("select voltage, sum(voltage), count(*) from ct_1 group by voltage having count(current)>=4;"); + tdSql.checkRows(3) + tdSql.checkData(2, 2, 7) + tdSql.checkData(2, 1, 126) + + tdSql.query("select voltage, sum(voltage), count(*) from ct_1 group by voltage having voltage >=14;"); + tdSql.checkRows(2) + tdSql.checkData(0, 2, 4) + tdSql.checkData(1, 1, 126) + + tdSql.error("select voltage, count(*) from ct_1 group by voltage having current >1.0260600;"); + + def prepare_join_data(self): + # super table + tdSql.execute("DROP STABLE IF EXISTS meters") + tdSql.execute("CREATE STABLE `meters` (`ts` TIMESTAMP , `current` FLOAT , `voltage` INT , `phase` FLOAT ) \ + TAGS (`groupid` TINYINT, `location` VARCHAR(16));") + + # child table + tdSql.execute("CREATE TABLE `ct_join_1` USING `meters` (`groupid`, `location`) TAGS (1, 'beijing');") + tdSql.execute("CREATE TABLE `ct_join_2` USING `meters` (`groupid`, `location`) TAGS (2, 'shanghai');") + + # insert data for ts4806 + data_join_1 = [ + ('2020-06-01 00:00:00.000', 0.1000000, 12, 0.0200000), + ('2020-06-01 00:10:00.000', 1.2278550, 9, 0.4226180), + ('2020-06-01 00:15:00.000', 0.3614670, 18, 0.1071560), + ('2020-06-01 00:30:00.000', 0.5209450, 18, 0.1736480), + ('2020-06-01 00:40:00.000', 1.5230000, 10, 0.5200000), + ('2020-06-01 00:45:00.000', 0.8764570, 18, 0.2588190), + ('2020-06-01 00:50:00.000', 1.6507290, 11, 0.5835760), + ('2020-06-01 01:00:00.000', 1.0260600, 14, 0.3620200), + ('2020-06-01 01:15:00.000', 1.3678550, 0, 0.4226180), + ('2020-06-01 01:20:00.000', 1.1213200, 13, 0.7271070), + ('2020-06-01 01:30:00.000', 1.6000000, 12, 0.5200000), + ('2020-06-01 01:45:00.000', 1.8207290, 2, 0.5835760), + ('2020-06-01 02:00:00.000', 1.9283630, 18, 0.6527880), + ('2020-06-01 02:05:00.000', 0.9283630, 6, 0.6527880), + ('2020-06-01 02:15:00.000', 2.1213200, 18, 0.7271070) + ] + + data_join_2 = [ + ('2020-06-01 00:00:00.000', 0.3614670, 9, 0.0200000), + ('2020-06-01 00:15:00.000', 0.1000000, 12, 0.1071560), + ('2020-06-01 00:30:00.000', 0.5209450, 15, 0.1736480), + ('2020-06-01 00:45:00.000', 0.8764570, 18, 0.2588190), + ('2020-06-01 01:00:00.000', 1.0260600, 15, 0.3620200), + ('2020-06-01 01:15:00.000', 1.3678550, 7, 0.4226180), + ('2020-06-01 01:30:00.000', 1.6000000, 12, 0.5200000), + ('2020-06-01 01:45:00.000', 1.8207290, 7, 0.5835760), + ('2020-06-01 02:00:00.000', 1.0260600, 13, 0.6527880), + ('2020-06-01 02:15:00.000', 0.5209450, 18, 0.7271070) + ] + + sql = "insert into ct_join_1 values"; + for t in data_join_1: + sql += "('{}', {}, {}, {}),".format(t[0], t[1], t[2], t[3]) + sql += ";" + tdSql.execute(sql) + tdLog.debug("ct_join_1 sql: %s" % sql) + + sql = "insert into ct_join_2 values"; + for t in data_join_2: + sql += "('{}', {}, {}, {}),".format(t[0], t[1], t[2], t[3]) + sql += ";" + tdSql.execute(sql) + tdLog.debug("ct_join_2 sql: %s" % sql) + + def test_join_having(self): + tdSql.query("SELECT a.voltage, count(*) FROM ct_join_1 a JOIN ct_join_2 b ON a.ts = b.ts \ + group by a.voltage having count(*) > 4;") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 5) + tdSql.checkData(0, 0, 18) + + tdSql.error("SELECT a.voltage, count(*) FROM ct_join_1 a JOIN ct_join_2 b ON a.ts = b.ts \ + group by a.voltage having b.voltage > 14;") + + tdSql.query("SELECT a.voltage, count(*) FROM ct_join_1 a left JOIN ct_join_2 b ON a.ts = b.ts \ + group by a.voltage having count(*) > 4;"); + tdSql.checkRows(1) + tdSql.checkData(0, 1, 5) + tdSql.checkData(0, 0, 18) + + tdSql.error("SELECT a.voltage, count(*) FROM ct_join_1 a left JOIN ct_join_2 b ON a.ts = b.ts \ + group by a.voltage having b.voltage > 14;"); + + tdSql.query("SELECT a.ts, a.voltage, avg(b.voltage) FROM ct_join_2 a LEFT WINDOW JOIN ct_join_1 b \ + WINDOW_OFFSET(-15m, 15m) where a.voltage >=18 and b.voltage > 11 having avg(b.voltage) > 17;"); + tdSql.checkRows(1) + + tdSql.error("SELECT a.ts, a.voltage, avg(b.voltage) FROM ct_join_2 a LEFT WINDOW JOIN ct_join_1 b \ + WINDOW_OFFSET(-15m, 15m) where a.voltage >=18 and b.voltage > 11 having b.voltage > 17;"); + + def prepare_window_data(self): + # super table + tdSql.execute("DROP STABLE IF EXISTS meters") + tdSql.execute("CREATE STABLE `meters` (`ts` TIMESTAMP , `current` FLOAT , `voltage` INT , `phase` FLOAT ) \ + TAGS (`groupid` TINYINT, `location` VARCHAR(16));") + + # child table + tdSql.execute("CREATE TABLE `ct_win` USING `meters` (`groupid`, `location`) TAGS (1, 'beijing');") + + # insert data for ts4806 + data_win = [ + ('2020-06-01 00:00:00.000', 0.1000000, 12, 0.0200000), + ('2020-06-01 00:10:00.000', 1.2278550, 9, 0.4226180), + ('2020-06-01 00:15:00.000', 0.3614670, 18, 0.1071560), + ('2020-06-01 00:30:00.000', 0.5209450, 18, 0.1736480), + ('2020-06-01 00:40:00.000', 1.5230000, 18, 0.5200000), + ('2020-06-01 00:45:00.000', 0.8764570, 18, 0.2588190), + ('2020-06-01 00:50:00.000', 1.6507290, 11, 0.5835760), + ('2020-06-01 01:00:00.000', 1.0260600, 14, 0.3620200), + ('2020-06-01 01:15:00.000', 1.3678550, 14, 0.4226180), + ('2020-06-01 01:20:00.000', 1.1213200, 13, 0.7271070), + ('2020-06-01 01:30:00.000', 1.6000000, 12, 0.5200000), + ('2020-06-01 01:45:00.000', 1.8207290, 12, 0.5835760), + ('2020-06-01 02:00:00.000', 1.9283630, 18, 0.6527880), + ('2020-06-01 02:05:00.000', 0.9283630, 18, 0.6527880), + ('2020-06-01 02:15:00.000', 2.1213200, 18, 0.7271070) + ] + + sql = "insert into ct_win values"; + for t in data_win: + sql += "('{}', {}, {}, {}),".format(t[0], t[1], t[2], t[3]) + sql += ";" + tdSql.execute(sql) + tdLog.debug("data_win sql: %s" % sql) + + def test_window_having(self): + tdSql.query("SELECT _WSTART, _WEND, COUNT(*) FROM ct_win INTERVAL(15m) having count(*) > 1;") + tdSql.checkRows(5) + tdSql.checkData(0, 2, 2) + + tdSql.error("SELECT _WSTART, _WEND, COUNT(*) FROM ct_win INTERVAL(15m) having voltage > 12;"); + + tdSql.query("SELECT _wstart, _wend, COUNT(*) AS cnt, FIRST(ts) AS fst, voltage FROM ct_win \ + STATE_WINDOW(voltage) having count(*) > 3;"); + tdSql.checkRows(1) + tdSql.checkData(0, 2, 4) + + tdSql.error("SELECT _wstart, _wend, COUNT(*) AS cnt, FIRST(ts) AS fst, voltage FROM ct_win \ + STATE_WINDOW(voltage) having phase > 0.26;"); + + tdSql.query("SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM ct_win SESSION(ts, 10m) having count(*) > 3;"); + tdSql.checkRows(1) + tdSql.checkData(0, 2, 5) + + tdSql.error("SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM ct_win SESSION(ts, 10m) having voltage > 12;"); + + tdSql.query("select _wstart, _wend, count(*), first(voltage), last(voltage) from ct_win \ + event_window start with voltage <= 12 end with voltage >= 17 having count(*) > 3;"); + tdSql.checkRows(1) + tdSql.checkData(0, 2, 7) + tdSql.checkData(0, 3, 11) + tdSql.checkData(0, 4, 18) + + tdSql.error("select _wstart, _wend, count(*) from ct_win \ + event_window start with voltage <=12 end with voltage >= 17 having phase > 0.2;"); + + tdSql.query( + "select _wstart, _wend, count(*), sum(voltage) from ct_win count_window(4) having sum(voltage) > 57;"); + tdSql.checkRows(1) + tdSql.checkData(0, 2, 4) + tdSql.checkData(0, 3, 61) + + tdSql.error("select _wstart, _wend, count(*), sum(voltage) from ct_win count_window(4) having voltage > 12;"); + + + def prepare_stream_window_data(self): + # super table + tdSql.execute("DROP STABLE IF EXISTS meters") + tdSql.execute("CREATE STABLE `meters` (`ts` TIMESTAMP , `current` FLOAT , `voltage` INT , `phase` FLOAT ) \ + TAGS (`groupid` TINYINT, `location` VARCHAR(16));") + + # child table + tdSql.execute("CREATE TABLE `ct_steam_win` USING `meters` (`groupid`, `location`) TAGS (1, 'beijing');") + + # insert data for ts4806 + data_win = [ + ('2020-06-01 00:00:00.000', 0.1000000, 12, 0.0200000), + ('2020-06-01 00:10:00.000', 1.2278550, 9, 0.4226180), + ('2020-06-01 00:15:00.000', 0.3614670, 18, 0.1071560), + ('2020-06-01 00:30:00.000', 0.5209450, 18, 0.1736480), + ('2020-06-01 00:40:00.000', 1.5230000, 18, 0.5200000), + ('2020-06-01 00:45:00.000', 0.8764570, 18, 0.2588190), + ('2020-06-01 00:50:00.000', 1.6507290, 11, 0.5835760), + ('2020-06-01 01:00:00.000', 1.0260600, 14, 0.3620200), + ('2020-06-01 01:15:00.000', 1.3678550, 14, 0.4226180), + ('2020-06-01 01:20:00.000', 1.1213200, 13, 0.7271070), + ('2020-06-01 01:30:00.000', 1.6000000, 12, 0.5200000), + ('2020-06-01 01:45:00.000', 1.8207290, 12, 0.5835760), + ('2020-06-01 02:00:00.000', 1.9283630, 18, 0.6527880), + ('2020-06-01 02:05:00.000', 0.9283630, 18, 0.6527880), + ('2020-06-01 02:15:00.000', 2.1213200, 18, 0.7271070) + ] + + sql = "insert into ct_win values"; + for t in data_win: + sql += "('{}', {}, {}, {}),".format(t[0], t[1], t[2], t[3]) + sql += ";" + tdSql.execute(sql) + tdLog.debug("data_win sql: %s" % sql) + + # 支持会话窗口、状态窗口、滑动窗口、事件窗口和计数窗口, + # 其中,状态窗口、事件窗口 和 计数窗口 搭配超级表时必须与 partition by tbname 一起使用 + def test_stream_window_having(self): + tdSql.execute("CREATE STREAM streams0 fill_history 1 INTO streamt0 AS \ + SELECT _WSTART, _WEND, COUNT(*) FROM meters PARTITION BY tbname INTERVAL(15m) having count(*) > 1;") + tdSql.query("select * from streamt0;"); + tdSql.checkRows(5) + tdSql.checkData(0, 2, 2) + + tdSql.error("CREATE STREAM streams10 fill_history 1 INTO streamt10 AS SELECT _WSTART, _WEND, COUNT(*) \ + FROM meters PARTITION BY tbname INTERVAL(15m) having voltage > 12;"); + + + tdSql.execute("CREATE STREAM streams1 fill_history 1 INTO streamt1 AS \ + SELECT _wstart, _wend, COUNT(*) AS cnt, FIRST(ts) AS fst, voltage FROM meters PARTITION BY tbname \ + STATE_WINDOW(voltage) having count(*) > 3;"); + tdSql.query("select * from streamt1;"); + tdSql.checkRows(1) + tdSql.checkData(0, 2, 4) + + tdSql.error("CREATE STREAM streams11 fill_history 1 INTO streamt11 AS \ + SELECT _wstart, _wend, COUNT(*) AS cnt, FIRST(ts) AS fst, voltage FROM meters PARTITION BY tbname \ + STATE_WINDOW(voltage) having phase > 0.26;"); + + + tdSql.execute("CREATE STREAM streams2 fill_history 1 INTO streamt2 AS \ + SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM meters SESSION(ts, 10m) having count(*) > 3;"); + tdSql.query("select * from streamt2;"); + tdSql.checkRows(1) + tdSql.checkData(0, 2, 5) + + tdSql.error("CREATE STREAM streams12 fill_history 1 INTO streamt12 AS \ + SELECT _wstart, _wend, COUNT(*), FIRST(ts) FROM meters SESSION(ts, 10m) having voltage > 12;"); + + tdSql.execute("CREATE STREAM streams3 fill_history 1 INTO streamt3 AS \ + select _wstart, _wend, count(*), first(voltage), last(voltage) from meters PARTITION BY tbname \ + event_window start with voltage <= 12 end with voltage >= 17 having count(*) > 3;"); + tdSql.query("select * from streamt3;"); + tdSql.checkRows(1) + tdSql.checkData(0, 2, 7) + tdSql.checkData(0, 3, 11) + tdSql.checkData(0, 4, 18) + + tdSql.error("CREATE STREAM streams13 fill_history 1 INTO streamt13 AS \ + select _wstart, _wend, count(*), first(voltage), last(voltage) from meters PARTITION BY tbname \ + event_window start with voltage <= 12 end with voltage >= 17 having phase > 0.2;"); + + tdSql.execute("CREATE STREAM streams4 fill_history 1 INTO streamt4 AS \ + select _wstart, _wend, count(*), sum(voltage) from meters PARTITION BY tbname \ + count_window(4) having sum(voltage) > 57;"); + tdSql.query("select * from streamt4;"); + tdSql.checkRows(1) + tdSql.checkData(0, 2, 4) + tdSql.checkData(0, 3, 61) + + tdSql.error("CREATE STREAM streams14 fill_history 1 INTO streamt14 AS \ + select _wstart, _wend, count(*), sum(voltage) from meters PARTITION BY tbname \ + count_window(4) having voltage > 12;"); + + + + def run(self): + self.prepare_global_data() + + self.prepare_agg_data() + self.test_agg_having() + + self.prepare_join_data() + self.test_join_having() + + self.prepare_window_data() + self.test_window_having() + + ''' + self.prepare_stream_window_data() + self.test_stream_window_having() + ''' + + def stop(self): + tdSql.execute("drop database db_td32198;") + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 72b470f509..53f3317ca9 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -46,6 +46,7 @@ ,,y,army,./pytest.sh python3 ./test.py -f query/last/test_last.py ,,y,army,./pytest.sh python3 ./test.py -f query/window/base.py ,,y,army,./pytest.sh python3 ./test.py -f query/sys/tb_perf_queries_exist_test.py -N 3 +,,y,army,./pytest.sh python3 ./test.py -f query/test_having.py # # system test From 8b43aec33d71cd947044f2a7786e38b6f8f49993 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Sep 2024 18:15:22 +0800 Subject: [PATCH 07/12] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckpoint.c | 5 +- source/libs/stream/src/streamDispatch.c | 227 +++++++++++++--------- source/libs/stream/src/streamSched.c | 13 +- 3 files changed, 146 insertions(+), 99 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index fd5591c488..e44bca123b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -904,7 +904,7 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) { return -1; } - if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) { + if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d", @@ -1055,7 +1055,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { numOfNotSend = taosArrayGetSize(pNotSendList); if (numOfNotSend > 0) { stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); - streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "trigger-recv-monitor"); } else { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a3146ae9d4..78cbd844a0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -526,6 +526,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) { int32_t msgId = pMsgInfo->msgId; int32_t code = 0; int64_t now = taosGetTimestampMs(); + bool inDispatch = true; stDebug("s-task:%s start monitor dispatch data", id); @@ -550,12 +551,15 @@ static void doMonitorDispatchData(void* param, void* tmrId) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref); - pTask->msgInfo.inMonitor = 0; - streamMutexUnlock(&pMsgInfo->lock); - return; + pMsgInfo->inMonitor = 0; + inDispatch = false; } streamMutexUnlock(&pMsgInfo->lock); + if (!inDispatch) { + return; + } + int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now); if (numOfFailed == 0) { stDebug("s-task:%s no error occurs, check again in %dms", id, DISPATCH_RETRY_INTERVAL_MS); @@ -638,15 +642,54 @@ void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) { "dispatch-monitor"); } +static int32_t doAddDispatchBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, + SArray* vgInfo, uint32_t hashValue, int64_t now, bool* pFound) { + size_t numOfVgroups = taosArrayGetSize(vgInfo); + int32_t code = 0; + + *pFound = false; + + for (int32_t j = 0; j < numOfVgroups; j++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); + if (pVgInfo == NULL) { + continue; + } + + if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { + if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j])) < 0) { + stError("s-task:%s failed to add dispatch block, code:%s", pTask->id.idStr, tstrerror(terrno)); + return code; + } + + if (pReqs[j].blockNum == 0) { + SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); + if (pDstVgroupInfo != NULL) { + addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false); + } + } + + pReqs[j].blockNum++; + *pFound = true; + break; + } + } + + return code; +} + int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId, int64_t now) { bool found = false; uint32_t hashValue = 0; - int32_t numOfVgroups = 0; + int32_t code = 0; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; if (pTask->pNameMap == NULL) { pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (pTask->pNameMap == NULL) { + stError("s-task:%s failed to init the name map, code:%s", pTask->id.idStr, tstrerror(terrno)); + return terrno; + } } void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t)); @@ -669,11 +712,11 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } } } else { - int32_t code = buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, - pDataBlock->info.parTbName); + code = buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, + pDataBlock->info.parTbName); if (code) { - stError("s-task:%s failed to build child table name for group:%" PRId64 ", code:%s", pTask->id.idStr, - groupId, tstrerror(code)); + stError("s-task:%s failed to build child table name for group:%" PRId64 ", code:%s", pTask->id.idStr, groupId, + tstrerror(code)); } } @@ -688,44 +731,21 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); // failed to put into name buffer, no need to do anything - if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing - int32_t code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); + if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing + code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); } } - numOfVgroups = taosArrayGetSize(vgInfo); - - // TODO: optimize search streamMutexLock(&pTask->msgInfo.lock); + code = doAddDispatchBlock(pTask, pReqs, pDataBlock, vgInfo, hashValue, now, &found); + streamMutexUnlock(&pTask->msgInfo.lock); - for (int32_t j = 0; j < numOfVgroups; j++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); - if (pVgInfo == NULL) { - continue; - } - - if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { - if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { - streamMutexUnlock(&pTask->msgInfo.lock); - return -1; - } - - if (pReqs[j].blockNum == 0) { - SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); - if (pDstVgroupInfo != NULL) { - addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false); - } - } - - pReqs[j].blockNum++; - found = true; - break; - } + if (code) { + return code; } - streamMutexUnlock(&pTask->msgInfo.lock); if (!found) { - stError("s-task:%s not found req hash value:%u", pTask->id.idStr, hashValue); + stError("s-task:%s not found req hash value:%u, failed to add dispatch block", pTask->id.idStr, hashValue); return TSDB_CODE_STREAM_INTERNAL_ERROR; } else { return 0; @@ -919,7 +939,7 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) { } static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, int32_t num, int32_t vgId, int32_t level, - const char* id) { + const char* id) { SArray* pTmp = taosArrayInit(4, sizeof(int32_t)); if (pTmp == NULL) { return terrno; @@ -940,8 +960,8 @@ static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, in stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId); return terrno; } else { - stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, - level, pInfo->upstreamTaskId); + stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level, + pInfo->upstreamTaskId); } } @@ -987,13 +1007,48 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t } } -static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { +static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, SArray* pNotRspList) { + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; + SArray* pList = pActiveInfo->pReadyMsgList; + int32_t num = taosArrayGetSize(pList); + int32_t vgId = pTask->pMeta->vgId; + int32_t checkpointId = pActiveInfo->activeId; + const char* id = pTask->id.idStr; + int32_t notRsp = 0; + + int32_t code = doTaskChkptStatusCheck(pTask, num); + if (code) { + return code; + } + + code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); + if (code) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id, + tstrerror(code), ref); + return code; + } + + notRsp = taosArrayGetSize(pNotRspList); + if (notRsp == 0) { + streamClearChkptReadyMsg(pActiveInfo); + } else { + doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); + } + + return code; +} + +static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; int32_t vgId = pTask->pMeta->vgId; const char* id = pTask->id.idStr; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; SArray* pNotRspList = NULL; + int32_t code = 0; + int32_t notRsp = 0; // check the status every 100ms if (streamTaskShouldStop(pTask)) { @@ -1004,7 +1059,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { } if (++pTmrInfo->activeCounter < 50) { - streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); return; } @@ -1027,45 +1082,26 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { } streamMutexLock(&pActiveInfo->lock); + code = chkptReadyMsgSendHelper(pTask, pNotRspList); + streamMutexUnlock(&pActiveInfo->lock); - SArray* pList = pActiveInfo->pReadyMsgList; - int32_t num = taosArrayGetSize(pList); - int32_t code = doTaskChkptStatusCheck(pTask, num); - if (code) { - streamMutexUnlock(&pActiveInfo->lock); + if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); - if (code) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id, - tstrerror(code), ref); - streamMutexUnlock(&pActiveInfo->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - taosArrayDestroy(pNotRspList); return; } - int32_t checkpointId = pActiveInfo->activeId; - int32_t notRsp = taosArrayGetSize(pNotRspList); - doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); - + notRsp = taosArrayGetSize(pNotRspList); if (notRsp > 0) { // send checkpoint-ready msg again - streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + stDebug("s-task:%s start to monitor checkpoint-ready msg recv status in 10s", id); + streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); - streamMutexUnlock(&pActiveInfo->lock); } else { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug( "s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit " "from timer, ref:%d", id, vgId, ref); - - streamClearChkptReadyMsg(pActiveInfo); - streamMutexUnlock(&pActiveInfo->lock); // release should be the last execution, since pTask may be destroy after it immidiately. streamMetaReleaseTask(pTask->pMeta, pTask); } @@ -1124,7 +1160,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); streamMetaAcquireOneTask(pTask); - streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); // mark the timer monitor checkpointId @@ -1190,6 +1226,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch taosMemoryFree(buf); return terrno; } + SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen); int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN; @@ -1359,29 +1396,11 @@ void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstr pReadyInfo->childId = childId; } -int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) { - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - return TSDB_CODE_SUCCESS; - } - - SStreamUpstreamEpInfo* pInfo = NULL; - streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo); - if (pInfo == NULL) { - return TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - STaskCheckpointReadyInfo info = {0}; - initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId); - - stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 "-0x%x (vgId:%d) idx:%d", - pTask->id.idStr, pTask->info.taskLevel, pTask->id.streamId, pInfo->taskId, pInfo->nodeId, index); - +static int32_t doAddChkptReadyMsg(SStreamTask* pTask, STaskCheckpointReadyInfo* pInfo) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - streamMutexLock(&pActiveInfo->lock); - void* px = taosArrayPush(pActiveInfo->pReadyMsgList, &info); + void* px = taosArrayPush(pActiveInfo->pReadyMsgList, pInfo); if (px == NULL) { - streamMutexUnlock(&pActiveInfo->lock); stError("s-task:%s failed to add readyMsg info, code: out of memory", pTask->id.idStr); return terrno; } @@ -1395,10 +1414,36 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, stDebug("s-task:%s %d/%d checkpoint-trigger recv", pTask->id.idStr, numOfRecv, total); } - streamMutexUnlock(&pActiveInfo->lock); return 0; } +int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) { + int32_t code = 0; + STaskCheckpointReadyInfo info = {0}; + + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + return TSDB_CODE_SUCCESS; + } + + SStreamUpstreamEpInfo* pInfo = NULL; + streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo); + if (pInfo == NULL) { + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId); + + stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 "-0x%x (vgId:%d) idx:%d", + pTask->id.idStr, pTask->info.taskLevel, pTask->id.streamId, pInfo->taskId, pInfo->nodeId, index); + + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + + streamMutexLock(&pActiveInfo->lock); + code = doAddChkptReadyMsg(pTask, &info); + streamMutexUnlock(&pActiveInfo->lock); + return code; +} + void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) { if (pActiveInfo == NULL) { return; diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 63e24b0975..98920e6f70 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -83,12 +83,14 @@ void streamTaskResumeInFuture(SStreamTask* pTask) { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void streamTaskResumeHelper(void* param, void* tmrId) { - SStreamTask* pTask = (SStreamTask*)param; - SStreamTaskId* pId = &pTask->id; - SStreamTaskState p = streamTaskGetStatus(pTask); + SStreamTask* pTask = (SStreamTask*)param; + SStreamTaskId* pId = &pTask->id; + SStreamTaskState p = streamTaskGetStatus(pTask); + int32_t code = 0; if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) { int8_t status = streamTaskSetSchedStatusInactive(pTask); + TAOS_UNUSED(status); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p.name, ref); @@ -97,13 +99,12 @@ void streamTaskResumeHelper(void* param, void* tmrId) { return; } - int32_t code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); + code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); if (code) { stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, tstrerror(code), ref); } else { - stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, - ref); + stDebug("trigger to resume s-task:%s after idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, ref); // release the task ref count streamTaskClearSchedIdleInfo(pTask); From d7d8ea1b1b0dcb3b1d1e7ed9ac18f502e4cde8a0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 30 Sep 2024 18:33:21 +0800 Subject: [PATCH 08/12] refactor: do some internal refactor. --- source/libs/stream/src/streamStartHistory.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index b376dbd16b..4d7bf2ba87 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -64,7 +64,6 @@ static int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { SStreamScanHistoryReq req; - int32_t code = 0; initScanHistoryReq(pTask, &req, igUntreated); int32_t len = sizeof(SStreamScanHistoryReq); @@ -173,7 +172,7 @@ int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) { code = streamTaskStartScanHistory(pTask); } - // NOTE: there will be an deadlock if launch fill history here. + // NOTE: there will be a deadlock if launch fill history here. // start the related fill-history task, when current task is ready // if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { // streamLaunchFillHistoryTask(pTask); @@ -219,7 +218,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId); - // Set the execute conditions, including the query time window and the version range + // Set the execution conditions, including the query time window and the version range streamMetaRLock(pMeta); SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); streamMetaRUnLock(pMeta); From a0f5c232c4602f26b811ed6c1dacea412bbb1bcb Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Wed, 2 Oct 2024 22:17:08 +0800 Subject: [PATCH 09/12] update mac install script --- packaging/tools/make_install.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packaging/tools/make_install.sh b/packaging/tools/make_install.sh index ea19125bf5..13447bd5e4 100755 --- a/packaging/tools/make_install.sh +++ b/packaging/tools/make_install.sh @@ -616,8 +616,8 @@ function update_TDengine() { [ -f ${installDir}/bin/taosadapter ] && \ echo -e "${GREEN_DARK}To start Adapter ${NC}: taosadapter &${NC}" else - echo -e "${GREEN_DARK}To start service ${NC}: launchctl start com.tdengine.taosd${NC}" - echo -e "${GREEN_DARK}To start Adapter ${NC}: launchctl start com.tdengine.taosadapter${NC}" + echo -e "${GREEN_DARK}To start service ${NC}: sudo launchctl start com.tdengine.taosd${NC}" + echo -e "${GREEN_DARK}To start Adapter ${NC}: sudo launchctl start com.tdengine.taosadapter${NC}" fi fi @@ -668,8 +668,8 @@ function install_TDengine() { [ -f ${installDir}/bin/taosadapter ] && \ echo -e "${GREEN_DARK}To start Adapter ${NC}: taosadapter &${NC}" else - echo -e "${GREEN_DARK}To start service ${NC}: launchctl start com.tdengine.taosd${NC}" - echo -e "${GREEN_DARK}To start Adapter ${NC}: launchctl start com.tdengine.taosadapter${NC}" + echo -e "${GREEN_DARK}To start service ${NC}: sudo launchctl start com.tdengine.taosd${NC}" + echo -e "${GREEN_DARK}To start Adapter ${NC}: sudo launchctl start com.tdengine.taosadapter${NC}" fi fi From c150465da22495d62e6932153a85704ea2ad3cf4 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Tue, 8 Oct 2024 06:38:56 +0800 Subject: [PATCH 10/12] docs: release ver-3.3.3.0 --- cmake/cmake.version | 2 +- docs/en/28-releases/01-tdengine.md | 4 ++++ docs/zh/28-releases/01-tdengine.md | 4 ++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cmake/cmake.version b/cmake/cmake.version index 3bb764612e..c600c084fd 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.3.3.0.alpha") + SET(TD_VER_NUMBER "3.3.4.0.alpha") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md index a6e157cf74..486fe2c015 100644 --- a/docs/en/28-releases/01-tdengine.md +++ b/docs/en/28-releases/01-tdengine.md @@ -20,6 +20,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t import Release from "/components/ReleaseV3"; +## 3.3.3.0 + + + ## 3.3.2.0 diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index 5b3abcb341..0f9ceada50 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -24,6 +24,10 @@ TDengine 3.x 各版本安装包下载链接如下: import Release from "/components/ReleaseV3"; +## 3.3.3.0 + + + ## 3.3.2.0 From 7a0d49a747a8a3e8c3cea01d3f9a86cc74e6cef1 Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Tue, 8 Oct 2024 14:01:07 +0800 Subject: [PATCH 11/12] fix taos/taosws dll path issue --- docs/en/14-reference/05-connectors/10-cpp.mdx | 2 +- docs/zh/14-reference/05-connector/10-cpp.mdx | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/en/14-reference/05-connectors/10-cpp.mdx b/docs/en/14-reference/05-connectors/10-cpp.mdx index 6a570b2490..ca32660ac7 100644 --- a/docs/en/14-reference/05-connectors/10-cpp.mdx +++ b/docs/en/14-reference/05-connectors/10-cpp.mdx @@ -19,7 +19,7 @@ After TDengine server or client installation, `taos.h` is located at The dynamic libraries for the TDengine client driver are located in. - Linux: `/usr/local/taos/driver/libtaos.so` -- Windows: `C:\TDengine\taos.dll` +- Windows: `C:\TDengine\driver\taos.dll` - macOS: `/usr/local/lib/libtaos.dylib` ## Supported platforms diff --git a/docs/zh/14-reference/05-connector/10-cpp.mdx b/docs/zh/14-reference/05-connector/10-cpp.mdx index 0df6ed924c..c618601fb9 100644 --- a/docs/zh/14-reference/05-connector/10-cpp.mdx +++ b/docs/zh/14-reference/05-connector/10-cpp.mdx @@ -27,7 +27,7 @@ TDengine 服务端或客户端安装后,`taosws.h` 位于: TDengine 客户端驱动的动态库位于: - Linux: `/usr/local/taos/driver/libtaosws.so` -- Windows: `C:\TDengine\taosws.dll` +- Windows: `C:\TDengine\driver\taosws.dll` - macOS: `/usr/local/lib/libtaosws.dylib` ### 支持的平台 @@ -626,7 +626,7 @@ TDengine 服务端或客户端安装后,`taos.h` 位于: TDengine 客户端驱动的动态库位于: - Linux: `/usr/local/taos/driver/libtaos.so` -- Windows: `C:\TDengine\taos.dll` +- Windows: `C:\TDengine\driver\taos.dll` - macOS: `/usr/local/lib/libtaos.dylib` ### 支持的平台 From e51199d541217f2b00cf340f757e87d62e2bccd7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 8 Oct 2024 15:16:48 +0800 Subject: [PATCH 12/12] fix: data deleter memory leak issue --- source/libs/executor/src/dataDeleter.c | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 57f4289ebf..c284e9a8a9 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -273,10 +273,18 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam) { int32_t code = TSDB_CODE_SUCCESS; + if (pParam == NULL) { + code = TSDB_CODE_QRY_INVALID_INPUT; + qError("invalid input param in creating data deleter, code%s", tstrerror(code)); + goto _end; + } + + SDeleterParam* pDeleterParam = (SDeleterParam*)pParam; SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle)); if (NULL == deleter) { code = terrno; + taosArrayDestroy(pDeleterParam->pUidList); taosMemoryFree(pParam); goto _end; } @@ -292,12 +300,6 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData deleter->pDeleter = pDeleterNode; deleter->pSchema = pDataSink->pInputDataBlockDesc; - if (pParam == NULL) { - code = TSDB_CODE_QRY_INVALID_INPUT; - qError("invalid input param in creating data deleter, code%s", tstrerror(code)); - goto _end; - } - deleter->pParam = pParam; deleter->status = DS_BUF_EMPTY; deleter->queryEnd = false;