support geometry data type

This commit is contained in:
sheyanjie 2023-09-19 11:20:06 +08:00
parent d12539be1f
commit faca8f2ea9
9 changed files with 982 additions and 96 deletions

View File

@ -142,8 +142,15 @@ TDengine currently supports timestamp, number, character, Boolean type, and the
| BINARY | byte array |
| NCHAR | java.lang.String |
| JSON | java.lang.String |
| VARBINARY | byte[] |
| GEOMETRY | byte[] |
**Note**: Only TAG supports JSON types
Due to historical reasons, the BINARY type data in TDengine is not truly binary data and is no longer recommended for use. Please use VARBINARY type instead.
GEOMETRY type is binary data in little endian byte order, which complies with the WKB specification. For detailed information, please refer to [Data Type] (/tao-sql/data-type/#Data Types)
For WKB specifications, please refer to [Well Known Binary (WKB)] https://libgeos.org/specifications/wkb/
For Java connectors, the jts library can be used to easily create GEOMETRY type objects, serialize them, and write them to TDengine. Here is an example [Geometry example]https://github.com/taosdata/TDengine/blob/3.0/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/GeometryDemo.java
## Installation Steps
@ -456,13 +463,15 @@ public class ParameterBindingDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 20;
private static final int BINARY_COLUMN_SIZE = 50;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))",
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))",
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))",
};
private static final int numOfSubTable = 10, numOfRow = 10;
@ -474,21 +483,20 @@ public class ParameterBindingDemo {
init(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
bindVarbinary(conn);
bindGeometry(conn);
clean(conn);
conn.close();
}
private static void init(Connection conn) throws SQLException {
clean(conn);
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
stmt.execute("create database if not exists test_parabind");
stmt.execute("use test_parabind");
for (int i = 0; i < schemaList.length; i++) {
@ -496,6 +504,11 @@ public class ParameterBindingDemo {
}
}
}
private static void clean(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
@ -674,10 +687,84 @@ public class ParameterBindingDemo {
pstmt.columnDataExecuteBatch();
}
}
private static void bindVarbinary(Connection conn) throws SQLException {
String sql = "insert into ? using stable6 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t6_" + i);
// set tags
byte[] bTag = new byte[]{0,2,3,4,5};
bTag[0] = (byte) i;
pstmt.setTagVarbinary(0, bTag);
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
byte[] v = new byte[]{0,2,3,4,5,6};
v[0] = (byte)j;
f1List.add(v);
}
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindGeometry(Connection conn) throws SQLException {
String sql = "insert into ? using stable7 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
List<byte[]> listGeo = new ArrayList<>();
listGeo.add(g1);
listGeo.add(g2);
for (int i = 1; i <= 2; i++) {
// set table name
pstmt.setTableName("t7_" + i);
// set tags
pstmt.setTagGeometry(0, listGeo.get(i - 1));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(listGeo.get(i - 1));
}
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
}
```
**Note**: both setString and setNString require the user to declare the width of the corresponding column in the size parameter of the table definition
**Note**: both String and byte[] require the user to declare the width of the corresponding column in the size parameter of the table definition
The methods to set VALUES columns:
@ -692,6 +779,8 @@ public void setByte(int columnIndex, ArrayList<Byte> list) throws SQLException
public void setShort(int columnIndex, ArrayList<Short> list) throws SQLException
public void setString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setVarbinary(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
public void setGeometry(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
```
</TabItem>
@ -880,6 +969,9 @@ public void setTagFloat(int index, float value)
public void setTagDouble(int index, double value)
public void setTagString(int index, String value)
public void setTagNString(int index, String value)
public void setTagJson(int index, String value)
public void setTagVarbinary(int index, byte[] value)
public void setTagGeometry(int index, byte[] value)
```
### Schemaless Writing

View File

@ -22,7 +22,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.4</version>
<version>3.2.7-SNAPSHOT</version>
</dependency>
<!-- ANCHOR_END: dep-->
<dependency>

View File

@ -142,8 +142,14 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对
| BINARY | byte array |
| NCHAR | java.lang.String |
| JSON | java.lang.String |
| VARBINARY | byte[] |
| GEOMETRY | byte[] |
**注意**JSON 类型仅在 tag 中支持。
**注意**JSON 类型仅在 tag 中支持。
由于历史原因TDengine中的BINARY底层不是真正的二进制数据已不建议使用。请用VARBINARY类型代替。
GEOMETRY类型是little endian字节序的二进制数据符合WKB规范。详细信息请参考 [数据类型](/taos-sql/data-type/#数据类型)
WKB规范请参考[Well-Known Binary (WKB)](https://libgeos.org/specifications/wkb/)
对于java连接器可以使用jts库来方便的创建GEOMETRY类型对象序列化后写入TDengine这里有一个样例[Geometry示例](https://github.com/taosdata/TDengine/blob/3.0/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/GeometryDemo.java)
## 安装步骤
@ -459,13 +465,15 @@ public class ParameterBindingDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 30;
private static final int BINARY_COLUMN_SIZE = 50;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))",
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))",
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))",
};
private static final int numOfSubTable = 10, numOfRow = 10;
@ -477,21 +485,20 @@ public class ParameterBindingDemo {
init(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
bindVarbinary(conn);
bindGeometry(conn);
clean(conn);
conn.close();
}
private static void init(Connection conn) throws SQLException {
clean(conn);
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
stmt.execute("create database if not exists test_parabind");
stmt.execute("use test_parabind");
for (int i = 0; i < schemaList.length; i++) {
@ -499,6 +506,11 @@ public class ParameterBindingDemo {
}
}
}
private static void clean(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
@ -677,10 +689,84 @@ public class ParameterBindingDemo {
pstmt.columnDataExecuteBatch();
}
}
private static void bindVarbinary(Connection conn) throws SQLException {
String sql = "insert into ? using stable6 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t6_" + i);
// set tags
byte[] bTag = new byte[]{0,2,3,4,5};
bTag[0] = (byte) i;
pstmt.setTagVarbinary(0, bTag);
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
byte[] v = new byte[]{0,2,3,4,5,6};
v[0] = (byte)j;
f1List.add(v);
}
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindGeometry(Connection conn) throws SQLException {
String sql = "insert into ? using stable7 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
List<byte[]> listGeo = new ArrayList<>();
listGeo.add(g1);
listGeo.add(g2);
for (int i = 1; i <= 2; i++) {
// set table name
pstmt.setTableName("t7_" + i);
// set tags
pstmt.setTagGeometry(0, listGeo.get(i - 1));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(listGeo.get(i - 1));
}
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
}
```
**注**setString 和 setNString 都要求用户在 size 参数里声明表定义中对应列的列宽
**注**字符串和数组类型都要求用户在 size 参数里声明表定义中对应列的列宽
用于设定 VALUES 数据列的取值的方法总共有:
@ -695,6 +781,8 @@ public void setByte(int columnIndex, ArrayList<Byte> list) throws SQLException
public void setShort(int columnIndex, ArrayList<Short> list) throws SQLException
public void setString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setVarbinary(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
public void setGeometry(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
```
</TabItem>
@ -883,6 +971,9 @@ public void setTagFloat(int index, float value)
public void setTagDouble(int index, double value)
public void setTagString(int index, String value)
public void setTagNString(int index, String value)
public void setTagJson(int index, String value)
public void setTagVarbinary(int index, byte[] value)
public void setTagGeometry(int index, byte[] value)
```
### 无模式写入

View File

@ -11,13 +11,20 @@
<properties>
<project.assembly.dir>src/main/resources/assembly</project.assembly.dir>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.0.0</version>
<version>3.2.7</version>
</dependency>
<dependency>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
<version>1.19.0</version>
</dependency>
</dependencies>
@ -68,12 +75,12 @@
</execution>
<execution>
<id>SubscribeDemo</id>
<id>GeometryDemo</id>
<configuration>
<finalName>SubscribeDemo</finalName>
<finalName>GeometryDemo</finalName>
<archive>
<manifest>
<mainClass>com.taosdata.example.SubscribeDemo</mainClass>
<mainClass>com.taosdata.example.GeometryDemo</mainClass>
</manifest>
</archive>
<descriptorRefs>

View File

@ -0,0 +1,94 @@
package com.taosdata.example;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class ConsumerLoop {
private final TaosConsumer<ResultBean> consumer;
private final List<String> topics;
private final AtomicBoolean shutdown;
private final CountDownLatch shutdownLatch;
public ConsumerLoop() throws SQLException {
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("auto.offset.reset", "earliest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
config.setProperty("experimental.snapshot.enable", "true");
this.consumer = new TaosConsumer<>(config);
this.topics = Collections.singletonList("topic_speed");
this.shutdown = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ResultBean result);
public void pollData() throws SQLException {
try {
consumer.subscribe(topics);
while (!shutdown.get()) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
process(bean);
}
}
consumer.unsubscribe();
} finally {
consumer.close();
shutdownLatch.countDown();
}
}
public void shutdown() throws InterruptedException {
shutdown.set(true);
shutdownLatch.await();
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
public static class ResultBean {
private Timestamp ts;
private int speed;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public int getSpeed() {
return speed;
}
public void setSpeed(int speed) {
this.speed = speed;
}
}
}

View File

@ -0,0 +1,190 @@
package com.taosdata.example;
import com.taosdata.jdbc.TSDBPreparedStatement;
import org.locationtech.jts.geom.*;
import org.locationtech.jts.io.ByteOrderValues;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKBReader;
import org.locationtech.jts.io.WKBWriter;
import java.sql.*;
import java.util.ArrayList;
import java.util.Properties;
public class GeometryDemo {
private static String host = "localhost";
private static final String dbName = "test";
private static final String tbName = "weather";
private static final String user = "root";
private static final String password = "taosdata";
private Connection connection;
public static void main(String[] args) throws SQLException {
for (int i = 0; i < args.length; i++) {
if ("-host".equalsIgnoreCase(args[i]) && i < args.length - 1)
host = args[++i];
}
if (host == null) {
printHelp();
}
GeometryDemo demo = new GeometryDemo();
demo.init();
demo.createDatabase();
demo.useDatabase();
demo.dropTable();
demo.createTable();
demo.insert();
demo.stmtInsert();
demo.select();
demo.dropTable();
demo.close();
}
private void init() {
final String url = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password;
// get connection
try {
Properties properties = new Properties();
properties.setProperty("charset", "UTF-8");
properties.setProperty("locale", "en_US.UTF-8");
properties.setProperty("timezone", "UTC-8");
System.out.println("get connection starting...");
connection = DriverManager.getConnection(url, properties);
if (connection != null)
System.out.println("[ OK ] Connection established.");
} catch (SQLException e) {
e.printStackTrace();
}
}
private void createDatabase() {
String sql = "create database if not exists " + dbName;
execute(sql);
}
private void useDatabase() {
String sql = "use " + dbName;
execute(sql);
}
private void dropTable() {
final String sql = "drop table if exists " + dbName + "." + tbName + "";
execute(sql);
}
private void createTable() {
final String sql = "create table if not exists " + dbName + "." + tbName + " (ts timestamp, temperature float, humidity int, location geometry(50))";
execute(sql);
}
private void insert() {
final String sql = "insert into " + dbName + "." + tbName + " (ts, temperature, humidity, location) values(now, 20.5, 34, 'POINT(1 2)')";
execute(sql);
}
private void stmtInsert() throws SQLException {
TSDBPreparedStatement preparedStatement = (TSDBPreparedStatement) connection.prepareStatement("insert into " + dbName + "." + tbName + " values (?, ?, ?, ?)");
long current = System.currentTimeMillis();
ArrayList<Long> tsList = new ArrayList<>();
tsList.add(current);
tsList.add(current + 1);
preparedStatement.setTimestamp(0, tsList);
ArrayList<Float> tempList = new ArrayList<>();
tempList.add(20.1F);
tempList.add(21.2F);
preparedStatement.setFloat(1, tempList);
ArrayList<Integer> humList = new ArrayList<>();
humList.add(30);
humList.add(31);
preparedStatement.setInt(2, humList);
ArrayList<byte[]> list = new ArrayList<>();
GeometryFactory gf = new GeometryFactory();
Point p1 = gf.createPoint(new Coordinate(1,2));
p1.setSRID(1234);
// NOTE: TDengine current version only support 2D dimension and little endian byte order
WKBWriter w = new WKBWriter(2, ByteOrderValues.LITTLE_ENDIAN, true);
byte[] wkb = w.write(p1);
list.add(wkb);
Coordinate[] coordinates = { new Coordinate(10, 20),
new Coordinate(30, 40)};
LineString lineString = gf.createLineString(coordinates);
lineString.setSRID(2345);
byte[] wkb2 = w.write(lineString);
list.add(wkb2);
preparedStatement.setGeometry(3, list, 50);
preparedStatement.columnDataAddBatch();
preparedStatement.columnDataExecuteBatch();
}
private void select() {
final String sql = "select * from " + dbName + "." + tbName;
executeQuery(sql);
}
private void close() {
try {
if (connection != null) {
this.connection.close();
System.out.println("connection closed.");
}
} catch (SQLException e) {
e.printStackTrace();
}
}
private void executeQuery(String sql) {
long start = System.currentTimeMillis();
try (Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery(sql);
long end = System.currentTimeMillis();
printSql(sql, true, (end - start));
while (resultSet.next()){
byte[] result1 = resultSet.getBytes(4);
WKBReader reader = new WKBReader();
Geometry g1 = reader.read(result1);
System.out.println("GEO OBJ: " + g1 + ", SRID: " + g1.getSRID());
}
} catch (SQLException e) {
long end = System.currentTimeMillis();
printSql(sql, false, (end - start));
e.printStackTrace();
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
private void printSql(String sql, boolean succeed, long cost) {
System.out.println("[ " + (succeed ? "OK" : "ERROR!") + " ] time cost: " + cost + " ms, execute statement ====> " + sql);
}
private void execute(String sql) {
long start = System.currentTimeMillis();
try (Statement statement = connection.createStatement()) {
boolean execute = statement.execute(sql);
long end = System.currentTimeMillis();
printSql(sql, true, (end - start));
} catch (SQLException e) {
long end = System.currentTimeMillis();
printSql(sql, false, (end - start));
e.printStackTrace();
}
}
private static void printHelp() {
System.out.println("Usage: java -jar JDBCDemo.jar -host <hostname>");
System.exit(0);
}
}

View File

@ -0,0 +1,316 @@
package com.taosdata.example;
import com.taosdata.jdbc.TSDBPreparedStatement;
import com.taosdata.jdbc.utils.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class ParameterBindingDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 50;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))",
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))",
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))",
};
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
init(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
bindVarbinary(conn);
bindGeometry(conn);
clean(conn);
conn.close();
}
private static void init(Connection conn) throws SQLException {
clean(conn);
try (Statement stmt = conn.createStatement()) {
stmt.execute("create database if not exists test_parabind");
stmt.execute("use test_parabind");
for (int i = 0; i < schemaList.length; i++) {
stmt.execute(schemaList[i]);
}
}
}
private static void clean(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_parabind");
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t1_" + i);
// set tags
pstmt.setTagByte(0, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setTagShort(1, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setTagInt(2, random.nextInt(Integer.MAX_VALUE));
pstmt.setTagLong(3, random.nextLong());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Byte> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setByte(1, f1List);
ArrayList<Short> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setShort(2, f2List);
ArrayList<Integer> f3List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f3List.add(random.nextInt(Integer.MAX_VALUE));
pstmt.setInt(3, f3List);
ArrayList<Long> f4List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f4List.add(random.nextLong());
pstmt.setLong(4, f4List);
// add column
pstmt.columnDataAddBatch();
}
// execute column
pstmt.columnDataExecuteBatch();
}
}
private static void bindFloat(Connection conn) throws SQLException {
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class);
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t2_" + i);
// set tags
pstmt.setTagFloat(0, random.nextFloat());
pstmt.setTagDouble(1, random.nextDouble());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Float> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextFloat());
pstmt.setFloat(1, f1List);
ArrayList<Double> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(random.nextDouble());
pstmt.setDouble(2, f2List);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
// close if no try-with-catch statement is used
pstmt.close();
}
private static void bindBoolean(Connection conn) throws SQLException {
String sql = "insert into ? using stable3 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t3_" + i);
// set tags
pstmt.setTagBoolean(0, random.nextBoolean());
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<Boolean> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextBoolean());
pstmt.setBoolean(1, f1List);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindBytes(Connection conn) throws SQLException {
String sql = "insert into ? using stable4 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t4_" + i);
// set tags
pstmt.setTagString(0, new String("abc"));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(new String("abc"));
}
pstmt.setString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindString(Connection conn) throws SQLException {
String sql = "insert into ? using stable5 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t5_" + i);
// set tags
pstmt.setTagNString(0, "California.SanFrancisco");
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<String> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add("California.LosAngeles");
}
pstmt.setNString(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindVarbinary(Connection conn) throws SQLException {
String sql = "insert into ? using stable6 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t6_" + i);
// set tags
byte[] bTag = new byte[]{0,2,3,4,5};
bTag[0] = (byte) i;
pstmt.setTagVarbinary(0, bTag);
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
byte[] v = new byte[]{0,2,3,4,5,6};
v[0] = (byte)j;
f1List.add(v);
}
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
private static void bindGeometry(Connection conn) throws SQLException {
String sql = "insert into ? using stable7 tags(?) values(?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
List<byte[]> listGeo = new ArrayList<>();
listGeo.add(g1);
listGeo.add(g2);
for (int i = 1; i <= 2; i++) {
// set table name
pstmt.setTableName("t7_" + i);
// set tags
pstmt.setTagGeometry(0, listGeo.get(i - 1));
// set columns
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
ArrayList<byte[]> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++) {
f1List.add(listGeo.get(i - 1));
}
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
// add column
pstmt.columnDataAddBatch();
}
// execute
pstmt.columnDataExecuteBatch();
}
}
}

