add exception handling

This commit is contained in:
sheyanjie-qq 2024-07-31 10:09:56 +08:00 committed by gccgdb1234
parent 0f8754c5ea
commit 4223965dba
20 changed files with 766 additions and 1034 deletions

View File

@ -9,17 +9,26 @@ import com.taosdata.jdbc.TSDBDriver;
public class JNIConnectExample {
public static void main(String[] args) throws SQLException {
// use
// String jdbcUrl = "jdbc:TAOS://localhost:6030/dbName?user=root&password=taosdata";
// if you want to connect a specified database named "dbName".
String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
try {
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
System.out.println("Connected");
// you can use the connection for execute SQL here
conn.close();
} catch (SQLException ex) {
// handle any errors
System.out.println("SQLException: " + ex.getMessage());
}
}
}
// use
// String jdbcUrl = "jdbc:TAOS://localhost:6030/dbName?user=root&password=taosdata";
// if you want to connect a specified database named "dbName".

View File

@ -8,9 +8,15 @@ public class RESTConnectExample {
// ANCHOR: main
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(jdbcUrl);
try (Connection conn = DriverManager.getConnection(jdbcUrl)){
System.out.println("Connected");
conn.close();
// you can use the connection for execute SQL here
} catch (SQLException ex) {
// handle any errors
System.out.println("SQLException: " + ex.getMessage());
}
}
// ANCHOR_END: main
}

View File

@ -10,12 +10,21 @@ import java.util.Properties;
public class WSConnectExample {
// ANCHOR: main
public static void main(String[] args) throws SQLException {
// use
// String jdbcUrl = "jdbc:TAOS-RS://localhost:6041/dbName?user=root&password=taosdata";
// if you want to connect a specified database named "dbName".
String jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
try (Connection conn = DriverManager.getConnection(jdbcUrl, connProps)){
System.out.println("Connected");
conn.close();
// you can use the connection for execute SQL here
} catch (SQLException ex) {
// handle any errors
System.out.println("SQLException: " + ex.getMessage());
}
}
// ANCHOR_END: main
}

View File

