homework-jianmu/docs/zh/06-advanced/06-TDgpt/07-dev/02-algo/02-forecast.md

4.2 KiB
Raw Blame History

title sidebar_label
预测算法 预测算法

输入约定

execute 是预测算法处理的核心方法。框架调用该方法之前,在对象属性参数 self.list 中已经设置完毕用于预测的历史时间序列数据。

输出约定及父类属性说明

execute 方法执行完成后的返回一个如下字典对象, 预测返回结果如下:

return {
    "mse": mse, # 预测算法的拟合数据最小均方误差(minimum squared error)
    "res": res  # 结果数组 [时间戳数组, 预测结果数组, 预测结果执行区间下界数组,预测结果执行区间上界数组]
}

预测算法的父类 AbstractForecastService 包含的对象属性如下:

属性名称 说明 默认值
period 输入时间序列的周期性,多少个数据点表示一个完整的周期。如果没有周期性,设置为 0 即可 0
start_ts 预测结果的开始时间 0
time_step 预测结果的两个数据点之间时间间隔 0
fc_rows 预测结果的数量 0
return_conf 预测结果中是否包含置信区间范围,如果不包含置信区间,那么上界和下界与自身相同 1
conf 置信区间分位数 95

示例代码

下面我们开发一个示例预测算法,对于任何输入的时间序列数据,固定返回值 1 作为预测结果。

import numpy as np
from taosanalytics.service import AbstractForecastService


# 算法实现类名称 需要以下划线 "_" 开始,并以 Service 结束
class _MyForecastService(AbstractForecastService):
    """ 定义类,从 AbstractForecastService 继承并实现其定义的抽象方法 execute  """

    # 定义算法调用关键词全小写ASCII码
    name = 'myfc'

    # 该算法的描述信息(建议添加)
    desc = """return the forecast time series data"""

    def __init__(self):
        """类初始化方法"""
        super().__init__()

    def execute(self):
        """ 算法逻辑的核心实现"""
        res = []

        """这个预测算法固定返回 1 作为预测值,预测值的数量是用户通过 self.fc_rows 指定"""
        ts_list = [self.start_ts + i * self.time_step for i in range(self.fc_rows)]
        res.append(ts_list)  # 设置预测结果时间戳列

        """生成全部为 1 的预测结果 """
        res_list = [1] * self.fc_rows
        res.append(res_list)

        """检查用户输入,是否要求返回预测置信区间上下界"""
        if self.return_conf:
            """对于没有计算预测置信区间上下界的算法,直接返回预测值作为上下界即可"""
            bound_list = [1] * self.fc_rows
            res.append(bound_list)  # 预测结果置信区间下界
            res.append(bound_list)  # 预测结果执行区间上界

        """返回结果"""
        return {"res": res, "mse": 0}

    def set_params(self, params):
        """该算法无需任何输入参数,直接调用父类函数,不处理算法参数设置逻辑"""
        return super().set_params(params)

将该文件保存在 ./lib/taosanalytics/algo/fc/ 目录下,然后重启 taosanode 服务。在 TDengine 命令行接口中执行 SHOW ANODES FULL 能够看到新加入的算法。应用就可以通过 SQL 语句调用该预测算法。

--- 对 col 列进行异常检测,通过指定 algo 参数为 myfc 来调用新添加的预测类
SELECT  _flow, _fhigh, _frowts, FORECAST(col_name, "algo=myfc")
FROM foo;

如果是第一次启动该 Anode, 请按照 TDgpt 安装部署 里的步骤先将该 Anode 添加到 TDengine 系统中。

单元测试

在测试目录taosanalytics/test中的 forecast_test.py 中增加单元测试用例或添加新的测试文件。单元测试依赖 Python Unit test 包。

def test_myfc(self):
    """ 测试 myfc 类 """
    s = loader.get_service("myfc")

    # 设置用于预测分析的数据
    s.set_input_list(self.get_input_list(), None)
    # 检查预测结果应该全部为 1
    r = s.set_params(
        {"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30, "start_p": 0}
    )
    r = s.execute()

    expected_list = [1] * 10
    self.assertEqlist(r["res"][0], expected_list)