Merge branch '3.0' into cpwu/3.0

This commit is contained in:
cpwu 2022-06-30 16:09:23 +08:00
commit f506147b28
33 changed files with 477 additions and 1085 deletions

View File

@ -125,6 +125,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_BLOCK_DIST_INFO, // block distribution pseudo column function FUNCTION_TYPE_BLOCK_DIST_INFO, // block distribution pseudo column function
FUNCTION_TYPE_TO_COLUMN, FUNCTION_TYPE_TO_COLUMN,
FUNCTION_TYPE_GROUP_KEY, FUNCTION_TYPE_GROUP_KEY,
FUNCTION_TYPE_CACHE_LAST_ROW,
// distributed splitting functions // distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000, FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,

View File

@ -91,6 +91,7 @@ typedef struct SAggLogicNode {
SLogicNode node; SLogicNode node;
SNodeList* pGroupKeys; SNodeList* pGroupKeys;
SNodeList* pAggFuncs; SNodeList* pAggFuncs;
bool hasLastRow;
} SAggLogicNode; } SAggLogicNode;
typedef struct SProjectLogicNode { typedef struct SProjectLogicNode {

View File

@ -1,20 +0,0 @@
[Unit]
Description=TDengine arbitrator service
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart=/usr/bin/tarbitrator
TimeoutStopSec=1000000s
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity
TimeoutStartSec=0
StandardOutput=null
Restart=always
StartLimitBurst=3
StartLimitInterval=60s
[Install]
WantedBy=multi-user.target

View File

@ -1,88 +0,0 @@
#!/bin/bash
#
# Modified from original source: Elastic Search
# https://github.com/elasticsearch/elasticsearch
# Thank you to the Elastic Search authors
#
# chkconfig: 2345 99 01
#
### BEGIN INIT INFO
# Provides: taoscluster
# Required-Start: $local_fs $network $syslog
# Required-Stop: $local_fs $network $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Starts taoscluster tarbitrator
# Description: Starts taoscluster tarbitrator, a arbitrator
### END INIT INFO
set -e
PATH="/bin:/usr/bin:/sbin:/usr/sbin"
NAME="taoscluster"
USER="root"
GROUP="root"
DAEMON="/usr/local/taos/bin/tarbitrator"
DAEMON_OPTS=""
PID_FILE="/var/run/$NAME.pid"
APPARGS=""
# Maximum number of open files
MAX_OPEN_FILES=65535
. /lib/lsb/init-functions
case "$1" in
start)
log_action_begin_msg "Starting tarbitrator..."
if start-stop-daemon --test --start --chuid "$USER:$GROUP" --background --make-pidfile --pidfile "$PID_FILE" --exec "$DAEMON" -- $APPARGS &> /dev/null; then
touch "$PID_FILE" && chown "$USER":"$GROUP" "$PID_FILE"
if [ -n "$MAX_OPEN_FILES" ]; then
ulimit -n $MAX_OPEN_FILES
fi
start-stop-daemon --start --chuid "$USER:$GROUP" --background --make-pidfile --pidfile "$PID_FILE" --exec "$DAEMON" -- $APPARGS
log_end_msg $?
fi
;;
stop)
log_action_begin_msg "Stopping tarbitrator..."
set +e
if [ -f "$PID_FILE" ]; then
start-stop-daemon --stop --pidfile "$PID_FILE" --user "$USER" --retry=TERM/120/KILL/5 > /dev/null
if [ $? -eq 1 ]; then
log_action_cont_msg "TSD is not running but pid file exists, cleaning up"
elif [ $? -eq 3 ]; then
PID="`cat $PID_FILE`"
log_failure_msg "Failed to stop tarbitrator (pid $PID)"
exit 1
fi
rm -f "$PID_FILE"
else
log_action_cont_msg "tarbitrator was not running"
fi
log_action_end_msg 0
set -e
;;
restart|force-reload)
if [ -f "$PID_FILE" ]; then
$0 stop
sleep 1
fi
$0 start
;;
status)
status_of_proc -p "$PID_FILE" "$DAEMON" "$NAME"
;;
*)
exit 1
;;
esac
exit 0

View File

@ -111,9 +111,9 @@ else
fi fi
csudo="" csudo=""
if command -v sudo > /dev/null; then #if command -v sudo > /dev/null; then
csudo="sudo " # csudo="sudo "
fi #fi
function is_valid_version() { function is_valid_version() {
[ -z $1 ] && return 1 || : [ -z $1 ] && return 1 || :
@ -181,7 +181,9 @@ cd "${curr_dir}"
# 2. cmake executable file # 2. cmake executable file
compile_dir="${top_dir}/debug" compile_dir="${top_dir}/debug"
${csudo}rm -rf ${compile_dir} if [ -d ${compile_dir} ]; then
rm -rf ${compile_dir}
fi
mkdir -p ${compile_dir} mkdir -p ${compile_dir}
cd ${compile_dir} cd ${compile_dir}
@ -258,9 +260,9 @@ if [ "$osType" != "Darwin" ]; then
if [[ "$pagMode" == "full" ]]; then if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
cd ${top_dir}/tools/taos-tools/packaging/deb cd ${top_dir}/tools/taos-tools/packaging/deb
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}')
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0" [ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}')
${csudo}./make-taos-tools-deb.sh ${top_dir} \ ${csudo}./make-taos-tools-deb.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} ${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi fi
@ -283,9 +285,9 @@ if [ "$osType" != "Darwin" ]; then
if [[ "$pagMode" == "full" ]]; then if [[ "$pagMode" == "full" ]]; then
if [ -d ${top_dir}/tools/taos-tools/packaging/rpm ]; then if [ -d ${top_dir}/tools/taos-tools/packaging/rpm ]; then
cd ${top_dir}/tools/taos-tools/packaging/rpm cd ${top_dir}/tools/taos-tools/packaging/rpm
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}' | sed -e 's/-/_/g')
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0" [ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taos_tools_ver=$(git describe --tags | sed -e 's/ver-//g' | awk -F '-' '{print $1}' | sed -e 's/-/_/g')
${csudo}./make-taos-tools-rpm.sh ${top_dir} \ ${csudo}./make-taos-tools-rpm.sh ${top_dir} \
${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType} ${compile_dir} ${output_dir} ${taos_tools_ver} ${cpuType} ${osType} ${verMode} ${verType}
fi fi
@ -300,7 +302,6 @@ if [ "$osType" != "Darwin" ]; then
${csudo}./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${verNumberComp} ${dbName} ${csudo}./makepkg.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${verNumberComp} ${dbName}
${csudo}./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${dbName} ${csudo}./makeclient.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode} ${dbName}
# ${csudo}./makearbi.sh ${compile_dir} ${verNumber} "${build_time}" ${cpuType} ${osType} ${verMode} ${verType} ${pagMode}
else else
# only make client for Darwin # only make client for Darwin

View File

@ -1,141 +0,0 @@
#!/bin/bash
#
# tarbitratord This shell script takes care of starting and stopping tarbitrator.
#
# chkconfig: 2345 99 01
# description: tarbitrator is a arbitrator used in TDengine cluster.
#
#
### BEGIN INIT INFO
# Provides: taoscluster
# Required-Start: $network $local_fs $remote_fs
# Required-Stop: $network $local_fs $remote_fs
# Short-Description: start and stop tarbitrator
# Description: tarbitrator is a arbitrator used in TDengine cluster.
### END INIT INFO
# Source init functions
. /etc/init.d/functions
# Maximum number of open files
MAX_OPEN_FILES=65535
# Default program options
NAME=tarbitrator
PROG=/usr/local/taos/bin/tarbitrator
USER=root
GROUP=root
# Default directories
LOCK_DIR=/var/lock/subsys
PID_DIR=/var/run/$NAME
# Set file names
LOCK_FILE=$LOCK_DIR/$NAME
PID_FILE=$PID_DIR/$NAME.pid
[ -e $PID_DIR ] || mkdir -p $PID_DIR
PROG_OPTS=""
start() {
echo -n "Starting ${NAME}: "
# check identity
curid="`id -u -n`"
if [ "$curid" != root ] && [ "$curid" != "$USER" ] ; then
echo "Must be run as root or $USER, but was run as $curid"
return 1
fi
# Sets the maximum number of open file descriptors allowed.
ulimit -n $MAX_OPEN_FILES
curulimit="`ulimit -n`"
if [ "$curulimit" -lt $MAX_OPEN_FILES ] ; then
echo "'ulimit -n' must be greater than or equal to $MAX_OPEN_FILES, is $curulimit"
return 1
fi
if [ "`id -u -n`" == root ] ; then
# Changes the owner of the lock, and the pid files to allow
# non-root OpenTSDB daemons to run /usr/share/opentsdb/bin/opentsdb_restart.py.
touch $LOCK_FILE && chown $USER:$GROUP $LOCK_FILE
touch $PID_FILE && chown $USER:$GROUP $PID_FILE
daemon --user $USER --pidfile $PID_FILE "$PROG $PROG_OPTS &> /dev/null &"
else
# Don't have to change user.
daemon --pidfile $PID_FILE "$PROG $PROG_OPTS &> /dev/null &"
fi
retval=$?
sleep 2
echo
[ $retval -eq 0 ] && (findproc > $PID_FILE && touch $LOCK_FILE)
return $retval
}
stop() {
echo -n "Stopping ${NAME}: "
killproc -p $PID_FILE $NAME
retval=$?
echo
# Non-root users don't have enough permission to remove pid and lock files.
# So, the opentsdb_restart.py cannot get rid of the files, and the command
# "service opentsdb status" will complain about the existing pid file.
# Makes the pid file empty.
echo > $PID_FILE
[ $retval -eq 0 ] && (rm -f $PID_FILE && rm -f $LOCK_FILE)
return $retval
}
restart() {
stop
start
}
reload() {
restart
}
force_reload() {
restart
}
rh_status() {
# run checks to determine if the service is running or use generic status
status -p $PID_FILE -l $LOCK_FILE $NAME
}
rh_status_q() {
rh_status >/dev/null 2>&1
}
case "$1" in
start)
rh_status_q && exit 0
$1
;;
stop)
rh_status_q || exit 0
$1
;;
restart)
$1
;;
reload)
rh_status_q || exit 7
$1
;;
force-reload)
force_reload
;;
status)
rh_status
;;
condrestart|try-restart)
rh_status_q || exit 0
restart
;;
*)
echo "Usage: $0 {start|stop|status|restart|condrestart|try-restart|reload|force-reload}"
exit 2
esac
exit $?

