diff --git a/cmake/cmake.version b/cmake/cmake.version index 6d893c0627..3d0dc80902 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.0.4.3") + SET(TD_VER_NUMBER "3.0.5.0") ENDIF () IF (DEFINED VERCOMPATIBLE) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 59986a3b3c..1f34f9080a 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -265,7 +265,7 @@ if(${BUILD_WITH_ROCKSDB}) option(WITH_FALLOCATE "" OFF) option(WITH_JEMALLOC "" OFF) option(WITH_GFLAGS "" OFF) - option(PORTABLE "" OFF) + option(PORTABLE "" ON) option(WITH_LIBURING "" OFF) option(FAIL_ON_WARNINGS OFF) diff --git a/docs/en/12-taos-sql/01-data-type.md b/docs/en/12-taos-sql/01-data-type.md index 70bea97dba..13007d5bb1 100644 --- a/docs/en/12-taos-sql/01-data-type.md +++ b/docs/en/12-taos-sql/01-data-type.md @@ -45,7 +45,7 @@ In TDengine, the data types below can be used when specifying a column or tag. :::note - Only ASCII visible characters are suggested to be used in a column or tag of BINARY type. Multi-byte characters must be stored in NCHAR type. -- The length of BINARY can be up to 16,374 bytes. The string value must be quoted with single quotes. You must specify a length in bytes for a BINARY value, for example binary(20) for up to twenty single-byte characters. If the data exceeds the specified length, an error will occur. The literal single quote inside the string must be preceded with back slash like `\'` +- The length of BINARY can be up to 16,374(data column is 65,517 and tag column is 16,382 since version 3.0.5.0) bytes. The string value must be quoted with single quotes. You must specify a length in bytes for a BINARY value, for example binary(20) for up to twenty single-byte characters. If the data exceeds the specified length, an error will occur. The literal single quote inside the string must be preceded with back slash like `\'` - Numeric values in SQL statements will be determined as integer or float type according to whether there is decimal point or whether scientific notation is used, so attention must be paid to avoid overflow. For example, 9999999999999999999 will be considered as overflow because it exceeds the upper limit of long integer, but 9999999999999999999.0 will be considered as a legal float number. ::: diff --git a/docs/en/12-taos-sql/03-table.md b/docs/en/12-taos-sql/03-table.md index f61d1f5147..7f39fb5867 100644 --- a/docs/en/12-taos-sql/03-table.md +++ b/docs/en/12-taos-sql/03-table.md @@ -45,7 +45,7 @@ table_option: { 1. The first column of a table MUST be of type TIMESTAMP. It is automatically set as the primary key. 2. The maximum length of the table name is 192 bytes. -3. The maximum length of each row is 48k bytes, please note that the extra 2 bytes used by each BINARY/NCHAR column are also counted. +3. The maximum length of each row is 48k(64k since version 3.0.5.0) bytes, please note that the extra 2 bytes used by each BINARY/NCHAR column are also counted. 4. The name of the subtable can only consist of characters from the English alphabet, digits and underscore. Table names can't start with a digit. Table names are case insensitive. 5. The maximum length in bytes must be specified when using BINARY or NCHAR types. 6. Escape character "\`" can be used to avoid the conflict between table names and reserved keywords, above rules will be bypassed when using escape character on table names, but the upper limit for the name length is still valid. The table names specified using escape character are case sensitive. diff --git a/docs/en/12-taos-sql/19-limit.md b/docs/en/12-taos-sql/19-limit.md index 654fae7560..22ad2055e4 100644 --- a/docs/en/12-taos-sql/19-limit.md +++ b/docs/en/12-taos-sql/19-limit.md @@ -26,7 +26,7 @@ The following characters cannot occur in a password: single quotation marks ('), - Maximum length of database name is 64 bytes - Maximum length of table name is 192 bytes, excluding the database name prefix and the separator. -- Maximum length of each data row is 48K bytes. Note that the upper limit includes the extra 2 bytes consumed by each column of BINARY/NCHAR type. +- Maximum length of each data row is 48K(64K since version 3.0.5.0) bytes. Note that the upper limit includes the extra 2 bytes consumed by each column of BINARY/NCHAR type. - The maximum length of a column name is 64 bytes. - Maximum number of columns is 4096. There must be at least 2 columns, and the first column must be timestamp. - The maximum length of a tag name is 64 bytes diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index 114026eca0..db49e5f395 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -1256,6 +1256,7 @@ The source code of the sample application is under `TDengine/examples/JDBC`: - connectionPools: using taos-jdbcdriver in connection pools such as HikariCP, Druid, dbcp, c3p0, etc. - SpringJdbcTemplate: using taos-jdbcdriver in Spring JdbcTemplate. - mybatisplus-demo: using taos-jdbcdriver in Springboot + Mybatis. +- consumer-demo: consumer TDengine data example, the consumption rate can be controlled by parameters. [JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC) diff --git a/docs/en/14-reference/13-schemaless/13-schemaless.md b/docs/en/14-reference/13-schemaless/13-schemaless.md index aad0e63a42..3ae9098a73 100644 --- a/docs/en/14-reference/13-schemaless/13-schemaless.md +++ b/docs/en/14-reference/13-schemaless/13-schemaless.md @@ -90,7 +90,7 @@ You can configure smlChildTableName in taos.cfg to specify table names, for exam Note: TDengine 3.0.3.0 and later automatically detect whether order is consistent. This parameter is no longer used. :::tip -All processing logic of schemaless will still follow TDengine's underlying restrictions on data structures, such as the total length of each row of data cannot exceed 48 KB and the total length of a tag value cannot exceed 16 KB. See [TDengine SQL Boundary Limits](/taos-sql/limit) for specific constraints in this area. +All processing logic of schemaless will still follow TDengine's underlying restrictions on data structures, such as the total length of each row of data cannot exceed 48 KB(64 KB since version 3.0.5.0) and the total length of a tag value cannot exceed 16 KB. See [TDengine SQL Boundary Limits](/taos-sql/limit) for specific constraints in this area. ::: ## Time resolution recognition diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md index c7836d1298..a9336697f2 100644 --- a/docs/en/28-releases/01-tdengine.md +++ b/docs/en/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w import Release from "/components/ReleaseV3"; +## 3.0.5.0 + + + ## 3.0.4.2 diff --git a/docs/en/28-releases/02-tools.md b/docs/en/28-releases/02-tools.md index 9f8dbfee7e..28c2ff7a7f 100644 --- a/docs/en/28-releases/02-tools.md +++ b/docs/en/28-releases/02-tools.md @@ -10,6 +10,10 @@ For other historical version installers, please visit [here](https://www.taosdat import Release from "/components/ReleaseV3"; +## 2.5.1 + + + ## 2.5.0 diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index e4cf4a83e7..46800226d7 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -1258,6 +1258,7 @@ public static void main(String[] args) throws Exception { - connectionPools:HikariCP, Druid, dbcp, c3p0 等连接池中使用 taos-jdbcdriver。 - SpringJdbcTemplate:Spring JdbcTemplate 中使用 taos-jdbcdriver。 - mybatisplus-demo:Springboot + Mybatis 中使用 taos-jdbcdriver。 +- consumer-demo:Consumer 消费 TDengine 数据示例,可通过参数控制消费速度。 请参考:[JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC) diff --git a/docs/zh/12-taos-sql/01-data-type.md b/docs/zh/12-taos-sql/01-data-type.md index f014573ca6..4a4c1d6ec6 100644 --- a/docs/zh/12-taos-sql/01-data-type.md +++ b/docs/zh/12-taos-sql/01-data-type.md @@ -45,9 +45,9 @@ CREATE DATABASE db_name PRECISION 'ns'; :::note -- 表的每行长度不能超过 48KB(注意:每个 BINARY/NCHAR 类型的列还会额外占用 2 个字节的存储位置)。 +- 表的每行长度不能超过 48KB(从 3.0.5.0 版本开始为 64KB)(注意:每个 BINARY/NCHAR 类型的列还会额外占用 2 个字节的存储位置)。 - 虽然 BINARY 类型在底层存储上支持字节型的二进制字符,但不同编程语言对二进制数据的处理方式并不保证一致,因此建议在 BINARY 类型中只存储 ASCII 可见字符,而避免存储不可见字符。多字节的数据,例如中文字符,则需要使用 NCHAR 类型进行保存。如果强行使用 BINARY 类型保存中文字符,虽然有时也能正常读写,但并不带有字符集信息,很容易出现数据乱码甚至数据损坏等情况。 -- BINARY 类型理论上最长可以有 16,374 字节。BINARY 仅支持字符串输入,字符串两端需使用单引号引用。使用时须指定大小,如 BINARY(20) 定义了最长为 20 个单字节字符的字符串,每个字符占 1 字节的存储空间,总共固定占用 20 字节的空间,此时如果用户字符串超出 20 字节将会报错。对于字符串内的单引号,可以用转义字符反斜线加单引号来表示,即 `\'`。 +- BINARY 类型理论上最长可以有 16,374(从 3.0.5.0 版本开始,数据列为 65,517,标签列为 16,382) 字节。BINARY 仅支持字符串输入,字符串两端需使用单引号引用。使用时须指定大小,如 BINARY(20) 定义了最长为 20 个单字节字符的字符串,每个字符占 1 字节的存储空间,总共固定占用 20 字节的空间,此时如果用户字符串超出 20 字节将会报错。对于字符串内的单引号,可以用转义字符反斜线加单引号来表示,即 `\'`。 - SQL 语句中的数值类型将依据是否存在小数点,或使用科学计数法表示,来判断数值类型是否为整型或者浮点型,因此在使用时要注意相应类型越界的情况。例如,9999999999999999999 会认为超过长整型的上边界而溢出,而 9999999999999999999.0 会被认为是有效的浮点数。 ::: diff --git a/docs/zh/12-taos-sql/03-table.md b/docs/zh/12-taos-sql/03-table.md index 5687c7e740..2e66ac4002 100644 --- a/docs/zh/12-taos-sql/03-table.md +++ b/docs/zh/12-taos-sql/03-table.md @@ -43,7 +43,7 @@ table_option: { 1. 表的第一个字段必须是 TIMESTAMP,并且系统自动将其设为主键; 2. 表名最大长度为 192; -3. 表的每行长度不能超过 48KB;(注意:每个 BINARY/NCHAR 类型的列还会额外占用 2 个字节的存储位置) +3. 表的每行长度不能超过 48KB(从 3.0.5.0 版本开始为 64KB);(注意:每个 BINARY/NCHAR 类型的列还会额外占用 2 个字节的存储位置) 4. 子表名只能由字母、数字和下划线组成,且不能以数字开头,不区分大小写 5. 使用数据类型 binary 或 nchar,需指定其最长的字节数,如 binary(20),表示 20 字节; 6. 为了兼容支持更多形式的表名,TDengine 引入新的转义符 "\`",可以让表名与关键词不冲突,同时不受限于上述表名称合法性约束检查。但是同样具有长度限制要求。使用转义字符以后,不再对转义字符中的内容进行大小写统一。 diff --git a/docs/zh/12-taos-sql/19-limit.md b/docs/zh/12-taos-sql/19-limit.md index 7b6692f1b7..e5a492580e 100644 --- a/docs/zh/12-taos-sql/19-limit.md +++ b/docs/zh/12-taos-sql/19-limit.md @@ -26,7 +26,7 @@ description: 合法字符集和命名中的限制规则 - 数据库名最大长度为 64 字节 - 表名最大长度为 192 字节,不包括数据库名前缀和分隔符 -- 每行数据最大长度 48KB (注意:数据行内每个 BINARY/NCHAR 类型的列还会额外占用 2 个字节的存储位置) +- 每行数据最大长度 48KB(从 3.0.5.0 版本开始为 64KB) (注意:数据行内每个 BINARY/NCHAR 类型的列还会额外占用 2 个字节的存储位置) - 列名最大长度为 64 字节 - 最多允许 4096 列,最少需要 2 列,第一列必须是时间戳。 - 标签名最大长度为 64 字节 diff --git a/docs/zh/14-reference/13-schemaless/13-schemaless.md b/docs/zh/14-reference/13-schemaless/13-schemaless.md index e5f232c1fc..6c2007938b 100644 --- a/docs/zh/14-reference/13-schemaless/13-schemaless.md +++ b/docs/zh/14-reference/13-schemaless/13-schemaless.md @@ -87,7 +87,7 @@ st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000 :::tip 无模式所有的处理逻辑,仍会遵循 TDengine 对数据结构的底层限制,例如每行数据的总长度不能超过 -48KB,标签值的总长度不超过16KB。这方面的具体限制约束请参见 [TDengine SQL 边界限制](/taos-sql/limit) +48KB(从 3.0.5.0 版本开始为 64KB),标签值的总长度不超过16KB。这方面的具体限制约束请参见 [TDengine SQL 边界限制](/taos-sql/limit) ::: diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index 2b28bae745..3ee19de84f 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do import Release from "/components/ReleaseV3"; +## 3.0.5.0 + + + ## 3.0.4.2 diff --git a/docs/zh/28-releases/02-tools.md b/docs/zh/28-releases/02-tools.md index 7f93483ed4..fbd12b1440 100644 --- a/docs/zh/28-releases/02-tools.md +++ b/docs/zh/28-releases/02-tools.md @@ -10,6 +10,10 @@ taosTools 各版本安装包下载链接如下: import Release from "/components/ReleaseV3"; +## 2.5.1 + + + ## 2.5.0 diff --git a/examples/JDBC/consumer-demo/pom.xml b/examples/JDBC/consumer-demo/pom.xml new file mode 100644 index 0000000000..aa3cb154e5 --- /dev/null +++ b/examples/JDBC/consumer-demo/pom.xml @@ -0,0 +1,70 @@ + + + 4.0.0 + + com.taosdata + consumer + 1.0-SNAPSHOT + + + 8 + 8 + + + + + com.taosdata.jdbc + taos-jdbcdriver + 3.2.1 + + + com.google.guava + guava + 30.1.1-jre + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.3.0 + + + ConsumerDemo + + ConsumerDemo + + + com.taosdata.ConsumerDemo + + + + jar-with-dependencies + + + package + + single + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + UTF-8 + + + + + + + \ No newline at end of file diff --git a/examples/JDBC/consumer-demo/readme.md b/examples/JDBC/consumer-demo/readme.md new file mode 100644 index 0000000000..c211b017a7 --- /dev/null +++ b/examples/JDBC/consumer-demo/readme.md @@ -0,0 +1,52 @@ +# How to Run the Consumer Demo Code On Linux OS +TDengine's Consumer demo project is organized in a Maven way so that users can easily compile, package and run the project. If you don't have Maven on your server, you may install it using +``` +sudo apt-get install maven +``` + +## Install TDengine Client and TaosAdapter +Make sure you have already installed a tdengine client on your current develop environment. +Download the tdengine package on our website: ``https://www.taosdata.com/cn/all-downloads/`` and install the client. + +## Run Consumer Demo using mvn plugin +run command: +``` +mvn clean compile exec:java -Dexec.mainClass="com.taosdata.ConsumerDemo" +``` + +## Custom configuration +```shell +# the host of TDengine server +export TAOS_HOST="127.0.0.1" + +# the port of TDengine server +export TAOS_PORT="6041" + +# the consumer type, can be "ws" or "jni" +export TAOS_TYPE="ws" + +# the number of consumers +export TAOS_JDBC_CONSUMER_NUM="1" + +# the number of processors to consume +export TAOS_JDBC_PROCESSOR_NUM="2" + +# the number of records to be consumed per processor per second +export TAOS_JDBC_RATE_PER_PROCESSOR="1000" + +# poll wait time in ms +export TAOS_JDBC_POLL_SLEEP="100" +``` + +## Run Consumer Demo using jar + +To compile the demo project, go to the source directory ``TDengine/tests/examples/JDBC/consumer-demo`` and execute +``` +mvn clean package assembly:single +``` + +To run ConsumerDemo.jar, go to ``TDengine/tests/examples/JDBC/consumer-demo`` and execute +``` +java -jar target/ConsumerDemo-jar-with-dependencies.jar +``` + diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Bean.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Bean.java new file mode 100644 index 0000000000..2f2467b371 --- /dev/null +++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Bean.java @@ -0,0 +1,43 @@ +package com.taosdata; + +import java.sql.Timestamp; + +public class Bean { + private Timestamp ts; + private Integer c1; + private String c2; + + public Timestamp getTs() { + return ts; + } + + public void setTs(Timestamp ts) { + this.ts = ts; + } + + public Integer getC1() { + return c1; + } + + public void setC1(Integer c1) { + this.c1 = c1; + } + + public String getC2() { + return c2; + } + + public void setC2(String c2) { + this.c2 = c2; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("Bean {"); + sb.append("ts=").append(ts); + sb.append(", c1=").append(c1); + sb.append(", c2='").append(c2).append('\''); + sb.append('}'); + return sb.toString(); + } +} diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/BeanDeserializer.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/BeanDeserializer.java new file mode 100644 index 0000000000..478af9e70d --- /dev/null +++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/BeanDeserializer.java @@ -0,0 +1,6 @@ +package com.taosdata; + +import com.taosdata.jdbc.tmq.ReferenceDeserializer; + +public class BeanDeserializer extends ReferenceDeserializer { +} diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Config.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Config.java new file mode 100644 index 0000000000..08579926e3 --- /dev/null +++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Config.java @@ -0,0 +1,78 @@ +package com.taosdata; + +public class Config { + public static final String TOPIC = "test_consumer"; + public static final String TAOS_HOST = "127.0.0.1"; + public static final String TAOS_PORT = "6041"; + public static final String TAOS_TYPE = "ws"; + public static final int TAOS_JDBC_CONSUMER_NUM = 1; + public static final int TAOS_JDBC_PROCESSOR_NUM = 2; + public static final int TAOS_JDBC_RATE_PER_PROCESSOR = 1000; + public static final int TAOS_JDBC_POLL_SLEEP = 100; + + private final int consumerNum; + private final int processCapacity; + private final int rate; + private final int pollSleep; + private final String type; + private final String host; + private final String port; + + public Config(String type, String host, String port, int consumerNum, int processCapacity, int rate, int pollSleep) { + this.type = type; + this.consumerNum = consumerNum; + this.processCapacity = processCapacity; + this.rate = rate; + this.pollSleep = pollSleep; + this.host = host; + this.port = port; + } + + public int getConsumerNum() { + return consumerNum; + } + + public int getProcessCapacity() { + return processCapacity; + } + + public int getRate() { + return rate; + } + + public int getPollSleep() { + return pollSleep; + } + + public String getHost() { + return host; + } + + public String getPort() { + return port; + } + + public String getType() { + return type; + } + + public static Config getFromENV() { + String host = System.getenv("TAOS_HOST") != null ? System.getenv("TAOS_HOST") : TAOS_HOST; + String port = System.getenv("TAOS_PORT") != null ? System.getenv("TAOS_PORT") : TAOS_PORT; + String type = System.getenv("TAOS_TYPE") != null ? System.getenv("TAOS_TYPE") : TAOS_TYPE; + + String c = System.getenv("TAOS_JDBC_CONSUMER_NUM"); + int num = c != null ? Integer.parseInt(c) : TAOS_JDBC_CONSUMER_NUM; + + String p = System.getenv("TAOS_JDBC_PROCESSOR_NUM"); + int capacity = p != null ? Integer.parseInt(p) : TAOS_JDBC_PROCESSOR_NUM; + + String r = System.getenv("TAOS_JDBC_RATE_PER_PROCESSOR"); + int rate = r != null ? Integer.parseInt(r) : TAOS_JDBC_RATE_PER_PROCESSOR; + + String s = System.getenv("TAOS_JDBC_POLL_SLEEP"); + int sleep = s != null ? Integer.parseInt(s) : TAOS_JDBC_POLL_SLEEP; + + return new Config(type, host, port, num, capacity, rate, sleep); + } +} diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/ConsumerDemo.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/ConsumerDemo.java new file mode 100644 index 0000000000..7c7719c639 --- /dev/null +++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/ConsumerDemo.java @@ -0,0 +1,65 @@ +package com.taosdata; + +import com.taosdata.jdbc.tmq.TMQConstants; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.taosdata.Config.*; + +public class ConsumerDemo { + public static void main(String[] args) throws SQLException { + // Config + Config config = Config.getFromENV(); + // Generated data + mockData(); + + Properties prop = new Properties(); + prop.setProperty(TMQConstants.CONNECT_TYPE, config.getType()); + prop.setProperty(TMQConstants.BOOTSTRAP_SERVERS, config.getHost() + ":" + config.getPort()); + prop.setProperty(TMQConstants.CONNECT_USER, "root"); + prop.setProperty(TMQConstants.CONNECT_PASS, "taosdata"); + prop.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true"); + prop.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true"); + prop.setProperty(TMQConstants.GROUP_ID, "gId"); + prop.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.BeanDeserializer"); + for (int i = 0; i < config.getConsumerNum() - 1; i++) { + new Thread(new Worker(prop, config)).start(); + } + new Worker(prop, config).run(); + } + + public static void mockData() throws SQLException { + String dbName = "test_consumer"; + String tableName = "st"; + String url = "jdbc:TAOS-RS://" + TAOS_HOST + ":" + TAOS_PORT + "/?user=root&password=taosdata&batchfetch=true"; + Connection connection = DriverManager.getConnection(url); + Statement statement = connection.createStatement(); + statement.executeUpdate("create database if not exists " + dbName + " WAL_RETENTION_PERIOD 3650"); + statement.executeUpdate("use " + dbName); + statement.executeUpdate("create table if not exists " + tableName + " (ts timestamp, c1 int, c2 nchar(100)) "); + statement.executeUpdate("create topic if not exists " + TOPIC + " as select ts, c1, c2 from " + tableName); + + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r); + t.setName("mock-data-thread-" + t.getId()); + return t; + }); + AtomicInteger atomic = new AtomicInteger(); + scheduledExecutorService.scheduleWithFixedDelay(() -> { + int i = atomic.getAndIncrement(); + try { + statement.executeUpdate("insert into " + tableName + " values(now, " + i + ",'" + i + "')"); + } catch (SQLException e) { + // ignore + } + }, 0, 10, TimeUnit.MILLISECONDS); + } +} diff --git a/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java new file mode 100644 index 0000000000..f6e21cd729 --- /dev/null +++ b/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java @@ -0,0 +1,60 @@ +package com.taosdata; + +import com.google.common.util.concurrent.RateLimiter; +import com.taosdata.jdbc.tmq.ConsumerRecord; +import com.taosdata.jdbc.tmq.ConsumerRecords; +import com.taosdata.jdbc.tmq.TaosConsumer; + +import java.sql.SQLException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Semaphore; + +public class Worker implements Runnable { + + int sleepTime; + int rate; + + ForkJoinPool pool = new ForkJoinPool(); + Semaphore semaphore; + + TaosConsumer consumer; + + public Worker(Properties prop, Config config) throws SQLException { + consumer = new TaosConsumer<>(prop); + consumer.subscribe(Collections.singletonList(Config.TOPIC)); + semaphore = new Semaphore(config.getProcessCapacity()); + sleepTime = config.getPollSleep(); + rate = config.getRate(); + } + + @Override + public void run() { + while (!Thread.interrupted()) { + try { + // 控制请求频率 + if (semaphore.tryAcquire()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(sleepTime)); + pool.submit(() -> { + RateLimiter limiter = RateLimiter.create(rate); + try { + for (ConsumerRecord record : records) { + // 流量控制 + limiter.acquire(); + // 业务处理数据 + System.out.println("[" + LocalDateTime.now() + "] Thread id:" + Thread.currentThread().getId() + " -> " + record.value()); + } + } finally { + semaphore.release(); + } + }); + } + } catch (SQLException e) { + e.printStackTrace(); + } + } + } +} diff --git a/include/libs/function/function.h b/include/libs/function/function.h index e015f4182e..c92ce254a8 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -163,6 +163,7 @@ typedef struct { int64_t checkPointId; int32_t taskId; int64_t streamId; + int64_t streamBackendRid; } SStreamState; typedef struct SFunctionStateStore { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8316e6ef50..3222a125dd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -344,7 +344,6 @@ typedef struct SStreamMeta { SRWLatch lock; int32_t walScanCounter; void* streamBackend; - int32_t streamBackendId; int64_t streamBackendRid; SHashObj* pTaskBackendUnique; } SStreamMeta; diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 18cd0d9cc6..683d10e926 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -22,21 +22,20 @@ extern "C" { // If the error is in a third-party library, place this header file under the third-party library header file. // When you want to use this feature, you should find or add the same function in the following sectio -// #if !defined(WINDOWS) +#if !defined(WINDOWS) -// #ifndef ALLOW_FORBID_FUNC -// #define malloc MALLOC_FUNC_TAOS_FORBID -// #define calloc CALLOC_FUNC_TAOS_FORBID -// #define realloc REALLOC_FUNC_TAOS_FORBID -// #define free FREE_FUNC_TAOS_FORBID -// #ifdef strdup -// #undef strdup -// #define strdup STRDUP_FUNC_TAOS_FORBID -// #endif -// #endif // ifndef ALLOW_FORBID_FUNC -// #endif // if !defined(WINDOWS) +#ifndef ALLOW_FORBID_FUNC +#define malloc MALLOC_FUNC_TAOS_FORBID +#define calloc CALLOC_FUNC_TAOS_FORBID +#define realloc REALLOC_FUNC_TAOS_FORBID +#define free FREE_FUNC_TAOS_FORBID +#ifdef strdup +#undef strdup +#define strdup STRDUP_FUNC_TAOS_FORBID +#endif +#endif // ifndef ALLOW_FORBID_FUNC +#endif // if !defined(WINDOWS) -// // #define taosMemoryFree malloc // #define taosMemoryMalloc malloc // #define taosMemoryCalloc calloc // #define taosMemoryRealloc realloc diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index c659c8f4a2..4ec66f82a6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -691,6 +691,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; if (!pLastCol) { pLastCol = &noneCol; + reallocVarData(&pLastCol->colVal); } taosArraySet(pLastArray, idxKey->idx, pLastCol); @@ -2848,14 +2849,16 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { + if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); if (pCol->colVal.value.pData == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + if (pColVal->value.nData > 0) { + memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } } if (!COL_VAL_IS_VALUE(pColVal)) { @@ -3016,14 +3019,16 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { + if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) { pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); if (pCol->colVal.value.pData == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + if (pColVal->value.nData > 0) { + memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } } /*if (COL_VAL_IS_NONE(pColVal)) { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index ffc63a22a8..38890f8c34 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -457,6 +457,7 @@ typedef struct SStreamIntervalOperatorInfo { int64_t dataVersion; SStateStore statestore; bool recvGetAll; + SHashObj* pFinalPullDataMap; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c93b9f4c73..2702cf6861 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1865,7 +1865,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); } else { - pInfo->pUpdateInfo->maxDataVersion = pTaskInfo->streamInfo.fillHistoryVer2; + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pTaskInfo->streamInfo.fillHistoryVer2); doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); } } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3a83472079..d6429fd121 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1306,6 +1306,8 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) return true; } +static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } + static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins, SSHashObj* pUpdatedMap) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; @@ -1340,8 +1342,14 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); if (chIds) { - getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC); - continue; + int32_t childId = getChildIndex(pBlock); + SArray* chArray = *(void**)chIds; + int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ); + if (index != -1) { + qDebug("===stream===try push delete window%" PRId64 "chId:%d ,continue", win.skey, childId); + getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC); + continue; + } } bool res = doDeleteWindow(pOperator, win.skey, winGpId); if (pUpWins && res) { @@ -1497,6 +1505,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { taosArrayDestroy(*(void**)pIte); } taosHashCleanup(pInfo->pPullDataMap); + taosHashCleanup(pInfo->pFinalPullDataMap); taosArrayDestroy(pInfo->pPullWins); blockDataDestroy(pInfo->pPullDataRes); taosArrayDestroy(pInfo->pDelWins); @@ -2067,8 +2076,6 @@ void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) { taosHashPut(pMap, pWinRes, sizeof(SWinKey), &childIds, sizeof(void*)); } -static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } - static void clearStreamIntervalOperator(SStreamIntervalOperatorInfo* pInfo) { tSimpleHashClear(pInfo->aggSup.pResultRowHashTable); clearDiskbasedBuf(pInfo->aggSup.pResultBuf); @@ -2112,7 +2119,7 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB blockDataUpdateTsWindow(pBlock, 0); } -void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) { +void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, int32_t numOfCh, SOperatorInfo* pOperator) { SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); TSKEY* tsData = (TSKEY*)pStartCol->pData; SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); @@ -2136,6 +2143,22 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) taosArrayDestroy(chArray); taosHashRemove(pMap, &winRes, sizeof(SWinKey)); qDebug("===stream===retrive pull data over.window %" PRId64 , winRes.ts); + + void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey)); + if (pFinalCh) { + taosHashRemove(pFinalMap, &winRes, sizeof(SWinKey)); + doDeleteWindow(pOperator, winRes.ts, winRes.groupId); + STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval); + SPullWindowInfo pull = {.window = nextWin, + .groupId = winRes.groupId, + .calWin.skey = nextWin.skey, + .calWin.ekey = nextWin.skey}; + // add pull data request + if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) { + addPullWindow(pMap, &winRes, numOfCh); + qDebug("===stream===prepare final retrive for delete %" PRId64 ", size:%d", winRes.ts, numOfCh); + } + } } } } @@ -2144,7 +2167,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) } } -static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) { +static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, int32_t childId) { int32_t size = taosArrayGetSize(wins); for (int32_t i = 0; i < size; i++) { SWinKey* winKey = taosArrayGet(wins, i); @@ -2161,6 +2184,14 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) { addPullWindow(pInfo->pPullDataMap, winKey, pInfo->numOfChild); qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, pInfo->numOfChild); } + } else { + SArray* chArray = *(void**)chIds; + int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ); + qDebug("===stream===check final retrive %" PRId64",chid:%d", winKey->ts, index); + if (index == -1) { + qDebug("===stream===add final retrive %" PRId64, winKey->ts); + taosHashPut(pInfo->pFinalPullDataMap, winKey, sizeof(SWinKey), NULL, 0); + } } } } @@ -2554,7 +2585,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); if (IS_FINAL_OP(pInfo)) { - addRetriveWindow(delWins, pInfo); + int32_t chId = getChildIndex(pBlock); + addRetriveWindow(delWins, pInfo, chId); if (pBlock->info.type != STREAM_CLEAR) { taosArrayAddAll(pInfo->pDelWins, delWins); } @@ -2589,7 +2621,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } continue; } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) { - processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval); + processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, pInfo->numOfChild, pOperator); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; @@ -2772,6 +2804,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pullIndex = 0; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); + pInfo->pFinalPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredDataSaved = false; @@ -4963,6 +4996,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pPhyNode = NULL; // create new child pInfo->pPullDataMap = NULL; + pInfo->pFinalPullDataMap = NULL; pInfo->pPullWins = NULL; // SPullWindowInfo pInfo->pullIndex = 0; pInfo->pPullDataRes = NULL; diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 2c1956998a..c7ee308b61 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -36,8 +36,9 @@ static SStreamGlobalEnv streamEnv; int32_t streamDispatchStreamBlock(SStreamTask* pTask); SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); -SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); -void destroyStreamDataBlock(SStreamDataBlock* pBlock); +SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, + SArray* pRes); +void destroyStreamDataBlock(SStreamDataBlock* pBlock); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); @@ -53,6 +54,8 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); +extern int32_t streamBackendId; + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index b3995f020b..df045eef20 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -16,7 +16,9 @@ #include "streamBackendRocksdb.h" #include "executor.h" #include "query.h" +#include "streamInc.h" #include "tcommon.h" +#include "tref.h" typedef struct SCompactFilteFactory { void* status; @@ -79,8 +81,8 @@ const char* compareParKeyName(void* name); const char* comparePartagKeyName(void* name); void* streamBackendInit(const char* path) { - qDebug("init stream backend"); - SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle)); + qDebug("start to init stream backend at %s", path); + SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle)); pHandle->list = tdListNew(sizeof(SCfComparator)); taosThreadMutexInit(&pHandle->mutex, NULL); taosThreadMutexInit(&pHandle->cfMutex, NULL); @@ -119,6 +121,7 @@ void* streamBackendInit(const char* path) { if (err != NULL) { qError("failed to open rocksdb, path:%s, reason:%s", path, err); taosMemoryFreeClear(err); + goto _EXIT; } } else { /* @@ -129,6 +132,7 @@ void* streamBackendInit(const char* path) { if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); } + qDebug("succ to init stream backend at %s, backend:%p", path, pHandle); return (void*)pHandle; _EXIT: @@ -140,7 +144,8 @@ _EXIT: taosHashCleanup(pHandle->cfInst); rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); tdListFree(pHandle->list); - free(pHandle); + taosMemoryFree(pHandle); + qDebug("failed to init stream backend at %s", path); return NULL; } void streamBackendCleanup(void* arg) { @@ -168,19 +173,20 @@ void streamBackendCleanup(void* arg) { rocksdb_env_destroy(pHandle->env); rocksdb_cache_destroy(pHandle->cache); - taosThreadMutexDestroy(&pHandle->mutex); SListNode* head = tdListPopHead(pHandle->list); while (head != NULL) { streamStateDestroyCompar(head->data); taosMemoryFree(head); head = tdListPopHead(pHandle->list); } - // rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); + tdListFree(pHandle->list); + taosThreadMutexDestroy(&pHandle->mutex); + taosThreadMutexDestroy(&pHandle->cfMutex); taosMemoryFree(pHandle); - + qDebug("destroy stream backend backend:%p", pHandle); return; } SListNode* streamBackendAddCompare(void* backend, void* arg) { @@ -803,7 +809,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t return 0; } int streamStateOpenBackend(void* backend, SStreamState* pState) { - qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); + qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); + taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendHandle* handle = backend; sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); @@ -866,7 +873,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); // rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); - qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); + qInfo("succ to open state %p on backend, %p, 0x%" PRIx64 "-%d", pState, handle, pState->streamId, pState->taskId); return 0; } @@ -882,8 +889,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { taosThreadMutexUnlock(&pHandle->cfMutex); char* status[] = {"close", "drop"}; - qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId, - pState->taskId); + qInfo("start to close %s state %p on backend %p 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pHandle, + pState->streamId, pState->taskId); if (pState->pTdbState->rocksdb == NULL) { return; } @@ -938,6 +945,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { taosThreadRwlockDestroy(&pState->pTdbState->rwLock); pState->pTdbState->rocksdb = NULL; + taosReleaseRef(streamBackendId, pState->streamBackendRid); } void streamStateDestroyCompar(void* arg) { SCfComparator* comp = (SCfComparator*)arg; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8c26052fdb..5c31b1dd60 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -20,7 +20,7 @@ #include "ttimer.h" static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; -static int32_t streamBackendId = 0; +int32_t streamBackendId = 0; static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } @@ -79,7 +79,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - pMeta->streamBackendId = streamBackendId; memset(streamPath, 0, len); sprintf(streamPath, "%s/%s", pMeta->path, "state"); @@ -90,6 +89,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } pMeta->streamBackend = streamBackendInit(streamPath); + if (pMeta->streamBackend == NULL) { + goto _err; + } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); taosMemoryFree(streamPath); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 71a21ac150..967c7733c9 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -106,7 +106,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz } SStreamTask* pStreamTask = pTask; - char statePath[1024]; + char statePath[1024]; if (!specPath) { sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId); } else { @@ -119,10 +119,10 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz #ifdef USE_ROCKSDB SStreamMeta* pMeta = pStreamTask->pMeta; - taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid); + pState->streamBackendRid = pMeta->streamBackendRid; int code = streamStateOpenBackend(pMeta->streamBackend, pState); if (code == -1) { - taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid); + taosReleaseRef(streamBackendId, pMeta->streamBackendRid); taosMemoryFree(pState); pState = NULL; } @@ -222,9 +222,7 @@ _err: void streamStateClose(SStreamState* pState, bool remove) { SStreamTask* pTask = pState->pTdbState->pOwner; #ifdef USE_ROCKSDB - // streamStateCloseBackend(pState); streamStateDestroy(pState, remove); - taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); #else tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); @@ -278,10 +276,10 @@ int32_t streamStateCommit(SStreamState* pState) { int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB - void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); - char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); memcpy(buf + len - rowSize, value, vLen); return code; @@ -291,10 +289,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* } int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); - char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); *ppVal = buf + len - rowSize; return code; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index bc84509728..bfaeca89f6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -419,7 +419,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { if (code != 0 || len == 0 || val == NULL) { return TSDB_CODE_FAILED; } - memcpy(val, buf, len); + memcpy(buf, val, len); buf[len] = 0; maxCheckPointId = atol((char*)buf); taosMemoryFree(val); @@ -433,7 +433,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { if (code != 0) { return TSDB_CODE_FAILED; } - memcpy(val, buf, len); + memcpy(buf, val, len); buf[len] = 0; taosMemoryFree(val); diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim index 959b32fa59..5bb03c8cbf 100644 --- a/tests/script/tsim/stream/distributeInterval0.sim +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -1,36 +1,11 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 -system sh/deploy.sh -n dnode2 -i 2 system sh/exec.sh -n dnode1 -s start sleep 50 sql connect -sql create dnode $hostname2 port 7200 -system sh/exec.sh -n dnode2 -s start - -print ===== step1 -$x = 0 -step1: - $x = $x + 1 - sleep 1000 - if $x == 10 then - print ====> dnode not ready! - return -1 - endi -sql select * from information_schema.ins_dnodes -print ===> $data00 $data01 $data02 $data03 $data04 $data05 -print ===> $data10 $data11 $data12 $data13 $data14 $data15 -if $rows != 2 then - return -1 -endi -if $data(1)[4] != ready then - goto step1 -endi -if $data(2)[4] != ready then - goto step1 -endi print ===== step2 sql drop stream if exists stream_t1; @@ -248,10 +223,56 @@ sql insert into ts3 values(1648791223002,2,2,3,1.1); sql insert into ts4 values(1648791233003,3,2,3,2.1); sql insert into ts3 values(1648791243004,4,2,43,73.1); sql insert into ts4 values(1648791213002,24,22,23,4.1); + +$loop_count = 0 +loop032: + +$loop_count = $loop_count + 1 +if $loop_count == 30 then + return -1 +endi + +sleep 1000 +print 6-0 select * from streamtST1; +sql select * from streamtST1; + +if $rows != 4 then + print =====rows=$rows + goto loop032 +endi + +if $data01 != 8 then + print =6====data01=$data01 + goto loop032 +endi + sql insert into ts3 values(1648791243005,4,20,3,3.1); sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ; sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ; sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1); + +$loop_count = 0 +loop033: + +$loop_count = $loop_count + 1 +if $loop_count == 30 then + return -1 +endi + +sleep 1000 +print 6-1 select * from streamtST1; +sql select * from streamtST1; + +if $rows != 4 then + print =====rows=$rows + goto loop033 +endi + +if $data01 != 8 then + print =6====data01=$data01 + goto loop033 +endi + sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; $loop_count = 0 diff --git a/tests/system-test/0-others/multilevel.py b/tests/system-test/0-others/multilevel.py index 7ad4eba645..f086dcb735 100644 --- a/tests/system-test/0-others/multilevel.py +++ b/tests/system-test/0-others/multilevel.py @@ -116,7 +116,7 @@ class TDTestCase: tdSql.checkRows(1000) tdLog.info("================= step3") tdSql.execute('drop database test') - for i in range(50): + for i in range(10): tdSql.execute("create database test%d duration 1" %(i)) tdSql.execute("use test%d" %(i)) tdSql.execute("create table tb (ts timestamp,i int)")