Update 07-tmq.md

This commit is contained in:
Jeff Tao 2022-08-07 13:20:45 +08:00 committed by GitHub
parent d23738b126
commit 662f194cd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 24 additions and 19 deletions

View File

@ -1,18 +1,20 @@
---
sidebar_label: 消息队列
sidebar_label: 数据订阅
description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
title: 消息队列
title: 数据订阅
---
为了帮助应用实时获取写入 TDengine 的数据或者以事件到达顺序处理数据TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
为了实时获取写入TDengine的数据或者以事件到达顺序处理数据TDengine提供了类似消息队列产品的数据订阅、消费接口它支持以消费者组的方式使多个消费者分布式、多线程地同时订阅一个topic共享消费进度。并且提供了消息的ACK机制在宕机、重启等复杂环境下确保at least once消费
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 可以是一张超级表或一张子表。不仅如此你可以通过标签、表名、列、表达式等多种方法过滤所需数据并且支持对数据进行函数变换、预处理包括标量udf计算。与其他消息队列软件相比这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤交给 TDengine而不是应用完成有效的减少传输的数据量
使用TDengine一体化的数据订阅功能除了以上标准的消息队列特性还能在订阅同时通过标签、表名、列、表达式等多种方法过滤所需数据并且支持对数据进行函数变换、预处理包括标量udf计算
为了实现上述功能TDengine提供了多种灵活的WAL文件切换与保留机制可以按照时间或文件大小来保留WAL文件详见create database语句。在消费时TDengine从WAL中获取数据并经过过滤、变换等操作将数据推送给消费者。
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度便于多线程分布式的消费数据提高数据通吐率。但不同消费者组即使消费同一个topic, 并不共享消费进度。一个消费者组可以订阅多个 topic。如果订阅的是超级表数据可能会分布在多个不同的vnode上也就是多个shard上这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制在宕机、重启等复杂环境下确保at least once消费。
为了实现上述功能TDengine 采用了灵活的 WAL (Write-Ahead-Log) 文件切换与保留机制可以按照时间或文件大小来保留WAL文件详见create database语句。在消费时TDengine 从 WAL 中获取数据,并经过过滤、变换等操作,将数据推送给消费者。
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
## 主要数据结构和API
TMQ 的 API 中与订阅相关的主要数据结构和API如下
@ -51,7 +53,9 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码可以在 [tmq.c](https://github.com/taosdata/TDengine/blob/3.0/examples/c/tmq.c) 看到。
一、首先完成建库、建一张超级表和多张子表,并每个子表插入若干条数据记录:
## 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
```sql
drop database if exists tmqdb;
@ -67,14 +71,15 @@ insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22');
insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33');
```
二、创建topic
## 创建topic
```sql
create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
```
TMQ支持多种订阅类型
1、列订阅
TMQ支持多种订阅类型
### 列订阅
语法CREATE TOPIC topic_name as subquery
通过select语句订阅包括select *或select ts, c1等指定列描述订阅可以带条件过滤、标量函数计算但不支持聚合函数、不支持时间窗口聚合
@ -83,7 +88,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
- 被订阅或用于计算的column和tag不可被删除、修改
- 若发生schema变更新增的column不出现在结果中
2、超级表订阅
### 超级表订阅
语法CREATE TOPIC topic_name AS STABLE stbName
与select * from stbName订阅的区别是
@ -92,7 +97,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
- 用户对于要处理的每一个数据块都可能有不同的schema因此必须重新获取schema
- 返回数据不带有tag
三、创建consumer
## 创建 consumer
目前支持的config
@ -128,7 +133,9 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
return tmq;
```
四、创建订阅主题列表
## 创建 topic 列表
单个consumer支持同时订阅多个topic。
```sql
tmq_list_t* topicList = tmq_list_new();
@ -136,9 +143,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
return topicList;
```
单个consumer支持同时订阅多个topic。
五、启动订阅并开始消费
## 启动订阅并开始消费
```sql
/* 启动订阅 */
@ -196,7 +201,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
}
```
五、结束消费
## 结束消费
```sql
/* 取消订阅 */
@ -206,7 +211,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
tmq_consumer_close(tmq);
```
六、删除topic
## 删除topic
如果不再需要可以删除创建topic但注意只有没有被订阅的topic才能别删除。
@ -215,7 +220,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
drop topic topicName;
```
七、状态查看
## 状态查看
1、topics查询已经创建的topic