doc: update python tmq document (#19604)

* doc: update python tmq document

* doc: update en document

Co-authored-by: Shuduo Sang <sangshuduo@gmail.com>
This commit is contained in:
sunpeng 2023-01-17 15:53:14 +08:00 committed by GitHub
parent 172559a2af
commit dff16ad4bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 129 additions and 118 deletions

View File

@ -94,22 +94,21 @@ void close() throws SQLException;
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
class TaosConsumer(): class Consumer:
def __init__(self, *topics, **configs) def subscribe(self, topics):
pass
def __iter__(self) def unsubscribe(self):
pass
def __next__(self) def poll(self, timeout: float = 1.0):
pass
def sync_next(self) def close(self):
pass
def subscription(self) def commit(self, message):
pass
def unsubscribe(self)
def close(self)
def __del__(self)
``` ```
</TabItem> </TabItem>
@ -395,23 +394,31 @@ let mut consumer = tmq.build()?;
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python
from taos.tmq import Consumer
# Syntax: `consumer = Consumer(configs)`
#
# Example:
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```
Python programs use the following parameters: Python programs use the following parameters:
| Parameter | Type | Description | Remarks | | Parameter | Type | Description | Remarks |
| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- | |:---------:|:----:|:-----------:|:-------:|
| `td_connect_ip` | string | Used in establishing a connection; same as `taos_connect` | | | `td.connect.ip` | string | Used in establishing a connection||
| `td_connect_user` | string | Used in establishing a connection; same as `taos_connect` | | | `td.connect.user` | string | Used in establishing a connection||
| `td_connect_pass` | string | Used in establishing a connection; same as `taos_connect` | | | `td.connect.pass` | string | Used in establishing a connection||
| `td_connect_port` | string | Used in establishing a connection; same as `taos_connect` | | | `td.connect.port` | string | Used in establishing a connection||
| `group_id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. | | `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192 |
| `client_id` | string | Client ID | Maximum length: 192. | | `client.id` | string | Client ID | Maximum length: 192 |
| `auto_offset_reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) | | `msg.with.table.name` | string | Specify whether to deserialize table names from messages | pecify `true` or `false` |
| `enable_auto_commit` | string | Commit automatically | Specify `true` or `false`. | | `enable.auto.commit` | string | Commit automatically | pecify `true` or `false` |
| `auto_commit_interval_ms` | string | Interval for automatic commits, in milliseconds | | `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | |
| `enable_heartbeat_background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false`. | | `auto.offset.reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `experimental_snapshot_enable` | string | Specify whether to consume messages from the WAL or from TSBS | Specify `true` or `false`. | | `experimental.snapshot.enable` | string | Specify whether to consume messages from the WAL or from TSDB | Specify `true` or `false` |
| `msg_with_table_name` | string | Specify whether to deserialize table names from messages | Specify `true` or `false`. | `enable.heartbeat.background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false` |
| `timeout` | int | Consumer pull timeout | |
</TabItem> </TabItem>
@ -514,7 +521,7 @@ consumer.subscribe(["tmq_meters"]).await?;
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
consumer = TaosConsumer('topic_ctb_column', group_id='vg2') consumer.subscribe(['topic1', 'topic2'])
``` ```
</TabItem> </TabItem>
@ -633,9 +640,17 @@ for {
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
for msg in consumer: while True:
for row in msg: res = consumer.poll(100)
print(row) if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
``` ```
</TabItem> </TabItem>

View File

@ -1,58 +1,48 @@
from taos.tmq import Consumer
import taos import taos
from taos.tmq import *
conn = taos.connect()
print("init")
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")
conn.select_db("py_tmq")
conn.execute(
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
)
conn.execute("create table if not exists tb1 using stb1 tags(1)")
conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")
print("create topic")
conn.execute(
"create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1"
)
print("build consumer")
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
conf.set("enable.auto.commit", "true")
def tmq_commit_cb_print(tmq, resp, offset, param=None): def init_tmq_env(db, topic):
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}") conn = taos.connect()
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))
conn.execute("create database if not exists {}".format(db))
conn.select_db(db)
conn.execute(
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
conn.execute("insert into tb1 values (now, 1, 1.0, 'tmq test')")
conn.execute("insert into tb2 values (now, 2, 2.0, 'tmq test')")
conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")
conf.set_auto_commit_cb(tmq_commit_cb_print, None) if __name__ == '__main__':
tmq = conf.new_consumer() init_tmq_env("tmq_test", "tmq_test_topic") # init env
consumer = Consumer(
{
"group.id": "tg2",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
consumer.subscribe(["tmq_test_topic"])
print("build topic list") try:
while True:
res = consumer.poll(100)
if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
topic_list = TaosTmqList() for block in val:
topic_list.append("topic_ctb_column") print(block.fetchall())
finally:
print("basic consume loop") consumer.unsubscribe()
tmq.subscribe(topic_list) consumer.close()
sub_list = tmq.subscription()
print("subscribed topics: ", sub_list)
while 1:
res = tmq.poll(1000)
if res:
topic = res.get_topic_name()
vg = res.get_vgroup_id()
db = res.get_db_name()
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
for row in res:
print(row)

View File

@ -92,22 +92,21 @@ void close() throws SQLException;
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
class TaosConsumer(): class Consumer:
def __init__(self, *topics, **configs) def subscribe(self, topics):
pass
def __iter__(self) def unsubscribe(self):
pass
def __next__(self) def poll(self, timeout: float = 1.0):
pass
def sync_next(self) def close(self):
pass
def subscription(self) def commit(self, message):
pass
def unsubscribe(self)
def close(self)
def __del__(self)
``` ```
</TabItem> </TabItem>
@ -393,34 +392,33 @@ let mut consumer = tmq.build()?;
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
Python 语言下引入 `taos` 库的 `TaosConsumer` 类,创建一个 Consumer 示例: Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例:
```python ```python
from taos.tmq import TaosConsumer from taos.tmq import Consumer
# Syntax: `consumer = TaosConsumer(*topics, **args)` # Syntax: `consumer = Consumer(configs)`
# #
# Example: # Example:
consumer = TaosConsumer('topic1', 'topic2', td_connect_ip = "127.0.0.1", group_id = "local") consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
``` ```
其中,元组类型参数被视为 *Topics*,字典类型参数用于以下订阅配置设置 其中,`configs` 为 dict 类型,传递创建 Consumer 的参数。可以配置的参数有
| 参数名称 | 类型 | 参数说明 | 备注 | | 参数名称 | 类型 | 参数说明 | 备注 |
| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- | |:------:|:----:|:-------:|:---:|
| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | | `td.connect.ip` | string | 用于创建连接||
| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | | | `td.connect.user` | string | 用于创建连接||
| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | | | `td.connect.pass` | string | 用于创建连接||
| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | | | `td.connect.port` | string | 用于创建连接||
| `group_id` | string | 消费组 ID同一消费组共享消费进度 | **必填项**。最大长度192。 | | `group.id` | string | 消费组 ID同一消费组共享消费进度 | **必填项**。最大长度192 |
| `client_id` | string | 客户端 ID | 最大长度192。 | | `client.id` | string | 客户端 ID | 最大长度192 |
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` | | `msg.with.table.name` | string | 是否允许从消息中解析表名,不适用于列订阅 | 合法值:`true`, `false` |
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`,默认为 true | | `enable.auto.commit` | string | 启用自动提交 | 合法值:`true`, `false` |
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值5000 ms | | `auto.commit.interval.ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值5000 ms |
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | | `auto.offset.reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
| `experimental_snapshot_enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` | | `experimental.snapshot.enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` |
| `msg_with_table_name` | string | 是否允许从消息中解析表名,不适用于列订阅 | 合法值:`true`, `false` | | `enable.heartbeat.background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
| `timeout` | int | 消费者拉取数据的超时时间 | |
</TabItem> </TabItem>
@ -523,7 +521,7 @@ consumer.subscribe(["tmq_meters"]).await?;
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
consumer = TaosConsumer('topic_ctb_column', group_id='vg2') consumer.subscribe(['topic1', 'topic2'])
``` ```
</TabItem> </TabItem>
@ -642,9 +640,17 @@ for {
<TabItem value="Python" label="Python"> <TabItem value="Python" label="Python">
```python ```python
for msg in consumer: while True:
for row in msg: res = consumer.poll(100)
print(row) if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
``` ```
</TabItem> </TabItem>