From cbf666b529fd80ea72a91ee3fd17ff2c454f45ba Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Thu, 1 Aug 2024 21:15:53 +0800 Subject: [PATCH] mod tmq example --- docs/zh/08-develop/07-tmq.md | 66 +--- .../com/taosdata/example/AbsConsumerLoop.java | 2 +- .../taosdata/example/AbsConsumerLoopFull.java | 142 ------- .../taosdata/example/AbsWsConsumerLoop.java | 144 ------- .../taosdata/example/ConsumerLoopFull.java | 357 +++++++++++++++++ .../example/ParameterBindingBasicDemo.java | 44 ++- .../example/ParameterBindingBatchDemo.java | 83 ---- .../taosdata/example/WsConsumerLoopFull.java | 358 ++++++++++++++++++ 8 files changed, 764 insertions(+), 432 deletions(-) delete mode 100644 examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoopFull.java delete mode 100644 examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java create mode 100644 examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java delete mode 100644 examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBatchDemo.java create mode 100644 examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java diff --git a/docs/zh/08-develop/07-tmq.md b/docs/zh/08-develop/07-tmq.md index 16a538757f..1d5b59307e 100644 --- a/docs/zh/08-develop/07-tmq.md +++ b/docs/zh/08-develop/07-tmq.md @@ -85,7 +85,7 @@ Java 连接器创建消费者的参数为 Properties, 可以设置的参数列 ```java -{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java:create_consumer}} +{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:create_consumer}} ``` @@ -135,7 +135,7 @@ Java 连接器创建消费者的参数为 Properties, 可以设置的参数列 ```java -{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:create_consumer}} +{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java:create_consumer}} ``` @@ -180,7 +180,7 @@ Java 连接器创建消费者的参数为 Properties, 可以设置的参数列 ```java -{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:poll_data_code_piece}} +{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:poll_data_code_piece}} ``` - `subscribe` 方法的参数含义为:订阅的主题列表(即名称),支持同时订阅多个主题。 @@ -273,33 +273,7 @@ Java 连接器创建消费者的参数为 Properties, 可以设置的参数列 ```java - -// 获取当前消费者分配的 TopicPartition 集合 -Set assignment() throws SQLException; - -// 获取指定分区的当前偏移量 -long position(TopicPartition partition) throws SQLException; -// 获取指定主题的所有分区的当前偏移量 -Map position(String topic) throws SQLException; -// 获取指定主题的所有分区的起始偏移量 -Map beginningOffsets(String topic) throws SQLException; -// 获取指定主题的所有分区的最新偏移量 -Map endOffsets(String topic) throws SQLException; -// 获取指定分区集合中的已提交偏移量 -Map committed(Set partitions) throws SQLException; - -// 设置指定分区的偏移量 -void seek(TopicPartition partition, long offset) throws SQLException; -// 将指定分区集合的偏移量设置为最开始 -void seekToBeginning(Collection partitions) throws SQLException; -// 将指定分区集合的偏移量设置为最新 -void seekToEnd(Collection partitions) throws SQLException; -``` - -示例代码: - -```java -{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java:consumer_seek}} +{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:consumer_seek}} ``` @@ -344,7 +318,6 @@ void seekToEnd(Collection partitions) throws SQLException; - 同 Websocket 代码样例。 @@ -389,22 +362,9 @@ void seekToEnd(Collection partitions) throws SQLException; -```java -// 同步提交当前消费者的偏移量 -void commitSync() throws SQLException; -// 同步提交指定的偏移量 -void commitSync(Map offsets) throws SQLException; - -// 异步提交仅在 native 连接下有效 -// 异步提交当前消费者的偏移量,需要提供回调以处理可能的提交结果 -void commitAsync(OffsetCommitCallback callback) throws SQLException; -// 异步提交指定的偏移量,需要提供回调以处理可能的提交结果 -void commitAsync(Map offsets, OffsetCommitCallback callback) throws SQLException; -``` - ```java -{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:commit_code_piece}} +{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:commit_code_piece}} ``` @@ -501,7 +461,7 @@ void commitAsync(Map offsets, OffsetCommitCal ```java -{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:unsubscribe_data_code_piece}} +{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:unsubscribe_data_code_piece}} ``` @@ -593,12 +553,14 @@ void commitAsync(Map offsets, OffsetCommitCal ### Websocket 连接 +
+完整 Websocket 连接代码示例 ```java -{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java:consumer_demo}} +{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:consumer_demo}} ``` **注意**:这里的 value.deserializer 配置参数值应该根据测试环境的包路径做相应的调整。 -其余代码请参考: [JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC/JDBCDemo) +
@@ -641,12 +603,16 @@ void commitAsync(Map offsets, OffsetCommitCal ### 原生连接 +
+完整原生连接代码示例 ```java -{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoopFull.java:consumer_demo}} +{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java:consumer_demo}} ``` **注意**:这里的 value.deserializer 配置参数值应该根据测试环境的包路径做相应的调整。 -其余代码请参考: [JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC/JDBCDemo) +
+ +
diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java index 90e6a950cd..f7570c742a 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java @@ -32,7 +32,7 @@ config.setProperty("msg.with.table.name", "true"); config.setProperty("enable.auto.commit", "true"); config.setProperty("auto.commit.interval.ms", "1000"); config.setProperty("group.id", "group1"); -config.setProperty("client.id", "1"); +config.setProperty("client.id", "client1"); config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer"); config.setProperty("value.deserializer.encoding", "UTF-8"); try { diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoopFull.java deleted file mode 100644 index bd8f1799f6..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoopFull.java +++ /dev/null @@ -1,142 +0,0 @@ -package com.taosdata.example; - -import com.taosdata.jdbc.tmq.ConsumerRecord; -import com.taosdata.jdbc.tmq.ConsumerRecords; -import com.taosdata.jdbc.tmq.ReferenceDeserializer; -import com.taosdata.jdbc.tmq.TaosConsumer; - -import java.sql.SQLException; -import java.sql.Timestamp; -import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - -// ANCHOR: consumer_demo -public abstract class AbsConsumerLoopFull { - private final TaosConsumer consumer; - private final List topics; - private final AtomicBoolean shutdown; - private final CountDownLatch shutdownLatch; - - public AbsConsumerLoopFull() throws SQLException { - - Properties config = new Properties(); - config.setProperty("td.connect.type", "jni"); - config.setProperty("bootstrap.servers", "localhost:6030"); - config.setProperty("auto.offset.reset", "latest"); - config.setProperty("msg.with.table.name", "true"); - config.setProperty("enable.auto.commit", "true"); - config.setProperty("auto.commit.interval.ms", "1000"); - config.setProperty("group.id", "group1"); - config.setProperty("client.id", "1"); - config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer"); - config.setProperty("value.deserializer.encoding", "UTF-8"); - - try { - this.consumer = new TaosConsumer<>(config); - } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create consumer", ex); - } - - this.topics = Collections.singletonList("topic_meters"); - this.shutdown = new AtomicBoolean(false); - this.shutdownLatch = new CountDownLatch(1); - } - - public abstract void process(ResultBean result); - - public void pollData() throws SQLException { - try { - // subscribe to the topics - consumer.subscribe(topics); - while (!shutdown.get()) { - // poll data - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process the data here - process(bean); - } - } - // unsubscribe the topics - consumer.unsubscribe(); - } finally { - // close the consumer - consumer.close(); - shutdownLatch.countDown(); - } - } - - public void shutdown() throws InterruptedException { - shutdown.set(true); - shutdownLatch.await(); - } - - public static class ResultDeserializer extends ReferenceDeserializer { - - } - - // use this class to define the data structure of the result record - public static class ResultBean { - private Timestamp ts; - private double current; - private int voltage; - private double phase; - private int groupid; - private String location; - - public Timestamp getTs() { - return ts; - } - - public void setTs(Timestamp ts) { - this.ts = ts; - } - - public double getCurrent() { - return current; - } - - public void setCurrent(double current) { - this.current = current; - } - - public int getVoltage() { - return voltage; - } - - public void setVoltage(int voltage) { - this.voltage = voltage; - } - - public double getPhase() { - return phase; - } - - public void setPhase(double phase) { - this.phase = phase; - } - - public int getGroupid() { - return groupid; - } - - public void setGroupid(int groupid) { - this.groupid = groupid; - } - - public String getLocation() { - return location; - } - - public void setLocation(String location) { - this.location = location; - } - } -} -// ANCHOR_END: consumer_demo diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java deleted file mode 100644 index 60466629f6..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java +++ /dev/null @@ -1,144 +0,0 @@ -package com.taosdata.example; - -import com.taosdata.jdbc.tmq.ConsumerRecord; -import com.taosdata.jdbc.tmq.ConsumerRecords; -import com.taosdata.jdbc.tmq.ReferenceDeserializer; -import com.taosdata.jdbc.tmq.TaosConsumer; - -import java.sql.SQLException; -import java.sql.Timestamp; -import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - -// ANCHOR: consumer_demo -public abstract class AbsWsConsumerLoop { - private final TaosConsumer consumer; - private final List topics; - private final AtomicBoolean shutdown; - private final CountDownLatch shutdownLatch; - - public AbsWsConsumerLoop() throws SQLException { -// ANCHOR: create_consumer -Properties config = new Properties(); -config.setProperty("td.connect.type", "ws"); -config.setProperty("bootstrap.servers", "localhost:6041"); -config.setProperty("auto.offset.reset", "latest"); -config.setProperty("msg.with.table.name", "true"); -config.setProperty("enable.auto.commit", "true"); -config.setProperty("auto.commit.interval.ms", "1000"); -config.setProperty("group.id", "group1"); -config.setProperty("client.id", "client1"); -config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoopWs$ResultDeserializer"); -config.setProperty("value.deserializer.encoding", "UTF-8"); - -try { - this.consumer = new TaosConsumer<>(config); -} catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to create ws consumer with " + config.getProperty("bootstrap.servers") + " ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - throw new SQLException("Failed to create consumer", ex); -} -// ANCHOR_END: create_consumer - - this.topics = Collections.singletonList("topic_meters"); - this.shutdown = new AtomicBoolean(false); - this.shutdownLatch = new CountDownLatch(1); - } - - public abstract void process(ResultBean result); - - public void pollData() throws SQLException { - try { - // Subscribe to the topic - consumer.subscribe(topics); - - while (!shutdown.get()) { - // poll data - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - for (ConsumerRecord record : records) { - ResultBean bean = record.value(); - // process data here - process(bean); - } - } - // unsubscribe the topics - consumer.unsubscribe(); - } finally { - // close the consumer - consumer.close(); - shutdownLatch.countDown(); - } - } - - public void shutdown() throws InterruptedException { - shutdown.set(true); - shutdownLatch.await(); - } - - public static class ResultDeserializer extends ReferenceDeserializer { - - } - - // use this class to define the data structure of the result record - public static class ResultBean { - private Timestamp ts; - private double current; - private int voltage; - private double phase; - private int groupid; - private String location; - - public Timestamp getTs() { - return ts; - } - - public void setTs(Timestamp ts) { - this.ts = ts; - } - - public double getCurrent() { - return current; - } - - public void setCurrent(double current) { - this.current = current; - } - - public int getVoltage() { - return voltage; - } - - public void setVoltage(int voltage) { - this.voltage = voltage; - } - - public double getPhase() { - return phase; - } - - public void setPhase(double phase) { - this.phase = phase; - } - - public int getGroupid() { - return groupid; - } - - public void setGroupid(int groupid) { - this.groupid = groupid; - } - - public String getLocation() { - return location; - } - - public void setLocation(String location) { - this.location = location; - } - } -} -// ANCHOR_END: consumer_demo diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java new file mode 100644 index 0000000000..4012f2e679 --- /dev/null +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java @@ -0,0 +1,357 @@ +package com.taosdata.example; + +import com.alibaba.fastjson.JSON; +import com.taosdata.jdbc.TSDBDriver; +import com.taosdata.jdbc.tmq.*; + +import java.sql.*; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +// ANCHOR: consumer_demo +public class ConsumerLoopFull { + static private Connection connection; + static private Statement statement; + public static TaosConsumer getConsumer() throws SQLException{ +// ANCHOR: create_consumer +Properties config = new Properties(); +config.setProperty("td.connect.type", "jni"); +config.setProperty("bootstrap.servers", "localhost:6030"); +config.setProperty("auto.offset.reset", "latest"); +config.setProperty("msg.with.table.name", "true"); +config.setProperty("enable.auto.commit", "true"); +config.setProperty("auto.commit.interval.ms", "1000"); +config.setProperty("group.id", "group1"); +config.setProperty("client.id", "1"); +config.setProperty("td.connect.user", "root"); +config.setProperty("td.connect.pass", "taosdata"); +config.setProperty("value.deserializer", "com.taosdata.example.ConsumerLoopFull$ResultDeserializer"); +config.setProperty("value.deserializer.encoding", "UTF-8"); + +try { + return new TaosConsumer<>(config); +} catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create consumer", ex); +} catch (Exception e) { + e.printStackTrace(); + throw new SQLException("Failed to create consumer", e); +} +// ANCHOR_END: create_consumer + } + + public static void pollDataExample() throws SQLException { + try (TaosConsumer consumer = getConsumer()){ + // subscribe to the topics + List topics = Collections.singletonList("topic_meters"); + + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + for (int i = 0; i < 50; i++) { + // poll data + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process the data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + } + // unsubscribe the topics + consumer.unsubscribe(); + System.out.println("unsubscribed topics successfully"); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data from topic_meters", ex); + } + } + + public static void pollExample() throws SQLException { +// ANCHOR: poll_data_code_piece +try (TaosConsumer consumer = getConsumer()){ + List topics = Collections.singletonList("topic_meters"); + + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + for (int i = 0; i < 50; i++) { + // poll data + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process the data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + } + +} catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data", ex); +} +// ANCHOR_END: poll_data_code_piece +} + + public static void seekExample() throws SQLException { +// ANCHOR: consumer_seek +try (TaosConsumer consumer = getConsumer()){ + List topics = Collections.singletonList("topic_meters"); + + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + ConsumerRecords records = ConsumerRecords.emptyRecord(); + // make sure we have got some data + while (records.isEmpty()){ + records = consumer.poll(Duration.ofMillis(100)); + } + + for (ConsumerRecord record : records) { + System.out.println("first data polled: " + JSON.toJSONString(record.value())); + Set assignment = consumer.assignment(); + // seek to the beginning of the all partitions + consumer.seekToBeginning(assignment); + System.out.println("assignment seek to beginning successfully"); + break; + } + + // poll data agagin + records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + // process the data here + System.out.println("second data polled: " + JSON.toJSONString(record.value())); + break; + } + + +} catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("seek example failed", ex); +} +// ANCHOR_END: consumer_seek + } + + + public static void commitExample() throws SQLException { +// ANCHOR: commit_code_piece +try (TaosConsumer consumer = getConsumer()){ + List topics = Collections.singletonList("topic_meters"); + + consumer.subscribe(topics); + for (int i = 0; i < 50; i++) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process your data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + if (!records.isEmpty()) { + // after processing the data, commit the offset manually + consumer.commitSync(); + } + } +} catch (SQLException ex){ + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); +} +// ANCHOR_END: commit_code_piece + } + public static void unsubscribeExample() throws SQLException { + TaosConsumer consumer = getConsumer(); + List topics = Collections.singletonList("topic_meters"); + consumer.subscribe(topics); +// ANCHOR: unsubscribe_data_code_piece + try { + consumer.unsubscribe(); + } catch (SQLException ex){ + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to unsubscribe consumer", ex); + } finally { + consumer.close(); + } +// ANCHOR_END: unsubscribe_data_code_piece + } + + public static class ResultDeserializer extends ReferenceDeserializer { + + } + // use this class to define the data structure of the result record + public static class ResultBean { + private Timestamp ts; + private double current; + private int voltage; + private double phase; + private int groupid; + private String location; + + public Timestamp getTs() { + return ts; + } + + public void setTs(Timestamp ts) { + this.ts = ts; + } + + public double getCurrent() { + return current; + } + + public void setCurrent(double current) { + this.current = current; + } + + public int getVoltage() { + return voltage; + } + + public void setVoltage(int voltage) { + this.voltage = voltage; + } + + public double getPhase() { + return phase; + } + + public void setPhase(double phase) { + this.phase = phase; + } + + public int getGroupid() { + return groupid; + } + + public void setGroupid(int groupid) { + this.groupid = groupid; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + } + + public static void prepareData() throws SQLException{ + StringBuilder insertQuery = new StringBuilder(); + insertQuery.append("INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES "); + for (int i = 0; i < 10000; i++){ + insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) "); + } + try { + int affectedRows = statement.executeUpdate(insertQuery.toString()); + assert affectedRows == 10000; + } catch (SQLException ex) { + System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to insert data to power.meters", ex); + } + } + public static void prepareMeta() throws SQLException{ + try { + statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); + statement.executeUpdate("USE power"); + statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); + statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); + } catch (SQLException ex) { + System.out.println("Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create db and table", ex); + } + } + + public static void initConnection() throws SQLException { + String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; + Properties properties = new Properties(); + properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C"); + properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); + + try { + connection = DriverManager.getConnection(url, properties); + } catch (SQLException ex) { + System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create connection", ex); + } + try { + statement = connection.createStatement(); + } catch (SQLException ex) { + System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create statement", ex); + } + System.out.println("Connection created successfully."); + } + public static void closeConnection() throws SQLException { + try { + if (statement != null) { + statement.close(); + } + } catch (SQLException ex) { + System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to close statement", ex); + } + + try { + if (connection != null) { + connection.close(); + } + } catch (SQLException ex) { + System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to close connection", ex); + } + System.out.println("Connection closed Successfully."); + } + + + public static void main(String[] args) throws SQLException { + initConnection(); + prepareMeta(); + + // create a single thread executor + ExecutorService executor = Executors.newSingleThreadExecutor(); + + // submit a task + executor.submit(() -> { + try { + // please use one example at a time + pollDataExample(); +// seekExample(); +// pollExample(); +// commitExample(); + unsubscribeExample(); + } catch (SQLException ex) { + System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + } + System.out.println("pollDataExample executed successfully"); + }); + + prepareData(); + closeConnection(); + + System.out.println("Data prepared successfully"); + + // 关闭线程池,不再接收新任务 + executor.shutdown(); + + try { + // 等待直到所有任务完成 + boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + assert result; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e){ + e.printStackTrace(); + System.out.println("Wait executor termination failed."); + } + + System.out.println("program end."); + } +} +// ANCHOR_END: consumer_demo diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java index 469316efc7..fa2efab2ee 100644 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBasicDemo.java @@ -2,7 +2,10 @@ package com.taosdata.example; import com.taosdata.jdbc.TSDBPreparedStatement; -import java.sql.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Random; @@ -24,6 +27,7 @@ public class ParameterBindingBasicDemo { String sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"; try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { + for (int i = 1; i <= numOfSubTable; i++) { // set table name pstmt.setTableName("d_bind_" + i); @@ -32,19 +36,35 @@ public class ParameterBindingBasicDemo { pstmt.setTagInt(0, i); pstmt.setTagString(1, "location_" + i); - // set columns + // set column ts + ArrayList tsList = new ArrayList<>(); long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) { - pstmt.setTimestamp(1, new Timestamp(current + j)); - pstmt.setFloat(2, random.nextFloat() * 30); - pstmt.setInt(3, random.nextInt(300)); - pstmt.setFloat(4, random.nextFloat()); - pstmt.addBatch(); - } - int [] exeResult = pstmt.executeBatch(); - // you can check exeResult here - System.out.println("insert " + exeResult.length + " rows."); + for (int j = 0; j < numOfRow; j++) + tsList.add(current + j); + pstmt.setTimestamp(0, tsList); + + // set column current + ArrayList currentList = new ArrayList<>(); + for (int j = 0; j < numOfRow; j++) + currentList.add(random.nextFloat() * 30); + pstmt.setFloat(1, currentList); + + // set column voltage + ArrayList voltageList = new ArrayList<>(); + for (int j = 0; j < numOfRow; j++) + voltageList.add(random.nextInt(300)); + pstmt.setInt(2, voltageList); + + // set column phase + ArrayList phaseList = new ArrayList<>(); + for (int j = 0; j < numOfRow; j++) + phaseList.add(random.nextFloat()); + pstmt.setFloat(3, phaseList); + // add column + pstmt.columnDataAddBatch(); } + // execute column + pstmt.columnDataExecuteBatch(); } } catch (SQLException ex) { // handle any errors, please refer to the JDBC specifications for detailed exceptions info diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBatchDemo.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBatchDemo.java deleted file mode 100644 index 60d76eee4f..0000000000 --- a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ParameterBindingBatchDemo.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.taosdata.example; - -import com.taosdata.jdbc.TSDBPreparedStatement; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Random; - -// ANCHOR: para_bind -public class ParameterBindingBatchDemo { - - // modify host to your own - private static final String host = "127.0.0.1"; - private static final Random random = new Random(System.currentTimeMillis()); - private static final int numOfSubTable = 10, numOfRow = 10; - - public static void main(String[] args) throws SQLException { - - String jdbcUrl = "jdbc:TAOS://" + host + ":6030/"; - try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) { - - init(conn); - - String sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)"; - - try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) { - - for (int i = 1; i <= numOfSubTable; i++) { - // set table name - pstmt.setTableName("d_bind_" + i); - - // set tags - pstmt.setTagInt(0, i); - pstmt.setTagString(1, "location_" + i); - - // set column ts - ArrayList tsList = new ArrayList<>(); - long current = System.currentTimeMillis(); - for (int j = 0; j < numOfRow; j++) - tsList.add(current + j); - pstmt.setTimestamp(0, tsList); - - // set column current - ArrayList f1List = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) - f1List.add(random.nextFloat() * 30); - pstmt.setFloat(1, f1List); - - // set column voltage - ArrayList f2List = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) - f2List.add(random.nextInt(300)); - pstmt.setInt(2, f2List); - - // set column phase - ArrayList f3List = new ArrayList<>(); - for (int j = 0; j < numOfRow; j++) - f3List.add(random.nextFloat()); - pstmt.setFloat(3, f3List); - // add column - pstmt.columnDataAddBatch(); - } - // execute column - pstmt.columnDataExecuteBatch(); - } - } catch (SQLException ex) { - // handle any errors, please refer to the JDBC specifications for detailed exceptions info - System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); - } - } - - private static void init(Connection conn) throws SQLException { - try (Statement stmt = conn.createStatement()) { - stmt.execute("CREATE DATABASE IF NOT EXISTS power"); - stmt.execute("USE power"); - stmt.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); - } - } -} -// ANCHOR_END: para_bind diff --git a/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java new file mode 100644 index 0000000000..d8084eb9f6 --- /dev/null +++ b/examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java @@ -0,0 +1,358 @@ +package com.taosdata.example; + +import com.alibaba.fastjson.JSON; +import com.taosdata.jdbc.TSDBDriver; +import com.taosdata.jdbc.tmq.*; + +import java.sql.*; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +// ANCHOR: consumer_demo +public class WsConsumerLoopFull { + static private Connection connection; + static private Statement statement; + public static TaosConsumer getConsumer() throws SQLException{ +// ANCHOR: create_consumer +Properties config = new Properties(); +config.setProperty("td.connect.type", "ws"); +config.setProperty("bootstrap.servers", "localhost:6041"); +config.setProperty("auto.offset.reset", "latest"); +config.setProperty("msg.with.table.name", "true"); +config.setProperty("enable.auto.commit", "true"); +config.setProperty("auto.commit.interval.ms", "1000"); +config.setProperty("group.id", "group1"); +config.setProperty("client.id", "client1"); +config.setProperty("td.connect.user", "root"); +config.setProperty("td.connect.pass", "taosdata"); +config.setProperty("value.deserializer", "com.taosdata.example.WsConsumerLoopFull$ResultDeserializer"); +config.setProperty("value.deserializer.encoding", "UTF-8"); + +try { + return new TaosConsumer<>(config); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create consumer", ex); + } catch (Exception e) { + e.printStackTrace(); + throw new SQLException("Failed to create consumer", e); + } +// ANCHOR_END: create_consumer +} + + public static void pollDataExample() throws SQLException { + try (TaosConsumer consumer = getConsumer()){ + // subscribe to the topics + List topics = Collections.singletonList("topic_meters"); + + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + for (int i = 0; i < 50; i++) { + // poll data + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process the data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + } + // unsubscribe the topics + consumer.unsubscribe(); + System.out.println("unsubscribed topics successfully"); + } catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data from topic_meters", ex); + } + } + + public static void pollExample() throws SQLException { +// ANCHOR: poll_data_code_piece +try (TaosConsumer consumer = getConsumer()){ + List topics = Collections.singletonList("topic_meters"); + + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + for (int i = 0; i < 50; i++) { + // poll data + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process the data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + } + +} catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to poll data", ex); +} +// ANCHOR_END: poll_data_code_piece + } + + public static void seekExample() throws SQLException { +// ANCHOR: consumer_seek +try (TaosConsumer consumer = getConsumer()){ + List topics = Collections.singletonList("topic_meters"); + + // subscribe to the topics + consumer.subscribe(topics); + System.out.println("subscribe topics successfully"); + ConsumerRecords records = ConsumerRecords.emptyRecord(); + // make sure we have got some data + while (records.isEmpty()){ + records = consumer.poll(Duration.ofMillis(100)); + } + + for (ConsumerRecord record : records) { + System.out.println("first data polled: " + JSON.toJSONString(record.value())); + Set assignment = consumer.assignment(); + // seek to the beginning of the all partitions + consumer.seekToBeginning(assignment); + System.out.println("assignment seek to beginning successfully"); + break; + } + + // poll data agagin + records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + // process the data here + System.out.println("second data polled: " + JSON.toJSONString(record.value())); + break; + } +} catch (SQLException ex) { + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("seek example failed", ex); +} +// ANCHOR_END: consumer_seek + } + + + public static void commitExample() throws SQLException { +// ANCHOR: commit_code_piece +try (TaosConsumer consumer = getConsumer()){ + List topics = Collections.singletonList("topic_meters"); + + consumer.subscribe(topics); + for (int i = 0; i < 50; i++) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + ResultBean bean = record.value(); + // process your data here + System.out.println("data: " + JSON.toJSONString(bean)); + } + if (!records.isEmpty()) { + // after processing the data, commit the offset manually + consumer.commitSync(); + } + } +} catch (SQLException ex){ + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to execute consumer functions", ex); +} +// ANCHOR_END: commit_code_piece + } + public static void unsubscribeExample() throws SQLException { + TaosConsumer consumer = getConsumer(); + List topics = Collections.singletonList("topic_meters"); + consumer.subscribe(topics); +// ANCHOR: unsubscribe_data_code_piece +try { + consumer.unsubscribe(); +} catch (SQLException ex){ + // handle any errors, please refer to the JDBC specifications for detailed exceptions info + System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to unsubscribe consumer", ex); +} finally { + consumer.close(); +} +// ANCHOR_END: unsubscribe_data_code_piece + } + +public static class ResultDeserializer extends ReferenceDeserializer { + +} +// use this class to define the data structure of the result record +public static class ResultBean { + private Timestamp ts; + private double current; + private int voltage; + private double phase; + private int groupid; + private String location; + + public Timestamp getTs() { + return ts; + } + + public void setTs(Timestamp ts) { + this.ts = ts; + } + + public double getCurrent() { + return current; + } + + public void setCurrent(double current) { + this.current = current; + } + + public int getVoltage() { + return voltage; + } + + public void setVoltage(int voltage) { + this.voltage = voltage; + } + + public double getPhase() { + return phase; + } + + public void setPhase(double phase) { + this.phase = phase; + } + + public int getGroupid() { + return groupid; + } + + public void setGroupid(int groupid) { + this.groupid = groupid; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } +} + + public static void prepareData() throws SQLException{ + StringBuilder insertQuery = new StringBuilder(); + insertQuery.append("INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES "); + for (int i = 0; i < 10000; i++){ + insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) "); + } + try { + int affectedRows = statement.executeUpdate(insertQuery.toString()); + assert affectedRows == 10000; + } catch (SQLException ex) { + System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to insert data to power.meters", ex); + } + } + public static void prepareMeta() throws SQLException{ + try { + statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power"); + statement.executeUpdate("USE power"); + statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))"); + statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); + } catch (SQLException ex) { + System.out.println("Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create db and table", ex); + } + } + + public static void initConnection() throws SQLException { + String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata"; + Properties properties = new Properties(); + properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C"); + properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); + + try { + connection = DriverManager.getConnection(url, properties); + } catch (SQLException ex) { + System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create connection", ex); + } + try { + statement = connection.createStatement(); + } catch (SQLException ex) { + System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to create statement", ex); + } + System.out.println("Connection created successfully."); + } + public static void closeConnection() throws SQLException { + try { + if (statement != null) { + statement.close(); + } + } catch (SQLException ex) { + System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to close statement", ex); + } + + try { + if (connection != null) { + connection.close(); + } + } catch (SQLException ex) { + System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + throw new SQLException("Failed to close connection", ex); + } + System.out.println("Connection closed Successfully."); + } + + + public static void main(String[] args) throws SQLException { + initConnection(); + prepareMeta(); + + // create a single thread executor + ExecutorService executor = Executors.newSingleThreadExecutor(); + + // submit a task + executor.submit(() -> { + try { + // please use one example at a time + pollDataExample(); +// seekExample(); +// pollExample(); +// commitExample(); + unsubscribeExample(); + } catch (SQLException ex) { + System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage()); + } + System.out.println("pollDataExample executed successfully"); + }); + + prepareData(); + closeConnection(); + + System.out.println("Data prepared successfully"); + + // 关闭线程池,不再接收新任务 + executor.shutdown(); + + try { + // 等待直到所有任务完成 + boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + assert result; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e){ + e.printStackTrace(); + System.out.println("Wait executor termination failed."); + } + + System.out.println("program end."); + } +} +// ANCHOR_END: consumer_demo