Merge pull request #27965 from taosdata/docs/xftan/TD-32154-3.0

docs: add varbinary and geometry types
This commit is contained in:
wade zhang 2024-09-24 08:26:00 +08:00 committed by GitHub
commit 78f5b3584a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 847 additions and 387 deletions

View File

@ -3,10 +3,7 @@ package com.taos.example;
import com.taosdata.jdbc.TSDBPreparedStatement; import com.taosdata.jdbc.TSDBPreparedStatement;
import com.taosdata.jdbc.utils.StringUtils; import com.taosdata.jdbc.utils.StringUtils;
import java.sql.Connection; import java.sql.*;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
@ -16,15 +13,32 @@ public class ParameterBindingFullDemo {
private static final String host = "127.0.0.1"; private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis()); private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 50; private static final int BINARY_COLUMN_SIZE = 100;
private static final String[] schemaList = { private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)", "drop database if exists example_all_type_stmt",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)", "CREATE DATABASE IF NOT EXISTS example_all_type_stmt",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)", "USE example_all_type_stmt",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))", "CREATE STABLE IF NOT EXISTS stb_json (" +
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))", "ts TIMESTAMP, " +
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))", "int_col INT) " +
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))", "tags (json_tag json)",
"CREATE STABLE IF NOT EXISTS stb (" +
"ts TIMESTAMP, " +
"int_col INT, " +
"double_col DOUBLE, " +
"bool_col BOOL, " +
"binary_col BINARY(100), " +
"nchar_col NCHAR(100), " +
"varbinary_col VARBINARY(100), " +
"geometry_col GEOMETRY(100)) " +
"tags (" +
"int_tag INT, " +
"double_tag DOUBLE, " +
"bool_tag BOOL, " +
"binary_tag BINARY(100), " +
"nchar_tag NCHAR(100), " +
"varbinary_tag VARBINARY(100), " +
"geometry_tag GEOMETRY(100))"
}; };
private static final int numOfSubTable = 10, numOfRow = 10; private static final int numOfSubTable = 10, numOfRow = 10;
@ -34,55 +48,37 @@ public class ParameterBindingFullDemo {
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) { try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn); init(conn);
stmtJsonTag(conn);
stmtAll(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
bindVarbinary(conn);
bindGeometry(conn);
clean(conn);
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println("Failed to insert data using stmt, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex; throw ex;
} catch (Exception ex){ } catch (Exception ex) {
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage()); System.out.println("Failed to insert data using stmt, ErrMessage: " + ex.getMessage());
throw ex; throw ex;
} }
} }
private static void init(Connection conn) throws SQLException { private static void init(Connection conn) throws SQLException {
clean(conn);
try (Statement stmt = conn.createStatement()) { try (Statement stmt = conn.createStatement()) {
stmt.execute("create database if not exists test_parabind");
stmt.execute("use test_parabind");
for (int i = 0; i < schemaList.length; i++) { for (int i = 0; i < schemaList.length; i++) {
stmt.execute(schemaList[i]); stmt.execute(schemaList[i]);
} }
} }
} }
private static void clean(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
}
}
private static void bindInteger(Connection conn) throws SQLException { private static void stmtJsonTag(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)"; String sql = "INSERT INTO ? using stb_json tags(?) VALUES (?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) { for (int i = 1; i <= numOfSubTable; i++) {
// set table name // set table name
pstmt.setTableName("t1_" + i); pstmt.setTableName("ntb_json_" + i);
// set tags // set tags
pstmt.setTagByte(0, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE)))); pstmt.setTagJson(0, "{\"device\":\"device_" + i + "\"}");
pstmt.setTagShort(1, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setTagInt(2, random.nextInt(Integer.MAX_VALUE));
pstmt.setTagLong(3, random.nextLong());
// set columns // set columns
ArrayList<Long> tsList = new ArrayList<>(); ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
@ -90,45 +86,42 @@ public class ParameterBindingFullDemo {
tsList.add(current + j); tsList.add(current + j);
pstmt.setTimestamp(0, tsList); pstmt.setTimestamp(0, tsList);
ArrayList<Byte> f1List = new ArrayList<>(); ArrayList<Integer> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) for (int j = 0; j < numOfRow; j++)
f1List.add(Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE)))); f1List.add(random.nextInt(Integer.MAX_VALUE));
pstmt.setByte(1, f1List); pstmt.setInt(1, f1List);
ArrayList<Short> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setShort(2, f2List);
ArrayList<Integer> f3List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f3List.add(random.nextInt(Integer.MAX_VALUE));
pstmt.setInt(3, f3List);
ArrayList<Long> f4List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f4List.add(random.nextLong());
pstmt.setLong(4, f4List);
// add column // add column
pstmt.columnDataAddBatch(); pstmt.columnDataAddBatch();
} }
// execute column // execute column
pstmt.columnDataExecuteBatch(); pstmt.columnDataExecuteBatch();
System.out.println("Successfully inserted rows to example_all_type_stmt.ntb_json");
} }
} }
private static void bindFloat(Connection conn) throws SQLException { private static void stmtAll(Connection conn) throws SQLException {
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)"; String sql = "INSERT INTO ? using stb tags(?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)";
TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class); TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class);
for (int i = 1; i <= numOfSubTable; i++) { for (int i = 1; i <= numOfSubTable; i++) {
// set table name // set table name
pstmt.setTableName("t2_" + i); pstmt.setTableName("ntb" + i);
// set tags // set tags
pstmt.setTagFloat(0, random.nextFloat()); pstmt.setTagInt(0, i);
pstmt.setTagDouble(1, random.nextDouble()); pstmt.setTagDouble(1, 1.1);
pstmt.setTagBoolean(2, true);
pstmt.setTagString(3, "binary_value");
pstmt.setTagNString(4, "nchar_value");
pstmt.setTagVarbinary(5, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
pstmt.setTagGeometry(6, new byte[]{
0x01, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59,
0x40, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59, 0x40});
// set columns // set columns
ArrayList<Long> tsList = new ArrayList<>(); ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
@ -136,190 +129,54 @@ public class ParameterBindingFullDemo {
tsList.add(current + j); tsList.add(current + j);
pstmt.setTimestamp(0, tsList); pstmt.setTimestamp(0, tsList);
ArrayList<Float> f1List = new ArrayList<>(); ArrayList<Integer> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextFloat()); f1List.add(random.nextInt(Integer.MAX_VALUE));
pstmt.setFloat(1, f1List); pstmt.setInt(1, f1List);
ArrayList<Double> f2List = new ArrayList<>(); ArrayList<Double> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) for (int j = 0; j < numOfRow; j++)
f2List.add(random.nextDouble()); f2List.add(random.nextDouble());
pstmt.setDouble(2, f2List); pstmt.setDouble(2, f2List);
ArrayList<Boolean> f3List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f3List.add(true);
pstmt.setBoolean(3, f3List);
ArrayList<String> f4List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f4List.add("binary_value");
pstmt.setString(4, f4List, BINARY_COLUMN_SIZE);
ArrayList<String> f5List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f5List.add("nchar_value");
pstmt.setNString(5, f5List, BINARY_COLUMN_SIZE);
ArrayList<byte[]> f6List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f6List.add(new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
pstmt.setVarbinary(6, f6List, BINARY_COLUMN_SIZE);
ArrayList<byte[]> f7List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f7List.add(new byte[]{
0x01, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59,
0x40, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59, 0x40});
pstmt.setGeometry(7, f7List, BINARY_COLUMN_SIZE);
// add column // add column
pstmt.columnDataAddBatch(); pstmt.columnDataAddBatch();
} }
// execute // execute
pstmt.columnDataExecuteBatch(); pstmt.columnDataExecuteBatch();
System.out.println("Successfully inserted rows to example_all_type_stmt.ntb");
// close if no try-with-catch statement is used // close if no try-with-catch statement is used
pstmt.close(); pstmt.close();
} }
private static void bindBoolean(Connection conn) throws SQLException {
String sql = "insert into ? using stable3 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t3_" + i);
// set tags
pstmt.setTagBoolean(0, random.nextBoolean());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Boolean> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextBoolean());
pstmt.setBoolean(1, f1List);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindBytes(Connection conn) throws SQLException {
String sql = "insert into ? using stable4 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t4_" + i);
// set tags
pstmt.setTagString(0, new String("abc"));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(new String("abc"));
}
pstmt.setString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindString(Connection conn) throws SQLException {
String sql = "insert into ? using stable5 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t5_" + i);
// set tags
pstmt.setTagNString(0, "California.SanFrancisco");
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add("California.LosAngeles");
}
pstmt.setNString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindVarbinary(Connection conn) throws SQLException {
String sql = "insert into ? using stable6 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t6_" + i);
// set tags
byte[] bTag = new byte[]{0,2,3,4,5};
bTag[0] = (byte) i;
pstmt.setTagVarbinary(0, bTag);
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
byte[] v = new byte[]{0,2,3,4,5,6};
v[0] = (byte)j;
f1List.add(v);
}
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindGeometry(Connection conn) throws SQLException {
String sql = "insert into ? using stable7 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
List<byte[]> listGeo = new ArrayList<>();
listGeo.add(g1);
listGeo.add(g2);
for (int i = 1; i <= 2; i++) {
// set table name
pstmt.setTableName("t7_" + i);
// set tags
pstmt.setTagGeometry(0, listGeo.get(i - 1));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(listGeo.get(i - 1));
}
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
} }
// ANCHOR_END: para_bind // ANCHOR_END: para_bind

