diff --git a/tools/tdgpt/README.md b/tools/tdgpt/README.md new file mode 100644 index 0000000000..0b61f8bcef --- /dev/null +++ b/tools/tdgpt/README.md @@ -0,0 +1,135 @@
# Table of Contents

1. [Introduction](#1-introduction)
1. [Documentation](#2-documentation)
1. [Prerequisites](#3-prerequisites)
1. [Building](#4-building)
1. [Packaging](#5-packaging)
1. [Installing](#6-installing)
1. [Running](#7-running)
1. [Testing](#8-testing)
1. [Releasing](#9-releasing)
1. [CI/CD](#10-cicd)
1. [Coverage](#11-coverage)
1. [Contributing](#12-contributing)

# 1. Introduction

tdanalytics: an analytics platform for TDengine.

# 2. Documentation

For the user manual, system design, and architecture, please refer to the [TDengine Documentation](https://docs.tdengine.com/next) ([TDengine 文档](https://docs.taosdata.com/next)).

# 3. Prerequisites

Software and tools required to work on the project:

- python 3.10.12+ (for testing)

Step-by-step instructions to set up the prerequisite software follow.

## 3.1 Install Python 3.10

Make sure Python 3.10 or above is available on your system before installing the anode.

On Ubuntu, use the following commands to install Python 3.10.

```bash
sudo apt-get install software-properties-common
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update
sudo apt install python3.10
sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.10 2
sudo update-alternatives --config python3
sudo apt install python3.10-venv
sudo apt install python3.10-dev
```

Install pip for Python 3.10:

```bash
curl -sS https://bootstrap.pypa.io/get-pip.py | python3.10
```

Add `~/.local/bin` to `~/.bashrc` or `~/.bash_profile`:

```bash
export PATH=$PATH:~/.local/bin
```

# 4. Building

There is no need to build taosanode: it is implemented in Python, an interpreted language.

# 5. Packaging

In the base directory, use the following command to build a tarball:

```bash
cd script && ./release.sh
```

After packaging completes, you will find the tarball in the `release` directory:

```bash
ls -lht /root/tdanalytics/release

-rw-rw-r-- 1 root root 74K Feb 21 17:04 TDengine-enterprise-anode-1.0.1.tar.gz
```

# 6. Installing

## 6.1 Install taosanode

Use the following command to install taosanode on your system:

```bash
./install.sh
```

During installation, a Python virtual environment is created in `/var/lib/taos/taosanode/venv` by default, and the required libraries are installed into it.
taosanode is installed as a system service, but it is not started automatically. Start the service manually with:

```bash
systemctl start taosanoded
```

## 6.2 Configure the Service

taosanode provides a RESTful service powered by `uWSGI`. You can tune its performance by changing the default configuration file `taosanode.ini`, located in `/etc/taos`, which is also the configuration directory for the `taosd` service.

```ini
# taosanode service ip:port
http = 127.0.0.1:6090
```
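The listen address above is only one of the tunable options. The worker, logging, and monitoring settings below are the defaults shipped in this package's `cfg/taosanode.ini`; treat them as a starting point and adjust them to your workload:

```ini
# number of uWSGI worker processes
processes = 2

# number of threads for each worker process
threads = 4

# service log file
logto = /var/log/taos/taosanode/taosanode.log

# uWSGI stats/monitor endpoint
stats = 127.0.0.1:8387
```

After editing the file, restart the service (see section 7.1) for the changes to take effect.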
# 7. Running

## 7.1 Start/Stop Service

`systemctl start/stop/restart taosanoded.service` starts/stops/restarts the taosanode service.

## 7.2 Uninstall

The command `rmtaosanode` removes the installed taosanode from your system. Note that the Python virtual environment is not removed by this script; you need to remove it manually.

# 8. Testing

We use GitHub Actions to run the test suite. Please refer to the file [.github/workflows/python-package.yml](https://github.com/taosdata/tdanalytics/.github/workflows/python-package.yml) for more details.

# 9. Releasing

For the complete list of taosanode releases, please see the Releases page.

# 10. CI/CD

We use GitHub Actions for the CI/CD workflow. Please refer to the file .github/workflows/python-package.yml for more details.

# 11. Coverage

# 12. Contributing

Guidelines for contributing to the project:

- Fork the repository
- Create a feature branch
- Submit a pull request

diff --git a/tools/tdgpt/cfg/taosanode.ini b/tools/tdgpt/cfg/taosanode.ini new file mode 100755 index 0000000000..51782bccd6 --- /dev/null +++ b/tools/tdgpt/cfg/taosanode.ini @@ -0,0 +1,81 @@
#uwsgi --ini taosanode.ini
#uwsgi --reload taosanode.pid
#uwsgi --stop taosanode.pid

[uwsgi]
# charset
env = LC_ALL=en_US.UTF-8

# ip:port to listen on
http = 127.0.0.1:6090

# the local unix socket file used to communicate with Nginx
#socket = 127.0.0.1:8001
#socket-timeout = 10

# base directory
chdir = /usr/local/taos/taosanode/lib

# the python WSGI entry file
wsgi-file = /usr/local/taos/taosanode/lib/taosanalytics/app.py

# the callable object of the WSGI app
callable = app

# automatically remove the unix socket and pid file when stopping
vacuum = true

# socket file permissions
#chmod-socket = 664

# user to run uWSGI as
uid = root

# group to run uWSGI as
gid = root

# main process
master = true

# the number of worker processes
processes = 2

# pid file
pidfile = /usr/local/taos/taosanode/taosanode.pid

# enable threads
enable-threads = true

# the number of threads for each process
threads = 4

# memory usage report
memory-report = true

# grace period for workers during smooth restart
reload-mercy = 10

# conflicts with systemctl, so do NOT uncomment this
# daemonize = /var/log/taos/taosanode/taosanode.log

# log file
logto = /var/log/taos/taosanode/taosanode.log

# uWSGI stats/monitor endpoint
stats = 127.0.0.1:8387

# python virtual environment directory
virtualenv = /usr/local/taos/taosanode/venv/

[taosanode]
# default app log file
app-log = /var/log/taos/taosanode/taosanode.app.log

# model storage directory
model-dir = /usr/local/taos/taosanode/model/

# default log level
log-level = DEBUG

# draw the analysis results (for debugging)
draw-result = 1
diff --git a/tools/tdgpt/cfg/taosanoded.service b/tools/tdgpt/cfg/taosanoded.service new file mode 100755 index 0000000000..a8d86cabe7 --- /dev/null +++ b/tools/tdgpt/cfg/taosanoded.service @@ -0,0 +1,22 @@
[Unit]
Description=TaosANode Service
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
Environment=PATH=/usr/lib/taos/venv/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
ExecStart=/usr/local/taos/taosanode/bin/start.sh
ExecStop=/usr/local/taos/taosanode/bin/stop.sh
TimeoutStartSec=0
TimeoutStopSec=120s
LimitNOFILE=1048576
LimitNPROC=infinity
LimitCORE=infinity
StandardOutput=null
Restart=always
StartLimitBurst=6
StartLimitInterval=60s

[Install]
WantedBy=multi-user.target
\ No newline at end of file
diff --git a/tools/tdgpt/script/install.sh b/tools/tdgpt/script/install.sh new file mode 100755 index 0000000000..9308b37cfc --- /dev/null +++ b/tools/tdgpt/script/install.sh @@ -0,0 +1,748 @@
#!/bin/bash
#
# This file is used to install the analysis platform on Linux systems. 
The operating system +# is required to use systemd to manage services at boot + +set -e + +iplist="" +serverFqdn="" + +# -----------------------Variables definition--------------------- +script_dir=$(dirname $(readlink -f "$0")) +echo -e "${script_dir}" + +# Dynamic directory +PREFIX="taos" +PRODUCTPREFIX="taosanode" +serverName="${PRODUCTPREFIX}d" +configFile="taosanode.ini" +productName="TDengine Anode" +emailName="taosdata.com" +tarName="package.tar.gz" +logDir="/var/log/${PREFIX}/${PRODUCTPREFIX}" +moduleDir="/var/lib/${PREFIX}/${PRODUCTPREFIX}/model" +venvDir="/var/lib/${PREFIX}/${PRODUCTPREFIX}/venv" +global_conf_dir="/etc/${PREFIX}" +installDir="/usr/local/${PREFIX}/${PRODUCTPREFIX}" + +python_minor_ver=0 #check the python version +bin_link_dir="/usr/bin" + +#install main path +install_main_dir=${installDir} + +service_config_dir="/etc/systemd/system" + +# Color setting +RED='\033[0;31m' +GREEN='\033[1;32m' +GREEN_DARK='\033[0;32m' +GREEN_UNDERLINE='\033[4;32m' +NC='\033[0m' + +csudo="" +if command -v sudo >/dev/null; then + csudo="sudo " +fi + +update_flag=0 +prompt_force=0 + +initd_mod=0 +service_mod=2 +if ps aux | grep -v grep | grep systemd &>/dev/null; then + service_mod=0 +elif $(which service &>/dev/null); then + service_mod=1 + service_config_dir="/etc/init.d" + if $(which chkconfig &>/dev/null); then + initd_mod=1 + elif $(which insserv &>/dev/null); then + initd_mod=2 + elif $(which update-rc.d &>/dev/null); then + initd_mod=3 + else + service_mod=2 + fi +else + service_mod=2 +fi + +# get the operating system type for using the corresponding init file +# ubuntu/debian(deb), centos/fedora(rpm), others: opensuse, redhat, ..., no verification +#osinfo=$(awk -F= '/^NAME/{print $2}' /etc/os-release) +if [[ -e /etc/os-release ]]; then + osinfo=$(cat /etc/os-release | grep "NAME" | cut -d '"' -f2) || : +else + osinfo="" +fi +#echo "osinfo: ${osinfo}" +os_type=0 +if echo $osinfo | grep -qwi "ubuntu"; then + # echo "This is ubuntu system" + os_type=1 +elif echo $osinfo | grep -qwi "debian"; then + # echo "This is debian system" + os_type=1 +elif echo $osinfo | grep -qwi "Kylin"; then + # echo "This is Kylin system" + os_type=1 +elif echo $osinfo | grep -qwi "centos"; then + # echo "This is centos system" + os_type=2 +elif echo $osinfo | grep -qwi "fedora"; then + # echo "This is fedora system" + os_type=2 +elif echo $osinfo | grep -qwi "Linux"; then + # echo "This is Linux system" + os_type=1 + service_mod=0 + initd_mod=0 + service_config_dir="/etc/systemd/system" +else + echo " osinfo: ${osinfo}" + echo " This is an officially unverified linux system," + echo " if there are any problems with the installation and operation, " + echo " please feel free to contact ${emailName} for support." + os_type=1 +fi + +# ============================= get input parameters ================================================= + +# install.sh -v [server | client] -e [yes | no] -i [systemd | service | ...] + +# set parameters by default value +interactiveFqdn=yes # [yes | no] +verType=server # [server | client] +initType=systemd # [systemd | service | ...] + +while getopts "hv:e:" arg; do + case $arg in + e) + #echo "interactiveFqdn=$OPTARG" + interactiveFqdn=$(echo $OPTARG) + ;; + v) + #echo "verType=$OPTARG" + verType=$(echo $OPTARG) + ;; + h) + echo "Usage: $(basename $0) -v [server | client] -e [yes | no]" + exit 0 + ;; + ?) 
#unknow option + echo "unknown argument" + exit 1 + ;; + esac +done + +#echo "verType=${verType} interactiveFqdn=${interactiveFqdn}" + +services=(${serverName}) + +function install_services() { + for service in "${services[@]}"; do + install_service ${service} + done +} + +function kill_process() { + pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}') + if [ -n "$pid" ]; then + ${csudo}kill -9 "$pid" || : + fi +} + +function install_main_path() { + #create install main dir and all sub dir + if [ ! -z "${install_main_dir}" ]; then + ${csudo}rm -rf ${install_main_dir} || : + fi + + ${csudo}mkdir -p ${install_main_dir} + ${csudo}mkdir -p ${install_main_dir}/cfg + ${csudo}mkdir -p ${install_main_dir}/bin + ${csudo}mkdir -p ${install_main_dir}/lib + ${csudo}mkdir -p ${global_conf_dir} +} + +function install_bin_and_lib() { + ${csudo}cp -r ${script_dir}/bin/* ${install_main_dir}/bin + ${csudo}cp -r ${script_dir}/lib/* ${install_main_dir}/lib/ + + if [[ ! -e "${bin_link_dir}/rmtaosanode" ]]; then + ${csudo}ln -s ${install_main_dir}/bin/uninstall.sh ${bin_link_dir}/rmtaosanode + fi +} + +function add_newHostname_to_hosts() { + localIp="127.0.0.1" + OLD_IFS="$IFS" + IFS=" " + iphost=$(cat /etc/hosts | grep $1 | awk '{print $1}') + arr=($iphost) + IFS="$OLD_IFS" + for s in "${arr[@]}"; do + if [[ "$s" == "$localIp" ]]; then + return + fi + done + + if grep -q "127.0.0.1 $1" /etc/hosts; then + return + else + ${csudo}chmod 666 /etc/hosts + ${csudo}echo "127.0.0.1 $1" >>/etc/hosts + fi +} + +function set_hostname() { + echo -e -n "${GREEN}Host name or IP (assigned to this machine) which can be accessed by your tools or apps (must not be 'localhost')${NC}" + read -e -p " : " -i "$(hostname)" newHostname + while true; do + if [ -z "$newHostname" ]; then + newHostname=$(hostname) + break + elif [ "$newHostname" != "localhost" ]; then + break + else + echo -e -n "${GREEN}Host name or IP (assigned to this machine) which can be accessed by your tools or apps (must not be 'localhost')${NC}" + read -e -p " : " -i "$(hostname)" newHostname + fi + done + + if [ -f ${global_conf_dir}/${configFile} ]; then + ${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$newHostname/" ${global_conf_dir}/${configFile} + else + ${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$newHostname/" ${script_dir}/cfg/${configFile} + fi + serverFqdn=$newHostname + + if [[ -e /etc/hosts ]] && [[ ! 
$newHostname =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then + add_newHostname_to_hosts $newHostname + fi +} + +function is_correct_ipaddr() { + newIp=$1 + OLD_IFS="$IFS" + IFS=" " + arr=($iplist) + IFS="$OLD_IFS" + for s in "${arr[@]}"; do + if [[ "$s" == "$newIp" ]]; then + return 0 + fi + done + + return 1 +} + +function set_ipAsFqdn() { + iplist=$(ip address | grep inet | grep -v inet6 | grep -v 127.0.0.1 | awk '{print $2}' | awk -F "/" '{print $1}') || : + if [ -z "$iplist" ]; then + iplist=$(ifconfig | grep inet | grep -v inet6 | grep -v 127.0.0.1 | awk '{print $2}' | awk -F ":" '{print $2}') || : + fi + + if [ -z "$iplist" ]; then + echo + echo -e -n "${GREEN}Unable to get local ip, use 127.0.0.1${NC}" + localFqdn="127.0.0.1" + # Write the local FQDN to configuration file + + if [ -f ${global_conf_dir}/${configFile} ]; then + ${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$localFqdn/" ${global_conf_dir}/${configFile} + else + ${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$localFqdn/" ${script_dir}/cfg/${configFile} + fi + serverFqdn=$localFqdn + echo + return + fi + + echo -e -n "${GREEN}Please choose an IP from local IP list${NC}:" + echo + echo -e -n "${GREEN}$iplist${NC}" + echo + echo + echo -e -n "${GREEN}Notes: if IP is used as the node name, data can NOT be migrated to other machine directly${NC}:" + read localFqdn + while true; do + if [ ! -z "$localFqdn" ]; then + # Check if correct ip address + is_correct_ipaddr $localFqdn + retval=$(echo $?) + if [[ $retval != 0 ]]; then + read -p "Please choose an IP from local IP list:" localFqdn + else + # Write the local FQDN to configuration file + if [ -f ${global_conf_dir}/${configFile} ]; then + ${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$localFqdn/" ${global_conf_dir}/${configFile} + else + ${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$localFqdn/" ${script_dir}/cfg/${configFile} + fi + serverFqdn=$localFqdn + break + fi + else + read -p "Please choose an IP from local IP list:" localFqdn + fi + done +} + +function local_fqdn_check() { + #serverFqdn=$(hostname) + echo + echo -e -n "System hostname is: ${GREEN}$serverFqdn${NC}" + echo + set_hostname +} + +function install_anode_config() { + fileName="${script_dir}/cfg/${configFile}" + echo -e $fileName + + if [ -f ${fileName} ]; then + ${csudo}sed -i -r "s/#*\s*(fqdn\s*).*/\1$serverFqdn/" ${script_dir}/cfg/${configFile} + + if [ -f "${global_conf_dir}/${configFile}" ]; then + ${csudo}cp ${fileName} ${global_conf_dir}/${configFile}.new + else + ${csudo}cp ${fileName} ${global_conf_dir}/${configFile} + fi + fi + + ${csudo}ln -sf ${global_conf_dir}/${configFile} ${install_main_dir}/cfg +} + +function install_config() { + + [ ! -z $1 ] && return 0 || : # only install client + + if ((${update_flag} == 1)); then + install_taosd_config + return 0 + fi + + if [ "$interactiveFqdn" == "no" ]; then + install_taosd_config + return 0 + fi + + local_fqdn_check + install_anode_config + + echo + echo -e -n "${GREEN}Enter FQDN:port (like h1.${emailName}:6030) of an existing ${productName} cluster node to join${NC}" + echo + echo -e -n "${GREEN}OR leave it blank to build one${NC}:" + read firstEp + while true; do + if [ ! 
-z "$firstEp" ]; then + if [ -f ${global_conf_dir}/${configFile} ]; then + ${csudo}sed -i -r "s/#*\s*(firstEp\s*).*/\1$firstEp/" ${global_conf_dir}/${configFile} + else + ${csudo}sed -i -r "s/#*\s*(firstEp\s*).*/\1$firstEp/" ${script_dir}/cfg/${configFile} + fi + break + else + break + fi + done + + echo + echo -e -n "${GREEN}Enter your email address for priority support or enter empty to skip${NC}: " + read emailAddr + while true; do + if [ ! -z "$emailAddr" ]; then + email_file="${install_main_dir}/email" + ${csudo}bash -c "echo $emailAddr > ${email_file}" + break + else + break + fi + done +} + +function install_log() { + ${csudo}mkdir -p ${logDir} && ${csudo}chmod 777 ${logDir} + ${csudo}ln -sf ${logDir} ${install_main_dir}/log +} + +function install_module() { + ${csudo}mkdir -p ${moduleDir} && ${csudo}chmod 777 ${moduleDir} + ${csudo}ln -sf ${moduleDir} ${install_main_dir}/model +} + +function install_anode_venv() { + ${csudo}mkdir -p ${venvDir} && ${csudo}chmod 777 ${venvDir} + ${csudo}ln -sf ${venvDir} ${install_main_dir}/venv + + # build venv + ${csudo}python3.${python_minor_ver} -m venv ${venvDir} + + echo -e "active Python3 virtual env: ${venvDir}" + source ${venvDir}/bin/activate + + echo -e "install the required packages by pip3, this may take a while depending on the network condition" + ${csudo}${venvDir}/bin/pip3 install numpy==1.26.4 + ${csudo}${venvDir}/bin/pip3 install pandas==1.5.0 + + ${csudo}${venvDir}/bin/pip3 install scikit-learn + ${csudo}${venvDir}/bin/pip3 install outlier_utils + ${csudo}${venvDir}/bin/pip3 install statsmodels + ${csudo}${venvDir}/bin/pip3 install pyculiarity + ${csudo}${venvDir}/bin/pip3 install pmdarima + ${csudo}${venvDir}/bin/pip3 install flask + ${csudo}${venvDir}/bin/pip3 install matplotlib + ${csudo}${venvDir}/bin/pip3 install uwsgi + ${csudo}${venvDir}/bin/pip3 install torch --index-url https://download.pytorch.org/whl/cpu + ${csudo}${venvDir}/bin/pip3 install --upgrade keras + + echo -e "Install python library for venv completed!" 
}

function clean_service_on_sysvinit() {
  if ps aux | grep -v grep | grep $1 &>/dev/null; then
    ${csudo}service $1 stop || :
  fi

  if ((${initd_mod} == 1)); then
    if [ -e ${service_config_dir}/$1 ]; then
      ${csudo}chkconfig --del $1 || :
    fi
  elif ((${initd_mod} == 2)); then
    if [ -e ${service_config_dir}/$1 ]; then
      ${csudo}insserv -r $1 || :
    fi
  elif ((${initd_mod} == 3)); then
    if [ -e ${service_config_dir}/$1 ]; then
      ${csudo}update-rc.d -f $1 remove || :
    fi
  fi

  ${csudo}rm -f ${service_config_dir}/$1 || :

  if $(which init &>/dev/null); then
    ${csudo}init q || :
  fi
}

function install_service_on_sysvinit() {
  if [ "$1" != "${serverName}" ]; then
    return
  fi

  clean_service_on_sysvinit $1
  sleep 1

  if ((${os_type} == 1)); then
    ${csudo}cp ${script_dir}/init.d/${serverName}.deb ${service_config_dir}/${serverName} && ${csudo}chmod a+x ${service_config_dir}/${serverName}
  elif ((${os_type} == 2)); then
    ${csudo}cp ${script_dir}/init.d/${serverName}.rpm ${service_config_dir}/${serverName} && ${csudo}chmod a+x ${service_config_dir}/${serverName}
  fi

  if ((${initd_mod} == 1)); then
    ${csudo}chkconfig --add $1 || :
    ${csudo}chkconfig --level 2345 $1 on || :
  elif ((${initd_mod} == 2)); then
    ${csudo}insserv $1 || :
    ${csudo}insserv -d $1 || :
  elif ((${initd_mod} == 3)); then
    ${csudo}update-rc.d $1 defaults || :
  fi
}

function clean_service_on_systemd() {
  service_config="${service_config_dir}/$1.service"

  if systemctl is-active --quiet $1; then
    echo "$1 is running, stopping it..."
    ${csudo}systemctl stop $1 &>/dev/null || echo &>/dev/null
  fi
  ${csudo}systemctl disable $1 &>/dev/null || echo &>/dev/null
  ${csudo}rm -f ${service_config}
}

function install_service_on_systemd() {
  clean_service_on_systemd $1

  cfg_source_dir=${script_dir}/cfg

  if [ -f ${cfg_source_dir}/$1.service ]; then
    ${csudo}cp ${cfg_source_dir}/$1.service ${service_config_dir}/ || :
  fi

  ${csudo}systemctl enable $1
  ${csudo}systemctl daemon-reload
}

function install_service() {
  if ((${service_mod} == 0)); then
    install_service_on_systemd $1
  elif ((${service_mod} == 1)); then
    install_service_on_sysvinit $1
  else
    kill_process $1
  fi
}

vercomp() {
  if [[ $1 == $2 ]]; then
    return 0
  fi
  local IFS=.
  local i ver1=($1) ver2=($2)
  # fill empty fields in ver1 with zeros
  for ((i = ${#ver1[@]}; i < ${#ver2[@]}; i++)); do
    ver1[i]=0
  done

  for ((i = 0; i < ${#ver1[@]}; i++)); do
    if [[ -z ${ver2[i]} ]]; then
      # fill empty fields in ver2 with zeros
      ver2[i]=0
    fi
    if ((10#${ver1[i]} > 10#${ver2[i]})); then
      return 1
    fi
    if ((10#${ver1[i]} < 10#${ver2[i]})); then
      return 2
    fi
  done
  return 0
}

function is_version_compatible() {

  curr_version=$(ls ${script_dir}/driver/libtaos.so* | awk -F 'libtaos.so.' '{print $2}')

  if [ -f ${script_dir}/driver/vercomp.txt ]; then
    min_compatible_version=$(cat ${script_dir}/driver/vercomp.txt)
  else
    min_compatible_version=$(${script_dir}/bin/${serverName} -V | grep version | head -1 | cut -d ' ' -f 5)
  fi

  exist_version=$(${installDir}/bin/${serverName} -V | grep version | head -1 | cut -d ' ' -f 3)
  vercomp $exist_version "3.0.0.0"
  case $? in
  2)
    prompt_force=1
    ;;
  esac

  vercomp $curr_version $min_compatible_version
  compat=$?
  echo "" # print a newline without clobbering the saved comparison result

  case $compat 
in + 0) return 0 ;; + 1) return 0 ;; + 2) return 1 ;; + esac +} + +deb_erase() { + confirm="" + while [ "" == "${confirm}" ]; do + echo -e -n "${RED}Existing TDengine deb is detected, do you want to remove it? [yes|no] ${NC}:" + read confirm + if [ "yes" == "$confirm" ]; then + ${csudo}dpkg --remove tdengine || : + break + elif [ "no" == "$confirm" ]; then + break + fi + done +} + +rpm_erase() { + confirm="" + while [ "" == "${confirm}" ]; do + echo -e -n "${RED}Existing TDengine rpm is detected, do you want to remove it? [yes|no] ${NC}:" + read confirm + if [ "yes" == "$confirm" ]; then + ${csudo}rpm -e tdengine || : + break + elif [ "no" == "$confirm" ]; then + break + fi + done +} + +function updateProduct() { + # Check if version compatible + if ! is_version_compatible; then + echo -e "${RED}Version incompatible${NC}" + return 1 + fi + + # Start to update + if [ ! -e ${tarName} ]; then + echo "File ${tarName} does not exist" + exit 1 + fi + + if echo $osinfo | grep -qwi "centos"; then + rpm -q tdengine 2>&1 >/dev/null && rpm_erase tdengine || : + elif echo $osinfo | grep -qwi "ubuntu"; then + dpkg -l tdengine 2>&1 | grep ii >/dev/null && deb_erase tdengine || : + fi + + tar -zxf ${tarName} + + echo "Start to update ${productName}..." + # Stop the service if running + if ps aux | grep -v grep | grep ${serverName} &>/dev/null; then + if ((${service_mod} == 0)); then + ${csudo}systemctl stop ${serverName} || : + elif ((${service_mod} == 1)); then + ${csudo}service ${serverName} stop || : + else + kill_process ${serverName} + fi + sleep 1 + fi + + install_main_path + install_log + install_module + install_config + + if [ -z $1 ]; then + install_bin + install_services + + echo + echo -e "${GREEN_DARK}To configure ${productName} ${NC}\t\t: edit ${global_conf_dir}/${configFile}" + [ -f ${global_conf_dir}/${adapterName}.toml ] && [ -f ${installDir}/bin/${adapterName} ] && + echo -e "${GREEN_DARK}To configure ${adapterName} ${NC}\t: edit ${global_conf_dir}/${adapterName}.toml" + echo -e "${GREEN_DARK}To configure ${explorerName} ${NC}\t: edit ${global_conf_dir}/explorer.toml" + if ((${service_mod} == 0)); then + echo -e "${GREEN_DARK}To start ${productName} server ${NC}\t: ${csudo}systemctl start ${serverName}${NC}" + elif ((${service_mod} == 1)); then + echo -e "${GREEN_DARK}To start ${productName} server ${NC}\t: ${csudo}service ${serverName} start${NC}" + else + echo -e "${GREEN_DARK}To start ${productName} server ${NC}\t: ./${serverName}${NC}" + fi + + echo + echo "${productName} is updated successfully!" + echo + + else + install_bin + fi + + cd $script_dir + rm -rf $(tar -tf ${tarName} | grep -Ev "^\./$|^\/") +} + +function installProduct() { + # Start to install + if [ ! -e ${tarName} ]; then + echo "File ${tarName} does not exist" + exit 1 + fi + + tar -zxf ${tarName} + + echo "Start to install ${productName}..." + + install_main_path + install_log + install_anode_config + install_module + + install_bin_and_lib + install_services + + echo + echo -e "\033[44;32;1m${productName} is installed successfully!${NC}" + + echo + echo -e "\033[44;32;1mStart to create virtual python env in ${venvDir}${NC}" + install_anode_venv +} + +# check for python version, only the 3.10/3.11 is supported +check_python3_env() { + if ! command -v python3 &> /dev/null + then + echo -e "\033[31mWarning: Python3 command not found. 
Version 3.10/3.11 is required.\033[0m" + exit 1 + fi + + python3_version=$(python3 --version 2>&1 | awk -F' ' '{print $2}') + + python3_version_ok=false + python_minor_ver=$(echo "$python3_version" | cut -d"." -f2) + if [[ $(echo "$python3_version" | cut -d"." -f1) -eq 3 && $(echo "$python3_version" | cut -d"." -f2) -ge 10 ]]; then + python3_version_ok=true + fi + + if $python3_version_ok; then + echo -e "\033[32mPython3 ${python3_version} has been found.\033[0m" + else + if command -v python3.10 &> /dev/null + then + echo -e "\033[32mPython3.10 has been found.\033[0m" + python_minor_ver=10 + elif command -v python3.11 &> /dev/null + then + python_minor_ver=11 + echo -e "\033[32mPython3.11 has been found.\033[0m" + else + echo -e "\033[31mWarning: Python3.10/3.11 is required, only found python${python3_version}.\033[0m" + exit 1 + fi + fi + +# echo -e "Python3 minor version is:${python_minor_ver}" + + # check the existence pip3.10/pip3.11 + if ! command -v pip3 &> /dev/null + then + echo -e "\033[31mWarning: Pip3 command not found. Version 3.10/3.11 is required.\033[0m" + exit 1 + fi + + pip3_version=$(pip3 --version 2>&1 | awk -F' ' '{print $6}' | cut -d")" -f1) + major_ver=$(echo "${pip3_version}" | cut -d"." -f1) + minor_ver=$(echo "${pip3_version}" | cut -d"." -f2) + + pip3_version_ok=false; + if [[ ${major_ver} -eq 3 && ${minor_ver} -ge 10 ]]; then + pip3_version_ok=true + fi + + if $pip3_version_ok; then + echo -e "\033[32mpip3 ${pip3_version} has been found.\033[0m" + else + if command -v pip3.${python_minor_ver} &> /dev/null + then + echo -e "\033[32mpip3.${python_minor_ver} has been found.\033[0m" + else + echo -e "\033[31mWarning: pip3.10/3.11 is required, only found pip${pip3_version}.\033[0m" + exit 1 + fi + fi + +# if ! command -v python3.${python_minor_ver}-venv &> /dev/null +# then +# echo -e "\033[31mWarning: python3.${python_minor_ver}-venv command not found.\033[0m" +# exit 1 +# fi +} + +## ==============================Main program starts from here============================ +serverFqdn=$(hostname) + +if [ "$verType" == "server" ]; then + check_python3_env + installProduct +fi diff --git a/tools/tdgpt/script/release.sh b/tools/tdgpt/script/release.sh new file mode 100755 index 0000000000..c143357eb1 --- /dev/null +++ b/tools/tdgpt/script/release.sh @@ -0,0 +1,100 @@ +#!/bin/bash +# Generate install package for all os system + +set -e +# set -x + +curr_dir=$(pwd) +compile_dir=$1 +version="1.0.1" +osType= +pagMode= +productName="TDengine-enterprise-anode" + +script_dir="$(dirname $(readlink -f $0))" +top_dir="$(readlink -f ${script_dir}/..)" + +echo -e ${top_dir} + +serverName="taosanoded" +configFile="taosanode.ini" +tarName="package.tar.gz" + +# create compressed install file. +build_dir="${compile_dir}/build" +release_dir="${top_dir}/release" + +#package_name='linux' +install_dir="${release_dir}/${productName}-${version}" + +if [ "$pagMode" == "lite" ]; then + strip ${build_dir}/bin/${serverName} +fi + +cfg_dir="${top_dir}/cfg" +install_files="${script_dir}/install.sh" + +# make directories. 
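# a sketch of the layout assembled below: cfg/ holds taosanode.ini plus the
# systemd unit file, bin/ holds the start/stop/uninstall scripts, and lib/
# holds the taosanalytics python package that gets packed into ${tarName}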
+mkdir -p ${install_dir} +mkdir -p ${install_dir}/cfg && cp ${cfg_dir}/${configFile} ${install_dir}/cfg/${configFile} + +if [ -f "${cfg_dir}/${serverName}.service" ]; then + cp ${cfg_dir}/${serverName}.service ${install_dir}/cfg || : +fi + +# python files +mkdir -p ${install_dir}/bin && mkdir -p ${install_dir}/lib + +# script to control start/stop/uninstall process +rm -r ${top_dir}/taosanalytics/*.pyc || : +cp -r ${top_dir}/taosanalytics/ ${install_dir}/lib/ && chmod a+x ${install_dir}/lib/ || : +cp -r ${top_dir}/script/st*.sh ${install_dir}/bin/ && chmod a+x ${install_dir}/bin/* || : +cp -r ${top_dir}/script/uninstall.sh ${install_dir}/bin/ && chmod a+x ${install_dir}/bin/* || : + +cd ${install_dir} + +#if [ "$osType" != "Darwin" ]; then +# tar -zcv -f ${tarName} ./bin/* || : +# rm -rf ${install_dir}/bin || : +#else +tar -zcv -f ${tarName} ./lib/* || : + +if [ ! -z "${install_dir}" ]; then + # shellcheck disable=SC2115 + rm -rf "${install_dir}"/lib || : +fi + +exitcode=$? +if [ "$exitcode" != "0" ]; then + echo "tar ${tarName} error !!!" + exit $exitcode +fi + +cd ${curr_dir} +cp ${install_files} ${install_dir} + +chmod a+x ${install_dir}/install.sh + +# Copy release note +# cp ${script_dir}/release_note ${install_dir} + +# exit 1 +cd ${release_dir} + +pkg_name=${install_dir} +echo -e "pkg_name is: ${pkg_name}" + +if [ "$osType" != "Darwin" ]; then + tar -zcv -f "$(basename ${pkg_name}).tar.gz" "$(basename ${install_dir})" --remove-files || : +else + tar -zcv -f "$(basename ${pkg_name}).tar.gz" "$(basename ${install_dir})" || : + rm -rf "${install_dir}" ||: +fi + +exitcode=$? +if [ "$exitcode" != "0" ]; then + echo "tar ${pkg_name}.tar.gz error !!!" + exit $exitcode +fi + +cd ${curr_dir} diff --git a/tools/tdgpt/script/start.sh b/tools/tdgpt/script/start.sh new file mode 100755 index 0000000000..144ee08392 --- /dev/null +++ b/tools/tdgpt/script/start.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +# start the flask service by using uwsgi +/usr/local/taos/taosanode/venv/bin/uwsgi /usr/local/taos/taosanode/cfg/taosanode.ini \ No newline at end of file diff --git a/tools/tdgpt/script/stop.sh b/tools/tdgpt/script/stop.sh new file mode 100755 index 0000000000..c6f81d05cc --- /dev/null +++ b/tools/tdgpt/script/stop.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +# stop the uwsgi server +/usr/local/taos/taosanode/venv/bin/uwsgi --stop /usr/local/taos/taosanode/taosanode.pid \ No newline at end of file diff --git a/tools/tdgpt/script/uninstall.sh b/tools/tdgpt/script/uninstall.sh new file mode 100755 index 0000000000..29c62f9782 --- /dev/null +++ b/tools/tdgpt/script/uninstall.sh @@ -0,0 +1,220 @@ +#!/bin/bash +# uninstall the deployed app info, not remove the python virtual environment + +set -e +#set -x + +osType=`uname` + +RED='\033[0;31m' +GREEN='\033[1;32m' +NC='\033[0m' + +MAIN_NAME="taosanode" +installDir="/usr/local/taos/taosanode" +venv_dir="/usr/local/taos/taosanode/venv" +serverName="${MAIN_NAME}d" +uninstallName="rmtaosanode" +productName="TDengine Enterprise ANode" + +if [ "$osType" != "Darwin" ]; then + bin_link_dir="/usr/bin" +else + bin_link_dir="/usr/local/bin" +fi + +#install main path +bin_dir=${installDir}/bin +lib_dir=${installDir}/lib +local_log_dir=${installDir}/log +local_conf_dir=${installDir}/cfg +local_model_dir=${installDir}/model + +global_log_dir="/var/log/taos/${MAIN_NAME}" +global_conf_dir="/etc/taos/" + +service_config_dir="/etc/systemd/system" + +services=(${serverName} ${uninstallName}) + +csudo="" +if command -v sudo >/dev/null; then + csudo="sudo " +fi + +initd_mod=0 +service_mod=2 + 
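# probe the init system: service_mod 0 means systemd, 1 means sysvinit
# (initd_mod then selects chkconfig, insserv or update-rc.d), and 2 means no
# recognized service manager, so the process is stopped directly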
if ps aux | grep -v grep | grep systemd &>/dev/null; then
  service_mod=0
elif $(which service &>/dev/null); then
  service_mod=1
  service_config_dir="/etc/init.d"
  if $(which chkconfig &>/dev/null); then
    initd_mod=1
  elif $(which insserv &>/dev/null); then
    initd_mod=2
  elif $(which update-rc.d &>/dev/null); then
    initd_mod=3
  else
    service_mod=2
  fi
else
  service_mod=2
fi

kill_service_of() {
  _service=$1
  pid=$(ps -ef | grep $_service | grep -v grep | awk '{print $2}')
  if [ -n "$pid" ]; then
    ${csudo}${installDir}/bin/stop.sh || :
  fi
}

clean_service_on_systemd_of() {
  _service=$1
  _service_config="${service_config_dir}/${_service}.service"
  if systemctl is-active --quiet ${_service}; then
    echo "${_service} is running, stopping it..."
    ${csudo}systemctl stop ${_service} &>/dev/null || echo &>/dev/null
  fi

  ${csudo}systemctl disable ${_service} &>/dev/null || echo &>/dev/null

  if [[ ! -z "${_service_config}" && -f "${_service_config}" ]]; then
    ${csudo}rm ${_service_config}
  fi
}

clean_service_on_sysvinit_of() {
  _service=$1
  if pidof ${_service} &>/dev/null; then
    echo "${_service} is running, stopping it..."
    ${csudo}service ${_service} stop || :
  fi
  if ((${initd_mod} == 1)); then
    if [ -e ${service_config_dir}/${_service} ]; then
      # shellcheck disable=SC2086
      ${csudo}chkconfig --del ${_service} || :
    fi
  elif ((${initd_mod} == 2)); then
    if [ -e ${service_config_dir}/${_service} ]; then
      ${csudo}insserv -r ${_service} || :
    fi
  elif ((${initd_mod} == 3)); then
    if [ -e ${service_config_dir}/${_service} ]; then
      ${csudo}update-rc.d -f ${_service} remove || :
    fi
  fi

  # shellcheck disable=SC2236
  if [ ! -z "${service_config_dir}" ]; then
    echo -e "rm ${service_config_dir}/${_service}"
  fi

  #${csudo}rm ${service_config_dir}/${_service} || :

  if $(which init &>/dev/null); then
    ${csudo}init q || :
  fi
}

clean_service_of() {
  _service=$1
  if ((${service_mod} == 0)); then
    clean_service_on_systemd_of ${_service}
  elif ((${service_mod} == 1)); then
    clean_service_on_sysvinit_of ${_service}
  else
    kill_service_of ${_service}
  fi
}

remove_service_of() {
  _service=$1
  clean_service_of ${_service}
  if [[ -e "${bin_link_dir}/${_service}" ]]; then
    ${csudo}rm "${bin_link_dir}"/${_service}
    echo "${_service} is removed successfully!"
  fi
}

remove_service() {
  for _service in "${services[@]}"; do
    remove_service_of "${_service}"
  done
}

function clean_venv() {
  # by design the python virtual environment is NOT removed; only print
  # the command needed to remove it manually
  #${csudo}rm ${venv_dir}/* || :
  if [ ! -z "${venv_dir}" ]; then
    echo -e "${csudo}rm -rf ${venv_dir}/*"
  fi
}

function clean_module() {
  if [ ! -z "${local_model_dir}" ]; then
    ${csudo}unlink ${local_model_dir} || :
  fi
}

function clean_config() {
  # Remove the configuration files
  if [ ! -z "${global_conf_dir}" ]; then
    ${csudo}rm -f ${global_conf_dir}/taosanode.ini || :
  fi

  if [ ! -z "${local_conf_dir}" ]; then
    # shellcheck disable=SC2086
    ${csudo}rm -rf ${local_conf_dir} || :
  fi
}

function clean_log() {
  # Remove the log files
  if [ ! -z "${global_log_dir}" ]; then
    ${csudo}rm -rf ${global_log_dir} || :
  fi

  if [ ! -z "${local_log_dir}" ]; then
    ${csudo}rm -rf ${local_log_dir} || :
  fi
}

function remove_deploy_binary() {
  if [ ! -z "${bin_dir}" ]; then
    ${csudo}rm -rf ${bin_dir} || :
  fi

  if [ ! 
-z "${lib_dir}" ]; then + ${csudo}rm -rf ${lib_dir} + fi +} + +remove_service +clean_log # Remove link log directory +clean_config # Remove link configuration file +remove_deploy_binary +clean_venv + +if [[ -e /etc/os-release ]]; then + osinfo=$(awk -F= '/^NAME/{print $2}' /etc/os-release) +else + osinfo="" +fi + +if echo $osinfo | grep -qwi "ubuntu"; then + # echo "this is ubuntu system" + ${csudo}dpkg --force-all -P tdengine >/dev/null 2>&1 || : +elif echo $osinfo | grep -qwi "debian"; then + # echo "this is debian system" + ${csudo}dpkg --force-all -P tdengine >/dev/null 2>&1 || : +elif echo $osinfo | grep -qwi "centos"; then + # echo "this is centos system" + ${csudo}rpm -e --noscripts tdengine >/dev/null 2>&1 || : +fi + +command -v systemctl >/dev/null 2>&1 && ${csudo}systemctl daemon-reload >/dev/null 2>&1 || true +echo +echo "${productName} is uninstalled successfully!" +echo diff --git a/tools/tdgpt/taosanalytics/__init__.py b/tools/tdgpt/taosanalytics/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/tdgpt/taosanalytics/algo/__init__.py b/tools/tdgpt/taosanalytics/algo/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/tdgpt/taosanalytics/algo/ad/__init__.py b/tools/tdgpt/taosanalytics/algo/ad/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/tdgpt/taosanalytics/algo/ad/autoencoder.py b/tools/tdgpt/taosanalytics/algo/ad/autoencoder.py new file mode 100644 index 0000000000..0d3bb21faa --- /dev/null +++ b/tools/tdgpt/taosanalytics/algo/ad/autoencoder.py @@ -0,0 +1,117 @@ +# encoding:utf-8 +# pylint: disable=c0103 +""" auto encoder algorithms to detect anomaly for time series data""" +import os.path + +import joblib +import numpy as np +import pandas as pd + +from taosanalytics.conf import app_logger, conf +from taosanalytics.misc.train_model import create_sequences +from taosanalytics.service import AbstractAnomalyDetectionService + + +class _AutoEncoderDetectionService(AbstractAnomalyDetectionService): + name = 'ad_encoder' + desc = "anomaly detection based on auto encoder" + + def __init__(self): + super().__init__() + + self.table_name = None + self.mean = None + self.std = None + self.threshold = None + self.time_interval = None + self.model = None + self.dir = 'ad_autoencoder' + + self.root_path = conf.get_model_directory() + + self.root_path = self.root_path + f'/{self.dir}/' + + if not os.path.exists(self.root_path): + app_logger.log_inst.error( + "%s ad algorithm failed to locate default module directory:" + "%s, not active", self.__class__.__name__, self.root_path) + else: + app_logger.log_inst.info("%s ad algorithm root path is: %s", self.__class__.__name__, + self.root_path) + + def execute(self): + if self.input_is_empty(): + return [] + + if self.model is None: + raise FileNotFoundError("not load autoencoder model yet, or load model failed") + + array_2d = np.reshape(self.list, (len(self.list), 1)) + df = pd.DataFrame(array_2d) + + # normalize input data using z-score + normalized_list = (df - self.mean.value) / self.std.value + seq = create_sequences(normalized_list.values, self.time_interval) + + # Get test MAE loss. + pred_list = self.model.predict(seq) + mae_loss = np.mean(np.abs(pred_list - seq), axis=1) + mae = mae_loss.reshape((-1)) + + # Detect all the samples which are anomalies. + anomalies = mae > self.threshold + + # syslogger.log_inst( + # "Number of anomaly samples: %f, Indices of anomaly samples:{}". 
        #     format(np.sum(anomalies), np.where(anomalies))
        # )

        # data i is an anomaly if samples [(i - timesteps + 1) to (i)] are anomalies
        ad_indices = []
        for data_idx in range(self.time_interval - 1,
                              len(normalized_list) - self.time_interval + 1):
            if np.all(anomalies[data_idx - self.time_interval + 1: data_idx]):
                ad_indices.append(data_idx)

        return [-1 if i in ad_indices else 1 for i in range(len(self.list))]

    def set_params(self, params):
        if "model" not in params:
            raise ValueError("model needs to be specified")

        name = params['model']

        module_file_path = f'{self.root_path}/{name}.dat'
        module_info_path = f'{self.root_path}/{name}.info'

        app_logger.log_inst.info("try to load module:%s", module_file_path)

        if os.path.exists(module_file_path):
            self.model = joblib.load(module_file_path)
        else:
            app_logger.log_inst.error("failed to load autoencoder model file: %s", module_file_path)
            raise FileNotFoundError(f"{module_file_path} not found")

        if os.path.exists(module_info_path):
            info = joblib.load(module_info_path)
        else:
            app_logger.log_inst.error("failed to load autoencoder model info file: %s", module_info_path)
            raise FileNotFoundError(f"{module_info_path} not found")

        if info is not None:
            self.mean = info["mean"]
            self.std = info["std"]
            self.threshold = info["threshold"]
            self.time_interval = info["timesteps"]

            app_logger.log_inst.info(
                "load autoencoder module success, mean: %f, std: %f, threshold: %f, time_interval: %d",
                self.mean[0], self.std[0], self.threshold, self.time_interval
            )
        else:
            app_logger.log_inst.error("failed to load %s model", name)
            raise RuntimeError(f"failed to load model {name}")

    def get_params(self):
        return {"dir": self.dir + '/*'}
diff --git a/tools/tdgpt/taosanalytics/algo/ad/grubbs.py b/tools/tdgpt/taosanalytics/algo/ad/grubbs.py new file mode 100644 index 0000000000..6318c109f5 --- /dev/null +++ b/tools/tdgpt/taosanalytics/algo/ad/grubbs.py @@ -0,0 +1,42 @@
# encoding:utf-8
""" grubbs algorithm class"""

from outliers import smirnov_grubbs as grubbs
from taosanalytics.service import AbstractAnomalyDetectionService


class _GrubbsService(AbstractAnomalyDetectionService):
    """ Grubbs algorithm is to check the anomaly data in the input list """
    name = 'grubbs'
    desc = """Grubbs' test is to detect the presence of one outlier in a data set that is normally
    distributed"""

    def __init__(self, alpha_val=0.95):
        super().__init__()

        if alpha_val <= 0 or alpha_val >= 1:
            raise ValueError("invalid alpha value, valid range is (0, 1)")
        self.alpha = 1 - alpha_val

    def execute(self):
        """perform Grubbs' test and identify (if any) the outlier"""
        if self.input_is_empty():
            return []

        res = grubbs.test(self.list, alpha=self.alpha)

        error_indicator = [1 if k in set(res) else -1 for k in self.list]
        return error_indicator

    def set_params(self, params):
        """ set the value of alpha """
        super().set_params(params)

        if "alpha" in params:
            alpha_val = float(params["alpha"])

            if alpha_val <= 0 or alpha_val >= 1:
                raise ValueError("invalid alpha value, valid range is (0, 1)")

            self.alpha = 1 - alpha_val
diff --git a/tools/tdgpt/taosanalytics/algo/ad/iqr.py b/tools/tdgpt/taosanalytics/algo/ad/iqr.py new file mode 100644 index 0000000000..c0918a5090 --- /dev/null +++ b/tools/tdgpt/taosanalytics/algo/ad/iqr.py @@ -0,0 +1,29 @@
# encoding:utf-8
"""iqr class definition"""
import numpy as np
from 
taosanalytics.service import AbstractAnomalyDetectionService + + +class _IqrService(AbstractAnomalyDetectionService): + """ IQR algorithm is to check the anomaly data in the input list """ + name = 'iqr' + desc = """found the anomaly data according to the inter-quartile range""" + + def __init__(self): + super().__init__() + + def execute(self): + if self.input_is_empty(): + return [] + + lower = np.quantile(self.list, 0.25) + upper = np.quantile(self.list, 0.75) + + min_val = lower - 1.5 * (upper - lower) + max_val = upper + 1.5 * (upper - lower) + + threshold = [min_val, max_val] + return [-1 if k < threshold[0] or k > threshold[1] else 1 for k in self.list] + + def set_params(self, params): + pass diff --git a/tools/tdgpt/taosanalytics/algo/ad/ksigma.py b/tools/tdgpt/taosanalytics/algo/ad/ksigma.py new file mode 100644 index 0000000000..9d872dd11a --- /dev/null +++ b/tools/tdgpt/taosanalytics/algo/ad/ksigma.py @@ -0,0 +1,47 @@ +# encoding:utf-8 +"""ksigma class definition""" + +import numpy as np +from taosanalytics.service import AbstractAnomalyDetectionService + + +class _KSigmaService(AbstractAnomalyDetectionService): + """ KSigma algorithm is to check the anomaly data in the input list """ + name = "ksigma" + desc = """the k-sigma algorithm (or 3σ rule) expresses a conventional heuristic that nearly all + values are taken to lie within k (usually three) standard deviations of the mean, and thus + it is empirically useful to treat 99.7% probability as near certainty""" + + def __init__(self, k_val=3): + super().__init__() + self.k_val = k_val + + def execute(self): + def get_k_sigma_range(vals, k_value): + """ Return the k-sigma value range """ + avg = np.mean(vals) + std = np.std(vals) + + upper = avg + k_value * std + lower = avg - k_value * std + return [float(lower), float(upper)] + + if self.input_is_empty(): + return [] + + threshold = get_k_sigma_range(self.list, self.k_val) + return [-1 if k < threshold[0] or k > threshold[1] else 1 for k in self.list] + + def set_params(self, params): + super().set_params(params) + + if "k" in params: + k = int(params["k"]) + + if k < 1 or k > 3: + raise ValueError("k value out of range, valid range [1, 3]") + + self.k_val = k + + def get_params(self): + return {"k": self.k_val} diff --git a/tools/tdgpt/taosanalytics/algo/ad/lof.py b/tools/tdgpt/taosanalytics/algo/ad/lof.py new file mode 100644 index 0000000000..9f7b8fdc04 --- /dev/null +++ b/tools/tdgpt/taosanalytics/algo/ad/lof.py @@ -0,0 +1,43 @@ +# encoding:utf-8 +"""local outlier factor class definition""" + +import numpy as np +import sklearn.neighbors as neighbor +from taosanalytics.service import AbstractAnomalyDetectionService + + +class _LofService(AbstractAnomalyDetectionService): + """ LOF(local outlier factor) algorithm is to check the anomaly data in the input list """ + name = 'lof' + desc = """Local Outlier Factor, Ref: M. M. Breunig, H. P. Kriegel, R. T. Ng, J. Sander. + LOF:Identifying Density-based Local Outliers. 
SIGMOD, 2000.""" + + def __init__(self, n_neighbors=10, algo="auto"): + super().__init__() + + self.neighbors = n_neighbors + self.algorithm = algo + + def execute(self): + """perform LOF(local outlier factor) test and identify (if any) the outlier""" + if self.input_is_empty(): + return [] + + checker = neighbor.LocalOutlierFactor(n_neighbors=self.neighbors, algorithm=self.algorithm) + + arr_2d = np.reshape(self.list, (len(self.list), 1)) + res = checker.fit_predict(arr_2d) + + print(f"The negative outlier factor is:{checker.negative_outlier_factor_}") + return res + + def set_params(self, params): + super().set_params(params) + + if "neighbors" in params: # todo check value range + self.neighbors = int(params["neighbors"]) + if "algorithm" in params: + self.algorithm = params["algorithm"] + + def get_params(self): + return {"neighbors": self.neighbors, "algorithm": self.algorithm} diff --git a/tools/tdgpt/taosanalytics/algo/ad/shesd.py b/tools/tdgpt/taosanalytics/algo/ad/shesd.py new file mode 100644 index 0000000000..e105743bb5 --- /dev/null +++ b/tools/tdgpt/taosanalytics/algo/ad/shesd.py @@ -0,0 +1,44 @@ +# encoding:utf-8 +"""shesd algorithm class definition""" + +from pandas import Series +from pyculiarity import detect_vec +from taosanalytics.service import AbstractAnomalyDetectionService + + +class _SHESDService(AbstractAnomalyDetectionService): + """ s-h-esd algorithm is to check the anomaly data in the input list """ + name = 'shesd' + desc = "" + + def __init__(self, n_period=0, direction="both", anoms=0.05): + super().__init__() + + self.period = n_period + self.direction = direction + self.max_anoms = anoms + + def execute(self): + """perform SHESD test and identify (if any) the outlier""" + if self.input_is_empty(): + return [] + + results = detect_vec(Series(self.list), max_anoms=self.max_anoms, direction=self.direction, + period=self.period) + + res_val = results['anoms']['anoms'] + + return [-1 if k in set(res_val) else 1 for k in self.list] + + def set_params(self, params): + super().set_params(params) + + if "period" in params: # todo check value range + self.period = int(params["period"]) + if "direction" in params: + self.direction = params["direction"] + if "max_anoms" in params: + self.max_anoms = float(params["max_anoms"]) + + def get_params(self): + return {"period": self.period, "direction": self.direction, "max_anoms": self.max_anoms} diff --git a/tools/tdgpt/taosanalytics/algo/anomaly.py b/tools/tdgpt/taosanalytics/algo/anomaly.py new file mode 100644 index 0000000000..5f04283c06 --- /dev/null +++ b/tools/tdgpt/taosanalytics/algo/anomaly.py @@ -0,0 +1,49 @@ +# encoding:utf-8 +# pylint: disable=c0103 +""" anomaly detection register/display functions """ + +from matplotlib import pyplot as plt +from taosanalytics.conf import app_logger, conf +from taosanalytics.servicemgmt import loader + + +def do_ad_check(input_list, ts_list, algo_name, params): + """ actual anomaly detection handler """ + s = loader.get_service(algo_name) + + if s is None: + s = loader.get_service("ksigma") + + if s is None: + raise ValueError(f"failed to load {algo_name} or ksigma analysis service") + + s.set_input_list(input_list, ts_list) + s.set_params(params) + + res = s.execute() + + n_error = abs(sum(filter(lambda x: x == -1, res))) + app_logger.log_inst.debug("There are %d in input, and %d anomaly points found: %s", + len(input_list), + n_error, + res) + + draw_ad_results(input_list, res, algo_name) + return res + + +def draw_ad_results(input_list, res, fig_name): + """ draw the 
detected anomaly points """ + + # not in debug, do not visualize the anomaly detection result + if not conf.get_draw_result_option(): + return + + plt.clf() + for index, val in enumerate(res): + if val != -1: + continue + plt.scatter(index, input_list[index], marker='o', color='r', alpha=0.5, s=100, zorder=3) + + plt.plot(input_list, label='sample') + plt.savefig(fig_name) diff --git a/tools/tdgpt/taosanalytics/algo/fc/__init__.py b/tools/tdgpt/taosanalytics/algo/fc/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/tdgpt/taosanalytics/algo/fc/arima.py b/tools/tdgpt/taosanalytics/algo/fc/arima.py new file mode 100644 index 0000000000..9e087a5e9e --- /dev/null +++ b/tools/tdgpt/taosanalytics/algo/fc/arima.py @@ -0,0 +1,114 @@ +# encoding:utf-8 +# pylint: disable=c0103 +"""arima class definition""" +import pmdarima as pm + +from taosanalytics.algo.forecast import insert_ts_list +from taosanalytics.conf import app_logger +from taosanalytics.service import AbstractForecastService + + +class _ArimaService(AbstractForecastService): + """ ARIMA algorithm is to do the fc in the input list """ + name = "arima" + desc = "do time series data fc by using ARIMA model" + + def __init__(self): + super().__init__() + + self.diff = 0 + self.start_p = 0 + self.max_p = 10 + self.start_q = 0 + self.max_q = 10 + + def set_params(self, params): + super().set_params(params) + + self.start_p = int(params['start_p']) if 'start_p' in params else 0 + self.max_p = int(params['max_p']) if 'max_p' in params else 0 + self.start_q = int(params['start_q']) if 'start_q' in params else 0 + self.max_q = int(params['max_q']) if 'max_q' in params else 0 + + def get_params(self): + """ get the default value for fc algorithms """ + p = super().get_params() + p.update( + { + "start_p": self.start_p, "max_p": self.max_p, "start_q": self.start_q, + "max_q": self.max_q, "diff": self.diff + } + ) + + return p + + def __do_forecast_helper(self, fc_rows): + """ do arima fc """ + # plot_acf(self.list, lags=25, title='raw_acf') + # plot_pacf(self.list, lags=25, title='raw_pacf') + # plt.show() + + seasonal = self.period > 0 + + # Fit model + model = pm.auto_arima(self.list, + start_p=self.start_p, + start_q=self.start_q, + max_p=self.max_p, + max_q=self.max_q, + d=1, + m=self.period, + seasonal=seasonal, + start_P=0, + D=self.diff) + + app_logger.log_inst.debug(model.summary()) + + # predict N steps into the future + fc = model.predict(n_periods=fc_rows, return_conf_int=self.return_conf, + alpha=self.conf) + + # plt.plot(source_data, label='training') + # plt.plot(xrange, actual_data, label='actual') + + # fc_list = fc.tolist() + # fc_without_diff = restore_from_diff(self.list, fc_list, 2) + # print(fc_without_diff) + + # plt.plot(xrange, fc_without_diff, label='fc') + + # residuals = pd.DataFrame(model.arima_res_.resid) + # wn = is_white_noise(residuals) + # print("residual is white noise:", wn) + + # fig, ax = plt.subplots(1, 2) + # residuals.plot(title="Residuals", ax=ax[0]) + # residuals.plot(kind='kde', title='Density', ax=ax[1]) + # plt.show() + + res1 = [fc[0].tolist(), fc[1][:, 0].tolist(), + fc[1][:, 1].tolist()] if self.return_conf else [fc.tolist()] + + return ( + res1, + model.arima_res_.mse, + f"SARIMAX{model.order}x{model.seasonal_order}" + ) + + def execute(self): + """ do fc the time series data""" + + if self.list is None or len(self.list) < self.period: + raise ValueError("number of input data is less than the periods") + + if self.fc_rows <= 0: + raise ValueError("fc rows is not specified 
yet") + + res, mse, model_info = self.__do_forecast_helper(self.fc_rows) + insert_ts_list(res, self.start_ts, self.time_step, self.fc_rows) + + return { + "mse": mse, + "model_info": model_info, + "res": res + } diff --git a/tools/tdgpt/taosanalytics/algo/fc/holtwinters.py b/tools/tdgpt/taosanalytics/algo/fc/holtwinters.py new file mode 100644 index 0000000000..d8225eaa5a --- /dev/null +++ b/tools/tdgpt/taosanalytics/algo/fc/holtwinters.py @@ -0,0 +1,79 @@ +# encoding:utf-8 +# pylint: disable=c0103 +"""holt winters definition""" + +from statsmodels.tsa.holtwinters import ExponentialSmoothing, SimpleExpSmoothing + +from taosanalytics.algo.forecast import insert_ts_list +from taosanalytics.service import AbstractForecastService + + +class _HoltWintersService(AbstractForecastService): + """ Holt winters algorithm is to do the fc in the input list """ + name = "holtwinters" + desc = "forecast algorithm by using exponential smoothing" + + def __init__(self): + super().__init__() + + self.trend_option = None + self.seasonal_option = None + + def set_params(self, params): + super().set_params(params) + + self.trend_option = params['trend'] if 'trend' in params else None + + if self.trend_option is not None: + if self.trend_option not in ('add', 'mul'): + raise ValueError("trend parameter can only be 'mul' or 'add'") + + self.seasonal_option = params['seasonal'] if 'seasonal' in params else None + if self.seasonal_option is not None: + if self.seasonal_option not in ('add', 'mul'): + raise ValueError("seasonal parameter can only be 'mul' or 'add'") + + def get_params(self): + p = super().get_params() + p.update({'trend': self.trend_option, 'seasonal': self.seasonal_option}) + return p + + def __do_forecast_helper(self, source_data, fc_rows): + """ do holt winters impl """ + if self.trend_option is None: + fitted_model = SimpleExpSmoothing(source_data).fit() + else: + if self.period == 0 or self.seasonal_option is None: + # no valid seasonal periods, so not need to specify the seasonal parameters + fitted_model = ExponentialSmoothing(source_data, trend=self.trend_option).fit() + else: # seasonal attributes + fitted_model = ExponentialSmoothing( + source_data, + trend=self.trend_option, + seasonal=self.seasonal_option, + seasonal_periods=self.period + ).fit() + + fc = fitted_model.forecast(fc_rows) + + if self.return_conf: + return [fc.tolist(), fc.tolist(), fc.tolist()], fitted_model.sse + else: + return [fc.tolist()], fitted_model.sse + + def execute(self): + """ do fc the time series data""" + if self.list is None or len(self.list) < self.period: + raise ValueError("number of input data is less than the periods") + + if self.fc_rows <= 0: + raise ValueError("fc rows is not specified yet") + + res, mse = self.__do_forecast_helper(self.list, self.fc_rows) + insert_ts_list(res, self.start_ts, self.time_step, self.fc_rows) + + # add the conf range if required + return { + "mse": mse, + "res": res + } diff --git a/tools/tdgpt/taosanalytics/algo/forecast.py b/tools/tdgpt/taosanalytics/algo/forecast.py new file mode 100644 index 0000000000..e1e321a7b0 --- /dev/null +++ b/tools/tdgpt/taosanalytics/algo/forecast.py @@ -0,0 +1,110 @@ +# encoding:utf-8 +# pylint: disable=c0103 +"""forecast helper methods""" + +import numpy as np +import pandas as pd +from matplotlib import pyplot as plt + +from taosanalytics.conf import app_logger, conf +from taosanalytics.servicemgmt import loader + + +def do_forecast(input_list, ts_list, algo_name, params): + """ data fc handler """ + s = loader.get_service(algo_name) + + 
if s is None: + s = loader.get_service("holtwinters") + + if s is None: + raise ValueError(f"failed to load {algo_name} or holtwinters analysis service") + + s.set_input_list(input_list, ts_list) + s.set_params(params) + + app_logger.log_inst.debug("start to do forecast") + res = s.execute() + + app_logger.log_inst.debug("forecast done") + + res["period"] = s.period + res["algo"] = algo_name + + check_fc_results(res) + + fc = res["res"] + draw_fc_results(input_list, len(fc) > 2, fc, len(fc[0]), algo_name) + return res + + +def do_add_fc_params(params, json_obj): + """ add params into parameters """ + if "forecast_rows" in json_obj: + params["fc_rows"] = int(json_obj["forecast_rows"]) + + if "start" in json_obj: + params["start_ts"] = int(json_obj["start"]) + + if "every" in json_obj: + params["time_step"] = int(json_obj["every"]) + + if "conf" in json_obj: + params["conf"] = int(json_obj["conf"]) + + if "return_conf" in json_obj: + params["return_conf"] = int(json_obj["return_conf"]) + + +def insert_ts_list(res, start_ts, time_step, fc_rows): + """ insert the ts list before return results """ + ts_list = [start_ts + i * time_step for i in range(fc_rows)] + res.insert(0, ts_list) + return res + + +def draw_fc_results(input_list, return_conf, fc, n_rows, fig_name): + """Visualize the forecast results """ + # controlled by option, do not visualize the anomaly detection result + if not conf.get_draw_result_option(): + return + + app_logger.log_inst.debug('draw forecast result in debug model') + plt.clf() + + x = np.arange(len(input_list), len(input_list) + n_rows, 1) + + # draw the range of conf + if return_conf: + lower_series = pd.Series(fc[2], index=x) + upper_series = pd.Series(fc[3], index=x) + + plt.fill_between(lower_series.index, lower_series, upper_series, color='k', alpha=.15) + + plt.plot(input_list) + plt.plot(x, fc[1], c='blue') + plt.savefig(fig_name) + + app_logger.log_inst.debug("draw results completed in debug model") + + +def check_fc_results(res): + app_logger.log_inst.debug("start to check forecast result") + + if "res" not in res: + raise ValueError("forecast result is empty") + + fc = res["res"] + if len(fc) < 2: + raise ValueError("result length should greater than or equal to 2") + + n_rows = len(fc[0]) + if n_rows != len(fc[1]): + raise ValueError("result length is not identical, ts rows:%d res rows:%d" % ( + n_rows, len(fc[1]))) + + if len(fc) > 2 and (len(fc[2]) != n_rows or len(fc[3]) != n_rows): + raise ValueError( + "result length is not identical in confidence, ts rows:%d, lower confidence rows:%d, " + "upper confidence rows%d" % + (n_rows, len(fc[2]), len(fc[3]))) diff --git a/tools/tdgpt/taosanalytics/app.py b/tools/tdgpt/taosanalytics/app.py new file mode 100644 index 0000000000..682bce012c --- /dev/null +++ b/tools/tdgpt/taosanalytics/app.py @@ -0,0 +1,163 @@ +# encoding:utf-8 +# pylint: disable=c0103 +"""the main route definition for restful service""" +import os.path, sys + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../") + +from flask import Flask, request + +from taosanalytics.algo.anomaly import do_ad_check +from taosanalytics.algo.forecast import do_forecast, do_add_fc_params +from taosanalytics.conf import conf +from taosanalytics.model import get_avail_model +from taosanalytics.servicemgmt import loader +from taosanalytics.util import app_logger, validate_pay_load, get_data_index, get_ts_index, is_white_noise, \ + parse_options, convert_results_to_windows + +app = Flask(__name__) + +# load the all algos 
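# start-up sequence: configure the application logger first, then register
# every available analysis service before the routes start serving requests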
+app_logger.set_handler(conf.get_log_path())
+app_logger.set_log_level(conf.get_log_level())
+loader.load_all_service()
+
+
+@app.route("/")
+def start():
+    """ default response """
+    return "TDengine© Time Series Data Analytics Platform (ver 1.0.1)"
+
+
+@app.route("/status")
+def server_status():
+    """ return the server status """
+    return {
+        'protocol': 1.0,
+        'status': 'ready'
+    }
+
+
+@app.route("/list")
+def list_all_services():
+    """
+    API function to return all available services, including both forecast and anomaly detection
+    """
+    return loader.get_service_list()
+
+
+@app.route("/models")
+def list_all_models():
+    """ list all available models """
+    return get_avail_model()
+
+
+@app.route("/anomaly-detect", methods=['POST'])
+def handle_ad_request():
+    """handle the anomaly detection requests"""
+    app_logger.log_inst.info('recv ad request from %s', request.remote_addr)
+    app_logger.log_inst.debug('req payload: %s', request.json)
+
+    algo = request.json["algo"].lower() if "algo" in request.json else "ksigma"
+
+    # 1. validate the input data in json format
+    try:
+        validate_pay_load(request.json)
+    except ValueError as e:
+        return {"msg": str(e), "rows": -1}
+
+    payload = request.json["data"]
+
+    # 2. white noise data check
+    wn_check = request.json["wncheck"] if "wncheck" in request.json else 1
+
+    data_index = get_data_index(request.json["schema"])
+    ts_index = get_ts_index(request.json["schema"])
+
+    if wn_check:
+        try:
+            data = payload[data_index]
+            if is_white_noise(data):
+                app_logger.log_inst.debug("wn data, not processed")
+                return {"msg": "white noise data cannot be processed", "rows": -1}
+        except Exception as e:
+            return {"msg": str(e), "rows": -1}
+
+    # 3. parse the options for the different ad services; a typical option string
+    # looks like the following: "algo=ksigma,k=2,invalid_option=44"
+    options = request.json["option"] if "option" in request.json else None
+    params = parse_options(options)
+
+    # 4. do anomaly detection
+    try:
+        res_list = do_ad_check(payload[data_index], payload[ts_index], algo, params)
+        ano_window = convert_results_to_windows(res_list, payload[ts_index])
+
+        result = {"algo": algo, "option": options, "res": ano_window, "rows": len(ano_window)}
+        app_logger.log_inst.debug("anomaly-detection result: %s", str(result))
+
+        return result
+
+    except Exception as e:
+        result = {"res": {}, "rows": 0, "msg": str(e)}
+        app_logger.log_inst.error("failed to do anomaly-detection, %s", str(e))
+
+        return result
+
+
+@app.route("/forecast", methods=['POST'])
+def handle_forecast_req():
+    """handle the forecast request"""
+    app_logger.log_inst.info('recv fc request from %s', request.remote_addr)
+    app_logger.log_inst.debug('req payload: %s', request.json)
+
+    # holt-winters by default
+    algo = request.json['algo'].lower() if 'algo' in request.json else 'holtwinters'
+
+    # 1. validate the input data in json format
+    try:
+        validate_pay_load(request.json)
+    except ValueError as e:
+        app_logger.log_inst.error('validate req json failed, %s', e)
+        return {"msg": str(e), "rows": -1}
+
+    payload = request.json["data"]
+
+    # 2. white noise data check
+    wn_check = request.json["wncheck"] if "wncheck" in request.json else 1
+    data_index = get_data_index(request.json["schema"])
+    ts_index = get_ts_index(request.json["schema"])
+
+    if wn_check:
+        try:
+            data = payload[data_index]
+            if is_white_noise(data):
+                app_logger.log_inst.debug("%s wn data, not processed", data)
+                return {"msg": "white noise data cannot be processed", "rows": -1}
+        except Exception as e:
+            return {"msg": str(e), "rows": -1}
+
+    options = request.json["option"] if "option" in request.json else None
+    params = parse_options(options)
+
+    try:
+        do_add_fc_params(params, request.json)
+    except ValueError as e:
+        app_logger.log_inst.error("invalid fc params: %s", e)
+        return {"msg": f"{e}", "rows": -1}
+
+    try:
+        res1 = do_forecast(payload[data_index], payload[ts_index], algo, params)
+        res = {"option": options, "rows": params["fc_rows"]}
+        res.update(res1)
+
+        app_logger.log_inst.debug("forecast result: %s", res)
+
+        return res
+    except Exception as e:
+        app_logger.log_inst.error('forecast failed, %s', str(e))
+        return {"msg": str(e), "rows": -1}
+
+
+if __name__ == '__main__':
+    app.run()
diff --git a/tools/tdgpt/taosanalytics/conf.py b/tools/tdgpt/taosanalytics/conf.py
new file mode 100644
index 0000000000..c255b8e258
--- /dev/null
+++ b/tools/tdgpt/taosanalytics/conf.py
@@ -0,0 +1,105 @@
+# encoding:utf-8
+# pylint: disable=c0103
+"""configuration model definition"""
+import configparser
+import logging
+
+_ANODE_SECTION_NAME = "taosanode"
+
+
+class Configure:
+    """ configuration class """
+
+    def __init__(self, conf_path="/etc/taos/taosanode.ini"):
+        self.path = None
+
+        self._log_path = 'taosanode.app.log'
+        self._log_level = logging.INFO
+        self._model_directory = '/var/lib/taos/taosanode/model/'
+        self._draw_result = 0
+
+        self.conf = configparser.ConfigParser()
+        self.reload(conf_path)
+
+    def get_log_path(self) -> str:
+        """ return the full path of the log file """
+        return self._log_path
+
+    def get_log_level(self):
+        """ return the log level specified by the configuration file """
+        return self._log_level
+
+    def get_model_directory(self):
+        """ return the model directory """
+        return self._model_directory
+
+    def get_draw_result_option(self):
+        """ return whether the analysis results should be drawn or not """
+        return self._draw_result
+
+    def reload(self, new_path: str):
+        """ load the info from the config file """
+        self.path = new_path
+
+        self.conf.read(self.path)
+
+        if self.conf.has_option(_ANODE_SECTION_NAME, 'app-log'):
+            self._log_path = self.conf.get(_ANODE_SECTION_NAME, 'app-log')
+
+        if self.conf.has_option(_ANODE_SECTION_NAME, 'log-level'):
+            log_level = self.conf.get(_ANODE_SECTION_NAME, 'log-level')
+
+            log_flag = {
+                'DEBUG': logging.DEBUG, 'INFO': logging.INFO, 'CRITICAL': logging.CRITICAL,
+                'ERROR': logging.ERROR, 'WARN': logging.WARN
+            }
+
+            if log_level.upper() in log_flag:
+                self._log_level = log_flag[log_level.upper()]
+            else:
+                self._log_level = logging.INFO
+
+        if self.conf.has_option(_ANODE_SECTION_NAME, 'model-dir'):
+            self._model_directory = self.conf.get(_ANODE_SECTION_NAME, 'model-dir')
+
+        if self.conf.has_option(_ANODE_SECTION_NAME, 'draw-result'):
+            # read the value as an int so that "0" correctly disables drawing
+            self._draw_result = self.conf.getint(_ANODE_SECTION_NAME, 'draw-result')
+
+
+class AppLogger:
+    """ application logger class """
+    LOG_STR_FORMAT = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
+
+    def __init__(self):
+        self.log_inst = logging.getLogger(__name__)
+        self.log_inst.setLevel(logging.INFO)
+
+    def set_handler(self, file_path: str):
+        """ set the log file handler """
+
+        handler = logging.FileHandler(file_path)
+        handler.setFormatter(logging.Formatter(self.LOG_STR_FORMAT))
+
+        self.log_inst.addHandler(handler)
+
+    def set_log_level(self, log_level):
+        """adjust the log level"""
+        try:
+            self.log_inst.setLevel(log_level)
+            self.log_inst.info("set log level:%d", log_level)
+        except ValueError as e:
+            self.log_inst.error("failed to set log level: %d, %s", log_level, str(e))
+
+
+conf = Configure()
+app_logger = AppLogger()
+
+
+def setup_log_info(name: str):
+    """ prepare the log info for the unit tests """
+    app_logger.set_handler(name)
+
+    try:
+        app_logger.set_log_level(logging.DEBUG)
+    except ValueError as e:
+        print("set log level failed: %s" % e)
diff --git a/tools/tdgpt/taosanalytics/misc/__init__.py b/tools/tdgpt/taosanalytics/misc/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tools/tdgpt/taosanalytics/model.py b/tools/tdgpt/taosanalytics/model.py
new file mode 100644
index 0000000000..6efd85544e
--- /dev/null
+++ b/tools/tdgpt/taosanalytics/model.py
@@ -0,0 +1,22 @@
+# encoding:utf-8
+# pylint: disable=c0103
+"""model management functions"""
+
+def get_avail_model():
+    """ return the list of available pre-built models """
+    return [
+        {
+            "name": "ad_encoder_keras",
+            "algo": "auto-encoder",
+            "type": "anomaly-detection",
+            "src-table": "*",
+            "build-time": "2024-10-07 13:21:44"
+        }
+    ]
+
+
+def train_model():
+    pass
+
+
+if __name__ == '__main__':
+    a = get_avail_model()
+    print(a)
diff --git a/tools/tdgpt/taosanalytics/service.py b/tools/tdgpt/taosanalytics/service.py
new file mode 100644
index 0000000000..79244aae8c
--- /dev/null
+++ b/tools/tdgpt/taosanalytics/service.py
@@ -0,0 +1,110 @@
+# encoding:utf-8
+# pylint: disable=c0103
+"""main service module"""
+from abc import abstractmethod, ABC
+
+
+class AnalyticsService:
+    """ root class of all analytics services """
+
+    @abstractmethod
+    def execute(self):
+        """ the main method that performs the forecast or anomaly detection """
+
+    def get_desc(self) -> str:
+        """return the algorithm description"""
+        return ""
+
+    def get_params(self) -> dict:
+        """return the existing parameters"""
+        return {}
+
+
+class AbstractAnalyticsService(AnalyticsService, ABC):
+    """ abstract base class for analytics services """
+    name = ''
+    desc = ''
+
+    def __init__(self):
+        self.list = None
+        self.ts_list = None
+
+    def set_input_list(self, input_list: list, input_ts_list: list = None):
+        """ set the input list """
+        self.list = input_list
+        self.ts_list = input_ts_list
+
+    def set_params(self, params: dict) -> None:
+        """set the parameters for the current algorithm"""
+        if params is None:
+            return
+
+        if not isinstance(params, dict):
+            raise ValueError('invalid parameter type, only dict allowed')
+
+    def get_desc(self) -> str:
+        return self.desc
+
+
+class AbstractAnomalyDetectionService(AbstractAnalyticsService, ABC):
+    """ abstract anomaly detection service; all anomaly detection algorithm classes should
+    inherit from this class"""
+
+    def __init__(self):
+        super().__init__()
+        self.type = "anomaly-detection"
+
+    def input_is_empty(self):
+        """ check whether the input list is empty or None """
+        return (self.list is None) or (len(self.list) == 0)
+
+
+class AbstractForecastService(AbstractAnalyticsService, ABC):
+    """abstract forecast service; all forecast algorithm classes should inherit from
+    this base class"""

+    def __init__(self):
+        super().__init__()
+        self.type = "forecast"
+
+        self.period = 0
+        self.start_ts = 0
+        self.time_step = 0
+        self.fc_rows = 0
+
+        self.return_conf = 1
+        self.conf = 0.05
+
+    def set_params(self, params: dict) -> None:
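+        # required params: start_ts (timestamp of the first forecast row), time_step
+        # (interval between rows) and fc_rows (number of rows to forecast);
+        # optional params: period, conf and return_conf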
+        if not {'start_ts', 'time_step', 'fc_rows'}.issubset(params.keys()):
+            raise ValueError('params are missing: start_ts, time_step and fc_rows are all required')
+
+        self.start_ts = int(params['start_ts'])
+
+        self.time_step = int(params['time_step'])
+
+        if self.time_step <= 0:
+            raise ValueError('time_step should be greater than 0')
+
+        self.fc_rows = int(params['fc_rows'])
+
+        if self.fc_rows <= 0:
+            raise ValueError('fc rows is not specified yet')
+
+        self.period = int(params['period']) if 'period' in params else 0
+        if self.period < 0:
+            raise ValueError("period cannot be negative")
+
+        # conf is given as a percentage (95 by default) and converted to a significance level
+        self.conf = float(params['conf']) if 'conf' in params else 95
+
+        self.conf = 1.0 - self.conf / 100.0
+        if self.conf < 0 or self.conf >= 1.0:
+            raise ValueError("invalid conf value, should be between 0 and 100")
+
+        self.return_conf = int(params['return_conf']) if 'return_conf' in params else 1
+
+    def get_params(self):
+        return {
+            "period": self.period, "start": self.start_ts, "every": self.time_step,
+            "forecast_rows": self.fc_rows, "return_conf": self.return_conf, "conf": self.conf
+        }
diff --git a/tools/tdgpt/taosanalytics/servicemgmt.py b/tools/tdgpt/taosanalytics/servicemgmt.py
new file mode 100644
index 0000000000..5b20c73249
--- /dev/null
+++ b/tools/tdgpt/taosanalytics/servicemgmt.py
@@ -0,0 +1,120 @@
+# encoding:utf-8
+"""load and return the available services"""
+import copy
+import importlib
+import inspect
+import os
+from collections import defaultdict
+from taosanalytics.conf import app_logger
+from taosanalytics.service import AbstractAnomalyDetectionService, AbstractForecastService
+
+# use torch as the backend of keras
+os.environ['KERAS_BACKEND'] = 'torch'
+
+
+class AnalyticsServiceLoader:
+    """ singleton registry for the anomaly detection and forecast algorithms """
+
+    def __init__(self):
+        self.services = defaultdict(list)
+
+    def get_service(self, name):
+        """ get the required service """
+        serv = self.services.get(name, [])[0] if self.services.get(name) else None
+        return copy.copy(serv)
+
+    def get_typed_services(self, type_str: str) -> list:
+        """ get the services of the specified type """
+        all_items = []
+        for key, val in self.services.items():
+            if val[0].type == type_str:
+                try:
+                    one = {"name": key, "desc": val[0].get_desc(), "params": val[0].get_params()}
+                    all_items.append(one)
+                except AttributeError as e:
+                    app_logger.log_inst.error("failed to get service: %s info, reason: %s", key, e)
+
+        return all_items
+
+    def get_service_list(self):
+        """ return all available service info """
+        info = {
+            "protocol": 1.0,
+            "version": 0.1,
+            "details": [
+                self.get_forecast_algo_list(),
+                self.get_anomaly_detection_algo_list()
+            ]
+        }
+
+        return info
+
+    def get_anomaly_detection_algo_list(self):
+        """ get the list of all available anomaly detection services """
+        return {
+            "type": "anomaly-detection",
+            "algo": self.get_typed_services("anomaly-detection")
+        }
+
+    def get_forecast_algo_list(self):
+        """ get the list of all available forecast services """
+        return {
+            "type": "forecast",
+            "algo": self.get_typed_services("forecast")
+        }
+
+    def load_all_service(self) -> None:
+        """ load all algorithms in the specified directories """
+
+        def register_service(container, name: str, service):
+            """ register a service, for both anomaly detection and forecast """
+            app_logger.log_inst.info("register service: %s", name)
+            container[name].append(service)
+
+        def do_load_service(cur_directory, lib_prefix, sub_directory):
+            """ the implementation of loading services """
+            service_directory = cur_directory + sub_directory
+
+            if not os.path.exists(service_directory):
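+                # algorithm modules are expected under taosanalytics/algo/ad/ (anomaly
+                # detection) and taosanalytics/algo/fc/ (forecast)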
+                app_logger.log_inst.fatal(
+                    "service directory:%s does not exist, failed to load services",
+                    service_directory)
+                raise FileNotFoundError(f"service directory:{service_directory} not found")
+
+            all_files = os.listdir(service_directory)
+
+            for item in all_files:
+                if item in ('__init__.py', '__pycache__') or not item.endswith('.py'):
+                    continue
+
+                full_path = os.path.join(service_directory, item)
+                if os.path.isdir(full_path):
+                    continue
+
+                # do load the algorithm
+                name = lib_prefix + item.split('.')[0]
+                module = importlib.import_module(name)
+
+                app_logger.log_inst.info("load algorithm:%s", name)
+
+                for (class_name, _) in inspect.getmembers(module, inspect.isclass):
+
+                    # concrete algorithm classes are named with a leading underscore;
+                    # skip the abstract base classes and any other public class
+                    if class_name in (
+                            AbstractAnomalyDetectionService.__name__,
+                            AbstractForecastService.__name__
+                    ) or (not class_name.startswith('_')):
+                        continue
+
+                    algo_cls = getattr(module, class_name)
+
+                    if algo_cls is not None:
+                        obj = algo_cls()
+                        register_service(self.services, algo_cls.name, obj)
+
+        # start to load all services
+        current_directory = os.path.dirname(os.path.abspath(__file__))
+
+        do_load_service(current_directory, 'taosanalytics.algo.ad.', '/algo/ad/')
+        do_load_service(current_directory, 'taosanalytics.algo.fc.', '/algo/fc/')
+
+
+loader: AnalyticsServiceLoader = AnalyticsServiceLoader()
diff --git a/tools/tdgpt/taosanalytics/test/__init__.py b/tools/tdgpt/taosanalytics/test/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tools/tdgpt/taosanalytics/test/anomaly_test.py b/tools/tdgpt/taosanalytics/test/anomaly_test.py
new file mode 100644
index 0000000000..f44a7f0d52
--- /dev/null
+++ b/tools/tdgpt/taosanalytics/test/anomaly_test.py
@@ -0,0 +1,170 @@
+# encoding:utf-8
+# pylint: disable=c0103
+"""anomaly detection unit tests"""
+import unittest, sys, os.path
+import pandas as pd
+
+sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../")
+
+from taosanalytics.algo.anomaly import draw_ad_results
+from taosanalytics.conf import setup_log_info, app_logger
+from taosanalytics.servicemgmt import loader
+
+
+class AnomalyDetectionTest(unittest.TestCase):
+    """ anomaly detection unit test class """
+    input_list = [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40]
+    large_list = [
+        13, 14, 8, 10, 16, 26, 32, 27, 18, 32, 36, 24,
+        22, 23, 22, 18, 25, 21, 21, 14, 8, 11, 14, 23,
+        18, 17, 19, 20, 22, 19, 13, 26, 13, 14, 22, 24,
+        21, 22, 26, 21, 23, 24, 27, 41, 31, 27, 35, 26,
+        28, 36, 39, 21, 17, 22, 17, 19, 15, 34, 10, 15,
+        22, 18, 15, 20, 15, 22, 19, 16, 30, 27, 29, 23,
+        20, 16, 21, 21, 25, 16, 18, 15, 18, 14, 10, 15,
+        8, 15, 6, 11, 8, 7, 13, 10, 23, 16, 15, 25,
+        22, 20, 16
+    ]
+
+    @classmethod
+    def setUpClass(cls):
+        """ set up the environment for the unit tests and set the log file path """
+        setup_log_info("unit_test.log")
+        loader.load_all_service()
+
+    def test_ksigma(self):
+        """
+        Test the ksigma algorithm for anomaly detection. This test case verifies the
+        functionality of the ksigma algorithm by setting up the input data,
+        executing the algorithm, and asserting the expected results.
+        """
+
+        s = loader.get_service("ksigma")
+        s.set_input_list(AnomalyDetectionTest.input_list, None)
+        s.set_params({"k": 2})
+
+        r = s.execute()
+        draw_ad_results(AnomalyDetectionTest.input_list, r, "ksigma")
+
+        self.assertEqual(r[-1], -1)
+        self.assertEqual(len(r), len(AnomalyDetectionTest.input_list))
+
+    def test_iqr(self):
+        """
+        Test the IQR (Interquartile Range) algorithm for anomaly detection.
+        This test case verifies the functionality of the IQR algorithm by setting up the
+        input data, executing the algorithm, and asserting the expected results.
+        """
+
+        s = loader.get_service("iqr")
+        s.set_input_list(AnomalyDetectionTest.input_list, None)
+
+        try:
+            s.set_params({"k": 2})
+        except ValueError as e:
+            self.fail(e)
+
+        r = s.execute()
+        draw_ad_results(AnomalyDetectionTest.input_list, r, "iqr")
+
+        self.assertEqual(r[-1], -1)
+        self.assertEqual(len(r), len(AnomalyDetectionTest.input_list))
+
+    def test_grubbs(self):
+        """
+        Test the Grubbs algorithm for anomaly detection.
+
+        This test case verifies the functionality of the Grubbs algorithm by setting up the
+        input data, executing the algorithm, and asserting the expected results.
+        """
+
+        s = loader.get_service("grubbs")
+        s.set_input_list(AnomalyDetectionTest.input_list, None)
+        s.set_params({"alpha": 0.95})
+
+        r = s.execute()
+        draw_ad_results(AnomalyDetectionTest.input_list, r, "grubbs")
+
+        self.assertEqual(r[-1], -1)
+        self.assertEqual(len(r), len(AnomalyDetectionTest.input_list))
+
+    def test_shesd(self):
+        """
+        Test the SHESD (Seasonal Hybrid ESD) algorithm for anomaly detection.
+
+        This test case verifies the functionality of the SHESD algorithm by setting up the
+        input data, executing the algorithm, and asserting the expected results.
+        """
+
+        s = loader.get_service("shesd")
+        s.set_params({"period": 3})
+        s.set_input_list(AnomalyDetectionTest.input_list, None)
+
+        r = s.execute()
+        draw_ad_results(AnomalyDetectionTest.input_list, r, "shesd")
+
+        self.assertEqual(r[-1], -1)
+
+    def test_lof(self):
+        """
+        Test the LOF (Local Outlier Factor) algorithm for anomaly detection.
+
+        This test case verifies the functionality of the LOF algorithm by setting up the
+        input data, executing the algorithm, and asserting the expected results.
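+        The last two points of the input are expected to be flagged as anomalies (-1).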
+ """ + s = loader.get_service("lof") + s.set_params({"period": 3}) + s.set_input_list(AnomalyDetectionTest.input_list, None) + + r = s.execute() + draw_ad_results(AnomalyDetectionTest.input_list, r, "lof") + + self.assertEqual(r[-1], -1) + self.assertEqual(r[-2], -1) + + def test_multithread_safe(self): + """ Test the multithread safe function""" + s1 = loader.get_service("shesd") + s2 = loader.get_service("shesd") + + s1.set_params({"period": 3}) + self.assertNotEqual(s1.period, s2.period) + + def __load_remote_data_for_ad(self): + """load the remote data for anomaly detection""" + + url = ("https://raw.githubusercontent.com/numenta/NAB/master/data/artificialWithAnomaly/" + "art_daily_jumpsup.csv") + + remote_data = pd.read_csv(url, parse_dates=True, index_col="timestamp") + k = remote_data.values.ravel().tolist() + return k + + def test_autoencoder_ad(self): + """for local test only, disabled it in github action""" + pass + + # data = self.__load_remote_data_for_ad() + # + # s = loader.get_service("ad_encoder") + # s.set_input_list(data) + # + # try: + # s.set_params({"model": "ad_encoder_"}) + # except ValueError as e: + # app_logger.log_inst.error(f"failed to set the param for auto_encoder algorithm, reason:{e}") + # return + # + # r = s.execute() + # + # num_of_error = -(sum(filter(lambda x: x == -1, r))) + # self.assertEqual(num_of_error, 109) + # + # draw_ad_results(data, r, "autoencoder") + + def test_get_all_services(self): + """Test get all services""" + loader.get_anomaly_detection_algo_list() + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/tdgpt/taosanalytics/test/forecast_test.py b/tools/tdgpt/taosanalytics/test/forecast_test.py new file mode 100644 index 0000000000..1e4874b8c8 --- /dev/null +++ b/tools/tdgpt/taosanalytics/test/forecast_test.py @@ -0,0 +1,115 @@ +# encoding:utf-8 +# pylint: disable=c0103 +"""forecast unit test cases""" + +import unittest, os.path, sys +import pandas as pd + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../") + +from taosanalytics.algo.forecast import draw_fc_results +from taosanalytics.conf import setup_log_info +from taosanalytics.servicemgmt import loader + + +class ForecastTest(unittest.TestCase): + """forecast unit test cases""" + + @classmethod + def setUpClass(cls): + """ set up the environment for unit test """ + setup_log_info("unit_test.log") + loader.load_all_service() + + def get_input_list(self): + """ load data from csv """ + url = ('https://raw.githubusercontent.com/jbrownlee/Datasets/refs/heads/master/' + 'airline-passengers.csv') + data = pd.read_csv(url, index_col='Month', parse_dates=True) + + ts_list = data[['Passengers']].index.tolist() + dst_list = [int(item.timestamp()) for item in ts_list] + + return data[['Passengers']].values.tolist(), dst_list + + def test_holt_winters_forecast(self): + """ test holt winters forecast with invalid and then valid parameters""" + s = loader.get_service("holtwinters") + data, ts = self.get_input_list() + + s.set_input_list(data, ts) + self.assertRaises(ValueError, s.execute) + + s.set_params({"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30}) + + r = s.execute() + draw_fc_results(data, len(r["res"]) > 2, r["res"], len(r["res"][0]), "holtwinters") + + def test_holt_winters_forecast_2(self): + """test holt winters with valid parameters""" + s = loader.get_service("holtwinters") + data, ts = self.get_input_list() + + s.set_input_list(data, ts) + s.set_params( + { + "fc_rows": 10, "trend": 'mul', "seasonal": 'mul', "start_ts": 171000000, 
+ "time_step": 86400 * 30, "period": 12 + } + ) + + r = s.execute() + + draw_fc_results(data, len(r["res"]) > 2, r["res"], len(r["res"][0]), "holtwinters") + + def test_holt_winter_invalid_params(self): + """parameters validation check""" + s = loader.get_service("holtwinters") + + self.assertRaises(ValueError, s.set_params, {"trend": "mul"}) + + self.assertRaises(ValueError, s.set_params, {"trend": "mul"}) + + self.assertRaises(ValueError, s.set_params, {"trend": "mul", "fc_rows": 10}) + + self.assertRaises(ValueError, s.set_params, {"trend": "multi"}) + + self.assertRaises(ValueError, s.set_params, {"seasonal": "additive"}) + + self.assertRaises(ValueError, s.set_params, { + "fc_rows": 10, "trend": 'multi', "seasonal": 'addi', "start_ts": 171000000, + "time_step": 86400 * 30, "period": 12} + ) + + self.assertRaises(ValueError, s.set_params, + {"fc_rows": 10, "trend": 'mul', "seasonal": 'add', "time_step": 86400 * 30, "period": 12} + ) + + s.set_params({"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30}) + + self.assertRaises(ValueError, s.set_params, {"fc_rows": 'abc', "start_ts": 171000000, "time_step": 86400 * 30}) + + self.assertRaises(ValueError, s.set_params, {"fc_rows": 10, "start_ts": "aaa", "time_step": "30"}) + + self.assertRaises(ValueError, s.set_params, {"fc_rows": 10, "start_ts": 171000000, "time_step": 0}) + + def test_arima(self): + """arima algorithm check""" + s = loader.get_service("arima") + data, ts = self.get_input_list() + + s.set_input_list(data, ts) + self.assertRaises(ValueError, s.execute) + + s.set_params( + {"fc_rows": 10, "start_ts": 171000000, "time_step": 86400 * 30, "period": 12, + "start_p": 0, "max_p": 10, "start_q": 0, "max_q": 10} + ) + r = s.execute() + + rows = len(r["res"][0]) + draw_fc_results(data, len(r["res"]) > 1, r["res"], rows, "arima") + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/tdgpt/taosanalytics/test/install_test.py b/tools/tdgpt/taosanalytics/test/install_test.py new file mode 100644 index 0000000000..9c5aa9238f --- /dev/null +++ b/tools/tdgpt/taosanalytics/test/install_test.py @@ -0,0 +1,27 @@ +"""perform the build release package and install and then test the restful service""" + +import unittest +import os + + +class ForecastTest(unittest.TestCase): + + def test_release(self): + """ test the package """ + pass + + # print("build install package") + # os.system("../../script/release.sh") + # print("build completed") + # + # self.assertEqual(os.path.exists("../../release/TDengine-enterprise-anode-1.0.0.tar.gz"), 1) + + def test_install(self): + """ test """ + pass + + # print("start to install package") + # os.system("tar zxvf ../../release/TDengine-enterprise-anode-1.0.0.tar.gz") + # os.chdir("../../release/TDengine-enterprise-anode-1.0.0/") + # + # os.system("./install.sh") diff --git a/tools/tdgpt/taosanalytics/test/restful_api_test.py b/tools/tdgpt/taosanalytics/test/restful_api_test.py new file mode 100644 index 0000000000..6463343e00 --- /dev/null +++ b/tools/tdgpt/taosanalytics/test/restful_api_test.py @@ -0,0 +1,259 @@ +# encoding:utf-8 +# pylint: disable=c0103 +"""flask restful api test module""" + +import sys, os.path + +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../") + +from flask_testing import TestCase +from taosanalytics.app import app +from taosanalytics.conf import setup_log_info + + +class RestfulTest(TestCase): + """ restful api test class """ + + def create_app(self): + app.testing = True + setup_log_info("restfull_test.log") + return app + + def 
+    def test_access_main_page(self):
+        """ test accessing the default main page """
+        response = self.client.get('/')
+        self.assertEqual(response.status_code, 200)
+        # the © character takes two bytes in utf-8, hence the extra byte in the content length
+        self.assertEqual(response.content_length, len("TDengine© Time Series Data Analytics Platform (ver 1.0.1)") + 1)
+
+    def test_load_status(self):
+        """ test loading the server status """
+        response = self.client.get('/status')
+        self.assertEqual(response.status_code, 200)
+        res = response.json
+
+        self.assertEqual(res['protocol'], 1.0)
+        self.assertEqual(res['status'], 'ready')
+
+    def test_load_algos(self):
+        """ test loading the provided algos """
+        response = self.client.get('/list')
+        self.assertEqual(response.status_code, 200)
+
+        res = response.json
+        self.assertEqual(res['version'], 0.1)
+        self.assertEqual(res['protocol'], 1.0)
+
+        d = res['details']
+        self.assertEqual(len(d), 2)
+
+    def test_forecast(self):
+        """test the forecast api"""
+        response = self.client.post("/forecast", json={
+            "schema": [
+                ["ts", "TIMESTAMP", 8],
+                ["val", "INT", 4]
+            ],
+            "data": [
+                [
+                    1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
+                    1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000,
+                    1577808010000, 1577808011000, 1577808012000, 1577808013000, 1577808014000,
+                    1577808015000, 1577808016000, 1577808017000, 1577808018000, 1577808019000,
+                    1577808020000, 1577808021000, 1577808022000, 1577808023000, 1577808024000,
+                    1577808025000, 1577808026000, 1577808027000, 1577808028000, 1577808029000,
+                    1577808030000, 1577808031000, 1577808032000, 1577808033000, 1577808034000,
+                    1577808035000, 1577808036000, 1577808037000, 1577808038000, 1577808039000,
+                    1577808040000, 1577808041000, 1577808042000, 1577808043000, 1577808044000,
+                    1577808045000, 1577808046000, 1577808047000, 1577808048000, 1577808049000,
+                    1577808050000, 1577808051000, 1577808052000, 1577808053000, 1577808054000,
+                    1577808055000, 1577808056000, 1577808057000, 1577808058000, 1577808059000,
+                    1577808060000, 1577808061000, 1577808062000, 1577808063000, 1577808064000,
+                    1577808065000, 1577808066000, 1577808067000, 1577808068000, 1577808069000,
+                    1577808070000, 1577808071000, 1577808072000, 1577808073000, 1577808074000,
+                    1577808075000, 1577808076000, 1577808077000, 1577808078000, 1577808079000,
+                    1577808080000, 1577808081000, 1577808082000, 1577808083000, 1577808084000,
+                    1577808085000, 1577808086000, 1577808087000, 1577808088000, 1577808089000,
+                    1577808090000, 1577808091000, 1577808092000, 1577808093000, 1577808094000,
+                    1577808095000
+                ],
+                [
+                    13, 14, 8, 10, 16, 26, 32, 27, 18, 32, 36, 24, 22, 23, 22, 18, 25, 21, 21,
+                    14, 8, 11, 14, 23, 18, 17, 19, 20, 22, 19, 13, 26, 13, 14, 22, 24, 21, 22,
+                    26, 21, 23, 24, 27, 41, 31, 27, 35, 26, 28, 36, 39, 21, 17, 22, 17, 19, 15,
+                    34, 10, 15, 22, 18, 15, 20, 15, 22, 19, 16, 30, 27, 29, 23, 20, 16, 21, 21,
+                    25, 16, 18, 15, 18, 14, 10, 15, 8, 15, 6, 11, 8, 7, 13, 10, 23, 16, 15, 25
+                ]
+            ],
+            "option": "algo=holtwinters",
+            "algo": "holtwinters",
+            "prec": "ms",
+            "wncheck": 1,
+            "return_conf": 1,
+            "forecast_rows": 10,
+            "conf": 95,
+            "start": 1577808096000,
+            "every": 1000,
+            "rows": 96,
+            "protocol": 1.0
+        })
+
+        self.assertEqual(response.status_code, 200)
+        self.assertEqual(response.json["algo"], "holtwinters")
+        self.assertEqual(response.json["rows"], 10)
+        self.assertEqual(response.json["period"], 0)
+        self.assertEqual(response.json["res"][0][0], 1577808096000)
+        self.assertEqual(response.json["res"][0][-1], 1577808105000)
+        self.assertEqual(len(response.json["res"][0]), response.json["rows"])
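+        # res layout: [ts, forecast, lower-bound, upper-bound], since return_conf is enabled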
self.assertEqual(len(response.json["res"]), 4) + + def test_ad(self): + """test anomaly detect api""" + response = self.client.post("/anomaly-detect", json={ + "schema": [ + ["ts", "TIMESTAMP", 8], + ["val", "INT", 4] + ], + "data": [ + [1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000, + 1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000, + 1577808010000, 1577808011000, 1577808012000, 1577808013000, 1577808014000, + 1577808015000, 1577808016000], + [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40] + ], + "rows": 17, + "algo": "iqr" + }) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json["rows"], 1) + self.assertEqual(response.json["algo"], "iqr") + + def test_ad_error_get(self): + """1. invalid http method""" + response = self.client.get("/anomaly-detect", json={ + "schema": [ + ["ts", "TIMESTAMP", 8], + ["val", "INT", 4] + ], + "data": [ + [1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000, + 1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000, + 1577808010000, 1577808011000, 1577808012000, 1577808013000, 1577808014000, + 1577808015000, 1577808016000], + [5, 14, 15, 15, 14, 19, 17, 16, 20, 22, 8, 21, 28, 11, 9, 29, 40] + ], + "rows": 17, + "algo": "iqr" + }) + + self.assertEqual(response.status_code, 405) + + def test_ad_error_empty_payload(self): + """2. list that is going to apply anomaly detection is empty or less value than the threshold + , which is [10, 100000]""" + response = self.client.post("/anomaly-detect", json={ + "schema": [ + ["ts", "TIMESTAMP", 8], + ["val", "INT", 4] + ], + "data": [ + [1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000, + 1577808005000, 1577808006000, 1577808007000, 1577808008000], + [5, 14, 15, 15, 14, 19, 17, 16, 20] + ], + "rows": 9, + "algo": "iqr" + }) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json["rows"], -1) + + def test_ad_error_single_col(self): + """3. only one column""" + response = self.client.post("/anomaly-detect", json={ + "schema": [ + ["ts", "TIMESTAMP", 8], + ["val", "INT", 4] + ], + "data": [ + [1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000, + 1577808005000, 1577808006000, 1577808007000, 1577808008000] + ], + "rows": 9, + "algo": "iqr" + }) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json["rows"], -1) + + def test_ad_error_three_cols(self): + """4. there are three input columns """ + response = self.client.post("/anomaly-detect", json={ + "schema": [ + ["ts", "TIMESTAMP", 8], + ["val", "INT", 4], + ["val1", "INT", 4] + ], + "data": [ + [1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000, + 1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000], + [5, 14, 15, 15, 14, 19, 17, 16, 20, 44], + [5, 14, 15, 15, 14, 19, 17, 16, 20, 44] + ], + "rows": 10, + "algo": "iqr" + }) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json["rows"], -1) + + def test_ad_disorder_cols(self): + """5. 
+    def test_ad_disorder_cols(self):
+        """5. the two columns are out of order"""
+        response = self.client.post("/anomaly-detect", json={
+            "schema": [
+                ["val", "INT", 4],
+                ["ts", "TIMESTAMP", 8]
+            ],
+            "data": [
+                [5, 14, 15, 15, 14, 19, 17, 16, 20, 44],
+                [1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
+                 1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000],
+            ],
+            "rows": 10,
+            "algo": "iqr"
+        })
+
+        self.assertEqual(response.status_code, 200)
+        self.assertEqual(response.json["rows"], 2)
+
+    def test_missing_schema(self):
+        """6. missing schema info"""
+        response = self.client.post("/anomaly-detect", json={
+            "data": [
+                [5, 14, 15, 15, 14, 19, 17, 16, 20, 44],
+                [1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
+                 1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000],
+            ],
+            "rows": 10,
+            "algo": "iqr"
+        })
+
+        self.assertEqual(response.status_code, 200)
+        self.assertEqual(response.json["rows"], -1)
+
+    def test_invalid_schema_info(self):
+        """7. invalid schema info"""
+        response = self.client.post("/anomaly-detect", json={
+            "schema": [
+                ["ts", "TIMESTAMP", 8]
+            ],
+            "data": [
+                [1577808000000, 1577808001000, 1577808002000, 1577808003000, 1577808004000,
+                 1577808005000, 1577808006000, 1577808007000, 1577808008000, 1577808009000],
+            ],
+            "rows": 10,
+            "algo": "iqr"
+        })
+
+        self.assertEqual(response.status_code, 200)
+        self.assertEqual(response.json["rows"], -1)
diff --git a/tools/tdgpt/taosanalytics/test/unit_test.py b/tools/tdgpt/taosanalytics/test/unit_test.py
new file mode 100644
index 0000000000..f6ecdf0d5b
--- /dev/null
+++ b/tools/tdgpt/taosanalytics/test/unit_test.py
@@ -0,0 +1,106 @@
+# encoding:utf-8
+# pylint: disable=c0103
+"""unit test module"""
+import os.path
+import unittest
+import sys
+
+sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../")
+
+from taosanalytics.servicemgmt import loader
+from taosanalytics.util import convert_results_to_windows, is_white_noise, parse_options, is_stationary
+
+
+class UtilTest(unittest.TestCase):
+    """utility test cases"""
+
+    def test_generate_anomaly_window(self):
+        # Test case 1: Normal input
+        wins = convert_results_to_windows([1, 1, 1, 1, 1, 1, -1, -1, -1, 1, 1, -1],
+                                          [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12])
+        print(f"The result window is:{wins}")
+
+        # Assert the number of windows
+        self.assertEqual(len(wins), 2)
+
+        # Assert the first window
+        self.assertListEqual(wins[0], [7, 9])
+
+        # Assert the second window
+        self.assertListEqual(wins[1], [12, 12])
+
+        # Test case 2: Anomaly input list is empty
+        wins = convert_results_to_windows([], [1, 2])
+        self.assertListEqual(wins, [])
+
+        # Test case 3: Anomaly input list is None
+        wins = convert_results_to_windows([], None)
+        self.assertListEqual(wins, [])
+
+        # Test case 4: Timestamp list is None
+        wins = convert_results_to_windows(None, [])
+        self.assertListEqual(wins, [])
+
+    def test_validate_input_data(self):
+        pass
+
+    def test_validate_pay_load(self):
+        pass
+
+    def test_validate_forecast_input_data(self):
+        pass
+
+    def test_convert_results_to_windows(self):
+        pass
+
+    def test_is_white_noise(self):
+        """
+        Test the is_white_noise function by providing a list and asserting the
+        expected result.
+        """
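+        # an empty list is shorter than the 16-point minimum required by the
+        # Ljung-Box test, so it is reported as non-white-noise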
+        list1 = []
+        wn = is_white_noise(list1)
+        self.assertFalse(wn)
+
+    def test_is_stationary(self):
+        """test whether the data is stationary or not"""
+        st = is_stationary([1, 2, 3, 4, 5, 7, 5, 1, 54, 3, 6, 87, 45, 14, 24])
+        self.assertEqual(st, False)
+
+    def test_parse_options(self):
+        """test case for parsing a key/value string into k/v pairs"""
+        option_str = "algo=ksigma,k=2,invalid_option=invalid_str"
+        opt = parse_options(option_str)
+
+        self.assertEqual(len(opt), 3)
+        self.assertDictEqual(opt, {'algo': 'ksigma', 'k': '2', 'invalid_option': 'invalid_str'})
+
+    def test_get_data_index(self):
+        """ test getting the data index """
+        schema = [
+            ["val", "INT", 4],
+            ["ts", "TIMESTAMP", 8]
+        ]
+        for index, val in enumerate(schema):
+            if val[0] == "val":
+                return index
+
+
+class ServiceTest(unittest.TestCase):
+    def setUp(self):
+        """ load all services before starting the unit tests """
+        loader.load_all_service()
+
+    def test_get_all_algos(self):
+        service_list = loader.get_service_list()
+        self.assertEqual(len(service_list["details"]), 2)
+
+        for item in service_list["details"]:
+            if item["type"] == "anomaly-detection":
+                self.assertEqual(len(item["algo"]), 6)
+            else:
+                self.assertEqual(len(item["algo"]), 2)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tools/tdgpt/taosanalytics/util.py b/tools/tdgpt/taosanalytics/util.py
new file mode 100644
index 0000000000..b9b292c3b4
--- /dev/null
+++ b/tools/tdgpt/taosanalytics/util.py
@@ -0,0 +1,126 @@
+# encoding:utf-8
+"""utility methods that help with query processing"""
+import numpy as np
+from statsmodels.stats.diagnostic import acorr_ljungbox
+from statsmodels.tsa.stattools import adfuller
+
+from taosanalytics.conf import app_logger
+
+
+def validate_pay_load(json_obj):
+    """ validate the input payload """
+    if "data" not in json_obj:
+        raise ValueError('data attr does not exist in json')
+
+    data = json_obj["data"]
+
+    if len(data) <= 1:
+        raise ValueError('only one column provided; both the primary timestamp column and a data column are required')
+
+    if len(data) > 2:
+        raise ValueError('too many columns')
+
+    rows = len(data[0])
+
+    if rows != len(data[1]):
+        raise ValueError('data inconsistent, the numbers of rows are not identical')
+
+    if rows < 10 or rows > 40000:
+        raise ValueError(f'the number of rows should be between 10 and 40000, actual {rows} rows')
+
+    if "schema" not in json_obj:
+        raise ValueError('schema is missing')
+
+    index = get_data_index(json_obj["schema"])
+    if index == -1:
+        raise ValueError('invalid schema info, data column is missing')
+
+
+def convert_results_to_windows(result, ts_list):
+    """generate the anomaly windows according to the anomaly detection results"""
+    skey, ekey = -1, -1
+    wins = []
+
+    if ts_list is None or result is None or len(result) != len(ts_list):
+        return wins
+
+    for index, val in enumerate(result):
+        if val == -1:
+            ekey = ts_list[index]
+            if skey == -1:
+                skey = ts_list[index]
+        else:
+            if ekey != -1:
+                wins.append([skey, ekey])
+                skey, ekey = -1, -1
+
+    if ekey != -1:
+        wins.append([skey, ekey])
+
+    return wins
+
+
+def is_white_noise(input_list):
+    """ determine whether the input list is white noise or not """
+    if len(input_list) < 16:  # the number of items in the list is insufficient
+        return False
+
+    res = acorr_ljungbox(input_list, lags=[6, 12, 16], boxpierce=True, return_df=True)
+    q_lb = res.lb_pvalue.array[2]
+    return q_lb >= 0.05
+
+
+def is_stationary(input_list):
+    """ determine whether the input list is weakly stationary or not """
+    adf, pvalue, usedlag, nobs, critical_values, _ = adfuller(input_list, autolag='AIC')
+    app_logger.log_inst.info("adf is:%f critical value is:%s", adf, critical_values)
+    return pvalue < 0.05
+
+
+def parse_options(option_str) -> dict:
+    """
+    parse an option string such as "algo=ksigma,k=2,invalid_option=invalid_str"
+    into a dict
+    """
+    options = {}
+
+    if option_str is None or len(option_str) == 0:
+        return options
+
+    opt_list = option_str.split(",")
+    for line in opt_list:
+        if "=" not in line or len(line.strip()) < 3:
+            continue
+
+        kv_pair = line.strip().split("=")
+        if kv_pair[0].strip() == '' or kv_pair[1].strip() == '':
+            continue
+
+        options[kv_pair[0].strip()] = kv_pair[1].strip()
+
+    return options
+
+
+def get_data_index(schema):
+    """get the data column index according to the schema info"""
+    for index, val in enumerate(schema):
+        if val[0] == "val":
+            return index
+
+    return -1
+
+
+def get_ts_index(schema):
+    """get the timestamp column index according to the schema info"""
+    for index, val in enumerate(schema):
+        if val[0] == "ts":
+            return index
+    return -1
+
+
+def create_sequences(values, time_steps):
+    """ create the sliding-window sequences used for model training """
+    output = []
+    for i in range(len(values) - time_steps + 1):
+        output.append(values[i: (i + time_steps)])
+    return np.stack(output)
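+
+
+# Example (hypothetical input): create_sequences([1, 2, 3, 4], time_steps=2) returns
+# a numpy array of shape (3, 2): [[1, 2], [2, 3], [3, 4]].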