@ -39,7 +39,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
关键不同点在于:
1. 使用 原生连接,需要保证客户端的驱动程序 taosc 和服务端的 TDengine 版本配套。
2. 使用 REST 连接,用户无需安装客户端驱动程序 taosc具有跨平台易用的优势但是无法体验数据订阅和二进制数据类型等功能。另外与 原生连接 和 Websocket 连接相比REST连接的性能最低。
2. 使用 REST 连接,用户无需安装客户端驱动程序 taosc具有跨平台易用的优势但是无法体验数据订阅和二进制数据类型等功能。另外与 原生连接 和 Websocket 连接相比REST连接的性能最低。REST 接口是无状态的。在使用 REST 连接时,需要在 SQL 中指定表、超级表的数据库名称。
3. 使用 Websocket 连接,用户也无需安装客户端驱动程序 taosc。
4. 连接云服务实例,必须使用 REST 连接 或 Websocket 连接。
@ -263,15 +263,95 @@ phpize && ./configure --enable-swoole && make -j && make install
## 建立连接
在执行这一步之前,请确保有一个正在运行的,且可以访问到的 TDengine而且服务端的 FQDN 配置正确。以下示例代码,都假设 TDengine 安装在本机,且 FQDN默认 localhost 和 serverPort默认 6030 都使用默认配置。
### 连接参数
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
Java 连接器建立连接的参数有 URL 和 properties下面分别详细介绍。
TDengine 的 JDBC URL 规范格式为:
`jdbc:[TAOS|TAOS-RS]://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&charset={charset}|&cfgdir={config_dir}|&locale={locale}|&timezone={timezone}]`
对于建立连接,原生连接与 REST 连接有细微不同。
**注**REST 连接中增加 `batchfetch` 参数并设置为 true将开启 WebSocket 连接。
**注意**:使用 JDBC 原生连接taos-jdbcdriver 需要依赖客户端驱动Linux 下是 libtaos.soWindows 下是 taos.dllmacOS 下是 libtaos.dylib
url 中的配置参数如下:
- user登录 TDengine 用户名,默认值 'root'。
- password用户登录密码默认值 'taosdata'。
- batchfetch: true在执行查询时批量拉取结果集false逐行拉取结果集。默认值为false。逐行拉取结果集使用 HTTP 方式进行数据传输。JDBC REST 连接支持批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTPWebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
- charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。
- batchErrorIgnoretrue在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false不再执行失败 SQL 后的任何语句。默认值为false。
- httpConnectTimeout: 连接超时时间,单位 ms 默认值为 60000。
- httpSocketTimeout: socket 超时时间,单位 ms默认值为 60000。仅在 batchfetch 设置为 false 时生效。
- messageWaitTimeout: 消息超时时间, 单位 ms 默认值为 60000。 仅在 batchfetch 设置为 true 时生效。
- useSSL: 连接中是否使用 SSL。
- httpPoolSize: REST 并发请求大小,默认 20。
**注意**部分配置项比如locale、timezone在 REST 连接中不生效。
除了通过指定的 URL 获取连接,还可以使用 Properties 指定建立连接时的参数。
properties 中的配置参数如下:
- TSDBDriver.PROPERTY_KEY_USER登录 TDengine 用户名,默认值 'root'。
- TSDBDriver.PROPERTY_KEY_PASSWORD用户登录密码默认值 'taosdata'。
- TSDBDriver.PROPERTY_KEY_BATCH_LOAD: true在执行查询时批量拉取结果集false逐行拉取结果集。默认值为false。
- TSDBDriver.PROPERTY_KEY_BATCH_ERROR_IGNOREtrue在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 sq 了。false不再执行失败 SQL 后的任何语句。默认值为false。
- TSDBDriver.PROPERTY_KEY_CONFIG_DIR仅在使用 JDBC 原生连接时生效。客户端配置文件目录路径Linux OS 上默认值 `/etc/taos`Windows OS 上默认值 `C:/TDengine/cfg`
- TSDBDriver.PROPERTY_KEY_CHARSET客户端使用的字符集默认值为系统字符集。
- TSDBDriver.PROPERTY_KEY_LOCALE仅在使用 JDBC 原生连接时生效。 客户端语言环境,默认值系统当前 locale。
- TSDBDriver.PROPERTY_KEY_TIME_ZONE仅在使用 JDBC 原生连接时生效。 客户端使用的时区默认值为系统当前时区。因为历史的原因我们只支持POSIX标准的部分规范如UTC-8(代表中国上上海), GMT-8Asia/Shanghai 这几种形式。
- TSDBDriver.HTTP_CONNECT_TIMEOUT: 连接超时时间,单位 ms 默认值为 60000。仅在 REST 连接时生效。
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket 超时时间,单位 ms默认值为 60000。仅在 REST 连接且 batchfetch 设置为 false 时生效。
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms 默认值为 60000。 仅在 REST 连接且 batchfetch 设置为 true 时生效。
- TSDBDriver.PROPERTY_KEY_USE_SSL: 连接中是否使用 SSL。仅在 REST 连接时生效。
- TSDBDriver.HTTP_POOL_SIZE: REST 并发请求大小,默认 20。
- TSDBDriver.PROPERTY_KEY_ENABLE_COMPRESSION: 传输过程是否启用压缩。仅在使用 REST/Websocket 连接时生效。true: 启用false: 不启用。默认为 false。
- TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: 是否启用自动重连。仅在使用 Websocket 连接时生效。true: 启用false: 不启用。默认为 false。
> **注意**:启用自动重连仅对简单执行 SQL 语句以及 无模式写入、数据订阅有效。对于参数绑定无效。自动重连仅对连接建立时通过参数指定数据库有效,对后面的 `use db` 语句切换数据库无效。
- TSDBDriver.PROPERTY_KEY_RECONNECT_INTERVAL_MS: 自动重连重试间隔,单位毫秒,默认值 2000。仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
- TSDBDriver.PROPERTY_KEY_RECONNECT_RETRY_COUNT: 自动重连重试次数,默认值 3仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
- TSDBDriver.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: 关闭 SSL 证书验证 。仅在使用 Websocket 连接时生效。true: 启用false: 不启用。默认为 false。
**配置参数的优先级:**
通过前面三种方式获取连接,如果配置参数在 url、Properties、客户端配置文件中有重复则参数的`优先级由高到低`分别如下:
1. JDBC URL 参数,如上所述,可以在 JDBC URL 的参数中指定。
2. Properties connProps
3. 使用原生连接时TDengine 客户端驱动的配置文件 taos.cfg
例如:在 url 中指定了 password 为 taosdata在 Properties 中指定了 password 为 taosdemo那么JDBC 会使用 url 中的 password 建立连接。
</TabItem>
<TabItem label="Python" value="python">
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
### Websocket 连接
<Tabs groupId="websocket">
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
```java
{{#include docs/examples/java/src/main/java/com/taos/example/WSConnectExample.java:main}}
```
</TabItem>
<TabItem label="Python" value="python">
@ -305,9 +385,11 @@ phpize && ./configure --enable-swoole && make -j && make install
</Tabs>
### 原生连接
<Tabs groupId="native">
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
<ConnJava />
```java
{{#include docs/examples/java/src/main/java/com/taos/example/JNIConnectExample.java}}
```
</TabItem>
<TabItem label="Python" value="python">
<ConnPythonNative />
@ -334,9 +416,11 @@ phpize && ./configure --enable-swoole && make -j && make install
</Tabs>
### REST 连接
<Tabs groupId="rest">
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
<ConnJava />
```java
{{#include docs/examples/java/src/main/java/com/taos/example/RESTConnectExample.java:main}}
```
</TabItem>
<TabItem label="Python" value="python">
<ConnPythonNative />
@ -367,3 +451,57 @@ phpize && ./configure --enable-swoole && make -j && make install
如果建立连接失败,大部分情况下是 FQDN 或防火墙的配置不正确,详细的排查方法请看[《常见问题及反馈》](https://docs.taosdata.com/train-faq/faq)中的“遇到错误 Unable to establish connection, 我怎么办?”
:::
## 连接池
有些连接器提供了连接池,或者可以与已有的连接池组件配合使用。 使用连接池,应用程序可以快速地从连接池中获取可用连接,避免了每次操作时创建和销毁连接的开销。这不仅减少了资源消耗,还提高了响应速度。此外,连接池还支持对连接的管理,如最大连接数限制、连接的有效性检查,确保了连接的高效和可靠使用。我们推荐使用连接池管理连接。
下面是各语言连接器的连接池支持代码样例。
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
**HikariCP**
使用示例如下:
```java
{{#include examples/JDBC/connectionPools/src/main/java/com/taosdata/example/HikariDemo.java:connection_pool}}
```
> 通过 HikariDataSource.getConnection() 获取连接后,使用完成后需要调用 close() 方法,实际上它并不会关闭连接,只是放回连接池中。
> 更多 HikariCP 使用问题请查看[官方说明](https://github.com/brettwooldridge/HikariCP)。
**Druid**
使用示例如下:
```java
{{#include examples/JDBC/connectionPools/src/main/java/com/taosdata/example/DruidDemo.java:connection_pool}}
```
> 更多 druid 使用问题请查看[官方说明](https://github.com/alibaba/druid)。
</TabItem>
<TabItem label="Python" value="python">
<ConnPythonNative />
</TabItem>
<TabItem label="Go" value="go">
<ConnGo />
</TabItem>
<TabItem label="Rust" value="rust">
<ConnRust />
</TabItem>
<TabItem label="C#" value="csharp">
<ConnCSNative />
</TabItem>
<TabItem label="R" value="r">
<ConnR/>
</TabItem>
<TabItem label="C" value="c">
<ConnC />
</TabItem>
<TabItem label="PHP" value="php">
<ConnPHP />
</TabItem>
</Tabs>

View File

@ -7,75 +7,100 @@ toc_max_heading_level: 4
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
上一节我们介绍了如何建立连接,本节以 WebSocket 连接为例,使用各种语言连接器执行 SQL 完成写入。
TDengine 对 SQL 语言提供了全面的支持,允许用户以熟悉的 SQL 语法进行数据的查询、插入和删除操作。 TDengine 的 SQL 还支持对数据库和数据表的管理操作如创建、修改和删除数据库及数据表。TDengine 扩展了标准 SQL引入了时序数据处理特有的功能如时间序列数据的聚合查询、降采样、插值查询等以适应时序数据的特点。这些扩展使得用户可以更高效地处理时间序列数据进行复杂的数据分析和处理。 具体支持的 SQL 语法请参考 [TDengine SQL](../../reference/taos-sql/)
下面介绍使用各语言连接器通过执行 SQL 完成建库、建表、写入数据和查询数据。
## 建库和表
以智能电表为例,展示如何使用连接器执行 SQL 来创建数据库和表。
<Tabs defaultValue="java" groupId="create">
<TabItem value="java" label="Java">
```java
// create statement
Statement stmt = conn.createStatement();
// create database
stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
// use database
stmt.executeUpdate("USE power");
// create table
stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcCreatDBDemo.java:create_db_and_table}}
```
> **注意**:如果不使用 `USE power` 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 power.meters。
</TabItem>
<TabItem label="Python" value="python">
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
## 插入数据
以智能电表为例,展示如何使用连接器执行 SQL 来插入数据。
<Tabs defaultValue="java" groupId="insert">
<TabItem value="java" label="Java">
```java
// insert data
String insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ";
int affectedRows = stmt.executeUpdate(insertQuery);
System.out.println("insert " + affectedRows + " rows.");
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcInsertDataDemo.java:insert_data}}
```
</TabItem>
</Tabs>
**Note**
NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW + 1s 代表客户端当前时间往后加 1 秒数字后面代表时间单位a毫秒smh小时dwny
## 查询数据
<Tabs defaultValue="java" groupId="query">
<TabItem value="java" label="Java">
```java
// query data
ResultSet resultSet = stmt.executeQuery("SELECT * FROM meters");
Timestamp ts;
float current;
String location;
while(resultSet.next()) {
ts = resultSet.getTimestamp(1);
current = resultSet.getFloat(2);
location = resultSet.getString("location");
System.out.printf("%s, %f, %s\n", ts, current, location);
}
```
</TabItem>
<TabItem label="Python" value="python">
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
## 查询数据
以智能电表为例,展示如何使用各语言连接器执行 SQL 来查询数据,并将获取到的结果打印出来。
<Tabs defaultValue="java" groupId="query">
<TabItem label="Java" value="java">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcQueryDemo.java:query_data}}
```
**Note** 查询和操作关系型数据库一致,使用下标获取返回字段内容时从 1 开始,建议使用字段名称获取。
</TabItem>
<TabItem label="Python" value="python">
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
## 执行带有 reqId 的 SQL
reqId 可用于请求链路追踪reqId 就像分布式系统中的 traceId 作用一样。一个请求可能需要经过多个服务或者模块才能完成。reqId 用于标识和关联这个请求的所有相关操作,以便于我们可以追踪和分析请求的完整路径。
@ -87,20 +112,28 @@ reqId 可用于请求链路追踪reqId 就像分布式系统中的 traceId
如果用户不设置 reqId连接器会在内部随机生成一个但建议由显式用户设置以以更好地跟用户请求关联起来。
下面是各语言连接器设置 reqId 执行 SQL 的代码样例。
<Tabs defaultValue="java" groupId="query">
<TabItem value="java" label="Java">
<TabItem label="Java" value="java">
```java
AbstractStatement aStmt = (AbstractStatement) connection.createStatement();
aStmt.execute("CREATE DATABASE IF NOT EXISTS power", 1L);
aStmt.executeUpdate("USE power", 2L);
try (ResultSet rs = aStmt.executeQuery("SELECT * FROM meters limit 1", 3L)) {
while(rs.next()){
Timestamp timestamp = rs.getTimestamp(1);
System.out.println("timestamp = " + timestamp);
}
}
aStmt.close();
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcReqIdDemo.java:with_reqid}}
```
</TabItem>
<TabItem label="Python" value="python">
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>

View File

@ -40,7 +40,7 @@ tag_set 中的所有的数据自动转化为 nchar 数据类型,并不需要
- 对于空格、等号(=)、逗号(,)、双引号(")、反斜杠(\),前面需要使用反斜杠(\)进行转义(均为英文半角符号)。无模式写入协议的域转义规则如下表所示。
| **序号** | **域** | **需转义字符** |
| -------- | ----------- | ----------------------------- |
| -------- | -------- | ---------------- |
| 1 | 超级表名 | 逗号,空格 |
| 2 | 标签名 | 逗号,等号,空格 |
| 3 | 标签值 | 逗号,等号,空格 |
@ -50,7 +50,7 @@ tag_set 中的所有的数据自动转化为 nchar 数据类型,并不需要
如果使用两个连续的反斜杠则第1个反斜杠作为转义符当只有一个反斜杠时则无须转义。无模式写入协议的反斜杠转义规则如下表所示。
| **序号** | **反斜杠** | **转义为** |
| -------- | ----------- | ----------------------------- |
| -------- | ------------ | ---------- |
| 1 | \ | \ |
| 2 | \\\\ | \ |
| 3 | \\\\\\ | \\\\ |
@ -151,46 +151,74 @@ st,t1=3,t2=4,t3=t3 c1=3i64,c6="passit" 1626006833640000000
第二行数据相对于第一行来说增加了一个列 c6类型为 binary(6)。那么此时会自动增加一个列 c6 类型为 binary(6)。
## 无模式写入示例
下面以智能电表为例,介绍各语言连接器使用无模式写入接口写入数据的代码样例,包含了三种协议: InfluxDB 的行协议、OpenTSDB 的 TELNET 行协议和 OpenTSDB 的 JSON 格式协议。
### Websocket 连接
<Tabs defaultValue="java" groupId="schemaless">
<TabItem value="java" label="Java">
```java
public class SchemalessWsTest {
private static final String host = "127.0.0.1";
private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000";
private static final String telnetDemo = "stb0_0 1707095283260 4 host=host0 interface=eth0";
private static final String jsonDemo = "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
public static void main(String[] args) throws SQLException {
final String url = "jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata&batchfetch=true";
try(Connection connection = DriverManager.getConnection(url)){
init(connection);
try(SchemalessWriter writer = new SchemalessWriter(connection, "power")){
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS);
writer.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
writer.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS);
}
}
}
private static void init(Connection connection) throws SQLException {
try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
}
}
}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessWsTest.java:schemaless}}
```
执行带有 reqId 的无模式写入,此 reqId 可用于请求链路追踪。
```java
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS, 1L);
```
</TabItem>
<TabItem label="Python" value="python">
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
### 原生连接
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessJniTest.java:schemaless}}
```
执行带有 reqId 的无模式写入,此 reqId 可用于请求链路追踪。
```java
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS, 1L);
```
</TabItem>
<TabItem label="Python" value="python">
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
## 查询写入的数据
运行上节的样例代码,会在 power 数据库中自动建表,我们可以通过 taos shell 或者应用程序来查询数据。下面给出用 taos shell 查询超级表和 meters 表数据的样例。