View File

@ -11,11 +11,30 @@ public class WSParameterBindingFullDemo {
private static final Random random = new Random(System.currentTimeMillis()); private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 30; private static final int BINARY_COLUMN_SIZE = 30;
private static final String[] schemaList = { private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)", "drop database if exists example_all_type_stmt",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)", "CREATE DATABASE IF NOT EXISTS example_all_type_stmt",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)", "USE example_all_type_stmt",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))", "CREATE STABLE IF NOT EXISTS stb_json (" +
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))" "ts TIMESTAMP, " +
"int_col INT) " +
"tags (json_tag json)",
"CREATE STABLE IF NOT EXISTS stb (" +
"ts TIMESTAMP, " +
"int_col INT, " +
"double_col DOUBLE, " +
"bool_col BOOL, " +
"binary_col BINARY(100), " +
"nchar_col NCHAR(100), " +
"varbinary_col VARBINARY(100), " +
"geometry_col GEOMETRY(100)) " +
"tags (" +
"int_tag INT, " +
"double_tag DOUBLE, " +
"bool_tag BOOL, " +
"binary_tag BINARY(100), " +
"nchar_tag NCHAR(100), " +
"varbinary_tag VARBINARY(100), " +
"geometry_tag GEOMETRY(100))"
}; };
private static final int numOfSubTable = 10, numOfRow = 10; private static final int numOfSubTable = 10, numOfRow = 10;
@ -27,153 +46,91 @@ public class WSParameterBindingFullDemo {
init(conn); init(conn);
bindInteger(conn); stmtJsonTag(conn);
bindFloat(conn); stmtAll(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
} catch (SQLException ex) { } catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info // handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); System.out.println("Failed to insert data using stmt, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw ex; throw ex;
} catch (Exception ex){ } catch (Exception ex) {
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage()); System.out.println("Failed to insert data using stmt, ErrMessage: " + ex.getMessage());
throw ex; throw ex;
} }
} }
private static void init(Connection conn) throws SQLException { private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) { try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_ws_parabind");
stmt.execute("create database if not exists test_ws_parabind");
stmt.execute("use test_ws_parabind");
for (int i = 0; i < schemaList.length; i++) { for (int i = 0; i < schemaList.length; i++) {
stmt.execute(schemaList[i]); stmt.execute(schemaList[i]);
} }
} }
} }
private static void bindInteger(Connection conn) throws SQLException { private static void stmtJsonTag(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)"; String sql = "INSERT INTO ? using stb_json tags(?) VALUES (?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) { try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) { for (int i = 1; i <= numOfSubTable; i++) {
// set table name // set table name
pstmt.setTableName("t1_" + i); pstmt.setTableName("ntb_json_" + i);
// set tags // set tags
pstmt.setTagByte(1, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE)))); pstmt.setTagJson(1, "{\"device\":\"device_" + i + "\"}");
pstmt.setTagShort(2, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setTagInt(3, random.nextInt(Integer.MAX_VALUE));
pstmt.setTagLong(4, random.nextLong());
// set columns // set columns
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) { for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j)); pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setByte(2, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE)))); pstmt.setInt(2, j);
pstmt.setShort(3, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setInt(4, random.nextInt(Integer.MAX_VALUE));
pstmt.setLong(5, random.nextLong());
pstmt.addBatch(); pstmt.addBatch();
} }
pstmt.executeBatch(); pstmt.executeBatch();
} }
System.out.println("Successfully inserted rows to example_all_type_stmt.ntb_json");
} }
} }
private static void bindFloat(Connection conn) throws SQLException { private static void stmtAll(Connection conn) throws SQLException {
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)"; String sql = "INSERT INTO ? using stb tags(?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)";
try(TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t2_" + i);
// set tags
pstmt.setTagFloat(1, random.nextFloat());
pstmt.setTagDouble(2, random.nextDouble());
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setFloat(2, random.nextFloat());
pstmt.setDouble(3, random.nextDouble());
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindBoolean(Connection conn) throws SQLException {
String sql = "insert into ? using stable3 tags(?) values(?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t3_" + i);
// set tags
pstmt.setTagBoolean(1, random.nextBoolean());
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setBoolean(2, random.nextBoolean());
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindBytes(Connection conn) throws SQLException {
String sql = "insert into ? using stable4 tags(?) values(?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) { try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) { // set table name
// set table name pstmt.setTableName("ntb");
pstmt.setTableName("t4_" + i); // set tags
// set tags pstmt.setTagInt(1, 1);
pstmt.setTagString(1, new String("abc")); pstmt.setTagDouble(2, 1.1);
pstmt.setTagBoolean(3, true);
pstmt.setTagString(4, "binary_value");
pstmt.setTagNString(5, "nchar_value");
pstmt.setTagVarbinary(6, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
pstmt.setTagGeometry(7, new byte[]{
0x01, 0x01, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59,
0x40, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x59, 0x40});
// set columns long current = System.currentTimeMillis();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setString(2, "abc");
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindString(Connection conn) throws SQLException {
String sql = "insert into ? using stable5 tags(?) values(?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) { pstmt.setTimestamp(1, new Timestamp(current));
pstmt.setInt(2, 1);
for (int i = 1; i <= numOfSubTable; i++) { pstmt.setDouble(3, 1.1);
// set table name pstmt.setBoolean(4, true);
pstmt.setTableName("t5_" + i); pstmt.setString(5, "binary_value");
// set tags pstmt.setNString(6, "nchar_value");
pstmt.setTagNString(1, "California.SanFrancisco"); pstmt.setVarbinary(7, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
pstmt.setGeometry(8, new byte[]{
// set columns 0x01, 0x01, 0x00, 0x00,
long current = System.currentTimeMillis(); 0x00, 0x00, 0x00, 0x00,
for (int j = 0; j < numOfRow; j++) { 0x00, 0x00, 0x00, 0x59,
pstmt.setTimestamp(0, new Timestamp(current + j)); 0x40, 0x00, 0x00, 0x00,
pstmt.setNString(1, "California.SanFrancisco"); 0x00, 0x00, 0x00, 0x59, 0x40});
pstmt.addBatch(); pstmt.addBatch();
} pstmt.executeBatch();
pstmt.executeBatch(); System.out.println("Successfully inserted rows to example_all_type_stmt.ntb");
}
} }
} }
} }