View File

@ -194,7 +194,6 @@ function install_bin() {
${csudo}rm -f ${bin_link_dir}/${serverName} || : ${csudo}rm -f ${bin_link_dir}/${serverName} || :
${csudo}rm -f ${bin_link_dir}/${adapterName} || : ${csudo}rm -f ${bin_link_dir}/${adapterName} || :
${csudo}rm -f ${bin_link_dir}/${uninstallScript} || : ${csudo}rm -f ${bin_link_dir}/${uninstallScript} || :
${csudo}rm -f ${bin_link_dir}/tarbitrator || :
${csudo}rm -f ${bin_link_dir}/set_core || : ${csudo}rm -f ${bin_link_dir}/set_core || :
${csudo}rm -f ${bin_link_dir}/TDinsight.sh || : ${csudo}rm -f ${bin_link_dir}/TDinsight.sh || :
@ -210,7 +209,6 @@ function install_bin() {
[ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -s ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || : [ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -s ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || :
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || : [ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || :
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
[ -x ${install_main_dir}/bin/tarbitrator ] && ${csudo}ln -s ${install_main_dir}/bin/tarbitrator ${bin_link_dir}/tarbitrator || :
if [ "$verMode" == "cluster" ]; then if [ "$verMode" == "cluster" ]; then
${csudo}cp -r ${script_dir}/nginxd/* ${nginx_dir} && ${csudo}chmod 0555 ${nginx_dir}/* ${csudo}cp -r ${script_dir}/nginxd/* ${nginx_dir} && ${csudo}chmod 0555 ${nginx_dir}/*
@ -606,28 +604,19 @@ function install_service_on_sysvinit() {
if ((${os_type} == 1)); then if ((${os_type} == 1)); then
# ${csudo}cp -f ${script_dir}/init.d/${serverName}.deb ${install_main_dir}/init.d/${serverName} # ${csudo}cp -f ${script_dir}/init.d/${serverName}.deb ${install_main_dir}/init.d/${serverName}
${csudo}cp ${script_dir}/init.d/${serverName}.deb ${service_config_dir}/${serverName} && ${csudo}chmod a+x ${service_config_dir}/${serverName} ${csudo}cp ${script_dir}/init.d/${serverName}.deb ${service_config_dir}/${serverName} && ${csudo}chmod a+x ${service_config_dir}/${serverName}
# ${csudo}cp -f ${script_dir}/init.d/tarbitratord.deb ${install_main_dir}/init.d/tarbitratord
${csudo}cp ${script_dir}/init.d/tarbitratord.deb ${service_config_dir}/tarbitratord && ${csudo}chmod a+x ${service_config_dir}/tarbitratord
elif ((${os_type} == 2)); then elif ((${os_type} == 2)); then
# ${csudo}cp -f ${script_dir}/init.d/${serverName}.rpm ${install_main_dir}/init.d/${serverName} # ${csudo}cp -f ${script_dir}/init.d/${serverName}.rpm ${install_main_dir}/init.d/${serverName}
${csudo}cp ${script_dir}/init.d/${serverName}.rpm ${service_config_dir}/${serverName} && ${csudo}chmod a+x ${service_config_dir}/${serverName} ${csudo}cp ${script_dir}/init.d/${serverName}.rpm ${service_config_dir}/${serverName} && ${csudo}chmod a+x ${service_config_dir}/${serverName}
# ${csudo}cp -f ${script_dir}/init.d/tarbitratord.rpm ${install_main_dir}/init.d/tarbitratord
${csudo}cp ${script_dir}/init.d/tarbitratord.rpm ${service_config_dir}/tarbitratord && ${csudo}chmod a+x ${service_config_dir}/tarbitratord
fi fi
if ((${initd_mod} == 1)); then if ((${initd_mod} == 1)); then
${csudo}chkconfig --add ${serverName} || : ${csudo}chkconfig --add ${serverName} || :
${csudo}chkconfig --level 2345 ${serverName} on || : ${csudo}chkconfig --level 2345 ${serverName} on || :
${csudo}chkconfig --add tarbitratord || :
${csudo}chkconfig --level 2345 tarbitratord on || :
elif ((${initd_mod} == 2)); then elif ((${initd_mod} == 2)); then
${csudo}insserv ${serverName} || : ${csudo}insserv ${serverName} || :
${csudo}insserv -d ${serverName} || : ${csudo}insserv -d ${serverName} || :
${csudo}insserv tarbitratord || :
${csudo}insserv -d tarbitratord || :
elif ((${initd_mod} == 3)); then elif ((${initd_mod} == 3)); then
${csudo}update-rc.d ${serverName} defaults || : ${csudo}update-rc.d ${serverName} defaults || :
${csudo}update-rc.d tarbitratord defaults || :
fi fi
} }
@ -669,9 +658,6 @@ function install_service_on_systemd() {
${csudo}systemctl enable ${serverName} ${csudo}systemctl enable ${serverName}
[ -f ${script_dir}/cfg/tarbitratord.service ] &&
${csudo}cp ${script_dir}/cfg/tarbitratord.service \
${service_config_dir}/ || :
${csudo}systemctl daemon-reload ${csudo}systemctl daemon-reload
if [ "$verMode" == "cluster" ]; then if [ "$verMode" == "cluster" ]; then

View File

@ -1,340 +0,0 @@
#!/bin/bash
#
# This file is used to install database on linux systems. The operating system
# is required to use systemd to manage services at boot
set -e
#set -x
# -----------------------Variables definition---------------------
script_dir=$(dirname $(readlink -f "$0"))
bin_link_dir="/usr/bin"
#inc_link_dir="/usr/include"
#install main path
install_main_dir="/usr/local/tarbitrator"
# old bin dir
bin_dir="/usr/local/tarbitrator/bin"
service_config_dir="/etc/systemd/system"
# Color setting
RED='\033[0;31m'
GREEN='\033[1;32m'
GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
csudo=""
if command -v sudo >/dev/null; then
csudo="sudo "
fi
update_flag=0
initd_mod=0
service_mod=2
if pidof systemd &>/dev/null; then
service_mod=0
elif $(which service &>/dev/null); then
service_mod=1
service_config_dir="/etc/init.d"
if $(which chkconfig &>/dev/null); then
initd_mod=1
elif $(which insserv &>/dev/null); then
initd_mod=2
elif $(which update-rc.d &>/dev/null); then
initd_mod=3
else
service_mod=2
fi
else
service_mod=2
fi
# get the operating system type for using the corresponding init file
# ubuntu/debian(deb), centos/fedora(rpm), others: opensuse, redhat, ..., no verification
#osinfo=$(awk -F= '/^NAME/{print $2}' /etc/os-release)
if [[ -e /etc/os-release ]]; then
osinfo=$(cat /etc/os-release | grep "NAME" | cut -d '"' -f2) || :
else
osinfo=""
fi
#echo "osinfo: ${osinfo}"
os_type=0
if echo $osinfo | grep -qwi "ubuntu"; then
# echo "This is ubuntu system"
os_type=1
elif echo $osinfo | grep -qwi "debian"; then
# echo "This is debian system"
os_type=1
elif echo $osinfo | grep -qwi "Kylin"; then
# echo "This is Kylin system"
os_type=1
elif echo $osinfo | grep -qwi "centos"; then
# echo "This is centos system"
os_type=2
elif echo $osinfo | grep -qwi "fedora"; then
# echo "This is fedora system"
os_type=2
else
echo " osinfo: ${osinfo}"
echo " This is an officially unverified linux system,"
echo " if there are any problems with the installation and operation, "
echo " please feel free to contact taosdata.com for support."
os_type=1
fi
function kill_tarbitrator() {
pid=$(ps -ef | grep "tarbitrator" | grep -v "grep" | awk '{print $2}')
if [ -n "$pid" ]; then
${csudo}kill -9 $pid || :
fi
}
function install_main_path() {
#create install main dir and all sub dir
${csudo}rm -rf ${install_main_dir} || :
${csudo}mkdir -p ${install_main_dir}
${csudo}mkdir -p ${install_main_dir}/bin
#${csudo}mkdir -p ${install_main_dir}/include
${csudo}mkdir -p ${install_main_dir}/init.d
}
function install_bin() {
# Remove links
${csudo}rm -f ${bin_link_dir}/rmtarbitrator || :
${csudo}rm -f ${bin_link_dir}/tarbitrator || :
${csudo}cp -r ${script_dir}/bin/* ${install_main_dir}/bin && ${csudo}chmod 0555 ${install_main_dir}/bin/*
#Make link
[ -x ${install_main_dir}/bin/remove_arbi.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove_arbi.sh ${bin_link_dir}/rmtarbitrator || :
[ -x ${install_main_dir}/bin/tarbitrator ] && ${csudo}ln -s ${install_main_dir}/bin/tarbitrator ${bin_link_dir}/tarbitrator || :
}
function install_header() {
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
${csudo}cp -f ${script_dir}/inc/* ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/*
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
}
function install_jemalloc() {
jemalloc_dir=${script_dir}/jemalloc
if [ -d ${jemalloc_dir} ]; then
${csudo}/usr/bin/install -c -d /usr/local/bin
if [ -f ${jemalloc_dir}/bin/jemalloc-config ]; then
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/bin/jemalloc-config /usr/local/bin
fi
if [ -f ${jemalloc_dir}/bin/jemalloc.sh ]; then
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/bin/jemalloc.sh /usr/local/bin
fi
if [ -f ${jemalloc_dir}/bin/jeprof ]; then
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/bin/jeprof /usr/local/bin
fi
if [ -f ${jemalloc_dir}/include/jemalloc/jemalloc.h ]; then
${csudo}/usr/bin/install -c -d /usr/local/include/jemalloc
${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/include/jemalloc/jemalloc.h /usr/local/include/jemalloc
fi
if [ -f ${jemalloc_dir}/lib/libjemalloc.so.2 ]; then
${csudo}/usr/bin/install -c -d /usr/local/lib
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.so.2 /usr/local/lib
${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so
${csudo}/usr/bin/install -c -d /usr/local/lib
if [ -f ${jemalloc_dir}/lib/libjemalloc.a ]; then
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc.a /usr/local/lib
fi
if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then
${csudo}/usr/bin/install -c -m 755 ${jemalloc_dir}/lib/libjemalloc_pic.a /usr/local/lib
fi
if [ -f ${jemalloc_dir}/lib/libjemalloc_pic.a ]; then
${csudo}/usr/bin/install -c -d /usr/local/lib/pkgconfig
${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig
fi
fi
if [ -f ${jemalloc_dir}/share/doc/jemalloc/jemalloc.html ]; then
${csudo}/usr/bin/install -c -d /usr/local/share/doc/jemalloc
${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/share/doc/jemalloc/jemalloc.html /usr/local/share/doc/jemalloc
fi
if [ -f ${jemalloc_dir}/share/man/man3/jemalloc.3 ]; then
${csudo}/usr/bin/install -c -d /usr/local/share/man/man3
${csudo}/usr/bin/install -c -m 644 ${jemalloc_dir}/share/man/man3/jemalloc.3 /usr/local/share/man/man3
fi
if [ -d /etc/ld.so.conf.d ]; then
echo "/usr/local/lib" | ${csudo}tee /etc/ld.so.conf.d/jemalloc.conf >/dev/null || echo -e "failed to write /etc/ld.so.conf.d/jemalloc.conf"
${csudo}ldconfig
else
echo "/etc/ld.so.conf.d not found!"
fi
fi
}
function clean_service_on_sysvinit() {
if pidof tarbitrator &>/dev/null; then
${csudo}service tarbitratord stop || :
fi
if ((${initd_mod} == 1)); then
if [ -e ${service_config_dir}/tarbitratord ]; then
${csudo}chkconfig --del tarbitratord || :
fi
elif ((${initd_mod} == 2)); then
if [ -e ${service_config_dir}/tarbitratord ]; then
${csudo}insserv -r tarbitratord || :
fi
elif ((${initd_mod} == 3)); then
if [ -e ${service_config_dir}/tarbitratord ]; then
${csudo}update-rc.d -f tarbitratord remove || :
fi
fi
${csudo}rm -f ${service_config_dir}/tarbitratord || :
if $(which init &>/dev/null); then
${csudo}init q || :
fi
}
function install_service_on_sysvinit() {
clean_service_on_sysvinit
sleep 1
if ((${os_type} == 1)); then
${csudo}cp -f ${script_dir}/init.d/tarbitratord.deb ${install_main_dir}/init.d/tarbitratord
${csudo}cp ${script_dir}/init.d/tarbitratord.deb ${service_config_dir}/tarbitratord && ${csudo}chmod a+x ${service_config_dir}/tarbitratord
elif ((${os_type} == 2)); then
${csudo}cp -f ${script_dir}/init.d/tarbitratord.rpm ${install_main_dir}/init.d/tarbitratord
${csudo}cp ${script_dir}/init.d/tarbitratord.rpm ${service_config_dir}/tarbitratord && ${csudo}chmod a+x ${service_config_dir}/tarbitratord
fi
if ((${initd_mod} == 1)); then
${csudo}chkconfig --add tarbitratord || :
${csudo}chkconfig --level 2345 tarbitratord on || :
elif ((${initd_mod} == 2)); then
${csudo}insserv tarbitratord || :
${csudo}insserv -d tarbitratord || :
elif ((${initd_mod} == 3)); then
${csudo}update-rc.d tarbitratord defaults || :
fi
}
function clean_service_on_systemd() {
tarbitratord_service_config="${service_config_dir}/tarbitratord.service"
if systemctl is-active --quiet tarbitratord; then
echo "tarbitrator is running, stopping it..."
${csudo}systemctl stop tarbitratord &>/dev/null || echo &>/dev/null
fi
${csudo}systemctl disable tarbitratord &>/dev/null || echo &>/dev/null
${csudo}rm -f ${tarbitratord_service_config}
}
function install_service_on_systemd() {
clean_service_on_systemd
tarbitratord_service_config="${service_config_dir}/tarbitratord.service"
${csudo}bash -c "echo '[Unit]' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'Description=TDengine arbitrator service' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'After=network-online.target' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'Wants=network-online.target' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo >> ${tarbitratord_service_config}"
${csudo}bash -c "echo '[Service]' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'Type=simple' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'ExecStart=/usr/bin/tarbitrator' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'TimeoutStopSec=1000000s' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'LimitNOFILE=infinity' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'LimitNPROC=infinity' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'LimitCORE=infinity' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'TimeoutStartSec=0' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'StandardOutput=null' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'Restart=always' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'StartLimitBurst=3' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'StartLimitInterval=60s' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo >> ${tarbitratord_service_config}"
${csudo}bash -c "echo '[Install]' >> ${tarbitratord_service_config}"
${csudo}bash -c "echo 'WantedBy=multi-user.target' >> ${tarbitratord_service_config}"
${csudo}systemctl enable tarbitratord
}
function install_service() {
if ((${service_mod} == 0)); then
install_service_on_systemd
elif ((${service_mod} == 1)); then
install_service_on_sysvinit
else
kill_tarbitrator
fi
}
function update_TDengine() {
# Start to update
echo -e "${GREEN}Start to update TDengine's arbitrator ...${NC}"
# Stop the service if running
if pidof tarbitrator &>/dev/null; then
if ((${service_mod} == 0)); then
${csudo}systemctl stop tarbitratord || :
elif ((${service_mod} == 1)); then
${csudo}service tarbitratord stop || :
else
kill_tarbitrator
fi
sleep 1
fi
install_main_path
#install_header
install_bin
install_service
install_jemalloc
echo
if ((${service_mod} == 0)); then
echo -e "${GREEN_DARK}To start arbitrator ${NC}: ${csudo}systemctl start tarbitratord${NC}"
elif ((${service_mod} == 1)); then
echo -e "${GREEN_DARK}To start arbitrator ${NC}: ${csudo}service tarbitratord start${NC}"
else
echo -e "${GREEN_DARK}To start arbitrator ${NC}: ./tarbitrator${NC}"
fi
echo
echo -e "\033[44;32;1mTDengine's arbitrator is updated successfully!${NC}"
}
function install_TDengine() {
# Start to install
echo -e "${GREEN}Start to install TDengine's arbitrator ...${NC}"
install_main_path
#install_header
install_bin
install_service
install_jemalloc
echo
if ((${service_mod} == 0)); then
echo -e "${GREEN_DARK}To start arbitrator ${NC}: ${csudo}systemctl start tarbitratord${NC}"
elif ((${service_mod} == 1)); then
echo -e "${GREEN_DARK}To start arbitrator ${NC}: ${csudo}service tarbitratord start${NC}"
else
echo -e "${GREEN_DARK}To start arbitrator ${NC}: tarbitrator${NC}"
fi
echo -e "\033[44;32;1mTDengine's arbitrator is installed successfully!${NC}"
echo
}
## ==============================Main program starts from here============================
# Install server and client
if [ -x ${bin_dir}/tarbitrator ]; then
update_flag=1
update_TDengine
else
install_TDengine
fi

View File

@ -185,7 +185,6 @@ function install_bin() {
[ -f ${binary_dir}/build/bin/taosadapter ] && ${csudo}cp -r ${binary_dir}/build/bin/taosadapter ${install_main_dir}/bin || : [ -f ${binary_dir}/build/bin/taosadapter ] && ${csudo}cp -r ${binary_dir}/build/bin/taosadapter ${install_main_dir}/bin || :
[ -f ${binary_dir}/build/bin/udfd ] && ${csudo}cp -r ${binary_dir}/build/bin/udfd ${install_main_dir}/bin || : [ -f ${binary_dir}/build/bin/udfd ] && ${csudo}cp -r ${binary_dir}/build/bin/udfd ${install_main_dir}/bin || :
${csudo}cp -r ${binary_dir}/build/bin/${serverName} ${install_main_dir}/bin || : ${csudo}cp -r ${binary_dir}/build/bin/${serverName} ${install_main_dir}/bin || :
# ${csudo}cp -r ${binary_dir}/build/bin/tarbitrator ${install_main_dir}/bin || :
${csudo}cp -r ${script_dir}/taosd-dump-cfg.gdb ${install_main_dir}/bin || : ${csudo}cp -r ${script_dir}/taosd-dump-cfg.gdb ${install_main_dir}/bin || :
${csudo}cp -r ${script_dir}/remove.sh ${install_main_dir}/bin || : ${csudo}cp -r ${script_dir}/remove.sh ${install_main_dir}/bin || :

View File

@ -1,71 +0,0 @@
#!/bin/bash
#
# Generate arbitrator's tar.gz setup package for all os system
set -e
#set -x
curr_dir=$(pwd)
compile_dir=$1
version=$2
build_time=$3
cpuType=$4
osType=$5
verMode=$6
verType=$7
pagMode=$8
script_dir="$(dirname $(readlink -f $0))"
top_dir="$(readlink -f ${script_dir}/../..)"
productName="TDengine"
# create compressed install file.
build_dir="${compile_dir}/build"
code_dir="${top_dir}"
release_dir="${top_dir}/release"
#package_name='linux'
if [ "$verMode" == "cluster" ]; then
install_dir="${release_dir}/${productName}-enterprise-arbitrator-${version}"
else
install_dir="${release_dir}/${productName}-arbitrator-${version}"
fi
# Directories and files.
bin_files="${build_dir}/bin/tarbitrator ${script_dir}/remove_arbi.sh"
install_files="${script_dir}/install_arbi.sh"
#header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h"
init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord
init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord
# make directories.
mkdir -p ${install_dir} && cp ${install_files} ${install_dir} && chmod a+x ${install_dir}/install_arbi.sh || :
#mkdir -p ${install_dir}/inc && cp ${header_files} ${install_dir}/inc || :
mkdir -p ${install_dir}/bin && cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_deb} ${install_dir}/init.d/tarbitratord.deb || :
mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}/init.d/tarbitratord.rpm || :
cd ${release_dir}
# install_dir has been distinguishes cluster from edege, so comments this code
pkg_name=${install_dir}-${osType}-${cpuType}
if [[ "$verType" == "beta" ]] || [[ "$verType" == "preRelease" ]]; then
pkg_name=${install_dir}-${verType}-${osType}-${cpuType}
elif [ "$verType" == "stable" ]; then
pkg_name=${pkg_name}
else
echo "unknow verType, nor stabel or beta"
exit 1
fi
tar -zcv -f "$(basename ${pkg_name}).tar.gz" $(basename ${install_dir}) --remove-files || :
exitcode=$?
if [ "$exitcode" != "0" ]; then
echo "tar ${pkg_name}.tar.gz error !!!"
exit $exitcode
fi
cd ${curr_dir}

View File

@ -85,7 +85,6 @@ else
${build_dir}/bin/${clientName} \ ${build_dir}/bin/${clientName} \
${taostools_bin_files} \ ${taostools_bin_files} \
${build_dir}/bin/taosadapter \ ${build_dir}/bin/taosadapter \
${build_dir}/bin/tarbitrator\
${script_dir}/remove.sh \ ${script_dir}/remove.sh \
${script_dir}/set_core.sh \ ${script_dir}/set_core.sh \
${script_dir}/startPre.sh \ ${script_dir}/startPre.sh \
@ -106,8 +105,6 @@ nginx_dir="${top_dir}/../enterprise/src/plugins/web"
init_file_deb=${script_dir}/../deb/taosd init_file_deb=${script_dir}/../deb/taosd
init_file_rpm=${script_dir}/../rpm/taosd init_file_rpm=${script_dir}/../rpm/taosd
init_file_tarbitrator_deb=${script_dir}/../deb/tarbitratord
init_file_tarbitrator_rpm=${script_dir}/../rpm/tarbitratord
# make directories. # make directories.
mkdir -p ${install_dir} mkdir -p ${install_dir}
@ -126,10 +123,6 @@ if [ -f "${cfg_dir}/${serverName}.service" ]; then
cp ${cfg_dir}/${serverName}.service ${install_dir}/cfg || : cp ${cfg_dir}/${serverName}.service ${install_dir}/cfg || :
fi fi
if [ -f "${top_dir}/packaging/cfg/tarbitratord.service" ]; then
cp ${top_dir}/packaging/cfg/tarbitratord.service ${install_dir}/cfg || :
fi
if [ -f "${top_dir}/packaging/cfg/nginxd.service" ]; then if [ -f "${top_dir}/packaging/cfg/nginxd.service" ]; then
cp ${top_dir}/packaging/cfg/nginxd.service ${install_dir}/cfg || : cp ${top_dir}/packaging/cfg/nginxd.service ${install_dir}/cfg || :
fi fi
@ -137,8 +130,6 @@ fi
mkdir -p ${install_dir}/bin && cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || : mkdir -p ${install_dir}/bin && cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/${serverName}.deb mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/${serverName}.deb
mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/${serverName}.rpm mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/${serverName}.rpm
mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_deb} ${install_dir}/init.d/tarbitratord.deb || :
mkdir -p ${install_dir}/init.d && cp ${init_file_tarbitrator_rpm} ${install_dir}/init.d/tarbitratord.rpm || :
if [ $adapterName != "taosadapter" ]; then if [ $adapterName != "taosadapter" ]; then
mv ${install_dir}/cfg/taosadapter.toml ${install_dir}/cfg/$adapterName.toml mv ${install_dir}/cfg/taosadapter.toml ${install_dir}/cfg/$adapterName.toml

View File

@ -1,132 +0,0 @@
#!/bin/bash
#
# Script to stop the service and uninstall TDengine's arbitrator
set -e
#set -x
verMode=edge
RED='\033[0;31m'
GREEN='\033[1;32m'
NC='\033[0m'
#install main path
install_main_dir="/usr/local/tarbitrator"
bin_link_dir="/usr/bin"
#inc_link_dir="/usr/include"
service_config_dir="/etc/systemd/system"
tarbitrator_service_name="tarbitratord"
csudo=""
if command -v sudo > /dev/null; then
csudo="sudo "
fi
initd_mod=0
service_mod=2
if pidof systemd &> /dev/null; then
service_mod=0
elif $(which service &> /dev/null); then
service_mod=1
service_config_dir="/etc/init.d"
if $(which chkconfig &> /dev/null); then
initd_mod=1
elif $(which insserv &> /dev/null); then
initd_mod=2
elif $(which update-rc.d &> /dev/null); then
initd_mod=3
else
service_mod=2
fi
else
service_mod=2
fi
function kill_tarbitrator() {
pid=$(ps -ef | grep "tarbitrator" | grep -v "grep" | awk '{print $2}')
if [ -n "$pid" ]; then
${csudo}kill -9 $pid || :
fi
}
function clean_bin() {
# Remove link
${csudo}rm -f ${bin_link_dir}/tarbitrator || :
}
function clean_header() {
# Remove link
${csudo}rm -f ${inc_link_dir}/taos.h || :
${csudo}rm -f ${inc_link_dir}/taosdef.h || :
${csudo}rm -f ${inc_link_dir}/taoserror.h || :
${csudo}rm -f ${inc_link_dir}/taosudf.h || :
}
function clean_log() {
# Remove link
${csudo}rm -rf /arbitrator.log || :
}
function clean_service_on_systemd() {
tarbitratord_service_config="${service_config_dir}/${tarbitrator_service_name}.service"
if systemctl is-active --quiet ${tarbitrator_service_name}; then
echo "TDengine tarbitrator is running, stopping it..."
${csudo}systemctl stop ${tarbitrator_service_name} &> /dev/null || echo &> /dev/null
fi
${csudo}systemctl disable ${tarbitrator_service_name} &> /dev/null || echo &> /dev/null
${csudo}rm -f ${tarbitratord_service_config}
}
function clean_service_on_sysvinit() {
if pidof tarbitrator &> /dev/null; then
echo "TDengine's tarbitrator is running, stopping it..."
${csudo}service tarbitratord stop || :
fi
if ((${initd_mod}==1)); then
if [ -e ${service_config_dir}/tarbitratord ]; then
${csudo}chkconfig --del tarbitratord || :
fi
elif ((${initd_mod}==2)); then
if [ -e ${service_config_dir}/tarbitratord ]; then
${csudo}insserv -r tarbitratord || :
fi
elif ((${initd_mod}==3)); then
if [ -e ${service_config_dir}/tarbitratord ]; then
${csudo}update-rc.d -f tarbitratord remove || :
fi
fi
${csudo}rm -f ${service_config_dir}/tarbitratord || :
if $(which init &> /dev/null); then
${csudo}init q || :
fi
}
function clean_service() {
if ((${service_mod}==0)); then
clean_service_on_systemd
elif ((${service_mod}==1)); then
clean_service_on_sysvinit
else
# must manual stop
kill_tarbitrator
fi
}
# Stop service and disable booting start.
clean_service
# Remove binary file and links
clean_bin
# Remove header file.
##clean_header
# Remove log file
clean_log
${csudo}rm -rf ${install_main_dir}
echo -e "${GREEN}TDengine's arbitrator is removed successfully!${NC}"

View File

@ -1407,7 +1407,11 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
} }
SSyncQueryParam* pParam = pRequest->body.param; SSyncQueryParam* pParam = pRequest->body.param;
if (NULL == pParam) {
pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&pParam->sem, 0, 0);
}
// convert ucs4 to native multi-bytes string // convert ucs4 to native multi-bytes string
pResultInfo->convertUcs4 = convertUcs4; pResultInfo->convertUcs4 = convertUcs4;

View File

@ -2456,10 +2456,10 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
return NULL; return NULL;
} }
int batchs = 0;
pTscObj->schemalessType = 1; pTscObj->schemalessType = 1;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
int cnt = ceil(((double)numLines) / LINE_BATCH);
Params params; Params params;
params.request = request; params.request = request;
tsem_init(&params.sem, 0, 0); tsem_init(&params.sem, 0, 0);
@ -2496,7 +2496,16 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
goto end; goto end;
} }
for (int i = 0; i < cnt; ++i) { if(protocol == TSDB_SML_JSON_PROTOCOL){
numLines = 1;
}else if(numLines <= 0){
request->code = TSDB_CODE_SML_INVALID_DATA;
smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
goto end;
}
batchs = ceil(((double)numLines) / LINE_BATCH);
for (int i = 0; i < batchs; ++i) {
SRequestObj* req = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT); SRequestObj* req = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
if(!req){ if(!req){
request->code = TSDB_CODE_OUT_OF_MEMORY; request->code = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -48,7 +48,6 @@ struct SSmaEnv {
typedef struct { typedef struct {
int32_t smaRef; int32_t smaRef;
int32_t refId;
} SSmaMgmt; } SSmaMgmt;
#define SMA_ENV_LOCK(env) ((env)->lock) #define SMA_ENV_LOCK(env) ((env)->lock)
@ -63,12 +62,12 @@ struct STSmaStat {
struct SRSmaStat { struct SRSmaStat {
SSma *pSma; SSma *pSma;
int64_t refId; int64_t refId; // shared by persistence/fetch tasks
void *tmrHandle; void *tmrHandle; // for persistence task
tmr_h tmrId; tmr_h tmrId; // for persistence task
int32_t tmrSeconds; int32_t tmrSeconds; // for persistence task
int8_t triggerStat; int8_t triggerStat; // for persistence task
int8_t runningStat; int8_t runningStat; // for persistence task
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo;
}; };

View File

@ -135,7 +135,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
// init smaMgmt // init smaMgmt
smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat); smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);
if (smaMgmt.refId < 0) { if (smaMgmt.smaRef < 0) {
smaError("init smaRef failed, num:%d", SMA_MGMT_REF_NUM); smaError("init smaRef failed, num:%d", SMA_MGMT_REF_NUM);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;

View File

@ -50,6 +50,7 @@ static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
struct SRSmaInfoItem { struct SRSmaInfoItem {
SRSmaInfo *pRsmaInfo; SRSmaInfo *pRsmaInfo;
int64_t refId;
void *taskInfo; // qTaskInfo_t void *taskInfo; // qTaskInfo_t
tmr_h tmrId; tmr_h tmrId;
int8_t level; int8_t level;
@ -60,11 +61,14 @@ struct SRSmaInfoItem {
struct SRSmaInfo { struct SRSmaInfo {
STSchema *pTSchema; STSchema *pTSchema;
SSma *pSma; SRSmaStat *pStat;
int64_t suid; int64_t suid;
SRSmaInfoItem items[TSDB_RETENTION_L2]; SRSmaInfoItem items[TSDB_RETENTION_L2];
}; };
#define RSMA_INFO_SMA(r) ((r)->pStat->pSma)
#define RSMA_INFO_STAT(r) ((r)->pStat)
struct SRSmaQTaskInfoItem { struct SRSmaQTaskInfoItem {
int32_t len; int32_t len;
int8_t type; int8_t type;
@ -107,22 +111,21 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId,
void *tdFreeRSmaInfo(SRSmaInfo *pInfo) { void *tdFreeRSmaInfo(SRSmaInfo *pInfo) {
if (pInfo) { if (pInfo) {
SSma *pSma = RSMA_INFO_SMA(pInfo);
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = &pInfo->items[i]; SRSmaInfoItem *pItem = &pInfo->items[i];
if (pItem->taskInfo) { if (pItem->taskInfo) {
smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pInfo->pSma), pInfo->suid, smaDebug("vgId:%d, stb %" PRIi64 " stop fetch-timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId,
pItem->tmrId, i + 1); i + 1);
taosTmrStopA(&pItem->tmrId); taosTmrStopA(&pItem->tmrId);
tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pInfo->pSma), i + 1); tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pSma), i + 1);
} else { } else {
smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma),
SMA_VID(pInfo->pSma), pInfo->suid, i + 1); pInfo->suid, i + 1);
} }
} }
taosMemoryFree(pInfo->pTSchema); taosMemoryFree(pInfo->pTSchema);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} else {
smaDebug("vgId:%d, stb %" PRIi64 " no need to destroy rsma info since empty", SMA_VID(pInfo->pSma), pInfo->suid);
} }
return NULL; return NULL;
@ -255,6 +258,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo
if (param->qmsg[idx]) { if (param->qmsg[idx]) {
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->refId = RSMA_REF_ID(pRSmaInfo->pStat);
pItem->pRsmaInfo = pRSmaInfo; pItem->pRsmaInfo = pRSmaInfo;
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle); pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle);
if (!pItem->taskInfo) { if (!pItem->taskInfo) {
@ -340,7 +344,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto _err; goto _err;
} }
pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->pTSchema = pTSchema;
pRSmaInfo->pSma = pSma; pRSmaInfo->pStat = pStat;
pRSmaInfo->suid = suid; pRSmaInfo->suid = suid;
if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) { if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) {
@ -522,7 +526,7 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) { static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) {
SArray *pResult = NULL; SArray *pResult = NULL;
SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo; SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo;
SSma *pSma = pRSmaInfo->pSma; SSma *pSma = RSMA_INFO_SMA(pRSmaInfo);
while (1) { while (1) {
SSDataBlock *output = NULL; SSDataBlock *output = NULL;
@ -585,21 +589,29 @@ _err:
*/ */
static void tdRSmaFetchTrigger(void *param, void *tmrId) { static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param; SRSmaInfoItem *pItem = param;
SSma *pSma = pItem->pRsmaInfo->pSma; SSma *pSma = NULL;
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv); SRSmaStat *pStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, pItem->refId);
if (!pStat) {
smaDebug("rsma fetch task not start since already destroyed");
return;
}
pSma = RSMA_INFO_SMA(pItem->pRsmaInfo);
// if rsma trigger stat in cancelled or finished, not start fetch task anymore
int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat)); int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat));
if (rsmaTriggerStat == TASK_TRIGGER_STAT_CANCELLED || rsmaTriggerStat == TASK_TRIGGER_STAT_FINISHED) { if (rsmaTriggerStat == TASK_TRIGGER_STAT_CANCELLED || rsmaTriggerStat == TASK_TRIGGER_STAT_FINISHED) {
smaDebug("vgId:%d, level %" PRIi8 " not fetch since stat is cancelled for table suid:%" PRIi64, SMA_VID(pSma), taosReleaseRef(smaMgmt.smaRef, pItem->refId);
pItem->level, pItem->pRsmaInfo->suid); smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is cancelled",
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
return; return;
} }
int8_t fetchTriggerStat = int8_t fetchTriggerStat =
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
if (fetchTriggerStat == TASK_TRIGGER_STAT_ACTIVE) { if (fetchTriggerStat == TASK_TRIGGER_STAT_ACTIVE) {
smaDebug("vgId:%d, level %" PRIi8 " stat is active for table suid:%" PRIi64, SMA_VID(pSma), pItem->level, smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma),
pItem->pRsmaInfo->suid); pItem->level, pItem->pRsmaInfo->suid);
tdRefSmaStat(pSma, (SSmaStat *)pStat); tdRefSmaStat(pSma, (SSmaStat *)pStat);
@ -610,9 +622,11 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
tdUnRefSmaStat(pSma, (SSmaStat *)pStat); tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
} else { } else {
smaDebug("vgId:%d, level %" PRIi8 " stat is inactive for table suid:%" PRIi64, SMA_VID(pSma), pItem->level, smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive",
pItem->pRsmaInfo->suid); SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid);
} }
_end:
taosReleaseRef(smaMgmt.smaRef, pItem->refId);
} }
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid,
@ -632,7 +646,6 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_SUBMIT); tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_SUBMIT);
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
smaDebug("vgId:%d, process rsma insert", SMA_VID(pSma));
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat); SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat);
@ -1036,7 +1049,7 @@ static void *tdRSmaPersistExec(void *param) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo; qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo;
if (!taskInfo) { if (!taskInfo) {
smaDebug("vgId:%d, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
continue; continue;
} }
@ -1044,27 +1057,20 @@ static void *tdRSmaPersistExec(void *param) {
int32_t len = 0; int32_t len = 0;
int8_t type = (int8_t)(i + 1); int8_t type = (int8_t)(i + 1);
if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) { if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) {
smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1, smaError("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo failed since %s", vid, pRSmaInfo->suid,
terrstr(terrno)); i + 1, terrstr(terrno));
goto _err; goto _err;
} }
if (!pOutput || len <= 0) { if (!pOutput || len <= 0) {
smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success but no output(len %d), not persist", smaDebug("vgId:%d, rsma, table %" PRIi64
" level %d serialize qTaskInfo success but no output(len %d), not persist",
vid, pRSmaInfo->suid, i + 1, len); vid, pRSmaInfo->suid, i + 1, len);
taosMemoryFreeClear(pOutput); taosMemoryFreeClear(pOutput);
continue; continue;
} }
smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d, need persist", vid, smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo success with len %d, need persist", vid,
pRSmaInfo->suid, i + 1, len); pRSmaInfo->suid, i + 1, len);
#if 0
if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) {
smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid,
i + 1, terrstr(terrno));
} else {
smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1);
}
#endif
if (!isFileCreated) { if (!isFileCreated) {
char qTaskInfoFName[TSDB_FILENAME_LEN]; char qTaskInfoFName[TSDB_FILENAME_LEN];
@ -1084,11 +1090,11 @@ static void *tdRSmaPersistExec(void *param) {
ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN); ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN);
tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset); tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset);
smaDebug("vgId:%d, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid, smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d head part(len:%d) appended to offset:%" PRIi64, vid,
pRSmaInfo->suid, i + 1, headLen, toffset); pRSmaInfo->suid, i + 1, headLen, toffset);
tdAppendTFile(&tFile, pOutput, len, &toffset); tdAppendTFile(&tFile, pOutput, len, &toffset);
smaDebug("vgId:%d, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid, smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid,
i + 1, len, toffset); pRSmaInfo->suid, i + 1, len, toffset);
taosMemoryFree(pOutput); taosMemoryFree(pOutput);
} }
@ -1098,13 +1104,13 @@ static void *tdRSmaPersistExec(void *param) {
_normal: _normal:
if (isFileCreated) { if (isFileCreated) {
if (tdUpdateTFileHeader(&tFile) < 0) { if (tdUpdateTFileHeader(&tFile) < 0) {
smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
tstrerror(terrno)); tstrerror(terrno));
tdCloseTFile(&tFile); tdCloseTFile(&tFile);
tdRemoveTFile(&tFile); tdRemoveTFile(&tFile);
goto _err; goto _err;
} else { } else {
smaDebug("vgId:%d, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile)); smaDebug("vgId:%d, rsma, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile));
} }
tdCloseTFile(&tFile); tdCloseTFile(&tFile);
@ -1114,10 +1120,10 @@ _normal:
char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_F]); char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_F]);
strncpy(pos, tdQTaskInfoFname[TD_QTASK_TMP_F], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName)); strncpy(pos, tdQTaskInfoFname[TD_QTASK_TMP_F], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName));
if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) { if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) {
smaError("vgId:%d, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); smaError("vgId:%d, rsma, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
goto _err; goto _err;
} else { } else {
smaDebug("vgId:%d, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); smaDebug("vgId:%d, rsma, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName);
} }
} }
goto _end; goto _end;
@ -1129,13 +1135,13 @@ _end:
if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
TASK_TRIGGER_STAT_INACTIVE, TASK_TRIGGER_STAT_INACTIVE,
TASK_TRIGGER_STAT_ACTIVE)) { TASK_TRIGGER_STAT_ACTIVE)) {
smaDebug("vgId:%d, persist task is active again", vid); smaDebug("vgId:%d, rsma persist task is active again", vid);
} else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat),
TASK_TRIGGER_STAT_CANCELLED, TASK_TRIGGER_STAT_CANCELLED,
TASK_TRIGGER_STAT_FINISHED)) { TASK_TRIGGER_STAT_FINISHED)) {
smaDebug("vgId:%d, persist task is cancelled", vid); smaDebug("vgId:%d, rsma persist task is cancelled", vid);
} else { } else {
smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); smaWarn("vgId:%d, rsma persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
ASSERT(0); ASSERT(0);
} }
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
@ -1179,9 +1185,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
*/ */
static void tdRSmaPersistTrigger(void *param, void *tmrId) { static void tdRSmaPersistTrigger(void *param, void *tmrId) {
SRSmaStat *rsmaStat = param; SRSmaStat *rsmaStat = param;
int64_t refId = rsmaStat->refId; SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, rsmaStat->refId);
SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, refId);
if (!pRSmaStat) { if (!pRSmaStat) {
smaDebug("rsma persistence task not start since already destroyed"); smaDebug("rsma persistence task not start since already destroyed");
return; return;
@ -1221,5 +1226,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat);
} break; } break;
} }
taosReleaseRef(smaMgmt.smaRef, refId); taosReleaseRef(smaMgmt.smaRef, rsmaStat->refId);
} }

