fix:syntax error

This commit is contained in:
wangmm0220 2024-02-21 13:54:43 +08:00
parent db30163b17
commit 6b716b1a7e
1 changed files with 545 additions and 544 deletions

View File

@ -91,10 +91,9 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
不同语言下, TMQ 订阅相关的 API 及数据结构如下详细的接口说明可以参考连接器章节注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer 不同语言下, TMQ 订阅相关的 API 及数据结构如下详细的接口说明可以参考连接器章节注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
<TabItem value="c" label="C"> ```c
```c
typedef struct tmq_t tmq_t; typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t; typedef struct tmq_list_t tmq_list_t;
@ -146,42 +145,45 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
DLL_EXPORT const char *tmq_err2str(int32_t code); DLL_EXPORT const char *tmq_err2str(int32_t code);
``` ```
</TabItem>
<TabItem value="java" label="Java">
```java 下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
void subscribe(Collection<String> topics) throws SQLException;
void unsubscribe() throws SQLException; </TabItem>
<TabItem value="java" label="Java">
Set<String> subscription() throws SQLException; ```java
void subscribe(Collection<String> topics) throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException; void unsubscribe() throws SQLException;
Set<TopicPartition> assignment() throws SQLException; Set<String> subscription() throws SQLException;
long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException;
void seek(TopicPartition partition, long offset) throws SQLException; ConsumerRecords<V> poll(Duration timeout) throws SQLException;
void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;
void commitSync() throws SQLException; Set<TopicPartition> assignment() throws SQLException;
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException; long position(TopicPartition partition) throws SQLException;
Map<TopicPartition, Long> position(String topic) throws SQLException;
Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException;
Map<TopicPartition, Long> endOffsets(String topic) throws SQLException;
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException;
void close() throws SQLException; void seek(TopicPartition partition, long offset) throws SQLException;
``` void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException;
void seekToEnd(Collection<TopicPartition> partitions) throws SQLException;
</TabItem> void commitSync() throws SQLException;
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException;
<TabItem value="Python" label="Python"> void close() throws SQLException;
```
```python </TabItem>
class Consumer:
<TabItem value="Python" label="Python">
```python
class Consumer:
def subscribe(self, topics): def subscribe(self, topics):
pass pass
@ -202,41 +204,41 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
def commit(self, message): def commit(self, message):
pass pass
``` ```
</TabItem> </TabItem>
<TabItem label="Go" value="Go"> <TabItem label="Go" value="Go">
```go ```go
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
// 出于兼容目的保留 rebalanceCb 参数,当前未使用 // 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
// 出于兼容目的保留 rebalanceCb 参数,当前未使用 // 出于兼容目的保留 rebalanceCb 参数,当前未使用
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
func (c *Consumer) Poll(timeoutMs int) tmq.Event func (c *Consumer) Poll(timeoutMs int) tmq.Event
// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用 // 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用
func (c *Consumer) Commit() ([]tmq.TopicPartition, error) func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
func (c *Consumer) Unsubscribe() error func (c *Consumer) Unsubscribe() error
func (c *Consumer) Close() error func (c *Consumer) Close() error
``` ```
</TabItem> </TabItem>
<TabItem label="Rust" value="Rust"> <TabItem label="Rust" value="Rust">
```rust ```rust
impl TBuilder for TmqBuilder impl TBuilder for TmqBuilder
fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error> fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
fn build(&self) -> Result<Self::Target, Self::Error> fn build(&self) -> Result<Self::Target, Self::Error>
impl AsAsyncConsumer for Consumer impl AsAsyncConsumer for Consumer
async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>( async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
&mut self, &mut self,
topics: I, topics: I,
@ -255,56 +257,57 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提
async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>; async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;
async fn unsubscribe(self); async fn unsubscribe(self);
``` ```
可在 <https://docs.rs/taos> 上查看详细 API 说明。 可在 <https://docs.rs/taos> 上查看详细 API 说明。
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
```js ```js
function TMQConsumer(config) function TMQConsumer(config)
function subscribe(topic) function subscribe(topic)
function consume(timeout) function consume(timeout)
function subscription() function subscription()
function unsubscribe() function unsubscribe()
function commit(msg) function commit(msg)
function close() function close()
``` ```
</TabItem> </TabItem>
<TabItem value="C#" label="C#"> <TabItem value="C#" label="C#">
```csharp ```csharp
class ConsumerBuilder<TValue> class ConsumerBuilder<TValue>
ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config) ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
public IConsumer<TValue> Build() public IConsumer<TValue> Build()
void Subscribe(IEnumerable<string> topics) void Subscribe(IEnumerable<string> topics)
void Subscribe(string topic) void Subscribe(string topic)
ConsumeResult<TValue> Consume(int millisecondsTimeout) ConsumeResult<TValue> Consume(int millisecondsTimeout)
List<string> Subscription() List<string> Subscription()
void Unsubscribe() void Unsubscribe()
List<TopicPartitionOffset> Commit() List<TopicPartitionOffset> Commit()
void Close() void Close()
``` ```
</TabItem>
</TabItem>
</Tabs> </Tabs>
# 数据订阅示例 # 数据订阅示例
@ -334,65 +337,66 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
对于不同编程语言,其设置方式如下: 对于不同编程语言,其设置方式如下:
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
<TabItem value="c" label="C"> ```c
```c /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */ 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName"); tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "latest"); tmq_conf_set(conf, "auto.offset.reset", "latest");
tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf); tmq_conf_destroy(conf);
``` ```
</TabItem>
<TabItem value="java" label="Java"> </TabItem>
<TabItem value="java" label="Java">
对于 Java 程序,还可以使用如下配置项: 对于 Java 程序,还可以使用如下配置项:
| 参数名称 | 类型 | 参数说明 | | 参数名称 | 类型 | 参数说明 |
| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- | | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- |
| `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" | | `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" |
| `bootstrap.servers` | string | 连接地址,如 `localhost:6030` | | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` |
| `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 | | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
| `value.deserializer.encoding` | string | 指定字符串解析的字符集 | | | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | |
需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。
```java ```java
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true"); properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000"); properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName"); properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root"); properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata"); properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "latest"); properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("msg.with.table.name", "true"); properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");
TaosConsumer<Meters> consumer = new TaosConsumer<>(properties); TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);
/* value deserializer definition. */ /* value deserializer definition. */
import com.taosdata.jdbc.tmq.ReferenceDeserializer; import com.taosdata.jdbc.tmq.ReferenceDeserializer;
public class MetersDeserializer extends ReferenceDeserializer<Meters> { public class MetersDeserializer extends ReferenceDeserializer<Meters> {
} }
``` ```
</TabItem>
<TabItem label="Go" value="Go"> </TabItem>
```go <TabItem label="Go" value="Go">
conf := &tmq.ConfigMap{
```go
conf := &tmq.ConfigMap{
"group.id": "test", "group.id": "test",
"auto.offset.reset": "latest", "auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1", "td.connect.ip": "127.0.0.1",
@ -402,38 +406,38 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
"client.id": "test_tmq_c", "client.id": "test_tmq_c",
"enable.auto.commit": "false", "enable.auto.commit": "false",
"msg.with.table.name": "true", "msg.with.table.name": "true",
} }
consumer, err := NewConsumer(conf) consumer, err := NewConsumer(conf)
``` ```
</TabItem> </TabItem>
<TabItem label="Rust" value="Rust"> <TabItem label="Rust" value="Rust">
```rust ```rust
let mut dsn: Dsn = "taos://".parse()?; let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1"); dsn.set("group.id", "group1");
dsn.set("client.id", "test"); dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "latest"); dsn.set("auto.offset.reset", "latest");
let tmq = TmqBuilder::from_dsn(dsn)?; let tmq = TmqBuilder::from_dsn(dsn)?;
let mut consumer = tmq.build()?; let mut consumer = tmq.build()?;
``` ```
</TabItem> </TabItem>
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例: Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例:
```python ```python
from taos.tmq import Consumer from taos.tmq import Consumer
# Syntax: `consumer = Consumer(configs)` # Syntax: `consumer = Consumer(configs)`
# #
# Example: # Example:
consumer = Consumer( consumer = Consumer(
{ {
"group.id": "local", "group.id": "local",
"client.id": "1", "client.id": "1",
@ -445,18 +449,18 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
"auto.offset.reset": "latest", "auto.offset.reset": "latest",
"msg.with.table.name": "true", "msg.with.table.name": "true",
} }
) )
``` ```
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
```js ```js
// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数
let consumer = taos.consumer({ let consumer = taos.consumer({
'enable.auto.commit': 'true', 'enable.auto.commit': 'true',
'auto.commit.interval.ms','1000', 'auto.commit.interval.ms','1000',
'group.id': 'tg2', 'group.id': 'tg2',
@ -467,15 +471,15 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
'td.connect.ip','127.0.0.1', 'td.connect.ip','127.0.0.1',
'td.connect.port','6030' 'td.connect.port','6030'
}); });
``` ```
</TabItem> </TabItem>
<TabItem value="C#" label="C#"> <TabItem value="C#" label="C#">
```csharp ```csharp
var cfg = new Dictionary<string, string>() var cfg = new Dictionary<string, string>()
{ {
{ "group.id", "group1" }, { "group.id", "group1" },
{ "auto.offset.reset", "latest" }, { "auto.offset.reset", "latest" },
{ "td.connect.ip", "127.0.0.1" }, { "td.connect.ip", "127.0.0.1" },
@ -485,11 +489,11 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
{ "client.id", "tmq_example" }, { "client.id", "tmq_example" },
{ "enable.auto.commit", "true" }, { "enable.auto.commit", "true" },
{ "msg.with.table.name", "false" }, { "msg.with.table.name", "false" },
}; };
var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build(); var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -500,78 +504,77 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
一个 consumer 支持同时订阅多个 topic。 一个 consumer 支持同时订阅多个 topic。
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
<TabItem value="c" label="C"> ```c
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
```c ```
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
``` </TabItem>
<TabItem value="java" label="Java">
</TabItem> ```java
<TabItem value="java" label="Java"> List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);
```
```java </TabItem>
List<String> topics = new ArrayList<>(); <TabItem value="Go" label="Go">
topics.add("tmq_topic");
consumer.subscribe(topics);
```
</TabItem> ```go
<TabItem value="Go" label="Go"> err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
```go
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
panic(err) panic(err)
} }
``` ```
</TabItem> </TabItem>
<TabItem value="Rust" label="Rust"> <TabItem value="Rust" label="Rust">
```rust ```rust
consumer.subscribe(["tmq_meters"]).await?; consumer.subscribe(["tmq_meters"]).await?;
``` ```
</TabItem> </TabItem>
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
consumer.subscribe(['topic1', 'topic2']) consumer.subscribe(['topic1', 'topic2'])
``` ```
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
```js ```js
// 创建订阅 topics 列表 // 创建订阅 topics 列表
let topics = ['topic_test'] let topics = ['topic_test']
// 启动订阅 // 启动订阅
consumer.subscribe(topics); consumer.subscribe(topics);
``` ```
</TabItem> </TabItem>
<TabItem value="C#" label="C#"> <TabItem value="C#" label="C#">
```csharp ```csharp
// 创建订阅 topics 列表 // 创建订阅 topics 列表
List<String> topics = new List<string>(); List<String> topics = new List<string>();
topics.add("tmq_topic"); topics.add("tmq_topic");
// 启动订阅 // 启动订阅
consumer.Subscribe(topics); consumer.Subscribe(topics);
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -580,37 +583,36 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
以下代码展示了不同语言下如何对 TMQ 消息进行消费。 以下代码展示了不同语言下如何对 TMQ 消息进行消费。
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
<TabItem value="c" label="C"> ```c
// 消费数据
```c while (running) {
// 消费数据
while (running) {
TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut); TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
msg_process(msg); msg_process(msg);
} }
``` ```
这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。 这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
</TabItem> </TabItem>
<TabItem value="java" label="Java"> <TabItem value="java" label="Java">
```java ```java
while(running){ while(running){
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100)); ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) { for (Meters meter : meters) {
processMsg(meter); processMsg(meter);
} }
} }
``` ```
</TabItem> </TabItem>
<TabItem value="Go" label="Go"> <TabItem value="Go" label="Go">
```go ```go
for { for {
ev := consumer.Poll(0) ev := consumer.Poll(0)
if ev != nil { if ev != nil {
switch e := ev.(type) { switch e := ev.(type) {
@ -622,15 +624,15 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
} }
consumer.Commit() consumer.Commit()
} }
} }
``` ```
</TabItem> </TabItem>
<TabItem value="Rust" label="Rust"> <TabItem value="Rust" label="Rust">
```rust ```rust
{ {
let mut stream = consumer.stream(); let mut stream = consumer.stream();
while let Some((offset, message)) = stream.try_next().await? { while let Some((offset, message)) = stream.try_next().await? {
@ -657,14 +659,14 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
} }
consumer.commit(offset).await?; consumer.commit(offset).await?;
} }
} }
``` ```
</TabItem> </TabItem>
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
while True: while True:
res = consumer.poll(100) res = consumer.poll(100)
if not res: if not res:
continue continue
@ -675,40 +677,40 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
for block in val: for block in val:
print(block.fetchall()) print(block.fetchall())
``` ```
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
```js ```js
while(true){ while(true){
msg = consumer.consume(200); msg = consumer.consume(200);
// process message(consumeResult) // process message(consumeResult)
console.log(msg.topicPartition); console.log(msg.topicPartition);
console.log(msg.block); console.log(msg.block);
console.log(msg.fields) console.log(msg.fields)
} }
``` ```
</TabItem> </TabItem>
<TabItem value="C#" label="C#"> <TabItem value="C#" label="C#">
```csharp ```csharp
// 消费数据 // 消费数据
while (true) while (true)
{ {
using (var result = consumer.Consume(500)) using (var result = consumer.Consume(500))
{ {
if (result == null) continue; if (result == null) continue;
ProcessMsg(result); ProcessMsg(result);
consumer.Commit(); consumer.Commit();
} }
} }
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -717,80 +719,79 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
消费结束后,应当取消订阅。 消费结束后,应当取消订阅。
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
<TabItem value="c" label="C"> ```c
/* 取消订阅 */
tmq_unsubscribe(tmq);
```c /* 关闭消费者对象 */
/* 取消订阅 */ tmq_consumer_close(tmq);
tmq_unsubscribe(tmq); ```
/* 关闭消费者对象 */ </TabItem>
tmq_consumer_close(tmq); <TabItem value="java" label="Java">
```
</TabItem> ```java
<TabItem value="java" label="Java"> /* 取消订阅 */
consumer.unsubscribe();
```java /* 关闭消费 */
/* 取消订阅 */ consumer.close();
consumer.unsubscribe(); ```
/* 关闭消费 */ </TabItem>
consumer.close();
```
</TabItem> <TabItem value="Go" label="Go">
<TabItem value="Go" label="Go"> ```go
/* Unsubscribe */
_ = consumer.Unsubscribe()
```go /* Close consumer */
/* Unsubscribe */ _ = consumer.Close()
_ = consumer.Unsubscribe() ```
/* Close consumer */ </TabItem>
_ = consumer.Close()
```
</TabItem> <TabItem value="Rust" label="Rust">
<TabItem value="Rust" label="Rust"> ```rust
consumer.unsubscribe().await;
```
```rust </TabItem>
consumer.unsubscribe().await;
```
</TabItem> <TabItem value="Python" label="Python">
<TabItem value="Python" label="Python"> ```py
# 取消订阅
consumer.unsubscribe()
# 关闭消费
consumer.close()
```
```py </TabItem>
# 取消订阅 <TabItem label="Node.JS" value="Node.JS">
consumer.unsubscribe()
# 关闭消费
consumer.close()
```
</TabItem> ```js
<TabItem label="Node.JS" value="Node.JS"> consumer.unsubscribe();
consumer.close();
```
```js </TabItem>
consumer.unsubscribe();
consumer.close();
```
</TabItem> <TabItem value="C#" label="C#">
<TabItem value="C#" label="C#"> ```csharp
// 取消订阅
consumer.Unsubscribe();
```csharp // 关闭消费
// 取消订阅 consumer.Close();
consumer.Unsubscribe(); ```
// 关闭消费 </TabItem>
consumer.Close();
```
</TabItem>
</Tabs> </Tabs>
@ -800,40 +801,40 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem label="C" value="c"> <TabItem label="C" value="c">
<CDemo /> <CDemo />
</TabItem> </TabItem>
<TabItem label="Java" value="java"> <TabItem label="Java" value="java">
<Tabs defaultValue="native"> <Tabs defaultValue="native">
<TabItem value="native" label="本地连接"> <TabItem value="native" label="本地连接">
<Java /> <Java />
</TabItem> </TabItem>
<TabItem value="ws" label="WebSocket 连接"> <TabItem value="ws" label="WebSocket 连接">
<JavaWS /> <JavaWS />
</TabItem> </TabItem>
</Tabs> </Tabs>
</TabItem> </TabItem>
<TabItem label="Go" value="Go"> <TabItem label="Go" value="Go">
<Go/> <Go/>
</TabItem> </TabItem>
<TabItem label="Rust" value="Rust"> <TabItem label="Rust" value="Rust">
<Rust /> <Rust />
</TabItem> </TabItem>
<TabItem label="Python" value="Python"> <TabItem label="Python" value="Python">
<Python /> <Python />
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
<Node/> <Node/>
</TabItem> </TabItem>
<TabItem label="C#" value="C#"> <TabItem label="C#" value="C#">
<CSharp/> <CSharp/>
</TabItem> </TabItem>
</Tabs> </Tabs>