View File

@ -7,64 +7,23 @@ toc_max_heading_level: 4
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
通过参数绑定方式写入数据时能避免SQL语法解析的资源消耗从而显著提升写入性能。示例代码如下。
通过参数绑定方式写入数据时能避免SQL语法解析的资源消耗从而显著提升写入性能。参数绑定能提高写入效率的原因主要有以下几点:
## Websocket
<Tabs defaultValue="java" groupId="websocket">
- 减少解析时间通过参数绑定SQL 语句的结构在第一次执行时就已经确定,后续的执行只需要替换参数值,这样可以避免每次执行时都进行语法解析,从而减少解析时间。
- 预编译当使用参数绑定时SQL 语句可以被预编译并缓存,后续使用不同的参数值执行时,可以直接使用预编译的版本,提高执行效率。
- 减少网络开销:参数绑定还可以减少发送到数据库的数据量,因为只需要发送参数值而不是完整的 SQL 语句,特别是在执行大量相似的插入或更新操作时,这种差异尤为明显。
下面我们继续以智能电表为例,展示各语言连接器使用参数绑定高效写入的功能。
## Websocket 连接
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
```java
public class WSParameterBindingBasicDemo {
// modify host to your own
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://" + host + ":6041/?batchfetch=true";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
init(conn);
String sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("d_bind_" + i);
// set tags
pstmt.setTagInt(0, i);
pstmt.setTagString(1, "location_" + i);
// set columns
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setFloat(2, random.nextFloat() * 30);
pstmt.setInt(3, random.nextInt(300));
pstmt.setFloat(4, random.nextFloat());
pstmt.addBatch();
}
pstmt.executeBatch();
}
}
conn.close();
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
stmt.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
}
}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingBasicDemo.java:para_bind}}
```
这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingFullDemo.java)
</TabItem>
<TabItem label="Python" value="python">
@ -98,10 +57,16 @@ public class WSParameterBindingBasicDemo {
</TabItem>
</Tabs>
## 原生
<Tabs defaultValue="java" groupId="native">
## 原生连接
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java:para_bind}}
```
这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingFullDemo.java)
</TabItem>
<TabItem label="Python" value="python">

View File

@ -7,129 +7,84 @@ toc_max_heading_level: 4
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
TDengine提供了类似Kafka的数据订阅功能。本章以 WebSocket 连接方式为例介绍数据订阅的相关API以及使用方法。
TDengine 提供了类似于消息队列产品的数据订阅和消费接口。在许多场景中采用TDengine 的时序大数据平台无须再集成消息队列产品从而简化应用程序设计并降低运维成本。本章介绍各语言连接器数据订阅的相关API以及使用方法。 数据订阅的基础知识请参考 [数据订阅](../../advanced/subscription/)
## 创建主题
请用 taos shell 或者 参考 [执行 SQL](../sql/) 章节用程序执行创建主题的 SQL`CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters`
创建主题的示例代码如下。
### Websocket
<Tabs defaultValue="java" groupId="websocketCreateTopic">
<TabItem value="java" label="Java">
```java
Connection connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
```
</TabItem>
<TabItem label="Python" value="python">
```python
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
```
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="Node.js" value="node">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
### 原生
<Tabs defaultValue="java" groupId="nativeCreateTopic">
<TabItem value="java" label="Java">
```java
Connection connection = DriverManager.getConnection(url, properties);
Statement statement = connection.createStatement();
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
```
</TabItem>
<TabItem label="Python" value="python">
```python
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
```
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
上述代码将使用SQL“select ts, current, voltage, phase, groupId, location from meters”创建一个名为topic_meters的订阅。使用该订阅所获取的消息中的每条记录都由该查询语句所选择的列组成。
上述 SQL 将创建一个名为 topic_meters 的订阅。使用该订阅所获取的消息中的每条记录都由此查询语句 `SELECT ts, current, voltage, phase, groupid, location FROM meters` 所选择的列组成。
**注意**
在TDengine中对于订阅查询有以下限制。
在 TDengine 连接器实现中,对于订阅查询,有以下限制。
- 查询语句限制:订阅查询只能使用 select 语句不支持其他类型的SQL如 insert、update或delete等。
- 始数据查询:订阅查询只能查询原始数据,而不能查询聚合或计算结果。
- 原始始数据查询:订阅查询只能查询原始数据,而不能查询聚合或计算结果。
- 时间顺序限制:订阅查询只能按照时间正序查询数据。
## 创建消费者
### Websocket
<Tabs defaultValue="java" groupId="websocketCreateConsumer">
TDengine 消费者的概念跟 Kafka 类似,是指使用 TDengine 数据库进行数据订阅的客户端应用程序。它通过订阅特定的主题来接收数据流,这些主题由 SQL 查询定义。消费者可以配置多种参数如连接方式、服务器地址、自动提交偏移量、消费者组标识、反序列化方法等以适应不同的数据处理需求。TDengine 支持通过 JNI 或 WebSocket 连接,提供了灵活的数据接收方式。此外,消费者还支持自动重连和数据传输压缩等高级功能,以确保数据的高效和稳定接收。
### 创建参数
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
Java 连接器创建消费者的参数为 Properties 可以设置如下参数:
- td.connect.type: 连接方式。jni表示使用动态库连接的方式ws/WebSocket表示使用 WebSocket 进行数据通信。默认为 jni 方式。
- bootstrap.servers: TDengine 服务端所在的`ip:port`,如果使用 WebSocket 连接,则为 taosAdapter 所在的`ip:port`。
- enable.auto.commit: 是否允许自动提交。
- group.id: consumer: 所在的 group。
- value.deserializer: 结果集反序列化方法,可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean实现反序列化。也可以继承 `com.taosdata.jdbc.tmq.Deserializer`,根据 SQL 的 resultSet 自定义反序列化方式。
- httpConnectTimeout: 创建连接超时参数,单位 ms默认为 5000 ms。仅在 WebSocket 连接下有效。
- messageWaitTimeout: 数据传输超时参数,单位 ms默认为 10000 ms。仅在 WebSocket 连接下有效。
- httpPoolSize: 同一个连接下最大并行请求数。仅在 WebSocket 连接下有效。
- TSDBDriver.PROPERTY_KEY_ENABLE_COMPRESSION: 传输过程是否启用压缩。仅在使用 Websocket 连接时生效。true: 启用false: 不启用。默认为 false。
- TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: 是否启用自动重连。仅在使用 Websocket 连接时生效。true: 启用false: 不启用。默认为 false。
- TSDBDriver.PROPERTY_KEY_RECONNECT_INTERVAL_MS: 自动重连重试间隔,单位毫秒,默认值 2000。仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
- TSDBDriver.PROPERTY_KEY_RECONNECT_RETRY_COUNT: 自动重连重试次数,默认值 3仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
其他参数请参考:[Consumer 参数列表](../../develop/tmq/#数据订阅相关参数) 注意TDengine服务端自 3.2.0.0 版本开始消息订阅中的 auto.offset.reset 默认值发生变化。
:::note
- Java 连接器数据订阅 WebSocket 连接方式跟 原生连接方式,除了在创建消费者时参数不同之外,其他接口并无区别。因此我们以 Websocket 连接方式为例介绍数据订阅的其他功能。
:::
</TabItem>
<TabItem label="Python" value="python">
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="Node.js" value="node">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
### Websocket 连接
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
```java
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
this.consumer = new TaosConsumer<(config);
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java:create_consumer}}
```
</TabItem>
@ -170,25 +125,16 @@ this.consumer = new TaosConsumer<(config);
</Tabs>
### 原生
<Tabs defaultValue="java" groupId="NativeCreateConsumer">
### 原生连接
<Tabs groupId="lang">
<TabItem value="java" label="Java">
```java
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
this.consumer = new TaosConsumer<(config);
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:create_consumer}}
```
</TabItem>
<TabItem label="Python" value="python">
@ -223,30 +169,20 @@ this.consumer = new TaosConsumer<(config);
</TabItem>
</Tabs>
相关参数说明如下:
1. td.connect.type 连接方式。jni表示使用动态库连接的方式ws/WebSocket表示使用 WebSocket 进行数据通信。默认为 jni 方式。
2. bootstrap.servers TDengine 服务端所在的ip:port如果使用 WebSocket 连接,则为 taosAdapter 所在的ip:port。
3. auto.offset.reset消费组订阅的初始位置earliest 从头开始订阅; latest 仅从最新数据开始订阅。
4. enable.auto.commit 是否允许自动提交。
5. group.id consumer: 所在的 group。
6. value.deserializer 结果集反序列化方法,可以继承 com.taosdata.jdbc.tmq.ReferenceDeserializer并指定结果集 bean实现反序列化。也可以继承 com.taosdata.jdbc.tmq.Deserializer根据 SQL 的 resultSet 自定义反序列化方式。
## 订阅消费数据
订阅消费数据的示例代码如下
### Websocket
<Tabs defaultValue="java" groupId="websocketPoll">
消费者订阅主题后,可以开始接收并处理这些主题中的消息。订阅消费数据的示例代码如下:
### Websocket 连接
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
```java
while (!shutdown.get()) {
ConsumerRecords<ResultBean records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean record : records) {
ResultBean bean = record.value();
process(bean);
}
}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:poll_data_code_piece}}
```
- `subscribe` 方法的参数含义如为:订阅的主题列表(即名称),支持同时订阅多个主题。
- `poll` 每次调用获取一个消息,一个消息中可能包含多个记录。
- `ResultBean` 是我们自定义的一个内部类,其字段名和数据类型与列名和数据类型一一对应,这样根据 `value.deserializer` 属性对应的反序列化类可以反序列化出 `ResultBean` 类型的对象。
</TabItem>
<TabItem label="Python" value="python">
@ -285,19 +221,12 @@ while (!shutdown.get()) {
</TabItem>
</Tabs>
### 原生
<Tabs defaultValue="java" groupId="nativePoll">
### 原生连接
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
```java
while (!shutdown.get()) {
ConsumerRecords<ResultBean records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean record : records) {
ResultBean bean = record.value();
process(bean);
}
}
```
同 Websocket 代码样例。
</TabItem>
<TabItem label="Python" value="python">
@ -332,11 +261,11 @@ while (!shutdown.get()) {
</TabItem>
</Tabs>
poll 每次调用获取一个消息,一个消息中可能有多个记录,需要循环处理。
## 指定订阅的 Offset
### Websocket
<Tabs defaultValue="java" groupId="websocketSeek">
消费者可以指定从特定 Offset 开始读取分区中的消息,这允许消费者重读消息或跳过已处理的消息。本节展示各语言连接器如何指定订阅的 Offset。
### Websocket 连接
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
```java
@ -355,6 +284,13 @@ void seek(TopicPartition partition, long offset) throws SQLException;
void seekToBeginning(Collection<TopicPartition partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition partitions) throws SQLException;
```
示例代码:
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java:consumer_seek}}
```
</TabItem>
<TabItem label="Python" value="python">
@ -393,26 +329,13 @@ void seekToEnd(Collection<TopicPartition partitions) throws SQLException;
</TabItem>
</Tabs>
### 原生
<Tabs defaultValue="java" groupId="nativeSeek">
### 原生连接
<Tabs groupId="lang">
<TabItem value="java" label="Java">
```java
// 获取订阅的 topicPartition
Set<TopicPartition assignment() throws SQLException;
同 Websocket 代码样例。
// 获取 offset
long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long position(String topic) throws SQLException;
Map<TopicPartition, Long beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long endOffsets(String topic) throws SQLException;
Map<TopicPartition, OffsetAndMetadata committed(Set<TopicPartition partitions) throws SQLException;
// 指定下一次 poll 中使用的 offset
void seek(TopicPartition partition, long offset) throws SQLException;
void seekToBeginning(Collection<TopicPartition partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition partitions) throws SQLException;
```
</TabItem>
<TabItem label="Python" value="python">
@ -449,10 +372,10 @@ void seekToEnd(Collection<TopicPartition partitions) throws SQLException;
## 提交 Offset
`enable.auto.commit` 为 false 时,可以手动提交 offset。
### Websocket
<Tabs defaultValue="java" groupId="websocketCommit">
当消费者读取并处理完消息后,它可以提交 Offset这表示消费者已经成功处理到这个 Offset 的消息。Offset 提交可以是自动的(根据配置定期提交)或手动的(应用程序控制何时提交)。
创建消费者时,属性 `enable.auto.commit` 为 false 时,可以手动提交 offset。
### Websocket 连接
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
```java
@ -501,17 +424,13 @@ void commitAsync(Map<TopicPartition, OffsetAndMetadata offsets, OffsetCommitCall
</Tabs>
### 原生
<Tabs defaultValue="java" groupId="nativeCommit">
### 原生连接
<Tabs groupId="lang">
<TabItem value="java" label="Java">
```java
void commitSync() throws SQLException;
void commitSync(Map<TopicPartition, OffsetAndMetadata offsets) throws SQLException;
// 异步提交仅在 native 连接下有效
void commitAsync(OffsetCommitCallback<V callback) throws SQLException;
void commitAsync(Map<TopicPartition, OffsetAndMetadata offsets, OffsetCommitCallback<V callback) throws SQLException;
```
同 Websocket 代码样例。
</TabItem>
<TabItem label="Python" value="python">
@ -553,64 +472,156 @@ void commitAsync(Map<TopicPartition, OffsetAndMetadata offsets, OffsetCommitCall
## 取消订阅和关闭消费
### Websocket
<Tabs defaultValue="java" groupId="websocketClose">
消费者可以取消对主题的订阅,停止接收消息。当消费者不再需要时,应该关闭消费者实例,以释放资源和断开与 TDengine 服务器的连接。
### Websocket 连接
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
```java
// 取消订阅
consumer.unsubscribe();
// 关闭消费
consumer.close()
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:unsubscribe_data_code_piece}}
```
</TabItem>
<TabItem label="Python" value="python">
```python
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
```
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="Node.js" value="node">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
### 原生连接
<Tabs groupId="lang">
<TabItem value="java" label="Java">
同 Websocket 代码样例。
</TabItem>
<TabItem label="Python" value="python">
```python
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
```
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="Node.js" value="node">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
## 完整示例
### Websocket 连接
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java:consumer_demo}}
```
**注意**:这里的 value.deserializer 配置参数值应该根据测试环境的包路径做相应的调整。
</TabItem>
<TabItem label="Python" value="python">
```python
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
```
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="Node.js" value="node">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
### 原生连接
<Tabs groupId="lang">
<TabItem value="java" label="Java">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoopFull.java:consumer_demo}}
```
</TabItem>
<TabItem label="Python" value="python">
```python
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
```
</TabItem>
<TabItem label="Go" value="go">
</TabItem>
<TabItem label="Rust" value="rust">
</TabItem>
<TabItem label="Node.js" value="node">
</TabItem>
<TabItem label="C#" value="csharp">
</TabItem>
<TabItem label="R" value="r">
</TabItem>
<TabItem label="C" value="c">
</TabItem>
<TabItem label="PHP" value="php">
</TabItem>
</Tabs>
### 原生
<Tabs defaultValue="java" groupId="nativeClose">
<TabItem value="java" label="Java">
```java
// 取消订阅
consumer.unsubscribe();
// 关闭消费
consumer.close()
```
</TabItem>
<TabItem label="Python" value="python">

View File

@ -19,14 +19,6 @@ TDengine 的 JDBC 驱动实现尽可能与关系型数据库驱动保持一致
:::
## 连接方式
`taos-jdbcdriver`主要提供三种形式的连接器。一般我们推荐使用 **Websocket 连接**。
- **原生连接**,通过 TDengine 客户端驱动程序taosc原生连接 TDengine 实例支持数据写入、查询、数据订阅、schemaless 接口和参数绑定接口等功能。
- **REST 连接**,通过 taosAdapter 提供的 HTTP 接口连接 TDengine 实例,不支持 schemaless 和数据订阅等特性。
- **Websocket 连接**,通过 taosAdapter 提供的 Websocket 接口连接 TDengine 实例WebSocket 连接实现的功能集合和原生连接有少量不同。
连接方式的详细介绍请参考:[连接器建立连接的方式](../../develop/connect/#连接器建立连接的方式)
## JDBC 和 JRE 兼容性
- JDBC: 支持 JDBC 4.2 版本,部分功能如无模式写入和数据订阅单独提供
@ -151,517 +143,7 @@ GEOMETRY类型是little endian字节序的二进制数据符合WKB规范。
WKB规范请参考[Well-Known Binary (WKB)](https://libgeos.org/specifications/wkb/)
对于java连接器可以使用jts库来方便的创建GEOMETRY类型对象序列化后写入TDengine这里有一个样例[Geometry示例](https://github.com/taosdata/TDengine/blob/3.0/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/GeometryDemo.java)
## 安装步骤
### 安装前准备
使用 Java Connector 连接数据库前,需要具备以下条件:
- 已安装 Java 1.8 或以上版本运行时环境和 Maven 3.6 或以上版本
- 已安装 TDengine 客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装),具体步骤请参考[安装客户端驱动](../#安装客户端驱动)
### 安装连接器
<Tabs defaultValue="maven">
<TabItem value="maven" label="使用 Maven 安装">
目前 taos-jdbcdriver 已经发布到 [Sonatype Repository](https://search.maven.org/artifact/com.taosdata.jdbc/taos-jdbcdriver) 仓库,且各大仓库都已同步。
- [sonatype](https://search.maven.org/artifact/com.taosdata.jdbc/taos-jdbcdriver)
- [mvnrepository](https://mvnrepository.com/artifact/com.taosdata.jdbc/taos-jdbcdriver)
- [maven.aliyun](https://maven.aliyun.com/mvn/search)
Maven 项目中,在 pom.xml 中添加以下依赖:
```xml-dtd
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.3.0</version>
</dependency>
```
</TabItem>
<TabItem value="source" label="使用源码编译安装">
可以通过下载 TDengine 的源码,自己编译最新版本的 Java connector
```shell
git clone https://github.com/taosdata/taos-connector-jdbc.git
cd taos-connector-jdbc
mvn clean install -Dmaven.test.skip=true
```
编译后,在 target 目录下会产生 taos-jdbcdriver-3.2.\*-dist.jar 的 jar 包,并自动将编译的 jar 文件放在本地的 Maven 仓库中。
</TabItem>
</Tabs>
## 建立连接
TDengine 的 JDBC URL 规范格式为:
`jdbc:[TAOS|TAOS-RS]://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&charset={charset}|&cfgdir={config_dir}|&locale={locale}|&timezone={timezone}]`
对于建立连接,原生连接与 REST 连接有细微不同。
**注**REST 连接中增加 `batchfetch` 参数并设置为 true将开启 WebSocket 连接。
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">
```java
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://taosdemo.com:6030/power?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(jdbcUrl);
```
以上示例,使用了 JDBC 原生连接的 TSDBDriver建立了到 hostname 为 taosdemo.com端口为 6030TDengine 的默认端口),数据库名为 power 的连接。这个 URL
中指定用户名user为 root密码password为 taosdata。
**注意**:使用 JDBC 原生连接taos-jdbcdriver 需要依赖客户端驱动Linux 下是 libtaos.soWindows 下是 taos.dllmacOS 下是 libtaos.dylib
url 中的配置参数如下:
- user登录 TDengine 用户名,默认值 'root'。
- password用户登录密码默认值 'taosdata'。
- cfgdir客户端配置文件目录路径Linux OS 上默认值 `/etc/taos`Windows OS 上默认值 `C:/TDengine/cfg`。
- charset客户端使用的字符集默认值为系统字符集。
- locale客户端语言环境默认值系统当前 locale。
- timezone客户端使用的时区默认值为系统当前时区。
- batchfetch: true在执行查询时批量拉取结果集false逐行拉取结果集。默认值为true。开启批量拉取同时获取一批数据在查询数据量较大时批量拉取可以有效的提升查询性能。
- batchErrorIgnoretrue在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败将继续执行下面的 SQL。false不再执行失败 SQL 后的任何语句。默认值为false。
JDBC 原生连接的使用请参见[视频教程](https://www.taosdata.com/blog/2020/11/11/1955.html)。
**使用 TDengine 客户端驱动配置文件建立连接 **
当使用 JDBC 原生连接连接 TDengine 集群时,可以使用 TDengine 客户端驱动配置文件,在配置文件中指定集群的 firstEp、secondEp 等参数。如下所示:
1. 在 Java 应用中不指定 hostname 和 port
```java
public Connection getConn() throws Exception{
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://:/power?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
return conn;
}
```
2. 在配置文件中指定 firstEp 和 secondEp
```shell
# first fully qualified domain name (FQDN) for TDengine system
firstEp cluster_node1:6030
# second fully qualified domain name (FQDN) for TDengine system, for cluster only
secondEp cluster_node2:6030
# default system charset
# charset UTF-8
# system locale
# locale en_US.UTF-8
```
以上示例jdbc 会使用客户端的配置文件,建立到 hostname 为 cluster_node1、端口为 6030、数据库名为 power 的连接。当集群中 firstEp 节点失效时JDBC 会尝试使用 secondEp 连接集群。
TDengine 中,只要保证 firstEp 和 secondEp 中一个节点有效,就可以正常建立到集群的连接。
> **注意**:这里的配置文件指的是调用 JDBC Connector 的应用程序所在机器上的配置文件Linux OS 上默认值 /etc/taos/taos.cfg Windows OS 上默认值 C://TDengine/cfg/taos.cfg。
</TabItem>
<TabItem value="rest" label="REST 连接">
```java
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/power?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(jdbcUrl);
```
以上示例,使用了 JDBC REST 连接的 RestfulDriver建立了到 hostname 为 taosdemo.com端口为 6041数据库名为 power 的连接。这个 URL 中指定用户名user为 root密码password为 taosdata。
使用 JDBC REST 连接,不需要依赖客户端驱动。与 JDBC 原生连接相比,仅需要:
1. driverClass 指定为“com.taosdata.jdbc.rs.RestfulDriver”
2. jdbcUrl 以“jdbc:TAOS-RS://”开头;
3. 使用 6041 作为连接端口。
url 中的配置参数如下:
- user登录 TDengine 用户名,默认值 'root'。
- password用户登录密码默认值 'taosdata'。
- batchfetch: true在执行查询时批量拉取结果集false逐行拉取结果集。默认值为false。逐行拉取结果集使用 HTTP 方式进行数据传输。JDBC REST 连接支持批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTPWebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
- charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。
- batchErrorIgnoretrue在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false不再执行失败 SQL 后的任何语句。默认值为false。
- httpConnectTimeout: 连接超时时间,单位 ms 默认值为 60000。
- httpSocketTimeout: socket 超时时间,单位 ms默认值为 60000。仅在 batchfetch 设置为 false 时生效。
- messageWaitTimeout: 消息超时时间, 单位 ms 默认值为 60000。 仅在 batchfetch 设置为 true 时生效。
- useSSL: 连接中是否使用 SSL。
- httpPoolSize: REST 并发请求大小,默认 20。
**注意**部分配置项比如locale、timezone在 REST 连接中不生效。
:::note
- 与原生连接方式不同REST 接口是无状态的。在使用 JDBC REST 连接时,需要在 SQL 中指定表、超级表的数据库名称。例如:
```sql
INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW, 10.30000, 219, 0.31000);
```
- 如果在 url 中指定了 dbname那么JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url在 SQL 中不需要指定 dbname。例如url 为 jdbc:TAOS-RS://127.0.0.1:6041/power那么可以执行 sqlINSERT INTO d1001 USING meters TAGS(2,'California.SanFrancisco') VALUES (NOW, 10.30000, 219, 0.31000);
:::
</TabItem>
</Tabs>
### 指定 URL 和 Properties 获取连接
除了通过指定的 URL 获取连接,还可以使用 Properties 指定建立连接时的参数。
**注意**
- 应用中设置的 client parameter 为进程级别的,即如果要更新 client 的参数,需要重启应用。这是因为 client parameter 是全局参数,仅在应用程序的第一次设置生效。
- 以下示例代码基于 taos-jdbcdriver-3.1.0 或以上版本。
```java
public Connection getConn() throws Exception{
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://taosdemo.com:6030/power?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty("debugFlag", "135");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
return conn;
}
public Connection getRestConn() throws Exception{
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/power?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
return conn;
}
```
以上示例,建立一个到 hostname 为 taosdemo.com端口为 6030/6041数据库名为 power 的连接。这个连接在 url 中指定了用户名(user)为 root密码password为 taosdata并在 connProps 中指定了使用的字符集、语言环境、时区、是否开启批量拉取等信息。
properties 中的配置参数如下:
- TSDBDriver.PROPERTY_KEY_USER登录 TDengine 用户名,默认值 'root'。
- TSDBDriver.PROPERTY_KEY_PASSWORD用户登录密码默认值 'taosdata'。
- TSDBDriver.PROPERTY_KEY_BATCH_LOAD: true在执行查询时批量拉取结果集false逐行拉取结果集。默认值为false。
- TSDBDriver.PROPERTY_KEY_BATCH_ERROR_IGNOREtrue在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 sq 了。false不再执行失败 SQL 后的任何语句。默认值为false。
- TSDBDriver.PROPERTY_KEY_CONFIG_DIR仅在使用 JDBC 原生连接时生效。客户端配置文件目录路径Linux OS 上默认值 `/etc/taos`Windows OS 上默认值 `C:/TDengine/cfg`。
- TSDBDriver.PROPERTY_KEY_CHARSET客户端使用的字符集默认值为系统字符集。
- TSDBDriver.PROPERTY_KEY_LOCALE仅在使用 JDBC 原生连接时生效。 客户端语言环境,默认值系统当前 locale。
- TSDBDriver.PROPERTY_KEY_TIME_ZONE仅在使用 JDBC 原生连接时生效。 客户端使用的时区默认值为系统当前时区。因为历史的原因我们只支持POSIX标准的部分规范如UTC-8(代表中国上上海), GMT-8Asia/Shanghai 这几种形式。
- TSDBDriver.HTTP_CONNECT_TIMEOUT: 连接超时时间,单位 ms 默认值为 60000。仅在 REST 连接时生效。
- TSDBDriver.HTTP_SOCKET_TIMEOUT: socket 超时时间,单位 ms默认值为 60000。仅在 REST 连接且 batchfetch 设置为 false 时生效。
- TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms 默认值为 60000。 仅在 REST 连接且 batchfetch 设置为 true 时生效。
- TSDBDriver.PROPERTY_KEY_USE_SSL: 连接中是否使用 SSL。仅在 REST 连接时生效。
- TSDBDriver.HTTP_POOL_SIZE: REST 并发请求大小,默认 20。
- TSDBDriver.PROPERTY_KEY_ENABLE_COMPRESSION: 传输过程是否启用压缩。仅在使用 REST/Websocket 连接时生效。true: 启用false: 不启用。默认为 false。
- TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: 是否启用自动重连。仅在使用 Websocket 连接时生效。true: 启用false: 不启用。默认为 false。
> **注意**:启用自动重连仅对简单执行 SQL 语句以及 无模式写入、数据订阅有效。对于参数绑定无效。自动重连仅对连接建立时通过参数指定数据库有效,对后面的 `use db` 语句切换数据库无效。
- TSDBDriver.PROPERTY_KEY_RECONNECT_INTERVAL_MS: 自动重连重试间隔,单位毫秒,默认值 2000。仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
- TSDBDriver.PROPERTY_KEY_RECONNECT_RETRY_COUNT: 自动重连重试次数,默认值 3仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
- TSDBDriver.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: 关闭 SSL 证书验证 。仅在使用 Websocket 连接时生效。true: 启用false: 不启用。默认为 false。
此外对 JDBC 原生连接,通过指定 URL 和 Properties 还可以指定其他参数比如日志级别、SQL 长度等。更多详细配置请参考[客户端配置](../../reference/config/)。
### 配置参数的优先级
通过前面三种方式获取连接,如果配置参数在 url、Properties、客户端配置文件中有重复则参数的`优先级由高到低`分别如下:
1. JDBC URL 参数,如上所述,可以在 JDBC URL 的参数中指定。
2. Properties connProps
3. 使用原生连接时TDengine 客户端驱动的配置文件 taos.cfg
例如:在 url 中指定了 password 为 taosdata在 Properties 中指定了 password 为 taosdemo那么JDBC 会使用 url 中的 password 建立连接。
## 使用示例
### 创建数据库和表
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java:create_db_and_table}}
```
> **注意**:如果不使用 `USE power` 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 power.meters。
### 插入数据
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java:insert_data}}
```
> NOW 为系统内部函数,默认为客户端所在计算机当前时间。
> `NOW + 1s` 代表客户端当前时间往后加 1 秒数字后面代表时间单位a(毫秒)s(秒)m(分)h(小时)d(天)w(周)n(月)y(年)。
### 查询数据
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java:query_data}}
```
> 查询和操作关系型数据库一致,使用下标获取返回字段内容时从 1 开始,建议使用字段名称获取。
### 执行带有 reqId 的 SQL
<RequestId />
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/JdbcBasicDemo.java:with_reqid}}
```
### 通过参数绑定写入数据
TDengine 的 JDBC 原生连接实现大幅改进了参数绑定方式对数据写入INSERT场景的支持。采用这种方式写入数据时能避免 SQL 语法解析的资源消耗,从而在很多情况下显著提升写入性能。
**注意**
- JDBC REST 连接目前不支持参数绑定
- 以下示例代码基于 taos-jdbcdriver-3.2.1 或以上版本
- binary 类型数据需要调用 setString 方法nchar 类型数据需要调用 setNString 方法
- 预处理语句中指定数据库与子表名称不要使用 `db.?`,应直接使用 `?`,然后在 setTableName 中指定数据库,如:`prepareStatement.setTableName("db.t1")`。
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java:para_bind}}
```
这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingFullDemo.java)
</TabItem>
<TabItem value="rest" label="WebSocket 连接">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingBasicDemo.java:para_bind}}
```
这是一个[更详细的参数绑定示例](https://github.com/taosdata/TDengine/blob/main/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WSParameterBindingFullDemo.java)
</TabItem>
</Tabs>
用于设定 VALUES 数据列的取值的方法总共有:
```java
public void setInt(int columnIndex, ArrayList<Integer> list) throws SQLException
public void setFloat(int columnIndex, ArrayList<Float> list) throws SQLException
public void setTimestamp(int columnIndex, ArrayList<Long> list) throws SQLException
public void setLong(int columnIndex, ArrayList<Long> list) throws SQLException
public void setDouble(int columnIndex, ArrayList<Double> list) throws SQLException
public void setBoolean(int columnIndex, ArrayList<Boolean> list) throws SQLException
public void setByte(int columnIndex, ArrayList<Byte> list) throws SQLException
public void setShort(int columnIndex, ArrayList<Short> list) throws SQLException
public void setString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setNString(int columnIndex, ArrayList<String> list, int size) throws SQLException
public void setVarbinary(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
public void setGeometry(int columnIndex, ArrayList<byte[]> list, int size) throws SQLException
```
**注**:字符串和数组类型都要求用户在 size 参数里声明表定义中对应列的列宽。
用于设定 TAGS 取值的方法总共有:
```java
public void setTagNull(int index, int type)
public void setTagBoolean(int index, boolean value)
public void setTagInt(int index, int value)
public void setTagByte(int index, byte value)
public void setTagShort(int index, short value)
public void setTagLong(int index, long value)
public void setTagTimestamp(int index, long value)
public void setTagFloat(int index, float value)
public void setTagDouble(int index, double value)
public void setTagString(int index, String value)
public void setTagNString(int index, String value)
public void setTagJson(int index, String value)
public void setTagVarbinary(int index, byte[] value)
public void setTagGeometry(int index, byte[] value)
```
### 无模式写入
TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议Line Protocol、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../reference/schemaless/)。
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessJniTest.java:schemaless}}
```
</TabItem>
<TabItem value="ws" label="WebSocket 连接">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/SchemalessWsTest.java:schemaless}}
```
</TabItem>
</Tabs>
### 执行带有 reqId 的无模式写入
此 reqId 可用于请求链路追踪。
```java
writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS, 1L);
```
### 数据订阅
TDengine Java 连接器支持订阅功能,应用 API 如下:
#### 创建 Topic
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopImp.java:create_topic}}
```
如上面的例子将使用 SQL 语句 `SELECT ts, current, voltage, phase, groupid, location FROM meters` 创建一个名为 `topic_meters` 的主题。
> **注意**:订阅主题的查询语句只能是 `SELECT` 语句,只应查询原始数据,只能按时间正序查询数据。
#### 创建 Consumer
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:create_consumer}}
```
- td.connect.type: 连接方式。jni表示使用动态库连接的方式ws/WebSocket表示使用 WebSocket 进行数据通信。默认为 jni 方式。
- bootstrap.servers: TDengine 服务端所在的`ip:port`,如果使用 WebSocket 连接,则为 taosAdapter 所在的`ip:port`。
- enable.auto.commit: 是否允许自动提交。
- group.id: consumer: 所在的 group。
- value.deserializer: 结果集反序列化方法,可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean实现反序列化。也可以继承 `com.taosdata.jdbc.tmq.Deserializer`,根据 SQL 的 resultSet 自定义反序列化方式。
- httpConnectTimeout: 创建连接超时参数,单位 ms默认为 5000 ms。仅在 WebSocket 连接下有效。
- messageWaitTimeout: 数据传输超时参数,单位 ms默认为 10000 ms。仅在 WebSocket 连接下有效。
- httpPoolSize: 同一个连接下最大并行请求数。仅在 WebSocket 连接下有效。
- TSDBDriver.PROPERTY_KEY_ENABLE_COMPRESSION: 传输过程是否启用压缩。仅在使用 Websocket 连接时生效。true: 启用false: 不启用。默认为 false。
- TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: 是否启用自动重连。仅在使用 Websocket 连接时生效。true: 启用false: 不启用。默认为 false。
- TSDBDriver.PROPERTY_KEY_RECONNECT_INTERVAL_MS: 自动重连重试间隔,单位毫秒,默认值 2000。仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
- TSDBDriver.PROPERTY_KEY_RECONNECT_RETRY_COUNT: 自动重连重试次数,默认值 3仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。
其他参数请参考:[Consumer 参数列表](../../develop/tmq/#数据订阅相关参数) 注意TDengine服务端自 3.2.0.0 版本开始消息订阅中的 auto.offset.reset 默认值发生变化。
#### 订阅消费数据
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:poll_data}}
```
`subscribe` 方法的参数含义如为:订阅的主题列表(即名称),支持同时订阅多个主题。
`poll` 每次调用获取一个消息,一个消息中可能包含多个记录。
#### 指定订阅 Offset
```java
// 获取订阅的 topicPartition
Set<TopicPartition> assignment() throws SQLException;
// 获取 offset
long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException;
// 指定下一次 poll 中使用的 offset
void seek(TopicPartition partition, long offset) throws SQLException;
void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;
```
示例代码:
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java:consumer_seek}}
```
#### 提交 Offset
当`enable.auto.commit`为 false 时,可以手动提交 offset。
```java
void commitSync() throws SQLException;
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException;
// 异步提交仅在 native 连接下有效
void commitAsync(OffsetCommitCallback<V> callback) throws SQLException;
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback<V> callback) throws SQLException;
```
#### 关闭订阅
```java
// 取消订阅
consumer.unsubscribe();
// 关闭消费
consumer.close()
```
详情请参考:[数据订阅](../../develop/tmq)
#### 完整示例
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoopFull.java:consumer_demo}}
```
</TabItem>
<TabItem value="ws" label="WebSocket 连接">
除了原生的连接方式Java 连接器还支持通过 WebSocket 订阅数据。
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java:consumer_demo}}
```
</TabItem>
</Tabs>
> **注意**:这里的 value.deserializer 配置参数值应该根据测试环境的包路径做相应的调整。
### 与连接池使用
#### HikariCP
使用示例如下:
```java
{{#include examples/JDBC/connectionPools/src/main/java/com/taosdata/example/HikariDemo.java:connection_pool}}
```
> 通过 HikariDataSource.getConnection() 获取连接后,使用完成后需要调用 close() 方法,实际上它并不会关闭连接,只是放回连接池中。
> 更多 HikariCP 使用问题请查看[官方说明](https://github.com/brettwooldridge/HikariCP)。
#### Druid
使用示例如下:
```java
{{#include examples/JDBC/connectionPools/src/main/java/com/taosdata/example/DruidDemo.java:connection_pool}}
```
> 更多 druid 使用问题请查看[官方说明](https://github.com/alibaba/druid)。
### 更多示例程序
## 示例程序汇总
示例程序源码位于 `TDengine/examples/JDBC` 下:
@ -785,35 +267,7 @@ TDengine 中,只要保证 firstEp 和 secondEp 中一个节点有效,就可
除了通过指定的 URL 获取连接,还可以使用 Properties 指定建立连接时的参数。
**注意**
- 应用中设置的 client parameter 为进程级别的,即如果要更新 client 的参数,需要重启应用。这是因为 client parameter 是全局参数,仅在应用程序的第一次设置生效。
- 以下示例代码基于 taos-jdbcdriver-3.1.0 或以上版本。
```java
public Connection getConn() throws Exception{
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://taosdemo.com:6030/power?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty("debugFlag", "135");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
return conn;
}
public Connection getRestConn() throws Exception{
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/power?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
return conn;
}
```
以上示例,建立一个到 hostname 为 taosdemo.com端口为 6030/6041数据库名为 power 的连接。这个连接在 url 中指定了用户名(user)为 root密码password为 taosdata并在 connProps 中指定了使用的字符集、语言环境、时区、是否开启批量拉取等信息。
> **注意**:应用中设置的 client parameter 为进程级别的,即如果要更新 client 的参数,需要重启应用。这是因为 client parameter 是全局参数,仅在应用程序的第一次设置生效。
properties 中的配置参数如下:
@ -1369,7 +823,7 @@ JDBC 驱动支持标准的 ResultSet 接口,提供了用于读取结果集中
- **返回值**:此 ResultSet 对象的数据的 ResultSetMetaData 对象。
- **异常**:如果发生数据库访问错误,将抛出 SQLException 异常。
- `boolean next() throws SQLException`
- **接口说明**:将标从当前位置向前移动一行。用于遍历查询结果集。
- **接口说明**:将标从当前位置向前移动一行。用于遍历查询结果集。
- **返回值**:如果新的当前行有效,则返回 true如果结果集中没有更多行则返回 false。
- **异常**:如果发生数据库访问错误,将抛出 SQLException 异常。
- `void close() throws SQLException`
@ -1562,28 +1016,28 @@ JDBC 驱动支持标准的 ResultSet 接口,提供了用于读取结果集中
- **异常**:如果列名不存在或发生数据库访问错误,将抛出 `SQLException` 异常。
- `boolean isBeforeFirst() throws SQLException`
- **接口说明**:判断标是否在第一行之前。
- **返回值**:如果标在第一行之前,则返回 true否则返回 false。
- **接口说明**:判断标是否在第一行之前。
- **返回值**:如果标在第一行之前,则返回 true否则返回 false。
- **异常**:如果发生数据库访问错误,将抛出 `SQLException` 异常。
- `boolean isAfterLast() throws SQLException`
- **接口说明**:判断标是否在最后一行之后。
- **返回值**:如果标在最后一行之后,则返回 true否则返回 false。
- **接口说明**:判断标是否在最后一行之后。
- **返回值**:如果标在最后一行之后,则返回 true否则返回 false。
- **异常**:如果发生数据库访问错误,将抛出 `SQLException` 异常。
- `boolean isFirst() throws SQLException`
- **接口说明**:判断标是否在第一行。
- **返回值**:如果标在第一行,则返回 true否则返回 false。
- **接口说明**:判断标是否在第一行。
- **返回值**:如果标在第一行,则返回 true否则返回 false。
- **异常**:如果发生数据库访问错误,将抛出 `SQLException` 异常。
- `boolean isLast() throws SQLException`
- **接口说明**:判断标是否在最后一行。
- **返回值**:如果标在最后一行,则返回 true否则返回 false。
- **接口说明**:判断标是否在最后一行。
- **返回值**:如果标在最后一行,则返回 true否则返回 false。
- **异常**:如果发生数据库访问错误,将抛出 `SQLException` 异常。
- `int getRow() throws SQLException`
- **接口说明**:获取当前标所在行的行号。
- **返回值**:当前光标所在行的行号;如果光标在结果集外,则返回 0。
- **接口说明**:获取当前标所在行的行号。
- **返回值**:当前游标所在行的行号;如果游标在结果集外,则返回 0。
- **异常**:如果发生数据库访问错误,将抛出 `SQLException` 异常。
- `void setFetchSize(int rows) throws SQLException`

View File

@ -35,8 +35,13 @@ config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
this.consumer = new TaosConsumer<>(config);
try {
this.consumer = new TaosConsumer<>(config);
} catch (SQLException ex) {
// handle exception
System.out.println("SQLException: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
}
// ANCHOR_END: create_consumer
this.topics = Collections.singletonList("topic_meters");
@ -46,6 +51,42 @@ this.consumer = new TaosConsumer<>(config);
public abstract void process(ResultBean result);
public void pollDataCodePiece() throws SQLException {
// ANCHOR: poll_data_code_piece
try {
consumer.subscribe(topics);
while (!shutdown.get()) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process your data here
process(bean);
}
}
} catch (Exception ex){
// handle exception
System.out.println("SQLException: " + ex.getMessage());
} finally {
consumer.close();
shutdownLatch.countDown();
}
// ANCHOR_END: poll_data_code_piece
}
public void unsubscribeCodePiece() throws SQLException {
// ANCHOR: unsubscribe_data_code_piece
try {
consumer.unsubscribe();
} catch (Exception ex){
// handle exception
System.out.println("SQLException: " + ex.getMessage());
} finally {
consumer.close();
}
// ANCHOR_END: unsubscribe_data_code_piece
}
public void pollData() throws SQLException {
try {

View File

@ -35,7 +35,13 @@ public abstract class AbsConsumerLoopFull {
config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
this.consumer = new TaosConsumer<>(config);
} catch (SQLException ex) {
// handle exception
System.out.println("SQLException: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
}
this.topics = Collections.singletonList("topic_meters");
this.shutdown = new AtomicBoolean(false);

View File

@ -22,19 +22,28 @@ public abstract class AbsWsConsumerLoop {
private final CountDownLatch shutdownLatch;
public AbsWsConsumerLoop() throws SQLException {
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group2");
config.setProperty("client.id", "1");
config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoopWs$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
// ANCHOR: create_consumer
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group2");
config.setProperty("client.id", "1");
config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoopWs$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
this.consumer = new TaosConsumer<>(config);
} catch (SQLException ex) {
// handle exception
System.out.println("SQLException: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
}
// ANCHOR_END: create_consumer
this.topics = Collections.singletonList("topic_speed");
this.shutdown = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);

View File

@ -51,7 +51,12 @@ try (TaosConsumer<AbsConsumerLoop.ResultBean> consumer = new TaosConsumer<>(conf
}
}
ConsumerRecords<AbsConsumerLoop.ResultBean> records = consumer.poll(Duration.ofMillis(500));
// you can handle data here
}
} catch (SQLException ex) {
// handle exception
System.out.println("SQLException: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
}
// ANCHOR_END: consumer_seek
}

View File

@ -17,7 +17,6 @@ public class JdbcBasicDemo {
public static void main(String[] args) throws SQLException {
final String url = "jdbc:TAOS://" + host + ":6030/?user=" + user + "&password=" + password;
Connection connection;
// get connection
Properties properties = new Properties();
@ -25,7 +24,9 @@ properties.setProperty("charset", "UTF-8");
properties.setProperty("locale", "en_US.UTF-8");
properties.setProperty("timezone", "UTC-8");
System.out.println("get connection starting...");
connection = DriverManager.getConnection(url, properties);
try(Connection connection = DriverManager.getConnection(url, properties)){
if (connection != null){
System.out.println("[ OK ] Connection established.");
} else {
@ -106,6 +107,10 @@ try (Statement statement = connection.createStatement()) {
e.printStackTrace();
}
// ANCHOR_END: jdbc_exception
} catch (SQLException ex) {
// handle any errors
System.out.println("SQLException: " + ex.getMessage());
}
}
private static void printResult(ResultSet resultSet) throws SQLException {

View File

@ -22,7 +22,7 @@ public class ParameterBindingBasicDemo {
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
@ -68,8 +68,10 @@ public class ParameterBindingBasicDemo {
// execute column
pstmt.columnDataExecuteBatch();
}
conn.close();
} catch (SQLException ex) {
// handle any errors
System.out.println("SQLException: " + ex.getMessage());
}
}
private static void init(Connection conn) throws SQLException {

View File

@ -12,7 +12,7 @@ import java.sql.Statement;
// ANCHOR: schemaless
public class SchemalessJniTest {
private static final String host = "127.0.0.1";
private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000";
private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
private static final String telnetDemo = "stb0_0 1707095283260 4 host=host0 interface=eth0";
private static final String jsonDemo = "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
@ -22,9 +22,11 @@ public class SchemalessJniTest {
init(connection);
AbstractConnection conn = connection.unwrap(AbstractConnection.class);
conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS);
conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
} catch (SQLException ex) {
System.out.println("SQLException: " + ex.getMessage());
}
}

View File

@ -12,20 +12,21 @@ import java.sql.Statement;
// ANCHOR: schemaless
public class SchemalessWsTest {
private static final String host = "127.0.0.1";
private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000";
private static final String lineDemo = "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639";
private static final String telnetDemo = "stb0_0 1707095283260 4 host=host0 interface=eth0";
private static final String jsonDemo = "{\"metric\": \"meter_current\",\"timestamp\": 1626846400,\"value\": 10.3, \"tags\": {\"groupid\": 2, \"location\": \"California.SanFrancisco\", \"id\": \"d1001\"}}";
public static void main(String[] args) throws SQLException {
final String url = "jdbc:TAOS-RS://" + host + ":6041/power?user=root&password=taosdata&batchfetch=true";
final String url = "jdbc:TAOS-RS://" + host + ":6041?user=root&password=taosdata&batchfetch=true";
try(Connection connection = DriverManager.getConnection(url)){
init(connection);
AbstractConnection conn = connection.unwrap(AbstractConnection.class);
conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS);
conn.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.MILLI_SECONDS);
conn.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS);
conn.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS);
} catch (SQLException ex) {
System.out.println("SQLException: " + ex.getMessage());
}
}

View File

@ -18,8 +18,7 @@ public class WSParameterBindingBasicDemo {
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://" + host + ":6041/?batchfetch=true";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
String sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
@ -43,11 +42,15 @@ public class WSParameterBindingBasicDemo {
pstmt.setFloat(4, random.nextFloat());
pstmt.addBatch();
}
pstmt.executeBatch();
int [] exeResult = pstmt.executeBatch();
// you can check exeResult here
System.out.println("insert " + exeResult.length + " rows.");
}
}
conn.close();
} catch (SQLException ex) {
// handle any errors
System.out.println("SQLException: " + ex.getMessage());
}
}
private static void init(Connection conn) throws SQLException {

View File

@ -21,7 +21,7 @@ public class WSParameterBindingDemo {
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://" + host + ":6041/?batchfetch=true";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
@ -35,7 +35,9 @@ public class WSParameterBindingDemo {
bindString(conn);
conn.close();
} catch (SQLException ex) {
System.out.println("SQLException: " + ex.getMessage());
}
}
private static void init(Connection conn) throws SQLException {

View File

@ -22,7 +22,8 @@ public class WSParameterBindingFullDemo {
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS-RS://" + host + ":6041/?batchfetch=true";
Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
@ -36,7 +37,9 @@ public class WSParameterBindingFullDemo {
bindString(conn);
conn.close();
} catch (SQLException ex) {
System.out.println("SQLException: " + ex.getMessage());
}
}
private static void init(Connection conn) throws SQLException {