View File

@ -39,6 +39,21 @@ static int32_t invaildFuncParaValueErrMsg(char* pErrBuf, int32_t len, const char
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName); return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName);
} }
void static addDbPrecisonParam(SNodeList** pList, uint8_t precision) {
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
pVal->literal = NULL;
pVal->isDuration = false;
pVal->translate = true;
pVal->notReserved = true;
pVal->node.resType.type = TSDB_DATA_TYPE_TINYINT;
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TINYINT].bytes;
pVal->node.resType.precision = precision;
pVal->datum.i = (int64_t)precision;
pVal->typeData = (int64_t)precision;
nodesListMakeAppend(pList, (SNode*)pVal);
}
// There is only one parameter of numeric type, and the return type is parameter type // There is only one parameter of numeric type, and the return type is parameter type
static int32_t translateInOutNum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateInOutNum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) { if (1 != LIST_LENGTH(pFunc->pParameterList)) {
@ -220,8 +235,20 @@ static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// pseudo column do not need to check parameters
//add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
return TSDB_CODE_SUCCESS;
}
static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// pseudo column do not need to check parameters // pseudo column do not need to check parameters
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP}; pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_TIMESTAMP};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -990,6 +1017,10 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
} }
//add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1379,20 +1410,6 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
return true; return true;
} }
void static addDbPrecisonParam(SNodeList* pList, uint8_t precision) {
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
pVal->literal = NULL;
pVal->isDuration = false;
pVal->translate = true;
pVal->node.resType.type = TSDB_DATA_TYPE_TINYINT;
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TINYINT].bytes;
pVal->node.resType.precision = precision;
pVal->datum.i = (int64_t)precision;
pVal->typeData = (int64_t)precision;
nodesListAppend(pList, (SNode*)pVal);
}
void static addTimezoneParam(SNodeList* pList) { void static addTimezoneParam(SNodeList* pList) {
char buf[6] = {0}; char buf[6] = {0};
time_t t = taosTime(NULL); time_t t = taosTime(NULL);
@ -1462,7 +1479,7 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int
//add database precision as param //add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(pFunc->pParameterList, dbPrec); addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1482,7 +1499,7 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
//add database precision as param //add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(pFunc->pParameterList, dbPrec); addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType = pFunc->node.resType =
(SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP}; (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP};
@ -1510,7 +1527,7 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
//add database precision as param //add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision; uint8_t dbPrec = pFunc->node.resType.precision;
addDbPrecisonParam(pFunc->pParameterList, dbPrec); addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1938,6 +1955,19 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "last_row", .name = "last_row",
.type = FUNCTION_TYPE_LAST_ROW, .type = FUNCTION_TYPE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunction,
.finalizeFunc = firstLastFinalize,
.pPartialFunc = "_last_partial",
.pMergeFunc = "_last_merge",
.combineFunc = lastCombine,
},
{
.name = "_cache_last_row",
.type = FUNCTION_TYPE_CACHE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateLastRow, .translateFunc = translateLastRow,
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getMinmaxFuncEnv,
.initFunc = minmaxFunctionSetup, .initFunc = minmaxFunctionSetup,
@ -2466,7 +2496,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "now", .name = "now",
.type = FUNCTION_TYPE_NOW, .type = FUNCTION_TYPE_NOW,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC, .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
.translateFunc = translateTimePseudoColumn, .translateFunc = translateNowToday,
.getEnvFunc = NULL, .getEnvFunc = NULL,
.initFunc = NULL, .initFunc = NULL,
.sprocessFunc = nowFunction, .sprocessFunc = nowFunction,
@ -2476,7 +2506,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "today", .name = "today",
.type = FUNCTION_TYPE_TODAY, .type = FUNCTION_TYPE_TODAY,
.classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC, .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_DATETIME_FUNC,
.translateFunc = translateTimePseudoColumn, .translateFunc = translateNowToday,
.getEnvFunc = NULL, .getEnvFunc = NULL,
.initFunc = NULL, .initFunc = NULL,
.sprocessFunc = todayFunction, .sprocessFunc = todayFunction,

View File

@ -4283,7 +4283,7 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
// TODO: process timeUnit for different db precisions // TODO: process timeUnit for different db precisions
int32_t timeUnit = 1000; int32_t timeUnit = 1;
if (pCtx->numOfParams == 5) { // TODO: param number incorrect if (pCtx->numOfParams == 5) { // TODO: param number incorrect
timeUnit = pCtx->param[3].param.i; timeUnit = pCtx->param[3].param.i;
} }
@ -5508,7 +5508,7 @@ int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
double result = doCalcRate(pInfo, 1000); double result = doCalcRate(pInfo, (double)TSDB_TICK_PER_SECOND(pCtx->param[1].param.i));
colDataAppend(pCol, pBlock->info.rows, (const char*)&result, pResInfo->isNullRes); colDataAppend(pCol, pBlock->info.rows, (const char*)&result, pResInfo->isNullRes);
return pResInfo->numOfRes; return pResInfo->numOfRes;

View File

@ -154,16 +154,12 @@ static int32_t createSelectRootLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
return createRootLogicNode(pCxt, pSelect, pSelect->precision, (FCreateLogicNode)func, pRoot); return createRootLogicNode(pCxt, pSelect, pSelect->precision, (FCreateLogicNode)func, pRoot);
} }
static EScanType getScanType(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNodeList* pScanPseudoCols, static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols,
SNodeList* pScanCols, int8_t tableType) { int8_t tableType) {
if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) { if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) {
return SCAN_TYPE_STREAM; return SCAN_TYPE_STREAM;
} }
if (pSelect->hasLastRowFunc) {
return SCAN_TYPE_LAST_ROW;
}
if (NULL == pScanCols) { if (NULL == pScanCols) {
// select count(*) from t // select count(*) from t
return NULL == pScanPseudoCols return NULL == pScanPseudoCols
@ -279,7 +275,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM); code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM);
} }
pScan->scanType = getScanType(pCxt, pSelect, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType); pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols); code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols);
@ -474,6 +470,8 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pAgg->hasLastRow = pSelect->hasLastRowFunc;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
// set grouyp keys, agg funcs and having conditions // set grouyp keys, agg funcs and having conditions

