opti:docs for tmq

This commit is contained in:
wangmm0220 2024-02-18 13:53:07 +08:00
parent 43624bbe08
commit 7108e0dc4e
5 changed files with 672 additions and 682 deletions

View File

@ -15,43 +15,80 @@ import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";
为了帮助应用实时获取写入 TDengine 的数据或者以事件到达顺序处理数据TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
与 kafka 一样,你需要定义 *topic*, 但 TDengine 的 *topic* 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 `SELECT` 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度
为了帮助应用实时获取写入 TDengine 的数据或者以事件到达顺序处理数据TDengine 提供了类似 kafka 的数据订阅功能。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本
消费者订阅 *topic* 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。
# 介绍
## 主题
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../12-taos-sql/13-tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。
为了实现上述功能TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
如下图,每个 topic 涉及到的数据表可能分布在多个 vnode相当于 kafka 里的 partition 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)
下面为关于数据订阅的一些说明需要对TDengine的架构有一些了解结合各个语言链接器的接口使用。(可使用时再了解)
- 一个消费组消费同一个topic下的所有数据不同消费组之间相互独立
- 一个消费组消费同一个topic所有的vgroup消费组可由多个消费者组成但一个vgroup仅被一个消费者消费如果消费者数量超过了vgroup数量多余的消费者不消费数据
- 在服务端每个vgroup仅保存一个offset每个vgroup的offset是单调递增的但不一定连续。各个vgroup的offset之间没有关联
- 每次poll服务端会返回一个结果block该block属于一个vgroup可能包含多个wal版本的数据可以通过 offset 接口获得是该block第一条记录的offset
- 一个消费组如果从未commit过offset当其成员消费者重启重新拉取数据时均从参数auto.offset.reset设定值开始消费在一个消费者生命周期中客户端本地记录了最近一次拉取数据的offset不会拉取重复数据
- 消费者如果异常终止没有调用tmq_close需等约12秒后触发其所属消费组rebalance该消费者在服务端状态变为LOST约1天后该消费者自动被删除正常退出退出后就会删除消费者新增消费者需等约2秒触发rebalance该消费者在服务端状态变为ready
- 消费组rebalance会对该组所有ready状态的消费者成员重新进行vgroup分配消费者仅能对自己负责的vgroup进行assignment/seek/commit/poll操作
- 消费者可利用 position 获得当前消费的offset并seek到指定offset重新消费
- seek将position指向指定offset不执行commit操作一旦seek成功可poll拉取指定offset及以后的数据
- seek 操作之前须调用 assignment 接口获取该consumer的vgroup ID和offset范围。seek 操作会检测vgroup ID 和 offset是否合法如非法将报错
- position是获取当前的消费位置是下次要取的位置不是当前消费到的位置
- commit是提交消费位置不带参数的话是提交当前消费位置下次要取的位置不是当前消费到的位置带参数的话是提交参数里的位置也即下次退出重启后要取的位置
- seek是设置consumer消费位置seek到哪position就返回哪都是下次要取的位置
- seek不会影响commitcommit不影响seek相互独立两个是不同的概念
- begin接口为wal 第一条数据的offsetend 接口为wal 最后一条数据的offset + 1
- offset接口获取的是记录所在结果block块里的第一条数据的offset当seek至该offset时将消费到这个block里的全部数据。参见第四点
- 由于存在 WAL 过期删除机制即使seek 操作成功poll数据时有可能offset已失效。如果poll 的offset 小于 WAL 最小版本号将会从WAL最小版本号消费
- 数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置 `WAL_RETENTION_PERIOD` 或 `WAL_RETENTION_SIZE` ,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似;
![img_5.png](img_5.png)
本文档不对消息队列本身的知识做更多的介绍,如果需要了解,请自行搜索
TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 [CREATE DATABASE](../../12-taos-sql/02-database) 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。
说明:
对于 `SELECT` 语句形式的 topic在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
## 生产者
写入 topic 相关联的数据表中数据的都是生产者,生产者实际生产的数据写入到了子表或普通表中,即表所在 vnode 的 WAL 里。
## 消费者
### 消费者组
消费者订阅 topic 后,可以消费 topic 里的所有数据(这些数据所在的表可能分布在多个 vnode 上,即 db 所在的所有 vnode。订阅 topic 时,需要指定一个消费者组 (consumer group),如果这个消费者组里只有一个消费者,那么这个消费者会顺序的消费这些 vnode 上的数据。
为了提高消费速度,便于多线程、分布式地消费数据,可以在一个消费组里添加多个消费者,这些消费者将均分数据所在的 vnode 进行消费(比如数据分布在 4 个 vnode 上,有 2 个消费者的话,那么每个消费者消费 2 个 vnode有 3 个消费者的话2 个消费者各消费 1 个 vnode1 个消费者消费 2 个 vnode有 5 个消费者的话4 个各分配 1 个 vnode 消费,另外 1 个不消费),如下图:
![img_6.png](img_6.png)
在一个消费组里添加一个消费者后,在 Mnode 上通过 rebalance 的机制实现消费者的重新分配,该操作对用户是透明的。
一个消费者可以订阅多个 topic。TDengine 的数据订阅在宕机、重启等复杂环境下确保 at least once 消费。
### 消费进度
在 topic 的一个消费组的一个 vnode 上有消费进度。消费者消费的同时,可以提交消费进度,消费进度即 vnode 上 WAL 的版本号(对于 kafka 里的 offset消费进度可以手动提交也可以通过参数auto.commit.interval.ms设置为周期性自动提交。
首次消费数据时通过订阅参数auto.offset.reset来确定消费位置为最新数据latest还是最旧数据earliest
消费进度在一个 vnode 上对于同一个 topic 和 消费者组是唯一的。所以如果同一个 topic 和 消费者组在一个 vnode 上的消费者退出了,并且提交了消费进度。然后同一个 topic 和 消费者组里重新建了一个新的消费者消费这个 vnode那么这个新消费者将继承之前的消费进度继续消费。
如果之前的消费者没有提交消费进度那个新的消费者将根据订阅参数auto.offset.reset设置的值来确定起始消费位置。
不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。
![img_7.png](img_7.png)
作为一个数据库产品, WAL 文件中存储的不全是数据,也包括其他写入消息,元数据等,所以消费进度不是连续的。
##说明
从3.2.0.0版本开始数据订阅支持vnode迁移和分裂。
由于数据订阅依赖wal文件而在vnode迁移和分裂的过程中wal并不会同步过去所以迁移或分裂后之前没消费完的wal数据后消费不到。所以请保证之前把数据全部消费完后再进行vnode迁移或分裂否则消费会丢失数据。
## 主要数据结构和 API
由于数据订阅依赖wal文件而在vnode迁移和分裂的过程中wal并不会同步过去所以迁移或分裂后之前没消费完的wal数据后消费不到。所以请保证迁移和分裂之前把数据全部消费完后再进行vnode迁移或分裂否则消费会丢失数据。
不同语言下, TMQ 订阅相关的 API 及数据结构如下注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer
# 语法说明
具体的语法参见 [数据订阅](../../12-taos-sql/13-tmq)
# 消费参数
消费参数主要用于消费者创建时指定,基础配置项如下表所示:
| 参数名称 | 类型 | 参数说明 | 备注 |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
| `td.connect.ip` | string | 服务端的 IP 地址 | |
| `td.connect.user` | string | 用户名 | |
| `td.connect.pass` | string | 密码 | |
| `td.connect.port` | integer | 服务端的端口号 | |
| `group.id` | string | 消费组 ID同一消费组共享消费进度 | <br />**必填项**。最大长度192。<br />每个topic最多可建立100个 consumer group |
| `client.id` | string | 客户端 ID | 最大长度192。 |
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default(version < 3.2.0.0);从头开始订阅; <br/>`latest`: default(version >= 3.2.0.0);仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交true: 自动提交客户端应用无需commitfalse客户端应用需要自行commit | 默认值为 true |
| `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句从3.2.0.0版本该参数废弃恒为true |默认关闭 |
| `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 |
# 主要数据结构和 API 接口
不同语言下, TMQ 订阅相关的 API 及数据结构如下详细的接口说明可以参考连接器章节注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer
<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">
@ -110,8 +147,6 @@ import CDemo from "./_sub_c.mdx";
DLL_EXPORT const char *tmq_err2str(int32_t code);
```
下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
</TabItem>
<TabItem value="java" label="Java">
@ -273,6 +308,7 @@ void Close()
</TabItem>
</Tabs>
# 数据订阅示例
## 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
@ -286,78 +322,16 @@ CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
```
## 创建 topic
## 创建 *topic*
TDengine 使用 SQL 创建一个 topic
使用 SQL 创建一个 topic
```sql
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
```
- topic创建个数有上限通过参数 tmqMaxTopicNum 控制,默认 20 个
TMQ 支持多种订阅类型:
### 列订阅
语法:
```sql
CREATE TOPIC topic_name as subquery
```
通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:
- 该类型 TOPIC 一旦创建则订阅数据的结构确定。
- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。
- 若发生表结构变更,新增的列不出现在结果中。
### 超级表订阅
语法:
```sql
CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition]
```
与 `SELECT * from stbName` 订阅的区别是:
- 不会限制用户的表结构变更。
- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
- with meta 参数可选选择时将返回创建超级表子表等语句主要用于taosx做超级表迁移
- where_condition 参数可选选择时将用来过滤符合条件的子表订阅这些子表。where 条件里不能有普通列只能是tag或tbnamewhere条件里可以用函数用来过滤tag但是不能是聚合函数因为子表tag值无法做聚合。也可以是常量表达式比如 2 > 1订阅全部子表或者 false订阅0个子表
- 返回数据不包含标签。
### 数据库订阅
语法:
```sql
CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
```
通过该语句可创建一个包含数据库所有表数据的订阅
- with meta 参数可选选择时将返回创建数据库里所有超级表子表的语句主要用于taosx做数据库迁移
## 创建消费者 *consumer*
消费者需要通过一系列配置选项创建,基础配置项如下表所示:
| 参数名称 | 类型 | 参数说明 | 备注 |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
| `td.connect.ip` | string | 服务端的 IP 地址 | |
| `td.connect.user` | string | 用户名 | |
| `td.connect.pass` | string | 密码 | |
| `td.connect.port` | integer | 服务端的端口号 | |
| `group.id` | string | 消费组 ID同一消费组共享消费进度 | <br />**必填项**。最大长度192。<br />每个topic最多可建立100个 consumer group |
| `client.id` | string | 客户端 ID | 最大长度192。 |
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default(version < 3.2.0.0);从头开始订阅; <br/>`latest`: default(version >= 3.2.0.0);仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交true: 自动提交客户端应用无需commitfalse客户端应用需要自行commit | 默认值为 true |
| `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句从3.2.0.0版本该参数废弃恒为true |默认关闭 |
| `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 |
对于不同编程语言,其设置方式如下:
<Tabs defaultValue="java" groupId="lang">
@ -523,24 +497,6 @@ var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
上述配置中包括 consumer group ID如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group共享消费进度。
数据回放功能说明:
- 订阅增加 replay 功能,按照数据写入的时间回放。
比如,如下时间写入三条数据
```sql
2023/09/22 00:00:00.000
2023/09/22 00:00:05.000
2023/09/22 00:00:08.000
```
则订阅出第一条数据 5s 后返回第二条数据,获取第二条数据 3s 后返回第三条数据。
- 仅列订阅支持数据回放
- 回放需要保证独立时间线
- 如果是子表订阅或者普通表订阅只有一个vnode上有数据保证是一个时间线
- 如果超级表订阅,则需保证该 DB 只有一个vnode否则报错因为多个vnode上订阅出的数据不在一个时间线上
- 超级表和库订阅不支持回放
- 增加 enable.replay 参数true表示开启订阅回放功能false表示不开启订阅回放功能默认不开启。
- 回放不支持进度保存,所以回放参数 enable.replay = true 时auto commit 自动关闭
- 因为数据回放本身需要处理时间所以回放的精度存在几十ms的误差
## 订阅 *topics*
一个 consumer 支持同时订阅多个 topic。
@ -837,36 +793,7 @@ consumer.Close();
</Tabs>
## 删除 *topic*
如果不再需要订阅数据,可以删除 topic需要注意只有当前未在订阅中的 TOPIC 才能被删除。
```sql
/* 删除 topic */
DROP TOPIC topic_name;
```
## 状态查看
1、*topics*:查询已经创建的 topic
```sql
SHOW TOPICS;
```
2、consumers查询 consumer 的状态及其订阅的 topic
```sql
SHOW CONSUMERS;
```
3、subscriptions查询 consumer 与 vgroup 之间的分配关系
```sql
SHOW SUBSCRIPTIONS;
```
## 示例代码
## 完整示例代码
以下是各语言的完整示例代码。
@ -908,3 +835,22 @@ SHOW SUBSCRIPTIONS;
</TabItem>
</Tabs>
#订阅高级功能
##数据回放
- 订阅支持 replay 功能,按照数据写入的时间回放。
比如,如下时间写入三条数据
```sql
2023/09/22 00:00:00.000
2023/09/22 00:00:05.000
2023/09/22 00:00:08.000
```
则订阅出第一条数据 5s 后返回第二条数据,获取第二条数据 3s 后返回第三条数据。
- 仅查询订阅支持数据回放
- 回放需要保证独立时间线
- 如果是子表订阅或者普通表订阅只有一个vnode上有数据保证是一个时间线
- 如果超级表订阅,则需保证该 DB 只有一个vnode否则报错因为多个vnode上订阅出的数据不在一个时间线上
- 超级表和库订阅不支持回放
- enable.replay 参数true表示开启订阅回放功能false表示不开启订阅回放功能默认不开启。
- 回放不支持进度保存,所以回放参数 enable.replay = true 时auto commit 自动关闭
- 因为数据回放本身需要处理时间所以回放的精度存在几十ms的误差

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

View File

@ -6,32 +6,68 @@ description: TDengine 消息队列提供的数据订阅功能
TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用户的解决方案。
## 创建订阅主题
## 创建 topic
TDengine 创建 topic 的个数上限通过参数 tmqMaxTopicNum 控制,默认 20 个。
TDengine 使用 SQL 创建一个 topic共有三种类型的 topic
### 查询 topic
语法:
```sql
CREATE TOPIC [IF NOT EXISTS] topic_name AS subquery;
CREATE TOPIC [IF NOT EXISTS] topic_name as subquery
```
通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定查询订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:
TOPIC 支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下:
- 该类型 TOPIC 一旦创建则订阅数据的结构确定。
- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。
- 若发生表结构变更,新增的列不出现在结果中。
- 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
### 超级表 topic
1. TOPIC 一旦创建则返回结果的字段确定
2. 被订阅或用于计算的列不可被删除、修改
3. 列可以新增,但新增的列不出现在订阅结果字段中
4. 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
## 删除订阅主题
语法:
```sql
CREATE TOPIC [IF NOT EXISTS] topic_name [with meta] AS STABLE stb_name [where_condition]
```
`SELECT * from stbName` 订阅的区别是:
- 不会限制用户的表结构变更。
- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
- with meta 参数可选选择时将返回创建超级表子表等语句主要用于taosx做超级表迁移
- where_condition 参数可选选择时将用来过滤符合条件的子表订阅这些子表。where 条件里不能有普通列只能是tag或tbnamewhere条件里可以用函数用来过滤tag但是不能是聚合函数因为子表tag值无法做聚合。也可以是常量表达式比如 2 > 1订阅全部子表或者 false订阅0个子表
- 返回数据不包含标签。
### 数据库 topic
语法:
```sql
CREATE TOPIC [IF NOT EXISTS] topic_name [with meta] AS DATABASE db_name;
```
通过该语句可创建一个包含数据库所有表数据的订阅
- with meta 参数可选选择时将返回创建数据库里所有超级表子表的语句主要用于taosx做数据库迁移
说明: 超级表订阅和库订阅属于高级订阅模式,容易出错,如确实要使用,请咨询专业人员。
## 删除 topic
如果不再需要订阅数据,可以删除 topic需要注意只有当前未在订阅中的 TOPIC 才能被删除。
```sql
/* 删除 topic */
DROP TOPIC [IF EXISTS] topic_name;
```
此时如果该订阅主题上存在 consumer则此 consumer 会收到一个错误。
## 查看订阅主题
## SHOW TOPICS
## 查看 topic
```sql
SHOW TOPICS;
@ -58,3 +94,11 @@ SHOW CONSUMERS;
```
显示当前数据库下所有活跃的消费者的信息。
## 查看订阅信息
```sql
SHOW SUBSCRIPTIONS;
```
显示 consumer 与 vgroup 之间的分配关系和消费信息