Merge pull request #21465 from taosdata/enh/TD-24319
enh: tmq example add all properties
This commit is contained in:
commit
ec02b60595
|
@ -959,6 +959,7 @@ The preceding example uses the SQL statement `select ts, speed from speed_table`
|
|||
|
||||
```java
|
||||
Properties config = new Properties();
|
||||
config.setProperty("bootstrap.servers", "localhost:6030");
|
||||
config.setProperty("enable.auto.commit", "true");
|
||||
config.setProperty("group.id", "group1");
|
||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
|
||||
|
@ -966,12 +967,14 @@ config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.Res
|
|||
TaosConsumer consumer = new TaosConsumer<>(config);
|
||||
```
|
||||
|
||||
- bootstrap.servers: `ip:port` where the TDengine server is located, or `ip:port` where the taosAdapter is located if WebSocket connection is used.
|
||||
- enable.auto.commit: Specifies whether to commit automatically.
|
||||
- group.id: consumer: Specifies the group that the consumer is in.
|
||||
- value.deserializer: To deserialize the results, you can inherit `com.taosdata.jdbc.tmq.ReferenceDeserializer` and specify the result set bean. You can also inherit `com.taosdata.jdbc.tmq.Deserializer` and perform custom deserialization based on the SQL result set.
|
||||
- td.connect.type: Specifies the type connect with TDengine, `jni` or `WebSocket`. default is `jni`
|
||||
- httpConnectTimeout: WebSocket connection timeout in milliseconds, the default value is 5000 ms. It only takes effect when using WebSocket type.
|
||||
- messageWaitTimeout: socket timeout in milliseconds, the default value is 10000 ms. It only takes effect when using WebSocket type.
|
||||
- httpPoolSize: Maximum number of concurrent requests on the a connection。It only takes effect when using WebSocket type.
|
||||
- For more information, see [Consumer Parameters](../../../develop/tmq).
|
||||
|
||||
#### Subscribe to consume data
|
||||
|
@ -1015,10 +1018,20 @@ public abstract class ConsumerLoop {
|
|||
|
||||
public ConsumerLoop() throws SQLException {
|
||||
Properties config = new Properties();
|
||||
config.setProperty("td.connect.type", "jni");
|
||||
config.setProperty("bootstrap.servers", "localhost:6030");
|
||||
config.setProperty("td.connect.user", "root");
|
||||
config.setProperty("td.connect.pass", "taosdata");
|
||||
config.setProperty("auto.offset.reset", "earliest");
|
||||
config.setProperty("msg.with.table.name", "true");
|
||||
config.setProperty("enable.auto.commit", "true");
|
||||
config.setProperty("auto.commit.interval.ms", "1000");
|
||||
config.setProperty("group.id", "group1");
|
||||
config.setProperty("client.id", "1");
|
||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
|
||||
config.setProperty("value.deserializer.encoding", "UTF-8");
|
||||
config.setProperty("experimental.snapshot.enable", "true");
|
||||
|
||||
|
||||
this.consumer = new TaosConsumer<>(config);
|
||||
this.topics = Collections.singletonList("topic_speed");
|
||||
|
@ -1090,12 +1103,19 @@ public abstract class ConsumerLoop {
|
|||
|
||||
public ConsumerLoop() throws SQLException {
|
||||
Properties config = new Properties();
|
||||
config.setProperty("bootstrap.servers", "localhost:6041");
|
||||
config.setProperty("td.connect.type", "ws");
|
||||
config.setProperty("bootstrap.servers", "localhost:6041");
|
||||
config.setProperty("td.connect.user", "root");
|
||||
config.setProperty("td.connect.pass", "taosdata");
|
||||
config.setProperty("auto.offset.reset", "earliest");
|
||||
config.setProperty("msg.with.table.name", "true");
|
||||
config.setProperty("enable.auto.commit", "true");
|
||||
config.setProperty("auto.commit.interval.ms", "1000");
|
||||
config.setProperty("group.id", "group2");
|
||||
config.setProperty("client.id", "1");
|
||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
|
||||
config.setProperty("value.deserializer.encoding", "UTF-8");
|
||||
config.setProperty("experimental.snapshot.enable", "true");
|
||||
|
||||
this.consumer = new TaosConsumer<>(config);
|
||||
this.topics = Collections.singletonList("topic_speed");
|
||||
|
|
|
@ -53,20 +53,28 @@ public class SubscribeDemo {
|
|||
|
||||
// create consumer
|
||||
Properties properties = new Properties();
|
||||
properties.getProperty(TMQConstants.CONNECT_TYPE, "jni");
|
||||
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6030");
|
||||
properties.setProperty(TMQConstants.CONNECT_USER, "root");
|
||||
properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
|
||||
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
|
||||
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
|
||||
properties.setProperty(TMQConstants.GROUP_ID, "test");
|
||||
properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "1000");
|
||||
properties.setProperty(TMQConstants.GROUP_ID, "test1");
|
||||
properties.setProperty(TMQConstants.CLIENT_ID, "1");
|
||||
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
|
||||
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
|
||||
"com.taos.example.MetersDeserializer");
|
||||
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
|
||||
properties.setProperty(TMQConstants.EXPERIMENTAL_SNAPSHOT_ENABLE, "true");
|
||||
|
||||
// poll data
|
||||
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
|
||||
consumer.subscribe(Collections.singletonList(TOPIC));
|
||||
while (!shutdown.get()) {
|
||||
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
|
||||
for (ConsumerRecord<Meters> recode : meters) {
|
||||
Meters meter = recode.value();
|
||||
for (ConsumerRecord<Meters> r : meters) {
|
||||
Meters meter = r.value();
|
||||
System.out.println(meter);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.taos.example;
|
||||
|
||||
import com.taosdata.jdbc.tmq.ConsumerRecord;
|
||||
import com.taosdata.jdbc.tmq.ConsumerRecords;
|
||||
import com.taosdata.jdbc.tmq.TMQConstants;
|
||||
import com.taosdata.jdbc.tmq.TaosConsumer;
|
||||
|
@ -54,18 +55,26 @@ public class WebsocketSubscribeDemo {
|
|||
Properties properties = new Properties();
|
||||
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6041");
|
||||
properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
|
||||
properties.setProperty(TMQConstants.CONNECT_USER, "root");
|
||||
properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
|
||||
properties.setProperty(TMQConstants.AUTO_OFFSET_RESET, "earliest");
|
||||
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
|
||||
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
|
||||
properties.setProperty(TMQConstants.GROUP_ID, "test");
|
||||
properties.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "1000");
|
||||
properties.setProperty(TMQConstants.GROUP_ID, "test2");
|
||||
properties.setProperty(TMQConstants.CLIENT_ID, "1");
|
||||
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
|
||||
"com.taos.example.MetersDeserializer");
|
||||
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
|
||||
properties.setProperty(TMQConstants.EXPERIMENTAL_SNAPSHOT_ENABLE, "true");
|
||||
|
||||
// poll data
|
||||
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
|
||||
consumer.subscribe(Collections.singletonList(TOPIC));
|
||||
while (!shutdown.get()) {
|
||||
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
|
||||
for (Meters meter : meters) {
|
||||
for (ConsumerRecord<Meters> r : meters) {
|
||||
Meters meter = (Meters) r.value();
|
||||
System.out.println(meter);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -962,6 +962,7 @@ statement.executeUpdate("create topic if not exists topic_speed as select ts, sp
|
|||
|
||||
```java
|
||||
Properties config = new Properties();
|
||||
config.setProperty("bootstrap.servers", "localhost:6030");
|
||||
config.setProperty("enable.auto.commit", "true");
|
||||
config.setProperty("group.id", "group1");
|
||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
|
||||
|
@ -969,12 +970,14 @@ config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.Res
|
|||
TaosConsumer consumer = new TaosConsumer<>(config);
|
||||
```
|
||||
|
||||
- bootstrap.servers: TDengine 服务端所在的`ip:port`,如果使用 WebSocket 连接,则为 taosAdapter 所在的`ip:port`。
|
||||
- enable.auto.commit: 是否允许自动提交。
|
||||
- group.id: consumer: 所在的 group。
|
||||
- value.deserializer: 结果集反序列化方法,可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean,实现反序列化。也可以继承 `com.taosdata.jdbc.tmq.Deserializer`,根据 SQL 的 resultSet 自定义反序列化方式。
|
||||
- td.connect.type: 连接方式。jni:表示使用动态库连接的方式,ws/WebSocket:表示使用 WebSocket 进行数据通信。默认为 jni 方式。
|
||||
- httpConnectTimeout:创建连接超时参数,单位 ms,默认为 5000 ms。仅在 WebSocket 连接下有效。
|
||||
- messageWaitTimeout:数据传输超时参数,单位 ms,默认为 10000 ms。仅在 WebSocket 连接下有效。
|
||||
- httpConnectTimeout: 创建连接超时参数,单位 ms,默认为 5000 ms。仅在 WebSocket 连接下有效。
|
||||
- messageWaitTimeout: 数据传输超时参数,单位 ms,默认为 10000 ms。仅在 WebSocket 连接下有效。
|
||||
- httpPoolSize: 同一个连接下最大并行请求数。仅在 WebSocket 连接下有效。
|
||||
其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group)
|
||||
|
||||
#### 订阅消费数据
|
||||
|
@ -1016,10 +1019,19 @@ public abstract class ConsumerLoop {
|
|||
|
||||
public ConsumerLoop() throws SQLException {
|
||||
Properties config = new Properties();
|
||||
config.setProperty("td.connect.type", "jni");
|
||||
config.setProperty("bootstrap.servers", "localhost:6030");
|
||||
config.setProperty("td.connect.user", "root");
|
||||
config.setProperty("td.connect.pass", "taosdata");
|
||||
config.setProperty("auto.offset.reset", "earliest");
|
||||
config.setProperty("msg.with.table.name", "true");
|
||||
config.setProperty("enable.auto.commit", "true");
|
||||
config.setProperty("auto.commit.interval.ms", "1000");
|
||||
config.setProperty("group.id", "group1");
|
||||
config.setProperty("client.id", "1");
|
||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
|
||||
config.setProperty("value.deserializer.encoding", "UTF-8");
|
||||
config.setProperty("experimental.snapshot.enable", "true");
|
||||
|
||||
this.consumer = new TaosConsumer<>(config);
|
||||
this.topics = Collections.singletonList("topic_speed");
|
||||
|
@ -1093,12 +1105,19 @@ public abstract class ConsumerLoop {
|
|||
|
||||
public ConsumerLoop() throws SQLException {
|
||||
Properties config = new Properties();
|
||||
config.setProperty("bootstrap.servers", "localhost:6041");
|
||||
config.setProperty("td.connect.type", "ws");
|
||||
config.setProperty("bootstrap.servers", "localhost:6041");
|
||||
config.setProperty("td.connect.user", "root");
|
||||
config.setProperty("td.connect.pass", "taosdata");
|
||||
config.setProperty("auto.offset.reset", "earliest");
|
||||
config.setProperty("msg.with.table.name", "true");
|
||||
config.setProperty("enable.auto.commit", "true");
|
||||
config.setProperty("auto.commit.interval.ms", "1000");
|
||||
config.setProperty("group.id", "group2");
|
||||
config.setProperty("client.id", "1");
|
||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
|
||||
config.setProperty("value.deserializer.encoding", "UTF-8");
|
||||
config.setProperty("experimental.snapshot.enable", "true");
|
||||
|
||||
this.consumer = new TaosConsumer<>(config);
|
||||
this.topics = Collections.singletonList("topic_speed");
|
||||
|
|
Loading…
Reference in New Issue