View File

@ -1622,6 +1622,46 @@ static int32_t rewriteUniqueOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef); return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef);
} }
static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || !(((SAggLogicNode*)pNode)->hasLastRow) ||
NULL != ((SAggLogicNode*)pNode)->pGroupKeys || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)) ||
NULL != ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->node.pConditions) {
return false;
}
SNode* pFunc = NULL;
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType) {
return false;
}
}
return true;
}
static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized);
if (NULL == pAgg) {
return TSDB_CODE_SUCCESS;
}
SNode* pNode = NULL;
FOREACH(pNode, pAgg->pAggFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), "_cache_last_row");
pFunc->functionName[len] = '\0';
fmGetFuncInfo(pFunc, NULL, 0);
}
pAgg->hasLastRow = false;
((SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))->scanType = SCAN_TYPE_LAST_ROW;
pCxt->optimized = true;
return TSDB_CODE_SUCCESS;
}
// merge projects // merge projects
static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) { static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) { if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
@ -1710,7 +1750,8 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}, {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize}, {.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize}, {.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize} {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize}
}; };
// clang-format on // clang-format on

View File

@ -1344,7 +1344,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) {
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys, code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
&pMerge->pMergeKeys); &pMerge->pMergeKeys);
} }

View File

