diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index 50913e87c8..df651eab96 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -15,334 +15,62 @@ import Node from "./_sub_node.mdx"; import CSharp from "./_sub_cs.mdx"; import CDemo from "./_sub_c.mdx"; -为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 -与 kafka 一样,你需要定义 *topic*, 但 TDengine 的 *topic* 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 `SELECT` 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。 +为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似 kafka 的数据订阅功能。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 -消费者订阅 *topic* 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。 +# 介绍 +## 主题 +与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../12-taos-sql/13-tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。 -为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 +如下图,每个 topic 涉及到的数据表可能分布在多个 vnode(相当于 kafka 里的 partition) 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中,WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)。 -下面为关于数据订阅的一些说明,需要对TDengine的架构有一些了解,结合各个语言链接器的接口使用。(可使用时再了解) -- 一个消费组消费同一个topic下的所有数据,不同消费组之间相互独立; -- 一个消费组消费同一个topic所有的vgroup,消费组可由多个消费者组成,但一个vgroup仅被一个消费者消费,如果消费者数量超过了vgroup数量,多余的消费者不消费数据; -- 在服务端每个vgroup仅保存一个offset,每个vgroup的offset是单调递增的,但不一定连续。各个vgroup的offset之间没有关联; -- 每次poll服务端会返回一个结果block,该block属于一个vgroup,可能包含多个wal版本的数据,可以通过 offset 接口获得是该block第一条记录的offset; -- 一个消费组如果从未commit过offset,当其成员消费者重启重新拉取数据时,均从参数auto.offset.reset设定值开始消费;在一个消费者生命周期中,客户端本地记录了最近一次拉取数据的offset,不会拉取重复数据; -- 消费者如果异常终止(没有调用tmq_close),需等约12秒后触发其所属消费组rebalance,该消费者在服务端状态变为LOST,约1天后该消费者自动被删除;正常退出,退出后就会删除消费者;新增消费者,需等约2秒触发rebalance,该消费者在服务端状态变为ready; -- 消费组rebalance会对该组所有ready状态的消费者成员重新进行vgroup分配,消费者仅能对自己负责的vgroup进行assignment/seek/commit/poll操作; -- 消费者可利用 position 获得当前消费的offset,并seek到指定offset,重新消费; -- seek将position指向指定offset,不执行commit操作,一旦seek成功,可poll拉取指定offset及以后的数据; -- seek 操作之前须调用 assignment 接口获取该consumer的vgroup ID和offset范围。seek 操作会检测vgroup ID 和 offset是否合法,如非法将报错; -- position是获取当前的消费位置,是下次要取的位置,不是当前消费到的位置 -- commit是提交消费位置,不带参数的话,是提交当前消费位置(下次要取的位置,不是当前消费到的位置),带参数的话,是提交参数里的位置(也即下次退出重启后要取的位置) -- seek是设置consumer消费位置,seek到哪,position就返回哪,都是下次要取的位置 -- seek不会影响commit,commit不影响seek,相互独立,两个是不同的概念 -- begin接口为wal 第一条数据的offset,end 接口为wal 最后一条数据的offset + 1 -- offset接口获取的是记录所在结果block块里的第一条数据的offset,当seek至该offset时,将消费到这个block里的全部数据。参见第四点; -- 由于存在 WAL 过期删除机制,即使seek 操作成功,poll数据时有可能offset已失效。如果poll 的offset 小于 WAL 最小版本号,将会从WAL最小版本号消费; -- 数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置 `WAL_RETENTION_PERIOD` 或 `WAL_RETENTION_SIZE` ,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似; +![img_5.png](img_5.png) -本文档不对消息队列本身的知识做更多的介绍,如果需要了解,请自行搜索。 +TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 [CREATE DATABASE](../../12-taos-sql/02-database) 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。 -说明: +对于 `SELECT` 语句形式的 topic,在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 + +## 生产者 +写入 topic 相关联的数据表中数据的都是生产者,生产者实际生产的数据写入到了子表或普通表中,即表所在 vnode 的 WAL 里。 + +## 消费者 +### 消费者组 +消费者订阅 topic 后,可以消费 topic 里的所有数据(这些数据所在的表可能分布在多个 vnode 上,即 db 所在的所有 vnode)。订阅 topic 时,需要指定一个消费者组 (consumer group),如果这个消费者组里只有一个消费者,那么这个消费者会顺序的消费这些 vnode 上的数据。 + +为了提高消费速度,便于多线程、分布式地消费数据,可以在一个消费组里添加多个消费者,这些消费者将均分数据所在的 vnode 进行消费(比如数据分布在 4 个 vnode 上,有 2 个消费者的话,那么每个消费者消费 2 个 vnode;有 3 个消费者的话,2 个消费者各消费 1 个 vnode,1 个消费者消费 2 个 vnode;有 5 个消费者的话,4 个各分配 1 个 vnode 消费,另外 1 个不消费),如下图: + +![img_6.png](img_6.png) + +在一个消费组里添加一个消费者后,在 Mnode 上通过 rebalance 的机制实现消费者的重新分配,该操作对用户是透明的。 + +一个消费者可以订阅多个 topic。TDengine 的数据订阅在宕机、重启等复杂环境下确保 at least once 消费。 +### 消费进度 +在 topic 的一个消费组的一个 vnode 上有消费进度。消费者消费的同时,可以提交消费进度,消费进度即 vnode 上 WAL 的版本号(对于 kafka 里的 offset),消费进度可以手动提交,也可以通过参数(auto.commit.interval.ms)设置为周期性自动提交。 + +首次消费数据时通过订阅参数(auto.offset.reset)来确定消费位置为最新数据(latest)还是最旧数据(earliest)。 + +消费进度在一个 vnode 上对于同一个 topic 和 消费者组是唯一的。所以如果同一个 topic 和 消费者组在一个 vnode 上的消费者退出了,并且提交了消费进度。然后同一个 topic 和 消费者组里重新建了一个新的消费者消费这个 vnode,那么这个新消费者将继承之前的消费进度继续消费。 + +如果之前的消费者没有提交消费进度,那个新的消费者将根据订阅参数(auto.offset.reset)设置的值来确定起始消费位置。 + +不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。 + +![img_7.png](img_7.png) + +作为一个数据库产品, WAL 文件中存储的不全是数据,也包括其他写入消息,元数据等,所以消费进度不是连续的。 + +##说明 从3.2.0.0版本开始,数据订阅支持vnode迁移和分裂。 -由于数据订阅依赖wal文件,而在vnode迁移和分裂的过程中,wal并不会同步过去,所以迁移或分裂后,之前没消费完的wal数据后消费不到。所以请保证之前把数据全部消费完后,再进行vnode迁移或分裂,否则,消费会丢失数据。 -## 主要数据结构和 API +由于数据订阅依赖wal文件,而在vnode迁移和分裂的过程中,wal并不会同步过去,所以迁移或分裂后,之前没消费完的wal数据后消费不到。所以请保证迁移和分裂之前把数据全部消费完后,再进行vnode迁移或分裂,否则,消费会丢失数据。 -不同语言下, TMQ 订阅相关的 API 及数据结构如下(注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer): +# 语法说明 - - +具体的语法参见 [数据订阅](../../12-taos-sql/13-tmq) -```c - typedef struct tmq_t tmq_t; - typedef struct tmq_conf_t tmq_conf_t; - typedef struct tmq_list_t tmq_list_t; +# 消费参数 - typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param)); - - typedef enum tmq_conf_res_t { - TMQ_CONF_UNKNOWN = -2, - TMQ_CONF_INVALID = -1, - TMQ_CONF_OK = 0, - } tmq_conf_res_t; - - typedef struct tmq_topic_assignment { - int32_t vgId; - int64_t currentOffset; - int64_t begin; - int64_t end; - } tmq_topic_assignment; - - DLL_EXPORT tmq_conf_t *tmq_conf_new(); - DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); - DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); - DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); - - DLL_EXPORT tmq_list_t *tmq_list_new(); - DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); - DLL_EXPORT void tmq_list_destroy(tmq_list_t *); - DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *); - DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *); - - DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); - DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); - DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq); - DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); - DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); - DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); - DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); - DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); - DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); - DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); - DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment); - DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); - DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); - DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); - DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId); - - DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); - DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); - DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); - DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); - DLL_EXPORT const char *tmq_err2str(int32_t code); -``` - -下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。 - - - - -```java -void subscribe(Collection topics) throws SQLException; - -void unsubscribe() throws SQLException; - -Set subscription() throws SQLException; - -ConsumerRecords poll(Duration timeout) throws SQLException; - -Set assignment() throws SQLException; -long position(TopicPartition partition) throws SQLException; -Map position(String topic) throws SQLException; -Map beginningOffsets(String topic) throws SQLException; -Map endOffsets(String topic) throws SQLException; -Map committed(Set partitions) throws SQLException; - -void seek(TopicPartition partition, long offset) throws SQLException; -void seekToBeginning(Collection partitions) throws SQLException; -void seekToEnd(Collection partitions) throws SQLException; - -void commitSync() throws SQLException; -void commitSync(Map offsets) throws SQLException; - -void close() throws SQLException; -``` - - - - - -```python -class Consumer: - def subscribe(self, topics): - pass - - def unsubscribe(self): - pass - - def poll(self, timeout: float = 1.0): - pass - - def assignment(self): - pass - - def seek(self, partition): - pass - - def close(self): - pass - - def commit(self, message): - pass -``` - - - - - -```go -func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) - -// 出于兼容目的保留 rebalanceCb 参数,当前未使用 -func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error - -// 出于兼容目的保留 rebalanceCb 参数,当前未使用 -func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error - -func (c *Consumer) Poll(timeoutMs int) tmq.Event - -// 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用 -func (c *Consumer) Commit() ([]tmq.TopicPartition, error) - -func (c *Consumer) Unsubscribe() error - -func (c *Consumer) Close() error -``` - - - - - -```rust -impl TBuilder for TmqBuilder - fn from_dsn(dsn: D) -> Result - fn build(&self) -> Result - -impl AsAsyncConsumer for Consumer - async fn subscribe, I: IntoIterator + Send>( - &mut self, - topics: I, - ) -> Result<(), Self::Error>; - fn stream( - &self, - ) -> Pin< - Box< - dyn '_ - + Send - + futures::Stream< - Item = Result<(Self::Offset, MessageSet), Self::Error>, - >, - >, - >; - async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>; - - async fn unsubscribe(self); -``` - -可在 上查看详细 API 说明。 - - - - - -```js -function TMQConsumer(config) - -function subscribe(topic) - -function consume(timeout) - -function subscription() - -function unsubscribe() - -function commit(msg) - -function close() -``` - - - - - -```csharp -class ConsumerBuilder - -ConsumerBuilder(IEnumerable> config) - -public IConsumer Build() - -void Subscribe(IEnumerable topics) - -void Subscribe(string topic) - -ConsumeResult Consume(int millisecondsTimeout) - -List Subscription() - -void Unsubscribe() - -List Commit() - -void Close() -``` - - - - -## 写入数据 - -首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: - -```sql -DROP DATABASE IF EXISTS tmqdb; -CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600; -CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16)); -CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0"); -CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1"); -INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); -INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); -``` - -## 创建 *topic* - -TDengine 使用 SQL 创建一个 topic: - -```sql -CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; -``` -- topic创建个数有上限,通过参数 tmqMaxTopicNum 控制,默认 20 个 - -TMQ 支持多种订阅类型: - -### 列订阅 - -语法: - -```sql -CREATE TOPIC topic_name as subquery -``` - -通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是: - -- 该类型 TOPIC 一旦创建则订阅数据的结构确定。 -- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。 -- 若发生表结构变更,新增的列不出现在结果中。 - -### 超级表订阅 - -语法: - -```sql -CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition] -``` - -与 `SELECT * from stbName` 订阅的区别是: - -- 不会限制用户的表结构变更。 -- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。 -- with meta 参数可选,选择时将返回创建超级表,子表等语句,主要用于taosx做超级表迁移 -- where_condition 参数可选,选择时将用来过滤符合条件的子表,订阅这些子表。where 条件里不能有普通列,只能是tag或tbname,where条件里可以用函数,用来过滤tag,但是不能是聚合函数,因为子表tag值无法做聚合。也可以是常量表达式,比如 2 > 1(订阅全部子表),或者 false(订阅0个子表) -- 返回数据不包含标签。 - -### 数据库订阅 - -语法: - -```sql -CREATE TOPIC topic_name [with meta] AS DATABASE db_name; -``` - -通过该语句可创建一个包含数据库所有表数据的订阅 - -- with meta 参数可选,选择时将返回创建数据库里所有超级表,子表的语句,主要用于taosx做数据库迁移 - -## 创建消费者 *consumer* - -消费者需要通过一系列配置选项创建,基础配置项如下表所示: +消费参数主要用于消费者创建时指定,基础配置项如下表所示: | 参数名称 | 类型 | 参数说明 | 备注 | | :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | @@ -358,515 +86,714 @@ CREATE TOPIC topic_name [with meta] AS DATABASE db_name; | `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句)(从3.2.0.0版本该参数废弃,恒为true) |默认关闭 | | `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 | -对于不同编程语言,其设置方式如下: +# 主要数据结构和 API 接口 + +不同语言下, TMQ 订阅相关的 API 及数据结构如下(详细的接口说明可以参考连接器章节,注意consumer结构不是线程安全的,在一个线程使用consumer时,不要在另一个线程close这个consumer): - + -```c -/* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 - 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */ -tmq_conf_t* conf = tmq_conf_new(); -tmq_conf_set(conf, "enable.auto.commit", "true"); -tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); -tmq_conf_set(conf, "group.id", "cgrpName"); -tmq_conf_set(conf, "td.connect.user", "root"); -tmq_conf_set(conf, "td.connect.pass", "taosdata"); -tmq_conf_set(conf, "auto.offset.reset", "latest"); -tmq_conf_set(conf, "msg.with.table.name", "true"); -tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + ```c + typedef struct tmq_t tmq_t; + typedef struct tmq_conf_t tmq_conf_t; + typedef struct tmq_list_t tmq_list_t; -tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); -tmq_conf_destroy(conf); + typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param)); + + typedef enum tmq_conf_res_t { + TMQ_CONF_UNKNOWN = -2, + TMQ_CONF_INVALID = -1, + TMQ_CONF_OK = 0, + } tmq_conf_res_t; + + typedef struct tmq_topic_assignment { + int32_t vgId; + int64_t currentOffset; + int64_t begin; + int64_t end; + } tmq_topic_assignment; + + DLL_EXPORT tmq_conf_t *tmq_conf_new(); + DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); + DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); + DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); + + DLL_EXPORT tmq_list_t *tmq_list_new(); + DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); + DLL_EXPORT void tmq_list_destroy(tmq_list_t *); + DLL_EXPORT int32_t tmq_list_get_size(const tmq_list_t *); + DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *); + + DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); + DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); + DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq); + DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); + DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); + DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); + DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); + DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); + DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); + DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); + DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment); + DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); + DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); + DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); + DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId); + + DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); + DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); + DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); + DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); + DLL_EXPORT const char *tmq_err2str(int32_t code); + ``` + + + + + ```java + void subscribe(Collection topics) throws SQLException; + + void unsubscribe() throws SQLException; + + Set subscription() throws SQLException; + + ConsumerRecords poll(Duration timeout) throws SQLException; + + Set assignment() throws SQLException; + long position(TopicPartition partition) throws SQLException; + Map position(String topic) throws SQLException; + Map beginningOffsets(String topic) throws SQLException; + Map endOffsets(String topic) throws SQLException; + Map committed(Set partitions) throws SQLException; + + void seek(TopicPartition partition, long offset) throws SQLException; + void seekToBeginning(Collection partitions) throws SQLException; + void seekToEnd(Collection partitions) throws SQLException; + + void commitSync() throws SQLException; + void commitSync(Map offsets) throws SQLException; + + void close() throws SQLException; + ``` + + + + + + ```python + class Consumer: + def subscribe(self, topics): + pass + + def unsubscribe(self): + pass + + def poll(self, timeout: float = 1.0): + pass + + def assignment(self): + pass + + def seek(self, partition): + pass + + def close(self): + pass + + def commit(self, message): + pass + ``` + + + + + + ```go + func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) + + // 出于兼容目的保留 rebalanceCb 参数,当前未使用 + func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error + + // 出于兼容目的保留 rebalanceCb 参数,当前未使用 + func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error + + func (c *Consumer) Poll(timeoutMs int) tmq.Event + + // 出于兼容目的保留 tmq.TopicPartition 参数,当前未使用 + func (c *Consumer) Commit() ([]tmq.TopicPartition, error) + + func (c *Consumer) Unsubscribe() error + + func (c *Consumer) Close() error + ``` + + + + + + ```rust + impl TBuilder for TmqBuilder + fn from_dsn(dsn: D) -> Result + fn build(&self) -> Result + + impl AsAsyncConsumer for Consumer + async fn subscribe, I: IntoIterator + Send>( + &mut self, + topics: I, + ) -> Result<(), Self::Error>; + fn stream( + &self, + ) -> Pin< + Box< + dyn '_ + + Send + + futures::Stream< + Item = Result<(Self::Offset, MessageSet), Self::Error>, + >, + >, + >; + async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>; + + async fn unsubscribe(self); + ``` + + 可在 上查看详细 API 说明。 + + + + + + ```js + function TMQConsumer(config) + + function subscribe(topic) + + function consume(timeout) + + function subscription() + + function unsubscribe() + + function commit(msg) + + function close() + ``` + + + + + + ```csharp + class ConsumerBuilder + + ConsumerBuilder(IEnumerable> config) + + public IConsumer Build() + + void Subscribe(IEnumerable topics) + + void Subscribe(string topic) + + ConsumeResult Consume(int millisecondsTimeout) + + List Subscription() + + void Unsubscribe() + + List Commit() + + void Close() + ``` + + + + + # 数据订阅示例 + ## 写入数据 + + 首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: + + ```sql + DROP DATABASE IF EXISTS tmqdb; + CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600; + CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16)); + CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0"); + CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1"); + INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); +INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); ``` +## 创建 topic - - + 使用 SQL 创建一个 topic: -对于 Java 程序,还可以使用如下配置项: + ```sql + CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; + ``` -| 参数名称 | 类型 | 参数说明 | -| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- | -| `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" | -| `bootstrap.servers` | string | 连接地址,如 `localhost:6030` | -| `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 | -| `value.deserializer.encoding` | string | 指定字符串解析的字符集 | | + ## 创建消费者 *consumer* -需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。 + 对于不同编程语言,其设置方式如下: -```java -Properties properties = new Properties(); -properties.setProperty("enable.auto.commit", "true"); -properties.setProperty("auto.commit.interval.ms", "1000"); -properties.setProperty("group.id", "cgrpName"); -properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); -properties.setProperty("td.connect.user", "root"); -properties.setProperty("td.connect.pass", "taosdata"); -properties.setProperty("auto.offset.reset", "latest"); -properties.setProperty("msg.with.table.name", "true"); -properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); + + -TaosConsumer consumer = new TaosConsumer<>(properties); + ```c + /* 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 + 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 */ + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + tmq_conf_set(conf, "group.id", "cgrpName"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "latest"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); -/* value deserializer definition. */ -import com.taosdata.jdbc.tmq.ReferenceDeserializer; + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + ``` -public class MetersDeserializer extends ReferenceDeserializer { -} -``` + + - + 对于 Java 程序,还可以使用如下配置项: - + | 参数名称 | 类型 | 参数说明 | + | ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- | + | `td.connect.type` | string | 连接类型,"jni" 指原生连接,"ws" 指 websocket 连接,默认值为 "jni" | + | `bootstrap.servers` | string | 连接地址,如 `localhost:6030` | + | `value.deserializer` | string | 值解析方法,使用此方法应实现 `com.taosdata.jdbc.tmq.Deserializer` 接口或继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer` 类 | + | `value.deserializer.encoding` | string | 指定字符串解析的字符集 | | -```go -conf := &tmq.ConfigMap{ - "group.id": "test", - "auto.offset.reset": "latest", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "td.connect.port": "6030", - "client.id": "test_tmq_c", - "enable.auto.commit": "false", - "msg.with.table.name": "true", -} -consumer, err := NewConsumer(conf) -``` + 需要注意:此处使用 `bootstrap.servers` 替代 `td.connect.ip` 和 `td.connect.port`,以提供与 Kafka 一致的接口。 - + ```java + Properties properties = new Properties(); + properties.setProperty("enable.auto.commit", "true"); + properties.setProperty("auto.commit.interval.ms", "1000"); + properties.setProperty("group.id", "cgrpName"); + properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); + properties.setProperty("td.connect.user", "root"); + properties.setProperty("td.connect.pass", "taosdata"); + properties.setProperty("auto.offset.reset", "latest"); + properties.setProperty("msg.with.table.name", "true"); + properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); - + TaosConsumer consumer = new TaosConsumer<>(properties); -```rust -let mut dsn: Dsn = "taos://".parse()?; -dsn.set("group.id", "group1"); -dsn.set("client.id", "test"); -dsn.set("auto.offset.reset", "latest"); + /* value deserializer definition. */ + import com.taosdata.jdbc.tmq.ReferenceDeserializer; -let tmq = TmqBuilder::from_dsn(dsn)?; + public class MetersDeserializer extends ReferenceDeserializer { + } + ``` -let mut consumer = tmq.build()?; -``` + - + - + ```go + conf := &tmq.ConfigMap{ + "group.id": "test", + "auto.offset.reset": "latest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_c", + "enable.auto.commit": "false", + "msg.with.table.name": "true", + } + consumer, err := NewConsumer(conf) + ``` -Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例: + -```python -from taos.tmq import Consumer + -# Syntax: `consumer = Consumer(configs)` -# -# Example: -consumer = Consumer( - { - "group.id": "local", - "client.id": "1", - "enable.auto.commit": "true", - "auto.commit.interval.ms": "1000", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "latest", - "msg.with.table.name": "true", - } -) -``` + ```rust + let mut dsn: Dsn = "taos://".parse()?; + dsn.set("group.id", "group1"); + dsn.set("client.id", "test"); + dsn.set("auto.offset.reset", "latest"); - + let tmq = TmqBuilder::from_dsn(dsn)?; - + let mut consumer = tmq.build()?; + ``` -```js -// 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 -// 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 + -let consumer = taos.consumer({ - 'enable.auto.commit': 'true', + + + Python 语言下引入 `taos` 库的 `Consumer` 类,创建一个 Consumer 示例: + + ```python + from taos.tmq import Consumer + + # Syntax: `consumer = Consumer(configs)` + # + # Example: + consumer = Consumer( + { + "group.id": "local", + "client.id": "1", + "enable.auto.commit": "true", + "auto.commit.interval.ms": "1000", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "latest", + "msg.with.table.name": "true", + } + ) + ``` + + + + + + ```js + // 根据需要,设置消费组 (group.id)、自动提交 (enable.auto.commit)、 + // 自动提交时间间隔 (auto.commit.interval.ms)、用户名 (td.connect.user)、密码 (td.connect.pass) 等参数 + + let consumer = taos.consumer({ +'enable.auto.commit': 'true', 'auto.commit.interval.ms','1000', - 'group.id': 'tg2', + 'group.id': 'tg2', 'td.connect.user': 'root', 'td.connect.pass': 'taosdata', 'auto.offset.reset','latest', 'msg.with.table.name': 'true', 'td.connect.ip','127.0.0.1', - 'td.connect.port','6030' - }); -``` + 'td.connect.port','6030' + }); + ``` -```csharp -var cfg = new Dictionary() -{ - { "group.id", "group1" }, - { "auto.offset.reset", "latest" }, - { "td.connect.ip", "127.0.0.1" }, - { "td.connect.user", "root" }, - { "td.connect.pass", "taosdata" }, - { "td.connect.port", "6030" }, - { "client.id", "tmq_example" }, - { "enable.auto.commit", "true" }, - { "msg.with.table.name", "false" }, -}; -var consumer = new ConsumerBuilder>(cfg).Build(); -``` + ```csharp + var cfg = new Dictionary() + { + { "group.id", "group1" }, + { "auto.offset.reset", "latest" }, + { "td.connect.ip", "127.0.0.1" }, + { "td.connect.user", "root" }, + { "td.connect.pass", "taosdata" }, + { "td.connect.port", "6030" }, + { "client.id", "tmq_example" }, + { "enable.auto.commit", "true" }, + { "msg.with.table.name", "false" }, + }; + var consumer = new ConsumerBuilder>(cfg).Build(); + ``` - + - + -上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 + 上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。 -数据回放功能说明: -- 订阅增加 replay 功能,按照数据写入的时间回放。 - 比如,如下时间写入三条数据 - ```sql - 2023/09/22 00:00:00.000 - 2023/09/22 00:00:05.000 - 2023/09/22 00:00:08.000 - ``` - 则订阅出第一条数据 5s 后返回第二条数据,获取第二条数据 3s 后返回第三条数据。 -- 仅列订阅支持数据回放 - - 回放需要保证独立时间线 - - 如果是子表订阅或者普通表订阅,只有一个vnode上有数据,保证是一个时间线 - - 如果超级表订阅,则需保证该 DB 只有一个vnode,否则报错(因为多个vnode上订阅出的数据不在一个时间线上) -- 超级表和库订阅不支持回放 -- 增加 enable.replay 参数,true表示开启订阅回放功能,false表示不开启订阅回放功能,默认不开启。 -- 回放不支持进度保存,所以回放参数 enable.replay = true 时,auto commit 自动关闭 -- 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差 + ## 订阅 *topics* -## 订阅 *topics* + 一个 consumer 支持同时订阅多个 topic。 -一个 consumer 支持同时订阅多个 topic。 - - - + + ```c -// 创建订阅 topics 列表 -tmq_list_t* topicList = tmq_list_new(); -tmq_list_append(topicList, "topicName"); -// 启动订阅 -tmq_subscribe(tmq, topicList); -tmq_list_destroy(topicList); - -``` + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "topicName"); + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + ``` -```java -List topics = new ArrayList<>(); -topics.add("tmq_topic"); -consumer.subscribe(topics); -``` + ```java + List topics = new ArrayList<>(); + topics.add("tmq_topic"); + consumer.subscribe(topics); + ``` -```go -err = consumer.Subscribe("example_tmq_topic", nil) -if err != nil { - panic(err) -} -``` + ```go + err = consumer.Subscribe("example_tmq_topic", nil) + if err != nil { + panic(err) + } + ``` - - + + ```rust -consumer.subscribe(["tmq_meters"]).await?; -``` + consumer.subscribe(["tmq_meters"]).await?; + ``` - + - + ```python -consumer.subscribe(['topic1', 'topic2']) -``` + consumer.subscribe(['topic1', 'topic2']) + ``` - + - + ```js -// 创建订阅 topics 列表 -let topics = ['topic_test'] + // 创建订阅 topics 列表 + let topics = ['topic_test'] -// 启动订阅 -consumer.subscribe(topics); -``` + // 启动订阅 + consumer.subscribe(topics); + ``` - + - + ```csharp -// 创建订阅 topics 列表 -List topics = new List(); -topics.add("tmq_topic"); -// 启动订阅 -consumer.Subscribe(topics); -``` + // 创建订阅 topics 列表 + List topics = new List(); + topics.add("tmq_topic"); + // 启动订阅 + consumer.Subscribe(topics); + ``` - + - + -## 消费 + ## 消费 -以下代码展示了不同语言下如何对 TMQ 消息进行消费。 + 以下代码展示了不同语言下如何对 TMQ 消息进行消费。 - - + + ```c -// 消费数据 -while (running) { - TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut); - msg_process(msg); -} -``` + // 消费数据 + while (running) { + TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut); + msg_process(msg); + } + ``` 这里是一个 **while** 循环,每调用一次 tmq_consumer_poll(),获取一个消息,该消息与普通查询返回的结果集完全相同,可以使用相同的解析 API 完成消息内容的解析。 -```java -while(running){ - ConsumerRecords meters = consumer.poll(Duration.ofMillis(100)); - for (Meters meter : meters) { - processMsg(meter); - } -} -``` + ```java + while(running){ + ConsumerRecords meters = consumer.poll(Duration.ofMillis(100)); + for (Meters meter : meters) { + processMsg(meter); + } + } + ``` -```go -for { - ev := consumer.Poll(0) - if ev != nil { - switch e := ev.(type) { - case *tmqcommon.DataMessage: - fmt.Println(e.Value()) - case tmqcommon.Error: - fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) - panic(e) - } - consumer.Commit() - } -} -``` + ```go + for { + ev := consumer.Poll(0) + if ev != nil { + switch e := ev.(type) { + case *tmqcommon.DataMessage: + fmt.Println(e.Value()) + case tmqcommon.Error: + fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) + panic(e) + } + consumer.Commit() + } + } + ``` -```rust -{ - let mut stream = consumer.stream(); + ```rust + { + let mut stream = consumer.stream(); - while let Some((offset, message)) = stream.try_next().await? { - // get information from offset + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset - // the topic - let topic = offset.topic(); - // the vgroup id, like partition id in kafka. - let vgroup_id = offset.vgroup_id(); - println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); - if let Some(data) = message.into_data() { + if let Some(data) = message.into_data() { while let Some(block) = data.fetch_raw_block().await? { - // one block for one table, get table name if needed - let name = block.table_name(); - let records: Vec = block.deserialize().try_collect()?; - println!( - "** table: {}, got {} records: {:#?}\n", - name.unwrap(), - records.len(), - records - ); - } + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); } - consumer.commit(offset).await?; - } -} -``` + } + consumer.commit(offset).await?; + } + } + ``` -```python -while True: - res = consumer.poll(100) - if not res: + ```python + while True: + res = consumer.poll(100) + if not res: continue - err = res.error() - if err is not None: + err = res.error() + if err is not None: raise err - val = res.value() + val = res.value() - for block in val: + for block in val: print(block.fetchall()) -``` + ``` -```js -while(true){ - msg = consumer.consume(200); - // process message(consumeResult) - console.log(msg.topicPartition); - console.log(msg.block); - console.log(msg.fields) -} -``` + ```js + while(true){ + msg = consumer.consume(200); + // process message(consumeResult) + console.log(msg.topicPartition); + console.log(msg.block); + console.log(msg.fields) + } + ``` - + - + ```csharp -// 消费数据 -while (true) -{ - using (var result = consumer.Consume(500)) - { - if (result == null) continue; - ProcessMsg(result); - consumer.Commit(); - } -} -``` + // 消费数据 + while (true) + { + using (var result = consumer.Consume(500)) + { + if (result == null) continue; + ProcessMsg(result); + consumer.Commit(); + } + } + ``` - + - + -## 结束消费 + ## 结束消费 -消费结束后,应当取消订阅。 + 消费结束后,应当取消订阅。 - - + + ```c -/* 取消订阅 */ -tmq_unsubscribe(tmq); + /* 取消订阅 */ + tmq_unsubscribe(tmq); -/* 关闭消费者对象 */ -tmq_consumer_close(tmq); -``` + /* 关闭消费者对象 */ + tmq_consumer_close(tmq); + ``` - - + + ```java -/* 取消订阅 */ -consumer.unsubscribe(); + /* 取消订阅 */ + consumer.unsubscribe(); -/* 关闭消费 */ -consumer.close(); -``` + /* 关闭消费 */ + consumer.close(); + ``` - + - + ```go -/* Unsubscribe */ -_ = consumer.Unsubscribe() + /* Unsubscribe */ + _ = consumer.Unsubscribe() -/* Close consumer */ -_ = consumer.Close() -``` + /* Close consumer */ + _ = consumer.Close() + ``` - + - + ```rust -consumer.unsubscribe().await; -``` + consumer.unsubscribe().await; + ``` - + - + ```py -# 取消订阅 -consumer.unsubscribe() -# 关闭消费 -consumer.close() -``` + # 取消订阅 + consumer.unsubscribe() + # 关闭消费 + consumer.close() + ``` - - + + ```js -consumer.unsubscribe(); -consumer.close(); -``` + consumer.unsubscribe(); + consumer.close(); + ``` - + - + ```csharp -// 取消订阅 -consumer.Unsubscribe(); + // 取消订阅 + consumer.Unsubscribe(); -// 关闭消费 -consumer.Close(); -``` + // 关闭消费 + consumer.Close(); + ``` -## 删除 *topic* - -如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。 - -```sql -/* 删除 topic */ -DROP TOPIC topic_name; -``` - -## 状态查看 - -1、*topics*:查询已经创建的 topic - -```sql -SHOW TOPICS; -``` - -2、consumers:查询 consumer 的状态及其订阅的 topic - -```sql -SHOW CONSUMERS; -``` - -3、subscriptions:查询 consumer 与 vgroup 之间的分配关系 - -```sql -SHOW SUBSCRIPTIONS; -``` - -## 示例代码 +## 完整示例代码 以下是各语言的完整示例代码。 @@ -908,3 +835,22 @@ SHOW SUBSCRIPTIONS; + +#订阅高级功能 +##数据回放 +- 订阅支持 replay 功能,按照数据写入的时间回放。 + 比如,如下时间写入三条数据 + ```sql + 2023/09/22 00:00:00.000 + 2023/09/22 00:00:05.000 + 2023/09/22 00:00:08.000 + ``` + 则订阅出第一条数据 5s 后返回第二条数据,获取第二条数据 3s 后返回第三条数据。 +- 仅查询订阅支持数据回放 + - 回放需要保证独立时间线 + - 如果是子表订阅或者普通表订阅,只有一个vnode上有数据,保证是一个时间线 + - 如果超级表订阅,则需保证该 DB 只有一个vnode,否则报错(因为多个vnode上订阅出的数据不在一个时间线上) +- 超级表和库订阅不支持回放 +- enable.replay 参数,true表示开启订阅回放功能,false表示不开启订阅回放功能,默认不开启。 +- 回放不支持进度保存,所以回放参数 enable.replay = true 时,auto commit 自动关闭 +- 因为数据回放本身需要处理时间,所以回放的精度存在几十ms的误差 \ No newline at end of file diff --git a/docs/zh/07-develop/img_5.png b/docs/zh/07-develop/img_5.png new file mode 100644 index 0000000000..d9306cdb74 Binary files /dev/null and b/docs/zh/07-develop/img_5.png differ diff --git a/docs/zh/07-develop/img_6.png b/docs/zh/07-develop/img_6.png new file mode 100644 index 0000000000..0c9a061107 Binary files /dev/null and b/docs/zh/07-develop/img_6.png differ diff --git a/docs/zh/07-develop/img_7.png b/docs/zh/07-develop/img_7.png new file mode 100644 index 0000000000..0ddb005d66 Binary files /dev/null and b/docs/zh/07-develop/img_7.png differ diff --git a/docs/zh/12-taos-sql/13-tmq.md b/docs/zh/12-taos-sql/13-tmq.md index 571300ad8c..61135a3422 100644 --- a/docs/zh/12-taos-sql/13-tmq.md +++ b/docs/zh/12-taos-sql/13-tmq.md @@ -6,32 +6,68 @@ description: TDengine 消息队列提供的数据订阅功能 TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用户的解决方案。 -## 创建订阅主题 +## 创建 topic + +TDengine 创建 topic 的个数上限通过参数 tmqMaxTopicNum 控制,默认 20 个。 + +TDengine 使用 SQL 创建一个 topic,共有三种类型的 topic: + +### 查询 topic + +语法: ```sql -CREATE TOPIC [IF NOT EXISTS] topic_name AS subquery; +CREATE TOPIC [IF NOT EXISTS] topic_name as subquery ``` +通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定查询订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是: -TOPIC 支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下: +- 该类型 TOPIC 一旦创建则订阅数据的结构确定。 +- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。 +- 若发生表结构变更,新增的列不出现在结果中。 +- 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列) +### 超级表 topic -1. TOPIC 一旦创建则返回结果的字段确定 -2. 被订阅或用于计算的列不可被删除、修改 -3. 列可以新增,但新增的列不出现在订阅结果字段中 -4. 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列) - - -## 删除订阅主题 +语法: ```sql +CREATE TOPIC [IF NOT EXISTS] topic_name [with meta] AS STABLE stb_name [where_condition] +``` + +与 `SELECT * from stbName` 订阅的区别是: + +- 不会限制用户的表结构变更。 +- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。 +- with meta 参数可选,选择时将返回创建超级表,子表等语句,主要用于taosx做超级表迁移 +- where_condition 参数可选,选择时将用来过滤符合条件的子表,订阅这些子表。where 条件里不能有普通列,只能是tag或tbname,where条件里可以用函数,用来过滤tag,但是不能是聚合函数,因为子表tag值无法做聚合。也可以是常量表达式,比如 2 > 1(订阅全部子表),或者 false(订阅0个子表) +- 返回数据不包含标签。 + +### 数据库 topic + +语法: + +```sql +CREATE TOPIC [IF NOT EXISTS] topic_name [with meta] AS DATABASE db_name; +``` + +通过该语句可创建一个包含数据库所有表数据的订阅 + +- with meta 参数可选,选择时将返回创建数据库里所有超级表,子表的语句,主要用于taosx做数据库迁移 + +说明: 超级表订阅和库订阅属于高级订阅模式,容易出错,如确实要使用,请咨询专业人员。 + +## 删除 topic + +如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。 + +```sql +/* 删除 topic */ DROP TOPIC [IF EXISTS] topic_name; ``` 此时如果该订阅主题上存在 consumer,则此 consumer 会收到一个错误。 -## 查看订阅主题 - -## SHOW TOPICS +## 查看 topic ```sql SHOW TOPICS; @@ -58,3 +94,11 @@ SHOW CONSUMERS; ``` 显示当前数据库下所有活跃的消费者的信息。 + +## 查看订阅信息 + +```sql +SHOW SUBSCRIPTIONS; +``` + +显示 consumer 与 vgroup 之间的分配关系和消费信息 \ No newline at end of file