Merge pull request #16132 from taosdata/docs/TD-18416
docs: refine python tmq doc
This commit is contained in:
commit
877945aaf6
|
@ -1,59 +1,6 @@
|
|||
import taos
|
||||
from taos.tmq import *
|
||||
|
||||
conn = taos.connect()
|
||||
|
||||
# create database
|
||||
conn.execute("drop database if exists py_tmq")
|
||||
conn.execute("create database if not exists py_tmq vgroups 2")
|
||||
|
||||
# create table and stables
|
||||
conn.select_db("py_tmq")
|
||||
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
|
||||
conn.execute("create table if not exists tb1 using stb1 tags(1)")
|
||||
conn.execute("create table if not exists tb2 using stb1 tags(2)")
|
||||
conn.execute("create table if not exists tb3 using stb1 tags(3)")
|
||||
|
||||
# create topic
|
||||
conn.execute("drop topic if exists topic_ctb_column")
|
||||
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
|
||||
|
||||
# set consumer configure options
|
||||
conf = TaosTmqConf()
|
||||
conf.set("group.id", "tg2")
|
||||
conf.set("td.connect.user", "root")
|
||||
conf.set("td.connect.pass", "taosdata")
|
||||
conf.set("enable.auto.commit", "true")
|
||||
conf.set("msg.with.table.name", "true")
|
||||
|
||||
def tmq_commit_cb_print(tmq, resp, offset, param=None):
|
||||
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
|
||||
|
||||
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
|
||||
|
||||
# build consumer
|
||||
tmq = conf.new_consumer()
|
||||
|
||||
# build topic list
|
||||
topic_list = TaosTmqList()
|
||||
topic_list.append("topic_ctb_column")
|
||||
|
||||
# subscribe consumer
|
||||
tmq.subscribe(topic_list)
|
||||
|
||||
# check subscriptions
|
||||
sub_list = tmq.subscription()
|
||||
print("subscribed topics: ",sub_list)
|
||||
|
||||
# start subscribe
|
||||
while 1:
|
||||
res = tmq.poll(1000)
|
||||
if res:
|
||||
topic = res.get_topic_name()
|
||||
vg = res.get_vgroup_id()
|
||||
db = res.get_db_name()
|
||||
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
|
||||
for row in res:
|
||||
print(row)
|
||||
tb = res.get_table_name()
|
||||
print(f"from table: {tb}")
|
||||
from taos.tmq import TaosConsumer
|
||||
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
|
||||
for msg in consumer:
|
||||
for row in msg:
|
||||
print(row)
|
||||
|
|
|
@ -87,6 +87,27 @@ void commitSync() throws SQLException;
|
|||
void close() throws SQLException;
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
```python
|
||||
class TaosConsumer():
|
||||
def __init__(self, *topics, **configs)
|
||||
|
||||
def __iter__(self)
|
||||
|
||||
def __next__(self)
|
||||
|
||||
def sync_next(self)
|
||||
|
||||
def subscription(self)
|
||||
|
||||
def unsubscribe(self)
|
||||
|
||||
def close(self)
|
||||
|
||||
def __del__(self)
|
||||
```
|
||||
</TabItem>
|
||||
<TabItem label="Go" value="Go">
|
||||
|
||||
|
@ -249,6 +270,29 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
|
|||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
Python 使用以下配置项创建一个 Consumer 实例。
|
||||
|
||||
| 参数名称 | 类型 | 参数说明 | 备注 |
|
||||
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
|
||||
| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
|
||||
| `client_id` | string | 客户端 ID | 最大长度:192。 |
|
||||
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
|
||||
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 |
|
||||
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | |
|
||||
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
|
||||
| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
|
||||
| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
|
||||
| `timeout` | int | 消费者拉去的超时时间 | |
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Go" value="Go">
|
||||
|
||||
```go
|
||||
|
@ -345,6 +389,13 @@ if err != nil {
|
|||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
```python
|
||||
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
|
||||
</Tabs>
|
||||
|
||||
## 消费
|
||||
|
@ -377,6 +428,16 @@ while(running){
|
|||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
```python
|
||||
for msg in consumer:
|
||||
for row in msg:
|
||||
print(row)
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
|
@ -421,6 +482,15 @@ consumer.close();
|
|||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
```python
|
||||
/* 取消订阅 */
|
||||
consumer.unsubscribe();
|
||||
|
||||
/* 关闭消费 */
|
||||
consumer.close();
|
||||
<TabItem value="Go" label="Go">
|
||||
|
||||
```go
|
||||
|
@ -775,65 +845,15 @@ int main(int argc, char* argv[]) {
|
|||
<TabItem label="Python" value="Python">
|
||||
|
||||
```python
|
||||
import taos
|
||||
from taos.tmq import TaosConsumer
|
||||
|
||||
import taos
|
||||
from taos.tmq import *
|
||||
|
||||
conn = taos.connect()
|
||||
|
||||
# create database
|
||||
conn.execute("drop database if exists py_tmq")
|
||||
conn.execute("create database if not exists py_tmq vgroups 2")
|
||||
|
||||
# create table and stables
|
||||
conn.select_db("py_tmq")
|
||||
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
|
||||
conn.execute("create table if not exists tb1 using stb1 tags(1)")
|
||||
conn.execute("create table if not exists tb2 using stb1 tags(2)")
|
||||
conn.execute("create table if not exists tb3 using stb1 tags(3)")
|
||||
|
||||
# create topic
|
||||
conn.execute("drop topic if exists topic_ctb_column")
|
||||
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
|
||||
|
||||
# set consumer configure options
|
||||
conf = TaosTmqConf()
|
||||
conf.set("group.id", "tg2")
|
||||
conf.set("td.connect.user", "root")
|
||||
conf.set("td.connect.pass", "taosdata")
|
||||
conf.set("enable.auto.commit", "true")
|
||||
conf.set("msg.with.table.name", "true")
|
||||
|
||||
def tmq_commit_cb_print(tmq, resp, offset, param=None):
|
||||
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
|
||||
|
||||
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
|
||||
|
||||
# build consumer
|
||||
tmq = conf.new_consumer()
|
||||
|
||||
# build topic list
|
||||
topic_list = TaosTmqList()
|
||||
topic_list.append("topic_ctb_column")
|
||||
|
||||
# subscribe consumer
|
||||
tmq.subscribe(topic_list)
|
||||
|
||||
# check subscriptions
|
||||
sub_list = tmq.subscription()
|
||||
print("subscribed topics: ",sub_list)
|
||||
|
||||
# start subscribe
|
||||
while 1:
|
||||
res = tmq.poll(1000)
|
||||
if res:
|
||||
topic = res.get_topic_name()
|
||||
vg = res.get_vgroup_id()
|
||||
db = res.get_db_name()
|
||||
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
|
||||
for row in res:
|
||||
print(row)
|
||||
tb = res.get_table_name()
|
||||
print(f"from table: {tb}")
|
||||
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
|
||||
for msg in consumer:
|
||||
for row in msg:
|
||||
print(row)
|
||||
|
||||
```
|
||||
|
||||
|
|
Loading…
Reference in New Issue