From c25981cb0d58349556a02f882a302497086a6895 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Mar 2025 09:27:04 +0800 Subject: [PATCH] feat(gpt): support lstm and do some internal refactor, add sample autoencoder model. --- .../taosanalytics/algo/ad/autoencoder.py | 16 ++-- tools/tdgpt/taosanalytics/algo/fc/lstm.py | 81 +++++++++++++++++++ .../tdgpt/taosanalytics/test/anomaly_test.py | 34 ++++---- 3 files changed, 103 insertions(+), 28 deletions(-) create mode 100644 tools/tdgpt/taosanalytics/algo/fc/lstm.py diff --git a/tools/tdgpt/taosanalytics/algo/ad/autoencoder.py b/tools/tdgpt/taosanalytics/algo/ad/autoencoder.py index e58db3f54b..89813656fc 100644 --- a/tools/tdgpt/taosanalytics/algo/ad/autoencoder.py +++ b/tools/tdgpt/taosanalytics/algo/ad/autoencoder.py @@ -4,6 +4,7 @@ import os.path import joblib +import keras import numpy as np import pandas as pd @@ -13,8 +14,8 @@ from taosanalytics.util import create_sequences class _AutoEncoderDetectionService(AbstractAnomalyDetectionService): - name = 'ad_encoder' - desc = "anomaly detection based on auto encoder" + name = 'sample_ad_model' + desc = "sample anomaly detection model based on auto encoder" def __init__(self): super().__init__() @@ -25,7 +26,7 @@ class _AutoEncoderDetectionService(AbstractAnomalyDetectionService): self.threshold = None self.time_interval = None self.model = None - self.dir = 'ad_autoencoder' + self.dir = 'sample-ad-autoencoder' self.root_path = conf.get_model_directory() @@ -61,11 +62,6 @@ class _AutoEncoderDetectionService(AbstractAnomalyDetectionService): # Detect all the samples which are anomalies. anomalies = mae > self.threshold - # syslogger.log_inst( - # "Number of anomaly samples: %f, Indices of anomaly samples:{}". 
- # format(np.sum(anomalies), np.where(anomalies)) - # ) - # data i is an anomaly if samples [(i - timesteps + 1) to (i)] are anomalies ad_indices = [] for data_idx in range(self.time_interval - 1, @@ -82,13 +78,13 @@ class _AutoEncoderDetectionService(AbstractAnomalyDetectionService): name = params['model'] - module_file_path = f'{self.root_path}/{name}.dat' + module_file_path = f'{self.root_path}/{name}.keras' module_info_path = f'{self.root_path}/{name}.info' app_logger.log_inst.info("try to load module:%s", module_file_path) if os.path.exists(module_file_path): - self.model = joblib.load(module_file_path) + self.model = keras.models.load_model(module_file_path) else: app_logger.log_inst.error("failed to load autoencoder model file: %s", module_file_path) raise FileNotFoundError(f"{module_file_path} not found") diff --git a/tools/tdgpt/taosanalytics/algo/fc/lstm.py b/tools/tdgpt/taosanalytics/algo/fc/lstm.py new file mode 100644 index 0000000000..5edae7fc9f --- /dev/null +++ b/tools/tdgpt/taosanalytics/algo/fc/lstm.py @@ -0,0 +1,81 @@ +# encoding:utf-8 +# pylint: disable=c0103 +""" sample LSTM forecast service for time series data""" +import os.path + +import keras + +from taosanalytics.algo.forecast import insert_ts_list +from taosanalytics.conf import app_logger, conf +from taosanalytics.service import AbstractForecastService + + +class _LSTMService(AbstractForecastService): + name = 'sample_forecast_model' + desc = "sample forecast model based on LSTM" + + def __init__(self): + super().__init__() + + self.table_name = None + self.mean = None + self.std = None + self.threshold = None + self.time_interval = None + self.model = None + self.dir = 'sample-fc-lstm' + + self.root_path = conf.get_model_directory() + + self.root_path = self.root_path + f'/{self.dir}/' + + if not os.path.exists(self.root_path): + app_logger.log_inst.error( + "%s fc algorithm failed to locate default module directory:" + "%s, not active", self.__class__.__name__, 
self.root_path) + else: + app_logger.log_inst.info("%s fc algorithm root path is: %s", self.__class__.__name__, + self.root_path) + + def execute(self): + if self.input_is_empty(): + return [] + + if self.model is None: + raise FileNotFoundError("not load LSTM model yet, or load model failed") + + res = self.model.predict(self.list) + + insert_ts_list(res, self.start_ts, self.time_step, self.fc_rows) + + if self.return_conf: + res1 = [res.tolist(), res.tolist(), res.tolist()], None + else: + res1 = [res.tolist()], None + + # add the conf range if required + return { + "mse": None, + "res": res1 + } + + def set_params(self, params): + + if "model" not in params: + raise ValueError("model needs to be specified") + + name = params['model'] + + module_file_path = f'{self.root_path}/{name}.keras' + # module_info_path = f'{self.root_path}/{name}.info' + + app_logger.log_inst.info("try to load module:%s", module_file_path) + + if os.path.exists(module_file_path): + self.model = keras.models.load_model(module_file_path) + else: + app_logger.log_inst.error("failed to load LSTM model file: %s", module_file_path) + raise FileNotFoundError(f"{module_file_path} not found") + + def get_params(self): + return {"dir": self.dir + '/*'} diff --git a/tools/tdgpt/taosanalytics/test/anomaly_test.py b/tools/tdgpt/taosanalytics/test/anomaly_test.py index f44a7f0d52..5333b53fa8 100644 --- a/tools/tdgpt/taosanalytics/test/anomaly_test.py +++ b/tools/tdgpt/taosanalytics/test/anomaly_test.py @@ -141,25 +141,23 @@ class AnomalyDetectionTest(unittest.TestCase): def test_autoencoder_ad(self): """for local test only, disabled it in github action""" - pass + data = self.__load_remote_data_for_ad() - # data = self.__load_remote_data_for_ad() - # - # s = loader.get_service("ad_encoder") - # s.set_input_list(data) - # - # try: - # s.set_params({"model": "ad_encoder_"}) - # except ValueError as e: - # app_logger.log_inst.error(f"failed to set the param for auto_encoder algorithm, reason:{e}") 
- # return - # - # r = s.execute() - # - # num_of_error = -(sum(filter(lambda x: x == -1, r))) - # self.assertEqual(num_of_error, 109) - # - # draw_ad_results(data, r, "autoencoder") + s = loader.get_service("sample_ad_model") + s.set_input_list(data) + + try: + s.set_params({"model": "sample-ad-autoencoder"}) + except (ValueError, FileNotFoundError) as e: + app_logger.log_inst.error(f"failed to set the param for auto_encoder algorithm, reason:{e}") + return + + r = s.execute() + + num_of_error = -(sum(filter(lambda x: x == -1, r))) + draw_ad_results(data, r, "autoencoder") + + self.assertEqual(num_of_error, 109) def test_get_all_services(self): """Test get all services"""