mod tmq example

This commit is contained in:
sheyanjie-qq 2024-08-01 21:15:53 +08:00
parent f399771a2f
commit cbf666b529
8 changed files with 764 additions and 432 deletions

View File

@ -85,7 +85,7 @@ Java 连接器创建消费者的参数为 Properties 可以设置的参数列
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java:create_consumer}}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:create_consumer}}
```
</TabItem>
@ -135,7 +135,7 @@ Java 连接器创建消费者的参数为 Properties 可以设置的参数列
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:create_consumer}}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java:create_consumer}}
```
@ -180,7 +180,7 @@ Java 连接器创建消费者的参数为 Properties 可以设置的参数列
<TabItem value="java" label="Java">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:poll_data_code_piece}}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:poll_data_code_piece}}
```
- `subscribe` 方法的参数含义为:订阅的主题列表(即名称),支持同时订阅多个主题。
@ -273,33 +273,7 @@ Java 连接器创建消费者的参数为 Properties 可以设置的参数列
<TabItem value="java" label="Java">
```java
// 获取当前消费者分配的 TopicPartition 集合
Set<TopicPartition> assignment() throws SQLException;
// 获取指定分区的当前偏移量
long position(TopicPartition partition) throws SQLException;
// 获取指定主题的所有分区的当前偏移量
Map<TopicPartition, Long> position(String topic) throws SQLException;
// 获取指定主题的所有分区的起始偏移量
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
// 获取指定主题的所有分区的最新偏移量
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
// 获取指定分区集合中的已提交偏移量
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException;
// 设置指定分区的偏移量
void seek(TopicPartition partition, long offset) throws SQLException;
// 将指定分区集合的偏移量设置为最开始
void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException;
// 将指定分区集合的偏移量设置为最新
void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;
```
示例代码:
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerOffsetSeek.java:consumer_seek}}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:consumer_seek}}
```
</TabItem>
@ -344,7 +318,6 @@ void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;
<Tabs groupId="lang">
<TabItem value="java" label="Java">
同 Websocket 代码样例。
</TabItem>
@ -389,22 +362,9 @@ void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
```java
// 同步提交当前消费者的偏移量
void commitSync() throws SQLException;
// 同步提交指定的偏移量
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException;
// 异步提交仅在 native 连接下有效
// 异步提交当前消费者的偏移量,需要提供回调以处理可能的提交结果
void commitAsync(OffsetCommitCallback<V> callback) throws SQLException;
// 异步提交指定的偏移量,需要提供回调以处理可能的提交结果
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback<V> callback) throws SQLException;
```
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:commit_code_piece}}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:commit_code_piece}}
```
</TabItem>
@ -501,7 +461,7 @@ void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCal
<TabItem value="java" label="Java">
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoop.java:unsubscribe_data_code_piece}}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:unsubscribe_data_code_piece}}
```
</TabItem>
@ -593,12 +553,14 @@ void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCal
### Websocket 连接
<Tabs defaultValue="java" groupId="lang">
<TabItem value="java" label="Java">
<details>
<summary>完整 Websocket 连接代码示例</summary>
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsWsConsumerLoop.java:consumer_demo}}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/WsConsumerLoopFull.java:consumer_demo}}
```
**注意**:这里的 value.deserializer 配置参数值应该根据测试环境的包路径做相应的调整。
其余代码请参考: [JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC/JDBCDemo)
</details>
</TabItem>
<TabItem label="Python" value="python">
@ -641,12 +603,16 @@ void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCal
### 原生连接
<Tabs groupId="lang">
<TabItem value="java" label="Java">
<details>
<summary>完整原生连接代码示例</summary>
```java
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/AbsConsumerLoopFull.java:consumer_demo}}
{{#include examples/JDBC/JDBCDemo/src/main/java/com/taosdata/example/ConsumerLoopFull.java:consumer_demo}}
```
**注意**:这里的 value.deserializer 配置参数值应该根据测试环境的包路径做相应的调整。
其余代码请参考: [JDBC example](https://github.com/taosdata/TDengine/tree/3.0/examples/JDBC/JDBCDemo)
</details>
</TabItem>

View File

@ -32,7 +32,7 @@ config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("client.id", "client1");
config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {

View File

@ -1,142 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
// ANCHOR: consumer_demo
public abstract class AbsConsumerLoopFull {
private final TaosConsumer<ResultBean> consumer;
private final List<String> topics;
private final AtomicBoolean shutdown;
private final CountDownLatch shutdownLatch;
public AbsConsumerLoopFull() throws SQLException {
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoop$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
this.consumer = new TaosConsumer<>(config);
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
}
this.topics = Collections.singletonList("topic_meters");
this.shutdown = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ResultBean result);
public void pollData() throws SQLException {
try {
// subscribe to the topics
consumer.subscribe(topics);
while (!shutdown.get()) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process the data here
process(bean);
}
}
// unsubscribe the topics
consumer.unsubscribe();
} finally {
// close the consumer
consumer.close();
shutdownLatch.countDown();
}
}
public void shutdown() throws InterruptedException {
shutdown.set(true);
shutdownLatch.await();
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
}
// ANCHOR_END: consumer_demo

View File

@ -1,144 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
import com.taosdata.jdbc.tmq.TaosConsumer;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
// ANCHOR: consumer_demo
public abstract class AbsWsConsumerLoop {
private final TaosConsumer<ResultBean> consumer;
private final List<String> topics;
private final AtomicBoolean shutdown;
private final CountDownLatch shutdownLatch;
public AbsWsConsumerLoop() throws SQLException {
// ANCHOR: create_consumer
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("value.deserializer", "com.taosdata.example.AbsConsumerLoopWs$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
this.consumer = new TaosConsumer<>(config);
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to create ws consumer with " + config.getProperty("bootstrap.servers") + " ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
}
// ANCHOR_END: create_consumer
this.topics = Collections.singletonList("topic_meters");
this.shutdown = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
}
public abstract void process(ResultBean result);
public void pollData() throws SQLException {
try {
// Subscribe to the topic
consumer.subscribe(topics);
while (!shutdown.get()) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process data here
process(bean);
}
}
// unsubscribe the topics
consumer.unsubscribe();
} finally {
// close the consumer
consumer.close();
shutdownLatch.countDown();
}
}
public void shutdown() throws InterruptedException {
shutdown.set(true);
shutdownLatch.await();
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
}
// ANCHOR_END: consumer_demo

View File

@ -0,0 +1,357 @@
package com.taosdata.example;
import com.alibaba.fastjson.JSON;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.*;
import java.sql.*;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
// ANCHOR: consumer_demo
public class ConsumerLoopFull {
static private Connection connection;
static private Statement statement;
public static TaosConsumer<ResultBean> getConsumer() throws SQLException{
// ANCHOR: create_consumer
Properties config = new Properties();
config.setProperty("td.connect.type", "jni");
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taosdata.example.ConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
return new TaosConsumer<>(config);
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to create jni consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
} catch (Exception e) {
e.printStackTrace();
throw new SQLException("Failed to create consumer", e);
}
// ANCHOR_END: create_consumer
}
public static void pollDataExample() throws SQLException {
try (TaosConsumer<ResultBean> consumer = getConsumer()){
// subscribe to the topics
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
System.out.println("subscribe topics successfully");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process the data here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
// unsubscribe the topics
consumer.unsubscribe();
System.out.println("unsubscribed topics successfully");
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data from topic_meters", ex);
}
}
public static void pollExample() throws SQLException {
// ANCHOR: poll_data_code_piece
try (TaosConsumer<ResultBean> consumer = getConsumer()){
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("subscribe topics successfully");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process the data here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data", ex);
}
// ANCHOR_END: poll_data_code_piece
}
public static void seekExample() throws SQLException {
// ANCHOR: consumer_seek
try (TaosConsumer<ResultBean> consumer = getConsumer()){
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("subscribe topics successfully");
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()){
records = consumer.poll(Duration.ofMillis(100));
}
for (ConsumerRecord<ResultBean> record : records) {
System.out.println("first data polled: " + JSON.toJSONString(record.value()));
Set<TopicPartition> assignment = consumer.assignment();
// seek to the beginning of the all partitions
consumer.seekToBeginning(assignment);
System.out.println("assignment seek to beginning successfully");
break;
}
// poll data agagin
records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
// process the data here
System.out.println("second data polled: " + JSON.toJSONString(record.value()));
break;
}
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("seek example failed", ex);
}
// ANCHOR_END: consumer_seek
}
public static void commitExample() throws SQLException {
// ANCHOR: commit_code_piece
try (TaosConsumer<ResultBean> consumer = getConsumer()){
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process your data here
System.out.println("data: " + JSON.toJSONString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
}
}
} catch (SQLException ex){
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to execute consumer functions", ex);
}
// ANCHOR_END: commit_code_piece
}
public static void unsubscribeExample() throws SQLException {
TaosConsumer<ResultBean> consumer = getConsumer();
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
// ANCHOR: unsubscribe_data_code_piece
try {
consumer.unsubscribe();
} catch (SQLException ex){
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to unsubscribe consumer", ex);
} finally {
consumer.close();
}
// ANCHOR_END: unsubscribe_data_code_piece
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
public static void prepareData() throws SQLException{
StringBuilder insertQuery = new StringBuilder();
insertQuery.append("INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES ");
for (int i = 0; i < 10000; i++){
insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) ");
}
try {
int affectedRows = statement.executeUpdate(insertQuery.toString());
assert affectedRows == 10000;
} catch (SQLException ex) {
System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to insert data to power.meters", ex);
}
}
public static void prepareMeta() throws SQLException{
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (SQLException ex) {
System.out.println("Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create db and table", ex);
}
}
public static void initConnection() throws SQLException {
String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed Successfully.");
}
public static void main(String[] args) throws SQLException {
initConnection();
prepareMeta();
// create a single thread executor
ExecutorService executor = Executors.newSingleThreadExecutor();
// submit a task
executor.submit(() -> {
try {
// please use one example at a time
pollDataExample();
// seekExample();
// pollExample();
// commitExample();
unsubscribeExample();
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
}
System.out.println("pollDataExample executed successfully");
});
prepareData();
closeConnection();
System.out.println("Data prepared successfully");
// 关闭线程池不再接收新任务
executor.shutdown();
try {
// 等待直到所有任务完成
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e){
e.printStackTrace();
System.out.println("Wait executor termination failed.");
}
System.out.println("program end.");
}
}
// ANCHOR_END: consumer_demo

View File

@ -2,7 +2,10 @@ package com.taosdata.example;
import com.taosdata.jdbc.TSDBPreparedStatement;
import java.sql.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Random;
@ -24,6 +27,7 @@ public class ParameterBindingBasicDemo {
String sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("d_bind_" + i);
@ -32,19 +36,35 @@ public class ParameterBindingBasicDemo {
pstmt.setTagInt(0, i);
pstmt.setTagString(1, "location_" + i);
// set columns
// set column ts
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++) {
pstmt.setTimestamp(1, new Timestamp(current + j));
pstmt.setFloat(2, random.nextFloat() * 30);
pstmt.setInt(3, random.nextInt(300));
pstmt.setFloat(4, random.nextFloat());
pstmt.addBatch();
}
int [] exeResult = pstmt.executeBatch();
// you can check exeResult here
System.out.println("insert " + exeResult.length + " rows.");
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
// set column current
ArrayList<Float> currentList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
currentList.add(random.nextFloat() * 30);
pstmt.setFloat(1, currentList);
// set column voltage
ArrayList<Integer> voltageList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
voltageList.add(random.nextInt(300));
pstmt.setInt(2, voltageList);
// set column phase
ArrayList<Float> phaseList = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
phaseList.add(random.nextFloat());
pstmt.setFloat(3, phaseList);
// add column
pstmt.columnDataAddBatch();
}
// execute column
pstmt.columnDataExecuteBatch();
}
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info

View File

@ -1,83 +0,0 @@
package com.taosdata.example;
import com.taosdata.jdbc.TSDBPreparedStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Random;
// ANCHOR: para_bind
public class ParameterBindingBatchDemo {
// modify host to your own
private static final String host = "127.0.0.1";
private static final Random random = new Random(System.currentTimeMillis());
private static final int numOfSubTable = 10, numOfRow = 10;
public static void main(String[] args) throws SQLException {
String jdbcUrl = "jdbc:TAOS://" + host + ":6030/";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
init(conn);
String sql = "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)";
try (TSDBPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSDBPreparedStatement.class)) {
for (int i = 1; i <= numOfSubTable; i++) {
// set table name
pstmt.setTableName("d_bind_" + i);
// set tags
pstmt.setTagInt(0, i);
pstmt.setTagString(1, "location_" + i);
// set column ts
ArrayList<Long> tsList = new ArrayList<>();
long current = System.currentTimeMillis();
for (int j = 0; j < numOfRow; j++)
tsList.add(current + j);
pstmt.setTimestamp(0, tsList);
// set column current
ArrayList<Float> f1List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f1List.add(random.nextFloat() * 30);
pstmt.setFloat(1, f1List);
// set column voltage
ArrayList<Integer> f2List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f2List.add(random.nextInt(300));
pstmt.setInt(2, f2List);
// set column phase
ArrayList<Float> f3List = new ArrayList<>();
for (int j = 0; j < numOfRow; j++)
f3List.add(random.nextFloat());
pstmt.setFloat(3, f3List);
// add column
pstmt.columnDataAddBatch();
}
// execute column
pstmt.columnDataExecuteBatch();
}
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to insert to table meters using stmt, url: " + jdbcUrl + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
}
}
private static void init(Connection conn) throws SQLException {
try (Statement stmt = conn.createStatement()) {
stmt.execute("CREATE DATABASE IF NOT EXISTS power");
stmt.execute("USE power");
stmt.execute("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
}
}
}
// ANCHOR_END: para_bind

View File

@ -0,0 +1,358 @@
package com.taosdata.example;
import com.alibaba.fastjson.JSON;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.*;
import java.sql.*;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
// ANCHOR: consumer_demo
public class WsConsumerLoopFull {
static private Connection connection;
static private Statement statement;
public static TaosConsumer<ResultBean> getConsumer() throws SQLException{
// ANCHOR: create_consumer
Properties config = new Properties();
config.setProperty("td.connect.type", "ws");
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
config.setProperty("group.id", "group1");
config.setProperty("client.id", "client1");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("value.deserializer", "com.taosdata.example.WsConsumerLoopFull$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
try {
return new TaosConsumer<>(config);
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to create websocket consumer, host : " + config.getProperty("bootstrap.servers") + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create consumer", ex);
} catch (Exception e) {
e.printStackTrace();
throw new SQLException("Failed to create consumer", e);
}
// ANCHOR_END: create_consumer
}
public static void pollDataExample() throws SQLException {
try (TaosConsumer<ResultBean> consumer = getConsumer()){
// subscribe to the topics
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
System.out.println("subscribe topics successfully");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process the data here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
// unsubscribe the topics
consumer.unsubscribe();
System.out.println("unsubscribed topics successfully");
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data from topic_meters", ex);
}
}
public static void pollExample() throws SQLException {
// ANCHOR: poll_data_code_piece
try (TaosConsumer<ResultBean> consumer = getConsumer()){
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("subscribe topics successfully");
for (int i = 0; i < 50; i++) {
// poll data
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process the data here
System.out.println("data: " + JSON.toJSONString(bean));
}
}
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to poll data; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to poll data", ex);
}
// ANCHOR_END: poll_data_code_piece
}
public static void seekExample() throws SQLException {
// ANCHOR: consumer_seek
try (TaosConsumer<ResultBean> consumer = getConsumer()){
List<String> topics = Collections.singletonList("topic_meters");
// subscribe to the topics
consumer.subscribe(topics);
System.out.println("subscribe topics successfully");
ConsumerRecords<ResultBean> records = ConsumerRecords.emptyRecord();
// make sure we have got some data
while (records.isEmpty()){
records = consumer.poll(Duration.ofMillis(100));
}
for (ConsumerRecord<ResultBean> record : records) {
System.out.println("first data polled: " + JSON.toJSONString(record.value()));
Set<TopicPartition> assignment = consumer.assignment();
// seek to the beginning of the all partitions
consumer.seekToBeginning(assignment);
System.out.println("assignment seek to beginning successfully");
break;
}
// poll data agagin
records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
// process the data here
System.out.println("second data polled: " + JSON.toJSONString(record.value()));
break;
}
} catch (SQLException ex) {
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("seek example failed; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("seek example failed", ex);
}
// ANCHOR_END: consumer_seek
}
public static void commitExample() throws SQLException {
// ANCHOR: commit_code_piece
try (TaosConsumer<ResultBean> consumer = getConsumer()){
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
for (int i = 0; i < 50; i++) {
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<ResultBean> record : records) {
ResultBean bean = record.value();
// process your data here
System.out.println("data: " + JSON.toJSONString(bean));
}
if (!records.isEmpty()) {
// after processing the data, commit the offset manually
consumer.commitSync();
}
}
} catch (SQLException ex){
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to execute consumer functions. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to execute consumer functions", ex);
}
// ANCHOR_END: commit_code_piece
}
public static void unsubscribeExample() throws SQLException {
TaosConsumer<ResultBean> consumer = getConsumer();
List<String> topics = Collections.singletonList("topic_meters");
consumer.subscribe(topics);
// ANCHOR: unsubscribe_data_code_piece
try {
consumer.unsubscribe();
} catch (SQLException ex){
// handle any errors, please refer to the JDBC specifications for detailed exceptions info
System.out.println("Failed to unsubscribe consumer. ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to unsubscribe consumer", ex);
} finally {
consumer.close();
}
// ANCHOR_END: unsubscribe_data_code_piece
}
public static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
}
// use this class to define the data structure of the result record
public static class ResultBean {
private Timestamp ts;
private double current;
private int voltage;
private double phase;
private int groupid;
private String location;
public Timestamp getTs() {
return ts;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public int getVoltage() {
return voltage;
}
public void setVoltage(int voltage) {
this.voltage = voltage;
}
public double getPhase() {
return phase;
}
public void setPhase(double phase) {
this.phase = phase;
}
public int getGroupid() {
return groupid;
}
public void setGroupid(int groupid) {
this.groupid = groupid;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
public static void prepareData() throws SQLException{
StringBuilder insertQuery = new StringBuilder();
insertQuery.append("INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES ");
for (int i = 0; i < 10000; i++){
insertQuery.append("(NOW + ").append(i).append("a, 10.30000, 219, 0.31000) ");
}
try {
int affectedRows = statement.executeUpdate(insertQuery.toString());
assert affectedRows == 10000;
} catch (SQLException ex) {
System.out.println("Failed to insert data to power.meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to insert data to power.meters", ex);
}
}
public static void prepareMeta() throws SQLException{
try {
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS power");
statement.executeUpdate("USE power");
statement.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))");
statement.executeUpdate("CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
} catch (SQLException ex) {
System.out.println("Failed to create db and table, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create db and table", ex);
}
}
public static void initConnection() throws SQLException {
String url = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "C");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
try {
connection = DriverManager.getConnection(url, properties);
} catch (SQLException ex) {
System.out.println("Failed to create connection, url:" + url + "; ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create connection", ex);
}
try {
statement = connection.createStatement();
} catch (SQLException ex) {
System.out.println("Failed to create statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to create statement", ex);
}
System.out.println("Connection created successfully.");
}
public static void closeConnection() throws SQLException {
try {
if (statement != null) {
statement.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close statement, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close statement", ex);
}
try {
if (connection != null) {
connection.close();
}
} catch (SQLException ex) {
System.out.println("Failed to close connection, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
throw new SQLException("Failed to close connection", ex);
}
System.out.println("Connection closed Successfully.");
}
public static void main(String[] args) throws SQLException {
initConnection();
prepareMeta();
// create a single thread executor
ExecutorService executor = Executors.newSingleThreadExecutor();
// submit a task
executor.submit(() -> {
try {
// please use one example at a time
pollDataExample();
// seekExample();
// pollExample();
// commitExample();
unsubscribeExample();
} catch (SQLException ex) {
System.out.println("Failed to poll data from topic_meters, ErrCode:" + ex.getErrorCode() + "; ErrMessage: " + ex.getMessage());
}
System.out.println("pollDataExample executed successfully");
});
prepareData();
closeConnection();
System.out.println("Data prepared successfully");
// 关闭线程池不再接收新任务
executor.shutdown();
try {
// 等待直到所有任务完成
boolean result = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
assert result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e){
e.printStackTrace();
System.out.println("Wait executor termination failed.");
}
System.out.println("program end.");
}
}
// ANCHOR_END: consumer_demo