fix(gpt): add training script. (#30466)

This commit is contained in:
Haojun Liao 2025-03-25 21:47:51 +08:00 committed by GitHub
parent d3135c9de3
commit 065818afda
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 293 additions and 31 deletions

View File

@ -61,7 +61,7 @@ TDgpt 可以在 TDengine Cloud 上进行快速体验。如果您已经有云服
> TDengine需使用 3.3.6.0 或以上版本。
> C 编译器:因依赖 uWSGI部署环境需包含 C 编译器。
可以使用以下的命令在 Ubuntu Linux 上安装 Python 3.10 环境
可以使用以下的命令在 Ubuntu Linux 上安装 Python 3.10 环境,如果您的系统环境中已经有 Python 3.10,请跳过本节,直接查看[获取安装包](#获取安装包) 部分。
#### 安装 Python
@ -119,11 +119,12 @@ cd TDengine-TDgpt-<version>
为了避免影响系统已有的 Python 环境Anode 使用虚拟环境运行。安装 Anode 会在目录 `/var/lib/taos/taosanode/venv/` 中创建默认的 Python 虚拟环境Anode 运行所需要的库均安装在该目录下。为了避免反复安装虚拟环境带来的开销,卸载命令 `rmtaosanode` 并不会自动删除该虚拟环境,如果您确认不再需要 Python 的虚拟环境,手动删除该目录即可。
### 激活使用虚拟环境
### 激活虚拟环境
为了避免安装操作系统的Python 环境, TDgpt 安装过程中会自动创建一个虚拟环境,该虚拟环境默认创建的路径在 `/var/lib/taos/taosanode/venv/`。创建完成该虚拟环境,该虚拟环境通过 PiPy 安装了支持 TDgpt 运行所必须的 Python 依赖库。
该虚拟环境不会被卸载脚本 `rmtaosanode` 删除,当您确认不再需要该虚拟环境的时候,需要手动删除该虚拟环境。
后续如果您需要开发自己的算法模型,并能够 TDgpt 正确调用,需要将新的依赖库通过虚拟环境的 Pip 正确地安装。
### 卸载
卸载 TDgpt执行 `rmtaosanode` 即可。 安装过程中自动安装的虚拟环境不会被自动删除,用户确认不再需要的时候,需要手动删除该虚拟环境。
卸载 TDgpt执行 `rmtaosanode` 即可。
> 安装过程中自动安装的虚拟环境不会被自动删除,用户确认不再需要的时候,需要手动删除该虚拟环境。

View File

@ -1,6 +1,6 @@
---
title: 预测算法
description: 预测算法
title: 预测分析
description: 预测分析
---
import fc_result from '../pic/fc-result.png';

View File

@ -1,6 +1,6 @@
---
title: 异常检测算法
description: 异常检测算法
title: 异常检测
description: 异常检测
---
import ad from '../pic/anomaly-detection.png';

View File

@ -3,29 +3,160 @@ title: "添加机器学习模型"
sidebar_label: "添加机器学习模型"
---
## 添加具有模型的分析算法
机器/深度学习模型一般要求训练集样本量足够大才能取得不错的预测效果。训练过程要消耗一定量的时间和计算资源并需要根据输入的数据进行定期的训练以及更新模型。TDgpt 内置了 Torch 和 Keras 机器学习库。所有使用 Torch 或 Keras 开发的模型均可以驱动运行。
基于统计学的分析算法可以直接针对输入时间序列数据进行分析,但是某些深度学习算法对于输入数据需要较长的时间训练,并且生成相应的模型。这种情况下,同一个分析算法对应不同的输入数据集有不同的分析模型。
将具有模型的分析算法添加到 Anode 中,首先需要在 `model` 目录中建立该算法对应的目录(目录名称可自拟),将采用该算法针对不同的输入时间序列数据生成的训练模型均需要保存在该目录下,同时目录名称要在分析算法中确定,以便能够固定加载该目录下的分析模型。为了确保模型能够正常读取加载,存储的模型使用`joblib`库进行序列化保存。
本章介绍将预训练完成的机器/深度学习分析模型添加到 TDgpt 中的方法。
下面以自编码器Autoencoder为例说明如何添加要预先训练的模型进行异常检测。
首先我们在 `model `目录中创建一个目录 -- `ad_autoencoder` (见上图目录结构),该目录将用来保存所有使用自编码器训练的模型。然后,我们使用自编码器对 foo 表的时间序列数据进行训练,得到模型 针对 foo 表的模型,我们将其命名为 `ad_autoencoder_foo`,使用 `joblib`序列化该模型以后保存在 `ad_autoencoder` 目录中。如下图所示ad_autoencoder_foo 由两个文件构成,分别是模型文件 (ad_autoencoder_foo.dat) 和模型文件描述文件 (ad_autoencoder_foo.info)。
## 准备模型
推荐将模型保存在默认的保存目录(`/usr/local/taos/taosanode/model/`)中,也可以在目录中建立下一级目录,用以保存模型。下面使用 Keras 开发的基于自编码器(auto encoder) 的异常检测模型添加到 TDgpt 为例讲解整个流程。
该模型在 TDgpt 系统中名称为 'sample_ad_model'。
训练该模型的代码见:[https://github.com/taosdata/TDengine/tree/main/tools/tdgpt/taosanalytics/misc/training_ad_model.py](https://github.com/taosdata/TDengine/tree/main/tools/tdgpt/taosanalytics/misc/training_ad_model.py)
该模型训练使用了 NAB 的[art_daily_small_noise 数据集](https://raw.githubusercontent.com/numenta/NAB/master/data/artificialNoAnomaly/art_daily_small_noise.csv)。
训练完成得到的模型保存成为了两个文件,点击[此处](https://github.com/taosdata/TDengine/blob/main/tools/tdgpt/model/sample-ad-autoencoder/)下载该模型文件,模型文件说明如下:
```bash
sample-ad-autoencoder.keras 模型文件,默认 keras 模型文件格式
sample-ad-autoencoder.info 模型附加参数文件,采用了 joblib 格式保存
```
## 保存在合适位置
然后在 `/usr/local/taos/taosanode/model` 文件目录下建立子目录 `sample-ad-autoencoder`, 用以保存下载两个模型文件。此时 `model` 文件夹结构如下:
```bash
.
└── model
└── ad_autoencoder
├── ad_autoencoder_foo.dat
└── ad_autoencoder_foo.info
└── sample-ad-autoencoder
├── sample-ad-autoencoder.keras
└── sample-ad-autoencoder.info
```
接下来说明如何使用 SQL 调用该模型。
通过设置参数 `algo=ad_encoder` 告诉分析平台要调用自编码器算法训练的模型(自编码器算法在可用算法列表中),因此直接指定即可。此外还需要指定自编码器针对某数据集训练的确定的模型,此时我们需要使用已经保存的模型 `ad_autoencoder_foo` ,因此需要添加参数 `model=ad_autoencoder_foo` 以便能够调用该模型。
## 添加模型适配代码
下面需要在 taosanalytics 目录下添加加载该模型并进行适配的 Python 代码,即可运行该模型。适配并行运行的代码见[https://github.com/taosdata/TDengine/blob/main/tools/tdgpt/taosanalytics/algo/ad/autoencoder.py](https://github.com/taosdata/TDengine/blob/main/tools/tdgpt/taosanalytics/algo/ad/autoencoder.py)。
为了便于方便,我们已经将该文件保存在该目录,所以您在执行 `show anodes full` 命令时候,能够看见该算法模型。
下面详细说明该代码的逻辑。
```python
class _AutoEncoderDetectionService(AbstractAnomalyDetectionService):
name = 'sample_ad_model' # 通过 show 命令看到的算法的名称,在此处定义
desc = "sample anomaly detection model based on auto encoder"
def __init__(self):
super().__init__()
self.table_name = None
self.mean = None
self.std = None
self.threshold = None
self.time_interval = None
self.model = None
# 模型文件保存的文件夹名称,如果您更改了文件夹名称,在此处需要同步修改,以确保代码可以正确加载模型文件
self.dir = 'sample-ad-autoencoder'
self.root_path = conf.get_model_directory()
self.root_path = self.root_path + f'/{self.dir}/'
# 检查代码中指定的模型文件路径是否存在
if not os.path.exists(self.root_path):
app_logger.log_inst.error(
"%s ad algorithm failed to locate default module directory:"
"%s, not active", self.__class__.__name__, self.root_path)
else:
app_logger.log_inst.info("%s ad algorithm root path is: %s", self.__class__.__name__,
self.root_path)
def execute(self):
"""异常检测主体执行函数"""
if self.input_is_empty():
return []
if self.model is None:
raise FileNotFoundError("not load autoencoder model yet, or load model failed")
# 初始化输入进行异常检测的数据
array_2d = np.reshape(self.list, (len(self.list), 1))
df = pd.DataFrame(array_2d)
# 使用 z-score 进行归一化处理
normalized_list = (df - self.mean.value) / self.std.value
seq = create_sequences(normalized_list.values, self.time_interval)
# 进行模型推理
pred_list = self.model.predict(seq)
# 计算 MAE 损失值
mae_loss = np.mean(np.abs(pred_list - seq), axis=1)
mae = mae_loss.reshape((-1))
# 大于阈值的设置为异常点
anomalies = mae > self.threshold
# data i is an anomaly if samples [(i - timesteps + 1) to (i)] are anomalies
ad_indices = []
for data_idx in range(self.time_interval - 1,
len(normalized_list) - self.time_interval + 1):
if np.all(anomalies[data_idx - self.time_interval + 1: data_idx]):
ad_indices.append(data_idx)
# 变换结果,符合输出的约定,支持分析完成
return [-1 if i in ad_indices else 1 for i in range(len(self.list))]
def set_params(self, params):
"""在此函数中进行模型的加载操作,然后在 executor 中调用"""
if "model" not in params:
raise ValueError("model needs to be specified")
name = params['model']
# 拼装模型文件的全路径,此处有两个文件,一个模型主体文件,一个信息文件
module_file_path = f'{self.root_path}/{name}.keras'
module_info_path = f'{self.root_path}/{name}.info'
app_logger.log_inst.info("try to load module:%s", module_file_path)
# 因为保存成为 keras 格式,所以调用 keras API 加载模型文件
if os.path.exists(module_file_path):
self.model = keras.models.load_model(module_file_path)
else:
app_logger.log_inst.error("failed to load autoencoder model file: %s", module_file_path)
raise FileNotFoundError(f"{module_file_path} not found")
# 加载辅助信息文件
if os.path.exists(module_info_path):
info = joblib.load(module_info_path)
else:
app_logger.log_inst.error("failed to load autoencoder model file: %s", module_file_path)
raise FileNotFoundError("%s not found", module_info_path)
# 初始化模型推理的辅助信息到对象中
if info is not None:
self.mean = info["mean"]
self.std = info["std"]
self.threshold = info["threshold"]
self.time_interval = info["timesteps"]
app_logger.log_inst.info(
"load ac module success, mean: %f, std: %f, threshold: %f, time_interval: %d",
self.mean[0], self.std[0], self.threshold, self.time_interval
)
else:
app_logger.log_inst.error("failed to load %s model", name)
raise RuntimeError(f"failed to load model {name}")
```
## 使用 SQL 调用模型
该模型由于已经预置在系统中,所以您通过 `show anodes full` 能够直接看到。一个新的算法适配完成以后,需要重新启动 taosanode 并执行命令 `update all anodes` 更新 mnode 的。
通过设置参数 `algo=sample_ad_model` 告诉 TDgpt 要调用自编码器算法训练的模型(该算法模型在可用算法列表中),因此直接指定即可。此外还需要指定自编码器针对某数据集训练的确定的模型,此时我们需要使用已经保存的模型 `sample-ad-autoencoder` ,因此需要添加参数 `model=sample-ad-autoencoder` 以便能够调用该模型。
```SQL
--- 在 options 中增加 model 的名称ad_autoencoder_foo 针对 foo 数据集(表)训练的采用自编码器的异常检测模型进行异常检测
SELECT COUNT(*), _WSTART
FROM foo
ANOMALY_WINDOW(col1, 'algo=ad_encoder, model=ad_autoencoder_foo');
SELECT _wstart, count(*)
FROM ad_sample anomaly_window(val, 'algo=sample_ad_model,model=sample-ad-autoencoder');
```

View File

@ -3,6 +3,8 @@ title: "部署 Time-MoE 模型"
sidebar_label: "部署 Time-MoE 模型"
---
本章介绍如何本地部署 [Time-MoE] (https://github.com/Time-MoE/Time-MoE) 时序基础模型并与 TDgpt 适配完成后,提供时序数据预测服务。
# 准备环境
为了使用时间序列基础模型,需要在本地部署环境支持其运行。首先需要准备 Python 环境。使用 PiPy 安装必要的依赖包:
@ -13,17 +15,18 @@ pip install flask==3.0.3
pip install transformers==4.40.0
pip install accelerate
```
您可以使用安装 TDgpt 过程中自动创建的虚拟环境,也可以创建一个独立虚拟环境,使用该虚拟环境之前,确保安装了上述的依赖包。
# 设置服务端口和 URL 地址
# 设置服务端口和地址
TDgpt 安装根目录下的 `./lib/taosanalytics/time-moe.py` 文件负责 Time-MoE 模型的部署和服务,修改该问题设置合适的服务 URL 和服务端口即可
TDgpt 安装根目录下的 `./lib/taosanalytics/time-moe.py` 文件负责 Time-MoE 模型的部署和服务,修改文件设置合适的服务 URL
```Python
@app.route('/ds_predict', methods=['POST'])
def time_moe():
...
```
修改 `ds_predict` 为需要开启的 URL 服务地址,或者使用默认路径即可。
修改 `ds_predict` 为需要开启的 URL 服务地址,或者使用默认即可。
```Python
app.run(
@ -39,10 +42,10 @@ def time_moe():
# 启动部署 Python 脚本
```shell
nohup Python time-moe.py > custom_output.out 2>&1 &
nohup python time-moe.py > service_output.out 2>&1 &
```
第一次启动脚本会从 huggingface 自动加载[2亿参数时序数据基础模型](https://huggingface.co/Maple728/TimeMoE-200M)。该模型是 Time-MoE 200M参数版本如果您需要部署参数规模更小的版本请将 `time-moe.py` 文件中 `'Maple728/TimeMoE-200M'` 修改为 `Maple728/TimeMoE-50M`,此时将加载 [0.5亿参数版本模型](https://huggingface.co/Maple728/TimeMoE-50M)。
第一次启动脚本会从 huggingface 自动加载[2亿参数模型](https://huggingface.co/Maple728/TimeMoE-200M)。该模型是 Time-MoE 200M参数版本如果您需要部署参数规模更小的版本请将 `time-moe.py` 文件中 `'Maple728/TimeMoE-200M'` 修改为 `Maple728/TimeMoE-50M`,此时将加载 [0.5亿参数模型](https://huggingface.co/Maple728/TimeMoE-50M)。
如果加载失败,请尝试执行如下命令切换为国内镜像下载模型。
@ -52,10 +55,10 @@ export HF_ENDPOINT=https://hf-mirror.com
再执行脚本:
```shell
nohup Python time-moe.py > custom_output.out 2>&1 &
nohup python time-moe.py > service_output.out 2>&1 &
```
显示如下,则说明加载成功
检查 `service_output.out` 文件,有如下输出,则说明加载成功
```shell
Running on all addresses (0.0.0.0)
Running on http://127.0.0.1:5001
@ -68,7 +71,8 @@ Running on http://127.0.0.1:5001
```shell
curl 127.0.0.1:5001/ds_predict
```
如果看到如下返回表明服务正常
如果看到如下返回信息表明服务正常,自此部署 Time-MoE 完成。
```html
<!doctype html>
@ -77,3 +81,7 @@ curl 127.0.0.1:5001/ds_predict
<h1>Method Not Allowed</h1>
<p>The method is not allowed for the requested URL.</p>
```
# 参考文献
- Time-MoE: Billion-Scale Time Series Foundation Models with Mixture of Experts. [[paper](https://arxiv.org/abs/2409.16040)] [[GitHub Repo](https://github.com/Time-MoE/Time-MoE)]

View File

@ -3,7 +3,9 @@ sidebar_label: 添加时序基础模型
title: 添加时序基础模型
---
本章主要介绍如何通过 TDgpt 使用开源时序基础模型包括Time-MoE 等的时序功能。
本章主要介绍部署并使用开源时序基础模型Time Series Fundation Model, TSFM由众多研究机构及企业开源时序基础模型极大地简化了时序数据分析的复杂程度在数据分析算法、机器学习和传统深度学习模型之外提供了一个时间序列数据高级分析的新选择。
本章节将详细介绍如何部署使用 Time-MoE 等的时序基础模型,并通过 SQL 语句直接调用其时序数据分析能力。
```mdx-code-block

View File

@ -2,7 +2,7 @@
title: "开发者指南"
sidebar_label: "开发者指南"
---
TDgpt 是一个可扩展的时序数据高级分析平台,用户遵循简易的步骤就能将自己开发的分析算法添加到分析平台, 各种应用就可以通过SQL语句直接调用, 让高级分析算法的使用门槛降到几乎为零。目前 TDpgt 平台只支持使用 Python 语言开发的分析算法。
TDgpt 是一个可扩展的时序数据高级分析智能体,用户遵循简易的步骤就能将自己开发的分析算法添加到分析平台, 各种应用就可以通过SQL语句直接调用, 让高级分析算法的使用门槛降到几乎为零。目前 TDpgt 平台只支持使用 Python 语言开发的分析算法。
Anode 采用类动态加载模式,在启动的时候扫描特定目录内满足约定条件的所有代码文件,并将其加载到系统中。因此,开发者只需要遵循以下几步就能完成新算法的添加工作:
1. 开发完成符合要求的分析算法类
2. 将代码文件放入对应目录,然后重启 Anode

View File

@ -0,0 +1,120 @@
# encoding:utf-8
"""train the model for anomaly detection"""
import joblib
import keras
import numpy as np
import pandas as pd
from keras.api import layers
from matplotlib import pyplot as plt
from taosanalytics.util import create_sequences
def get_training_data():
""" load the remote training data """
url_str = ("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialNoAnomaly/"
"art_daily_small_noise.csv")
df_small_noise = pd.read_csv(url_str, parse_dates=True, index_col="timestamp")
return df_small_noise
def do_train_model():
""" do train the model by using input data """
df_small_noise = get_training_data()
time_steps = 288
training_mean = df_small_noise.mean()
training_std = df_small_noise.std()
info = {
"mean": training_mean,
"std": training_std,
"timesteps": time_steps,
}
df_training_value = (df_small_noise - training_mean) / training_std
print("Number of training samples:", len(df_training_value))
x_train = create_sequences(df_training_value.values, time_steps)
print("Training input shape: ", x_train.shape)
model = keras.Sequential(
[
layers.Input(shape=(x_train.shape[1], x_train.shape[2])),
layers.Conv1D(
filters=32,
kernel_size=7,
padding="same",
strides=2,
activation="relu",
),
layers.Dropout(rate=0.2),
layers.Conv1D(
filters=16,
kernel_size=7,
padding="same",
strides=2,
activation="relu",
),
layers.Conv1DTranspose(
filters=16,
kernel_size=7,
padding="same",
strides=2,
activation="relu",
),
layers.Dropout(rate=0.2),
layers.Conv1DTranspose(
filters=32,
kernel_size=7,
padding="same",
strides=2,
activation="relu",
),
layers.Conv1DTranspose(filters=1, kernel_size=7, padding="same"),
]
)
model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001), loss="mse")
model.summary()
history = model.fit(
x_train,
x_train,
epochs=50,
batch_size=128,
validation_split=0.1,
callbacks=[
keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, mode="min")
],
)
print(history)
# Get train MAE loss.
x_train_pred = model.predict(x_train)
train_mae_loss = np.mean(np.abs(x_train_pred - x_train), axis=1)
plt.hist(train_mae_loss, bins=50)
plt.xlabel("Train MAE loss")
plt.ylabel("No of samples")
plt.show()
# Get reconstruction loss threshold.
threshold = np.max(train_mae_loss)
print("Reconstruction error threshold: ", threshold)
model.save('../../model/sample-ad-autoencoder/sample-ad-autoencoder.keras')
info["threshold"] = threshold
joblib.dump(info, '../../model/sample-ad-autoencoder/sample-ad-autoencoder.info')
plt.plot(x_train[0])
plt.plot(x_train_pred[0])
plt.show()
print("save model successfully")
if __name__ == '__main__':
do_train_model()