docs: add tmq sample (#16080)
This commit is contained in:
parent
35768202a0
commit
46be38457d
|
@ -1,5 +1,7 @@
|
||||||
```java
|
```java
|
||||||
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
|
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
|
||||||
|
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
|
||||||
|
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
|
||||||
```
|
```
|
||||||
:::note
|
:::note
|
||||||
For now Java connector doesn't provide asynchronous subscription, but `TimerTask` can be used to achieve similar purpose.
|
For now Java connector doesn't provide asynchronous subscription, but `TimerTask` can be used to achieve similar purpose.
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.taosdata.jdbc</groupId>
|
<groupId>com.taosdata.jdbc</groupId>
|
||||||
<artifactId>taos-jdbcdriver</artifactId>
|
<artifactId>taos-jdbcdriver</artifactId>
|
||||||
<version>2.0.38</version>
|
<version>3.0.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- ANCHOR_END: dep-->
|
<!-- ANCHOR_END: dep-->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
package com.taos.example;
|
||||||
|
|
||||||
|
import java.sql.Timestamp;
|
||||||
|
|
||||||
|
public class Meters {
|
||||||
|
private Timestamp ts;
|
||||||
|
private float current;
|
||||||
|
private int voltage;
|
||||||
|
private int groupid;
|
||||||
|
private String location;
|
||||||
|
|
||||||
|
public Timestamp getTs() {
|
||||||
|
return ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTs(Timestamp ts) {
|
||||||
|
this.ts = ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public float getCurrent() {
|
||||||
|
return current;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCurrent(float current) {
|
||||||
|
this.current = current;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getVoltage() {
|
||||||
|
return voltage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setVoltage(int voltage) {
|
||||||
|
this.voltage = voltage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getGroupid() {
|
||||||
|
return groupid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setGroupid(int groupid) {
|
||||||
|
this.groupid = groupid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLocation() {
|
||||||
|
return location;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLocation(String location) {
|
||||||
|
this.location = location;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Meters{" +
|
||||||
|
"ts=" + ts +
|
||||||
|
", current=" + current +
|
||||||
|
", voltage=" + voltage +
|
||||||
|
", groupid=" + groupid +
|
||||||
|
", location='" + location + '\'' +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,6 @@
|
||||||
|
package com.taos.example;
|
||||||
|
|
||||||
|
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
|
||||||
|
|
||||||
|
public class MetersDeserializer extends ReferenceDeserializer<Meters> {
|
||||||
|
}
|
|
@ -1,65 +1,77 @@
|
||||||
package com.taos.example;
|
package com.taos.example;
|
||||||
|
|
||||||
import com.taosdata.jdbc.TSDBConnection;
|
import com.taosdata.jdbc.tmq.ConsumerRecords;
|
||||||
import com.taosdata.jdbc.TSDBDriver;
|
import com.taosdata.jdbc.tmq.TMQConstants;
|
||||||
import com.taosdata.jdbc.TSDBResultSet;
|
import com.taosdata.jdbc.tmq.TaosConsumer;
|
||||||
import com.taosdata.jdbc.TSDBSubscribe;
|
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.DriverManager;
|
import java.sql.DriverManager;
|
||||||
import java.sql.ResultSetMetaData;
|
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.Timer;
|
||||||
|
import java.util.TimerTask;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
public class SubscribeDemo {
|
public class SubscribeDemo {
|
||||||
private static final String topic = "topic-meter-current-bg-10";
|
private static final String TOPIC = "tmq_topic";
|
||||||
private static final String sql = "select * from meters where current > 10";
|
private static final String DB_NAME = "meters";
|
||||||
|
private static final AtomicBoolean shutdown = new AtomicBoolean(false);
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
Connection connection = null;
|
Timer timer = new Timer();
|
||||||
TSDBSubscribe subscribe = null;
|
timer.schedule(new TimerTask() {
|
||||||
|
public void run() {
|
||||||
|
shutdown.set(true);
|
||||||
|
}
|
||||||
|
}, 3_000);
|
||||||
try {
|
try {
|
||||||
|
// prepare
|
||||||
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||||
|
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
|
||||||
|
Connection connection = DriverManager.getConnection(jdbcUrl);
|
||||||
|
try (Statement statement = connection.createStatement()) {
|
||||||
|
statement.executeUpdate("drop topic if exists " + TOPIC);
|
||||||
|
statement.executeUpdate("drop database if exists " + DB_NAME);
|
||||||
|
statement.executeUpdate("create database " + DB_NAME);
|
||||||
|
statement.executeUpdate("use " + DB_NAME);
|
||||||
|
statement.executeUpdate(
|
||||||
|
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(16))");
|
||||||
|
statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')");
|
||||||
|
statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
|
||||||
|
statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
|
||||||
|
statement.executeUpdate(
|
||||||
|
"INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119)");
|
||||||
|
statement.executeUpdate(
|
||||||
|
"INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
|
||||||
|
// create topic
|
||||||
|
statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
|
||||||
|
}
|
||||||
|
|
||||||
|
// create consumer
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
|
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030");
|
||||||
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
|
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
|
||||||
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/power?user=root&password=taosdata";
|
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
|
||||||
connection = DriverManager.getConnection(jdbcUrl, properties);
|
properties.setProperty(TMQConstants.GROUP_ID, "test");
|
||||||
// create subscribe
|
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
|
||||||
subscribe = ((TSDBConnection) connection).subscribe(topic, sql, true);
|
"com.taosdata.jdbc.MetersDeserializer");
|
||||||
int count = 0;
|
|
||||||
while (count < 10) {
|
// poll data
|
||||||
// wait 1 second to avoid frequent calls to consume
|
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
|
||||||
TimeUnit.SECONDS.sleep(1);
|
consumer.subscribe(Collections.singletonList(TOPIC));
|
||||||
// consume
|
while (!shutdown.get()) {
|
||||||
TSDBResultSet resultSet = subscribe.consume();
|
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
|
||||||
if (resultSet == null) {
|
for (Meters meter : meters) {
|
||||||
continue;
|
System.out.println(meter);
|
||||||
}
|
|
||||||
ResultSetMetaData metaData = resultSet.getMetaData();
|
|
||||||
while (resultSet.next()) {
|
|
||||||
int columnCount = metaData.getColumnCount();
|
|
||||||
for (int i = 1; i <= columnCount; i++) {
|
|
||||||
System.out.print(metaData.getColumnLabel(i) + ": " + resultSet.getString(i) + "\t");
|
|
||||||
}
|
}
|
||||||
System.out.println();
|
|
||||||
count++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (ClassNotFoundException | SQLException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
if (null != subscribe)
|
|
||||||
// close subscribe
|
|
||||||
subscribe.close(true);
|
|
||||||
if (connection != null)
|
|
||||||
connection.close();
|
|
||||||
} catch (SQLException throwable) {
|
|
||||||
throwable.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
timer.cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,5 +1,7 @@
|
||||||
```java
|
```java
|
||||||
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
|
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
|
||||||
|
{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
|
||||||
|
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
|
||||||
```
|
```
|
||||||
:::note
|
:::note
|
||||||
目前 Java 接口没有提供异步订阅模式,但用户程序可以通过创建 `TimerTask` 等方式达到同样的效果。
|
目前 Java 接口没有提供异步订阅模式,但用户程序可以通过创建 `TimerTask` 等方式达到同样的效果。
|
||||||
|
|
Loading…
Reference in New Issue