View File

@ -50,36 +50,68 @@ public class TestAll {
} }
@Test @Test
public void testRestInsert() throws SQLException { public void testWsConnect() throws Exception {
dropDB("power"); WSConnectExample.main(args);
RestInsertExample.main(args);
RestQueryExample.main(args);
} }
@Test @Test
public void testStmtInsert() throws SQLException { public void testBase() throws Exception {
JdbcCreatDBDemo.main(args);
JdbcInsertDataDemo.main(args);
JdbcQueryDemo.main(args);
dropDB("power"); dropDB("power");
StmtInsertExample.main(args);
} }
@Test @Test
public void testSubscribe() { public void testWsSchemaless() throws Exception {
dropDB("power");
SchemalessWsTest.main(args);
}
@Test
public void testJniSchemaless() throws Exception {
dropDB("power");
SchemalessJniTest.main(args);
}
@Test
public void testJniStmtBasic() throws Exception {
dropDB("power");
ParameterBindingBasicDemo.main(args);
}
@Test
public void testJniStmtFull() throws Exception {
dropDB("power");
ParameterBindingFullDemo.main(args);
}
@Test
public void testWsStmtBasic() throws Exception {
dropDB("power");
WSParameterBindingBasicDemo.main(args);
}
@Test
public void testWsStmtFull() throws Exception {
dropDB("power");
WSParameterBindingFullDemo.main(args);
}
@Test
public void testConsumer() throws Exception {
dropDB("power");
SubscribeDemo.main(args); SubscribeDemo.main(args);
} }
// @Test
@Test // public void testSubscribeJni() throws SQLException, InterruptedException {
public void testSubscribeOverWebsocket() { // dropDB("power");
WebsocketSubscribeDemo.main(args); // ConsumerLoopFull.main(args);
} // }
// @Test
@Test // public void testSubscribeWs() throws SQLException, InterruptedException {
public void testSchemaless() throws SQLException { // dropDB("power");
LineProtocolExample.main(args); // WsConsumerLoopFull.main(args);
TelnetLineProtocolExample.main(args); // }
// for json protocol, tags may be double type. but for telnet protocol tag must be nchar type.
// To avoid type mismatch, we delete database test.
dropDB("test");
JSONProtocolExample.main(args);
}
} }

