Merge branch '3.0' into feat/TD-25964-3.0
This commit is contained in:
commit
d65bf47a36
|
@ -6,7 +6,14 @@ toc_max_heading_level: 2
|
||||||
|
|
||||||
TDengine 是一款开源、高性能、云原生的[时序数据库](https://tdengine.com/tsdb/),且针对物联网、车联网、工业互联网、金融、IT 运维等场景进行了优化。TDengine 的代码,包括集群功能,都在 GNU AGPL v3.0 下开源。除核心的时序数据库功能外,TDengine 还提供[缓存](../develop/cache/)、[数据订阅](../develop/tmq)、[流式计算](../develop/stream)等其它功能以降低系统复杂度及研发和运维成本。
|
TDengine 是一款开源、高性能、云原生的[时序数据库](https://tdengine.com/tsdb/),且针对物联网、车联网、工业互联网、金融、IT 运维等场景进行了优化。TDengine 的代码,包括集群功能,都在 GNU AGPL v3.0 下开源。除核心的时序数据库功能外,TDengine 还提供[缓存](../develop/cache/)、[数据订阅](../develop/tmq)、[流式计算](../develop/stream)等其它功能以降低系统复杂度及研发和运维成本。
|
||||||
|
|
||||||
本章节介绍 TDengine 的主要功能、竞争优势、适用场景、与其他数据库的对比测试等等,让大家对 TDengine 有个整体的了解。
|
本章节介绍 TDengine 的主要产品和功能、竞争优势、适用场景、与其他数据库的对比测试等等,让大家对 TDengine 有个整体的了解。
|
||||||
|
|
||||||
|
## 主要产品
|
||||||
|
|
||||||
|
TDengine 有三个主要产品:TDengine Pro (即 TDengine 企业版),TDengine Cloud,和 TDengine OSS,关于它们的具体定义请参考
|
||||||
|
- [TDengine 企业版](https://www.taosdata.com/tdengine-pro)
|
||||||
|
- [TDengine 云服务](https://cloud.taosdata.com/?utm_source=menu&utm_medium=webcn)
|
||||||
|
- [TDengine 开源版](https://www.taosdata.com/tdengine-oss)
|
||||||
|
|
||||||
## 主要功能
|
## 主要功能
|
||||||
|
|
||||||
|
|
|
@ -1,138 +0,0 @@
|
||||||
---
|
|
||||||
sidebar_label: 权限管理
|
|
||||||
title: 权限管理
|
|
||||||
description: 企业版中才具有的权限管理功能
|
|
||||||
---
|
|
||||||
|
|
||||||
本节讲述如何在 TDengine 中进行权限管理的相关操作。权限管理是 TDengine 企业版的特有功能,本节只列举了一些基本的权限管理功能作为示例,更丰富的权限管理请联系 TDengine 销售或市场团队。
|
|
||||||
|
|
||||||
## 创建用户
|
|
||||||
|
|
||||||
```sql
|
|
||||||
CREATE USER use_name PASS 'password' [SYSINFO {1|0}];
|
|
||||||
```
|
|
||||||
|
|
||||||
创建用户。
|
|
||||||
|
|
||||||
use_name 最长为 23 字节。
|
|
||||||
|
|
||||||
password 最长为 31 字节,合法字符包括"a-zA-Z0-9!?$%^&*()_–+={[}]:;@~#|<,>.?/",不可以出现单双引号、撇号、反斜杠和空格,且不可以为空。
|
|
||||||
|
|
||||||
SYSINFO 表示用户是否可以查看系统信息。1 表示可以查看,0 表示不可以查看。系统信息包括服务端配置信息、服务端各种节点信息(如 DNODE、QNODE等)、存储相关的信息等。默认为可以查看系统信息。
|
|
||||||
|
|
||||||
例如,创建密码为123456且可以查看系统信息的用户test如下:
|
|
||||||
|
|
||||||
```sql
|
|
||||||
taos> create user test pass '123456' sysinfo 1;
|
|
||||||
Query OK, 0 of 0 rows affected (0.001254s)
|
|
||||||
```
|
|
||||||
|
|
||||||
## 查看用户
|
|
||||||
|
|
||||||
```sql
|
|
||||||
SHOW USERS;
|
|
||||||
```
|
|
||||||
|
|
||||||
查看用户信息。
|
|
||||||
|
|
||||||
```sql
|
|
||||||
taos> show users;
|
|
||||||
name | super | enable | sysinfo | create_time |
|
|
||||||
================================================================================
|
|
||||||
test | 0 | 1 | 1 | 2022-08-29 15:10:27.315 |
|
|
||||||
root | 1 | 1 | 1 | 2022-08-29 15:03:34.710 |
|
|
||||||
Query OK, 2 rows in database (0.001657s)
|
|
||||||
```
|
|
||||||
|
|
||||||
也可以通过查询INFORMATION_SCHEMA.INS_USERS系统表来查看用户信息,例如:
|
|
||||||
|
|
||||||
```sql
|
|
||||||
taos> select * from information_schema.ins_users;
|
|
||||||
name | super | enable | sysinfo | create_time |
|
|
||||||
================================================================================
|
|
||||||
test | 0 | 1 | 1 | 2022-08-29 15:10:27.315 |
|
|
||||||
root | 1 | 1 | 1 | 2022-08-29 15:03:34.710 |
|
|
||||||
Query OK, 2 rows in database (0.001953s)
|
|
||||||
```
|
|
||||||
|
|
||||||
## 删除用户
|
|
||||||
|
|
||||||
```sql
|
|
||||||
DROP USER user_name;
|
|
||||||
```
|
|
||||||
|
|
||||||
## 修改用户信息
|
|
||||||
|
|
||||||
```sql
|
|
||||||
ALTER USER user_name alter_user_clause
|
|
||||||
|
|
||||||
alter_user_clause: {
|
|
||||||
PASS 'literal'
|
|
||||||
| ENABLE value
|
|
||||||
| SYSINFO value
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
- PASS:修改用户密码。
|
|
||||||
- ENABLE:修改用户是否启用。1 表示启用此用户,0 表示禁用此用户。
|
|
||||||
- SYSINFO:修改用户是否可查看系统信息。1 表示可以查看系统信息,0 表示不可以查看系统信息。
|
|
||||||
|
|
||||||
例如,禁用 test 用户:
|
|
||||||
|
|
||||||
```sql
|
|
||||||
taos> alter user test enable 0;
|
|
||||||
Query OK, 0 of 0 rows affected (0.001160s)
|
|
||||||
```
|
|
||||||
|
|
||||||
## 授权
|
|
||||||
|
|
||||||
```sql
|
|
||||||
GRANT privileges ON priv_level TO user_name
|
|
||||||
|
|
||||||
privileges : {
|
|
||||||
ALL
|
|
||||||
| priv_type [, priv_type] ...
|
|
||||||
}
|
|
||||||
|
|
||||||
priv_type : {
|
|
||||||
READ
|
|
||||||
| WRITE
|
|
||||||
}
|
|
||||||
|
|
||||||
priv_level : {
|
|
||||||
dbname.*
|
|
||||||
| *.*
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
对用户授权。授权功能只包含在企业版中。
|
|
||||||
|
|
||||||
授权级别支持到DATABASE,权限有READ和WRITE两种。
|
|
||||||
|
|
||||||
TDengine 有超级用户和普通用户两类用户。超级用户缺省创建为root,拥有所有权限。使用超级用户创建出来的用户为普通用户。在未授权的情况下,普通用户可以创建DATABASE,并拥有自己创建的DATABASE的所有权限,包括删除数据库、修改数据库、查询时序数据和写入时序数据。超级用户可以给普通用户授予其他DATABASE的读写权限,使其可以在此DATABASE上读写数据,但不能对其进行删除和修改数据库的操作。
|
|
||||||
|
|
||||||
对于非DATABASE的对象,如USER、DNODE、UDF、QNODE等,普通用户只有读权限(一般为SHOW命令),不能创建和修改。
|
|
||||||
|
|
||||||
## 撤销授权
|
|
||||||
|
|
||||||
```sql
|
|
||||||
REVOKE privileges ON priv_level FROM user_name
|
|
||||||
|
|
||||||
privileges : {
|
|
||||||
ALL
|
|
||||||
| priv_type [, priv_type] ...
|
|
||||||
}
|
|
||||||
|
|
||||||
priv_type : {
|
|
||||||
READ
|
|
||||||
| WRITE
|
|
||||||
}
|
|
||||||
|
|
||||||
priv_level : {
|
|
||||||
dbname.*
|
|
||||||
| *.*
|
|
||||||
}
|
|
||||||
|
|
||||||
```
|
|
||||||
|
|
||||||
收回对用户的授权。授权功能只包含在企业版中。
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
---
|
||||||
|
title: 集群运维
|
||||||
|
description: TDengine 提供了多种集群运维手段以使集群运行更健康更高效
|
||||||
|
---
|
||||||
|
|
||||||
|
为了使集群运行更健康更高效,TDengine 企业版提供了一些运维手段来帮助系统管理员更好地运维集群。
|
||||||
|
|
||||||
|
## 数据重整
|
||||||
|
|
||||||
|
TDengine 面向多种写入场景,在有些写入场景下,TDengine 的存储会导致数据存储的放大或数据文件的空洞等。这一方面影响数据的存储效率,另一方面也会影响查询效率。为了解决上述问题,TDengine 企业版提供了对数据的重整功能,即 DATA COMPACT 功能,将存储的数据文件重新整理,删除文件空洞和无效数据,提高数据的组织度,从而提高存储和查询的效率。
|
||||||
|
|
||||||
|
**语法**
|
||||||
|
|
||||||
|
```sql
|
||||||
|
COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY'];
|
||||||
|
```
|
||||||
|
|
||||||
|
**效果**
|
||||||
|
|
||||||
|
- 扫描并压缩指定的 DB 中所有 VGROUP 中 VNODE 的所有数据文件
|
||||||
|
- COMPCAT 会删除被删除数据以及被删除的表的数据
|
||||||
|
- COMPACT 会合并多个 STT 文件
|
||||||
|
- 可通过 start with 关键字指定 COMPACT 数据的起始时间
|
||||||
|
- 可通过 end with 关键字指定 COMPACT 数据的终止时间
|
||||||
|
|
||||||
|
**补充说明**
|
||||||
|
|
||||||
|
- COMPACT 为异步,执行 COMPACT 命令后不会等 COMPACT 结束就会返回。如果上一个 COMPACT 没有完成则再发起一个 COMPACT 任务,则会等上一个任务完成后再返回。
|
||||||
|
- COMPACT 可能阻塞写入,但不阻塞查询
|
||||||
|
- COMPACT 的进度不可观测
|
||||||
|
|
||||||
|
## 集群负载再平衡
|
||||||
|
|
||||||
|
当多副本集群中的一个或多个节点因为升级或其它原因而重启后,有可能出现集群中各个 dnode 负载不均衡的现象,极端情况下会出现所有 vgroup 的 leader 都位于同一个 dnode 的情况。为了解决这个问题,可以使用下面的命令
|
||||||
|
|
||||||
|
```sql
|
||||||
|
balance vgroup leader;
|
||||||
|
```
|
||||||
|
|
||||||
|
**功能**
|
||||||
|
|
||||||
|
让所有的 vgroup 的 leade r在各自的replica节点上均匀分布。这个命令会让 vgroup 强制重新选举,通过重新选举,在选举的过程中,变换 vgroup 的leader,通过这个方式,最终让leader均匀分布。
|
||||||
|
|
||||||
|
**注意**
|
||||||
|
|
||||||
|
Raft选举本身带有随机性,所以通过选举的重新分布产生的均匀分布也是带有一定的概率,不会完全的均匀。**该命令的副作用是影响查询和写入**,在vgroup重新选举时,从开始选举到选举出新的 leader 这段时间,这 个vgroup 无法写入和查询。选举过程一般在秒级完成。所有的vgroup会依次逐个重新选举。
|
||||||
|
|
||||||
|
## 恢复数据节点
|
||||||
|
|
||||||
|
在多节点三副本的集群环境中,如果某个 dnode 的磁盘损坏,该 dnode 会自动退出,但集群中其它的 dnode 仍然能够继续提供写入和查询服务。
|
||||||
|
|
||||||
|
在更换了损坏的磁盘后,如果想要让曾经主动退出的 dnode 重新加入集群提供服务,可以通过 `restore dnode` 命令来恢复该数据节点上的部分或全部逻辑节点,该功能依赖多副本中的其它副本进行数据复制,所以只在集群中 dnode 数量大于等于 3 且副本数为 3 的情况下能够工作。
|
||||||
|
|
||||||
|
|
||||||
|
```sql
|
||||||
|
restore dnode <dnode_id>;# 恢复dnode上的mnode,所有vnode和qnode
|
||||||
|
restore mnode on dnode <dnode_id>;# 恢复dnode上的mnode
|
||||||
|
restore vnode on dnode <dnode_id> ;# 恢复dnode上的所有vnode
|
||||||
|
restore qnode on dnode <dnode_id>;# 恢复dnode上的qnode
|
||||||
|
```
|
||||||
|
|
||||||
|
**限制**
|
||||||
|
- 该功能是基于已有的复制功能的恢复,不是灾难恢复或者备份恢复,所以对于要恢复的 mnode 和 vnode来说,使用该命令的前提是还存在该 mnode 或 vnode 的其它两个副本仍然能够正常工作。
|
||||||
|
- 该命令不能修复数据目录中的个别文件的损坏或者丢失。例如,如果某个 mnode 或者 vnode 中的个别文件或数据损坏,无法单独恢复损坏的某个文件或者某块数据。此时,可以选择将该 mnode/vnode 的数据全部清空再进行恢复。
|
||||||
|
|
||||||
|
|
||||||
|
## 虚拟组分裂 (Scale Out)
|
||||||
|
|
||||||
|
当一个 vgroup 因为子表数过多而导致 CPU 或 Disk 资源使用量负载过高时,增加 dnode 节点后,可通过 `split vgroup` 命令把该 vgroup 分裂为两个虚拟组。分裂完成后,新产生的两个 vgroup 承担原来由一个 vgroup 提供的读写服务。这也是 TDengine 为企业版用户提供的 scale out 集群的能力。
|
||||||
|
|
||||||
|
```sql
|
||||||
|
split vgroup <vgroup_id>
|
||||||
|
```
|
||||||
|
|
||||||
|
**注意**
|
||||||
|
- 单副本库虚拟组,在分裂完成后,历史时序数据总磁盘空间使用量,可能会翻倍。所以,在执行该操作之前,通过增加 dnode 节点方式,确保集群中有足够的 CPU 和磁盘资源,避免资源不足现象发生。
|
||||||
|
- 该命令为 DB 级事务;执行过程,当前DB的其它管理事务将会被拒绝。集群中,其它DB不受影响。
|
||||||
|
- 分裂任务执行过程中,可持续提供读写服务;期间,可能存在可感知的短暂的读写业务中断。
|
||||||
|
- 在分裂过程中,不支持流和订阅。分裂结束后,历史 WAL 会清空。
|
||||||
|
- 分裂过程中,可支持节点宕机重启容错;但不支持节点磁盘故障容错。
|
|
@ -0,0 +1,56 @@
|
||||||
|
---
|
||||||
|
title: 多级存储
|
||||||
|
---
|
||||||
|
|
||||||
|
## 多级存储
|
||||||
|
|
||||||
|
说明:多级存储功能仅企业版支持。
|
||||||
|
|
||||||
|
在默认配置下,TDengine 会将所有数据保存在 /var/lib/taos 目录下,而且每个 vnode 的数据文件保存在该目录下的不同目录。为扩大存储空间,尽量减少文件读取的瓶颈,提高数据吞吐率 TDengine 可通过配置系统参数 dataDir 让多个挂载的硬盘被系统同时使用。
|
||||||
|
|
||||||
|
除此之外,TDengine 也提供了数据分级存储的功能,将不同时间段的数据存储在挂载的不同介质上的目录里,从而实现不同“热度”的数据存储在不同的存储介质上,充分利用存储,节约成本。比如,最新采集的数据需要经常访问,对硬盘的读取性能要求高,那么用户可以配置将这些数据存储在 SSD 盘上。超过一定期限的数据,查询需求量没有那么高,那么可以存储在相对便宜的 HDD 盘上。
|
||||||
|
|
||||||
|
多级存储支持 3 级,每级最多可配置 16 个挂载点。
|
||||||
|
|
||||||
|
TDengine 多级存储配置方式如下(在配置文件/etc/taos/taos.cfg 中):
|
||||||
|
|
||||||
|
```
|
||||||
|
dataDir [path] <level> <primary>
|
||||||
|
```
|
||||||
|
|
||||||
|
- path: 挂载点的文件夹路径
|
||||||
|
- level: 介质存储等级,取值为 0,1,2。
|
||||||
|
0 级存储最新的数据,1 级存储次新的数据,2 级存储最老的数据,省略默认为 0。
|
||||||
|
各级存储之间的数据流向:0 级存储 -> 1 级存储 -> 2 级存储。
|
||||||
|
同一存储等级可挂载多个硬盘,同一存储等级上的数据文件分布在该存储等级的所有硬盘上。
|
||||||
|
需要说明的是,数据在不同级别的存储介质上的移动,是由系统自动完成的,用户无需干预。
|
||||||
|
- primary: 是否为主挂载点,0(否)或 1(是),省略默认为 1。
|
||||||
|
|
||||||
|
在配置中,只允许一个主挂载点的存在(level=0,primary=1),例如采用如下的配置方式:
|
||||||
|
|
||||||
|
```
|
||||||
|
dataDir /mnt/data1 0 1
|
||||||
|
dataDir /mnt/data2 0 0
|
||||||
|
dataDir /mnt/data3 1 0
|
||||||
|
dataDir /mnt/data4 1 0
|
||||||
|
dataDir /mnt/data5 2 0
|
||||||
|
dataDir /mnt/data6 2 0
|
||||||
|
```
|
||||||
|
|
||||||
|
:::note
|
||||||
|
|
||||||
|
1. 多级存储不允许跨级配置,合法的配置方案有:仅 0 级,仅 0 级+ 1 级,以及 0 级+ 1 级+ 2 级。而不允许只配置 level=0 和 level=2,而不配置 level=1。
|
||||||
|
2. 禁止手动移除使用中的挂载盘,挂载盘目前不支持非本地的网络盘。
|
||||||
|
3. 多级存储目前不支持删除已经挂载的硬盘的功能。
|
||||||
|
|
||||||
|
:::
|
||||||
|
|
||||||
|
## 0 级负载均衡
|
||||||
|
|
||||||
|
在多级存储中,有且只有一个主挂载点,主挂载点承担了系统中最重要的元数据在座,同时各个 vnode 的主目录均存在于当前 dnode 主挂载点上,从而导致该 dnode 的写入性能受限于单个磁盘的 IO 吞吐能力。
|
||||||
|
|
||||||
|
从 TDengine 3.1.0.0 开始,如果一个 dnode 配置了多个 0 级挂载点,我们将该 dnode 上所有 vnode 的主目录均衡分布在所有的 0 级挂载点上,由这些 0 级挂载点共同承担写入负荷。在网络 I/O 及其它处理资源不成为瓶颈的情况下,通过优化集群配置,测试结果证明整个系统的写入能力和 0 级挂载点的数量呈现线性关系,即随着 0 级挂载点数量的增加,整个系统的写入能力也成倍增加。
|
||||||
|
|
||||||
|
## 同级挂载点选择策略
|
||||||
|
|
||||||
|
一般情况下,当 TDengine 要从同级挂载点中选择一个用于生成新的数据文件时,采用 round robin 策略进行选择。但现实中有可能每个磁盘的容量不相同,或者容量相同但写入的数据量不相同,这就导致会出现每个磁盘上的可用空间不均衡,在实际进行选择时有可能会选择到一个剩余空间已经很小的磁盘。为了解决这个问题,从 3.1.1.0 开始引入了一个新的配置 `minDiskFreeSize`,当某块磁盘上的可用空间小于等于这个阈值时,该磁盘将不再被选择用于生成新的数据文件。该配置项的单位为字节,其值应该大于 2GB,即会跳过可用空间小于 2GB 的挂载点。
|
|
@ -767,8 +767,6 @@ typedef struct {
|
||||||
char* pAst2;
|
char* pAst2;
|
||||||
int64_t deleteMark1;
|
int64_t deleteMark1;
|
||||||
int64_t deleteMark2;
|
int64_t deleteMark2;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SMCreateStbReq;
|
} SMCreateStbReq;
|
||||||
|
|
||||||
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
|
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
|
||||||
|
@ -789,8 +787,6 @@ typedef struct {
|
||||||
int8_t source; // 1-taosX or 0-taosClient
|
int8_t source; // 1-taosX or 0-taosClient
|
||||||
int8_t reserved[6];
|
int8_t reserved[6];
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SMDropStbReq;
|
} SMDropStbReq;
|
||||||
|
|
||||||
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
|
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
|
||||||
|
@ -804,8 +800,6 @@ typedef struct {
|
||||||
int32_t ttl;
|
int32_t ttl;
|
||||||
int32_t commentLen;
|
int32_t commentLen;
|
||||||
char* comment;
|
char* comment;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SMAlterStbReq;
|
} SMAlterStbReq;
|
||||||
|
|
||||||
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
|
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
|
||||||
|
@ -875,8 +869,6 @@ int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pR
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
int32_t sqlLen;
|
|
||||||
char *sql;
|
|
||||||
} SDropUserReq, SDropAcctReq;
|
} SDropUserReq, SDropAcctReq;
|
||||||
|
|
||||||
int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
|
int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
|
||||||
|
@ -889,8 +881,6 @@ typedef struct {
|
||||||
int8_t enable;
|
int8_t enable;
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
char pass[TSDB_USET_PASSWORD_LEN];
|
char pass[TSDB_USET_PASSWORD_LEN];
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SCreateUserReq;
|
} SCreateUserReq;
|
||||||
|
|
||||||
int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
|
int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
|
||||||
|
@ -907,8 +897,6 @@ typedef struct {
|
||||||
char tabName[TSDB_TABLE_NAME_LEN];
|
char tabName[TSDB_TABLE_NAME_LEN];
|
||||||
char* tagCond;
|
char* tagCond;
|
||||||
int32_t tagCondLen;
|
int32_t tagCondLen;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SAlterUserReq;
|
} SAlterUserReq;
|
||||||
|
|
||||||
int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
|
int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
|
||||||
|
@ -1071,8 +1059,6 @@ typedef struct {
|
||||||
int16_t hashPrefix;
|
int16_t hashPrefix;
|
||||||
int16_t hashSuffix;
|
int16_t hashSuffix;
|
||||||
int32_t tsdbPageSize;
|
int32_t tsdbPageSize;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SCreateDbReq;
|
} SCreateDbReq;
|
||||||
|
|
||||||
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
|
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
|
||||||
|
@ -1098,8 +1084,6 @@ typedef struct {
|
||||||
int32_t minRows;
|
int32_t minRows;
|
||||||
int32_t walRetentionPeriod;
|
int32_t walRetentionPeriod;
|
||||||
int32_t walRetentionSize;
|
int32_t walRetentionSize;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SAlterDbReq;
|
} SAlterDbReq;
|
||||||
|
|
||||||
int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
|
int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
|
||||||
|
@ -1108,8 +1092,6 @@ int32_t tDeserializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int8_t ignoreNotExists;
|
int8_t ignoreNotExists;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SDropDbReq;
|
} SDropDbReq;
|
||||||
|
|
||||||
int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
|
int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
|
||||||
|
@ -1307,8 +1289,6 @@ void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
STimeWindow timeRange;
|
STimeWindow timeRange;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SCompactDbReq;
|
} SCompactDbReq;
|
||||||
|
|
||||||
int32_t tSerializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq);
|
int32_t tSerializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq);
|
||||||
|
@ -1872,8 +1852,6 @@ void tFreeSExplainRsp(SExplainRsp* pRsp);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
|
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
|
||||||
int32_t port;
|
int32_t port;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SCreateDnodeReq;
|
} SCreateDnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq);
|
int32_t tSerializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq);
|
||||||
|
@ -1885,8 +1863,6 @@ typedef struct {
|
||||||
int32_t port;
|
int32_t port;
|
||||||
int8_t force;
|
int8_t force;
|
||||||
int8_t unsafe;
|
int8_t unsafe;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SDropDnodeReq;
|
} SDropDnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);
|
int32_t tSerializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);
|
||||||
|
@ -1902,8 +1878,6 @@ enum {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
int8_t restoreType;
|
int8_t restoreType;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SRestoreDnodeReq;
|
} SRestoreDnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq);
|
int32_t tSerializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq);
|
||||||
|
@ -1913,8 +1887,6 @@ typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
char config[TSDB_DNODE_CONFIG_LEN];
|
char config[TSDB_DNODE_CONFIG_LEN];
|
||||||
char value[TSDB_DNODE_VALUE_LEN];
|
char value[TSDB_DNODE_VALUE_LEN];
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SMCfgDnodeReq;
|
} SMCfgDnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);
|
int32_t tSerializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);
|
||||||
|
@ -1930,8 +1902,6 @@ int32_t tDeserializeSDCfgDnodeReq(void* buf, int32_t bufLen, SDCfgDnodeReq* pReq
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
int32_t sqlLen;
|
|
||||||
char *sql;
|
|
||||||
} SMCreateMnodeReq, SMDropMnodeReq, SDDropMnodeReq, SMCreateQnodeReq, SMDropQnodeReq, SDCreateQnodeReq, SDDropQnodeReq,
|
} SMCreateMnodeReq, SMDropMnodeReq, SDDropMnodeReq, SMCreateQnodeReq, SMDropQnodeReq, SDCreateQnodeReq, SDDropQnodeReq,
|
||||||
SMCreateSnodeReq, SMDropSnodeReq, SDCreateSnodeReq, SDDropSnodeReq;
|
SMCreateSnodeReq, SMDropSnodeReq, SDCreateSnodeReq, SDDropSnodeReq;
|
||||||
|
|
||||||
|
@ -1972,8 +1942,6 @@ int32_t tDeserializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t useless; // useless
|
int32_t useless; // useless
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SBalanceVgroupReq;
|
} SBalanceVgroupReq;
|
||||||
|
|
||||||
int32_t tSerializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq* pReq);
|
int32_t tSerializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq* pReq);
|
||||||
|
@ -1992,8 +1960,6 @@ typedef struct {
|
||||||
int32_t dnodeId1;
|
int32_t dnodeId1;
|
||||||
int32_t dnodeId2;
|
int32_t dnodeId2;
|
||||||
int32_t dnodeId3;
|
int32_t dnodeId3;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SRedistributeVgroupReq;
|
} SRedistributeVgroupReq;
|
||||||
|
|
||||||
int32_t tSerializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
|
int32_t tSerializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
|
||||||
|
@ -2001,8 +1967,6 @@ int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistrib
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t useless;
|
int32_t useless;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SBalanceVgroupLeaderReq;
|
} SBalanceVgroupLeaderReq;
|
||||||
|
|
||||||
int32_t tSerializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);
|
int32_t tSerializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);
|
||||||
|
@ -2262,7 +2226,6 @@ typedef struct {
|
||||||
int64_t deleteMark;
|
int64_t deleteMark;
|
||||||
int8_t igUpdate;
|
int8_t igUpdate;
|
||||||
int64_t lastTs;
|
int64_t lastTs;
|
||||||
int32_t sqlLen;
|
|
||||||
} SCMCreateStreamReq;
|
} SCMCreateStreamReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -2299,7 +2262,6 @@ typedef struct {
|
||||||
char subDbName[TSDB_DB_FNAME_LEN];
|
char subDbName[TSDB_DB_FNAME_LEN];
|
||||||
char* ast;
|
char* ast;
|
||||||
char subStbName[TSDB_TABLE_FNAME_LEN];
|
char subStbName[TSDB_TABLE_FNAME_LEN];
|
||||||
int32_t sqlLen;
|
|
||||||
} SCMCreateTopicReq;
|
} SCMCreateTopicReq;
|
||||||
|
|
||||||
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
|
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
|
||||||
|
@ -2484,8 +2446,6 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
int8_t igNotExists;
|
int8_t igNotExists;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SMDropTopicReq;
|
} SMDropTopicReq;
|
||||||
|
|
||||||
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||||||
|
@ -2585,8 +2545,6 @@ typedef struct SVCreateTbReq {
|
||||||
SSchemaWrapper schemaRow;
|
SSchemaWrapper schemaRow;
|
||||||
} ntb;
|
} ntb;
|
||||||
};
|
};
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SVCreateTbReq;
|
} SVCreateTbReq;
|
||||||
|
|
||||||
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
|
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
|
||||||
|
@ -3061,8 +3019,6 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_STREAM_FNAME_LEN];
|
char name[TSDB_STREAM_FNAME_LEN];
|
||||||
int8_t igNotExists;
|
int8_t igNotExists;
|
||||||
int32_t sqlLen;
|
|
||||||
char* sql;
|
|
||||||
} SMDropStreamReq;
|
} SMDropStreamReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -736,7 +736,23 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
|
||||||
code = mndCreateDb(pMnode, pReq, &createReq, pUser);
|
code = mndCreateDb(pMnode, pReq, &createReq, pUser);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "createDB", createReq.db, "", "");
|
char detail[3000] = {0};
|
||||||
|
sprintf(detail, "buffer:%d, cacheLast:%d, cacheLastSize:%d, compression:%d, daysPerFile:%d, "
|
||||||
|
"daysToKeep0:%d, daysToKeep:%d, daysToKeep2:%d, hashPrefix:%d, "
|
||||||
|
"hashSuffix:%d, ignoreExist:%d, maxRows:%d, minRows:%d, numOfRetensions:%d, "
|
||||||
|
"numOfStables:%d, numOfVgroups:%d, pages:%d, pageSize:%d, precision:%d, "
|
||||||
|
"replications:%d, schemaless:%d, sstTrigger:%d, strict:%d, "
|
||||||
|
"tsdbPageSize:%d, walFsyncPeriod:%d, walLevel:%d, walRetentionPeriod:%d, "
|
||||||
|
"walRetentionSize:%" PRId64 ", walRollPeriod:%d, walSegmentSize:%" PRId64,
|
||||||
|
createReq.buffer, createReq.cacheLast, createReq.cacheLastSize, createReq.compression, createReq.daysPerFile,
|
||||||
|
createReq.daysToKeep0, createReq.daysToKeep1, createReq.daysToKeep2, createReq.hashPrefix,
|
||||||
|
createReq.hashSuffix, createReq.ignoreExist, createReq.maxRows, createReq.minRows, createReq.numOfRetensions,
|
||||||
|
createReq.numOfStables, createReq.numOfVgroups, createReq.pages, createReq.pageSize, createReq.precision,
|
||||||
|
createReq.replications, createReq.schemaless, createReq.sstTrigger, createReq.strict,
|
||||||
|
createReq.tsdbPageSize, createReq.walFsyncPeriod, createReq.walLevel, createReq.walRetentionPeriod,
|
||||||
|
createReq.walRetentionSize, createReq.walRollPeriod, createReq.walSegmentSize);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "createDB", createReq.db, "", detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -980,7 +996,17 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "alterDB", alterReq.db, "", "");
|
char detail[3000] = {0};
|
||||||
|
sprintf(detail, "buffer:%d, cacheLast:%d, cacheLastSize:%d, daysPerFile:%d, daysToKeep0:%d, "
|
||||||
|
"daysToKeep1:%d, daysToKeep2:%d, db:%s, minRows:%d, pages:%d, pageSize:%d, "
|
||||||
|
"replications:%d, sstTrigger:%d, strict:%d, walFsyncPeriod:%d, "
|
||||||
|
"walRetentionSize:%d",
|
||||||
|
alterReq.buffer, alterReq.cacheLast, alterReq.cacheLastSize, alterReq.daysPerFile, alterReq.daysToKeep0,
|
||||||
|
alterReq.daysToKeep1, alterReq.daysToKeep2, alterReq.db, alterReq.minRows, alterReq.pages, alterReq.pageSize,
|
||||||
|
alterReq.replications, alterReq.sstTrigger, alterReq.strict, alterReq.walFsyncPeriod,
|
||||||
|
alterReq.walRetentionSize);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "alterDB", alterReq.db, "", detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -1271,7 +1297,10 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "dropDB", dropReq.db, "", "");
|
char detail[1000] = {0};
|
||||||
|
sprintf(detail, "ignoreNotExists:%d", dropReq.ignoreNotExists);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "dropDB", dropReq.db, "", detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
|
|
@ -910,11 +910,10 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
tsGrantHBInterval = 5;
|
tsGrantHBInterval = 5;
|
||||||
|
|
||||||
char detail[1000] = {0};
|
char obj[200] = {0};
|
||||||
sprintf(detail, "%s:%d",
|
sprintf(obj, "%s:%d", createReq.fqdn, createReq.port);
|
||||||
createReq.fqdn, createReq.port);
|
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "createDnode", detail, "", "");
|
auditRecord(pReq, pMnode->clusterId, "createDnode", obj, "", "");
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -1066,10 +1065,13 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
|
||||||
char obj1[150] = {0};
|
char obj1[150] = {0};
|
||||||
sprintf(obj1, "%s:%d", dropReq.fqdn, dropReq.port);
|
sprintf(obj1, "%s:%d", dropReq.fqdn, dropReq.port);
|
||||||
|
|
||||||
char obj2[10] = {0};
|
char obj2[30] = {0};
|
||||||
sprintf(obj2, "%d", dropReq.dnodeId);
|
sprintf(obj2, "%d", dropReq.dnodeId);
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "dropDnode", obj1, obj2, "");
|
char detail[100] = {0};
|
||||||
|
sprintf(detail, "force:%d, unsafe:%d", dropReq.force, dropReq.unsafe);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "dropDnode", obj1, obj2, detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -1252,10 +1254,13 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char detail[50] = {0};
|
char obj[50] = {0};
|
||||||
sprintf(detail, "%d", cfgReq.dnodeId);
|
sprintf(obj, "%d", cfgReq.dnodeId);
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "alterDnode", detail, "", "");
|
char detail[500] = {0};
|
||||||
|
sprintf(detail, "config:%s, value:%s", cfgReq.config, cfgReq.value);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", detail);
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
|
@ -653,14 +653,10 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
||||||
code = mndCreateMnode(pMnode, pReq, pDnode, &createReq);
|
code = mndCreateMnode(pMnode, pReq, pDnode, &createReq);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
char detail[1000] = {0};
|
char obj[40] = {0};
|
||||||
|
|
||||||
char obj[20] = {0};
|
|
||||||
sprintf(obj, "%d", createReq.dnodeId);
|
sprintf(obj, "%d", createReq.dnodeId);
|
||||||
|
|
||||||
sprintf(detail, "dnodeId:%d", createReq.dnodeId);
|
auditRecord(pReq, pMnode->clusterId, "createMnode", obj, "", "");
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "createMnode", obj, detail, "");
|
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -798,7 +794,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
|
||||||
code = mndDropMnode(pMnode, pReq, pObj);
|
code = mndDropMnode(pMnode, pReq, pObj);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
char obj[20] = {0};
|
char obj[40] = {0};
|
||||||
sprintf(obj, "%d", dropReq.dnodeId);
|
sprintf(obj, "%d", dropReq.dnodeId);
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "dropMnode", obj, "", "");
|
auditRecord(pReq, pMnode->clusterId, "dropMnode", obj, "", "");
|
||||||
|
|
|
@ -309,15 +309,14 @@ _CONNECT:
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
char detail[1000] = {0};
|
char obj[100] = {0};
|
||||||
|
|
||||||
char obj[30] = {0};
|
|
||||||
sprintf(obj, "%s:%d", ip, pConn->port);
|
sprintf(obj, "%s:%d", ip, pConn->port);
|
||||||
|
|
||||||
sprintf(detail, "user:%s, from:%s, connType%d",
|
char detail[1000] = {0};
|
||||||
connReq.user, obj, connReq.connType);
|
sprintf(detail, "connType:%d, db:%s, pid:%d, startTime:%" PRId64 ", sVer:%s, app:%s",
|
||||||
|
connReq.connType, connReq.db, connReq.pid, connReq.startTime, connReq.sVer, connReq.app);
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "login", connReq.app, obj, detail);
|
auditRecord(pReq, pMnode->clusterId, "login", connReq.user, obj, detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
|
||||||
|
|
|
@ -423,7 +423,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
|
||||||
char obj[33] = {0};
|
char obj[33] = {0};
|
||||||
sprintf(obj, "%d", dropReq.dnodeId);
|
sprintf(obj, "%d", dropReq.dnodeId);
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "createQnode", obj, "", "");
|
auditRecord(pReq, pMnode->clusterId, "dropQnode", obj, "", "");
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
|
|
@ -1174,7 +1174,17 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "createStb", pDb->name, createReq.name, "");
|
char detail[2000] = {0};
|
||||||
|
sprintf(detail, "colVer:%d, delay1:%" PRId64 ", delay2:%" PRId64 ", deleteMark1:%" PRId64 ", "
|
||||||
|
"deleteMark2:%" PRId64 ", igExists:%d, numOfColumns:%d, numOfFuncs:%d, numOfTags:%d, "
|
||||||
|
"source:%d, suid:%" PRId64 ", tagVer:%d, ttl:%d, "
|
||||||
|
"watermark1:%" PRId64 ", watermark2:%" PRId64,
|
||||||
|
createReq.colVer, createReq.delay1, createReq.delay2, createReq.deleteMark1,
|
||||||
|
createReq.deleteMark2, createReq.igExists, createReq.numOfColumns, createReq.numOfFuncs, createReq.numOfTags,
|
||||||
|
createReq.source, createReq.suid, createReq.tagVer, createReq.ttl,
|
||||||
|
createReq.watermark1, createReq.watermark2);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "createStb", pDb->name, createReq.name, detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -2244,7 +2254,11 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
|
||||||
code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
|
code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "alterStb", pDb->name, alterReq.name, "");
|
char detail[2000] = {0};
|
||||||
|
sprintf(detail, "alterType:%d, numOfFields:%d, ttl:%d" ,
|
||||||
|
alterReq.alterType, alterReq.numOfFields, alterReq.ttl);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "alterStb", pDb->name, alterReq.name, detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -2507,7 +2521,11 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
||||||
code = mndDropStb(pMnode, pReq, pDb, pStb);
|
code = mndDropStb(pMnode, pReq, pDb, pStb);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "dropStb", pDb->name, dropReq.name, "");
|
char detail[2000] = {0};
|
||||||
|
sprintf(detail, "igNotExists:%d, source:%d" ,
|
||||||
|
dropReq.igNotExists, dropReq.source);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "dropStb", pDb->name, dropReq.name, detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
|
|
@ -829,7 +829,19 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "createStream", createStreamReq.name, "", "");
|
char detail[2000] = {0};
|
||||||
|
sprintf(detail, "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 ", "
|
||||||
|
"fillHistory:%d, igExists:%d, "
|
||||||
|
"igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", "
|
||||||
|
"maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, "
|
||||||
|
"targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
|
||||||
|
createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
|
||||||
|
createStreamReq.fillHistory, createStreamReq.igExists,
|
||||||
|
createStreamReq.igExpired, createStreamReq.igUpdate, createStreamReq.lastTs,
|
||||||
|
createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
|
||||||
|
createStreamReq.targetStbFullName, createStreamReq.triggerType, createStreamReq.watermark);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "createStream", createStreamReq.name, "", detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -1076,7 +1088,10 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "dropStream", dropReq.name, "", "");
|
char detail[100] = {0};
|
||||||
|
sprintf(detail, "igNotExists:%d", dropReq.igNotExists);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "dropStream", dropReq.name, "", detail);
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
|
@ -622,7 +622,11 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "crateTopic", createTopicReq.name, createTopicReq.subDbName, createTopicReq.sql);
|
char detail[1000] = {0};
|
||||||
|
sprintf(detail, "igExists:%d, subStbName:%s, subType:%d, withMeta:%d",
|
||||||
|
createTopicReq.igExists, createTopicReq.subStbName, createTopicReq.subType, createTopicReq.withMeta);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "crateTopic", createTopicReq.name, createTopicReq.subDbName, detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -815,7 +819,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "dropTopic", dropReq.name, "", dropReq.sql);
|
char detail[100] = {0};
|
||||||
|
sprintf(detail, "igNotExists:%d", dropReq.igNotExists);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "dropTopic", dropReq.name, "", detail);
|
||||||
|
|
||||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -656,7 +656,11 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
|
||||||
code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq);
|
code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "createUser", createReq.user, "", "");
|
char detail[1000] = {0};
|
||||||
|
sprintf(detail, "createType:%d, enable:%d, superUser:%d, sysInfo:%d",
|
||||||
|
createReq.createType, createReq.enable, createReq.superUser, createReq.sysInfo);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "createUser", createReq.user, "", detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -973,13 +977,17 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
|
||||||
code = mndAlterUser(pMnode, pUser, &newUser, pReq);
|
code = mndAlterUser(pMnode, pUser, &newUser, pReq);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
char detail[1000] = {0};
|
||||||
|
sprintf(detail, "alterType:%d, enable:%d, superUser:%d, sysInfo:%d, tabName:%s",
|
||||||
|
alterReq.alterType, alterReq.enable, alterReq.superUser, alterReq.sysInfo, alterReq.tabName);
|
||||||
|
|
||||||
if(alterReq.alterType == TSDB_ALTER_USER_PASSWD){
|
if(alterReq.alterType == TSDB_ALTER_USER_PASSWD){
|
||||||
auditRecord(pReq, pMnode->clusterId, "changePassword", alterReq.user, alterReq.objname, "");
|
auditRecord(pReq, pMnode->clusterId, "changePassword", alterReq.user, alterReq.objname, detail);
|
||||||
}
|
}
|
||||||
else if(alterReq.alterType == TSDB_ALTER_USER_SUPERUSER ||
|
else if(alterReq.alterType == TSDB_ALTER_USER_SUPERUSER ||
|
||||||
alterReq.alterType == TSDB_ALTER_USER_ENABLE ||
|
alterReq.alterType == TSDB_ALTER_USER_ENABLE ||
|
||||||
alterReq.alterType == TSDB_ALTER_USER_SYSINFO){
|
alterReq.alterType == TSDB_ALTER_USER_SYSINFO){
|
||||||
auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, alterReq.objname, "");
|
auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, alterReq.objname, detail);
|
||||||
}
|
}
|
||||||
else if(alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB||
|
else if(alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB||
|
||||||
alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB||
|
alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB||
|
||||||
|
@ -988,10 +996,10 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
|
||||||
alterReq.alterType == TSDB_ALTER_USER_ADD_READ_TABLE||
|
alterReq.alterType == TSDB_ALTER_USER_ADD_READ_TABLE||
|
||||||
alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_TABLE||
|
alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_TABLE||
|
||||||
alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_TABLE){
|
alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_TABLE){
|
||||||
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.user, alterReq.objname, "");
|
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.user, alterReq.objname, detail);
|
||||||
}
|
}
|
||||||
else{
|
else{
|
||||||
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.user, alterReq.objname, "");
|
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.user, alterReq.objname, detail);
|
||||||
}
|
}
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -1063,7 +1071,7 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) {
|
||||||
code = mndDropUser(pMnode, pReq, pUser);
|
code = mndDropUser(pMnode, pReq, pUser);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "dropUser", dropReq.user, "", dropReq.sql);
|
auditRecord(pReq, pMnode->clusterId, "dropUser", dropReq.user, "", "");
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
|
|
@ -2175,7 +2175,11 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
|
||||||
char obj[33] = {0};
|
char obj[33] = {0};
|
||||||
sprintf(obj, "%d", req.vgId);
|
sprintf(obj, "%d", req.vgId);
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", obj, "", req.sql);
|
char detail[1000] = {0};
|
||||||
|
sprintf(detail, "dnodeId1:%d, dnodeId2:%d, dnodeId3:%d",
|
||||||
|
req.dnodeId1, req.dnodeId2, req.dnodeId3);
|
||||||
|
|
||||||
|
auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", obj, "", detail);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -2987,7 +2991,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
|
||||||
code = mndBalanceVgroup(pMnode, pReq, pArray);
|
code = mndBalanceVgroup(pMnode, pReq, pArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql);
|
auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", "");
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
|
|
@ -26,7 +26,7 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef enum DirtyEntryType {
|
typedef enum DirtyEntryType {
|
||||||
ENTRY_TYPE_DEL = 1,
|
ENTRY_TYPE_DELETE = 1,
|
||||||
ENTRY_TYPE_UPSERT = 2,
|
ENTRY_TYPE_UPSERT = 2,
|
||||||
} DirtyEntryType;
|
} DirtyEntryType;
|
||||||
|
|
||||||
|
@ -44,6 +44,8 @@ typedef struct STtlManger {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t ttlDays;
|
int64_t ttlDays;
|
||||||
int64_t changeTimeMs;
|
int64_t changeTimeMs;
|
||||||
|
int64_t ttlDaysDirty;
|
||||||
|
int64_t changeTimeMsDirty;
|
||||||
} STtlCacheEntry;
|
} STtlCacheEntry;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -209,7 +209,8 @@ static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void
|
||||||
int64_t ttlDays = *(int64_t *)pVal;
|
int64_t ttlDays = *(int64_t *)pVal;
|
||||||
int64_t changeTimeMs = ttlKey->deleteTimeMs - ttlDays * tsTtlUnit * 1000;
|
int64_t changeTimeMs = ttlKey->deleteTimeMs - ttlDays * tsTtlUnit * 1000;
|
||||||
|
|
||||||
STtlCacheEntry data = {.ttlDays = ttlDays, .changeTimeMs = changeTimeMs};
|
STtlCacheEntry data = {
|
||||||
|
.ttlDays = ttlDays, .changeTimeMs = changeTimeMs, .ttlDaysDirty = ttlDays, .changeTimeMsDirty = changeTimeMs};
|
||||||
|
|
||||||
return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
|
return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
|
||||||
}
|
}
|
||||||
|
@ -257,34 +258,37 @@ static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const voi
|
||||||
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
|
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
|
||||||
SMeta *meta = pMeta;
|
SMeta *meta = pMeta;
|
||||||
|
|
||||||
metaInfo("ttlMgr convert ttl start.");
|
metaInfo("ttlMgr convert start.");
|
||||||
|
|
||||||
SConvertData cvData = {.pNewTtlIdx = pNewTtlIdx, .pMeta = meta};
|
SConvertData cvData = {.pNewTtlIdx = pNewTtlIdx, .pMeta = meta};
|
||||||
|
|
||||||
int ret = tdbTbTraversal(pOldTtlIdx, &cvData, ttlMgrConvertOneEntry);
|
int ret = tdbTbTraversal(pOldTtlIdx, &cvData, ttlMgrConvertOneEntry);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("failed to convert ttl since %s", tstrerror(terrno));
|
metaError("failed to convert since %s", tstrerror(terrno));
|
||||||
}
|
}
|
||||||
|
|
||||||
metaInfo("ttlMgr convert ttl end.");
|
metaInfo("ttlMgr convert end.");
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
|
||||||
if (updCtx->ttlDays == 0) return 0;
|
if (updCtx->ttlDays == 0) return 0;
|
||||||
|
|
||||||
STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays, .changeTimeMs = updCtx->changeTimeMs};
|
STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays,
|
||||||
|
.changeTimeMs = updCtx->changeTimeMs,
|
||||||
|
.ttlDaysDirty = updCtx->ttlDays,
|
||||||
|
.changeTimeMsDirty = updCtx->changeTimeMs};
|
||||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
||||||
|
|
||||||
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr insert failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr insert failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr insert failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,11 +308,11 @@ _out:
|
||||||
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
|
||||||
if (delCtx->ttlDays == 0) return 0;
|
if (delCtx->ttlDays == 0) return 0;
|
||||||
|
|
||||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL};
|
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DELETE};
|
||||||
|
|
||||||
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr del failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr del failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -332,19 +336,22 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays, .changeTimeMs = pUpdCtimeCtx->changeTimeMs};
|
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays,
|
||||||
|
.changeTimeMs = oldData->changeTimeMs,
|
||||||
|
.ttlDaysDirty = oldData->ttlDays,
|
||||||
|
.changeTimeMsDirty = pUpdCtimeCtx->changeTimeMs};
|
||||||
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
|
||||||
|
|
||||||
ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr update ctime failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr update ctime failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
|
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
|
||||||
sizeof(dirtryEntry));
|
sizeof(dirtryEntry));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr update ctime failed to update ttl dirty uids since %s", pTtlMgr->logPrefix,
|
metaError("%s, ttlMgr update ctime failed to update dirty uids since %s", pTtlMgr->logPrefix,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
@ -396,27 +403,35 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
|
||||||
STtlIdxKeyV1 ttlKey;
|
STtlIdxKeyV1 ttlKey;
|
||||||
ttlMgrBuildKey(&ttlKey, cacheEntry->ttlDays, cacheEntry->changeTimeMs, *pUid);
|
ttlMgrBuildKey(&ttlKey, cacheEntry->ttlDays, cacheEntry->changeTimeMs, *pUid);
|
||||||
|
|
||||||
|
STtlIdxKeyV1 ttlKeyDirty;
|
||||||
|
ttlMgrBuildKey(&ttlKeyDirty, cacheEntry->ttlDaysDirty, cacheEntry->changeTimeMsDirty, *pUid);
|
||||||
|
|
||||||
if (pEntry->type == ENTRY_TYPE_UPSERT) {
|
if (pEntry->type == ENTRY_TYPE_UPSERT) {
|
||||||
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), &cacheEntry->ttlDays, sizeof(cacheEntry->ttlDays),
|
// delete old key & upsert new key
|
||||||
pTxn);
|
tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); // maybe first insert, ignore error
|
||||||
|
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKeyDirty, sizeof(ttlKeyDirty), &cacheEntry->ttlDaysDirty,
|
||||||
|
sizeof(cacheEntry->ttlDaysDirty), pTxn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr flush failed to flush ttl cache upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
} else if (pEntry->type == ENTRY_TYPE_DEL) {
|
|
||||||
|
cacheEntry->ttlDays = cacheEntry->ttlDaysDirty;
|
||||||
|
cacheEntry->changeTimeMs = cacheEntry->changeTimeMsDirty;
|
||||||
|
} else if (pEntry->type == ENTRY_TYPE_DELETE) {
|
||||||
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
|
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr flush failed to flush ttl cache del since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("%s, ttlMgr flush failed to delete ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
metaError("%s, ttlMgr flush failed to remove cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
metaError("%s, ttlMgr flush failed to flush ttl cache, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
|
metaError("%s, ttlMgr flush failed, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -637,7 +637,7 @@ _exit:
|
||||||
} else {
|
} else {
|
||||||
tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
|
tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
|
||||||
}
|
}
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t close_file_system(STFileSystem *fs) {
|
static int32_t close_file_system(STFileSystem *fs) {
|
||||||
|
@ -730,7 +730,7 @@ _exit:
|
||||||
} else {
|
} else {
|
||||||
tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
|
tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
|
||||||
}
|
}
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) {
|
static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) {
|
||||||
|
|
|
@ -934,7 +934,11 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
|
||||||
|
|
||||||
int32_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
int32_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
|
||||||
|
|
||||||
auditRecord(pReq, clusterId, "createTable", pVnode->config.dbname, pCreateReq->name, "");
|
char detail[1000] = {0};
|
||||||
|
sprintf(detail, "btime:%" PRId64 ", flags:%d, ttl:%d, type:%d",
|
||||||
|
pCreateReq->btime, pCreateReq->flags, pCreateReq->ttl, pCreateReq->type);
|
||||||
|
|
||||||
|
auditRecord(pReq, clusterId, "createTable", pVnode->config.dbname, pCreateReq->name, detail);
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
|
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
|
||||||
|
|
|
@ -969,6 +969,9 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
|
||||||
|
|
||||||
if (NULL == pJoin->node.pConditions) {
|
if (NULL == pJoin->node.pConditions) {
|
||||||
int32_t code = pushDownCondOptJoinExtractCond(pCxt, pJoin);
|
int32_t code = pushDownCondOptJoinExtractCond(pCxt, pJoin);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = pushDownCondOptJoinExtractEqualOnCond(pCxt, pJoin);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
|
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
|
||||||
pCxt->optimized = true;
|
pCxt->optimized = true;
|
||||||
|
|
|
@ -289,8 +289,15 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||||
SSvrConn* conn = cli->data;
|
SSvrConn* conn = cli->data;
|
||||||
STrans* pTransInst = conn->pTransInst;
|
SWorkThrd* pThrd = conn->hostThrd;
|
||||||
|
|
||||||
|
if (true == pThrd->quit) {
|
||||||
|
tInfo("work thread received quit msg, destroy conn");
|
||||||
|
destroyConn(conn, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
STrans* pTransInst = conn->pTransInst;
|
||||||
|
|
||||||
SConnBuffer* pBuf = &conn->readBuf;
|
SConnBuffer* pBuf = &conn->readBuf;
|
||||||
if (nread > 0) {
|
if (nread > 0) {
|
||||||
|
|
|
@ -155,9 +155,37 @@ class TDTestCase:
|
||||||
tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
|
tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
|
||||||
for i in range(self.tbnum):
|
for i in range(self.tbnum):
|
||||||
tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}')
|
tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}')
|
||||||
def run(self):
|
|
||||||
|
def alter_stable_column_varchar_39001(self):
|
||||||
|
"""Check alter stable column varchar 39001 from 39000(TS-3841)
|
||||||
|
"""
|
||||||
|
stbname = "st1"
|
||||||
|
column_dict = {
|
||||||
|
'ts' : 'timestamp',
|
||||||
|
'col1': 'varchar(39000)',
|
||||||
|
'col2': 'tinyint',
|
||||||
|
'col3': 'timestamp',
|
||||||
|
'col4': 'tinyint',
|
||||||
|
'col5': 'timestamp',
|
||||||
|
'col6': 'varchar(18)',
|
||||||
|
'col7': 'varchar(17)'
|
||||||
|
}
|
||||||
|
tag_dict = {
|
||||||
|
'id': 'int'
|
||||||
|
}
|
||||||
|
|
||||||
|
tdSql.execute(self.setsql.set_create_stable_sql(stbname, column_dict, tag_dict))
|
||||||
|
res = tdSql.getResult(f'desc {stbname}')
|
||||||
|
tdLog.info(res)
|
||||||
|
assert(res[1][2] == 39000)
|
||||||
|
tdSql.execute(f'alter stable {stbname} modify column col1 varchar(39001)')
|
||||||
|
res = tdSql.getResult(f'desc {stbname}')
|
||||||
|
tdLog.info(res)
|
||||||
|
assert(res[1][2] == 39001)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
self.alter_stable_check()
|
self.alter_stable_check()
|
||||||
|
self.alter_stable_column_varchar_39001()
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success("%s successfully executed" % __file__)
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
|
@ -106,6 +106,9 @@ class TDTestCase:
|
||||||
tdSql.query(f"select a.ts, b.ts from sta a, stb b where a.ts=b.ts and (a.tg1=b.tg1 and a.tg1 > b.tg1);")
|
tdSql.query(f"select a.ts, b.ts from sta a, stb b where a.ts=b.ts and (a.tg1=b.tg1 and a.tg1 > b.tg1);")
|
||||||
tdSql.checkRows(0)
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
|
tdSql.query(f"select a.* from sta a join stb b on a.tg1=b.tg1 and a.ts=b.ts and a.tg2=b.tg2;")
|
||||||
|
tdSql.checkRows(12)
|
||||||
|
|
||||||
# tdSql.checkData(0,1,10)
|
# tdSql.checkData(0,1,10)
|
||||||
|
|
||||||
tdSql.error(f"select a.* from sta a join stb b on a.tg1=b.tg1 where a.ts=b.ts or a.tg2=b.tg2;")
|
tdSql.error(f"select a.* from sta a join stb b on a.tg1=b.tg1 where a.ts=b.ts or a.tg2=b.tg2;")
|
||||||
|
|
Loading…
Reference in New Issue