diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index c9ac178081..c487835e2d 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -132,6 +132,58 @@ func (c *Consumer) Unsubscribe() error + + +```rust +impl TBuilder for TmqBuilder + fn from_dsn(dsn: D) -> Result + fn build(&self) -> Result + +impl AsAsyncConsumer for Consumer + async fn subscribe, I: IntoIterator + Send>( + &mut self, + topics: I, + ) -> Result<(), Self::Error>; + fn stream( + &self, + ) -> Pin< + Box< + dyn '_ + + Send + + futures::Stream< + Item = Result<(Self::Offset, MessageSet), Self::Error>, + >, + >, + >; + async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>; + + async fn unsubscribe(self); +``` + +可在 上查看详细 API 说明。 + + + + + +```js +function TMQConsumer(config) + +function subscribe(topic) + +function consume(timeout) + +function subscription() + +function unsubscribe() + +function commit(msg) + +function close() +``` + + + ```csharp @@ -157,27 +209,6 @@ void Close() ``` - - - -```node -function TMQConsumer(config) - -function subscribe(topic) - -function consume(timeout) - -function subscription() - -function unsubscribe() - -function commit(msg) - -function close() -``` - - - ## 写入数据 @@ -321,28 +352,6 @@ public class MetersDeserializer extends ReferenceDeserializer { - - -Python 使用以下配置项创建一个 Consumer 实例。 - -| 参数名称 | 类型 | 参数说明 | 备注 | -| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | -| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | -| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | | -| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | | -| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | | -| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 | -| `client_id` | string | 客户端 ID | 最大长度:192。 | -| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) | -| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 | -| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | | -| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | -| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | -| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | -| `timeout` | int | 消费者拉去的超时时间 | | - - - ```go @@ -394,6 +403,64 @@ if err != nil { + + +```rust +let mut dsn: Dsn = "taos://".parse()?; +dsn.set("group.id", "group1"); +dsn.set("client.id", "test"); +dsn.set("auto.offset.reset", "earliest"); + +let tmq = TmqBuilder::from_dsn(dsn)?; + +let mut consumer = tmq.build()?; +``` + + + + + +Python 使用以下配置项创建一个 Consumer 实例。 + +| 参数名称 | 类型 | 参数说明 | 备注 | +| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- | +| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | | +| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 | +| `client_id` | string | 客户端 ID | 最大长度:192。 | +| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) | +| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 | +| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | | +| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | +| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | +| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | +| `timeout` | int | 消费者拉去的超时时间 | | + + + + + +```js +// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 +// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 + +let consumer = taos.consumer({ + 'enable.auto.commit': 'true', + 'auto.commit.interval.ms','1000', + 'group.id': 'tg2', + 'td.connect.user': 'root', + 'td.connect.pass': 'taosdata', + 'auto.offset.reset','earliest', + 'msg.with.table.name': 'true', + 'td.connect.ip','127.0.0.1', + 'td.connect.port','6030' + }); +``` + + + ```csharp @@ -420,28 +487,6 @@ var consumer = new ConsumerBuilder(cfg).Build(); - - -``` node -// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 -// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 - -let consumer = taos.consumer({ - 'enable.auto.commit': 'true', - 'auto.commit.interval.ms','1000', - 'group.id': 'tg2', - 'td.connect.user': 'root', - 'td.connect.pass': 'taosdata', - 'auto.offset.reset','earliest', - 'msg.with.table.name': 'true', - 'td.connect.ip','127.0.0.1', - 'td.connect.port','6030' - }); - -``` - - - 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 @@ -486,6 +531,33 @@ if err != nil { } ``` + + + +```rust +consumer.subscribe(["tmq_meters"]).await?; +``` + + + + + +```python +consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +``` + + + + + +```js +// 创建订阅 topics 列表 +let topics = ['topic_test'] + +// 启动订阅 +consumer.subscribe(topics); +``` + @@ -500,24 +572,6 @@ consumer.Subscribe(topics); - -```python -consumer = TaosConsumer('topic_ctb_column', group_id='vg2') -``` - - - - -```node -// 创建订阅 topics 列表 -let topics = ['topic_test'] - -// 启动订阅 -consumer.subscribe(topics); -``` - - - ## 消费 @@ -551,14 +605,6 @@ while(running){ - -```python -for msg in consumer: - for row in msg: - print(row) -``` - - ```go @@ -575,6 +621,64 @@ for { + + +```rust +{ + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); + } + } + consumer.commit(offset).await?; + } +} +``` + + + + +```python +for msg in consumer: + for row in msg: + print(row) +``` + + + + + +```js +while(true){ + msg = consumer.consume(200); + // process message(consumeResult) + console.log(msg.topicPartition); + console.log(msg.block); + console.log(msg.fields) +} +``` + + + ```csharp @@ -590,20 +694,6 @@ while (true) - - -```node -while(true){ - msg = consumer.consume(200); - // process message(consumeResult) - console.log(msg.topicPartition); - console.log(msg.block); - console.log(msg.fields) - } -``` - - - ## 结束消费 @@ -634,16 +724,6 @@ consumer.close(); - - -```python -/* 取消订阅 */ -consumer.unsubscribe(); - -/* 关闭消费 */ -consumer.close(); - - @@ -652,6 +732,34 @@ consumer.Close() ``` + + + +```rust +consumer.unsubscribe().await; +``` + + + + + +```py +# 取消订阅 +consumer.unsubscribe() +# 关闭消费 +consumer.close() +``` + + + + +```js +consumer.unsubscribe(); +consumer.close(); +``` + + + ```csharp @@ -663,15 +771,6 @@ consumer.Close(); ``` - - -```node -consumer.unsubscribe(); -consumer.close(); -``` - - - ## 删除 *topic*