From dff16ad4bf89298873a1457f6f75f58914158528 Mon Sep 17 00:00:00 2001 From: sunpeng Date: Tue, 17 Jan 2023 15:53:14 +0800 Subject: [PATCH] doc: update python tmq document (#19604) * doc: update python tmq document * doc: update en document Co-authored-by: Shuduo Sang --- docs/en/07-develop/07-tmq.mdx | 77 ++++++++++++++---------- docs/examples/python/tmq_example.py | 92 +++++++++++++---------------- docs/zh/07-develop/07-tmq.mdx | 78 +++++++++++++----------- 3 files changed, 129 insertions(+), 118 deletions(-) diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index 94a9dbffbd..92db7d4cbf 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -94,22 +94,21 @@ void close() throws SQLException; ```python -class TaosConsumer(): - def __init__(self, *topics, **configs) +class Consumer: + def subscribe(self, topics): + pass - def __iter__(self) + def unsubscribe(self): + pass - def __next__(self) + def poll(self, timeout: float = 1.0): + pass - def sync_next(self) - - def subscription(self) + def close(self): + pass - def unsubscribe(self) - - def close(self) - - def __del__(self) + def commit(self, message): + pass ``` @@ -395,23 +394,31 @@ let mut consumer = tmq.build()?; +```python +from taos.tmq import Consumer + +# Syntax: `consumer = Consumer(configs)` +# +# Example: +consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) +``` + Python programs use the following parameters: -| Parameter | Type | Description | Remarks | -| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- | -| `td_connect_ip` | string | Used in establishing a connection; same as `taos_connect` | | -| `td_connect_user` | string | Used in establishing a connection; same as `taos_connect` | | -| `td_connect_pass` | string | Used in establishing a connection; same as `taos_connect` | | -| `td_connect_port` | string | Used in establishing a connection; same as `taos_connect` | | -| `group_id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. | -| `client_id` | string | Client ID | Maximum length: 192. | -| `auto_offset_reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) | -| `enable_auto_commit` | string | Commit automatically | Specify `true` or `false`. | -| `auto_commit_interval_ms` | string | Interval for automatic commits, in milliseconds | -| `enable_heartbeat_background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false`. | -| `experimental_snapshot_enable` | string | Specify whether to consume messages from the WAL or from TSBS | Specify `true` or `false`. | -| `msg_with_table_name` | string | Specify whether to deserialize table names from messages | Specify `true` or `false`. -| `timeout` | int | Consumer pull timeout | | +| Parameter | Type | Description | Remarks | +|:---------:|:----:|:-----------:|:-------:| +| `td.connect.ip` | string | Used in establishing a connection|| +| `td.connect.user` | string | Used in establishing a connection|| +| `td.connect.pass` | string | Used in establishing a connection|| +| `td.connect.port` | string | Used in establishing a connection|| +| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192 | +| `client.id` | string | Client ID | Maximum length: 192 | +| `msg.with.table.name` | string | Specify whether to deserialize table names from messages | pecify `true` or `false` | +| `enable.auto.commit` | string | Commit automatically | pecify `true` or `false` | +| `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | | +| `auto.offset.reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) | +| `experimental.snapshot.enable` | string | Specify whether to consume messages from the WAL or from TSDB | Specify `true` or `false` | +| `enable.heartbeat.background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false` | @@ -514,7 +521,7 @@ consumer.subscribe(["tmq_meters"]).await?; ```python -consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +consumer.subscribe(['topic1', 'topic2']) ``` @@ -633,9 +640,17 @@ for { ```python -for msg in consumer: - for row in msg: - print(row) +while True: + res = consumer.poll(100) + if not res: + continue + err = res.error() + if err is not None: + raise err + val = res.value() + + for block in val: + print(block.fetchall()) ``` diff --git a/docs/examples/python/tmq_example.py b/docs/examples/python/tmq_example.py index a4625ca11a..fafa81e8b5 100644 --- a/docs/examples/python/tmq_example.py +++ b/docs/examples/python/tmq_example.py @@ -1,58 +1,48 @@ +from taos.tmq import Consumer import taos -from taos.tmq import * - -conn = taos.connect() - -print("init") -conn.execute("drop topic if exists topic_ctb_column") -conn.execute("drop database if exists py_tmq") -conn.execute("create database if not exists py_tmq vgroups 2") -conn.select_db("py_tmq") -conn.execute( - "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)" -) -conn.execute("create table if not exists tb1 using stb1 tags(1)") -conn.execute("create table if not exists tb2 using stb1 tags(2)") -conn.execute("create table if not exists tb3 using stb1 tags(3)") - -print("create topic") -conn.execute( - "create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1" -) - -print("build consumer") -conf = TaosTmqConf() -conf.set("group.id", "tg2") -conf.set("td.connect.user", "root") -conf.set("td.connect.pass", "taosdata") -conf.set("enable.auto.commit", "true") -def tmq_commit_cb_print(tmq, resp, offset, param=None): - print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}") +def init_tmq_env(db, topic): + conn = taos.connect() + conn.execute("drop topic if exists {}".format(topic)) + conn.execute("drop database if exists {}".format(db)) + conn.execute("create database if not exists {}".format(db)) + conn.select_db(db) + conn.execute( + "create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))") + conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')") + conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')") + conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')") + conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic)) + conn.execute("insert into tb1 values (now, 1, 1.0, 'tmq test')") + conn.execute("insert into tb2 values (now, 2, 2.0, 'tmq test')") + conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')") -conf.set_auto_commit_cb(tmq_commit_cb_print, None) -tmq = conf.new_consumer() +if __name__ == '__main__': + init_tmq_env("tmq_test", "tmq_test_topic") # init env + consumer = Consumer( + { + "group.id": "tg2", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "enable.auto.commit": "true", + } + ) + consumer.subscribe(["tmq_test_topic"]) -print("build topic list") + try: + while True: + res = consumer.poll(100) + if not res: + continue + err = res.error() + if err is not None: + raise err + val = res.value() -topic_list = TaosTmqList() -topic_list.append("topic_ctb_column") - -print("basic consume loop") -tmq.subscribe(topic_list) - -sub_list = tmq.subscription() - -print("subscribed topics: ", sub_list) - -while 1: - res = tmq.poll(1000) - if res: - topic = res.get_topic_name() - vg = res.get_vgroup_id() - db = res.get_db_name() - print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}") - for row in res: - print(row) + for block in val: + print(block.fetchall()) + finally: + consumer.unsubscribe() + consumer.close() diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 039e9eb635..fb171042d9 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -92,22 +92,21 @@ void close() throws SQLException; ```python -class TaosConsumer(): - def __init__(self, *topics, **configs) +class Consumer: + def subscribe(self, topics): + pass - def __iter__(self) + def unsubscribe(self): + pass - def __next__(self) + def poll(self, timeout: float = 1.0): + pass - def sync_next(self) - - def subscription(self) + def close(self): + pass - def unsubscribe(self) - - def close(self) - - def __del__(self) + def commit(self, message): + pass ``` @@ -393,34 +392,33 @@ let mut consumer = tmq.build()?; -Python 语言下引入 `taos` 库的 `TaosConsumer` 类,创建一个 Consumer 示例: +Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例: ```python -from taos.tmq import TaosConsumer +from taos.tmq import Consumer -# Syntax: `consumer = TaosConsumer(*topics, **args)` +# Syntax: `consumer = Consumer(configs)` # # Example: -consumer = TaosConsumer('topic1', 'topic2', td_connect_ip = "127.0.0.1", group_id = "local") +consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) ``` -其中,元组类型参数被视为 *Topics*,字典类型参数用于以下订阅配置设置: +其中,`configs` 为 dict 类型,传递创建 Consumer 的参数。可以配置的参数有: -| 参数名称 | 类型 | 参数说明 | 备注 | -| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- | -| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | -| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | | -| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | | -| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | | -| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 | -| `client_id` | string | 客户端 ID | 最大长度:192。 | -| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` | -| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`,默认为 true | -| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值:5000 ms | -| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | -| `experimental_snapshot_enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` | -| `msg_with_table_name` | string | 是否允许从消息中解析表名,不适用于列订阅 | 合法值:`true`, `false` | -| `timeout` | int | 消费者拉取数据的超时时间 | | +| 参数名称 | 类型 | 参数说明 | 备注 | +|:------:|:----:|:-------:|:---:| +| `td.connect.ip` | string | 用于创建连接|| +| `td.connect.user` | string | 用于创建连接|| +| `td.connect.pass` | string | 用于创建连接|| +| `td.connect.port` | string | 用于创建连接|| +| `group.id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192 | +| `client.id` | string | 客户端 ID | 最大长度:192 | +| `msg.with.table.name` | string | 是否允许从消息中解析表名,不适用于列订阅 | 合法值:`true`, `false` | +| `enable.auto.commit` | string | 启用自动提交 | 合法值:`true`, `false` | +| `auto.commit.interval.ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值:5000 ms | +| `auto.offset.reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` | +| `experimental.snapshot.enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` | +| `enable.heartbeat.background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | @@ -523,7 +521,7 @@ consumer.subscribe(["tmq_meters"]).await?; ```python -consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +consumer.subscribe(['topic1', 'topic2']) ``` @@ -642,9 +640,17 @@ for { ```python -for msg in consumer: - for row in msg: - print(row) +while True: + res = consumer.poll(100) + if not res: + continue + err = res.error() + if err is not None: + raise err + val = res.value() + + for block in val: + print(block.fetchall()) ```