Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837

This commit is contained in:
54liuyao 2024-11-04 09:11:43 +08:00
commit de92e455a6
62 changed files with 5353 additions and 4034 deletions

View File

@ -48,7 +48,7 @@ jobs:
working-directory: tools/keeper
run: |
go mod tidy
go test -v -coverpkg=./... -coverprofile=coverage.out ./...
sudo go test -v -ldflags="-X 'github.com/taosdata/taoskeeper/version.IsEnterprise=true'" -coverpkg=./... -coverprofile=coverage.out ./...
go tool cover -func=coverage.out
- name: Clean up

View File

@ -191,48 +191,11 @@ ELSE()
SET(COMPILER_SUPPORT_AVX512VL false)
ELSE()
CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA)
CHECK_C_COMPILER_FLAG("-mavx" COMPILER_SUPPORT_AVX)
CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2)
CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F)
CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI)
CHECK_C_COMPILER_FLAG("-mavx512vl" COMPILER_SUPPORT_AVX512VL)
INCLUDE(CheckCSourceRuns)
SET(CMAKE_REQUIRED_FLAGS "-mavx")
check_c_source_runs("
#include <immintrin.h>
int main() {
__m256d a, b, c;
double buf[4] = {0};
a = _mm256_loadu_pd(buf);
b = _mm256_loadu_pd(buf);
c = _mm256_add_pd(a, b);
_mm256_storeu_pd(buf, c);
for (int i = 0; i < sizeof(buf) / sizeof(buf[0]); ++i) {
IF (buf[i] != 0) {
return 1;
}
}
return 0;
}
" COMPILER_SUPPORT_AVX)
SET(CMAKE_REQUIRED_FLAGS "-mavx2")
check_c_source_runs("
#include <immintrin.h>
int main() {
__m256i a, b, c;
int buf[8] = {0};
a = _mm256_loadu_si256((__m256i *)buf);
b = _mm256_loadu_si256((__m256i *)buf);
c = _mm256_and_si256(a, b);
_mm256_storeu_si256((__m256i *)buf, c);
for (int i = 0; i < sizeof(buf) / sizeof(buf[0]); ++i) {
IF (buf[i] != 0) {
return 1;
}
}
return 0;
}
" COMPILER_SUPPORT_AVX2)
ENDIF()
IF(COMPILER_SUPPORT_SSE42)

View File

@ -4,6 +4,6 @@
"main": "index.js",
"license": "MIT",
"dependencies": {
"@tdengine/websocket": "^3.1.0"
"@tdengine/websocket": "^3.1.1"
}
}

View File

@ -3,7 +3,6 @@ const taos = require("@tdengine/websocket");
let dsn = 'ws://localhost:6041';
async function createConnect() {
try {
let conf = new taos.WSConfig(dsn);
conf.setUser('root');

View File

@ -10,7 +10,6 @@ const groupId = "group1";
const clientId = "client1";
async function createConsumer() {
let groupId = "group1";
let clientId = "client1";
let configMap = new Map([

View File

@ -8,7 +8,7 @@ toc_max_heading_level: 4
## 基本查询
为了更好的介绍 TDengine 数据查询,使用 如下 taosBenchmark 命令,生成本章内容需要的时序数据。
为了更好的介绍 TDengine 数据查询,使用如下 taosBenchmark 命令,生成本章内容需要的时序数据。
```shell
taosBenchmark --start-timestamp=1600000000000 --tables=100 --records=10000000 --time-step=10000
@ -25,7 +25,7 @@ ORDER BY ts DESC
LIMIT 5;
```
上面的 SQL从超级表 `meters` 中查询出电压 `voltage` 大于 230 的记录,按时间降序排列,且仅输出前 5 行。查询结果如下:
上面的 SQL从超级表 `meters` 中查询出电压 `voltage` 大于 230V 的记录,按时间降序排列,且仅输出前 5 行。查询结果如下:
```text
ts | current | voltage | phase | groupid | location |

View File

@ -3,8 +3,52 @@ title: "ARIMA"
sidebar_label: "ARIMA"
---
本节讲述如何 ARIMA 算法的使用方法。
本节讲述 ARIMA 算法模型的使用方法。
## 功能概述
……
ARIMA 即自回归移动平均模型(Autoregressive Integrated Moving Average, ARIMA),也记作 ARIMA(p,d,q),是统计模型中最常见的一种用来进行时间序列预测的模型。
ARIMA模型是一种自回归模型只需要自变量即可预测后续的值。ARIMA模型要求时序数据是**平稳**,或经过差分处理后平稳,如果是不平稳的数据,**无法**获得正确的结果。
>平稳的时间序列:其性质不随观测时间的变化而变化。具有趋势或季节性的时间序列不是平稳时间序列——趋势和季节性使得时间序列在不同时段呈现不同性质。
以下参数可以动态输入控制预测过程中生成 合适的 ARIMA 的模型。
- p= 自回归模型阶数
- d= 差分阶数
- q= 移动平均模型阶数
### 参数
分析平台中使用自动化的 ARIMA 模型进行计算,因此每次计算的时候会根据输入的数据自动拟合最合适的模型,然后根据该模型进行预测输出结果。
|参数名称|说明|必填项|
|---|---|---|
|period|输入时间序列数据每个周期包含的数据点个数。如果不设置该参数或则该参数设置为 0 将使用非季节性/周期性的 ARIMA 模型预测。|选填|
|start_p| 自回归模型阶数的 起始值0 开始的整数,不推荐大于 10 |选填|
|max_p| 自回归模型阶数的 结束值0 开始的整数,不推荐大于 10 |选填|
|start_q| 移动平均模型阶数的起始值, 0 开始的整数,不推荐大于 10 |选填|
|max_q| 移动平均模型阶数的结束值, 0 开始的整数,不推荐大于 10 |选填|
|d| 差分阶数|选填|
`start_p`、`max_p` `start_q` `max_q` 四个参数约束了模型在多大的范围内去搜寻合适的最优解。相同输入数据的条件下,参数范围越大,消耗的资源越多,系统响应的时间越长。
### 示例及结果
针对 i32 列进行数据预测,输入列 i32 每 10 个点是一个周期start_p 起始是 1 最大拟合是 5start_q是1最大值是5预测结果中返回 95% 置信区间范围边界。
```
FORECAST(i32, "algo=arima,alpha=95,period=10, start_p=1, max_p=5, start_q=1, max_q=5")
```
```json5
{
"rows": fc_rows, // 预测结果的行数
"period": period, // 返回结果的周期性,同输入
"alpha": alpha, // 返回结果的置信区间,同输入
"algo": "arima", // 返回结果使用的算法
"mse":mse, // 拟合输入时序数据时候生成模型的最小均方误差(MSE)
"res": res // 列模式的结果
}
```
### 参考文献
- https://en.wikipedia.org/wiki/Autoregressive_moving-average_model
- https://baike.baidu.com/item/%E8%87%AA%E5%9B%9E%E5%BD%92%E6%BB%91%E5%8A%A8%E5%B9%B3%E5%9D%87%E6%A8%A1%E5%9E%8B/5023931?fromtitle=ARMA%E6%A8%A1%E5%9E%8B&fromid=8048415

View File

@ -0,0 +1,43 @@
---
title: "HoltWinters"
sidebar_label: "HoltWinters"
---
本节讲述 HoltWinters 算法模型的使用方法。
## 功能概述
HoltWinters模型又称为多次指数平滑模型EMA。对含有线性趋势和周期波动的非平稳序列适用利用指数平滑法让模型参数不断适应非平稳序列的变化并对未来趋势进行**短期**预测。
HoltWinters有两种不同的季节性组成部分当季节变化在该时间序列中大致保持不变时通常选择**加法模型**;而当季节变化与时间序列的水平成比例变化时,通常选择**乘法模型**。
该模型对于返回数据也不提供计算的置信区间范围结果。在 95% 置信区间的上下界结果与预测结果相同。
### 参数
分析平台中使用自动化的 ARIMA 模型进行计算,因此每次计算的时候会根据输入的数据自动拟合最合适的模型,然后根据该模型进行预测输出结果。
|参数名称|说明|必填项|
|---|---|---|
|period| 输入时间序列数据每个周期包含的数据点个数。如果不设置该参数或则该参数设置为 0 将使用一次(简单)指数平滑方式进行数据拟合,并据此进行未来数据的预测|选填|
|trend| 趋势模型使用加法模型还是乘法模型|选填|
|seasonal| 季节性采用加法模型还是乘法模型|选填|
参数 `trend``seasonal`的均可以选择 `add` (加法模型)或 `mul`(乘法模型)。
### 示例及结果
针对 i32 列进行数据预测,输入列 i32 每 10 个点是一个周期,趋势采用乘法模型,季节采用乘法模型
```
FORECAST(i32, "algo=holtwinters,period=10,trend=mul,seasonal=mul")
```
```json5
{
"rows": rows, // 结果的行数
"period": period, // 返回结果的周期性, 该结果与输入的周期性相同,如果没有周期性,该值为 0
"algo": 'holtwinters' // 返回结果使用的计算模型
"mse":mse, // 最小均方误差minmum square error
"res": res // 具体的结果,按照列形式返回的结果。一般意义上包含了 两列[timestamp][fc_results]。
}
```
### 参考文献
- https://en.wikipedia.org/wiki/Exponential_smoothing
- https://orangematter.solarwinds.com/2019/12/15/holt-winters-forecasting-simplified/

View File

@ -0,0 +1,46 @@
---
title: "Anomaly-detection"
sidebar_label: "Anomaly-detection"
---
本节讲述 异常检测 算法模型的使用方法。
## 概述
分析平台提供了 6 种异常检查模型6 种异常检查模型分为 3 个类别,分别属于基于统计的异常检测模型、基于数据密度的检测模型、基于深度学习的异常检测模型。在不指定异常检测使用的方法的情况下,默认调用 iqr 的方法进行计算。
### 统计学异常检测方法
- k-sigma<sup>[1]</sup>: 即 ***689599.7 rule*** 。***k***值默认为3 即序列均值的 3 倍标准差范围为边界超过边界的是异常值。KSigma 要求数据整体上服从正态分布如果一个点偏离均值K倍标准差则该点被视为异常点.
|参数名称|说明|是否必选|默认值|
|---|---|---|---|
|k|标准差倍数|选填|3|
- IQR<sup>[2]</sup>:四分位距 (Interquartile range, IQR) 是一种衡量变异性的方法. 四分位数将一个按等级排序的数据集划分为四个相等的部分。即 Q1第 1 个四分位数、Q2第 2 个四分位数)和 Q3第 3 个四分位数。IQR 定义为 Q3Q1位于 Q3+1.5 。无输入参数。
- Grubbs<sup>[3]</sup>: 又称为 Grubbs' test即最大标准残差测试。Grubbs 通常用作检验最大值、最小值偏离均值的程度是否为异常,该单变量数据集遵循近似标准正态分布。非正态分布数据集不能使用该方法。无输入参数。
- SHESD<sup>[4]</sup> 带有季节性的 ESD 检测算法。ESD 可以检测时间序列数据的多异常点。需要指定异常点比例的上界***k***最差的情况是至多49.9%。数据集的异常比例一般不超过5%
|参数名称|说明|是否必选|默认值|
|---|---|---|---|
|k|异常点在输入数据集中占比,范围是$`1\le K \le 49.9`$ |选填|5|
### 基于数据密度的检测方法
LOF<sup>[5]</sup>: 局部离群因子(LOF又叫局部异常因子)算法是Breunig于2000年提出的一种基于密度的局部离群点检测算法该方法适用于不同类簇密度分散情况迥异的数据。根据数据点周围的数据密集情况首先计算每个数据点的一个局部可达密度然后通过局部可达密度进一步计算得到每个数据点的一个离群因子该离群因子即标识了一个数据点的离群程度因子值越大表示离群程度越高因子值越小表示离群程度越低。最后输出离群程度最大的top(n)个点。
### 基于深度学习的检测方法
使用自动编码器的异常检测模型。可以对具有周期性的数据具有较好的检测结果。但是使用该模型需要针对输入的时序数据进行训练,同时将训练完成的模型部署到服务目录中,才能够运行与使用。
### 参考文献
1. https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule
2. https://en.wikipedia.org/wiki/Interquartile_range
3. Adikaram, K. K. L. B.; Hussein, M. A.; Effenberger, M.; Becker, T. (2015-01-14). "Data Transformation Technique to Improve the Outlier Detection Power of Grubbs's Test for Data Expected to Follow Linear Relation". Journal of Applied Mathematics. 2015: 19. doi:10.1155/2015/708948.
4. Hochenbaum, O. S. Vallis, and A. Kejariwal. 2017. Automatic Anomaly Detection in the Cloud Via Statistical Learning. arXiv preprint arXiv:1704.07706 (2017).
5. Breunig, M. M.; Kriegel, H.-P.; Ng, R. T.; Sander, J. (2000). LOF: Identifying Density-based Local Outliers (PDF). Proceedings of the 2000 ACM SIGMOD International Conference on Management of Data. SIGMOD. pp. 93104. doi:10.1145/335191.335388. ISBN 1-58113-217-4.

View File

@ -0,0 +1,167 @@
---
title: "addins"
sidebar_label: "addins"
---
本节说明如何将自己开发的新预测算法和异常检测算法整合到 TDengine 分析平台, 并能够通过 SQL 语句进行调用。
## 目录结构
![数据分析功能架构图](./pic/dir.png)
|目录|说明|
|---|---|
|taos|Python 源代码目录,其下包含了算法具体保存目录 algo放置杂项目录 misc 单元测试和集成测试目录 test。 algo目录下 ad 放置异常检测算法代码, fc 放置预测算法代码|
|script|是安装脚本和发布脚本放置目录|
|model|放置针对数据集完成的训练模型|
|cfg| 配置文件目录|
## 约定与限制
定义异常检测算法的 Python 代码文件 需放在 /taos/algo/ad 目录中,预测算法 Python 代码文件需要放在 /taos/algo/fc 目录中,以确保系统启动的时候能够正常加载对应目录下的 Python 文件。
### 类命名规范
算法类的名称需要 以下划线开始,以 Service 结尾。例如_KsigmaService 是 KSigma 异常检测算法的实现类。
### 类继承约定
异常检测算法需要从 `AbstractAnomalyDetectionService` 继承,并实现其核心抽象方法 `execute`.
预测算法需要从 `AbstractForecastService` 继承,同样需要实现其核心抽象方法 `execute`
### 类属性初始化
每个算法实现的类需要静态初始化两个类属性,分别是
`name`: 的触发调用关键词,全小写英文字母。
`desc`:该算法的描述信息。
### 核心方法输入与输出约定
`execute` 是算法处理的核心方法。调用该方法的时候, `self.list` 已经设置好输入数组。
异常检测输出结果
`execute` 的返回值是长度与 `self.list` 相同的数组,数组位置为 -1 的即为异常值点。例如:输入数组是 [2, 2, 2, 2, 100] 如果 100 是异常点,那么返回值是 [1, 1, 1, 1, -1]。
预测输出结果
对于预测算法, `AbstractForecastService` 的对象属性说明如下:
|属性名称|说明|默认值|
|---|---|---|
|period|输入时序数据的周期性,多少个数据点表示一个完整的周期。如果没有周期性,那么设置为 0 即可。| 0|
|start_ts|预测数据的开始时间| 0|
|time_step|预测结果的两个数据点之间时间间隔|0 |
|fc_rows|预测结果数量| 0 |
|return_conf|返回结果中是否包含执行区间范围,如果算法计算结果不包含置信区间,那么上界和下界与自身相同| 1|
|conf|执行区间分位数 0.05|
预测返回结果如下:
```python
return {
"rows": self.fc_rows, # 预测数据行数
"period": self.period, # 数据周期性,同输入
"algo": "holtwinters", # 预测使用的算法
"mse": mse, # 预测算法的 mse
"res": res # 结果数组 [时间戳数组, 预测结果数组, 预测结果执行区间下界数组,预测结果执行区间上界数组]
}
```
## 示例代码
```python
import numpy as np
from service import AbstractAnomalyDetectionService
# 算法实现类名称 需要以下划线 "_" 开始,并以 Service 结束,如下 _IqrService 是 IQR 异常检测算法的实现类。
class _IqrService(AbstractAnomalyDetectionService):
""" IQR algorithm 定义类,从 AbstractAnomalyDetectionService 继承,并实现 AbstractAnomalyDetectionService类的抽象函数 """
# 定义算法调用关键词全小写ASCII码(必须添加)
name = 'iqr'
# 该算法的描述信息(建议添加)
desc = """found the anomaly data according to the inter-quartile range"""
def __init__(self):
super().__init__()
def execute(self):
""" execute 是算法实现逻辑的核心实现,直接修改该实现即可 """
# self.list 是输入数值列list 类型,例如:[1,2,3,4,5]。设置 self.list 的方法在父类中已经进行了定义。实现自己的算法,修改该文件即可,以下代码使用自己的实现替换即可。
#lower = np.quantile(self.list, 0.25)
#upper = np.quantile(self.list, 0.75)
#min_val = lower - 1.5 * (upper - lower)
#max_val = upper + 1.5 * (upper - lower)
#threshold = [min_val, max_val]
# 返回值是与输入数值列长度相同的数据列,异常值对应位置是 -1。例如上述输入数据列返回数值列是 [1, 1, 1, 1, -1],表示 [5] 是异常值。
return [-1 if k < threshold[0] or k > threshold[1] else 1 for k in self.list]
def set_params(self, params):
"""该算法无需任何输入参数,直接重载父类该函数,不处理算法参数设置逻辑"""
pass
```
## 单元测试
在测试文件目录中的 anomaly_test.py 中增加单元测试用例。
```python
def test_iqr(self):
""" 测试 _IqrService 类 """
s = loader.get_service("iqr")
# 设置需要进行检测的输入数据
s.set_input_list(AnomalyDetectionTest.input_list)
# 测试 set_params 的处理逻辑
try:
s.set_params({"k": 2})
except ValueError as e:
self.assertEqual(1, 0)
r = s.execute()
# 绘制异常检测结果
draw_ad_results(AnomalyDetectionTest.input_list, r, "iqr")
# 检查结果
self.assertEqual(r[-1], -1)
self.assertEqual(len(r), len(AnomalyDetectionTest.input_list))
```
## 需要模型的算法
针对特定数据集,进行模型训练的算法,在训练完成后。需要将训练得到的模型保存在 model 目录中。需要注意的是,针对每个算法,需要建立独立的文件夹。例如 auto_encoder 的训练算法在 model 目录下建立了, autoencoder的目录使用该算法针对不同数据集训练得到的模型均需要放置在该目录下。
训练完成后的模型,使用 joblib 进行保存。
并在 model 目录下建立对应的文件夹存放该模型。
保存模型的调用,可参考 encoder.py 的方式,用户通过调用 set_params 方法,并指定参数 {"model": "ad_encoder_keras"} 的方式,可以调用该模型进行计算。
具体的调用方式如下:
```python
def test_autoencoder_ad(self):
# 获取特定的算法服务
s = loader.get_service("ac")
data = self.__load_remote_data_for_ad()
# 设置异常检查的输入数据
s.set_input_list(data)
# 指定调用的模型,该模型是之前针对该数据集进行训练获得
s.set_params({"model": "ad_encoder_keras"})
# 执行检查动作,并返回结果
r = s.execute()
num_of_error = -(sum(filter(lambda x: x == -1, r)))
self.assertEqual(num_of_error, 109)
```

View File

@ -5,9 +5,314 @@ title: 数据分析功能
## 概述
TDengine 提供数据分析功能的扩展组件,通过引入 ANodeTDengine 能够支持时间序列的机器学习分析
下图展示了数据分析的技术架构
TDengine 通过 ANode(AnalysisNode) 是提供数据分析功能的扩展组件,通过 Restful 接口提供分析服务,从而拓展 TDengine 的功能,支持时间序列高级分析功能。
ANode 是无状态的数据分析节点,集群中可以存在多个 ANode节点相互之间没有关联。将 ANode 注册到 TDengine 集群以后,通过 SQL 语句即可调用并完成时序分析任务。
下图是数据分析的技术架构示意图
![数据分析功能架构图](./pic/data-analysis.png)
## 安装部署
### 环境准备
ANode 的要求节点上准备有 Python 3.10 及以上版本以及相应的Python包自动安装组件 Pip ,同时请确保能够正常连接互联网。
### 安装及卸载
使用专门的 ANode 安装包 TDengine-enterprise-anode-1.x.x.tar.gz 进行 ANode 的安装部署工作,安装过程与 TDengien 的安装流程一致。
```bash
tar -xzvf TDengine-enterprise-anode-1.0.0.tar.gz
cd TDengine-enterprise-anode-1.0.0
sudo ./install.sh
```
卸载 ANode执行命令 `rmtaosanode` 即可。
### 其他
为了避免 ANode 安装后影响目标节点现有的 Python 库。 ANode 使用 Python 虚拟环境运行,安装后的默认 Python 目录处于 `/var/lib/taos/taosanode/venv/`。为了避免反复安装虚拟环境带来的开销,卸载 ANode 并不会自动删除该虚拟环境,如果您确认不需要 Python 的虚拟环境,可以手动删除。
## 启动及停止服务
安装 ANode 以后,可以使用`systemctl`来管理 ANode 的服务。使用如下命令可以启动/停止/检查状态。
```bash
systemctl start taosanoded
systemctl stop taosanoded
systemctl status taosanoded
```
## 目录及配置说明
|目录/文件|说明|
|---------------|------|
|/usr/local/taos/taosanode/bin|可执行文件目录|
|/usr/local/taos/taosanode/resource|资源文件目录,链接到文件夹 /var/lib/taos/taosanode/resource/|
|/usr/local/taos/taosanode/lib|库文件目录|
|/var/lib/taos/taosanode/model/|模型文件目录,链接到文件夹 /var/lib/taos/taosanode/model|
|/var/log/taos/taosanode/|日志文件目录|
|/etc/taos/taosanode.ini|配置文件|
### 配置说明
Anode 提供的 RestFul 服务使用 uWSGI 驱动,因此 ANode 的配置和 uWSGI 的配置在同一个配置文件中,具体如下:
```ini
[uwsgi]
# charset
env=LC_ALL=en_US.UTF-8
# ip:port
http = 127.0.0.1:6050
# the local unix socket file than communicate to Nginx
#socket = 127.0.0.1:8001
#socket-timeout=10
# base directory
chdir = /usr/local/taos/taosanode/lib
# initialize python file
wsgi-file = /usr/local/taos/taosanode/lib/app.py
# invoke app model
callable = app
# auto remove unix Socket and pid file when stopping
vacuum = true
# socket exec model
#chmod-socket = 664
# uWSGI pid
uid=root
# uWSGI gid
gid=root
# main process
master = true
# the number of worker processes
processes = 2
# pid file
pidfile = /usr/local/taos/taosanode/uwsgi.pid
# enable threads
enable-threads=true
# the number of threads for each process
threads=2
# memory useage report
memory-report = true
reload-mercy = 10
# conflict with systemctl, so do NOT uncomment this
# daemonize = /var/log/taos/taosanode/taosanode.log
# set log
logto = /var/log/taos/taosanode/taosanode.log
# monitor server
stats = 127.0.0.1:8387
# python virtual environment directory
virtualenv = /usr/local/taos/taosanode/venv/
[taosanode]
# default app log file
app-log = /var/log/taos/taosanode/taosanode.app.log
# model storage directory
model-dir=/usr/local/taos/taosanode/model/
# default log level
log-level = DEBUG
```
**提示**
请勿设置 `daemonize` 参数,该参数会导致 uWSGI 与 systemctl 冲突,从而无法正常启动。
## ANode 基本操作
### 管理 ANode
#### 创建 ANode
```sql
CREATE ANODE {node_url}
```
node_url 是提供服务的 ANode 的 IP 和 PORT, 例如:`create anode 'http://localhost:6050'`。启动 ANode 以后如果不注册到 TDengine 集群中,无法提供正常的服务。不建议 ANode 注册到两个或多个集群中。
#### 查看 ANode
列出集群中所有的数据分析节点,包括其 `FQDN`, `PORT`, `STATUS`
```sql
SHOW ANODES;
```
#### 查看提供的时序数据分析服务
```SQL
SHOW ANODES FULL;
```
#### 强制刷新 TDengine 集群中分析算法缓存
```SQL
UPDATE ANODE {node_id}
UPDATE ALL ANODES
```
#### 删除 ANode
```sql
DROP ANODE {anode_id}
```
删除 ANode 只是将 ANode 从 TDengine 集群中删除,管理 ANode 的启停仍然需要使用`systemctl`命令。
### 时序数据分析功能
#### 白噪声检查
平台提供 Restful的服务检测输入时间序列是否是白噪声时间序列White Noise Data, WND白噪声时间序列及随机数序列。
此外,分析平台要求输入的数据不能是 , 因此针对的所有数据均默认进行 白噪声检查。当前白噪声检查采用通行的 `Ljung-Box`检验,`Ljung-Box` 统计量检查过程需要遍历整个输入序列并进行计算。
如果用户能够明确输入序列一定不是白噪声序列,那么可以通过输入参数,指定预测之前忽略该检查,从而节省分析过程的 CPU 计算资源。
同时支持独立地针对输入序列进行白噪声检测(该检测功能暂不独立对外开放)。
#### 数据重采样和时间戳对齐
数据分析平台支持将输入的数据进行重采样的预处理,从而确保输出结果按照用户指定的等间隔进行处理。处理过程分为两种类别:
- 数据时间戳对齐。由于真实数据时间可能并非严格按照查询指定的时间戳输入。此时数据平台将自动将数据的时间间隔按照指定的时间间隔进行对齐。例如有输入时间序列:[11, 22, 29, 41],用户指定时间间隔为 10那么该序列将被对齐重整为以下序列 [10, 20, 30, 40]。
- 数据时间重采样。用户输入的时间序列其采样频率超过了指定的查询需要获得结果的时间间隔,例如输入原始数据是 5 但是输出结果的频率是 10. [0 5 10 15 20 25 30],那么该输入数据列将重采用为间隔 为 10 的输入序列,其结果如下 [0, 10, 2030]。[5, 15, 25] 处的数据将被丢弃。
需要注意的是,数据输入平台不支持缺失数据补齐后进行的预测分析,如果输入时间序列数据[11, 22, 29, 49],并且用户要求的时间间隔为 10 重整对齐后的序列是 [10, 20, 30, 50] 那么该序列进行预测分析将返回错误。
#### 时序数据异常检测
异常检测是针对输入的时序数据,使用预设或用户指定的算法确定时间序列中**可能**出现异常时间序列点,对于时间序列中若干个连续的异常点,将自动合并成为一个连续的(闭区间)异常窗口。对于只有单个点的场景,异常窗口窗口退化成为一个起始时间和结束时间相同的点。
异常检测生成的异常窗口受检测算法和算法参数的共同影响,对于异常窗口范围内的数据,可以应用 TDengine 提供的聚合和标量函数进行查询或变换处理。
对于输入时间序列 (1, 20), (2, 22), (3, 91), (4, 120), (5, 18), (6, 19)。系统检测到 (3, 91), (4, 120) 为异常点,那么返回的异常窗口是闭区间 [3, 4]。
##### 语法
```SQL
ANOMALY_WINDOW(column_name, option_expr)
option_expr: {"
algo=expr1
[,wncheck=1|0]
[,expr2]
"}
```
1. `column`:进行时序数据异常检测的输入数据列,当前只支持单列输入,且只能是数值类型,不能是字符类型(例如:`NCHAR` `VARCHAR` `VARBINARY`等类型),**不支持函数表达式**。
2. `options`:字符串。其中使用 K/V 调用异常检测的算法,及与算法相关的参数。采用 逗号分隔的K/V字符串表示其中的字符串不需要使用单引号、双引号、或转意号等符号不能使用中文及其他宽字符。例如`algo=ksigma, k=2` 表示进行异常检测的算法是 ksigma该算法接受的输入参数是 2。
3. 异常检测的结果可以作为外层查询的子查询输入,在 `SELECT` 子句中使用的聚合函数或标量函数与其他类型的窗口查询相同。
4. 输入数据默认进行白噪声检查,如果检查结果是输入数据是白噪声,将不会有任何(异常)窗口信息返回。
**参数说明**
|参数|含义|默认值|
|---|---|---|
|algo|异常检测调用的算法|iqr|
|wncheck|对输入数据列是否进行白噪声检查|取值为0或者1默认值为 1表示进行白噪声检查|
异常检测的返回结果以窗口的形式呈现,因此窗口查询相关的伪列在这种场景下仍然可用。可以使用的伪列如下:
1. `_WSTART` 异常窗口开始时间戳
2. `_WEND`:异常窗口结束时间戳
3. `_WDURATION`:异常窗口持续时间
**示例**
```SQL
--- 使用 iqr 算法进行异常检测,检测列 i32 列。
SELECT _wstart, _wend, SUM(i32)
FROM ai.atb
ANOMALY_WINDOW(i32, "algo=iqr");
--- 使用 ksigma 算法进行异常检测,输入参数 k 值为 2检测列 i32 列
SELECT _wstart, _wend, SUM(i32)
FROM ai.atb
ANOMALY_WINDOW(i32, "algo=ksigma,k=2");
```
```
taos> SELECT _wstart, _wend, count(*) FROM ai.atb ANOMAYL_WINDOW(i32);
_wstart | _wend | count(*) |
============================================================================
2020-01-01 00:00:16.000 | 2020-01-01 00:00:16.001 | 1 |
Query OK, 1 row(s) in set (0.028946s)
```
**可用异常检测算法**
- iqr
- ksigma
- grubbs
- lof
- shesd
- tac
#### 时序数据预测
数据预测以一段训练数据作为输入,预测接下来一个连续时间区间内,时序数据的趋势。
##### 语法
```SQL
FORECAST(column_expr, option_expr)
option_expr: {"
algo=expr1
[,wncheck=1|0]
[,conf=conf_val]
[,every=every_val]
[,rows=rows_val]
[,start=start_ts_val]
[,expr2]
"}
```
1. `column_expr`:预测的时序数据列。与异常检测相同,只支持数值类型输入。
2. `options`:异常检测函数的参数,使用规则与 anomaly_window 相同。预测还支持`conf`, `every`, `rows`, `start`, `rows` 几个参数,其含义如下:
**参数说明**
|参数|含义|默认值|
|---|---|---|
|algo|预测分析使用的算法|holtwinters|
|wncheck|白噪声white noise data检查|默认值为 10 表示不进行检查|
|conf|预测数据的置信区间范围 ,取值范围[0, 100]|95|
|every|预测数据的采样间隔|输入数据的采样间隔|
|start|预测结果的开始时间戳|输入数据最后一个时间戳加上一个采样时间段|
|rows|预测结果的记录数|10|
1. 预测查询结果新增了三个伪列,具体如下: `_FROWTS`:预测结果的时间戳、`_FLOW`:置信区间下界、`_FHIGH`:置信区间上界, 对于没有置信区间的预测算法,其置信区间同预测结果
2. 更改参数 `START`:返回预测结果的起始时间,改变这个起始时间不会影响返回的预测数值,只影响起始时间。
3. `EVERY`:可以与输入数据的采样频率不同。采样频率只能低于或等于输入数据采样频率,不能**高于**输入数据的采样频率。
4. 对于某些不需要计算置信区间的算法,即使指定了置信区间,返回的结果中其上下界退化成为一个点。
**示例**
```SQL
--- 使用 arima 算法进行预测,预测结果是 10 条记录(默认值),数据进行白噪声检查,默认置信区间 95%.
SELECT _flow, _fhigh, _frowts, FORECAST(i32, "algo=arima")
FROM ai.ftb;
--- 使用 arima 算法进行预测输入数据的是周期数据每10个采样点是一个周期。返回置信区间是 95%.
SELECT _flow, _fhigh, _frowts, FORECAST(i32, "algo=arima,alpha=95,period=10")
FROM ai.ftb;
```
```
taos> select _flow, _fhigh, _frowts, forecast(i32) from ai.ftb;
_flow | _fhigh | _frowts | forecast(i32) |
========================================================================================
10.5286684 | 41.8038254 | 2020-01-01 00:01:35.001 | 26 |
-21.9861946 | 83.3938904 | 2020-01-01 00:01:36.001 | 30 |
-78.5686035 | 144.6729126 | 2020-01-01 00:01:37.001 | 33 |
-154.9797363 | 230.3057709 | 2020-01-01 00:01:38.001 | 37 |
-253.9852905 | 337.6083984 | 2020-01-01 00:01:39.001 | 41 |
-375.7857971 | 466.4594727 | 2020-01-01 00:01:40.001 | 45 |
-514.8043823 | 622.4426270 | 2020-01-01 00:01:41.001 | 53 |
-680.6343994 | 796.2861328 | 2020-01-01 00:01:42.001 | 57 |
-868.4956665 | 992.8603516 | 2020-01-01 00:01:43.001 | 62 |
-1076.1566162 | 1214.4498291 | 2020-01-01 00:01:44.001 | 69 |
```
**可用预测算法**
- arima
- holtwinters

Binary file not shown.

Before

Width:  |  Height:  |  Size: 62 KiB

After

Width:  |  Height:  |  Size: 49 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.1 KiB

View File

@ -30,6 +30,7 @@ database_option: {
| SINGLE_STABLE {0 | 1}
| TABLE_PREFIX value
| TABLE_SUFFIX value
| DNODES value
| TSDB_PAGESIZE value
| WAL_LEVEL {1 | 2}
| WAL_FSYNC_PERIOD value
@ -63,19 +64,20 @@ database_option: {
- MAXROWS文件块中记录的最大条数默认为 4096 条。
- MINROWS文件块中记录的最小条数默认为 100 条。
- KEEP表示数据文件保存的天数缺省值为 3650取值范围 [1, 365000]且必须大于或等于3倍的 DURATION 参数值。数据库会自动删除保存时间超过 KEEP 值的数据。KEEP 可以使用加单位的表示形式,如 KEEP 100h、KEEP 10d 等,支持 m分钟、h小时和 d三个单位。也可以不写单位如 KEEP 50此时默认单位为天。企业版支持[多级存储](https://docs.taosdata.com/tdinternal/arch/#%E5%A4%9A%E7%BA%A7%E5%AD%98%E5%82%A8)功能, 因此, 可以设置多个保存时间(多个以英文逗号分隔,最多 3 个,满足 keep 0 \<= keep 1 \<= keep 2如 KEEP 100h,100d,3650d; 社区版不支持多级存储功能(即使配置了多个保存时间, 也不会生效, KEEP 会取最大的保存时间)。
- STT_TRIGGER表示落盘文件触发文件合并的个数。默认为 1范围 1 到 16。对于少表高频场景此参数建议使用默认配置或较小的值而对于多表低频场景,此参数建议配置较大的值。
- STT_TRIGGER表示落盘文件触发文件合并的个数。开源版本固定为 1企业版本可设置范围为 1 到 16。对于少表高频写入场景此参数建议使用默认配置而对于多表低频写入场景,此参数建议配置较大的值。
- SINGLE_STABLE表示此数据库中是否只可以创建一个超级表用于超级表列非常多的情况。
- 0表示可以创建多张超级表。
- 1表示只可以创建一张超级表。
- TABLE_PREFIX当其为正值时在决定把一个表分配到哪个 vgroup 时要忽略表名中指定长度的前缀;当其为负值时,在决定把一个表分配到哪个 vgroup 时只使用表名中指定长度的前缀;例如,假定表名为 "v30001",当 TSDB_PREFIX = 2 时 使用 "0001" 来决定分配到哪个 vgroup ,当 TSDB_PREFIX = -2 时使用 "v3" 来决定分配到哪个 vgroup
- TABLE_SUFFIX当其为正值时在决定把一个表分配到哪个 vgroup 时要忽略表名中指定长度的后缀;当其为负值时,在决定把一个表分配到哪个 vgroup 时只使用表名中指定长度的后缀;例如,假定表名为 "v30001",当 TSDB_SUFFIX = 2 时 使用 "v300" 来决定分配到哪个 vgroup ,当 TSDB_SUFFIX = -2 时使用 "01" 来决定分配到哪个 vgroup。
- TSDB_PAGESIZE一个 VNODE 中时序数据存储引擎的页大小,单位为 KB默认为 4 KB。范围为 1 到 16384即 1 KB到 16 MB。
- DNODES指定 VNODE 所在的 DNODE 列表,如 '1,2,3',以逗号区分且字符间不能有空格,仅企业版支持。
- WAL_LEVELWAL 级别,默认为 1。
- 1写 WAL但不执行 fsync。
- 2写 WAL而且执行 fsync。
- WAL_FSYNC_PERIOD当 WAL_LEVEL 参数设置为 2 时,用于设置落盘的周期。默认为 3000单位毫秒。最小为 0表示每次写入立即落盘最大为 180000即三分钟。
- WAL_RETENTION_PERIOD: 为了数据订阅消费需要WAL日志文件额外保留的最大时长策略。WAL日志清理不受订阅客户端消费状态影响。单位为 s。默认为 3600表示在 WAL 保留最近 3600 秒的数据,请根据数据订阅的需要修改这个参数为适当值。
- WAL_RETENTION_SIZE为了数据订阅消费需要WAL日志文件额外保留的最大累计大小策略。单位为 KB。默认为 0表示累计大小无上限。
- WAL_RETENTION_PERIOD: 为了数据订阅消费,需要 WAL 日志文件额外保留的最大时长策略。WAL 日志清理,不受订阅客户端消费状态影响。单位为 s。默认为 3600表示在 WAL 保留最近 3600 秒的数据,请根据数据订阅的需要修改这个参数为适当值。
- WAL_RETENTION_SIZE为了数据订阅消费需要 WAL 日志文件额外保留的最大累计大小策略。单位为 KB。默认为 0表示累计大小无上限。
### 创建数据库示例
```sql

View File

@ -11,7 +11,7 @@ description: 使用标签索引提升查询性能
创建索引的语法如下
```sql
CREATE INDEX index_name ON tbl_name (tagColName
CREATE INDEX index_name ON tbl_name (tagColName)
```
其中 `index_name` 为索引名称, `tbl_name` 为超级表名称,`tagColName` 为要在其上建立索引的 tag 列的名称。`tagColName` 的类型不受限制,即任何类型的 tag 列都可以建立索引。

View File

@ -26,6 +26,7 @@ Node.js 连接器目前仅支持 WebSocket 连接器, 其通过 taosAdapter
| Node.js 连接器 版本 | 主要变化 | TDengine 版本 |
| :------------------: | :----------------------: | :----------------: |
| 3.1.1 | 优化了数据传输性能 | 3.3.2.0 及更高版本 |
| 3.1.0 | 新版本发布,支持 WebSocket 连接 | 3.2.0.0 及更高版本 |
## 处理异常

View File

@ -1339,6 +1339,7 @@ typedef struct {
char* sql;
int8_t withArbitrator;
int8_t encryptAlgorithm;
char dnodeListStr[TSDB_DNODE_LIST_LEN];
} SCreateDbReq;
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);

View File

@ -238,12 +238,26 @@ typedef struct {
case TSDB_DATA_TYPE_UBIGINT: \
snprintf(_output, (int32_t)(_outputBytes), "%" PRIu64, *(uint64_t *)(_input)); \
break; \
case TSDB_DATA_TYPE_FLOAT: \
case TSDB_DATA_TYPE_FLOAT: { \
int32_t n = snprintf(_output, (int32_t)(_outputBytes), "%f", *(float *)(_input)); \
if (n >= (_outputBytes)) { \
n = snprintf(_output, (int32_t)(_outputBytes), "%.7e", *(float *)(_input)); \
if (n >= (_outputBytes)) { \
snprintf(_output, (int32_t)(_outputBytes), "%f", *(float *)(_input)); \
} \
} \
break; \
case TSDB_DATA_TYPE_DOUBLE: \
} \
case TSDB_DATA_TYPE_DOUBLE: { \
int32_t n = snprintf(_output, (int32_t)(_outputBytes), "%f", *(double *)(_input)); \
if (n >= (_outputBytes)) { \
snprintf(_output, (int32_t)(_outputBytes), "%.15e", *(double *)(_input)); \
if (n >= (_outputBytes)) { \
snprintf(_output, (int32_t)(_outputBytes), "%f", *(double *)(_input)); \
} \
} \
break; \
} \
case TSDB_DATA_TYPE_UINT: \
snprintf(_output, (int32_t)(_outputBytes), "%u", *(uint32_t *)(_input)); \
break; \

View File

@ -72,6 +72,7 @@ typedef struct SDatabaseOptions {
int8_t compressionLevel;
int8_t encryptAlgorithm;
int32_t daysPerFile;
char dnodeListStr[TSDB_DNODE_LIST_LEN];
char encryptAlgorithmStr[TSDB_ENCRYPT_ALGO_STR_LEN];
SValueNode* pDaysPerFile;
int32_t fsyncPeriod;

View File

@ -352,6 +352,8 @@ int32_t taosGetErrSize();
#define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x039A)
#define TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE TAOS_DEF_ERROR_CODE(0, 0x039B)
#define TSDB_CODE_MND_INVALID_WAL_LEVEL TAOS_DEF_ERROR_CODE(0, 0x039C)
#define TSDB_CODE_MND_INVALID_DNODE_LIST_FMT TAOS_DEF_ERROR_CODE(0, 0x039D)
#define TSDB_CODE_MND_DNODE_LIST_REPEAT TAOS_DEF_ERROR_CODE(0, 0x039E)
// mnode-node
#define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0)

View File

@ -152,15 +152,12 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
// for internal usage
int32_t getWordLength(char type);
#ifdef __AVX2__
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressFloatImpAvx2(const char *input, int32_t nelements, char *output);
int32_t tsDecompressDoubleImpAvx2(const char *input, int32_t nelements, char *output);
#endif
#ifdef __AVX512VL__
void tsDecompressTimestampAvx2(const char *input, int32_t nelements, char *output, bool bigEndian);
void tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
#endif
int32_t tsDecompressTimestampAvx2(const char *input, int32_t nelements, char *output, bool bigEndian);
int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
bool bigEndian);
/*************************************************************************
* REGULAR COMPRESSION 2

View File

@ -41,6 +41,7 @@ extern const int32_t TYPE_BYTES[21];
#define FLOAT_BYTES sizeof(float)
#define DOUBLE_BYTES sizeof(double)
#define POINTER_BYTES sizeof(void *)
#define M256_BYTES 32
#define TSDB_KEYSIZE sizeof(TSKEY)
#define TSDB_NCHAR_SIZE sizeof(TdUcs4)
@ -452,6 +453,7 @@ typedef enum ELogicConditionType {
#define TSDB_CACHE_MODEL_LAST_ROW 1
#define TSDB_CACHE_MODEL_LAST_VALUE 2
#define TSDB_CACHE_MODEL_BOTH 3
#define TSDB_DNODE_LIST_LEN 256
#define TSDB_ENCRYPT_ALGO_STR_LEN 16
#define TSDB_ENCRYPT_ALGO_NONE_STR "none"
#define TSDB_ENCRYPT_ALGO_SM4_STR "sm4"

View File

@ -363,6 +363,9 @@ int8_t validColEncode(uint8_t type, uint8_t l1) {
if (l1 == TSDB_COLVAL_ENCODE_NOCHANGE) {
return 1;
}
if (l1 == TSDB_COLVAL_ENCODE_DISABLED) {
return 1;
}
if (type == TSDB_DATA_TYPE_BOOL) {
return TSDB_COLVAL_ENCODE_RLE == l1 ? 1 : 0;
} else if (type >= TSDB_DATA_TYPE_TINYINT && type <= TSDB_DATA_TYPE_INT) {

View File

@ -3874,6 +3874,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->s3ChunkSize));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->s3KeepLocal));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->s3Compact));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->dnodeListStr));
tEndEncode(&encoder);
@ -3962,6 +3963,10 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->s3Compact));
}
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->dnodeListStr));
}
tEndDecode(&decoder);
_exit:

View File

@ -741,7 +741,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vgId = alterReq.vgId;
dInfo(
"vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"vgId:%d, vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d changeVersion:%d",
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);

View File

@ -37,6 +37,7 @@ const char *mndGetDbStr(const char *src);
const char *mndGetStableStr(const char *src);
int32_t mndProcessCompactDbReq(SRpcMsg *pReq);
int32_t mndCheckDbDnodeList(SMnode *pMnode, char *db, char *dnodeListStr, SArray *dnodeList);
#ifdef __cplusplus
}

View File

@ -70,7 +70,7 @@ typedef enum {
MND_OPER_WRITE_DB,
MND_OPER_READ_DB,
MND_OPER_READ_OR_WRITE_DB,
MND_OPER_SHOW_VARIBALES,
MND_OPER_SHOW_VARIABLES,
MND_OPER_SUBSCRIBE,
MND_OPER_CREATE_TOPIC,
MND_OPER_DROP_TOPIC,

View File

@ -35,9 +35,9 @@ void mndSortVnodeGid(SVgObj *pVgroup);
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId);
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId, SArray *dnodeList);
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList);
int32_t mndAddNewVgPrepareAction(SMnode *, STrans *pTrans, SVgObj *pVg);
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);

View File

@ -462,8 +462,8 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->cacheLast < TSDB_CACHE_MODEL_NONE || pCfg->cacheLast > TSDB_CACHE_MODEL_BOTH) return code;
if (pCfg->hashMethod != 1) return code;
if (pCfg->replications > mndGetDnodeSize(pMnode)) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return code;
code = TSDB_CODE_MND_NO_ENOUGH_DNODES;
TAOS_RETURN(code);
}
if (pCfg->walRetentionPeriod < TSDB_DB_MIN_WAL_RETENTION_PERIOD) return code;
if (pCfg->walRetentionSize < TSDB_DB_MIN_WAL_RETENTION_SIZE) return code;
@ -746,7 +746,7 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
TAOS_RETURN(code);
}
static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, SUserObj *pUser) {
static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, SUserObj *pUser, SArray *dnodeList) {
int32_t code = 0;
SUserObj newUserObj = {0};
SDbObj dbObj = {0};
@ -823,7 +823,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
}
SVgObj *pVgroups = NULL;
if ((code = mndAllocVgroup(pMnode, &dbObj, &pVgroups)) != 0) {
if ((code = mndAllocVgroup(pMnode, &dbObj, &pVgroups, dnodeList)) != 0) {
mError("db:%s, failed to create, alloc vgroup failed, since %s", pCreate->db, terrstr());
TAOS_RETURN(code);
}
@ -925,6 +925,17 @@ _exit:
TAOS_RETURN(code);
}
#ifndef TD_ENTERPRISE
int32_t mndCheckDbDnodeList(SMnode *pMnode, char *db, char *dnodeListStr, SArray *dnodeList) {
if (dnodeListStr[0] != 0) {
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
return terrno;
} else {
return 0;
}
}
#endif
static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
@ -932,6 +943,10 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
SDbObj *pDb = NULL;
SUserObj *pUser = NULL;
SCreateDbReq createReq = {0};
SArray *dnodeList = NULL;
dnodeList = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(int32_t));
TSDB_CHECK_NULL(dnodeList, code, lino, _OVER, TSDB_CODE_OUT_OF_MEMORY);
TAOS_CHECK_GOTO(tDeserializeSCreateDbReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
#ifdef WINDOWS
@ -975,9 +990,11 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
TAOS_CHECK_GOTO(mndCheckDbEncryptKey(pMnode, &createReq), &lino, _OVER);
TAOS_CHECK_GOTO(mndCheckDbDnodeList(pMnode, createReq.db, createReq.dnodeListStr, dnodeList), &lino, _OVER);
TAOS_CHECK_GOTO(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser), &lino, _OVER);
TAOS_CHECK_GOTO(mndCreateDb(pMnode, pReq, &createReq, pUser), &lino, _OVER);
TAOS_CHECK_GOTO(mndCreateDb(pMnode, pReq, &createReq, pUser, dnodeList), &lino, _OVER);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
SName name = {0};
@ -994,6 +1011,7 @@ _OVER:
mndReleaseDb(pMnode, pDb);
mndReleaseUser(pMnode, pUser);
tFreeSCreateDbReq(&createReq);
taosArrayDestroy(dnodeList);
TAOS_RETURN(code);
}
@ -1168,7 +1186,9 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
TSDB_CHECK_NULL(pArray, code, lino, _err, TSDB_CODE_OUT_OF_MEMORY);
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);

View File

@ -1068,7 +1068,7 @@ static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
SShowVariablesRsp rsp = {0};
int32_t code = -1;
if (mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_SHOW_VARIBALES) != 0) {
if (mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_SHOW_VARIABLES) != 0) {
goto _OVER;
}

View File

@ -717,11 +717,28 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
SDnodeObj *pDnode = pObj;
SArray *pArray = p1;
int32_t exceptDnodeId = *(int32_t *)p2;
SArray *dnodeList = p3;
if (exceptDnodeId == pDnode->id) {
return true;
}
if (dnodeList != NULL) {
int32_t dnodeListSize = taosArrayGetSize(dnodeList);
if (dnodeListSize > 0) {
bool inDnodeList = false;
for (int32_t index = 0; index < dnodeListSize; ++index) {
int32_t dnodeId = *(int32_t *)taosArrayGet(dnodeList, index);
if (pDnode->id == dnodeId) {
inDnodeList = true;
}
}
if (!inDnodeList) {
return true;
}
}
}
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pDnode, curMs);
bool isMnode = mndIsMnode(pMnode, pDnode->id);
@ -741,7 +758,7 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
return true;
}
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) {
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
SSdb *pSdb = pMnode->pSdb;
int32_t numOfDnodes = mndGetDnodeSize(pMnode);
@ -752,7 +769,7 @@ SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) {
}
sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, NULL);
sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, dnodeList);
mDebug("build %d dnodes array", (int32_t)taosArrayGetSize(pArray));
for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
@ -845,7 +862,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
int32_t code = 0;
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
if (pArray == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
@ -868,7 +885,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
return 0;
}
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList) {
int32_t code = -1;
SArray *pArray = NULL;
SVgObj *pVgroups = NULL;
@ -879,7 +896,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
goto _OVER;
}
pArray = mndBuildDnodesArray(pMnode, 0);
pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
if (pArray == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
@ -2062,7 +2079,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
int32_t code = 0;
SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId);
SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
if (pArray == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
@ -3140,7 +3157,7 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
int32_t code = -1;
STrans *pTrans = NULL;
SDbObj dbObj = {0};
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
int32_t numOfStreams = 0;
if ((code = mndGetNumOfStreams(pMnode, pDb->name, &numOfStreams)) != 0) {
@ -3506,7 +3523,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
sdbRelease(pMnode->pSdb, pDnode);
}
pArray = mndBuildDnodesArray(pMnode, 0);
pArray = mndBuildDnodesArray(pMnode, 0, NULL);
if (pArray == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;

View File

@ -1264,6 +1264,9 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte;
blockDataCleanup(pDest);
int32_t rows = taosArrayGetSize(pParInfo->rowIds);
code = blockDataEnsureCapacity(pDest, rows);
QUERY_CHECK_CODE(code, lino, _end);
SSDataBlock* pSrc = pInfo->pInputDataBlock;
for (int32_t i = 0; i < rows; i++) {
int32_t rowIndex = *(int32_t*)taosArrayGet(pParInfo->rowIds, i);

View File

@ -675,11 +675,13 @@ static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExp
.pData = taosMemoryCalloc(1, pExprInfo->base.resSchema.bytes)};
if (!key.pData) {
taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
pInfo->pPrevGroupKeys = NULL;
return terrno;
}
if (NULL == taosArrayPush(pInfo->pPrevGroupKeys, &key)) {
taosMemoryFree(key.pData);
taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
pInfo->pPrevGroupKeys = NULL;
return terrno;
}
}

View File

@ -1,6 +1,10 @@
aux_source_directory(src FUNCTION_SRC)
aux_source_directory(src/detail FUNCTION_SRC_DETAIL)
list(REMOVE_ITEM FUNCTION_SRC src/udfd.c)
IF(COMPILER_SUPPORT_AVX2)
MESSAGE(STATUS "AVX2 instructions is ACTIVATED")
set_source_files_properties(src/detail/tminmaxavx.c PROPERTIES COMPILE_FLAGS -mavx2)
ENDIF()
add_library(function STATIC ${FUNCTION_SRC} ${FUNCTION_SRC_DETAIL})
target_include_directories(
function

View File

@ -25,6 +25,11 @@ extern "C" {
#include "functionResInfoInt.h"
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems);
int32_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res);
int32_t i16VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res);
int32_t i32VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res);
int32_t floatVectorCmpAVX2(const float* pData, int32_t numOfRows, bool isMinFunc, float* res);
int32_t doubleVectorCmpAVX2(const double* pData, int32_t numOfRows, bool isMinFunc, double* res);
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);

View File

@ -72,173 +72,6 @@
#define GET_INVOKE_INTRINSIC_THRESHOLD(_bits, _bytes) ((_bits) / ((_bytes) << 3u))
#ifdef __AVX2__
static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder, int32_t* rounds, int32_t* width) {
const int32_t bitWidth = 256;
*width = (bitWidth >> 3u) / bytes;
*remainder = numOfRows % (*width);
*rounds = numOfRows / (*width);
}
#define EXTRACT_MAX_VAL(_first, _sec, _width, _remain, _v) \
__COMPARE_EXTRACT_MAX(0, (_width), (_v), (_first)) \
__COMPARE_EXTRACT_MAX(0, (_remain), (_v), (_sec))
#define EXTRACT_MIN_VAL(_first, _sec, _width, _remain, _v) \
__COMPARE_EXTRACT_MIN(0, (_width), (_v), (_first)) \
__COMPARE_EXTRACT_MIN(0, (_remain), (_v), (_sec))
#define CMP_TYPE_MIN_MAX(type, cmp) \
const type* p = pData; \
__m256i initVal = _mm256_lddqu_si256((__m256i*)p); \
p += width; \
for (int32_t i = 1; i < (rounds); ++i) { \
__m256i next = _mm256_lddqu_si256((__m256i*)p); \
initVal = CMP_FUNC_##cmp##_##type(initVal, next); \
p += width; \
} \
const type* q = (const type*)&initVal; \
type* v = (type*)res; \
EXTRACT_##cmp##_VAL(q, p, width, remain, *v)
static void i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
const int8_t* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int8_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int8_t _mm256_min_epi8
#define CMP_FUNC_MAX_int8_t _mm256_max_epi8
#define CMP_FUNC_MIN_uint8_t _mm256_min_epu8
#define CMP_FUNC_MAX_uint8_t _mm256_max_epu8
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int8_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint8_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int8_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint8_t, MIN);
}
}
}
static void i16VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int16_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int16_t _mm256_min_epi16
#define CMP_FUNC_MAX_int16_t _mm256_max_epi16
#define CMP_FUNC_MIN_uint16_t _mm256_min_epu16
#define CMP_FUNC_MAX_uint16_t _mm256_max_epu16
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int16_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint16_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int16_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint16_t, MIN);
}
}
}
static void i32VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int32_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int32_t _mm256_min_epi32
#define CMP_FUNC_MAX_int32_t _mm256_max_epi32
#define CMP_FUNC_MIN_uint32_t _mm256_min_epu32
#define CMP_FUNC_MAX_uint32_t _mm256_max_epu32
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int32_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint32_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int32_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint32_t, MIN);
}
}
}
static void floatVectorCmpAVX2(const float* pData, int32_t numOfRows, bool isMinFunc, float* res) {
const float* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(float), &remain, &rounds, &width);
__m256 next;
__m256 initVal = _mm256_loadu_ps(p);
p += width;
if (!isMinFunc) { // max function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_ps(p);
initVal = _mm256_max_ps(initVal, next);
p += width;
}
const float* q = (const float*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, *res)
} else { // min function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_ps(p);
initVal = _mm256_min_ps(initVal, next);
p += width;
}
const float* q = (const float*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, *res)
}
}
static void doubleVectorCmpAVX2(const double* pData, int32_t numOfRows, bool isMinFunc, double* res) {
const double* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(double), &remain, &rounds, &width);
__m256d next;
__m256d initVal = _mm256_loadu_pd(p);
p += width;
if (!isMinFunc) { // max function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_pd(p);
initVal = _mm256_max_pd(initVal, next);
p += width;
}
// let sum up the final results
const double* q = (const double*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, *res)
} else { // min function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_pd(p);
initVal = _mm256_min_pd(initVal, next);
p += width;
}
// let sum up the final results
const double* q = (const double*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, *res)
}
}
#endif
static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, int32_t numOfRows, bool isStr) {
int32_t i = start;
@ -255,13 +88,14 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM
pBuf->v = ((const int8_t*)data)[start];
}
#ifdef __AVX2__
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int8_t) >= sizeof(__m256i)) {
i8VectorCmpAVX2(data + start * sizeof(int8_t), numOfRows, isMinFunc, signVal, &pBuf->v);
} else {
#else
if (true) {
#endif
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int8_t) >= M256_BYTES) {
int32_t code = i8VectorCmpAVX2(((char*)data) + start * sizeof(int8_t), numOfRows, isMinFunc, signVal, &pBuf->v);
if (code == TSDB_CODE_SUCCESS) {
pBuf->assign = true;
return;
}
}
if (signVal) {
const int8_t* p = (const int8_t*)data;
int8_t* v = (int8_t*)&pBuf->v;
@ -281,7 +115,6 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
}
}
pBuf->assign = true;
}
@ -292,13 +125,14 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S
pBuf->v = ((const int16_t*)data)[start];
}
#ifdef __AVX2__
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int16_t) >= sizeof(__m256i)) {
i16VectorCmpAVX2(data + start * sizeof(int16_t), numOfRows, isMinFunc, signVal, &pBuf->v);
} else {
#else
if (true) {
#endif
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int16_t) >= M256_BYTES) {
int32_t code = i16VectorCmpAVX2(((char*)data) + start * sizeof(int16_t), numOfRows, isMinFunc, signVal, &pBuf->v);
if (code == TSDB_CODE_SUCCESS) {
pBuf->assign = true;
return;
}
}
if (signVal) {
const int16_t* p = (const int16_t*)data;
int16_t* v = (int16_t*)&pBuf->v;
@ -318,7 +152,6 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
}
}
pBuf->assign = true;
}
@ -329,13 +162,14 @@ static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, S
pBuf->v = ((const int32_t*)data)[start];
}
#ifdef __AVX2__
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int32_t) >= sizeof(__m256i)) {
i32VectorCmpAVX2(data + start * sizeof(int32_t), numOfRows, isMinFunc, signVal, &pBuf->v);
} else {
#else
if (true) {
#endif
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(int32_t) >= M256_BYTES) {
int32_t code = i32VectorCmpAVX2(((char*)data) + start * sizeof(int32_t), numOfRows, isMinFunc, signVal, &pBuf->v);
if (code == TSDB_CODE_SUCCESS) {
pBuf->assign = true;
return;
}
}
if (signVal) {
const int32_t* p = (const int32_t*)data;
int32_t* v = (int32_t*)&pBuf->v;
@ -355,7 +189,6 @@ static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, S
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *v, p);
}
}
}
pBuf->assign = true;
}
@ -397,19 +230,19 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo
*val = pData[start];
}
#ifdef __AVX2__
if (tsAVXSupported && tsSIMDEnable && numOfRows * sizeof(float) >= sizeof(__m256i)) {
floatVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val);
} else {
#else
if (true) {
#endif
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(float) >= M256_BYTES) {
int32_t code = floatVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val);
if (code == TSDB_CODE_SUCCESS) {
pBuf->assign = true;
return;
}
}
if (isMinFunc) { // min
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *val, pData);
} else { // max
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *val, pData);
}
}
pBuf->assign = true;
}
@ -422,19 +255,19 @@ static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfR
*val = pData[start];
}
#ifdef __AVX2__
if (tsAVXSupported && tsSIMDEnable && numOfRows * sizeof(double) >= sizeof(__m256i)) {
doubleVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val);
} else {
#else
if (true) {
#endif
if (tsAVX2Supported && tsSIMDEnable && numOfRows * sizeof(double) >= M256_BYTES) {
int32_t code = doubleVectorCmpAVX2(pData + start, numOfRows, isMinFunc, val);
if (code == TSDB_CODE_SUCCESS) {
pBuf->assign = true;
return;
}
}
if (isMinFunc) { // min
__COMPARE_EXTRACT_MIN(start, start + numOfRows, *val, pData);
} else { // max
__COMPARE_EXTRACT_MAX(start, start + numOfRows, *val, pData);
}
}
pBuf->assign = true;
}

View File

@ -0,0 +1,227 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "builtinsimpl.h"
#ifdef __AVX2__
static void calculateRounds(int32_t numOfRows, int32_t bytes, int32_t* remainder, int32_t* rounds, int32_t* width) {
const int32_t bitWidth = 256;
*width = (bitWidth >> 3u) / bytes;
*remainder = numOfRows % (*width);
*rounds = numOfRows / (*width);
}
#define __COMPARE_EXTRACT_MIN(start, end, val, _data) \
for (int32_t i = (start); i < (end); ++i) { \
if ((val) > (_data)[i]) { \
(val) = (_data)[i]; \
} \
}
#define __COMPARE_EXTRACT_MAX(start, end, val, _data) \
for (int32_t i = (start); i < (end); ++i) { \
if ((val) < (_data)[i]) { \
(val) = (_data)[i]; \
} \
}
#define EXTRACT_MAX_VAL(_first, _sec, _width, _remain, _v) \
__COMPARE_EXTRACT_MAX(0, (_width), (_v), (_first)) \
__COMPARE_EXTRACT_MAX(0, (_remain), (_v), (_sec))
#define EXTRACT_MIN_VAL(_first, _sec, _width, _remain, _v) \
__COMPARE_EXTRACT_MIN(0, (_width), (_v), (_first)) \
__COMPARE_EXTRACT_MIN(0, (_remain), (_v), (_sec))
#define CMP_TYPE_MIN_MAX(type, cmp) \
const type* p = pData; \
__m256i initVal = _mm256_lddqu_si256((__m256i*)p); \
p += width; \
for (int32_t i = 1; i < (rounds); ++i) { \
__m256i next = _mm256_lddqu_si256((__m256i*)p); \
initVal = CMP_FUNC_##cmp##_##type(initVal, next); \
p += width; \
} \
const type* q = (const type*)&initVal; \
type* v = (type*)res; \
EXTRACT_##cmp##_VAL(q, p, width, remain, *v)
#endif
int32_t i8VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
#ifdef __AVX2__
const int8_t* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int8_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int8_t _mm256_min_epi8
#define CMP_FUNC_MAX_int8_t _mm256_max_epi8
#define CMP_FUNC_MIN_uint8_t _mm256_min_epu8
#define CMP_FUNC_MAX_uint8_t _mm256_max_epu8
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int8_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint8_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int8_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint8_t, MIN);
}
}
return TSDB_CODE_SUCCESS;
#else
uError("unable run %s without avx2 instructions", __func__);
return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
int32_t i16VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
#ifdef __AVX2__
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int16_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int16_t _mm256_min_epi16
#define CMP_FUNC_MAX_int16_t _mm256_max_epi16
#define CMP_FUNC_MIN_uint16_t _mm256_min_epu16
#define CMP_FUNC_MAX_uint16_t _mm256_max_epu16
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int16_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint16_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int16_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint16_t, MIN);
}
}
return TSDB_CODE_SUCCESS;
#else
uError("unable run %s without avx2 instructions", __func__);
return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
int32_t i32VectorCmpAVX2(const void* pData, int32_t numOfRows, bool isMinFunc, bool signVal, int64_t* res) {
#ifdef __AVX2__
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(int32_t), &remain, &rounds, &width);
#define CMP_FUNC_MIN_int32_t _mm256_min_epi32
#define CMP_FUNC_MAX_int32_t _mm256_max_epi32
#define CMP_FUNC_MIN_uint32_t _mm256_min_epu32
#define CMP_FUNC_MAX_uint32_t _mm256_max_epu32
if (!isMinFunc) { // max function
if (signVal) {
CMP_TYPE_MIN_MAX(int32_t, MAX);
} else {
CMP_TYPE_MIN_MAX(uint32_t, MAX);
}
} else { // min function
if (signVal) {
CMP_TYPE_MIN_MAX(int32_t, MIN);
} else {
CMP_TYPE_MIN_MAX(uint32_t, MIN);
}
}
return TSDB_CODE_SUCCESS;
#else
uError("unable run %s without avx2 instructions", __func__);
return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
int32_t floatVectorCmpAVX2(const float* pData, int32_t numOfRows, bool isMinFunc, float* res) {
#ifdef __AVX2__
const float* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(float), &remain, &rounds, &width);
__m256 next;
__m256 initVal = _mm256_loadu_ps(p);
p += width;
if (!isMinFunc) { // max function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_ps(p);
initVal = _mm256_max_ps(initVal, next);
p += width;
}
const float* q = (const float*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, *res)
} else { // min function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_ps(p);
initVal = _mm256_min_ps(initVal, next);
p += width;
}
const float* q = (const float*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, *res)
}
return TSDB_CODE_SUCCESS;
#else
uError("unable run %s without avx2 instructions", __func__);
return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}
int32_t doubleVectorCmpAVX2(const double* pData, int32_t numOfRows, bool isMinFunc, double* res) {
#ifdef __AVX2__
const double* p = pData;
int32_t width, remain, rounds;
calculateRounds(numOfRows, sizeof(double), &remain, &rounds, &width);
__m256d next;
__m256d initVal = _mm256_loadu_pd(p);
p += width;
if (!isMinFunc) { // max function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_pd(p);
initVal = _mm256_max_pd(initVal, next);
p += width;
}
// let sum up the final results
const double* q = (const double*)&initVal;
EXTRACT_MAX_VAL(q, p, width, remain, *res)
} else { // min function
for (int32_t i = 1; i < rounds; ++i) {
next = _mm256_loadu_pd(p);
initVal = _mm256_min_pd(initVal, next);
p += width;
}
// let sum up the final results
const double* q = (const double*)&initVal;
EXTRACT_MIN_VAL(q, p, width, remain, *res)
}
return TSDB_CODE_SUCCESS;
#else
uError("unable run %s without avx2 instructions", __func__);
return TSDB_CODE_OPS_NOT_SUPPORT;
#endif
}

View File

@ -175,8 +175,8 @@ static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_GEOMETRY: {
int32_t len = pSrc->node.resType.bytes + 1;
pDst->datum.p = taosMemoryCalloc(1, len);
int32_t len = varDataTLen(pSrc->datum.p);
pDst->datum.p = taosMemoryCalloc(1, len + 1);
if (NULL == pDst->datum.p) {
return terrno;
}

View File

@ -69,6 +69,7 @@ typedef enum EDatabaseOptionType {
DB_OPTION_S3_COMPACT,
DB_OPTION_KEEP_TIME_OFFSET,
DB_OPTION_ENCRYPT_ALGORITHM,
DB_OPTION_DNODES,
} EDatabaseOptionType;
typedef enum ETableOptionType {

View File

@ -28,6 +28,8 @@ extern "C" {
#define QUERY_SMA_OPTIMIZE_DISABLE 0
#define QUERY_SMA_OPTIMIZE_ENABLE 1
#define QUERY_NUMBER_MAX_DISPLAY_LEN 65
int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData);
int32_t continueCreateTbFromFile(SParseContext* pCxt, SQuery** pQuery);
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);

View File

@ -286,6 +286,7 @@ db_options(A) ::= db_options(B) S3_KEEPLOCAL NK_VARIABLE(C).
db_options(A) ::= db_options(B) S3_COMPACT NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_S3_COMPACT, &C); }
db_options(A) ::= db_options(B) KEEP_TIME_OFFSET NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_KEEP_TIME_OFFSET, &C); }
db_options(A) ::= db_options(B) ENCRYPT_ALGORITHM NK_STRING(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_ENCRYPT_ALGORITHM, &C); }
db_options(A) ::= db_options(B) DNODES NK_STRING(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_DNODES, &C); }
alter_db_options(A) ::= alter_db_option(B). { A = createAlterDatabaseOptions(pCxt); A = setAlterDatabaseOption(pCxt, A, &B); }
alter_db_options(A) ::= alter_db_options(B) alter_db_option(C). { A = setAlterDatabaseOption(pCxt, B, &C); }

View File

@ -1799,6 +1799,7 @@ SNode* createDefaultDatabaseOptions(SAstCreateContext* pCxt) {
pOptions->s3Compact = TSDB_DEFAULT_S3_COMPACT;
pOptions->withArbitrator = TSDB_DEFAULT_DB_WITH_ARBITRATOR;
pOptions->encryptAlgorithm = TSDB_DEFAULT_ENCRYPT_ALGO;
pOptions->dnodeListStr[0] = 0;
return (SNode*)pOptions;
_err:
return NULL;
@ -1842,6 +1843,7 @@ SNode* createAlterDatabaseOptions(SAstCreateContext* pCxt) {
pOptions->s3Compact = -1;
pOptions->withArbitrator = -1;
pOptions->encryptAlgorithm = -1;
pOptions->dnodeListStr[0] = 0;
return (SNode*)pOptions;
_err:
return NULL;
@ -1981,6 +1983,14 @@ static SNode* setDatabaseOptionImpl(SAstCreateContext* pCxt, SNode* pOptions, ED
COPY_STRING_FORM_STR_TOKEN(pDbOptions->encryptAlgorithmStr, (SToken*)pVal);
pDbOptions->encryptAlgorithm = TSDB_DEFAULT_ENCRYPT_ALGO;
break;
case DB_OPTION_DNODES:
if (((SToken*)pVal)->n >= TSDB_DNODE_LIST_LEN) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "the dnode list is too long (should less than %d)",
TSDB_DNODE_LIST_LEN);
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
} else {
COPY_STRING_FORM_STR_TOKEN(pDbOptions->dnodeListStr, (SToken*)pVal);
}
default:
break;
}

View File

@ -3311,25 +3311,26 @@ static int32_t selectCommonType(SDataType* commonType, const SDataType* newType)
} else {
resultType = gDisplyTypes[type2][type1];
}
if (resultType == -1) {
return TSDB_CODE_SCALAR_CONVERT_ERROR;
}
if (commonType->type == newType->type) {
commonType->bytes = TMAX(commonType->bytes, newType->bytes);
return TSDB_CODE_SUCCESS;
}
if (resultType == commonType->type) {
return TSDB_CODE_SUCCESS;
}
if (resultType == newType->type) {
*commonType = *newType;
return TSDB_CODE_SUCCESS;
}
if ((resultType == TSDB_DATA_TYPE_VARCHAR) && (IS_MATHABLE_TYPE(commonType->type) || IS_MATHABLE_TYPE(newType->type))) {
commonType->bytes = TMAX(TMAX(commonType->bytes, newType->bytes), QUERY_NUMBER_MAX_DISPLAY_LEN);
} else if ((resultType == TSDB_DATA_TYPE_NCHAR) && (IS_MATHABLE_TYPE(commonType->type) || IS_MATHABLE_TYPE(newType->type))) {
commonType->bytes = TMAX(TMAX(commonType->bytes, newType->bytes), QUERY_NUMBER_MAX_DISPLAY_LEN * TSDB_NCHAR_SIZE);
} else {
commonType->bytes = TMAX(TMAX(commonType->bytes, newType->bytes), TYPE_BYTES[resultType]);
if (resultType == TSDB_DATA_TYPE_VARCHAR && (IS_FLOAT_TYPE(commonType->type) || IS_FLOAT_TYPE(newType->type))) {
commonType->bytes += TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE];
}
commonType->type = resultType;
return TSDB_CODE_SUCCESS;
}
@ -7556,6 +7557,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
pReq->ignoreExist = pStmt->ignoreExists;
pReq->withArbitrator = pStmt->pOptions->withArbitrator;
pReq->encryptAlgorithm = pStmt->pOptions->encryptAlgorithm;
tstrncpy(pReq->dnodeListStr, pStmt->pOptions->dnodeListStr, TSDB_DNODE_LIST_LEN);
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
}

File diff suppressed because it is too large Load Diff

View File

@ -2085,7 +2085,8 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
(void)memcpy(varDataVal(output), convBuf, len);
varDataSetLen(output, len);
} else {
NUM_TO_STRING(inputType, input, bufSize, buf);
int32_t outputSize = (outputLen - VARSTR_HEADER_SIZE) < bufSize ? (outputLen - VARSTR_HEADER_SIZE + 1): bufSize;
NUM_TO_STRING(inputType, input, outputSize, buf);
int32_t len = (int32_t)strlen(buf);
len = (outputLen - VARSTR_HEADER_SIZE) > len ? len : (outputLen - VARSTR_HEADER_SIZE);
(void)memcpy(varDataVal(output), buf, len);

View File

@ -1031,23 +1031,23 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_MAX][TSDB_DATA_TYPE_MAX] = {
/*GEOM*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0};
int8_t gDisplyTypes[TSDB_DATA_TYPE_MAX][TSDB_DATA_TYPE_MAX] = {
/*NULL BOOL TINY SMAL INT BIGI FLOA DOUB VARC TIME NCHA UTINY USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/
/*NULL*/ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, -1, -1, -1, 8,
/*BOOL*/ 0, 1, 2, 3, 4, 5, 6, 7, 8, 5, 10, 11, 12, 13, 14, 8, -1, -1, -1, -1, 8,
/*TINY*/ 0, 0, 2, 3, 4, 5, 8, 8, 8, 5, 10, 3, 4, 5, 8, 8, -1, -1, -1, -1, 8,
/*SMAL*/ 0, 0, 0, 3, 4, 5, 8, 8, 8, 5, 10, 3, 4, 5, 8, 8, -1, -1, -1, -1, 8,
/*INT */ 0, 0, 0, 0, 4, 5, 8, 8, 8, 5, 10, 4, 4, 5, 8, 8, -1, -1, -1, -1, 8,
/*BIGI*/ 0, 0, 0, 0, 0, 5, 8, 8, 8, 5, 10, 5, 5, 5, 8, 8, -1, -1, -1, -1, 8,
/*FLOA*/ 0, 0, 0, 0, 0, 0, 6, 7, 8, 8, 10, 8, 8, 8, 8, 8, -1, -1, -1, -1, 8,
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 7, 8, 8, 10, 8, 8, 8, 8, 8, -1, -1, -1, -1, 8,
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 8, 8, 10, 8, 8, 8, 8, 8, -1, -1, -1, -1, 8,
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 10, 5, 5, 5, 8, 8, -1, -1, -1, -1, 8,
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 10, 10, 10, 10, -1, -1, -1, -1, 10,
/*UTINY*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 11, 12, 13, 14, 8, -1, -1, -1, -1, 8,
/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 8, -1, -1, -1, -1, 8,
/*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 8, -1, -1, -1, -1, 8,
/*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 8, -1, -1, -1, -1, 8,
/*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, -1, -1, -1, -1, 8,
/*NULL BOOL TINY SMAL INT BIGI FLOA DOUB VARC TIM NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/
/*NULL*/ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, -1, -1, -1, 20,
/*BOOL*/ 0, 1, 2, 3, 4, 5, 6, 7, 8, 5, 10, 11, 12, 13, 14, -1, -1, -1, -1, -1, -1,
/*TINY*/ 0, 0, 2, 3, 4, 5, 8, 8, 8, 5, 10, 3, 4, 5, 8, -1, -1, -1, -1, -1, -1,
/*SMAL*/ 0, 0, 0, 3, 4, 5, 8, 8, 8, 5, 10, 3, 4, 5, 8, -1, -1, -1, -1, -1, -1,
/*INT */ 0, 0, 0, 0, 4, 5, 8, 8, 8, 5, 10, 4, 4, 5, 8, -1, -1, -1, -1, -1, -1,
/*BIGI*/ 0, 0, 0, 0, 0, 5, 8, 8, 8, 5, 10, 5, 5, 5, 8, -1, -1, -1, -1, -1, -1,
/*FLOA*/ 0, 0, 0, 0, 0, 0, 6, 7, 8, 8, 10, 8, 8, 8, 8, -1, -1, -1, -1, -1, -1,
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 7, 8, 8, 10, 8, 8, 8, 8, -1, -1, -1, -1, -1, -1,
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 8, 8, 10, 8, 8, 8, 8, -1, 16, -1, -1, -1, -1,
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 10, 5, 5, 5, 8, -1, -1, -1, -1, -1, -1,
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 10, 10, 10, -1, -1, -1, -1, -1, -1,
/*UTINY*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 11, 12, 13, 14, -1, -1, -1, -1, -1, -1,
/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, -1, -1, -1, -1, -1, -1,
/*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, -1, -1, -1, -1, -1, -1,
/*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, -1, -1, -1, -1, -1, -1,
/*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, -1, -1, -1, -1, -1,
/*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, -1, -1, -1, -1,
/*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, -1, -1, -1,
/*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, -1, -1,

View File

@ -411,14 +411,14 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
void printFileSet(SArray* fileSet) {
static void printFileSet(int32_t vgId, SArray* fileSet, const char* str) {
int32_t sz = taosArrayGetSize(fileSet);
for (int32_t i = 0; i < sz; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(fileSet, i);
wInfo("firstVer:%" PRId64 ", lastVer:%" PRId64 ", fileSize:%" PRId64 ", syncedOffset:%" PRId64 ", createTs:%" PRId64
", closeTs:%" PRId64,
pFileInfo->firstVer, pFileInfo->lastVer, pFileInfo->fileSize, pFileInfo->syncedOffset, pFileInfo->createTs,
pFileInfo->closeTs);
wInfo("vgId:%d, %s-%d, firstVer:%" PRId64 ", lastVer:%" PRId64 ", fileSize:%" PRId64 ", syncedOffset:%" PRId64
", createTs:%" PRId64 ", closeTs:%" PRId64,
vgId, str, i, pFileInfo->firstVer, pFileInfo->lastVer, pFileInfo->fileSize, pFileInfo->syncedOffset,
pFileInfo->createTs, pFileInfo->closeTs);
}
}
@ -430,6 +430,9 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
regex_t logRegPattern;
regex_t idxRegPattern;
wInfo("vgId:%d, begin to repair meta, wal path:%s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", snapshotVer:%" PRId64,
pWal->cfg.vgId, pWal->path, pWal->vers.firstVer, pWal->vers.lastVer, pWal->vers.snapshotVer);
if (regcomp(&logRegPattern, logPattern, REG_EXTENDED) != 0) {
wError("failed to compile log pattern, error:%s", tstrerror(terrno));
return terrno;
@ -482,9 +485,9 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
taosArraySort(actualLog, compareWalFileInfo);
wInfo("vgId:%d, wal path:%s, actual log file num:%d", pWal->cfg.vgId, pWal->path,
wInfo("vgId:%d, actual log file, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
(int32_t)taosArrayGetSize(actualLog));
printFileSet(actualLog);
printFileSet(pWal->cfg.vgId, actualLog, "actual log file");
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
int actualFileNum = taosArrayGetSize(actualLog);
@ -500,9 +503,9 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
TAOS_RETURN(code);
}
wInfo("vgId:%d, wal path:%s, meta log file num:%d", pWal->cfg.vgId, pWal->path,
wInfo("vgId:%d, log file in meta, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
(int32_t)taosArrayGetSize(pWal->fileInfoSet));
printFileSet(pWal->fileInfoSet);
printFileSet(pWal->cfg.vgId, pWal->fileInfoSet, "log file in meta");
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
@ -563,7 +566,9 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
// repair ts of files
TAOS_CHECK_RETURN(walRepairLogFileTs(pWal, &updateMeta));
printFileSet(pWal->fileInfoSet);
wInfo("vgId:%d, log file after repair, wal path:%s, num:%d", pWal->cfg.vgId, pWal->path,
(int32_t)taosArrayGetSize(pWal->fileInfoSet));
printFileSet(pWal->cfg.vgId, pWal->fileInfoSet, "file after repair");
// update meta file
if (updateMeta) {
TAOS_CHECK_RETURN(walSaveMeta(pWal));
@ -571,6 +576,9 @@ int32_t walCheckAndRepairMeta(SWal* pWal) {
TAOS_CHECK_RETURN(walLogEntriesComplete(pWal));
wInfo("vgId:%d, success to repair meta, wal path:%s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", snapshotVer:%" PRId64,
pWal->cfg.vgId, pWal->path, pWal->vers.firstVer, pWal->vers.lastVer, pWal->vers.snapshotVer);
return code;
}
@ -1157,9 +1165,9 @@ int32_t walLoadMeta(SWal* pWal) {
(void)taosCloseFile(&pFile);
taosMemoryFree(buf);
wInfo("vgId:%d, load meta file: %s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", fileInfoSet size:%d", pWal->cfg.vgId,
wInfo("vgId:%d, meta file loaded: %s, firstVer:%" PRId64 ", lastVer:%" PRId64 ", fileInfoSet size:%d", pWal->cfg.vgId,
fnameStr, pWal->vers.firstVer, pWal->vers.lastVer, (int32_t)taosArrayGetSize(pWal->fileInfoSet));
printFileSet(pWal->fileInfoSet);
printFileSet(pWal->cfg.vgId, pWal->fileInfoSet, "file in meta");
TAOS_RETURN(code);
}

View File

@ -37,7 +37,6 @@ float tsNumOfCores = 0;
int64_t tsTotalMemoryKB = 0;
char *tsProcPath = NULL;
char tsSIMDEnable = 1;
char tsAVX512Enable = 0;
char tsSSE42Supported = 0;
char tsAVXSupported = 0;

View File

@ -1,5 +1,9 @@
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/src/version.c.in" "${CMAKE_CURRENT_SOURCE_DIR}/src/version.c")
aux_source_directory(src UTIL_SRC)
IF(COMPILER_SUPPORT_AVX2)
MESSAGE(STATUS "AVX2 instructions is ACTIVATED")
set_source_files_properties(src/tdecompressavx.c PROPERTIES COMPILE_FLAGS -mavx2)
ENDIF()
add_library(util STATIC ${UTIL_SRC})
if(DEFINED GRANT_CFG_INCLUDE_DIR)

View File

@ -471,12 +471,12 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
return nelements * word_length;
}
#ifdef __AVX512F__
if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) {
tsDecompressIntImpl_Hw(input, nelements, output, type);
return nelements * word_length;
int32_t cnt = tsDecompressIntImpl_Hw(input, nelements, output, type);
if (cnt >= 0) {
return cnt;
}
}
#endif
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
char bit_per_integer[] = {0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 15, 20, 30, 60};
@ -867,12 +867,12 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement
memcpy(output, input + 1, nelements * longBytes);
return nelements * longBytes;
} else if (input[0] == 1) { // Decompress
#ifdef __AVX512VL__
if (tsSIMDEnable && tsAVX512Enable && tsAVX512Supported) {
tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, bool bigEndian);
return nelements * longBytes;
int32_t cnt = tsDecompressTimestampAvx512(input, nelements, output, false);
if (cnt >= 0) {
return cnt;
}
}
#endif
int64_t *ostream = (int64_t *)output;
@ -1103,13 +1103,14 @@ int32_t tsDecompressDoubleImp(const char *const input, int32_t ninput, const int
return nelements * DOUBLE_BYTES;
}
#ifdef __AVX2__
// use AVX2 implementation when allowed and the compression ratio is not high
double compressRatio = 1.0 * nelements * DOUBLE_BYTES / ninput;
if (tsSIMDEnable && tsAVX2Supported && compressRatio < 2) {
return tsDecompressDoubleImpAvx2(input + 1, nelements, output);
int32_t cnt = tsDecompressDoubleImpAvx2(input + 1, nelements, output);
if (cnt >= 0) {
return cnt;
}
}
#endif
// use implementation without SIMD instructions by default
return tsDecompressDoubleImpHelper(input + 1, nelements, output);
@ -1257,13 +1258,14 @@ int32_t tsDecompressFloatImp(const char *const input, int32_t ninput, const int3
return nelements * FLOAT_BYTES;
}
#ifdef __AVX2__
// use AVX2 implementation when allowed and the compression ratio is not high
double compressRatio = 1.0 * nelements * FLOAT_BYTES / ninput;
if (tsSIMDEnable && tsAVX2Supported && compressRatio < 2) {
return tsDecompressFloatImpAvx2(input + 1, nelements, output);
int32_t cnt = tsDecompressFloatImpAvx2(input + 1, nelements, output);
if (cnt >= 0) {
return cnt;
}
}
#endif
// use implementation without SIMD instructions by default
return tsDecompressFloatImpHelper(input + 1, nelements, output);
@ -1617,6 +1619,9 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
uTrace("encode:%s, compress:%s, level:%d, type:%s, l1:%d", compressL1Dict[l1].name, compressL2Dict[l2].name, \
lvl, tDataTypes[type].name, l1); \
int32_t len = compressL1Dict[l1].comprFn(pIn, nEle, pBuf, type); \
if (len < 0) { \
return len; \
} \
int8_t alvl = tsGetCompressL2Level(l2, lvl); \
return compressL2Dict[l2].comprFn(pBuf, len, pOut, nOut, type, alvl); \
} else { \
@ -1628,8 +1633,7 @@ int32_t tsDecompressBigint(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
} \
} else if (l1 == L1_DISABLED && l2 != L2_DISABLED) { \
if (compress) { \
uTrace("encode:%s, compress:%s, level:%d, type:%s", "disabled", compressL2Dict[l1].name, lvl, \
tDataTypes[type].name); \
uTrace("encode:%s, compress:%s, level:%d, type:%s", "disabled", "disable", lvl, tDataTypes[type].name); \
int8_t alvl = tsGetCompressL2Level(l2, lvl); \
return compressL2Dict[l2].comprFn(pIn, nIn, pOut, nOut, type, alvl); \
} else { \
@ -1883,3 +1887,26 @@ int8_t tUpdateCompress(uint32_t oldCmpr, uint32_t newCmpr, uint8_t l2Disabled, u
return update;
}
int32_t getWordLength(char type) {
int32_t wordLength = 0;
switch (type) {
case TSDB_DATA_TYPE_BIGINT:
wordLength = LONG_BYTES;
break;
case TSDB_DATA_TYPE_INT:
wordLength = INT_BYTES;
break;
case TSDB_DATA_TYPE_SMALLINT:
wordLength = SHORT_BYTES;
break;
case TSDB_DATA_TYPE_TINYINT:
wordLength = CHAR_BYTES;
break;
default:
uError("Invalid decompress integer type:%d", type);
return TSDB_CODE_INVALID_PARA;
}
return wordLength;
}

View File

@ -13,35 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tcompression.h"
#include "ttypes.h"
int32_t getWordLength(char type) {
int32_t wordLength = 0;
switch (type) {
case TSDB_DATA_TYPE_BIGINT:
wordLength = LONG_BYTES;
break;
case TSDB_DATA_TYPE_INT:
wordLength = INT_BYTES;
break;
case TSDB_DATA_TYPE_SMALLINT:
wordLength = SHORT_BYTES;
break;
case TSDB_DATA_TYPE_TINYINT:
wordLength = CHAR_BYTES;
break;
default:
uError("Invalid decompress integer type:%d", type);
return TSDB_CODE_INVALID_PARA;
}
return wordLength;
}
#ifdef __AVX2__
char tsSIMDEnable = 1;
#else
char tsSIMDEnable = 0;
#endif
int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type) {
#ifdef __AVX2__
int32_t word_length = getWordLength(type);
// Selector value: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
@ -75,12 +56,12 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
int32_t batch = 0;
int32_t remain = 0;
if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) {
#if __AVX512F__
#ifdef __AVX512F__
batch = num >> 3;
remain = num & 0x07;
#endif
} else if (tsSIMDEnable && tsAVX2Supported) {
#if __AVX2__
#ifdef __AVX2__
batch = num >> 2;
remain = num & 0x03;
#endif
@ -88,7 +69,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
if (selector == 0 || selector == 1) {
if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) {
#if __AVX512F__
#ifdef __AVX512F__
for (int32_t i = 0; i < batch; ++i) {
__m512i prev = _mm512_set1_epi64(prevValue);
_mm512_storeu_si512((__m512i *)&p[_pos], prev);
@ -117,7 +98,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
}
} else {
if (tsSIMDEnable && tsAVX512Supported && tsAVX512Enable) {
#if __AVX512F__
#ifdef __AVX512F__
__m512i sum_mask1 = _mm512_set_epi64(6, 6, 4, 4, 2, 2, 0, 0);
__m512i sum_mask2 = _mm512_set_epi64(5, 5, 5, 5, 1, 1, 1, 1);
__m512i sum_mask3 = _mm512_set_epi64(3, 3, 3, 3, 3, 3, 3, 3);
@ -310,10 +291,13 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements,
}
return nelements * word_length;
#else
uError("unable run %s without avx2 instructions", __func__);
return -1;
#endif
}
#define M256_BYTES sizeof(__m256i)
#ifdef __AVX2__
FORCE_INLINE __m256i decodeFloatAvx2(const char *data, const char *flag) {
__m256i dataVec = _mm256_load_si256((__m256i *)data);
__m256i flagVec = _mm256_load_si256((__m256i *)flag);
@ -332,7 +316,27 @@ FORCE_INLINE __m256i decodeFloatAvx2(const char *data, const char *flag) {
return diffVec;
}
FORCE_INLINE __m256i decodeDoubleAvx2(const char *data, const char *flag) {
__m256i dataVec = _mm256_load_si256((__m256i *)data);
__m256i flagVec = _mm256_load_si256((__m256i *)flag);
__m256i k7 = _mm256_set1_epi64x(7);
__m256i lopart = _mm256_set_epi64x(0, -1, 0, -1);
__m256i hipart = _mm256_set_epi64x(-1, 0, -1, 0);
__m256i trTail = _mm256_cmpgt_epi64(flagVec, k7);
__m256i trHead = _mm256_andnot_si256(trTail, _mm256_set1_epi64x(-1));
__m256i shiftVec = _mm256_slli_epi64(_mm256_sub_epi64(k7, _mm256_and_si256(flagVec, k7)), 3);
__m256i maskVec = hipart;
__m256i diffVec = _mm256_sllv_epi64(dataVec, _mm256_and_si256(shiftVec, maskVec));
maskVec = _mm256_or_si256(trHead, lopart);
diffVec = _mm256_srlv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec));
maskVec = _mm256_and_si256(trTail, lopart);
diffVec = _mm256_sllv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec));
return diffVec;
}
#endif
int32_t tsDecompressFloatImpAvx2(const char *input, int32_t nelements, char *output) {
#ifdef __AVX2__
// Allocate memory-aligned buffer
char buf[M256_BYTES * 3];
memset(buf, 0, sizeof(buf));
@ -380,27 +384,14 @@ int32_t tsDecompressFloatImpAvx2(const char *input, int32_t nelements, char *out
out += idx * FLOAT_BYTES;
}
return (int32_t)(out - output);
}
FORCE_INLINE __m256i decodeDoubleAvx2(const char *data, const char *flag) {
__m256i dataVec = _mm256_load_si256((__m256i *)data);
__m256i flagVec = _mm256_load_si256((__m256i *)flag);
__m256i k7 = _mm256_set1_epi64x(7);
__m256i lopart = _mm256_set_epi64x(0, -1, 0, -1);
__m256i hipart = _mm256_set_epi64x(-1, 0, -1, 0);
__m256i trTail = _mm256_cmpgt_epi64(flagVec, k7);
__m256i trHead = _mm256_andnot_si256(trTail, _mm256_set1_epi64x(-1));
__m256i shiftVec = _mm256_slli_epi64(_mm256_sub_epi64(k7, _mm256_and_si256(flagVec, k7)), 3);
__m256i maskVec = hipart;
__m256i diffVec = _mm256_sllv_epi64(dataVec, _mm256_and_si256(shiftVec, maskVec));
maskVec = _mm256_or_si256(trHead, lopart);
diffVec = _mm256_srlv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec));
maskVec = _mm256_and_si256(trTail, lopart);
diffVec = _mm256_sllv_epi64(diffVec, _mm256_and_si256(shiftVec, maskVec));
return diffVec;
#else
uError("unable run %s without avx2 instructions", __func__);
return -1;
#endif
}
int32_t tsDecompressDoubleImpAvx2(const char *input, const int32_t nelements, char *const output) {
#ifdef __AVX2__
// Allocate memory-aligned buffer
char buf[M256_BYTES * 3];
memset(buf, 0, sizeof(buf));
@ -448,12 +439,15 @@ int32_t tsDecompressDoubleImpAvx2(const char *input, const int32_t nelements, ch
out += idx * DOUBLE_BYTES;
}
return (int32_t)(out - output);
}
#else
uError("unable run %s without avx2 instructions", __func__);
return -1;
#endif
}
#if __AVX512VL__
// decode two timestamps in one loop.
void tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian) {
int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output,
bool bigEndian) {
#ifdef __AVX512VL__
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;
@ -588,11 +582,16 @@ void tsDecompressTimestampAvx2(const char *const input, const int32_t nelements,
ostream[opos++] = prevVal[1] + prevDeltaX;
}
}
return;
return opos;
#else
uError("unable run %s without avx512 instructions", __func__);
return -1;
#endif
}
void tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output,
bool UNUSED_PARAM(bigEndian)) {
#ifdef __AVX512VL__
int64_t *ostream = (int64_t *)output;
int32_t ipos = 1, opos = 0;
@ -700,6 +699,9 @@ void tsDecompressTimestampAvx512(const char *const input, const int32_t nelement
}
}
return;
}
return opos;
#else
uError("unable run %s without avx512 instructions", __func__);
return -1;
#endif
}

View File

@ -268,6 +268,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_CREATING, "Database in creating
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SYS_TABLENAME, "Invalid system table name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE, "Encryption is not allowed to be changed after database is created")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_WAL_LEVEL, "Invalid option, wal_level 0 should be used with replica 1")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DNODE_LIST_FMT, "Invalid dnode list format")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_LIST_REPEAT, "Duplicate items in the dnode list")
// mnode-node
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists")

View File

@ -1084,6 +1084,7 @@
,,y,script,./test.sh -f tsim/user/privilege_table.sim
,,y,script,./test.sh -f tsim/user/privilege_create_db.sim
,,y,script,./test.sh -f tsim/db/alter_option.sim
,,y,script,./test.sh -f tsim/db/dnodelist.sim
# ,,y,script,./test.sh -f tsim/db/alter_replica_31.sim
,,y,script,./test.sh -f tsim/db/basic1.sim
,,y,script,./test.sh -f tsim/db/basic2.sim

View File

@ -20,7 +20,7 @@ sql create table $tb (ts timestamp, b bool, t tinyint, s smallint, i int, big bi
$count = 0
while $count < $N
$ms = 1591200000000 + $count
sql insert into $tb values( $ms , 1, 0, $count , $count , $count ,'it is a string')
sql insert into $tb values( $ms , 10, 0, $count , $count , $count ,'it is a string')
$count = $count + 1
endw
@ -29,6 +29,13 @@ if $rows != $N then
return -1
endi
sql flush database $db
sql select * from $tb
if $rows != $N then
return -1
endi
print =============== step2
$i = 1
$db = $dbPrefix . $i

View File

@ -0,0 +1,146 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ============================ dnode1 start
$i = 0
$dbPrefix = db
$tbPrefix = tb
$db = $dbPrefix . $i
$tb = $tbPrefix . $i
$N = 2000
print =============== step1
sql create database $db
sql use $db
sql create table $tb (ts timestamp, b bool encode 'disabled', t tinyint encode 'disabled', s smallint encode 'disabled', i int encode 'disabled', big bigint encode 'disabled', str binary(256))
$count = 0
while $count < $N
$ms = 1591200000000 + $count
sql insert into $tb values( $ms , 1, 0, $count , $count , $count ,'it is a string')
$count = $count + 1
endw
sql select * from $tb
if $rows != $N then
return -1
endi
sql flush database $db
sql select * from $tb
if $rows != $N then
return -1
endi
sql alter table $tb modify column ts encode 'disabled'
$count = 0
while $count < $N
$ms = 1591200030000 + $count
sql insert into $tb values( $ms , 1, 0, $count , $count , $count ,'it is a string')
$count = $count + 1
endw
$M = 4000
sql select * from $tb
if $rows != $M then
return -1
endi
sql flush database $db
sql select * from $tb
if $rows != $M then
return -1
endi
$stb1 = txx1
sql create table txx1 (ts timestamp encode 'disabled' compress 'disabled' level 'h', f int compress 'lz4') tags(t int)
$count = 0
$subTb1 = txx1_sub1
$subTb2 = txx1_sub2
sql create table $subTb1 using $stb1 tags(1)
sql create table $subTb2 using $stb1 tags(2)
while $count < $N
$ms = 1591200030000 + $count
sql insert into $subTb1 values( $ms , 1)
$ms2 = 1591200040000 + $count
sql insert into $subTb2 values( $ms2 , 1)
$count = $count + 1
endw
$count = 0
sql select * from $stb1
if $rows != $M then
return -1
endi
sql flush database $db
sql select * from $stb1
if $rows != $M then
return -1
endi
$L = 8000
sql alter table $stb1 modify column ts encode 'delta-i'
sql alter table $stb1 modify column f encode 'disabled'
while $count < $N
$ms = 1591200050000 + $count
sql insert into $subTb1 values( $ms , 1)
$ms2 = 1591200060000 + $count
sql insert into $subTb2 values( $ms2 , 1)
$count = $count + 1
endw
sql select * from $stb1
if $rows != $L then
return -1
endi
sql flush database $db
sql select * from $stb1
if $rows != $L then
return -1
endi
sql alter table $stb1 modify column ts encode 'disabled'
$count = 0
$I = 12000
while $count < $N
$ms = 1591200070000 + $count
sql insert into $subTb1 values( $ms , 1)
$ms2 = 1591200080000 + $count
sql insert into $subTb2 values( $ms2 , 1)
$count = $count + 1
endw
sql select * from $stb1
if $rows != $I then
return -1
endi
sql flush database $db
sql select * from $stb1
if $rows != $I then
return -1
endi

View File

@ -0,0 +1,258 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/deploy.sh -n dnode5 -i 5
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
system sh/exec.sh -n dnode5 -s start
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
sql create dnode $hostname port 7500
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql select * from information_schema.ins_dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
print ===> $data30 $data31 $data32 $data33 $data34 $data35
print ===> $data40 $data41 $data42 $data43 $data44 $data45
if $rows != 5 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
if $data(3)[4] != ready then
goto step1
endi
if $data(4)[4] != ready then
goto step1
endi
if $data(5)[4] != ready then
goto step1
endi
print --- error case
sql_error create database d1 vgroups 1 dnodes '1234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890';
sql_error create database d1 vgroups 1 dnodes '1 ';
sql_error create database d1 vgroups 1 dnodes ' 1';
sql_error create database d1 vgroups 1 dnodes '1,';
sql_error create database d1 vgroups 1 dnodes '1, ';
sql_error create database d1 vgroups 1 dnodes 'a ';
sql_error create database d1 vgroups 1 dnodes '- ';
sql_error create database d1 vgroups 1 dnodes '1,1';
sql_error create database d1 vgroups 1 dnodes '1, 1';
sql_error create database d1 vgroups 1 dnodes '1,1234567890';
sql_error create database d1 vgroups 1 dnodes '1,2,6';
sql_error create database d1 vgroups 1 dnodes ',1,2';
sql_error create database d1 vgroups 1 dnodes 'x1,2';
sql_error create database d1 vgroups 1 dnodes 'c1,ab2';
sql_error create database d1 vgroups 1 dnodes '1,1,2';
sql_error create database d1 vgroups 1 replica 2 dnodes '1';
sql_error create database d1 vgroups 1 replica 2 dnodes '1,8';
sql_error create database d1 vgroups 1 replica 3 dnodes '1';
sql_error create database d1 vgroups 1 replica 3 dnodes '1,2';
sql_error create database d1 vgroups 1 replica 3 dnodes '1,2,4,6';
print --- replica 1
print --- case10
sql create database d10 vgroups 1 dnodes '1';
sql show dnodes;
if $data(1)[2] != 1 then
return -1
endi
sql_error alter database d10 replica 1 dnodes '1,2,3';
sql drop database d10;
print --- case11
sql create database d11 vgroups 1 dnodes '2';
sql show dnodes;
if $data(2)[2] != 1 then
return -1
endi
sql drop database d11;
print --- case12
sql create database d12 vgroups 2 dnodes '3,4';
sql show dnodes;
if $data(3)[2] != 1 then
return -1
endi
if $data(4)[2] != 1 then
return -1
endi
sql drop database d12;
print --- case13
sql create database d13 vgroups 2 dnodes '5';
sql show dnodes;
if $data(5)[2] != 2 then
return -1
endi
sql drop database d13;
print --- case14
sql create database d14 vgroups 1 dnodes '1,2,5';
sql drop database d14;
print --- case15
sql create database d15 vgroups 2 dnodes '1,4,3';
sql drop database d15;
print --- case16
sql create database d16 vgroups 3 dnodes '1';
sql show dnodes;
if $data(1)[2] != 3 then
return -1
endi
sql drop database d16;
print --- case17
sql create database d17 vgroups 3 dnodes '1,4';
sql drop database d17;
print --- case18
sql create database d18 vgroups 3 dnodes '1,2,4';
sql show dnodes;
if $data(1)[2] != 1 then
return -1
endi
if $data(2)[2] != 1 then
return -1
endi
if $data(4)[2] != 1 then
return -1
endi
sql drop database d18;
print --- replica 2
print --- case20
sql create database d20 replica 2 vgroups 1 dnodes '1,2';
sql show dnodes;
if $data(1)[2] != 1 then
return -1
endi
if $data(2)[2] != 1 then
return -1
endi
sql drop database d20;
print --- case21
sql create database d21 replica 2 vgroups 3 dnodes '1,2,3';
sql show dnodes;
if $data(1)[2] != 2 then
return -1
endi
if $data(2)[2] != 2 then
return -1
endi
if $data(3)[2] != 2 then
return -1
endi
sql drop database d21;
print --- case22
sql create database d22 replica 2 vgroups 2 dnodes '1,2';
sql show dnodes;
if $data(1)[2] != 2 then
return -1
endi
if $data(2)[2] != 2 then
return -1
endi
sql drop database d22;
print --- replica 3
print --- case30
sql create database d30 replica 3 vgroups 3 dnodes '1,2,3';
sql show dnodes;
if $data(1)[2] != 3 then
return -1
endi
if $data(2)[2] != 3 then
return -1
endi
if $data(3)[2] != 3 then
return -1
endi
sql_error alter database d30 replica 1 dnodes '1';
sql drop database d30;
print --- case31
sql create database d31 replica 3 vgroups 2 dnodes '1,2,4';
sql show dnodes;
if $data(1)[2] != 2 then
return -1
endi
if $data(2)[2] != 2 then
return -1
endi
if $data(4)[2] != 2 then
return -1
endi
sql drop database d31;
print --- case32
sql create database d32 replica 3 vgroups 4 dnodes '4,2,3,1';
sql show dnodes;
if $data(1)[2] != 3 then
return -1
endi
if $data(2)[2] != 3 then
return -1
endi
if $data(3)[2] != 3 then
return -1
endi
if $data(4)[2] != 3 then
return -1
endi
sql drop database d32;
print --- case33
sql create database d33 replica 3 vgroups 5 dnodes '4,2,3,1,5';
sql show dnodes;
if $data(1)[2] != 3 then
return -1
endi
if $data(2)[2] != 3 then
return -1
endi
if $data(3)[2] != 3 then
return -1
endi
if $data(4)[2] != 3 then
return -1
endi
if $data(5)[2] != 3 then
return -1
endi
sql drop database d33;
return
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT

View File

@ -838,7 +838,7 @@ endi
if $data20 != 11 then
return -1
endi
if $data30 != 1664176504 then
if $data30 != 1664176504000 then
return -1
endi
@ -1130,38 +1130,38 @@ if $data00 != varchar_val then
return -1
endi
sql select case when ts > '2022-01-01 00:00:00' then c_varchar else c_geometry end as result from t_test;
if $data00 != varchar_val then
sql select case when 1 then 1234567890987654 else 'abcertyuiojhgfddhjgfcvbn' end;
if $data00 != 1234567890987654 then
return -1
endi
sql select case when ts > '2022-01-01 00:00:00' then c_bool else c_geometry end as result from t_test;
if $data00 != true then
sql select case when 0 then 1234567890987654 else 'abcertyuiojhgfddhjgfcvbn' end;
if $data00 != abcertyuiojhgfddhjgfcvbn then
return -1
endi
sql select case when 0 then tag_id else c_geometry end as result from t_test;
if $data00 != 16842773 then
return -1
endi
sql select case when 0 then tag_id else c_nchar end as result from t_test;
sql select case when 0 then 1234567890987654 else c_nchar end from t_test;
if $data00 != 涛思数据 then
return -1
endi
sql select case when 0 then tag_id else c_int end as result from t_test;
if $data00 != 123 then
sql select case when 1 then 1234567890987654 else c_nchar end from t_test;
if $data00 != 1234567890987654 then
return -1
endi
sql select case when 0 then tag_id else c_float end as result from t_test;
if $data00 != 123.449997 then
sql select case when 1 then c_varchar else c_varbinary end from t_test;
if $data00 != null then
return -1
endi
sql_error select case when ts > '2022-01-01 00:00:00' then c_varchar else c_geometry end as result from t_test;
sql_error select case when ts > '2022-01-01 00:00:00' then c_bool else c_geometry end as result from t_test;
sql_error select case when 0 then tag_id else c_geometry end as result from t_test;
sql_error select case when 0 then tag_id else c_nchar end as result from t_test;
sql_error select case when 0 then tag_id else c_int end as result from t_test;
sql_error select case when 0 then tag_id else c_float end as result from t_test;
sql_error select case when c_double > 100 then c_varbinary else c_geometry end as result from t_test;
sql_error select case when c_bool then c_double else c_varbinary end as result from t_test;
sql_error select case when c_bool then c_varbinary else c_varchar end as result from t_test;
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -325,6 +325,7 @@
./test.sh -f tsim/compress/compress.sim
./test.sh -f tsim/compress/compress_col.sim
./test.sh -f tsim/compress/uncompress.sim
./test.sh -f tsim/compress/compressDisable.sim
./test.sh -f tsim/compute/avg.sim
./test.sh -f tsim/compute/block_dist.sim
./test.sh -f tsim/compute/bottom.sim

View File

@ -1,11 +1,8 @@
package main
import (
"github.com/taosdata/taoskeeper/system"
)
import "github.com/taosdata/taoskeeper/system"
func main() {
r := system.Init()
system.Start(r)
// config.IsEnterprise
}

View File

@ -1,8 +0,0 @@
package process
import (
"testing"
)
func TestEmpty(t *testing.T) {
}

View File

@ -0,0 +1,121 @@
package process
import (
"testing"
"github.com/stretchr/testify/assert"
)
func Test_i2string(t *testing.T) {
tests := []struct {
value interface{}
expected string
}{
{"abc", "abc"},
{"abcdef", "abcdef"},
{[]byte{97, 98, 99, 100, 101, 102}, "abcdef"},
}
for _, tt := range tests {
res := i2string(tt.value)
assert.Equal(t, tt.expected, res)
}
}
func Test_i2string_panic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("Expected panic for unexpected type, but did not panic")
}
}()
i2string(12345)
}
func Test_i2float(t *testing.T) {
tests := []struct {
value interface{}
expected float64
}{
{int8(1), 1.0},
{int16(1), 1.0},
{int32(1), 1.0},
{int64(1), 1.0},
{uint8(1), 1.0},
{uint16(1), 1.0},
{uint32(1), 1.0},
{uint64(1), 1.0},
{float32(1.5), 1.5},
{float64(1.5), 1.5},
{true, 1.0},
{false, 0.0},
}
for _, tt := range tests {
res := i2float(tt.value)
assert.Equal(t, tt.expected, res)
}
}
func Test_i2float_panic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("Expected panic for unexpected type, but did not panic")
}
}()
i2float("unexpected type")
}
func Test_getRoleStr(t *testing.T) {
tests := []struct {
value float64
expected string
}{
{0, "offline"},
{99.5, "follower"},
{100, "follower"},
{100.4, "follower"},
{100.5, "candidate"},
{101, "candidate"},
{101.4, "candidate"},
{101.5, "leader"},
{102, "leader"},
{102.4, "leader"},
{102.5, "error"},
{103, "error"},
{104, "learner"},
{99.4, "unknown"},
{105, "unknown"},
{-1, "unknown"},
{150, "unknown"},
}
for _, tt := range tests {
res := getRoleStr(tt.value)
assert.Equal(t, tt.expected, res)
}
}
func Test_getStatusStr(t *testing.T) {
tests := []struct {
value float64
expected string
}{
{-0.4, "offline"},
{0, "offline"},
{0.4, "offline"},
{0.5, "ready"},
{1, "ready"},
{1.4, "ready"},
{1.5, "unknown"},
{2, "unknown"},
{-0.5, "unknown"},
{-1, "unknown"},
}
for _, tt := range tests {
res := getStatusStr(tt.value)
assert.Equal(t, tt.expected, res)
}
}

View File

@ -3,15 +3,18 @@ package system
import (
"context"
"fmt"
"net/http"
"testing"
"time"
"github.com/kardianos/service"
"github.com/stretchr/testify/assert"
"github.com/taosdata/taoskeeper/db"
"github.com/taosdata/taoskeeper/infrastructure/config"
"github.com/taosdata/taoskeeper/util"
)
func TestStart(t *testing.T) {
func TestInit(t *testing.T) {
server := Init()
assert.NotNil(t, server)
@ -20,3 +23,23 @@ func TestStart(t *testing.T) {
conn.Query(context.Background(), fmt.Sprintf("drop database if exists %s", config.Conf.Metrics.Database.Name), util.GetQidOwn())
conn.Query(context.Background(), fmt.Sprintf("drop database if exists %s", config.Conf.Audit.Database.Name), util.GetQidOwn())
}
func Test_program(t *testing.T) {
server := &http.Server{}
prg := newProgram(server)
svcConfig := &service.Config{
Name: "taoskeeper",
DisplayName: "taoskeeper",
Description: "taosKeeper is a tool for TDengine that exports monitoring metrics",
}
svc, err := service.New(prg, svcConfig)
assert.NoError(t, err)
err = prg.Start(svc)
assert.NoError(t, err)
time.Sleep(100 * time.Millisecond)
err = prg.Stop(svc)
assert.NoError(t, err)
}