From 6b716b1a7e8f0c103af44af7d76f08c9609b2293 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 21 Feb 2024 13:54:43 +0800 Subject: [PATCH] fix:syntax error --- docs/zh/07-develop/07-tmq.mdx | 1089 +++++++++++++++++---------------- 1 file changed, 545 insertions(+), 544 deletions(-) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index a852e71ff4..1476cd2170 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -91,220 +91,223 @@ TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提 不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节,注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer): - - + - ```c - typedef struct tmq_t tmq_t; - typedef struct tmq_conf_t tmq_conf_t; - typedef struct tmq_list_t tmq_list_t; +```c + typedef struct tmq_t tmq_t; + typedef struct tmq_conf_t tmq_conf_t; + typedef struct tmq_list_t tmq_list_t; - typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param)); + typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param)); - typedef enum tmq_conf_res_t { - TMQ_CONF_UNKNOWN = -2, - TMQ_CONF_INVALID = -1, - TMQ_CONF_OK = 0, - } tmq_conf_res_t; + typedef enum tmq_conf_res_t { + TMQ_CONF_UNKNOWN = -2, + TMQ_CONF_INVALID = -1, + TMQ_CONF_OK = 0, + } tmq_conf_res_t; - typedef struct tmq_topic_assignment { - int32_t vgId; - int64_t currentOffset; - int64_t begin; - int64_t end; - } tmq_topic_assignment; + typedef struct tmq_topic_assignment { + int32_t vgId; + int64_t currentOffset; + int64_t begin; + int64_t end; + } tmq_topic_assignment; - DLL_EXPORT tmq_conf_t *tmq_conf_new(); - DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); - DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); - DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); + DLL_EXPORT tmq_conf_t *tmq_conf_new(); + DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); + DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); + DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); - DLL_EXPORT tmq_list_t *tmq_list_new(); - DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); - DLL_EXPORT void tmq_list_destroy(tmq_list_t *); - DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *); - DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *); + DLL_EXPORT tmq_list_t *tmq_list_new(); + DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); + DLL_EXPORT void tmq_list_destroy(tmq_list_t *); + DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *); + DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *); - DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); - DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); - DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq); - DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); - DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); - DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); - DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); - DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); - DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); - DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); - DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment); - DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); - DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); - DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); - DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId); + DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); + DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); + DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq); + DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); + DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); + DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); + DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); + DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); + DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); + DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); + DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment); + DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); + DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); + DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); + DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId); - DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); - DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); - DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); - DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); - DLL_EXPORT const char *tmq_err2str(int32_t code); - ``` - - + DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); + DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); + DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); + DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); + DLL_EXPORT const char *tmq_err2str(int32_t code); +``` - ```java - void subscribe(Collection topics) throws SQLException; +下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。 - void unsubscribe() throws SQLException; + + - Set subscription() throws SQLException; +```java +void subscribe(Collection topics) throws SQLException; - ConsumerRecords poll(Duration timeout) throws SQLException; +void unsubscribe() throws SQLException; - Set assignment() throws SQLException; - long position(TopicPartition partition) throws SQLException; - Map position(String topic) throws SQLException; - Map beginningOffsets(String topic) throws SQLException; - Map endOffsets(String topic) throws SQLException; - Map committed(Set partitions) throws SQLException; +Set subscription() throws SQLException; - void seek(TopicPartition partition, long offset) throws SQLException; - void seekToBeginning(Collection partitions) throws SQLException; - void seekToEnd(Collection partitions) throws SQLException; +ConsumerRecords poll(Duration timeout) throws SQLException; - void commitSync() throws SQLException; - void commitSync(Map offsets) throws SQLException; +Set assignment() throws SQLException; +long position(TopicPartition partition) throws SQLException; +Map position(String topic) throws SQLException; +Map beginningOffsets(String topic) throws SQLException; +Map endOffsets(String topic) throws SQLException; +Map committed(Set partitions) throws SQLException; - void close() throws SQLException; - ``` +void seek(TopicPartition partition, long offset) throws SQLException; +void seekToBeginning(Collection partitions) throws SQLException; +void seekToEnd(Collection partitions) throws SQLException; - +void commitSync() throws SQLException; +void commitSync(Map offsets) throws SQLException; - +void close() throws SQLException; +``` - ```python - class Consumer: - def subscribe(self, topics): + + + + +```python +class Consumer: + def subscribe(self, topics): pass - def unsubscribe(self): + def unsubscribe(self): pass - def poll(self, timeout: float = 1.0): + def poll(self, timeout: float = 1.0): pass - def assignment(self): + def assignment(self): pass - def seek(self, partition): + def seek(self, partition): pass - def close(self): + def close(self): pass - def commit(self, message): + def commit(self, message): pass - ``` +``` - + - + - ```go - func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) +```go +func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) - // 出于兼容目的保留 rebalanceCb 参数,当前未使用 - func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error +// 出于兼容目的保留 rebalanceCb 参数,当前未使用 +func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error - // 出于兼容目的保留 rebalanceCb 参数,当前未使用 - func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error +// 出于兼容目的保留 rebalanceCb 参数,当前未使用 +func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error - func (c *Consumer) Poll(timeoutMs int) tmq.Event +func (c *Consumer) Poll(timeoutMs int) tmq.Event - // 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用 - func (c *Consumer) Commit() ([]tmq.TopicPartition, error) +// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用 +func (c *Consumer) Commit() ([]tmq.TopicPartition, error) - func (c *Consumer) Unsubscribe() error +func (c *Consumer) Unsubscribe() error - func (c *Consumer) Close() error - ``` +func (c *Consumer) Close() error +``` - + - + - ```rust - impl TBuilder for TmqBuilder - fn from_dsn(dsn: D) -> Result - fn build(&self) -> Result +```rust +impl TBuilder for TmqBuilder + fn from_dsn(dsn: D) -> Result + fn build(&self) -> Result - impl AsAsyncConsumer for Consumer - async fn subscribe, I: IntoIterator + Send>( +impl AsAsyncConsumer for Consumer + async fn subscribe, I: IntoIterator + Send>( &mut self, topics: I, - ) -> Result<(), Self::Error>; - fn stream( + ) -> Result<(), Self::Error>; + fn stream( &self, - ) -> Pin< - Box< + ) -> Pin< + Box< dyn '_ + Send + futures::Stream< Item = Result<(Self::Offset, MessageSet), Self::Error>, >, - >, - >; - async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>; + >, + >; + async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>; - async fn unsubscribe(self); - ``` + async fn unsubscribe(self); +``` - 可在 上查看详细 API 说明。 +可在 上查看详细 API 说明。 - + - + - ```js - function TMQConsumer(config) - - function subscribe(topic) - - function consume(timeout) - - function subscription() - - function unsubscribe() - - function commit(msg) - - function close() - ``` +```js +function TMQConsumer(config) - +function subscribe(topic) - +function consume(timeout) - ```csharp - class ConsumerBuilder - - ConsumerBuilder(IEnumerable> config) - - public IConsumer Build() - - void Subscribe(IEnumerable topics) - - void Subscribe(string topic) - - ConsumeResult Consume(int millisecondsTimeout) - - List Subscription() - - void Unsubscribe() - - List Commit() - - void Close() - ``` - +function subscription() + +function unsubscribe() + +function commit(msg) + +function close() +``` + + + + + +```csharp +class ConsumerBuilder + +ConsumerBuilder(IEnumerable> config) + +public IConsumer Build() + +void Subscribe(IEnumerable topics) + +void Subscribe(string topic) + +ConsumeResult Consume(int millisecondsTimeout) + +List Subscription() + +void Unsubscribe() + +List Commit() + +void Close() +``` + + # 数据订阅示例 @@ -334,163 +337,164 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; 对于不同编程语言,其设置方式如下: + - - ```c - /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 - 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */ - tmq_conf_t* conf = tmq_conf_new(); - tmq_conf_set(conf, "enable.auto.commit", "true"); - tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - tmq_conf_set(conf, "group.id", "cgrpName"); - tmq_conf_set(conf, "td.connect.user", "root"); - tmq_conf_set(conf, "td.connect.pass", "taosdata"); - tmq_conf_set(conf, "auto.offset.reset", "latest"); - tmq_conf_set(conf, "msg.with.table.name", "true"); - tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); +```c +/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 + 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */ +tmq_conf_t* conf = tmq_conf_new(); +tmq_conf_set(conf, "enable.auto.commit", "true"); +tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); +tmq_conf_set(conf, "group.id", "cgrpName"); +tmq_conf_set(conf, "td.connect.user", "root"); +tmq_conf_set(conf, "td.connect.pass", "taosdata"); +tmq_conf_set(conf, "auto.offset.reset", "latest"); +tmq_conf_set(conf, "msg.with.table.name", "true"); +tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); - tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); - tmq_conf_destroy(conf); - ``` - +tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); +tmq_conf_destroy(conf); +``` - + + - 对于 Java 程序,还可以使用如下配置项: - - | 参数名称 | 类型 | 参数说明 | - | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- | - | `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" | - | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` | - | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 | - | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | | - - 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。 - - ```java - Properties properties = new Properties(); - properties.setProperty("enable.auto.commit", "true"); - properties.setProperty("auto.commit.interval.ms", "1000"); - properties.setProperty("group.id", "cgrpName"); - properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); - properties.setProperty("td.connect.user", "root"); - properties.setProperty("td.connect.pass", "taosdata"); - properties.setProperty("auto.offset.reset", "latest"); - properties.setProperty("msg.with.table.name", "true"); - properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); - - TaosConsumer consumer = new TaosConsumer<>(properties); - - /* value deserializer definition. */ - import com.taosdata.jdbc.tmq.ReferenceDeserializer; - - public class MetersDeserializer extends ReferenceDeserializer { - } - ``` - +对于 Java 程序,还可以使用如下配置项: - - - ```go - conf := &tmq.ConfigMap{ - "group.id": "test", - "auto.offset.reset": "latest", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "td.connect.port": "6030", - "client.id": "test_tmq_c", - "enable.auto.commit": "false", - "msg.with.table.name": "true", - } - consumer, err := NewConsumer(conf) - ``` - - +| 参数名称 | 类型 | 参数说明 | +| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- | +| `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" | +| `bootstrap.servers` | string | 连接地址,如 `localhost:6030` | +| `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 | +| `value.deserializer.encoding` | string | 指定字符串解析的字符集 | | - - - ```rust - let mut dsn: Dsn = "taos://".parse()?; - dsn.set("group.id", "group1"); - dsn.set("client.id", "test"); - dsn.set("auto.offset.reset", "latest"); - - let tmq = TmqBuilder::from_dsn(dsn)?; - - let mut consumer = tmq.build()?; - ``` - - +需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。 - - - Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例: - - ```python - from taos.tmq import Consumer - - # Syntax: `consumer = Consumer(configs)` - # - # Example: - consumer = Consumer( - { - "group.id": "local", - "client.id": "1", - "enable.auto.commit": "true", - "auto.commit.interval.ms": "1000", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "latest", - "msg.with.table.name": "true", - } - ) - ``` - - +```java +Properties properties = new Properties(); +properties.setProperty("enable.auto.commit", "true"); +properties.setProperty("auto.commit.interval.ms", "1000"); +properties.setProperty("group.id", "cgrpName"); +properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); +properties.setProperty("td.connect.user", "root"); +properties.setProperty("td.connect.pass", "taosdata"); +properties.setProperty("auto.offset.reset", "latest"); +properties.setProperty("msg.with.table.name", "true"); +properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); - - - ```js - // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 - // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 - - let consumer = taos.consumer({ - 'enable.auto.commit': 'true', - 'auto.commit.interval.ms','1000', - 'group.id': 'tg2', - 'td.connect.user': 'root', - 'td.connect.pass': 'taosdata', - 'auto.offset.reset','latest', - 'msg.with.table.name': 'true', - 'td.connect.ip','127.0.0.1', - 'td.connect.port','6030' - }); - ``` - - +TaosConsumer consumer = new TaosConsumer<>(properties); + +/* value deserializer definition. */ +import com.taosdata.jdbc.tmq.ReferenceDeserializer; + +public class MetersDeserializer extends ReferenceDeserializer { +} +``` + + + + + +```go +conf := &tmq.ConfigMap{ + "group.id": "test", + "auto.offset.reset": "latest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_c", + "enable.auto.commit": "false", + "msg.with.table.name": "true", +} +consumer, err := NewConsumer(conf) +``` + + + + + +```rust +let mut dsn: Dsn = "taos://".parse()?; +dsn.set("group.id", "group1"); +dsn.set("client.id", "test"); +dsn.set("auto.offset.reset", "latest"); + +let tmq = TmqBuilder::from_dsn(dsn)?; + +let mut consumer = tmq.build()?; +``` + + + + + +Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例: + +```python +from taos.tmq import Consumer + +# Syntax: `consumer = Consumer(configs)` +# +# Example: +consumer = Consumer( + { + "group.id": "local", + "client.id": "1", + "enable.auto.commit": "true", + "auto.commit.interval.ms": "1000", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "latest", + "msg.with.table.name": "true", + } +) +``` + + + + + +```js +// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 +// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 + +let consumer = taos.consumer({ + 'enable.auto.commit': 'true', + 'auto.commit.interval.ms','1000', + 'group.id': 'tg2', + 'td.connect.user': 'root', + 'td.connect.pass': 'taosdata', + 'auto.offset.reset','latest', + 'msg.with.table.name': 'true', + 'td.connect.ip','127.0.0.1', + 'td.connect.port','6030' + }); +``` + + + + + +```csharp +var cfg = new Dictionary() +{ + { "group.id", "group1" }, + { "auto.offset.reset", "latest" }, + { "td.connect.ip", "127.0.0.1" }, + { "td.connect.user", "root" }, + { "td.connect.pass", "taosdata" }, + { "td.connect.port", "6030" }, + { "client.id", "tmq_example" }, + { "enable.auto.commit", "true" }, + { "msg.with.table.name", "false" }, +}; +var consumer = new ConsumerBuilder>(cfg).Build(); +``` + + - - - ```csharp - var cfg = new Dictionary() - { - { "group.id", "group1" }, - { "auto.offset.reset", "latest" }, - { "td.connect.ip", "127.0.0.1" }, - { "td.connect.user", "root" }, - { "td.connect.pass", "taosdata" }, - { "td.connect.port", "6030" }, - { "client.id", "tmq_example" }, - { "enable.auto.commit", "true" }, - { "msg.with.table.name", "false" }, - }; - var consumer = new ConsumerBuilder>(cfg).Build(); - ``` - - - 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 @@ -500,78 +504,77 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; 一个 consumer 支持同时订阅多个 topic。 + - - - ```c - // 创建订阅 topics 列表 - tmq_list_t* topicList = tmq_list_new(); - tmq_list_append(topicList, "topicName"); - // 启动订阅 - tmq_subscribe(tmq, topicList); - tmq_list_destroy(topicList); - - ``` - - - - - ```java - List topics = new ArrayList<>(); - topics.add("tmq_topic"); - consumer.subscribe(topics); - ``` - - - +```c +// 创建订阅 topics 列表 +tmq_list_t* topicList = tmq_list_new(); +tmq_list_append(topicList, "topicName"); +// 启动订阅 +tmq_subscribe(tmq, topicList); +tmq_list_destroy(topicList); + +``` - ```go - err = consumer.Subscribe("example_tmq_topic", nil) - if err != nil { - panic(err) - } - ``` + + - - +```java +List topics = new ArrayList<>(); +topics.add("tmq_topic"); +consumer.subscribe(topics); +``` - ```rust - consumer.subscribe(["tmq_meters"]).await?; - ``` + + - +```go +err = consumer.Subscribe("example_tmq_topic", nil) +if err != nil { + panic(err) +} +``` - + + - ```python - consumer.subscribe(['topic1', 'topic2']) - ``` +```rust +consumer.subscribe(["tmq_meters"]).await?; +``` - + - + - ```js - // 创建订阅 topics 列表 - let topics = ['topic_test'] +```python +consumer.subscribe(['topic1', 'topic2']) +``` - // 启动订阅 - consumer.subscribe(topics); - ``` + - + - +```js +// 创建订阅 topics 列表 +let topics = ['topic_test'] - ```csharp - // 创建订阅 topics 列表 - List topics = new List(); - topics.add("tmq_topic"); - // 启动订阅 - consumer.Subscribe(topics); - ``` +// 启动订阅 +consumer.subscribe(topics); +``` - + + + + +```csharp +// 创建订阅 topics 列表 +List topics = new List(); +topics.add("tmq_topic"); +// 启动订阅 +consumer.Subscribe(topics); +``` + + @@ -580,135 +583,134 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; 以下代码展示了不同语言下如何对 TMQ 消息进行消费。 + - +```c +// 消费数据 +while (running) { + TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut); + msg_process(msg); +} +``` - ```c - // 消费数据 - while (running) { - TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut); - msg_process(msg); - } - ``` +这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。 - 这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。 + + - - - - ```java - while(running){ - ConsumerRecords meters = consumer.poll(Duration.ofMillis(100)); - for (Meters meter : meters) { - processMsg(meter); - } - } - ``` - - +```java +while(running){ + ConsumerRecords meters = consumer.poll(Duration.ofMillis(100)); + for (Meters meter : meters) { + processMsg(meter); + } +} +``` - + - ```go - for { - ev := consumer.Poll(0) - if ev != nil { - switch e := ev.(type) { - case *tmqcommon.DataMessage: - fmt.Println(e.Value()) - case tmqcommon.Error: - fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) - panic(e) - } - consumer.Commit() - } - } - ``` + - +```go +for { + ev := consumer.Poll(0) + if ev != nil { + switch e := ev.(type) { + case *tmqcommon.DataMessage: + fmt.Println(e.Value()) + case tmqcommon.Error: + fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) + panic(e) + } + consumer.Commit() + } +} +``` - + - ```rust - { - let mut stream = consumer.stream(); + - while let Some((offset, message)) = stream.try_next().await? { - // get information from offset +```rust +{ + let mut stream = consumer.stream(); - // the topic - let topic = offset.topic(); - // the vgroup id, like partition id in kafka. - let vgroup_id = offset.vgroup_id(); - println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset - if let Some(data) = message.into_data() { + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { while let Some(block) = data.fetch_raw_block().await? { - // one block for one table, get table name if needed - let name = block.table_name(); - let records: Vec = block.deserialize().try_collect()?; - println!( - "** table: {}, got {} records: {:#?}\n", - name.unwrap(), - records.len(), - records - ); + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); + } } - } - consumer.commit(offset).await?; - } - } - ``` + consumer.commit(offset).await?; + } +} +``` - - + + - ```python - while True: - res = consumer.poll(100) - if not res: +```python +while True: + res = consumer.poll(100) + if not res: continue - err = res.error() - if err is not None: + err = res.error() + if err is not None: raise err - val = res.value() + val = res.value() - for block in val: + for block in val: print(block.fetchall()) - ``` +``` - + - + - ```js - while(true){ - msg = consumer.consume(200); - // process message(consumeResult) - console.log(msg.topicPartition); - console.log(msg.block); - console.log(msg.fields) - } - ``` +```js +while(true){ + msg = consumer.consume(200); + // process message(consumeResult) + console.log(msg.topicPartition); + console.log(msg.block); + console.log(msg.fields) +} +``` - + - + - ```csharp - // 消费数据 - while (true) - { - using (var result = consumer.Consume(500)) - { - if (result == null) continue; - ProcessMsg(result); - consumer.Commit(); - } - } - ``` +```csharp +// 消费数据 +while (true) +{ + using (var result = consumer.Consume(500)) + { + if (result == null) continue; + ProcessMsg(result); + consumer.Commit(); + } +} +``` - + @@ -717,80 +719,79 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; 消费结束后,应当取消订阅。 - - + - ```c - /* 取消订阅 */ - tmq_unsubscribe(tmq); +```c +/* 取消订阅 */ +tmq_unsubscribe(tmq); - /* 关闭消费者对象 */ - tmq_consumer_close(tmq); - ``` +/* 关闭消费者对象 */ +tmq_consumer_close(tmq); +``` - - + + - ```java - /* 取消订阅 */ - consumer.unsubscribe(); +```java +/* 取消订阅 */ +consumer.unsubscribe(); - /* 关闭消费 */ - consumer.close(); - ``` +/* 关闭消费 */ +consumer.close(); +``` - - - + - ```go - /* Unsubscribe */ - _ = consumer.Unsubscribe() + - /* Close consumer */ - _ = consumer.Close() - ``` +```go +/* Unsubscribe */ +_ = consumer.Unsubscribe() - - - +/* Close consumer */ +_ = consumer.Close() +``` - ```rust - consumer.unsubscribe().await; - ``` + - + - +```rust +consumer.unsubscribe().await; +``` - ```py - # 取消订阅 - consumer.unsubscribe() - # 关闭消费 - consumer.close() - ``` + - - + - ```js - consumer.unsubscribe(); - consumer.close(); - ``` +```py +# 取消订阅 +consumer.unsubscribe() +# 关闭消费 +consumer.close() +``` - + + - +```js +consumer.unsubscribe(); +consumer.close(); +``` - ```csharp - // 取消订阅 - consumer.Unsubscribe(); + - // 关闭消费 - consumer.Close(); - ``` + - +```csharp +// 取消订阅 +consumer.Unsubscribe(); + +// 关闭消费 +consumer.Close(); +``` + + @@ -800,41 +801,41 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; - - - + + + - - - - - - - - - - + + + + + + + + + + - - - + + + - - - + + + - - - + + + - - - + + + + + + + - - - - #订阅高级功能 @@ -854,4 +855,4 @@ CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; - 超级表和库订阅不支持回放 - enable.replay 参数,true表示开启订阅回放功能,false表示不开启订阅回放功能,默认不开启。 - 回放不支持进度保存,所以回放参数 enable.replay = true 时,auto commit 自动关闭 -- 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差 \ No newline at end of file +- 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差