docs(java): use tabular style in subscription document
This commit is contained in:
parent
1d265e1ac6
commit
55e7f68582
|
@ -14,23 +14,22 @@ import Node from "./_sub_node.mdx";
|
|||
import CSharp from "./_sub_cs.mdx";
|
||||
import CDemo from "./_sub_c.mdx";
|
||||
|
||||
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
|
||||
|
||||
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
|
||||
与 kafka 一样,你需要定义 *topic*, 但 TDengine 的 *topic* 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 `SELECT` 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
|
||||
|
||||
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。
|
||||
|
||||
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保 at least once 消费。
|
||||
消费者订阅 *topic* 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。
|
||||
|
||||
为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
|
||||
|
||||
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
|
||||
|
||||
## 主要数据结构和API
|
||||
## 主要数据结构和 API
|
||||
|
||||
TMQ 的 API 中,与订阅相关的主要数据结构和API如下:
|
||||
不同语言下, TMQ 订阅相关的 API 及数据结构如下:
|
||||
|
||||
<Tabs defaultValue="c_struct">
|
||||
<TabItem value="c_struct" label="C">
|
||||
<Tabs defaultValue="java" groupId="lang">
|
||||
<TabItem value="c" label="C">
|
||||
|
||||
```c
|
||||
typedef struct tmq_t tmq_t;
|
||||
|
@ -65,10 +64,10 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
|
|||
DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
|
||||
```
|
||||
|
||||
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面C语言的示例代码。
|
||||
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="java_struct" label="Java">
|
||||
<TabItem value="java" label="Java">
|
||||
|
||||
```java
|
||||
void subscribe(Collection<String> topics) throws SQLException;
|
||||
|
@ -96,97 +95,115 @@ void close() throws SQLException;
|
|||
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
|
||||
|
||||
```sql
|
||||
drop database if exists tmqdb;
|
||||
create database tmqdb;
|
||||
create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16) tags(t1 int, t3 varchar(16));
|
||||
create table tmqdb.ctb0 using tmqdb.stb tags(0, "subtable0");
|
||||
create table tmqdb.ctb1 using tmqdb.stb tags(1, "subtable1");
|
||||
insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
|
||||
insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
|
||||
DROP DATABASE IF EXISTS tmqdb;
|
||||
CREATE DATABASE tmqdb;
|
||||
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16));
|
||||
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
|
||||
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
|
||||
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
|
||||
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
|
||||
```
|
||||
|
||||
## 创建topic:
|
||||
## 创建 *topic*
|
||||
|
||||
TDengine 使用 SQL 创建一个 topic:
|
||||
|
||||
```sql
|
||||
create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
|
||||
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
|
||||
```
|
||||
|
||||
TMQ支持多种订阅类型:
|
||||
TMQ 支持多种订阅类型:
|
||||
|
||||
### 列订阅
|
||||
|
||||
语法:CREATE TOPIC topic_name as subquery
|
||||
通过select语句订阅(包括select *,或select ts, c1等指定列描述订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)
|
||||
|
||||
- TOPIC一旦创建则schema确定
|
||||
- 被订阅或用于计算的column和tag不可被删除、修改
|
||||
- 若发生schema变更,新增的column不出现在结果中
|
||||
|
||||
### 超级表订阅
|
||||
语法:CREATE TOPIC topic_name AS STABLE stbName
|
||||
|
||||
与select * from stbName订阅的区别是:
|
||||
- 不会限制用户的schema变更
|
||||
- 返回的是非结构化的数据:返回数据的schema会随之超级表的schema变化而变化
|
||||
- 用户对于要处理的每一个数据块都可能有不同的schema,因此,必须重新获取schema
|
||||
- 返回数据不带有tag
|
||||
|
||||
## 创建 consumer 以及consumer group
|
||||
|
||||
对于consumer, 目前支持的config包括:
|
||||
|
||||
<Tabs defaultValue="c_param">
|
||||
<TabItem value="c_param" label="C">
|
||||
|
||||
| 参数名称 | 参数说明 | 备注 |
|
||||
| ---------------------------- |----------------------------|----------------------------|
|
||||
| group.id | | 最大长度:192 |
|
||||
| enable.auto.commit | | 合法值:true, false |
|
||||
| auto.commit.interval.ms | | |
|
||||
| auto.offset.reset | | 合法值:earliest, latest, none |
|
||||
| td.connect.ip | | 用于连接,同taos_connect的参数 | |
|
||||
| td.connect.user | | 用于连接,同taos_connect的参数 | |
|
||||
| td.connect.pass | | 用于连接,同taos_connect的参数 | |
|
||||
| td.connect.port | | 用于连接,同taos_connect的参数 | |
|
||||
| enable.heartbeat.background |开启后台心跳,即consumer不会因为长时间不poll而认为离线 | 合法值:true, false |
|
||||
| experimental.snapshot.enable |从wal开始消费,还是从tsbs开始消费 | 合法值:true, false |
|
||||
| msg.with.table.name | 从消息中能否解析表名 | 合法值:true, false |
|
||||
语法:
|
||||
|
||||
```sql
|
||||
/* 根据需要,设置消费组(group.id)、自动提交(enable.auto.commit)、自动提交时间间隔(auto.commit.interval.ms)、用户名(td.connect.user)、密码(td.connect.pass)等参数 */
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
tmq_conf_set(conf, "group.id", "cgrpName");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
CREATE TOPIC topic_name as subquery
|
||||
```
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:
|
||||
|
||||
- 该类型 TOPIC 一旦创建则订阅数据的结构确定。
|
||||
- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。
|
||||
- 若发生表结构变更,新增的列不出现在结果中,若发生列删除则会报错。
|
||||
|
||||
### 超级表订阅
|
||||
|
||||
语法:
|
||||
|
||||
```sql
|
||||
CREATE TOPIC topic_name AS STABLE stb_name
|
||||
```
|
||||
|
||||
与 `SELECT * from stbName` 订阅的区别是:
|
||||
|
||||
- 不会限制用户的表结构变更。
|
||||
- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
|
||||
- 用户对于要处理的每一个数据块都可能有不同的表结构。
|
||||
- 返回数据不包含标签。
|
||||
|
||||
### 数据库订阅
|
||||
|
||||
语法:
|
||||
|
||||
```sql
|
||||
CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;
|
||||
```
|
||||
|
||||
通过该语句可创建一个包含数据库所有表数据的订阅,`WITH META` 可选择将数据库结构变动信息加入到订阅消息流,TMQ 将消费当前数据库下所有表结构的变动,包括超级表的创建与删除,列添加、删除或修改,子表的创建、删除及 TAG 变动信息等等。消费者可通过 API 来判断具体的消息类型。这一点也是与 Kafka 不同的地方。
|
||||
|
||||
## 创建消费者 *consumer*
|
||||
|
||||
消费者需要通过一系列配置选项创建,基础配置项如下表所示:
|
||||
|
||||
| 参数名称 | 类型 | 参数说明 | 备注 |
|
||||
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
|
||||
| `td.connect.ip` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td.connect.user` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td.connect.pass` | string | 用于创建连接,同 `taos_connect` |
|
||||
| `td.connect.port` | integer | 用于创建连接,同 `taos_connect` |
|
||||
| `group.id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
|
||||
| `client.id` | string | 客户端 ID | 最大长度:192。 |
|
||||
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
|
||||
| `enable.auto.commit` | boolean | 启用自动提交 | 合法值:`true`, `false`。 |
|
||||
| `auto.commit.interval.ms` | integer | 以毫秒为单位的自动提交时间间隔 |
|
||||
| `enable.heartbeat.background` | boolean | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | |
|
||||
| `experimental.snapshot.enable` | boolean | 从 wal 开始消费,还是从 tsbs 开始消费 | |
|
||||
| `msg.with.table.name` | boolean | 从消息中能否解析表名 |
|
||||
|
||||
对于不同编程语言,其设置方式如下:
|
||||
|
||||
<Tabs defaultValue="java" groupId="lang">
|
||||
<TabItem value="c" label="C">
|
||||
|
||||
```c
|
||||
/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、
|
||||
自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
tmq_conf_set(conf, "group.id", "cgrpName");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="java_param" label="Java">
|
||||
<TabItem value="java" label="Java">
|
||||
|
||||
| 参数名称 | 参数值 | 备注 |
|
||||
| ---------------------------- |----------------------------|------------------------------------------------------------------------------------------------|
|
||||
| group.id | 最大长度:192,必填 | consumer 所在组 id |
|
||||
| client.id | 最大长度:192 | consumer id |
|
||||
| enable.auto.commit | 合法值:true, false | 是否允许自动提交 |
|
||||
| auto.commit.interval.ms | | 自动提交时间间隔 |
|
||||
| auto.offset.reset | 合法值:earliest, latest, none | offset 消费位置 |
|
||||
| bootstrap.servers | 用于创建连接 | 服务端 ip + port |
|
||||
| td.connect.user | 用于创建连接,同 taos_connect 的参数 | 服务端用户名 |
|
||||
| td.connect.pass | 用于创建连接,同 taos_connect 的参数 | 服务端密码 |
|
||||
| enable.heartbeat.background | 合法值:true, false | 开启后台心跳,即 consumer 不会因为长时间不 poll 而认为离线 |
|
||||
| experimental.snapshot.enable | 合法值:true, false | 从 wal 开始消费,还是从 tsbs 开始消费 |
|
||||
| msg.with.table.name | 合法值:true, false | 从消息中能否解析表名 |
|
||||
| value.deserializer | 指定 value 解析方法 | 此方法应实现 com.taosdata.jdbc.tmq.Deserializer 接口或是继承 com.taosdata.jdbc.tmq.ReferenceDeserializer 类 |
|
||||
| value.deserializer.encoding | 指定 value 解析使用的字符编码集 | |
|
||||
对于 Java 程序,额外支持以下配置项:
|
||||
|
||||
| 参数名称 | 类型 | 参数说明 |
|
||||
| --------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| value.deserializer | boolean | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或是继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 |
|
||||
| value.deserializer.encoding | string | 值解析所使用的反序列化方式 | |
|
||||
|
||||
```java
|
||||
import com.taosdata.jdbc.tmq.ReferenceDeserializer;
|
||||
|
@ -196,139 +213,428 @@ TMQ支持多种订阅类型:
|
|||
```
|
||||
|
||||
```java
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("enable.auto.commit", "true");
|
||||
properties.setProperty("auto.commit.interval.ms", "1000");
|
||||
properties.setProperty("group.id", "cgrpName");
|
||||
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
|
||||
properties.setProperty("td.connect.user", "root");
|
||||
properties.setProperty("td.connect.pass", "taosdata");
|
||||
properties.setProperty("auto.offset.reset", "earliest");
|
||||
properties.setProperty("msg.with.table.name", "true");
|
||||
properties.setProperty("value.deserializer.encoding", "com.taos.example.MetersDeserializer");
|
||||
|
||||
TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("enable.auto.commit", "true");
|
||||
properties.setProperty("auto.commit.interval.ms", "1000");
|
||||
properties.setProperty("group.id", "cgrpName");
|
||||
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
|
||||
properties.setProperty("td.connect.user", "root");
|
||||
properties.setProperty("td.connect.pass", "taosdata");
|
||||
properties.setProperty("auto.offset.reset", "earliest");
|
||||
properties.setProperty("msg.with.table.name", "true");
|
||||
properties.setProperty("value.deserializer.encoding", "com.taos.example.MetersDeserializer");
|
||||
|
||||
TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
上述配置中包括consumer group ID,如果多个 consumer 指定的 consumer group ID一样,则自动形成一个consumer group,共享消费进度。
|
||||
上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
|
||||
|
||||
## 订阅 *topics*
|
||||
|
||||
## 订阅 topic 列表
|
||||
一个 consumer 支持同时订阅多个 topic。
|
||||
|
||||
单个consumer支持同时订阅多个topic。
|
||||
|
||||
```sql
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, "topicName");
|
||||
```
|
||||
|
||||
## 启动订阅并开始消费
|
||||
|
||||
<Tabs defaultValue="c_create">
|
||||
<TabItem value="c_create" label="C">
|
||||
|
||||
```
|
||||
/* 启动订阅 */
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
```c
|
||||
// 创建订阅 topics 列表
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, "topicName");
|
||||
// 启动订阅
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
|
||||
/* 循环poll消息 */
|
||||
while (running) {
|
||||
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut);
|
||||
msg_process(tmqmsg);
|
||||
}
|
||||
```
|
||||
|
||||
## 消费
|
||||
|
||||
<Tabs defaultValue="java" groupId="lang">
|
||||
<TabItem value="c" label="C">
|
||||
|
||||
```c
|
||||
// 消费数据
|
||||
while (running) {
|
||||
TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
|
||||
msg_process(msg);
|
||||
}
|
||||
```
|
||||
|
||||
这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="java_create" label="Java">
|
||||
<TabItem value="java" label="Java">
|
||||
|
||||
```java
|
||||
List<String> topics = new ArrayList<>();
|
||||
topics.add("tmq_topic");
|
||||
consumer.subscribe(topics);
|
||||
|
||||
while(running){
|
||||
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
|
||||
for (Meters meter : meters) {
|
||||
processMsg(meter);
|
||||
}
|
||||
}
|
||||
List<String> topics = new ArrayList<>();
|
||||
topics.add("tmq_topic");
|
||||
consumer.subscribe(topics);
|
||||
|
||||
while(running){
|
||||
ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
|
||||
for (Meters meter : meters) {
|
||||
processMsg(meter);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
这里是一个 **while** 循环,每调用一次tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析API完成消息内容的解析。
|
||||
|
||||
## 结束消费
|
||||
|
||||
<Tabs defaultValue="c_close">
|
||||
<TabItem value="c_close" label="C">
|
||||
<Tabs defaultValue="java" groupId="lang">
|
||||
<TabItem value="c" label="C">
|
||||
|
||||
```sql
|
||||
/* 取消订阅 */
|
||||
tmq_unsubscribe(tmq);
|
||||
```c
|
||||
/* 取消订阅 */
|
||||
tmq_unsubscribe(tmq);
|
||||
|
||||
/* 关闭消费 */
|
||||
tmq_consumer_close(tmq);
|
||||
/* 关闭消费者对象 */
|
||||
tmq_consumer_close(tmq);
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
<TabItem value="java_close" label="Java">
|
||||
<TabItem value="java" label="Java">
|
||||
|
||||
```java
|
||||
/* 取消订阅 */
|
||||
consumer.unsubscribe();
|
||||
/* 取消订阅 */
|
||||
consumer.unsubscribe();
|
||||
|
||||
/* 关闭消费 */
|
||||
consumer.close();
|
||||
/* 关闭消费 */
|
||||
consumer.close();
|
||||
```
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
## 删除topic
|
||||
## 删除 *topic*
|
||||
|
||||
如果不再需要,可以删除创建topic,但注意:只有没有被订阅的topic才能别删除。
|
||||
如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。
|
||||
|
||||
```sql
|
||||
/* 删除topic */
|
||||
drop topic topicName;
|
||||
/* 删除 topic */
|
||||
DROP TOPIC topic_name;
|
||||
```
|
||||
|
||||
## 状态查看
|
||||
|
||||
1、topics:查询已经创建的topic
|
||||
1、*topics*:查询已经创建的 topic
|
||||
|
||||
```sql
|
||||
show topics;
|
||||
SHOW TOPICS;
|
||||
```
|
||||
|
||||
2、consumers:查询consumer的状态及其订阅的topic
|
||||
2、consumers:查询 consumer 的状态及其订阅的 topic
|
||||
|
||||
```sql
|
||||
show consumers;
|
||||
SHOW CONSUMERS;
|
||||
```
|
||||
|
||||
3、subscriptions:查询consumer与vgroup之间的分配关系
|
||||
3、subscriptions:查询 consumer 与 vgroup 之间的分配关系
|
||||
|
||||
```sql
|
||||
show subscriptions;
|
||||
SHOW SUBSCRIPTIONS;
|
||||
```
|
||||
|
||||
## 示例代码
|
||||
|
||||
本节展示各种语言的示例代码。
|
||||
以下是各语言的完整示例代码。
|
||||
|
||||
<Tabs>
|
||||
<Tabs defaultValue="java" groupId="lang">
|
||||
<TabItem label="C" value="c">
|
||||
|
||||
```c
|
||||
{{#include examples/c/tmq.c}}
|
||||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
|
||||
static int running = 1;
|
||||
static char dbName[64] = "tmqdb";
|
||||
static char stbName[64] = "stb";
|
||||
static char topicName[64] = "topicname";
|
||||
|
||||
static int32_t msg_process(TAOS_RES* msg) {
|
||||
char buf[1024];
|
||||
int32_t rows = 0;
|
||||
|
||||
const char* topicName = tmq_get_topic_name(msg);
|
||||
const char* dbName = tmq_get_db_name(msg);
|
||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||
|
||||
printf("topic: %s\n", topicName);
|
||||
printf("db: %s\n", dbName);
|
||||
printf("vgroup id: %d\n", vgroupId);
|
||||
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
if (row == NULL) break;
|
||||
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
int32_t* length = taos_fetch_lengths(msg);
|
||||
int32_t precision = taos_result_precision(msg);
|
||||
const char* tbName = tmq_get_table_name(msg);
|
||||
rows++;
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf);
|
||||
}
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
||||
static int32_t init_env() {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes;
|
||||
// drop database if exists
|
||||
printf("create database\n");
|
||||
pRes = taos_query(pConn, "drop database if exists tmqdb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create database
|
||||
pRes = taos_query(pConn, "create database tmqdb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create super table
|
||||
printf("create super table\n");
|
||||
pRes = taos_query(
|
||||
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create sub tables
|
||||
printf("create sub tables\n");
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// insert data
|
||||
printf("insert data into sub tables\n");
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t create_topic() {
|
||||
printf("create topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRes = taos_query(pConn, "use tmqdb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
|
||||
}
|
||||
|
||||
tmq_t* build_consumer() {
|
||||
tmq_conf_res_t code;
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
code = tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
code = tmq_conf_set(conf, "group.id", "cgrpName");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
code = tmq_conf_set(conf, "client.id", "user defined name");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
code = tmq_conf_set(conf, "td.connect.user", "root");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
code = tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
code = tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
if (TMQ_CONF_OK != code) return NULL;
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
return tmq;
|
||||
}
|
||||
|
||||
tmq_list_t* build_topic_list() {
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
int32_t code = tmq_list_append(topicList, "topicname");
|
||||
if (code) {
|
||||
return NULL;
|
||||
}
|
||||
return topicList;
|
||||
}
|
||||
|
||||
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
|
||||
int32_t code;
|
||||
|
||||
if ((code = tmq_subscribe(tmq, topicList))) {
|
||||
fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t totalRows = 0;
|
||||
int32_t msgCnt = 0;
|
||||
int32_t timeout = 5000;
|
||||
while (running) {
|
||||
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
|
||||
if (tmqmsg) {
|
||||
msgCnt++;
|
||||
totalRows += msg_process(tmqmsg);
|
||||
taos_free_result(tmqmsg);
|
||||
/*} else {*/
|
||||
/*break;*/
|
||||
}
|
||||
}
|
||||
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
int32_t code;
|
||||
|
||||
if (init_env() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (create_topic() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmq_t* tmq = build_consumer();
|
||||
if (NULL == tmq) {
|
||||
fprintf(stderr, "%% build_consumer() fail!\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
if (NULL == topic_list) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
basic_consume_loop(tmq, topic_list);
|
||||
|
||||
code = tmq_unsubscribe(tmq);
|
||||
if (code) {
|
||||
fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code));
|
||||
} else {
|
||||
fprintf(stderr, "%% unsubscribe\n");
|
||||
}
|
||||
|
||||
code = tmq_consumer_close(tmq);
|
||||
if (code) {
|
||||
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
|
||||
} else {
|
||||
fprintf(stderr, "%% Consumer closed\n");
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
[查看源码](https://github.com/taosdata/TDengine/blob/develop/examples/c/tmq.c)
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Java" value="java">
|
||||
|
@ -346,9 +652,70 @@ TMQ支持多种订阅类型:
|
|||
<TabItem label="Python" value="Python">
|
||||
|
||||
```python
|
||||
{{#include docs/examples/python/tmq_example.py}}
|
||||
import taos
|
||||
from taos.tmq import *
|
||||
|
||||
conn = taos.connect()
|
||||
|
||||
# create database
|
||||
conn.execute("drop database if exists py_tmq")
|
||||
conn.execute("create database if not exists py_tmq vgroups 2")
|
||||
|
||||
# create table and stables
|
||||
conn.select_db("py_tmq")
|
||||
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
|
||||
conn.execute("create table if not exists tb1 using stb1 tags(1)")
|
||||
conn.execute("create table if not exists tb2 using stb1 tags(2)")
|
||||
conn.execute("create table if not exists tb3 using stb1 tags(3)")
|
||||
|
||||
# create topic
|
||||
conn.execute("drop topic if exists topic_ctb_column")
|
||||
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
|
||||
|
||||
# set consumer configure options
|
||||
conf = TaosTmqConf()
|
||||
conf.set("group.id", "tg2")
|
||||
conf.set("td.connect.user", "root")
|
||||
conf.set("td.connect.pass", "taosdata")
|
||||
conf.set("enable.auto.commit", "true")
|
||||
conf.set("msg.with.table.name", "true")
|
||||
|
||||
def tmq_commit_cb_print(tmq, resp, offset, param=None):
|
||||
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
|
||||
|
||||
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
|
||||
|
||||
# build consumer
|
||||
tmq = conf.new_consumer()
|
||||
|
||||
# build topic list
|
||||
topic_list = TaosTmqList()
|
||||
topic_list.append("topic_ctb_column")
|
||||
|
||||
# subscribe consumer
|
||||
tmq.subscribe(topic_list)
|
||||
|
||||
# check subscriptions
|
||||
sub_list = tmq.subscription()
|
||||
print("subscribed topics: ",sub_list)
|
||||
|
||||
# start subscribe
|
||||
while 1:
|
||||
res = tmq.poll(1000)
|
||||
if res:
|
||||
topic = res.get_topic_name()
|
||||
vg = res.get_vgroup_id()
|
||||
db = res.get_db_name()
|
||||
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
|
||||
for row in res:
|
||||
print(row)
|
||||
tb = res.get_table_name()
|
||||
print(f"from table: {tb}")
|
||||
|
||||
```
|
||||
|
||||
[查看源码](https://github.com/taosdata/TDengine/blob/develop/docs/examples/python/tmq_example.py)
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.JS" value="Node.JS">
|
||||
|
|
Loading…
Reference in New Issue