From ff5fd96af6b0b7c787cf0e49147b8c334151b45c Mon Sep 17 00:00:00 2001
From: tangfangzhi
Date: Fri, 29 Apr 2022 15:36:17 +0800
Subject: [PATCH 01/25] use docker container to run cases

---
 Jenkinsfile2                           |  35 +--
 tests/parallel_test/collect_cases.sh   |  17 ++
 tests/parallel_test/container_build.sh |  59 +++++
 tests/parallel_test/run.sh             | 342 +++++++++++++++++++++++++
 tests/parallel_test/run_case.sh        |  74 ++++++
 tests/parallel_test/run_container.sh   | 103 ++++++++
 tests/unit-test/test.sh                |  40 +++
 7 files changed, 643 insertions(+), 27 deletions(-)
 create mode 100755 tests/parallel_test/collect_cases.sh
 create mode 100755 tests/parallel_test/container_build.sh
 create mode 100755 tests/parallel_test/run.sh
 create mode 100755 tests/parallel_test/run_case.sh
 create mode 100755 tests/parallel_test/run_container.sh
 create mode 100755 tests/unit-test/test.sh

diff --git a/Jenkinsfile2 b/Jenkinsfile2
index 96e29328fa..dacb0a8702 100644
--- a/Jenkinsfile2
+++ b/Jenkinsfile2
@@ -73,23 +73,11 @@ def pre_test(){
         git pull >/dev/null
         git fetch origin +refs/pull/${CHANGE_ID}/merge
         git checkout -qf FETCH_HEAD
-        git submodule update --init --recursive
-    '''
-    sh '''
-    cd ${WKC}
-    export TZ=Asia/Harbin
-    date
-    rm -rf debug
-    mkdir debug
-    cd debug
-    cmake .. > /dev/null
-    make -j4> /dev/null
     '''
     sh '''
     cd ${WKPY}
     git reset --hard
     git pull
-    pip3 install .
     '''
     return 1
 }
@@ -161,9 +149,9 @@ pipeline {
     agent none
     options { skipDefaultCheckout() }
     environment{
-        WK = '/var/lib/jenkins/workspace/TDinternal'
-        WKC= '/var/lib/jenkins/workspace/TDengine'
-        WKPY= '/var/lib/jenkins/workspace/taos-connector-python'
+        WKDIR = '/var/lib/jenkins/workspace/v3.0'
+        WKC= '/var/lib/jenkins/workspace/v3.0/TDengine'
+        WKPY= '/var/lib/jenkins/workspace/v3.0/taos-connector-python'
     }
     stages {
         stage('run test') {
@@ -182,20 +170,13 @@ pipeline {
                 changeRequest()
             }
             steps {
-                timeout(time: 45, unit: 'MINUTES'){
+                timeout(time: 20, unit: 'MINUTES'){
                     pre_test()
                     sh '''
-                        cd ${WKC}/debug
-                        ctest -VV
-                    '''
-                    sh '''
-                        export LD_LIBRARY_PATH=${WKC}/debug/build/lib
-                        cd ${WKC}/tests/system-test
-                        ./fulltest.sh
-                    '''
-                    sh '''
-                        cd ${WKC}/tests
-                        ./test-all.sh b1fq
+                        cd ${WKC}/tests/parallel_test
+                        time ./container_build.sh -w ${WKDIR} -t 8
+                        ./collect_cases.sh
+                        time ./run.sh -m /home/m.json -t /tmp/cases.task -b 3.0 -l ${WKDIR}/log
                     '''
                 }
             }
diff --git a/tests/parallel_test/collect_cases.sh b/tests/parallel_test/collect_cases.sh
new file mode 100755
index 0000000000..a513d62d94
--- /dev/null
+++ b/tests/parallel_test/collect_cases.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+case_file=/tmp/cases.task
+
+if [ ! -z $1 ]; then
+    case_file="$1"
+fi
+
+script_dir=`dirname $0`
+cd $script_dir
+
+echo ",,unit-test,bash test.sh" >$case_file
+cat ../script/jenkins/basic.txt |grep -v "^#"|grep -v "^$"|sed "s/^/,,script,/" >>$case_file
+grep "^python" ../system-test/fulltest.sh |sed "s/^/,,system-test,/" >>$case_file
+
+exit 0
+
diff --git a/tests/parallel_test/container_build.sh b/tests/parallel_test/container_build.sh
new file mode 100755
index 0000000000..c1571bf279
--- /dev/null
+++ b/tests/parallel_test/container_build.sh
@@ -0,0 +1,59 @@
+#!/bin/bash
+
+function usage() {
+    echo "$0"
+    echo -e "\t -w work dir"
+    echo -e "\t -e enterprise edition"
+    echo -e "\t -t make thread count"
+    echo -e "\t -h help"
+}
+
+ent=0
+while getopts "w:t:eh" opt; do
+    case $opt in
+        w)
+            WORKDIR=$OPTARG
+            ;;
+        e)
+            ent=1
+            ;;
+        t)
+            THREAD_COUNT=$OPTARG
+            ;;
+        h)
+            usage
+            exit 0
+            ;;
+        \?)
+            echo "Invalid option: -$OPTARG"
+            usage
+            exit 0
+            ;;
+    esac
+done
+
+if [ -z "$WORKDIR" ]; then
+    usage
+    exit 1
+fi
+if [ -z "$THREAD_COUNT" ]; then
+    THREAD_COUNT=1
+fi
+
+ulimit -c unlimited
+
+if [ $ent -eq 0 ]; then
+    REP_DIR=/home/TDengine
+    REP_MOUNT_PARAM=$WORKDIR/TDengine:/home/TDengine
+else
+    REP_DIR=/home/TDinternal
+    REP_MOUNT_PARAM=$WORKDIR/TDinternal:/home/TDinternal
+fi
+
+docker run \
+    -v $REP_MOUNT_PARAM \
+    --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake ..;make -j $THREAD_COUNT"
+
+ret=$?
+exit $ret
+
diff --git a/tests/parallel_test/run.sh b/tests/parallel_test/run.sh
new file mode 100755
index 0000000000..a2d3da1564
--- /dev/null
+++ b/tests/parallel_test/run.sh
@@ -0,0 +1,342 @@
+#!/bin/bash
+
+function usage() {
+    echo "$0"
+    echo -e "\t -m vm config file"
+    echo -e "\t -t task file"
+    echo -e "\t -b branch"
+    echo -e "\t -l log dir"
+    echo -e "\t -e enterprise edition"
+    echo -e "\t -o default timeout value"
+    echo -e "\t -h help"
+}
+
+ent=0
+while getopts "m:t:b:l:o:eh" opt; do
+    case $opt in
+        m)
+            config_file=$OPTARG
+            ;;
+        t)
+            t_file=$OPTARG
+            ;;
+        b)
+            branch=$OPTARG
+            ;;
+        l)
+            log_dir=$OPTARG
+            ;;
+        e)
+            ent=1
+            ;;
+        o)
+            timeout_param="-o $OPTARG"
+            ;;
+        h)
+            usage
+            exit 0
+            ;;
+        \?)
+            echo "Invalid option: -$OPTARG"
+            usage
+            exit 0
+            ;;
+    esac
+done
+#config_file=$1
+if [ -z $config_file ]; then
+    usage
+    exit 1
+fi
+if [ ! -f $config_file ]; then
+    echo "$config_file not found"
+    usage
+    exit 1
+fi
+#t_file=$2
+if [ -z $t_file ]; then
+    usage
+    exit 1
+fi
+if [ ! -f $t_file ]; then
+    echo "$t_file not found"
+    usage
+    exit 1
+fi
+date_tag=`date +%Y%m%d-%H%M%S`
+if [ -z $log_dir ]; then
+    log_dir="log/${branch}_${date_tag}"
+else
+    log_dir="$log_dir/${branch}_${date_tag}"
+fi
+
+hosts=()
+usernames=()
+passwords=()
+workdirs=()
+threads=()
+
+i=0
+while [ 1 ]; do
+    host=`jq .[$i].host $config_file`
+    if [ "$host" = "null" ]; then
+        break
+    fi
+    username=`jq .[$i].username $config_file`
+    if [ "$username" = "null" ]; then
+        break
+    fi
+    password=`jq .[$i].password $config_file`
+    if [ "$password" = "null" ]; then
+        password=""
+    fi
+    workdir=`jq .[$i].workdir $config_file`
+    if [ "$workdir" = "null" ]; then
+        break
+    fi
+    thread=`jq .[$i].thread $config_file`
+    if [ "$thread" = "null" ]; then
+        break
+    fi
+    hosts[i]=`echo $host|sed 's/\"$//'|sed 's/^\"//'`
+    usernames[i]=`echo $username|sed 's/\"$//'|sed 's/^\"//'`
+    passwords[i]=`echo $password|sed 's/\"$//'|sed 's/^\"//'`
+    workdirs[i]=`echo $workdir|sed 's/\"$//'|sed 's/^\"//'`
+    threads[i]=$thread
+    i=$(( i + 1 ))
+done
+
+
+function prepare_cases() {
+    cat $t_file >>$task_file
+    local i=0
+    while [ $i -lt $1 ]; do
+        echo "%%FINISHED%%" >>$task_file
+        i=$(( i + 1 ))
+    done
+}
+
+function clean_tmp() {
+    # clean tmp dir
+    local index=$1
+    local ssh_script="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
+    if [ -z ${passwords[index]} ]; then
+        ssh_script="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
+    fi
+    local cmd="${ssh_script} rm -rf ${workdirs[index]}/tmp"
+    ${cmd}
+}
+
+function run_thread() {
+    local index=$1
+    local thread_no=$2
+    local runcase_script="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
+    if [ -z ${passwords[index]} ]; then
+        runcase_script="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
+    fi
+    local count=0
script="${workdirs[index]}/TDengine/tests/parallel_test/run_container.sh" + if [ $ent -ne 0 ]; then + local script="${workdirs[index]}/TDinternal/community/tests/parallel_test/run_container.sh" + fi + local cmd="${runcase_script} ${script}" + + # script="echo" + while [ 1 ]; do + local line=`flock -x $lock_file -c "head -n1 $task_file;sed -i \"1d\" $task_file"` + if [ "x$line" = "x%%FINISHED%%" ]; then + # echo "$index . $thread_no EXIT" + break + fi + if [ -z "$line" ]; then + continue + fi + echo "$line"|grep -q "^#" + if [ $? -eq 0 ]; then + continue + fi + local case_redo_time=`echo "$line"|cut -d, -f2` + if [ -z "$case_redo_time" ]; then + case_redo_time=${DEFAULT_RETRY_TIME:-2} + fi + local exec_dir=`echo "$line"|cut -d, -f3` + local case_cmd=`echo "$line"|cut -d, -f4` + local case_file="" + echo "$case_cmd"|grep -q "\.sh" + if [ $? -eq 0 ]; then + case_file=`echo "$case_cmd"|grep -o ".*\.sh"|awk '{print $NF}'` + fi + echo "$case_cmd"|grep -q "^python3" + if [ $? -eq 0 ]; then + case_file=`echo "$case_cmd"|grep -o ".*\.py"|awk '{print $NF}'` + fi + echo "$case_cmd"|grep -q "\.sim" + if [ $? -eq 0 ]; then + case_file=`echo "$case_cmd"|grep -o ".*\.sim"|awk '{print $NF}'` + fi + if [ -z "$case_file" ]; then + case_file=`echo "$case_cmd"|awk '{print $NF}'` + fi + if [ -z "$case_file" ]; then + continue + fi + case_file="$exec_dir/${case_file}.${index}.${thread_no}.${count}" + count=$(( count + 1 )) + local case_path=`dirname "$case_file"` + if [ ! -z "$case_path" ]; then + mkdir -p $log_dir/$case_path + fi + cmd="${runcase_script} ${script} -w ${workdirs[index]} -c \"${case_cmd}\" -t ${thread_no} -d ${exec_dir} ${timeout_param}" + # echo "$thread_no $count $cmd" + local ret=0 + local redo_count=1 + start_time=`date +%s` + while [ ${redo_count} -lt 6 ]; do + if [ -f $log_dir/$case_file.log ]; then + cp $log_dir/$case_file.log $log_dir/$case_file.${redo_count}.redolog + fi + echo "${hosts[index]}-${thread_no} order:${count}, redo:${redo_count} task:${line}" >$log_dir/$case_file.log + echo -e "\e[33m >>>>> \e[0m ${case_cmd}" + date >>$log_dir/$case_file.log + # $cmd 2>&1 | tee -a $log_dir/$case_file.log + # ret=${PIPESTATUS[0]} + $cmd >>$log_dir/$case_file.log 2>&1 + ret=$? + echo "${hosts[index]} `date` ret:${ret}" >>$log_dir/$case_file.log + if [ $ret -eq 0 ]; then + break + fi + redo=0 + grep -q "wait too long for taosd start" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "kex_exchange_identification: Connection closed by remote host" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "ssh_exchange_identification: Connection closed by remote host" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "kex_exchange_identification: read: Connection reset by peer" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "Database not ready" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "Unable to establish connection" $log_dir/$case_file.log + if [ $? 
-eq 0 ]; then + redo=1 + fi + if [ $redo_count -lt $case_redo_time ]; then + redo=1 + fi + if [ $redo -eq 0 ]; then + break + fi + redo_count=$(( redo_count + 1 )) + done + end_time=`date +%s` + echo >>$log_dir/$case_file.log + echo "${hosts[index]} execute time: $(( end_time - start_time ))s" >>$log_dir/$case_file.log + # echo "$thread_no ${line} DONE" + if [ $ret -ne 0 ]; then + flock -x $lock_file -c "echo \"${hosts[index]} ret:${ret} ${line}\" >>$log_dir/failed.log" + mkdir -p $log_dir/${case_file}.coredump + local remote_coredump_dir="${workdirs[index]}/tmp/thread_volume/$thread_no/coredump" + local scpcmd="sshpass -p ${passwords[index]} scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + if [ -z ${passwords[index]} ]; then + scpcmd="scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + fi + cmd="$scpcmd:${remote_coredump_dir}/* $log_dir/${case_file}.coredump/" + $cmd # 2>/dev/null + local case_info=`echo "$line"|cut -d, -f 3,4` + local corefile=`ls $log_dir/${case_file}.coredump/` + corefile=`find $log_dir/${case_file}.coredump/ -name "core.*"` + echo -e "$case_info \e[31m failed\e[0m" + echo "=========================log============================" + cat $log_dir/$case_file.log + echo "=====================================================" + echo -e "\e[34m log file: $log_dir/$case_file.log \e[0m" + if [ ! -z "$corefile" ]; then + echo -e "\e[34m corefiles: $corefile \e[0m" + local build_dir=$log_dir/build_${hosts[index]} + local remote_build_dir="${workdirs[index]}/TDengine/debug/build" + if [ $ent -ne 0 ]; then + remote_build_dir="${workdirs[index]}/TDinternal/debug/build" + fi + mkdir $build_dir 2>/dev/null + if [ $? -eq 0 ]; then + # scp build binary + cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/" + echo "$cmd" + $cmd >/dev/null + fi + fi + fi + done +} + +# echo "hosts: ${hosts[@]}" +# echo "usernames: ${usernames[@]}" +# echo "passwords: ${passwords[@]}" +# echo "workdirs: ${workdirs[@]}" +# echo "threads: ${threads[@]}" +# TODO: check host accessibility + +i=0 +while [ $i -lt ${#hosts[*]} ]; do + clean_tmp $i & + i=$(( i + 1 )) +done +wait + +mkdir -p $log_dir +rm -rf $log_dir/* +task_file=$log_dir/$$.task +lock_file=$log_dir/$$.lock + +i=0 +j=0 +while [ $i -lt ${#hosts[*]} ]; do + j=$(( j + threads[i] )) + i=$(( i + 1 )) +done +prepare_cases $j + +i=0 +while [ $i -lt ${#hosts[*]} ]; do + j=0 + while [ $j -lt ${threads[i]} ]; do + run_thread $i $j & + j=$(( j + 1 )) + done + i=$(( i + 1 )) +done + +wait + +rm -f $lock_file +rm -f $task_file + +# docker ps -a|grep -v CONTAINER|awk '{print $1}'|xargs docker rm -f +RET=0 +i=1 +if [ -f "$log_dir/failed.log" ]; then + echo "=====================================================" + while read line; do + line=`echo "$line"|cut -d, -f 3,4` + echo -e "$i. $line \e[31m failed\e[0m" >&2 + i=$(( i + 1 )) + done <$log_dir/failed.log + RET=1 +fi + +echo "${log_dir}" >&2 + +date + +exit $RET diff --git a/tests/parallel_test/run_case.sh b/tests/parallel_test/run_case.sh new file mode 100755 index 0000000000..9705c024b8 --- /dev/null +++ b/tests/parallel_test/run_case.sh @@ -0,0 +1,74 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -d execution dir" + echo -e "\t -c command" + echo -e "\t -e enterprise edition" + echo -e "\t -o default timeout value" + echo -e "\t -h help" +} + +ent=0 +while getopts "d:c:o:eh" opt; do + case $opt in + d) + exec_dir=$OPTARG + ;; + c) + cmd=$OPTARG + ;; + o) + TIMEOUT_CMD="timeout $OPTARG" + ;; + e) + ent=1 + ;; + h) + usage + exit 0 + ;; + \?) 
+ echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +if [ -z "$exec_dir" ]; then + usage + exit 0 +fi +if [ -z "$cmd" ]; then + usage + exit 0 +fi + +if [ $ent -eq 0 ]; then + export PATH=$PATH:/home/TDengine/debug/build/bin + export LD_LIBRARY_PATH=/home/TDengine/debug/build/lib + ln -s /home/TDengine/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null + CONTAINER_TESTDIR=/home/TDengine +else + export PATH=$PATH:/home/TDinternal/debug/build/bin + export LD_LIBRARY_PATH=/home/TDinternal/debug/build/lib + ln -s /home/TDinternal/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null + CONTAINER_TESTDIR=/home/TDinternal/community +fi +mkdir -p /var/lib/taos/subscribe +mkdir -p /var/log/taos +mkdir -p /var/lib/taos + +cd $CONTAINER_TESTDIR/tests/$exec_dir +ulimit -c unlimited + +$TIMEOUT_CMD $cmd +RET=$? + +if [ $RET -ne 0 ]; then + pwd +fi + +exit $RET + diff --git a/tests/parallel_test/run_container.sh b/tests/parallel_test/run_container.sh new file mode 100755 index 0000000000..b0337febcc --- /dev/null +++ b/tests/parallel_test/run_container.sh @@ -0,0 +1,103 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -w work dir" + echo -e "\t -d execution dir" + echo -e "\t -c command" + echo -e "\t -t thread number" + echo -e "\t -e enterprise edition" + echo -e "\t -o default timeout value" + echo -e "\t -h help" +} + +ent=0 +while getopts "w:d:c:t:o:eh" opt; do + case $opt in + w) + WORKDIR=$OPTARG + ;; + d) + exec_dir=$OPTARG + ;; + c) + cmd=$OPTARG + ;; + t) + thread_no=$OPTARG + ;; + e) + ent=1 + ;; + o) + extra_param="-o $OPTARG" + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +if [ -z "$WORKDIR" ]; then + usage + exit 1 +fi +if [ -z "$exec_dir" ]; then + usage + exit 1 +fi +if [ -z "$cmd" ]; then + usage + exit 1 +fi +if [ -z "$thread_no" ]; then + usage + exit 1 +fi +if [ $ent -ne 0 ]; then + # enterprise edition + extra_param="$extra_param -e" + INTERNAL_REPDIR=$WORKDIR/TDinternal + REPDIR=$INTERNAL_REPDIR/community + CONTAINER_TESTDIR=/home/TDinternal/community + REP_MOUNT_PARAM="$INTERNAL_REPDIR:/home/TDinternal" +else + # community edition + REPDIR=$WORKDIR/TDengine + CONTAINER_TESTDIR=/home/TDengine + REP_MOUNT_PARAM="$REPDIR:/home/TDengine" +fi + +ulimit -c unlimited + +TMP_DIR=$WORKDIR/tmp + +MOUNT_DIR="" +mkdir -p ${TMP_DIR}/thread_volume/$thread_no/sim/tsim +mkdir -p ${TMP_DIR}/thread_volume/$thread_no/coredump +rm -rf ${TMP_DIR}/thread_volume/$thread_no/coredump/* +if [ ! -d "${TMP_DIR}/thread_volume/$thread_no/$exec_dir" ]; then + subdir=`echo "$exec_dir"|cut -d/ -f1` + echo "cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/" + cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/ +fi +MOUNT_DIR="$TMP_DIR/thread_volume/$thread_no/$exec_dir:$CONTAINER_TESTDIR/tests/$exec_dir" +echo "$thread_no -> ${exec_dir}:$cmd" +coredump_dir=`cat /proc/sys/kernel/core_pattern | xargs dirname` + +docker run \ + -v $REP_MOUNT_PARAM \ + -v $MOUNT_DIR \ + -v "$TMP_DIR/thread_volume/$thread_no/sim:${CONTAINER_TESTDIR}/sim" \ + -v ${TMP_DIR}/thread_volume/$thread_no/coredump:$coredump_dir \ + -v $WORKDIR/taos-connector-python/taos:/usr/local/lib/python3.8/site-packages/taos:ro \ + --rm --ulimit core=-1 taos_test:v1.0 $CONTAINER_TESTDIR/tests/parallel_test/run_case.sh -d "$exec_dir" -c "$cmd" $extra_param +ret=$? 
+exit $ret
+
diff --git a/tests/unit-test/test.sh b/tests/unit-test/test.sh
new file mode 100755
index 0000000000..4122597717
--- /dev/null
+++ b/tests/unit-test/test.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+function usage() {
+    echo "$0"
+    echo -e "\t -e enterprise edition"
+    echo -e "\t -h help"
+}
+
+ent=0
+while getopts "eh" opt; do
+    case $opt in
+        e)
+            ent=1
+            ;;
+        h)
+            usage
+            exit 0
+            ;;
+        \?)
+            echo "Invalid option: -$OPTARG"
+            usage
+            exit 0
+            ;;
+    esac
+done
+
+script_dir=`dirname $0`
+cd ${script_dir}
+PWD=`pwd`
+
+if [ $ent -eq 0 ]; then
+    cd ../../debug
+else
+    cd ../../../debug
+fi
+
+ctest -j8
+ret=$?
+exit $ret
+

From e27ebe71ac5673f28f3c624a16ca36279feba5a4 Mon Sep 17 00:00:00 2001
From: tangfangzhi
Date: Fri, 29 Apr 2022 15:49:51 +0800
Subject: [PATCH 02/25] remove bdb related

---
 Jenkinsfile2 | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/Jenkinsfile2 b/Jenkinsfile2
index dacb0a8702..3f3124a680 100644
--- a/Jenkinsfile2
+++ b/Jenkinsfile2
@@ -59,7 +59,6 @@ def pre_test(){
         sh '''
         cd ${WKC}
         git checkout 3.0
-        [ -d contrib/bdb ] && cd contrib/bdb && git clean -fxd && cd ../..
         '''
     } else {
         sh '''
@@ -176,7 +175,7 @@ pipeline {
                         cd ${WKC}/tests/parallel_test
                         time ./container_build.sh -w ${WKDIR} -t 8
                         ./collect_cases.sh
-                        time ./run.sh -m /home/m.json -t /tmp/cases.task -b 3.0 -l ${WKDIR}/log
+                        time ./run.sh -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log
                     '''
                 }
             }

From fed9611473ec2978ae52ecaad96c078f282eeedc Mon Sep 17 00:00:00 2001
From: tangfangzhi
Date: Fri, 29 Apr 2022 15:58:25 +0800
Subject: [PATCH 03/25] print date time before test

---
 Jenkinsfile2 | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/Jenkinsfile2 b/Jenkinsfile2
index 3f3124a680..5eed35ed98 100644
--- a/Jenkinsfile2
+++ b/Jenkinsfile2
@@ -173,8 +173,11 @@ pipeline {
                     pre_test()
                     sh '''
                         cd ${WKC}/tests/parallel_test
+                        date
                         time ./container_build.sh -w ${WKDIR} -t 8
+                        rm -f /tmp/cases.task
                         ./collect_cases.sh
+                        date
                         time ./run.sh -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log
                     '''
                 }

From 4bac5194e27ae839aaf0be9532cce9b60443a7b3 Mon Sep 17 00:00:00 2001
From: tangfangzhi
Date: Fri, 6 May 2022 15:12:57 +0800
Subject: [PATCH 04/25] collect sim/log directory

---
 tests/parallel_test/run.sh           | 15 +++++++++++++++
 tests/parallel_test/run_container.sh |  1 +
 2 files changed, 16 insertions(+)

diff --git a/tests/parallel_test/run.sh b/tests/parallel_test/run.sh
index a2d3da1564..0ceb1cff5b 100755
--- a/tests/parallel_test/run.sh
+++ b/tests/parallel_test/run.sh
@@ -276,6 +276,21 @@ function run_thread() {
                     $cmd >/dev/null
                 fi
             fi
+            # get remote sim dir
+            local remote_sim_dir="${workdirs[index]}/tmp/thread_volume/$thread_no"
+            local tarcmd="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
+            if [ -z ${passwords[index]} ]; then
+                tarcmd="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}"
+            fi
+            cmd="$tarcmd sh -c \"cd $remote_sim_dir; tar -czf sim.tar.gz sim\""
+            $cmd
+            local remote_sim_tar="${workdirs[index]}/tmp/thread_volume/$thread_no/sim.tar.gz"
+            scpcmd="sshpass -p ${passwords[index]} scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
+            if [ -z ${passwords[index]} ]; then
+                scpcmd="scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"
+            fi
+            cmd="$scpcmd:${remote_sim_tar} $log_dir/${case_file}.sim.tar.gz"
+            $cmd
         fi
     done
 }
diff --git a/tests/parallel_test/run_container.sh b/tests/parallel_test/run_container.sh
index b0337febcc..bd010698dd 100755
--- a/tests/parallel_test/run_container.sh
+++ b/tests/parallel_test/run_container.sh
@@ -79,6 +79,7 @@ ulimit -c unlimited
 TMP_DIR=$WORKDIR/tmp
 
 MOUNT_DIR=""
+rm -rf ${TMP_DIR}/thread_volume/$thread_no/sim
 mkdir -p ${TMP_DIR}/thread_volume/$thread_no/sim/tsim
 mkdir -p ${TMP_DIR}/thread_volume/$thread_no/coredump
 rm -rf ${TMP_DIR}/thread_volume/$thread_no/coredump/*

From 897aa85a20aea22bcdca65f40505e94947390f83 Mon Sep 17 00:00:00 2001
From: tangfangzhi
Date: Wed, 11 May 2022 19:16:42 +0800
Subject: [PATCH 05/25] enh: ctest with enterprise version

---
 tests/parallel_test/collect_cases.sh | 36 ++++++++++++++++++++++++----
 1 file changed, 32 insertions(+), 4 deletions(-)

diff --git a/tests/parallel_test/collect_cases.sh b/tests/parallel_test/collect_cases.sh
index a513d62d94..c560598c81 100755
--- a/tests/parallel_test/collect_cases.sh
+++ b/tests/parallel_test/collect_cases.sh
@@ -2,14 +2,42 @@
 
 case_file=/tmp/cases.task
 
-if [ ! -z $1 ]; then
-    case_file="$1"
-fi
+function usage() {
+    echo "$0"
+    echo -e "\t -o output case file"
+    echo -e "\t -e enterprise edition"
+    echo -e "\t -h help"
+}
+
+ent=0
+while getopts "o:eh" opt; do
+    case $opt in
+        o)
+            case_file=$OPTARG
+            ;;
+        e)
+            ent=1
+            ;;
+        h)
+            usage
+            exit 0
+            ;;
+        \?)
+            echo "Invalid option: -$OPTARG"
+            usage
+            exit 0
+            ;;
+    esac
+done
 
 script_dir=`dirname $0`
 cd $script_dir
 
-echo ",,unit-test,bash test.sh" >$case_file
+if [ $ent -eq 0 ]; then
+    echo ",,unit-test,bash test.sh" >$case_file
+else
+    echo ",,unit-test,bash test.sh -e" >$case_file
+fi
 cat ../script/jenkins/basic.txt |grep -v "^#"|grep -v "^$"|sed "s/^/,,script,/" >>$case_file
 grep "^python" ../system-test/fulltest.sh |sed "s/^/,,system-test,/" >>$case_file

From a9ff8c28fa070579197b2077cf6232ac0672bfc2 Mon Sep 17 00:00:00 2001
From: tangfangzhi
Date: Wed, 11 May 2022 19:35:49 +0800
Subject: [PATCH 06/25] refactor: always build enterprise edition and
 taos-tool

---
 Jenkinsfile2                           | 54 +++++---------------------
 tests/parallel_test/container_build.sh |  2 +-
 2 files changed, 10 insertions(+), 46 deletions(-)

diff --git a/Jenkinsfile2 b/Jenkinsfile2
index a86a2535aa..7377e33c1e 100644
--- a/Jenkinsfile2
+++ b/Jenkinsfile2
@@ -34,16 +34,6 @@ def abort_previous(){
 }
 def pre_test(){
     sh 'hostname'
-    sh '''
-    date
-    sudo rmtaos || echo "taosd has not installed"
-    '''
-    sh '''
-    killall -9 taosd ||echo "no taosd running"
-    killall -9 gdb || echo "no gdb running"
-    killall -9 python3.8 || echo "no python program running"
-    cd ${WKC}
-    '''
     script {
         if (env.CHANGE_TARGET == 'master') {
             sh '''
@@ -106,16 +96,6 @@ def pre_test(){
         cd ${WKC}
         git submodule update --init --recursive
     '''
-    sh '''
-    cd ${WK}
-    export TZ=Asia/Harbin
-    date
-    rm -rf debug
-    mkdir debug
-    cd debug
-    cmake .. > /dev/null
-    make -j4> /dev/null
-    '''
     sh '''
     cd ${WKPY}
     git reset --hard
@@ -209,31 +189,15 @@ pipeline {
                 timeout(time: 20, unit: 'MINUTES'){
                     pre_test()
                     script {
-                        if (env.CHANGE_URL =~ /\/TDengine\//) {
-                            sh '''
-                            cd ${WKC}/tests/parallel_test
-                            date
-                            time ./container_build.sh -w ${WKDIR} -t 8
-                            rm -f /tmp/cases.task
-                            ./collect_cases.sh
-                            date
-                            time ./run.sh -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log
-                            '''
-                        } else if (env.CHANGE_URL =~ /\/TDinternal\//) {
-                            sh '''
-                            cd ${WKC}/tests/parallel_test
-                            date
-                            time ./container_build.sh -w ${WKDIR} -t 8 -e
-                            rm -f /tmp/cases.task
-                            ./collect_cases.sh -e
-                            date
-                            time ./run.sh -e -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log
-                            '''
-                        } else {
-                            sh '''
-                            echo "unmatched reposiotry ${CHANGE_URL}"
-                            '''
-                        }
+                        sh '''
+                        cd ${WKC}/tests/parallel_test
+                        date
+                        time ./container_build.sh -w ${WKDIR} -t 8 -e
+                        rm -f /tmp/cases.task
+                        ./collect_cases.sh -e
+                        date
+                        time ./run.sh -e -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log
+                        '''
                     }
                 }
             }
diff --git a/tests/parallel_test/container_build.sh b/tests/parallel_test/container_build.sh
index c1571bf279..3f23cd8b5f 100755
--- a/tests/parallel_test/container_build.sh
+++ b/tests/parallel_test/container_build.sh
@@ -52,7 +52,7 @@ fi
 
 docker run \
     -v $REP_MOUNT_PARAM \
-    --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake ..;make -j $THREAD_COUNT"
+    --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_TOOLS=true;make -j $THREAD_COUNT"
 
 ret=$?
 exit $ret

From bf88f33603a75a36cfd82ecb4cfa040a61fad935 Mon Sep 17 00:00:00 2001
From: tangfangzhi
Date: Wed, 11 May 2022 19:51:05 +0800
Subject: [PATCH 07/25] fix: make container creation compatible with
 enterprise edition

---
 tests/parallel_test/run.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/parallel_test/run.sh b/tests/parallel_test/run.sh
index 0ceb1cff5b..6417f41fd4 100755
--- a/tests/parallel_test/run.sh
+++ b/tests/parallel_test/run.sh
@@ -137,7 +137,7 @@ function run_thread() {
     local count=0
     local script="${workdirs[index]}/TDengine/tests/parallel_test/run_container.sh"
     if [ $ent -ne 0 ]; then
-        local script="${workdirs[index]}/TDinternal/community/tests/parallel_test/run_container.sh"
+        local script="${workdirs[index]}/TDinternal/community/tests/parallel_test/run_container.sh -e"
     fi
     local cmd="${runcase_script} ${script}"
 

From 0f02091581669df633f85b545d46baa205023b62 Mon Sep 17 00:00:00 2001
From: tangfangzhi
Date: Wed, 11 May 2022 20:17:28 +0800
Subject: [PATCH 08/25] fix: correct sim dir with enterprise edition

---
 tests/parallel_test/run_container.sh | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/parallel_test/run_container.sh b/tests/parallel_test/run_container.sh
index bd010698dd..affd9128a4 100755
--- a/tests/parallel_test/run_container.sh
+++ b/tests/parallel_test/run_container.sh
@@ -66,11 +66,13 @@ if [ $ent -ne 0 ]; then
     INTERNAL_REPDIR=$WORKDIR/TDinternal
     REPDIR=$INTERNAL_REPDIR/community
     CONTAINER_TESTDIR=/home/TDinternal/community
+    SIM_DIR=/home/TDinternal/sim
     REP_MOUNT_PARAM="$INTERNAL_REPDIR:/home/TDinternal"
 else
     # community edition
     REPDIR=$WORKDIR/TDengine
     CONTAINER_TESTDIR=/home/TDengine
+    SIM_DIR=/home/TDengine/sim
     REP_MOUNT_PARAM="$REPDIR:/home/TDengine"
 fi
 
@@ -95,7 +97,7 @@ coredump_dir=`cat /proc/sys/kernel/core_pattern | xargs dirname`
 docker run \
     -v $REP_MOUNT_PARAM \
     -v $MOUNT_DIR \
"$TMP_DIR/thread_volume/$thread_no/sim:${CONTAINER_TESTDIR}/sim" \ + -v "$TMP_DIR/thread_volume/$thread_no/sim:${SIM_DIR}" \ -v ${TMP_DIR}/thread_volume/$thread_no/coredump:$coredump_dir \ -v $WORKDIR/taos-connector-python/taos:/usr/local/lib/python3.8/site-packages/taos:ro \ --rm --ulimit core=-1 taos_test:v1.0 $CONTAINER_TESTDIR/tests/parallel_test/run_case.sh -d "$exec_dir" -c "$cmd" $extra_param From 460168adce1b5593373413133e613c91f87c89eb Mon Sep 17 00:00:00 2001 From: tangfangzhi Date: Wed, 11 May 2022 20:28:13 +0800 Subject: [PATCH 09/25] enh: use a seperate step to build --- Jenkinsfile2 | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 7377e33c1e..aecc76ffc7 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -195,6 +195,8 @@ pipeline { time ./container_build.sh -w ${WKDIR} -t 8 -e rm -f /tmp/cases.task ./collect_cases.sh -e + ''' + sh ''' date time ./run.sh -e -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log ''' From 1f8377ba3c92f170e965c3123579148d0f464cce Mon Sep 17 00:00:00 2001 From: tangfangzhi Date: Wed, 11 May 2022 20:34:19 +0800 Subject: [PATCH 10/25] fix: change to a proper directory before run --- Jenkinsfile2 | 1 + 1 file changed, 1 insertion(+) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index aecc76ffc7..ca37857189 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -197,6 +197,7 @@ pipeline { ./collect_cases.sh -e ''' sh ''' + cd ${WKC}/tests/parallel_test date time ./run.sh -e -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log ''' From 7f7b1a423a2b42b57d47a1aa1643ed0637c97e37 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 12 May 2022 12:10:22 +0800 Subject: [PATCH 11/25] fix(rpc): avoid fd leak --- ...utorTests.cpp => index_executor_tests.cpp} | 0 source/libs/transport/src/transCli.c | 23 ++++++------ source/libs/transport/src/transSrv.c | 36 +++++++++++++------ tests/pytest/util/dnodes.py | 4 +-- 4 files changed, 39 insertions(+), 24 deletions(-) rename source/libs/executor/test/{indexexcutorTests.cpp => index_executor_tests.cpp} (100%) diff --git a/source/libs/executor/test/indexexcutorTests.cpp b/source/libs/executor/test/index_executor_tests.cpp similarity index 100% rename from source/libs/executor/test/indexexcutorTests.cpp rename to source/libs/executor/test/index_executor_tests.cpp diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 718be6aa64..5570bdcd3e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -21,15 +21,16 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; uv_write_t writeReq; - void* hostThrd; - SConnBuffer readBuf; - void* data; - STransQueue cliMsgs; - queue conn; - uint64_t expireTime; - int hThrdIdx; - STransCtx ctx; + void* hostThrd; + int hThrdIdx; + + SConnBuffer readBuf; + STransQueue cliMsgs; + queue conn; + uint64_t expireTime; + + STransCtx ctx; bool broken; // link broken or not ConnStatus status; // @@ -157,13 +158,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); transClearBuffer(&conn->readBuf); \ transFreeMsg(transContFromHead((char*)head)); \ tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \ - while (T_REF_VAL_GET(conn) > 1) { \ - transUnrefCliHandle(conn); \ - } \ - if (T_REF_VAL_GET(conn) == 1) { \ + if (T_REF_VAL_GET(conn) > 1) { \ transUnrefCliHandle(conn); \ } \ destroyCmsg(pMsg); \ + addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \ return; \ } \ } while (0) diff --git 
diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c
index af66d39904..d0d45f11e3 100644
--- a/source/libs/transport/src/transSrv.c
+++ b/source/libs/transport/src/transSrv.c
@@ -35,7 +35,6 @@ typedef struct SSrvConn {
   uv_timer_t pTimer;
 
   queue queue;
-  int   persist;  // persist connection or not
   SConnBuffer readBuf;  // read buf,
   int         inType;
   void*       pTransInst;  // rpc init
@@ -138,6 +137,7 @@ static void destroySmsg(SSrvMsg* smsg);
 // check whether already read complete packet
 static SSrvConn* createConn(void* hThrd);
 static void      destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
+static int       reallocConnRefHandle(SSrvConn* conn);
 
 static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
 static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
@@ -164,7 +164,7 @@ static void* transWorkerThread(void* arg);
 static void* transAcceptThread(void* arg);
 
 // add handle loop
-static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName);
+static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName);
 static bool addHandleToAcceptloop(void* arg);
 
 #define CONN_SHOULD_RELEASE(conn, head) \
@@ -517,7 +517,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
     int64_t    refId = transMsg.refId;
     SExHandle* exh2 = uvAcquireExHandle(refId);
     if (exh2 == NULL || exh1 != exh2) {
-      tTrace("server handle %p except msg, ignore it", exh1);
+      tTrace("server handle except msg %p, ignore it", exh1);
      uvReleaseExHandle(refId);
      destroySmsg(msg);
      continue;
@@ -581,11 +581,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
 
   if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
     if (pObj->numOfWorkerReady < pObj->numOfThreads) {
-      tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady);
+      tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
+             pObj->numOfWorkerReady);
       uv_close((uv_handle_t*)cli, NULL);
       return;
     }
-    
+
     uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
     wr->data = cli;
     uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
@@ -681,14 +682,14 @@ void* transAcceptThread(void* arg) {
   return NULL;
 }
 
-void uvOnPipeConnectionCb(uv_connect_t *connect, int status) {
+void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
   if (status != 0) {
     return;
   }
   SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
   uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
 }
-static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) {
+static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) {
   pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
   if (0 != uv_loop_init(pThrd->loop)) {
     return false;
@@ -787,6 +788,19 @@ static void destroyConn(SSrvConn* conn, bool clear) {
     // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
   }
 }
+static int reallocConnRefHandle(SSrvConn* conn) {
+  uvReleaseExHandle(conn->refId);
+  uvRemoveExHandle(conn->refId);
+  // avoid app continue to send msg on invalid handle
+  SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
+  exh->handle = conn;
+  exh->pThrd = conn->hostThrd;
+  exh->refId = uvAddExHandle(exh);
+  uvAcquireExHandle(exh->refId);
+  conn->refId = exh->refId;
+
+  return 0;
+}
 static void uvDestroyConn(uv_handle_t* handle) {
   SSrvConn* conn = handle->data;
   if (conn == NULL) {
@@ -822,7 +836,7 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) {
   ASSERT(status == 0);
 
   SServerObj* srv = container_of(handle, SServerObj, pipeListen);
-  uv_pipe_t*  pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
+  uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
   ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
   ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));
@@ -859,7 +873,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
   snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId());
 #else
   char pipeName[PATH_MAX] = {0};
-  snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId());
+  snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(),
+           taosGetSelfPthreadId());
 #endif
   assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
   assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));
