docs: add rust examples for subscription document
Closes: [TD-18417](https://jira.taosdata.com:18080/browse/TD-18417)
This commit is contained in:
parent
3872ef4c1c
commit
a888d735bd
|
@ -132,6 +132,58 @@ func (c *Consumer) Unsubscribe() error
|
|||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Rust" value="Rust">
|
||||
|
||||
```rust
|
||||
impl TBuilder for TmqBuilder
|
||||
fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
|
||||
fn build(&self) -> Result<Self::Target, Self::Error>
|
||||
|
||||
impl AsAsyncConsumer for Consumer
|
||||
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
|
||||
&mut self,
|
||||
topics: I,
|
||||
) -> Result<(), Self::Error>;
|
||||
fn stream(
|
||||
&self,
|
||||
) -> Pin<
|
||||
Box<
|
||||
dyn '_
|
||||
+ Send
|
||||
+ futures::Stream<
|
||||
Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
|
||||
>,
|
||||
>,
|
||||
>;
|
||||
async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
|
||||
|
||||
async fn unsubscribe(self);
|
||||
```
|
||||
|
||||
可在 <https://docs.rs/taos> 上查看详细 API 说明。
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
|
||||
```js
|
||||
function TMQConsumer(config)
|
||||
|
||||
function subscribe(topic)
|
||||
|
||||
function consume(timeout)
|
||||
|
||||
function subscription()
|
||||
|
||||
function unsubscribe()
|
||||
|
||||
function commit(msg)
|
||||
|
||||
function close()
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
||||
```csharp
|
||||
|
@ -157,27 +209,6 @@ void Close()
|
|||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
|
||||
```node
|
||||
function TMQConsumer(config)
|
||||
|
||||
function subscribe(topic)
|
||||
|
||||
function consume(timeout)
|
||||
|
||||
function subscription()
|
||||
|
||||
function unsubscribe()
|
||||
|
||||
function commit(msg)
|
||||
|
||||
function close()
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
||||
## 写入数据
|
||||
|
@ -321,28 +352,6 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
|
|||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
Python 使用以下配置项创建一个 Consumer 实例。
|
||||
|
||||
| 参数名称 | 类型 | 参数说明 | 备注 |
|
||||
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
|
||||
| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
|
||||
| `client_id` | string | 客户端 ID | 最大长度:192。 |
|
||||
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
|
||||
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 |
|
||||
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | |
|
||||
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
|
||||
| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
|
||||
| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
|
||||
| `timeout` | int | 消费者拉去的超时时间 | |
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Go" value="Go">
|
||||
|
||||
```go
|
||||
|
@ -394,6 +403,64 @@ if err != nil {
|
|||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Rust" value="Rust">
|
||||
|
||||
```rust
|
||||
let mut dsn: Dsn = "taos://".parse()?;
|
||||
dsn.set("group.id", "group1");
|
||||
dsn.set("client.id", "test");
|
||||
dsn.set("auto.offset.reset", "earliest");
|
||||
|
||||
let tmq = TmqBuilder::from_dsn(dsn)?;
|
||||
|
||||
let mut consumer = tmq.build()?;
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
Python 使用以下配置项创建一个 Consumer 实例。
|
||||
|
||||
| 参数名称 | 类型 | 参数说明 | 备注 |
|
||||
| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- |
|
||||
| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
|
||||
| `client_id` | string | 客户端 ID | 最大长度:192。 |
|
||||
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
|
||||
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 |
|
||||
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | |
|
||||
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
|
||||
| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
|
||||
| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
|
||||
| `timeout` | int | 消费者拉去的超时时间 | |
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
|
||||
```js
|
||||
// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
|
||||
// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
|
||||
|
||||
let consumer = taos.consumer({
|
||||
'enable.auto.commit': 'true',
|
||||
'auto.commit.interval.ms','1000',
|
||||
'group.id': 'tg2',
|
||||
'td.connect.user': 'root',
|
||||
'td.connect.pass': 'taosdata',
|
||||
'auto.offset.reset','earliest',
|
||||
'msg.with.table.name': 'true',
|
||||
'td.connect.ip','127.0.0.1',
|
||||
'td.connect.port','6030'
|
||||
});
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
||||
```csharp
|
||||
|
@ -420,28 +487,6 @@ var consumer = new ConsumerBuilder(cfg).Build();
|
|||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
|
||||
``` node
|
||||
// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
|
||||
// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
|
||||
|
||||
let consumer = taos.consumer({
|
||||
'enable.auto.commit': 'true',
|
||||
'auto.commit.interval.ms','1000',
|
||||
'group.id': 'tg2',
|
||||
'td.connect.user': 'root',
|
||||
'td.connect.pass': 'taosdata',
|
||||
'auto.offset.reset','earliest',
|
||||
'msg.with.table.name': 'true',
|
||||
'td.connect.ip','127.0.0.1',
|
||||
'td.connect.port','6030'
|
||||
});
|
||||
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
||||
上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
|
||||
|
@ -486,6 +531,33 @@ if err != nil {
|
|||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="Rust" label="Rust">
|
||||
|
||||
```rust
|
||||
consumer.subscribe(["tmq_meters"]).await?;
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
```python
|
||||
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
|
||||
```js
|
||||
// 创建订阅 topics 列表
|
||||
let topics = ['topic_test']
|
||||
|
||||
// 启动订阅
|
||||
consumer.subscribe(topics);
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
@ -500,24 +572,6 @@ consumer.Subscribe(topics);
|
|||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
```python
|
||||
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
|
||||
```node
|
||||
// 创建订阅 topics 列表
|
||||
let topics = ['topic_test']
|
||||
|
||||
// 启动订阅
|
||||
consumer.subscribe(topics);
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
||||
## 消费
|
||||
|
@ -551,14 +605,6 @@ while(running){
|
|||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
```python
|
||||
for msg in consumer:
|
||||
for row in msg:
|
||||
print(row)
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
|
@ -575,6 +621,64 @@ for {
|
|||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Rust" label="Rust">
|
||||
|
||||
```rust
|
||||
{
|
||||
let mut stream = consumer.stream();
|
||||
|
||||
while let Some((offset, message)) = stream.try_next().await? {
|
||||
// get information from offset
|
||||
|
||||
// the topic
|
||||
let topic = offset.topic();
|
||||
// the vgroup id, like partition id in kafka.
|
||||
let vgroup_id = offset.vgroup_id();
|
||||
println!("* in vgroup id {vgroup_id} of topic {topic}\n");
|
||||
|
||||
if let Some(data) = message.into_data() {
|
||||
while let Some(block) = data.fetch_raw_block().await? {
|
||||
// one block for one table, get table name if needed
|
||||
let name = block.table_name();
|
||||
let records: Vec<Record> = block.deserialize().try_collect()?;
|
||||
println!(
|
||||
"** table: {}, got {} records: {:#?}\n",
|
||||
name.unwrap(),
|
||||
records.len(),
|
||||
records
|
||||
);
|
||||
}
|
||||
}
|
||||
consumer.commit(offset).await?;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
```python
|
||||
for msg in consumer:
|
||||
for row in msg:
|
||||
print(row)
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
|
||||
```js
|
||||
while(true){
|
||||
msg = consumer.consume(200);
|
||||
// process message(consumeResult)
|
||||
console.log(msg.topicPartition);
|
||||
console.log(msg.block);
|
||||
console.log(msg.fields)
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
||||
```csharp
|
||||
|
@ -590,20 +694,6 @@ while (true)
|
|||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
|
||||
```node
|
||||
while(true){
|
||||
msg = consumer.consume(200);
|
||||
// process message(consumeResult)
|
||||
console.log(msg.topicPartition);
|
||||
console.log(msg.block);
|
||||
console.log(msg.fields)
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
||||
## 结束消费
|
||||
|
@ -634,16 +724,6 @@ consumer.close();
|
|||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
```python
|
||||
/* 取消订阅 */
|
||||
consumer.unsubscribe();
|
||||
|
||||
/* 关闭消费 */
|
||||
consumer.close();
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
|
@ -652,6 +732,34 @@ consumer.Close()
|
|||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Rust" label="Rust">
|
||||
|
||||
```rust
|
||||
consumer.unsubscribe().await;
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
```py
|
||||
# 取消订阅
|
||||
consumer.unsubscribe()
|
||||
# 关闭消费
|
||||
consumer.close()
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
|
||||
```js
|
||||
consumer.unsubscribe();
|
||||
consumer.close();
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="C#" label="C#">
|
||||
|
||||
```csharp
|
||||
|
@ -663,15 +771,6 @@ consumer.Close();
|
|||
```
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
|
||||
```node
|
||||
consumer.unsubscribe();
|
||||
consumer.close();
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
||||
## 删除 *topic*
|
||||
|
|
Loading…
Reference in New Issue