diff --git a/docs/en/07-develop/_sub_java.mdx b/docs/en/07-develop/_sub_java.mdx index ab77f61348..ae0ecd28e0 100644 --- a/docs/en/07-develop/_sub_java.mdx +++ b/docs/en/07-develop/_sub_java.mdx @@ -1,5 +1,7 @@ ```java {{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}} +{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}} +{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}} ``` :::note For now Java connector doesn't provide asynchronous subscription, but `TimerTask` can be used to achieve similar purpose. diff --git a/docs/examples/java/pom.xml b/docs/examples/java/pom.xml index a48ba398da..634c3f75a8 100644 --- a/docs/examples/java/pom.xml +++ b/docs/examples/java/pom.xml @@ -21,7 +21,7 @@ com.taosdata.jdbc taos-jdbcdriver - 2.0.38 + 3.0.0 diff --git a/docs/examples/java/src/main/java/com/taos/example/Meters.java b/docs/examples/java/src/main/java/com/taos/example/Meters.java new file mode 100644 index 0000000000..0f1eadd55b --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/Meters.java @@ -0,0 +1,62 @@ +package com.taos.example; + +import java.sql.Timestamp; + +public class Meters { + private Timestamp ts; + private float current; + private int voltage; + private int groupid; + private String location; + + public Timestamp getTs() { + return ts; + } + + public void setTs(Timestamp ts) { + this.ts = ts; + } + + public float getCurrent() { + return current; + } + + public void setCurrent(float current) { + this.current = current; + } + + public int getVoltage() { + return voltage; + } + + public void setVoltage(int voltage) { + this.voltage = voltage; + } + + public int getGroupid() { + return groupid; + } + + public void setGroupid(int groupid) { + this.groupid = groupid; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + @Override + public String toString() { + return "Meters{" + + "ts=" + ts + + ", current=" + current + + ", voltage=" + voltage + + ", groupid=" + groupid + + ", location='" + location + '\'' + + '}'; + } +} diff --git a/docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java b/docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java new file mode 100644 index 0000000000..9b7fa35e90 --- /dev/null +++ b/docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java @@ -0,0 +1,6 @@ +package com.taos.example; + +import com.taosdata.jdbc.tmq.ReferenceDeserializer; + +public class MetersDeserializer extends ReferenceDeserializer { +} \ No newline at end of file diff --git a/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java b/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java index d82d03b9de..b1e675cdf6 100644 --- a/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java +++ b/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java @@ -1,65 +1,77 @@ package com.taos.example; -import com.taosdata.jdbc.TSDBConnection; -import com.taosdata.jdbc.TSDBDriver; -import com.taosdata.jdbc.TSDBResultSet; -import com.taosdata.jdbc.TSDBSubscribe; +import com.taosdata.jdbc.tmq.ConsumerRecords; +import com.taosdata.jdbc.tmq.TMQConstants; +import com.taosdata.jdbc.tmq.TaosConsumer; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Collections; import java.util.Properties; -import java.util.concurrent.TimeUnit; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; public class SubscribeDemo { - private static final String topic = "topic-meter-current-bg-10"; - private static final String sql = "select * from meters where current > 10"; + private static final String TOPIC = "tmq_topic"; + private static final String DB_NAME = "meters"; + private static final AtomicBoolean shutdown = new AtomicBoolean(false); public static void main(String[] args) { - Connection connection = null; - TSDBSubscribe subscribe = null; - + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + public void run() { + shutdown.set(true); + } + }, 3_000); try { + // prepare Class.forName("com.taosdata.jdbc.TSDBDriver"); + String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata"; + Connection connection = DriverManager.getConnection(jdbcUrl); + try (Statement statement = connection.createStatement()) { + statement.executeUpdate("drop topic if exists " + TOPIC); + statement.executeUpdate("drop database if exists " + DB_NAME); + statement.executeUpdate("create database " + DB_NAME); + statement.executeUpdate("use " + DB_NAME); + statement.executeUpdate( + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(16))"); + statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')"); + statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)"); + statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)"); + statement.executeUpdate( + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119)"); + statement.executeUpdate( + "INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)"); + // create topic + statement.executeUpdate("create topic " + TOPIC + " as select * from meters"); + } + + // create consumer Properties properties = new Properties(); - properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); - String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/power?user=root&password=taosdata"; - connection = DriverManager.getConnection(jdbcUrl, properties); - // create subscribe - subscribe = ((TSDBConnection) connection).subscribe(topic, sql, true); - int count = 0; - while (count < 10) { - // wait 1 second to avoid frequent calls to consume - TimeUnit.SECONDS.sleep(1); - // consume - TSDBResultSet resultSet = subscribe.consume(); - if (resultSet == null) { - continue; - } - ResultSetMetaData metaData = resultSet.getMetaData(); - while (resultSet.next()) { - int columnCount = metaData.getColumnCount(); - for (int i = 1; i <= columnCount; i++) { - System.out.print(metaData.getColumnLabel(i) + ": " + resultSet.getString(i) + "\t"); + properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030"); + properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true"); + properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true"); + properties.setProperty(TMQConstants.GROUP_ID, "test"); + properties.setProperty(TMQConstants.VALUE_DESERIALIZER, + "com.taosdata.jdbc.MetersDeserializer"); + + // poll data + try (TaosConsumer consumer = new TaosConsumer<>(properties)) { + consumer.subscribe(Collections.singletonList(TOPIC)); + while (!shutdown.get()) { + ConsumerRecords meters = consumer.poll(Duration.ofMillis(100)); + for (Meters meter : meters) { + System.out.println(meter); } - System.out.println(); - count++; } } - } catch (Exception e) { + } catch (ClassNotFoundException | SQLException e) { e.printStackTrace(); - } finally { - try { - if (null != subscribe) - // close subscribe - subscribe.close(true); - if (connection != null) - connection.close(); - } catch (SQLException throwable) { - throwable.printStackTrace(); - } } + timer.cancel(); } } \ No newline at end of file diff --git a/docs/zh/07-develop/_sub_java.mdx b/docs/zh/07-develop/_sub_java.mdx index 52df23f7dd..9365941679 100644 --- a/docs/zh/07-develop/_sub_java.mdx +++ b/docs/zh/07-develop/_sub_java.mdx @@ -1,5 +1,7 @@ ```java {{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}} +{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}} +{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}} ``` :::note 目前 Java 接口没有提供异步订阅模式,但用户程序可以通过创建 `TimerTask` 等方式达到同样的效果。