@@ -874,7 +889,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
     srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
     thrd->pipe = &(srv->pipe[i][1]);  // init read
 
-    if (false == addHandleToWorkloop(thrd,pipeName)) {
+    if (false == addHandleToWorkloop(thrd, pipeName)) {
       goto End;
     }
     int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
@@ -958,6 +973,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
 }
 void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
   SSrvConn* conn = msg->pConn;
+  reallocConnRefHandle(conn);
   if (conn->status == ConnAcquire) {
     if (!transQueuePush(&conn->srvMsgs, msg)) {
       return;
diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py
index 9dcd485194..3c23e784c5 100644
--- a/tests/pytest/util/dnodes.py
+++ b/tests/pytest/util/dnodes.py
@@ -35,7 +35,7 @@ class TDSimClient:
             "tableIncStepPerVnode": "10000",
             "maxVgroupsPerDb": "1000",
             "sdbDebugFlag": "143",
-            "rpcDebugFlag": "135",
+            "rpcDebugFlag": "143",
             "tmrDebugFlag": "131",
             "cDebugFlag": "135",
             "udebugFlag": "135",
@@ -136,7 +136,7 @@ class TDDnode:
             "tsdbDebugFlag": "135",
             "mDebugFlag": "135",
             "sdbDebugFlag": "135",
-            "rpcDebugFlag": "135",
+            "rpcDebugFlag": "143",
             "tmrDebugFlag": "131",
             "cDebugFlag": "135",
             "httpDebugFlag": "135",

From dd8e642f3771d9cad9dcbc4f56e2e78ddca896d4 Mon Sep 17 00:00:00 2001
From: yihaoDeng
Date: Thu, 12 May 2022 13:35:42 +0800
Subject: [PATCH 12/25] fix(rpc): avoid fd leak

---
 source/libs/qcom/src/queryUtil.c | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c
index 3e3e393f5f..b7a3395206 100644
--- a/source/libs/qcom/src/queryUtil.c
+++ b/source/libs/qcom/src/queryUtil.c
@@ -153,10 +153,10 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
       .handle = pInfo->msgInfo.handle,
       .persistHandle = persistHandle,
       .code = 0};
-  if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH ||
-      pInfo->msgType == TDMT_VND_QUERY_CONTINUE) {
-    rpcMsg.persistHandle = 1;
-  }
+  // if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH ||
+  //     pInfo->msgType == TDMT_VND_QUERY_CONTINUE) {
+  //   rpcMsg.persistHandle = 1;
+  //}
 
   assert(pInfo->fp != NULL);
 
