commit
13e8a79294
|
@ -142,8 +142,15 @@ TDengine currently supports timestamp, number, character, Boolean type, and the
|
|||
| BINARY | byte array |
|
||||
| NCHAR | java.lang.String |
|
||||
| JSON | java.lang.String |
|
||||
| VARBINARY | byte[] |
|
||||
| GEOMETRY | byte[] |
|
||||
|
||||
**Note**: Only TAG supports JSON types
|
||||
Due to historical reasons, the BINARY type data in TDengine is not truly binary data and is no longer recommended for use. Please use VARBINARY type instead.
|
||||
GEOMETRY type is binary data in little endian byte order, which complies with the WKB specification. For detailed information, please refer to [Data Type] (/tao-sql/data-type/#Data Types)
|
||||
For WKB specifications, please refer to [Well Known Binary (WKB)]( https://libgeos.org/specifications/wkb/ )
|
||||
For Java connector, the jts library can be used to easily create GEOMETRY type objects, serialize them, and write them to TDengine. Here is an example [Geometry example](https://github.com/taosdata/TDengine/blob/3.0/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/GeometryDemo.java)
|
||||
|
||||
|
||||
## Installation Steps
|
||||
|
||||
|
@ -354,7 +361,7 @@ The configuration parameters in properties are as follows.
|
|||
- TSDBDriver.PROPERTY_KEY_CONFIG_DIR: only works when using JDBC native connection. Client configuration file directory path, default value `/etc/taos` on Linux OS, default value `C:/TDengine/cfg` on Windows OS, default value `/etc/taos` on macOS.
|
||||
- TSDBDriver.PROPERTY_KEY_CHARSET: In the character set used by the client, the default value is the system character set.
|
||||
- TSDBDriver.PROPERTY_KEY_LOCALE: this only takes effect when using JDBC native connection. Client language environment, the default value is system current locale.
|
||||
- TSDBDriver.PROPERTY_KEY_TIME_ZONE: only takes effect when using JDBC native connection. In the time zone used by the client, the default value is the system's current time zone.
|
||||
- TSDBDriver.PROPERTY_KEY_TIME_ZONE: only takes effect when using JDBC native connection. In the time zone used by the client, the default value is the system's current time zone. Due to historical reasons, we only support some specifications of the POSIX standard, such as UTC-8 (representing timezone Shanghai in China), GMT-7, Europe/Paris.
|
||||
- TSDBDriver.HTTP_CONNECT_TIMEOUT: REST connection timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection.
|
||||
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection and batchfetch is false.
|
||||
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: message transmission timeout in milliseconds, the default value is 60000 ms. It only takes effect when using JDBC REST connection and batchfetch is true.
|
||||
|
@ -456,13 +463,15 @@ public class ParameterBindingDemo {
|
|||
|
||||
private static final String host = "127.0.0.1";
|
||||
private static final Random random = new Random(System.currentTimeMillis());
|
||||
private static final int BINARY_COLUMN_SIZE = 20;
|
||||
private static final int BINARY_COLUMN_SIZE = 50;
|
||||
private static final String[] schemaList = {
|
||||
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
|
||||
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
|
||||
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
|
||||
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
|
||||
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
|
||||
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))",
|
||||
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))",
|
||||
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))",
|
||||
};
|
||||
private static final int numOfSubTable = 10, numOfRow = 10;
|
||||
|
||||
|
@ -474,21 +483,20 @@ public class ParameterBindingDemo {
|
|||
init(conn);
|
||||
|
||||
bindInteger(conn);
|
||||
|
||||
bindFloat(conn);
|
||||
|
||||
bindBoolean(conn);
|
||||
|
||||
bindBytes(conn);
|
||||
|
||||
bindString(conn);
|
||||
bindVarbinary(conn);
|
||||
bindGeometry(conn);
|
||||
|
||||
clean(conn);
|
||||
conn.close();
|
||||
}
|
||||
|
||||
private static void init(Connection conn) throws SQLException {
|
||||
clean(conn);
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
stmt.execute("drop database if exists test_parabind");
|
||||
stmt.execute("create database if not exists test_parabind");
|
||||
stmt.execute("use test_parabind");
|
||||
for (int i = 0; i < schemaList.length; i++) {
|
||||
|
@ -496,6 +504,11 @@ public class ParameterBindingDemo {
|
|||
}
|
||||
}
|
||||
}
|
||||
private static void clean(Connection conn) throws SQLException {
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
stmt.execute("drop database if exists test_parabind");
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindInteger(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
|
||||
|
@ -674,10 +687,84 @@ public class ParameterBindingDemo {
|
|||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindVarbinary(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable6 tags(?) values(?,?)";
|
||||
|
||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
||||
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t6_" + i);
|
||||
// set tags
|
||||
byte[] bTag = new byte[]{0,2,3,4,5};
|
||||
bTag[0] = (byte) i;
|
||||
pstmt.setTagVarbinary(0, bTag);
|
||||
|
||||
// set columns
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
tsList.add(current + j);
|
||||
pstmt.setTimestamp(0, tsList);
|
||||
|
||||
ArrayList<byte[]> f1List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
byte[] v = new byte[]{0,2,3,4,5,6};
|
||||
v[0] = (byte)j;
|
||||
f1List.add(v);
|
||||
}
|
||||
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
|
||||
|
||||
// add column
|
||||
pstmt.columnDataAddBatch();
|
||||
}
|
||||
// execute
|
||||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindGeometry(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable7 tags(?) values(?,?)";
|
||||
|
||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
||||
|
||||
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
|
||||
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
|
||||
List<byte[]> listGeo = new ArrayList<>();
|
||||
listGeo.add(g1);
|
||||
listGeo.add(g2);
|
||||
|
||||
for (int i = 1; i <= 2; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t7_" + i);
|
||||
// set tags
|
||||
pstmt.setTagGeometry(0, listGeo.get(i - 1));
|
||||
|
||||
// set columns
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
tsList.add(current + j);
|
||||
pstmt.setTimestamp(0, tsList);
|
||||
|
||||
ArrayList<byte[]> f1List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
f1List.add(listGeo.get(i - 1));
|
||||
}
|
||||
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
|
||||
|
||||
// add column
|
||||
pstmt.columnDataAddBatch();
|
||||
}
|
||||
// execute
|
||||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Note**: both setString and setNString require the user to declare the width of the corresponding column in the size parameter of the table definition
|
||||
**Note**: both String and byte[] require the user to declare the width of the corresponding column in the size parameter of the table definition
|
||||
|
||||
The methods to set VALUES columns:
|
||||
|
||||
|
@ -692,6 +779,8 @@ public void setByte(int columnIndex, ArrayList<Byte> list) throws SQLException
|
|||
public void setShort(int columnIndex, ArrayList<Short> list) throws SQLException
|
||||
public void setString(int columnIndex, ArrayList<String> list, int size) throws SQLException
|
||||
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException
|
||||
public void setVarbinary(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
|
||||
public void setGeometry(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
@ -880,6 +969,9 @@ public void setTagFloat(int index, float value)
|
|||
public void setTagDouble(int index, double value)
|
||||
public void setTagString(int index, String value)
|
||||
public void setTagNString(int index, String value)
|
||||
public void setTagJson(int index, String value)
|
||||
public void setTagVarbinary(int index, byte[] value)
|
||||
public void setTagGeometry(int index, byte[] value)
|
||||
```
|
||||
|
||||
### Schemaless Writing
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>3.2.4</version>
|
||||
<version>3.2.7-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<!-- ANCHOR_END: dep-->
|
||||
<dependency>
|
||||
|
|
|
@ -142,8 +142,14 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对
|
|||
| BINARY | byte array |
|
||||
| NCHAR | java.lang.String |
|
||||
| JSON | java.lang.String |
|
||||
| VARBINARY | byte[] |
|
||||
| GEOMETRY | byte[] |
|
||||
|
||||
**注意**:JSON 类型仅在 tag 中支持。
|
||||
**注意**:JSON 类型仅在 tag 中支持。
|
||||
由于历史原因,TDengine中的BINARY底层不是真正的二进制数据,已不建议使用。请用VARBINARY类型代替。
|
||||
GEOMETRY类型是little endian字节序的二进制数据,符合WKB规范。详细信息请参考 [数据类型](/taos-sql/data-type/#数据类型)
|
||||
WKB规范请参考[Well-Known Binary (WKB)](https://libgeos.org/specifications/wkb/)
|
||||
对于java连接器,可以使用jts库来方便的创建GEOMETRY类型对象,序列化后写入TDengine,这里有一个样例[Geometry示例](https://github.com/taosdata/TDengine/blob/3.0/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/GeometryDemo.java)
|
||||
|
||||
## 安装步骤
|
||||
|
||||
|
@ -357,7 +363,7 @@ properties 中的配置参数如下:
|
|||
- TSDBDriver.PROPERTY_KEY_CONFIG_DIR:仅在使用 JDBC 原生连接时生效。客户端配置文件目录路径,Linux OS 上默认值 `/etc/taos`,Windows OS 上默认值 `C:/TDengine/cfg`。
|
||||
- TSDBDriver.PROPERTY_KEY_CHARSET:客户端使用的字符集,默认值为系统字符集。
|
||||
- TSDBDriver.PROPERTY_KEY_LOCALE:仅在使用 JDBC 原生连接时生效。 客户端语言环境,默认值系统当前 locale。
|
||||
- TSDBDriver.PROPERTY_KEY_TIME_ZONE:仅在使用 JDBC 原生连接时生效。 客户端使用的时区,默认值为系统当前时区。
|
||||
- TSDBDriver.PROPERTY_KEY_TIME_ZONE:仅在使用 JDBC 原生连接时生效。 客户端使用的时区,默认值为系统当前时区。因为历史的原因,我们只支持POSIX标准的部分规范,如UTC-8(代表中国上上海), GMT-8,Asia/Shanghai 这几种形式。
|
||||
- TSDBDriver.HTTP_CONNECT_TIMEOUT: 连接超时时间,单位 ms, 默认值为 60000。仅在 REST 连接时生效。
|
||||
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket 超时时间,单位 ms,默认值为 60000。仅在 REST 连接且 batchfetch 设置为 false 时生效。
|
||||
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms, 默认值为 60000。 仅在 REST 连接且 batchfetch 设置为 true 时生效。
|
||||
|
@ -459,13 +465,15 @@ public class ParameterBindingDemo {
|
|||
|
||||
private static final String host = "127.0.0.1";
|
||||
private static final Random random = new Random(System.currentTimeMillis());
|
||||
private static final int BINARY_COLUMN_SIZE = 30;
|
||||
private static final int BINARY_COLUMN_SIZE = 50;
|
||||
private static final String[] schemaList = {
|
||||
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
|
||||
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
|
||||
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
|
||||
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
|
||||
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
|
||||
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))",
|
||||
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))",
|
||||
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))",
|
||||
};
|
||||
private static final int numOfSubTable = 10, numOfRow = 10;
|
||||
|
||||
|
@ -477,21 +485,20 @@ public class ParameterBindingDemo {
|
|||
init(conn);
|
||||
|
||||
bindInteger(conn);
|
||||
|
||||
bindFloat(conn);
|
||||
|
||||
bindBoolean(conn);
|
||||
|
||||
bindBytes(conn);
|
||||
|
||||
bindString(conn);
|
||||
bindVarbinary(conn);
|
||||
bindGeometry(conn);
|
||||
|
||||
clean(conn);
|
||||
conn.close();
|
||||
}
|
||||
|
||||
private static void init(Connection conn) throws SQLException {
|
||||
clean(conn);
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
stmt.execute("drop database if exists test_parabind");
|
||||
stmt.execute("create database if not exists test_parabind");
|
||||
stmt.execute("use test_parabind");
|
||||
for (int i = 0; i < schemaList.length; i++) {
|
||||
|
@ -499,6 +506,11 @@ public class ParameterBindingDemo {
|
|||
}
|
||||
}
|
||||
}
|
||||
private static void clean(Connection conn) throws SQLException {
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
stmt.execute("drop database if exists test_parabind");
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindInteger(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
|
||||
|
@ -677,10 +689,84 @@ public class ParameterBindingDemo {
|
|||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindVarbinary(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable6 tags(?) values(?,?)";
|
||||
|
||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
||||
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t6_" + i);
|
||||
// set tags
|
||||
byte[] bTag = new byte[]{0,2,3,4,5};
|
||||
bTag[0] = (byte) i;
|
||||
pstmt.setTagVarbinary(0, bTag);
|
||||
|
||||
// set columns
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
tsList.add(current + j);
|
||||
pstmt.setTimestamp(0, tsList);
|
||||
|
||||
ArrayList<byte[]> f1List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
byte[] v = new byte[]{0,2,3,4,5,6};
|
||||
v[0] = (byte)j;
|
||||
f1List.add(v);
|
||||
}
|
||||
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
|
||||
|
||||
// add column
|
||||
pstmt.columnDataAddBatch();
|
||||
}
|
||||
// execute
|
||||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindGeometry(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable7 tags(?) values(?,?)";
|
||||
|
||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
||||
|
||||
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
|
||||
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
|
||||
List<byte[]> listGeo = new ArrayList<>();
|
||||
listGeo.add(g1);
|
||||
listGeo.add(g2);
|
||||
|
||||
for (int i = 1; i <= 2; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t7_" + i);
|
||||
// set tags
|
||||
pstmt.setTagGeometry(0, listGeo.get(i - 1));
|
||||
|
||||
// set columns
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
tsList.add(current + j);
|
||||
pstmt.setTimestamp(0, tsList);
|
||||
|
||||
ArrayList<byte[]> f1List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
f1List.add(listGeo.get(i - 1));
|
||||
}
|
||||
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
|
||||
|
||||
// add column
|
||||
pstmt.columnDataAddBatch();
|
||||
}
|
||||
// execute
|
||||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**注**:setString 和 setNString 都要求用户在 size 参数里声明表定义中对应列的列宽
|
||||
**注**:字符串和数组类型都要求用户在 size 参数里声明表定义中对应列的列宽。
|
||||
|
||||
用于设定 VALUES 数据列的取值的方法总共有:
|
||||
|
||||
|
@ -695,6 +781,8 @@ public void setByte(int columnIndex, ArrayList<Byte> list) throws SQLException
|
|||
public void setShort(int columnIndex, ArrayList<Short> list) throws SQLException
|
||||
public void setString(int columnIndex, ArrayList<String> list, int size) throws SQLException
|
||||
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException
|
||||
public void setVarbinary(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
|
||||
public void setGeometry(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
@ -883,6 +971,9 @@ public void setTagFloat(int index, float value)
|
|||
public void setTagDouble(int index, double value)
|
||||
public void setTagString(int index, String value)
|
||||
public void setTagNString(int index, String value)
|
||||
public void setTagJson(int index, String value)
|
||||
public void setTagVarbinary(int index, byte[] value)
|
||||
public void setTagGeometry(int index, byte[] value)
|
||||
```
|
||||
|
||||
### 无模式写入
|
||||
|
|
|
@ -11,13 +11,20 @@
|
|||
|
||||
<properties>
|
||||
<project.assembly.dir>src/main/resources/assembly</project.assembly.dir>
|
||||
<java.version>1.8</java.version>
|
||||
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.taosdata.jdbc</groupId>
|
||||
<artifactId>taos-jdbcdriver</artifactId>
|
||||
<version>3.0.0</version>
|
||||
<version>3.2.7</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.locationtech.jts</groupId>
|
||||
<artifactId>jts-core</artifactId>
|
||||
<version>1.19.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
@ -68,12 +75,12 @@
|
|||
</execution>
|
||||
|
||||
<execution>
|
||||
<id>SubscribeDemo</id>
|
||||
<id>GeometryDemo</id>
|
||||
<configuration>
|
||||
<finalName>SubscribeDemo</finalName>
|
||||
<finalName>GeometryDemo</finalName>
|
||||
<archive>
|
||||
<manifest>
|
||||
<mainClass>com.taosdata.example.SubscribeDemo</mainClass>
|
||||
<mainClass>com.taosdata.example.GeometryDemo</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
<descriptorRefs>
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
package com.taosdata.example;
|
||||
|
||||
import com.taosdata.jdbc.tmq.ConsumerRecord;
|
||||
import com.taosdata.jdbc.tmq.ConsumerRecords;
|
||||
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
|
||||
import com.taosdata.jdbc.tmq.TaosConsumer;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public abstract class ConsumerLoop {
|
||||
private final TaosConsumer<ResultBean> consumer;
|
||||
private final List<String> topics;
|
||||
private final AtomicBoolean shutdown;
|
||||
private final CountDownLatch shutdownLatch;
|
||||
|
||||
public ConsumerLoop() throws SQLException {
|
||||
Properties config = new Properties();
|
||||
config.setProperty("td.connect.type", "jni");
|
||||
config.setProperty("bootstrap.servers", "localhost:6030");
|
||||
config.setProperty("td.connect.user", "root");
|
||||
config.setProperty("td.connect.pass", "taosdata");
|
||||
config.setProperty("auto.offset.reset", "earliest");
|
||||
config.setProperty("msg.with.table.name", "true");
|
||||
config.setProperty("enable.auto.commit", "true");
|
||||
config.setProperty("auto.commit.interval.ms", "1000");
|
||||
config.setProperty("group.id", "group1");
|
||||
config.setProperty("client.id", "1");
|
||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
|
||||
config.setProperty("value.deserializer.encoding", "UTF-8");
|
||||
config.setProperty("experimental.snapshot.enable", "true");
|
||||
|
||||
this.consumer = new TaosConsumer<>(config);
|
||||
this.topics = Collections.singletonList("topic_speed");
|
||||
this.shutdown = new AtomicBoolean(false);
|
||||
this.shutdownLatch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
public abstract void process(ResultBean result);
|
||||
|
||||
public void pollData() throws SQLException {
|
||||
try {
|
||||
consumer.subscribe(topics);
|
||||
|
||||
while (!shutdown.get()) {
|
||||
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
|
||||
for (ConsumerRecord<ResultBean> record : records) {
|
||||
ResultBean bean = record.value();
|
||||
process(bean);
|
||||
}
|
||||
}
|
||||
consumer.unsubscribe();
|
||||
} finally {
|
||||
consumer.close();
|
||||
shutdownLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() throws InterruptedException {
|
||||
shutdown.set(true);
|
||||
shutdownLatch.await();
|
||||
}
|
||||
|
||||
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
|
||||
|
||||
}
|
||||
|
||||
public static class ResultBean {
|
||||
private Timestamp ts;
|
||||
private int speed;
|
||||
|
||||
public Timestamp getTs() {
|
||||
return ts;
|
||||
}
|
||||
|
||||
public void setTs(Timestamp ts) {
|
||||
this.ts = ts;
|
||||
}
|
||||
|
||||
public int getSpeed() {
|
||||
return speed;
|
||||
}
|
||||
|
||||
public void setSpeed(int speed) {
|
||||
this.speed = speed;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,190 @@
|
|||
package com.taosdata.example;
|
||||
|
||||
import com.taosdata.jdbc.TSDBPreparedStatement;
|
||||
import org.locationtech.jts.geom.*;
|
||||
import org.locationtech.jts.io.ByteOrderValues;
|
||||
import org.locationtech.jts.io.ParseException;
|
||||
import org.locationtech.jts.io.WKBReader;
|
||||
import org.locationtech.jts.io.WKBWriter;
|
||||
|
||||
import java.sql.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Properties;
|
||||
|
||||
public class GeometryDemo {
|
||||
private static String host = "localhost";
|
||||
private static final String dbName = "test";
|
||||
private static final String tbName = "weather";
|
||||
private static final String user = "root";
|
||||
private static final String password = "taosdata";
|
||||
|
||||
private Connection connection;
|
||||
|
||||
public static void main(String[] args) throws SQLException {
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
if ("-host".equalsIgnoreCase(args[i]) && i < args.length - 1)
|
||||
host = args[++i];
|
||||
}
|
||||
if (host == null) {
|
||||
printHelp();
|
||||
}
|
||||
GeometryDemo demo = new GeometryDemo();
|
||||
demo.init();
|
||||
demo.createDatabase();
|
||||
demo.useDatabase();
|
||||
demo.dropTable();
|
||||
demo.createTable();
|
||||
|
||||
demo.insert();
|
||||
demo.stmtInsert();
|
||||
demo.select();
|
||||
|
||||
demo.dropTable();
|
||||
demo.close();
|
||||
}
|
||||
|
||||
private void init() {
|
||||
final String url = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password;
|
||||
// get connection
|
||||
try {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("charset", "UTF-8");
|
||||
properties.setProperty("locale", "en_US.UTF-8");
|
||||
properties.setProperty("timezone", "UTC-8");
|
||||
System.out.println("get connection starting...");
|
||||
connection = DriverManager.getConnection(url, properties);
|
||||
if (connection != null)
|
||||
System.out.println("[ OK ] Connection established.");
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private void createDatabase() {
|
||||
String sql = "create database if not exists " + dbName;
|
||||
execute(sql);
|
||||
}
|
||||
|
||||
private void useDatabase() {
|
||||
String sql = "use " + dbName;
|
||||
execute(sql);
|
||||
}
|
||||
|
||||
private void dropTable() {
|
||||
final String sql = "drop table if exists " + dbName + "." + tbName + "";
|
||||
execute(sql);
|
||||
}
|
||||
|
||||
private void createTable() {
|
||||
final String sql = "create table if not exists " + dbName + "." + tbName + " (ts timestamp, temperature float, humidity int, location geometry(50))";
|
||||
execute(sql);
|
||||
}
|
||||
|
||||
private void insert() {
|
||||
final String sql = "insert into " + dbName + "." + tbName + " (ts, temperature, humidity, location) values(now, 20.5, 34, 'POINT(1 2)')";
|
||||
execute(sql);
|
||||
}
|
||||
|
||||
private void stmtInsert() throws SQLException {
|
||||
TSDBPreparedStatement preparedStatement = (TSDBPreparedStatement) connection.prepareStatement("insert into " + dbName + "." + tbName + " values (?, ?, ?, ?)");
|
||||
|
||||
long current = System.currentTimeMillis();
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
tsList.add(current);
|
||||
tsList.add(current + 1);
|
||||
preparedStatement.setTimestamp(0, tsList);
|
||||
ArrayList<Float> tempList = new ArrayList<>();
|
||||
tempList.add(20.1F);
|
||||
tempList.add(21.2F);
|
||||
preparedStatement.setFloat(1, tempList);
|
||||
ArrayList<Integer> humList = new ArrayList<>();
|
||||
humList.add(30);
|
||||
humList.add(31);
|
||||
preparedStatement.setInt(2, humList);
|
||||
|
||||
|
||||
ArrayList<byte[]> list = new ArrayList<>();
|
||||
GeometryFactory gf = new GeometryFactory();
|
||||
Point p1 = gf.createPoint(new Coordinate(1,2));
|
||||
p1.setSRID(1234);
|
||||
|
||||
// NOTE: TDengine current version only support 2D dimension and little endian byte order
|
||||
WKBWriter w = new WKBWriter(2, ByteOrderValues.LITTLE_ENDIAN, true);
|
||||
byte[] wkb = w.write(p1);
|
||||
list.add(wkb);
|
||||
|
||||
Coordinate[] coordinates = { new Coordinate(10, 20),
|
||||
new Coordinate(30, 40)};
|
||||
LineString lineString = gf.createLineString(coordinates);
|
||||
lineString.setSRID(2345);
|
||||
byte[] wkb2 = w.write(lineString);
|
||||
list.add(wkb2);
|
||||
|
||||
preparedStatement.setGeometry(3, list, 50);
|
||||
|
||||
preparedStatement.columnDataAddBatch();
|
||||
preparedStatement.columnDataExecuteBatch();
|
||||
}
|
||||
|
||||
private void select() {
|
||||
final String sql = "select * from " + dbName + "." + tbName;
|
||||
executeQuery(sql);
|
||||
}
|
||||
|
||||
private void close() {
|
||||
try {
|
||||
if (connection != null) {
|
||||
this.connection.close();
|
||||
System.out.println("connection closed.");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private void executeQuery(String sql) {
|
||||
long start = System.currentTimeMillis();
|
||||
try (Statement statement = connection.createStatement()) {
|
||||
ResultSet resultSet = statement.executeQuery(sql);
|
||||
long end = System.currentTimeMillis();
|
||||
printSql(sql, true, (end - start));
|
||||
|
||||
while (resultSet.next()){
|
||||
byte[] result1 = resultSet.getBytes(4);
|
||||
WKBReader reader = new WKBReader();
|
||||
Geometry g1 = reader.read(result1);
|
||||
System.out.println("GEO OBJ: " + g1 + ", SRID: " + g1.getSRID());
|
||||
}
|
||||
|
||||
} catch (SQLException e) {
|
||||
long end = System.currentTimeMillis();
|
||||
printSql(sql, false, (end - start));
|
||||
e.printStackTrace();
|
||||
} catch (ParseException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void printSql(String sql, boolean succeed, long cost) {
|
||||
System.out.println("[ " + (succeed ? "OK" : "ERROR!") + " ] time cost: " + cost + " ms, execute statement ====> " + sql);
|
||||
}
|
||||
|
||||
private void execute(String sql) {
|
||||
long start = System.currentTimeMillis();
|
||||
try (Statement statement = connection.createStatement()) {
|
||||
boolean execute = statement.execute(sql);
|
||||
long end = System.currentTimeMillis();
|
||||
printSql(sql, true, (end - start));
|
||||
} catch (SQLException e) {
|
||||
long end = System.currentTimeMillis();
|
||||
printSql(sql, false, (end - start));
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private static void printHelp() {
|
||||
System.out.println("Usage: java -jar JDBCDemo.jar -host <hostname>");
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,316 @@
|
|||
package com.taosdata.example;
|
||||
|
||||
import com.taosdata.jdbc.TSDBPreparedStatement;
|
||||
import com.taosdata.jdbc.utils.StringUtils;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
public class ParameterBindingDemo {
|
||||
|
||||
private static final String host = "127.0.0.1";
|
||||
private static final Random random = new Random(System.currentTimeMillis());
|
||||
private static final int BINARY_COLUMN_SIZE = 50;
|
||||
private static final String[] schemaList = {
|
||||
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
|
||||
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
|
||||
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
|
||||
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
|
||||
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))",
|
||||
"create table stable6(ts timestamp, f1 varbinary(" + BINARY_COLUMN_SIZE + ")) tags(t1 varbinary(" + BINARY_COLUMN_SIZE + "))",
|
||||
"create table stable7(ts timestamp, f1 geometry(" + BINARY_COLUMN_SIZE + ")) tags(t1 geometry(" + BINARY_COLUMN_SIZE + "))",
|
||||
};
|
||||
private static final int numOfSubTable = 10, numOfRow = 10;
|
||||
|
||||
public static void main(String[] args) throws SQLException {
|
||||
|
||||
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
|
||||
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
|
||||
|
||||
init(conn);
|
||||
|
||||
bindInteger(conn);
|
||||
bindFloat(conn);
|
||||
bindBoolean(conn);
|
||||
bindBytes(conn);
|
||||
bindString(conn);
|
||||
bindVarbinary(conn);
|
||||
bindGeometry(conn);
|
||||
|
||||
clean(conn);
|
||||
conn.close();
|
||||
}
|
||||
|
||||
private static void init(Connection conn) throws SQLException {
|
||||
clean(conn);
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
stmt.execute("create database if not exists test_parabind");
|
||||
stmt.execute("use test_parabind");
|
||||
for (int i = 0; i < schemaList.length; i++) {
|
||||
stmt.execute(schemaList[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
private static void clean(Connection conn) throws SQLException {
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
stmt.execute("drop database if exists test_parabind");
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindInteger(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
|
||||
|
||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
||||
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t1_" + i);
|
||||
// set tags
|
||||
pstmt.setTagByte(0, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
|
||||
pstmt.setTagShort(1, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
|
||||
pstmt.setTagInt(2, random.nextInt(Integer.MAX_VALUE));
|
||||
pstmt.setTagLong(3, random.nextLong());
|
||||
// set columns
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
tsList.add(current + j);
|
||||
pstmt.setTimestamp(0, tsList);
|
||||
|
||||
ArrayList<Byte> f1List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
f1List.add(Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
|
||||
pstmt.setByte(1, f1List);
|
||||
|
||||
ArrayList<Short> f2List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
f2List.add(Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
|
||||
pstmt.setShort(2, f2List);
|
||||
|
||||
ArrayList<Integer> f3List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
f3List.add(random.nextInt(Integer.MAX_VALUE));
|
||||
pstmt.setInt(3, f3List);
|
||||
|
||||
ArrayList<Long> f4List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
f4List.add(random.nextLong());
|
||||
pstmt.setLong(4, f4List);
|
||||
|
||||
// add column
|
||||
pstmt.columnDataAddBatch();
|
||||
}
|
||||
// execute column
|
||||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindFloat(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
|
||||
|
||||
TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class);
|
||||
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t2_" + i);
|
||||
// set tags
|
||||
pstmt.setTagFloat(0, random.nextFloat());
|
||||
pstmt.setTagDouble(1, random.nextDouble());
|
||||
// set columns
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
tsList.add(current + j);
|
||||
pstmt.setTimestamp(0, tsList);
|
||||
|
||||
ArrayList<Float> f1List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
f1List.add(random.nextFloat());
|
||||
pstmt.setFloat(1, f1List);
|
||||
|
||||
ArrayList<Double> f2List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
f2List.add(random.nextDouble());
|
||||
pstmt.setDouble(2, f2List);
|
||||
|
||||
// add column
|
||||
pstmt.columnDataAddBatch();
|
||||
}
|
||||
// execute
|
||||
pstmt.columnDataExecuteBatch();
|
||||
// close if no try-with-catch statement is used
|
||||
pstmt.close();
|
||||
}
|
||||
|
||||
private static void bindBoolean(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable3 tags(?) values(?,?)";
|
||||
|
||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t3_" + i);
|
||||
// set tags
|
||||
pstmt.setTagBoolean(0, random.nextBoolean());
|
||||
// set columns
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
tsList.add(current + j);
|
||||
pstmt.setTimestamp(0, tsList);
|
||||
|
||||
ArrayList<Boolean> f1List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
f1List.add(random.nextBoolean());
|
||||
pstmt.setBoolean(1, f1List);
|
||||
|
||||
// add column
|
||||
pstmt.columnDataAddBatch();
|
||||
}
|
||||
// execute
|
||||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindBytes(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable4 tags(?) values(?,?)";
|
||||
|
||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
||||
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t4_" + i);
|
||||
// set tags
|
||||
pstmt.setTagString(0, new String("abc"));
|
||||
|
||||
// set columns
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
tsList.add(current + j);
|
||||
pstmt.setTimestamp(0, tsList);
|
||||
|
||||
ArrayList<String> f1List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
f1List.add(new String("abc"));
|
||||
}
|
||||
pstmt.setString(1, f1List, BINARY_COLUMN_SIZE);
|
||||
|
||||
// add column
|
||||
pstmt.columnDataAddBatch();
|
||||
}
|
||||
// execute
|
||||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindString(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable5 tags(?) values(?,?)";
|
||||
|
||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
||||
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t5_" + i);
|
||||
// set tags
|
||||
pstmt.setTagNString(0, "California.SanFrancisco");
|
||||
|
||||
// set columns
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
tsList.add(current + j);
|
||||
pstmt.setTimestamp(0, tsList);
|
||||
|
||||
ArrayList<String> f1List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
f1List.add("California.LosAngeles");
|
||||
}
|
||||
pstmt.setNString(1, f1List, BINARY_COLUMN_SIZE);
|
||||
|
||||
// add column
|
||||
pstmt.columnDataAddBatch();
|
||||
}
|
||||
// execute
|
||||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindVarbinary(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable6 tags(?) values(?,?)";
|
||||
|
||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
||||
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t6_" + i);
|
||||
// set tags
|
||||
byte[] bTag = new byte[]{0,2,3,4,5};
|
||||
bTag[0] = (byte) i;
|
||||
pstmt.setTagVarbinary(0, bTag);
|
||||
|
||||
// set columns
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
tsList.add(current + j);
|
||||
pstmt.setTimestamp(0, tsList);
|
||||
|
||||
ArrayList<byte[]> f1List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
byte[] v = new byte[]{0,2,3,4,5,6};
|
||||
v[0] = (byte)j;
|
||||
f1List.add(v);
|
||||
}
|
||||
pstmt.setVarbinary(1, f1List, BINARY_COLUMN_SIZE);
|
||||
|
||||
// add column
|
||||
pstmt.columnDataAddBatch();
|
||||
}
|
||||
// execute
|
||||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindGeometry(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable7 tags(?) values(?,?)";
|
||||
|
||||
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
|
||||
|
||||
byte[] g1 = StringUtils.hexToBytes("0101000000000000000000F03F0000000000000040");
|
||||
byte[] g2 = StringUtils.hexToBytes("0102000020E610000002000000000000000000F03F000000000000004000000000000008400000000000001040");
|
||||
List<byte[]> listGeo = new ArrayList<>();
|
||||
listGeo.add(g1);
|
||||
listGeo.add(g2);
|
||||
|
||||
for (int i = 1; i <= 2; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t7_" + i);
|
||||
// set tags
|
||||
pstmt.setTagGeometry(0, listGeo.get(i - 1));
|
||||
|
||||
// set columns
|
||||
ArrayList<Long> tsList = new ArrayList<>();
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++)
|
||||
tsList.add(current + j);
|
||||
pstmt.setTimestamp(0, tsList);
|
||||
|
||||
ArrayList<byte[]> f1List = new ArrayList<>();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
f1List.add(listGeo.get(i - 1));
|
||||
}
|
||||
pstmt.setGeometry(1, f1List, BINARY_COLUMN_SIZE);
|
||||
|
||||
// add column
|
||||
pstmt.columnDataAddBatch();
|
||||
}
|
||||
// execute
|
||||
pstmt.columnDataExecuteBatch();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,74 +0,0 @@
|
|||
package com.taosdata.example;
|
||||
|
||||
import com.taosdata.jdbc.TSDBConnection;
|
||||
import com.taosdata.jdbc.TSDBDriver;
|
||||
import com.taosdata.jdbc.TSDBResultSet;
|
||||
import com.taosdata.jdbc.TSDBSubscribe;
|
||||
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class SubscribeDemo {
|
||||
private static final String usage = "java -jar SubscribeDemo.jar -host <hostname> -database <database name> -topic <topic> -sql <sql>";
|
||||
|
||||
public static void main(String[] args) {
|
||||
// parse args from command line
|
||||
String host = "", database = "", topic = "", sql = "";
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
if ("-host".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
host = args[++i];
|
||||
}
|
||||
if ("-database".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
database = args[++i];
|
||||
}
|
||||
if ("-topic".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
topic = args[++i];
|
||||
}
|
||||
if ("-sql".equalsIgnoreCase(args[i]) && i < args.length - 1) {
|
||||
sql = args[++i];
|
||||
}
|
||||
}
|
||||
if (host.isEmpty() || database.isEmpty() || topic.isEmpty() || sql.isEmpty()) {
|
||||
System.out.println(usage);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
|
||||
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
|
||||
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
|
||||
final String url = "jdbc:TAOS://" + host + ":6030/" + database + "?user=root&password=taosdata";
|
||||
// get TSDBConnection
|
||||
TSDBConnection connection = (TSDBConnection) DriverManager.getConnection(url, properties);
|
||||
// create TSDBSubscribe
|
||||
TSDBSubscribe sub = connection.subscribe(topic, sql, false);
|
||||
|
||||
int total = 0;
|
||||
while (true) {
|
||||
TSDBResultSet rs = sub.consume();
|
||||
int count = 0;
|
||||
ResultSetMetaData meta = rs.getMetaData();
|
||||
while (rs.next()) {
|
||||
for (int i = 1; i <= meta.getColumnCount(); i++) {
|
||||
System.out.print(meta.getColumnLabel(i) + ": " + rs.getString(i) + "\t");
|
||||
}
|
||||
System.out.println();
|
||||
count++;
|
||||
}
|
||||
total += count;
|
||||
// System.out.printf("%d rows consumed, total %d\n", count, total);
|
||||
if (total >= 10)
|
||||
break;
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
}
|
||||
sub.close(false);
|
||||
connection.close();
|
||||
} catch (Exception e) {
|
||||
System.out.println("host: " + host + ", database: " + database + ", topic: " + topic + ", sql: " + sql);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,170 @@
|
|||
package com.taosdata.example;
|
||||
|
||||
import com.taosdata.jdbc.ws.TSWSPreparedStatement;
|
||||
|
||||
import java.sql.*;
|
||||
import java.util.Random;
|
||||
|
||||
public class WSParameterBindingDemo {
|
||||
private static final String host = "127.0.0.1";
|
||||
private static final Random random = new Random(System.currentTimeMillis());
|
||||
private static final int BINARY_COLUMN_SIZE = 30;
|
||||
private static final String[] schemaList = {
|
||||
"create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
|
||||
"create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
|
||||
"create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
|
||||
"create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE + "))",
|
||||
"create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE + "))"
|
||||
};
|
||||
private static final int numOfSubTable = 10, numOfRow = 10;
|
||||
|
||||
public static void main(String[] args) throws SQLException {
|
||||
|
||||
String jdbcUrl = "jdbc:TAOS-RS://" + host + ":6041/?batchfetch=true";
|
||||
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
|
||||
|
||||
init(conn);
|
||||
|
||||
bindInteger(conn);
|
||||
|
||||
bindFloat(conn);
|
||||
|
||||
bindBoolean(conn);
|
||||
|
||||
bindBytes(conn);
|
||||
|
||||
bindString(conn);
|
||||
|
||||
conn.close();
|
||||
}
|
||||
|
||||
private static void init(Connection conn) throws SQLException {
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
stmt.execute("drop database if exists test_ws_parabind");
|
||||
stmt.execute("create database if not exists test_ws_parabind");
|
||||
stmt.execute("use test_ws_parabind");
|
||||
for (int i = 0; i < schemaList.length; i++) {
|
||||
stmt.execute(schemaList[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindInteger(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
|
||||
|
||||
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
||||
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t1_" + i);
|
||||
// set tags
|
||||
pstmt.setTagByte(1, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
|
||||
pstmt.setTagShort(2, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
|
||||
pstmt.setTagInt(3, random.nextInt(Integer.MAX_VALUE));
|
||||
pstmt.setTagLong(4, random.nextLong());
|
||||
// set columns
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
pstmt.setTimestamp(1, new Timestamp(current + j));
|
||||
pstmt.setByte(2, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
|
||||
pstmt.setShort(3, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
|
||||
pstmt.setInt(4, random.nextInt(Integer.MAX_VALUE));
|
||||
pstmt.setLong(5, random.nextLong());
|
||||
pstmt.addBatch();
|
||||
}
|
||||
pstmt.executeBatch();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindFloat(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
|
||||
|
||||
try(TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
||||
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t2_" + i);
|
||||
// set tags
|
||||
pstmt.setTagFloat(1, random.nextFloat());
|
||||
pstmt.setTagDouble(2, random.nextDouble());
|
||||
// set columns
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
pstmt.setTimestamp(1, new Timestamp(current + j));
|
||||
pstmt.setFloat(2, random.nextFloat());
|
||||
pstmt.setDouble(3, random.nextDouble());
|
||||
pstmt.addBatch();
|
||||
}
|
||||
pstmt.executeBatch();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindBoolean(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable3 tags(?) values(?,?)";
|
||||
|
||||
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t3_" + i);
|
||||
// set tags
|
||||
pstmt.setTagBoolean(1, random.nextBoolean());
|
||||
// set columns
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
pstmt.setTimestamp(1, new Timestamp(current + j));
|
||||
pstmt.setBoolean(2, random.nextBoolean());
|
||||
pstmt.addBatch();
|
||||
}
|
||||
pstmt.executeBatch();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindBytes(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable4 tags(?) values(?,?)";
|
||||
|
||||
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
||||
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t4_" + i);
|
||||
// set tags
|
||||
pstmt.setTagString(1, new String("abc"));
|
||||
|
||||
// set columns
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
pstmt.setTimestamp(1, new Timestamp(current + j));
|
||||
pstmt.setString(2, "abc");
|
||||
pstmt.addBatch();
|
||||
}
|
||||
pstmt.executeBatch();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void bindString(Connection conn) throws SQLException {
|
||||
String sql = "insert into ? using stable5 tags(?) values(?,?)";
|
||||
|
||||
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
|
||||
|
||||
for (int i = 1; i <= numOfSubTable; i++) {
|
||||
// set table name
|
||||
pstmt.setTableName("t5_" + i);
|
||||
// set tags
|
||||
pstmt.setTagNString(1, "California.SanFrancisco");
|
||||
|
||||
// set columns
|
||||
long current = System.currentTimeMillis();
|
||||
for (int j = 0; j < numOfRow; j++) {
|
||||
pstmt.setTimestamp(0, new Timestamp(current + j));
|
||||
pstmt.setNString(1, "California.SanFrancisco");
|
||||
pstmt.addBatch();
|
||||
}
|
||||
pstmt.executeBatch();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue