Merge pull request #30041 from taosdata/fix/mergegpt

chore(analytics): add tdgpt into TDengine repo.
Pan Wei 2025-03-07 17:41:46 +08:00 committed by GitHub
commit 7a30f31c9b
35 changed files with 3311 additions and 0 deletions

135
tools/tdgpt/README.md Normal file

@ -0,0 +1,135 @@
# Table of Contents
1. [Introduction](#1-introduction)
1. [Documentation](#2-documentation)
1. [Prerequisites](#3-prerequisites)
1. [Building](#4-building)
1. [Packaging](#5-packaging)
1. [Installation](#6-installing)
1. [Running](#7-running)
1. [Testing](#8-testing)
1. [Releasing](#9-releasing)
1. [CI/CD](#10-cicd)
1. [Coverage](#11-coverage)
1. [Contributing](#12-contributing)
# 1. Introduction
tdanalytics: an analytics platform for TDengine
# 2. Documentation
For user manual, system design and architecture, please refer to [TDengine Documentation](https://docs.tdengine.com/next) ([TDengine 文档](https://docs.taosdata.com/next)).
# 3. Prerequisites
The following software and tools are required to work on the project:
- Python 3.10.12+ (for tests)
Step-by-step instructions for setting up the prerequisite software follow.
## 3.1 Install Python 3.10
Make sure Python 3.10 or above is available on your system before installing the anode.
On Ubuntu, use the following commands to install Python 3.10:
```bash
sudo apt-get install software-properties-common
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update
sudo apt install python3.10
sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.10 2
sudo update-alternatives --config python3
sudo apt install python3.10-venv
sudo apt install python3.10-dev
```
Install pip for Python 3.10:
```bash
curl -sS https://bootstrap.pypa.io/get-pip.py | python3.10
```
Add `~/.local/bin` to the `PATH` in `~/.bashrc` or `~/.bash_profile`:
```bash
export PATH=$PATH:~/.local/bin
```
# 4. Building
There is no need to build taosanode, since it is implemented in Python, an interpreted language.
# 5. Packaging
In the base directory, use the following command to build a tarball package:
```bash
cd script && ./release.sh
```
After the packaging is completed, you will find the tarball in the `release` directory.
```bash
ls -lht /root/tdanalytics/release
-rw-rw-r-- 1 root root 74K Feb 21 17:04 TDengine-enterprise-anode-1.0.1.tar.gz
```
# 6. Installing
## 6.1 Install taosanode
Use the following command to install taosanode on your system:
```bash
./install.sh
```
During the installation, a Python virtual environment is created in `/var/lib/taos/taosanode/venv` by default, and the required libraries are installed into it.
taosanode is installed as a system service, but it is not started automatically. You need to start the service manually with the following command:
```bash
systemctl start taosanoded
```
## 6.2 Configure the Service
taosanode provides a RESTful service powered by `uWSGI`. You can configure the options to tune
performance by editing the default configuration file `taosanode.ini` located in `/etc/taos`, which is also the configuration directory for the `taosd` service.
```ini
# taosanode service ip:port
http = 127.0.0.1:6090
```
# 7. Running
## 7.1 Start/Stop Service
`systemctl start/stop/restart taosanoded.service` starts, stops, or restarts the taosanode service.
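Once the service is up, you can verify that its RESTful interface is reachable. A minimal check, assuming the default `127.0.0.1:6090` address from `taosanode.ini` and the third-party `requests` package:
```python
import requests

# query the status endpoint exposed by the anode service (see app.py)
resp = requests.get("http://127.0.0.1:6090/status")
print(resp.json())  # expected: {'protocol': 1.0, 'status': 'ready'}
```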
## 7.2 Uninstall
The command `rmtaosanode` removes the installed taosanode from your system. Note that the Python virtual environment is not removed by this script; you need to remove it manually.
# 8. Testing
We use GitHub Actions to run the test suite. Please refer to the file [.github/workflows/python-package.yml](https://github.com/taosdata/tdanalytics/.github/workflows/python-package.yml) for more details.
# 9. Releasing
For the complete list of taosanode releases, please see the Releases page.
# 10. CI/CD
We use GitHub Actions for the CI/CD workflow. Please refer to the file .github/workflows/python-package.yml for more details.
# 11. Coverage
# 12. Contributing
Guidelines for contributing to the project:
- Fork the repository
- Create a feature branch
- Submit a pull request

81
tools/tdgpt/cfg/taosanode.ini Executable file

@ -0,0 +1,81 @@
#uwsgi --ini taosanode.ini
#uwsgi --reload taosanode.pid
#uwsgi --stop taosanode.pid
[uwsgi]
# charset
env = LC_ALL = en_US.UTF-8
# ip:port
http = 127.0.0.1:6090
# the local unix socket file that communicates with Nginx
#socket = 127.0.0.1:8001
#socket-timeout = 10
# base directory
chdir = /usr/local/taos/taosanode/lib
# initialize python file
wsgi-file = /usr/local/taos/taosanode/lib/taosanalytics/app.py
# the callable entry point of the uWSGI app
callable = app
# automatically remove the unix socket and pid file when stopping
vacuum = true
# socket exec model
#chmod-socket = 664
# uWSGI uid
uid = root
# uWSGI gid
gid = root
# main process
master = true
# the number of worker processes
processes = 2
# pid file
pidfile = /usr/local/taos/taosanode/taosanode.pid
# enable threads
enable-threads = true
# the number of threads for each process
threads = 4
# memory usage report
memory-report = true
# smooth restart
reload-mercy = 10
# conflict with systemctl, so do NOT uncomment this
# daemonize = /var/log/taos/taosanode/taosanode.log
# log directory
logto = /var/log/taos/taosanode/taosanode.log
# uWSGI monitor (stats) port
stats = 127.0.0.1:8387
# python virtual environment directory
virtualenv = /usr/local/taos/taosanode/venv/
[taosanode]
# default app log file
app-log = /var/log/taos/taosanode/taosanode.app.log
# model storage directory
model-dir = /usr/local/taos/taosanode/model/
# default log level
log-level = DEBUG
# draw the query results
draw-result = 1


@ -0,0 +1,22 @@
[Unit]
Description=TaosANode Service
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
Environment=PATH=/usr/lib/taos/venv/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
ExecStart=/usr/local/taos/taosanode/bin/start.sh
ExecStop=/usr/local/taos/taosanode/bin/stop.sh
TimeoutStartSec=0
TimeoutStopSec=120s
LimitNOFILE=1048576
LimitNPROC=infinity
LimitCORE=infinity
StandardOutput=null
Restart=always
StartLimitBurst=6
StartLimitInterval=60s
[Install]
WantedBy=multi-user.target

748
tools/tdgpt/script/install.sh Executable file

@ -0,0 +1,748 @@
#!/bin/bash
#
# This script installs the analysis platform on Linux systems. The operating system
# is required to use systemd to manage services at boot.
set -e
iplist=""
serverFqdn=""
# -----------------------Variables definition---------------------
script_dir=$(dirname $(readlink -f "$0"))
echo -e "${script_dir}"
# Dynamic directory
PREFIX="taos"
PRODUCTPREFIX="taosanode"
serverName="${PRODUCTPREFIX}d"
configFile="taosanode.ini"
productName="TDengine Anode"
emailName="taosdata.com"
tarName="package.tar.gz"
logDir="/var/log/${PREFIX}/${PRODUCTPREFIX}"
moduleDir="/var/lib/${PREFIX}/${PRODUCTPREFIX}/model"
venvDir="/var/lib/${PREFIX}/${PRODUCTPREFIX}/venv"
global_conf_dir="/etc/${PREFIX}"
installDir="/usr/local/${PREFIX}/${PRODUCTPREFIX}"
python_minor_ver=0 #check the python version
bin_link_dir="/usr/bin"
#install main path
install_main_dir=${installDir}
service_config_dir="/etc/systemd/system"
# Color setting
RED='\033[0;31m'
GREEN='\033[1;32m'
GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
csudo=""
if command -v sudo >/dev/null; then
csudo="sudo "
fi
update_flag=0
prompt_force=0
initd_mod=0
service_mod=2
if ps aux | grep -v grep | grep systemd &>/dev/null; then
service_mod=0
elif $(which service &>/dev/null); then
service_mod=1
service_config_dir="/etc/init.d"
if $(which chkconfig &>/dev/null); then
initd_mod=1
elif $(which insserv &>/dev/null); then
initd_mod=2
elif $(which update-rc.d &>/dev/null); then
initd_mod=3
else
service_mod=2
fi
else
service_mod=2
fi
# get the operating system type for using the corresponding init file
# ubuntu/debian(deb), centos/fedora(rpm), others: opensuse, redhat, ..., no verification
#osinfo=$(awk -F= '/^NAME/{print $2}' /etc/os-release)
if [[ -e /etc/os-release ]]; then
osinfo=$(cat /etc/os-release | grep "NAME" | cut -d '"' -f2) || :
else
osinfo=""
fi
#echo "osinfo: ${osinfo}"
os_type=0
if echo $osinfo | grep -qwi "ubuntu"; then
# echo "This is ubuntu system"
os_type=1
elif echo $osinfo | grep -qwi "debian"; then
# echo "This is debian system"
os_type=1
elif echo $osinfo | grep -qwi "Kylin"; then
# echo "This is Kylin system"
os_type=1
elif echo $osinfo | grep -qwi "centos"; then
# echo "This is centos system"
os_type=2
elif echo $osinfo | grep -qwi "fedora"; then
# echo "This is fedora system"
os_type=2
elif echo $osinfo | grep -qwi "Linux"; then
# echo "This is Linux system"
os_type=1
service_mod=0
initd_mod=0
service_config_dir="/etc/systemd/system"
else
echo " osinfo: ${osinfo}"
echo " This is an officially unverified linux system,"
echo " if there are any problems with the installation and operation, "
echo " please feel free to contact ${emailName} for support."
os_type=1
fi
# ============================= get input parameters =================================================
# install.sh -v [server | client] -e [yes | no] -i [systemd | service | ...]
# set parameters by default value
interactiveFqdn=yes # [yes | no]
verType=server # [server | client]
initType=systemd # [systemd | service | ...]
while getopts "hv:e:" arg; do
case $arg in
e)
#echo "interactiveFqdn=$OPTARG"
interactiveFqdn=$(echo $OPTARG)
;;
v)
#echo "verType=$OPTARG"
verType=$(echo $OPTARG)
;;
h)
echo "Usage: $(basename $0) -v [server | client] -e [yes | no]"
exit 0
;;
?) #unknown option
echo "unknown argument"
exit 1
;;
esac
done
#echo "verType=${verType} interactiveFqdn=${interactiveFqdn}"
services=(${serverName})
function install_services() {
for service in "${services[@]}"; do
install_service ${service}
done
}
function kill_process() {
pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}')
if [ -n "$pid" ]; then
${csudo}kill -9 "$pid" || :
fi
}
function install_main_path() {
#create install main dir and all sub dir
if [ ! -z "${install_main_dir}" ]; then
${csudo}rm -rf ${install_main_dir} || :
fi
${csudo}mkdir -p ${install_main_dir}
${csudo}mkdir -p ${install_main_dir}/cfg
${csudo}mkdir -p ${install_main_dir}/bin
${csudo}mkdir -p ${install_main_dir}/lib
${csudo}mkdir -p ${global_conf_dir}
}
function install_bin_and_lib() {
${csudo}cp -r ${script_dir}/bin/* ${install_main_dir}/bin
${csudo}cp -r ${script_dir}/lib/* ${install_main_dir}/lib/
if [[ ! -e "${bin_link_dir}/rmtaosanode" ]]; then
${csudo}ln -s ${install_main_dir}/bin/uninstall.sh ${bin_link_dir}/rmtaosanode
fi
}
function add_newHostname_to_hosts() {
localIp="127.0.0.1"
OLD_IFS="$IFS"
IFS=" "
iphost=$(cat /etc/hosts | grep $1 | awk '{print $1}')
arr=($iphost)
IFS="$OLD_IFS"
for s in "${arr[@]}"; do
if [[ "$s" == "$localIp" ]]; then
return
fi
done
if grep -q "127.0.0.1 $1" /etc/hosts; then
return
else
${csudo}chmod 666 /etc/hosts
${csudo}echo "127.0.0.1 $1" >>/etc/hosts
fi
}
function set_hostname() {
echo -e -n "${GREEN}Host name or IP (assigned to this machine) which can be accessed by your tools or apps (must not be 'localhost')${NC}"
read -e -p " : " -i "$(hostname)" newHostname
while true; do
if [ -z "$newHostname" ]; then
newHostname=$(hostname)
break
elif [ "$newHostname" != "localhost" ]; then
break
else
echo -e -n "${GREEN}Host name or IP (assigned to this machine) which can be accessed by your tools or apps (must not be 'localhost')${NC}"
read -e -p " : " -i "$(hostname)" newHostname
fi
done
if [ -f ${global_conf_dir}/${configFile} ]; then
${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$newHostname/" ${global_conf_dir}/${configFile}
else
${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$newHostname/" ${script_dir}/cfg/${configFile}
fi
serverFqdn=$newHostname
if [[ -e /etc/hosts ]] && [[ ! $newHostname =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
add_newHostname_to_hosts $newHostname
fi
}
function is_correct_ipaddr() {
newIp=$1
OLD_IFS="$IFS"
IFS=" "
arr=($iplist)
IFS="$OLD_IFS"
for s in "${arr[@]}"; do
if [[ "$s" == "$newIp" ]]; then
return 0
fi
done
return 1
}
function set_ipAsFqdn() {
iplist=$(ip address | grep inet | grep -v inet6 | grep -v 127.0.0.1 | awk '{print $2}' | awk -F "/" '{print $1}') || :
if [ -z "$iplist" ]; then
iplist=$(ifconfig | grep inet | grep -v inet6 | grep -v 127.0.0.1 | awk '{print $2}' | awk -F ":" '{print $2}') || :
fi
if [ -z "$iplist" ]; then
echo
echo -e -n "${GREEN}Unable to get local ip, use 127.0.0.1${NC}"
localFqdn="127.0.0.1"
# Write the local FQDN to configuration file
if [ -f ${global_conf_dir}/${configFile} ]; then
${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$localFqdn/" ${global_conf_dir}/${configFile}
else
${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$localFqdn/" ${script_dir}/cfg/${configFile}
fi
serverFqdn=$localFqdn
echo
return
fi
echo -e -n "${GREEN}Please choose an IP from local IP list${NC}:"
echo
echo -e -n "${GREEN}$iplist${NC}"
echo
echo
echo -e -n "${GREEN}Notes: if IP is used as the node name, data can NOT be migrated to other machine directly${NC}:"
read localFqdn
while true; do
if [ ! -z "$localFqdn" ]; then
# Check if correct ip address
is_correct_ipaddr $localFqdn
retval=$(echo $?)
if [[ $retval != 0 ]]; then
read -p "Please choose an IP from local IP list:" localFqdn
else
# Write the local FQDN to configuration file
if [ -f ${global_conf_dir}/${configFile} ]; then
${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$localFqdn/" ${global_conf_dir}/${configFile}
else
${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$localFqdn/" ${script_dir}/cfg/${configFile}
fi
serverFqdn=$localFqdn
break
fi
else
read -p "Please choose an IP from local IP list:" localFqdn
fi
done
}
function local_fqdn_check() {
#serverFqdn=$(hostname)
echo
echo -e -n "System hostname is: ${GREEN}$serverFqdn${NC}"
echo
set_hostname
}
function install_anode_config() {
fileName="${script_dir}/cfg/${configFile}"
echo -e $fileName
if [ -f ${fileName} ]; then
${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$serverFqdn/" ${script_dir}/cfg/${configFile}
if [ -f "${global_conf_dir}/${configFile}" ]; then
${csudo}cp ${fileName} ${global_conf_dir}/${configFile}.new
else
${csudo}cp ${fileName} ${global_conf_dir}/${configFile}
fi
fi
${csudo}ln -sf ${global_conf_dir}/${configFile} ${install_main_dir}/cfg
}
function install_config() {
[ ! -z $1 ] && return 0 || : # only install client
if ((${update_flag} == 1)); then
install_anode_config
return 0
fi
if [ "$interactiveFqdn" == "no" ]; then
install_anode_config
return 0
fi
local_fqdn_check
install_anode_config
echo
echo -e -n "${GREEN}Enter FQDN:port (like h1.${emailName}:6030) of an existing ${productName} cluster node to join${NC}"
echo
echo -e -n "${GREEN}OR leave it blank to build one${NC}:"
read firstEp
while true; do
if [ ! -z "$firstEp" ]; then
if [ -f ${global_conf_dir}/${configFile} ]; then
${csudo}sed -i -r "s/#*\s*(firstEp\s*).*/\1$firstEp/" ${global_conf_dir}/${configFile}
else
${csudo}sed -i -r "s/#*\s*(firstEp\s*).*/\1$firstEp/" ${script_dir}/cfg/${configFile}
fi
break
else
break
fi
done
echo
echo -e -n "${GREEN}Enter your email address for priority support or enter empty to skip${NC}: "
read emailAddr
while true; do
if [ ! -z "$emailAddr" ]; then
email_file="${install_main_dir}/email"
${csudo}bash -c "echo $emailAddr > ${email_file}"
break
else
break
fi
done
}
function install_log() {
${csudo}mkdir -p ${logDir} && ${csudo}chmod 777 ${logDir}
${csudo}ln -sf ${logDir} ${install_main_dir}/log
}
function install_module() {
${csudo}mkdir -p ${moduleDir} && ${csudo}chmod 777 ${moduleDir}
${csudo}ln -sf ${moduleDir} ${install_main_dir}/model
}
function install_anode_venv() {
${csudo}mkdir -p ${venvDir} && ${csudo}chmod 777 ${venvDir}
${csudo}ln -sf ${venvDir} ${install_main_dir}/venv
# build venv
${csudo}python3.${python_minor_ver} -m venv ${venvDir}
echo -e "active Python3 virtual env: ${venvDir}"
source ${venvDir}/bin/activate
echo -e "install the required packages by pip3, this may take a while depending on the network condition"
${csudo}${venvDir}/bin/pip3 install numpy==1.26.4
${csudo}${venvDir}/bin/pip3 install pandas==1.5.0
${csudo}${venvDir}/bin/pip3 install scikit-learn
${csudo}${venvDir}/bin/pip3 install outlier_utils
${csudo}${venvDir}/bin/pip3 install statsmodels
${csudo}${venvDir}/bin/pip3 install pyculiarity
${csudo}${venvDir}/bin/pip3 install pmdarima
${csudo}${venvDir}/bin/pip3 install flask
${csudo}${venvDir}/bin/pip3 install matplotlib
${csudo}${venvDir}/bin/pip3 install uwsgi
${csudo}${venvDir}/bin/pip3 install torch --index-url https://download.pytorch.org/whl/cpu
${csudo}${venvDir}/bin/pip3 install --upgrade keras
echo -e "Install python library for venv completed!"
}
function clean_service_on_sysvinit() {
if ps aux | grep -v grep | grep $1 &>/dev/null; then
${csudo}service $1 stop || :
fi
if ((${initd_mod} == 1)); then
if [ -e ${service_config_dir}/$1 ]; then
${csudo}chkconfig --del $1 || :
fi
elif ((${initd_mod} == 2)); then
if [ -e ${service_config_dir}/$1 ]; then
${csudo}insserv -r $1 || :
fi
elif ((${initd_mod} == 3)); then
if [ -e ${service_config_dir}/$1 ]; then
${csudo}update-rc.d -f $1 remove || :
fi
fi
${csudo}rm -f ${service_config_dir}/$1 || :
if $(which init &>/dev/null); then
${csudo}init q || :
fi
}
function install_service_on_sysvinit() {
if [ "$1" != "${serverName}" ]; then
return
fi
clean_service_on_sysvinit $1
sleep 1
if ((${os_type} == 1)); then
${csudo}cp ${script_dir}/init.d/${serverName}.deb ${service_config_dir}/${serverName} && ${csudo}chmod a+x ${service_config_dir}/${serverName}
elif ((${os_type} == 2)); then
${csudo}cp ${script_dir}/init.d/${serverName}.rpm ${service_config_dir}/${serverName} && ${csudo}chmod a+x ${service_config_dir}/${serverName}
fi
if ((${initd_mod} == 1)); then
${csudo}chkconfig --add $1 || :
${csudo}chkconfig --level 2345 $1 on || :
elif ((${initd_mod} == 2)); then
${csudo}insserv $1 || :
${csudo}insserv -d $1 || :
elif ((${initd_mod} == 3)); then
${csudo}update-rc.d $1 defaults || :
fi
}
function clean_service_on_systemd() {
service_config="${service_config_dir}/$1.service"
if systemctl is-active --quiet $1; then
echo "$1 is running, stopping it..."
${csudo}systemctl stop $1 &>/dev/null || echo &>/dev/null
fi
${csudo}systemctl disable $1 &>/dev/null || echo &>/dev/null
${csudo}rm -f ${service_config}
}
function install_service_on_systemd() {
clean_service_on_systemd $1
cfg_source_dir=${script_dir}/cfg
if [[ "$1" == "${xname}" || "$1" == "${explorerName}" ]]; then
cfg_source_dir=${script_dir}/cfg
fi
if [ -f ${cfg_source_dir}/$1.service ]; then
${csudo}cp ${cfg_source_dir}/$1.service ${service_config_dir}/ || :
fi
${csudo}systemctl enable $1
${csudo}systemctl daemon-reload
}
function install_service() {
if ((${service_mod} == 0)); then
install_service_on_systemd $1
elif ((${service_mod} == 1)); then
install_service_on_sysvinit $1
else
kill_process $1
fi
}
vercomp() {
if [[ $1 == $2 ]]; then
return 0
fi
local IFS=.
local i ver1=($1) ver2=($2)
# fill empty fields in ver1 with zeros
for ((i = ${#ver1[@]}; i < ${#ver2[@]}; i++)); do
ver1[i]=0
done
for ((i = 0; i < ${#ver1[@]}; i++)); do
if [[ -z ${ver2[i]} ]]; then
# fill empty fields in ver2 with zeros
ver2[i]=0
fi
if ((10#${ver1[i]} > 10#${ver2[i]})); then
return 1
fi
if ((10#${ver1[i]} < 10#${ver2[i]})); then
return 2
fi
done
return 0
}
function is_version_compatible() {
curr_version=$(ls ${script_dir}/driver/libtaos.so* | awk -F 'libtaos.so.' '{print $2}')
if [ -f ${script_dir}/driver/vercomp.txt ]; then
min_compatible_version=$(cat ${script_dir}/driver/vercomp.txt)
else
min_compatible_version=$(${script_dir}/bin/${serverName} -V | grep version | head -1 | cut -d ' ' -f 5)
fi
exist_version=$(${installDir}/bin/${serverName} -V | grep version | head -1 | cut -d ' ' -f 3)
vercomp $exist_version "3.0.0.0"
case $? in
2)
prompt_force=1
;;
esac
vercomp $curr_version $min_compatible_version
echo "" # avoid $? value not update
case $? in
0) return 0 ;;
1) return 0 ;;
2) return 1 ;;
esac
}
deb_erase() {
confirm=""
while [ "" == "${confirm}" ]; do
echo -e -n "${RED}Existing TDengine deb is detected, do you want to remove it? [yes|no] ${NC}:"
read confirm
if [ "yes" == "$confirm" ]; then
${csudo}dpkg --remove tdengine || :
break
elif [ "no" == "$confirm" ]; then
break
fi
done
}
rpm_erase() {
confirm=""
while [ "" == "${confirm}" ]; do
echo -e -n "${RED}Existing TDengine rpm is detected, do you want to remove it? [yes|no] ${NC}:"
read confirm
if [ "yes" == "$confirm" ]; then
${csudo}rpm -e tdengine || :
break
elif [ "no" == "$confirm" ]; then
break
fi
done
}
function updateProduct() {
# Check if version compatible
if ! is_version_compatible; then
echo -e "${RED}Version incompatible${NC}"
return 1
fi
# Start to update
if [ ! -e ${tarName} ]; then
echo "File ${tarName} does not exist"
exit 1
fi
if echo $osinfo | grep -qwi "centos"; then
rpm -q tdengine 2>&1 >/dev/null && rpm_erase tdengine || :
elif echo $osinfo | grep -qwi "ubuntu"; then
dpkg -l tdengine 2>&1 | grep ii >/dev/null && deb_erase tdengine || :
fi
tar -zxf ${tarName}
echo "Start to update ${productName}..."
# Stop the service if running
if ps aux | grep -v grep | grep ${serverName} &>/dev/null; then
if ((${service_mod} == 0)); then
${csudo}systemctl stop ${serverName} || :
elif ((${service_mod} == 1)); then
${csudo}service ${serverName} stop || :
else
kill_process ${serverName}
fi
sleep 1
fi
install_main_path
install_log
install_module
install_config
if [ -z $1 ]; then
install_bin_and_lib
install_services
echo
echo -e "${GREEN_DARK}To configure ${productName} ${NC}\t\t: edit ${global_conf_dir}/${configFile}"
[ -f ${global_conf_dir}/${adapterName}.toml ] && [ -f ${installDir}/bin/${adapterName} ] &&
echo -e "${GREEN_DARK}To configure ${adapterName} ${NC}\t: edit ${global_conf_dir}/${adapterName}.toml"
echo -e "${GREEN_DARK}To configure ${explorerName} ${NC}\t: edit ${global_conf_dir}/explorer.toml"
if ((${service_mod} == 0)); then
echo -e "${GREEN_DARK}To start ${productName} server ${NC}\t: ${csudo}systemctl start ${serverName}${NC}"
elif ((${service_mod} == 1)); then
echo -e "${GREEN_DARK}To start ${productName} server ${NC}\t: ${csudo}service ${serverName} start${NC}"
else
echo -e "${GREEN_DARK}To start ${productName} server ${NC}\t: ./${serverName}${NC}"
fi
echo
echo "${productName} is updated successfully!"
echo
else
install_bin_and_lib
fi
cd $script_dir
rm -rf $(tar -tf ${tarName} | grep -Ev "^\./$|^\/")
}
function installProduct() {
# Start to install
if [ ! -e ${tarName} ]; then
echo "File ${tarName} does not exist"
exit 1
fi
tar -zxf ${tarName}
echo "Start to install ${productName}..."
install_main_path
install_log
install_anode_config
install_module
install_bin_and_lib
install_services
echo
echo -e "\033[44;32;1m${productName} is installed successfully!${NC}"
echo
echo -e "\033[44;32;1mStart to create virtual python env in ${venvDir}${NC}"
install_anode_venv
}
# check the python version; only 3.10/3.11 are supported
check_python3_env() {
if ! command -v python3 &> /dev/null
then
echo -e "\033[31mWarning: Python3 command not found. Version 3.10/3.11 is required.\033[0m"
exit 1
fi
python3_version=$(python3 --version 2>&1 | awk -F' ' '{print $2}')
python3_version_ok=false
python_minor_ver=$(echo "$python3_version" | cut -d"." -f2)
if [[ $(echo "$python3_version" | cut -d"." -f1) -eq 3 && $(echo "$python3_version" | cut -d"." -f2) -ge 10 ]]; then
python3_version_ok=true
fi
if $python3_version_ok; then
echo -e "\033[32mPython3 ${python3_version} has been found.\033[0m"
else
if command -v python3.10 &> /dev/null
then
echo -e "\033[32mPython3.10 has been found.\033[0m"
python_minor_ver=10
elif command -v python3.11 &> /dev/null
then
python_minor_ver=11
echo -e "\033[32mPython3.11 has been found.\033[0m"
else
echo -e "\033[31mWarning: Python3.10/3.11 is required, only found python${python3_version}.\033[0m"
exit 1
fi
fi
# echo -e "Python3 minor version is:${python_minor_ver}"
# check the existence pip3.10/pip3.11
if ! command -v pip3 &> /dev/null
then
echo -e "\033[31mWarning: Pip3 command not found. Version 3.10/3.11 is required.\033[0m"
exit 1
fi
pip3_version=$(pip3 --version 2>&1 | awk -F' ' '{print $6}' | cut -d")" -f1)
major_ver=$(echo "${pip3_version}" | cut -d"." -f1)
minor_ver=$(echo "${pip3_version}" | cut -d"." -f2)
pip3_version_ok=false;
if [[ ${major_ver} -eq 3 && ${minor_ver} -ge 10 ]]; then
pip3_version_ok=true
fi
if $pip3_version_ok; then
echo -e "\033[32mpip3 ${pip3_version} has been found.\033[0m"
else
if command -v pip3.${python_minor_ver} &> /dev/null
then
echo -e "\033[32mpip3.${python_minor_ver} has been found.\033[0m"
else
echo -e "\033[31mWarning: pip3.10/3.11 is required, only found pip${pip3_version}.\033[0m"
exit 1
fi
fi
# if ! command -v python3.${python_minor_ver}-venv &> /dev/null
# then
# echo -e "\033[31mWarning: python3.${python_minor_ver}-venv command not found.\033[0m"
# exit 1
# fi
}
## ==============================Main program starts from here============================
serverFqdn=$(hostname)
if [ "$verType" == "server" ]; then
check_python3_env
installProduct
fi

100
tools/tdgpt/script/release.sh Executable file

@ -0,0 +1,100 @@
#!/bin/bash
# Generate the install package for all supported operating systems
set -e
# set -x
curr_dir=$(pwd)
compile_dir=$1
version="1.0.1"
osType=
pagMode=
productName="TDengine-enterprise-anode"
script_dir="$(dirname $(readlink -f $0))"
top_dir="$(readlink -f ${script_dir}/..)"
echo -e ${top_dir}
serverName="taosanoded"
configFile="taosanode.ini"
tarName="package.tar.gz"
# create compressed install file.
build_dir="${compile_dir}/build"
release_dir="${top_dir}/release"
#package_name='linux'
install_dir="${release_dir}/${productName}-${version}"
if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/${serverName}
fi
cfg_dir="${top_dir}/cfg"
install_files="${script_dir}/install.sh"
# make directories.
mkdir -p ${install_dir}
mkdir -p ${install_dir}/cfg && cp ${cfg_dir}/${configFile} ${install_dir}/cfg/${configFile}
if [ -f "${cfg_dir}/${serverName}.service" ]; then
cp ${cfg_dir}/${serverName}.service ${install_dir}/cfg || :
fi
# python files
mkdir -p ${install_dir}/bin && mkdir -p ${install_dir}/lib
# script to control start/stop/uninstall process
rm -r ${top_dir}/taosanalytics/*.pyc || :
cp -r ${top_dir}/taosanalytics/ ${install_dir}/lib/ && chmod a+x ${install_dir}/lib/ || :
cp -r ${top_dir}/script/st*.sh ${install_dir}/bin/ && chmod a+x ${install_dir}/bin/* || :
cp -r ${top_dir}/script/uninstall.sh ${install_dir}/bin/ && chmod a+x ${install_dir}/bin/* || :
cd ${install_dir}
#if [ "$osType" != "Darwin" ]; then
# tar -zcv -f ${tarName} ./bin/* || :
# rm -rf ${install_dir}/bin || :
#else
tar -zcv -f ${tarName} ./lib/* || :
if [ ! -z "${install_dir}" ]; then
# shellcheck disable=SC2115
rm -rf "${install_dir}"/lib || :
fi
exitcode=$?
if [ "$exitcode" != "0" ]; then
echo "tar ${tarName} error !!!"
exit $exitcode
fi
cd ${curr_dir}
cp ${install_files} ${install_dir}
chmod a+x ${install_dir}/install.sh
# Copy release note
# cp ${script_dir}/release_note ${install_dir}
# exit 1
cd ${release_dir}
pkg_name=${install_dir}
echo -e "pkg_name is: ${pkg_name}"
if [ "$osType" != "Darwin" ]; then
tar -zcv -f "$(basename ${pkg_name}).tar.gz" "$(basename ${install_dir})" --remove-files || :
else
tar -zcv -f "$(basename ${pkg_name}).tar.gz" "$(basename ${install_dir})" || :
rm -rf "${install_dir}" ||:
fi
exitcode=$?
if [ "$exitcode" != "0" ]; then
echo "tar ${pkg_name}.tar.gz error !!!"
exit $exitcode
fi
cd ${curr_dir}

4
tools/tdgpt/script/start.sh Executable file

@ -0,0 +1,4 @@
#!/bin/bash
# start the flask service by using uwsgi
/usr/local/taos/taosanode/venv/bin/uwsgi /usr/local/taos/taosanode/cfg/taosanode.ini

4
tools/tdgpt/script/stop.sh Executable file

@ -0,0 +1,4 @@
#!/bin/bash
# stop the uwsgi server
/usr/local/taos/taosanode/venv/bin/uwsgi --stop /usr/local/taos/taosanode/taosanode.pid

220
tools/tdgpt/script/uninstall.sh Executable file

@ -0,0 +1,220 @@
#!/bin/bash
# uninstall the deployed app; the python virtual environment is not removed
set -e
#set -x
osType=`uname`
RED='\033[0;31m'
GREEN='\033[1;32m'
NC='\033[0m'
MAIN_NAME="taosanode"
installDir="/usr/local/taos/taosanode"
venv_dir="/usr/local/taos/taosanode/venv"
serverName="${MAIN_NAME}d"
uninstallName="rmtaosanode"
productName="TDengine Enterprise ANode"
if [ "$osType" != "Darwin" ]; then
bin_link_dir="/usr/bin"
else
bin_link_dir="/usr/local/bin"
fi
#install main path
bin_dir=${installDir}/bin
lib_dir=${installDir}/lib
local_log_dir=${installDir}/log
local_conf_dir=${installDir}/cfg
local_model_dir=${installDir}/model
global_log_dir="/var/log/taos/${MAIN_NAME}"
global_conf_dir="/etc/taos/"
service_config_dir="/etc/systemd/system"
services=(${serverName} ${uninstallName})
csudo=""
if command -v sudo >/dev/null; then
csudo="sudo "
fi
initd_mod=0
service_mod=2
if ps aux | grep -v grep | grep systemd &>/dev/null; then
service_mod=0
elif $(which service &>/dev/null); then
service_mod=1
service_config_dir="/etc/init.d"
if $(which chkconfig &>/dev/null); then
initd_mod=1
elif $(which insserv &>/dev/null); then
initd_mod=2
elif $(which update-rc.d &>/dev/null); then
initd_mod=3
else
service_mod=2
fi
else
service_mod=2
fi
kill_service_of() {
_service=$1
pid=$(ps -ef | grep $_service | grep -v grep | awk '{print $2}')
if [ -n "$pid" ]; then
"${csudo}"${installDir}/bin/stop.sh:
fi
}
clean_service_on_systemd_of() {
_service=$1
_service_config="${service_config_dir}/${_service}.service"
if systemctl is-active --quiet ${_service}; then
echo "${_service} is running, stopping it..."
"${csudo}"systemctl stop ${_service} &>/dev/null || echo &>/dev/null
fi
"${csudo}"systemctl disable ${_service} &>/dev/null || echo &>/dev/null
if [[ ! -z "${_service_config}" && -f "${_service_config}" ]]; then
${csudo}rm ${_service_config}
fi
}
clean_service_on_sysvinit_of() {
_service=$1
if pidof ${_service} &>/dev/null; then
echo "${_service} is running, stopping it..."
"${csudo}"service ${_service} stop || :
fi
if ((${initd_mod} == 1)); then
if [ -e ${service_config_dir}/${_service} ]; then
# shellcheck disable=SC2086
${csudo}chkconfig --del ${_service} || :
fi
elif ((${initd_mod} == 2)); then
if [ -e ${service_config_dir}/${_service} ]; then
${csudo}insserv -r ${_service} || :
fi
elif ((${initd_mod} == 3)); then
if [ -e ${service_config_dir}/${_service} ]; then
${csudo}update-rc.d -f ${_service} remove || :
fi
fi
# shellcheck disable=SC2236
if [ ! -z "${service_config_dir}" ]; then
echo -e "rm ${service_config_dir}/${_service}"
fi
#${csudo}rm ${service_config_dir}/${_service} || :
if $(which init &>/dev/null); then
${csudo}init q || :
fi
}
clean_service_of() {
if ((${service_mod} == 0)); then
clean_service_on_systemd_of $_service
elif ((${service_mod} == 1)); then
clean_service_on_sysvinit_of $_service
else
kill_service_of $_service
fi
}
remove_service_of() {
_service=$1
clean_service_of ${_service}
if [[ -e "${bin_link_dir}/${_service}" ]]; then
${csudo}rm "${bin_link_dir}"/${_service}
echo "${_service} is removed successfully!"
fi
}
remove_service() {
for _service in "${services[@]}"; do
remove_service_of "${_service}"
done
}
function clean_venv() {
# Remove python virtual environment
#${csudo}rm ${venv_dir}/* || :
if [ ! -z "${venv_dir}" ]; then
echo -e "${csudo}rm -rf ${venv_dir}/*"
fi
}
function clean_module() {
if [ ! -z "${local_model_dir}" ]; then
${csudo}unlink ${local_model_dir} || :
fi
}
function clean_config() {
# Remove config file
if [ ! -z "${global_conf_dir}" ]; then
${csudo}rm -f ${global_conf_dir}/taosanode.ini || :
fi
if [ ! -z "${local_conf_dir}" ]; then
# shellcheck disable=SC2086
${csudo}rm -rf ${local_conf_dir} || :
fi
}
function clean_log() {
# Remove log files
if [ ! -z "${global_log_dir}" ]; then
${csudo}rm -rf ${global_log_dir} || :
fi
if [ ! -z "${local_log_dir}" ]; then
${csudo}rm -rf ${local_log_dir} || :
fi
}
function remove_deploy_binary() {
if [ ! -z "${bin_dir}" ]; then
${csudo}rm -rf ${bin_dir} || :
fi
if [ ! -z "${lib_dir}" ]; then
${csudo}rm -rf ${lib_dir}
fi
}
remove_service
clean_log # Remove link log directory
clean_config # Remove link configuration file
remove_deploy_binary
clean_venv
if [[ -e /etc/os-release ]]; then
osinfo=$(awk -F= '/^NAME/{print $2}' /etc/os-release)
else
osinfo=""
fi
if echo $osinfo | grep -qwi "ubuntu"; then
# echo "this is ubuntu system"
${csudo}dpkg --force-all -P tdengine >/dev/null 2>&1 || :
elif echo $osinfo | grep -qwi "debian"; then
# echo "this is debian system"
${csudo}dpkg --force-all -P tdengine >/dev/null 2>&1 || :
elif echo $osinfo | grep -qwi "centos"; then
# echo "this is centos system"
${csudo}rpm -e --noscripts tdengine >/dev/null 2>&1 || :
fi
command -v systemctl >/dev/null 2>&1 && ${csudo}systemctl daemon-reload >/dev/null 2>&1 || true
echo
echo "${productName} is uninstalled successfully!"
echo


@ -0,0 +1,117 @@
# encoding:utf-8
# pylint: disable=c0103
""" auto encoder algorithms to detect anomaly for time series data"""
import os.path
import joblib
import numpy as np
import pandas as pd
from taosanalytics.conf import app_logger, conf
from taosanalytics.misc.train_model import create_sequences
from taosanalytics.service import AbstractAnomalyDetectionService
class _AutoEncoderDetectionService(AbstractAnomalyDetectionService):
name = 'ad_encoder'
desc = "anomaly detection based on auto encoder"
def __init__(self):
super().__init__()
self.table_name = None
self.mean = None
self.std = None
self.threshold = None
self.time_interval = None
self.model = None
self.dir = 'ad_autoencoder'
self.root_path = conf.get_model_directory()
self.root_path = self.root_path + f'/{self.dir}/'
if not os.path.exists(self.root_path):
app_logger.log_inst.error(
"%s ad algorithm failed to locate default module directory:"
"%s, not active", self.__class__.__name__, self.root_path)
else:
app_logger.log_inst.info("%s ad algorithm root path is: %s", self.__class__.__name__,
self.root_path)
def execute(self):
if self.input_is_empty():
return []
if self.model is None:
raise FileNotFoundError("not load autoencoder model yet, or load model failed")
array_2d = np.reshape(self.list, (len(self.list), 1))
df = pd.DataFrame(array_2d)
# normalize input data using z-score
normalized_list = (df - self.mean.value) / self.std.value
seq = create_sequences(normalized_list.values, self.time_interval)
# Get test MAE loss.
pred_list = self.model.predict(seq)
mae_loss = np.mean(np.abs(pred_list - seq), axis=1)
mae = mae_loss.reshape((-1))
# Detect all the samples which are anomalies.
anomalies = mae > self.threshold
# syslogger.log_inst(
# "Number of anomaly samples: %f, Indices of anomaly samples:{}".
# format(np.sum(anomalies), np.where(anomalies))
# )
# data i is an anomaly if samples [(i - timesteps + 1) to (i)] are anomalies
ad_indices = []
for data_idx in range(self.time_interval - 1,
len(normalized_list) - self.time_interval + 1):
if np.all(anomalies[data_idx - self.time_interval + 1: data_idx]):
ad_indices.append(data_idx)
return [-1 if i in ad_indices else 1 for i in range(len(self.list))]
def set_params(self, params):
if "model" not in params:
raise ValueError("model needs to be specified")
name = params['model']
module_file_path = f'{self.root_path}/{name}.dat'
module_info_path = f'{self.root_path}/{name}.info'
app_logger.log_inst.info("try to load module:%s", module_file_path)
if os.path.exists(module_file_path):
self.model = joblib.load(module_file_path)
else:
app_logger.log_inst.error("failed to load autoencoder model file: %s", module_file_path)
raise FileNotFoundError(f"{module_file_path} not found")
if os.path.exists(module_info_path):
info = joblib.load(module_info_path)
else:
app_logger.log_inst.error("failed to load autoencoder model file: %s", module_file_path)
raise FileNotFoundError("%s not found", module_info_path)
if info is not None:
self.mean = info["mean"]
self.std = info["std"]
self.threshold = info["threshold"]
self.time_interval = info["timesteps"]
app_logger.log_inst.info(
"load ac module success, mean: %f, std: %f, threshold: %f, time_interval: %d",
self.mean[0], self.std[0], self.threshold, self.time_interval
)
else:
app_logger.log_inst.error("failed to load %s model", name)
raise RuntimeError(f"failed to load model {name}")
def get_params(self):
return {"dir": self.dir + '/*'}


@ -0,0 +1,42 @@
# encoding:utf-8
""" grubbs algorithm class"""
from outliers import smirnov_grubbs as grubbs
from taosanalytics.service import AbstractAnomalyDetectionService
class _GrubbsService(AbstractAnomalyDetectionService):
""" Grubbs algorithm is to check the anomaly data in the input list """
name = 'grubbs'
desc = """Grubbs' test is to detect the presence of one outlier in a data set that is normally
distributed"""
def __init__(self, alpha_val=0.95):
super().__init__()
if alpha_val <= 0 or alpha_val >= 1:
raise ValueError("invalid alpha value, valid range is (0, 1)")
self.alpha = 1 - alpha_val
def execute(self):
"""perform Grubbs' test and identify (if any) the outlier"""
if self.input_is_empty():
return []
res = grubbs.test(self.list, alpha=self.alpha)
error_indicator = [1 if k in set(res) else -1 for k in self.list]
return error_indicator
def set_params(self, params):
""" set the value of alpha """
super().set_params(params)
if "alpha".lower() in params:
# raise ValueError("alpha parameter is missing for grubbs algorithm")
alpha_val = float(params["alpha"])
if alpha_val <= 0 or alpha_val >= 1:
raise ValueError("invalid alpha value, valid range is (0, 1)")
self.alpha = 1 - alpha_val
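A usage sketch for the Grubbs service, relying only on the `set_input_list`/`set_params`/`execute` API defined in `service.py`; the module path in the import is an assumption:
```python
from taosanalytics.algo.ad.grubbs import _GrubbsService  # assumed module path

s = _GrubbsService()
s.set_input_list([5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40])
s.set_params({"alpha": 0.95})
print(s.execute())  # 1 for values kept by the test, -1 for detected outliers
```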


@ -0,0 +1,29 @@
# encoding:utf-8
"""iqr class definition"""
import numpy as np
from taosanalytics.service import AbstractAnomalyDetectionService
class _IqrService(AbstractAnomalyDetectionService):
""" IQR algorithm is to check the anomaly data in the input list """
name = 'iqr'
desc = """found the anomaly data according to the inter-quartile range"""
def __init__(self):
super().__init__()
def execute(self):
if self.input_is_empty():
return []
lower = np.quantile(self.list, 0.25)
upper = np.quantile(self.list, 0.75)
min_val = lower - 1.5 * (upper - lower)
max_val = upper + 1.5 * (upper - lower)
threshold = [min_val, max_val]
return [-1 if k < threshold[0] or k > threshold[1] else 1 for k in self.list]
def set_params(self, params):
pass


@ -0,0 +1,47 @@
# encoding:utf-8
"""ksigma class definition"""
import numpy as np
from taosanalytics.service import AbstractAnomalyDetectionService
class _KSigmaService(AbstractAnomalyDetectionService):
""" KSigma algorithm is to check the anomaly data in the input list """
name = "ksigma"
desc = """the k-sigma algorithm (or 3σ rule) expresses a conventional heuristic that nearly all
values are taken to lie within k (usually three) standard deviations of the mean, and thus
it is empirically useful to treat 99.7% probability as near certainty"""
def __init__(self, k_val=3):
super().__init__()
self.k_val = k_val
def execute(self):
def get_k_sigma_range(vals, k_value):
""" Return the k-sigma value range """
avg = np.mean(vals)
std = np.std(vals)
upper = avg + k_value * std
lower = avg - k_value * std
return [float(lower), float(upper)]
if self.input_is_empty():
return []
threshold = get_k_sigma_range(self.list, self.k_val)
return [-1 if k < threshold[0] or k > threshold[1] else 1 for k in self.list]
def set_params(self, params):
super().set_params(params)
if "k" in params:
k = int(params["k"])
if k < 1 or k > 3:
raise ValueError("k value out of range, valid range [1, 3]")
self.k_val = k
def get_params(self):
return {"k": self.k_val}


@ -0,0 +1,43 @@
# encoding:utf-8
"""local outlier factor class definition"""
import numpy as np
import sklearn.neighbors as neighbor
from taosanalytics.conf import app_logger
from taosanalytics.service import AbstractAnomalyDetectionService
class _LofService(AbstractAnomalyDetectionService):
""" LOF(local outlier factor) algorithm is to check the anomaly data in the input list """
name = 'lof'
desc = """Local Outlier Factor, Ref: M. M. Breunig, H. P. Kriegel, R. T. Ng, J. Sander.
LOF:Identifying Density-based Local Outliers. SIGMOD, 2000."""
def __init__(self, n_neighbors=10, algo="auto"):
super().__init__()
self.neighbors = n_neighbors
self.algorithm = algo
def execute(self):
"""perform LOF(local outlier factor) test and identify (if any) the outlier"""
if self.input_is_empty():
return []
checker = neighbor.LocalOutlierFactor(n_neighbors=self.neighbors, algorithm=self.algorithm)
arr_2d = np.reshape(self.list, (len(self.list), 1))
res = checker.fit_predict(arr_2d)
print(f"The negative outlier factor is:{checker.negative_outlier_factor_}")
return res
def set_params(self, params):
super().set_params(params)
if "neighbors" in params: # todo check value range
self.neighbors = int(params["neighbors"])
if "algorithm" in params:
self.algorithm = params["algorithm"]
def get_params(self):
return {"neighbors": self.neighbors, "algorithm": self.algorithm}


@ -0,0 +1,44 @@
# encoding:utf-8
"""shesd algorithm class definition"""
from pandas import Series
from pyculiarity import detect_vec
from taosanalytics.service import AbstractAnomalyDetectionService
class _SHESDService(AbstractAnomalyDetectionService):
""" s-h-esd algorithm is to check the anomaly data in the input list """
name = 'shesd'
desc = ""
def __init__(self, n_period=0, direction="both", anoms=0.05):
super().__init__()
self.period = n_period
self.direction = direction
self.max_anoms = anoms
def execute(self):
"""perform SHESD test and identify (if any) the outlier"""
if self.input_is_empty():
return []
results = detect_vec(Series(self.list), max_anoms=self.max_anoms, direction=self.direction,
period=self.period)
res_val = results['anoms']['anoms']
return [-1 if k in set(res_val) else 1 for k in self.list]
def set_params(self, params):
super().set_params(params)
if "period" in params: # todo check value range
self.period = int(params["period"])
if "direction" in params:
self.direction = params["direction"]
if "max_anoms" in params:
self.max_anoms = float(params["max_anoms"])
def get_params(self):
return {"period": self.period, "direction": self.direction, "max_anoms": self.max_anoms}


@ -0,0 +1,49 @@
# encoding:utf-8
# pylint: disable=c0103
""" anomaly detection register/display functions """
from matplotlib import pyplot as plt
from taosanalytics.conf import app_logger, conf
from taosanalytics.servicemgmt import loader
def do_ad_check(input_list, ts_list, algo_name, params):
""" actual anomaly detection handler """
s = loader.get_service(algo_name)
if s is None:
s = loader.get_service("ksigma")
if s is None:
raise ValueError(f"failed to load {algo_name} or ksigma analysis service")
s.set_input_list(input_list, ts_list)
s.set_params(params)
res = s.execute()
n_error = abs(sum(filter(lambda x: x == -1, res)))
app_logger.log_inst.debug("There are %d in input, and %d anomaly points found: %s",
len(input_list),
n_error,
res)
draw_ad_results(input_list, res, algo_name)
return res
def draw_ad_results(input_list, res, fig_name):
""" draw the detected anomaly points """
# not in debug, do not visualize the anomaly detection result
if not conf.get_draw_result_option():
return
plt.clf()
for index, val in enumerate(res):
if val != -1:
continue
plt.scatter(index, input_list[index], marker='o', color='r', alpha=0.5, s=100, zorder=3)
plt.plot(input_list, label='sample')
plt.savefig(fig_name)
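A usage sketch for `do_ad_check`; it assumes the services have been registered with `loader.load_all_service()` (as `app.py` does at startup) and that the default configuration file is readable so `conf` initializes:
```python
from taosanalytics.algo.anomaly import do_ad_check
from taosanalytics.servicemgmt import loader

loader.load_all_service()
ts_list = list(range(10))
vals = [1, 2, 2, 1, 2, 2, 1, 2, 2, 50]
# one flag per input point: -1 for anomalies, 1 otherwise
print(do_ad_check(vals, ts_list, "iqr", {}))
```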


@ -0,0 +1,114 @@
# encoding:utf-8
# pylint: disable=c0103
"""arima class definition"""
import pmdarima as pm
from taosanalytics.algo.forecast import insert_ts_list
from taosanalytics.conf import app_logger
from taosanalytics.service import AbstractForecastService
class _ArimaService(AbstractForecastService):
""" ARIMA algorithm is to do the fc in the input list """
name = "arima"
desc = "do time series data fc by using ARIMA model"
def __init__(self):
super().__init__()
self.diff = 0
self.start_p = 0
self.max_p = 10
self.start_q = 0
self.max_q = 10
def set_params(self, params):
super().set_params(params)
self.start_p = int(params['start_p']) if 'start_p' in params else 0
self.max_p = int(params['max_p']) if 'max_p' in params else 10
self.start_q = int(params['start_q']) if 'start_q' in params else 0
self.max_q = int(params['max_q']) if 'max_q' in params else 10
def get_params(self):
""" get the default value for fc algorithms """
p = super().get_params()
p.update(
{
"start_p": self.start_p, "max_p": self.max_p, "start_q": self.start_q,
"max_q": self.max_q, "diff": self.diff
}
)
return p
def __do_forecast_helper(self, fc_rows):
""" do arima fc """
# plot_acf(self.list, lags=25, title='raw_acf')
# plot_pacf(self.list, lags=25, title='raw_pacf')
# plt.show()
seasonal = self.period > 0
# Fit model
model = pm.auto_arima(self.list,
start_p=self.start_p,
start_q=self.start_q,
max_p=self.max_p,
max_q=self.max_q,
d=1,
m=self.period,
seasonal=seasonal,
start_P=0,
D=self.diff)
app_logger.log_inst.debug(model.summary())
# predict N steps into the future
fc = model.predict(n_periods=fc_rows, return_conf_int=self.return_conf,
alpha=self.conf)
# plt.plot(source_data, label='training')
# plt.plot(xrange, actual_data, label='actual')
# fc_list = fc.tolist()
# fc_without_diff = restore_from_diff(self.list, fc_list, 2)
# print(fc_without_diff)
# plt.plot(xrange, fc_without_diff, label='fc')
# residuals = pd.DataFrame(model.arima_res_.resid)
# wn = is_white_noise(residuals)
# print("residual is white noise:", wn)
# fig, ax = plt.subplots(1, 2)
# residuals.plot(title="Residuals", ax=ax[0])
# residuals.plot(kind='kde', title='Density', ax=ax[1])
# plt.show()
res1 = [fc[0].tolist(), fc[1][:, 0].tolist(),
fc[1][:, 1].tolist()] if self.return_conf else [fc.tolist()]
return (
res1,
model.arima_res_.mse,
f"SARIMAX{model.order}x{model.seasonal_order}"
)
def execute(self):
""" do fc the time series data"""
if self.list is None or len(self.list) < self.period:
raise ValueError("number of input data is less than the periods")
if self.fc_rows <= 0:
raise ValueError("fc rows is not specified yet")
res, mse, model_info = self.__do_forecast_helper(self.fc_rows)
insert_ts_list(res, self.start_ts, self.time_step, self.fc_rows)
return {
"mse": mse,
"model_info": model_info,
"res": res
}


@ -0,0 +1,79 @@
# encoding:utf-8
# pylint: disable=c0103
"""holt winters definition"""
from statsmodels.tsa.holtwinters import ExponentialSmoothing, SimpleExpSmoothing
from taosanalytics.algo.forecast import insert_ts_list
from taosanalytics.service import AbstractForecastService
class _HoltWintersService(AbstractForecastService):
""" Holt winters algorithm is to do the fc in the input list """
name = "holtwinters"
desc = "forecast algorithm by using exponential smoothing"
def __init__(self):
super().__init__()
self.trend_option = None
self.seasonal_option = None
def set_params(self, params):
super().set_params(params)
self.trend_option = params['trend'] if 'trend' in params else None
if self.trend_option is not None:
if self.trend_option not in ('add', 'mul'):
raise ValueError("trend parameter can only be 'mul' or 'add'")
self.seasonal_option = params['seasonal'] if 'seasonal' in params else None
if self.seasonal_option is not None:
if self.seasonal_option not in ('add', 'mul'):
raise ValueError("seasonal parameter can only be 'mul' or 'add'")
def get_params(self):
p = super().get_params()
p.update({'trend': self.trend_option, 'seasonal': self.seasonal_option})
return p
def __do_forecast_helper(self, source_data, fc_rows):
""" do holt winters impl """
if self.trend_option is None:
fitted_model = SimpleExpSmoothing(source_data).fit()
else:
if self.period == 0 or self.seasonal_option is None:
# no valid seasonal period, so there is no need to specify the seasonal parameters
fitted_model = ExponentialSmoothing(source_data, trend=self.trend_option).fit()
else: # seasonal attributes
fitted_model = ExponentialSmoothing(
source_data,
trend=self.trend_option,
seasonal=self.seasonal_option,
seasonal_periods=self.period
).fit()
fc = fitted_model.forecast(fc_rows)
if self.return_conf:
return [fc.tolist(), fc.tolist(), fc.tolist()], fitted_model.sse
else:
return [fc.tolist()], fitted_model.sse
def execute(self):
""" do fc the time series data"""
if self.list is None or len(self.list) < self.period:
raise ValueError("number of input data is less than the periods")
if self.fc_rows <= 0:
raise ValueError("fc rows is not specified yet")
res, mse = self.__do_forecast_helper(self.list, self.fc_rows)
insert_ts_list(res, self.start_ts, self.time_step, self.fc_rows)
# add the conf range if required
return {
"mse": mse,
"res": res
}


@ -0,0 +1,110 @@
# encoding:utf-8
# pylint: disable=c0103
"""forecast helper methods"""
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from taosanalytics.conf import app_logger, conf
from taosanalytics.servicemgmt import loader
def do_forecast(input_list, ts_list, algo_name, params):
""" data fc handler """
s = loader.get_service(algo_name)
if s is None:
s = loader.get_service("holtwinters")
if s is None:
raise ValueError(f"failed to load {algo_name} or holtwinters analysis service")
s.set_input_list(input_list, ts_list)
s.set_params(params)
app_logger.log_inst.debug("start to do forecast")
res = s.execute()
app_logger.log_inst.debug("forecast done")
res["period"] = s.period
res["algo"] = algo_name
check_fc_results(res)
fc = res["res"]
draw_fc_results(input_list, len(fc) > 2, fc, len(fc[0]), algo_name)
return res
def do_add_fc_params(params, json_obj):
""" add params into parameters """
if "forecast_rows" in json_obj:
params["fc_rows"] = int(json_obj["forecast_rows"])
if "start" in json_obj:
params["start_ts"] = int(json_obj["start"])
if "every" in json_obj:
params["time_step"] = int(json_obj["every"])
if "conf" in json_obj:
params["conf"] = int(json_obj["conf"])
if "return_conf" in json_obj:
params["return_conf"] = int(json_obj["return_conf"])
def insert_ts_list(res, start_ts, time_step, fc_rows):
""" insert the ts list before return results """
ts_list = [start_ts + i * time_step for i in range(fc_rows)]
res.insert(0, ts_list)
return res
def draw_fc_results(input_list, return_conf, fc, n_rows, fig_name):
"""Visualize the forecast results """
# controlled by option; if disabled, do not visualize the forecast result
if not conf.get_draw_result_option():
return
app_logger.log_inst.debug('draw forecast result in debug mode')
plt.clf()
x = np.arange(len(input_list), len(input_list) + n_rows, 1)
# draw the range of conf
if return_conf:
lower_series = pd.Series(fc[2], index=x)
upper_series = pd.Series(fc[3], index=x)
plt.fill_between(lower_series.index, lower_series, upper_series, color='k', alpha=.15)
plt.plot(input_list)
plt.plot(x, fc[1], c='blue')
plt.savefig(fig_name)
app_logger.log_inst.debug("draw results completed in debug model")
def check_fc_results(res):
app_logger.log_inst.debug("start to check forecast result")
if "res" not in res:
raise ValueError("forecast result is empty")
fc = res["res"]
if len(fc) < 2:
raise ValueError("result length should greater than or equal to 2")
n_rows = len(fc[0])
if n_rows != len(fc[1]):
raise ValueError("result length is not identical, ts rows:%d res rows:%d" % (
n_rows, len(fc[1])))
if len(fc) > 2 and (len(fc[2]) != n_rows or len(fc[3]) != n_rows):
raise ValueError(
"result length is not identical in confidence, ts rows:%d, lower confidence rows:%d, "
"upper confidence rows%d" %
(n_rows, len(fc[2]), len(fc[3])))
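A usage sketch for `do_forecast` with the holtwinters service, using the parameter names consumed by `do_add_fc_params` (`fc_rows`, `start_ts`, `time_step`); it assumes the defaults of the `AbstractForecastService` base class, whose listing is truncated in this diff, and the input values are made up for illustration:
```python
from taosanalytics.algo.forecast import do_forecast
from taosanalytics.servicemgmt import loader

loader.load_all_service()
vals = [10.0, 12.0, 11.0, 13.0, 12.0, 14.0, 13.0, 15.0, 14.0, 16.0]
ts_list = [1700000000 + i * 60 for i in range(len(vals))]
params = {"fc_rows": 5, "start_ts": ts_list[-1] + 60, "time_step": 60, "trend": "add"}
res = do_forecast(vals, ts_list, "holtwinters", params)
print(res["res"][0])  # the generated timestamps, prepended by insert_ts_list
```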


@ -0,0 +1,163 @@
# encoding:utf-8
# pylint: disable=c0103
"""the main route definition for restful service"""
import os.path, sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
from flask import Flask, request
from taosanalytics.algo.anomaly import do_ad_check
from taosanalytics.algo.forecast import do_forecast, do_add_fc_params
from taosanalytics.conf import conf
from taosanalytics.model import get_avail_model
from taosanalytics.servicemgmt import loader
from taosanalytics.util import app_logger, validate_pay_load, get_data_index, get_ts_index, is_white_noise, \
parse_options, convert_results_to_windows
app = Flask(__name__)
# load all the algos
app_logger.set_handler(conf.get_log_path())
app_logger.set_log_level(conf.get_log_level())
loader.load_all_service()
@app.route("/")
def start():
""" default rsp """
return "TDengine© Time Series Data Analytics Platform (ver 1.0.1)"
@app.route("/status")
def server_status():
""" return server status """
return {
'protocol': 1.0,
'status': 'ready'
}
@app.route("/list")
def list_all_services():
"""
API function to return all available services, including both fc and anomaly detection
"""
return loader.get_service_list()
@app.route("/models")
def list_all_models():
""" list all available models """
return get_avail_model()
@app.route("/anomaly-detect", methods=['POST'])
def handle_ad_request():
"""handle the anomaly detection requests"""
app_logger.log_inst.info('recv ad request from %s', request.remote_addr)
app_logger.log_inst.debug('req payload: %s', request.json)
algo = request.json["algo"].lower() if "algo" in request.json else "ksigma"
# 1. validate the input data in json format
try:
validate_pay_load(request.json)
except ValueError as e:
return {"msg": str(e), "rows": -1}
payload = request.json["data"]
# 2. white noise data check
wn_check = request.json["wncheck"] if "wncheck" in request.json else 1
data_index = get_data_index(request.json["schema"])
ts_index = get_ts_index(request.json["schema"])
if wn_check:
try:
data = payload[data_index]
if is_white_noise(data):
app_logger.log_inst.debug("wn data, not process")
return {"msg": "white noise can not be check", "rows": -1}
except Exception as e:
return {"msg": str(e), "rows": -1}
# 3. parse the options for different ad services
# the options string looks like the following: "algo=ksigma,k=2,invalid_option=44"
options = request.json["option"] if "option" in request.json else None
params = parse_options(options)
# 4. do anomaly detection
try:
res_list = do_ad_check(payload[data_index], payload[ts_index], algo, params)
ano_window = convert_results_to_windows(res_list, payload[ts_index])
result = {"algo": algo, "option": options, "res": ano_window, "rows": len(ano_window)}
app_logger.log_inst.debug("anomaly-detection result: %s", str(result))
return result
except Exception as e:
result = {"res": {}, "rows": 0, "msg": str(e)}
app_logger.log_inst.error("failed to do anomaly-detection, %s", str(e))
return result
@app.route("/forecast", methods=['POST'])
def handle_forecast_req():
"""handle the fc request """
app_logger.log_inst.info('recv fc from %s', request.remote_addr)
app_logger.log_inst.debug('req payload: %s', request.json)
# holt-winters by default
algo = request.json['algo'].lower() if 'algo' in request.json else 'holtwinters'
# 1. validate the input data in json format
try:
validate_pay_load(request.json)
except ValueError as e:
app_logger.log_inst.error('validate req json failed, %s', e)
return {"msg": str(e), "rows": -1}
payload = request.json["data"]
# 2. white noise data check
wn_check = request.json["wncheck"] if "wncheck" in request.json else 1
data_index = get_data_index(request.json["schema"])
ts_index = get_ts_index(request.json["schema"])
if wn_check:
try:
data = payload[data_index]
if is_white_noise(data):
app_logger.log_inst.debug("%s wn data, not process", data)
return {"msg": "white noise can not be check", "rows": -1}
except Exception as e:
return {"msg": str(e), "rows": -1}
options = request.json["option"] if "option" in request.json else None
params = parse_options(options)
try:
do_add_fc_params(params, request.json)
except ValueError as e:
app_logger.log_inst.error("invalid fc params: %s", e)
return {"msg": f"{e}", "rows": -1}
try:
res1 = do_forecast(payload[data_index], payload[ts_index], algo, params)
res = {"option": options, "rows": params["fc_rows"]}
res.update(res1)
app_logger.log_inst.debug("forecast result: %s", res)
return res
except Exception as e:
app_logger.log_inst.error('forecast failed, %s', str(e))
return {"msg": str(e), "rows": -1}
if __name__ == '__main__':
app.run()
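The request payload the two POST handlers expect (`algo`, `schema`, `data`, `wncheck`, `option`) can be exercised with a small client. A sketch against a running anode; the exact schema layout is an assumption, since `get_data_index`/`get_ts_index` are not shown in this diff:
```python
import requests

payload = {
    "algo": "iqr",
    "wncheck": 0,  # skip the white-noise check
    "schema": [["ts", "TIMESTAMP", 8], ["val", "INT", 4]],  # assumed column layout
    "data": [
        [1700000000, 1700000060, 1700000120, 1700000180],
        [10, 12, 11, 400],
    ],
    "option": "algo=iqr",
}
resp = requests.post("http://127.0.0.1:6090/anomaly-detect", json=payload)
print(resp.json())  # anomalous windows under "res", or "msg" on failure
```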


@ -0,0 +1,105 @@
# encoding:utf-8
# pylint: disable=c0103
"""configuration model definition"""
import configparser
import logging
_ANODE_SECTION_NAME = "taosanode"
class Configure:
""" configuration class """
def __init__(self, conf_path="/etc/taos/taosanode.ini"):
self.path = None
self._log_path = 'taosanode.app.log'
self._log_level = logging.INFO
self._model_directory = '/var/lib/taos/taosanode/model/'
self._draw_result = 0
self.conf = configparser.ConfigParser()
self.reload(conf_path)
def get_log_path(self) -> str:
""" return log file full path """
return self._log_path
def get_log_level(self):
""" return the log level specified by configuration file """
return self._log_level
def get_model_directory(self):
""" return model directory """
return self._model_directory
def get_draw_result_option(self):
""" get the option for draw results or not"""
return self._draw_result
def reload(self, new_path: str):
""" load the info from config file """
self.path = new_path
self.conf.read(self.path)
if self.conf.has_option(_ANODE_SECTION_NAME, 'app-log'):
self._log_path = self.conf.get(_ANODE_SECTION_NAME, 'app-log')
if self.conf.has_option(_ANODE_SECTION_NAME, 'log-level'):
log_level = self.conf.get(_ANODE_SECTION_NAME, 'log-level')
log_flag = {
'DEBUG': logging.DEBUG, 'INFO': logging.INFO, 'CRITICAL': logging.CRITICAL,
'ERROR': logging.ERROR, 'WARN': logging.WARN
}
if log_level.upper() in log_flag:
self._log_level = log_flag[log_level.upper()]
else:
self._log_level = logging.INFO
if self.conf.has_option(_ANODE_SECTION_NAME, 'model-dir'):
self._model_directory = self.conf.get(_ANODE_SECTION_NAME, 'model-dir')
if self.conf.has_option(_ANODE_SECTION_NAME, 'draw-result'):
            self._draw_result = self.conf.getint(_ANODE_SECTION_NAME, 'draw-result')
class AppLogger():
""" system log_inst class """
LOG_STR_FORMAT = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
def __init__(self):
self.log_inst = logging.getLogger(__name__)
self.log_inst.setLevel(logging.INFO)
def set_handler(self, file_path: str):
""" set the log_inst handler """
handler = logging.FileHandler(file_path)
handler.setFormatter(logging.Formatter(self.LOG_STR_FORMAT))
self.log_inst.addHandler(handler)
def set_log_level(self, log_level):
"""adjust log level"""
try:
self.log_inst.setLevel(log_level)
self.log_inst.info("set log level:%d", log_level)
except ValueError as e:
self.log_inst.error("failed to set log level: %d, %s", log_level, str(e))
conf = Configure()
app_logger = AppLogger()
def setup_log_info(name: str):
""" prepare the log info for unit test """
app_logger.set_handler(name)
try:
app_logger.set_log_level(logging.DEBUG)
except ValueError as e:
print("set log level failed:%s", e)

View File

@ -0,0 +1,22 @@
# encoding:utf-8
# pylint: disable=c0103
def get_avail_model():
return [
{
"name": "ad_encoder_keras",
"algo": "auto-encoder",
"type": "anomaly-detection",
"src-table": "*",
"build-time": "2024-10-07 13:21:44"
}
]
def train_model():
pass
if __name__ == '__main__':
a = get_avail_model()
print(a)

View File

@ -0,0 +1,110 @@
# encoding:utf-8
# pylint: disable=c0103
"""main service module"""
from abc import abstractmethod, ABC
class AnalyticsService:
""" Analytics root class with only one method"""
@abstractmethod
def execute(self):
""" the main execute method to perform fc or anomaly detection """
def get_desc(self) -> str:
"""algorithm description"""
return ""
def get_params(self) -> dict:
"""return exist params """
return {}
class AbstractAnalyticsService(AnalyticsService, ABC):
""" abstract base analytics service class definition"""
name = ''
desc = ''
def __init__(self):
self.list = None
self.ts_list = None
def set_input_list(self, input_list: list, input_ts_list: list = None):
""" set the input list """
self.list = input_list
self.ts_list = input_ts_list
def set_params(self, params: dict) -> None:
"""set the parameters for current algo """
if params is None:
return
if not isinstance(params, dict):
raise ValueError('invalid parameter type, only dict allowed')
def get_desc(self) -> str:
return self.desc
class AbstractAnomalyDetectionService(AbstractAnalyticsService, ABC):
""" abstract anomaly detection service, all anomaly detection algorithm class should be
inherent from this class"""
def __init__(self):
super().__init__()
self.type = "anomaly-detection"
def input_is_empty(self):
""" check if the input list is empty or None """
return (self.list is None) or (len(self.list) == 0)
class AbstractForecastService(AbstractAnalyticsService, ABC):
"""abstract forecast service, all forecast algorithms class should be inherent from
this base class"""
def __init__(self):
super().__init__()
self.type = "forecast"
self.period = 0
self.start_ts = 0
self.time_step = 0
self.fc_rows = 0
self.return_conf = 1
self.conf = 0.05
def set_params(self, params: dict) -> None:
if not {'start_ts', 'time_step', 'fc_rows'}.issubset(params.keys()):
raise ValueError('params are missing, start_ts, time_step, fc_rows are all required')
self.start_ts = int(params['start_ts'])
self.time_step = int(params['time_step'])
if self.time_step <= 0:
raise ValueError('time_step should be greater than 0')
self.fc_rows = int(params['fc_rows'])
if self.fc_rows <= 0:
            raise ValueError('fc_rows should be greater than 0')
self.period = int(params['period']) if 'period' in params else 0
if self.period < 0:
raise ValueError("periods should be greater than 0")
self.conf = float(params['conf']) if 'conf' in params else 95
self.conf = 1.0 - self.conf / 100.0
if self.conf < 0 or self.conf >= 1.0:
raise ValueError("invalid value of conf, should between 0 and 100")
self.return_conf = int(params['return_conf']) if 'return_conf' in params else 1
def get_params(self):
return {
"period": self.period, "start": self.start_ts, "every": self.time_step,
"forecast_rows": self.fc_rows, "return_conf": self.return_conf, "conf": self.conf
}
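# A minimal concrete service sketch (hypothetical, not part of this module). The
# loader in servicemgmt.py only registers classes whose names start with an
# underscore, so concrete algorithms follow that convention:
#
# class _MyAdService(AbstractAnomalyDetectionService):
#     """ toy detector: flag values farther than `threshold` from the mean """
#     name = 'myad'
#     desc = 'toy anomaly detector'
#
#     def __init__(self):
#         super().__init__()
#         self.threshold = 10.0
#
#     def set_params(self, params: dict) -> None:
#         super().set_params(params)
#         if params and 'threshold' in params:
#             self.threshold = float(params['threshold'])
#
#     def execute(self):
#         if self.input_is_empty():
#             return []
#         mean = sum(self.list) / len(self.list)
#         # -1 marks an anomaly, 1 marks a normal point
#         return [-1 if abs(v - mean) > self.threshold else 1 for v in self.list]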

View File

@ -0,0 +1,120 @@
# encoding:utf-8
"""load and return the available services"""
import copy
import importlib
import inspect
import os
from collections import defaultdict
from taosanalytics.conf import app_logger
from taosanalytics.service import AbstractAnomalyDetectionService, AbstractForecastService
os.environ['KERAS_BACKEND'] = 'torch'
class AnalyticsServiceLoader:
""" Singleton register for multiple anomaly detection algorithms and fc algorithms"""
def __init__(self):
self.services = defaultdict(list)
def get_service(self, name):
""" get the required service """
serv = self.services.get(name, [])[0] if self.services.get(name) else None
return copy.copy(serv)
def get_typed_services(self, type_str: str) -> list:
""" get specified type service """
all_items = []
for key, val in self.services.items():
if val[0].type == type_str:
try:
one = {"name": key, "desc": val[0].get_desc(), "params": val[0].get_params()}
all_items.append(one)
except AttributeError as e:
app_logger.log_inst.error("failed to get service: %s info, reason: %s", key, e);
return all_items
def get_service_list(self):
""" return all available service info """
info = {
"protocol": 1.0,
"version": 0.1,
"details": [
self.get_forecast_algo_list(),
self.get_anomaly_detection_algo_list()
]
}
return info
def get_anomaly_detection_algo_list(self):
""" get all available service list """
return {
"type": "anomaly-detection",
"algo": self.get_typed_services("anomaly-detection")
}
def get_forecast_algo_list(self):
""" get all available service list """
return {
"type": "forecast",
"algo": self.get_typed_services("forecast")
}
def load_all_service(self) -> None:
""" load all algorithms in the specified directory"""
def register_service(container, name: str, service):
""" register service for both anomaly detection and fc """
app_logger.log_inst.info("register service: %s", name)
container[name].append(service)
def do_load_service(cur_directory, lib_prefix, sub_directory):
""" the implementation of load services """
service_directory = cur_directory + sub_directory
if not os.path.exists(service_directory):
                app_logger.log_inst.fatal(
                    "service directory:%s does not exist, failed to load service",
                    service_directory)
raise FileNotFoundError(f"service directory:{service_directory} not found")
all_files = os.listdir(service_directory)
for item in all_files:
                if item in ('__init__.py', '__pycache__') or not item.endswith('.py'):
continue
full_path = os.path.join(service_directory, item)
if os.path.isdir(full_path):
continue
# do load algorithm
name = lib_prefix + item.split('.')[0]
module = importlib.import_module(name)
app_logger.log_inst.info("load algorithm:%s", name)
for (class_name, _) in inspect.getmembers(module, inspect.isclass):
if class_name in (
AbstractAnomalyDetectionService.__name__,
AbstractForecastService.__name__
) or (not class_name.startswith('_')):
continue
algo_cls = getattr(module, class_name)
if algo_cls is not None:
obj = algo_cls()
register_service(self.services, algo_cls.name, obj)
# start to load all services
current_directory = os.path.dirname(os.path.abspath(__file__))
do_load_service(current_directory, 'taosanalytics.algo.ad.', '/algo/ad/')
do_load_service(current_directory, 'taosanalytics.algo.fc.', '/algo/fc/')
loader: AnalyticsServiceLoader = AnalyticsServiceLoader()
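# Typical usage, as a sketch based on how the unit tests drive the loader:
#
#   loader.load_all_service()
#   s = loader.get_service("ksigma")
#   s.set_input_list([5, 14, 15, 15, 14, 19, 17, 16, 20, 22], None)
#   s.set_params({"k": 2})
#   result = s.execute()  # a list of 1/-1 flags; -1 marks an anomaly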

View File

@ -0,0 +1,170 @@
# encoding:utf-8
# pylint: disable=c0103
"""anomaly detection unit test"""
import unittest
import sys
import os.path
import pandas as pd
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../")
from taosanalytics.algo.anomaly import draw_ad_results
from taosanalytics.conf import setup_log_info, app_logger
from taosanalytics.servicemgmt import loader
class AnomalyDetectionTest(unittest.TestCase):
""" anomaly detection unit test class"""
input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40]
large_list = [
13, 14, 8, 10, 16, 26, 32, 27, 18, 32, 36, 24,
22, 23, 22, 18, 25, 21, 21, 14, 8, 11, 14, 23,
18, 17, 19, 20, 22, 19, 13, 26, 13, 14, 22, 24,
21, 22, 26, 21, 23, 24, 27, 41, 31, 27, 35, 26,
28, 36, 39, 21, 17, 22, 17, 19, 15, 34, 10, 15,
22, 18, 15, 20, 15, 22, 19, 16, 30, 27, 29, 23,
20, 16, 21, 21, 25, 16, 18, 15, 18, 14, 10, 15,
8, 15, 6, 11, 8, 7, 13, 10, 23, 16, 15, 25,
22, 20, 16
]
@classmethod
def setUpClass(cls):
""" set up environment for unit test, set the log file path """
setup_log_info("unit_test.log")
loader.load_all_service()
def test_ksigma(self):
"""
Test the ksigma algorithm for anomaly detection. This test case verifies the
functionality of the ksigma algorithm by setting up the input data,
executing the algorithm, and asserting the expected results.
"""
s = loader.get_service("ksigma")
s.set_input_list(AnomalyDetectionTest.input_list, None)
s.set_params({"k": 2})
r = s.execute()
draw_ad_results(AnomalyDetectionTest.input_list, r, "ksigma")
self.assertEqual(r[-1], -1)
self.assertEqual(len(r), len(AnomalyDetectionTest.input_list))
def test_iqr(self):
"""
Test the IQR(Interquartile Range) algorithm for anomaly detection. This test case verifies the functionality
of the IQR algorithm by setting up the input data, executing the algorithm, and asserting the expected results.
"""
s = loader.get_service("iqr")
s.set_input_list(AnomalyDetectionTest.input_list, None)
try:
s.set_params({"k": 2})
except ValueError as e:
            self.fail(e)
r = s.execute()
draw_ad_results(AnomalyDetectionTest.input_list, r, "iqr")
self.assertEqual(r[-1], -1)
self.assertEqual(len(r), len(AnomalyDetectionTest.input_list))
def test_grubbs(self):
"""
Test the Grubbs algorithm for anomaly detection.
This test case verifies the functionality of the Grubbs algorithm by setting up the input data,
executing the algorithm, and asserting the expected results.
"""
s = loader.get_service("grubbs")
s.set_input_list(AnomalyDetectionTest.input_list, None)
s.set_params({"alpha": 0.95})
r = s.execute()
draw_ad_results(AnomalyDetectionTest.input_list, r, "grubbs")
self.assertEqual(r[-1], -1)
self.assertEqual(len(r), len(AnomalyDetectionTest.input_list))
def test_shesd(self):
"""
Test the SHESD (Seasonal Hybrid ESD) algorithm for anomaly detection.
This test case verifies the functionality of the SHESD algorithm by setting up the input data,
executing the algorithm, and asserting the expected results.
"""
s = loader.get_service("shesd")
s.set_params({"period": 3})
s.set_input_list(AnomalyDetectionTest.input_list, None)
r = s.execute()
draw_ad_results(AnomalyDetectionTest.input_list, r, "shesd")
self.assertEqual(r[-1], -1)
def test_lof(self):
"""
Test the LOF (Local Outlier Factor) algorithm for anomaly detection.
This test case verifies the functionality of the LOF algorithm by setting up the input data,
executing the algorithm, and asserting the expected results.
"""
s = loader.get_service("lof")
s.set_params({"period": 3})
s.set_input_list(AnomalyDetectionTest.input_list, None)
r = s.execute()
draw_ad_results(AnomalyDetectionTest.input_list, r, "lof")
self.assertEqual(r[-1], -1)
self.assertEqual(r[-2], -1)
def test_multithread_safe(self):
""" Test the multithread safe function"""
s1 = loader.get_service("shesd")
s2 = loader.get_service("shesd")
s1.set_params({"period": 3})
self.assertNotEqual(s1.period, s2.period)
def __load_remote_data_for_ad(self):
"""load the remote data for anomaly detection"""
url = ("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/"
"art_daily_jumpsup.csv")
remote_data = pd.read_csv(url, parse_dates=True, index_col="timestamp")
k = remote_data.values.ravel().tolist()
return k
def test_autoencoder_ad(self):
"""for local test only, disabled it in github action"""
pass
# data = self.__load_remote_data_for_ad()
#
# s = loader.get_service("ad_encoder")
# s.set_input_list(data)
#
# try:
# s.set_params({"model": "ad_encoder_"})
# except ValueError as e:
# app_logger.log_inst.error(f"failed to set the param for auto_encoder algorithm, reason:{e}")
# return
#
# r = s.execute()
#
# num_of_error = -(sum(filter(lambda x: x == -1, r)))
# self.assertEqual(num_of_error, 109)
#
# draw_ad_results(data, r, "autoencoder")
def test_get_all_services(self):
"""Test get all services"""
loader.get_anomaly_detection_algo_list()
if __name__ == '__main__':
unittest.main()
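# Since this module calls unittest.main(), the cases can be run directly with
# `python3.10 <path-to-this-file>`; log output goes to unit_test.log as set up
# in setUpClass.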

View File

@ -0,0 +1,115 @@
# encoding:utf-8
# pylint: disable=c0103
"""forecast unit test cases"""
import unittest
import os.path
import sys
import pandas as pd
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../")
from taosanalytics.algo.forecast import draw_fc_results
from taosanalytics.conf import setup_log_info
from taosanalytics.servicemgmt import loader
class ForecastTest(unittest.TestCase):
"""forecast unit test cases"""
@classmethod
def setUpClass(cls):
""" set up the environment for unit test """
setup_log_info("unit_test.log")
loader.load_all_service()
def get_input_list(self):
""" load data from csv """
url = ('https://raw.githubusercontent.com/jbrownlee/Datasets/refs/heads/master/'
'airline-passengers.csv')
data = pd.read_csv(url, index_col='Month', parse_dates=True)
ts_list = data[['Passengers']].index.tolist()
dst_list = [int(item.timestamp()) for item in ts_list]
return data[['Passengers']].values.tolist(), dst_list
def test_holt_winters_forecast(self):
""" test holt winters forecast with invalid and then valid parameters"""
s = loader.get_service("holtwinters")
data, ts = self.get_input_list()
s.set_input_list(data, ts)
self.assertRaises(ValueError, s.execute)
s.set_params({"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30})
r = s.execute()
draw_fc_results(data, len(r["res"]) > 2, r["res"], len(r["res"][0]), "holtwinters")
def test_holt_winters_forecast_2(self):
"""test holt winters with valid parameters"""
s = loader.get_service("holtwinters")
data, ts = self.get_input_list()
s.set_input_list(data, ts)
s.set_params(
{
"fc_rows": 10, "trend": 'mul', "seasonal": 'mul', "start_ts": 171000000,
"time_step": 86400 * 30, "period": 12
}
)
r = s.execute()
draw_fc_results(data, len(r["res"]) > 2, r["res"], len(r["res"][0]), "holtwinters")
def test_holt_winter_invalid_params(self):
"""parameters validation check"""
s = loader.get_service("holtwinters")
self.assertRaises(ValueError, s.set_params, {"trend": "mul"})
self.assertRaises(ValueError, s.set_params, {"trend": "mul"})
self.assertRaises(ValueError, s.set_params, {"trend": "mul", "fc_rows": 10})
self.assertRaises(ValueError, s.set_params, {"trend": "multi"})
self.assertRaises(ValueError, s.set_params, {"seasonal": "additive"})
self.assertRaises(ValueError, s.set_params, {
"fc_rows": 10, "trend": 'multi', "seasonal": 'addi', "start_ts": 171000000,
"time_step": 86400 * 30, "period": 12}
)
self.assertRaises(ValueError, s.set_params,
{"fc_rows": 10, "trend": 'mul', "seasonal": 'add', "time_step": 86400 * 30, "period": 12}
)
s.set_params({"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30})
self.assertRaises(ValueError, s.set_params, {"fc_rows": 'abc', "start_ts": 171000000, "time_step": 86400 * 30})
self.assertRaises(ValueError, s.set_params, {"fc_rows": 10, "start_ts": "aaa", "time_step": "30"})
self.assertRaises(ValueError, s.set_params, {"fc_rows": 10, "start_ts": 171000000, "time_step": 0})
def test_arima(self):
"""arima algorithm check"""
s = loader.get_service("arima")
data, ts = self.get_input_list()
s.set_input_list(data, ts)
self.assertRaises(ValueError, s.execute)
s.set_params(
{"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30, "period": 12,
"start_p": 0, "max_p": 10, "start_q": 0, "max_q": 10}
)
r = s.execute()
rows = len(r["res"][0])
draw_fc_results(data, len(r["res"]) > 1, r["res"], rows, "arima")
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,27 @@
"""perform the build release package and install and then test the restful service"""
import unittest
import os
class ForecastTest(unittest.TestCase):
def test_release(self):
""" test the package """
pass
# print("build install package")
# os.system("../../script/release.sh")
# print("build completed")
#
# self.assertEqual(os.path.exists("../../release/TDengine-enterprise-anode-1.0.0.tar.gz"), 1)
def test_install(self):
""" test """
pass
# print("start to install package")
# os.system("tar zxvf ../../release/TDengine-enterprise-anode-1.0.0.tar.gz")
# os.chdir("../../release/TDengine-enterprise-anode-1.0.0/")
#
# os.system("./install.sh")

View File

@ -0,0 +1,259 @@
# encoding:utf-8
# pylint: disable=c0103
"""flask restful api test module"""
import sys
import os.path
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../")
from flask_testing import TestCase
from taosanalytics.app import app
from taosanalytics.conf import setup_log_info
class RestfulTest(TestCase):
""" restful api test class """
def create_app(self):
app.testing = True
setup_log_info("restfull_test.log")
return app
def test_access_main_page(self):
""" test asscess default main page """
response = self.client.get('/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.content_length, len("TDengine© Time Series Data Analytics Platform (ver 1.0.1)") + 1)
def test_load_status(self):
""" test load the server status """
response = self.client.get('/status')
self.assertEqual(response.status_code, 200)
res = response.json
self.assertEqual(res['protocol'], 1.0)
self.assertEqual(res['status'], 'ready')
def test_load_algos(self):
""" test load provided algos"""
response = self.client.get('/list')
self.assertEqual(response.status_code, 200)
res = response.json
self.assertEqual(res['version'], 0.1)
self.assertEqual(res['protocol'], 1.0)
d = res['details']
self.assertEqual(len(d), 2)
def test_forecast(self):
"""test forecast api"""
response = self.client.post("/forecast", json={
"schema": [
["ts", "TIMESTAMP", 8],
["val", "INT", 4]
],
"data": [
[
1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000,
1577808010000, 1577808011000, 1577808012000, 1577808013000, 1577808014000,
1577808015000, 1577808016000, 1577808017000, 1577808018000, 1577808019000,
1577808020000, 1577808021000, 1577808022000, 1577808023000, 1577808024000,
1577808025000, 1577808026000, 1577808027000, 1577808028000, 1577808029000,
1577808030000, 1577808031000, 1577808032000, 1577808033000, 1577808034000,
1577808035000, 1577808036000, 1577808037000, 1577808038000, 1577808039000,
1577808040000, 1577808041000, 1577808042000, 1577808043000, 1577808044000,
1577808045000, 1577808046000, 1577808047000, 1577808048000, 1577808049000,
1577808050000, 1577808051000, 1577808052000, 1577808053000, 1577808054000,
1577808055000, 1577808056000, 1577808057000, 1577808058000, 1577808059000,
1577808060000, 1577808061000, 1577808062000, 1577808063000, 1577808064000,
1577808065000, 1577808066000, 1577808067000, 1577808068000, 1577808069000,
1577808070000, 1577808071000, 1577808072000, 1577808073000, 1577808074000,
1577808075000, 1577808076000, 1577808077000, 1577808078000, 1577808079000,
1577808080000, 1577808081000, 1577808082000, 1577808083000, 1577808084000,
1577808085000, 1577808086000, 1577808087000, 1577808088000, 1577808089000,
1577808090000, 1577808091000, 1577808092000, 1577808093000, 1577808094000,
1577808095000
],
[
13, 14, 8, 10, 16, 26, 32, 27, 18, 32, 36, 24, 22, 23, 22, 18, 25, 21, 21,
14, 8, 11, 14, 23, 18, 17, 19, 20, 22, 19, 13, 26, 13, 14, 22, 24, 21, 22,
26, 21, 23, 24, 27, 41, 31, 27, 35, 26, 28, 36, 39, 21, 17, 22, 17, 19, 15,
34, 10, 15, 22, 18, 15, 20, 15, 22, 19, 16, 30, 27, 29, 23, 20, 16, 21, 21,
25, 16, 18, 15, 18, 14, 10, 15, 8, 15, 6, 11, 8, 7, 13, 10, 23, 16, 15, 25
]
],
"option": "algo=holtwinters",
"algo": "holtwinters",
"prec": "ms",
"wncheck": 1,
"return_conf": 1,
"forecast_rows": 10,
"conf": 95,
"start": 1577808096000,
"every": 1000,
"rows": 96,
"protocol": 1.0
})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json["algo"], "holtwinters")
self.assertEqual(response.json["rows"], 10)
self.assertEqual(response.json["period"], 0)
self.assertEqual(response.json["res"][0][0], 1577808096000)
self.assertEqual(response.json["res"][0][-1], 1577808105000)
self.assertEqual(len(response.json["res"][0]), response.json["rows"])
self.assertEqual(len(response.json["res"]), 4)
def test_ad(self):
"""test anomaly detect api"""
response = self.client.post("/anomaly-detect", json={
"schema": [
["ts", "TIMESTAMP", 8],
["val", "INT", 4]
],
"data": [
[1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000,
1577808010000, 1577808011000, 1577808012000, 1577808013000, 1577808014000,
1577808015000, 1577808016000],
[5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40]
],
"rows": 17,
"algo": "iqr"
})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json["rows"], 1)
self.assertEqual(response.json["algo"], "iqr")
def test_ad_error_get(self):
"""1. invalid http method"""
response = self.client.get("/anomaly-detect", json={
"schema": [
["ts", "TIMESTAMP", 8],
["val", "INT", 4]
],
"data": [
[1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000,
1577808010000, 1577808011000, 1577808012000, 1577808013000, 1577808014000,
1577808015000, 1577808016000],
[5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40]
],
"rows": 17,
"algo": "iqr"
})
self.assertEqual(response.status_code, 405)
def test_ad_error_empty_payload(self):
"""2. list that is going to apply anomaly detection is empty or less value than the threshold
, which is [10, 100000]"""
response = self.client.post("/anomaly-detect", json={
"schema": [
["ts", "TIMESTAMP", 8],
["val", "INT", 4]
],
"data": [
[1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
1577808005000, 1577808006000, 1577808007000, 1577808008000],
[5, 14, 15, 15, 14, 19, 17, 16, 20]
],
"rows": 9,
"algo": "iqr"
})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json["rows"], -1)
def test_ad_error_single_col(self):
"""3. only one column"""
response = self.client.post("/anomaly-detect", json={
"schema": [
["ts", "TIMESTAMP", 8],
["val", "INT", 4]
],
"data": [
[1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
1577808005000, 1577808006000, 1577808007000, 1577808008000]
],
"rows": 9,
"algo": "iqr"
})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json["rows"], -1)
def test_ad_error_three_cols(self):
"""4. there are three input columns """
response = self.client.post("/anomaly-detect", json={
"schema": [
["ts", "TIMESTAMP", 8],
["val", "INT", 4],
["val1", "INT", 4]
],
"data": [
[1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000],
[5, 14, 15, 15, 14, 19, 17, 16, 20, 44],
[5, 14, 15, 15, 14, 19, 17, 16, 20, 44]
],
"rows": 10,
"algo": "iqr"
})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json["rows"], -1)
def test_ad_disorder_cols(self):
"""5. disorder two columns """
response = self.client.post("/anomaly-detect", json={
"schema": [
["val", "INT", 4],
["ts", "TIMESTAMP", 8]
],
"data": [
[5, 14, 15, 15, 14, 19, 17, 16, 20, 44],
[1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000],
],
"rows": 10,
"algo": "iqr"
})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json["rows"], 2)
def test_missing_schema(self):
"""6. missing schema info"""
response = self.client.post("/anomaly-detect", json={
"data": [
[5, 14, 15, 15, 14, 19, 17, 16, 20, 44],
[1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000],
],
"rows": 10,
"algo": "iqr"
})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json["rows"], -1)
def test_invalid_schema_info(self):
"""7. invalid schema info"""
response = self.client.post("/anomaly-detect", json={
"schema": [
["ts", "TIMESTAMP", 8]
],
"data": [
[1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000],
],
"rows": 10,
"algo": "iqr"
})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json["rows"], -1)

View File

@ -0,0 +1,106 @@
# encoding:utf-8
# pylint: disable=c0103
"""unit test module"""
import os.path
import unittest
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../")
from taosanalytics.servicemgmt import loader
from taosanalytics.util import (convert_results_to_windows, get_data_index,
                                is_stationary, is_white_noise, parse_options)
class UtilTest(unittest.TestCase):
"""utility test cases"""
def test_generate_anomaly_window(self):
# Test case 1: Normal input
wins = convert_results_to_windows([1, 1, 1, 1, 1, 1, -1, -1, -1, 1, 1, -1],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12])
print(f"The result window is:{wins}")
# Assert the number of windows
self.assertEqual(len(wins), 2)
# Assert the first window
self.assertListEqual(wins[0], [7, 9])
# Assert the second window
self.assertListEqual(wins[1], [12, 12])
# Test case 2: Anomaly input list is empty
wins = convert_results_to_windows([], [1, 2])
self.assertListEqual(wins, [])
# Test case 3: Anomaly input list is None
wins = convert_results_to_windows([], None)
self.assertListEqual(wins, [])
# Test case 4: Timestamp list is None
wins = convert_results_to_windows(None, [])
self.assertListEqual(wins, [])
def test_validate_input_data(self):
pass
def test_validate_pay_load(self):
pass
def test_validate_forecast_input_data(self):
pass
def test_convert_results_to_windows(self):
pass
def test_is_white_noise(self):
"""
Test the is_white_noise function.
This function tests the functionality of the is_white_noise function by providing a list and asserting the expected result.
"""
list1 = []
wn = is_white_noise(list1)
self.assertFalse(wn)
def test_is_stationary(self):
"""test whether data is stationary or not"""
st = is_stationary([1, 2, 3, 4, 5, 7, 5, 1, 54, 3, 6, 87, 45, 14, 24])
        self.assertEqual(st, False)
def test_parse_options(self):
"""test case for parse key/value string into k/v pair"""
option_str = "algo=ksigma,k=2,invalid_option=invalid_str"
opt = parse_options(option_str)
self.assertEqual(len(opt), 3)
self.assertDictEqual(opt, {'algo': 'ksigma', 'k': '2', 'invalid_option': 'invalid_str'})
    def test_get_data_index(self):
        """ test the get_data_index method; the data column should be located """
        schema = [
            ["val", "INT", 4],
            ["ts", "TIMESTAMP", 8]
        ]
        self.assertEqual(get_data_index(schema), 0)
class ServiceTest(unittest.TestCase):
def setUp(self):
""" load all service before start unit test """
loader.load_all_service()
def test_get_all_algos(self):
service_list = loader.get_service_list()
self.assertEqual(len(service_list["details"]), 2)
for item in service_list["details"]:
if item["type"] == "anomaly-detection":
self.assertEqual(len(item["algo"]), 6)
else:
self.assertEqual(len(item["algo"]), 2)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,126 @@
# encoding:utf-8
"""utility methods to helper query processing"""
import numpy as np
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.stattools import adfuller
from taosanalytics.conf import app_logger
def validate_pay_load(json_obj):
""" validate the input payload """
if "data" not in json_obj:
raise ValueError('data attr does not exist in json')
data = json_obj["data"]
if len(data) <= 1:
        raise ValueError('only one column provided; both timestamp and data columns are required')
if len(data) > 2:
raise ValueError('too many columns')
rows = len(data[0])
if rows != len(data[1]):
raise ValueError('data inconsistent, number of rows are not identical')
if rows < 10 or rows > 40000:
raise ValueError(f'number of rows should between 10 and 40000, actual {rows} rows')
if "schema" not in json_obj:
raise ValueError('schema is missing')
index = get_data_index(json_obj["schema"])
if index == -1:
raise ValueError('invalid schema info, data column is missing')
def convert_results_to_windows(result, ts_list):
"""generate the window according to anomaly detection result"""
skey, ekey = -1, -1
wins = []
if ts_list is None or result is None or len(result) != len(ts_list):
return wins
for index, val in enumerate(result):
if val == -1:
ekey = ts_list[index]
if skey == -1:
skey = ts_list[index]
else:
if ekey != -1:
wins.append([skey, ekey])
skey, ekey = -1, -1
if ekey != -1:
wins.append([skey, ekey])
return wins
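# Worked example (mirrored in the utility unit tests): the detection result
# [1, 1, 1, 1, 1, 1, -1, -1, -1, 1, 1, -1] with timestamps 1..12 collapses the
# anomalous runs into two windows: [[7, 9], [12, 12]].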
def is_white_noise(input_list):
""" determine whether the input list is a white noise list or not """
if len(input_list) < 16: # the number of items in the list is insufficient
return False
res = acorr_ljungbox(input_list, lags=[6, 12, 16], boxpierce=True, return_df=True)
q_lb = res.lb_pvalue.array[2]
return q_lb >= 0.05
def is_stationary(input_list):
""" determine whether the input list is weak stationary or not """
adf, pvalue, usedlag, nobs, critical_values, _ = adfuller(input_list, autolag='AIC')
app_logger.log_inst.info("adf is:%f critical value is:%s" % (adf, critical_values))
return pvalue < 0.05
def parse_options(option_str) -> dict:
"""
the option format is like the following string: "algo=ksigma,k=2,invalid_option=invalid_str"
convert it to the dict format
"""
options = {}
if option_str is None or len(option_str) == 0:
return options
opt_list = option_str.split(",")
for line in opt_list:
if "=" not in line or len(line.strip()) < 3:
continue
kv_pair = line.strip().split("=")
if kv_pair[0].strip() == '' or kv_pair[1].strip() == '':
continue
options[kv_pair[0].strip()] = kv_pair[1].strip()
return options
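# Example: parse_options("algo=ksigma,k=2,invalid_option=invalid_str") returns
# {'algo': 'ksigma', 'k': '2', 'invalid_option': 'invalid_str'}; items without
# '=' or with an empty key/value are silently dropped.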
def get_data_index(schema):
"""get the data index according to the schema info"""
for index, val in enumerate(schema):
if val[0] == "val":
return index
return -1
def get_ts_index(schema):
"""get the timestamp index according to the schema info"""
for index, val in enumerate(schema):
if val[0] == "ts":
return index
return -1
def create_sequences(values, time_steps):
""" create sequences for training model """
output = []
for i in range(len(values) - time_steps + 1):
output.append(values[i: (i + time_steps)])
return np.stack(output)
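# Example: create_sequences([1, 2, 3, 4], 2) stacks the sliding windows
# [[1, 2], [2, 3], [3, 4]] into a (3, 2) numpy array.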