@@ -168,7 +168,7 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp
   return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
 }
 
-char *jobTaskStatusStr(int32_t status) {
+char* jobTaskStatusStr(int32_t status) {
   switch (status) {
     case JOB_TASK_STATUS_NULL:
       return "NULL";
@@ -197,13 +197,10 @@ char *jobTaskStatusStr(int32_t status) {
 
 SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
   SSchema s = {0};
-  s.type = type;
+  s.type = type;
   s.bytes = bytes;
   s.colId = colId;
 
   tstrncpy(s.name, name, tListLen(s.name));
   return s;
 }
-
-
-

From 4f7886d2083c880d3ec66bca56557ccad31cccd9 Mon Sep 17 00:00:00 2001
From: yihaoDeng
Date: Thu, 12 May 2022 15:29:19 +0800
Subject: [PATCH 13/25] fix(rpc): avoid fd leak

---
 source/libs/transport/src/transSrv.c | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c
index d0d45f11e3..a459e604ad 100644
--- a/source/libs/transport/src/transSrv.c
+++ b/source/libs/transport/src/transSrv.c
@@ -363,6 +363,10 @@ void uvOnSendCb(uv_write_t* req, int status) {
       if (msg->type == Release && conn->status != ConnNormal) {
         conn->status = ConnNormal;
         transUnrefSrvHandle(conn);
+        reallocConnRefHandle(conn);
+        destroySmsg(msg);
+        transQueueClear(&conn->srvMsgs);
+        return;
       }
       destroySmsg(msg);
       // send second data, just use for push
@@ -973,7 +973,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
 }
 void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
   SSrvConn* conn = msg->pConn;
-  reallocConnRefHandle(conn);
+  // reallocConnRefHandle(conn);
   if (conn->status == ConnAcquire) {
     if (!transQueuePush(&conn->srvMsgs, msg)) {
       return;

From d4feb57631bec39e3dad0692b74a5be94985a8d0 Mon Sep 17 00:00:00 2001
From: tangfangzhi
Date: Thu, 12 May 2022 17:52:29 +0800
Subject: [PATCH 14/25] fix: timedatectl not available in docker container

---
 tests/system-test/2-query/timezone.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/tests/system-test/2-query/timezone.py b/tests/system-test/2-query/timezone.py
index 1f3dac90c6..ff55ab31bf 100644
--- a/tests/system-test/2-query/timezone.py
+++ b/tests/system-test/2-query/timezone.py
@@ -15,8 +15,16 @@ class TDTestCase:
     def run(self):  # sourcery skip: extract-duplicate-method
         tdSql.prepare()
         # get system timezone
-        time_zone = os.popen('timedatectl | grep zone').read(
-        ).strip().split(':')[1].lstrip()
+        time_zone_arr = os.popen('timedatectl | grep zone').read(
+        ).strip().split(':')
+        if len(time_zone_arr) > 1:
+            time_zone = time_zone_arr[1].lstrip()
+        else:
+            # possibly in a docker container
+            time_zone_1 = os.popen('ls -l /etc/localtime|awk -F/ \'{print $(NF-1) "/" $NF}\'').read().strip()
+            time_zone_2 = os.popen('date "+(%Z, %z)"').read().strip()
+            time_zone = time_zone_1 + " " + time_zone_2
+        print("expected time zone: " + time_zone)
 
         tdLog.printNoPrefix("==========step1:create tables==========")
         tdSql.execute(

From 86031c19852767f1bdf43f1426b8460c0500ba63 Mon Sep 17 00:00:00 2001
From: yihaoDeng
Date: Fri, 13 May 2022 11:25:35 +0800
Subject: [PATCH 15/25] fix(rpc): avoid fd leak

---
 source/libs/transport/src/transSrv.c | 27 +++++++++++++++++----------
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c
index a459e604ad..7566d7147c 100644
--- a/source/libs/transport/src/transSrv.c
+++ b/source/libs/transport/src/transSrv.c
@@ -360,14 +360,14 @@ void uvOnSendCb(uv_write_t* req, int status) {
     tTrace("server conn %p data already was written on stream", conn);
     if (!transQueueEmpty(&conn->srvMsgs)) {
       SSrvMsg* msg = transQueuePop(&conn->srvMsgs);
-      if (msg->type == Release && conn->status != ConnNormal) {
-        conn->status = ConnNormal;
-        transUnrefSrvHandle(conn);
-        reallocConnRefHandle(conn);
-        destroySmsg(msg);
-        transQueueClear(&conn->srvMsgs);
-        return;
-      }
+      // if (msg->type == Release && conn->status != ConnNormal) {
+      //   conn->status = ConnNormal;
+      //   transUnrefSrvHandle(conn);
+      //   reallocConnRefHandle(conn);
+      //   destroySmsg(msg);
+      //   transQueueClear(&conn->srvMsgs);
+      //   return;
+      //}
       destroySmsg(msg);
       // send second data, just use for push
       if (!transQueueEmpty(&conn->srvMsgs)) {
@@ -425,8 +425,15 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
   if (pConn->status == ConnNormal) {
     pHead->msgType = pConn->inType + 1;
   } else {
-    pHead->msgType = smsg->type == Release ? 0 : pMsg->msgType;
+    if (smsg->type == Release) {
+      pHead->msgType = 0;
+      pConn->status = ConnNormal;
+      transUnrefSrvHandle(pConn);
+    } else {
+      pHead->msgType = pMsg->msgType;
+    }
   }
+
   pHead->release = smsg->type == Release ? 1 : 0;
   pHead->code = htonl(pMsg->code);
 
@@ -977,7 +984,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
 }
 void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
   SSrvConn* conn = msg->pConn;
-  // reallocConnRefHandle(conn);
   if (conn->status == ConnAcquire) {
+    reallocConnRefHandle(conn);
     if (!transQueuePush(&conn->srvMsgs, msg)) {
       return;
     }

From 86031c19852767f1bdf43f1426b8460c0500ba63 Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Fri, 13 May 2022 12:17:09 +0800
Subject: [PATCH 16/25] fix(query): add the repeat scan flag check during
 aggregate executor.

---
 include/libs/function/function.h        |  2 +-
 source/libs/executor/src/executorimpl.c | 49 ++++++++++---------
 source/libs/executor/src/scanoperator.c | 65 ++++++++++++++++++-------
 source/libs/function/src/builtinsimpl.c |  2 +-
 source/libs/function/src/taggfunction.c | 24 ++++-----
 5 files changed, 87 insertions(+), 55 deletions(-)

diff --git a/include/libs/function/function.h b/include/libs/function/function.h
index 6141829a3f..8d0b93dde2 100644
--- a/include/libs/function/function.h
+++ b/include/libs/function/function.h
@@ -173,6 +173,7 @@ typedef struct SqlFunctionCtx {
   SInputColumnInfoData input;
   SResultDataInfo      resDataInfo;
   uint32_t             order;  // data block scanner order: asc|desc
+  uint8_t              scanFlag;  // record current running step, default: 0
   ////////////////////////////////////////////////////////////////
   int32_t startRow;  // start row index
   int32_t size;      // handled processed row number
@@ -183,7 +184,6 @@ typedef struct SqlFunctionCtx {
   bool    hasNull;      // null value exist in current block, TODO remove it
   bool    requireNull;  // require null in some function, TODO remove it
   int32_t columnIndex;  // TODO remove it
-  uint8_t currentStage;  // record current running step, default: 0
   bool    isAggSet;
   int64_t startTs;  // timestamp range of current query when function is executed on a specific data block, TODO remove it
   bool    stableQuery;

diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index fe5066a065..8db5a282d3 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -746,7 +746,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
     pCtx[i].order = order;
     pCtx[i].size = pBlock->info.rows;
     pCtx[i].pSrcBlock = pBlock;
-    pCtx[i].currentStage = scanFlag;
+    pCtx[i].scanFlag = scanFlag;
 
     SInputColumnInfoData* pInput = &pCtx[i].input;
     pInput->uid = pBlock->info.uid;
@@ -826,23 +826,22 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
   return code;
 }
 
-static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
+static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
   for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
     if (functionNeedToExecute(&pCtx[k])) {
       pCtx[k].startTs = startTs;
-      // this can be set during create the struct
       // todo add a dummy funtion to avoid process check
       if (pCtx[k].fpSet.process != NULL) {
         int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
         if (code != TSDB_CODE_SUCCESS) {
-          qError("%s call aggregate function error happens, code : %s",
-                 GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
-          pOperator->pTaskInfo->code = code;
-          longjmp(pOperator->pTaskInfo->env, code);
+          qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
+          return code;
         }
       }
     }
   }
+
+  return TSDB_CODE_SUCCESS;
 }
 
 static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
@@ -998,18 +997,22 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
     return false;
   }
 
+  if (pCtx->scanFlag == REPEAT_SCAN) {
+    return fmIsRepeatScanFunc(pCtx->functionId);
+  }
+
   if (isRowEntryCompleted(pResInfo)) {
     return false;
  }
 
-  if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
-    // return QUERY_IS_ASC_QUERY(pQueryAttr);
-  }
-
-  // denote the order type
-  if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
-    // return pCtx->param[0].i == pQueryAttr->order.order;
-  }
+//  if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
+//    // return QUERY_IS_ASC_QUERY(pQueryAttr);
+//  }
+//
+//  // denote the order type
+//  if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
+//    // return pCtx->param[0].i == pQueryAttr->order.order;
+//  }
 
   // in the reverse table scan, only the following functions need to be executed
   //  if (IS_REVERSE_SCAN(pRuntimeEnv) ||
@@ -1944,7 +1947,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
     cleanupResultRowEntry(pEntry);
 
     pCtx[i].resultInfo = pEntry;
-    pCtx[i].currentStage = stage;
+    pCtx[i].scanFlag = stage;
 
    // set the timestamp output buffer for top/bottom/diff query
    //    int32_t fid = pCtx[i].functionId;
@@ -3724,7 +3727,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
 
   SAggOperatorInfo* pAggInfo = pOperator->info;
   SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
-
   SOperatorInfo* downstream = pOperator->pDownstream[0];
 
   int32_t order = TSDB_ORDER_ASC;
@@ -3738,9 +3740,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
     if (pBlock == NULL) {
       break;
    }
-    //    if (pAggInfo->current != NULL) {
-    //      setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
-    //    }
 
     int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
     if (code != TSDB_CODE_SUCCESS) {
@@ -3750,17 +3749,19 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
 
     // there is an scalar expression that needs to be calculated before apply the group aggregation.
     if (pAggInfo->pScalarExprInfo != NULL) {
       code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
-                                   pAggInfo->numOfScalarExpr, NULL);
+                                   pAggInfo->numOfScalarExpr, NULL);
       if (code != TSDB_CODE_SUCCESS) {
-        pTaskInfo->code = code;
-        longjmp(pTaskInfo->env, pTaskInfo->code);
+        longjmp(pTaskInfo->env, code);
       }
     }
 
     // the pDataBlock are always the same one, no need to call this again
     setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
     setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true);
-    doAggregateImpl(pOperator, 0, pInfo->pCtx);
+    code = doAggregateImpl(pOperator, 0, pInfo->pCtx);
+    if (code != 0) {
+      longjmp(pTaskInfo->env, code);
+    }
 
 #if 0  // test for encode/decode result info
     if(pOperator->encodeResultRow){
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 1483b6b042..e0758a7210 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -260,6 +260,53 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
   pTableScanInfo->cond.order = TSDB_ORDER_DESC;
 }
 
+static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
+  // currently only the tbname pseudo column
+  if (pTableScanInfo->numOfPseudoExpr == 0) {
+    return;
+  }
+
+  SMetaReader mr = {0};
+  metaReaderInit(&mr, pTableScanInfo->readHandle.meta, 0);
+  metaGetTableEntryByUid(&mr, pBlock->info.uid);
+
+  for (int32_t j = 0; j < pTableScanInfo->numOfPseudoExpr; ++j) {
+    SExprInfo* pExpr = &pTableScanInfo->pPseudoExpr[j];
+
+    int32_t dstSlotId = pExpr->base.resSchema.slotId;
+
+    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
+    colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
+
+    int32_t functionId = pExpr->pExpr->_function.functionId;
+
+    // this is to handle the tbname
+    if (fmIsScanPseudoColumnFunc(functionId)) {
+      struct SScalarFuncExecFuncs fpSet = {0};
+      fmGetScalarFuncExecFuncs(functionId, &fpSet);
+
+      SColumnInfoData infoData = {0};
+      infoData.info.type = TSDB_DATA_TYPE_BIGINT;
+      infoData.info.bytes = sizeof(uint64_t);
+      colInfoDataEnsureCapacity(&infoData, 0, 1);
+
+      colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
+      SScalarParam srcParam = {
+          .numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
+
+      SScalarParam param = {.columnData = pColInfoData};
+      fpSet.process(&srcParam, 1, &param);
+    } else {  // these are tags
+      const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
+      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
+        colDataAppend(pColInfoData, i, p, (p == NULL));
+      }
+    }
+  }
+
+  metaReaderClear(&mr);
+}
+
 static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
   STableScanInfo* pTableScanInfo = pOperator->info;
   SSDataBlock*    pBlock = pTableScanInfo->pResBlock;
@@ -285,23 +332,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
 
     // currently only the tbname pseudo column
     if (pTableScanInfo->numOfPseudoExpr > 0) {
-      int32_t dstSlotId = pTableScanInfo->pPseudoExpr->base.resSchema.slotId;
-      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
-      colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
-
-      struct SScalarFuncExecFuncs fpSet;
-      fmGetScalarFuncExecFuncs(pTableScanInfo->pPseudoExpr->pExpr->_function.functionId, &fpSet);
-
-      SColumnInfoData infoData = {0};
-      infoData.info.type = TSDB_DATA_TYPE_BIGINT;
-      infoData.info.bytes = sizeof(uint64_t);
-      colInfoDataEnsureCapacity(&infoData, 0, 1);
-
-      colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
-      SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
-
-      SScalarParam param = {.columnData = pColInfoData};
-      fpSet.process(&srcParam, 1, &param);
+      addTagPseudoColumnData(pTableScanInfo, pBlock);
     }
 
     return pBlock;
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index 64bee0c096..2ec050d82d 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -1645,7 +1645,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
   int32_t type = pCol->info.type;
 
   SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
-  if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
+  if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) {
     pInfo->stage += 1;
 
     // all data are null, set it completed
diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c
index 4d80b88a3a..b6d5d38c9e 100644
--- a/source/libs/function/src/taggfunction.c
+++ b/source/libs/function/src/taggfunction.c
@@ -37,7 +37,7 @@
 
 #define GET_TRUE_DATA_TYPE()                          \
   int32_t type = 0;                                   \
-  if (pCtx->currentStage == MERGE_STAGE) {            \
+  if (pCtx->scanFlag == MERGE_STAGE) {                \
     type = pCtx->resDataInfo.type;                    \
     assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); \
   } else {                                            \
@@ -908,7 +908,7 @@ static void avg_func_merge(SqlFunctionCtx *pCtx) {
 static void avg_finalizer(SqlFunctionCtx *pCtx) {
   SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
 
-  if (pCtx->currentStage == MERGE_STAGE) {
+  if (pCtx->scanFlag == MERGE_STAGE) {
     assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
 
     if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) {
@@ -1152,7 +1152,7 @@ static void stddev_function(SqlFunctionCtx *pCtx) {
   SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
   SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo);
 
-  if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) {
+  if (pCtx->scanFlag == REPEAT_SCAN && pStd->stage == 0) {
     pStd->stage++;
     avg_finalizer(pCtx);
 
@@ -1814,7 +1814,7 @@ static STopBotInfo *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
   SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
 
   // only the first_stage_merge is directly written data into final output buffer
-  if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
+  if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) {
     return (STopBotInfo*) pCtx->pOutput;
   } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
     return GET_ROWCELL_INTERBUF(pResInfo);
@@ -1956,7 +1956,7 @@ static void top_func_merge(SqlFunctionCtx *pCtx) {
   for (int32_t i = 0; i < pInput->num; ++i) {
     int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->resDataInfo.type;
//    do_top_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp,
-//                        type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
+//                        type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag);
   }
 
   SET_VAL(pCtx, pInput->num, pOutput->num);
@@ -2013,7 +2013,7 @@ static void bottom_func_merge(SqlFunctionCtx *pCtx) {
   for (int32_t i = 0; i < pInput->num; ++i) {
     int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->resDataInfo.type;
//    do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type,
-//                           &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
+//                           &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag);
   }
 
   SET_VAL(pCtx, pInput->num, pOutput->num);
@@ -2073,7 +2073,7 @@ static void percentile_function(SqlFunctionCtx *pCtx) {
   SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
   SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
 
-  if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
+  if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) {
     pInfo->stage += 1;
 
     // all data are null, set it completed
@@ -2180,7 +2180,7 @@ static SAPercentileInfo *getAPerctInfo(SqlFunctionCtx *pCtx) {
   SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
   SAPercentileInfo* pInfo = NULL;
 
-  if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
+  if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) {
     pInfo = (SAPercentileInfo*) pCtx->pOutput;
   } else {
     pInfo = GET_ROWCELL_INTERBUF(pResInfo);
@@ -2270,7 +2270,7 @@ static void apercentile_finalizer(SqlFunctionCtx *pCtx) {
   SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
   SAPercentileInfo    *pOutput = GET_ROWCELL_INTERBUF(pResInfo);
 
-  if (pCtx->currentStage == MERGE_STAGE) {
+  if (pCtx->scanFlag == MERGE_STAGE) {
//    if (pResInfo->hasResult == DATA_SET_FLAG) {  // check for null
//      assert(pOutput->pHisto->numOfElems > 0);
//
@@ -2510,7 +2510,7 @@ static void copy_function(SqlFunctionCtx *pCtx);
 
 static void tag_function(SqlFunctionCtx *pCtx) {
   SET_VAL(pCtx, 1, 1);
-  if (pCtx->currentStage == MERGE_STAGE) {
+  if (pCtx->scanFlag == MERGE_STAGE) {
     copy_function(pCtx);
   } else {
     taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->resDataInfo.type, true);
@@ -2966,7 +2966,7 @@ static bool spread_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRe
   SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
 
   // this is the server-side setup function in client-side, the secondary merge do not need this procedure
-  if (pCtx->currentStage == MERGE_STAGE) {
+  if (pCtx->scanFlag == MERGE_STAGE) {
//    pCtx->param[0].param.d = DBL_MAX;
//    pCtx->param[3].param.d = -DBL_MAX;
   } else {
@@ -3086,7 +3086,7 @@ void spread_function_finalizer(SqlFunctionCtx *pCtx) {
    */
   SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
 
-  if (pCtx->currentStage == MERGE_STAGE) {
+  if (pCtx->scanFlag == MERGE_STAGE) {
     assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
 
//    if (pResInfo->hasResult != DATA_SET_FLAG) {

From c56b0e0b15736127fbe9088efe0ed23361aee724 Mon Sep 17 00:00:00 2001
From: yihaoDeng
Date: Fri, 13 May 2022 12:24:57 +0800
Subject: [PATCH 17/25] fix(rpc): avoid fd leak

---
 source/libs/transport/src/transSrv.c | 38 ++++++++++++++--------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c
index 7566d7147c..f8e4a8bd09 100644
--- a/source/libs/transport/src/transSrv.c
+++ b/source/libs/transport/src/transSrv.c
@@ -167,25 +167,25 @@ static void* transAcceptThread(void* arg);
 static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName);
 static bool addHandleToAcceptloop(void* arg);
 
-#define CONN_SHOULD_RELEASE(conn, head)                                     \
-  do {                                                                      \
-    if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) {          \
-      conn->status = ConnRelease;                                           \
-      transClearBuffer(&conn->readBuf);                                     \
-      transFreeMsg(transContFromHead((char*)head));                         \
-      tTrace("server conn %p received release request", conn);              \
-                                                                            \
= {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head) && conn->status == ConnAcquire) { \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + tTrace("server conn %p received release request", conn); \ + \ + STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ + SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + reallocConnRefHandle(conn); \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ } while (0) #define SRV_RELEASE_UV(loop) \ From fee540c1fa38b25d2fa7df717b4acb45bec82db9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 13 May 2022 13:09:34 +0800 Subject: [PATCH 18/25] fix(rpc): avoid fd leak --- source/libs/transport/src/transSrv.c | 39 ++++++++++++++-------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index f8e4a8bd09..fc840691b6 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -167,25 +167,26 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head) && conn->status == ConnAcquire) { \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tTrace("server conn %p received release request", conn); \ - \ - STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - reallocConnRefHandle(conn); \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + tTrace("server conn %p received release request", conn); \ + \ + STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ + SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + reallocConnRefHandle(conn); \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ } while (0) #define SRV_RELEASE_UV(loop) \ From 3431d4aba5c8217713ca5599ae4266982d1c4470 Mon Sep 17 00:00:00 2001 From: tangfangzhi Date: Fri, 13 May 2022 14:04:21 +0800 Subject: [PATCH 19/25] enh: windows build enterprise edition --- Jenkinsfile2 | 70 +++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 53 insertions(+), 17 deletions(-) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index ca37857189..a3006c4f7d 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -71,10 +71,10 
@@ def pre_test(){
         git pull >/dev/null
         git fetch origin +refs/pull/${CHANGE_ID}/merge
         git checkout -qf FETCH_HEAD
-        git log|head -n20
+        git log -5
         cd ${WK}
         git pull >/dev/null
-        git log|head -n20
+        git log -5
         '''
     } else if (env.CHANGE_URL =~ /\/TDinternal\//) {
         sh '''
@@ -82,10 +82,10 @@ def pre_test(){
         git pull >/dev/null
         git fetch origin +refs/pull/${CHANGE_ID}/merge
         git checkout -qf FETCH_HEAD
-        git log|head -n20
+        git log -5
         cd ${WKC}
         git pull >/dev/null
-        git log|head -n20
+        git log -5
         '''
     } else {
         sh '''
@@ -110,12 +110,14 @@ def pre_test_win(){
     time /t
     taskkill /f /t /im python.exe
     taskkill /f /t /im bash.exe
-    cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
-    rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine\\debug
+    rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\debug
     exit 0
     '''
     bat '''
-    cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
+    cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
+    git reset --hard
+    git fetch || git fetch
+    cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
     git reset --hard
     git fetch || git fetch
     git checkout -f
@@ -123,39 +125,73 @@ def pre_test_win(){
     script {
         if (env.CHANGE_TARGET == 'master') {
             bat '''
-            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
+            git checkout master
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
             git checkout master
             '''
         } else if(env.CHANGE_TARGET == '2.0') {
             bat '''
-            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
+            git checkout 2.0
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
             git checkout 2.0
             '''
         } else if(env.CHANGE_TARGET == '3.0') {
             bat '''
-            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
+            git checkout 3.0
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
             git checkout 3.0
             '''
         } else {
             bat '''
-            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
+            git checkout develop
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
             git checkout develop
             '''
         }
     }
+    script {
+        if (env.CHANGE_URL =~ /\/TDengine\//) {
+            bat '''
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
+            git pull
+            git log -5
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
+            git pull
+            git fetch origin +refs/pull/${CHANGE_ID}/merge
+            git checkout -qf FETCH_HEAD
+            git log -5
+            '''
+        } else if (env.CHANGE_URL =~ /\/TDinternal\//) {
+            bat '''
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
+            git pull
+            git fetch origin +refs/pull/${CHANGE_ID}/merge
+            git checkout -qf FETCH_HEAD
+            git log -5
+            cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
+            git pull
+            git log -5
+            '''
+        } else {
+            sh '''
+            echo "unmatched repository ${CHANGE_URL}"
+            '''
+        }
+    }
     bat '''
-    cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
-    git branch
-    git pull || git pull
-    git fetch origin +refs/pull/%CHANGE_ID%/merge
-    git checkout -qf FETCH_HEAD
+    cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
+    git submodule update --init --recursive
     '''
 }
 def pre_test_build_win() {
     bat '''
     echo "building ..."
time /t - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal mkdir debug cd debug call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64 From 158e23d5ae388d4c7ff90aa3fde9bcab04acfdd8 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Fri, 13 May 2022 14:17:08 +0800 Subject: [PATCH 20/25] test: add log info in test case --- tests/system-test/0-others/taosShellNetChk.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/system-test/0-others/taosShellNetChk.py b/tests/system-test/0-others/taosShellNetChk.py index 524268101a..bbaeacf328 100644 --- a/tests/system-test/0-others/taosShellNetChk.py +++ b/tests/system-test/0-others/taosShellNetChk.py @@ -138,7 +138,8 @@ class TDTestCase: if "2: service ok" in retVal: tdLog.info("taos -k success") else: - tdLog.exit("taos -k fail") + tdLog.info(retVal) + tdLog.exit("taos -k fail 1") # stop taosd tdDnodes.stop(1) @@ -149,7 +150,8 @@ class TDTestCase: if "0: unavailable" in retVal: tdLog.info("taos -k success") else: - tdLog.exit("taos -k fail") + tdLog.info(retVal) + tdLog.exit("taos -k fail 2") # restart taosd tdDnodes.start(1) @@ -158,7 +160,8 @@ class TDTestCase: if "2: service ok" in retVal: tdLog.info("taos -k success") else: - tdLog.exit("taos -k fail") + tdLog.info(retVal) + tdLog.exit("taos -k fail 3") tdLog.printNoPrefix("================================ parameter: -n") # stop taosd From 4b564b2ebd358179e3aef8b1188bbebec0afba2a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 13 May 2022 15:09:31 +0800 Subject: [PATCH 21/25] add stream ci --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 13 +- source/libs/executor/src/scanoperator.c | 9 +- source/libs/planner/src/planOptimizer.c | 4 +- source/libs/stream/src/tstream.c | 2 +- tests/script/jenkins/basic.txt | 1 + tests/script/tsim/tstream/basic1.sim | 343 ++++++++++++++++++++++++ 7 files changed, 355 insertions(+), 19 deletions(-) create mode 100644 tests/script/tsim/tstream/basic1.sim diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 34b7fce33c..f79d3c8450 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -706,7 +706,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo, - SNode* pConditions, SOperatorInfo* pOperatorDumy, SInterval* pInterval); + SNode* pConditions, SOperatorInfo* pOperatorDumy); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8db5a282d3..df446def5e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4808,17 +4808,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - - SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, 
COL_MATCH_FROM_COL_ID); - SSDataBlock* pResBlockDumy = createResDataBlock(pDescNode); - - SQueryTableDataCond cond = {0}; - int32_t code = initQueryTableDataCond(&cond, pTableScanNode); - if (code != TSDB_CODE_SUCCESS) { - return NULL; - } - - SInterval interval = extractIntervalInfo(pTableScanNode); SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SArray* tableIdList = extractTableIdList(pTableGroupInfo); @@ -4826,7 +4815,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pResBlock, pCols, tableIdList, pTaskInfo, - pScanPhyNode->node.pConditions, pOperatorDumy, &interval); + pScanPhyNode->node.pConditions, pOperatorDumy); taosArrayDestroy(tableIdList); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e0758a7210..c6cb01e8fb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -782,8 +782,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, - SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy, - SInterval* pInterval) { + SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy ) { SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -791,6 +790,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR goto _error; } + STableScanInfo* pSTInfo = (STableScanInfo*)pOperatorDumy->info; + int32_t numOfOutput = taosArrayGetSize(pColList); SArray* pColIds = taosArrayInit(4, sizeof(int16_t)); @@ -823,7 +824,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR } pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan - pInfo->pUpdateInfo = updateInfoInitP(pInterval, 10000); // TODO(liuyao) get watermark from physical plan + pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan if (pInfo->pUpdateInfo == NULL) { taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -836,7 +837,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR pInfo->pDataReader = pDataReader; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->pOperatorDumy = pOperatorDumy; - pInfo->interval = *pInterval; + pInfo->interval = pSTInfo->interval; pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 5ed7d9c1b5..6ae1b23b97 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -228,10 +228,12 @@ static void setScanWindowInfo(SScanLogicNode* pScan) { static int32_t osdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { SOsdInfo info = {0}; int32_t code = osdMatch(pCxt, pLogicNode, &info); + if (TSDB_CODE_SUCCESS == code && info.pScan) { + 
setScanWindowInfo((SScanLogicNode*)info.pScan); + } if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) { info.pScan->dataRequired = osdGetDataRequired(info.pSdrFuncs); info.pScan->pDynamicScanFuncs = info.pDsoFuncs; - setScanWindowInfo((SScanLogicNode*)info.pScan); OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_OSD); pCxt->optimized = true; } diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index a5811a5ace..08093c8b18 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -154,7 +154,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { - blockDebugShowData(pRes); + // blockDebugShowData(pRes); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes); } else if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 8420b91065..f6bc9f8306 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -63,6 +63,7 @@ # ---- tstream ./test.sh -f tsim/tstream/basic0.sim +./test.sh -f tsim/tstream/basic1.sim # ---- transaction ./test.sh -f tsim/trans/create_db.sim diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim new file mode 100644 index 0000000000..12c820d76b --- /dev/null +++ b/tests/script/tsim/tstream/basic1.sim @@ -0,0 +1,343 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database test vgroups 1 +sql show databases +if $rows != 3 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use test + + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s); +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223001,2,2,3,1.1); +sql insert into t1 values(1648791233002,3,2,3,2.1); +sql insert into t1 values(1648791243003,4,2,3,3.1); +sql insert into t1 values(1648791213004,4,2,3,4.1); +sleep 1000 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +if $rows != 4 then + print ======$rows + return -1 +endi + +# row 0 +if $data01 != 2 then + print ======$data01 + return -1 +endi + +if $data02 != 2 then + print ======$data02 + return -1 +endi + +if $data03 != 5 then + print ======$data03 + return -1 +endi + +if $data04 != 2 then + print ======$data04 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +# row 1 +if $data11 != 1 then + print ======$data11 + return -1 +endi + +if $data12 != 1 then + print ======$data12 + return -1 +endi + +if $data13 != 2 then + print ======$data13 + return -1 +endi + +if $data14 != 2 then + print ======$data14 + return -1 +endi + +if $data15 != 3 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 1 then + print ======$data21 + return -1 +endi + +if $data22 != 1 then + print ======$data22 + return -1 +endi + +if $data23 != 3 then + print ======$data23 + return -1 +endi + +if $data24 != 2 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +# row 3 +if $data31 != 1 then + print ======$data31 + return -1 +endi + +if 
$data32 != 1 then + print ======$data32 + return -1 +endi + +if $data33 != 4 then + print ======$data33 + return -1 +endi + +if $data34 != 2 then + print ======$data34 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + return -1 +endi + +sql insert into t1 values(1648791223001,12,14,13,11.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +if $rows != 4 then + print ======$rows + return -1 +endi + +# row 0 +if $data01 != 2 then + print ======$data01 + return -1 +endi + +if $data02 != 2 then + print ======$data02 + return -1 +endi + +if $data03 != 5 then + print ======$data03 + return -1 +endi + +if $data04 != 2 then + print ======$data04 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +# row 1 +if $data11 != 1 then + print ======$data11 + return -1 +endi + +if $data12 != 1 then + print ======$data12 + return -1 +endi + +if $data13 != 12 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 1 then + print ======$data21 + return -1 +endi + +if $data22 != 1 then + print ======$data22 + return -1 +endi + +if $data23 != 3 then + print ======$data23 + return -1 +endi + +if $data24 != 2 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +# row 3 +if $data31 != 1 then + print ======$data31 + return -1 +endi + +if $data32 != 1 then + print ======$data32 + return -1 +endi + +if $data33 != 4 then + print ======$data33 + return -1 +endi + +if $data34 != 2 then + print ======$data34 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + return -1 +endi + +sql insert into t1 values(1648791223002,12,14,13,11.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 2 then + print ======$data11 + return -1 +endi + +if $data12 != 2 then + print ======$data12 + return -1 +endi + +if $data13 != 24 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +sql insert into t1 values(1648791223003,12,14,13,11.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 3 then + print ======$data11 + return -1 +endi + +if $data12 != 3 then + print ======$data12 + return -1 +endi + +if $data13 != 36 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +sql insert into t1 values(1648791223001,1,1,1,1.1); +sql insert into t1 values(1648791223002,2,2,2,2.1); +sql insert into t1 values(1648791223003,3,3,3,3.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 3 then + print ======$data11 + return -1 +endi + +if $data12 != 3 then + print ======$data12 + return -1 +endi + +if $data13 != 6 then + print ======$data13 + return -1 +endi + +if $data14 != 3 then + print ======$data14 + return -1 +endi + +if $data15 != 1 then + print ======$data15 + return -1 +endi + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT From 68ad7008bf3cc30a428f7c7ad3bae7ec03751d2c Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Fri, 13 May 2022 15:54:15 +0800 Subject: [PATCH 22/25] test: add error test case for rerun fail --- tests/system-test/99-TDcase/TD-15554.py | 490 ++++++++++++++++++++++++ 1 file 
changed, 490 insertions(+)
 create mode 100644 tests/system-test/99-TDcase/TD-15554.py

diff --git a/tests/system-test/99-TDcase/TD-15554.py b/tests/system-test/99-TDcase/TD-15554.py
new file mode 100644
index 0000000000..fb3817ed89
--- /dev/null
+++ b/tests/system-test/99-TDcase/TD-15554.py
@@ -0,0 +1,490 @@
+
+import taos
+import sys
+import time
+import socket
+import os
+import threading
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+
+class TDTestCase:
+    hostname = socket.gethostname()
+    #rpcDebugFlagVal = '143'
+    #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
+    #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
+    #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
+    #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
+    #print ("===================: ", updatecfgDict)
+
+    def init(self, conn, logSql):
+        tdLog.debug(f"start to execute {__file__}")
+        #tdSql.init(conn.cursor())
+        tdSql.init(conn.cursor(), logSql)  # output sql.txt file
+
+    def getBuildPath(self):
+        selfPath = os.path.dirname(os.path.realpath(__file__))
+
+        if ("community" in selfPath):
+            projPath = selfPath[:selfPath.find("community")]
+        else:
+            projPath = selfPath[:selfPath.find("tests")]
+
+        for root, dirs, files in os.walk(projPath):
+            if ("taosd" in files):
+                rootRealPath = os.path.dirname(os.path.realpath(root))
+                if ("packaging" not in rootRealPath):
+                    buildPath = root[:len(root) - len("/build/bin")]
+                    break
+        return buildPath
+
+    def newcur(self,cfg,host,port):
+        user = "root"
+        password = "taosdata"
+        con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
+        cur=con.cursor()
+        print(cur)
+        return cur
+
+    def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
+        tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
+        tsql.execute("use %s" %dbName)
+        tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
+        pre_create = "create table"
+        sql = pre_create
+        #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
+        for i in range(ctbNum):
+            sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
+            if (i > 0) and (i%100 == 0):
+                tsql.execute(sql)
+                sql = pre_create
+        if sql != pre_create:
+            tsql.execute(sql)
+
+        tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum))
+        return
+
+    def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs):
+        tdLog.debug("start to insert data ............")
+        tsql.execute("use %s" %dbName)
+        pre_insert = "insert into "
+        sql = pre_insert
+
+        #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
+        for i in range(ctbNum):
+            sql += " %s_%d values "%(stbName,i)
+            for j in range(rowsPerTbl):
+                sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
+                if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
+                    tsql.execute(sql)
+                    if j < rowsPerTbl - 1:
+                        sql = "insert into %s_%d values " %(stbName,i)
+                    else:
+                        sql = "insert into "
+        #end sql
+        if sql != pre_insert:
+            #print("insert sql:%s"%sql)
+            tsql.execute(sql)
+        tdLog.debug("insert data ............ 
[OK]") + return + + def prepareEnv(self, **parameterDict): + print ("input parameters:") + print (parameterDict) + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + self.create_tables(tsql,\ + parameterDict["dbName"],\ + parameterDict["vgroups"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"]) + + self.insert_data(tsql,\ + parameterDict["dbName"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"],\ + parameterDict["startTs"]) + return + + + def tmqCase1(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 1: Produce while consume") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db', \ + 'vgroups': 1, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 1000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + time.sleep(2) + + # wait stb ready + while 1: + tdSql.query("show %s.stables"%parameterDict['dbName']) + if tdSql.getRows() == 1: + break + else: + time.sleep(1) + + tdLog.info("create topics from super table") + topicFromStb = 'topic_stb_column' + topicFromCtb = 'topic_ctb_column' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) + + time.sleep(1) + tdSql.query("show topics") + #tdSql.checkRows(2) + topic1 = tdSql.getData(0 , 0) + topic2 = tdSql.getData(1 , 0) + + tdLog.info("show topics: %s, %s"%(topic1, topic2)) + if topic1 != topicFromStb and topic1 != topicFromCtb: + tdLog.exit("topic error1") + if topic2 != topicFromStb and topic2 != topicFromCtb: + tdLog.exit("topic error2") + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") + tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)") + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicFromStb + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into consumeinfo values " + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + tdLog.info("check stb if there are data") + while 1: + tdSql.query("select count(*) from %s"%parameterDict["stbName"]) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + countOfStb = tdSql.getData(0, 0) + if countOfStb != 0: + tdLog.info("count from stb: %d"%countOfStb) + break + else: + time.sleep(1) + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + 
tdLog.info(shellCmd)
+        os.system(shellCmd)
+
+        # wait for data ready
+        prepareEnvThread.join()
+
+        tdLog.info("insert process end, and start to check consume result")
+        while 1:
+            tdSql.query("select * from consumeresult")
+            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
+            if tdSql.getRows() == 1:
+                break
+            else:
+                time.sleep(5)
+
+        tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3)))
+        tdSql.checkData(0 , 1, consumerId)
+        # multiple rows and multiple tables in one sql, so the number of messages is not deterministic
+        #tdSql.checkData(0 , 2, expectmsgcnt)
+        tdSql.checkData(0 , 3, expectrowcnt)
+
+        tdSql.query("drop topic %s"%topicFromStb)
+        tdSql.query("drop topic %s"%topicFromCtb)
+
+        tdLog.printNoPrefix("======== test case 1 end ...... ")
+
+    def tmqCase2(self, cfgPath, buildPath):
+        tdLog.printNoPrefix("======== test case 2: add child table with consuming ")
+        # create and start thread
+        parameterDict = {'cfg': '', \
+                         'dbName': 'db2', \
+                         'vgroups': 1, \
+                         'stbName': 'stb', \
+                         'ctbNum': 10, \
+                         'rowsPerTbl': 10000, \
+                         'batchNum': 100, \
+                         'startTs': 1640966400000}  # 2022-01-01 00:00:00.000
+        parameterDict['cfg'] = cfgPath
+
+        prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
+        prepareEnvThread.start()
+
+        # wait db ready
+        while 1:
+            tdSql.query("show databases")
+            if tdSql.getRows() == 4:
+                print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
+                break
+            else:
+                time.sleep(1)
+
+        tdSql.query("use %s"%parameterDict['dbName'])
+        # wait stb ready
+        while 1:
+            tdSql.query("show %s.stables"%parameterDict['dbName'])
+            if tdSql.getRows() == 1:
+                break
+            else:
+                time.sleep(1)
+
+        tdLog.info("create topics from super table")
+        topicFromStb = 'topic_stb_column2'
+        topicFromCtb = 'topic_ctb_column2'
+
+        tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
+        tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
+
+        time.sleep(1)
+        tdSql.query("show topics")
+        topic1 = tdSql.getData(0 , 0)
+        topic2 = tdSql.getData(1 , 0)
+        tdLog.info("show topics: %s, %s"%(topic1, topic2))
+        if topic1 != topicFromStb and topic1 != topicFromCtb:
+            tdLog.exit("topic error1")
+        if topic2 != topicFromStb and topic2 != topicFromCtb:
+            tdLog.exit("topic error2")
+
+        tdLog.info("create consume info table and consume result table")
+        cdbName = parameterDict["dbName"]
+        tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
+        tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
+
+        rowsOfNewCtb = 1000
+        consumerId = 0
+        expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
+        topicList = topicFromStb
+        ifcheckdata = 0
+        keyList = 'group.id:cgrp1,\
+                   enable.auto.commit:false,\
+                   auto.commit.interval.ms:6000,\
+                   auto.offset.reset:earliest'
+        sql = "insert into consumeinfo values "
+        sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
+        tdSql.query(sql)
+
+        tdLog.info("check stb if there are data")
+        while 1:
+            tdSql.query("select count(*) from %s"%parameterDict["stbName"])
+            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
+            countOfStb = 
tdSql.getData(0, 0)
+            if countOfStb != 0:
+                tdLog.info("count from stb: %d"%countOfStb)
+                break
+            else:
+                time.sleep(1)
+
+        tdLog.info("start consume processor")
+        pollDelay = 5
+        showMsg = 1
+        showRow = 1
+
+        shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
+        shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
+        shellCmd += "> /dev/null 2>&1 &"
+        tdLog.info(shellCmd)
+        os.system(shellCmd)
+
+        # create new child table and insert data
+        newCtbName = 'newctb'
+        tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
+        startTs = parameterDict["startTs"]
+        for j in range(rowsOfNewCtb):
+            sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j)
+            tdSql.execute(sql)
+        tdLog.debug("insert data into new child table ............ [OK]")
+
+        # wait for data ready
+        prepareEnvThread.join()
+
+        tdLog.info("insert process end, and start to check consume result")
+        while 1:
+            tdSql.query("select * from consumeresult")
+            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
+            if tdSql.getRows() == 1:
+                break
+            else:
+                time.sleep(5)
+
+        tdSql.checkData(0 , 1, consumerId)
+        tdSql.checkData(0 , 3, expectrowcnt)
+
+        tdSql.query("drop topic %s"%topicFromStb)
+        tdSql.query("drop topic %s"%topicFromCtb)
+
+        tdLog.printNoPrefix("======== test case 2 end ...... ")
+
+    def tmqCase3(self, cfgPath, buildPath):
+        tdLog.printNoPrefix("======== test case 3: two topics, each contains a stable, \
+                             but at the beginning, no ctables in the stable of one topic,\
+                             after starting consumer, create ctables ")
+        # create and start thread
+        parameterDict = {'cfg': '', \
+                         'dbName': 'db3', \
+                         'vgroups': 1, \
+                         'stbName': 'stb', \
+                         'ctbNum': 10, \
+                         'rowsPerTbl': 30000, \
+                         'batchNum': 100, \
+                         'startTs': 1640966400000}  # 2022-01-01 00:00:00.000
+        parameterDict['cfg'] = cfgPath
+
+        prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
+        prepareEnvThread.start()
+
+        # wait db ready
+        while 1:
+            tdSql.query("show databases")
+            if tdSql.getRows() == 4:
+                print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
+                break
+            else:
+                time.sleep(1)
+
+        tdSql.query("use %s"%parameterDict['dbName'])
+        # wait stb ready
+        while 1:
+            tdSql.query("show %s.stables"%parameterDict['dbName'])
+            if tdSql.getRows() == 1:
+                break
+            else:
+                time.sleep(1)
+
+        tdLog.info("create stable2 for the second topic")
+        parameterDict2 = {'cfg': '', \
+                          'dbName': 'db3', \
+                          'vgroups': 1, \
+                          'stbName': 'stb2', \
+                          'ctbNum': 10, \
+                          'rowsPerTbl': 30000, \
+                          'batchNum': 100, \
+                          'startTs': 1640966400000}  # 2022-01-01 00:00:00.000
+        parameterDict2['cfg'] = cfgPath
+        tdSql.execute("create stable if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict2['dbName'], parameterDict2['stbName']))
+
+        tdLog.info("create topics from super table")
+        topicFromStb = 'topic_stb_column3'
+        topicFromStb2 = 'topic_stb_column32'
+
+        tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
+        tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict2['dbName'], parameterDict2['stbName']))
+
+        tdSql.query("show topics")
+        topic1 = tdSql.getData(0 , 0)
+        topic2 = tdSql.getData(1 , 0)
+        tdLog.info("show topics: %s, %s"%(topic1, topic2))
+        if 
topic1 != topicFromStb and topic1 != topicFromStb2: + tdLog.exit("topic error1") + if topic2 != topicFromStb and topic2 != topicFromStb2: + tdLog.exit("topic error2") + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicFromStb + ',' + topicFromStb2 + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into consumeinfo values " + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + tdLog.info("check stb if there are data") + while 1: + tdSql.query("select count(*) from %s"%parameterDict["stbName"]) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + countOfStb = tdSql.getData(0, 0) + if countOfStb != 0: + tdLog.info("count from stb: %d"%countOfStb) + break + else: + time.sleep(1) + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + # start the second thread to create new child table and insert data + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() + + # wait for data ready + prepareEnvThread.join() + prepareEnvThread2.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from consumeresult") + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + break + else: + time.sleep(5) + + tdSql.checkData(0 , 1, consumerId) + tdSql.checkData(0 , 3, expectrowcnt) + + tdSql.query("drop topic %s"%topicFromStb) + tdSql.query("drop topic %s"%topicFromStb2) + + tdLog.printNoPrefix("======== test case 3 end ...... 
") + + def run(self): + tdSql.prepare() + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + #self.tmqCase1(cfgPath, buildPath) + #self.tmqCase2(cfgPath, buildPath) + self.tmqCase3(cfgPath, buildPath) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) From 26aef324f7f1fc9e196111cd3a5f2b4208c740fd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 13 May 2022 15:59:56 +0800 Subject: [PATCH 23/25] enh(rpc): add more log --- source/libs/transport/src/transCli.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5570bdcd3e..cd7e3beb13 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -13,7 +13,6 @@ */ #ifdef USE_UV - #include "transComm.h" typedef struct SCliConn { @@ -706,7 +705,8 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el); + tTrace("%s cli msg tran time cost: %" PRIu64 "us, threadID: %" PRId64 "", ((STrans*)pThrd->pTransInst)->label, el, + pThrd->thread); STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; @@ -1029,8 +1029,8 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; - tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet), - EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); + tDebug("send request at thread:%d, threadID: %" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq, + EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); } @@ -1057,8 +1057,8 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; - tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet), - EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); + tDebug("send request at thread:%d, threadID:%" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq, + EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); transSendAsync(thrd->asyncPool, &(cliMsg->q)); tsem_t* pSem = pCtx->pSem; From d9590b44737966966b13ca3e5fd9842bd2c8b99d Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Fri, 13 May 2022 16:23:06 +0800 Subject: [PATCH 24/25] ci(stream): add stream ci --- tests/script/tsim/tstream/basic1.sim | 34 +++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim index 12c820d76b..3bb5943b3b 100644 --- a/tests/script/tsim/tstream/basic1.sim +++ b/tests/script/tsim/tstream/basic1.sim @@ -340,4 +340,36 @@ if $data15 != 1 then return -1 endi -#system sh/exec.sh -n dnode1 -s stop -x SIGINT +sql insert into t1 
values(1648791233003,3,2,3,2.1);
+sql insert into t1 values(1648791233002,5,6,7,8.1);
+sql insert into t1 values(1648791233002,3,2,3,2.1);
+sleep 100
+sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
+
+# row 2
+if $data21 != 2 then
+  print ======$data21
+  return -1
+endi
+
+if $data22 != 2 then
+  print ======$data22
+  return -1
+endi
+
+if $data23 != 6 then
+  print ======$data23
+  return -1
+endi
+
+if $data24 != 2 then
+  print ======$data24
+  return -1
+endi
+
+if $data25 != 3 then
+  print ======$data25
+  return -1
+endi
+
+system sh/exec.sh -n dnode1 -s stop -x SIGINT

From 5461f3ef120a583efd5ff9d920199f0aa9d7e786 Mon Sep 17 00:00:00 2001
From: plum-lihui
Date: Fri, 13 May 2022 17:13:20 +0800
Subject: [PATCH 25/25] test: add error case for rerun fail

---
 tests/system-test/99-TDcase/TD-15557.py | 215 ++++++++++++++++++++++++
 1 file changed, 215 insertions(+)
 create mode 100644 tests/system-test/99-TDcase/TD-15557.py

diff --git a/tests/system-test/99-TDcase/TD-15557.py b/tests/system-test/99-TDcase/TD-15557.py
new file mode 100644
index 0000000000..4798bb7c8d
--- /dev/null
+++ b/tests/system-test/99-TDcase/TD-15557.py
@@ -0,0 +1,215 @@
+
+import taos
+import sys
+import time
+import socket
+import os
+import threading
+
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+
+class TDTestCase:
+    hostname = socket.gethostname()
+    #rpcDebugFlagVal = '143'
+    #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
+    #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
+    #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
+    #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
+    #print ("===================: ", updatecfgDict)
+
+    def init(self, conn, logSql):
+        tdLog.debug(f"start to execute {__file__}")
+        #tdSql.init(conn.cursor())
+        tdSql.init(conn.cursor(), logSql)  # output sql.txt file
+
+    def getBuildPath(self):
+        selfPath = os.path.dirname(os.path.realpath(__file__))
+
+        if ("community" in selfPath):
+            projPath = selfPath[:selfPath.find("community")]
+        else:
+            projPath = selfPath[:selfPath.find("tests")]
+
+        for root, dirs, files in os.walk(projPath):
+            if ("taosd" in files):
+                rootRealPath = os.path.dirname(os.path.realpath(root))
+                if ("packaging" not in rootRealPath):
+                    buildPath = root[:len(root) - len("/build/bin")]
+                    break
+        return buildPath
+
+    def newcur(self,cfg,host,port):
+        user = "root"
+        password = "taosdata"
+        con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
+        cur=con.cursor()
+        print(cur)
+        return cur
+
+    def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
+        tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
+        tsql.execute("use %s" %dbName)
+        tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
+        pre_create = "create table"
+        sql = pre_create
+        #tdLog.debug("doing create one stable %s and %d child table in %s ..." 
%(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + event.set() + tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def prepareEnv(self, **parameterDict): + print ("input parameters:") + print (parameterDict) + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + self.create_tables(tsql,\ + parameterDict["dbName"],\ + parameterDict["vgroups"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"]) + + self.insert_data(tsql,\ + parameterDict["dbName"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"],\ + parameterDict["startTs"]) + return + + + def tmqCase1(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 1: Produce while consume to subscribe one db") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db1', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + shellCmd = 'nohup ' + buildPath 
+ '/build/bin/tmq_sim -c ' + cfgPath
+        shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
+        shellCmd += "> /dev/null 2>&1 &"
+        tdLog.info(shellCmd)
+        os.system(shellCmd)
+
+        # wait for data ready
+        prepareEnvThread.join()
+
+        tdLog.info("insert process end, and start to check consume result")
+        while 1:
+            tdSql.query("select * from %s.consumeresult"%cdbName)
+            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
+            if tdSql.getRows() == 1:
+                break
+            else:
+                time.sleep(5)
+
+        tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3)))
+        tdSql.checkData(0 , 1, consumerId)
+        # multiple rows and multiple tables in one sql, so the number of messages is not deterministic
+        #tdSql.checkData(0 , 2, expectmsgcnt)
+        tdSql.checkData(0 , 3, expectrowcnt)
+
+        tdSql.query("drop topic %s"%topicName1)
+
+        tdLog.printNoPrefix("======== test case 1 end ...... ")
+
+    def run(self):
+        tdSql.prepare()
+
+        buildPath = self.getBuildPath()
+        if (buildPath == ""):
+            tdLog.exit("taosd not found!")
+        else:
+            tdLog.info("taosd found in %s" % buildPath)
+        cfgPath = buildPath + "/../sim/psim/cfg"
+        tdLog.info("cfgPath: %s" % cfgPath)
+
+        self.tmqCase1(cfgPath, buildPath)
+        #self.tmqCase2(cfgPath, buildPath)
+        #self.tmqCase3(cfgPath, buildPath)
+
+    def stop(self):
+        tdSql.close()
+        tdLog.success(f"{__file__} successfully executed")
+
+event = threading.Event()
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())