Merge branch '3.3.6' into merge/3.0to3.3.6

Simon Guan 2025-03-23 17:25:14 +08:00
commit c9c1bddda0
37 changed files with 186 additions and 137 deletions


@@ -41,21 +41,23 @@
# 1. Introduction
-TDengine is an open-source, high-performance, cloud-native time-series database (Time-Series Database, TSDB). TDengine can be widely used in IoT, industrial IoT, connected vehicles, IT operations, finance, and other fields. Beyond the core time-series database functionality, TDengine also provides caching, data subscription, stream processing, and other features, making it a minimalist time-series data processing platform that minimizes system design complexity and reduces development and operating costs. Compared with other time-series databases, TDengine's main advantages are as follows:
+TDengine is an open-source, high-performance, cloud-native, AI-powered time-series database (Time-Series Database, TSDB). TDengine can be widely used in IoT, industrial IoT, connected vehicles, IT operations, finance, and other fields. Beyond the core time-series database functionality, TDengine also provides caching, data subscription, stream processing, an AI agent, and other features, making it a minimalist time-series data processing platform that minimizes system design complexity and reduces development and operating costs. Compared with other time-series databases, TDengine's main advantages are as follows:
- **High performance**: Thanks to an innovative storage engine design, TDengine is more than 10 times faster than general-purpose databases for both data ingestion and queries, far exceeds other time-series databases, and needs less than 1/10 of the storage space of general-purpose databases.
- **Cloud native**: With a natively distributed design that takes full advantage of cloud platforms, TDengine provides horizontal scalability with elasticity, resilience, and observability, supports k8s deployment, and runs on public, private, and hybrid clouds.
-- **Minimalist time-series data platform**: TDengine has built-in message queuing, caching, stream processing, and other features, so applications no longer need to integrate Kafka/Redis/HBase/Spark, which greatly reduces system complexity as well as development and operating costs.
+- **Minimalist time-series data platform**: TDengine has built-in message queuing, caching, stream processing, an AI agent, and other features, so applications no longer need to integrate Kafka/Redis/HBase/Spark, which greatly reduces system complexity as well as development and operating costs.
-- **Analytics**: SQL is supported, with SQL extensions for time-series-specific analysis. Through super tables, separation of storage and compute, partitioning and sharding, pre-computation, user-defined functions, and other techniques, TDengine offers powerful analytics capabilities.
+- **Analytics**: SQL is supported, with SQL extensions for time-series-specific analysis. Through super tables, separation of storage and compute, partitioning and sharding, pre-computation, user-defined functions, the AI agent, and other techniques, TDengine offers powerful analytics capabilities.
+- **AI agent**: The built-in time-series agent TDgpt seamlessly connects time-series foundation models, large language models, machine learning, and traditional statistical algorithms to provide forecasting, anomaly detection, imputation, and classification for time-series data.
- **Simple and easy to use**: No dependencies; installation and clustering are done in seconds. Provides REST and connectors for many languages, integrates seamlessly with numerous third-party tools, and offers a command-line program for administration and ad-hoc queries as well as various operation and maintenance tools.
- **Open-source core**: TDengine's core code, including the cluster feature, is fully open source. As of August 1, 2022, there were more than 135.9k running instances worldwide, with 18.7k GitHub stars and 4.4k forks, and an active community.
-For the full list of TDengine's advanced features, please [click here](https://tdengine.com/tdengine/). The easiest way to experience TDengine is through [TDengine Cloud](https://cloud.tdengine.com).
+For the full list of TDengine's advanced features, please [click here](https://tdengine.com/tdengine/). The easiest way to experience TDengine is through [TDengine Cloud](https://cloud.tdengine.com). For the latest TDengine component, TDgpt, please visit the [TDgpt README](./tools/tdgpt/README.md) for details.
# 2. Documentation
@@ -67,7 +69,7 @@ TDengine is an open-source, high-performance, cloud-native time-series database (Time-Series
# 3. Prerequisites
-TDengine can currently be installed and run on Linux, Windows, macOS, and other platforms. Applications on any OS can also connect to the taosd server through the RESTful interface of taosAdapter. The X64 and ARM64 CPU architectures are supported; MIPS64, Alpha64, ARM32, RISC-V, and other CPU architectures will be supported later. Building with a cross-compiler is not supported at this time.
+TDengine can currently be installed and run on Linux and macOS (the enterprise edition supports Windows). Applications on any OS can also connect to the taosd server through the RESTful interface of taosAdapter. The X64 and ARM64 CPU architectures are supported; MIPS64, Alpha64, ARM32, RISC-V, and other CPU architectures will be supported later. Building with a cross-compiler is not supported at this time.
If you want to build taosAdapter or taosKeeper, Go 1.18 or later is required.


@@ -54,23 +54,23 @@ English | [简体中文](README-CN.md) | [TDengine Cloud](https://cloud.tdengine
# 1. Introduction
-TDengine is an open source, high-performance, cloud native [time-series database](https://tdengine.com/tsdb/) optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and monitoring of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other time-series databases with the following advantages:
+TDengine is an open source, high-performance, cloud native and AI powered [time-series database](https://tdengine.com/tsdb/) designed for Internet of Things (IoT), Connected Cars, and Industrial IoT. It enables efficient, real-time data ingestion, processing, and analysis of TB and even PB scale data per day, generated by billions of sensors and data collectors. TDengine differentiates itself from other time-series databases with the following advantages:
- **[High Performance](https://tdengine.com/tdengine/high-performance-time-series-database/)**: TDengine is the only time-series database to solve the high cardinality issue to support billions of data collection points while outperforming other time-series databases for data ingestion, querying and data compression.
-- **[Simplified Solution](https://tdengine.com/tdengine/simplified-time-series-data-solution/)**: Through built-in caching, stream processing and data subscription features, TDengine provides a simplified solution for time-series data processing. It reduces system design complexity and operation costs significantly.
+- **[Simplified Solution](https://tdengine.com/tdengine/simplified-time-series-data-solution/)**: Through built-in caching, stream processing, data subscription and AI agent features, TDengine provides a simplified solution for time-series data processing. It reduces system design complexity and operation costs significantly.
- **[Cloud Native](https://tdengine.com/tdengine/cloud-native-time-series-database/)**: Through native distributed design, sharding and partitioning, separation of compute and storage, RAFT, support for kubernetes deployment and full observability, TDengine is a cloud native Time-Series Database and can be deployed on public, private or hybrid clouds.
+- **[AI Powered](https://tdengine.com/tdengine/tdgpt/)**: Through the built-in AI agent TDgpt, TDengine can connect to a variety of time-series foundation models, large language models, machine learning and traditional algorithms to provide time-series data forecasting, anomaly detection, imputation and classification.
- **[Ease of Use](https://tdengine.com/tdengine/easy-time-series-data-platform/)**: For administrators, TDengine significantly reduces the effort to deploy and maintain. For developers, it provides a simple interface, simplified solution and seamless integrations for third party tools. For data users, it gives easy data access.
-- **[Easy Data Analytics](https://tdengine.com/tdengine/time-series-data-analytics-made-easy/)**: Through super tables, storage and compute separation, data partitioning by time interval, pre-computation and other means, TDengine makes it easy to explore, format, and get access to data in a highly efficient way.
+- **[Easy Data Analytics](https://tdengine.com/tdengine/time-series-data-analytics-made-easy/)**: Through super tables, storage and compute separation, data partitioning by time interval, pre-computation and the AI agent, TDengine makes it easy to explore, format, and get access to data in a highly efficient way.
-- **[Open Source](https://tdengine.com/tdengine/open-source-time-series-database/)**: TDengine's core modules, including the cluster feature, are all available under open source licenses. It has gathered 19.9k stars on GitHub. There is an active developer community, and over 139k running instances worldwide.
+- **[Open Source](https://tdengine.com/tdengine/open-source-time-series-database/)**: TDengine's core modules, including the cluster feature and AI agent, are all available under open source licenses. It has gathered 23.7k stars on GitHub. There is an active developer community, and over 730k running instances worldwide.
-For a full list of TDengine competitive advantages, please [check here](https://tdengine.com/tdengine/). The easiest way to experience TDengine is through [TDengine Cloud](https://cloud.tdengine.com).
+For a full list of TDengine competitive advantages, please [check here](https://tdengine.com/tdengine/). The easiest way to experience TDengine is through [TDengine Cloud](https://cloud.tdengine.com). For the latest TDengine component TDgpt, please refer to the [TDgpt README](./tools/tdgpt/README.md) for details.
-For the latest TDengine component TDgpt, please refer to [TDgpt README](./tools/tdgpt/README.md) for details.
# 2. Documentation


@@ -99,6 +99,7 @@ The list of keywords is as follows:
| CONSUMER | |
| CONSUMERS | |
| CONTAINS | |
+| CONTINUOUS_WINDOW_CLOSE | 3.3.6.0+ |
| COPY | |
| COUNT | |
| COUNT_WINDOW | |
@@ -113,7 +114,7 @@ The list of keywords is as follows:
| DATABASE | |
| DATABASES | |
| DBS | |
-| DECIMAL | |
+| DECIMAL | 3.3.6.0+ |
| DEFERRED | |
| DELETE | |
| DELETE_MARK | |


@@ -136,7 +136,7 @@ create stream if not exists count_history_s fill_history 1 into count_history as
```sql
create stream if not exists continuous_query_s trigger force_window_close into continuous_query as select count(*) from power.meters interval(10s) sliding(1s)
```
5. CONTINUOUS_WINDOW_CLOSE: results are emitted when the window closes. Modifying or deleting data does not trigger an immediate recalculation; instead, a periodic recalculation is performed after every rec_time_val interval. If rec_time_val is not specified, the recalculation period is 60 minutes. If a recalculation takes longer than rec_time_val, the next recalculation starts automatically as soon as the current one finishes. This mode currently supports only INTERVAL windows. If FILL is used, the adapter information must be configured: adapterFqdn, adapterPort, and adapterToken. adapterToken is the Base64-encoded form of `{username}:{password}`; for example, `root:taosdata` encodes to `cm9vdDp0YW9zZGF0YQ==`.
Window closing is determined by event time. If the event stream is interrupted or persistently delayed, the event time cannot advance, and the latest results may not be produced.
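As a rough illustration only (not part of this diff), a stream using this trigger mode could be declared by analogy with the FORCE_WINDOW_CLOSE example above; the stream and target table names are placeholders, and the clause for tuning rec_time_val is omitted because its exact syntax is not shown in this excerpt:
```sql
create stream if not exists continuous_close_s trigger continuous_window_close into continuous_close as select count(*) from power.meters interval(10s)
```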


@@ -1096,14 +1096,14 @@ The valid value of charset is UTF-8.
- Supported since: introduced in v3.3.6.0
#### adapterFqdn
-- Description: address of the taosadapter service `internal parameter`
+- Description: address of the taosAdapter service `internal parameter`
- Type: fqdn
- Default: localhost
- Dynamically modifiable: no
- Supported since: introduced in v3.3.6.0
#### adapterPort
-- Description: port number of the taosadapter service `internal parameter`
+- Description: port number of the taosAdapter service `internal parameter`
- Type: integer
- Default: 6041
- Minimum: 1


@@ -64,16 +64,16 @@ CREATE DATABASE db_name PRECISION 'ns';
:::
### DECIMAL Data Type
The `DECIMAL` data type is used for high-precision numeric storage and is supported since v3.3.6.0. The definition syntax is `DECIMAL(18, 2)` or `DECIMAL(38, 10)`, where two parameters must be specified: `precision` and `scale`. `precision` is the maximum number of significant digits supported, and `scale` is the maximum number of decimal places supported. For example, `DECIMAL(8, 4)` can represent the range `[-9999.9999, 9999.9999]`. When defining a DECIMAL type, the `precision` range is `[1, 38]` and the scale range is `[0, precision]`; a scale of 0 means an integer. The scale may also be omitted and defaults to 0; for example, `DECIMAL(18)` is the same as `DECIMAL(18, 0)`.
When the `precision` value is no greater than 18, 8 bytes are used internally for storage (DECIMAL64); when `precision` is in the range `(18, 38]`, 16 bytes are used (DECIMAL). When writing DECIMAL data in SQL, numeric literals can be used directly. If the written value exceeds the maximum value representable by the type, a DECIMAL_OVERFLOW error is reported. If it does not exceed the maximum but has more decimal places than the scale, it is rounded automatically; for example, with the type DECIMAL(10, 2), writing 10.987 stores 10.99.
The DECIMAL type supports only regular columns, not tag columns. DECIMAL values can only be written via SQL; stmt and schemaless writes are not supported yet.
When integer types are combined with the DECIMAL type in an operation, the integers are converted to DECIMAL before the computation. When DECIMAL is combined with DOUBLE/FLOAT/VARCHAR/NCHAR and similar types, the values are converted to DOUBLE for the computation.
When querying DECIMAL expressions, a DECIMAL OVERFLOW error is reported if an intermediate result exceeds the maximum value representable by the current type.
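As a concrete sketch of the definition, rounding, and overflow behavior described above (the database, table, and column names are illustrative only):
```sql
CREATE DATABASE IF NOT EXISTS dec_demo;
CREATE TABLE dec_demo.quotes (ts TIMESTAMP, price DECIMAL(10, 2));
-- 10.987 has more decimal places than the scale of 2, so it is rounded and stored as 10.99
INSERT INTO dec_demo.quotes VALUES (NOW, 10.987);
-- a value that exceeds what DECIMAL(10, 2) can represent reports a DECIMAL_OVERFLOW error
-- INSERT INTO dec_demo.quotes VALUES (NOW, 123456789.99);
SELECT ts, price FROM dec_demo.quotes;
```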
## Constants


@@ -1137,7 +1137,7 @@ CAST(expr AS type_name)
- When converting a string type to a numeric type, the string may contain invalid characters; for example, "a" may be converted to 0, and no error is reported.
- When converting to a numeric type, a value larger than the range representable by type_name overflows, and no error is reported.
- When converting to a string type, if the converted length exceeds the length specified in type_name, the result is truncated, and no error is reported.
- The DECIMAL type cannot be converted to or from the JSON, VARBINARY, or GEOMETRY types.
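A couple of small sketches of the conversion rules above, using constant expressions; the result values follow the behavior described in the notes:
```sql
-- an invalid numeric string converts to 0 and no error is reported
SELECT CAST('a' AS INT);
-- a string longer than the target length is truncated and no error is reported
SELECT CAST('timeseries' AS VARCHAR(4));
```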
#### TO_CHAR
@@ -1619,13 +1619,13 @@ AVG(expr)
**Function description**: returns the average value of the specified field.
**Return data type**: DOUBLE, DECIMAL.
**Applicable data types**: numeric types.
**Applicable to**: tables and supertables.
**Note**: when the input type is DECIMAL, the output type is also DECIMAL. The precision and scale of the output follow the rules described in the data type section; the result type is derived from dividing the SUM type by UINT64. If the SUM result overflows the DECIMAL type, a DECIMAL OVERFLOW error is reported.
### COUNT
@@ -1808,13 +1808,13 @@ SUM(expr)
**Function description**: returns the sum of a column in a table/supertable.
**Return data type**: DOUBLE, BIGINT, DECIMAL.
**Applicable data types**: numeric types.
**Applicable to**: tables and supertables.
**Note**: when the input type is DECIMAL, the output type is DECIMAL(38, scale), where precision is the maximum currently supported value and scale is the scale of the input type. If the SUM result overflows, a DECIMAL OVERFLOW error is reported.
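To tie the AVG and SUM notes together, a minimal sketch against an illustrative table with a `DECIMAL(10, 2)` column (placeholder names):
```sql
-- SUM on a DECIMAL(10, 2) column yields DECIMAL(38, 2): maximum precision, same scale;
-- AVG yields a DECIMAL result derived from SUM divided by the row count
SELECT SUM(price), AVG(price) FROM dec_demo.quotes;
```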
### VAR_POP


@@ -35,6 +35,7 @@ description: Detailed list of TDengine reserved keywords
| AS | |
| ASC | |
| ASOF | |
+| ASYNC | 3.3.6.0+ |
| AT_ONCE | |
| ATTACH | |
| AUTO | 3.3.5.0+ |
@@ -96,6 +97,7 @@ description: Detailed list of TDengine reserved keywords
| CONSUMER | |
| CONSUMERS | |
| CONTAINS | |
+| CONTINUOUS_WINDOW_CLOSE | 3.3.6.0+ |
| COPY | |
| COUNT | |
| COUNT_WINDOW | |
@@ -109,7 +111,7 @@ description: Detailed list of TDengine reserved keywords
| DATABASE | |
| DATABASES | |
| DBS | |
-| DECIMAL | |
+| DECIMAL | 3.3.6.0+ |
| DEFERRED | |
| DELETE | |
| DELETE_MARK | |
@@ -239,7 +241,7 @@ description: Detailed list of TDengine reserved keywords
| LEADER | |
| LEADING | |
| LEFT | |
-| LEVEL | all versions from 3.3.0.0 to 3.3.2.11 |
+| LEVEL | 3.3.0.0 - 3.3.2.11 |
| LICENCES | |
| LIKE | |
| LIMIT | |


@@ -827,7 +827,7 @@ The TDengine client driver version has a one-to-one correspondence with the TDengine server version
- **Return value**: non-`NULL`: success, returns a pointer to TAOS_FIELD structures, where each element describes the metadata of one column. `NULL`: failure.
- `TAOS_FIELD_E *taos_fetch_fields_e(TAOS_RES *res)`
-- **Description**: retrieves the attributes of each column in the result set (column name, data type, and length). Used together with `taos_num_fields()`, it can be used to parse a tuple (one row) returned by `taos_fetch_row()`. In addition to the basic information of TAOS_FIELD, TAOS_FIELD_E also carries the `precision` and `scale` of the type.
+- **Description**: retrieves the attributes of each column in the result set (column name, data type, and length). Used together with `taos_num_fields()`, it can be used to parse a tuple (one row) returned by `taos_fetch_row()`. In addition to the basic information of TAOS_FIELD, TAOS_FIELD_E also carries the `precision` and `scale` of the type.
- **Parameters**
- res: [in] the result set.
- **Return value**: non-`NULL`: success, returns a pointer to TAOS_FIELD_E structures, where each element describes the metadata of one column. `NULL`: failure.


@@ -1257,18 +1257,18 @@ int32_t tDeserializeRetrieveIpWhite(void* buf, int32_t bufLen, SRetrieveIpWhiteR
typedef struct {
  int32_t dnodeId;
  int64_t analVer;
-} SRetrieveAnalAlgoReq;
+} SRetrieveAnalyticsAlgoReq;
typedef struct {
  int64_t   ver;
  SHashObj* hash;  // algoname:algotype -> SAnalUrl
-} SRetrieveAnalAlgoRsp;
+} SRetrieveAnalyticAlgoRsp;
-int32_t tSerializeRetrieveAnalAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalAlgoReq* pReq);
-int32_t tDeserializeRetrieveAnalAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalAlgoReq* pReq);
-int32_t tSerializeRetrieveAnalAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalAlgoRsp* pRsp);
-int32_t tDeserializeRetrieveAnalAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalAlgoRsp* pRsp);
-void    tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp* pRsp);
+int32_t tSerializeRetrieveAnalyticAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalyticsAlgoReq* pReq);
+int32_t tDeserializeRetrieveAnalyticAlgoReq(void* buf, int32_t bufLen, SRetrieveAnalyticsAlgoReq* pReq);
+int32_t tSerializeRetrieveAnalyticAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalyticAlgoRsp* pRsp);
+int32_t tDeserializeRetrieveAnalyticAlgoRsp(void* buf, int32_t bufLen, SRetrieveAnalyticAlgoRsp* pRsp);
+void    tFreeRetrieveAnalyticAlgoRsp(SRetrieveAnalyticAlgoRsp* pRsp);
typedef struct {
  int8_t alterType;


@@ -1213,6 +1213,34 @@ TEST(stmt2Case, stmt2_insert_non_statndard) {
    taos_stmt2_close(stmt);
  }
+  // get fields for "insert into ? values"
+  {
+    TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
+    ASSERT_NE(stmt, nullptr);
+    do_query(taos, "create table stmt2_testdb_6.ntb(ts timestamp, b binary(10))");
+    do_query(taos, "use stmt2_testdb_6");
+    const char* sql = "INSERT INTO ? VALUES (?,?)";
+    printf("stmt2 [%s] : %s\n", "get fields", sql);
+    int code = taos_stmt2_prepare(stmt, sql, 0);
+    checkError(stmt, code);
+    char*            tbname = "ntb";
+    TAOS_STMT2_BINDV bindv = {1, &tbname, NULL, NULL};
+    code = taos_stmt2_bind_param(stmt, &bindv, -1);
+    ASSERT_EQ(code, 0);
+    int             fieldNum = 0;
+    TAOS_FIELD_ALL* pFields = NULL;
+    code = taos_stmt2_get_fields(stmt, &fieldNum, &pFields);
+    checkError(stmt, code);
+    ASSERT_EQ(fieldNum, 3);
+    ASSERT_STREQ(pFields[0].name, "tbname");
+    ASSERT_STREQ(pFields[1].name, "ts");
+    ASSERT_STREQ(pFields[2].name, "b");
+    taos_stmt2_close(stmt);
+  }
  do_query(taos, "drop database if exists stmt2_testdb_6");
  taos_close(taos);
}


@@ -2297,7 +2297,7 @@ _exit:
  return code;
}
-int32_t tSerializeRetrieveAnalAlgoReq(void *buf, int32_t bufLen, SRetrieveAnalAlgoReq *pReq) {
+int32_t tSerializeRetrieveAnalyticAlgoReq(void *buf, int32_t bufLen, SRetrieveAnalyticsAlgoReq *pReq) {
  SEncoder encoder = {0};
  int32_t  code = 0;
  int32_t  lino;
@@ -2319,7 +2319,7 @@ _exit:
  return tlen;
}
-int32_t tDeserializeRetrieveAnalAlgoReq(void *buf, int32_t bufLen, SRetrieveAnalAlgoReq *pReq) {
+int32_t tDeserializeRetrieveAnalyticAlgoReq(void *buf, int32_t bufLen, SRetrieveAnalyticsAlgoReq *pReq) {
  SDecoder decoder = {0};
  int32_t  code = 0;
  int32_t  lino;
@@ -2336,7 +2336,7 @@ _exit:
  return code;
}
-int32_t tSerializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAlgoRsp *pRsp) {
+int32_t tSerializeRetrieveAnalyticAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalyticAlgoRsp *pRsp) {
  SEncoder encoder = {0};
  int32_t  code = 0;
  int32_t  lino;
@@ -2387,7 +2387,7 @@ _exit:
  return tlen;
}
-int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalAlgoRsp *pRsp) {
+int32_t tDeserializeRetrieveAnalyticAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnalyticAlgoRsp *pRsp) {
  if (pRsp->hash == NULL) {
    pRsp->hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
    if (pRsp->hash == NULL) {
@@ -2425,7 +2425,10 @@ int32_t tDeserializeRetrieveAnalAlgoRsp(void *buf, int32_t bufLen, SRetrieveAnal
      TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&url.url, NULL) < 0);
    }
-    TAOS_CHECK_EXIT(taosHashPut(pRsp->hash, name, nameLen, &url, sizeof(SAnalyticsUrl)));
+    char dstName[TSDB_ANALYTIC_ALGO_NAME_LEN] = {0};
+    strntolower(dstName, name, nameLen);
+    TAOS_CHECK_EXIT(taosHashPut(pRsp->hash, dstName, nameLen, &url, sizeof(SAnalyticsUrl)));
  }
  tEndDecode(&decoder);
@@ -2435,7 +2438,7 @@ _exit:
  return code;
}
-void tFreeRetrieveAnalAlgoRsp(SRetrieveAnalAlgoRsp *pRsp) {
+void tFreeRetrieveAnalyticAlgoRsp(SRetrieveAnalyticAlgoRsp *pRsp) {
  void *pIter = taosHashIterate(pRsp->hash, NULL);
  while (pIter != NULL) {
    SAnalyticsUrl *pUrl = (SAnalyticsUrl *)pIter;


@@ -98,15 +98,15 @@ static void dmMayShouldUpdateAnalFunc(SDnodeMgmt *pMgmt, int64_t newVer) {
  if (oldVer == newVer) return;
  dDebug("analysis on dnode ver:%" PRId64 ", status ver:%" PRId64, oldVer, newVer);
-  SRetrieveAnalAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
-  int32_t              contLen = tSerializeRetrieveAnalAlgoReq(NULL, 0, &req);
+  SRetrieveAnalyticsAlgoReq req = {.dnodeId = pMgmt->pData->dnodeId, .analVer = oldVer};
+  int32_t                   contLen = tSerializeRetrieveAnalyticAlgoReq(NULL, 0, &req);
  if (contLen < 0) {
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));
    return;
  }
  void *pHead = rpcMallocCont(contLen);
-  contLen = tSerializeRetrieveAnalAlgoReq(pHead, contLen, &req);
+  contLen = tSerializeRetrieveAnalyticAlgoReq(pHead, contLen, &req);
  if (contLen < 0) {
    rpcFreeCont(pHead);
    dError("failed to serialize analysis function ver request since %s", tstrerror(contLen));


@@ -116,13 +116,13 @@ static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
  }
}
-static void dmUpdateAnalFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
-  SRetrieveAnalAlgoRsp rsp = {0};
-  if (tDeserializeRetrieveAnalAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) {
+static void dmUpdateAnalyticFunc(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) {
+  SRetrieveAnalyticAlgoRsp rsp = {0};
+  if (tDeserializeRetrieveAnalyticAlgoRsp(pRpc->pCont, pRpc->contLen, &rsp) == 0) {
    taosAnalyUpdate(rsp.ver, rsp.hash);
    rsp.hash = NULL;
  }
-  tFreeRetrieveAnalAlgoRsp(&rsp);
+  tFreeRetrieveAnalyticAlgoRsp(&rsp);
  rpcFreeCont(pRpc->pCont);
}
@@ -176,7 +176,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
      dmUpdateRpcIpWhite(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    case TDMT_MND_RETRIEVE_ANAL_ALGO_RSP:
-      dmUpdateAnalFunc(&pDnode->data, pTrans->serverRpc, pRpc);
+      dmUpdateAnalyticFunc(&pDnode->data, pTrans->serverRpc, pRpc);
      return;
    default:
      break;


@@ -847,10 +847,10 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) {
  SAnalyticsUrl url;
  int32_t       nameLen;
  char          name[TSDB_ANALYTIC_ALGO_KEY_LEN];
-  SRetrieveAnalAlgoReq req = {0};
-  SRetrieveAnalAlgoRsp rsp = {0};
+  SRetrieveAnalyticsAlgoReq req = {0};
+  SRetrieveAnalyticAlgoRsp  rsp = {0};
-  TAOS_CHECK_GOTO(tDeserializeRetrieveAnalAlgoReq(pReq->pCont, pReq->contLen, &req), NULL, _OVER);
+  TAOS_CHECK_GOTO(tDeserializeRetrieveAnalyticAlgoReq(pReq->pCont, pReq->contLen, &req), NULL, _OVER);
  rsp.ver = sdbGetTableVer(pSdb, SDB_ANODE);
  if (req.analVer != rsp.ver) {
@@ -906,15 +906,15 @@ static int32_t mndProcessAnalAlgoReq(SRpcMsg *pReq) {
    }
  }
-  int32_t contLen = tSerializeRetrieveAnalAlgoRsp(NULL, 0, &rsp);
+  int32_t contLen = tSerializeRetrieveAnalyticAlgoRsp(NULL, 0, &rsp);
  void   *pHead = rpcMallocCont(contLen);
-  (void)tSerializeRetrieveAnalAlgoRsp(pHead, contLen, &rsp);
+  (void)tSerializeRetrieveAnalyticAlgoRsp(pHead, contLen, &rsp);
  pReq->info.rspLen = contLen;
  pReq->info.rsp = pHead;
_OVER:
-  tFreeRetrieveAnalAlgoRsp(&rsp);
+  tFreeRetrieveAnalyticAlgoRsp(&rsp);
  TAOS_RETURN(code);
}


@@ -538,7 +538,7 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) {
          "] %s: cfgVersion:%d, numOfVgroups:%d, numOfStables:%d, buffer:%d, cacheSize:%d, pageSize:%d, pages:%d"
          ", daysPerFile:%d, daysToKeep0:%d, daysToKeep1:%d, daysToKeep2:%d, minRows:%d, maxRows:%d, walFsyncPeriod:%d"
          ", hashPrefix:%d, hashSuffix:%d, walLevel:%d, precision:%d, compression:%d, replications:%d, strict:%d"
-          ", cacheLast:%d, tsdbPageSize:%d, walRetentionPeriod:%d, walRollPeriod:%d, walRetentionSize:%" PRId64 ""
+          ", cacheLast:%d, tsdbPageSize:%d, walRetentionPeriod:%d, walRollPeriod:%d, walRetentionSize:%" PRId64
          ", walSegmentSize:%" PRId64 ", numOfRetensions:%d, schemaless:%d, sstTrigger:%d",
          i, (int32_t)len, dbFName, dbCache->dbId, dbCache->deleted ? "deleted" : "",
          pCfg->cfgVersion, pCfg->numOfVgroups, pCfg->numOfStables, pCfg->buffer,


@@ -327,7 +327,7 @@ static int32_t anomalyParseJson(SJson* pJson, SArray* pWindows, const char* pId)
      qError("%s failed to exec forecast, msg:%s", pId, pMsg);
    }
-    return TSDB_CODE_ANA_INTERNAL_ERROR;
+    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  } else if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
@@ -593,7 +593,7 @@ static int32_t anomalyAggregateBlocks(SOperatorInfo* pOperator) {
    for (int32_t r = 0; r < pBlock->info.rows; ++r) {
      TSKEY key = tsList[r];
-      bool  keyInWin = (key >= pSupp->curWin.skey && key < pSupp->curWin.ekey);
+      bool  keyInWin = (key >= pSupp->curWin.skey && key <= pSupp->curWin.ekey);
      bool  lastRow = (r == pBlock->info.rows - 1);
      if (keyInWin) {


@@ -145,6 +145,7 @@ static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
  if (!hasWncheck) {
    qDebug("%s forecast wncheck not found from %s, use default:%" PRId64, id, pSupp->algoOpt, wncheck);
  }
+
  code = taosAnalyBufWriteOptInt(pBuf, "wncheck", wncheck);
  if (code != 0) return code;
@@ -235,7 +236,7 @@ static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const
    }
    tjsonDelete(pJson);
-    return TSDB_CODE_ANA_INTERNAL_ERROR;
+    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
  }
  if (code < 0) {


@@ -1073,8 +1073,9 @@ int32_t buildStbBoundFields(SBoundColInfo boundColsInfo, SSchema* pSchema, int32
                            STableMeta* pMeta, void* boundTags, uint8_t tbNameFlag) {
  SBoundColInfo* tags = (SBoundColInfo*)boundTags;
  bool           hastag = (tags != NULL) && !(tbNameFlag & IS_FIXED_TAG);
-  int32_t        numOfBound =
-      boundColsInfo.numOfBound + ((tbNameFlag & IS_FIXED_VALUE) == 0 && (tbNameFlag & USING_CLAUSE) != 0 ? 1 : 0);
+  bool           hasPreBindTbname =
+      (tbNameFlag & IS_FIXED_VALUE) == 0 && ((tbNameFlag & USING_CLAUSE) != 0 || pMeta->tableType == TSDB_NORMAL_TABLE);
+  int32_t        numOfBound = boundColsInfo.numOfBound + (hasPreBindTbname ? 1 : 0);
  if (hastag) {
    numOfBound += tags->mixTagsCols ? 0 : tags->numOfBound;
  }
@@ -1085,7 +1086,7 @@ int32_t buildStbBoundFields(SBoundColInfo boundColsInfo, SSchema* pSchema, int32
    return terrno;
  }
-  if ((tbNameFlag & IS_FIXED_VALUE) == 0 && (tbNameFlag & USING_CLAUSE) != 0) {
+  if (hasPreBindTbname) {
    (*fields)[idx].field_type = TAOS_FIELD_TBNAME;
    tstrncpy((*fields)[idx].name, "tbname", sizeof((*fields)[idx].name));
    (*fields)[idx].type = TSDB_DATA_TYPE_BINARY;


@@ -158,9 +158,17 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
    pMeta->startInfo.curStage = START_MARK_REQ_CHKPID;
    SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now};
-    taosArrayPush(pMeta->startInfo.pStagesList, &info);
-    stDebug("vgId:%d %d task(s) 0 stage -> mark_req stage, reqTs:%" PRId64 " numOfStageHist:%d", pMeta->vgId, numOfConsensusChkptIdTasks,
-            info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList));
+    void*   p = taosArrayPush(pMeta->startInfo.pStagesList, &info);
+    int32_t num = (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList);
+
+    if (p != NULL) {
+      stDebug("vgId:%d %d task(s) 0 stage -> mark_req stage, reqTs:%" PRId64 " numOfStageHist:%d", pMeta->vgId,
+              numOfConsensusChkptIdTasks, info.ts, num);
+    } else {
+      stError("vgId:%d %d task(s) 0 stage -> mark_req stage, reqTs:%" PRId64
+              " numOfStageHist:%d, FAILED, out of memory",
+              pMeta->vgId, numOfConsensusChkptIdTasks, info.ts, num);
+    }
  }
  // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
@@ -230,8 +238,8 @@ static void streamMetaLogLaunchTasksInfo(SStreamMeta* pMeta, int32_t numOfTotal,
  displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
}
-int32_t streamMetaAddTaskLaunchResultNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId,
-                                            int64_t startTs, int64_t endTs, bool ready) {
+int32_t streamMetaAddTaskLaunchResultNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
+                                            int64_t endTs, bool ready) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  STaskId         id = {.streamId = streamId, .taskId = taskId};
  int32_t         vgId = pMeta->vgId;
@@ -312,7 +320,7 @@ bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32
  if (px == NULL) {
    px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
    if (px == NULL) {
-      stDebug("vgId:%d s-task:0x%x start result not rsp yet", pMeta->vgId, (int32_t) idx.taskId);
+      stDebug("vgId:%d s-task:0x%x start result not rsp yet", pMeta->vgId, (int32_t)idx.taskId);
      return false;
    }
  }


@@ -378,7 +378,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_ANA_BUF_INVALID_TYPE, "Analysis invalid buffe
TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ANODE_RETURN_ERROR, "Analysis failed since anode return error")
TAOS_DEFINE_ERROR(TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS, "Analysis failed since too many input rows for anode")
TAOS_DEFINE_ERROR(TSDB_CODE_ANA_WN_DATA, "white-noise data not processed")
-TAOS_DEFINE_ERROR(TSDB_CODE_ANA_INTERNAL_ERROR, "tdgpt internal error, not processed")
+TAOS_DEFINE_ERROR(TSDB_CODE_ANA_INTERNAL_ERROR, "Analysis internal error, not processed")
// mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")


@@ -23,8 +23,8 @@ endi
print =============== show info
sql show anodes full
-if $rows != 8 then
-  print expect 8 , actual $rows
+if $rows != 10 then
+  print expect 10 , actual $rows
  return -1
endi


@@ -10,8 +10,7 @@
1. [Testing](#8-testing)
1. [Releasing](#9-releasing)
1. [CI/CD](#10-cicd)
-1. [Coverage](#11-coverage)
+1. [Contributing](#11-contributing)
-1. [Contributing](#12-contributing)
# 1. Introduction
@@ -93,7 +92,6 @@ The taosanode will be installed as an system service, but will not automatic sta
systemctl start taosanoded
```
## 6.2 Configure the Service
taosanode provides the RESTFul service powered by `uWSGI`. You can config the options to tune the
performance by changing the default configuration file `taosanode.ini` located in `/etc/taos`, which is also the configuration directory for `taosd` service.
@@ -123,10 +121,7 @@ For the complete list of taosanode Releases, please see Releases.
We use Github Actions for CI/CD workflow configuration. Please refer to the workflow definition yaml file in [.github/workflows](../../.github/workflows/) for details.
-# 11 Coverage
+# 11 Contributing
-# 12 Contributing
Guidelines for contributing to the project:


@@ -78,4 +78,4 @@ model-dir = /usr/local/taos/taosanode/model/
log-level = DEBUG
# draw the query results
-draw-result = 1
+draw-result = 0


@@ -5,6 +5,7 @@
from matplotlib import pyplot as plt

from taosanalytics.conf import app_logger, conf
from taosanalytics.servicemgmt import loader
+from taosanalytics.util import convert_results_to_windows


def do_ad_check(input_list, ts_list, algo_name, params):
@@ -22,17 +23,19 @@ def do_ad_check(input_list, ts_list, algo_name, params):
    res = s.execute()

-    n_error = abs(sum(filter(lambda x: x == -1, res)))
+    n_error = abs(sum(filter(lambda x: x != s.valid_code, res)))
    app_logger.log_inst.debug("There are %d in input, and %d anomaly points found: %s",
                              len(input_list),
                              n_error,
                              res)

-    draw_ad_results(input_list, res, algo_name)
-    return res
+    # draw_ad_results(input_list, res, algo_name, s.valid_code)
+    ano_window = convert_results_to_windows(res, ts_list, s.valid_code)
+    return res, ano_window


-def draw_ad_results(input_list, res, fig_name):
+def draw_ad_results(input_list, res, fig_name, valid_code):
    """ draw the detected anomaly points """

    # not in debug, do not visualize the anomaly detection result
@@ -41,9 +44,8 @@ def draw_ad_results(input_list, res, fig_name):
    plt.clf()
    for index, val in enumerate(res):
-        if val != -1:
-            continue
-        plt.scatter(index, input_list[index], marker='o', color='r', alpha=0.5, s=100, zorder=3)
+        if val != valid_code:
+            plt.scatter(index, input_list[index], marker='o', color='r', alpha=0.5, s=100, zorder=3)

    plt.plot(input_list, label='sample')
    plt.savefig(fig_name)


@@ -86,11 +86,11 @@ class _ArimaService(AbstractForecastService):
        if len(self.list) > 3000:
            raise ValueError("number of input data is too large")

-        if self.fc_rows <= 0:
+        if self.rows <= 0:
            raise ValueError("fc rows is not specified yet")

-        res, mse, model_info = self.__do_forecast_helper(self.fc_rows)
-        insert_ts_list(res, self.start_ts, self.time_step, self.fc_rows)
+        res, mse, model_info = self.__do_forecast_helper(self.rows)
+        insert_ts_list(res, self.start_ts, self.time_step, self.rows)

        return {
            "mse": mse,


@@ -10,31 +10,30 @@ from taosanalytics.service import AbstractForecastService

class _GPTService(AbstractForecastService):
-    name = 'td_gpt_fc'
+    name = 'tdtsfm_1'
    desc = "internal gpt forecast model based on transformer"

    def __init__(self):
        super().__init__()

        self.table_name = None
-        self.service_host = 'http://127.0.0.1:5000/ds_predict'
+        self.service_host = 'http://127.0.0.1:5000/tdtsfm'
        self.headers = {'Content-Type': 'application/json'}

        self.std = None
        self.threshold = None
        self.time_interval = None
-        self.dir = 'internal-gpt'

    def execute(self):
        if self.list is None or len(self.list) < self.period:
            raise ValueError("number of input data is less than the periods")

-        if self.fc_rows <= 0:
+        if self.rows <= 0:
            raise ValueError("fc rows is not specified yet")

        # let's request the gpt service
-        data = {"input": self.list, 'next_len': self.fc_rows}
+        data = {"input": self.list, 'next_len': self.rows}
        try:
            response = requests.post(self.service_host, data=json.dumps(data), headers=self.headers)
        except Exception as e:
@@ -54,7 +53,7 @@ class _GPTService(AbstractForecastService):
            "res": [pred_y]
        }

-        insert_ts_list(res["res"], self.start_ts, self.time_step, self.fc_rows)
+        insert_ts_list(res["res"], self.start_ts, self.time_step, self.rows)
        return res


@@ -66,11 +66,11 @@ class _HoltWintersService(AbstractForecastService):
        if self.list is None or len(self.list) < self.period:
            raise ValueError("number of input data is less than the periods")

-        if self.fc_rows <= 0:
+        if self.rows <= 0:
            raise ValueError("fc rows is not specified yet")

-        res, mse = self.__do_forecast_helper(self.list, self.fc_rows)
-        insert_ts_list(res, self.start_ts, self.time_step, self.fc_rows)
+        res, mse = self.__do_forecast_helper(self.list, self.rows)
+        insert_ts_list(res, self.start_ts, self.time_step, self.rows)

        # add the conf range if required
        return {


@@ -46,7 +46,7 @@ class _LSTMService(AbstractForecastService):
        res = self.model.predict(self.list)

-        insert_ts_list(res, self.start_ts, self.time_step, self.fc_rows)
+        insert_ts_list(res, self.start_ts, self.time_step, self.rows)

        if self.return_conf:
            res1 = [res.tolist(), res.tolist(), res.tolist()], None


@@ -34,14 +34,14 @@ def do_forecast(input_list, ts_list, algo_name, params):
    check_fc_results(res)

    fc = res["res"]
-    draw_fc_results(input_list, len(fc) > 2, fc, len(fc[0]), algo_name)
+    # draw_fc_results(input_list, len(fc) > 2, fc, len(fc[0]), algo_name)
    return res


def do_add_fc_params(params, json_obj):
    """ add params into parameters """
    if "forecast_rows" in json_obj:
-        params["fc_rows"] = int(json_obj["forecast_rows"])
+        params["rows"] = int(json_obj["forecast_rows"])

    if "start" in json_obj:
        params["start_ts"] = int(json_obj["start"])


@@ -22,11 +22,12 @@ app_logger.set_handler(conf.get_log_path())
app_logger.set_log_level(conf.get_log_level())
loader.load_all_service()

+_ANODE_VER = 'TDgpt - TDengine© Time-Series Data Analytics Platform (ver 3.3.6.0)'

@app.route("/")
def start():
    """ default rsp """
-    return "TDengine© Time Series Data Analytics Platform (ver 1.0.1)"
+    return _ANODE_VER

@app.route("/status")
@@ -90,9 +91,7 @@ def handle_ad_request():
    # 4. do anomaly detection
    try:
-        res_list = do_ad_check(payload[data_index], payload[ts_index], algo, params)
-        ano_window = convert_results_to_windows(res_list, payload[ts_index])
+        res_list, ano_window = do_ad_check(payload[data_index], payload[ts_index], algo, params)
        result = {"algo": algo, "option": options, "res": ano_window, "rows": len(ano_window)}

        app_logger.log_inst.debug("anomaly-detection result: %s", str(result))
@@ -148,7 +147,7 @@ def handle_forecast_req():
    try:
        res1 = do_forecast(payload[data_index], payload[ts_index], algo, params)
-        res = {"option": options, "rows": params["fc_rows"]}
+        res = {"option": options, "rows": params["rows"]}

        res.update(res1)
        app_logger.log_inst.debug("forecast result: %s", res)


@@ -51,6 +51,7 @@ class AbstractAnomalyDetectionService(AbstractAnalyticsService, ABC):
    inherent from this class"""

    def __init__(self):
+        self.valid_code = 1
        super().__init__()
        self.type = "anomaly-detection"

@@ -58,6 +59,12 @@ class AbstractAnomalyDetectionService(AbstractAnalyticsService, ABC):
        """ check if the input list is empty or None """
        return (self.list is None) or (len(self.list) == 0)

+    def set_params(self, params: dict) -> None:
+        super().set_params(params)
+
+        if "valid_code" in params:
+            self.valid_code = int(params["valid_code"])


class AbstractForecastService(AbstractAnalyticsService, ABC):
    """abstract forecast service, all forecast algorithms class should be inherent from
@@ -70,14 +77,14 @@ class AbstractForecastService(AbstractAnalyticsService, ABC):
        self.period = 0
        self.start_ts = 0
        self.time_step = 0
-        self.fc_rows = 0
+        self.rows = 0

        self.return_conf = 1
        self.conf = 0.05

    def set_params(self, params: dict) -> None:
-        if not {'start_ts', 'time_step', 'fc_rows'}.issubset(params.keys()):
-            raise ValueError('params are missing, start_ts, time_step, fc_rows are all required')
+        if not {'start_ts', 'time_step', 'rows'}.issubset(params.keys()):
+            raise ValueError('params are missing, start_ts, time_step, rows are all required')

        self.start_ts = int(params['start_ts'])
@@ -86,9 +93,9 @@ class AbstractForecastService(AbstractAnalyticsService, ABC):
        if self.time_step <= 0:
            raise ValueError('time_step should be greater than 0')

-        self.fc_rows = int(params['fc_rows'])
-        if self.fc_rows <= 0:
+        self.rows = int(params['rows'])
+        if self.rows <= 0:
            raise ValueError('fc rows is not specified yet')

        self.period = int(params['period']) if 'period' in params else 0
@@ -106,5 +113,5 @@ class AbstractForecastService(AbstractAnalyticsService, ABC):
    def get_params(self):
        return {
            "period": self.period, "start": self.start_ts, "every": self.time_step,
-            "forecast_rows": self.fc_rows, "return_conf": self.return_conf, "conf": self.conf
+            "forecast_rows": self.rows, "return_conf": self.return_conf, "conf": self.conf
        }


@@ -44,7 +44,7 @@ class AnomalyDetectionTest(unittest.TestCase):
        s.set_params({"k": 2})

        r = s.execute()
-        draw_ad_results(AnomalyDetectionTest.input_list, r, "ksigma")
+        draw_ad_results(AnomalyDetectionTest.input_list, r, "ksigma", s.valid_code)

        self.assertEqual(r[-1], -1)
        self.assertEqual(len(r), len(AnomalyDetectionTest.input_list))
@@ -64,7 +64,7 @@ class AnomalyDetectionTest(unittest.TestCase):
            self.assertEqual(1, 0, e)

        r = s.execute()
-        draw_ad_results(AnomalyDetectionTest.input_list, r, "iqr")
+        draw_ad_results(AnomalyDetectionTest.input_list, r, "iqr", s.valid_code)

        self.assertEqual(r[-1], -1)
        self.assertEqual(len(r), len(AnomalyDetectionTest.input_list))
@@ -82,7 +82,7 @@ class AnomalyDetectionTest(unittest.TestCase):
        s.set_params({"alpha": 0.95})

        r = s.execute()
-        draw_ad_results(AnomalyDetectionTest.input_list, r, "grubbs")
+        draw_ad_results(AnomalyDetectionTest.input_list, r, "grubbs", s.valid_code)

        self.assertEqual(r[-1], -1)
        self.assertEqual(len(r), len(AnomalyDetectionTest.input_list))
@@ -100,7 +100,7 @@ class AnomalyDetectionTest(unittest.TestCase):
        s.set_input_list(AnomalyDetectionTest.input_list, None)

        r = s.execute()
-        draw_ad_results(AnomalyDetectionTest.input_list, r, "shesd")
+        draw_ad_results(AnomalyDetectionTest.input_list, r, "shesd", s.valid_code)

        self.assertEqual(r[-1], -1)
@@ -116,7 +116,7 @@ class AnomalyDetectionTest(unittest.TestCase):
        s.set_input_list(AnomalyDetectionTest.input_list, None)

        r = s.execute()
-        draw_ad_results(AnomalyDetectionTest.input_list, r, "lof")
+        draw_ad_results(AnomalyDetectionTest.input_list, r, "lof", s.valid_code)

        self.assertEqual(r[-1], -1)
        self.assertEqual(r[-2], -1)


@@ -41,7 +41,7 @@ class ForecastTest(unittest.TestCase):
        s.set_input_list(data, ts)
        self.assertRaises(ValueError, s.execute)

-        s.set_params({"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30})
+        s.set_params({"rows": 10, "start_ts": 171000000, "time_step": 86400 * 30})

        r = s.execute()
        draw_fc_results(data, len(r["res"]) > 2, r["res"], len(r["res"][0]), "holtwinters")
@@ -54,7 +54,7 @@ class ForecastTest(unittest.TestCase):
        s.set_input_list(data, ts)
        s.set_params(
            {
-                "fc_rows": 10, "trend": 'mul', "seasonal": 'mul', "start_ts": 171000000,
+                "rows": 10, "trend": 'mul', "seasonal": 'mul', "start_ts": 171000000,
                "time_step": 86400 * 30, "period": 12
            }
        )
@@ -71,28 +71,28 @@ class ForecastTest(unittest.TestCase):
        self.assertRaises(ValueError, s.set_params, {"trend": "mul"})

-        self.assertRaises(ValueError, s.set_params, {"trend": "mul", "fc_rows": 10})
+        self.assertRaises(ValueError, s.set_params, {"trend": "mul", "rows": 10})

        self.assertRaises(ValueError, s.set_params, {"trend": "multi"})
        self.assertRaises(ValueError, s.set_params, {"seasonal": "additive"})

        self.assertRaises(ValueError, s.set_params, {
-            "fc_rows": 10, "trend": 'multi', "seasonal": 'addi', "start_ts": 171000000,
+            "rows": 10, "trend": 'multi', "seasonal": 'addi', "start_ts": 171000000,
            "time_step": 86400 * 30, "period": 12}
        )

        self.assertRaises(ValueError, s.set_params,
-                          {"fc_rows": 10, "trend": 'mul', "seasonal": 'add', "time_step": 86400 * 30, "period": 12}
+                          {"rows": 10, "trend": 'mul', "seasonal": 'add', "time_step": 86400 * 30, "period": 12}
        )

-        s.set_params({"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30})
+        s.set_params({"rows": 10, "start_ts": 171000000, "time_step": 86400 * 30})

-        self.assertRaises(ValueError, s.set_params, {"fc_rows": 'abc', "start_ts": 171000000, "time_step": 86400 * 30})
-        self.assertRaises(ValueError, s.set_params, {"fc_rows": 10, "start_ts": "aaa", "time_step": "30"})
-        self.assertRaises(ValueError, s.set_params, {"fc_rows": 10, "start_ts": 171000000, "time_step": 0})
+        self.assertRaises(ValueError, s.set_params, {"rows": 'abc', "start_ts": 171000000, "time_step": 86400 * 30})
+        self.assertRaises(ValueError, s.set_params, {"rows": 10, "start_ts": "aaa", "time_step": "30"})
+        self.assertRaises(ValueError, s.set_params, {"rows": 10, "start_ts": 171000000, "time_step": 0})

    def test_arima(self):
        """arima algorithm check"""
@@ -103,7 +103,7 @@ class ForecastTest(unittest.TestCase):
        self.assertRaises(ValueError, s.execute)

        s.set_params(
-            {"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30, "period": 12,
+            {"rows": 10, "start_ts": 171000000, "time_step": 86400 * 30, "period": 12,
             "start_p": 0, "max_p": 10, "start_q": 0, "max_q": 10}
        )
        r = s.execute()
@@ -120,7 +120,7 @@ class ForecastTest(unittest.TestCase):
        # s = loader.get_service("td_gpt_fc")
        # s.set_input_list(data, ts)
        #
-        # s.set_params({"host":'192.168.2.90:5000/ds_predict', 'fc_rows': 10, 'start_ts': 171000000, 'time_step': 86400*30})
+        # s.set_params({"host":'192.168.2.90:5000/ds_predict', 'rows': 10, 'start_ts': 171000000, 'time_step': 86400*30})
        # r = s.execute()
        #
        # rows = len(r["res"][0])


@@ -23,7 +23,8 @@ class RestfulTest(TestCase):
        """ test access default main page """
        response = self.client.get('/')
        self.assertEqual(response.status_code, 200)
-        self.assertEqual(response.content_length, len("TDengine© Time Series Data Analytics Platform (ver 1.0.1)") + 1)
+        self.assertEqual(response.content_length,
+                         len("TDgpt - TDengine© Time-Series Data Analytics Platform (ver 3.3.6.0)") + 1)

    def test_load_status(self):
        """ test load the server status """


@@ -17,7 +17,7 @@ class UtilTest(unittest.TestCase):
    def test_generate_anomaly_window(self):
        # Test case 1: Normal input
        wins = convert_results_to_windows([1, 1, 1, 1, 1, 1, -1, -1, -1, 1, 1, -1],
-                                          [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12])
+                                          [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], 1)
        print(f"The result window is:{wins}")

        # Assert the number of windows
@@ -30,15 +30,15 @@ class UtilTest(unittest.TestCase):
        self.assertListEqual(wins[1], [12, 12])

        # Test case 2: Anomaly input list is empty
-        wins = convert_results_to_windows([], [1, 2])
+        wins = convert_results_to_windows([], [1, 2], 1)
        self.assertListEqual(wins, [])

        # Test case 3: Anomaly input list is None
-        wins = convert_results_to_windows([], None)
+        wins = convert_results_to_windows([], None, 1)
        self.assertListEqual(wins, [])

        # Test case 4: Timestamp list is None
-        wins = convert_results_to_windows(None, [])
+        wins = convert_results_to_windows(None, [], 1)
        self.assertListEqual(wins, [])

    def test_validate_input_data(self):


@@ -36,7 +36,7 @@ def validate_pay_load(json_obj):
        raise ValueError('invalid schema info, data column is missing')


-def convert_results_to_windows(result, ts_list):
+def convert_results_to_windows(result, ts_list, valid_code):
    """generate the window according to anomaly detection result"""
    skey, ekey = -1, -1
    wins = []
@@ -45,7 +45,7 @@ def convert_results_to_windows(result, ts_list):
        return wins

    for index, val in enumerate(result):
-        if val == -1:
+        if val != valid_code:
            ekey = ts_list[index]
            if skey == -1:
                skey = ts_list[index]