Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TD-22023

This commit is contained in:
54liuyao 2024-02-19 08:59:15 +08:00
commit 05eaa90c11
57 changed files with 1192 additions and 921 deletions

BIN
deps/mips/dm_static/libdmodule.a vendored Normal file

Binary file not shown.

View File

@ -15,43 +15,80 @@ import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx"; import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx"; import CDemo from "./_sub_c.mdx";
为了帮助应用实时获取写入 TDengine 的数据或者以事件到达顺序处理数据TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
与 kafka 一样,你需要定义 *topic*, 但 TDengine 的 *topic* 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 `SELECT` 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度 为了帮助应用实时获取写入 TDengine 的数据或者以事件到达顺序处理数据TDengine 提供了类似 kafka 的数据订阅功能。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本
消费者订阅 *topic* 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。 # 介绍
## 主题
与 kafka 一样,你需要定义 topic, TDengine 的 topic 有三种,可以是数据库,超级表,或者一个 `SELECT` 语句,具体的语法参见 [CREATE TOPIC](../../12-taos-sql/13-tmq)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine而不是应用完成有效的减少传输的数据量与应用的复杂度。
为了实现上述功能TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 如下图,每个 topic 涉及到的数据表可能分布在多个 vnode相当于 kafka 里的 partition 上,每个 vnode 上的数据保存在 WAL(Write-Ahead-Log) 文件中WAL 文件里的数据是顺序写入的(由于 WAL 文件中存储的不只有数据,还有元数据,写入消息等,所以数据的版本号不是连续的)
下面为关于数据订阅的一些说明需要对TDengine的架构有一些了解结合各个语言链接器的接口使用。(可使用时再了解) ![img_5.png](img_5.png)
- 一个消费组消费同一个topic下的所有数据不同消费组之间相互独立
- 一个消费组消费同一个topic所有的vgroup消费组可由多个消费者组成但一个vgroup仅被一个消费者消费如果消费者数量超过了vgroup数量多余的消费者不消费数据
- 在服务端每个vgroup仅保存一个offset每个vgroup的offset是单调递增的但不一定连续。各个vgroup的offset之间没有关联
- 每次poll服务端会返回一个结果block该block属于一个vgroup可能包含多个wal版本的数据可以通过 offset 接口获得是该block第一条记录的offset
- 一个消费组如果从未commit过offset当其成员消费者重启重新拉取数据时均从参数auto.offset.reset设定值开始消费在一个消费者生命周期中客户端本地记录了最近一次拉取数据的offset不会拉取重复数据
- 消费者如果异常终止没有调用tmq_close需等约12秒后触发其所属消费组rebalance该消费者在服务端状态变为LOST约1天后该消费者自动被删除正常退出退出后就会删除消费者新增消费者需等约2秒触发rebalance该消费者在服务端状态变为ready
- 消费组rebalance会对该组所有ready状态的消费者成员重新进行vgroup分配消费者仅能对自己负责的vgroup进行assignment/seek/commit/poll操作
- 消费者可利用 position 获得当前消费的offset并seek到指定offset重新消费
- seek将position指向指定offset不执行commit操作一旦seek成功可poll拉取指定offset及以后的数据
- seek 操作之前须调用 assignment 接口获取该consumer的vgroup ID和offset范围。seek 操作会检测vgroup ID 和 offset是否合法如非法将报错
- position是获取当前的消费位置是下次要取的位置不是当前消费到的位置
- commit是提交消费位置不带参数的话是提交当前消费位置下次要取的位置不是当前消费到的位置带参数的话是提交参数里的位置也即下次退出重启后要取的位置
- seek是设置consumer消费位置seek到哪position就返回哪都是下次要取的位置
- seek不会影响commitcommit不影响seek相互独立两个是不同的概念
- begin接口为wal 第一条数据的offsetend 接口为wal 最后一条数据的offset + 1
- offset接口获取的是记录所在结果block块里的第一条数据的offset当seek至该offset时将消费到这个block里的全部数据。参见第四点
- 由于存在 WAL 过期删除机制即使seek 操作成功poll数据时有可能offset已失效。如果poll 的offset 小于 WAL 最小版本号将会从WAL最小版本号消费
- 数据订阅是从 WAL 消费数据,如果一些 WAL 文件被基于 WAL 保留策略删除,则已经删除的 WAL 文件中的数据就无法再消费到。需要根据业务需要在创建数据库时合理设置 `WAL_RETENTION_PERIOD` 或 `WAL_RETENTION_SIZE` ,并确保应用及时消费数据,这样才不会产生数据丢失的现象。数据订阅的行为与 Kafka 等广泛使用的消息队列类产品的行为相似;
本文档不对消息队列本身的知识做更多的介绍,如果需要了解,请自行搜索 TDengine 会为 WAL 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制,用户可以按需指定 WAL 文件保留的时间以及大小(详见 [CREATE DATABASE](../../12-taos-sql/02-database) 语句,由于消费是通过 WAL 实现的,所以应该根据写入消费速度来确定 WAL 的保存时长)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎。
说明: 对于 `SELECT` 语句形式的 topic在消费时TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
## 生产者
写入 topic 相关联的数据表中数据的都是生产者,生产者实际生产的数据写入到了子表或普通表中,即表所在 vnode 的 WAL 里。
## 消费者
### 消费者组
消费者订阅 topic 后,可以消费 topic 里的所有数据(这些数据所在的表可能分布在多个 vnode 上,即 db 所在的所有 vnode。订阅 topic 时,需要指定一个消费者组 (consumer group),如果这个消费者组里只有一个消费者,那么这个消费者会顺序的消费这些 vnode 上的数据。
为了提高消费速度,便于多线程、分布式地消费数据,可以在一个消费组里添加多个消费者,这些消费者将均分数据所在的 vnode 进行消费(比如数据分布在 4 个 vnode 上,有 2 个消费者的话,那么每个消费者消费 2 个 vnode有 3 个消费者的话2 个消费者各消费 1 个 vnode1 个消费者消费 2 个 vnode有 5 个消费者的话4 个各分配 1 个 vnode 消费,另外 1 个不消费),如下图:
![img_6.png](img_6.png)
在一个消费组里添加一个消费者后,在 Mnode 上通过 rebalance 的机制实现消费者的重新分配,该操作对用户是透明的。
一个消费者可以订阅多个 topic。TDengine 的数据订阅在宕机、重启等复杂环境下确保 at least once 消费。
### 消费进度
在 topic 的一个消费组的一个 vnode 上有消费进度。消费者消费的同时,可以提交消费进度,消费进度即 vnode 上 WAL 的版本号(对于 kafka 里的 offset消费进度可以手动提交也可以通过参数auto.commit.interval.ms设置为周期性自动提交。
首次消费数据时通过订阅参数auto.offset.reset来确定消费位置为最新数据latest还是最旧数据earliest
消费进度在一个 vnode 上对于同一个 topic 和 消费者组是唯一的。所以如果同一个 topic 和 消费者组在一个 vnode 上的消费者退出了,并且提交了消费进度。然后同一个 topic 和 消费者组里重新建了一个新的消费者消费这个 vnode那么这个新消费者将继承之前的消费进度继续消费。
如果之前的消费者没有提交消费进度那个新的消费者将根据订阅参数auto.offset.reset设置的值来确定起始消费位置。
不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。
![img_7.png](img_7.png)
作为一个数据库产品, WAL 文件中存储的不全是数据,也包括其他写入消息,元数据等,所以消费进度不是连续的。
##说明
从3.2.0.0版本开始数据订阅支持vnode迁移和分裂。 从3.2.0.0版本开始数据订阅支持vnode迁移和分裂。
由于数据订阅依赖wal文件而在vnode迁移和分裂的过程中wal并不会同步过去所以迁移或分裂后之前没消费完的wal数据后消费不到。所以请保证之前把数据全部消费完后再进行vnode迁移或分裂否则消费会丢失数据。
## 主要数据结构和 API 由于数据订阅依赖wal文件而在vnode迁移和分裂的过程中wal并不会同步过去所以迁移或分裂后之前没消费完的wal数据后消费不到。所以请保证迁移和分裂之前把数据全部消费完后再进行vnode迁移或分裂否则消费会丢失数据。
不同语言下, TMQ 订阅相关的 API 及数据结构如下注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer # 语法说明
具体的语法参见 [数据订阅](../../12-taos-sql/13-tmq)
# 消费参数
消费参数主要用于消费者创建时指定,基础配置项如下表所示:
| 参数名称 | 类型 | 参数说明 | 备注 |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
| `td.connect.ip` | string | 服务端的 IP 地址 | |
| `td.connect.user` | string | 用户名 | |
| `td.connect.pass` | string | 密码 | |
| `td.connect.port` | integer | 服务端的端口号 | |
| `group.id` | string | 消费组 ID同一消费组共享消费进度 | <br />**必填项**。最大长度192。<br />每个topic最多可建立100个 consumer group |
| `client.id` | string | 客户端 ID | 最大长度192。 |
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default(version < 3.2.0.0);从头开始订阅; <br/>`latest`: default(version >= 3.2.0.0);仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交true: 自动提交客户端应用无需commitfalse客户端应用需要自行commit | 默认值为 true |
| `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句从3.2.0.0版本该参数废弃恒为true |默认关闭 |
| `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 |
# 主要数据结构和 API 接口
不同语言下, TMQ 订阅相关的 API 及数据结构如下详细的接口说明可以参考连接器章节注意consumer结构不是线程安全的在一个线程使用consumer时不要在另一个线程close这个consumer
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C"> <TabItem value="c" label="C">
@ -110,8 +147,6 @@ import CDemo from "./_sub_c.mdx";
DLL_EXPORT const char *tmq_err2str(int32_t code); DLL_EXPORT const char *tmq_err2str(int32_t code);
``` ```
下面介绍一下它们的具体用法(超级表和子表结构请参考“数据建模”一节),完整的示例代码请见下面 C 语言的示例代码。
</TabItem> </TabItem>
<TabItem value="java" label="Java"> <TabItem value="java" label="Java">
@ -273,6 +308,7 @@ void Close()
</TabItem> </TabItem>
</Tabs> </Tabs>
# 数据订阅示例
## 写入数据 ## 写入数据
首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如: 首先完成建库、建一张超级表和多张子表操作,然后就可以写入数据了,比如:
@ -286,78 +322,16 @@ CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
``` ```
## 创建 topic
## 创建 *topic* 使用 SQL 创建一个 topic
TDengine 使用 SQL 创建一个 topic
```sql ```sql
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
``` ```
- topic创建个数有上限通过参数 tmqMaxTopicNum 控制,默认 20 个
TMQ 支持多种订阅类型:
### 列订阅
语法:
```sql
CREATE TOPIC topic_name as subquery
```
通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定列订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:
- 该类型 TOPIC 一旦创建则订阅数据的结构确定。
- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。
- 若发生表结构变更,新增的列不出现在结果中。
### 超级表订阅
语法:
```sql
CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition]
```
与 `SELECT * from stbName` 订阅的区别是:
- 不会限制用户的表结构变更。
- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
- with meta 参数可选选择时将返回创建超级表子表等语句主要用于taosx做超级表迁移
- where_condition 参数可选选择时将用来过滤符合条件的子表订阅这些子表。where 条件里不能有普通列只能是tag或tbnamewhere条件里可以用函数用来过滤tag但是不能是聚合函数因为子表tag值无法做聚合。也可以是常量表达式比如 2 > 1订阅全部子表或者 false订阅0个子表
- 返回数据不包含标签。
### 数据库订阅
语法:
```sql
CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
```
通过该语句可创建一个包含数据库所有表数据的订阅
- with meta 参数可选选择时将返回创建数据库里所有超级表子表的语句主要用于taosx做数据库迁移
## 创建消费者 *consumer* ## 创建消费者 *consumer*
消费者需要通过一系列配置选项创建,基础配置项如下表所示:
| 参数名称 | 类型 | 参数说明 | 备注 |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
| `td.connect.ip` | string | 服务端的 IP 地址 | |
| `td.connect.user` | string | 用户名 | |
| `td.connect.pass` | string | 密码 | |
| `td.connect.port` | integer | 服务端的端口号 | |
| `group.id` | string | 消费组 ID同一消费组共享消费进度 | <br />**必填项**。最大长度192。<br />每个topic最多可建立100个 consumer group |
| `client.id` | string | 客户端 ID | 最大长度192。 |
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default(version < 3.2.0.0);从头开始订阅; <br/>`latest`: default(version >= 3.2.0.0);仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交true: 自动提交客户端应用无需commitfalse客户端应用需要自行commit | 默认值为 true |
| `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句从3.2.0.0版本该参数废弃恒为true |默认关闭 |
| `enable.replay` | boolean | 是否开启数据回放功能 |默认关闭 |
对于不同编程语言,其设置方式如下: 对于不同编程语言,其设置方式如下:
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
@ -523,24 +497,6 @@ var consumer = new ConsumerBuilder<Dictionary<string, object>>(cfg).Build();
上述配置中包括 consumer group ID如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group共享消费进度。 上述配置中包括 consumer group ID如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group共享消费进度。
数据回放功能说明:
- 订阅增加 replay 功能,按照数据写入的时间回放。
比如,如下时间写入三条数据
```sql
2023/09/22 00:00:00.000
2023/09/22 00:00:05.000
2023/09/22 00:00:08.000
```
则订阅出第一条数据 5s 后返回第二条数据,获取第二条数据 3s 后返回第三条数据。
- 仅列订阅支持数据回放
- 回放需要保证独立时间线
- 如果是子表订阅或者普通表订阅只有一个vnode上有数据保证是一个时间线
- 如果超级表订阅,则需保证该 DB 只有一个vnode否则报错因为多个vnode上订阅出的数据不在一个时间线上
- 超级表和库订阅不支持回放
- 增加 enable.replay 参数true表示开启订阅回放功能false表示不开启订阅回放功能默认不开启。
- 回放不支持进度保存,所以回放参数 enable.replay = true 时auto commit 自动关闭
- 因为数据回放本身需要处理时间所以回放的精度存在几十ms的误差
## 订阅 *topics* ## 订阅 *topics*
一个 consumer 支持同时订阅多个 topic。 一个 consumer 支持同时订阅多个 topic。
@ -837,36 +793,7 @@ consumer.Close();
</Tabs> </Tabs>
## 删除 *topic* ## 完整示例代码
如果不再需要订阅数据,可以删除 topic需要注意只有当前未在订阅中的 TOPIC 才能被删除。
```sql
/* 删除 topic */
DROP TOPIC topic_name;
```
## 状态查看
1、*topics*:查询已经创建的 topic
```sql
SHOW TOPICS;
```
2、consumers查询 consumer 的状态及其订阅的 topic
```sql
SHOW CONSUMERS;
```
3、subscriptions查询 consumer 与 vgroup 之间的分配关系
```sql
SHOW SUBSCRIPTIONS;
```
## 示例代码
以下是各语言的完整示例代码。 以下是各语言的完整示例代码。
@ -908,3 +835,22 @@ SHOW SUBSCRIPTIONS;
</TabItem> </TabItem>
</Tabs> </Tabs>
#订阅高级功能
##数据回放
- 订阅支持 replay 功能,按照数据写入的时间回放。
比如,如下时间写入三条数据
```sql
2023/09/22 00:00:00.000
2023/09/22 00:00:05.000
2023/09/22 00:00:08.000
```
则订阅出第一条数据 5s 后返回第二条数据,获取第二条数据 3s 后返回第三条数据。
- 仅查询订阅支持数据回放
- 回放需要保证独立时间线
- 如果是子表订阅或者普通表订阅只有一个vnode上有数据保证是一个时间线
- 如果超级表订阅,则需保证该 DB 只有一个vnode否则报错因为多个vnode上订阅出的数据不在一个时间线上
- 超级表和库订阅不支持回放
- enable.replay 参数true表示开启订阅回放功能false表示不开启订阅回放功能默认不开启。
- 回放不支持进度保存,所以回放参数 enable.replay = true 时auto commit 自动关闭
- 因为数据回放本身需要处理时间所以回放的精度存在几十ms的误差

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

View File

@ -6,32 +6,68 @@ description: TDengine 消息队列提供的数据订阅功能
TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用户的解决方案。 TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用户的解决方案。
## 创建订阅主题 ## 创建 topic
TDengine 创建 topic 的个数上限通过参数 tmqMaxTopicNum 控制,默认 20 个。
TDengine 使用 SQL 创建一个 topic共有三种类型的 topic
### 查询 topic
语法:
```sql ```sql
CREATE TOPIC [IF NOT EXISTS] topic_name AS subquery; CREATE TOPIC [IF NOT EXISTS] topic_name as subquery
``` ```
通过 `SELECT` 语句订阅(包括 `SELECT *`,或 `SELECT ts, c1` 等指定查询订阅,可以带条件过滤、标量函数计算,但不支持聚合函数、不支持时间窗口聚合)。需要注意的是:
TOPIC 支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下: - 该类型 TOPIC 一旦创建则订阅数据的结构确定。
- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。
- 若发生表结构变更,新增的列不出现在结果中。
- 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
### 超级表 topic
1. TOPIC 一旦创建则返回结果的字段确定 语法:
2. 被订阅或用于计算的列不可被删除、修改
3. 列可以新增,但新增的列不出现在订阅结果字段中
4. 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
## 删除订阅主题
```sql ```sql
CREATE TOPIC [IF NOT EXISTS] topic_name [with meta] AS STABLE stb_name [where_condition]
```
`SELECT * from stbName` 订阅的区别是:
- 不会限制用户的表结构变更。
- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
- with meta 参数可选选择时将返回创建超级表子表等语句主要用于taosx做超级表迁移
- where_condition 参数可选选择时将用来过滤符合条件的子表订阅这些子表。where 条件里不能有普通列只能是tag或tbnamewhere条件里可以用函数用来过滤tag但是不能是聚合函数因为子表tag值无法做聚合。也可以是常量表达式比如 2 > 1订阅全部子表或者 false订阅0个子表
- 返回数据不包含标签。
### 数据库 topic
语法:
```sql
CREATE TOPIC [IF NOT EXISTS] topic_name [with meta] AS DATABASE db_name;
```
通过该语句可创建一个包含数据库所有表数据的订阅
- with meta 参数可选选择时将返回创建数据库里所有超级表子表的语句主要用于taosx做数据库迁移
说明: 超级表订阅和库订阅属于高级订阅模式,容易出错,如确实要使用,请咨询专业人员。
## 删除 topic
如果不再需要订阅数据,可以删除 topic需要注意只有当前未在订阅中的 TOPIC 才能被删除。
```sql
/* 删除 topic */
DROP TOPIC [IF EXISTS] topic_name; DROP TOPIC [IF EXISTS] topic_name;
``` ```
此时如果该订阅主题上存在 consumer则此 consumer 会收到一个错误。 此时如果该订阅主题上存在 consumer则此 consumer 会收到一个错误。
## 查看订阅主题 ## 查看 topic
## SHOW TOPICS
```sql ```sql
SHOW TOPICS; SHOW TOPICS;
@ -58,3 +94,11 @@ SHOW CONSUMERS;
``` ```
显示当前数据库下所有活跃的消费者的信息。 显示当前数据库下所有活跃的消费者的信息。
## 查看订阅信息
```sql
SHOW SUBSCRIPTIONS;
```
显示 consumer 与 vgroup 之间的分配关系和消费信息

View File

@ -58,16 +58,15 @@ typedef enum {
TSDB_GRANT_BACKUP_RESTORE, TSDB_GRANT_BACKUP_RESTORE,
} EGrantType; } EGrantType;
int32_t grantCheck(EGrantType grant); // less int32_t grantCheck(EGrantType grant);
int32_t grantCheckLE(EGrantType grant); // less or equal int32_t grantCheckExpire(EGrantType grant);
char* tGetMachineId(); char* tGetMachineId();
#ifndef TD_UNIQ_GRANT #ifdef TD_UNIQ_GRANT
int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type); int32_t grantCheckLE(EGrantType grant);
#endif #endif
// #ifndef GRANTS_CFG // #ifndef GRANTS_CFG
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
#ifdef TD_UNIQ_GRANT
#define GRANTS_SCHEMA \ #define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \ static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
@ -80,38 +79,13 @@ int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, ch
{.name = "cpu_cores", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "cpu_cores", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
} }
#else #else
#define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "storage", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu_cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "speed", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "opc_da", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "opc_ua", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "pi", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "kafka", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "influxdb", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "mqtt", .bytes = GRANTS_COL_MAX_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
}
#endif
#else
#define GRANTS_SCHEMA \ #define GRANTS_SCHEMA \
static const SSysDbTableSchema grantsSchema[] = { \ static const SSysDbTableSchema grantsSchema[] = { \
{.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "version", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "expire_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "service_time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "state", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "state", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \
{.name = "cpu_cores", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \ {.name = "cpu_cores", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, \

View File

@ -3634,6 +3634,7 @@ typedef struct {
int64_t timeout; int64_t timeout;
STqOffsetVal reqOffset; STqOffsetVal reqOffset;
int8_t enableReplay; int8_t enableReplay;
int8_t sourceExcluded;
} SMqPollReq; } SMqPollReq;
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq); int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
@ -3763,6 +3764,7 @@ typedef struct {
int32_t vgId; int32_t vgId;
STqOffsetVal offset; STqOffsetVal offset;
int64_t rows; int64_t rows;
int64_t ever;
} OffsetRows; } OffsetRows;
typedef struct { typedef struct {
@ -3920,6 +3922,9 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 #define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
#define SUBMIT_REQ_FROM_FILE 0x4 #define SUBMIT_REQ_FROM_FILE 0x4
#define SOURCE_NULL 0
#define SOURCE_TAOSX 1
typedef struct { typedef struct {
int32_t flags; int32_t flags;
SVCreateTbReq* pCreateTbReq; SVCreateTbReq* pCreateTbReq;
@ -3931,6 +3936,7 @@ typedef struct {
SArray* aCol; SArray* aCol;
}; };
int64_t ctimeMs; int64_t ctimeMs;
int8_t source;
} SSubmitTbData; } SSubmitTbData;
typedef struct { typedef struct {

View File

@ -197,6 +197,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
void qStreamSetOpen(qTaskInfo_t tinfo); void qStreamSetOpen(qTaskInfo_t tinfo);
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);

View File

@ -69,6 +69,19 @@ typedef pthread_key_t TdThreadKey;
#define taosThreadCleanupPush pthread_cleanup_push #define taosThreadCleanupPush pthread_cleanup_push
#define taosThreadCleanupPop pthread_cleanup_pop #define taosThreadCleanupPop pthread_cleanup_pop
#if !defined(WINDOWS)
#if defined(_TD_DARWIN_64) // MACOS
#define taosThreadRwlockAttrSetKindNP(A, B) ((void)0)
#else // LINUX
#if _XOPEN_SOURCE >= 500 || _POSIX_C_SOURCE >= 200809L
#define taosThreadRwlockAttrSetKindNP(A, B) pthread_rwlockattr_setkind_np(A, B)
#else
#define taosThreadRwlockAttrSetKindNP(A, B) ((void)0)
#endif
#endif
#else // WINDOWS
#define taosThreadRwlockAttrSetKindNP(A, B) ((void)0)
#endif
#if defined(WINDOWS) && !defined(__USE_PTHREAD) #if defined(WINDOWS) && !defined(__USE_PTHREAD)
#define TD_PTHREAD_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER_FORBID #define TD_PTHREAD_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER_FORBID

View File

@ -287,7 +287,7 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_VALUE_LEN 256 #define TSDB_DNODE_VALUE_LEN 256
#define TSDB_CLUSTER_VALUE_LEN 1000 #define TSDB_CLUSTER_VALUE_LEN 1000
#define TSDB_GRANT_LOG_COL_LEN 15072 #define TSDB_GRANT_LOG_COL_LEN 15600
#define TSDB_ACTIVE_KEY_LEN 109 #define TSDB_ACTIVE_KEY_LEN 109
#define TSDB_CONN_ACTIVE_KEY_LEN 255 #define TSDB_CONN_ACTIVE_KEY_LEN 255

View File

@ -63,6 +63,7 @@ struct tmq_conf_t {
int8_t withTbName; int8_t withTbName;
int8_t snapEnable; int8_t snapEnable;
int8_t replayEnable; int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit
uint16_t port; uint16_t port;
int32_t autoCommitInterval; int32_t autoCommitInterval;
char* ip; char* ip;
@ -82,6 +83,7 @@ struct tmq_t {
int32_t autoCommitInterval; int32_t autoCommitInterval;
int8_t resetOffsetCfg; int8_t resetOffsetCfg;
int8_t replayEnable; int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit
uint64_t consumerId; uint64_t consumerId;
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam; void* commitCbUserParam;
@ -385,6 +387,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_INVALID; return TMQ_CONF_INVALID;
} }
} }
if (strcasecmp(key, "msg.consume.excluded") == 0) {
conf->sourceExcluded = taosStr2int64(value);
return TMQ_CONF_OK;
}
if (strcasecmp(key, "td.connect.db") == 0) { if (strcasecmp(key, "td.connect.db") == 0) {
return TMQ_CONF_OK; return TMQ_CONF_OK;
@ -783,7 +789,6 @@ void tmqSendHbReq(void* param, void* tmrId) {
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch; req.epoch = tmq->epoch;
taosRLockLatch(&tmq->lock); taosRLockLatch(&tmq->lock);
// if(tmq->needReportOffsetRows){
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){ for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
@ -796,14 +801,14 @@ void tmqSendHbReq(void* param, void* tmrId) {
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
offRows->vgId = pVg->vgId; offRows->vgId = pVg->vgId;
offRows->rows = pVg->numOfRows; offRows->rows = pVg->numOfRows;
offRows->offset = pVg->offsetInfo.beginOffset; offRows->offset = pVg->offsetInfo.endOffset;
offRows->ever = pVg->offsetInfo.walVerEnd;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows); tscInfo("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%"PRId64", rows:%"PRId64,
tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
} }
} }
// tmq->needReportOffsetRows = false;
// }
taosRUnLockLatch(&tmq->lock); taosRUnLockLatch(&tmq->lock);
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
@ -1108,6 +1113,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->commitCbUserParam = conf->commitCbUserParam;
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->replayEnable = conf->replayEnable; pTmq->replayEnable = conf->replayEnable;
pTmq->sourceExcluded = conf->sourceExcluded;
if(conf->replayEnable){ if(conf->replayEnable){
pTmq->autoCommit = false; pTmq->autoCommit = false;
} }
@ -1576,6 +1582,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq->useSnapshot = tmq->useSnapshot; pReq->useSnapshot = tmq->useSnapshot;
pReq->reqId = generateRequestId(); pReq->reqId = generateRequestId();
pReq->enableReplay = tmq->replayEnable; pReq->enableReplay = tmq->replayEnable;
pReq->sourceExcluded = tmq->sourceExcluded;
} }
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {

View File

@ -19,6 +19,13 @@
#ifndef _GRANT #ifndef _GRANT
int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; } int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
int32_t grantCheckLE(EGrantType grant) {return TSDB_CODE_SUCCESS;} int32_t grantCheckExpire(EGrantType grant) { return TSDB_CODE_SUCCESS; }
#ifdef TD_UNIQ_GRANT
int32_t grantCheckLE(EGrantType grant) { return TSDB_CODE_SUCCESS; }
#endif
#else
#ifdef TD_UNIQ_GRANT
int32_t grantCheckExpire(EGrantType grant) { return TSDB_CODE_SUCCESS; }
#endif
#endif #endif

View File

@ -6252,6 +6252,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
if (tEncodeI32(&encoder, offRows->vgId) < 0) return -1; if (tEncodeI32(&encoder, offRows->vgId) < 0) return -1;
if (tEncodeI64(&encoder, offRows->rows) < 0) return -1; if (tEncodeI64(&encoder, offRows->rows) < 0) return -1;
if (tEncodeSTqOffsetVal(&encoder, &offRows->offset) < 0) return -1; if (tEncodeSTqOffsetVal(&encoder, &offRows->offset) < 0) return -1;
if (tEncodeI64(&encoder, offRows->ever) < 0) return -1;
} }
} }
@ -6289,6 +6290,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1; if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1;
if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1; if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1;
if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1; if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1;
if (tDecodeI64(&decoder, &offRows->ever) < 0) return -1;
} }
} }
} }
@ -6600,6 +6602,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1; if (tEncodeI64(&encoder, pReq->timeout) < 0) return -1;
if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1; if (tSerializeSTqOffsetVal(&encoder, &pReq->reqOffset) < 0) return -1;
if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1; if (tEncodeI8(&encoder, pReq->enableReplay) < 0) return -1;
if (tEncodeI8(&encoder, pReq->sourceExcluded) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
@ -6640,6 +6643,10 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1; if (tDecodeI8(&decoder, &pReq->enableReplay) < 0) return -1;
} }
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI8(&decoder, &pReq->sourceExcluded) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -8663,6 +8670,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
} }
} }
if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1; if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1;
if (tEncodeI8(pCoder, pSubmitTbData->source) < 0) return -1;
tEndEncode(pCoder); tEndEncode(pCoder);
return 0; return 0;
@ -8750,6 +8758,12 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
goto _exit; goto _exit;
} }
} }
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI8(pCoder, &pSubmitTbData->source) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
}
tEndDecode(pCoder); tEndDecode(pCoder);

View File

@ -36,10 +36,8 @@
int32_t mndGrantActionDelete(SSdb * pSdb, SGrantLogObj * pGrant); int32_t mndGrantActionDelete(SSdb * pSdb, SGrantLogObj * pGrant);
int32_t mndGrantActionUpdate(SSdb * pSdb, SGrantLogObj * pOldGrant, SGrantLogObj * pNewGrant); int32_t mndGrantActionUpdate(SSdb * pSdb, SGrantLogObj * pOldGrant, SGrantLogObj * pNewGrant);
#ifdef TD_UNIQ_GRANT
int32_t grantAlterActiveCode(SMnode * pMnode, SGrantLogObj * pObj, const char *oldActive, const char *newActive, int32_t grantAlterActiveCode(SMnode * pMnode, SGrantLogObj * pObj, const char *oldActive, const char *newActive,
char **mergeActive); char **mergeActive);
#endif
int32_t mndProcessConfigGrantReq(SMnode * pMnode, SRpcMsg * pReq, SMCfgClusterReq * pCfg); int32_t mndProcessConfigGrantReq(SMnode * pMnode, SRpcMsg * pReq, SMCfgClusterReq * pCfg);
int32_t mndProcessUpdGrantLog(SMnode * pMnode, SRpcMsg * pReq, SArray * pMachines, SGrantState * pState); int32_t mndProcessUpdGrantLog(SMnode * pMnode, SRpcMsg * pReq, SArray * pMachines, SGrantState * pState);

View File

@ -82,9 +82,11 @@ void mndTransSetSerial(STrans *pTrans);
void mndTransSetParallel(STrans *pTrans); void mndTransSetParallel(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper); void mndTransSetOper(STrans *pTrans, EOperType oper);
int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans); int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans);
#ifndef BUILD_NO_CALL
static int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) { static int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
return mndTransCheckConflict(pMnode, pTrans); return mndTransCheckConflict(pMnode, pTrans);
} }
#endif
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
int32_t mndTransProcessRsp(SRpcMsg *pRsp); int32_t mndTransProcessRsp(SRpcMsg *pRsp);
void mndTransPullup(SMnode *pMnode); void mndTransPullup(SMnode *pMnode);

View File

@ -107,7 +107,7 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
goto FAILED; goto FAILED;
} }
if ((terrno = grantCheckLE(TSDB_GRANT_SUBSCRIPTION)) < 0) { if ((terrno = grantCheckExpire(TSDB_GRANT_SUBSCRIPTION)) < 0) {
code = terrno; code = terrno;
goto FAILED; goto FAILED;
} }
@ -241,7 +241,7 @@ static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbR
STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1); STopicPrivilege *data = taosArrayReserve(rsp->topicPrivileges, 1);
strcpy(data->topic, topic); strcpy(data->topic, topic);
if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 || if (mndCheckTopicPrivilege(pMnode, user, MND_OPER_SUBSCRIBE, pTopic) != 0 ||
grantCheckLE(TSDB_GRANT_SUBSCRIPTION) < 0) { grantCheckExpire(TSDB_GRANT_SUBSCRIPTION) < 0) {
data->noPrivilege = 1; data->noPrivilege = 1;
} else { } else {
data->noPrivilege = 0; data->noPrivilege = 0;

View File

@ -422,27 +422,12 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
return (void *)buf; return (void *)buf;
} }
// SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { int32_t tEncodeOffRows(void **buf, SArray *offsetRows){
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
// if (pConsumerEpNew == NULL) return NULL;
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
// return pConsumerEpNew;
// }
//
// void tDeleteSMqConsumerEp(void *data) {
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
// taosArrayDestroy(pConsumerEp->vgs);
// }
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); int32_t szVgs = taosArrayGetSize(offsetRows);
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows);
tlen += taosEncodeFixedI32(buf, szVgs); tlen += taosEncodeFixedI32(buf, szVgs);
for (int32_t j = 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j); OffsetRows *offRows = taosArrayGet(offsetRows, j);
tlen += taosEncodeFixedI32(buf, offRows->vgId); tlen += taosEncodeFixedI32(buf, offRows->vgId);
tlen += taosEncodeFixedI64(buf, offRows->rows); tlen += taosEncodeFixedI64(buf, offRows->rows);
tlen += taosEncodeFixedI8(buf, offRows->offset.type); tlen += taosEncodeFixedI8(buf, offRows->offset.type);
@ -454,29 +439,29 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
} else { } else {
// do nothing // do nothing
} }
tlen += taosEncodeFixedI64(buf, offRows->ever);
} }
// #if 0
// int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
// tlen += taosEncodeFixedI32(buf, sz);
// for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
// tlen += tEncodeSMqVgEp(buf, pVgEp);
// }
// #endif
return tlen; return tlen;
} }
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) { int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); int32_t tlen = 0;
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
if (sver > 1) { tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
}
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver){
int32_t szVgs = 0; int32_t szVgs = 0;
buf = taosDecodeFixedI32(buf, &szVgs); buf = taosDecodeFixedI32(buf, &szVgs);
if (szVgs > 0) { if (szVgs > 0) {
pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
if (NULL == pConsumerEp->offsetRows) return NULL; if (NULL == *offsetRows) return NULL;
for (int32_t j = 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayReserve(pConsumerEp->offsetRows, 1); OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
buf = taosDecodeFixedI32(buf, &offRows->vgId); buf = taosDecodeFixedI32(buf, &offRows->vgId);
buf = taosDecodeFixedI64(buf, &offRows->rows); buf = taosDecodeFixedI64(buf, &offRows->rows);
buf = taosDecodeFixedI8(buf, &offRows->offset.type); buf = taosDecodeFixedI8(buf, &offRows->offset.type);
@ -488,19 +473,20 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t s
} else { } else {
// do nothing // do nothing
} }
if(sver > 2){
buf = taosDecodeFixedI64(buf, &offRows->ever);
} }
} }
} }
// #if 0 return (void *)buf;
// int32_t sz; }
// buf = taosDecodeFixedI32(buf, &sz);
// pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *)); void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
// for (int32_t i = 0; i < sz; i++) { buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
// SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
// buf = tDecodeSMqVgEp(buf, pVgEp); if (sver > 1) {
// taosArrayPush(pConsumerEp->vgs, &pVgEp); buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
// } }
// #endif
return (void *)buf; return (void *)buf;
} }
@ -596,22 +582,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp); tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
tlen += taosEncodeString(buf, pSub->dbName); tlen += taosEncodeString(buf, pSub->dbName);
int32_t szVgs = taosArrayGetSize(pSub->offsetRows); tlen += tEncodeOffRows(buf, pSub->offsetRows);
tlen += taosEncodeFixedI32(buf, szVgs);
for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j);
tlen += taosEncodeFixedI32(buf, offRows->vgId);
tlen += taosEncodeFixedI64(buf, offRows->rows);
tlen += taosEncodeFixedI8(buf, offRows->offset.type);
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
tlen += taosEncodeFixedI64(buf, offRows->offset.version);
} else {
// do nothing
}
}
tlen += taosEncodeString(buf, pSub->qmsg); tlen += taosEncodeString(buf, pSub->qmsg);
return tlen; return tlen;
} }
@ -639,26 +610,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
buf = taosDecodeStringTo(buf, pSub->dbName); buf = taosDecodeStringTo(buf, pSub->dbName);
if (sver > 1) { if (sver > 1) {
int32_t szVgs = 0; buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
buf = taosDecodeFixedI32(buf, &szVgs);
if (szVgs > 0) {
pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
if (NULL == pSub->offsetRows) return NULL;
for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayReserve(pSub->offsetRows, 1);
buf = taosDecodeFixedI32(buf, &offRows->vgId);
buf = taosDecodeFixedI64(buf, &offRows->rows);
buf = taosDecodeFixedI8(buf, &offRows->offset.type);
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
buf = taosDecodeFixedI64(buf, &offRows->offset.version);
} else {
// do nothing
}
}
}
buf = taosDecodeString(buf, &pSub->qmsg); buf = taosDecodeString(buf, &pSub->qmsg);
} else { } else {
pSub->qmsg = taosStrdup(""); pSub->qmsg = taosStrdup("");

View File

@ -141,7 +141,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN); memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
taosMemoryFreeClear(machineId); taosMemoryFreeClear(machineId);
} else { } else {
#ifdef TD_UNIQ_GRANT #ifdef TD_ENTERPRISE
terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE; terrno = TSDB_CODE_DNODE_NO_MACHINE_CODE;
goto _OVER; goto _OVER;
#endif #endif

View File

@ -1616,7 +1616,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){ if(grantCheckExpire(TSDB_GRANT_STREAMS) < 0){
terrno = TSDB_CODE_GRANT_EXPIRED; terrno = TSDB_CODE_GRANT_EXPIRED;
return -1; return -1;
} }

View File

@ -225,7 +225,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
if(grantCheckLE(TSDB_GRANT_STREAMS) < 0){ if(grantCheckExpire(TSDB_GRANT_STREAMS) < 0){
if(suspendAllStreams(pMnode, &pReq->info) < 0){ if(suspendAllStreams(pMnode, &pReq->info) < 0){
return -1; return -1;
} }

View File

@ -24,7 +24,7 @@
#include "tcompare.h" #include "tcompare.h"
#include "tname.h" #include "tname.h"
#define MND_SUBSCRIBE_VER_NUMBER 2 #define MND_SUBSCRIBE_VER_NUMBER 3
#define MND_SUBSCRIBE_RESERVE_SIZE 64 #define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_CONSUMER_LOST_HB_CNT 6 #define MND_CONSUMER_LOST_HB_CNT 6
@ -530,7 +530,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
} }
} }
// if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
if (pSub) { if (pSub) {
taosRLockLatch(&pSub->lock); taosRLockLatch(&pSub->lock);
@ -562,6 +561,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if (d1->vgId == d2->vgId) { if (d1->vgId == d2->vgId) {
d2->rows += d1->rows; d2->rows += d1->rows;
d2->offset = d1->offset; d2->offset = d1->offset;
d2->ever = d1->ever;
find = true; find = true;
mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows); mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
break; break;
@ -574,7 +574,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
} }
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
// }
} }
// 8. generate logs // 8. generate logs
@ -1405,8 +1404,9 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons
} }
if(data){ if(data){
// vg id // vg id
char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; char buf[TSDB_OFFSET_LEN*2 + VARSTR_HEADER_SIZE] = {0};
tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset); tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
sprintf(varDataVal(buf) + strlen(varDataVal(buf)), "/%"PRId64, data->ever);
varDataSetLen(buf, strlen(varDataVal(buf))); varDataSetLen(buf, strlen(varDataVal(buf)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false); colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false);

View File

@ -223,7 +223,7 @@ bool tqReaderIsQueriedTable(STqReader *pReader, uint64_t uid);
bool tqCurrentBlockConsumed(const STqReader *pReader); bool tqCurrentBlockConsumed(const STqReader *pReader);
int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
bool tqNextBlockInWal(STqReader *pReader, const char *idstr); bool tqNextBlockInWal(STqReader *pReader, const char *idstr, int sourceExcluded);
bool tqNextBlockImpl(STqReader *pReader, const char *idstr); bool tqNextBlockImpl(STqReader *pReader, const char *idstr);
SWalReader *tqGetWalReader(STqReader *pReader); SWalReader *tqGetWalReader(STqReader *pReader);
SSDataBlock *tqGetResultBlock(STqReader *pReader); SSDataBlock *tqGetResultBlock(STqReader *pReader);

View File

@ -118,7 +118,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
// tqExec // tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId); int32_t type, int32_t vgId);

View File

@ -27,7 +27,14 @@ static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int k
static int btimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int btimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaInitLock(SMeta *pMeta) {
TdThreadRwlockAttr attr;
taosThreadRwlockAttrInit(&attr);
taosThreadRwlockAttrSetKindNP(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
taosThreadRwlockInit(&pMeta->lock, &attr);
taosThreadRwlockAttrDestroy(&attr);
return 0;
}
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
static void metaCleanup(SMeta **ppMeta); static void metaCleanup(SMeta **ppMeta);

View File

@ -193,7 +193,7 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
continue; continue;
} }
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE,}; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = SUBMIT_REQ_AUTO_CREATE_TABLE, .source = SOURCE_NULL};
int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1; int32_t cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1;
tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true); tbData.pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, cid, pDataBlock, tagArray, true);

View File

@ -368,7 +368,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
} }
// todo ignore the error in wal? // todo ignore the error in wal?
bool tqNextBlockInWal(STqReader* pReader, const char* id) { bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
SWalReader* pWalReader = pReader->pWalReader; SWalReader* pWalReader = pReader->pWalReader;
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
@ -391,7 +391,10 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver); numOfBlocks, pReader->msg.msgLen, pReader->msg.ver);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if ((pSubmitTbData->source & sourceExcluded) != 0){
pReader->nextBlk += 1;
continue;
}
if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) { if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid); tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;

View File

@ -93,6 +93,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
return -1; return -1;
} }
qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
while (1) { while (1) {
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
code = getDataBlock(task, pHandle, vgId, &pDataBlock); code = getDataBlock(task, pHandle, vgId, &pDataBlock);
@ -249,7 +250,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
return 0; return 0;
} }
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) { int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) {
STqExecHandle* pExec = &pHandle->execHandle; STqExecHandle* pExec = &pHandle->execHandle;
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
SArray* pSchemas = taosArrayInit(0, sizeof(void*)); SArray* pSchemas = taosArrayInit(0, sizeof(void*));
@ -264,6 +265,10 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
} }
if ((pSubmitTbDataRet->source & sourceExcluded) != 0){
goto loop_table;
}
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid; int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
@ -328,6 +333,10 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
} }
if ((pSubmitTbDataRet->source & sourceExcluded) != 0){
goto loop_db;
}
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid; int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {

View File

@ -815,7 +815,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
return; return;
} }
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
continue; continue;
@ -859,7 +859,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
pTask->execInfo.sink.numOfBlocks += 1; pTask->execInfo.sink.numOfBlocks += 1;
uint64_t groupId = pDataBlock->info.id.groupId; uint64_t groupId = pDataBlock->info.id.groupId;
SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .source = SOURCE_NULL};
int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId));
if (index == NULL) { // no data yet, append it if (index == NULL) { // no data yet, append it

View File

@ -250,7 +250,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
.ver = pHead->version, .ver = pHead->version,
}; };
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows); code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded);
if (code < 0) { if (code < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
pRequest->subKey); pRequest->subKey);

View File

@ -719,7 +719,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
int nCols, int16_t *slotIds); int nCols, int16_t *slotIds);
#if 1 #ifdef BUILD_NO_CALL
int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
rocksdb_writebatch_t *wb = NULL; rocksdb_writebatch_t *wb = NULL;
int32_t code = 0; int32_t code = 0;
@ -821,7 +821,6 @@ int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheR
return code; return code;
} }
#endif
static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t slotid, tb_uid_t uid, int16_t cid, static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t slotid, tb_uid_t uid, int16_t cid,
int8_t ltype) { int8_t ltype) {
@ -880,6 +879,7 @@ static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t sl
return pLastCol; return pLastCol;
} }
#endif
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols, static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
SCacheRowsReader *pr, int8_t ltype) { SCacheRowsReader *pr, int8_t ltype) {
@ -1359,6 +1359,7 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
*len = sizeof(uint64_t); *len = sizeof(uint64_t);
} }
#ifdef BUILD_NO_CALL
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value, void *ud) { static void deleteTableCacheLast(const void *key, size_t keyLen, void *value, void *ud) {
(void)ud; (void)ud;
SArray *pLastArray = (SArray *)value; SArray *pLastArray = (SArray *)value;
@ -1670,6 +1671,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb
return code; return code;
} }
#endif
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) { static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
tb_uid_t suid = 0; tb_uid_t suid = 0;
@ -1715,6 +1717,7 @@ static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
return code; return code;
} }
#ifdef BUILD_NO_CALL
static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx,
SArray *aDelData) { SArray *aDelData) {
int32_t code = 0; int32_t code = 0;
@ -1759,6 +1762,7 @@ _err:
} }
return code; return code;
} }
#endif
static void freeTableInfoFunc(void *param) { static void freeTableInfoFunc(void *param) {
void **p = (void **)param; void **p = (void **)param;
@ -2716,6 +2720,7 @@ _err:
return code; return code;
} }
#ifdef BUILD_NO_CALL
static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) { static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) {
SArray *pColArray = taosArrayInit(pTSchema->numOfCols, sizeof(SLastCol)); SArray *pColArray = taosArrayInit(pTSchema->numOfCols, sizeof(SLastCol));
if (NULL == pColArray) { if (NULL == pColArray) {
@ -2729,6 +2734,7 @@ static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) {
*ppColArray = pColArray; *ppColArray = pColArray;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) { static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) {
SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol)); SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol));
@ -3089,7 +3095,9 @@ void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity) {
taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity); taosLRUCacheSetCapacity(pVnode->pTsdb->lruCache, capacity);
} }
#ifdef BUILD_NO_CALL
size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); } size_t tsdbCacheGetCapacity(SVnode *pVnode) { return taosLRUCacheGetCapacity(pVnode->pTsdb->lruCache); }
#endif
size_t tsdbCacheGetUsage(SVnode *pVnode) { size_t tsdbCacheGetUsage(SVnode *pVnode) {
size_t usage = 0; size_t usage = 0;
@ -3185,6 +3193,7 @@ int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHa
return code; return code;
} }
#ifdef BUILD_NO_CALL
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) { int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
int32_t code = 0; int32_t code = 0;
@ -3193,6 +3202,7 @@ int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
return code; return code;
} }
#endif
// block cache // block cache
static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) { static void getBCacheKey(int32_t fid, int64_t commitID, int64_t blkno, char *key, int *len) {

View File

@ -16,6 +16,7 @@
#include "tsdb.h" #include "tsdb.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#ifdef BUILD_NO_CALL
// STsdbDataIter2 // STsdbDataIter2
/* open */ /* open */
int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) { int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) {
@ -451,6 +452,7 @@ int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
return code; return code;
} }
} }
#endif
/* get */ /* get */

View File

@ -29,6 +29,7 @@ void tMapDataClear(SMapData *pMapData) {
pMapData->aOffset = NULL; pMapData->aOffset = NULL;
} }
#ifdef BUILD_NO_CALL
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) { int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
int32_t code = 0; int32_t code = 0;
int32_t offset = pMapData->nData; int32_t offset = pMapData->nData;
@ -95,12 +96,14 @@ int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItem
_exit: _exit:
return code; return code;
} }
#endif
void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) { void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) {
ASSERT(idx >= 0 && idx < pMapData->nItem); ASSERT(idx >= 0 && idx < pMapData->nItem);
tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem); tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem);
} }
#ifdef BUILD_NO_CALL
int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *), int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *),
SArray **ppArray) { SArray **ppArray) {
int32_t code = 0; int32_t code = 0;
@ -140,6 +143,7 @@ int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
return n; return n;
} }
#endif
int32_t tGetMapData(uint8_t *p, SMapData *pMapData) { int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
int32_t n = 0; int32_t n = 0;
@ -167,6 +171,7 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
return n; return n;
} }
#ifdef BUILD_NO_CALL
// TABLEID ======================================================================= // TABLEID =======================================================================
int32_t tTABLEIDCmprFn(const void *p1, const void *p2) { int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
TABLEID *pId1 = (TABLEID *)p1; TABLEID *pId1 = (TABLEID *)p1;
@ -199,6 +204,7 @@ int32_t tPutBlockIdx(uint8_t *p, void *ph) {
return n; return n;
} }
#endif
int32_t tGetBlockIdx(uint8_t *p, void *ph) { int32_t tGetBlockIdx(uint8_t *p, void *ph) {
int32_t n = 0; int32_t n = 0;
@ -212,6 +218,7 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) {
return n; return n;
} }
#ifdef BUILD_NO_CALL
int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
SBlockIdx *rBlockIdx = (SBlockIdx *)rhs; SBlockIdx *rBlockIdx = (SBlockIdx *)rhs;
@ -280,6 +287,7 @@ int32_t tPutDataBlk(uint8_t *p, void *ph) {
return n; return n;
} }
#endif
int32_t tGetDataBlk(uint8_t *p, void *ph) { int32_t tGetDataBlk(uint8_t *p, void *ph) {
int32_t n = 0; int32_t n = 0;
@ -310,6 +318,7 @@ int32_t tGetDataBlk(uint8_t *p, void *ph) {
return n; return n;
} }
#ifdef BUILD_NO_CALL
int32_t tDataBlkCmprFn(const void *p1, const void *p2) { int32_t tDataBlkCmprFn(const void *p1, const void *p2) {
SDataBlk *pBlock1 = (SDataBlk *)p1; SDataBlk *pBlock1 = (SDataBlk *)p1;
SDataBlk *pBlock2 = (SDataBlk *)p2; SDataBlk *pBlock2 = (SDataBlk *)p2;
@ -349,6 +358,7 @@ int32_t tPutSttBlk(uint8_t *p, void *ph) {
return n; return n;
} }
#endif
int32_t tGetSttBlk(uint8_t *p, void *ph) { int32_t tGetSttBlk(uint8_t *p, void *ph) {
int32_t n = 0; int32_t n = 0;
@ -438,6 +448,7 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
return n; return n;
} }
#ifdef BUILD_NO_CALL
int32_t tBlockColCmprFn(const void *p1, const void *p2) { int32_t tBlockColCmprFn(const void *p1, const void *p2) {
if (((SBlockCol *)p1)->cid < ((SBlockCol *)p2)->cid) { if (((SBlockCol *)p1)->cid < ((SBlockCol *)p2)->cid) {
return -1; return -1;
@ -479,6 +490,7 @@ int32_t tPutDelIdx(uint8_t *p, void *ph) {
return n; return n;
} }
#endif
int32_t tGetDelIdx(uint8_t *p, void *ph) { int32_t tGetDelIdx(uint8_t *p, void *ph) {
SDelIdx *pDelIdx = (SDelIdx *)ph; SDelIdx *pDelIdx = (SDelIdx *)ph;
@ -492,6 +504,7 @@ int32_t tGetDelIdx(uint8_t *p, void *ph) {
return n; return n;
} }
#ifdef BUILD_NO_CALL
// SDelData ====================================================== // SDelData ======================================================
int32_t tPutDelData(uint8_t *p, void *ph) { int32_t tPutDelData(uint8_t *p, void *ph) {
SDelData *pDelData = (SDelData *)ph; SDelData *pDelData = (SDelData *)ph;
@ -503,6 +516,7 @@ int32_t tPutDelData(uint8_t *p, void *ph) {
return n; return n;
} }
#endif
int32_t tGetDelData(uint8_t *p, void *ph) { int32_t tGetDelData(uint8_t *p, void *ph) {
SDelData *pDelData = (SDelData *)ph; SDelData *pDelData = (SDelData *)ph;
@ -1269,6 +1283,7 @@ _exit:
return code; return code;
} }
#ifdef BUILD_NO_CALL
int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid) { int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid) {
if (pBlockData->nRow == 0) { if (pBlockData->nRow == 0) {
return 1; return 1;
@ -1286,6 +1301,7 @@ int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid); return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
} }
} }
#endif
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) { void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) {
ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID); ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID);

View File

@ -59,6 +59,7 @@ typedef struct STaskStopInfo {
typedef struct { typedef struct {
STqOffsetVal currentOffset; // for tmq STqOffsetVal currentOffset; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta SMqMetaRsp metaRsp; // for tmq fetching meta
int8_t sourceExcluded;
int64_t snapshotVer; int64_t snapshotVer;
SSchemaWrapper* schema; SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor

View File

@ -1152,6 +1152,11 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
} }
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
pTaskInfo->streamInfo.sourceExcluded = sourceExcluded;
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;

View File

@ -2066,7 +2066,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
while (1) { while (1) {
bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id); bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id, pTaskInfo->streamInfo.sourceExcluded);
SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader); SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);

View File

@ -42,6 +42,7 @@ void regexDestroy(FstRegex *regex) {
taosMemoryFree(regex); taosMemoryFree(regex);
} }
#ifdef BUILD_NO_CALL
uint32_t regexAutomStart(FstRegex *regex) { uint32_t regexAutomStart(FstRegex *regex) {
///// no nothing ///// no nothing
return 0; return 0;
@ -65,3 +66,4 @@ bool regexAutomAccept(FstRegex *regex, uint32_t state, uint8_t byte, uint32_t *r
} }
return dfaAccept(regex->dfa, state, byte, result); return dfaAccept(regex->dfa, state, byte, result);
} }
#endif

View File

@ -684,12 +684,14 @@ int idxTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) {
return tfileReaderSearch(reader, query, result); return tfileReaderSearch(reader, query, result);
} }
#ifdef BUILD_NO_CALL
int idxTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { int idxTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
// TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = // TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName =
// term->nColName, .version = 1}; // term->nColName, .version = 1};
return 0; return 0;
} }
#endif
static bool tfileIteratorNext(Iterate* iiter) { static bool tfileIteratorNext(Iterate* iiter) {
IterateValue* iv = &iiter->val; IterateValue* iv = &iiter->val;
iterateValueDestroy(iv, false); iterateValueDestroy(iv, false);

View File

@ -211,6 +211,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
bool colMode, bool ignoreColVals) { bool colMode, bool ignoreColVals) {
STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt)); STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
if (NULL == pTableCxt) { if (NULL == pTableCxt) {
*pOutput = NULL;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -268,12 +269,8 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
} }
} }
if (TSDB_CODE_SUCCESS == code) {
*pOutput = pTableCxt; *pOutput = pTableCxt;
qDebug("tableDataCxt created, uid:%" PRId64 ", vgId:%d", pTableMeta->uid, pTableMeta->vgId); qDebug("tableDataCxt created, code:%d, uid:%" PRId64 ", vgId:%d", code, pTableMeta->uid, pTableMeta->vgId);
} else {
taosMemoryFree(pTableCxt);
}
return code; return code;
} }
@ -288,6 +285,7 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
pTmp->suid = pSrc->suid; pTmp->suid = pSrc->suid;
pTmp->uid = pSrc->uid; pTmp->uid = pSrc->uid;
pTmp->sver = pSrc->sver; pTmp->sver = pSrc->sver;
pTmp->source = pSrc->source;
pTmp->pCreateTbReq = NULL; pTmp->pCreateTbReq = NULL;
if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
if (pSrc->pCreateTbReq) { if (pSrc->pCreateTbReq) {
@ -344,6 +342,10 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta*
void* pData = *pTableCxt; // deal scan coverity void* pData = *pTableCxt; // deal scan coverity
code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES); code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
} }
if (TSDB_CODE_SUCCESS != code) {
insDestroyTableDataCxt(*pTableCxt);
}
return code; return code;
} }
@ -651,6 +653,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end; goto end;
} }
pTableCxt->pData->source = SOURCE_TAOSX;
if(tmp == NULL){ if(tmp == NULL){
ret = initTableColSubmitData(pTableCxt); ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {

View File

@ -33,3 +33,8 @@ add_test(
NAME streamUpdateTest NAME streamUpdateTest
COMMAND streamUpdateTest COMMAND streamUpdateTest
) )
# add_test(
# NAME checkpointTest
# COMMAND checkpointTest
# )

View File

@ -39,7 +39,9 @@ int main(int argc, char **argv) {
return -1; return -1;
} }
strcpy(tsSnodeAddress, "127.0.0.1"); strcpy(tsSnodeAddress, "127.0.0.1");
return RUN_ALL_TESTS(); int ret = RUN_ALL_TESTS();
s3CleanUp();
return ret;
} }
TEST(testCase, checkpointUpload_Test) { TEST(testCase, checkpointUpload_Test) {

View File

@ -413,6 +413,7 @@ int32_t syncEndSnapshot(int64_t rid) {
return code; return code;
} }
#ifdef BUILD_NO_CALL
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
@ -424,6 +425,7 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
return 0; return 0;
} }
#endif
bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
@ -458,6 +460,7 @@ bool syncIsReadyForRead(int64_t rid) {
return ready; return ready;
} }
#ifdef BUILD_NO_CALL
bool syncSnapshotSending(int64_t rid) { bool syncSnapshotSending(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
@ -479,6 +482,7 @@ bool syncSnapshotRecving(int64_t rid) {
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
return b; return b;
} }
#endif
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
if (pSyncNode->peersNum == 0) { if (pSyncNode->peersNum == 0) {
@ -1060,7 +1064,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine; pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, 0);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0); atomic_store_64(&pSyncNode->heartbeatTimerLogicClockUser, 0);
#ifdef BUILD_NO_CALL
pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer; pSyncNode->FpHeartbeatTimerCB = syncNodeEqHeartbeatTimer;
#endif
pSyncNode->heartbeatTimerCounter = 0; pSyncNode->heartbeatTimerCounter = 0;
// init peer heartbeat timer // init peer heartbeat timer
@ -1151,6 +1157,7 @@ _error:
return NULL; return NULL;
} }
#ifdef BUILD_NO_CALL
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
@ -1160,6 +1167,7 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
} }
} }
} }
#endif
int32_t syncNodeRestore(SSyncNode* pSyncNode) { int32_t syncNodeRestore(SSyncNode* pSyncNode) {
ASSERTS(pSyncNode->pLogStore != NULL, "log store not created"); ASSERTS(pSyncNode->pLogStore != NULL, "log store not created");
@ -1214,6 +1222,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
return ret; return ret;
} }
#ifdef BUILD_NO_CALL
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) { int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
@ -1235,6 +1244,7 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
} }
return ret; return ret;
} }
#endif
void syncNodePreClose(SSyncNode* pSyncNode) { void syncNodePreClose(SSyncNode* pSyncNode) {
ASSERT(pSyncNode != NULL); ASSERT(pSyncNode != NULL);
@ -1401,6 +1411,7 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
electMS); electMS);
} }
#ifdef BUILD_NO_CALL
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
if (syncIsInit()) { if (syncIsInit()) {
@ -1414,6 +1425,7 @@ static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS); sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
return ret; return ret;
} }
#endif
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
@ -1452,11 +1464,13 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
return ret; return ret;
} }
#ifdef BUILD_NO_CALL
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeStartHeartbeatTimer(pSyncNode); syncNodeStartHeartbeatTimer(pSyncNode);
return 0; return 0;
} }
#endif
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) { int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg* pMsg) {
SEpSet* epSet = NULL; SEpSet* epSet = NULL;
@ -1700,6 +1714,7 @@ _END:
} }
// raft state change -------------- // raft state change --------------
#ifdef BUILD_NO_CALL
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > raftStoreGetTerm(pSyncNode)) { if (term > raftStoreGetTerm(pSyncNode)) {
raftStoreSetTerm(pSyncNode, term); raftStoreSetTerm(pSyncNode, term);
@ -1709,6 +1724,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
raftStoreClearVote(pSyncNode); raftStoreClearVote(pSyncNode);
} }
} }
#endif
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
if (term > raftStoreGetTerm(pSyncNode)) { if (term > raftStoreGetTerm(pSyncNode)) {
@ -1934,6 +1950,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
sNTrace(pSyncNode, "follower to candidate"); sNTrace(pSyncNode, "follower to candidate");
} }
#ifdef BUILD_NO_CALL
void syncNodeLeader2Follower(SSyncNode* pSyncNode) { void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncNodeBecomeFollower(pSyncNode, "leader to follower"); syncNodeBecomeFollower(pSyncNode, "leader to follower");
@ -1953,6 +1970,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
sNTrace(pSyncNode, "candidate to follower"); sNTrace(pSyncNode, "candidate to follower");
} }
#endif
// just called by syncNodeVoteForSelf // just called by syncNodeVoteForSelf
// need assert // need assert
@ -2042,6 +2060,7 @@ int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, Sy
return 0; return 0;
} }
#ifdef BUILD_NO_CALL
// return append-entries first try index // return append-entries first try index
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) { SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1; SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
@ -2129,6 +2148,7 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex
*pPreTerm = syncNodeGetPreTerm(pSyncNode, index); *pPreTerm = syncNodeGetPreTerm(pSyncNode, index);
return 0; return 0;
} }
#endif
static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqPingTimer(void* param, void* tmrId) {
if (!syncIsInit()) return; if (!syncIsInit()) return;
@ -2200,6 +2220,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
syncNodeRelease(pNode); syncNodeRelease(pNode);
} }
#ifdef BUILD_NO_CALL
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
if (!syncIsInit()) return; if (!syncIsInit()) return;
@ -2233,6 +2254,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
} }
} }
} }
#endif
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
int64_t hbDataRid = (int64_t)param; int64_t hbDataRid = (int64_t)param;
@ -2320,6 +2342,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} }
#ifdef BUILD_NO_CALL
static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) { static void deleteCacheEntry(const void* key, size_t keyLen, void* value, void* ud) {
(void)ud; (void)ud;
taosMemoryFree(value); taosMemoryFree(value);
@ -2339,6 +2362,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
return code; return code;
} }
#endif
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TODO SAlterVnodeReplicaReq name is proper? void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TODO SAlterVnodeReplicaReq name is proper?
cfg->replicaNum = 0; cfg->replicaNum = 0;
@ -2976,6 +3000,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
return 0; return 0;
} }
#ifdef BUILD_NO_CALL
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
int32_t ret = 0; int32_t ret = 0;
@ -3004,6 +3029,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
return ret; return ret;
} }
#endif
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncHeartbeat* pMsg = pRpcMsg->pCont; SyncHeartbeat* pMsg = pRpcMsg->pCont;
@ -3121,6 +3147,7 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg); return syncLogReplProcessHeartbeatReply(pMgr, ths, pMsg);
} }
#ifdef BUILD_NO_CALL
int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncHeartbeatReply* pMsg = pRpcMsg->pCont; SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
@ -3136,6 +3163,7 @@ int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs); syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
return 0; return 0;
} }
#endif
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncLocalCmd* pMsg = pRpcMsg->pCont; SyncLocalCmd* pMsg = pRpcMsg->pCont;
@ -3315,6 +3343,7 @@ SPeerState* syncNodeGetPeerState(SSyncNode* ths, const SRaftId* pDestId) {
return pState; return pState;
} }
#ifdef BUILD_NO_CALL
bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) { bool syncNodeNeedSendAppendEntries(SSyncNode* ths, const SRaftId* pDestId, const SyncAppendEntries* pMsg) {
SPeerState* pState = syncNodeGetPeerState(ths, pDestId); SPeerState* pState = syncNodeGetPeerState(ths, pDestId);
if (pState == NULL) { if (pState == NULL) {
@ -3356,3 +3385,4 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
return true; return true;
} }
#endif

View File

@ -18,10 +18,12 @@
#include <stdlib.h> #include <stdlib.h>
#include "talgo.h" #include "talgo.h"
#if defined(WINDOWS_STASH) || defined(_ALPINE)
int32_t qsortHelper(const void* p1, const void* p2, const void* param) { int32_t qsortHelper(const void* p1, const void* p2, const void* param) {
__compar_fn_t comparFn = param; __compar_fn_t comparFn = param;
return comparFn(p1, p2); return comparFn(p1, p2);
} }
#endif
// todo refactor: 1) move away; 2) use merge sort instead; 3) qsort is not a stable sort actually. // todo refactor: 1) move away; 2) use merge sort instead; 3) qsort is not a stable sort actually.
void taosSort(void* base, int64_t sz, int64_t width, __compar_fn_t compar) { void taosSort(void* base, int64_t sz, int64_t width, __compar_fn_t compar) {

View File

@ -88,6 +88,7 @@ struct termios oldtio;
typedef struct FILE TdCmd; typedef struct FILE TdCmd;
#ifdef BUILD_NO_CALL
void* taosLoadDll(const char* filename) { void* taosLoadDll(const char* filename) {
#if defined(WINDOWS) #if defined(WINDOWS)
ASSERT(0); ASSERT(0);
@ -140,6 +141,7 @@ void taosCloseDll(void* handle) {
} }
#endif #endif
} }
#endif
int taosSetConsoleEcho(bool on) { int taosSetConsoleEcho(bool on) {
#if defined(WINDOWS) #if defined(WINDOWS)

View File

@ -450,10 +450,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_IVLD_ACTIVE, "Invalid active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_IVLD_KEY, "Invalid key to parse active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_IVLD_KEY, "Invalid key to parse active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_DEC_IVLD_KEY, "Invalid key to decode active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_DEC_IVLD_KEY, "Invalid key to decode active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_DEC_IVLD_KLEN, "Invalid klen to decode active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_DEC_IVLD_KLEN, "Invalid klen to decode active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_IVLD_KEY, "Invalid key to gen active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_IVLD_KEY, "Invalid key to generate active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_ACTIVE_LEN, "Exceeded active len to gen active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_ACTIVE_LEN, "Exceeded active len to generate active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_ENC_IVLD_KLEN, "Invalid klen to encode active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_ENC_IVLD_KLEN, "Invalid klen to encode active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_IVLD_DIST, "Invalid dist to parse active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_IVLD_DIST, "Invalid distribution time to parse active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_UNLICENSED_CLUSTER, "Illegal operation, the license is being used by an unlicensed cluster") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_UNLICENSED_CLUSTER, "Illegal operation, the license is being used by an unlicensed cluster")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_LACK_OF_BASIC, "Lack of basic functions in active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_LACK_OF_BASIC, "Lack of basic functions in active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJ_NOT_EXIST, "Grant object not exist") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJ_NOT_EXIST, "Grant object not exist")

View File

@ -115,6 +115,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
return (void *)pSched; return (void *)pSched;
} }
#ifdef BUILD_NO_CALL
void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) { void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const char *label, void *tmrCtrl) {
SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label, NULL); SSchedQueue *pSched = taosInitScheduler(queueSize, numOfThreads, label, NULL);
@ -125,6 +126,7 @@ void *taosInitSchedulerWithInfo(int32_t queueSize, int32_t numOfThreads, const c
return pSched; return pSched;
} }
#endif
void *taosProcessSchedQueue(void *scheduler) { void *taosProcessSchedQueue(void *scheduler) {
SSchedMsg msg; SSchedMsg msg;
@ -241,6 +243,7 @@ void taosCleanUpScheduler(void *param) {
// taosMemoryFree(pSched); // taosMemoryFree(pSched);
} }
#ifdef BUILD_NO_CALL
// for debug purpose, dump the scheduler status every 1min. // for debug purpose, dump the scheduler status every 1min.
void taosDumpSchedulerStatus(void *qhandle, void *tmrId) { void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
SSchedQueue *pSched = (SSchedQueue *)qhandle; SSchedQueue *pSched = (SSchedQueue *)qhandle;
@ -255,3 +258,4 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer); taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
} }
#endif

View File

@ -64,6 +64,7 @@ int64_t taosStrHumanToInt64(const char* str) {
return val; return val;
} }
#ifdef BUILD_NO_CALL
void taosInt64ToHumanStr(int64_t val, char* outStr) { void taosInt64ToHumanStr(int64_t val, char* outStr) {
if (((val >= UNIT_ONE_EXBIBYTE) || (-val >= UNIT_ONE_EXBIBYTE)) && ((val % UNIT_ONE_EXBIBYTE) == 0)) { if (((val >= UNIT_ONE_EXBIBYTE) || (-val >= UNIT_ONE_EXBIBYTE)) && ((val % UNIT_ONE_EXBIBYTE) == 0)) {
sprintf(outStr, "%qdE", (long long)val / UNIT_ONE_EXBIBYTE); sprintf(outStr, "%qdE", (long long)val / UNIT_ONE_EXBIBYTE);
@ -80,6 +81,7 @@ void taosInt64ToHumanStr(int64_t val, char* outStr) {
} else } else
sprintf(outStr, "%qd", (long long)val); sprintf(outStr, "%qd", (long long)val);
} }
#endif
int32_t taosStrHumanToInt32(const char* str) { int32_t taosStrHumanToInt32(const char* str) {
size_t sLen = strlen(str); size_t sLen = strlen(str);
@ -112,6 +114,7 @@ int32_t taosStrHumanToInt32(const char* str) {
return val; return val;
} }
#ifdef BUILD_NO_CALL
void taosInt32ToHumanStr(int32_t val, char* outStr) { void taosInt32ToHumanStr(int32_t val, char* outStr) {
if (((val >= UNIT_ONE_GIBIBYTE) || (-val >= UNIT_ONE_GIBIBYTE)) && ((val % UNIT_ONE_GIBIBYTE) == 0)) { if (((val >= UNIT_ONE_GIBIBYTE) || (-val >= UNIT_ONE_GIBIBYTE)) && ((val % UNIT_ONE_GIBIBYTE) == 0)) {
sprintf(outStr, "%qdG", (long long)val / UNIT_ONE_GIBIBYTE); sprintf(outStr, "%qdG", (long long)val / UNIT_ONE_GIBIBYTE);
@ -122,3 +125,4 @@ void taosInt32ToHumanStr(int32_t val, char* outStr) {
} else } else
sprintf(outStr, "%qd", (long long)val); sprintf(outStr, "%qd", (long long)val);
} }
#endif

View File

@ -27,6 +27,15 @@ from frame import *
class TDTestCase(TBase): class TDTestCase(TBase):
updatecfgDict = {
'queryMaxConcurrentTables': '2K',
'streamMax': '1M',
'totalMemoryKB': '1G',
#'rpcQueueMemoryAllowed': '1T',
#'mndLogRetention': '1P',
'streamBufferSize':'2G'
}
def insertData(self): def insertData(self):
tdLog.info(f"insert data.") tdLog.info(f"insert data.")
@ -62,7 +71,7 @@ class TDTestCase(TBase):
# TSDB_FQDN_LEN = 128 # TSDB_FQDN_LEN = 128
lname = "testhostnamelength" lname = "testhostnamelength"
lname.rjust(130, 'a') lname.rjust(230, 'a')
# except test # except test
sql = f"show vgroups;" sql = f"show vgroups;"
@ -72,6 +81,9 @@ class TDTestCase(TBase):
etool.exeBinFile("taos", f'-a {lname} -s "{sql}" ', wait=False) etool.exeBinFile("taos", f'-a {lname} -s "{sql}" ', wait=False)
etool.exeBinFile("taos", f'-p{lname} -s "{sql}" ', wait=False) etool.exeBinFile("taos", f'-p{lname} -s "{sql}" ', wait=False)
etool.exeBinFile("taos", f'-w -s "{sql}" ', wait=False) etool.exeBinFile("taos", f'-w -s "{sql}" ', wait=False)
etool.exeBinFile("taos", f'abc', wait=False)
etool.exeBinFile("taos", f'-V', wait=False)
etool.exeBinFile("taos", f'-?', wait=False)
# others # others
etool.exeBinFile("taos", f'-N 200 -l 2048 -s "{sql}" ', wait=False) etool.exeBinFile("taos", f'-N 200 -l 2048 -s "{sql}" ', wait=False)
@ -121,6 +133,11 @@ class TDTestCase(TBase):
time.sleep(3) time.sleep(3)
eos.exe("pkill -9 taos") eos.exe("pkill -9 taos")
# call enter password
etool.exeBinFile("taos", f'-p', wait=False)
time.sleep(1)
eos.exe("pkill -9 taos")
# run # run
def run(self): def run(self):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")

View File

@ -4,12 +4,8 @@
#unit-test #unit-test
archOs=$(arch)
if [[ $archOs =~ "aarch64" ]]; then
,,n,unit-test,bash test.sh ,,n,unit-test,bash test.sh
else
,,y,unit-test,bash test.sh
fi
# #
# army-test # army-test

View File

@ -211,7 +211,7 @@ function lcovFunc {
'*/AccessBridgeCalls.c' '*/ttszip.c' '*/dataInserter.c' '*/tlinearhash.c' '*/tsimplehash.c' '*/tsdbDiskData.c'\ '*/AccessBridgeCalls.c' '*/ttszip.c' '*/dataInserter.c' '*/tlinearhash.c' '*/tsimplehash.c' '*/tsdbDiskData.c'\
'*/texpr.c' '*/runUdf.c' '*/schDbg.c' '*/syncIO.c' '*/tdbOs.c' '*/pushServer.c' '*/osLz4.c'\ '*/texpr.c' '*/runUdf.c' '*/schDbg.c' '*/syncIO.c' '*/tdbOs.c' '*/pushServer.c' '*/osLz4.c'\
'*/tbase64.c' '*/tbuffer.c' '*/tdes.c' '*/texception.c' '*/examples/*' '*/tidpool.c' '*/tmempool.c'\ '*/tbase64.c' '*/tbuffer.c' '*/tdes.c' '*/texception.c' '*/examples/*' '*/tidpool.c' '*/tmempool.c'\
'*/clientJniConnector.c' '*/clientTmqConnector.c' '*/version.c'\ '*/clientJniConnector.c' '*/clientTmqConnector.c' '*/version.c' '*/build_version.cc'\
'*/tthread.c' '*/tversion.c' '*/ctgDbg.c' '*/schDbg.c' '*/qwDbg.c' '*/tencode.h' \ '*/tthread.c' '*/tversion.c' '*/ctgDbg.c' '*/schDbg.c' '*/qwDbg.c' '*/tencode.h' \
'*/shellAuto.c' '*/shellTire.c' '*/shellCommand.c'\ '*/shellAuto.c' '*/shellTire.c' '*/shellCommand.c'\
'*/sql.c' '*/sql.y'\ '*/sql.c' '*/sql.y'\

View File

@ -263,6 +263,54 @@ class TDTestCase:
tdSql.error(f'select c_active_code from information_schema.ins_dnodes') tdSql.error(f'select c_active_code from information_schema.ins_dnodes')
tdSql.error('alter all dnodes "cActiveCode" ""') tdSql.error('alter all dnodes "cActiveCode" ""')
def ins_grants_check(self):
grant_name_dict = {
'stream':'stream',
'subscription':'subscription',
'view':'view',
'audit':'audit',
'csv':'csv',
'storage':'multi_tier_storage',
'backup_restore':'backup_restore',
'opc_da':'OPC_DA',
'opc_ua':'OPC_UA',
'pi':'Pi',
'kafka':'Kafka',
'influxdb':'InfluxDB',
'mqtt':'MQTT',
'avevahistorian':'avevaHistorian',
'opentsdb':'OpenTSDB',
'td2.6':'TDengine2.6',
'td3.0':'TDengine3.0'
}
tdSql.execute('drop database if exists db2')
tdSql.execute('create database if not exists db2 vgroups 1 replica 1')
tdSql.query(f'select * from information_schema.ins_grants_full')
result = tdSql.queryResult
index = 0
for i in range(0, len(result)):
if result[i][0] in grant_name_dict:
tdSql.checkEqual(result[i][1], grant_name_dict[result[i][0]])
index += 1
tdSql.checkEqual(index, 17)
tdSql.query(f'select * from information_schema.ins_grants_logs')
result = tdSql.queryResult
tdSql.checkEqual(True, len(result) >= 0)
if(len(result) > 0):
tdSql.checkEqual(True, result[0][0].find(",init,ungranted,ungranted") >= 16)
tdSql.checkEqual(True, len(result[0][1]) == 0)
tdSql.checkEqual(True, len(result[0][2]) >= 46)
tdSql.query(f'select * from information_schema.ins_machines')
tdSql.checkRows(1)
tdSql.execute('alter cluster "activeCode" "revoked"')
tdSql.execute('alter cluster "activeCode" "revoked"')
tdSql.error('alter cluster "activeCode" ""')
tdSql.error('alter cluster "activeCode" "abc"')
tdSql.error('alter cluster "activeCode" ""')
tdSql.execute('alter cluster "activeCode" "revoked"')
def run(self): def run(self):
self.prepare_data() self.prepare_data()
self.count_check() self.count_check()
@ -271,6 +319,7 @@ class TDTestCase:
self.ins_stable_check() self.ins_stable_check()
self.ins_stable_check2() self.ins_stable_check2()
self.ins_dnodes_check() self.ins_dnodes_check()
self.ins_grants_check()
def stop(self): def stop(self):

View File

@ -133,13 +133,17 @@ class TDTestCase:
if snapshot_value == "true": if snapshot_value == "true":
if offset_value != "earliest" and offset_value != "": if offset_value != "earliest" and offset_value != "":
if offset_value == "latest": if offset_value == "latest":
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info)) offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) >= 0, True) offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list))
offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list))
tdSql.checkEqual(offset_value_list1 == offset_value_list2, True)
tdSql.checkEqual(sum(offset_value_list1) >= 0, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res) tdSql.checkEqual(sum(rows_value_list), expected_res)
elif offset_value == "none": elif offset_value == "none":
offset_value_list = list(map(lambda x: x[-2], subscription_info)) offset_value_list = list(map(lambda x: x[-2], subscription_info))
tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info)) offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list))
tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info))
rows_value_list = list(map(lambda x: x[-1], subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info))
tdSql.checkEqual(rows_value_list, [0]*len(subscription_info)) tdSql.checkEqual(rows_value_list, [0]*len(subscription_info))
else: else:
@ -151,18 +155,23 @@ class TDTestCase:
# tdSql.checkEqual(sum(rows_value_list), expected_res) # tdSql.checkEqual(sum(rows_value_list), expected_res)
else: else:
offset_value_list = list(map(lambda x: x[-2], subscription_info)) offset_value_list = list(map(lambda x: x[-2], subscription_info))
tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list))
tdSql.checkEqual(offset_value_list1, [None]*len(subscription_info))
rows_value_list = list(map(lambda x: x[-1], subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info))
tdSql.checkEqual(rows_value_list, [None]*len(subscription_info)) tdSql.checkEqual(rows_value_list, [None]*len(subscription_info))
else: else:
if offset_value != "none": if offset_value != "none":
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info)) offset_value_list = list(map(lambda x: (x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) >= 0, True) offset_value_list1 = list(map(lambda x: int(x.split("/")[0]), offset_value_list))
offset_value_list2 = list(map(lambda x: int(x.split("/")[1]), offset_value_list))
tdSql.checkEqual(offset_value_list1 == offset_value_list2, True)
tdSql.checkEqual(sum(offset_value_list1) >= 0, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res) tdSql.checkEqual(sum(rows_value_list), expected_res)
else: else:
offset_value_list = list(map(lambda x: x[-2], subscription_info)) offset_value_list = list(map(lambda x: x[-2], subscription_info))
tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info)) offset_value_list1 = list(map(lambda x: (x.split("/")[0]), offset_value_list))
tdSql.checkEqual(offset_value_list1, ['none']*len(subscription_info))
rows_value_list = list(map(lambda x: x[-1], subscription_info)) rows_value_list = list(map(lambda x: x[-1], subscription_info))
tdSql.checkEqual(rows_value_list, [0]*len(subscription_info)) tdSql.checkEqual(rows_value_list, [0]*len(subscription_info))
tdSql.execute(f"drop topic if exists {topic_name}") tdSql.execute(f"drop topic if exists {topic_name}")

View File

@ -11,6 +11,7 @@ from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.common import * from util.common import *
from taos.tmq import *
sys.path.append("./7-tmq") sys.path.append("./7-tmq")
from tmqCommon import * from tmqCommon import *
@ -310,6 +311,43 @@ class TDTestCase:
return return
def consumeExcluded(self):
tdSql.execute(f'create topic topic_excluded as database db_taosx')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"msg.consume.excluded": 1
}
consumer = Consumer(consumer_dict)
tdLog.debug("test subscribe topic created by other user")
exceptOccured = False
try:
consumer.subscribe(["topic_excluded"])
except TmqError:
exceptOccured = True
if exceptOccured:
tdLog.exit(f"subscribe error")
try:
while True:
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
finally:
consumer.close()
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
self.checkWal1VgroupOnlyMeta() self.checkWal1VgroupOnlyMeta()
@ -324,6 +362,8 @@ class TDTestCase:
self.checkSnapshotMultiVgroups() self.checkSnapshotMultiVgroups()
self.checkWalMultiVgroupsWithDropTable() self.checkWalMultiVgroupsWithDropTable()
# self.consumeExcluded()
self.checkSnapshotMultiVgroupsWithDropTable() self.checkSnapshotMultiVgroupsWithDropTable()
def stop(self): def stop(self):

View File

@ -683,6 +683,6 @@ if __name__ == "__main__":
if conn is not None: if conn is not None:
conn.close() conn.close()
if asan: if asan:
tdDnodes.StopAllSigint() # tdDnodes.StopAllSigint()
tdLog.info("Address sanitizer mode finished") tdLog.info("Address sanitizer mode finished")
sys.exit(0) sys.exit(0)

View File

@ -40,7 +40,7 @@ pgrep taosd || taosd >> /dev/null 2>&1 &
sleep 10 sleep 10
ctest -E "smlTest|funcTest|profileTest|sdbTest|showTest|geomTest|idxFstUtilUT|idxTest|idxUtilUT|idxFstUT|parserTest|plannerTest|transUT|transUtilUt" -j8 ctest -j8
ret=$? ret=$?
exit $ret exit $ret

View File

@ -244,7 +244,7 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) {
} }
return 0; return 0;
} }
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) || defined(_TD_DARWIN_64)
int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) { int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
SShellArgs *pArgs = &shell.args; SShellArgs *pArgs = &shell.args;
@ -302,6 +302,7 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
return 0; return 0;
} }
#endif
static void shellInitArgs(int argc, char *argv[]) { static void shellInitArgs(int argc, char *argv[]) {
for (int i = 1; i < argc; i++) { for (int i = 1; i < argc; i++) {

View File

@ -909,6 +909,88 @@ void initLogFile() {
taosCloseFile(&pFile2); taosCloseFile(&pFile2);
} }
void testConsumeExcluded(int topic_type){
TAOS* pConn = use_db();
TAOS_RES *pRes = NULL;
if(topic_type == 1){
char *topic = "create topic topic_excluded with meta as database db_taosx";
pRes = taos_query(pConn, topic);
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes));
taos_close(pConn);
return;
}
taos_free_result(pRes);
}else if(topic_type == 2){
char *topic = "create topic topic_excluded as select * from stt";
pRes = taos_query(pConn, topic);
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_excluded, reason:%s\n", taos_errstr(pRes));
taos_close(pConn);
return;
}
taos_free_result(pRes);
}
taos_close(pConn);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_excluded");
int32_t code = 0;
if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
printf("subscribe err\n");
return;
}
while (running) {
TAOS_RES* msg = tmq_consumer_poll(tmq, 1000);
if (msg) {
tmq_raw_data raw = {0};
tmq_get_raw(msg, &raw);
if(topic_type == 1){
assert(raw.raw_type != 2 && raw.raw_type != 4);
}else if(topic_type == 2){
assert(0);
}
// printf("write raw data type: %d\n", raw.raw_type);
tmq_free_raw(raw);
taos_free_result(msg);
} else {
break;
}
}
tmq_consumer_close(tmq);
tmq_list_destroy(topic_list);
pConn = use_db();
pRes = taos_query(pConn, "drop topic if exists topic_excluded");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
taos_close(pConn);
return;
}
taos_free_result(pRes);
}
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
for (int32_t i = 1; i < argc; i++) { for (int32_t i = 1; i < argc; i++) {
if (strcmp(argv[i], "-c") == 0) { if (strcmp(argv[i], "-c") == 0) {
@ -942,5 +1024,8 @@ int main(int argc, char* argv[]) {
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list); basic_consume_loop(tmq, topic_list);
tmq_list_destroy(topic_list); tmq_list_destroy(topic_list);
testConsumeExcluded(1);
testConsumeExcluded(2);
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
} }