diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx
index 5ed1639f72..46dab395c0 100644
--- a/docs/zh/07-develop/07-tmq.mdx
+++ b/docs/zh/07-develop/07-tmq.mdx
@@ -1,729 +1,729 @@
----
-sidebar_label: 数据订阅
-description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
-title: 数据订阅
----
-
-import Tabs from "@theme/Tabs";
-import TabItem from "@theme/TabItem";
-import Java from "./_sub_java.mdx";
-import Python from "./_sub_python.mdx";
-import Go from "./_sub_go.mdx";
-import Rust from "./_sub_rust.mdx";
-import Node from "./_sub_node.mdx";
-import CSharp from "./_sub_cs.mdx";
-import CDemo from "./_sub_c.mdx";
-
-为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
-
-与 kafka 一样,你需要定义 *topic*, 但 TDengine 的 *topic* 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 `SELECT` 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
-
-消费者订阅 *topic* 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。
-
-为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
-
-本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
-
-## 主要数据结构和 API
-
-不同语言下, TMQ 订阅相关的 API 及数据结构如下:
-
-
-
-
-```c
-typedef struct tmq_t tmq_t;
-typedef struct tmq_conf_t tmq_conf_t;
-typedef struct tmq_list_t tmq_list_t;
-
-typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));
-
-DLL_EXPORT tmq_list_t *tmq_list_new();
-DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
-DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
-DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
-DLL_EXPORT const char *tmq_err2str(int32_t code);
-
-DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
-DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
-DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
-DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
-DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
-DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
-
-enum tmq_conf_res_t {
- TMQ_CONF_UNKNOWN = -2,
- TMQ_CONF_INVALID = -1,
- TMQ_CONF_OK = 0,
-};
-typedef enum tmq_conf_res_t tmq_conf_res_t;
-
-DLL_EXPORT tmq_conf_t *tmq_conf_new();
-DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
-DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
-DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
-```
-
-这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
-
-
-
-
-```java
-void subscribe(Collection topics) throws SQLException;
-
-void unsubscribe() throws SQLException;
-
-Set subscription() throws SQLException;
-
-ConsumerRecords poll(Duration timeout) throws SQLException;
-
-void commitAsync();
-
-void commitAsync(OffsetCommitCallback callback);
-
-void commitSync() throws SQLException;
-
-void close() throws SQLException;
-```
-
-
-
-
-## 写入数据
-
-首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
-
-```sql
-DROP DATABASE IF EXISTS tmqdb;
-CREATE DATABASE tmqdb;
-CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16));
-CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
-CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
-INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
-INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
-```
-
-## 创建 *topic*
-
-TDengine 使用 SQL 创建一个 topic:
-
-```sql
-CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
-```
-
-TMQ 支持多种订阅类型:
-
-### 列订阅
-
-语法:
-
-```sql
-CREATE TOPIC topic_name as subquery
-```
-
-通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:
-
-- 该类型 TOPIC 一旦创建则订阅数据的结构确定。
-- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。
-- 若发生表结构变更,新增的列不出现在结果中,若发生列删除则会报错。
-
-### 超级表订阅
-
-语法:
-
-```sql
-CREATE TOPIC topic_name AS STABLE stb_name
-```
-
-与 `SELECT * from stbName` 订阅的区别是:
-
-- 不会限制用户的表结构变更。
-- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
-- 用户对于要处理的每一个数据块都可能有不同的表结构。
-- 返回数据不包含标签。
-
-### 数据库订阅
-
-语法:
-
-```sql
-CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;
-```
-
-通过该语句可创建一个包含数据库所有表数据的订阅,`WITH META` 可选择将数据库结构变动信息加入到订阅消息流,TMQ 将消费当前数据库下所有表结构的变动,包括超级表的创建与删除,列添加、删除或修改,子表的创建、删除及 TAG 变动信息等等。消费者可通过 API 来判断具体的消息类型。这一点也是与 Kafka 不同的地方。
-
-## 创建消费者 *consumer*
-
-消费者需要通过一系列配置选项创建,基础配置项如下表所示:
-
-| 参数名称 | 类型 | 参数说明 | 备注 |
-| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
-| `td.connect.ip` | string | 用于创建连接,同 `taos_connect` | |
-| `td.connect.user` | string | 用于创建连接,同 `taos_connect` | |
-| `td.connect.pass` | string | 用于创建连接,同 `taos_connect` |
-| `td.connect.port` | integer | 用于创建连接,同 `taos_connect` |
-| `group.id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
-| `client.id` | string | 客户端 ID | 最大长度:192。 |
-| `auto.offset.reset` | enum | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
-| `enable.auto.commit` | boolean | 启用自动提交 | 合法值:`true`, `false`。 |
-| `auto.commit.interval.ms` | integer | 以毫秒为单位的自动提交时间间隔 |
-| `enable.heartbeat.background` | boolean | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | |
-| `experimental.snapshot.enable` | boolean | 从 wal 开始消费,还是从 tsbs 开始消费 | |
-| `msg.with.table.name` | boolean | 从消息中能否解析表名 |
-
-对于不同编程语言,其设置方式如下:
-
-
-
-
-```c
-/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
- 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
-tmq_conf_t* conf = tmq_conf_new();
-tmq_conf_set(conf, "enable.auto.commit", "true");
-tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
-tmq_conf_set(conf, "group.id", "cgrpName");
-tmq_conf_set(conf, "td.connect.user", "root");
-tmq_conf_set(conf, "td.connect.pass", "taosdata");
-tmq_conf_set(conf, "auto.offset.reset", "earliest");
-tmq_conf_set(conf, "experimental.snapshot.enable", "true");
-tmq_conf_set(conf, "msg.with.table.name", "true");
-tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
-
-tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
-tmq_conf_destroy(conf);
-```
-
-
-
-
-对于 Java 程序,额外支持以下配置项:
-
-| 参数名称 | 类型 | 参数说明 |
-| --------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------- |
-| value.deserializer | boolean | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或是继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
-| value.deserializer.encoding | string | 值解析所使用的反序列化方式 | |
-
-```java
- import com.taosdata.jdbc.tmq.ReferenceDeserializer;
-
- public class MetersDeserializer extends ReferenceDeserializer {
- }
-```
-
-```java
-Properties properties = new Properties();
-properties.setProperty("enable.auto.commit", "true");
-properties.setProperty("auto.commit.interval.ms", "1000");
-properties.setProperty("group.id", "cgrpName");
-properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
-properties.setProperty("td.connect.user", "root");
-properties.setProperty("td.connect.pass", "taosdata");
-properties.setProperty("auto.offset.reset", "earliest");
-properties.setProperty("msg.with.table.name", "true");
-properties.setProperty("value.deserializer.encoding", "com.taos.example.MetersDeserializer");
-
-TaosConsumer consumer = new TaosConsumer<>(properties);
-```
-
-
-
-
-上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
-
-## 订阅 *topics*
-
-一个 consumer 支持同时订阅多个 topic。
-
-```c
-// 创建订阅 topics 列表
-tmq_list_t* topicList = tmq_list_new();
-tmq_list_append(topicList, "topicName");
-// 启动订阅
-tmq_subscribe(tmq, topicList);
-tmq_list_destroy(topicList);
-
-```
-
-## 消费
-
-
-
-
-```c
-// 消费数据
-while (running) {
- TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
- msg_process(msg);
-}
-```
-
-这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
-
-
-
-
-```java
-List topics = new ArrayList<>();
-topics.add("tmq_topic");
-consumer.subscribe(topics);
-
-while(running){
- ConsumerRecords meters = consumer.poll(Duration.ofMillis(100));
- for (Meters meter : meters) {
- processMsg(meter);
- }
-}
-```
-
-
-
-
-## 结束消费
-
-
-
-
-```c
-/* 取消订阅 */
-tmq_unsubscribe(tmq);
-
-/* 关闭消费者对象 */
-tmq_consumer_close(tmq);
-```
-
-
-
-
-```java
-/* 取消订阅 */
-consumer.unsubscribe();
-
-/* 关闭消费 */
-consumer.close();
-```
-
-
-
-
-## 删除 *topic*
-
-如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。
-
-```sql
-/* 删除 topic */
-DROP TOPIC topic_name;
-```
-
-## 状态查看
-
-1、*topics*:查询已经创建的 topic
-
-```sql
-SHOW TOPICS;
-```
-
-2、consumers:查询 consumer 的状态及其订阅的 topic
-
-```sql
-SHOW CONSUMERS;
-```
-
-3、subscriptions:查询 consumer 与 vgroup 之间的分配关系
-
-```sql
-SHOW SUBSCRIPTIONS;
-```
-
-## 示例代码
-
-以下是各语言的完整示例代码。
-
-
-
-
-```c
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#include
-#include
-#include
-#include
-#include
-#include "taos.h"
-
-static int running = 1;
-static char dbName[64] = "tmqdb";
-static char stbName[64] = "stb";
-static char topicName[64] = "topicname";
-
-static int32_t msg_process(TAOS_RES* msg) {
- char buf[1024];
- int32_t rows = 0;
-
- const char* topicName = tmq_get_topic_name(msg);
- const char* dbName = tmq_get_db_name(msg);
- int32_t vgroupId = tmq_get_vgroup_id(msg);
-
- printf("topic: %s\n", topicName);
- printf("db: %s\n", dbName);
- printf("vgroup id: %d\n", vgroupId);
-
- while (1) {
- TAOS_ROW row = taos_fetch_row(msg);
- if (row == NULL) break;
-
- TAOS_FIELD* fields = taos_fetch_fields(msg);
- int32_t numOfFields = taos_field_count(msg);
- int32_t* length = taos_fetch_lengths(msg);
- int32_t precision = taos_result_precision(msg);
- const char* tbName = tmq_get_table_name(msg);
- rows++;
- taos_print_row(buf, row, fields, numOfFields);
- printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf);
- }
-
- return rows;
-}
-
-static int32_t init_env() {
- TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
- if (pConn == NULL) {
- return -1;
- }
-
- TAOS_RES* pRes;
- // drop database if exists
- printf("create database\n");
- pRes = taos_query(pConn, "drop database if exists tmqdb");
- if (taos_errno(pRes) != 0) {
- printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- // create database
- pRes = taos_query(pConn, "create database tmqdb");
- if (taos_errno(pRes) != 0) {
- printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- // create super table
- printf("create super table\n");
- pRes = taos_query(
- pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
- if (taos_errno(pRes) != 0) {
- printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- // create sub tables
- printf("create sub tables\n");
- pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
- if (taos_errno(pRes) != 0) {
- printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
- if (taos_errno(pRes) != 0) {
- printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
- if (taos_errno(pRes) != 0) {
- printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
- if (taos_errno(pRes) != 0) {
- printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- // insert data
- printf("insert data into sub tables\n");
- pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
- if (taos_errno(pRes) != 0) {
- printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
- if (taos_errno(pRes) != 0) {
- printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
- if (taos_errno(pRes) != 0) {
- printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
- if (taos_errno(pRes) != 0) {
- printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- taos_close(pConn);
- return 0;
-}
-
-int32_t create_topic() {
- printf("create topic\n");
- TAOS_RES* pRes;
- TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
- if (pConn == NULL) {
- return -1;
- }
-
- pRes = taos_query(pConn, "use tmqdb");
- if (taos_errno(pRes) != 0) {
- printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1");
- if (taos_errno(pRes) != 0) {
- printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
- return -1;
- }
- taos_free_result(pRes);
-
- taos_close(pConn);
- return 0;
-}
-
-void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
- printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
-}
-
-tmq_t* build_consumer() {
- tmq_conf_res_t code;
- tmq_conf_t* conf = tmq_conf_new();
- code = tmq_conf_set(conf, "enable.auto.commit", "true");
- if (TMQ_CONF_OK != code) return NULL;
- code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
- if (TMQ_CONF_OK != code) return NULL;
- code = tmq_conf_set(conf, "group.id", "cgrpName");
- if (TMQ_CONF_OK != code) return NULL;
- code = tmq_conf_set(conf, "client.id", "user defined name");
- if (TMQ_CONF_OK != code) return NULL;
- code = tmq_conf_set(conf, "td.connect.user", "root");
- if (TMQ_CONF_OK != code) return NULL;
- code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
- if (TMQ_CONF_OK != code) return NULL;
- code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
- if (TMQ_CONF_OK != code) return NULL;
- code = tmq_conf_set(conf, "experimental.snapshot.enable", "true");
- if (TMQ_CONF_OK != code) return NULL;
- code = tmq_conf_set(conf, "msg.with.table.name", "true");
- if (TMQ_CONF_OK != code) return NULL;
-
- tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
-
- tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
- tmq_conf_destroy(conf);
- return tmq;
-}
-
-tmq_list_t* build_topic_list() {
- tmq_list_t* topicList = tmq_list_new();
- int32_t code = tmq_list_append(topicList, "topicname");
- if (code) {
- return NULL;
- }
- return topicList;
-}
-
-void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
- int32_t code;
-
- if ((code = tmq_subscribe(tmq, topicList))) {
- fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
- return;
- }
-
- int32_t totalRows = 0;
- int32_t msgCnt = 0;
- int32_t timeout = 5000;
- while (running) {
- TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
- if (tmqmsg) {
- msgCnt++;
- totalRows += msg_process(tmqmsg);
- taos_free_result(tmqmsg);
- /*} else {*/
- /*break;*/
- }
- }
-
- fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
-}
-
-int main(int argc, char* argv[]) {
- int32_t code;
-
- if (init_env() < 0) {
- return -1;
- }
-
- if (create_topic() < 0) {
- return -1;
- }
-
- tmq_t* tmq = build_consumer();
- if (NULL == tmq) {
- fprintf(stderr, "%% build_consumer() fail!\n");
- return -1;
- }
-
- tmq_list_t* topic_list = build_topic_list();
- if (NULL == topic_list) {
- return -1;
- }
-
- basic_consume_loop(tmq, topic_list);
-
- code = tmq_unsubscribe(tmq);
- if (code) {
- fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code));
- } else {
- fprintf(stderr, "%% unsubscribe\n");
- }
-
- code = tmq_consumer_close(tmq);
- if (code) {
- fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
- } else {
- fprintf(stderr, "%% Consumer closed\n");
- }
-
- return 0;
-}
-
-```
-
-[查看源码](https://github.com/taosdata/TDengine/blob/develop/examples/c/tmq.c)
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-```python
-import taos
-from taos.tmq import *
-
-conn = taos.connect()
-
-# create database
-conn.execute("drop database if exists py_tmq")
-conn.execute("create database if not exists py_tmq vgroups 2")
-
-# create table and stables
-conn.select_db("py_tmq")
-conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
-conn.execute("create table if not exists tb1 using stb1 tags(1)")
-conn.execute("create table if not exists tb2 using stb1 tags(2)")
-conn.execute("create table if not exists tb3 using stb1 tags(3)")
-
-# create topic
-conn.execute("drop topic if exists topic_ctb_column")
-conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
-
-# set consumer configure options
-conf = TaosTmqConf()
-conf.set("group.id", "tg2")
-conf.set("td.connect.user", "root")
-conf.set("td.connect.pass", "taosdata")
-conf.set("enable.auto.commit", "true")
-conf.set("msg.with.table.name", "true")
-
-def tmq_commit_cb_print(tmq, resp, offset, param=None):
- print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
-
-conf.set_auto_commit_cb(tmq_commit_cb_print, None)
-
-# build consumer
-tmq = conf.new_consumer()
-
-# build topic list
-topic_list = TaosTmqList()
-topic_list.append("topic_ctb_column")
-
-# subscribe consumer
-tmq.subscribe(topic_list)
-
-# check subscriptions
-sub_list = tmq.subscription()
-print("subscribed topics: ",sub_list)
-
-# start subscribe
-while 1:
- res = tmq.poll(1000)
- if res:
- topic = res.get_topic_name()
- vg = res.get_vgroup_id()
- db = res.get_db_name()
- print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
- for row in res:
- print(row)
- tb = res.get_table_name()
- print(f"from table: {tb}")
-
-```
-
-[查看源码](https://github.com/taosdata/TDengine/blob/develop/docs/examples/python/tmq_example.py)
-
-
-
-
-
-
-
-
-
-
-
-
+---
+sidebar_label: 数据订阅
+description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
+title: 数据订阅
+---
+
+import Tabs from "@theme/Tabs";
+import TabItem from "@theme/TabItem";
+import Java from "./_sub_java.mdx";
+import Python from "./_sub_python.mdx";
+import Go from "./_sub_go.mdx";
+import Rust from "./_sub_rust.mdx";
+import Node from "./_sub_node.mdx";
+import CSharp from "./_sub_cs.mdx";
+import CDemo from "./_sub_c.mdx";
+
+为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
+
+与 kafka 一样,你需要定义 *topic*, 但 TDengine 的 *topic* 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 `SELECT` 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
+
+消费者订阅 *topic* 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。
+
+为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
+
+本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
+
+## 主要数据结构和 API
+
+不同语言下, TMQ 订阅相关的 API 及数据结构如下:
+
+
+
+
+```c
+typedef struct tmq_t tmq_t;
+typedef struct tmq_conf_t tmq_conf_t;
+typedef struct tmq_list_t tmq_list_t;
+
+typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));
+
+DLL_EXPORT tmq_list_t *tmq_list_new();
+DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
+DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
+DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
+DLL_EXPORT const char *tmq_err2str(int32_t code);
+
+DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
+DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
+DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
+DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
+DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
+DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
+
+enum tmq_conf_res_t {
+ TMQ_CONF_UNKNOWN = -2,
+ TMQ_CONF_INVALID = -1,
+ TMQ_CONF_OK = 0,
+};
+typedef enum tmq_conf_res_t tmq_conf_res_t;
+
+DLL_EXPORT tmq_conf_t *tmq_conf_new();
+DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
+DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
+DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
+```
+
+这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
+
+
+
+
+```java
+void subscribe(Collection topics) throws SQLException;
+
+void unsubscribe() throws SQLException;
+
+Set subscription() throws SQLException;
+
+ConsumerRecords poll(Duration timeout) throws SQLException;
+
+void commitAsync();
+
+void commitAsync(OffsetCommitCallback callback);
+
+void commitSync() throws SQLException;
+
+void close() throws SQLException;
+```
+
+
+
+
+## 写入数据
+
+首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
+
+```sql
+DROP DATABASE IF EXISTS tmqdb;
+CREATE DATABASE tmqdb;
+CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16));
+CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
+CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
+INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
+INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
+```
+
+## 创建 *topic*
+
+TDengine 使用 SQL 创建一个 topic:
+
+```sql
+CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
+```
+
+TMQ 支持多种订阅类型:
+
+### 列订阅
+
+语法:
+
+```sql
+CREATE TOPIC topic_name as subquery
+```
+
+通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:
+
+- 该类型 TOPIC 一旦创建则订阅数据的结构确定。
+- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。
+- 若发生表结构变更,新增的列不出现在结果中,若发生列删除则会报错。
+
+### 超级表订阅
+
+语法:
+
+```sql
+CREATE TOPIC topic_name AS STABLE stb_name
+```
+
+与 `SELECT * from stbName` 订阅的区别是:
+
+- 不会限制用户的表结构变更。
+- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
+- 用户对于要处理的每一个数据块都可能有不同的表结构。
+- 返回数据不包含标签。
+
+### 数据库订阅
+
+语法:
+
+```sql
+CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;
+```
+
+通过该语句可创建一个包含数据库所有表数据的订阅,`WITH META` 可选择将数据库结构变动信息加入到订阅消息流,TMQ 将消费当前数据库下所有表结构的变动,包括超级表的创建与删除,列添加、删除或修改,子表的创建、删除及 TAG 变动信息等等。消费者可通过 API 来判断具体的消息类型。这一点也是与 Kafka 不同的地方。
+
+## 创建消费者 *consumer*
+
+消费者需要通过一系列配置选项创建,基础配置项如下表所示:
+
+| 参数名称 | 类型 | 参数说明 | 备注 |
+| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
+| `td.connect.ip` | string | 用于创建连接,同 `taos_connect` | |
+| `td.connect.user` | string | 用于创建连接,同 `taos_connect` | |
+| `td.connect.pass` | string | 用于创建连接,同 `taos_connect` |
+| `td.connect.port` | integer | 用于创建连接,同 `taos_connect` |
+| `group.id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
+| `client.id` | string | 客户端 ID | 最大长度:192。 |
+| `auto.offset.reset` | enum | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
+| `enable.auto.commit` | boolean | 启用自动提交 | 合法值:`true`, `false`。 |
+| `auto.commit.interval.ms` | integer | 以毫秒为单位的自动提交时间间隔 |
+| `enable.heartbeat.background` | boolean | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | |
+| `experimental.snapshot.enable` | boolean | 从 WAL 开始消费,还是从 TSBS 开始消费 | |
+| `msg.with.table.name` | boolean | 是否允许从消息中解析表名 |
+
+对于不同编程语言,其设置方式如下:
+
+
+
+
+```c
+/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
+ 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
+tmq_conf_t* conf = tmq_conf_new();
+tmq_conf_set(conf, "enable.auto.commit", "true");
+tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
+tmq_conf_set(conf, "group.id", "cgrpName");
+tmq_conf_set(conf, "td.connect.user", "root");
+tmq_conf_set(conf, "td.connect.pass", "taosdata");
+tmq_conf_set(conf, "auto.offset.reset", "earliest");
+tmq_conf_set(conf, "experimental.snapshot.enable", "true");
+tmq_conf_set(conf, "msg.with.table.name", "true");
+tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
+
+tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
+tmq_conf_destroy(conf);
+```
+
+
+
+
+对于 Java 程序,额外支持以下配置项:
+
+| 参数名称 | 类型 | 参数说明 |
+| --------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------- |
+| value.deserializer | boolean | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或是继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
+| value.deserializer.encoding | string | 值解析所使用的反序列化方式 | |
+
+```java
+ import com.taosdata.jdbc.tmq.ReferenceDeserializer;
+
+ public class MetersDeserializer extends ReferenceDeserializer {
+ }
+```
+
+```java
+Properties properties = new Properties();
+properties.setProperty("enable.auto.commit", "true");
+properties.setProperty("auto.commit.interval.ms", "1000");
+properties.setProperty("group.id", "cgrpName");
+properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
+properties.setProperty("td.connect.user", "root");
+properties.setProperty("td.connect.pass", "taosdata");
+properties.setProperty("auto.offset.reset", "earliest");
+properties.setProperty("msg.with.table.name", "true");
+properties.setProperty("value.deserializer.encoding", "com.taos.example.MetersDeserializer");
+
+TaosConsumer consumer = new TaosConsumer<>(properties);
+```
+
+
+
+
+上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
+
+## 订阅 *topics*
+
+一个 consumer 支持同时订阅多个 topic。
+
+```c
+// 创建订阅 topics 列表
+tmq_list_t* topicList = tmq_list_new();
+tmq_list_append(topicList, "topicName");
+// 启动订阅
+tmq_subscribe(tmq, topicList);
+tmq_list_destroy(topicList);
+
+```
+
+## 消费
+
+
+
+
+```c
+// 消费数据
+while (running) {
+ TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
+ msg_process(msg);
+}
+```
+
+这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
+
+
+
+
+```java
+List topics = new ArrayList<>();
+topics.add("tmq_topic");
+consumer.subscribe(topics);
+
+while(running){
+ ConsumerRecords meters = consumer.poll(Duration.ofMillis(100));
+ for (Meters meter : meters) {
+ processMsg(meter);
+ }
+}
+```
+
+
+
+
+## 结束消费
+
+
+
+
+```c
+/* 取消订阅 */
+tmq_unsubscribe(tmq);
+
+/* 关闭消费者对象 */
+tmq_consumer_close(tmq);
+```
+
+
+
+
+```java
+/* 取消订阅 */
+consumer.unsubscribe();
+
+/* 关闭消费 */
+consumer.close();
+```
+
+
+
+
+## 删除 *topic*
+
+如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。
+
+```sql
+/* 删除 topic */
+DROP TOPIC topic_name;
+```
+
+## 状态查看
+
+1、*topics*:查询已经创建的 topic
+
+```sql
+SHOW TOPICS;
+```
+
+2、consumers:查询 consumer 的状态及其订阅的 topic
+
+```sql
+SHOW CONSUMERS;
+```
+
+3、subscriptions:查询 consumer 与 vgroup 之间的分配关系
+
+```sql
+SHOW SUBSCRIPTIONS;
+```
+
+## 示例代码
+
+以下是各语言的完整示例代码。
+
+
+
+
+```c
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include "taos.h"
+
+static int running = 1;
+static char dbName[64] = "tmqdb";
+static char stbName[64] = "stb";
+static char topicName[64] = "topicname";
+
+static int32_t msg_process(TAOS_RES* msg) {
+ char buf[1024];
+ int32_t rows = 0;
+
+ const char* topicName = tmq_get_topic_name(msg);
+ const char* dbName = tmq_get_db_name(msg);
+ int32_t vgroupId = tmq_get_vgroup_id(msg);
+
+ printf("topic: %s\n", topicName);
+ printf("db: %s\n", dbName);
+ printf("vgroup id: %d\n", vgroupId);
+
+ while (1) {
+ TAOS_ROW row = taos_fetch_row(msg);
+ if (row == NULL) break;
+
+ TAOS_FIELD* fields = taos_fetch_fields(msg);
+ int32_t numOfFields = taos_field_count(msg);
+ int32_t* length = taos_fetch_lengths(msg);
+ int32_t precision = taos_result_precision(msg);
+ const char* tbName = tmq_get_table_name(msg);
+ rows++;
+ taos_print_row(buf, row, fields, numOfFields);
+ printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf);
+ }
+
+ return rows;
+}
+
+static int32_t init_env() {
+ TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
+ if (pConn == NULL) {
+ return -1;
+ }
+
+ TAOS_RES* pRes;
+ // drop database if exists
+ printf("create database\n");
+ pRes = taos_query(pConn, "drop database if exists tmqdb");
+ if (taos_errno(pRes) != 0) {
+ printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ // create database
+ pRes = taos_query(pConn, "create database tmqdb");
+ if (taos_errno(pRes) != 0) {
+ printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ // create super table
+ printf("create super table\n");
+ pRes = taos_query(
+ pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ // create sub tables
+ printf("create sub tables\n");
+ pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ // insert data
+ printf("insert data into sub tables\n");
+ pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ taos_close(pConn);
+ return 0;
+}
+
+int32_t create_topic() {
+ printf("create topic\n");
+ TAOS_RES* pRes;
+ TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
+ if (pConn == NULL) {
+ return -1;
+ }
+
+ pRes = taos_query(pConn, "use tmqdb");
+ if (taos_errno(pRes) != 0) {
+ printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1");
+ if (taos_errno(pRes) != 0) {
+ printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
+ return -1;
+ }
+ taos_free_result(pRes);
+
+ taos_close(pConn);
+ return 0;
+}
+
+void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
+ printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
+}
+
+tmq_t* build_consumer() {
+ tmq_conf_res_t code;
+ tmq_conf_t* conf = tmq_conf_new();
+ code = tmq_conf_set(conf, "enable.auto.commit", "true");
+ if (TMQ_CONF_OK != code) return NULL;
+ code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
+ if (TMQ_CONF_OK != code) return NULL;
+ code = tmq_conf_set(conf, "group.id", "cgrpName");
+ if (TMQ_CONF_OK != code) return NULL;
+ code = tmq_conf_set(conf, "client.id", "user defined name");
+ if (TMQ_CONF_OK != code) return NULL;
+ code = tmq_conf_set(conf, "td.connect.user", "root");
+ if (TMQ_CONF_OK != code) return NULL;
+ code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
+ if (TMQ_CONF_OK != code) return NULL;
+ code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
+ if (TMQ_CONF_OK != code) return NULL;
+ code = tmq_conf_set(conf, "experimental.snapshot.enable", "true");
+ if (TMQ_CONF_OK != code) return NULL;
+ code = tmq_conf_set(conf, "msg.with.table.name", "true");
+ if (TMQ_CONF_OK != code) return NULL;
+
+ tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
+
+ tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
+ tmq_conf_destroy(conf);
+ return tmq;
+}
+
+tmq_list_t* build_topic_list() {
+ tmq_list_t* topicList = tmq_list_new();
+ int32_t code = tmq_list_append(topicList, "topicname");
+ if (code) {
+ return NULL;
+ }
+ return topicList;
+}
+
+void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
+ int32_t code;
+
+ if ((code = tmq_subscribe(tmq, topicList))) {
+ fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
+ return;
+ }
+
+ int32_t totalRows = 0;
+ int32_t msgCnt = 0;
+ int32_t timeout = 5000;
+ while (running) {
+ TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
+ if (tmqmsg) {
+ msgCnt++;
+ totalRows += msg_process(tmqmsg);
+ taos_free_result(tmqmsg);
+ /*} else {*/
+ /*break;*/
+ }
+ }
+
+ fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
+}
+
+int main(int argc, char* argv[]) {
+ int32_t code;
+
+ if (init_env() < 0) {
+ return -1;
+ }
+
+ if (create_topic() < 0) {
+ return -1;
+ }
+
+ tmq_t* tmq = build_consumer();
+ if (NULL == tmq) {
+ fprintf(stderr, "%% build_consumer() fail!\n");
+ return -1;
+ }
+
+ tmq_list_t* topic_list = build_topic_list();
+ if (NULL == topic_list) {
+ return -1;
+ }
+
+ basic_consume_loop(tmq, topic_list);
+
+ code = tmq_unsubscribe(tmq);
+ if (code) {
+ fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code));
+ } else {
+ fprintf(stderr, "%% unsubscribe\n");
+ }
+
+ code = tmq_consumer_close(tmq);
+ if (code) {
+ fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
+ } else {
+ fprintf(stderr, "%% Consumer closed\n");
+ }
+
+ return 0;
+}
+
+```
+
+[查看源码](https://github.com/taosdata/TDengine/blob/develop/examples/c/tmq.c)
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+```python
+import taos
+from taos.tmq import *
+
+conn = taos.connect()
+
+# create database
+conn.execute("drop database if exists py_tmq")
+conn.execute("create database if not exists py_tmq vgroups 2")
+
+# create table and stables
+conn.select_db("py_tmq")
+conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
+conn.execute("create table if not exists tb1 using stb1 tags(1)")
+conn.execute("create table if not exists tb2 using stb1 tags(2)")
+conn.execute("create table if not exists tb3 using stb1 tags(3)")
+
+# create topic
+conn.execute("drop topic if exists topic_ctb_column")
+conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
+
+# set consumer configure options
+conf = TaosTmqConf()
+conf.set("group.id", "tg2")
+conf.set("td.connect.user", "root")
+conf.set("td.connect.pass", "taosdata")
+conf.set("enable.auto.commit", "true")
+conf.set("msg.with.table.name", "true")
+
+def tmq_commit_cb_print(tmq, resp, offset, param=None):
+ print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
+
+conf.set_auto_commit_cb(tmq_commit_cb_print, None)
+
+# build consumer
+tmq = conf.new_consumer()
+
+# build topic list
+topic_list = TaosTmqList()
+topic_list.append("topic_ctb_column")
+
+# subscribe consumer
+tmq.subscribe(topic_list)
+
+# check subscriptions
+sub_list = tmq.subscription()
+print("subscribed topics: ",sub_list)
+
+# start subscribe
+while 1:
+ res = tmq.poll(1000)
+ if res:
+ topic = res.get_topic_name()
+ vg = res.get_vgroup_id()
+ db = res.get_db_name()
+ print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
+ for row in res:
+ print(row)
+ tb = res.get_table_name()
+ print(f"from table: {tb}")
+
+```
+
+[查看源码](https://github.com/taosdata/TDengine/blob/develop/docs/examples/python/tmq_example.py)
+
+
+
+
+
+
+
+
+
+
+
+