View File

@ -0,0 +1,98 @@
const taos = require("@tdengine/websocket");
let dsn = 'ws://localhost:6041';
async function json_tag_example() {
let wsSql = null;
try {
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
wsSql = await taos.sqlConnect(conf);
console.log("Connected to " + dsn + " successfully.");
// create database
await wsSql.exec('CREATE DATABASE IF NOT EXISTS example_json_tag');
console.log("Create database example_json_tag successfully.");
// create table
await wsSql.exec('create table if not exists example_json_tag.stb (ts timestamp, v int) tags(jt json)');
console.log("Create stable example_json_tag.stb successfully");
let insertQuery = 'INSERT INTO ' +
'example_json_tag.tb1 USING example_json_tag.stb TAGS(\'{"name":"value"}\') ' +
"values(now, 1) ";
taosResult = await wsSql.exec(insertQuery);
console.log("Successfully inserted " + taosResult.getAffectRows() + " rows to example_json_tag.stb.");
let sql = 'SELECT ts, v, jt FROM example_json_tag.stb limit 100';
wsRows = await wsSql.query(sql);
while (await wsRows.next()) {
let row = wsRows.getData();
console.log('ts: ' + row[0] + ', v: ' + row[1] + ', jt: ' + row[2]);
}
} catch (err) {
console.error(`Failed to create database example_json_tag or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
} finally {
if (wsSql) {
await wsSql.close();
}
}
}
async function all_type_example() {
let wsSql = null;
try {
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
wsSql = await taos.sqlConnect(conf);
console.log("Connected to " + dsn + " successfully.");
// create database
await wsSql.exec('CREATE DATABASE IF NOT EXISTS all_type_example');
console.log("Create database all_type_example successfully.");
// create table
await wsSql.exec('create table if not exists all_type_example.stb (ts timestamp, ' +
'int_col INT, double_col DOUBLE, bool_col BOOL, binary_col BINARY(100),' +
'nchar_col NCHAR(100), varbinary_col VARBINARY(100), geometry_col GEOMETRY(100)) ' +
'tags(int_tag INT, double_tag DOUBLE, bool_tag BOOL, binary_tag BINARY(100),' +
'nchar_tag NCHAR(100), varbinary_tag VARBINARY(100), geometry_tag GEOMETRY(100));');
console.log("Create stable all_type_example.stb successfully");
let insertQuery = "INSERT INTO all_type_example.tb1 using all_type_example.stb "
+ "tags(1, 1.1, true, 'binary_value', 'nchar_value', '\\x98f46e', 'POINT(100 100)') "
+ "values(now, 1, 1.1, true, 'binary_value', 'nchar_value', '\\x98f46e', 'POINT(100 100)')";
taosResult = await wsSql.exec(insertQuery);
console.log("Successfully inserted " + taosResult.getAffectRows() + " rows to all_type_example.stb.");
let sql = 'SELECT * FROM all_type_example.stb limit 100';
let wsRows = await wsSql.query(sql);
let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) {
let row = wsRows.getData();
console.log(row);
}
} catch (err) {
console.error(`Failed to create database all_type_example or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
} finally {
if (wsSql) {
await wsSql.close();
}
}
}
async function test() {
await json_tag_example()
await all_type_example()
taos.destroy();
}
test()

View File

@ -0,0 +1,149 @@
const taos = require("@tdengine/websocket");
let dsn = 'ws://localhost:6041';
async function json_tag_example() {
let wsSql = null;
try {
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
wsSql = await taos.sqlConnect(conf);
console.log("Connected to " + dsn + " successfully.");
// create database
await wsSql.exec('CREATE DATABASE IF NOT EXISTS example_json_tag');
console.log("Create database example_json_tag successfully.");
await wsSql.exec('use example_json_tag');
// create table
await wsSql.exec('create table if not exists stb (ts timestamp, v int) tags(jt json)');
console.log("Create stable example_json_tag.stb successfully");
let stmt = await wsSql.stmtInit();
await stmt.prepare("INSERT INTO ? using stb tags(?) VALUES (?,?)");
await stmt.setTableName(`tb1`);
let tagParams = stmt.newStmtParam();
tagParams.setJson(['{"name":"value"}'])
await stmt.setTags(tagParams);
let bindParams = stmt.newStmtParam();
const currentMillis = new Date().getTime();
bindParams.setTimestamp([currentMillis]);
bindParams.setInt([1]);
await stmt.bind(bindParams);
await stmt.batch();
await stmt.exec();
await stmt.close();
let sql = 'SELECT ts, v, jt FROM example_json_tag.stb limit 100';
wsRows = await wsSql.query(sql);
while (await wsRows.next()) {
let row = wsRows.getData();
console.log('ts: ' + row[0] + ', v: ' + row[1] + ', jt: ' + row[2]);
}
} catch (err) {
console.error(`Failed to create database example_json_tag or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
} finally {
if (wsSql) {
await wsSql.close();
}
}
}
async function all_type_example() {
let wsSql = null;
let stmt = null;
try {
let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
wsSql = await taos.sqlConnect(conf);
console.log("Connected to " + dsn + " successfully.");
// create database
await wsSql.exec('CREATE DATABASE IF NOT EXISTS all_type_example');
console.log("Create database all_type_example successfully.");
await wsSql.exec('use all_type_example');
// create table
await wsSql.exec('create table if not exists stb (ts timestamp, ' +
'int_col INT, double_col DOUBLE, bool_col BOOL, binary_col BINARY(100),' +
'nchar_col NCHAR(100), varbinary_col VARBINARY(100), geometry_col GEOMETRY(100)) ' +
'tags(int_tag INT, double_tag DOUBLE, bool_tag BOOL, binary_tag BINARY(100),' +
'nchar_tag NCHAR(100), varbinary_tag VARBINARY(100), geometry_tag GEOMETRY(100));');
console.log("Create stable all_type_example.stb successfully");
let geometryData = new Uint8Array([0x01,0x01,0x00,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x59,0x40,0x00,0x00,0x00,0x00,0x00,0x00,0x59,0x40,]).buffer;
const encoder = new TextEncoder();
let vbData = encoder.encode(`Hello, world!`).buffer;
stmt = await wsSql.stmtInit();
await stmt.prepare("INSERT INTO ? using stb tags(?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)");
await stmt.setTableName(`tb1`);
let tagParams = stmt.newStmtParam();
tagParams.setInt([1]);
tagParams.setDouble([1.1]);
tagParams.setBoolean([true]);
tagParams.setVarchar(["hello"]);
tagParams.setNchar(["stmt"]);
tagParams.setGeometry([geometryData]);
tagParams.setVarBinary([vbData]);
await stmt.setTags(tagParams);
let bindParams = stmt.newStmtParam();
const currentMillis = new Date().getTime();
bindParams.setTimestamp([currentMillis]);
bindParams.setInt([1]);
bindParams.setDouble([1.1]);
bindParams.setBoolean([true]);
bindParams.setVarchar(["hello"]);
bindParams.setNchar(["stmt"]);
bindParams.setGeometry([geometryData]);
bindParams.setVarBinary([vbData]);
await stmt.bind(bindParams);
await stmt.batch();
await stmt.exec();
let sql = 'SELECT * FROM all_type_example.stb limit 100';
let wsRows = await wsSql.query(sql);
let meta = wsRows.getMeta();
console.log("wsRow:meta:=>", meta);
while (await wsRows.next()) {
let row = wsRows.getData();
console.log(row);
}
} catch (err) {
console.error(`Failed to create database all_type_example or stable stb, ErrCode: ${err.code}, ErrMessage: ${err.message}`);
} finally {
if (stmt) {
await stmt.close();
}
if (wsSql) {
await wsSql.close();
}
}
}
async function test() {
taos.setLevel("debug")
await json_tag_example()
await all_type_example()
taos.destroy();
}
test()

View File

@ -24,13 +24,18 @@ async function createConnect() {
async function createDbAndTable() { async function createDbAndTable() {
let wsSql = null; let wsSql = null;
try { try {
wsSql = await createConnect(); let conf = new taos.WSConfig(dsn);
conf.setUser('root');
conf.setPwd('taosdata');
conf.setDb('power');
wsSql = await taos.sqlConnect(conf);
console.log("Connected to " + dsn + " successfully.");
// create database // create database
await wsSql.exec('CREATE DATABASE IF NOT EXISTS power'); await wsSql.exec('CREATE DATABASE IF NOT EXISTS power');
console.log("Create database power successfully."); console.log("Create database power successfully.");
// create table // create table
await wsSql.exec('CREATE STABLE IF NOT EXISTS power.meters ' + await wsSql.exec('CREATE STABLE IF NOT EXISTS power.meters ' +
'(_ts timestamp, current float, voltage int, phase float) ' + '(ts timestamp, current float, voltage int, phase float) ' +
'TAGS (location binary(64), groupId int);'); 'TAGS (location binary(64), groupId int);');
console.log("Create stable power.meters successfully"); console.log("Create stable power.meters successfully");

View File

@ -8,6 +8,7 @@ edition = "2021"
anyhow = "1" anyhow = "1"
chrono = "0.4" chrono = "0.4"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] } tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
log = "0.4" log = "0.4"
pretty_env_logger = "0.5.0" pretty_env_logger = "0.5.0"

View File

@ -0,0 +1,121 @@
use taos::*;
use taos_query::util::hex::hex_string_to_bytes;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt")
.await?;
taos.create_database("example_all_type_stmt").await?;
taos.use_database("example_all_type_stmt").await?;
taos.exec(
r#"
CREATE STABLE IF NOT EXISTS stb (
ts TIMESTAMP,
int_col INT,
double_col DOUBLE,
bool_col BOOL,
binary_col BINARY(100),
nchar_col NCHAR(100),
varbinary_col VARBINARY(100),
geometry_col GEOMETRY(100))
TAGS (
int_tag INT,
double_tag DOUBLE,
bool_tag BOOL,
binary_tag BINARY(100),
nchar_tag NCHAR(100))
"#,
)
.await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? using stb tags(?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)")
.await?;
const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d_bind_{}", i);
let tags = vec![
Value::Int(i as i32),
Value::Double(1.1),
Value::Bool(true),
Value::VarChar("binary_value".into()),
Value::NChar("nchar_value".into()),
// Value::VarBinary(vec![0x98, 0xf4, 0x6e].into()),
// Value::Geometry(
// vec![
// 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
// ]
// .into(),
// ),
];
// set table name and tags for the prepared statement.
match stmt.set_tbname_tags(&table_name, &tags).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}",
table_name, tags, err
);
return Err(err.into());
}
}
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_ints(vec![j as i32]),
ColumnView::from_doubles(vec![1.1]),
ColumnView::from_bools(vec![true]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
ColumnView::from_bytes(vec![hex_string_to_bytes("123456").to_vec()]),
ColumnView::from_geobytes(vec![hex_string_to_bytes(
"0101000000000000000000F03F0000000000000040",
)
.to_vec()]),
];
// bind values to the prepared statement.
match stmt.bind(&values).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to bind values, values:{:?}, ErrMessage: {}",
values, err
);
return Err(err.into());
}
}
}
match stmt.add_batch().await {
Ok(_) => {}
Err(err) => {
eprintln!("Failed to add batch, ErrMessage: {}", err);
return Err(err.into());
}
}
}
// execute.
match stmt.execute().await {
Ok(affected_rows) => println!(
"Successfully inserted {} rows to example_all_type_stmt.stb.",
affected_rows
),
Err(err) => {
eprintln!(
"Failed to insert to table stb using stmt, ErrMessage: {}",
err
);
return Err(err.into());
}
}
Ok(())
}

View File

@ -0,0 +1,94 @@
use serde_json::json;
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt")
.await?;
taos.create_database("example_all_type_stmt").await?;
taos.use_database("example_all_type_stmt").await?;
taos.exec(
r#"
CREATE STABLE IF NOT EXISTS stb_json (
ts TIMESTAMP,
int_col INT)
TAGS (
json_tag JSON)
"#,
)
.await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? using stb_json tags(?) VALUES (?,?)")
.await?;
const NUM_TABLES: usize = 1;
const NUM_ROWS: usize = 1;
for i in 0..NUM_TABLES {
let table_name = format!("d_bind_{}", i);
let json_value: serde_json::Value = json!({
"name": "value"
});
dbg!(json_value.to_string());
let tags = vec![Value::Json(json_value)];
// set table name and tags for the prepared statement.
match stmt.set_tbname_tags(&table_name, &tags).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}",
table_name, tags, err
);
return Err(err.into());
}
}
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_ints(vec![j as i32]),
];
// bind values to the prepared statement.
match stmt.bind(&values).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to bind values, values:{:?}, ErrMessage: {}",
values, err
);
return Err(err.into());
}
}
}
match stmt.add_batch().await {
Ok(_) => {}
Err(err) => {
eprintln!("Failed to add batch, ErrMessage: {}", err);
return Err(err.into());
}
}
}
// execute.
match stmt.execute().await {
Ok(affected_rows) => println!(
"Successfully inserted {} rows to example_all_type_stmt.stb_json.",
affected_rows
),
Err(err) => {
eprintln!(
"Failed to insert to table stb_json using stmt, ErrMessage: {}",
err
);
return Err(err.into());
}
}
Ok(())
}

View File

@ -0,0 +1,121 @@
use taos::*;
use taos_query::util::hex::hex_string_to_bytes;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "ws://";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt")
.await?;
taos.create_database("example_all_type_stmt").await?;
taos.use_database("example_all_type_stmt").await?;
taos.exec(
r#"
CREATE STABLE IF NOT EXISTS stb (
ts TIMESTAMP,
int_col INT,
double_col DOUBLE,
bool_col BOOL,
binary_col BINARY(100),
nchar_col NCHAR(100),
varbinary_col VARBINARY(100),
geometry_col GEOMETRY(100))
TAGS (
int_tag INT,
double_tag DOUBLE,
bool_tag BOOL,
binary_tag BINARY(100),
nchar_tag NCHAR(100))
"#,
)
.await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? using stb tags(?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)")
.await?;
const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d_bind_{}", i);
let tags = vec![
Value::Int(i as i32),
Value::Double(1.1),
Value::Bool(true),
Value::VarChar("binary_value".into()),
Value::NChar("nchar_value".into()),
// Value::VarBinary(vec![0x98, 0xf4, 0x6e].into()),
// Value::Geometry(
// vec![
// 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
// ]
// .into(),
// ),
];
// set table name and tags for the prepared statement.
match stmt.set_tbname_tags(&table_name, &tags).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}",
table_name, tags, err
);
return Err(err.into());
}
}
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_ints(vec![j as i32]),
ColumnView::from_doubles(vec![1.1]),
ColumnView::from_bools(vec![true]),
ColumnView::from_varchar(vec!["ABC"]),
ColumnView::from_nchar(vec!["涛思数据"]),
ColumnView::from_bytes(vec![hex_string_to_bytes("123456").to_vec()]),
ColumnView::from_geobytes(vec![hex_string_to_bytes(
"0101000000000000000000F03F0000000000000040",
)
.to_vec()]),
];
// bind values to the prepared statement.
match stmt.bind(&values).await {
Ok(_) => {}
Err(err) => {
eprintln!(
"Failed to bind values, values:{:?}, ErrMessage: {}",
values, err
);
return Err(err.into());
}
}
}
match stmt.add_batch().await {
Ok(_) => {}
Err(err) => {
eprintln!("Failed to add batch, ErrMessage: {}", err);
return Err(err.into());
}
}
}
// execute.
match stmt.execute().await {
Ok(affected_rows) => println!(
"Successfully inserted {} rows to example_all_type_stmt.stb.",
affected_rows
),
Err(err) => {
eprintln!(
"Failed to insert to table stb using stmt, ErrMessage: {}",
err
);
return Err(err.into());
}
}
Ok(())
}

View File

@ -63,10 +63,15 @@ TDengine 其他功能模块的报错,请参考 [错误码](../../../reference/
| BINARY | string | | BINARY | string |
| NCHAR | string | | NCHAR | string |
| JSON | []byte | | JSON | []byte |
| GEOMETRY | []byte |
| VARBINARY | []byte |
**注意**JSON 类型仅在 tag 中支持。 **注意**JSON 类型仅在 tag 中支持。
GEOMETRY类型是 little endian 字节序的二进制数据,符合 WKB 规范。详细信息请参考 [数据类型](../../taos-sql/data-type/#数据类型)
WKB规范请参考[Well-Known Binary (WKB)](https://libgeos.org/specifications/wkb/)
## 示例程序汇总 ## 示例程序汇总
示例程序源码请参考:[示例程序](https://github.com/taosdata/driver-go/tree/main/examples) 示例程序源码请参考:[示例程序](https://github.com/taosdata/driver-go/tree/main/examples)
## 常见问题 ## 常见问题

View File

@ -80,6 +80,8 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对
| BINARY | Vec\<u8> | | BINARY | Vec\<u8> |
| NCHAR | String | | NCHAR | String |
| JSON | serde_json::Value | | JSON | serde_json::Value |
| VARBINARY | Bytes |
| GEOMETRY | Bytes |
**注意**JSON 类型仅在 tag 中支持。 **注意**JSON 类型仅在 tag 中支持。

View File

@ -103,7 +103,8 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对
|BINARY|str| |BINARY|str|
|NCHAR|str| |NCHAR|str|
|JSON|str| |JSON|str|
|GEOMETRY|bytearray|
|VARBINARY|bytearray|
## 示例程序汇总 ## 示例程序汇总
| 示例程序链接 | 示例程序内容 | | 示例程序链接 | 示例程序内容 |
@ -113,6 +114,13 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对
| [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB 行协议写入 | | [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB 行协议写入 |
| [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | 使用 JSON 类型的标签 | | [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | 使用 JSON 类型的标签 |
| [tmq_consumer.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq_consumer.py) | tmq 订阅 | | [tmq_consumer.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq_consumer.py) | tmq 订阅 |
| [native_all_type_query.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/native_all_type_query.py) | 支持全部类型示例 |
| [native_all_type_stmt.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/native_all_type_stmt.py) | 参数绑定支持全部类型示例 |
示例程序源码请参考:
1. [原生更多示例程序](https://github.com/taosdata/taos-connector-python/tree/main/examples)
2. [WebSocket 更多示例程序](https://github.com/taosdata/taos-connector-python/tree/main/taos-ws-py/examples)
## 关于纳秒 (nanosecond) ## 关于纳秒 (nanosecond)

View File

@ -88,6 +88,8 @@ Node.js 连接器目前仅支持 Websocket 连接器, 其通过 taosAdapter
| [telnet_line_example](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/telnet_line_example.js) | OpenTSDB Telnet 行协议写入示例。 | | [telnet_line_example](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/telnet_line_example.js) | OpenTSDB Telnet 行协议写入示例。 |
| [json_line_example](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/json_line_example.js) | OpenTSDB JSON 行协议写入示例。 | | [json_line_example](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/json_line_example.js) | OpenTSDB JSON 行协议写入示例。 |
| [tmq_example](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/tmq_example.js) | 订阅的使用示例。 | | [tmq_example](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/tmq_example.js) | 订阅的使用示例。 |
| [all_type_query](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/all_type_query.js) | 支持全部类型示例。 |
| [all_type_stmt](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/all_type_stmt.js) | 参数绑定支持全部类型示例。 |
## 使用限制 ## 使用限制

View File

@ -67,9 +67,13 @@ TDengine 其他功能模块的报错,请参考 [错误码](../../../reference/
| VARBINARY | byte[] | | VARBINARY | byte[] |
| GEOMETRY | byte[] | | GEOMETRY | byte[] |
:::note **注意**JSON 类型仅在 tag 中支持。
JSON 类型仅在 tag 中支持。 GEOMETRY类型是 little endian 字节序的二进制数据,符合 WKB 规范。详细信息请参考 [数据类型](../../taos-sql/data-type/#数据类型)
::: WKB规范请参考[Well-Known Binary (WKB)](https://libgeos.org/specifications/wkb/)
## 示例程序汇总
示例程序源码请参考:[示例程序](https://github.com/taosdata/taos-connector-dotnet/tree/3.0/examples)
## API 参考 ## API 参考

View File

@ -243,6 +243,7 @@ vi source-demo.json
"topic.per.stable": true, "topic.per.stable": true,
"topic.ignore.db": false, "topic.ignore.db": false,
"out.format": "line", "out.format": "line",
"data.precision": "ms",
"key.converter": "org.apache.kafka.connect.storage.StringConverter", "key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter" "value.converter": "org.apache.kafka.connect.storage.StringConverter"
} }
@ -331,14 +332,13 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
1. 打开 KAFKA_HOME/config/producer.properties 配置文件。 1. 打开 KAFKA_HOME/config/producer.properties 配置文件。
2. 参数说明及配置建议如下: 2. 参数说明及配置建议如下:
| **参数** | **参数说明** | **设置建议** | | **参数** | **参数说明** | **设置建议** |
| --------| --------------------------------- | -------------- | | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------ |
| producer.type | 此参数用于设置消息的发送方式,默认值为 `sync` 表示同步发送,`async` 表示异步发送。采用异步发送能够提升消息发送的吞吐量。 | async | | producer.type | 此参数用于设置消息的发送方式,默认值为 `sync` 表示同步发送,`async` 表示异步发送。采用异步发送能够提升消息发送的吞吐量。 | async |
| request.required.acks | 参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时表示只要领导者副本成功写入消息就会给生产者发送确认而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功所以可以减少生产者的等待时间提高发送消息的效率。|1| | request.required.acks | 参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时表示只要领导者副本成功写入消息就会给生产者发送确认而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功所以可以减少生产者的等待时间提高发送消息的效率。 | 1 |
| max.request.size| 该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576也就是 1M。如果设置得太小可能会导致频繁的网络请求降低吞吐量。如果设置得太大可能会导致内存占用过高或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。|104857600| | max.request.size | 该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576也就是 1M。如果设置得太小可能会导致频繁的网络请求降低吞吐量。如果设置得太大可能会导致内存占用过高或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。 | 104857600 |
|batch.size| 此参数用于设定 batch 的大小,默认值为 16384即 16KB。在消息发送过程中发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。|524288| | batch.size | 此参数用于设定 batch 的大小,默认值为 16384即 16KB。在消息发送过程中发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。 | 524288 |
| buffer.memory| 此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。|1073741824| | buffer.memory | 此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。 | 1073741824 |
## 配置参考 ## 配置参考
@ -370,7 +370,7 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
7. `data.precision`: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为: 7. `data.precision`: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为:
1. ms 表示毫秒 1. ms 表示毫秒
2. us 表示微秒 2. us 表示微秒
3. ns 表示纳秒。默认为纳秒。 3. ns 表示纳秒。
### TDengine Source Connector 特有的配置 ### TDengine Source Connector 特有的配置
@ -381,12 +381,16 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。 5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。
6. `query.interval.ms`: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 0即获取到当前最新时间的所有数据。 6. `query.interval.ms`: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 0即获取到当前最新时间的所有数据。
7. `out.format` : 结果集输出格式。`line` 表示输出格式为 InfluxDB Line 协议格式,`json` 表示输出格式是 json。默认为 line。 7. `out.format` : 结果集输出格式。`line` 表示输出格式为 InfluxDB Line 协议格式,`json` 表示输出格式是 json。默认为 line。
8. `topic.per.stable`: 如果设置为 true表示一个超级表对应一个 Kafka topictopic的命名规则 `<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>`;如果设置为 false则指定的 DB 中的所有数据进入一个 Kafka topictopic 的命名规则为 `<topic.prefix><topic.delimiter><connection.database>` 8. `data.precision`: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为:
9. `topic.ignore.db`: topic 命名规则是否包含 database 名称true 表示规则为 `<topic.prefix><topic.delimiter><stable.name>`false 表示规则为 `<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>`,默认 false。此配置项在 `topic.per.stable` 设置为 false 时不生效。 1. ms 表示毫秒,
10. `topic.delimiter`: topic 名称分割符,默认为 `-` 2. us 表示微秒
11. `read.method`: 从 TDengine 读取数据方式query 或是 subscription。默认为 subscription。 3. ns 表示纳秒。
12. `subscription.group.id`: 指定 TDengine 数据订阅的组 id`read.method` 为 subscription 时,此项为必填项。 9. `topic.per.stable`: 如果设置为 true表示一个超级表对应一个 Kafka topictopic的命名规则 `<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>`;如果设置为 false则指定的 DB 中的所有数据进入一个 Kafka topictopic 的命名规则为 `<topic.prefix><topic.delimiter><connection.database>`
13. `subscription.from`: 指定 TDengine 数据订阅起始位置latest 或是 earliest。默认为 latest。 10. `topic.ignore.db`: topic 命名规则是否包含 database 名称true 表示规则为 `<topic.prefix><topic.delimiter><stable.name>`false 表示规则为 `<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>`,默认 false。此配置项在 `topic.per.stable` 设置为 false 时不生效。
11. `topic.delimiter`: topic 名称分割符,默认为 `-`
12. `read.method`: 从 TDengine 读取数据方式query 或是 subscription。默认为 subscription。
13. `subscription.group.id`: 指定 TDengine 数据订阅的组 id`read.method` 为 subscription 时,此项为必填项。
14. `subscription.from`: 指定 TDengine 数据订阅起始位置latest 或是 earliest。默认为 latest。
## 其他说明 ## 其他说明