View File

@ -1,74 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.TSDBConnection;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBResultSet;
import com.taosdata.jdbc.TSDBSubscribe;
import java.sql.DriverManager;
import java.sql.ResultSetMetaData;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class SubscribeDemo {
private static final String usage = "java -jar SubscribeDemo.jar -host <hostname> -database <database name> -topic <topic> -sql <sql>";
public static void main(String[] args) {
// parse args from command line
String host = "", database = "", topic = "", sql = "";
for (int i = 0; i < args.length; i++) {
if ("-host".equalsIgnoreCase(args[i]) && i < args.length - 1) {
host = args[++i];
}
if ("-database".equalsIgnoreCase(args[i]) && i < args.length - 1) {
database = args[++i];
}
if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
topic = args[++i];
}
if ("-sql".equalsIgnoreCase(args[i]) && i < args.length - 1) {
sql = args[++i];
}
}
if (host.isEmpty() || database.isEmpty() || topic.isEmpty() || sql.isEmpty()) {
System.out.println(usage);
return;
}
try {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
final String url = "jdbc:TAOS://" + host + ":6030/" + database + "?user=root&password=taosdata";
// get TSDBConnection
TSDBConnection connection = (TSDBConnection) DriverManager.getConnection(url, properties);
// create TSDBSubscribe
TSDBSubscribe sub = connection.subscribe(topic, sql, false);
int total = 0;
while (true) {
TSDBResultSet rs = sub.consume();
int count = 0;
ResultSetMetaData meta = rs.getMetaData();
while (rs.next()) {
for (int i = 1; i <= meta.getColumnCount(); i++) {
System.out.print(meta.getColumnLabel(i) + ": " + rs.getString(i) + "\t");
}
System.out.println();
count++;
}
total += count;
// System.out.printf("%d rows consumed, total %d\n", count, total);
if (total >= 10)
break;
TimeUnit.SECONDS.sleep(1);
}
sub.close(false);
connection.close();
} catch (Exception e) {
System.out.println("host: " + host + ", database: " + database + ", topic: " + topic + ", sql: " + sql);
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,170 @@
package com.taosdata.example;
import com.taosdata.jdbc.ws.TSWSPreparedStatement;
import java.sql.*;
import java.util.Random;
public class WSParameterBindingDemo {
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int BINARY_COLUMN_SIZE = 30;
private static final String[] schemaList = {
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
};
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://" + host + ":6041/?batchfetch=true";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
init(conn);
bindInteger(conn);
bindFloat(conn);
bindBoolean(conn);
bindBytes(conn);
bindString(conn);
conn.close();
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists test_ws_parabind");
stmt.execute("create database if not exists test_ws_parabind");
stmt.execute("use test_ws_parabind");
for (int i = 0; i < schemaList.length; i++) {
stmt.execute(schemaList[i]);
}
}
}
private static void bindInteger(Connection conn) throws SQLException {
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t1_" + i);
// set tags
pstmt.setTagByte(1, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setTagShort(2, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setTagInt(3, random.nextInt(Integer.MAX_VALUE));
pstmt.setTagLong(4, random.nextLong());
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setByte(2, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
pstmt.setShort(3, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
pstmt.setInt(4, random.nextInt(Integer.MAX_VALUE));
pstmt.setLong(5, random.nextLong());
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindFloat(Connection conn) throws SQLException {
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
try(TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t2_" + i);
// set tags
pstmt.setTagFloat(1, random.nextFloat());
pstmt.setTagDouble(2, random.nextDouble());
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setFloat(2, random.nextFloat());
pstmt.setDouble(3, random.nextDouble());
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindBoolean(Connection conn) throws SQLException {
String sql = "insert into ? using stable3 tags(?) values(?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t3_" + i);
// set tags
pstmt.setTagBoolean(1, random.nextBoolean());
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setBoolean(2, random.nextBoolean());
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindBytes(Connection conn) throws SQLException {
String sql = "insert into ? using stable4 tags(?) values(?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t4_" + i);
// set tags
pstmt.setTagString(1, new String("abc"));
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setString(2, "abc");
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
private static void bindString(Connection conn) throws SQLException {
String sql = "insert into ? using stable5 tags(?) values(?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("t5_" + i);
// set tags
pstmt.setTagNString(1, "California.SanFrancisco");
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(0, new Timestamp(current + j));
pstmt.setNString(1, "California.SanFrancisco");
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
}
}