diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx
index dc08f4e24d..881838f8f0 100644
--- a/docs/zh/07-develop/07-tmq.mdx
+++ b/docs/zh/07-develop/07-tmq.mdx
@@ -15,11 +15,11 @@ import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";
-
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似 kafka 的数据订阅功能。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
-# 介绍
-## 主题
+## 数据订阅介绍
+
+### 主题
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../taos-sql/tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
如下图,每个 topic 涉及到的数据表可能分布在多个 vnode(相当于 kafka 里的 partition) 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中,WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。
@@ -30,11 +30,12 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
对于 `SELECT` 语句形式的 topic,在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
-## 生产者
+### 生产者
写入 topic 相关联的数据表中数据的都是生产者,生产者实际生产的数据写入到了子表或普通表中,即表所在 vnode 的 WAL 里。
-## 消费者
-### 消费者组
+### 消费者
+
+#### 消费者组
消费者订阅 topic 后,可以消费 topic 里的所有数据(这些数据所在的表可能分布在多个 vnode 上,即 db 所在的所有 vnode)。订阅 topic 时,需要指定一个消费者组 (consumer group),如果这个消费者组里只有一个消费者,那么这个消费者会顺序的消费这些 vnode 上的数据。
为了提高消费速度,便于多线程、分布式地消费数据,可以在一个消费组里添加多个消费者,这些消费者将均分数据所在的 vnode 进行消费(比如数据分布在 4 个 vnode 上,有 2 个消费者的话,那么每个消费者消费 2 个 vnode;有 3 个消费者的话,2 个消费者各消费 1 个 vnode,1 个消费者消费 2 个 vnode;有 5 个消费者的话,4 个各分配 1 个 vnode 消费,另外 1 个不消费),如下图:
@@ -44,7 +45,8 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
在一个消费组里添加一个消费者后,在 Mnode 上通过 rebalance 的机制实现消费者的重新分配,该操作对用户是透明的。
一个消费者可以订阅多个 topic。TDengine 的数据订阅在宕机、重启等复杂环境下确保 at least once 消费。
-### 消费进度
+
+#### 消费进度
在 topic 的一个消费组的一个 vnode 上有消费进度。消费者消费的同时,可以提交消费进度,消费进度即 vnode 上 WAL 的版本号(对于 kafka 里的 offset),消费进度可以手动提交,也可以通过参数(auto.commit.interval.ms)设置为周期性自动提交。
首次消费数据时通过订阅参数(auto.offset.reset)来确定消费位置为最新数据(latest)还是最旧数据(earliest)。
@@ -59,16 +61,16 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
作为一个数据库产品, WAL 文件中存储的不全是数据,也包括其他写入消息,元数据等,所以消费进度不是连续的。
-##说明
+### 说明
从3.2.0.0版本开始,数据订阅支持vnode迁移和分裂。
由于数据订阅依赖wal文件,而在vnode迁移和分裂的过程中,wal并不会同步过去,所以迁移或分裂后,之前没消费完的wal数据后消费不到。所以请保证迁移和分裂之前把数据全部消费完后,再进行vnode迁移或分裂,否则,消费会丢失数据。
-# 语法说明
+## 数据订阅语法说明
具体的语法参见 [数据订阅](../../taos-sql/tmq)
-# 消费参数
+## 数据订阅相关参数
消费参数主要用于消费者创建时指定,基础配置项如下表所示:
@@ -86,229 +88,232 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句)(从3.2.0.0版本该参数废弃,恒为true) |默认关闭 |
| `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 |
-# 主要数据结构和 API 接口
+## 数据订阅主要 API 接口
-不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节,注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer):
+不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节数据订阅部分,注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer):
-
-
+
- ```c
- typedef struct tmq_t tmq_t;
- typedef struct tmq_conf_t tmq_conf_t;
- typedef struct tmq_list_t tmq_list_t;
+```c
+ typedef struct tmq_t tmq_t;
+ typedef struct tmq_conf_t tmq_conf_t;
+ typedef struct tmq_list_t tmq_list_t;
- typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param));
+ typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param));
- typedef enum tmq_conf_res_t {
- TMQ_CONF_UNKNOWN = -2,
- TMQ_CONF_INVALID = -1,
- TMQ_CONF_OK = 0,
- } tmq_conf_res_t;
+ typedef enum tmq_conf_res_t {
+ TMQ_CONF_UNKNOWN = -2,
+ TMQ_CONF_INVALID = -1,
+ TMQ_CONF_OK = 0,
+ } tmq_conf_res_t;
- typedef struct tmq_topic_assignment {
- int32_t vgId;
- int64_t currentOffset;
- int64_t begin;
- int64_t end;
- } tmq_topic_assignment;
+ typedef struct tmq_topic_assignment {
+ int32_t vgId;
+ int64_t currentOffset;
+ int64_t begin;
+ int64_t end;
+ } tmq_topic_assignment;
- DLL_EXPORT tmq_conf_t *tmq_conf_new();
- DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
- DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
- DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
+ DLL_EXPORT tmq_conf_t *tmq_conf_new();
+ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
+ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
+ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
- DLL_EXPORT tmq_list_t *tmq_list_new();
- DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
- DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
- DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
- DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
+ DLL_EXPORT tmq_list_t *tmq_list_new();
+ DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
+ DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
+ DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
+ DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
- DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
- DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
- DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
- DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
- DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
- DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
- DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
- DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
- DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
- DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
- DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment);
- DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
- DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
- DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId);
- DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
+ DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
+ DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
+ DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
+ DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
+ DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
+ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
+ DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
+ DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
+ DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
+ DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
+ DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment);
+ DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
+ DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
+ DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId);
+ DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
- DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
- DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
- DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
- DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
- DLL_EXPORT const char *tmq_err2str(int32_t code);
- ```
-
-
+ DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
+ DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
+ DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
+ DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
+ DLL_EXPORT const char *tmq_err2str(int32_t code);
+```
- ```java
- void subscribe(Collection topics) throws SQLException;
+下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
- void unsubscribe() throws SQLException;
+
+
- Set subscription() throws SQLException;
+```java
+void subscribe(Collection topics) throws SQLException;
- ConsumerRecords poll(Duration timeout) throws SQLException;
+void unsubscribe() throws SQLException;
- Set assignment() throws SQLException;
- long position(TopicPartition partition) throws SQLException;
- Map position(String topic) throws SQLException;
- Map beginningOffsets(String topic) throws SQLException;
- Map endOffsets(String topic) throws SQLException;
- Map committed(Set partitions) throws SQLException;
+Set subscription() throws SQLException;
- void seek(TopicPartition partition, long offset) throws SQLException;
- void seekToBeginning(Collection partitions) throws SQLException;
- void seekToEnd(Collection partitions) throws SQLException;
+ConsumerRecords poll(Duration timeout) throws SQLException;
- void commitSync() throws SQLException;
- void commitSync(Map offsets) throws SQLException;
+Set assignment() throws SQLException;
+long position(TopicPartition partition) throws SQLException;
+Map position(String topic) throws SQLException;
+Map beginningOffsets(String topic) throws SQLException;
+Map endOffsets(String topic) throws SQLException;
+Map committed(Set partitions) throws SQLException;
- void close() throws SQLException;
- ```
+void seek(TopicPartition partition, long offset) throws SQLException;
+void seekToBeginning(Collection partitions) throws SQLException;
+void seekToEnd(Collection partitions) throws SQLException;
-
+void commitSync() throws SQLException;
+void commitSync(Map offsets) throws SQLException;
-
+void close() throws SQLException;
+```
- ```python
- class Consumer:
- def subscribe(self, topics):
+
+
+
+
+```python
+class Consumer:
+ def subscribe(self, topics):
pass
- def unsubscribe(self):
+ def unsubscribe(self):
pass
- def poll(self, timeout: float = 1.0):
+ def poll(self, timeout: float = 1.0):
pass
- def assignment(self):
+ def assignment(self):
pass
- def seek(self, partition):
+ def seek(self, partition):
pass
- def close(self):
+ def close(self):
pass
- def commit(self, message):
+ def commit(self, message):
pass
- ```
+```
-
+
-
+
- ```go
- func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
+```go
+func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
- // 出于兼容目的保留 rebalanceCb 参数,当前未使用
- func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
+// 出于兼容目的保留 rebalanceCb 参数,当前未使用
+func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
- // 出于兼容目的保留 rebalanceCb 参数,当前未使用
- func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
+// 出于兼容目的保留 rebalanceCb 参数,当前未使用
+func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
- func (c *Consumer) Poll(timeoutMs int) tmq.Event
+func (c *Consumer) Poll(timeoutMs int) tmq.Event
- // 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
- func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
+// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
+func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
- func (c *Consumer) Unsubscribe() error
+func (c *Consumer) Unsubscribe() error
- func (c *Consumer) Close() error
- ```
+func (c *Consumer) Close() error
+```
-
+
-
+
- ```rust
- impl TBuilder for TmqBuilder
- fn from_dsn(dsn: D) -> Result
- fn build(&self) -> Result
+```rust
+impl TBuilder for TmqBuilder
+ fn from_dsn(dsn: D) -> Result
+ fn build(&self) -> Result
- impl AsAsyncConsumer for Consumer
- async fn subscribe, I: IntoIterator- + Send>(
+impl AsAsyncConsumer for Consumer
+ async fn subscribe, I: IntoIterator
- + Send>(
&mut self,
topics: I,
- ) -> Result<(), Self::Error>;
- fn stream(
+ ) -> Result<(), Self::Error>;
+ fn stream(
&self,
- ) -> Pin<
- Box<
+ ) -> Pin<
+ Box<
dyn '_
+ Send
+ futures::Stream<
Item = Result<(Self::Offset, MessageSet), Self::Error>,
>,
- >,
- >;
- async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
+ >,
+ >;
+ async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
- async fn unsubscribe(self);
- ```
+ async fn unsubscribe(self);
+```
- 可在 上查看详细 API 说明。
+可在 上查看详细 API 说明。
-
+
-
+
- ```js
- function TMQConsumer(config)
-
- function subscribe(topic)
-
- function consume(timeout)
-
- function subscription()
-
- function unsubscribe()
-
- function commit(msg)
-
- function close()
- ```
+```js
+function TMQConsumer(config)
-
+function subscribe(topic)
-
+function consume(timeout)
- ```csharp
- class ConsumerBuilder
-
- ConsumerBuilder(IEnumerable> config)
-
- public IConsumer Build()
-
- void Subscribe(IEnumerable topics)
-
- void Subscribe(string topic)
-
- ConsumeResult Consume(int millisecondsTimeout)
-
- List Subscription()
-
- void Unsubscribe()
-
- List Commit()
-
- void Close()
- ```
-
+function subscription()
+
+function unsubscribe()
+
+function commit(msg)
+
+function close()
+```
+
+
+
+
+
+```csharp
+class ConsumerBuilder
+
+ConsumerBuilder(IEnumerable> config)
+
+public IConsumer Build()
+
+void Subscribe(IEnumerable topics)
+
+void Subscribe(string topic)
+
+ConsumeResult Consume(int millisecondsTimeout)
+
+List Subscription()
+
+void Unsubscribe()
+
+List Commit()
+
+void Close()
+```
+
+
-# 数据订阅示例
-## 写入数据
+## 数据订阅示例
+### 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
@@ -321,7 +326,7 @@ CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
```
-## 创建 topic
+### 创建 topic
使用 SQL 创建一个 topic:
@@ -329,516 +334,514 @@ INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
```
-## 创建消费者 *consumer*
+### 创建消费者 consumer
对于不同编程语言,其设置方式如下:
+
-
- ```c
- /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
- 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
- tmq_conf_t* conf = tmq_conf_new();
- tmq_conf_set(conf, "enable.auto.commit", "true");
- tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
- tmq_conf_set(conf, "group.id", "cgrpName");
- tmq_conf_set(conf, "td.connect.user", "root");
- tmq_conf_set(conf, "td.connect.pass", "taosdata");
- tmq_conf_set(conf, "auto.offset.reset", "latest");
- tmq_conf_set(conf, "msg.with.table.name", "true");
- tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
+```c
+/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
+ 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
+tmq_conf_t* conf = tmq_conf_new();
+tmq_conf_set(conf, "enable.auto.commit", "true");
+tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
+tmq_conf_set(conf, "group.id", "cgrpName");
+tmq_conf_set(conf, "td.connect.user", "root");
+tmq_conf_set(conf, "td.connect.pass", "taosdata");
+tmq_conf_set(conf, "auto.offset.reset", "latest");
+tmq_conf_set(conf, "msg.with.table.name", "true");
+tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
- tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
- tmq_conf_destroy(conf);
- ```
-
+tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
+tmq_conf_destroy(conf);
+```
-
+
+
- 对于 Java 程序,还可以使用如下配置项:
-
- | 参数名称 | 类型 | 参数说明 |
- | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
- | `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" |
- | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` |
- | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
- | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | |
-
- 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。
-
- ```java
- Properties properties = new Properties();
- properties.setProperty("enable.auto.commit", "true");
- properties.setProperty("auto.commit.interval.ms", "1000");
- properties.setProperty("group.id", "cgrpName");
- properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
- properties.setProperty("td.connect.user", "root");
- properties.setProperty("td.connect.pass", "taosdata");
- properties.setProperty("auto.offset.reset", "latest");
- properties.setProperty("msg.with.table.name", "true");
- properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
-
- TaosConsumer consumer = new TaosConsumer<>(properties);
-
- /* value deserializer definition. */
- import com.taosdata.jdbc.tmq.ReferenceDeserializer;
-
- public class MetersDeserializer extends ReferenceDeserializer {
- }
- ```
-
+对于 Java 程序,还可以使用如下配置项:
-
-
- ```go
- conf := &tmq.ConfigMap{
- "group.id": "test",
- "auto.offset.reset": "latest",
- "td.connect.ip": "127.0.0.1",
- "td.connect.user": "root",
- "td.connect.pass": "taosdata",
- "td.connect.port": "6030",
- "client.id": "test_tmq_c",
- "enable.auto.commit": "false",
- "msg.with.table.name": "true",
- }
- consumer, err := NewConsumer(conf)
- ```
-
-
+| 参数名称 | 类型 | 参数说明 |
+| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
+| `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" |
+| `bootstrap.servers` | string | 连接地址,如 `localhost:6030` |
+| `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
+| `value.deserializer.encoding` | string | 指定字符串解析的字符集 | |
-
-
- ```rust
- let mut dsn: Dsn = "taos://".parse()?;
- dsn.set("group.id", "group1");
- dsn.set("client.id", "test");
- dsn.set("auto.offset.reset", "latest");
-
- let tmq = TmqBuilder::from_dsn(dsn)?;
-
- let mut consumer = tmq.build()?;
- ```
-
-
+需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。
-
-
- Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例:
-
- ```python
- from taos.tmq import Consumer
-
- # Syntax: `consumer = Consumer(configs)`
- #
- # Example:
- consumer = Consumer(
- {
- "group.id": "local",
- "client.id": "1",
- "enable.auto.commit": "true",
- "auto.commit.interval.ms": "1000",
- "td.connect.ip": "127.0.0.1",
- "td.connect.user": "root",
- "td.connect.pass": "taosdata",
- "auto.offset.reset": "latest",
- "msg.with.table.name": "true",
- }
- )
- ```
-
-
+```java
+Properties properties = new Properties();
+properties.setProperty("enable.auto.commit", "true");
+properties.setProperty("auto.commit.interval.ms", "1000");
+properties.setProperty("group.id", "cgrpName");
+properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
+properties.setProperty("td.connect.user", "root");
+properties.setProperty("td.connect.pass", "taosdata");
+properties.setProperty("auto.offset.reset", "latest");
+properties.setProperty("msg.with.table.name", "true");
+properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
-
-
- ```js
- // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
- // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
-
- let consumer = taos.consumer({
- 'enable.auto.commit': 'true',
- 'auto.commit.interval.ms','1000',
- 'group.id': 'tg2',
- 'td.connect.user': 'root',
- 'td.connect.pass': 'taosdata',
- 'auto.offset.reset','latest',
- 'msg.with.table.name': 'true',
- 'td.connect.ip','127.0.0.1',
- 'td.connect.port','6030'
- });
- ```
-
-
+TaosConsumer consumer = new TaosConsumer<>(properties);
+
+/* value deserializer definition. */
+import com.taosdata.jdbc.tmq.ReferenceDeserializer;
+
+public class MetersDeserializer extends ReferenceDeserializer {
+}
+```
+
+
+
+
+
+```go
+conf := &tmq.ConfigMap{
+ "group.id": "test",
+ "auto.offset.reset": "latest",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "td.connect.port": "6030",
+ "client.id": "test_tmq_c",
+ "enable.auto.commit": "false",
+ "msg.with.table.name": "true",
+}
+consumer, err := NewConsumer(conf)
+```
+
+
+
+
+
+```rust
+let mut dsn: Dsn = "taos://".parse()?;
+dsn.set("group.id", "group1");
+dsn.set("client.id", "test");
+dsn.set("auto.offset.reset", "latest");
+
+let tmq = TmqBuilder::from_dsn(dsn)?;
+
+let mut consumer = tmq.build()?;
+```
+
+
+
+
+
+Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例:
+
+```python
+from taos.tmq import Consumer
+
+# Syntax: `consumer = Consumer(configs)`
+#
+# Example:
+consumer = Consumer(
+ {
+ "group.id": "local",
+ "client.id": "1",
+ "enable.auto.commit": "true",
+ "auto.commit.interval.ms": "1000",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "auto.offset.reset": "latest",
+ "msg.with.table.name": "true",
+ }
+)
+```
+
+
+
+
+
+```js
+// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
+// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
+
+let consumer = taos.consumer({
+ 'enable.auto.commit': 'true',
+ 'auto.commit.interval.ms','1000',
+ 'group.id': 'tg2',
+ 'td.connect.user': 'root',
+ 'td.connect.pass': 'taosdata',
+ 'auto.offset.reset','latest',
+ 'msg.with.table.name': 'true',
+ 'td.connect.ip','127.0.0.1',
+ 'td.connect.port','6030'
+ });
+```
+
+
+
+
+
+```csharp
+var cfg = new Dictionary()
+{
+ { "group.id", "group1" },
+ { "auto.offset.reset", "latest" },
+ { "td.connect.ip", "127.0.0.1" },
+ { "td.connect.user", "root" },
+ { "td.connect.pass", "taosdata" },
+ { "td.connect.port", "6030" },
+ { "client.id", "tmq_example" },
+ { "enable.auto.commit", "true" },
+ { "msg.with.table.name", "false" },
+};
+var consumer = new ConsumerBuilder>(cfg).Build();
+```
+
+
-
-
- ```csharp
- var cfg = new Dictionary()
- {
- { "group.id", "group1" },
- { "auto.offset.reset", "latest" },
- { "td.connect.ip", "127.0.0.1" },
- { "td.connect.user", "root" },
- { "td.connect.pass", "taosdata" },
- { "td.connect.port", "6030" },
- { "client.id", "tmq_example" },
- { "enable.auto.commit", "true" },
- { "msg.with.table.name", "false" },
- };
- var consumer = new ConsumerBuilder>(cfg).Build();
- ```
-
-
-
上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
-## 订阅 *topics*
+### 订阅 topics
一个 consumer 支持同时订阅多个 topic。
+
-
-
- ```c
- // 创建订阅 topics 列表
- tmq_list_t* topicList = tmq_list_new();
- tmq_list_append(topicList, "topicName");
- // 启动订阅
- tmq_subscribe(tmq, topicList);
- tmq_list_destroy(topicList);
-
- ```
-
-
-
-
- ```java
- List topics = new ArrayList<>();
- topics.add("tmq_topic");
- consumer.subscribe(topics);
- ```
-
-
-
+```c
+// 创建订阅 topics 列表
+tmq_list_t* topicList = tmq_list_new();
+tmq_list_append(topicList, "topicName");
+// 启动订阅
+tmq_subscribe(tmq, topicList);
+tmq_list_destroy(topicList);
+
+```
- ```go
- err = consumer.Subscribe("example_tmq_topic", nil)
- if err != nil {
- panic(err)
- }
- ```
+
+
-
-
+```java
+List topics = new ArrayList<>();
+topics.add("tmq_topic");
+consumer.subscribe(topics);
+```
- ```rust
- consumer.subscribe(["tmq_meters"]).await?;
- ```
+
+
-
+```go
+err = consumer.Subscribe("example_tmq_topic", nil)
+if err != nil {
+ panic(err)
+}
+```
-
+
+
- ```python
- consumer.subscribe(['topic1', 'topic2'])
- ```
+```rust
+consumer.subscribe(["tmq_meters"]).await?;
+```
-
+
-
+
- ```js
- // 创建订阅 topics 列表
- let topics = ['topic_test']
+```python
+consumer.subscribe(['topic1', 'topic2'])
+```
- // 启动订阅
- consumer.subscribe(topics);
- ```
+
-
+
-
+```js
+// 创建订阅 topics 列表
+let topics = ['topic_test']
- ```csharp
- // 创建订阅 topics 列表
- List topics = new List();
- topics.add("tmq_topic");
- // 启动订阅
- consumer.Subscribe(topics);
- ```
+// 启动订阅
+consumer.subscribe(topics);
+```
-
+
+
+
+
+```csharp
+// 创建订阅 topics 列表
+List topics = new List();
+topics.add("tmq_topic");
+// 启动订阅
+consumer.Subscribe(topics);
+```
+
+
-## 消费
+### 消费
以下代码展示了不同语言下如何对 TMQ 消息进行消费。
+
-
+```c
+// 消费数据
+while (running) {
+ TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
+ msg_process(msg);
+}
+```
- ```c
- // 消费数据
- while (running) {
- TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
- msg_process(msg);
- }
- ```
+这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
- 这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
+
+
-
-
-
- ```java
- while(running){
- ConsumerRecords meters = consumer.poll(Duration.ofMillis(100));
- for (Meters meter : meters) {
- processMsg(meter);
- }
- }
- ```
-
-
+```java
+while(running){
+ ConsumerRecords meters = consumer.poll(Duration.ofMillis(100));
+ for (Meters meter : meters) {
+ processMsg(meter);
+ }
+}
+```
-
+
- ```go
- for {
- ev := consumer.Poll(0)
- if ev != nil {
- switch e := ev.(type) {
- case *tmqcommon.DataMessage:
- fmt.Println(e.Value())
- case tmqcommon.Error:
- fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
- panic(e)
- }
- consumer.Commit()
- }
- }
- ```
+
-
+```go
+for {
+ ev := consumer.Poll(0)
+ if ev != nil {
+ switch e := ev.(type) {
+ case *tmqcommon.DataMessage:
+ fmt.Println(e.Value())
+ case tmqcommon.Error:
+ fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+ panic(e)
+ }
+ consumer.Commit()
+ }
+}
+```
-
+
- ```rust
- {
- let mut stream = consumer.stream();
+
- while let Some((offset, message)) = stream.try_next().await? {
- // get information from offset
+```rust
+{
+ let mut stream = consumer.stream();
- // the topic
- let topic = offset.topic();
- // the vgroup id, like partition id in kafka.
- let vgroup_id = offset.vgroup_id();
- println!("* in vgroup id {vgroup_id} of topic {topic}\n");
+ while let Some((offset, message)) = stream.try_next().await? {
+ // get information from offset
- if let Some(data) = message.into_data() {
+ // the topic
+ let topic = offset.topic();
+ // the vgroup id, like partition id in kafka.
+ let vgroup_id = offset.vgroup_id();
+ println!("* in vgroup id {vgroup_id} of topic {topic}\n");
+
+ if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
- // one block for one table, get table name if needed
- let name = block.table_name();
- let records: Vec = block.deserialize().try_collect()?;
- println!(
- "** table: {}, got {} records: {:#?}\n",
- name.unwrap(),
- records.len(),
- records
- );
+ // one block for one table, get table name if needed
+ let name = block.table_name();
+ let records: Vec = block.deserialize().try_collect()?;
+ println!(
+ "** table: {}, got {} records: {:#?}\n",
+ name.unwrap(),
+ records.len(),
+ records
+ );
+ }
}
- }
- consumer.commit(offset).await?;
- }
- }
- ```
+ consumer.commit(offset).await?;
+ }
+}
+```
-
-
+
+
- ```python
- while True:
- res = consumer.poll(100)
- if not res:
+```python
+while True:
+ res = consumer.poll(100)
+ if not res:
continue
- err = res.error()
- if err is not None:
+ err = res.error()
+ if err is not None:
raise err
- val = res.value()
+ val = res.value()
- for block in val:
+ for block in val:
print(block.fetchall())
- ```
+```
-
+
-
+
- ```js
- while(true){
- msg = consumer.consume(200);
- // process message(consumeResult)
- console.log(msg.topicPartition);
- console.log(msg.block);
- console.log(msg.fields)
- }
- ```
+```js
+while(true){
+ msg = consumer.consume(200);
+ // process message(consumeResult)
+ console.log(msg.topicPartition);
+ console.log(msg.block);
+ console.log(msg.fields)
+}
+```
-
+
-
+
- ```csharp
- // 消费数据
- while (true)
- {
- using (var result = consumer.Consume(500))
- {
- if (result == null) continue;
- ProcessMsg(result);
- consumer.Commit();
- }
- }
- ```
+```csharp
+// 消费数据
+while (true)
+{
+ using (var result = consumer.Consume(500))
+ {
+ if (result == null) continue;
+ ProcessMsg(result);
+ consumer.Commit();
+ }
+}
+```
-
+
-## 结束消费
+### 结束消费
消费结束后,应当取消订阅。
-
-
+
- ```c
- /* 取消订阅 */
- tmq_unsubscribe(tmq);
+```c
+/* 取消订阅 */
+tmq_unsubscribe(tmq);
- /* 关闭消费者对象 */
- tmq_consumer_close(tmq);
- ```
+/* 关闭消费者对象 */
+tmq_consumer_close(tmq);
+```
-
-
+
+
- ```java
- /* 取消订阅 */
- consumer.unsubscribe();
+```java
+/* 取消订阅 */
+consumer.unsubscribe();
- /* 关闭消费 */
- consumer.close();
- ```
+/* 关闭消费 */
+consumer.close();
+```
-
-
-
+
- ```go
- /* Unsubscribe */
- _ = consumer.Unsubscribe()
+
- /* Close consumer */
- _ = consumer.Close()
- ```
+```go
+/* Unsubscribe */
+_ = consumer.Unsubscribe()
-
-
-
+/* Close consumer */
+_ = consumer.Close()
+```
- ```rust
- consumer.unsubscribe().await;
- ```
+
-
+
-
+```rust
+consumer.unsubscribe().await;
+```
- ```py
- # 取消订阅
- consumer.unsubscribe()
- # 关闭消费
- consumer.close()
- ```
+
-
-
+
- ```js
- consumer.unsubscribe();
- consumer.close();
- ```
+```py
+# 取消订阅
+consumer.unsubscribe()
+# 关闭消费
+consumer.close()
+```
-
+
+
-
+```js
+consumer.unsubscribe();
+consumer.close();
+```
- ```csharp
- // 取消订阅
- consumer.Unsubscribe();
+
- // 关闭消费
- consumer.Close();
- ```
+
-
+```csharp
+// 取消订阅
+consumer.Unsubscribe();
+
+// 关闭消费
+consumer.Close();
+```
+
+
-## 完整示例代码
+### 完整示例代码
以下是各语言的完整示例代码。
-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
+
+
+
+
-
-
-
-
-#订阅高级功能
-##数据回放
+## 数据订阅高级功能
+### 数据回放
- 订阅支持 replay 功能,按照数据写入的时间回放。
比如,如下时间写入三条数据
```sql
@@ -854,4 +857,4 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
- 超级表和库订阅不支持回放
- enable.replay 参数,true表示开启订阅回放功能,false表示不开启订阅回放功能,默认不开启。
- 回放不支持进度保存,所以回放参数 enable.replay = true 时,auto commit 自动关闭
-- 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差
\ No newline at end of file
+- 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差
diff --git a/docs/zh/07-develop/_sub_java.mdx b/docs/zh/07-develop/_sub_java.mdx
index 60810ec275..c0e9e6c937 100644
--- a/docs/zh/07-develop/_sub_java.mdx
+++ b/docs/zh/07-develop/_sub_java.mdx
@@ -6,3 +6,4 @@
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
+```