diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx
index a852e71ff4..1476cd2170 100644
--- a/docs/zh/07-develop/07-tmq.mdx
+++ b/docs/zh/07-develop/07-tmq.mdx
@@ -91,220 +91,223 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节,注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer):
-
-
+
- ```c
- typedef struct tmq_t tmq_t;
- typedef struct tmq_conf_t tmq_conf_t;
- typedef struct tmq_list_t tmq_list_t;
+```c
+ typedef struct tmq_t tmq_t;
+ typedef struct tmq_conf_t tmq_conf_t;
+ typedef struct tmq_list_t tmq_list_t;
- typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param));
+ typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param));
- typedef enum tmq_conf_res_t {
- TMQ_CONF_UNKNOWN = -2,
- TMQ_CONF_INVALID = -1,
- TMQ_CONF_OK = 0,
- } tmq_conf_res_t;
+ typedef enum tmq_conf_res_t {
+ TMQ_CONF_UNKNOWN = -2,
+ TMQ_CONF_INVALID = -1,
+ TMQ_CONF_OK = 0,
+ } tmq_conf_res_t;
- typedef struct tmq_topic_assignment {
- int32_t vgId;
- int64_t currentOffset;
- int64_t begin;
- int64_t end;
- } tmq_topic_assignment;
+ typedef struct tmq_topic_assignment {
+ int32_t vgId;
+ int64_t currentOffset;
+ int64_t begin;
+ int64_t end;
+ } tmq_topic_assignment;
- DLL_EXPORT tmq_conf_t *tmq_conf_new();
- DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
- DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
- DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
+ DLL_EXPORT tmq_conf_t *tmq_conf_new();
+ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
+ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
+ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
- DLL_EXPORT tmq_list_t *tmq_list_new();
- DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
- DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
- DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
- DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
+ DLL_EXPORT tmq_list_t *tmq_list_new();
+ DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
+ DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
+ DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *);
+ DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
- DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
- DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
- DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
- DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
- DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
- DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
- DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
- DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
- DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
- DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
- DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment);
- DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
- DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
- DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId);
- DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
+ DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
+ DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
+ DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
+ DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
+ DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
+ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
+ DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
+ DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
+ DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
+ DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
+ DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment);
+ DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
+ DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
+ DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId);
+ DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
- DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
- DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
- DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
- DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
- DLL_EXPORT const char *tmq_err2str(int32_t code);
- ```
-
-
+ DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
+ DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
+ DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
+ DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
+ DLL_EXPORT const char *tmq_err2str(int32_t code);
+```
- ```java
- void subscribe(Collection topics) throws SQLException;
+下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
- void unsubscribe() throws SQLException;
+
+
- Set subscription() throws SQLException;
+```java
+void subscribe(Collection topics) throws SQLException;
- ConsumerRecords poll(Duration timeout) throws SQLException;
+void unsubscribe() throws SQLException;
- Set assignment() throws SQLException;
- long position(TopicPartition partition) throws SQLException;
- Map position(String topic) throws SQLException;
- Map beginningOffsets(String topic) throws SQLException;
- Map endOffsets(String topic) throws SQLException;
- Map committed(Set partitions) throws SQLException;
+Set subscription() throws SQLException;
- void seek(TopicPartition partition, long offset) throws SQLException;
- void seekToBeginning(Collection partitions) throws SQLException;
- void seekToEnd(Collection partitions) throws SQLException;
+ConsumerRecords poll(Duration timeout) throws SQLException;
- void commitSync() throws SQLException;
- void commitSync(Map offsets) throws SQLException;
+Set assignment() throws SQLException;
+long position(TopicPartition partition) throws SQLException;
+Map position(String topic) throws SQLException;
+Map beginningOffsets(String topic) throws SQLException;
+Map endOffsets(String topic) throws SQLException;
+Map committed(Set partitions) throws SQLException;
- void close() throws SQLException;
- ```
+void seek(TopicPartition partition, long offset) throws SQLException;
+void seekToBeginning(Collection partitions) throws SQLException;
+void seekToEnd(Collection partitions) throws SQLException;
-
+void commitSync() throws SQLException;
+void commitSync(Map offsets) throws SQLException;
-
+void close() throws SQLException;
+```
- ```python
- class Consumer:
- def subscribe(self, topics):
+
+
+
+
+```python
+class Consumer:
+ def subscribe(self, topics):
pass
- def unsubscribe(self):
+ def unsubscribe(self):
pass
- def poll(self, timeout: float = 1.0):
+ def poll(self, timeout: float = 1.0):
pass
- def assignment(self):
+ def assignment(self):
pass
- def seek(self, partition):
+ def seek(self, partition):
pass
- def close(self):
+ def close(self):
pass
- def commit(self, message):
+ def commit(self, message):
pass
- ```
+```
-
+
-
+
- ```go
- func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
+```go
+func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
- // 出于兼容目的保留 rebalanceCb 参数,当前未使用
- func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
+// 出于兼容目的保留 rebalanceCb 参数,当前未使用
+func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
- // 出于兼容目的保留 rebalanceCb 参数,当前未使用
- func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
+// 出于兼容目的保留 rebalanceCb 参数,当前未使用
+func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
- func (c *Consumer) Poll(timeoutMs int) tmq.Event
+func (c *Consumer) Poll(timeoutMs int) tmq.Event
- // 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
- func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
+// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
+func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
- func (c *Consumer) Unsubscribe() error
+func (c *Consumer) Unsubscribe() error
- func (c *Consumer) Close() error
- ```
+func (c *Consumer) Close() error
+```
-
+
-
+
- ```rust
- impl TBuilder for TmqBuilder
- fn from_dsn(dsn: D) -> Result
- fn build(&self) -> Result
+```rust
+impl TBuilder for TmqBuilder
+ fn from_dsn(dsn: D) -> Result
+ fn build(&self) -> Result
- impl AsAsyncConsumer for Consumer
- async fn subscribe, I: IntoIterator- + Send>(
+impl AsAsyncConsumer for Consumer
+ async fn subscribe, I: IntoIterator
- + Send>(
&mut self,
topics: I,
- ) -> Result<(), Self::Error>;
- fn stream(
+ ) -> Result<(), Self::Error>;
+ fn stream(
&self,
- ) -> Pin<
- Box<
+ ) -> Pin<
+ Box<
dyn '_
+ Send
+ futures::Stream<
Item = Result<(Self::Offset, MessageSet), Self::Error>,
>,
- >,
- >;
- async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
+ >,
+ >;
+ async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
- async fn unsubscribe(self);
- ```
+ async fn unsubscribe(self);
+```
- 可在 上查看详细 API 说明。
+可在 上查看详细 API 说明。
-
+
-
+
- ```js
- function TMQConsumer(config)
-
- function subscribe(topic)
-
- function consume(timeout)
-
- function subscription()
-
- function unsubscribe()
-
- function commit(msg)
-
- function close()
- ```
+```js
+function TMQConsumer(config)
-
+function subscribe(topic)
-
+function consume(timeout)
- ```csharp
- class ConsumerBuilder
-
- ConsumerBuilder(IEnumerable> config)
-
- public IConsumer Build()
-
- void Subscribe(IEnumerable topics)
-
- void Subscribe(string topic)
-
- ConsumeResult Consume(int millisecondsTimeout)
-
- List Subscription()
-
- void Unsubscribe()
-
- List Commit()
-
- void Close()
- ```
-
+function subscription()
+
+function unsubscribe()
+
+function commit(msg)
+
+function close()
+```
+
+
+
+
+
+```csharp
+class ConsumerBuilder
+
+ConsumerBuilder(IEnumerable> config)
+
+public IConsumer Build()
+
+void Subscribe(IEnumerable topics)
+
+void Subscribe(string topic)
+
+ConsumeResult Consume(int millisecondsTimeout)
+
+List Subscription()
+
+void Unsubscribe()
+
+List Commit()
+
+void Close()
+```
+
+
# 数据订阅示例
@@ -334,163 +337,164 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
对于不同编程语言,其设置方式如下:
+
-
- ```c
- /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
- 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
- tmq_conf_t* conf = tmq_conf_new();
- tmq_conf_set(conf, "enable.auto.commit", "true");
- tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
- tmq_conf_set(conf, "group.id", "cgrpName");
- tmq_conf_set(conf, "td.connect.user", "root");
- tmq_conf_set(conf, "td.connect.pass", "taosdata");
- tmq_conf_set(conf, "auto.offset.reset", "latest");
- tmq_conf_set(conf, "msg.with.table.name", "true");
- tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
+```c
+/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
+ 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
+tmq_conf_t* conf = tmq_conf_new();
+tmq_conf_set(conf, "enable.auto.commit", "true");
+tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
+tmq_conf_set(conf, "group.id", "cgrpName");
+tmq_conf_set(conf, "td.connect.user", "root");
+tmq_conf_set(conf, "td.connect.pass", "taosdata");
+tmq_conf_set(conf, "auto.offset.reset", "latest");
+tmq_conf_set(conf, "msg.with.table.name", "true");
+tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
- tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
- tmq_conf_destroy(conf);
- ```
-
+tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
+tmq_conf_destroy(conf);
+```
-
+
+
- 对于 Java 程序,还可以使用如下配置项:
-
- | 参数名称 | 类型 | 参数说明 |
- | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
- | `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" |
- | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` |
- | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
- | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | |
-
- 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。
-
- ```java
- Properties properties = new Properties();
- properties.setProperty("enable.auto.commit", "true");
- properties.setProperty("auto.commit.interval.ms", "1000");
- properties.setProperty("group.id", "cgrpName");
- properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
- properties.setProperty("td.connect.user", "root");
- properties.setProperty("td.connect.pass", "taosdata");
- properties.setProperty("auto.offset.reset", "latest");
- properties.setProperty("msg.with.table.name", "true");
- properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
-
- TaosConsumer consumer = new TaosConsumer<>(properties);
-
- /* value deserializer definition. */
- import com.taosdata.jdbc.tmq.ReferenceDeserializer;
-
- public class MetersDeserializer extends ReferenceDeserializer {
- }
- ```
-
+对于 Java 程序,还可以使用如下配置项:
-
-
- ```go
- conf := &tmq.ConfigMap{
- "group.id": "test",
- "auto.offset.reset": "latest",
- "td.connect.ip": "127.0.0.1",
- "td.connect.user": "root",
- "td.connect.pass": "taosdata",
- "td.connect.port": "6030",
- "client.id": "test_tmq_c",
- "enable.auto.commit": "false",
- "msg.with.table.name": "true",
- }
- consumer, err := NewConsumer(conf)
- ```
-
-
+| 参数名称 | 类型 | 参数说明 |
+| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
+| `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" |
+| `bootstrap.servers` | string | 连接地址,如 `localhost:6030` |
+| `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
+| `value.deserializer.encoding` | string | 指定字符串解析的字符集 | |
-
-
- ```rust
- let mut dsn: Dsn = "taos://".parse()?;
- dsn.set("group.id", "group1");
- dsn.set("client.id", "test");
- dsn.set("auto.offset.reset", "latest");
-
- let tmq = TmqBuilder::from_dsn(dsn)?;
-
- let mut consumer = tmq.build()?;
- ```
-
-
+需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。
-
-
- Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例:
-
- ```python
- from taos.tmq import Consumer
-
- # Syntax: `consumer = Consumer(configs)`
- #
- # Example:
- consumer = Consumer(
- {
- "group.id": "local",
- "client.id": "1",
- "enable.auto.commit": "true",
- "auto.commit.interval.ms": "1000",
- "td.connect.ip": "127.0.0.1",
- "td.connect.user": "root",
- "td.connect.pass": "taosdata",
- "auto.offset.reset": "latest",
- "msg.with.table.name": "true",
- }
- )
- ```
-
-
+```java
+Properties properties = new Properties();
+properties.setProperty("enable.auto.commit", "true");
+properties.setProperty("auto.commit.interval.ms", "1000");
+properties.setProperty("group.id", "cgrpName");
+properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
+properties.setProperty("td.connect.user", "root");
+properties.setProperty("td.connect.pass", "taosdata");
+properties.setProperty("auto.offset.reset", "latest");
+properties.setProperty("msg.with.table.name", "true");
+properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
-
-
- ```js
- // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
- // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
-
- let consumer = taos.consumer({
- 'enable.auto.commit': 'true',
- 'auto.commit.interval.ms','1000',
- 'group.id': 'tg2',
- 'td.connect.user': 'root',
- 'td.connect.pass': 'taosdata',
- 'auto.offset.reset','latest',
- 'msg.with.table.name': 'true',
- 'td.connect.ip','127.0.0.1',
- 'td.connect.port','6030'
- });
- ```
-
-
+TaosConsumer consumer = new TaosConsumer<>(properties);
+
+/* value deserializer definition. */
+import com.taosdata.jdbc.tmq.ReferenceDeserializer;
+
+public class MetersDeserializer extends ReferenceDeserializer {
+}
+```
+
+
+
+
+
+```go
+conf := &tmq.ConfigMap{
+ "group.id": "test",
+ "auto.offset.reset": "latest",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "td.connect.port": "6030",
+ "client.id": "test_tmq_c",
+ "enable.auto.commit": "false",
+ "msg.with.table.name": "true",
+}
+consumer, err := NewConsumer(conf)
+```
+
+
+
+
+
+```rust
+let mut dsn: Dsn = "taos://".parse()?;
+dsn.set("group.id", "group1");
+dsn.set("client.id", "test");
+dsn.set("auto.offset.reset", "latest");
+
+let tmq = TmqBuilder::from_dsn(dsn)?;
+
+let mut consumer = tmq.build()?;
+```
+
+
+
+
+
+Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例:
+
+```python
+from taos.tmq import Consumer
+
+# Syntax: `consumer = Consumer(configs)`
+#
+# Example:
+consumer = Consumer(
+ {
+ "group.id": "local",
+ "client.id": "1",
+ "enable.auto.commit": "true",
+ "auto.commit.interval.ms": "1000",
+ "td.connect.ip": "127.0.0.1",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "auto.offset.reset": "latest",
+ "msg.with.table.name": "true",
+ }
+)
+```
+
+
+
+
+
+```js
+// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
+// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
+
+let consumer = taos.consumer({
+ 'enable.auto.commit': 'true',
+ 'auto.commit.interval.ms','1000',
+ 'group.id': 'tg2',
+ 'td.connect.user': 'root',
+ 'td.connect.pass': 'taosdata',
+ 'auto.offset.reset','latest',
+ 'msg.with.table.name': 'true',
+ 'td.connect.ip','127.0.0.1',
+ 'td.connect.port','6030'
+ });
+```
+
+
+
+
+
+```csharp
+var cfg = new Dictionary()
+{
+ { "group.id", "group1" },
+ { "auto.offset.reset", "latest" },
+ { "td.connect.ip", "127.0.0.1" },
+ { "td.connect.user", "root" },
+ { "td.connect.pass", "taosdata" },
+ { "td.connect.port", "6030" },
+ { "client.id", "tmq_example" },
+ { "enable.auto.commit", "true" },
+ { "msg.with.table.name", "false" },
+};
+var consumer = new ConsumerBuilder>(cfg).Build();
+```
+
+
-
-
- ```csharp
- var cfg = new Dictionary()
- {
- { "group.id", "group1" },
- { "auto.offset.reset", "latest" },
- { "td.connect.ip", "127.0.0.1" },
- { "td.connect.user", "root" },
- { "td.connect.pass", "taosdata" },
- { "td.connect.port", "6030" },
- { "client.id", "tmq_example" },
- { "enable.auto.commit", "true" },
- { "msg.with.table.name", "false" },
- };
- var consumer = new ConsumerBuilder>(cfg).Build();
- ```
-
-
-
上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
@@ -500,78 +504,77 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
一个 consumer 支持同时订阅多个 topic。
+
-
-
- ```c
- // 创建订阅 topics 列表
- tmq_list_t* topicList = tmq_list_new();
- tmq_list_append(topicList, "topicName");
- // 启动订阅
- tmq_subscribe(tmq, topicList);
- tmq_list_destroy(topicList);
-
- ```
-
-
-
-
- ```java
- List topics = new ArrayList<>();
- topics.add("tmq_topic");
- consumer.subscribe(topics);
- ```
-
-
-
+```c
+// 创建订阅 topics 列表
+tmq_list_t* topicList = tmq_list_new();
+tmq_list_append(topicList, "topicName");
+// 启动订阅
+tmq_subscribe(tmq, topicList);
+tmq_list_destroy(topicList);
+
+```
- ```go
- err = consumer.Subscribe("example_tmq_topic", nil)
- if err != nil {
- panic(err)
- }
- ```
+
+
-
-
+```java
+List topics = new ArrayList<>();
+topics.add("tmq_topic");
+consumer.subscribe(topics);
+```
- ```rust
- consumer.subscribe(["tmq_meters"]).await?;
- ```
+
+
-
+```go
+err = consumer.Subscribe("example_tmq_topic", nil)
+if err != nil {
+ panic(err)
+}
+```
-
+
+
- ```python
- consumer.subscribe(['topic1', 'topic2'])
- ```
+```rust
+consumer.subscribe(["tmq_meters"]).await?;
+```
-
+
-
+
- ```js
- // 创建订阅 topics 列表
- let topics = ['topic_test']
+```python
+consumer.subscribe(['topic1', 'topic2'])
+```
- // 启动订阅
- consumer.subscribe(topics);
- ```
+
-
+
-
+```js
+// 创建订阅 topics 列表
+let topics = ['topic_test']
- ```csharp
- // 创建订阅 topics 列表
- List topics = new List();
- topics.add("tmq_topic");
- // 启动订阅
- consumer.Subscribe(topics);
- ```
+// 启动订阅
+consumer.subscribe(topics);
+```
-
+
+
+
+
+```csharp
+// 创建订阅 topics 列表
+List topics = new List();
+topics.add("tmq_topic");
+// 启动订阅
+consumer.Subscribe(topics);
+```
+
+
@@ -580,135 +583,134 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
以下代码展示了不同语言下如何对 TMQ 消息进行消费。
+
-
+```c
+// 消费数据
+while (running) {
+ TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
+ msg_process(msg);
+}
+```
- ```c
- // 消费数据
- while (running) {
- TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
- msg_process(msg);
- }
- ```
+这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
- 这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
+
+
-
-
-
- ```java
- while(running){
- ConsumerRecords meters = consumer.poll(Duration.ofMillis(100));
- for (Meters meter : meters) {
- processMsg(meter);
- }
- }
- ```
-
-
+```java
+while(running){
+ ConsumerRecords meters = consumer.poll(Duration.ofMillis(100));
+ for (Meters meter : meters) {
+ processMsg(meter);
+ }
+}
+```
-
+
- ```go
- for {
- ev := consumer.Poll(0)
- if ev != nil {
- switch e := ev.(type) {
- case *tmqcommon.DataMessage:
- fmt.Println(e.Value())
- case tmqcommon.Error:
- fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
- panic(e)
- }
- consumer.Commit()
- }
- }
- ```
+
-
+```go
+for {
+ ev := consumer.Poll(0)
+ if ev != nil {
+ switch e := ev.(type) {
+ case *tmqcommon.DataMessage:
+ fmt.Println(e.Value())
+ case tmqcommon.Error:
+ fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+ panic(e)
+ }
+ consumer.Commit()
+ }
+}
+```
-
+
- ```rust
- {
- let mut stream = consumer.stream();
+
- while let Some((offset, message)) = stream.try_next().await? {
- // get information from offset
+```rust
+{
+ let mut stream = consumer.stream();
- // the topic
- let topic = offset.topic();
- // the vgroup id, like partition id in kafka.
- let vgroup_id = offset.vgroup_id();
- println!("* in vgroup id {vgroup_id} of topic {topic}\n");
+ while let Some((offset, message)) = stream.try_next().await? {
+ // get information from offset
- if let Some(data) = message.into_data() {
+ // the topic
+ let topic = offset.topic();
+ // the vgroup id, like partition id in kafka.
+ let vgroup_id = offset.vgroup_id();
+ println!("* in vgroup id {vgroup_id} of topic {topic}\n");
+
+ if let Some(data) = message.into_data() {
while let Some(block) = data.fetch_raw_block().await? {
- // one block for one table, get table name if needed
- let name = block.table_name();
- let records: Vec = block.deserialize().try_collect()?;
- println!(
- "** table: {}, got {} records: {:#?}\n",
- name.unwrap(),
- records.len(),
- records
- );
+ // one block for one table, get table name if needed
+ let name = block.table_name();
+ let records: Vec = block.deserialize().try_collect()?;
+ println!(
+ "** table: {}, got {} records: {:#?}\n",
+ name.unwrap(),
+ records.len(),
+ records
+ );
+ }
}
- }
- consumer.commit(offset).await?;
- }
- }
- ```
+ consumer.commit(offset).await?;
+ }
+}
+```
-
-
+
+
- ```python
- while True:
- res = consumer.poll(100)
- if not res:
+```python
+while True:
+ res = consumer.poll(100)
+ if not res:
continue
- err = res.error()
- if err is not None:
+ err = res.error()
+ if err is not None:
raise err
- val = res.value()
+ val = res.value()
- for block in val:
+ for block in val:
print(block.fetchall())
- ```
+```
-
+
-
+
- ```js
- while(true){
- msg = consumer.consume(200);
- // process message(consumeResult)
- console.log(msg.topicPartition);
- console.log(msg.block);
- console.log(msg.fields)
- }
- ```
+```js
+while(true){
+ msg = consumer.consume(200);
+ // process message(consumeResult)
+ console.log(msg.topicPartition);
+ console.log(msg.block);
+ console.log(msg.fields)
+}
+```
-
+
-
+
- ```csharp
- // 消费数据
- while (true)
- {
- using (var result = consumer.Consume(500))
- {
- if (result == null) continue;
- ProcessMsg(result);
- consumer.Commit();
- }
- }
- ```
+```csharp
+// 消费数据
+while (true)
+{
+ using (var result = consumer.Consume(500))
+ {
+ if (result == null) continue;
+ ProcessMsg(result);
+ consumer.Commit();
+ }
+}
+```
-
+
@@ -717,80 +719,79 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
消费结束后,应当取消订阅。
-
-
+
- ```c
- /* 取消订阅 */
- tmq_unsubscribe(tmq);
+```c
+/* 取消订阅 */
+tmq_unsubscribe(tmq);
- /* 关闭消费者对象 */
- tmq_consumer_close(tmq);
- ```
+/* 关闭消费者对象 */
+tmq_consumer_close(tmq);
+```
-
-
+
+
- ```java
- /* 取消订阅 */
- consumer.unsubscribe();
+```java
+/* 取消订阅 */
+consumer.unsubscribe();
- /* 关闭消费 */
- consumer.close();
- ```
+/* 关闭消费 */
+consumer.close();
+```
-
-
-
+
- ```go
- /* Unsubscribe */
- _ = consumer.Unsubscribe()
+
- /* Close consumer */
- _ = consumer.Close()
- ```
+```go
+/* Unsubscribe */
+_ = consumer.Unsubscribe()
-
-
-
+/* Close consumer */
+_ = consumer.Close()
+```
- ```rust
- consumer.unsubscribe().await;
- ```
+
-
+
-
+```rust
+consumer.unsubscribe().await;
+```
- ```py
- # 取消订阅
- consumer.unsubscribe()
- # 关闭消费
- consumer.close()
- ```
+
-
-
+
- ```js
- consumer.unsubscribe();
- consumer.close();
- ```
+```py
+# 取消订阅
+consumer.unsubscribe()
+# 关闭消费
+consumer.close()
+```
-
+
+
-
+```js
+consumer.unsubscribe();
+consumer.close();
+```
- ```csharp
- // 取消订阅
- consumer.Unsubscribe();
+
- // 关闭消费
- consumer.Close();
- ```
+
-
+```csharp
+// 取消订阅
+consumer.Unsubscribe();
+
+// 关闭消费
+consumer.Close();
+```
+
+
@@ -800,41 +801,41 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
-
-
-
+
+
+
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
-
-
-
+
+
+
+
+
+
+
-
-
-
-
#订阅高级功能
@@ -854,4 +855,4 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
- 超级表和库订阅不支持回放
- enable.replay 参数,true表示开启订阅回放功能,false表示不开启订阅回放功能,默认不开启。
- 回放不支持进度保存,所以回放参数 enable.replay = true 时,auto commit 自动关闭
-- 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差
\ No newline at end of file
+- 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差