Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TD-26174

This commit is contained in:
liuyao 2023-09-21 10:11:36 +08:00
commit 3b3c5d7cb8
38 changed files with 1904 additions and 212 deletions

View File

@ -314,9 +314,9 @@ def pre_test_build_win() {
cd %WIN_CONNECTOR_ROOT%
python.exe -m pip install --upgrade pip
python -m pip uninstall taospy -y
python -m pip install taospy==2.7.10
python -m pip install taospy==2.7.12
python -m pip uninstall taos-ws-py -y
python -m pip install taos-ws-py==0.2.8
python -m pip install taos-ws-py==0.2.9
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
'''
return 1

View File

@ -142,8 +142,15 @@ TDengine currently supports timestamp, number, character, Boolean type, and the
| BINARY | byte array |
| NCHAR | java.lang.String |
| JSON | java.lang.String |
| VARBINARY | byte[] |
| GEOMETRY | byte[] |
**Note**: Only TAG supports JSON types
Due to historical reasons, the BINARY type data in TDengine is not truly binary data and is no longer recommended for use. Please use VARBINARY type instead.
GEOMETRY type is binary data in little endian byte order, which complies with the WKB specification. For detailed information, please refer to [Data Type] (/tao-sql/data-type/#Data Types)
For WKB specifications, please refer to [Well Known Binary (WKB)] https://libgeos.org/specifications/wkb/
For Java connector, the jts library can be used to easily create GEOMETRY type objects, serialize them, and write them to TDengine. Here is an example [Geometry example]https://github.com/taosdata/TDengine/blob/3.0/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/GeometryDemo.java
## Installation Steps
@ -354,7 +361,7 @@ The configuration parameters in properties are as follows.
- TSDBDriver.PROPERTY_KEY_CONFIG_DIR: only works when using JDBC native connection. Client configuration file directory path, default value `/etc/taos` on Linux OS, default value `C:/TDengine/cfg` on Windows OS, default value `/etc/taos` on macOS.
- TSDBDriver.PROPERTY_KEY_CHARSET: In the character set used by the client, the default value is the system character set.
- TSDBDriver.PROPERTY_KEY_LOCALE: this only takes effect when using JDBC native connection. Client language environment, the default value is system current locale.
- TSDBDriver.PROPERTY_KEY_TIME_ZONE: only takes effect when using JDBC native connection. In the time zone used by the client, the default value is the system's current time zone.
- TSDBDriver.PROPERTY_KEY_TIME_ZONE: only takes effect when using JDBC native connection. In the time zone used by the client, the default value is the system's current time zone. Due to historical reasons, we only support some specifications of the POSIX standard, such as UTC-8 (representing timezone Shanghai in China), GMT-7, Europe/Paris.
- TSDBDriver.HTTP_CONNECT_TIMEOUT: REST connection timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection.
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection and batchfetch is false.
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: message transmission timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection and batchfetch is true.
@ -456,13 +463,15 @@ public class ParameterBindingDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 20;
private static final int BINARY_COLUMN_SIZE = 50;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))",
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))",
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))",
};
private static final int numOfSubTable = 10, numOfRow = 10;
@ -474,21 +483,20 @@ public class ParameterBindingDemo {
init(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
bindVarbinary(conn);
bindGeometry(conn);
clean(conn);
conn.close();
}
private static void init(Connection conn) throws SQLException {
clean(conn);
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
stmt.execute("create database if not exists test_parabind");
stmt.execute("use test_parabind");
for (int i = 0; i < schemaList.length; i++) {
@ -496,6 +504,11 @@ public class ParameterBindingDemo {
}
}
}
private static void clean(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
@ -674,10 +687,84 @@ public class ParameterBindingDemo {
pstmt.columnDataExecuteBatch();
}
}
private static void bindVarbinary(Connection conn) throws SQLException {
String sql = "insert into ? using stable6 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t6_" + i);
// set tags
byte[] bTag = new byte[]{0,2,3,4,5};
bTag[0] = (byte) i;
pstmt.setTagVarbinary(0, bTag);
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
byte[] v = new byte[]{0,2,3,4,5,6};
v[0] = (byte)j;
f1List.add(v);
}
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindGeometry(Connection conn) throws SQLException {
String sql = "insert into ? using stable7 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
List<byte[]> listGeo = new ArrayList<>();
listGeo.add(g1);
listGeo.add(g2);
for (int i = 1; i <= 2; i++) {
// set table name
pstmt.setTableName("t7_" + i);
// set tags
pstmt.setTagGeometry(0, listGeo.get(i - 1));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(listGeo.get(i - 1));
}
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
}
```
**Note**: both setString and setNString require the user to declare the width of the corresponding column in the size parameter of the table definition
**Note**: both String and byte[] require the user to declare the width of the corresponding column in the size parameter of the table definition
The methods to set VALUES columns:
@ -692,6 +779,8 @@ public void setByte(int columnIndex, ArrayList<Byte> list) throws SQLException
public void setShort(int columnIndex, ArrayList<Short> list) throws SQLException
public void setString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setVarbinary(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
public void setGeometry(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
```
</TabItem>
@ -880,6 +969,9 @@ public void setTagFloat(int index, float value)
public void setTagDouble(int index, double value)
public void setTagString(int index, String value)
public void setTagNString(int index, String value)
public void setTagJson(int index, String value)
public void setTagVarbinary(int index, byte[] value)
public void setTagGeometry(int index, byte[] value)
```
### Schemaless Writing

View File

@ -22,7 +22,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.4</version>
<version>3.2.7-SNAPSHOT</version>
</dependency>
<!-- ANCHOR_END: dep-->
<dependency>

View File

@ -142,8 +142,14 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对
| BINARY | byte array |
| NCHAR | java.lang.String |
| JSON | java.lang.String |
| VARBINARY | byte[] |
| GEOMETRY | byte[] |
**注意**JSON 类型仅在 tag 中支持。
由于历史原因TDengine中的BINARY底层不是真正的二进制数据已不建议使用。请用VARBINARY类型代替。
GEOMETRY类型是little endian字节序的二进制数据符合WKB规范。详细信息请参考 [数据类型](/taos-sql/data-type/#数据类型)
WKB规范请参考[Well-Known Binary (WKB)](https://libgeos.org/specifications/wkb/)
对于java连接器可以使用jts库来方便的创建GEOMETRY类型对象序列化后写入TDengine这里有一个样例[Geometry示例](https://github.com/taosdata/TDengine/blob/3.0/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/GeometryDemo.java)
## 安装步骤
@ -357,7 +363,7 @@ properties 中的配置参数如下:
- TSDBDriver.PROPERTY_KEY_CONFIG_DIR仅在使用 JDBC 原生连接时生效。客户端配置文件目录路径Linux OS 上默认值 `/etc/taos`Windows OS 上默认值 `C:/TDengine/cfg`。
- TSDBDriver.PROPERTY_KEY_CHARSET客户端使用的字符集默认值为系统字符集。
- TSDBDriver.PROPERTY_KEY_LOCALE仅在使用 JDBC 原生连接时生效。 客户端语言环境,默认值系统当前 locale。
- TSDBDriver.PROPERTY_KEY_TIME_ZONE仅在使用 JDBC 原生连接时生效。 客户端使用的时区,默认值为系统当前时区。
- TSDBDriver.PROPERTY_KEY_TIME_ZONE仅在使用 JDBC 原生连接时生效。 客户端使用的时区,默认值为系统当前时区。因为历史的原因我们只支持POSIX标准的部分规范如UTC-8(代表中国上上海), GMT-8Asia/Shanghai 这几种形式。
- TSDBDriver.HTTP_CONNECT_TIMEOUT: 连接超时时间,单位 ms 默认值为 60000。仅在 REST 连接时生效。
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket 超时时间,单位 ms默认值为 60000。仅在 REST 连接且 batchfetch 设置为 false 时生效。
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms 默认值为 60000。 仅在 REST 连接且 batchfetch 设置为 true 时生效。
@ -459,13 +465,15 @@ public class ParameterBindingDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 30;
private static final int BINARY_COLUMN_SIZE = 50;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))",
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))",
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))",
};
private static final int numOfSubTable = 10, numOfRow = 10;
@ -477,21 +485,20 @@ public class ParameterBindingDemo {
init(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
bindVarbinary(conn);
bindGeometry(conn);
clean(conn);
conn.close();
}
private static void init(Connection conn) throws SQLException {
clean(conn);
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
stmt.execute("create database if not exists test_parabind");
stmt.execute("use test_parabind");
for (int i = 0; i < schemaList.length; i++) {
@ -499,6 +506,11 @@ public class ParameterBindingDemo {
}
}
}
private static void clean(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
@ -677,10 +689,84 @@ public class ParameterBindingDemo {
pstmt.columnDataExecuteBatch();
}
}
private static void bindVarbinary(Connection conn) throws SQLException {
String sql = "insert into ? using stable6 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t6_" + i);
// set tags
byte[] bTag = new byte[]{0,2,3,4,5};
bTag[0] = (byte) i;
pstmt.setTagVarbinary(0, bTag);
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
byte[] v = new byte[]{0,2,3,4,5,6};
v[0] = (byte)j;
f1List.add(v);
}
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindGeometry(Connection conn) throws SQLException {
String sql = "insert into ? using stable7 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
List<byte[]> listGeo = new ArrayList<>();
listGeo.add(g1);
listGeo.add(g2);
for (int i = 1; i <= 2; i++) {
// set table name
pstmt.setTableName("t7_" + i);
// set tags
pstmt.setTagGeometry(0, listGeo.get(i - 1));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(listGeo.get(i - 1));
}
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
}
```
**注**setString 和 setNString 都要求用户在 size 参数里声明表定义中对应列的列宽
**注**字符串和数组类型都要求用户在 size 参数里声明表定义中对应列的列宽
用于设定 VALUES 数据列的取值的方法总共有:
@ -695,6 +781,8 @@ public void setByte(int columnIndex, ArrayList<Byte> list) throws SQLException
public void setShort(int columnIndex, ArrayList<Short> list) throws SQLException
public void setString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setVarbinary(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
public void setGeometry(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
```
</TabItem>
@ -883,6 +971,9 @@ public void setTagFloat(int index, float value)
public void setTagDouble(int index, double value)
public void setTagString(int index, String value)
public void setTagNString(int index, String value)
public void setTagJson(int index, String value)
public void setTagVarbinary(int index, byte[] value)
public void setTagGeometry(int index, byte[] value)
```
### 无模式写入

View File

@ -11,13 +11,20 @@
<properties>
<project.assembly.dir>src/main/resources/assembly</project.assembly.dir>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.0.0</version>
<version>3.2.7</version>
</dependency>
<dependency>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
<version>1.19.0</version>
</dependency>
</dependencies>
@ -68,12 +75,12 @@
</execution>
<execution>
<id>SubscribeDemo</id>
<id>GeometryDemo</id>
<configuration>
<finalName>SubscribeDemo</finalName>
<finalName>GeometryDemo</finalName>
<archive>
<manifest>
<mainClass>com.taosdata.example.SubscribeDemo</mainClass>
<mainClass>com.taosdata.example.GeometryDemo</mainClass>
</manifest>
</archive>
<descriptorRefs>

View File

@ -0,0 +1,94 @@
package com.taosdata.example;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class ConsumerLoop {
private final TaosConsumer<ResultBean> consumer;
private final List<String> topics;
private final AtomicBoolean shutdown;
private final CountDownLatch shutdownLatch;
public ConsumerLoop() throws SQLException {
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("auto.offset.reset", "earliest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
config.setProperty("experimental.snapshot.enable", "true");
this.consumer = new TaosConsumer<>(config);
this.topics = Collections.singletonList("topic_speed");
this.shutdown = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ResultBean result);
public void pollData() throws SQLException {
try {
consumer.subscribe(topics);
while (!shutdown.get()) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
process(bean);
}
}
consumer.unsubscribe();
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
public void shutdown() throws InterruptedException {
shutdown.set(true);
shutdownLatch.await();
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
public static class ResultBean {
private Timestamp ts;
private int speed;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public int getSpeed() {
return speed;
}
public void setSpeed(int speed) {
this.speed = speed;
}
}
}

View File

@ -0,0 +1,190 @@
package com.taosdata.example;
import com.taosdata.jdbc.TSDBPreparedStatement;
import org.locationtech.jts.geom.*;
import org.locationtech.jts.io.ByteOrderValues;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKBReader;
import org.locationtech.jts.io.WKBWriter;
import java.sql.*;
import java.util.ArrayList;
import java.util.Properties;
public class GeometryDemo {
private static String host = "localhost";
private static final String dbName = "test";
private static final String tbName = "weather";
private static final String user = "root";
private static final String password = "taosdata";
private Connection connection;
public static void main(String[] args) throws SQLException {
for (int i = 0; i < args.length; i++) {
if ("-host".equalsIgnoreCase(args[i]) && i < args.length - 1)
host = args[++i];
}
if (host == null) {
printHelp();
}
GeometryDemo demo = new GeometryDemo();
demo.init();
demo.createDatabase();
demo.useDatabase();
demo.dropTable();
demo.createTable();
demo.insert();
demo.stmtInsert();
demo.select();
demo.dropTable();
demo.close();
}
private void init() {
final String url = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password;
// get connection
try {
Properties properties = new Properties();
properties.setProperty("charset", "UTF-8");
properties.setProperty("locale", "en_US.UTF-8");
properties.setProperty("timezone", "UTC-8");
System.out.println("get connection starting...");
connection = DriverManager.getConnection(url, properties);
if (connection != null)
System.out.println("[ OK ] Connection established.");
} catch (SQLException e) {
e.printStackTrace();
}
}
private void createDatabase() {
String sql = "create database if not exists " + dbName;
execute(sql);
}
private void useDatabase() {
String sql = "use " + dbName;
execute(sql);
}
private void dropTable() {
final String sql = "drop table if exists " + dbName + "." + tbName + "";
execute(sql);
}
private void createTable() {
final String sql = "create table if not exists " + dbName + "." + tbName + " (ts timestamp, temperature float, humidity int, location geometry(50))";
execute(sql);
}
private void insert() {
final String sql = "insert into " + dbName + "." + tbName + " (ts, temperature, humidity, location) values(now, 20.5, 34, 'POINT(1 2)')";
execute(sql);
}
private void stmtInsert() throws SQLException {
TSDBPreparedStatement preparedStatement = (TSDBPreparedStatement) connection.prepareStatement("insert into " + dbName + "." + tbName + " values (?, ?, ?, ?)");
long current = System.currentTimeMillis();
ArrayList<Long> tsList = new ArrayList<>();
tsList.add(current);
tsList.add(current + 1);
preparedStatement.setTimestamp(0, tsList);
ArrayList<Float> tempList = new ArrayList<>();
tempList.add(20.1F);
tempList.add(21.2F);
preparedStatement.setFloat(1, tempList);
ArrayList<Integer> humList = new ArrayList<>();
humList.add(30);
humList.add(31);
preparedStatement.setInt(2, humList);
ArrayList<byte[]> list = new ArrayList<>();
GeometryFactory gf = new GeometryFactory();
Point p1 = gf.createPoint(new Coordinate(1,2));
p1.setSRID(1234);
// NOTE: TDengine current version only support 2D dimension and little endian byte order
WKBWriter w = new WKBWriter(2, ByteOrderValues.LITTLE_ENDIAN, true);
byte[] wkb = w.write(p1);
list.add(wkb);
Coordinate[] coordinates = { new Coordinate(10, 20),
new Coordinate(30, 40)};
LineString lineString = gf.createLineString(coordinates);
lineString.setSRID(2345);
byte[] wkb2 = w.write(lineString);
list.add(wkb2);
preparedStatement.setGeometry(3, list, 50);
preparedStatement.columnDataAddBatch();
preparedStatement.columnDataExecuteBatch();
}
private void select() {
final String sql = "select * from " + dbName + "." + tbName;
executeQuery(sql);
}
private void close() {
try {
if (connection != null) {
this.connection.close();
System.out.println("connection closed.");
}
} catch (SQLException e) {
e.printStackTrace();
}
}
private void executeQuery(String sql) {
long start = System.currentTimeMillis();
try (Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery(sql);
long end = System.currentTimeMillis();
printSql(sql, true, (end - start));
while (resultSet.next()){
byte[] result1 = resultSet.getBytes(4);
WKBReader reader = new WKBReader();
Geometry g1 = reader.read(result1);
System.out.println("GEO OBJ: " + g1 + ", SRID: " + g1.getSRID());
}
} catch (SQLException e) {
long end = System.currentTimeMillis();
printSql(sql, false, (end - start));
e.printStackTrace();
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
private void printSql(String sql, boolean succeed, long cost) {
System.out.println("[ " + (succeed ? "OK" : "ERROR!") + " ] time cost: " + cost + " ms, execute statement ====> " + sql);
}
private void execute(String sql) {
long start = System.currentTimeMillis();
try (Statement statement = connection.createStatement()) {
boolean execute = statement.execute(sql);
long end = System.currentTimeMillis();
printSql(sql, true, (end - start));
} catch (SQLException e) {
long end = System.currentTimeMillis();
printSql(sql, false, (end - start));
e.printStackTrace();
}
}
private static void printHelp() {
System.out.println("Usage: java -jar JDBCDemo.jar -host <hostname>");
System.exit(0);
}
}

View File

@ -0,0 +1,316 @@
package com.taosdata.example;
import com.taosdata.jdbc.TSDBPreparedStatement;
import com.taosdata.jdbc.utils.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class ParameterBindingDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 50;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))",
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))",
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))",
};
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
init(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
bindVarbinary(conn);
bindGeometry(conn);
clean(conn);
conn.close();
}
private static void init(Connection conn) throws SQLException {
clean(conn);
try (Statement stmt = conn.createStatement()) {
stmt.execute("create database if not exists test_parabind");
stmt.execute("use test_parabind");
for (int i = 0; i < schemaList.length; i++) {
stmt.execute(schemaList[i]);
}
}
}
private static void clean(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t1_" + i);
// set tags
pstmt.setTagByte(0, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setTagShort(1, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setTagInt(2, random.nextInt(Integer.MAX_VALUE));
pstmt.setTagLong(3, random.nextLong());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Byte> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setByte(1, f1List);
ArrayList<Short> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setShort(2, f2List);
ArrayList<Integer> f3List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f3List.add(random.nextInt(Integer.MAX_VALUE));
pstmt.setInt(3, f3List);
ArrayList<Long> f4List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f4List.add(random.nextLong());
pstmt.setLong(4, f4List);
// add column
pstmt.columnDataAddBatch();
}
// execute column
pstmt.columnDataExecuteBatch();
}
}
private static void bindFloat(Connection conn) throws SQLException {
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class);
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t2_" + i);
// set tags
pstmt.setTagFloat(0, random.nextFloat());
pstmt.setTagDouble(1, random.nextDouble());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Float> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextFloat());
pstmt.setFloat(1, f1List);
ArrayList<Double> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(random.nextDouble());
pstmt.setDouble(2, f2List);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
// close if no try-with-catch statement is used
pstmt.close();
}
private static void bindBoolean(Connection conn) throws SQLException {
String sql = "insert into ? using stable3 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t3_" + i);
// set tags
pstmt.setTagBoolean(0, random.nextBoolean());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Boolean> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextBoolean());
pstmt.setBoolean(1, f1List);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindBytes(Connection conn) throws SQLException {
String sql = "insert into ? using stable4 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t4_" + i);
// set tags
pstmt.setTagString(0, new String("abc"));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(new String("abc"));
}
pstmt.setString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindString(Connection conn) throws SQLException {
String sql = "insert into ? using stable5 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t5_" + i);
// set tags
pstmt.setTagNString(0, "California.SanFrancisco");
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add("California.LosAngeles");
}
pstmt.setNString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindVarbinary(Connection conn) throws SQLException {
String sql = "insert into ? using stable6 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t6_" + i);
// set tags
byte[] bTag = new byte[]{0,2,3,4,5};
bTag[0] = (byte) i;
pstmt.setTagVarbinary(0, bTag);
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
byte[] v = new byte[]{0,2,3,4,5,6};
v[0] = (byte)j;
f1List.add(v);
}
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindGeometry(Connection conn) throws SQLException {
String sql = "insert into ? using stable7 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
List<byte[]> listGeo = new ArrayList<>();
listGeo.add(g1);
listGeo.add(g2);
for (int i = 1; i <= 2; i++) {
// set table name
pstmt.setTableName("t7_" + i);
// set tags
pstmt.setTagGeometry(0, listGeo.get(i - 1));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(listGeo.get(i - 1));
}
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
}

View File

@ -1,74 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.TSDBConnection;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBResultSet;
import com.taosdata.jdbc.TSDBSubscribe;
import java.sql.DriverManager;
import java.sql.ResultSetMetaData;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class SubscribeDemo {
private static final String usage = "java -jar SubscribeDemo.jar -host <hostname> -database <database name> -topic <topic> -sql <sql>";
public static void main(String[] args) {
// parse args from command line
String host = "", database = "", topic = "", sql = "";
for (int i = 0; i < args.length; i++) {
if ("-host".equalsIgnoreCase(args[i]) && i < args.length - 1) {
host = args[++i];
}
if ("-database".equalsIgnoreCase(args[i]) && i < args.length - 1) {
database = args[++i];
}
if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
topic = args[++i];
}
if ("-sql".equalsIgnoreCase(args[i]) && i < args.length - 1) {
sql = args[++i];
}
}
if (host.isEmpty() || database.isEmpty() || topic.isEmpty() || sql.isEmpty()) {
System.out.println(usage);
return;
}
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
final String url = "jdbc:TAOS://" + host + ":6030/" + database + "?user=root&password=taosdata";
// get TSDBConnection
TSDBConnection connection = (TSDBConnection) DriverManager.getConnection(url, properties);
// create TSDBSubscribe
TSDBSubscribe sub = connection.subscribe(topic, sql, false);
int total = 0;
while (true) {
TSDBResultSet rs = sub.consume();
int count = 0;
ResultSetMetaData meta = rs.getMetaData();
while (rs.next()) {
for (int i = 1; i <= meta.getColumnCount(); i++) {
System.out.print(meta.getColumnLabel(i) + ": " + rs.getString(i) + "\t");
}
System.out.println();
count++;
}
total += count;
// System.out.printf("%d rows consumed, total %d\n", count, total);
if (total >= 10)
break;
TimeUnit.SECONDS.sleep(1);
}
sub.close(false);
connection.close();
} catch (Exception e) {
System.out.println("host: " + host + ", database: " + database + ", topic: " + topic + ", sql: " + sql);
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,170 @@
package com.taosdata.example;
import com.taosdata.jdbc.ws.TSWSPreparedStatement;
import java.sql.*;
import java.util.Random;
public class WSParameterBindingDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 30;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
};
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://" + host + ":6041/?batchfetch=true";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
init(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
conn.close();
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_ws_parabind");
stmt.execute("create database if not exists test_ws_parabind");
stmt.execute("use test_ws_parabind");
for (int i = 0; i < schemaList.length; i++) {
stmt.execute(schemaList[i]);
}
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t1_" + i);
// set tags
pstmt.setTagByte(1, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setTagShort(2, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setTagInt(3, random.nextInt(Integer.MAX_VALUE));
pstmt.setTagLong(4, random.nextLong());
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setByte(2, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setShort(3, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setInt(4, random.nextInt(Integer.MAX_VALUE));
pstmt.setLong(5, random.nextLong());
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindFloat(Connection conn) throws SQLException {
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
try(TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t2_" + i);
// set tags
pstmt.setTagFloat(1, random.nextFloat());
pstmt.setTagDouble(2, random.nextDouble());
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setFloat(2, random.nextFloat());
pstmt.setDouble(3, random.nextDouble());
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindBoolean(Connection conn) throws SQLException {
String sql = "insert into ? using stable3 tags(?) values(?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t3_" + i);
// set tags
pstmt.setTagBoolean(1, random.nextBoolean());
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setBoolean(2, random.nextBoolean());
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindBytes(Connection conn) throws SQLException {
String sql = "insert into ? using stable4 tags(?) values(?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t4_" + i);
// set tags
pstmt.setTagString(1, new String("abc"));
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setString(2, "abc");
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindString(Connection conn) throws SQLException {
String sql = "insert into ? using stable5 tags(?) values(?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t5_" + i);
// set tags
pstmt.setTagNString(1, "California.SanFrancisco");
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(0, new Timestamp(current + j));
pstmt.setNString(1, "California.SanFrancisco");
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
}

View File

@ -236,6 +236,7 @@ bool fmIsInterpPseudoColumnFunc(int32_t funcId);
bool fmIsGroupKeyFunc(int32_t funcId);
bool fmIsBlockDistFunc(int32_t funcId);
bool fmIsConstantResFunc(SFunctionNode* pFunc);
bool fmIsSkipScanCheckFunc(int32_t funcId);
void getLastCacheDataType(SDataType* pType);
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList);

View File

@ -293,6 +293,7 @@ typedef struct SPartitionLogicNode {
SNodeList* pPartitionKeys;
SNodeList* pTags;
SNode* pSubtable;
SNodeList* pAggFuncs;
bool needBlockOutputTsOrder; // if true, partition output block will have ts order maintained
int32_t pkTsColId;

View File

@ -1415,6 +1415,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
STqOffsetVal offsetNew = {0};
offsetNew.type = tmq->resetOffsetCfg;
tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId, pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps,pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
SMqClientVg clientVg = {
.pollCnt = 0,
.vgId = pVgEp->vgId,

View File

@ -771,6 +771,29 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
return 0;
}
static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){
// iter all vnode to delete handle
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
pReq->head.vgId = htonl(pVgEp->vgId);
pReq->vgId = pVgEp->vgId;
pReq->consumerId = -1;
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
STransAction action = {0};
action.epSet = pVgEp->epSet;
action.pCont = pReq;
action.contLen = sizeof(SMqVDeleteReq);
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
}
return 0;
}
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMDropCgroupReq dropReq = {0};
@ -831,6 +854,11 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
code = sendDeleteSubToVnode(pSub, pTrans);
if (code != 0) {
goto end;
}
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
code = -1;
@ -1117,26 +1145,11 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
sdbCancelFetch(pSdb, pIter);
return -1;
}
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
pReq->head.vgId = htonl(pVgEp->vgId);
pReq->vgId = pVgEp->vgId;
pReq->consumerId = -1;
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
STransAction action = {0};
action.epSet = pVgEp->epSet;
action.pCont = pReq;
action.contLen = sizeof(SMqVDeleteReq);
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
if (sendDeleteSubToVnode(pSub, pTrans) != 0) {
sdbRelease(pSdb, pSub);
sdbCancelFetch(pSdb, pIter);
return -1;
}
}
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
sdbRelease(pSdb, pSub);

View File

@ -65,7 +65,8 @@ set(
"src/tq/tqSink.c"
"src/tq/tqCommit.c"
"src/tq/tqStreamTask.c"
"src/tq/tqSnapshot.c"
"src/tq/tqHandleSnapshot.c"
"src/tq/tqCheckInfoSnapshot.c"
"src/tq/tqOffsetSnapshot.c"
"src/tq/tqStreamStateSnap.c"
"src/tq/tqStreamTaskSnap.c"

View File

@ -134,7 +134,7 @@ int32_t tqMetaOpen(STQ* pTq);
int32_t tqMetaClose(STQ* pTq);
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
int32_t tqMetaRestoreHandle(STQ* pTq);
//int32_t tqMetaRestoreHandle(STQ* pTq);
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq);

View File

@ -69,6 +69,8 @@ typedef struct STqSnapReader STqSnapReader;
typedef struct STqSnapWriter STqSnapWriter;
typedef struct STqOffsetReader STqOffsetReader;
typedef struct STqOffsetWriter STqOffsetWriter;
typedef struct STqCheckInfoReader STqCheckInfoReader;
typedef struct STqCheckInfoWriter STqCheckInfoWriter;
typedef struct SStreamTaskReader SStreamTaskReader;
typedef struct SStreamTaskWriter SStreamTaskWriter;
typedef struct SStreamStateReader SStreamStateReader;
@ -308,6 +310,14 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData);
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter);
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback);
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
// STqCheckInfoshotReader ==
int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader);
int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader);
int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData);
// STqCheckInfoshotWriter ======================================
int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter);
int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback);
int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData);
// STqOffsetReader ========================================
int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader);
int32_t tqOffsetReaderClose(STqOffsetReader** ppReader);
@ -503,6 +513,7 @@ enum {
SNAP_DATA_STREAM_TASK_CHECKPOINT = 10,
SNAP_DATA_STREAM_STATE = 11,
SNAP_DATA_STREAM_STATE_BACKEND = 12,
SNAP_DATA_TQ_CHECKINFO = 13,
};
struct SSnapDataHdr {

View File

@ -697,7 +697,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDestroyTqHandle(&handle);
goto end;
}
taosWLockLatch(&pTq->lock);
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
taosWUnLockLatch(&pTq->lock);
} else {
while(1){
taosWLockLatch(&pTq->lock);

View File

@ -0,0 +1,196 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "tdbInt.h"
#include "tq.h"
// STqCheckInfoReader ========================================
struct STqCheckInfoReader {
STQ* pTq;
int64_t sver;
int64_t ever;
TBC* pCur;
};
int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader) {
int32_t code = 0;
STqCheckInfoReader* pReader = NULL;
// alloc
pReader = (STqCheckInfoReader*)taosMemoryCalloc(1, sizeof(STqCheckInfoReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->pTq = pTq;
pReader->sver = sver;
pReader->ever = ever;
// impl
code = tdbTbcOpen(pTq->pCheckStore, &pReader->pCur, NULL);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
code = tdbTbcMoveToFirst(pReader->pCur);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
tqInfo("vgId:%d, vnode checkinfo tq reader opened", TD_VID(pTq->pVnode));
*ppReader = pReader;
return code;
_err:
tqError("vgId:%d, vnode checkinfo tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppReader = NULL;
return code;
}
int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader) {
int32_t code = 0;
tdbTbcClose((*ppReader)->pCur);
taosMemoryFree(*ppReader);
*ppReader = NULL;
return code;
}
int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData) {
int32_t code = 0;
void* pKey = NULL;
void* pVal = NULL;
int32_t kLen = 0;
int32_t vLen = 0;
if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
goto _exit;
}
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = SNAP_DATA_TQ_CHECKINFO;
pHdr->size = vLen;
memcpy(pHdr->data, pVal, vLen);
_exit:
tdbFree(pKey);
tdbFree(pVal);
tqInfo("vgId:%d, vnode check info tq read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
return code;
_err:
tdbFree(pKey);
tdbFree(pVal);
tqError("vgId:%d, vnode check info tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
return code;
}
// STqCheckInfoWriter ========================================
struct STqCheckInfoWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
TXN* txn;
};
int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter) {
int32_t code = 0;
STqCheckInfoWriter* pWriter;
// alloc
pWriter = (STqCheckInfoWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pTq = pTq;
pWriter->sver = sver;
pWriter->ever = ever;
if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
code = -1;
taosMemoryFree(pWriter);
goto _err;
}
*ppWriter = pWriter;
return code;
_err:
tqError("vgId:%d, tq check info writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
STqCheckInfoWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
if (rollback) {
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
} else {
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err;
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err;
}
taosMemoryFree(pWriter);
*ppWriter = NULL;
return code;
_err:
tqError("vgId:%d, tq check info writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}
int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
STqCheckInfo info = {0};
SDecoder decoder;
SDecoder* pDecoder = &decoder;
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqCheckInfo(pDecoder, &info);
if (code) goto _err;
code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo));
if (code) goto _err;
code = tqMetaSaveCheckInfo(pTq, info.topic, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
if (code) goto _err;
tDecoderClear(pDecoder);
return code;
_err:
tDecoderClear(pDecoder);
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}

View File

@ -75,31 +75,15 @@ int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
const void* pKey = NULL;
const void* pVal = NULL;
void* pKey = NULL;
void* pVal = NULL;
int32_t kLen = 0;
int32_t vLen = 0;
SDecoder decoder;
STqHandle handle;
*ppData = NULL;
for (;;) {
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
goto _exit;
}
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
tDecoderClear(&decoder);
if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) {
tdbTbcMoveToNext(pReader->pCur);
break;
} else {
tdbTbcMoveToNext(pReader->pCur);
}
}
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -111,13 +95,15 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
pHdr->size = vLen;
memcpy(pHdr->data, pVal, vLen);
tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode),
handle.snapshotVer, handle.subKey, vLen);
_exit:
tdbFree(pKey);
tdbFree(pVal);
tqInfo("vgId:%d, vnode snapshot tq read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
return code;
_err:
tdbFree(pKey);
tdbFree(pVal);
tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
return code;
}
@ -173,20 +159,13 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
if (code) goto _err;
}
int vgId = TD_VID(pWriter->pTq->pVnode);
taosMemoryFree(pWriter);
*ppWriter = NULL;
// restore from metastore
if (tqMetaRestoreHandle(pTq) < 0) {
goto _err;
}
return code;
_err:
tqError("vgId:%d, tq snapshot writer close failed since %s", vgId, tstrerror(code));
tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}
@ -200,7 +179,9 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto _err;
taosWLockLatch(&pTq->lock);
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
taosWUnLockLatch(&pTq->lock);
if (code < 0) goto _err;
tDecoderClear(pDecoder);

View File

@ -388,34 +388,34 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
int32_t tqMetaRestoreHandle(STQ* pTq) {
int code = 0;
TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
return -1;
}
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqHandle handle = {0};
code = restoreHandle(pTq, pVal, vLen, &handle);
if (code < 0) {
tqDestroyTqHandle(&handle);
break;
}
}
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return code;
}
//int32_t tqMetaRestoreHandle(STQ* pTq) {
// int code = 0;
// TBC* pCur = NULL;
// if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
// return -1;
// }
//
// void* pKey = NULL;
// int kLen = 0;
// void* pVal = NULL;
// int vLen = 0;
//
// tdbTbcMoveToFirst(pCur);
//
// while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
// STqHandle handle = {0};
// code = restoreHandle(pTq, pVal, vLen, &handle);
// if (code < 0) {
// tqDestroyTqHandle(&handle);
// break;
// }
// }
//
// tdbFree(pKey);
// tdbFree(pVal);
// tdbTbcClose(pCur);
// return code;
//}
int32_t tqMetaGetHandle(STQ* pTq, const char* key) {
void* pVal = NULL;

View File

@ -159,6 +159,7 @@ int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nDa
taosCloseFile(&pFile);
return -1;
}
taosCloseFile(&pFile);
} else {
return -1;
}

View File

@ -34,6 +34,8 @@ struct SVSnapReader {
STqSnapReader *pTqSnapReader;
int8_t tqOffsetDone;
STqOffsetReader *pTqOffsetReader;
int8_t tqCheckInfoDone;
STqCheckInfoReader *pTqCheckInfoReader;
// stream
int8_t streamTaskDone;
SStreamTaskReader *pStreamTaskReader;
@ -81,6 +83,18 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
metaSnapReaderClose(&pReader->pMetaReader);
}
if (pReader->pTqSnapReader) {
tqSnapReaderClose(&pReader->pTqSnapReader);
}
if (pReader->pTqOffsetReader) {
tqOffsetReaderClose(&pReader->pTqOffsetReader);
}
if (pReader->pTqCheckInfoReader) {
tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader);
}
taosMemoryFree(pReader);
}
@ -181,6 +195,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
}
// TQ ================
vInfo("vgId:%d tq transform start", vgId);
if (!pReader->tqHandleDone) {
if (pReader->pTqSnapReader == NULL) {
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqSnapReader);
@ -200,6 +215,25 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
}
}
}
if (!pReader->tqCheckInfoDone) {
if (pReader->pTqCheckInfoReader == NULL) {
code = tqCheckInfoReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqCheckInfoReader);
if (code < 0) goto _err;
}
code = tqCheckInfoRead(pReader->pTqCheckInfoReader, ppData);
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->tqCheckInfoDone = 1;
code = tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader);
if (code) goto _err;
}
}
}
if (!pReader->tqOffsetDone) {
if (pReader->pTqOffsetReader == NULL) {
code = tqOffsetReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqOffsetReader);
@ -334,6 +368,7 @@ struct SVSnapWriter {
// tq
STqSnapWriter *pTqSnapWriter;
STqOffsetWriter *pTqOffsetWriter;
STqCheckInfoWriter *pTqCheckInfoWriter;
// stream
SStreamTaskWriter *pStreamTaskWriter;
SStreamStateWriter *pStreamStateWriter;
@ -411,6 +446,21 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (code) goto _exit;
}
if (pWriter->pTqSnapWriter) {
code = tqSnapWriterClose(&pWriter->pTqSnapWriter, rollback);
if (code) goto _exit;
}
if (pWriter->pTqCheckInfoWriter) {
code = tqCheckInfoWriterClose(&pWriter->pTqCheckInfoWriter, rollback);
if (code) goto _exit;
}
if (pWriter->pTqOffsetWriter) {
code = tqOffsetWriterClose(&pWriter->pTqOffsetWriter, rollback);
if (code) goto _exit;
}
if (pWriter->pStreamTaskWriter) {
code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback);
if (code) goto _exit;
@ -519,8 +569,34 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
if (code) goto _err;
} break;
case SNAP_DATA_TQ_HANDLE: {
// tq handle
if (pWriter->pTqSnapWriter == NULL) {
code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapWriter);
if (code) goto _err;
}
code = tqSnapWrite(pWriter->pTqSnapWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_TQ_CHECKINFO: {
// tq checkinfo
if (pWriter->pTqCheckInfoWriter == NULL) {
code = tqCheckInfoWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqCheckInfoWriter);
if (code) goto _err;
}
code = tqCheckInfoWrite(pWriter->pTqCheckInfoWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_TQ_OFFSET: {
// tq offset
if (pWriter->pTqOffsetWriter == NULL) {
code = tqOffsetWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqOffsetWriter);
if (code) goto _err;
}
code = tqOffsetSnapWrite(pWriter->pTqOffsetWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_STREAM_TASK:
case SNAP_DATA_STREAM_TASK_CHECKPOINT: {

View File

@ -52,6 +52,7 @@ extern "C" {
#define FUNC_MGT_INTERP_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(23)
#define FUNC_MGT_GEOMETRY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(24)
#define FUNC_MGT_FORBID_SYSTABLE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(25)
#define FUNC_MGT_SKIP_SCAN_CHECK_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(26)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)

View File

@ -3446,7 +3446,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_group_key",
.type = FUNCTION_TYPE_GROUP_KEY,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_SKIP_SCAN_CHECK_FUNC,
.translateFunc = translateGroupKey,
.getEnvFunc = getGroupKeyFuncEnv,
.initFunc = functionSetup,

View File

@ -346,6 +346,10 @@ bool fmIsConstantResFunc(SFunctionNode* pFunc) {
return true;
}
bool fmIsSkipScanCheckFunc(int32_t funcId) {
return isSpecificClassifyFunc(funcId, FUNC_MGT_SKIP_SCAN_CHECK_FUNC);
}
void getLastCacheDataType(SDataType* pType) {
pType->bytes = getFirstLastInfoSize(pType->bytes) + VARSTR_HEADER_SIZE;
pType->type = TSDB_DATA_TYPE_BINARY;

View File

@ -543,6 +543,7 @@ static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLog
CLONE_NODE_LIST_FIELD(pPartitionKeys);
CLONE_NODE_LIST_FIELD(pTags);
CLONE_NODE_FIELD(pSubtable);
CLONE_NODE_LIST_FIELD(pAggFuncs);
COPY_SCALAR_FIELD(needBlockOutputTsOrder);
COPY_SCALAR_FIELD(pkTsColId);
COPY_SCALAR_FIELD(pkTsColTbId);

View File

@ -1205,6 +1205,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pLogicNode->pPartitionKeys);
nodesDestroyList(pLogicNode->pTags);
nodesDestroyNode(pLogicNode->pSubtable);
nodesDestroyList(pLogicNode->pAggFuncs);
break;
}
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: {

View File

@ -1256,6 +1256,10 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
nodesCloneNode(nodesListGetNode(pCxt->pCurrRoot->pTargets, 0)));
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, NULL, fmIsAggFunc, &pPartition->pAggFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
pPartition->pPartitionKeys = nodesCloneList(pSelect->pPartitionByList);
if (NULL == pPartition->pPartitionKeys) {

View File

@ -171,11 +171,16 @@ static bool scanPathOptMayBeOptimized(SLogicNode* pNode) {
}
static bool scanPathOptShouldGetFuncs(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
if (pNode->pParent && QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) {
if (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType) return true;
} else {
return !scanPathOptHaveNormalCol(((SPartitionLogicNode*)pNode)->pPartitionKeys);
}
}
if ((QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) &&
WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode)->winType) ||
(QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) && pNode->pParent &&
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) &&
WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType)) {
WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode)->winType)) {
return true;
}
if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode)) {
@ -191,30 +196,17 @@ static SNodeList* scanPathOptGetAllFuncs(SLogicNode* pNode) {
return ((SWindowLogicNode*)pNode)->pFuncs;
case QUERY_NODE_LOGIC_PLAN_AGG:
return ((SAggLogicNode*)pNode)->pAggFuncs;
case QUERY_NODE_LOGIC_PLAN_PARTITION:
return ((SPartitionLogicNode*)pNode)->pAggFuncs;
default:
break;
}
return NULL;
}
static bool scanPathOptNeedOptimizeDataRequire(const SFunctionNode* pFunc) {
if (!fmIsSpecialDataRequiredFunc(pFunc->funcId)) {
return false;
}
SNode* pPara = NULL;
FOREACH(pPara, pFunc->pParameterList) {
if (QUERY_NODE_COLUMN != nodeType(pPara) && QUERY_NODE_VALUE != nodeType(pPara)) {
return false;
}
}
return true;
}
static bool scanPathOptNeedDynOptimize(const SFunctionNode* pFunc) {
if (!fmIsDynamicScanOptimizedFunc(pFunc->funcId)) {
return false;
}
SNode* pPara = NULL;
static bool scanPathOptIsSpecifiedFuncType(const SFunctionNode* pFunc, bool (*typeCheckFn)(int32_t)) {
if (!typeCheckFn(pFunc->funcId)) return false;
SNode* pPara;
FOREACH(pPara, pFunc->pParameterList) {
if (QUERY_NODE_COLUMN != nodeType(pPara) && QUERY_NODE_VALUE != nodeType(pPara)) {
return false;
@ -232,10 +224,12 @@ static int32_t scanPathOptGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSd
FOREACH(pNode, pAllFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
int32_t code = TSDB_CODE_SUCCESS;
if (scanPathOptNeedOptimizeDataRequire(pFunc)) {
if (scanPathOptIsSpecifiedFuncType(pFunc, fmIsSpecialDataRequiredFunc)) {
code = nodesListMakeStrictAppend(&pTmpSdrFuncs, nodesCloneNode(pNode));
} else if (scanPathOptNeedDynOptimize(pFunc)) {
} else if (scanPathOptIsSpecifiedFuncType(pFunc, fmIsDynamicScanOptimizedFunc)) {
code = nodesListMakeStrictAppend(&pTmpDsoFuncs, nodesCloneNode(pNode));
} else if (scanPathOptIsSpecifiedFuncType(pFunc, fmIsSkipScanCheckFunc)) {
continue;
} else {
otherFunc = true;
break;

View File

@ -420,6 +420,7 @@ void transThreadOnce();
void transInit();
void transCleanup();
void transPrintEpSet(SEpSet* pEpSet);
void transFreeMsg(void* msg);
int32_t transCompressMsg(char* msg, int32_t len);

View File

@ -2221,7 +2221,9 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
}
} else {
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
tDebug("epset not equal, retry new epset");
tDebug("epset not equal, retry new epset1");
transPrintEpSet(&pCtx->epSet);
transPrintEpSet(&epSet);
epsetAssign(&pCtx->epSet, &epSet);
noDelay = false;
} else {
@ -2246,7 +2248,9 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
}
} else {
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
tDebug("epset not equal, retry new epset");
tDebug("epset not equal, retry new epset2");
transPrintEpSet(&pCtx->epSet);
transPrintEpSet(&epSet);
epsetAssign(&pCtx->epSet, &epSet);
noDelay = false;
} else {

View File

@ -163,6 +163,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -i True
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3 -i True
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform.py -N 2 -n 1
,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeReplicate.py -M 3 -N 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-19201.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TS-3404.py

View File

@ -45,9 +45,9 @@ class TMQCom:
tdSql.init(conn.cursor())
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
def initConsumerTable(self,cdbName='cdb'):
def initConsumerTable(self,cdbName='cdb', replicaVar=1):
tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("create database if not exists %s vgroups 1 replica %d"%(cdbName,replicaVar))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))

View File

@ -12,7 +12,7 @@ sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135}
# updatecfgDict = {'debugFlag': 135}
def __init__(self):
self.vgroups = 2
@ -252,7 +252,6 @@ class TDTestCase:
break
tdLog.info("all consumers status into 'lost'")
# drop consumer groups
tdLog.info("drop all consumers")
for i in range(len(groupIdList)):

View File

@ -0,0 +1,173 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from util.cluster import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctbn',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
topicNameList = ['topic1']
# expectRowsList = []
tmqCom.initConsumerTable("cdb", self.replicaVar)
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
tmqCom.getStartConsumeNotifyFromTmqsim()
tmqCom.getStartCommitNotifyFromTmqsim()
tdSql.query("select * from information_schema.ins_vnodes")
# tdLog.debug(tdSql.queryResult)
tdDnodes = cluster.dnodes
for result in tdSql.queryResult:
if result[2] == 'dbt' and result[3] == 'leader':
tdLog.debug("leader is %d"%(result[0] - 1))
tdDnodes[result[0] - 1].stoptaosd()
break
pInsertThread.join()
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt > resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt, resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)
# tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,337 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from util.cluster import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def getDataPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
return selfPath + '/../../../sim/dnode%d/data/vnode/vnode%d/wal/*';
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
# tdLog.info("create ctb")
# tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
# ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("insert data")
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def restartAndRemoveWal(self):
tdDnodes = cluster.dnodes
tdSql.query("select * from information_schema.ins_vnodes")
for result in tdSql.queryResult:
if result[2] == 'dbt':
tdLog.debug("dnode is %d"%(result[0]))
dnodeId = result[0]
vnodeId = result[1]
tdDnodes[dnodeId - 1].stoptaosd()
time.sleep(1)
dataPath = self.getDataPath()
dataPath = dataPath%(dnodeId,vnodeId)
os.system('rm -rf ' + dataPath)
tdLog.debug("dataPath:%s"%dataPath)
tdDnodes[dnodeId - 1].starttaosd()
time.sleep(1)
break
tdLog.debug("restart dnode ok")
def redistributeVgroups(self):
dnodesList = []
tdSql.query("show dnodes")
for result in tdSql.queryResult:
dnodesList.append(result[0])
tdSql.query("select * from information_schema.ins_vnodes")
vnodeId = 0
for result in tdSql.queryResult:
if result[2] == 'dbt':
tdLog.debug("dnode is %d"%(result[0]))
dnodesList.remove(result[0])
vnodeId = result[1]
break
redistributeSql = "redistribute vgroup %d dnode %d" %(vnodeId, dnodesList[0])
tdLog.debug("redistributeSql:%s"%(redistributeSql))
tdSql.query(redistributeSql)
tdLog.debug("redistributeSql ok")
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
# expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
tdLog.info("create ctb1")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
tmqCom.getStartConsumeNotifyFromTmqsim()
tmqCom.getStartCommitNotifyFromTmqsim()
#restart dnode & remove wal
self.restartAndRemoveWal()
# redistribute vgroup
self.redistributeVgroups();
tdLog.info("create ctb2")
paraDict['ctbPrefix'] = "ctbn"
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
pInsertThread1 = tmqCom.asyncInsertDataByInterlace(paraDict)
pInsertThread.join()
pInsertThread1.join()
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt / 2 >= resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)
# tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName':'dbt'}
ntbName = "ntb"
topicNameList = ['topic2']
tmqCom.initConsumerTable()
sqlString = "create table %s.%s(ts timestamp, i nchar(8))" %(paraDict['dbName'], ntbName)
tdLog.info("create nomal table sql: %s"%sqlString)
tdSql.execute(sqlString)
tdLog.info("create topics from nomal table")
queryString = "select * from %s.%s"%(paraDict['dbName'], ntbName)
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query("flush database %s"%(paraDict['dbName']))
#restart dnode & remove wal
self.restartAndRemoveWal()
# redistribute vgroup
self.redistributeVgroups();
sqlString = "alter table %s.%s modify column i nchar(16)" %(paraDict['dbName'], ntbName)
tdLog.info("alter table sql: %s"%sqlString)
tdSql.error(sqlString)
time.sleep(1)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ")
def tmqCase3(self):
tdLog.printNoPrefix("======== test case 3: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stbn',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 2,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic3']
tmqCom.initConsumerTable()
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
time.sleep(5)
#restart dnode & remove wal
self.restartAndRemoveWal()
# redistribute vgroup
self.redistributeVgroups();
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 3 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
# self.tmqCase2()
# self.tmqCase3()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())