fix snapshot.enable and auto.offset.reset
This commit is contained in:
parent
8460ac9ef3
commit
5baf0ae351
|
@ -371,7 +371,7 @@ tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||||
tmq_conf_set(conf, "group.id", "cgrpName");
|
tmq_conf_set(conf, "group.id", "cgrpName");
|
||||||
tmq_conf_set(conf, "td.connect.user", "root");
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
tmq_conf_set(conf, "auto.offset.reset", "latest");
|
||||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||||
|
|
||||||
|
@ -401,7 +401,7 @@ properties.setProperty("group.id", "cgrpName");
|
||||||
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
|
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
|
||||||
properties.setProperty("td.connect.user", "root");
|
properties.setProperty("td.connect.user", "root");
|
||||||
properties.setProperty("td.connect.pass", "taosdata");
|
properties.setProperty("td.connect.pass", "taosdata");
|
||||||
properties.setProperty("auto.offset.reset", "earliest");
|
properties.setProperty("auto.offset.reset", "latest");
|
||||||
properties.setProperty("msg.with.table.name", "true");
|
properties.setProperty("msg.with.table.name", "true");
|
||||||
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
|
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
|
||||||
|
|
||||||
|
@ -441,7 +441,7 @@ consumer, err := NewConsumer(conf)
|
||||||
let mut dsn: Dsn = "taos://".parse()?;
|
let mut dsn: Dsn = "taos://".parse()?;
|
||||||
dsn.set("group.id", "group1");
|
dsn.set("group.id", "group1");
|
||||||
dsn.set("client.id", "test");
|
dsn.set("client.id", "test");
|
||||||
dsn.set("auto.offset.reset", "earliest");
|
dsn.set("auto.offset.reset", "latest");
|
||||||
|
|
||||||
let tmq = TmqBuilder::from_dsn(dsn)?;
|
let tmq = TmqBuilder::from_dsn(dsn)?;
|
||||||
|
|
||||||
|
@ -467,7 +467,7 @@ consumer = Consumer(
|
||||||
"td.connect.ip": "127.0.0.1",
|
"td.connect.ip": "127.0.0.1",
|
||||||
"td.connect.user": "root",
|
"td.connect.user": "root",
|
||||||
"td.connect.pass": "taosdata",
|
"td.connect.pass": "taosdata",
|
||||||
"auto.offset.reset": "earliest",
|
"auto.offset.reset": "latest",
|
||||||
"msg.with.table.name": "true",
|
"msg.with.table.name": "true",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -487,7 +487,7 @@ let consumer = taos.consumer({
|
||||||
'group.id': 'tg2',
|
'group.id': 'tg2',
|
||||||
'td.connect.user': 'root',
|
'td.connect.user': 'root',
|
||||||
'td.connect.pass': 'taosdata',
|
'td.connect.pass': 'taosdata',
|
||||||
'auto.offset.reset','earliest',
|
'auto.offset.reset','latest',
|
||||||
'msg.with.table.name': 'true',
|
'msg.with.table.name': 'true',
|
||||||
'td.connect.ip','127.0.0.1',
|
'td.connect.ip','127.0.0.1',
|
||||||
'td.connect.port','6030'
|
'td.connect.port','6030'
|
||||||
|
|
|
@ -1093,7 +1093,7 @@ TaosConsumer consumer = new TaosConsumer<>(config);
|
||||||
- httpConnectTimeout: WebSocket connection timeout in milliseconds, the default value is 5000 ms. It only takes effect when using WebSocket type.
|
- httpConnectTimeout: WebSocket connection timeout in milliseconds, the default value is 5000 ms. It only takes effect when using WebSocket type.
|
||||||
- messageWaitTimeout: socket timeout in milliseconds, the default value is 10000 ms. It only takes effect when using WebSocket type.
|
- messageWaitTimeout: socket timeout in milliseconds, the default value is 10000 ms. It only takes effect when using WebSocket type.
|
||||||
- httpPoolSize: Maximum number of concurrent requests on the a connection。It only takes effect when using WebSocket type.
|
- httpPoolSize: Maximum number of concurrent requests on the a connection。It only takes effect when using WebSocket type.
|
||||||
- For more information, see [Consumer Parameters](../../../develop/tmq).
|
- For more information, see [Consumer Parameters](../../../develop/tmq). Note that the default value of auto.offset.reset in data subscription on the TDengine server has changed since version 3.2.0.0.
|
||||||
|
|
||||||
#### Subscribe to consume data
|
#### Subscribe to consume data
|
||||||
|
|
||||||
|
@ -1193,7 +1193,7 @@ public abstract class ConsumerLoop {
|
||||||
config.setProperty("bootstrap.servers", "localhost:6030");
|
config.setProperty("bootstrap.servers", "localhost:6030");
|
||||||
config.setProperty("td.connect.user", "root");
|
config.setProperty("td.connect.user", "root");
|
||||||
config.setProperty("td.connect.pass", "taosdata");
|
config.setProperty("td.connect.pass", "taosdata");
|
||||||
config.setProperty("auto.offset.reset", "earliest");
|
config.setProperty("auto.offset.reset", "latest");
|
||||||
config.setProperty("msg.with.table.name", "true");
|
config.setProperty("msg.with.table.name", "true");
|
||||||
config.setProperty("enable.auto.commit", "true");
|
config.setProperty("enable.auto.commit", "true");
|
||||||
config.setProperty("auto.commit.interval.ms", "1000");
|
config.setProperty("auto.commit.interval.ms", "1000");
|
||||||
|
@ -1276,7 +1276,7 @@ public abstract class ConsumerLoop {
|
||||||
config.setProperty("bootstrap.servers", "localhost:6041");
|
config.setProperty("bootstrap.servers", "localhost:6041");
|
||||||
config.setProperty("td.connect.user", "root");
|
config.setProperty("td.connect.user", "root");
|
||||||
config.setProperty("td.connect.pass", "taosdata");
|
config.setProperty("td.connect.pass", "taosdata");
|
||||||
config.setProperty("auto.offset.reset", "earliest");
|
config.setProperty("auto.offset.reset", "latest");
|
||||||
config.setProperty("msg.with.table.name", "true");
|
config.setProperty("msg.with.table.name", "true");
|
||||||
config.setProperty("enable.auto.commit", "true");
|
config.setProperty("enable.auto.commit", "true");
|
||||||
config.setProperty("auto.commit.interval.ms", "1000");
|
config.setProperty("auto.commit.interval.ms", "1000");
|
||||||
|
|
|
@ -66,7 +66,6 @@ public class SubscribeDemo {
|
||||||
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
|
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
|
||||||
"com.taos.example.MetersDeserializer");
|
"com.taos.example.MetersDeserializer");
|
||||||
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
|
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
|
||||||
properties.setProperty(TMQConstants.EXPERIMENTAL_SNAPSHOT_ENABLE, "true");
|
|
||||||
|
|
||||||
// poll data
|
// poll data
|
||||||
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
|
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
|
||||||
|
|
|
@ -66,7 +66,6 @@ public class WebsocketSubscribeDemo {
|
||||||
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
|
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
|
||||||
"com.taos.example.MetersDeserializer");
|
"com.taos.example.MetersDeserializer");
|
||||||
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
|
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
|
||||||
properties.setProperty(TMQConstants.EXPERIMENTAL_SNAPSHOT_ENABLE, "true");
|
|
||||||
|
|
||||||
// poll data
|
// poll data
|
||||||
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
|
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
|
||||||
|
|
|
@ -343,18 +343,18 @@ CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
|
||||||
|
|
||||||
消费者需要通过一系列配置选项创建,基础配置项如下表所示:
|
消费者需要通过一系列配置选项创建,基础配置项如下表所示:
|
||||||
|
|
||||||
| 参数名称 | 类型 | 参数说明 | 备注 |
|
| 参数名称 | 类型 | 参数说明 | 备注 |
|
||||||
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
|
| :-----------------------: | :-----: | ----------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||||
| `td.connect.ip` | string | 服务端的 IP 地址 | |
|
| `td.connect.ip` | string | 服务端的 IP 地址 | |
|
||||||
| `td.connect.user` | string | 用户名 | |
|
| `td.connect.user` | string | 用户名 | |
|
||||||
| `td.connect.pass` | string | 密码 | |
|
| `td.connect.pass` | string | 密码 | |
|
||||||
| `td.connect.port` | integer | 服务端的端口号 | |
|
| `td.connect.port` | integer | 服务端的端口号 | |
|
||||||
| `group.id` | string | 消费组 ID,同一消费组共享消费进度 | <br />**必填项**。最大长度:192。<br />每个topic最多可建立100个 consumer group |
|
| `group.id` | string | 消费组 ID,同一消费组共享消费进度 | <br />**必填项**。最大长度:192。<br />每个topic最多可建立100个 consumer group |
|
||||||
| `client.id` | string | 客户端 ID | 最大长度:192。 |
|
| `client.id` | string | 客户端 ID | 最大长度:192。 |
|
||||||
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default(version < 3.2.0.0);从头开始订阅; <br/>`latest`: default(version >= 3.2.0.0);仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
|
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default(version < 3.2.0.0);从头开始订阅; <br/>`latest`: default(version >= 3.2.0.0);仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
|
||||||
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit | 默认值为 true |
|
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit | 默认值为 true |
|
||||||
| `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
|
| `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
|
||||||
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句)(从3.2.0.0版本该参数废弃,恒为true) |默认关闭 |
|
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句)(从3.2.0.0版本该参数废弃,恒为true) | 默认关闭 |
|
||||||
|
|
||||||
对于不同编程语言,其设置方式如下:
|
对于不同编程语言,其设置方式如下:
|
||||||
|
|
||||||
|
@ -370,7 +370,7 @@ tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||||
tmq_conf_set(conf, "group.id", "cgrpName");
|
tmq_conf_set(conf, "group.id", "cgrpName");
|
||||||
tmq_conf_set(conf, "td.connect.user", "root");
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
tmq_conf_set(conf, "auto.offset.reset", "latest");
|
||||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||||
|
|
||||||
|
@ -385,7 +385,7 @@ tmq_conf_destroy(conf);
|
||||||
|
|
||||||
| 参数名称 | 类型 | 参数说明 |
|
| 参数名称 | 类型 | 参数说明 |
|
||||||
| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
|
| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
|
||||||
| `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" |
|
| `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" |
|
||||||
| `bootstrap.servers` | string | 连接地址,如 `localhost:6030` |
|
| `bootstrap.servers` | string | 连接地址,如 `localhost:6030` |
|
||||||
| `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
|
| `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
|
||||||
| `value.deserializer.encoding` | string | 指定字符串解析的字符集 | |
|
| `value.deserializer.encoding` | string | 指定字符串解析的字符集 | |
|
||||||
|
@ -400,7 +400,7 @@ properties.setProperty("group.id", "cgrpName");
|
||||||
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
|
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
|
||||||
properties.setProperty("td.connect.user", "root");
|
properties.setProperty("td.connect.user", "root");
|
||||||
properties.setProperty("td.connect.pass", "taosdata");
|
properties.setProperty("td.connect.pass", "taosdata");
|
||||||
properties.setProperty("auto.offset.reset", "earliest");
|
properties.setProperty("auto.offset.reset", "latest");
|
||||||
properties.setProperty("msg.with.table.name", "true");
|
properties.setProperty("msg.with.table.name", "true");
|
||||||
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
|
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
|
||||||
|
|
||||||
|
@ -440,7 +440,7 @@ consumer, err := NewConsumer(conf)
|
||||||
let mut dsn: Dsn = "taos://".parse()?;
|
let mut dsn: Dsn = "taos://".parse()?;
|
||||||
dsn.set("group.id", "group1");
|
dsn.set("group.id", "group1");
|
||||||
dsn.set("client.id", "test");
|
dsn.set("client.id", "test");
|
||||||
dsn.set("auto.offset.reset", "earliest");
|
dsn.set("auto.offset.reset", "latest");
|
||||||
|
|
||||||
let tmq = TmqBuilder::from_dsn(dsn)?;
|
let tmq = TmqBuilder::from_dsn(dsn)?;
|
||||||
|
|
||||||
|
@ -468,7 +468,7 @@ consumer = Consumer(
|
||||||
"td.connect.ip": "127.0.0.1",
|
"td.connect.ip": "127.0.0.1",
|
||||||
"td.connect.user": "root",
|
"td.connect.user": "root",
|
||||||
"td.connect.pass": "taosdata",
|
"td.connect.pass": "taosdata",
|
||||||
"auto.offset.reset": "earliest",
|
"auto.offset.reset": "latest",
|
||||||
"msg.with.table.name": "true",
|
"msg.with.table.name": "true",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -488,7 +488,7 @@ let consumer = taos.consumer({
|
||||||
'group.id': 'tg2',
|
'group.id': 'tg2',
|
||||||
'td.connect.user': 'root',
|
'td.connect.user': 'root',
|
||||||
'td.connect.pass': 'taosdata',
|
'td.connect.pass': 'taosdata',
|
||||||
'auto.offset.reset','earliest',
|
'auto.offset.reset','latest',
|
||||||
'msg.with.table.name': 'true',
|
'msg.with.table.name': 'true',
|
||||||
'td.connect.ip','127.0.0.1',
|
'td.connect.ip','127.0.0.1',
|
||||||
'td.connect.port','6030'
|
'td.connect.port','6030'
|
||||||
|
|
|
@ -1095,7 +1095,7 @@ TaosConsumer consumer = new TaosConsumer<>(config);
|
||||||
- httpConnectTimeout: 创建连接超时参数,单位 ms,默认为 5000 ms。仅在 WebSocket 连接下有效。
|
- httpConnectTimeout: 创建连接超时参数,单位 ms,默认为 5000 ms。仅在 WebSocket 连接下有效。
|
||||||
- messageWaitTimeout: 数据传输超时参数,单位 ms,默认为 10000 ms。仅在 WebSocket 连接下有效。
|
- messageWaitTimeout: 数据传输超时参数,单位 ms,默认为 10000 ms。仅在 WebSocket 连接下有效。
|
||||||
- httpPoolSize: 同一个连接下最大并行请求数。仅在 WebSocket 连接下有效。
|
- httpPoolSize: 同一个连接下最大并行请求数。仅在 WebSocket 连接下有效。
|
||||||
其他参数请参考:[Consumer 参数列表](../../develop/tmq#创建-consumer-以及consumer-group)
|
其他参数请参考:[Consumer 参数列表](../../develop/tmq#创建-consumer-以及consumer-group), 注意TDengine服务端自3.2.0.0版本开始消息订阅中的auto.offset.reset默认值发生变化。
|
||||||
|
|
||||||
#### 订阅消费数据
|
#### 订阅消费数据
|
||||||
|
|
||||||
|
@ -1193,7 +1193,7 @@ public abstract class ConsumerLoop {
|
||||||
config.setProperty("bootstrap.servers", "localhost:6030");
|
config.setProperty("bootstrap.servers", "localhost:6030");
|
||||||
config.setProperty("td.connect.user", "root");
|
config.setProperty("td.connect.user", "root");
|
||||||
config.setProperty("td.connect.pass", "taosdata");
|
config.setProperty("td.connect.pass", "taosdata");
|
||||||
config.setProperty("auto.offset.reset", "earliest");
|
config.setProperty("auto.offset.reset", "latest");
|
||||||
config.setProperty("msg.with.table.name", "true");
|
config.setProperty("msg.with.table.name", "true");
|
||||||
config.setProperty("enable.auto.commit", "true");
|
config.setProperty("enable.auto.commit", "true");
|
||||||
config.setProperty("auto.commit.interval.ms", "1000");
|
config.setProperty("auto.commit.interval.ms", "1000");
|
||||||
|
@ -1201,7 +1201,6 @@ public abstract class ConsumerLoop {
|
||||||
config.setProperty("client.id", "1");
|
config.setProperty("client.id", "1");
|
||||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
|
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
|
||||||
config.setProperty("value.deserializer.encoding", "UTF-8");
|
config.setProperty("value.deserializer.encoding", "UTF-8");
|
||||||
config.setProperty("experimental.snapshot.enable", "true");
|
|
||||||
|
|
||||||
this.consumer = new TaosConsumer<>(config);
|
this.consumer = new TaosConsumer<>(config);
|
||||||
this.topics = Collections.singletonList("topic_speed");
|
this.topics = Collections.singletonList("topic_speed");
|
||||||
|
@ -1279,7 +1278,7 @@ public abstract class ConsumerLoop {
|
||||||
config.setProperty("bootstrap.servers", "localhost:6041");
|
config.setProperty("bootstrap.servers", "localhost:6041");
|
||||||
config.setProperty("td.connect.user", "root");
|
config.setProperty("td.connect.user", "root");
|
||||||
config.setProperty("td.connect.pass", "taosdata");
|
config.setProperty("td.connect.pass", "taosdata");
|
||||||
config.setProperty("auto.offset.reset", "earliest");
|
config.setProperty("auto.offset.reset", "latest");
|
||||||
config.setProperty("msg.with.table.name", "true");
|
config.setProperty("msg.with.table.name", "true");
|
||||||
config.setProperty("enable.auto.commit", "true");
|
config.setProperty("enable.auto.commit", "true");
|
||||||
config.setProperty("auto.commit.interval.ms", "1000");
|
config.setProperty("auto.commit.interval.ms", "1000");
|
||||||
|
@ -1287,7 +1286,6 @@ public abstract class ConsumerLoop {
|
||||||
config.setProperty("client.id", "1");
|
config.setProperty("client.id", "1");
|
||||||
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
|
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
|
||||||
config.setProperty("value.deserializer.encoding", "UTF-8");
|
config.setProperty("value.deserializer.encoding", "UTF-8");
|
||||||
config.setProperty("experimental.snapshot.enable", "true");
|
|
||||||
|
|
||||||
this.consumer = new TaosConsumer<>(config);
|
this.consumer = new TaosConsumer<>(config);
|
||||||
this.topics = Collections.singletonList("topic_speed");
|
this.topics = Collections.singletonList("topic_speed");
|
||||||
|
|
Loading…
Reference in New Issue