diff --git a/docs/examples/python/tmq_example.py b/docs/examples/python/tmq_example.py index 1f6da3d1b6..cee036454e 100644 --- a/docs/examples/python/tmq_example.py +++ b/docs/examples/python/tmq_example.py @@ -1,59 +1,6 @@ import taos -from taos.tmq import * - -conn = taos.connect() - -# create database -conn.execute("drop database if exists py_tmq") -conn.execute("create database if not exists py_tmq vgroups 2") - -# create table and stables -conn.select_db("py_tmq") -conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)") -conn.execute("create table if not exists tb1 using stb1 tags(1)") -conn.execute("create table if not exists tb2 using stb1 tags(2)") -conn.execute("create table if not exists tb3 using stb1 tags(3)") - -# create topic -conn.execute("drop topic if exists topic_ctb_column") -conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1") - -# set consumer configure options -conf = TaosTmqConf() -conf.set("group.id", "tg2") -conf.set("td.connect.user", "root") -conf.set("td.connect.pass", "taosdata") -conf.set("enable.auto.commit", "true") -conf.set("msg.with.table.name", "true") - -def tmq_commit_cb_print(tmq, resp, offset, param=None): - print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}") - -conf.set_auto_commit_cb(tmq_commit_cb_print, None) - -# build consumer -tmq = conf.new_consumer() - -# build topic list -topic_list = TaosTmqList() -topic_list.append("topic_ctb_column") - -# subscribe consumer -tmq.subscribe(topic_list) - -# check subscriptions -sub_list = tmq.subscription() -print("subscribed topics: ",sub_list) - -# start subscribe -while 1: - res = tmq.poll(1000) - if res: - topic = res.get_topic_name() - vg = res.get_vgroup_id() - db = res.get_db_name() - print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}") - for row in res: - print(row) - tb = res.get_table_name() - print(f"from table: {tb}") +from taos.tmq import TaosConsumer +consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +for msg in consumer: + for row in msg: + print(row) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index f36f76fd85..23574e7478 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -88,6 +88,110 @@ void close() throws SQLException; ``` + + + +```python +class TaosConsumer(): + DEFAULT_CONFIG = { + 'group.id', + 'client.id', + 'enable.auto.commit', + 'auto.commit.interval.ms', + 'auto.offset.reset', + 'msg.with.table.name', + 'experimental.snapshot.enable', + 'enable.heartbeat.background', + 'experimental.snapshot.batch.size', + 'td.connect.ip', + 'td.connect.user', + 'td.connect.pass', + 'td.connect.port', + 'td.connect.db', + 'timeout' + } + + def __init__(self, *topics, **configs): + self._closed = True + self._conf = None + self._list = None + self._tmq = None + + keys = list(configs.keys()) + for k in keys: + configs.update({k.replace('_','.'): configs.pop(k)}) + + extra_configs = set(configs).difference(self.DEFAULT_CONFIG) + if extra_configs: + raise TmqError("Unrecognized configs: %s" % (extra_configs,)) + + self._conf = tmq_conf_new() + self._list = tmq_list_new() + + # set poll timeout + if 'timeout' in configs: + self._timeout = configs['timeout'] + del configs['timeout'] + else: + self._timeout = 0 + + # check if group id is set + + if 'group.id' not in configs: + raise TmqError("missing group.id in consumer config setting") + + for key, value in configs.items(): + tmq_conf_set(self._conf, key, value) + + self._tmq = tmq_consumer_new(self._conf) + + if not topics: + raise TmqError("Unset topic for Consumer") + + for topic in topics: + tmq_list_append(self._list, topic) + + tmq_subscribe(self._tmq, self._list) + + + def __iter__(self): + return self + + def __next__(self): + if not self._tmq: + raise StopIteration('TaosConsumer closed') + return next(self.sync_next()) + + def sync_next(self): + while 1: + res = tmq_consumer_poll(self._tmq, self._timeout) + if res: + break + yield TaosResult(res) + + def subscription(self): + if self._tmq is None: + return None + return tmq_subscription(self._tmq) + + def unsubscribe(self): + tmq_unsubscribe(self._tmq) + + def close(self): + if self._tmq: + tmq_consumer_close(self._tmq) + self._tmq = None + + def __del__(self): + if self._conf: + tmq_conf_destroy(self._conf) + if self._list: + tmq_list_destroy(self._list) + if self._tmq: + tmq_consumer_close(self._tmq) +``` + + ## 写入数据 @@ -230,6 +334,27 @@ public class MetersDeserializer extends ReferenceDeserializer { ``` + + + + +| 参数名称 | 类型 | 参数说明 | 备注 | +| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | +| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | | +| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | | +| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 | +| `client_id` | string | 客户端 ID | 最大长度:192。 | +| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) | +| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 | +| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | | +| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | +| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` | +| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` | + + + 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 @@ -262,6 +387,14 @@ consumer.subscribe(topics); + + +```python +consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +``` + + + ## 消费 @@ -294,6 +427,17 @@ while(running){ ``` + + + +```python +for msg in consumer: + for row in msg: + print(row) +``` + + + ## 结束消费 @@ -322,6 +466,19 @@ consumer.unsubscribe(); consumer.close(); ``` + + + + + +```python +/* 取消订阅 */ +consumer.unsubscribe(); + +/* 关闭消费 */ +consumer.close(); +``` +