@ -197,6 +197,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
case QUERY_NODE_LOGIC_PLAN_PARTITION:
return stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: case QUERY_NODE_LOGIC_PLAN_WINDOW:
@ -431,7 +433,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, false); code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true);
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys); nodesDestroyList(pMergeKeys);
@ -887,6 +889,16 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code; return code;
} }
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
}
++(pCxt->groupId);
return code;
}
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
if (pCxt->pPlanCxt->rSmaQuery) { if (pCxt->pPlanCxt->rSmaQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -905,6 +917,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
code = stbSplSplitJoinNode(pCxt, &info); code = stbSplSplitJoinNode(pCxt, &info);
break; break;
case QUERY_NODE_LOGIC_PLAN_PARTITION:
code = stbSplSplitPartitionNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
code = stbSplSplitAggNode(pCxt, &info); code = stbSplSplitAggNode(pCxt, &info);
break; break;

View File

@ -99,6 +99,8 @@ TEST_F(PlanBasicTest, lastRowFunc) {
run("SELECT LAST_ROW(c1, c2) FROM t1"); run("SELECT LAST_ROW(c1, c2) FROM t1");
run("SELECT LAST_ROW(c1) FROM st1"); run("SELECT LAST_ROW(c1) FROM st1");
run("SELECT LAST_ROW(c1), SUM(c3) FROM t1");
} }
TEST_F(PlanBasicTest, sampleFunc) { TEST_F(PlanBasicTest, sampleFunc) {

View File

@ -1515,7 +1515,10 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
} }
int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
int64_t ts = taosGetTimestamp(TSDB_TIME_PRECISION_MILLI); int64_t timePrec;
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[0]), pInput[0].columnData->pData);
int64_t ts = taosGetTimestamp(timePrec);
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendInt64(pOutput->columnData, i, &ts); colDataAppendInt64(pOutput->columnData, i, &ts);
} }
@ -1524,7 +1527,10 @@ int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
} }
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
int64_t ts = taosGetTimestampToday(TSDB_TIME_PRECISION_MILLI); int64_t timePrec;
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[0]), pInput[0].columnData->pData);
int64_t ts = taosGetTimestampToday(timePrec);
for (int32_t i = 0; i < pInput->numOfRows; ++i) { for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendInt64(pOutput->columnData, i, &ts); colDataAppendInt64(pOutput->columnData, i, &ts);
} }

