merge
This commit is contained in:
parent
d03124360e
commit
4acfdfe403
|
@ -3,10 +3,7 @@ package com.taos.example;
|
||||||
import com.taosdata.jdbc.TSDBPreparedStatement;
|
import com.taosdata.jdbc.TSDBPreparedStatement;
|
||||||
import com.taosdata.jdbc.utils.StringUtils;
|
import com.taosdata.jdbc.utils.StringUtils;
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.*;
|
||||||
import java.sql.DriverManager;
|
|
||||||
import java.sql.SQLException;
|
|
||||||
import java.sql.Statement;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
@ -16,15 +13,32 @@ public class ParameterBindingFullDemo {
|
||||||
|
|
||||||
private static final String host = "127.0.0.1";
|
private static final String host = "127.0.0.1";
|
||||||
private static final Random random = new Random(System.currentTimeMillis());
|
private static final Random random = new Random(System.currentTimeMillis());
|
||||||
private static final int BINARY_COLUMN_SIZE = 50;
|
private static final int BINARY_COLUMN_SIZE = 100;
|
||||||
private static final String[] schemaList = {
|
private static final String[] schemaList = {
|
||||||
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
|
"drop database if exists example_all_type_stmt",
|
||||||
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
|
"CREATE DATABASE IF NOT EXISTS example_all_type_stmt",
|
||||||
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
|
"USE example_all_type_stmt",
|
||||||
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
|
"CREATE STABLE IF NOT EXISTS stb_json (" +
|
||||||
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))",
|
"ts TIMESTAMP, " +
|
||||||
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))",
|
"int_col INT) " +
|
||||||
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))",
|
"tags (json_tag json)",
|
||||||
|
"CREATE STABLE IF NOT EXISTS stb (" +
|
||||||
|
"ts TIMESTAMP, " +
|
||||||
|
"int_col INT, " +
|
||||||
|
"double_col DOUBLE, " +
|
||||||
|
"bool_col BOOL, " +
|
||||||
|
"binary_col BINARY(100), " +
|
||||||
|
"nchar_col NCHAR(100), " +
|
||||||
|
"varbinary_col VARBINARY(100), " +
|
||||||
|
"geometry_col GEOMETRY(100)) " +
|
||||||
|
"tags (" +
|
||||||
|
"int_tag INT, " +
|
||||||
|
"double_tag DOUBLE, " +
|
||||||
|
"bool_tag BOOL, " +
|
||||||
|
"binary_tag BINARY(100), " +
|
||||||
|
"nchar_tag NCHAR(100), " +
|
||||||
|
"varbinary_tag VARBINARY(100), " +
|
||||||
|
"geometry_tag GEOMETRY(100))"
|
||||||
};
|
};
|
||||||
private static final int numOfSubTable = 10, numOfRow = 10;
|
private static final int numOfSubTable = 10, numOfRow = 10;
|
||||||
|
|
||||||
|
@ -34,55 +48,37 @@ public class ParameterBindingFullDemo {
|
||||||
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
|
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
|
||||||
|
|
||||||
init(conn);
|
init(conn);
|
||||||
|
stmtJsonTag(conn);
|
||||||
|
stmtAll(conn);
|
||||||
|
|
||||||
bindInteger(conn);
|
|
||||||
bindFloat(conn);
|
|
||||||
bindBoolean(conn);
|
|
||||||
bindBytes(conn);
|
|
||||||
bindString(conn);
|
|
||||||
bindVarbinary(conn);
|
|
||||||
bindGeometry(conn);
|
|
||||||
|
|
||||||
clean(conn);
|
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to insert data using stmt, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
throw ex;
|
throw ex;
|
||||||
} catch (Exception ex){
|
} catch (Exception ex) {
|
||||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to insert data using stmt, ErrMessage: " + ex.getMessage());
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void init(Connection conn) throws SQLException {
|
private static void init(Connection conn) throws SQLException {
|
||||||
clean(conn);
|
|
||||||
try (Statement stmt = conn.createStatement()) {
|
try (Statement stmt = conn.createStatement()) {
|
||||||
stmt.execute("create database if not exists test_parabind");
|
|
||||||
stmt.execute("use test_parabind");
|
|
||||||
for (int i = 0; i < schemaList.length; i++) {
|
for (int i = 0; i < schemaList.length; i++) {
|
||||||
stmt.execute(schemaList[i]);
|
stmt.execute(schemaList[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private static void clean(Connection conn) throws SQLException {
|
|
||||||
try (Statement stmt = conn.createStatement()) {
|
|
||||||
stmt.execute("drop database if exists test_parabind");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void bindInteger(Connection conn) throws SQLException {
|
private static void stmtJsonTag(Connection conn) throws SQLException {
|
||||||
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
|
String sql = "INSERT INTO ? using stb_json tags(?) VALUES (?,?)";
|
||||||
|
|
||||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
||||||
|
|
||||||
for (int i = 1; i <= numOfSubTable; i++) {
|
for (int i = 1; i <= numOfSubTable; i++) {
|
||||||
// set table name
|
// set table name
|
||||||
pstmt.setTableName("t1_" + i);
|
pstmt.setTableName("ntb_json_" + i);
|
||||||
// set tags
|
// set tags
|
||||||
pstmt.setTagByte(0, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
|
pstmt.setTagJson(0, "{\"device\":\"device_" + i + "\"}");
|
||||||
pstmt.setTagShort(1, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
|
|
||||||
pstmt.setTagInt(2, random.nextInt(Integer.MAX_VALUE));
|
|
||||||
pstmt.setTagLong(3, random.nextLong());
|
|
||||||
// set columns
|
// set columns
|
||||||
ArrayList<Long> tsList = new ArrayList<>();
|
ArrayList<Long> tsList = new ArrayList<>();
|
||||||
long current = System.currentTimeMillis();
|
long current = System.currentTimeMillis();
|
||||||
|
@ -90,45 +86,42 @@ public class ParameterBindingFullDemo {
|
||||||
tsList.add(current + j);
|
tsList.add(current + j);
|
||||||
pstmt.setTimestamp(0, tsList);
|
pstmt.setTimestamp(0, tsList);
|
||||||
|
|
||||||
ArrayList<Byte> f1List = new ArrayList<>();
|
ArrayList<Integer> f1List = new ArrayList<>();
|
||||||
for (int j = 0; j < numOfRow; j++)
|
for (int j = 0; j < numOfRow; j++)
|
||||||
f1List.add(Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
|
f1List.add(random.nextInt(Integer.MAX_VALUE));
|
||||||
pstmt.setByte(1, f1List);
|
pstmt.setInt(1, f1List);
|
||||||
|
|
||||||
ArrayList<Short> f2List = new ArrayList<>();
|
|
||||||
for (int j = 0; j < numOfRow; j++)
|
|
||||||
f2List.add(Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
|
|
||||||
pstmt.setShort(2, f2List);
|
|
||||||
|
|
||||||
ArrayList<Integer> f3List = new ArrayList<>();
|
|
||||||
for (int j = 0; j < numOfRow; j++)
|
|
||||||
f3List.add(random.nextInt(Integer.MAX_VALUE));
|
|
||||||
pstmt.setInt(3, f3List);
|
|
||||||
|
|
||||||
ArrayList<Long> f4List = new ArrayList<>();
|
|
||||||
for (int j = 0; j < numOfRow; j++)
|
|
||||||
f4List.add(random.nextLong());
|
|
||||||
pstmt.setLong(4, f4List);
|
|
||||||
|
|
||||||
// add column
|
// add column
|
||||||
pstmt.columnDataAddBatch();
|
pstmt.columnDataAddBatch();
|
||||||
}
|
}
|
||||||
// execute column
|
// execute column
|
||||||
pstmt.columnDataExecuteBatch();
|
pstmt.columnDataExecuteBatch();
|
||||||
|
System.out.println("Successfully inserted rows to example_all_type_stmt.ntb_json");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void bindFloat(Connection conn) throws SQLException {
|
private static void stmtAll(Connection conn) throws SQLException {
|
||||||
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
|
String sql = "INSERT INTO ? using stb tags(?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)";
|
||||||
|
|
||||||
TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class);
|
TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class);
|
||||||
|
|
||||||
for (int i = 1; i <= numOfSubTable; i++) {
|
for (int i = 1; i <= numOfSubTable; i++) {
|
||||||
// set table name
|
// set table name
|
||||||
pstmt.setTableName("t2_" + i);
|
pstmt.setTableName("ntb" + i);
|
||||||
// set tags
|
// set tags
|
||||||
pstmt.setTagFloat(0, random.nextFloat());
|
pstmt.setTagInt(0, i);
|
||||||
pstmt.setTagDouble(1, random.nextDouble());
|
pstmt.setTagDouble(1, 1.1);
|
||||||
|
pstmt.setTagBoolean(2, true);
|
||||||
|
pstmt.setTagString(3, "binary_value");
|
||||||
|
pstmt.setTagNString(4, "nchar_value");
|
||||||
|
pstmt.setTagVarbinary(5, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
|
||||||
|
pstmt.setTagGeometry(6, new byte[]{
|
||||||
|
0x01, 0x01, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x59,
|
||||||
|
0x40, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x59, 0x40});
|
||||||
|
|
||||||
// set columns
|
// set columns
|
||||||
ArrayList<Long> tsList = new ArrayList<>();
|
ArrayList<Long> tsList = new ArrayList<>();
|
||||||
long current = System.currentTimeMillis();
|
long current = System.currentTimeMillis();
|
||||||
|
@ -136,190 +129,54 @@ public class ParameterBindingFullDemo {
|
||||||
tsList.add(current + j);
|
tsList.add(current + j);
|
||||||
pstmt.setTimestamp(0, tsList);
|
pstmt.setTimestamp(0, tsList);
|
||||||
|
|
||||||
ArrayList<Float> f1List = new ArrayList<>();
|
ArrayList<Integer> f1List = new ArrayList<>();
|
||||||
for (int j = 0; j < numOfRow; j++)
|
for (int j = 0; j < numOfRow; j++)
|
||||||
f1List.add(random.nextFloat());
|
f1List.add(random.nextInt(Integer.MAX_VALUE));
|
||||||
pstmt.setFloat(1, f1List);
|
pstmt.setInt(1, f1List);
|
||||||
|
|
||||||
ArrayList<Double> f2List = new ArrayList<>();
|
ArrayList<Double> f2List = new ArrayList<>();
|
||||||
for (int j = 0; j < numOfRow; j++)
|
for (int j = 0; j < numOfRow; j++)
|
||||||
f2List.add(random.nextDouble());
|
f2List.add(random.nextDouble());
|
||||||
pstmt.setDouble(2, f2List);
|
pstmt.setDouble(2, f2List);
|
||||||
|
|
||||||
|
ArrayList<Boolean> f3List = new ArrayList<>();
|
||||||
|
for (int j = 0; j < numOfRow; j++)
|
||||||
|
f3List.add(true);
|
||||||
|
pstmt.setBoolean(3, f3List);
|
||||||
|
|
||||||
|
ArrayList<String> f4List = new ArrayList<>();
|
||||||
|
for (int j = 0; j < numOfRow; j++)
|
||||||
|
f4List.add("binary_value");
|
||||||
|
pstmt.setString(4, f4List, BINARY_COLUMN_SIZE);
|
||||||
|
|
||||||
|
ArrayList<String> f5List = new ArrayList<>();
|
||||||
|
for (int j = 0; j < numOfRow; j++)
|
||||||
|
f5List.add("nchar_value");
|
||||||
|
pstmt.setNString(5, f5List, BINARY_COLUMN_SIZE);
|
||||||
|
|
||||||
|
ArrayList<byte[]> f6List = new ArrayList<>();
|
||||||
|
for (int j = 0; j < numOfRow; j++)
|
||||||
|
f6List.add(new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
|
||||||
|
pstmt.setVarbinary(6, f6List, BINARY_COLUMN_SIZE);
|
||||||
|
|
||||||
|
ArrayList<byte[]> f7List = new ArrayList<>();
|
||||||
|
for (int j = 0; j < numOfRow; j++)
|
||||||
|
f7List.add(new byte[]{
|
||||||
|
0x01, 0x01, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x59,
|
||||||
|
0x40, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x59, 0x40});
|
||||||
|
pstmt.setGeometry(7, f7List, BINARY_COLUMN_SIZE);
|
||||||
|
|
||||||
// add column
|
// add column
|
||||||
pstmt.columnDataAddBatch();
|
pstmt.columnDataAddBatch();
|
||||||
}
|
}
|
||||||
// execute
|
// execute
|
||||||
pstmt.columnDataExecuteBatch();
|
pstmt.columnDataExecuteBatch();
|
||||||
|
System.out.println("Successfully inserted rows to example_all_type_stmt.ntb");
|
||||||
// close if no try-with-catch statement is used
|
// close if no try-with-catch statement is used
|
||||||
pstmt.close();
|
pstmt.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void bindBoolean(Connection conn) throws SQLException {
|
|
||||||
String sql = "insert into ? using stable3 tags(?) values(?,?)";
|
|
||||||
|
|
||||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
|
||||||
for (int i = 1; i <= numOfSubTable; i++) {
|
|
||||||
// set table name
|
|
||||||
pstmt.setTableName("t3_" + i);
|
|
||||||
// set tags
|
|
||||||
pstmt.setTagBoolean(0, random.nextBoolean());
|
|
||||||
// set columns
|
|
||||||
ArrayList<Long> tsList = new ArrayList<>();
|
|
||||||
long current = System.currentTimeMillis();
|
|
||||||
for (int j = 0; j < numOfRow; j++)
|
|
||||||
tsList.add(current + j);
|
|
||||||
pstmt.setTimestamp(0, tsList);
|
|
||||||
|
|
||||||
ArrayList<Boolean> f1List = new ArrayList<>();
|
|
||||||
for (int j = 0; j < numOfRow; j++)
|
|
||||||
f1List.add(random.nextBoolean());
|
|
||||||
pstmt.setBoolean(1, f1List);
|
|
||||||
|
|
||||||
// add column
|
|
||||||
pstmt.columnDataAddBatch();
|
|
||||||
}
|
|
||||||
// execute
|
|
||||||
pstmt.columnDataExecuteBatch();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void bindBytes(Connection conn) throws SQLException {
|
|
||||||
String sql = "insert into ? using stable4 tags(?) values(?,?)";
|
|
||||||
|
|
||||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
|
||||||
|
|
||||||
for (int i = 1; i <= numOfSubTable; i++) {
|
|
||||||
// set table name
|
|
||||||
pstmt.setTableName("t4_" + i);
|
|
||||||
// set tags
|
|
||||||
pstmt.setTagString(0, new String("abc"));
|
|
||||||
|
|
||||||
// set columns
|
|
||||||
ArrayList<Long> tsList = new ArrayList<>();
|
|
||||||
long current = System.currentTimeMillis();
|
|
||||||
for (int j = 0; j < numOfRow; j++)
|
|
||||||
tsList.add(current + j);
|
|
||||||
pstmt.setTimestamp(0, tsList);
|
|
||||||
|
|
||||||
ArrayList<String> f1List = new ArrayList<>();
|
|
||||||
for (int j = 0; j < numOfRow; j++) {
|
|
||||||
f1List.add(new String("abc"));
|
|
||||||
}
|
|
||||||
pstmt.setString(1, f1List, BINARY_COLUMN_SIZE);
|
|
||||||
|
|
||||||
// add column
|
|
||||||
pstmt.columnDataAddBatch();
|
|
||||||
}
|
|
||||||
// execute
|
|
||||||
pstmt.columnDataExecuteBatch();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void bindString(Connection conn) throws SQLException {
|
|
||||||
String sql = "insert into ? using stable5 tags(?) values(?,?)";
|
|
||||||
|
|
||||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
|
||||||
|
|
||||||
for (int i = 1; i <= numOfSubTable; i++) {
|
|
||||||
// set table name
|
|
||||||
pstmt.setTableName("t5_" + i);
|
|
||||||
// set tags
|
|
||||||
pstmt.setTagNString(0, "California.SanFrancisco");
|
|
||||||
|
|
||||||
// set columns
|
|
||||||
ArrayList<Long> tsList = new ArrayList<>();
|
|
||||||
long current = System.currentTimeMillis();
|
|
||||||
for (int j = 0; j < numOfRow; j++)
|
|
||||||
tsList.add(current + j);
|
|
||||||
pstmt.setTimestamp(0, tsList);
|
|
||||||
|
|
||||||
ArrayList<String> f1List = new ArrayList<>();
|
|
||||||
for (int j = 0; j < numOfRow; j++) {
|
|
||||||
f1List.add("California.LosAngeles");
|
|
||||||
}
|
|
||||||
pstmt.setNString(1, f1List, BINARY_COLUMN_SIZE);
|
|
||||||
|
|
||||||
// add column
|
|
||||||
pstmt.columnDataAddBatch();
|
|
||||||
}
|
|
||||||
// execute
|
|
||||||
pstmt.columnDataExecuteBatch();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void bindVarbinary(Connection conn) throws SQLException {
|
|
||||||
String sql = "insert into ? using stable6 tags(?) values(?,?)";
|
|
||||||
|
|
||||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
|
||||||
|
|
||||||
for (int i = 1; i <= numOfSubTable; i++) {
|
|
||||||
// set table name
|
|
||||||
pstmt.setTableName("t6_" + i);
|
|
||||||
// set tags
|
|
||||||
byte[] bTag = new byte[]{0,2,3,4,5};
|
|
||||||
bTag[0] = (byte) i;
|
|
||||||
pstmt.setTagVarbinary(0, bTag);
|
|
||||||
|
|
||||||
// set columns
|
|
||||||
ArrayList<Long> tsList = new ArrayList<>();
|
|
||||||
long current = System.currentTimeMillis();
|
|
||||||
for (int j = 0; j < numOfRow; j++)
|
|
||||||
tsList.add(current + j);
|
|
||||||
pstmt.setTimestamp(0, tsList);
|
|
||||||
|
|
||||||
ArrayList<byte[]> f1List = new ArrayList<>();
|
|
||||||
for (int j = 0; j < numOfRow; j++) {
|
|
||||||
byte[] v = new byte[]{0,2,3,4,5,6};
|
|
||||||
v[0] = (byte)j;
|
|
||||||
f1List.add(v);
|
|
||||||
}
|
|
||||||
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
|
|
||||||
|
|
||||||
// add column
|
|
||||||
pstmt.columnDataAddBatch();
|
|
||||||
}
|
|
||||||
// execute
|
|
||||||
pstmt.columnDataExecuteBatch();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void bindGeometry(Connection conn) throws SQLException {
|
|
||||||
String sql = "insert into ? using stable7 tags(?) values(?,?)";
|
|
||||||
|
|
||||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
|
||||||
|
|
||||||
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
|
|
||||||
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
|
|
||||||
List<byte[]> listGeo = new ArrayList<>();
|
|
||||||
listGeo.add(g1);
|
|
||||||
listGeo.add(g2);
|
|
||||||
|
|
||||||
for (int i = 1; i <= 2; i++) {
|
|
||||||
// set table name
|
|
||||||
pstmt.setTableName("t7_" + i);
|
|
||||||
// set tags
|
|
||||||
pstmt.setTagGeometry(0, listGeo.get(i - 1));
|
|
||||||
|
|
||||||
// set columns
|
|
||||||
ArrayList<Long> tsList = new ArrayList<>();
|
|
||||||
long current = System.currentTimeMillis();
|
|
||||||
for (int j = 0; j < numOfRow; j++)
|
|
||||||
tsList.add(current + j);
|
|
||||||
pstmt.setTimestamp(0, tsList);
|
|
||||||
|
|
||||||
ArrayList<byte[]> f1List = new ArrayList<>();
|
|
||||||
for (int j = 0; j < numOfRow; j++) {
|
|
||||||
f1List.add(listGeo.get(i - 1));
|
|
||||||
}
|
|
||||||
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
|
|
||||||
|
|
||||||
// add column
|
|
||||||
pstmt.columnDataAddBatch();
|
|
||||||
}
|
|
||||||
// execute
|
|
||||||
pstmt.columnDataExecuteBatch();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// ANCHOR_END: para_bind
|
// ANCHOR_END: para_bind
|
||||||
|
|
|
@ -11,11 +11,30 @@ public class WSParameterBindingFullDemo {
|
||||||
private static final Random random = new Random(System.currentTimeMillis());
|
private static final Random random = new Random(System.currentTimeMillis());
|
||||||
private static final int BINARY_COLUMN_SIZE = 30;
|
private static final int BINARY_COLUMN_SIZE = 30;
|
||||||
private static final String[] schemaList = {
|
private static final String[] schemaList = {
|
||||||
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
|
"drop database if exists example_all_type_stmt",
|
||||||
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
|
"CREATE DATABASE IF NOT EXISTS example_all_type_stmt",
|
||||||
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
|
"USE example_all_type_stmt",
|
||||||
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
|
"CREATE STABLE IF NOT EXISTS stb_json (" +
|
||||||
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
|
"ts TIMESTAMP, " +
|
||||||
|
"int_col INT) " +
|
||||||
|
"tags (json_tag json)",
|
||||||
|
"CREATE STABLE IF NOT EXISTS stb (" +
|
||||||
|
"ts TIMESTAMP, " +
|
||||||
|
"int_col INT, " +
|
||||||
|
"double_col DOUBLE, " +
|
||||||
|
"bool_col BOOL, " +
|
||||||
|
"binary_col BINARY(100), " +
|
||||||
|
"nchar_col NCHAR(100), " +
|
||||||
|
"varbinary_col VARBINARY(100), " +
|
||||||
|
"geometry_col GEOMETRY(100)) " +
|
||||||
|
"tags (" +
|
||||||
|
"int_tag INT, " +
|
||||||
|
"double_tag DOUBLE, " +
|
||||||
|
"bool_tag BOOL, " +
|
||||||
|
"binary_tag BINARY(100), " +
|
||||||
|
"nchar_tag NCHAR(100), " +
|
||||||
|
"varbinary_tag VARBINARY(100), " +
|
||||||
|
"geometry_tag GEOMETRY(100))"
|
||||||
};
|
};
|
||||||
private static final int numOfSubTable = 10, numOfRow = 10;
|
private static final int numOfSubTable = 10, numOfRow = 10;
|
||||||
|
|
||||||
|
@ -27,153 +46,91 @@ public class WSParameterBindingFullDemo {
|
||||||
|
|
||||||
init(conn);
|
init(conn);
|
||||||
|
|
||||||
bindInteger(conn);
|
stmtJsonTag(conn);
|
||||||
|
|
||||||
bindFloat(conn);
|
stmtAll(conn);
|
||||||
|
|
||||||
bindBoolean(conn);
|
|
||||||
|
|
||||||
bindBytes(conn);
|
|
||||||
|
|
||||||
bindString(conn);
|
|
||||||
|
|
||||||
} catch (SQLException ex) {
|
} catch (SQLException ex) {
|
||||||
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
|
||||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to insert data using stmt, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
|
||||||
throw ex;
|
throw ex;
|
||||||
} catch (Exception ex){
|
} catch (Exception ex) {
|
||||||
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrMessage: " + ex.getMessage());
|
System.out.println("Failed to insert data using stmt, ErrMessage: " + ex.getMessage());
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void init(Connection conn) throws SQLException {
|
private static void init(Connection conn) throws SQLException {
|
||||||
try (Statement stmt = conn.createStatement()) {
|
try (Statement stmt = conn.createStatement()) {
|
||||||
stmt.execute("drop database if exists test_ws_parabind");
|
|
||||||
stmt.execute("create database if not exists test_ws_parabind");
|
|
||||||
stmt.execute("use test_ws_parabind");
|
|
||||||
for (int i = 0; i < schemaList.length; i++) {
|
for (int i = 0; i < schemaList.length; i++) {
|
||||||
stmt.execute(schemaList[i]);
|
stmt.execute(schemaList[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void bindInteger(Connection conn) throws SQLException {
|
private static void stmtJsonTag(Connection conn) throws SQLException {
|
||||||
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
|
String sql = "INSERT INTO ? using stb_json tags(?) VALUES (?,?)";
|
||||||
|
|
||||||
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
||||||
|
|
||||||
for (int i = 1; i <= numOfSubTable; i++) {
|
for (int i = 1; i <= numOfSubTable; i++) {
|
||||||
// set table name
|
// set table name
|
||||||
pstmt.setTableName("t1_" + i);
|
pstmt.setTableName("ntb_json_" + i);
|
||||||
// set tags
|
// set tags
|
||||||
pstmt.setTagByte(1, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
|
pstmt.setTagJson(1, "{\"device\":\"device_" + i + "\"}");
|
||||||
pstmt.setTagShort(2, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
|
|
||||||
pstmt.setTagInt(3, random.nextInt(Integer.MAX_VALUE));
|
|
||||||
pstmt.setTagLong(4, random.nextLong());
|
|
||||||
// set columns
|
// set columns
|
||||||
long current = System.currentTimeMillis();
|
long current = System.currentTimeMillis();
|
||||||
for (int j = 0; j < numOfRow; j++) {
|
for (int j = 0; j < numOfRow; j++) {
|
||||||
pstmt.setTimestamp(1, new Timestamp(current + j));
|
pstmt.setTimestamp(1, new Timestamp(current + j));
|
||||||
pstmt.setByte(2, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
|
pstmt.setInt(2, j);
|
||||||
pstmt.setShort(3, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
|
|
||||||
pstmt.setInt(4, random.nextInt(Integer.MAX_VALUE));
|
|
||||||
pstmt.setLong(5, random.nextLong());
|
|
||||||
pstmt.addBatch();
|
pstmt.addBatch();
|
||||||
}
|
}
|
||||||
pstmt.executeBatch();
|
pstmt.executeBatch();
|
||||||
}
|
}
|
||||||
|
System.out.println("Successfully inserted rows to example_all_type_stmt.ntb_json");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void bindFloat(Connection conn) throws SQLException {
|
private static void stmtAll(Connection conn) throws SQLException {
|
||||||
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
|
String sql = "INSERT INTO ? using stb tags(?,?,?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)";
|
||||||
|
|
||||||
try(TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
|
||||||
|
|
||||||
for (int i = 1; i <= numOfSubTable; i++) {
|
|
||||||
// set table name
|
|
||||||
pstmt.setTableName("t2_" + i);
|
|
||||||
// set tags
|
|
||||||
pstmt.setTagFloat(1, random.nextFloat());
|
|
||||||
pstmt.setTagDouble(2, random.nextDouble());
|
|
||||||
// set columns
|
|
||||||
long current = System.currentTimeMillis();
|
|
||||||
for (int j = 0; j < numOfRow; j++) {
|
|
||||||
pstmt.setTimestamp(1, new Timestamp(current + j));
|
|
||||||
pstmt.setFloat(2, random.nextFloat());
|
|
||||||
pstmt.setDouble(3, random.nextDouble());
|
|
||||||
pstmt.addBatch();
|
|
||||||
}
|
|
||||||
pstmt.executeBatch();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void bindBoolean(Connection conn) throws SQLException {
|
|
||||||
String sql = "insert into ? using stable3 tags(?) values(?,?)";
|
|
||||||
|
|
||||||
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
|
||||||
for (int i = 1; i <= numOfSubTable; i++) {
|
|
||||||
// set table name
|
|
||||||
pstmt.setTableName("t3_" + i);
|
|
||||||
// set tags
|
|
||||||
pstmt.setTagBoolean(1, random.nextBoolean());
|
|
||||||
// set columns
|
|
||||||
long current = System.currentTimeMillis();
|
|
||||||
for (int j = 0; j < numOfRow; j++) {
|
|
||||||
pstmt.setTimestamp(1, new Timestamp(current + j));
|
|
||||||
pstmt.setBoolean(2, random.nextBoolean());
|
|
||||||
pstmt.addBatch();
|
|
||||||
}
|
|
||||||
pstmt.executeBatch();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void bindBytes(Connection conn) throws SQLException {
|
|
||||||
String sql = "insert into ? using stable4 tags(?) values(?,?)";
|
|
||||||
|
|
||||||
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
||||||
|
|
||||||
for (int i = 1; i <= numOfSubTable; i++) {
|
// set table name
|
||||||
// set table name
|
pstmt.setTableName("ntb");
|
||||||
pstmt.setTableName("t4_" + i);
|
// set tags
|
||||||
// set tags
|
pstmt.setTagInt(1, 1);
|
||||||
pstmt.setTagString(1, new String("abc"));
|
pstmt.setTagDouble(2, 1.1);
|
||||||
|
pstmt.setTagBoolean(3, true);
|
||||||
|
pstmt.setTagString(4, "binary_value");
|
||||||
|
pstmt.setTagNString(5, "nchar_value");
|
||||||
|
pstmt.setTagVarbinary(6, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
|
||||||
|
pstmt.setTagGeometry(7, new byte[]{
|
||||||
|
0x01, 0x01, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x59,
|
||||||
|
0x40, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x59, 0x40});
|
||||||
|
|
||||||
// set columns
|
long current = System.currentTimeMillis();
|
||||||
long current = System.currentTimeMillis();
|
|
||||||
for (int j = 0; j < numOfRow; j++) {
|
|
||||||
pstmt.setTimestamp(1, new Timestamp(current + j));
|
|
||||||
pstmt.setString(2, "abc");
|
|
||||||
pstmt.addBatch();
|
|
||||||
}
|
|
||||||
pstmt.executeBatch();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void bindString(Connection conn) throws SQLException {
|
|
||||||
String sql = "insert into ? using stable5 tags(?) values(?,?)";
|
|
||||||
|
|
||||||
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
pstmt.setTimestamp(1, new Timestamp(current));
|
||||||
|
pstmt.setInt(2, 1);
|
||||||
for (int i = 1; i <= numOfSubTable; i++) {
|
pstmt.setDouble(3, 1.1);
|
||||||
// set table name
|
pstmt.setBoolean(4, true);
|
||||||
pstmt.setTableName("t5_" + i);
|
pstmt.setString(5, "binary_value");
|
||||||
// set tags
|
pstmt.setNString(6, "nchar_value");
|
||||||
pstmt.setTagNString(1, "California.SanFrancisco");
|
pstmt.setVarbinary(7, new byte[]{(byte) 0x98, (byte) 0xf4, 0x6e});
|
||||||
|
pstmt.setGeometry(8, new byte[]{
|
||||||
// set columns
|
0x01, 0x01, 0x00, 0x00,
|
||||||
long current = System.currentTimeMillis();
|
0x00, 0x00, 0x00, 0x00,
|
||||||
for (int j = 0; j < numOfRow; j++) {
|
0x00, 0x00, 0x00, 0x59,
|
||||||
pstmt.setTimestamp(0, new Timestamp(current + j));
|
0x40, 0x00, 0x00, 0x00,
|
||||||
pstmt.setNString(1, "California.SanFrancisco");
|
0x00, 0x00, 0x00, 0x59, 0x40});
|
||||||
pstmt.addBatch();
|
pstmt.addBatch();
|
||||||
}
|
pstmt.executeBatch();
|
||||||
pstmt.executeBatch();
|
System.out.println("Successfully inserted rows to example_all_type_stmt.ntb");
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,36 +50,68 @@ public class TestAll {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRestInsert() throws SQLException {
|
public void testWsConnect() throws Exception {
|
||||||
dropDB("power");
|
WSConnectExample.main(args);
|
||||||
RestInsertExample.main(args);
|
|
||||||
RestQueryExample.main(args);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStmtInsert() throws SQLException {
|
public void testBase() throws Exception {
|
||||||
|
JdbcCreatDBDemo.main(args);
|
||||||
|
JdbcInsertDataDemo.main(args);
|
||||||
|
JdbcQueryDemo.main(args);
|
||||||
|
|
||||||
dropDB("power");
|
dropDB("power");
|
||||||
StmtInsertExample.main(args);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSubscribe() {
|
public void testWsSchemaless() throws Exception {
|
||||||
|
dropDB("power");
|
||||||
|
SchemalessWsTest.main(args);
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testJniSchemaless() throws Exception {
|
||||||
|
dropDB("power");
|
||||||
|
SchemalessJniTest.main(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJniStmtBasic() throws Exception {
|
||||||
|
dropDB("power");
|
||||||
|
ParameterBindingBasicDemo.main(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJniStmtFull() throws Exception {
|
||||||
|
dropDB("power");
|
||||||
|
ParameterBindingFullDemo.main(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWsStmtBasic() throws Exception {
|
||||||
|
dropDB("power");
|
||||||
|
WSParameterBindingBasicDemo.main(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWsStmtFull() throws Exception {
|
||||||
|
dropDB("power");
|
||||||
|
WSParameterBindingFullDemo.main(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConsumer() throws Exception {
|
||||||
|
dropDB("power");
|
||||||
SubscribeDemo.main(args);
|
SubscribeDemo.main(args);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// @Test
|
||||||
@Test
|
// public void testSubscribeJni() throws SQLException, InterruptedException {
|
||||||
public void testSubscribeOverWebsocket() {
|
// dropDB("power");
|
||||||
WebsocketSubscribeDemo.main(args);
|
// ConsumerLoopFull.main(args);
|
||||||
}
|
// }
|
||||||
|
// @Test
|
||||||
@Test
|
// public void testSubscribeWs() throws SQLException, InterruptedException {
|
||||||
public void testSchemaless() throws SQLException {
|
// dropDB("power");
|
||||||
LineProtocolExample.main(args);
|
// WsConsumerLoopFull.main(args);
|
||||||
TelnetLineProtocolExample.main(args);
|
// }
|
||||||
// for json protocol, tags may be double type. but for telnet protocol tag must be nchar type.
|
|
||||||
// To avoid type mismatch, we delete database test.
|
|
||||||
dropDB("test");
|
|
||||||
JSONProtocolExample.main(args);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ edition = "2021"
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
serde_json = "1.0"
|
||||||
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
|
tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread"] }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
pretty_env_logger = "0.5.0"
|
pretty_env_logger = "0.5.0"
|
||||||
|
|
|
@ -0,0 +1,121 @@
|
||||||
|
use taos::*;
|
||||||
|
use taos_query::util::hex::hex_string_to_bytes;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
let dsn = "taos://";
|
||||||
|
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
|
||||||
|
|
||||||
|
taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt")
|
||||||
|
.await?;
|
||||||
|
taos.create_database("example_all_type_stmt").await?;
|
||||||
|
taos.use_database("example_all_type_stmt").await?;
|
||||||
|
taos.exec(
|
||||||
|
r#"
|
||||||
|
CREATE STABLE IF NOT EXISTS stb (
|
||||||
|
ts TIMESTAMP,
|
||||||
|
int_col INT,
|
||||||
|
double_col DOUBLE,
|
||||||
|
bool_col BOOL,
|
||||||
|
binary_col BINARY(100),
|
||||||
|
nchar_col NCHAR(100),
|
||||||
|
varbinary_col VARBINARY(100),
|
||||||
|
geometry_col GEOMETRY(100))
|
||||||
|
TAGS (
|
||||||
|
int_tag INT,
|
||||||
|
double_tag DOUBLE,
|
||||||
|
bool_tag BOOL,
|
||||||
|
binary_tag BINARY(100),
|
||||||
|
nchar_tag NCHAR(100))
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut stmt = Stmt::init(&taos).await?;
|
||||||
|
stmt.prepare("INSERT INTO ? using stb tags(?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)")
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
const NUM_TABLES: usize = 10;
|
||||||
|
const NUM_ROWS: usize = 10;
|
||||||
|
for i in 0..NUM_TABLES {
|
||||||
|
let table_name = format!("d_bind_{}", i);
|
||||||
|
let tags = vec![
|
||||||
|
Value::Int(i as i32),
|
||||||
|
Value::Double(1.1),
|
||||||
|
Value::Bool(true),
|
||||||
|
Value::VarChar("binary_value".into()),
|
||||||
|
Value::NChar("nchar_value".into()),
|
||||||
|
// Value::VarBinary(vec![0x98, 0xf4, 0x6e].into()),
|
||||||
|
// Value::Geometry(
|
||||||
|
// vec![
|
||||||
|
// 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
|
||||||
|
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
|
||||||
|
// ]
|
||||||
|
// .into(),
|
||||||
|
// ),
|
||||||
|
];
|
||||||
|
|
||||||
|
// set table name and tags for the prepared statement.
|
||||||
|
match stmt.set_tbname_tags(&table_name, &tags).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}",
|
||||||
|
table_name, tags, err
|
||||||
|
);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for j in 0..NUM_ROWS {
|
||||||
|
let values = vec![
|
||||||
|
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
|
||||||
|
ColumnView::from_ints(vec![j as i32]),
|
||||||
|
ColumnView::from_doubles(vec![1.1]),
|
||||||
|
ColumnView::from_bools(vec![true]),
|
||||||
|
ColumnView::from_varchar(vec!["ABC"]),
|
||||||
|
ColumnView::from_nchar(vec!["涛思数据"]),
|
||||||
|
ColumnView::from_bytes(vec![hex_string_to_bytes("123456").to_vec()]),
|
||||||
|
ColumnView::from_geobytes(vec![hex_string_to_bytes(
|
||||||
|
"0101000000000000000000F03F0000000000000040",
|
||||||
|
)
|
||||||
|
.to_vec()]),
|
||||||
|
];
|
||||||
|
// bind values to the prepared statement.
|
||||||
|
match stmt.bind(&values).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to bind values, values:{:?}, ErrMessage: {}",
|
||||||
|
values, err
|
||||||
|
);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match stmt.add_batch().await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to add batch, ErrMessage: {}", err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// execute.
|
||||||
|
match stmt.execute().await {
|
||||||
|
Ok(affected_rows) => println!(
|
||||||
|
"Successfully inserted {} rows to example_all_type_stmt.stb.",
|
||||||
|
affected_rows
|
||||||
|
),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to insert to table stb using stmt, ErrMessage: {}",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -0,0 +1,94 @@
|
||||||
|
use serde_json::json;
|
||||||
|
use taos::*;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
let dsn = "taos://";
|
||||||
|
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
|
||||||
|
|
||||||
|
taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt")
|
||||||
|
.await?;
|
||||||
|
taos.create_database("example_all_type_stmt").await?;
|
||||||
|
taos.use_database("example_all_type_stmt").await?;
|
||||||
|
taos.exec(
|
||||||
|
r#"
|
||||||
|
CREATE STABLE IF NOT EXISTS stb_json (
|
||||||
|
ts TIMESTAMP,
|
||||||
|
int_col INT)
|
||||||
|
TAGS (
|
||||||
|
json_tag JSON)
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut stmt = Stmt::init(&taos).await?;
|
||||||
|
stmt.prepare("INSERT INTO ? using stb_json tags(?) VALUES (?,?)")
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
const NUM_TABLES: usize = 1;
|
||||||
|
const NUM_ROWS: usize = 1;
|
||||||
|
for i in 0..NUM_TABLES {
|
||||||
|
let table_name = format!("d_bind_{}", i);
|
||||||
|
let json_value: serde_json::Value = json!({
|
||||||
|
"name": "value"
|
||||||
|
});
|
||||||
|
|
||||||
|
dbg!(json_value.to_string());
|
||||||
|
|
||||||
|
let tags = vec![Value::Json(json_value)];
|
||||||
|
|
||||||
|
// set table name and tags for the prepared statement.
|
||||||
|
match stmt.set_tbname_tags(&table_name, &tags).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}",
|
||||||
|
table_name, tags, err
|
||||||
|
);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for j in 0..NUM_ROWS {
|
||||||
|
let values = vec![
|
||||||
|
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
|
||||||
|
ColumnView::from_ints(vec![j as i32]),
|
||||||
|
];
|
||||||
|
// bind values to the prepared statement.
|
||||||
|
match stmt.bind(&values).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to bind values, values:{:?}, ErrMessage: {}",
|
||||||
|
values, err
|
||||||
|
);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match stmt.add_batch().await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to add batch, ErrMessage: {}", err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// execute.
|
||||||
|
match stmt.execute().await {
|
||||||
|
Ok(affected_rows) => println!(
|
||||||
|
"Successfully inserted {} rows to example_all_type_stmt.stb_json.",
|
||||||
|
affected_rows
|
||||||
|
),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to insert to table stb_json using stmt, ErrMessage: {}",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -0,0 +1,121 @@
|
||||||
|
use taos::*;
|
||||||
|
use taos_query::util::hex::hex_string_to_bytes;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
let dsn = "ws://";
|
||||||
|
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
|
||||||
|
|
||||||
|
taos.exec("DROP DATABASE IF EXISTS example_all_type_stmt")
|
||||||
|
.await?;
|
||||||
|
taos.create_database("example_all_type_stmt").await?;
|
||||||
|
taos.use_database("example_all_type_stmt").await?;
|
||||||
|
taos.exec(
|
||||||
|
r#"
|
||||||
|
CREATE STABLE IF NOT EXISTS stb (
|
||||||
|
ts TIMESTAMP,
|
||||||
|
int_col INT,
|
||||||
|
double_col DOUBLE,
|
||||||
|
bool_col BOOL,
|
||||||
|
binary_col BINARY(100),
|
||||||
|
nchar_col NCHAR(100),
|
||||||
|
varbinary_col VARBINARY(100),
|
||||||
|
geometry_col GEOMETRY(100))
|
||||||
|
TAGS (
|
||||||
|
int_tag INT,
|
||||||
|
double_tag DOUBLE,
|
||||||
|
bool_tag BOOL,
|
||||||
|
binary_tag BINARY(100),
|
||||||
|
nchar_tag NCHAR(100))
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut stmt = Stmt::init(&taos).await?;
|
||||||
|
stmt.prepare("INSERT INTO ? using stb tags(?,?,?,?,?) VALUES (?,?,?,?,?,?,?,?)")
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
const NUM_TABLES: usize = 10;
|
||||||
|
const NUM_ROWS: usize = 10;
|
||||||
|
for i in 0..NUM_TABLES {
|
||||||
|
let table_name = format!("d_bind_{}", i);
|
||||||
|
let tags = vec![
|
||||||
|
Value::Int(i as i32),
|
||||||
|
Value::Double(1.1),
|
||||||
|
Value::Bool(true),
|
||||||
|
Value::VarChar("binary_value".into()),
|
||||||
|
Value::NChar("nchar_value".into()),
|
||||||
|
// Value::VarBinary(vec![0x98, 0xf4, 0x6e].into()),
|
||||||
|
// Value::Geometry(
|
||||||
|
// vec![
|
||||||
|
// 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
|
||||||
|
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x59, 0x40,
|
||||||
|
// ]
|
||||||
|
// .into(),
|
||||||
|
// ),
|
||||||
|
];
|
||||||
|
|
||||||
|
// set table name and tags for the prepared statement.
|
||||||
|
match stmt.set_tbname_tags(&table_name, &tags).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to set table name and tags, table_name:{}, tags:{:?}, ErrMessage: {}",
|
||||||
|
table_name, tags, err
|
||||||
|
);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for j in 0..NUM_ROWS {
|
||||||
|
let values = vec![
|
||||||
|
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
|
||||||
|
ColumnView::from_ints(vec![j as i32]),
|
||||||
|
ColumnView::from_doubles(vec![1.1]),
|
||||||
|
ColumnView::from_bools(vec![true]),
|
||||||
|
ColumnView::from_varchar(vec!["ABC"]),
|
||||||
|
ColumnView::from_nchar(vec!["涛思数据"]),
|
||||||
|
ColumnView::from_bytes(vec![hex_string_to_bytes("123456").to_vec()]),
|
||||||
|
ColumnView::from_geobytes(vec![hex_string_to_bytes(
|
||||||
|
"0101000000000000000000F03F0000000000000040",
|
||||||
|
)
|
||||||
|
.to_vec()]),
|
||||||
|
];
|
||||||
|
// bind values to the prepared statement.
|
||||||
|
match stmt.bind(&values).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to bind values, values:{:?}, ErrMessage: {}",
|
||||||
|
values, err
|
||||||
|
);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match stmt.add_batch().await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Failed to add batch, ErrMessage: {}", err);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// execute.
|
||||||
|
match stmt.execute().await {
|
||||||
|
Ok(affected_rows) => println!(
|
||||||
|
"Successfully inserted {} rows to example_all_type_stmt.stb.",
|
||||||
|
affected_rows
|
||||||
|
),
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!(
|
||||||
|
"Failed to insert to table stb using stmt, ErrMessage: {}",
|
||||||
|
err
|
||||||
|
);
|
||||||
|
return Err(err.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -80,6 +80,8 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Rust 对
|
||||||
| BINARY | Vec\<u8> |
|
| BINARY | Vec\<u8> |
|
||||||
| NCHAR | String |
|
| NCHAR | String |
|
||||||
| JSON | serde_json::Value |
|
| JSON | serde_json::Value |
|
||||||
|
| VARBINARY | Bytes |
|
||||||
|
| GEOMETRY | Bytes |
|
||||||
|
|
||||||
**注意**:JSON 类型仅在 tag 中支持。
|
**注意**:JSON 类型仅在 tag 中支持。
|
||||||
|
|
||||||
|
|
|
@ -243,6 +243,7 @@ vi source-demo.json
|
||||||
"topic.per.stable": true,
|
"topic.per.stable": true,
|
||||||
"topic.ignore.db": false,
|
"topic.ignore.db": false,
|
||||||
"out.format": "line",
|
"out.format": "line",
|
||||||
|
"data.precision": "ms",
|
||||||
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
|
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
|
||||||
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
|
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
|
||||||
}
|
}
|
||||||
|
@ -331,14 +332,13 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
|
||||||
|
|
||||||
1. 打开 KAFKA_HOME/config/producer.properties 配置文件。
|
1. 打开 KAFKA_HOME/config/producer.properties 配置文件。
|
||||||
2. 参数说明及配置建议如下:
|
2. 参数说明及配置建议如下:
|
||||||
| **参数** | **参数说明** | **设置建议** |
|
| **参数** | **参数说明** | **设置建议** |
|
||||||
| --------| --------------------------------- | -------------- |
|
| --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------ |
|
||||||
| producer.type | 此参数用于设置消息的发送方式,默认值为 `sync` 表示同步发送,`async` 表示异步发送。采用异步发送能够提升消息发送的吞吐量。 | async |
|
| producer.type | 此参数用于设置消息的发送方式,默认值为 `sync` 表示同步发送,`async` 表示异步发送。采用异步发送能够提升消息发送的吞吐量。 | async |
|
||||||
| request.required.acks | 参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时,表示只要领导者副本成功写入消息就会给生产者发送确认,而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性,同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功,所以可以减少生产者的等待时间,提高发送消息的效率。|1|
|
| request.required.acks | 参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时,表示只要领导者副本成功写入消息就会给生产者发送确认,而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性,同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功,所以可以减少生产者的等待时间,提高发送消息的效率。 | 1 |
|
||||||
| max.request.size| 该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576,也就是 1M。如果设置得太小,可能会导致频繁的网络请求,降低吞吐量。如果设置得太大,可能会导致内存占用过高,或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。|104857600|
|
| max.request.size | 该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576,也就是 1M。如果设置得太小,可能会导致频繁的网络请求,降低吞吐量。如果设置得太大,可能会导致内存占用过高,或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。 | 104857600 |
|
||||||
|batch.size| 此参数用于设定 batch 的大小,默认值为 16384,即 16KB。在消息发送过程中,发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。|524288|
|
| batch.size | 此参数用于设定 batch 的大小,默认值为 16384,即 16KB。在消息发送过程中,发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。 | 524288 |
|
||||||
| buffer.memory| 此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。|1073741824|
|
| buffer.memory | 此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。 | 1073741824 |
|
||||||
|
|
||||||
|
|
||||||
## 配置参考
|
## 配置参考
|
||||||
|
|
||||||
|
@ -370,7 +370,7 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
|
||||||
7. `data.precision`: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为:
|
7. `data.precision`: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为:
|
||||||
1. ms : 表示毫秒
|
1. ms : 表示毫秒
|
||||||
2. us : 表示微秒
|
2. us : 表示微秒
|
||||||
3. ns : 表示纳秒。默认为纳秒。
|
3. ns : 表示纳秒。
|
||||||
|
|
||||||
### TDengine Source Connector 特有的配置
|
### TDengine Source Connector 特有的配置
|
||||||
|
|
||||||
|
@ -381,12 +381,16 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
|
||||||
5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。
|
5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。
|
||||||
6. `query.interval.ms`: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 0,即获取到当前最新时间的所有数据。
|
6. `query.interval.ms`: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 0,即获取到当前最新时间的所有数据。
|
||||||
7. `out.format` : 结果集输出格式。`line` 表示输出格式为 InfluxDB Line 协议格式,`json` 表示输出格式是 json。默认为 line。
|
7. `out.format` : 结果集输出格式。`line` 表示输出格式为 InfluxDB Line 协议格式,`json` 表示输出格式是 json。默认为 line。
|
||||||
8. `topic.per.stable`: 如果设置为 true,表示一个超级表对应一个 Kafka topic,topic的命名规则 `<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>`;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 `<topic.prefix><topic.delimiter><connection.database>`
|
8. `data.precision`: 使用 InfluxDB 行协议格式时,时间戳的精度。可选值为:
|
||||||
9. `topic.ignore.db`: topic 命名规则是否包含 database 名称,true 表示规则为 `<topic.prefix><topic.delimiter><stable.name>`,false 表示规则为 `<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>`,默认 false。此配置项在 `topic.per.stable` 设置为 false 时不生效。
|
1. ms : 表示毫秒,
|
||||||
10. `topic.delimiter`: topic 名称分割符,默认为 `-`。
|
2. us : 表示微秒
|
||||||
11. `read.method`: 从 TDengine 读取数据方式,query 或是 subscription。默认为 subscription。
|
3. ns : 表示纳秒。
|
||||||
12. `subscription.group.id`: 指定 TDengine 数据订阅的组 id,当 `read.method` 为 subscription 时,此项为必填项。
|
9. `topic.per.stable`: 如果设置为 true,表示一个超级表对应一个 Kafka topic,topic的命名规则 `<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>`;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 `<topic.prefix><topic.delimiter><connection.database>`
|
||||||
13. `subscription.from`: 指定 TDengine 数据订阅起始位置,latest 或是 earliest。默认为 latest。
|
10. `topic.ignore.db`: topic 命名规则是否包含 database 名称,true 表示规则为 `<topic.prefix><topic.delimiter><stable.name>`,false 表示规则为 `<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>`,默认 false。此配置项在 `topic.per.stable` 设置为 false 时不生效。
|
||||||
|
11. `topic.delimiter`: topic 名称分割符,默认为 `-`。
|
||||||
|
12. `read.method`: 从 TDengine 读取数据方式,query 或是 subscription。默认为 subscription。
|
||||||
|
13. `subscription.group.id`: 指定 TDengine 数据订阅的组 id,当 `read.method` 为 subscription 时,此项为必填项。
|
||||||
|
14. `subscription.from`: 指定 TDengine 数据订阅起始位置,latest 或是 earliest。默认为 latest。
|
||||||
|
|
||||||
## 其他说明
|
## 其他说明
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue