docs: add tmq sample and doc (#16082)

* docs: modify jdbc version

* docs: add tmq sample
This commit is contained in:
huolibo 2022-08-13 01:32:41 +08:00 committed by GitHub
parent 1531cf44c4
commit 959cf23bfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 160 additions and 63 deletions

View File

@ -1,5 +1,7 @@
```java
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
:::note
For now Java connector doesn't provide asynchronous subscription, but `TimerTask` can be used to achieve similar purpose.

View File

@ -41,19 +41,20 @@ Please refer to [Version Support List](/reference/connector#version-support).
TDengine currently supports timestamp, number, character, Boolean type, and the corresponding type conversion with Java is as follows:
| TDengine DataType | JDBCType (driver version < 2.0.24) | JDBCType (driver version > = 2.0.24) |
| ----------------- | ---------------------------------- | ------------------------------------ |
| TIMESTAMP | java.lang.Long | java.sql.Timestamp |
| INT | java.lang.Integer | java.lang.Integer |
| BIGINT | java.lang.Long | java.lang.Long |
| FLOAT | java.lang.Float | java.lang.Float |
| DOUBLE | java.lang.Double | java.lang.Double |
| SMALLINT | java.lang.Short | java.lang.Short |
| TINYINT | java.lang.Byte | java.lang.Byte |
| BOOL | java.lang.Boolean | java.lang.Boolean |
| BINARY | java.lang.String | byte array |
| NCHAR | java.lang.String | java.lang.String |
| JSON | - | java.lang.String |
| TDengine DataType | JDBCType |
| ----------------- | ---------------------------------- |
| TIMESTAMP | java.sql.Timestamp |
| INT | java.lang.Integer |
| BIGINT | java.lang.Long |
| FLOAT | java.lang.Float |
| DOUBLE | java.lang.Double |
| SMALLINT | java.lang.Short |
| TINYINT | java.lang.Byte |
| BOOL | java.lang.Boolean |
| BINARY | byte array |
| NCHAR | java.lang.String |
| JSON | java.lang.String |
**Note**: Only TAG supports JSON types
@ -81,7 +82,7 @@ Add following dependency in the `pom.xml` file of your Maven project:
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.**</version>
<version>3.0.0</version>
</dependency>
```
@ -845,7 +846,13 @@ Please refer to: [JDBC example](https://github.com/taosdata/TDengine/tree/develo
**Cause**: Currently, TDengine only supports 64-bit JDK.
**Solution**: Reinstall the 64-bit JDK. 4.
**Solution**: Reinstall the 64-bit JDK.
4. java.lang.NoSuchMethodError: setByteArray
**Cause**: taos-jdbcdriver version 3.* only supports TDengine 3.0 or above.
**Solution**: connect TDengine 2.* using taos-jdbcdriver 2.* version.
For other questions, please refer to [FAQ](/train-faq/faq)

View File

@ -21,7 +21,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.38</version>
<version>3.0.0</version>
</dependency>
<!-- ANCHOR_END: dep-->
<dependency>

View File

@ -0,0 +1,62 @@
package com.taos.example;
import java.sql.Timestamp;
public class Meters {
private Timestamp ts;
private float current;
private int voltage;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public float getCurrent() {
return current;
}
public void setCurrent(float current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
@Override
public String toString() {
return "Meters{" +
"ts=" + ts +
", current=" + current +
", voltage=" + voltage +
", groupid=" + groupid +
", location='" + location + '\'' +
'}';
}
}

View File

@ -0,0 +1,6 @@
package com.taos.example;
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}

View File

@ -1,65 +1,77 @@
package com.taos.example;
import com.taosdata.jdbc.TSDBConnection;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBResultSet;
import com.taosdata.jdbc.TSDBSubscribe;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TMQConstants;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
public class SubscribeDemo {
private static final String topic = "topic-meter-current-bg-10";
private static final String sql = "select * from meters where current > 10";
private static final String TOPIC = "tmq_topic";
private static final String DB_NAME = "meters";
private static final AtomicBoolean shutdown = new AtomicBoolean(false);
public static void main(String[] args) {
Connection connection = null;
TSDBSubscribe subscribe = null;
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
shutdown.set(true);
}
}, 3_000);
try {
// prepare
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
Connection connection = DriverManager.getConnection(jdbcUrl);
try (Statement statement = connection.createStatement()) {
statement.executeUpdate("drop topic if exists " + TOPIC);
statement.executeUpdate("drop database if exists " + DB_NAME);
statement.executeUpdate("create database " + DB_NAME);
statement.executeUpdate("use " + DB_NAME);
statement.executeUpdate(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(16))");
statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')");
statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
statement.executeUpdate(
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119)");
statement.executeUpdate(
"INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
// create topic
statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
}
// create consumer
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/power?user=root&password=taosdata";
connection = DriverManager.getConnection(jdbcUrl, properties);
// create subscribe
subscribe = ((TSDBConnection) connection).subscribe(topic, sql, true);
int count = 0;
while (count < 10) {
// wait 1 second to avoid frequent calls to consume
TimeUnit.SECONDS.sleep(1);
// consume
TSDBResultSet resultSet = subscribe.consume();
if (resultSet == null) {
continue;
}
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
System.out.print(metaData.getColumnLabel(i) + ": " + resultSet.getString(i) + "\t");
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030");
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
properties.setProperty(TMQConstants.GROUP_ID, "test");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taosdata.jdbc.MetersDeserializer");
// poll data
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList(TOPIC));
while (!shutdown.get()) {
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) {
System.out.println(meter);
}
System.out.println();
count++;
}
}
} catch (Exception e) {
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
} finally {
try {
if (null != subscribe)
// close subscribe
subscribe.close(true);
if (connection != null)
connection.close();
} catch (SQLException throwable) {
throwable.printStackTrace();
}
}
timer.cancel();
}
}

View File

@ -1,5 +1,7 @@
```java
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
```
:::note
目前 Java 接口没有提供异步订阅模式,但用户程序可以通过创建 `TimerTask` 等方式达到同样的效果。

View File

@ -83,7 +83,7 @@ Maven 项目中,在 pom.xml 中添加以下依赖:
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.**</version>
<version>3.0.0</version>
</dependency>
```
@ -712,7 +712,7 @@ while(true) {
}
```
`poll` 方法返回一个结果集,其中包含从上次 `poll` 到目前为止的所有新数据。请务必按需选择合理的调用 `poll` 的频率(如例子中的 `Duration.ofMillis(100)`),否则会给服务端造成不必要的压力。
`poll` 每次调用获取一个消息。请按需选择合理的调用 `poll` 的频率(如例子中的 `Duration.ofMillis(100)`),否则会给服务端造成不必要的压力。
#### 关闭订阅
@ -900,7 +900,13 @@ public static void main(String[] args) throws Exception {
**解决方法**:重新安装 64 位 JDK。
4. 其它问题请参考 [FAQ](../../../train-faq/faq)
4. java.lang.NoSuchMethodError: setByteArray
**原因**taos-jdbcdriver 3.* 版本仅支持 TDengine 3.0 及以上版本。
**解决方法** 使用 taos-jdbcdriver 2.* 版本连接 TDengine 2.* 版本。
其它问题请参考 [FAQ](../../../train-faq/faq)
## API 参考