Merge branch 'develop' into xiaoping/add_test_case2
This commit is contained in:
commit
ad009dd036
|
@ -123,7 +123,7 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
|
||||||
cp -r ${examples_dir}/R ${install_dir}/examples
|
cp -r ${examples_dir}/R ${install_dir}/examples
|
||||||
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/examples/R/command.txt
|
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/examples/R/command.txt
|
||||||
cp -r ${examples_dir}/go ${install_dir}/examples
|
cp -r ${examples_dir}/go ${install_dir}/examples
|
||||||
sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/src/taosapp/taosapp.go
|
sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/taosdemo.go
|
||||||
fi
|
fi
|
||||||
# Copy driver
|
# Copy driver
|
||||||
mkdir -p ${install_dir}/driver
|
mkdir -p ${install_dir}/driver
|
||||||
|
|
|
@ -146,7 +146,7 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
|
||||||
cp -r ${examples_dir}/R ${install_dir}/examples
|
cp -r ${examples_dir}/R ${install_dir}/examples
|
||||||
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/examples/R/command.txt
|
sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/examples/R/command.txt
|
||||||
cp -r ${examples_dir}/go ${install_dir}/examples
|
cp -r ${examples_dir}/go ${install_dir}/examples
|
||||||
sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/src/taosapp/taosapp.go
|
sed -i '/root/ {s/taosdata/powerdb/g}' ${install_dir}/examples/go/taosdemo.go
|
||||||
fi
|
fi
|
||||||
# Copy driver
|
# Copy driver
|
||||||
mkdir -p ${install_dir}/driver
|
mkdir -p ${install_dir}/driver
|
||||||
|
|
|
@ -10,6 +10,7 @@ data_dir="/var/lib/taos"
|
||||||
log_dir="/var/log/taos"
|
log_dir="/var/log/taos"
|
||||||
data_link_dir="/usr/local/taos/data"
|
data_link_dir="/usr/local/taos/data"
|
||||||
log_link_dir="/usr/local/taos/log"
|
log_link_dir="/usr/local/taos/log"
|
||||||
|
install_main_dir="/usr/local/taos"
|
||||||
|
|
||||||
# static directory
|
# static directory
|
||||||
cfg_dir="/usr/local/taos/cfg"
|
cfg_dir="/usr/local/taos/cfg"
|
||||||
|
@ -134,6 +135,29 @@ function install_config() {
|
||||||
else
|
else
|
||||||
break
|
break
|
||||||
fi
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
# user email
|
||||||
|
#EMAIL_PATTERN='^[A-Za-z0-9\u4e00-\u9fa5]+@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)+$'
|
||||||
|
#EMAIL_PATTERN='^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$'
|
||||||
|
#EMAIL_PATTERN="^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$"
|
||||||
|
echo
|
||||||
|
echo -e -n "${GREEN}Enter your email address for priority support or enter empty to skip${NC}: "
|
||||||
|
read emailAddr
|
||||||
|
while true; do
|
||||||
|
if [ ! -z "$emailAddr" ]; then
|
||||||
|
# check the format of the emailAddr
|
||||||
|
#if [[ "$emailAddr" =~ $EMAIL_PATTERN ]]; then
|
||||||
|
# Write the email address to temp file
|
||||||
|
email_file="${install_main_dir}/email"
|
||||||
|
${csudo} bash -c "echo $emailAddr > ${email_file}"
|
||||||
|
break
|
||||||
|
#else
|
||||||
|
# read -p "Please enter the correct email address: " emailAddr
|
||||||
|
#fi
|
||||||
|
else
|
||||||
|
break
|
||||||
|
fi
|
||||||
done
|
done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -344,8 +344,6 @@ static FORCE_INLINE int32_t primaryKeyComparator(int64_t f1, int64_t f2, int32_t
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(colIdx == 0);
|
|
||||||
|
|
||||||
if (tsOrder == TSDB_ORDER_DESC) { // primary column desc order
|
if (tsOrder == TSDB_ORDER_DESC) { // primary column desc order
|
||||||
return (f1 < f2) ? 1 : -1;
|
return (f1 < f2) ? 1 : -1;
|
||||||
} else { // asc
|
} else { // asc
|
||||||
|
|
|
@ -7,6 +7,9 @@ set serverPort=%2
|
||||||
if "%severIp%"=="" (set severIp=127.0.0.1)
|
if "%severIp%"=="" (set severIp=127.0.0.1)
|
||||||
if "%serverPort%"=="" (set serverPort=6030)
|
if "%serverPort%"=="" (set serverPort=6030)
|
||||||
|
|
||||||
|
go env -w GO111MODULE=on
|
||||||
|
go env -w GOPROXY=https://goproxy.io,direct
|
||||||
|
|
||||||
cd case001
|
cd case001
|
||||||
case001.bat %severIp% %serverPort%
|
case001.bat %severIp% %serverPort%
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,9 @@ if [ ! -n "$serverPort" ]; then
|
||||||
serverPort=6030
|
serverPort=6030
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
go env -w GO111MODULE=on
|
||||||
|
go env -w GOPROXY=https://goproxy.io,direct
|
||||||
|
|
||||||
bash ./case001/case001.sh $severIp $serverPort
|
bash ./case001/case001.sh $severIp $serverPort
|
||||||
#bash ./case002/case002.sh $severIp $serverPort
|
#bash ./case002/case002.sh $severIp $serverPort
|
||||||
#bash ./case003/case003.sh $severIp $serverPort
|
#bash ./case003/case003.sh $severIp $serverPort
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -199,26 +199,4 @@ class ClusterTest:
|
||||||
thread.start()
|
thread.start()
|
||||||
|
|
||||||
for i in range(self.numberOfThreads):
|
for i in range(self.numberOfThreads):
|
||||||
threads[i].join()
|
threads[i].join()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
from clustertest import *
|
from clusterSetup import *
|
||||||
from util.sql import tdSql
|
from util.sql import tdSql
|
||||||
from util.log import tdLog
|
from util.log import tdLog
|
||||||
import random
|
import random
|
||||||
|
|
|
@ -54,6 +54,7 @@ export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3:$(pwd)
|
||||||
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR
|
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR
|
||||||
|
|
||||||
# Now we are all let, and let's see if we can find a crash. Note we pass all params
|
# Now we are all let, and let's see if we can find a crash. Note we pass all params
|
||||||
|
CRASH_GEN_EXEC=crash_gen_bootstrap.py
|
||||||
if [[ $1 == '--valgrind' ]]; then
|
if [[ $1 == '--valgrind' ]]; then
|
||||||
shift
|
shift
|
||||||
export PYTHONMALLOC=malloc
|
export PYTHONMALLOC=malloc
|
||||||
|
@ -66,14 +67,14 @@ if [[ $1 == '--valgrind' ]]; then
|
||||||
--leak-check=yes \
|
--leak-check=yes \
|
||||||
--suppressions=crash_gen/valgrind_taos.supp \
|
--suppressions=crash_gen/valgrind_taos.supp \
|
||||||
$PYTHON_EXEC \
|
$PYTHON_EXEC \
|
||||||
./crash_gen/crash_gen.py $@ > $VALGRIND_OUT 2> $VALGRIND_ERR
|
$CRASH_GEN_EXEC $@ > $VALGRIND_OUT 2> $VALGRIND_ERR
|
||||||
elif [[ $1 == '--helgrind' ]]; then
|
elif [[ $1 == '--helgrind' ]]; then
|
||||||
shift
|
shift
|
||||||
valgrind \
|
valgrind \
|
||||||
--tool=helgrind \
|
--tool=helgrind \
|
||||||
$PYTHON_EXEC \
|
$PYTHON_EXEC \
|
||||||
./crash_gen/crash_gen.py $@
|
$CRASH_GEN_EXEC $@
|
||||||
else
|
else
|
||||||
$PYTHON_EXEC ./crash_gen/crash_gen.py $@
|
$PYTHON_EXEC $CRASH_GEN_EXEC $@
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,130 @@
|
||||||
|
<center><h1>User's Guide to the Crash_Gen Tool</h1></center>
|
||||||
|
|
||||||
|
# Introduction
|
||||||
|
|
||||||
|
To effectively test and debug our TDengine product, we have developed a simple tool to
|
||||||
|
exercise various functions of the system in a randomized fashion, hoping to expose
|
||||||
|
maximum number of problems, hopefully without a pre-determined scenario.
|
||||||
|
|
||||||
|
# Preparation
|
||||||
|
|
||||||
|
To run this tool, please ensure the followed preparation work is done first.
|
||||||
|
|
||||||
|
1. Fetch a copy of the TDengine source code, and build it successfully in the `build/`
|
||||||
|
directory
|
||||||
|
1. Ensure that the system has Python3.8 or above properly installed. We use
|
||||||
|
Ubuntu 20.04LTS as our own development environment, and suggest you also use such
|
||||||
|
an environment if possible.
|
||||||
|
|
||||||
|
# Simple Execution
|
||||||
|
|
||||||
|
To run the tool with the simplest method, follow the steps below:
|
||||||
|
|
||||||
|
1. Open a terminal window, start the `taosd` service in the `build/` directory
|
||||||
|
(or however you prefer to start the `taosd` service)
|
||||||
|
1. Open another terminal window, go into the `tests/pytest/` directory, and
|
||||||
|
run `./crash_gen.sh -p -t 3 -s 10` (change the two parameters here as you wish)
|
||||||
|
1. Watch the output to the end and see if you get a `SUCCESS` or `FAILURE`
|
||||||
|
|
||||||
|
That's it!
|
||||||
|
|
||||||
|
# Running Clusters
|
||||||
|
|
||||||
|
This tool also makes it easy to test/verify the clustering capabilities of TDengine. You
|
||||||
|
can start a cluster quite easily with the following command:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ cd tests/pytest/
|
||||||
|
$ ./crash_gen.sh -e -o 3
|
||||||
|
```
|
||||||
|
|
||||||
|
The `-e` option above tells the tool to start the service, and do not run any tests, while
|
||||||
|
the `-o 3` option tells the tool to start 3 DNodes and join them together in a cluster.
|
||||||
|
Obviously you can adjust the the number here.
|
||||||
|
|
||||||
|
## Behind the Scenes
|
||||||
|
|
||||||
|
When the tool runs a cluster, it users a number of directories, each holding the information
|
||||||
|
for a single DNode, see:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ ls build/cluster*
|
||||||
|
build/cluster_dnode_0:
|
||||||
|
cfg data log
|
||||||
|
|
||||||
|
build/cluster_dnode_1:
|
||||||
|
cfg data log
|
||||||
|
|
||||||
|
build/cluster_dnode_2:
|
||||||
|
cfg data log
|
||||||
|
```
|
||||||
|
|
||||||
|
Therefore, when something goes wrong and you want to reset everything with the cluster, simple
|
||||||
|
erase all the files:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ rm -rf build/cluster_dnode_*
|
||||||
|
```
|
||||||
|
|
||||||
|
## Addresses and Ports
|
||||||
|
|
||||||
|
The DNodes in the cluster all binds the the `127.0.0.1` IP address (for now anyway), and
|
||||||
|
uses port 6030 for the first DNode, and 6130 for the 2nd one, and so on.
|
||||||
|
|
||||||
|
## Testing Against a Cluster
|
||||||
|
|
||||||
|
In a separate terminal window, you can invoke the tool in client mode and test against
|
||||||
|
a cluster, such as:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ ./crash_gen.sh -p -t 10 -s 100 -i 3
|
||||||
|
```
|
||||||
|
|
||||||
|
Here the `-i` option tells the tool to always create tables with 3 replicas, and run
|
||||||
|
all tests against such tables.
|
||||||
|
|
||||||
|
# Additional Features
|
||||||
|
|
||||||
|
The exhaustive features of the tool is available through the `-h` option:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ ./crash_gen.sh -h
|
||||||
|
usage: crash_gen_bootstrap.py [-h] [-a] [-b MAX_DBS] [-c CONNECTOR_TYPE] [-d] [-e] [-g IGNORE_ERRORS] [-i MAX_REPLICAS] [-l] [-n] [-o NUM_DNODES] [-p] [-r]
|
||||||
|
[-s MAX_STEPS] [-t NUM_THREADS] [-v] [-x]
|
||||||
|
|
||||||
|
TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1. You build TDengine in the top level ./build directory, as described in offical docs
|
||||||
|
2. You run the server there before this script: ./build/bin/taosd -c test/cfg
|
||||||
|
|
||||||
|
optional arguments:
|
||||||
|
-h, --help show this help message and exit
|
||||||
|
-a, --auto-start-service
|
||||||
|
Automatically start/stop the TDengine service (default: false)
|
||||||
|
-b MAX_DBS, --max-dbs MAX_DBS
|
||||||
|
Maximum number of DBs to keep, set to disable dropping DB. (default: 0)
|
||||||
|
-c CONNECTOR_TYPE, --connector-type CONNECTOR_TYPE
|
||||||
|
Connector type to use: native, rest, or mixed (default: 10)
|
||||||
|
-d, --debug Turn on DEBUG mode for more logging (default: false)
|
||||||
|
-e, --run-tdengine Run TDengine service in foreground (default: false)
|
||||||
|
-g IGNORE_ERRORS, --ignore-errors IGNORE_ERRORS
|
||||||
|
Ignore error codes, comma separated, 0x supported (default: None)
|
||||||
|
-i MAX_REPLICAS, --max-replicas MAX_REPLICAS
|
||||||
|
Maximum number of replicas to use, when testing against clusters. (default: 1)
|
||||||
|
-l, --larger-data Write larger amount of data during write operations (default: false)
|
||||||
|
-n, --dynamic-db-table-names
|
||||||
|
Use non-fixed names for dbs/tables, useful for multi-instance executions (default: false)
|
||||||
|
-o NUM_DNODES, --num-dnodes NUM_DNODES
|
||||||
|
Number of Dnodes to initialize, used with -e option. (default: 1)
|
||||||
|
-p, --per-thread-db-connection
|
||||||
|
Use a single shared db connection (default: false)
|
||||||
|
-r, --record-ops Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)
|
||||||
|
-s MAX_STEPS, --max-steps MAX_STEPS
|
||||||
|
Maximum number of steps to run (default: 100)
|
||||||
|
-t NUM_THREADS, --num-threads NUM_THREADS
|
||||||
|
Number of threads to run (default: 10)
|
||||||
|
-v, --verify-data Verify data written in a number of places by reading back (default: false)
|
||||||
|
-x, --continue-on-exception
|
||||||
|
Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)
|
||||||
|
```
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,435 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import requests
|
||||||
|
from requests.auth import HTTPBasicAuth
|
||||||
|
|
||||||
|
import taos
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.log import *
|
||||||
|
|
||||||
|
from .misc import Logging, CrashGenError, Helper, Dice
|
||||||
|
import os
|
||||||
|
import datetime
|
||||||
|
# from .service_manager import TdeInstance
|
||||||
|
|
||||||
|
class DbConn:
|
||||||
|
TYPE_NATIVE = "native-c"
|
||||||
|
TYPE_REST = "rest-api"
|
||||||
|
TYPE_INVALID = "invalid"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create(cls, connType, dbTarget):
|
||||||
|
if connType == cls.TYPE_NATIVE:
|
||||||
|
return DbConnNative(dbTarget)
|
||||||
|
elif connType == cls.TYPE_REST:
|
||||||
|
return DbConnRest(dbTarget)
|
||||||
|
else:
|
||||||
|
raise RuntimeError(
|
||||||
|
"Unexpected connection type: {}".format(connType))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def createNative(cls, dbTarget) -> DbConn:
|
||||||
|
return cls.create(cls.TYPE_NATIVE, dbTarget)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def createRest(cls, dbTarget) -> DbConn:
|
||||||
|
return cls.create(cls.TYPE_REST, dbTarget)
|
||||||
|
|
||||||
|
def __init__(self, dbTarget):
|
||||||
|
self.isOpen = False
|
||||||
|
self._type = self.TYPE_INVALID
|
||||||
|
self._lastSql = None
|
||||||
|
self._dbTarget = dbTarget
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "[DbConn: type={}, target={}]".format(self._type, self._dbTarget)
|
||||||
|
|
||||||
|
def getLastSql(self):
|
||||||
|
return self._lastSql
|
||||||
|
|
||||||
|
def open(self):
|
||||||
|
if (self.isOpen):
|
||||||
|
raise RuntimeError("Cannot re-open an existing DB connection")
|
||||||
|
|
||||||
|
# below implemented by child classes
|
||||||
|
self.openByType()
|
||||||
|
|
||||||
|
Logging.debug("[DB] data connection opened: {}".format(self))
|
||||||
|
self.isOpen = True
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
raise RuntimeError("Unexpected execution, should be overriden")
|
||||||
|
|
||||||
|
def queryScalar(self, sql) -> int:
|
||||||
|
return self._queryAny(sql)
|
||||||
|
|
||||||
|
def queryString(self, sql) -> str:
|
||||||
|
return self._queryAny(sql)
|
||||||
|
|
||||||
|
def _queryAny(self, sql): # actual query result as an int
|
||||||
|
if (not self.isOpen):
|
||||||
|
raise RuntimeError("Cannot query database until connection is open")
|
||||||
|
nRows = self.query(sql)
|
||||||
|
if nRows != 1:
|
||||||
|
raise taos.error.ProgrammingError(
|
||||||
|
"Unexpected result for query: {}, rows = {}".format(sql, nRows),
|
||||||
|
(0x991 if nRows==0 else 0x992)
|
||||||
|
)
|
||||||
|
if self.getResultRows() != 1 or self.getResultCols() != 1:
|
||||||
|
raise RuntimeError("Unexpected result set for query: {}".format(sql))
|
||||||
|
return self.getQueryResult()[0][0]
|
||||||
|
|
||||||
|
def use(self, dbName):
|
||||||
|
self.execute("use {}".format(dbName))
|
||||||
|
|
||||||
|
def existsDatabase(self, dbName: str):
|
||||||
|
''' Check if a certain database exists '''
|
||||||
|
self.query("show databases")
|
||||||
|
dbs = [v[0] for v in self.getQueryResult()] # ref: https://stackoverflow.com/questions/643823/python-list-transformation
|
||||||
|
# ret2 = dbName in dbs
|
||||||
|
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
|
||||||
|
return dbName in dbs # TODO: super weird type mangling seen, once here
|
||||||
|
|
||||||
|
def hasTables(self):
|
||||||
|
return self.query("show tables") > 0
|
||||||
|
|
||||||
|
def execute(self, sql):
|
||||||
|
''' Return the number of rows affected'''
|
||||||
|
raise RuntimeError("Unexpected execution, should be overriden")
|
||||||
|
|
||||||
|
def safeExecute(self, sql):
|
||||||
|
'''Safely execute any SQL query, returning True/False upon success/failure'''
|
||||||
|
try:
|
||||||
|
self.execute(sql)
|
||||||
|
return True # ignore num of results, return success
|
||||||
|
except taos.error.ProgrammingError as err:
|
||||||
|
return False # failed, for whatever TAOS reason
|
||||||
|
# Not possile to reach here, non-TAOS exception would have been thrown
|
||||||
|
|
||||||
|
def query(self, sql) -> int: # return num rows returned
|
||||||
|
''' Return the number of rows affected'''
|
||||||
|
raise RuntimeError("Unexpected execution, should be overriden")
|
||||||
|
|
||||||
|
def openByType(self):
|
||||||
|
raise RuntimeError("Unexpected execution, should be overriden")
|
||||||
|
|
||||||
|
def getQueryResult(self):
|
||||||
|
raise RuntimeError("Unexpected execution, should be overriden")
|
||||||
|
|
||||||
|
def getResultRows(self):
|
||||||
|
raise RuntimeError("Unexpected execution, should be overriden")
|
||||||
|
|
||||||
|
def getResultCols(self):
|
||||||
|
raise RuntimeError("Unexpected execution, should be overriden")
|
||||||
|
|
||||||
|
# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
|
||||||
|
|
||||||
|
|
||||||
|
class DbConnRest(DbConn):
|
||||||
|
REST_PORT_INCREMENT = 11
|
||||||
|
|
||||||
|
def __init__(self, dbTarget: DbTarget):
|
||||||
|
super().__init__(dbTarget)
|
||||||
|
self._type = self.TYPE_REST
|
||||||
|
restPort = dbTarget.port + 11
|
||||||
|
self._url = "http://{}:{}/rest/sql".format(
|
||||||
|
dbTarget.hostAddr, dbTarget.port + self.REST_PORT_INCREMENT)
|
||||||
|
self._result = None
|
||||||
|
|
||||||
|
def openByType(self): # Open connection
|
||||||
|
pass # do nothing, always open
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if (not self.isOpen):
|
||||||
|
raise RuntimeError("Cannot clean up database until connection is open")
|
||||||
|
# Do nothing for REST
|
||||||
|
Logging.debug("[DB] REST Database connection closed")
|
||||||
|
self.isOpen = False
|
||||||
|
|
||||||
|
def _doSql(self, sql):
|
||||||
|
self._lastSql = sql # remember this, last SQL attempted
|
||||||
|
try:
|
||||||
|
r = requests.post(self._url,
|
||||||
|
data = sql,
|
||||||
|
auth = HTTPBasicAuth('root', 'taosdata'))
|
||||||
|
except:
|
||||||
|
print("REST API Failure (TODO: more info here)")
|
||||||
|
raise
|
||||||
|
rj = r.json()
|
||||||
|
# Sanity check for the "Json Result"
|
||||||
|
if ('status' not in rj):
|
||||||
|
raise RuntimeError("No status in REST response")
|
||||||
|
|
||||||
|
if rj['status'] == 'error': # clearly reported error
|
||||||
|
if ('code' not in rj): # error without code
|
||||||
|
raise RuntimeError("REST error return without code")
|
||||||
|
errno = rj['code'] # May need to massage this in the future
|
||||||
|
# print("Raising programming error with REST return: {}".format(rj))
|
||||||
|
raise taos.error.ProgrammingError(
|
||||||
|
rj['desc'], errno) # todo: check existance of 'desc'
|
||||||
|
|
||||||
|
if rj['status'] != 'succ': # better be this
|
||||||
|
raise RuntimeError(
|
||||||
|
"Unexpected REST return status: {}".format(
|
||||||
|
rj['status']))
|
||||||
|
|
||||||
|
nRows = rj['rows'] if ('rows' in rj) else 0
|
||||||
|
self._result = rj
|
||||||
|
return nRows
|
||||||
|
|
||||||
|
def execute(self, sql):
|
||||||
|
if (not self.isOpen):
|
||||||
|
raise RuntimeError(
|
||||||
|
"Cannot execute database commands until connection is open")
|
||||||
|
Logging.debug("[SQL-REST] Executing SQL: {}".format(sql))
|
||||||
|
nRows = self._doSql(sql)
|
||||||
|
Logging.debug(
|
||||||
|
"[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
|
||||||
|
return nRows
|
||||||
|
|
||||||
|
def query(self, sql): # return rows affected
|
||||||
|
return self.execute(sql)
|
||||||
|
|
||||||
|
def getQueryResult(self):
|
||||||
|
return self._result['data']
|
||||||
|
|
||||||
|
def getResultRows(self):
|
||||||
|
print(self._result)
|
||||||
|
raise RuntimeError("TBD") # TODO: finish here to support -v under -c rest
|
||||||
|
# return self._tdSql.queryRows
|
||||||
|
|
||||||
|
def getResultCols(self):
|
||||||
|
print(self._result)
|
||||||
|
raise RuntimeError("TBD")
|
||||||
|
|
||||||
|
# Duplicate code from TDMySQL, TODO: merge all this into DbConnNative
|
||||||
|
|
||||||
|
|
||||||
|
class MyTDSql:
|
||||||
|
# Class variables
|
||||||
|
_clsLock = threading.Lock() # class wide locking
|
||||||
|
longestQuery = None # type: str
|
||||||
|
longestQueryTime = 0.0 # seconds
|
||||||
|
lqStartTime = 0.0
|
||||||
|
# lqEndTime = 0.0 # Not needed, as we have the two above already
|
||||||
|
|
||||||
|
def __init__(self, hostAddr, cfgPath):
|
||||||
|
# Make the DB connection
|
||||||
|
self._conn = taos.connect(host=hostAddr, config=cfgPath)
|
||||||
|
self._cursor = self._conn.cursor()
|
||||||
|
|
||||||
|
self.queryRows = 0
|
||||||
|
self.queryCols = 0
|
||||||
|
self.affectedRows = 0
|
||||||
|
|
||||||
|
# def init(self, cursor, log=True):
|
||||||
|
# self.cursor = cursor
|
||||||
|
# if (log):
|
||||||
|
# caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
|
# self.cursor.log(caller.filename + ".sql")
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self._cursor.close() # can we double close?
|
||||||
|
self._conn.close() # TODO: very important, cursor close does NOT close DB connection!
|
||||||
|
self._cursor.close()
|
||||||
|
|
||||||
|
def _execInternal(self, sql):
|
||||||
|
startTime = time.time()
|
||||||
|
ret = self._cursor.execute(sql)
|
||||||
|
# print("\nSQL success: {}".format(sql))
|
||||||
|
queryTime = time.time() - startTime
|
||||||
|
# Record the query time
|
||||||
|
cls = self.__class__
|
||||||
|
if queryTime > (cls.longestQueryTime + 0.01) :
|
||||||
|
with cls._clsLock:
|
||||||
|
cls.longestQuery = sql
|
||||||
|
cls.longestQueryTime = queryTime
|
||||||
|
cls.lqStartTime = startTime
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def query(self, sql):
|
||||||
|
self.sql = sql
|
||||||
|
try:
|
||||||
|
self._execInternal(sql)
|
||||||
|
self.queryResult = self._cursor.fetchall()
|
||||||
|
self.queryRows = len(self.queryResult)
|
||||||
|
self.queryCols = len(self._cursor.description)
|
||||||
|
except Exception as e:
|
||||||
|
# caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
|
# args = (caller.filename, caller.lineno, sql, repr(e))
|
||||||
|
# tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
|
||||||
|
raise
|
||||||
|
return self.queryRows
|
||||||
|
|
||||||
|
def execute(self, sql):
|
||||||
|
self.sql = sql
|
||||||
|
try:
|
||||||
|
self.affectedRows = self._execInternal(sql)
|
||||||
|
except Exception as e:
|
||||||
|
# caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||||
|
# args = (caller.filename, caller.lineno, sql, repr(e))
|
||||||
|
# tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
|
||||||
|
raise
|
||||||
|
return self.affectedRows
|
||||||
|
|
||||||
|
class DbTarget:
|
||||||
|
def __init__(self, cfgPath, hostAddr, port):
|
||||||
|
self.cfgPath = cfgPath
|
||||||
|
self.hostAddr = hostAddr
|
||||||
|
self.port = port
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "[DbTarget: cfgPath={}, host={}:{}]".format(
|
||||||
|
Helper.getFriendlyPath(self.cfgPath), self.hostAddr, self.port)
|
||||||
|
|
||||||
|
def getEp(self):
|
||||||
|
return "{}:{}".format(self.hostAddr, self.port)
|
||||||
|
|
||||||
|
class DbConnNative(DbConn):
|
||||||
|
# Class variables
|
||||||
|
_lock = threading.Lock()
|
||||||
|
# _connInfoDisplayed = False # TODO: find another way to display this
|
||||||
|
totalConnections = 0 # Not private
|
||||||
|
|
||||||
|
def __init__(self, dbTarget):
|
||||||
|
super().__init__(dbTarget)
|
||||||
|
self._type = self.TYPE_NATIVE
|
||||||
|
self._conn = None
|
||||||
|
# self._cursor = None
|
||||||
|
|
||||||
|
def openByType(self): # Open connection
|
||||||
|
# global gContainer
|
||||||
|
# tInst = tInst or gContainer.defTdeInstance # set up in ClientManager, type: TdeInstance
|
||||||
|
# cfgPath = self.getBuildPath() + "/test/cfg"
|
||||||
|
# cfgPath = tInst.getCfgDir()
|
||||||
|
# hostAddr = tInst.getHostAddr()
|
||||||
|
|
||||||
|
cls = self.__class__ # Get the class, to access class variables
|
||||||
|
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
|
||||||
|
dbTarget = self._dbTarget
|
||||||
|
# if not cls._connInfoDisplayed:
|
||||||
|
# cls._connInfoDisplayed = True # updating CLASS variable
|
||||||
|
Logging.debug("Initiating TAOS native connection to {}".format(dbTarget))
|
||||||
|
# Make the connection
|
||||||
|
# self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
|
||||||
|
# self._cursor = self._conn.cursor()
|
||||||
|
# Record the count in the class
|
||||||
|
self._tdSql = MyTDSql(dbTarget.hostAddr, dbTarget.cfgPath) # making DB connection
|
||||||
|
cls.totalConnections += 1
|
||||||
|
|
||||||
|
self._tdSql.execute('reset query cache')
|
||||||
|
# self._cursor.execute('use db') # do this at the beginning of every
|
||||||
|
|
||||||
|
# Open connection
|
||||||
|
# self._tdSql = MyTDSql()
|
||||||
|
# self._tdSql.init(self._cursor)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if (not self.isOpen):
|
||||||
|
raise RuntimeError("Cannot clean up database until connection is open")
|
||||||
|
self._tdSql.close()
|
||||||
|
# Decrement the class wide counter
|
||||||
|
cls = self.__class__ # Get the class, to access class variables
|
||||||
|
with cls._lock:
|
||||||
|
cls.totalConnections -= 1
|
||||||
|
|
||||||
|
Logging.debug("[DB] Database connection closed")
|
||||||
|
self.isOpen = False
|
||||||
|
|
||||||
|
def execute(self, sql):
|
||||||
|
if (not self.isOpen):
|
||||||
|
raise RuntimeError("Cannot execute database commands until connection is open")
|
||||||
|
Logging.debug("[SQL] Executing SQL: {}".format(sql))
|
||||||
|
self._lastSql = sql
|
||||||
|
nRows = self._tdSql.execute(sql)
|
||||||
|
Logging.debug(
|
||||||
|
"[SQL] Execution Result, nRows = {}, SQL = {}".format(
|
||||||
|
nRows, sql))
|
||||||
|
return nRows
|
||||||
|
|
||||||
|
def query(self, sql): # return rows affected
|
||||||
|
if (not self.isOpen):
|
||||||
|
raise RuntimeError(
|
||||||
|
"Cannot query database until connection is open")
|
||||||
|
Logging.debug("[SQL] Executing SQL: {}".format(sql))
|
||||||
|
self._lastSql = sql
|
||||||
|
nRows = self._tdSql.query(sql)
|
||||||
|
Logging.debug(
|
||||||
|
"[SQL] Query Result, nRows = {}, SQL = {}".format(
|
||||||
|
nRows, sql))
|
||||||
|
return nRows
|
||||||
|
# results are in: return self._tdSql.queryResult
|
||||||
|
|
||||||
|
def getQueryResult(self):
    """Return the rows produced by the most recent query()."""
    rows = self._tdSql.queryResult
    return rows
|
||||||
|
def getResultRows(self):
    """Return the row count of the most recent query()."""
    rowCount = self._tdSql.queryRows
    return rowCount
|
||||||
|
def getResultCols(self):
    """Return the column count of the most recent query()."""
    colCount = self._tdSql.queryCols
    return colCount
||||||
|
|
||||||
|
class DbManager():
    ''' This is a wrapper around DbConn(), to make it easier to use.

    Owns exactly one DbConn (native or REST) and handles the initial
    open(), translating connection failures into user-friendly messages.

    TODO: rename this to DbConnManager
    '''
    def __init__(self, cType, dbTarget):
        # cType: 'native' selects DbConn.createNative, anything else REST
        # dbTarget: DbTarget describing host/port/cfg of the instance
        # self.tableNumQueue = LinearQueue() # TODO: delete?
        # self.openDbServerConnection()
        self._dbConn = DbConn.createNative(dbTarget) if (
            cType == 'native') else DbConn.createRest(dbTarget)
        try:
            self._dbConn.open()  # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
            if (err.msg == 'client disconnected'):  # cannot open DB connection
                print(
                    "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
                sys.exit(2)  # unrecoverable: bail out of the whole program
            else:
                print("Failed to connect to DB, errno = {}, msg: {}"
                      .format(Helper.convertErrno(err.errno), err.msg))
                raise
        except BaseException:
            print("[=] Unexpected exception")
            raise

        # Do this after dbConn is in proper shape
        # Moved to Database()
        # self._stateMachine = StateMechine(self._dbConn)

    def getDbConn(self):
        # Accessor for the single managed connection
        return self._dbConn

    # TODO: not used any more, to delete
    # NOTE(review): references self.tableNumQueue, which is never
    # initialized above — dead code kept per the TODO.
    def pickAndAllocateTable(self):  # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

    # TODO: Not used any more, to delete
    # NOTE(review): self._lock is never initialized in this class; this
    # method would raise AttributeError if called — dead code.
    def addTable(self):
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

    # Not used any more, to delete
    def releaseTable(self, i):  # return the table back, so others can use it
        self.tableNumQueue.release(i)

    # TODO: not used any more, delete
    def getTableNameToDelete(self):
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        if (not tblNum):  # maybe false
            return False

        return "table_{}".format(tblNum)

    def cleanUp(self):
        # Close the managed connection; manager is unusable afterwards
        self._dbConn.close()
|
|
@ -0,0 +1,175 @@
|
||||||
|
import threading
|
||||||
|
import random
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
class CrashGenError(Exception):
    """Generic exception for the crash_gen framework.

    Carries an optional human-readable message and an optional errno.
    """

    def __init__(self, msg=None, errno=None):
        self.msg = msg      # optional description, may be None
        self.errno = errno  # optional numeric error code, may be None

    def __str__(self):
        # BUG FIX: the original returned self.msg unconditionally; when msg
        # was None (the default), str(exc) raised
        # "TypeError: __str__ returned non-string (type NoneType)".
        if self.msg is None:
            return "CrashGenError(errno={})".format(self.errno)
        return self.msg
|
||||||
|
|
||||||
|
class LoggingFilter(logging.Filter):
    """Record filter attached to the CrashGen logger.

    Currently a pass-through: every record is accepted. The structure is
    kept so sub-INFO records can be selectively suppressed later.
    """

    def filter(self, record: logging.LogRecord):
        # INFO and above always pass.
        if record.levelno >= logging.INFO:
            return True

        # Below INFO: previously considered suppressing "[TRD]" messages,
        # commented out while adjusting...
        # if msg.startswith("[TRD]"):
        #     return False
        return True
|
||||||
|
|
||||||
|
class MyLoggingAdapter(logging.LoggerAdapter):
    """Adapter that prefixes each message with a short (4-digit) id of the
    calling thread, making interleaved multi-thread output readable."""

    def process(self, msg, kwargs):
        shortTid = threading.get_ident() % 10000
        decorated = "[{}] {}".format(shortTid, msg)
        return decorated, kwargs
        # return '[%s] %s' % (self.extra['connid'], msg), kwargs
|
||||||
|
|
||||||
|
class Logging:
    """Class-level logging facade for the whole framework.

    Holds a single MyLoggingAdapter (created by clsInit) and exposes
    info/debug/warning/error as classmethods.
    """
    logger = None  # set by clsInit(); None until then

    @classmethod
    def getLogger(cls):
        # BUG FIX: the original did "return logger", an unbound name that
        # raised NameError — the adapter lives on the class attribute.
        return cls.logger

    @classmethod
    def clsInit(cls, gConfig):  # TODO: refactor away gConfig
        """Initialize the shared logger; idempotent (no-op when already set).

        gConfig.debug selects DEBUG vs INFO level.
        """
        if cls.logger:
            return

        # Logging Stuff
        # global misc.logger
        _logger = logging.getLogger('CrashGen')  # real logger
        _logger.addFilter(LoggingFilter())
        ch = logging.StreamHandler()
        _logger.addHandler(ch)

        # Logging adapter, to be used as a logger
        print("setting logger variable")
        # global logger
        cls.logger = MyLoggingAdapter(_logger, [])

        if (gConfig.debug):
            cls.logger.setLevel(logging.DEBUG)  # default seems to be INFO
        else:
            cls.logger.setLevel(logging.INFO)

    @classmethod
    def info(cls, msg):
        cls.logger.info(msg)

    @classmethod
    def debug(cls, msg):
        cls.logger.debug(msg)

    @classmethod
    def warning(cls, msg):
        cls.logger.warning(msg)

    @classmethod
    def error(cls, msg):
        cls.logger.error(msg)
|
||||||
|
class Status:
    """Lifecycle state of a managed service: STARTING -> RUNNING ->
    STOPPING -> STOPPED, with simple predicates for each state."""

    STATUS_STARTING = 1
    STATUS_RUNNING = 2
    STATUS_STOPPING = 3
    STATUS_STOPPED = 4

    def __init__(self, status):
        self.set(status)

    def __repr__(self):
        return "[Status: v={}]".format(self._status)

    def set(self, status):
        self._status = status

    def get(self):
        return self._status

    def isStarting(self):
        return self.get() == Status.STATUS_STARTING

    def isRunning(self):
        # return self._thread and self._thread.is_alive()
        return self.get() == Status.STATUS_RUNNING

    def isStopping(self):
        return self.get() == Status.STATUS_STOPPING

    def isStopped(self):
        return self.get() == Status.STATUS_STOPPED

    def isStable(self):
        # Stable == not in a transitional (starting/stopping) state
        stable = self.isRunning() or self.isStopped()
        return stable
|
||||||
|
# Deterministic random number generator
|
||||||
|
class Dice():
    """Deterministic random number helper.

    Must be seeded exactly once via seed(); verifies beforehand that the
    platform RNG reproduces a known sequence, so runs are repeatable.
    """

    seeded = False  # class-level: has seed() been called?

    @classmethod
    def seed(cls, s):  # static
        if cls.seeded:
            raise RuntimeError(
                "Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):  # Verify that the RNG is determinstic
        # With seed 0 CPython's Mersenne Twister must produce this exact
        # triple; anything else means non-reproducible runs.
        random.seed(0)
        observed = [random.randrange(0, 1000) for _ in range(3)]
        if observed != [864, 394, 776]:
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):  # get 0 to stop-1
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):  # up to stop-1
        if not cls.seeded:
            raise RuntimeError("Cannot throw dice before seeding it")
        return random.randrange(start, stop)

    @classmethod
    def choice(cls, cList):
        return random.choice(cList)
|
||||||
|
class Helper:
    """Small static utility helpers shared across the framework."""

    @classmethod
    def convertErrno(cls, errno):
        # Non-positive errnos are shifted into the 0x80000000+ range
        # (TDengine-style error codes); positive values pass through.
        if errno > 0:
            return errno
        return 0x80000000 + errno

    @classmethod
    def getFriendlyPath(cls, path):  # returns .../xxx/yyy
        # Keep only the last two path components for compact display.
        head, leaf = os.path.split(path)
        parent = os.path.split(head)[1]
        return ".../" + parent + '/' + leaf
|
||||||
|
|
||||||
|
class Progress:
    """Console progress indicator: emits a one-character token per event,
    without newlines, so long runs show a compact activity trace."""

    STEP_BOUNDARY = 0
    BEGIN_THREAD_STEP = 1
    END_THREAD_STEP = 2
    SERVICE_HEART_BEAT = 3

    # token printed for each event kind
    tokens = {
        STEP_BOUNDARY: '.',
        BEGIN_THREAD_STEP: '[',
        END_THREAD_STEP: '] ',
        SERVICE_HEART_BEAT: '.Y.',
    }

    @classmethod
    def emit(cls, token):
        mark = cls.tokens[token]
        print(mark, end="", flush=True)
|
|
@ -0,0 +1,729 @@
|
||||||
|
import os
|
||||||
|
import io
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
import signal
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
from typing import IO, List
|
||||||
|
|
||||||
|
try:
|
||||||
|
import psutil
|
||||||
|
except:
|
||||||
|
print("Psutil module needed, please install: sudo pip3 install psutil")
|
||||||
|
sys.exit(-1)
|
||||||
|
|
||||||
|
from queue import Queue, Empty
|
||||||
|
|
||||||
|
from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
|
||||||
|
from .db import DbConn, DbTarget
|
||||||
|
|
||||||
|
class TdeInstance():
    """
    A class to capture the *static* information of a TDengine instance,
    including the location of the various files/directories, and basica
    configuration.
    """

    @classmethod
    def _getBuildPath(cls):
        # Locate the build output dir by walking the project tree for a
        # "taosd" binary, skipping "packaging" trees.
        selfPath = os.path.dirname(os.path.realpath(__file__))
        if ("community" in selfPath):
            projPath = selfPath[:selfPath.find("communit")]
        else:
            projPath = selfPath[:selfPath.find("tests")]

        buildPath = None
        for root, dirs, files in os.walk(projPath):
            if ("taosd" in files):
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if ("packaging" not in rootRealPath):
                    buildPath = root[:len(root) - len("/build/bin")]
                    break
        if buildPath == None:
            raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
                .format(selfPath, projPath))
        return buildPath

    def __init__(self, subdir='test', tInstNum=0, port=6030, fepPort=6030):
        # subdir: per-instance directory name under the build dir
        # tInstNum: zero-based serial number within the cluster
        # port: this instance's serverPort; fepPort: the firstEp port
        self._buildDir = self._getBuildPath()
        self._subdir = '/' + subdir  # TODO: tolerate "/"
        self._port = port  # TODO: support different IP address too
        self._fepPort = fepPort

        self._tInstNum = tInstNum
        self._smThread = ServiceManagerThread()

    def getDbTarget(self):
        """Connection target (cfg dir, host, port) describing this instance."""
        return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port)

    def getPort(self):
        return self._port

    def __repr__(self):
        return "[TdeInstance: {}, subdir={}]".format(
            self._buildDir, Helper.getFriendlyPath(self._subdir))

    def generateCfgFile(self):
        """Create <runDir>/cfg/taos.cfg from a template; no-op when the
        file already exists."""
        # print("Logger = {}".format(logger))
        # buildPath = self.getBuildPath()
        # taosdPath = self._buildPath + "/build/bin/taosd"

        cfgDir = self.getCfgDir()
        cfgFile = cfgDir + "/taos.cfg"  # TODO: inquire if this is fixed
        if os.path.exists(cfgFile):
            if os.path.isfile(cfgFile):
                Logging.warning("Config file exists already, skip creation: {}".format(cfgFile))
                return  # cfg file already exists, nothing to do
            else:
                raise CrashGenError("Invalid config file: {}".format(cfgFile))
        # Now that the cfg file doesn't exist
        if os.path.exists(cfgDir):
            if not os.path.isdir(cfgDir):
                raise CrashGenError("Invalid config dir: {}".format(cfgDir))
            # else: good path
        else:
            os.makedirs(cfgDir, exist_ok=True)  # like "mkdir -p"
        # Now we have a good cfg dir
        cfgValues = {
            'runDir': self.getRunDir(),
            'ip': '127.0.0.1',  # TODO: change to a network addressable ip
            'port': self._port,
            'fepPort': self._fepPort,
        }
        # NOTE(review): template reconstructed from a formatting-mangled
        # source; line content preserved verbatim.
        cfgTemplate = """
dataDir {runDir}/data
logDir {runDir}/log

charset UTF-8

firstEp {ip}:{fepPort}
fqdn {ip}
serverPort {port}

# was all 135 below
dDebugFlag 135
cDebugFlag 135
rpcDebugFlag 135
qDebugFlag 135
# httpDebugFlag 143
# asyncLog 0
# tables 10
maxtablesPerVnode 10
rpcMaxTime 101
# cache 2
keep 36500
# walLevel 2
walLevel 1
#
# maxConnections 100
"""
        cfgContent = cfgTemplate.format_map(cfgValues)
        f = open(cfgFile, "w")
        f.write(cfgContent)
        f.close()

    def rotateLogs(self):
        """Move an existing log dir aside with a timestamp suffix so each
        run starts with a fresh log directory."""
        logPath = self.getLogDir()
        # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
        if os.path.exists(logPath):
            logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
            Logging.info("Saving old log files to: {}".format(logPathSaved))
            os.rename(logPath, logPathSaved)
        # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms

    def getExecFile(self):  # .../taosd
        return self._buildDir + "/build/bin/taosd"

    def getRunDir(self):  # TODO: rename to "root dir" ?!
        return self._buildDir + self._subdir

    def getCfgDir(self):  # path, not file
        return self.getRunDir() + "/cfg"

    def getLogDir(self):
        return self.getRunDir() + "/log"

    def getHostAddr(self):
        return "127.0.0.1"

    def getServiceCmdLine(self):  # to start the instance
        return [self.getExecFile(), '-c', self.getCfgDir()]  # used in subproce.Popen()

    def _getDnodes(self, dbc):
        # Returns {end_point: status} for all dnodes visible on this instance.
        dbc.query("show dnodes")
        cols = dbc.getQueryResult()  # id,end_point,vnodes,cores,status,role,create_time,offline reason
        return {c[1]:c[4] for c in cols}  # {'xxx:6030':'ready', 'xxx:6130':'ready'}

    def createDnode(self, dbt: DbTarget):
        """
        With a connection to the "first" EP, let's create a dnode for someone else who
        wants to join.
        """
        dbc = DbConn.createNative(self.getDbTarget())
        dbc.open()

        if dbt.getEp() in self._getDnodes(dbc):
            Logging.info("Skipping DNode creation for: {}".format(dbt))
            dbc.close()
            return

        sql = "CREATE DNODE \"{}\"".format(dbt.getEp())
        dbc.execute(sql)
        dbc.close()

    def getStatus(self):
        # Delegates to the service manager thread that owns the taosd process
        return self._smThread.getStatus()

    def getSmThread(self):
        return self._smThread

    def start(self):
        """Generate the config, rotate old logs, then launch taosd via the
        service-manager thread. Only valid from the STOPPED state."""
        if not self.getStatus().isStopped():
            raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus()))

        Logging.info("Starting TDengine instance: {}".format(self))
        self.generateCfgFile()  # service side generates config file, client does not
        self.rotateLogs()

        self._smThread.start(self.getServiceCmdLine())

    def stop(self):
        self._smThread.stop()

    def isFirst(self):
        # Instance #0 hosts the firstEp of the cluster
        return self._tInstNum == 0
|
||||||
|
|
||||||
|
class TdeSubProcess:
    """
    A class to to represent the actual sub process that is the run-time
    of a TDengine instance.

    It takes a TdeInstance object as its parameter, with the rationale being
    "a sub process runs an instance".
    """

    # RET_ALREADY_STOPPED = -1
    # RET_TIME_OUT = -3
    # RET_SUCCESS = -4

    def __init__(self):
        # subProcess: the subprocess.Popen handle; None == not running
        self.subProcess = None
        # if tInst is None:
        #     raise CrashGenError("Empty instance not allowed in TdeSubProcess")
        # self._tInst = tInst # Default create at ServiceManagerThread

    def getStdOut(self):
        return self.subProcess.stdout

    def getStdErr(self):
        return self.subProcess.stderr

    def isRunning(self):
        # "running" is tracked by whether we currently hold a Popen handle
        return self.subProcess is not None

    def getPid(self):
        return self.subProcess.pid

    def start(self, cmdLine):
        """Launch taosd (cmdLine is an argv list) with piped stdout/stderr.

        Raises:
            RuntimeError: if a sub process already exists.
        """
        ON_POSIX = 'posix' in sys.builtin_module_names

        # Sanity check
        if self.subProcess:  # already there
            raise RuntimeError("Corrupt process state")

        self.subProcess = subprocess.Popen(
            cmdLine,
            shell=False,
            # svcCmdSingle, shell=True, # capture core dump?
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # bufsize=1, # not supported in binary mode
            close_fds=ON_POSIX
            )  # had text=True, which interferred with reading EOF

    def stop(self):
        """
        Stop a sub process, and try to return a meaningful return code.

        Returns the signal number that terminated the process (positive),
        or None if there was nothing to stop. May raise
        subprocess.TimeoutExpired if the process ignores SIGINT for 20s.

        Common POSIX signal values (from man -7 signal):
        SIGHUP 1
        SIGINT 2
        SIGQUIT 3
        SIGILL 4
        SIGTRAP 5
        SIGABRT 6
        SIGIOT 6
        SIGBUS 7
        SIGEMT -
        SIGFPE 8
        SIGKILL 9
        SIGUSR1 10
        SIGSEGV 11
        SIGUSR2 12
        """
        if not self.subProcess:
            print("Sub process already stopped")
            return  # -1

        retCode = self.subProcess.poll()  # ret -N means killed with signal N, otherwise it's from exit(N)
        if retCode:  # valid return code, process ended
            retCode = -retCode  # only if valid
            Logging.warning("TSP.stop(): process ended itself")
            self.subProcess = None
            return retCode

        # process still alive, let's interrupt it
        print("Terminate running process, send SIG_INT and wait...")
        # sub process should end, then IPC queue should end, causing IO thread to end
        self.subProcess.send_signal(signal.SIGINT)
        self.subProcess.wait(20)
        retCode = self.subProcess.returncode  # should always be there
        # May throw subprocess.TimeoutExpired exception above, therefore
        # The process is guranteed to have ended by now
        self.subProcess = None
        if retCode != 0:  # != (- signal.SIGINT):
            Logging.error("TSP.stop(): Failed to stop sub proc properly w/ SIG_INT, retCode={}".format(retCode))
        else:
            Logging.info("TSP.stop(): sub proc successfully terminated with SIG_INT")
        return - retCode
|
||||||
|
class ServiceManager:
    """Top-level manager of one or more TDengine instances (a cluster when
    numDnodes > 1): start/stop/restart, interactive signal handling, and
    pumping of sub-process console output."""

    PAUSE_BETWEEN_IPC_CHECK = 1.2  # seconds between checks on STDOUT of sub process

    def __init__(self, numDnodes):  # >1 when we run a cluster
        Logging.info("TDengine Service Manager (TSM) created")
        self._numDnodes = numDnodes  # >1 means we have a cluster
        self._lock = threading.Lock()
        # signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec
        # signal.signal(signal.SIGINT, self.sigIntHandler)
        # signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!

        self.inSigHandler = False  # re-entrancy guard for signal handlers
        # self._status = MainExec.STATUS_RUNNING # set inside
        # _startTaosService()
        self._runCluster = (numDnodes > 1)
        self._tInsts : List[TdeInstance] = []
        for i in range(0, numDnodes):
            ti = self._createTdeInstance(i)  # construct tInst
            self._tInsts.append(ti)

        # self.svcMgrThreads : List[ServiceManagerThread] = []
        # for i in range(0, numDnodes):
        #     thread = self._createThread(i) # construct tInst
        #     self.svcMgrThreads.append(thread)

    def _createTdeInstance(self, dnIndex):
        # Single-instance runs share subdir 'test'; cluster members each get
        # their own subdir and a port offset of 100 per dnode index.
        if not self._runCluster:  # single instance
            subdir = 'test'
        else:  # Create all threads in a cluster
            subdir = 'cluster_dnode_{}'.format(dnIndex)
        fepPort= 6030  # firstEP Port
        port = fepPort + dnIndex * 100
        return TdeInstance(subdir, dnIndex, port, fepPort)
        # return ServiceManagerThread(dnIndex, ti)

    def _doMenu(self):
        # Interactive menu used from the SIGUSR1 handler; loops until the
        # user types "1", "2" or "3", and returns that string.
        choice = ""
        while True:
            print("\nInterrupting Service Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Restart")
            # Remember to update the if range below
            # print("Enter Choice: ", end="", flush=True)
            while choice == "":
                choice = input("Enter Choice: ")
                if choice != "":
                    break  # done with reading repeated input
            if choice in ["1", "2", "3"]:
                break  # we are done with whole method
            print("Invalid choice, please try again.")
            choice = ""  # reset
        return choice

    def sigUsrHandler(self, signalNumber, frame):
        """SIGUSR1 handler: pause and present the interactive menu."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG...")
            return  # do nothing if it's already not running
        self.inSigHandler = True

        choice = self._doMenu()
        if choice == "1":
            self.sigHandlerResume()  # TODO: can the sub-process be blocked due to us not reading from queue?
        elif choice == "2":
            self.stopTaosServices()
        elif choice == "3":  # Restart
            self.restart()
        else:
            raise RuntimeError("Invalid menu choice: {}".format(choice))

        self.inSigHandler = False

    def sigIntHandler(self, signalNumber, frame):
        """SIGINT handler: stop all services, guarding against re-entry."""
        print("ServiceManager: INT Signal Handler starting...")
        if self.inSigHandler:
            print("Ignoring repeated SIG_INT...")
            return
        self.inSigHandler = True

        self.stopTaosServices()
        print("ServiceManager: INT Signal Handler returning...")
        self.inSigHandler = False

    def sigHandlerResume(self):
        # Menu option 1: nothing to do, just return to the main loop
        print("Resuming TDengine service manager (main thread)...\n\n")

    # def _updateThreadStatus(self):
    #     if self.svcMgrThread: # valid svc mgr thread
    #         if self.svcMgrThread.isStopped(): # done?
    #             self.svcMgrThread.procIpcBatch() # one last time. TODO: appropriate?
    #             self.svcMgrThread = None # no more

    def isActive(self):
        """
        Determine if the service/cluster is active at all, i.e. at least
        one thread is not "stopped".
        """
        for ti in self._tInsts:
            if not ti.getStatus().isStopped():
                return True
        return False

    # def isRestarting(self):
    #     """
    #     Determine if the service/cluster is being "restarted", i.e., at least
    #     one thread is in "restarting" status
    #     """
    #     for thread in self.svcMgrThreads:
    #         if thread.isRestarting():
    #             return True
    #     return False

    def isStable(self):
        """
        Determine if the service/cluster is "stable", i.e. all of the
        threads are in "stable" status.
        """
        for ti in self._tInsts:
            if not ti.getStatus().isStable():
                return False
        return True

    def _procIpcAll(self):
        # Main loop: while any instance is active, pump each running
        # instance's console-output queue, then pause between rounds.
        while self.isActive():
            Progress.emit(Progress.SERVICE_HEART_BEAT)
            for ti in self._tInsts:  # all thread objects should always be valid
                # while self.isRunning() or self.isRestarting() :  # for as long as the svc mgr thread is still here
                status = ti.getStatus()
                if status.isRunning():
                    th = ti.getSmThread()
                    th.procIpcBatch()  # regular processing,
                    if status.isStopped():
                        th.procIpcBatch()  # one last time?
                    # self._updateThreadStatus()

            time.sleep(self.PAUSE_BETWEEN_IPC_CHECK)  # pause, before next round
        # raise CrashGenError("dummy")
        print("Service Manager Thread (with subprocess) ended, main thread exiting...")

    def _getFirstInstance(self):
        # Instance #0 is the cluster's firstEp
        return self._tInsts[0]

    def startTaosServices(self):
        """Start all instances; kills any pre-existing taosd first and
        registers non-first dnodes with the first EP."""
        with self._lock:
            if self.isActive():
                raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running")

            # Find if there's already a taosd service, and then kill it
            for proc in psutil.process_iter():
                if proc.name() == 'taosd':
                    print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupt")
                    time.sleep(2.0)
                    proc.kill()
                # print("Process: {}".format(proc.name()))

            # self.svcMgrThread = ServiceManagerThread() # create the object

            for ti in self._tInsts:
                ti.start()
                if not ti.isFirst():
                    tFirst = self._getFirstInstance()
                    tFirst.createDnode(ti.getDbTarget())
                ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True)  # for printing 10 lines

    def stopTaosServices(self):
        with self._lock:
            if not self.isActive():
                Logging.warning("Cannot stop TAOS service(s), already not active")
                return

            for ti in self._tInsts:
                ti.stop()

    def run(self):
        """Blocking entry point: start services, pump output until all
        instances stop (or a signal handler stops them)."""
        self.startTaosServices()
        self._procIpcAll()  # pump/process all the messages, may encounter SIG + restart
        if self.isActive():  # if sig handler hasn't destroyed it by now
            self.stopTaosServices()  # should have started already

    def restart(self):
        # Only allowed from a stable (fully running or fully stopped) state
        if not self.isStable():
            Logging.warning("Cannot restart service/cluster, when not stable")
            return

        # self._isRestarting = True
        if self.isActive():
            self.stopTaosServices()
        else:
            Logging.warning("Service not active when restart requested")

        self.startTaosServices()
        # self._isRestarting = False

    # def isRunning(self):
    #     return self.svcMgrThread != None

    # def isRestarting(self):
    #     return self._isRestarting
|
||||||
|
class ServiceManagerThread:
|
||||||
|
"""
|
||||||
|
A class representing a dedicated thread which manages the "sub process"
|
||||||
|
of the TDengine service, interacting with its STDOUT/ERR.
|
||||||
|
|
||||||
|
It takes a TdeInstance parameter at creation time, or create a default
|
||||||
|
"""
|
||||||
|
MAX_QUEUE_SIZE = 10000
|
||||||
|
|
||||||
|
def __init__(self):
    """Create the manager-thread object in STOPPED state; the sub process
    and reader threads are only created by start()."""
    # Set the sub process
    self._tdeSubProcess = None  # type: TdeSubProcess

    # Arrange the TDengine instance
    # self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
    # self._tInst = tInst or TdeInstance() # Need an instance

    self._thread = None  # The actual thread, # type: threading.Thread
    self._status = Status(Status.STATUS_STOPPED)  # The status of the underlying service, actually.
|
||||||
|
def __repr__(self):
    # Debug representation: current status + sub-process handle
    return "[SvcMgrThread: status={}, subProc={}]".format(
        self.getStatus(), self._tdeSubProcess)
|
||||||
|
def getStatus(self):
    """Return the Status object tracking the underlying service."""
    return self._status
|
||||||
|
# Start the thread (with sub process), and wait for the sub service
|
||||||
|
# to become fully operational
|
||||||
|
def start(self, cmdLine):
    """Start the taosd sub process plus two reader threads (stdout and
    stderr), then poll up to ~100 seconds for the service to report
    RUNNING; raises RuntimeError on failure to start."""
    if self._thread:
        raise RuntimeError("Unexpected _thread")
    if self._tdeSubProcess:
        raise RuntimeError("TDengine sub process already created/running")

    Logging.info("Attempting to start TAOS service: {}".format(self))

    self._status.set(Status.STATUS_STARTING)
    self._tdeSubProcess = TdeSubProcess()
    self._tdeSubProcess.start(cmdLine)

    self._ipcQueue = Queue()
    self._thread = threading.Thread(  # First thread captures server OUTPUT
        target=self.svcOutputReader,
        args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
    self._thread.daemon = True  # thread dies with the program
    self._thread.start()

    self._thread2 = threading.Thread(  # 2nd thread captures server ERRORs
        target=self.svcErrorReader,
        args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
    self._thread2.daemon = True  # thread dies with the program
    self._thread2.start()

    # wait for service to start
    for i in range(0, 100):
        time.sleep(1.0)
        # self.procIpcBatch() # don't pump message during start up
        print("_zz_", end="", flush=True)
        if self._status.isRunning():
            # NOTE(review): status is presumably flipped to RUNNING by the
            # output-reader thread when taosd reports readiness — confirm
            # in svcOutputReader.
            Logging.info("[] TDengine service READY to process requests")
            Logging.info("[] TAOS service started: {}".format(self))
            # self._verifyDnode(self._tInst) # query and ensure dnode is ready
            # Logging.debug("[] TAOS Dnode verified: {}".format(self))
            return  # now we've started
    # TODO: handle failure-to-start better?
    self.procIpcBatch(100, True)  # display output before cronking out, trim to last 20 msgs, force output
    raise RuntimeError("TDengine service did not start successfully: {}".format(self))
|
||||||
|
def _verifyDnode(self, tInst: TdeInstance):
    """Query "show dnodes" on tInst and verify its port is registered as a
    dnode; raises RuntimeError (after a long diagnostic sleep) if not."""
    dbc = DbConn.createNative(tInst.getDbTarget())
    dbc.open()
    dbc.query("show dnodes")
    # dbc.query("DESCRIBE {}.{}".format(dbName, self._stName))
    cols = dbc.getQueryResult()  # id,end_point,vnodes,cores,status,role,create_time,offline reason
    # ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type
    isValid = False
    for col in cols:
        # print("col = {}".format(col))
        ep = col[1].split(':')  # 10.1.30.2:6030
        print("Found ep={}".format(ep))
        if tInst.getPort() == int(ep[1]):  # That's us
            # print("Valid Dnode matched!")
            isValid = True  # now we are valid
            break
    if not isValid:
        print("Failed to start dnode, sleep for a while")
        time.sleep(600)  # long sleep: keep the env alive for manual inspection
        raise RuntimeError("Failed to start Dnode, expected port not found: {}".
            format(tInst.getPort()))
    dbc.close()
|
||||||
|
    def stop(self):
        """Stop the managed TDengine (taosd) sub process and this thread.

        May be invoked from the main thread OR from a signal handler, so it
        must tolerate being called while a stop is already in progress.
        Transitions status to STOPPING, asks the sub process to stop, then
        joins the IO threads and drains the last output.
        """
        # can be called from both main thread or signal handler
        print("Terminating TDengine service running as the sub process...")
        # Idempotence guards: bail out quietly if already stopped/stopping.
        if self.getStatus().isStopped():
            print("Service already stopped")
            return
        if self.getStatus().isStopping():
            print("Service is already being stopped")
            return
        # Linux will send Control-C generated SIGINT to the TDengine process
        # already, ref:
        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
        if not self._tdeSubProcess:
            raise RuntimeError("sub process object missing")

        self._status.set(Status.STATUS_STOPPING)
        # retCode = self._tdeSubProcess.stop()
        try:
            retCode = self._tdeSubProcess.stop()
            # print("Attempted to stop sub process, got return code: {}".format(retCode))
            # NOTE(review): comparing the sub-process return code against the
            # SIGSEGV signal number — presumably .stop() surfaces the kill
            # signal; confirm against TdeSubProcess.stop().
            if retCode == signal.SIGSEGV : # SGV
                Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
        except subprocess.TimeoutExpired as err:
            # Sub process did not exit in time; fall through WITHOUT joining,
            # so status stays STOPPING (the else-branch below is skipped).
            print("Time out waiting for TDengine service process to exit")
        else:
            # Only reached when .stop() returned without timing out.
            if self._tdeSubProcess.isRunning():  # still running, should now never happen
                print("FAILED to stop sub process, it is still running... pid = {}".format(
                    self._tdeSubProcess.getPid()))
            else:
                self._tdeSubProcess = None  # not running any more
                self.join()  # stop the thread, change the status, etc.

        # Check if it's really stopped
        outputLines = 10  # for last output
        if self.getStatus().isStopped():
            self.procIpcBatch(outputLines)  # one last time
            Logging.debug("End of TDengine Service Output: {}".format(self))
            Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n")
        else:
            print("WARNING: SMT did not terminate as expected: {}".format(self))
|
||||||
|
|
||||||
|
def join(self):
|
||||||
|
# TODO: sanity check
|
||||||
|
if not self.getStatus().isStopping():
|
||||||
|
raise RuntimeError(
|
||||||
|
"SMT.Join(): Unexpected status: {}".format(self._status))
|
||||||
|
|
||||||
|
if self._thread:
|
||||||
|
self._thread.join()
|
||||||
|
self._thread = None
|
||||||
|
self._status.set(Status.STATUS_STOPPED)
|
||||||
|
# STD ERR thread
|
||||||
|
self._thread2.join()
|
||||||
|
self._thread2 = None
|
||||||
|
else:
|
||||||
|
print("Joining empty thread, doing nothing")
|
||||||
|
|
||||||
|
def _trimQueue(self, targetSize):
|
||||||
|
if targetSize <= 0:
|
||||||
|
return # do nothing
|
||||||
|
q = self._ipcQueue
|
||||||
|
if (q.qsize() <= targetSize): # no need to trim
|
||||||
|
return
|
||||||
|
|
||||||
|
Logging.debug("Triming IPC queue to target size: {}".format(targetSize))
|
||||||
|
itemsToTrim = q.qsize() - targetSize
|
||||||
|
for i in range(0, itemsToTrim):
|
||||||
|
try:
|
||||||
|
q.get_nowait()
|
||||||
|
except Empty:
|
||||||
|
break # break out of for loop, no more trimming
|
||||||
|
|
||||||
|
TD_READY_MSG = "TDengine is initialized successfully"
|
||||||
|
|
||||||
|
def procIpcBatch(self, trimToTarget=0, forceOutput=False):
|
||||||
|
self._trimQueue(trimToTarget) # trim if necessary
|
||||||
|
# Process all the output generated by the underlying sub process,
|
||||||
|
# managed by IO thread
|
||||||
|
print("<", end="", flush=True)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
line = self._ipcQueue.get_nowait() # getting output at fast speed
|
||||||
|
self._printProgress("_o")
|
||||||
|
except Empty:
|
||||||
|
# time.sleep(2.3) # wait only if there's no output
|
||||||
|
# no more output
|
||||||
|
print(".>", end="", flush=True)
|
||||||
|
return # we are done with THIS BATCH
|
||||||
|
else: # got line, printing out
|
||||||
|
if forceOutput:
|
||||||
|
Logging.info(line)
|
||||||
|
else:
|
||||||
|
Logging.debug(line)
|
||||||
|
print(">", end="", flush=True)
|
||||||
|
|
||||||
|
_ProgressBars = ["--", "//", "||", "\\\\"]
|
||||||
|
|
||||||
|
def _printProgress(self, msg): # TODO: assuming 2 chars
|
||||||
|
print(msg, end="", flush=True)
|
||||||
|
pBar = self._ProgressBars[Dice.throw(4)]
|
||||||
|
print(pBar, end="", flush=True)
|
||||||
|
print('\b\b\b\b', end="", flush=True)
|
||||||
|
|
||||||
|
    def svcOutputReader(self, out: IO, queue):
        """IO-thread body: pump taosd stdout lines into the IPC queue.

        Also watches the stream for TD_READY_MSG while the service is
        STARTING and flips the status to RUNNING when it appears. Returns
        (and closes *out*) when the pipe hits EOF, i.e. the sub process died.
        """
        # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
        # print("This is the svcOutput Reader...")
        # for line in out :
        # iter(..., b'') yields raw bytes lines until EOF.
        for line in iter(out.readline, b''):
            # print("Finished reading a line: {}".format(line))
            # print("Adding item to queue...")
            try:
                line = line.decode("utf-8").rstrip()
            except UnicodeError:
                # NOTE(review): on decode failure `line` stays bytes and is
                # still queued below; bytes.find(str) in the READY check
                # would then raise TypeError — confirm whether non-UTF8
                # output ever occurs during startup.
                print("\nNon-UTF8 server output: {}\n".format(line))

            # This might block, and then causing "out" buffer to block
            queue.put(line)
            self._printProgress("_i")

            if self._status.isStarting():  # we are starting, let's see if we have started
                if line.find(self.TD_READY_MSG) != -1:  # found
                    Logging.info("Waiting for the service to become FULLY READY")
                    time.sleep(1.0)  # wait for the server to truly start. TODO: remove this
                    Logging.info("Service is now FULLY READY")  # TODO: more ID info here?
                    self._status.set(Status.STATUS_RUNNING)

            # Trim the queue if necessary: TODO: try this 1 out of 10 times
            self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10)  # trim to 90% size

            if self._status.isStopping():  # TODO: use thread status instead
                # WAITING for the stopping sub process to finish its output
                print("_w", end="", flush=True)

        # queue.put(line)
        # EOF reached: meaning sub process must have died
        Logging.info("\nEnd of stream detected for TDengine STDOUT: {}".format(self))
        out.close()
|
||||||
|
|
||||||
|
def svcErrorReader(self, err: IO, queue):
|
||||||
|
for line in iter(err.readline, b''):
|
||||||
|
print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
|
||||||
|
Logging.info("\nEnd of stream detected for TDengine STDERR: {}".format(self))
|
||||||
|
err.close()
|
|
@ -0,0 +1,23 @@
|
||||||
|
# -----!/usr/bin/python3.7
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from crash_gen.crash_gen import MainExec
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Launcher: build the crash-gen executor, run it, and propagate its
    # exit code to the shell.
    executor = MainExec()
    executor.init()
    status = executor.run()

    print("Exiting with code: {}".format(status))
    sys.exit(status)
|
|
@ -96,6 +96,12 @@ class TDTestCase:
|
||||||
tdSql.query("select * from st order by ts desc")
|
tdSql.query("select * from st order by ts desc")
|
||||||
self.checkColumnSorted(0, "desc")
|
self.checkColumnSorted(0, "desc")
|
||||||
|
|
||||||
|
print("======= step 2: verify order for special column =========")
|
||||||
|
|
||||||
|
tdSql.query("select tbcol1 from st order by ts desc")
|
||||||
|
|
||||||
|
tdSql.query("select tbcol6 from st order by ts desc")
|
||||||
|
|
||||||
for i in range(1, 10):
|
for i in range(1, 10):
|
||||||
tdSql.error("select * from st order by tbcol%d" % i)
|
tdSql.error("select * from st order by tbcol%d" % i)
|
||||||
tdSql.error("select * from st order by tbcol%d asc" % i)
|
tdSql.error("select * from st order by tbcol%d asc" % i)
|
||||||
|
|
Loading…
Reference in New Issue