docs: add docs for assignment and seek
This commit is contained in:
parent
87d91bf310
commit
3af153bae6
|
@ -453,6 +453,44 @@ As the way to connect introduced above but add `req_id` argument.
|
|||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
### Subscription
|
||||
|
||||
Connector support data subscription. For more information about subscroption, please refer to [Data Subscription](../../develop/tmq/).
|
||||
|
||||
<Tabs defaultValue="native">
|
||||
<TabItem value="native" label="native connection">
|
||||
|
||||
The `consumer` in the connector has the subscription api. For more subscription api parameters, please refer to [Data Subscription](../../develop/tmq/).
|
||||
|
||||
```python
|
||||
{{#include docs/examples/python/tmq_example.py}}
|
||||
```
|
||||
|
||||
There is an `assignment` API in the connector, which can get the assignment of the topic. And there is a `seek` api to reset the assignment of the topic.
|
||||
|
||||
```python
|
||||
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="websocket" label="WebSocket connection">
|
||||
|
||||
In addition to native connections, the connector also supports subscriptions via websockets.
|
||||
|
||||
```python
|
||||
{{#include docs/examples/python/tmq_websocket_example.py}}
|
||||
```
|
||||
|
||||
There is an `assignment` API in the connector, which can get the assignment of the topic. And there is a `seek` api to reset the assignment of the topic.
|
||||
|
||||
```python
|
||||
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
### Schemaless Insert
|
||||
|
||||
Connector support schemaless insert.
|
||||
|
@ -507,7 +545,8 @@ Insert with req_id argument
|
|||
|
||||
| Example program links | Example program content |
|
||||
| ------------------------------------------------------------------------------------------------------------- | ------------------- ---- |
|
||||
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, bind multiple rows at once |
|
||||
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding,
|
||||
bind multiple rows at once |
|
||||
| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | bind_row.py
|
||||
| [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB line protocol writing |
|
||||
| [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | Use JSON type tags |
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
import taos
|
||||
from taos.tmq import Consumer
|
||||
import taosws
|
||||
|
||||
|
||||
def prepare():
|
||||
conn = taos.connect()
|
||||
conn.execute("drop topic if exists tmq_assignment_demo_topic")
|
||||
conn.execute("drop database if exists tmq_assignment_demo_db")
|
||||
conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600")
|
||||
conn.select_db("tmq_assignment_demo_db")
|
||||
conn.execute(
|
||||
"create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
|
||||
conn.execute(
|
||||
"create topic if not exists tmq_assignment_demo_topic as select ts, c1, c2, c3 from tmq_assignment_demo_table")
|
||||
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-2s, 1, 1.0, 'tmq test')")
|
||||
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-1s, 2, 2.0, 'tmq test')")
|
||||
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now, 3, 3.0, 'tmq test')")
|
||||
|
||||
|
||||
def taos_get_assignment_and_seek_demo():
|
||||
prepare()
|
||||
consumer = Consumer(
|
||||
{
|
||||
"group.id": "0",
|
||||
# should disable snapshot,
|
||||
# otherwise it will cause invalid params error
|
||||
"experimental.snapshot.enable": "false",
|
||||
}
|
||||
)
|
||||
consumer.subscribe(["tmq_assignment_demo_topic"])
|
||||
|
||||
# get topic assignment
|
||||
assignments = consumer.assignment()
|
||||
for assignment in assignments:
|
||||
print(assignment)
|
||||
|
||||
# poll
|
||||
consumer.poll(1)
|
||||
consumer.poll(1)
|
||||
|
||||
# get topic assignment again
|
||||
after_pool_assignments = consumer.assignment()
|
||||
for assignment in after_pool_assignments:
|
||||
print(assignment)
|
||||
|
||||
# seek to the beginning
|
||||
for assignment in assignments:
|
||||
consumer.seek(assignment)
|
||||
|
||||
# now the assignment should be the same as before poll
|
||||
assignments = consumer.assignment()
|
||||
for assignment in assignments:
|
||||
print(assignment)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
taosws_get_assignment_and_seek_demo()
|
|
@ -0,0 +1,57 @@
|
|||
import taos
|
||||
import taosws
|
||||
|
||||
|
||||
def prepare():
|
||||
conn = taos.connect()
|
||||
conn.execute("drop topic if exists tmq_assignment_demo_topic")
|
||||
conn.execute("drop database if exists tmq_assignment_demo_db")
|
||||
conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600")
|
||||
conn.select_db("tmq_assignment_demo_db")
|
||||
conn.execute(
|
||||
"create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
|
||||
conn.execute(
|
||||
"create topic if not exists tmq_assignment_demo_topic as select ts, c1, c2, c3 from tmq_assignment_demo_table")
|
||||
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-2s, 1, 1.0, 'tmq test')")
|
||||
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-1s, 2, 2.0, 'tmq test')")
|
||||
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now, 3, 3.0, 'tmq test')")
|
||||
|
||||
|
||||
def taosws_get_assignment_and_seek_demo():
|
||||
prepare()
|
||||
consumer = taosws.Consumer(conf={
|
||||
"td.connect.websocket.scheme": "ws",
|
||||
# should disable snapshot,
|
||||
# otherwise it will cause invalid params error
|
||||
"experimental.snapshot.enable": "false",
|
||||
"group.id": "0",
|
||||
})
|
||||
consumer.subscribe(["tmq_assignment_demo_topic"])
|
||||
|
||||
# get topic assignment
|
||||
assignments = consumer.assignment()
|
||||
for assignment in assignments:
|
||||
print(assignment.to_string())
|
||||
|
||||
# poll
|
||||
consumer.poll(1)
|
||||
consumer.poll(1)
|
||||
|
||||
# get topic assignment again
|
||||
after_poll_assignments = consumer.assignment()
|
||||
for assignment in after_poll_assignments:
|
||||
print(assignment.to_string())
|
||||
|
||||
# seek to the beginning
|
||||
for assignment in assignments:
|
||||
for a in assignment.assignments():
|
||||
consumer.seek(assignment.topic(), a.vg_id(), a.offset())
|
||||
|
||||
# now the assignment should be the same as before poll
|
||||
assignments = consumer.assignment()
|
||||
for assignment in assignments:
|
||||
print(assignment.to_string())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
taosws_get_assignment_and_seek_demo()
|
|
@ -467,6 +467,12 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
|
|||
{{#include docs/examples/python/tmq_example.py}}
|
||||
```
|
||||
|
||||
连接器提供了 `assignment`接口,用于获取 topic assignment 的功能,可以查询订阅的 assignment 的消费进度,并提供 `seek` 接口,用于重置 topic 的 assignment。
|
||||
|
||||
```python
|
||||
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem value="websocket" label="WebSocket 连接">
|
||||
|
@ -477,6 +483,12 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
|
|||
{{#include docs/examples/python/tmq_websocket_example.py}}
|
||||
```
|
||||
|
||||
连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。
|
||||
|
||||
```python
|
||||
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
|
Loading…
Reference in New Issue