Merge pull request #15824 from taosdata/feature/tq

doc(tmq)
This commit is contained in:
Liu Jicong 2022-08-07 11:23:35 +08:00 committed by GitHub
commit d23738b126
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 17 additions and 20 deletions

View File

@ -1,14 +1,18 @@
---
sidebar_label: 消息队列
description: "数据订阅与推送服务。连续写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
description: "数据订阅与推送服务。写入到 TDengine 中的时序数据能够被自动推送到订阅客户端。"
title: 消息队列
---
基于数据天然的时间序列特性TDengine 的数据写入insert与消息系统的数据发布pub逻辑上一致均可视为系统中插入一条带时间戳的新记录。同时TDengine 在内部严格按照数据时间序列单调递增的方式保存数据。本质上来说TDengine 中每一张表均可视为一个标准的消息队列。
TDengine 内嵌支持消息订阅与推送服务下文都简称TMQ。使用系统提供的 API用户可使用普通查询语句订阅数据库中的一张或多张表或整个库。客户端启动订阅后定时或按需轮询服务器是否有新的记录到达有新的记录到达就会将结果反馈到客户。
为了实时获取写入TDengine的数据或者以事件到达顺序处理数据TDengine提供了类似消息队列产品的数据订阅、消费接口它支持以消费者组的方式使多个消费者分布式、多线程地同时订阅一个topic共享消费进度。并且提供了消息的ACK机制在宕机、重启等复杂环境下确保at least once消费。
使用TDengine一体化的数据订阅功能除了以上标准的消息队列特性还能在订阅同时通过标签、表名、列、表达式等多种方法过滤所需数据并且支持对数据进行函数变换、预处理包括标量udf计算
为了实现上述功能TDengine提供了多种灵活的WAL文件切换与保留机制可以按照时间或文件大小来保留WAL文件详见create database语句。在消费时TDengine从WAL中获取数据并经过过滤、变换等操作将数据推送给消费者。
TMQ提供了提交机制来保证消息队列的可靠性和正确性。在调用方法上支持自动提交和手动提交。
TMQ 的 API 中与订阅相关的主要数据结构和API如下
@ -82,18 +86,11 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
2、超级表订阅
语法CREATE TOPIC topic_name AS STABLE stbName
- 订阅某超级表的全部数据schema变更不受限schema变更后写入的数据将以最新schema返回
- 在tmq的返回消息中schema是块级别的每块的schema可能不一样
- 列变更后写入的数据若未落盘将以写入时的schema返回
- 列变更后写入的数据若已落盘将以落盘时的schema返回
3、db订阅
语法CREATE TOPIC topic_name AS DATABASE db_name
- 订阅某一db的全部数据schema变更不受限
- 在tmq的返回消息中schema是块级别的每块的schema可能不一样
- 列变更后写入的数据若未落盘将以写入时的schema返回
- 列变更后写入的数据若已落盘将以落盘时的schema返回
与select * from stbName订阅的区别是
- 不会限制用户的schema变更
- 返回的是非结构化的数据返回数据的schema会随之超级表的schema变化而变化
- 用户对于要处理的每一个数据块都可能有不同的schema因此必须重新获取schema
- 返回数据不带有tag
三、创建consumer
@ -121,7 +118,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
@ -151,9 +148,9 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
/* 循环poll消息 */
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t consumeDelay = 5000;
int32_t timeOut = 5000;
while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, consumeDelay);
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeOut);
if (tmqmsg) {
msgCnt++;
totalRows += msg_process(tmqmsg);
@ -190,7 +187,7 @@ create topic topicName as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1;
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++;
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf);
}