View File

@ -38,15 +38,15 @@ class TDTestCase:
tdSql.query(f"select stateduration(col{i},'{j}',5) from test") tdSql.query(f"select stateduration(col{i},'{j}',5) from test")
tdSql.checkRows(10) tdSql.checkRows(10)
if j in ['LT' ,'lt','Lt','lT']: if j in ['LT' ,'lt','Lt','lT']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GT','gt', 'Gt','gT']: elif j in ['GT','gt', 'Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['LE','le','Le','lE']: elif j in ['LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in [ 'GE','ge','Ge','gE']: elif j in [ 'GE','ge','Ge','gE']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,), (0,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (-1,), (0,), (0,), (0,), (0,), (0,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list: for i in float_list:
@ -54,11 +54,11 @@ class TDTestCase:
tdSql.query(f"select stateduration(col{i},'{j}',5) from test") tdSql.query(f"select stateduration(col{i},'{j}',5) from test")
tdSql.checkRows(10) tdSql.checkRows(10)
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']: if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']: elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
@ -66,34 +66,34 @@ class TDTestCase:
for i in error_column_list: for i in error_column_list:
for j in self.param_list: for j in self.param_list:
tdSql.error(f"select stateduration({i},{j},5) from test") tdSql.error(f"select stateduration({i},{j},5) from test")
error_param_list = ['a',1] error_param_list = ['a',1]
for i in error_param_list: for i in error_param_list:
tdSql.error(f"select stateduration(col1,{i},5) from test") tdSql.error(f"select stateduration(col1,{i},5) from test")
# timestamp = 1s, time_unit =1s # timestamp = 1s, time_unit =1s
tdSql.execute('''create table test1(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, tdSql.execute('''create table test1(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''') col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
for i in range(self.row_num): for i in range(self.row_num):
tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i*1000, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) % (self.ts + i*1000, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
for i in integer_list: for i in integer_list:
for j in self.param_list: for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5) from test1") tdSql.query(f"select stateduration(col{i},'{j}',5) from test1")
tdSql.checkRows(10) tdSql.checkRows(10)
# print(tdSql.queryResult) # print(tdSql.queryResult)
if j in ['LT' ,'lt','Lt','lT']: if j in ['LT' ,'lt','Lt','lT']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GT','gt', 'Gt','gT']: elif j in ['GT','gt', 'Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)])
elif j in ['LE','le','Le','lE']: elif j in ['LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in [ 'GE','ge','Ge','gE']: elif j in [ 'GE','ge','Ge','gE']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,), (5000,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list: for i in float_list:
for j in self.param_list: for j in self.param_list:
@ -101,22 +101,22 @@ class TDTestCase:
tdSql.checkRows(10) tdSql.checkRows(10)
print(tdSql.queryResult) print(tdSql.queryResult)
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']: if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']: elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1000,), (2000,), (3000,), (4000,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1000,), (2000,), (3000,), (4000,), (5000,), (6000,), (7000,), (8000,), (9000,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
# timestamp = 1m, time_unit =1m # timestamp = 1m, time_unit =1m
tdSql.execute('''create table test2(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, tdSql.execute('''create table test2(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''') col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
for i in range(self.row_num): for i in range(self.row_num):
tdSql.execute("insert into test2 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" tdSql.execute("insert into test2 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i*1000*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) % (self.ts + i*1000*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
for i in integer_list: for i in integer_list:
for j in self.param_list: for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5,1m) from test2") tdSql.query(f"select stateduration(col{i},'{j}',5,1m) from test2")
@ -132,7 +132,7 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list: for i in float_list:
for j in self.param_list: for j in self.param_list:
@ -147,14 +147,14 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
# timestamp = 1h, time_unit =1h # timestamp = 1h, time_unit =1h
tdSql.execute('''create table test3(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, tdSql.execute('''create table test3(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''') col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
for i in range(self.row_num): for i in range(self.row_num):
tdSql.execute("insert into test3 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" tdSql.execute("insert into test3 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) % (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
for i in integer_list: for i in integer_list:
for j in self.param_list: for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from test3") tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from test3")
@ -170,7 +170,7 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list: for i in float_list:
for j in self.param_list: for j in self.param_list:
@ -202,7 +202,7 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (60,), (120,), (180,), (240,), (300,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (60,), (120,), (180,), (240,), (300,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (60,), (120,), (180,), (-1,), (0,), (60,), (120,), (180,), (240,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (60,), (120,), (180,), (-1,), (0,), (60,), (120,), (180,), (240,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list: for i in float_list:
for j in self.param_list: for j in self.param_list:
@ -219,13 +219,13 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
# for stb # for stb
tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double, tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(t0 int)''') col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(t0 int)''')
tdSql.execute('create table stb_1 using stb tags(1)') tdSql.execute('create table stb_1 using stb tags(1)')
for i in range(self.row_num): for i in range(self.row_num):
tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)" tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1)) % (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
for i in integer_list: for i in integer_list:
for j in self.param_list: for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from stb") tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from stb")
@ -241,7 +241,7 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
elif j in ['NE','ne','Ne','nE']: elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list: for i in float_list:
for j in self.param_list: for j in self.param_list:
@ -256,10 +256,10 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)]) tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
elif j in ['EQ','eq','Eq','eQ']: elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)]) tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase())

View File

@ -9,28 +9,22 @@ from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import TDDnodes from util.dnodes import TDDnodes
from util.dnodes import TDDnode from util.dnodes import TDDnode
from util.cluster import *
import time import time
import socket import socket
import subprocess import subprocess
sys.path.append("./6-cluster")
class MyDnodes(TDDnodes): from clusterCommonCreate import *
def __init__(self ,dnodes_lists): from clusterCommonCheck import *
super(MyDnodes,self).__init__()
self.dnodes = dnodes_lists # dnode must be TDDnode instance
self.simDeployed = False
class TDTestCase: class TDTestCase:
noConn = True
def init(self,conn ,logSql): def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None tdSql.init(conn.cursor())
self.depoly_cluster(5) self.host = socket.gethostname()
self.master_dnode = self.TDDnodes.dnodes[0]
self.host=self.master_dnode.cfgDict["fqdn"]
conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
tdSql.init(conn1.cursor())
def getBuildPath(self): def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
@ -41,90 +35,12 @@ class TDTestCase:
projPath = selfPath[:selfPath.find("tests")] projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath): for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files): if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root)) rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath): if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
break break
return buildPath return buildPath
def depoly_cluster(self ,dnodes_nums):
testCluster = False
valgrind = 0
hostname = socket.gethostname()
dnodes = []
start_port = 6030
start_port_sec = 6130
for num in range(1, dnodes_nums+1):
dnode = TDDnode(num)
dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}")
dnode.addExtraCfg("fqdn", f"{hostname}")
dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}")
dnode.addExtraCfg("monitorFqdn", hostname)
dnode.addExtraCfg("monitorPort", 7043)
dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}")
dnodes.append(dnode)
self.TDDnodes = MyDnodes(dnodes)
self.TDDnodes.init("")
self.TDDnodes.setTestCluster(testCluster)
self.TDDnodes.setValgrind(valgrind)
self.TDDnodes.stopAll()
for dnode in self.TDDnodes.dnodes:
self.TDDnodes.deploy(dnode.index,{})
for dnode in self.TDDnodes.dnodes:
self.TDDnodes.starttaosd(dnode.index)
# create cluster
for dnode in self.TDDnodes.dnodes[1:]:
# print(dnode.cfgDict)
dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"]
dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0]
dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
cmd = f"{self.getBuildPath()}/build/bin/taos -h {dnode_first_host} -P {dnode_first_port} -s \"create dnode \\\"{dnode_id}\\\"\""
print(cmd)
os.system(cmd)
time.sleep(2)
tdLog.info(" create cluster done! ")
def five_dnode_one_mnode(self):
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
tdSql.checkData(0,4,'ready')
tdSql.checkData(4,4,'ready')
tdSql.query("show mnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(0,2,'leader')
tdSql.checkData(0,3,'ready')
tdSql.error("create mnode on dnode 1;")
tdSql.error("drop mnode on dnode 1;")
tdSql.execute("drop database if exists db")
tdSql.execute("create database if not exists db replica 1 duration 300")
tdSql.execute("use db")
tdSql.execute(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql.execute(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
def five_dnode_two_mnode(self): def five_dnode_two_mnode(self):
tdSql.query("show dnodes;") tdSql.query("show dnodes;")
@ -187,6 +103,34 @@ class TDTestCase:
tdSql.error("create mnode on dnode 2") tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;") tdSql.query("show dnodes;")
print(tdSql.queryResult) print(tdSql.queryResult)
clusterComCheck.checkDnodes(5)
# restart all taosd
tdDnodes=cluster.dnodes
# stop follower
tdLog.info("stop follower")
tdDnodes[1].stoptaosd()
if cluster.checkConnectStatus(0) :
print("cluster also work")
# start follower
tdLog.info("start follower")
tdDnodes[1].starttaosd()
if clusterComCheck.checkMnodeStatus(2) :
print("both mnodes are ready")
# stop leader
tdLog.info("stop leader")
tdDnodes[0].stoptaosd()
try:
cluster.checkConnectStatus(2)
tdLog.exit(" The election still succeeds when leader of both mnodes are killed ")
except Exception:
pass
tdLog.info("start leader")
tdDnodes[0].starttaosd()
if clusterComCheck.checkMnodeStatus(2) :
print("both mnodes are ready")
# # fisrt drop follower of mnode # # fisrt drop follower of mnode
# BUG # BUG
@ -229,8 +173,6 @@ class TDTestCase:
def run(self): def run(self):
print(self.master_dnode.cfgDict)
self.five_dnode_one_mnode()
self.five_dnode_two_mnode() self.five_dnode_two_mnode()

View File

@ -88,7 +88,7 @@ class TDTestCase:
tdLog.info("Confirm the status of the dnode again") tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2") tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;") tdSql.query("show dnodes;")
print(tdSql.queryResult) # print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers) clusterComCheck.checkDnodes(dnodenumbers)
# restart all taosd # restart all taosd
tdDnodes=cluster.dnodes tdDnodes=cluster.dnodes
@ -108,18 +108,6 @@ class TDTestCase:
tdDnodes[0].starttaosd() tdDnodes[0].starttaosd()
clusterComCheck.checkMnodeStatus(3) clusterComCheck.checkMnodeStatus(3)
tdLog.info("Take turns stopping all dnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
stopcount =0
while stopcount <= 2:
tdLog.info("first restart loop")
for i in range(dnodenumbers):
tdDnodes[i].stoptaosd()
tdDnodes[i].starttaosd()
stopcount+=1
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(3)
def run(self): def run(self):
# print(self.master_dnode.cfgDict) # print(self.master_dnode.cfgDict)

View File

@ -0,0 +1,118 @@
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
import taos
import sys
import time
import os
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
from util.cluster import *
from test import tdDnodes
sys.path.append("./6-cluster")
from clusterCommonCreate import *
from clusterCommonCheck import *
import time
import socket
import subprocess
from multiprocessing import Process
class TDTestCase:
def init(self,conn ,logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
self.host = socket.gethostname()
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def fiveDnodeThreeMnode(self,dnodenumbers,mnodeNums,restartNumber):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'replica': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
dnodenumbers=int(dnodenumbers)
mnodeNums=int(mnodeNums)
dbNumbers = int(dnodenumbers * restartNumber)
tdLog.info("first check dnode and mnode")
tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(4,1,'%s:6430'%self.host)
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(1)
# fisr add three mnodes;
tdLog.info("fisr add three mnodes and check mnode status")
tdSql.execute("create mnode on dnode 2")
clusterComCheck.checkMnodeStatus(2)
tdSql.execute("create mnode on dnode 3")
clusterComCheck.checkMnodeStatus(3)
# add some error operations and
tdLog.info("Confirm the status of the dnode again")
tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;")
# print(tdSql.queryResult)
clusterComCheck.checkDnodes(dnodenumbers)
# restart all taosd
tdDnodes=cluster.dnodes
tdLog.info("Take turns stopping all dnodes ")
# seperate vnode and mnode in different dnodes.
# create database and stable
stopcount =0
while stopcount <= 2:
tdLog.info(" restart loop: %d"%stopcount )
for i in range(dnodenumbers):
tdDnodes[i].stoptaosd()
tdDnodes[i].starttaosd()
stopcount+=1
clusterComCheck.checkDnodes(dnodenumbers)
clusterComCheck.checkMnodeStatus(3)
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(5,3,1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -14,19 +14,24 @@ sys.path.append("./7-tmq")
from tmqCommon import * from tmqCommon import *
class TDTestCase: class TDTestCase:
paraDict = {'dbName': 'db12', paraDict = {'dbName': 'db12',
'dropFlag':1, 'dropFlag': 1,
'vgroups': 4, 'event': '',
'precision': 'ms', 'vgroups': 4,
'stbName': 'stb0', 'stbName': 'stb0',
'ctbNum': 10, 'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':16, 'count':1}, {'type': 'timestamp','count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000, 'rowsPerTbl': 10000,
'batchNum': 10, 'batchNum': 10,
'startTs': 0, # 1640966400000 ----> 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'event':'', 'pollDelay': 20,
'columnDict': {'int':2}, 'showMsg': 1,
'tagDict': {'int':1} 'showRow': 1}
}
cdbName = 'cdb' cdbName = 'cdb'
# some parameter to consumer processor # some parameter to consumer processor
@ -57,17 +62,18 @@ class TDTestCase:
tmqCom.initConsumerTable(self.cdbName) tmqCom.initConsumerTable(self.cdbName)
tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"], self.paraDict['precision']) tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"])
self.paraDict["stbName"] = 'stb1' self.paraDict["stbName"] = 'stb1'
tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) tdCom.create_stable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],column_elm_list=self.paraDict["colSchema"],tag_elm_list=self.paraDict["tagSchema"],count=1, default_stbname_prefix=self.paraDict["stbName"])
tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"])
tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"])
self.paraDict["stbName"] = 'stb2' self.paraDict["stbName"] = 'stb2'
tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) self.paraDict["ctbPrefix"] = 'newctb'
tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) tdCom.create_stable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],column_elm_list=self.paraDict["colSchema"],tag_elm_list=self.paraDict["tagSchema"],count=1, default_stbname_prefix=self.paraDict["stbName"])
tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"])
tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"])
tdLog.info("create topics from db") tdLog.info("create topics from db")
topicName1 = 'topic_%s'%(self.paraDict['dbName']) topicName1 = 'topic_%s'%(self.paraDict['dbName'])
@ -97,7 +103,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
time.sleep(15) time.sleep(10)
tdSql.query("drop topic %s"%topicName1) tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 12 end ...... ") tdLog.printNoPrefix("======== test case 12 end ...... ")

View File

@ -76,6 +76,22 @@ class TMQCom:
resultList.append(tdSql.getData(i , 3)) resultList.append(tdSql.getData(i , 3))
return resultList return resultList
def selectConsumeMsgResult(self,expectRows,cdbName='cdb'):
resultList=[]
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == expectRows:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 2))
return resultList
def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0): def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0):
buildPath = tdCom.getBuildPath() buildPath = tdCom.getBuildPath()

View File

@ -116,17 +116,18 @@ python3 ./test.py -f 2-query/function_null.py
python3 ./test.py -f 2-query/queryQnode.py python3 ./test.py -f 2-query/queryQnode.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3 # python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3 # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 5 -M 3
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3 # python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 5 -M 3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 # python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3 # python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3
python3 ./test.py -f 7-tmq/basic5.py python3 ./test.py -f 7-tmq/basic5.py
@ -135,6 +136,7 @@ python3 ./test.py -f 7-tmq/subscribeDb0.py
python3 ./test.py -f 7-tmq/subscribeDb1.py python3 ./test.py -f 7-tmq/subscribeDb1.py
python3 ./test.py -f 7-tmq/subscribeDb2.py python3 ./test.py -f 7-tmq/subscribeDb2.py
python3 ./test.py -f 7-tmq/subscribeDb3.py python3 ./test.py -f 7-tmq/subscribeDb3.py
#python3 ./test.py -f 7-tmq/subscribeDb4.py
python3 ./test.py -f 7-tmq/subscribeStb.py python3 ./test.py -f 7-tmq/subscribeStb.py
python3 ./test.py -f 7-tmq/subscribeStb0.py python3 ./test.py -f 7-tmq/subscribeStb0.py
python3 ./test.py -f 7-tmq/subscribeStb1.py python3 ./test.py -f 7-tmq/subscribeStb1.py

View File

@ -635,6 +635,9 @@ void loop_consume(SThreadInfo* pInfo) {
} }
} }
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000); int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
while (running) { while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay); TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
@ -646,7 +649,15 @@ void loop_consume(SThreadInfo* pInfo) {
taos_free_result(tmqMsg); taos_free_result(tmqMsg);
totalMsgs++; totalMsgs++;
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 10 * 1000) {
taosFprintfFile(g_fp,
"consumer id %d has currently poll total msgs: %" PRId64 "\n",
pInfo->consumerId, totalMsgs);
lastPrintTime = currentPrintTime;
}
if (0 == once_flag) { if (0 == once_flag) {
once_flag = 1; once_flag = 1;
notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
@ -663,7 +674,7 @@ void loop_consume(SThreadInfo* pInfo) {
break; break;
} }
} }
if (0 == running) { if (0 == running) {
taosFprintfFile(g_fp, "receive stop signal and not continue consume\n"); taosFprintfFile(g_fp, "receive stop signal and not continue consume\n");
} }
@ -676,8 +687,6 @@ void loop_consume(SThreadInfo* pInfo) {
} }
void* consumeThreadFunc(void* param) { void* consumeThreadFunc(void* param) {
int32_t totalMsgs = 0;
SThreadInfo* pInfo = (SThreadInfo*)param; SThreadInfo* pInfo = (SThreadInfo*)param;
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
@ -859,12 +868,27 @@ int main(int32_t argc, char* argv[]) {
(void*)(&(g_stConfInfo.stThreads[i]))); (void*)(&(g_stConfInfo.stThreads[i])));
} }
int64_t start = taosGetTimestampUs();
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
taosThreadClear(&g_stConfInfo.stThreads[i].thread); taosThreadClear(&g_stConfInfo.stThreads[i].thread);
} }
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); int64_t end = taosGetTimestampUs();
int64_t totalMsgs = 0;
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
}
int64_t t = end - start;
if (0 == t) t = 1;
double tInMs = (double)t / 1000000.0;
taosFprintfFile(g_fp,
"Spent %.4f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.2f msgs/second\n\n",
tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs));
taosFprintfFile(g_fp, "==== close tmqlog ====\n"); taosFprintfFile(g_fp, "==== close tmqlog ====\n");
taosCloseFile(&g_fp); taosCloseFile(&g_fp);