From 4acfdfe403f69aa1d506c9f4b8a1e3728fc8484b Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Sat, 21 Sep 2024 17:04:30 +0800 Subject: [PATCH] merge --- .../example/ParameterBindingFullDemo.java | 321 +++++------------- .../example/WSParameterBindingFullDemo.java | 181 ++++------ .../src/test/java/com/taos/test/TestAll.java | 76 +++-- docs/examples/rust/nativeexample/Cargo.toml | 1 + .../rust/nativeexample/examples/stmt_all.rs | 121 +++++++ .../nativeexample/examples/stmt_json_tag.rs | 94 +++++ .../rust/restexample/examples/stmt_all.rs | 121 +++++++ docs/zh/14-reference/05-connector/26-rust.mdx | 2 + .../20-third-party/01-collection/11-kafka.md | 34 +- 9 files changed, 570 insertions(+), 381 deletions(-) create mode 100644 docs/examples/rust/nativeexample/examples/stmt_all.rs create mode 100644 docs/examples/rust/nativeexample/examples/stmt_json_tag.rs create mode 100644 docs/examples/rust/restexample/examples/stmt_all.rs diff --git a/docs/examples/java/src/main/java/com/taos/example/ParameterBindingFullDemo.java b/docs/examples/java/src/main/java/com/taos/example/ParameterBindingFullDemo.java index 5eb0cf0a61..dfb2915037 100644 --- a/docs/examples/java/src/main/java/com/taos/example/ParameterBindingFullDemo.java +++ b/docs/examples/java/src/main/java/com/taos/example/ParameterBindingFullDemo.java @@ -3,10 +3,7 @@ package com.taos.example; import com.taosdata.jdbc.TSDBPreparedStatement; import com.taosdata.jdbc.utils.StringUtils; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -16,15 +13,32 @@ public class ParameterBindingFullDemo { private static final String host = "127.0.0.1"; private static final Random random = new Random(System.currentTimeMillis()); - private static final int BINARY_COLUMN_SIZE = 50; + private static final int BINARY_COLUMN_SIZE = 100; private static final String[] schemaList = { - "create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)", - "create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)", - "create table stable3(ts timestamp, f1 bool) tags(t1 bool)", - "create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))", - "create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))", - "create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))", - "create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))", + "drop database if exists example_all_type_stmt", + "CREATE DATABASE IF NOT EXISTS example_all_type_stmt", + "USE example_all_type_stmt", + "CREATE STABLE IF NOT EXISTS stb_json (" + + "ts TIMESTAMP, " + + "int_col INT) " + + "tags (json_tag json)", + "CREATE STABLE IF NOT EXISTS stb (" + + "ts TIMESTAMP, " + + "int_col INT, " + + "double_col DOUBLE, " + + "bool_col BOOL, " + + "binary_col BINARY(100), " + + "nchar_col NCHAR(100), " + + "varbinary_col VARBINARY(100), " + + "geometry_col GEOMETRY(100)) " + + "tags (" + + "int_tag INT, " + + "double_tag DOUBLE, " + + "bool_tag BOOL, " + + "binary_tag BINARY(100), " + + "nchar_tag NCHAR(100), " + + "varbinary_tag VARBINARY(100), " + + "geometry_tag GEOMETRY(100))" }; private static final int numOfSubTable = 10, numOfRow = 10; @@ -34,55 +48,37 @@ public class ParameterBindingFullDemo { try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) { init(conn); + stmtJsonTag(conn); + stmtAll(conn); - bindInteger(conn); - bindFloat(conn); - bindBoolean(conn); - bindBytes(conn); - bindString(conn); - bindVarbinary(conn); - bindGeometry(conn); - - clean(conn); } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to insert data using stmt, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw ex; - } catch (Exception ex){ - System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + } catch (Exception ex) { + System.out.println("Failed to insert data using stmt, ErrMessage: " + ex.getMessage()); throw ex; } } private static void init(Connection conn) throws SQLException { - clean(conn); try (Statement stmt = conn.createStatement()) { - stmt.execute("create database if not exists test_parabind"); - stmt.execute("use test_parabind"); for (int i = 0; i < schemaList.length; i++) { stmt.execute(schemaList[i]); } } } - private static void clean(Connection conn) throws SQLException { - try (Statement stmt = conn.createStatement()) { - stmt.execute("drop database if exists test_parabind"); - } - } - private static void bindInteger(Connection conn) throws SQLException { - String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)"; + private static void stmtJsonTag(Connection conn) throws SQLException { + String sql = "INSERT INTO ? using stb_json tags(?) VALUES (?,?)"; try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { for (int i = 1; i <= numOfSubTable; i++) { // set table name - pstmt.setTableName("t1_" + i); + pstmt.setTableName("ntb_json_" + i); // set tags - pstmt.setTagByte(0, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE)))); - pstmt.setTagShort(1, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE)))); - pstmt.setTagInt(2, random.nextInt(Integer.MAX_VALUE)); - pstmt.setTagLong(3, random.nextLong()); + pstmt.setTagJson(0, "{\"device\":\"device_" + i + "\"}"); // set columns ArrayList tsList = new ArrayList<>(); long current = System.currentTimeMillis(); @@ -90,45 +86,42 @@ public class ParameterBindingFullDemo { tsList.add(current + j); pstmt.setTimestamp(0, tsList); - ArrayList f1List = new ArrayList<>(); + ArrayList f1List = new ArrayList<>(); for (int j = 0; j < numOfRow; j++) - f1List.add(Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE)))); - pstmt.setByte(1, f1List); - - ArrayList f2List = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) - f2List.add(Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE)))); - pstmt.setShort(2, f2List); - - ArrayList f3List = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) - f3List.add(random.nextInt(Integer.MAX_VALUE)); - pstmt.setInt(3, f3List); - - ArrayList f4List = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) - f4List.add(random.nextLong()); - pstmt.setLong(4, f4List); + f1List.add(random.nextInt(Integer.MAX_VALUE)); + pstmt.setInt(1, f1List); // add column pstmt.columnDataAddBatch(); } // execute column pstmt.columnDataExecuteBatch(); + System.out.println("Successfully inserted rows to example_all_type_stmt.ntb_json"); } } - private static void bindFloat(Connection conn) throws SQLException { - String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)"; + private static void stmtAll(Connection conn) throws SQLException { + String sql = "INSERT INTO ? using stb tags(?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)"; TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class); for (int i = 1; i <= numOfSubTable; i++) { // set table name - pstmt.setTableName("t2_" + i); + pstmt.setTableName("ntb" + i); // set tags - pstmt.setTagFloat(0, random.nextFloat()); - pstmt.setTagDouble(1, random.nextDouble()); + pstmt.setTagInt(0, i); + pstmt.setTagDouble(1, 1.1); + pstmt.setTagBoolean(2, true); + pstmt.setTagString(3, "binary_value"); + pstmt.setTagNString(4, "nchar_value"); + pstmt.setTagVarbinary(5, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e}); + pstmt.setTagGeometry(6, new byte[]{ + 0x01, 0x01, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x59, + 0x40, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x59, 0x40}); + // set columns ArrayList tsList = new ArrayList<>(); long current = System.currentTimeMillis(); @@ -136,190 +129,54 @@ public class ParameterBindingFullDemo { tsList.add(current + j); pstmt.setTimestamp(0, tsList); - ArrayList f1List = new ArrayList<>(); + ArrayList f1List = new ArrayList<>(); for (int j = 0; j < numOfRow; j++) - f1List.add(random.nextFloat()); - pstmt.setFloat(1, f1List); + f1List.add(random.nextInt(Integer.MAX_VALUE)); + pstmt.setInt(1, f1List); ArrayList f2List = new ArrayList<>(); for (int j = 0; j < numOfRow; j++) f2List.add(random.nextDouble()); pstmt.setDouble(2, f2List); + ArrayList f3List = new ArrayList<>(); + for (int j = 0; j < numOfRow; j++) + f3List.add(true); + pstmt.setBoolean(3, f3List); + + ArrayList f4List = new ArrayList<>(); + for (int j = 0; j < numOfRow; j++) + f4List.add("binary_value"); + pstmt.setString(4, f4List, BINARY_COLUMN_SIZE); + + ArrayList f5List = new ArrayList<>(); + for (int j = 0; j < numOfRow; j++) + f5List.add("nchar_value"); + pstmt.setNString(5, f5List, BINARY_COLUMN_SIZE); + + ArrayList f6List = new ArrayList<>(); + for (int j = 0; j < numOfRow; j++) + f6List.add(new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e}); + pstmt.setVarbinary(6, f6List, BINARY_COLUMN_SIZE); + + ArrayList f7List = new ArrayList<>(); + for (int j = 0; j < numOfRow; j++) + f7List.add(new byte[]{ + 0x01, 0x01, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x59, + 0x40, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x59, 0x40}); + pstmt.setGeometry(7, f7List, BINARY_COLUMN_SIZE); + // add column pstmt.columnDataAddBatch(); } // execute pstmt.columnDataExecuteBatch(); + System.out.println("Successfully inserted rows to example_all_type_stmt.ntb"); // close if no try-with-catch statement is used pstmt.close(); } - - private static void bindBoolean(Connection conn) throws SQLException { - String sql = "insert into ? using stable3 tags(?) values(?,?)"; - - try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { - for (int i = 1; i <= numOfSubTable; i++) { - // set table name - pstmt.setTableName("t3_" + i); - // set tags - pstmt.setTagBoolean(0, random.nextBoolean()); - // set columns - ArrayList tsList = new ArrayList<>(); - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) - tsList.add(current + j); - pstmt.setTimestamp(0, tsList); - - ArrayList f1List = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) - f1List.add(random.nextBoolean()); - pstmt.setBoolean(1, f1List); - - // add column - pstmt.columnDataAddBatch(); - } - // execute - pstmt.columnDataExecuteBatch(); - } - } - - private static void bindBytes(Connection conn) throws SQLException { - String sql = "insert into ? using stable4 tags(?) values(?,?)"; - - try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { - - for (int i = 1; i <= numOfSubTable; i++) { - // set table name - pstmt.setTableName("t4_" + i); - // set tags - pstmt.setTagString(0, new String("abc")); - - // set columns - ArrayList tsList = new ArrayList<>(); - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) - tsList.add(current + j); - pstmt.setTimestamp(0, tsList); - - ArrayList f1List = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) { - f1List.add(new String("abc")); - } - pstmt.setString(1, f1List, BINARY_COLUMN_SIZE); - - // add column - pstmt.columnDataAddBatch(); - } - // execute - pstmt.columnDataExecuteBatch(); - } - } - - private static void bindString(Connection conn) throws SQLException { - String sql = "insert into ? using stable5 tags(?) values(?,?)"; - - try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { - - for (int i = 1; i <= numOfSubTable; i++) { - // set table name - pstmt.setTableName("t5_" + i); - // set tags - pstmt.setTagNString(0, "California.SanFrancisco"); - - // set columns - ArrayList tsList = new ArrayList<>(); - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) - tsList.add(current + j); - pstmt.setTimestamp(0, tsList); - - ArrayList f1List = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) { - f1List.add("California.LosAngeles"); - } - pstmt.setNString(1, f1List, BINARY_COLUMN_SIZE); - - // add column - pstmt.columnDataAddBatch(); - } - // execute - pstmt.columnDataExecuteBatch(); - } - } - - private static void bindVarbinary(Connection conn) throws SQLException { - String sql = "insert into ? using stable6 tags(?) values(?,?)"; - - try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { - - for (int i = 1; i <= numOfSubTable; i++) { - // set table name - pstmt.setTableName("t6_" + i); - // set tags - byte[] bTag = new byte[]{0,2,3,4,5}; - bTag[0] = (byte) i; - pstmt.setTagVarbinary(0, bTag); - - // set columns - ArrayList tsList = new ArrayList<>(); - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) - tsList.add(current + j); - pstmt.setTimestamp(0, tsList); - - ArrayList f1List = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) { - byte[] v = new byte[]{0,2,3,4,5,6}; - v[0] = (byte)j; - f1List.add(v); - } - pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE); - - // add column - pstmt.columnDataAddBatch(); - } - // execute - pstmt.columnDataExecuteBatch(); - } - } - - private static void bindGeometry(Connection conn) throws SQLException { - String sql = "insert into ? using stable7 tags(?) values(?,?)"; - - try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { - - byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040"); - byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040"); - List listGeo = new ArrayList<>(); - listGeo.add(g1); - listGeo.add(g2); - - for (int i = 1; i <= 2; i++) { - // set table name - pstmt.setTableName("t7_" + i); - // set tags - pstmt.setTagGeometry(0, listGeo.get(i - 1)); - - // set columns - ArrayList tsList = new ArrayList<>(); - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) - tsList.add(current + j); - pstmt.setTimestamp(0, tsList); - - ArrayList f1List = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) { - f1List.add(listGeo.get(i - 1)); - } - pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE); - - // add column - pstmt.columnDataAddBatch(); - } - // execute - pstmt.columnDataExecuteBatch(); - } - } } // ANCHOR_END: para_bind diff --git a/docs/examples/java/src/main/java/com/taos/example/WSParameterBindingFullDemo.java b/docs/examples/java/src/main/java/com/taos/example/WSParameterBindingFullDemo.java index ec94f2ded6..f23fb187f4 100644 --- a/docs/examples/java/src/main/java/com/taos/example/WSParameterBindingFullDemo.java +++ b/docs/examples/java/src/main/java/com/taos/example/WSParameterBindingFullDemo.java @@ -11,11 +11,30 @@ public class WSParameterBindingFullDemo { private static final Random random = new Random(System.currentTimeMillis()); private static final int BINARY_COLUMN_SIZE = 30; private static final String[] schemaList = { - "create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)", - "create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)", - "create table stable3(ts timestamp, f1 bool) tags(t1 bool)", - "create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))", - "create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))" + "drop database if exists example_all_type_stmt", + "CREATE DATABASE IF NOT EXISTS example_all_type_stmt", + "USE example_all_type_stmt", + "CREATE STABLE IF NOT EXISTS stb_json (" + + "ts TIMESTAMP, " + + "int_col INT) " + + "tags (json_tag json)", + "CREATE STABLE IF NOT EXISTS stb (" + + "ts TIMESTAMP, " + + "int_col INT, " + + "double_col DOUBLE, " + + "bool_col BOOL, " + + "binary_col BINARY(100), " + + "nchar_col NCHAR(100), " + + "varbinary_col VARBINARY(100), " + + "geometry_col GEOMETRY(100)) " + + "tags (" + + "int_tag INT, " + + "double_tag DOUBLE, " + + "bool_tag BOOL, " + + "binary_tag BINARY(100), " + + "nchar_tag NCHAR(100), " + + "varbinary_tag VARBINARY(100), " + + "geometry_tag GEOMETRY(100))" }; private static final int numOfSubTable = 10, numOfRow = 10; @@ -27,153 +46,91 @@ public class WSParameterBindingFullDemo { init(conn); - bindInteger(conn); + stmtJsonTag(conn); - bindFloat(conn); - - bindBoolean(conn); - - bindBytes(conn); - - bindString(conn); + stmtAll(conn); } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + System.out.println("Failed to insert data using stmt, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); throw ex; - } catch (Exception ex){ - System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage()); + } catch (Exception ex) { + System.out.println("Failed to insert data using stmt, ErrMessage: " + ex.getMessage()); throw ex; } } private static void init(Connection conn) throws SQLException { try (Statement stmt = conn.createStatement()) { - stmt.execute("drop database if exists test_ws_parabind"); - stmt.execute("create database if not exists test_ws_parabind"); - stmt.execute("use test_ws_parabind"); for (int i = 0; i < schemaList.length; i++) { stmt.execute(schemaList[i]); } } } - private static void bindInteger(Connection conn) throws SQLException { - String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)"; + private static void stmtJsonTag(Connection conn) throws SQLException { + String sql = "INSERT INTO ? using stb_json tags(?) VALUES (?,?)"; try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) { for (int i = 1; i <= numOfSubTable; i++) { // set table name - pstmt.setTableName("t1_" + i); + pstmt.setTableName("ntb_json_" + i); // set tags - pstmt.setTagByte(1, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE)))); - pstmt.setTagShort(2, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE)))); - pstmt.setTagInt(3, random.nextInt(Integer.MAX_VALUE)); - pstmt.setTagLong(4, random.nextLong()); + pstmt.setTagJson(1, "{\"device\":\"device_" + i + "\"}"); // set columns long current = System.currentTimeMillis(); for (int j = 0; j < numOfRow; j++) { pstmt.setTimestamp(1, new Timestamp(current + j)); - pstmt.setByte(2, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE)))); - pstmt.setShort(3, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE)))); - pstmt.setInt(4, random.nextInt(Integer.MAX_VALUE)); - pstmt.setLong(5, random.nextLong()); + pstmt.setInt(2, j); pstmt.addBatch(); } pstmt.executeBatch(); } + System.out.println("Successfully inserted rows to example_all_type_stmt.ntb_json"); } } - private static void bindFloat(Connection conn) throws SQLException { - String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)"; - - try(TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) { - - for (int i = 1; i <= numOfSubTable; i++) { - // set table name - pstmt.setTableName("t2_" + i); - // set tags - pstmt.setTagFloat(1, random.nextFloat()); - pstmt.setTagDouble(2, random.nextDouble()); - // set columns - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) { - pstmt.setTimestamp(1, new Timestamp(current + j)); - pstmt.setFloat(2, random.nextFloat()); - pstmt.setDouble(3, random.nextDouble()); - pstmt.addBatch(); - } - pstmt.executeBatch(); - } - } - } - - private static void bindBoolean(Connection conn) throws SQLException { - String sql = "insert into ? using stable3 tags(?) values(?,?)"; - - try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) { - for (int i = 1; i <= numOfSubTable; i++) { - // set table name - pstmt.setTableName("t3_" + i); - // set tags - pstmt.setTagBoolean(1, random.nextBoolean()); - // set columns - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) { - pstmt.setTimestamp(1, new Timestamp(current + j)); - pstmt.setBoolean(2, random.nextBoolean()); - pstmt.addBatch(); - } - pstmt.executeBatch(); - } - } - } - - private static void bindBytes(Connection conn) throws SQLException { - String sql = "insert into ? using stable4 tags(?) values(?,?)"; + private static void stmtAll(Connection conn) throws SQLException { + String sql = "INSERT INTO ? using stb tags(?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)"; try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) { - for (int i = 1; i <= numOfSubTable; i++) { - // set table name - pstmt.setTableName("t4_" + i); - // set tags - pstmt.setTagString(1, new String("abc")); + // set table name + pstmt.setTableName("ntb"); + // set tags + pstmt.setTagInt(1, 1); + pstmt.setTagDouble(2, 1.1); + pstmt.setTagBoolean(3, true); + pstmt.setTagString(4, "binary_value"); + pstmt.setTagNString(5, "nchar_value"); + pstmt.setTagVarbinary(6, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e}); + pstmt.setTagGeometry(7, new byte[]{ + 0x01, 0x01, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x59, + 0x40, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x59, 0x40}); - // set columns - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) { - pstmt.setTimestamp(1, new Timestamp(current + j)); - pstmt.setString(2, "abc"); - pstmt.addBatch(); - } - pstmt.executeBatch(); - } - } - } + long current = System.currentTimeMillis(); - private static void bindString(Connection conn) throws SQLException { - String sql = "insert into ? using stable5 tags(?) values(?,?)"; - try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) { - - for (int i = 1; i <= numOfSubTable; i++) { - // set table name - pstmt.setTableName("t5_" + i); - // set tags - pstmt.setTagNString(1, "California.SanFrancisco"); - - // set columns - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) { - pstmt.setTimestamp(0, new Timestamp(current + j)); - pstmt.setNString(1, "California.SanFrancisco"); - pstmt.addBatch(); - } - pstmt.executeBatch(); - } + pstmt.setTimestamp(1, new Timestamp(current)); + pstmt.setInt(2, 1); + pstmt.setDouble(3, 1.1); + pstmt.setBoolean(4, true); + pstmt.setString(5, "binary_value"); + pstmt.setNString(6, "nchar_value"); + pstmt.setVarbinary(7, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e}); + pstmt.setGeometry(8, new byte[]{ + 0x01, 0x01, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x59, + 0x40, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x59, 0x40}); + pstmt.addBatch(); + pstmt.executeBatch(); + System.out.println("Successfully inserted rows to example_all_type_stmt.ntb"); } } } diff --git a/docs/examples/java/src/test/java/com/taos/test/TestAll.java b/docs/examples/java/src/test/java/com/taos/test/TestAll.java index 6a45c5fd5b..e014a3b315 100644 --- a/docs/examples/java/src/test/java/com/taos/test/TestAll.java +++ b/docs/examples/java/src/test/java/com/taos/test/TestAll.java @@ -50,36 +50,68 @@ public class TestAll { } @Test - public void testRestInsert() throws SQLException { - dropDB("power"); - RestInsertExample.main(args); - RestQueryExample.main(args); + public void testWsConnect() throws Exception { + WSConnectExample.main(args); } @Test - public void testStmtInsert() throws SQLException { + public void testBase() throws Exception { + JdbcCreatDBDemo.main(args); + JdbcInsertDataDemo.main(args); + JdbcQueryDemo.main(args); + dropDB("power"); - StmtInsertExample.main(args); } @Test - public void testSubscribe() { + public void testWsSchemaless() throws Exception { + dropDB("power"); + SchemalessWsTest.main(args); + } + @Test + public void testJniSchemaless() throws Exception { + dropDB("power"); + SchemalessJniTest.main(args); + } + + @Test + public void testJniStmtBasic() throws Exception { + dropDB("power"); + ParameterBindingBasicDemo.main(args); + } + + @Test + public void testJniStmtFull() throws Exception { + dropDB("power"); + ParameterBindingFullDemo.main(args); + } + + @Test + public void testWsStmtBasic() throws Exception { + dropDB("power"); + WSParameterBindingBasicDemo.main(args); + } + + @Test + public void testWsStmtFull() throws Exception { + dropDB("power"); + WSParameterBindingFullDemo.main(args); + } + + @Test + public void testConsumer() throws Exception { + dropDB("power"); SubscribeDemo.main(args); } - - @Test - public void testSubscribeOverWebsocket() { - WebsocketSubscribeDemo.main(args); - } - - @Test - public void testSchemaless() throws SQLException { - LineProtocolExample.main(args); - TelnetLineProtocolExample.main(args); - // for json protocol, tags may be double type. but for telnet protocol tag must be nchar type. - // To avoid type mismatch, we delete database test. - dropDB("test"); - JSONProtocolExample.main(args); - } +// @Test +// public void testSubscribeJni() throws SQLException, InterruptedException { +// dropDB("power"); +// ConsumerLoopFull.main(args); +// } +// @Test +// public void testSubscribeWs() throws SQLException, InterruptedException { +// dropDB("power"); +// WsConsumerLoopFull.main(args); +// } } diff --git a/docs/examples/rust/nativeexample/Cargo.toml b/docs/examples/rust/nativeexample/Cargo.toml index 13e68d6d9d..041ca4f617 100644 --- a/docs/examples/rust/nativeexample/Cargo.toml +++ b/docs/examples/rust/nativeexample/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" anyhow = "1" chrono = "0.4" serde = { version = "1", features = ["derive"] } +serde_json = "1.0" tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] } log = "0.4" pretty_env_logger = "0.5.0" diff --git a/docs/examples/rust/nativeexample/examples/stmt_all.rs b/docs/examples/rust/nativeexample/examples/stmt_all.rs new file mode 100644 index 0000000000..6560d8a0ab --- /dev/null +++ b/docs/examples/rust/nativeexample/examples/stmt_all.rs @@ -0,0 +1,121 @@ +use taos::*; +use taos_query::util::hex::hex_string_to_bytes; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://"; + let taos = TaosBuilder::from_dsn(dsn)?.build().await?; + + taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt") + .await?; + taos.create_database("example_all_type_stmt").await?; + taos.use_database("example_all_type_stmt").await?; + taos.exec( + r#" + CREATE STABLE IF NOT EXISTS stb ( + ts TIMESTAMP, + int_col INT, + double_col DOUBLE, + bool_col BOOL, + binary_col BINARY(100), + nchar_col NCHAR(100), + varbinary_col VARBINARY(100), + geometry_col GEOMETRY(100)) + TAGS ( + int_tag INT, + double_tag DOUBLE, + bool_tag BOOL, + binary_tag BINARY(100), + nchar_tag NCHAR(100)) + "#, + ) + .await?; + + let mut stmt = Stmt::init(&taos).await?; + stmt.prepare("INSERT INTO ? using stb tags(?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)") + .await?; + + const NUM_TABLES: usize = 10; + const NUM_ROWS: usize = 10; + for i in 0..NUM_TABLES { + let table_name = format!("d_bind_{}", i); + let tags = vec![ + Value::Int(i as i32), + Value::Double(1.1), + Value::Bool(true), + Value::VarChar("binary_value".into()), + Value::NChar("nchar_value".into()), + // Value::VarBinary(vec![0x98, 0xf4, 0x6e].into()), + // Value::Geometry( + // vec![ + // 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40, + // ] + // .into(), + // ), + ]; + + // set table name and tags for the prepared statement. + match stmt.set_tbname_tags(&table_name, &tags).await { + Ok(_) => {} + Err(err) => { + eprintln!( + "Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}", + table_name, tags, err + ); + return Err(err.into()); + } + } + for j in 0..NUM_ROWS { + let values = vec![ + ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]), + ColumnView::from_ints(vec![j as i32]), + ColumnView::from_doubles(vec![1.1]), + ColumnView::from_bools(vec![true]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), + ColumnView::from_bytes(vec![hex_string_to_bytes("123456").to_vec()]), + ColumnView::from_geobytes(vec![hex_string_to_bytes( + "0101000000000000000000F03F0000000000000040", + ) + .to_vec()]), + ]; + // bind values to the prepared statement. + match stmt.bind(&values).await { + Ok(_) => {} + Err(err) => { + eprintln!( + "Failed to bind values, values:{:?}, ErrMessage: {}", + values, err + ); + return Err(err.into()); + } + } + } + + match stmt.add_batch().await { + Ok(_) => {} + Err(err) => { + eprintln!("Failed to add batch, ErrMessage: {}", err); + return Err(err.into()); + } + } + } + + // execute. + match stmt.execute().await { + Ok(affected_rows) => println!( + "Successfully inserted {} rows to example_all_type_stmt.stb.", + affected_rows + ), + Err(err) => { + eprintln!( + "Failed to insert to table stb using stmt, ErrMessage: {}", + err + ); + return Err(err.into()); + } + } + + Ok(()) +} diff --git a/docs/examples/rust/nativeexample/examples/stmt_json_tag.rs b/docs/examples/rust/nativeexample/examples/stmt_json_tag.rs new file mode 100644 index 0000000000..7c1b26a0f5 --- /dev/null +++ b/docs/examples/rust/nativeexample/examples/stmt_json_tag.rs @@ -0,0 +1,94 @@ +use serde_json::json; +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://"; + let taos = TaosBuilder::from_dsn(dsn)?.build().await?; + + taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt") + .await?; + taos.create_database("example_all_type_stmt").await?; + taos.use_database("example_all_type_stmt").await?; + taos.exec( + r#" +CREATE STABLE IF NOT EXISTS stb_json ( + ts TIMESTAMP, + int_col INT) +TAGS ( + json_tag JSON) +"#, + ) + .await?; + + let mut stmt = Stmt::init(&taos).await?; + stmt.prepare("INSERT INTO ? using stb_json tags(?) VALUES (?,?)") + .await?; + + const NUM_TABLES: usize = 1; + const NUM_ROWS: usize = 1; + for i in 0..NUM_TABLES { + let table_name = format!("d_bind_{}", i); + let json_value: serde_json::Value = json!({ + "name": "value" + }); + + dbg!(json_value.to_string()); + + let tags = vec![Value::Json(json_value)]; + + // set table name and tags for the prepared statement. + match stmt.set_tbname_tags(&table_name, &tags).await { + Ok(_) => {} + Err(err) => { + eprintln!( + "Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}", + table_name, tags, err + ); + return Err(err.into()); + } + } + for j in 0..NUM_ROWS { + let values = vec![ + ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]), + ColumnView::from_ints(vec![j as i32]), + ]; + // bind values to the prepared statement. + match stmt.bind(&values).await { + Ok(_) => {} + Err(err) => { + eprintln!( + "Failed to bind values, values:{:?}, ErrMessage: {}", + values, err + ); + return Err(err.into()); + } + } + } + + match stmt.add_batch().await { + Ok(_) => {} + Err(err) => { + eprintln!("Failed to add batch, ErrMessage: {}", err); + return Err(err.into()); + } + } + } + + // execute. + match stmt.execute().await { + Ok(affected_rows) => println!( + "Successfully inserted {} rows to example_all_type_stmt.stb_json.", + affected_rows + ), + Err(err) => { + eprintln!( + "Failed to insert to table stb_json using stmt, ErrMessage: {}", + err + ); + return Err(err.into()); + } + } + + Ok(()) +} diff --git a/docs/examples/rust/restexample/examples/stmt_all.rs b/docs/examples/rust/restexample/examples/stmt_all.rs new file mode 100644 index 0000000000..07ab658bad --- /dev/null +++ b/docs/examples/rust/restexample/examples/stmt_all.rs @@ -0,0 +1,121 @@ +use taos::*; +use taos_query::util::hex::hex_string_to_bytes; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "ws://"; + let taos = TaosBuilder::from_dsn(dsn)?.build().await?; + + taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt") + .await?; + taos.create_database("example_all_type_stmt").await?; + taos.use_database("example_all_type_stmt").await?; + taos.exec( + r#" + CREATE STABLE IF NOT EXISTS stb ( + ts TIMESTAMP, + int_col INT, + double_col DOUBLE, + bool_col BOOL, + binary_col BINARY(100), + nchar_col NCHAR(100), + varbinary_col VARBINARY(100), + geometry_col GEOMETRY(100)) + TAGS ( + int_tag INT, + double_tag DOUBLE, + bool_tag BOOL, + binary_tag BINARY(100), + nchar_tag NCHAR(100)) + "#, + ) + .await?; + + let mut stmt = Stmt::init(&taos).await?; + stmt.prepare("INSERT INTO ? using stb tags(?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)") + .await?; + + const NUM_TABLES: usize = 10; + const NUM_ROWS: usize = 10; + for i in 0..NUM_TABLES { + let table_name = format!("d_bind_{}", i); + let tags = vec![ + Value::Int(i as i32), + Value::Double(1.1), + Value::Bool(true), + Value::VarChar("binary_value".into()), + Value::NChar("nchar_value".into()), + // Value::VarBinary(vec![0x98, 0xf4, 0x6e].into()), + // Value::Geometry( + // vec![ + // 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40, + // ] + // .into(), + // ), + ]; + + // set table name and tags for the prepared statement. + match stmt.set_tbname_tags(&table_name, &tags).await { + Ok(_) => {} + Err(err) => { + eprintln!( + "Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}", + table_name, tags, err + ); + return Err(err.into()); + } + } + for j in 0..NUM_ROWS { + let values = vec![ + ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]), + ColumnView::from_ints(vec![j as i32]), + ColumnView::from_doubles(vec![1.1]), + ColumnView::from_bools(vec![true]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), + ColumnView::from_bytes(vec![hex_string_to_bytes("123456").to_vec()]), + ColumnView::from_geobytes(vec![hex_string_to_bytes( + "0101000000000000000000F03F0000000000000040", + ) + .to_vec()]), + ]; + // bind values to the prepared statement. + match stmt.bind(&values).await { + Ok(_) => {} + Err(err) => { + eprintln!( + "Failed to bind values, values:{:?}, ErrMessage: {}", + values, err + ); + return Err(err.into()); + } + } + } + + match stmt.add_batch().await { + Ok(_) => {} + Err(err) => { + eprintln!("Failed to add batch, ErrMessage: {}", err); + return Err(err.into()); + } + } + } + + // execute. + match stmt.execute().await { + Ok(affected_rows) => println!( + "Successfully inserted {} rows to example_all_type_stmt.stb.", + affected_rows + ), + Err(err) => { + eprintln!( + "Failed to insert to table stb using stmt, ErrMessage: {}", + err + ); + return Err(err.into()); + } + } + + Ok(()) +} diff --git a/docs/zh/14-reference/05-connector/26-rust.mdx b/docs/zh/14-reference/05-connector/26-rust.mdx index 88be297ac6..c5d2a165d4 100644 --- a/docs/zh/14-reference/05-connector/26-rust.mdx +++ b/docs/zh/14-reference/05-connector/26-rust.mdx @@ -80,6 +80,8 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对 | BINARY | Vec\ | | NCHAR | String | | JSON | serde_json::Value | +| VARBINARY | Bytes | +| GEOMETRY | Bytes | **注意**:JSON 类型仅在 tag 中支持。 diff --git a/docs/zh/20-third-party/01-collection/11-kafka.md b/docs/zh/20-third-party/01-collection/11-kafka.md index d9a416aa40..75adefbc50 100644 --- a/docs/zh/20-third-party/01-collection/11-kafka.md +++ b/docs/zh/20-third-party/01-collection/11-kafka.md @@ -243,6 +243,7 @@ vi source-demo.json "topic.per.stable": true, "topic.ignore.db": false, "out.format": "line", + "data.precision": "ms", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter" } @@ -331,14 +332,13 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector 1. 打开 KAFKA_HOME/config/producer.properties 配置文件。 2. 参数说明及配置建议如下: - | **参数** | **参数说明** | **设置建议** | - | --------| --------------------------------- | -------------- | - | producer.type | 此参数用于设置消息的发送方式,默认值为 `sync` 表示同步发送,`async` 表示异步发送。采用异步发送能够提升消息发送的吞吐量。 | async | - | request.required.acks | 参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时,表示只要领导者副本成功写入消息就会给生产者发送确认,而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性,同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功,所以可以减少生产者的等待时间,提高发送消息的效率。|1| - | max.request.size| 该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576,也就是 1M。如果设置得太小,可能会导致频繁的网络请求,降低吞吐量。如果设置得太大,可能会导致内存占用过高,或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。|104857600| - |batch.size| 此参数用于设定 batch 的大小,默认值为 16384,即 16KB。在消息发送过程中,发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。|524288| - | buffer.memory| 此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。|1073741824| - + | **参数** | **参数说明** | **设置建议** | + | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------ | + | producer.type | 此参数用于设置消息的发送方式,默认值为 `sync` 表示同步发送,`async` 表示异步发送。采用异步发送能够提升消息发送的吞吐量。 | async | + | request.required.acks | 参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时,表示只要领导者副本成功写入消息就会给生产者发送确认,而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性,同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功,所以可以减少生产者的等待时间,提高发送消息的效率。 | 1 | + | max.request.size | 该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576,也就是 1M。如果设置得太小,可能会导致频繁的网络请求,降低吞吐量。如果设置得太大,可能会导致内存占用过高,或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。 | 104857600 | + | batch.size | 此参数用于设定 batch 的大小,默认值为 16384,即 16KB。在消息发送过程中,发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。 | 524288 | + | buffer.memory | 此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。 | 1073741824 | ## 配置参考 @@ -370,7 +370,7 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector 7. `data.precision`: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为: 1. ms : 表示毫秒 2. us : 表示微秒 - 3. ns : 表示纳秒。默认为纳秒。 + 3. ns : 表示纳秒。 ### TDengine Source Connector 特有的配置 @@ -381,12 +381,16 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector 5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。 6. `query.interval.ms`: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 0,即获取到当前最新时间的所有数据。 7. `out.format` : 结果集输出格式。`line` 表示输出格式为 InfluxDB Line 协议格式,`json` 表示输出格式是 json。默认为 line。 -8. `topic.per.stable`: 如果设置为 true,表示一个超级表对应一个 Kafka topic,topic的命名规则 ``;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 `` -9. `topic.ignore.db`: topic 命名规则是否包含 database 名称,true 表示规则为 ``,false 表示规则为 ``,默认 false。此配置项在 `topic.per.stable` 设置为 false 时不生效。 -10. `topic.delimiter`: topic 名称分割符,默认为 `-`。 -11. `read.method`: 从 TDengine 读取数据方式,query 或是 subscription。默认为 subscription。 -12. `subscription.group.id`: 指定 TDengine 数据订阅的组 id,当 `read.method` 为 subscription 时,此项为必填项。 -13. `subscription.from`: 指定 TDengine 数据订阅起始位置,latest 或是 earliest。默认为 latest。 +8. `data.precision`: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为: + 1. ms : 表示毫秒, + 2. us : 表示微秒 + 3. ns : 表示纳秒。 +9. `topic.per.stable`: 如果设置为 true,表示一个超级表对应一个 Kafka topic,topic的命名规则 ``;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 `` +10. `topic.ignore.db`: topic 命名规则是否包含 database 名称,true 表示规则为 ``,false 表示规则为 ``,默认 false。此配置项在 `topic.per.stable` 设置为 false 时不生效。 +11. `topic.delimiter`: topic 名称分割符,默认为 `-`。 +12. `read.method`: 从 TDengine 读取数据方式,query 或是 subscription。默认为 subscription。 +13. `subscription.group.id`: 指定 TDengine 数据订阅的组 id,当 `read.method` 为 subscription 时,此项为必填项。 +14. `subscription.from`: 指定 TDengine 数据订阅起始位置,latest 或是 earliest。默认为 latest。 ## 其他说明