This commit is contained in:
cpwu 2022-07-19 16:39:50 +08:00
parent 67ecaaec61
commit 83fe51f6d9
5 changed files with 601 additions and 195 deletions

View File

@ -17,6 +17,7 @@ import requests
import time
import socket
import json
import toml
from .boundary import DataBoundary
import taos
from util.log import *
@ -443,7 +444,9 @@ class TDCom:
return buildPath
def getClientCfgPath(self):
buildPath = self.getBuildPath()
# buildPath = self.getBuildPath()
buildPath = get_path()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
@ -752,4 +755,29 @@ def is_json(msg):
else:
return False
def get_path(tool="taosd"):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
paths = []
for root, dirs, files in os.walk(projPath):
if ((tool) in files or ("%s.exe"%tool) in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
paths.append(os.path.join(root, tool))
break
if (len(paths) == 0):
return ""
return paths[0]
def dict2toml(in_dict: dict, file:str):
if not isinstance(in_dict, dict):
return ""
with open(file, 'w') as f:
toml.dump(in_dict, f)
tdCom = TDCom()

View File

@ -96,9 +96,9 @@ class TDSimClient:
for key, value in self.cfgDict.items():
self.cfg(key, value)
try:
if bool(updatecfgDict) and updatecfgDict[0] and updatecfgDict[0][0]:
if bool(updatecfgDict) and updatecfgDict[0] and updatecfgDict[0][0]:
clientCfg = dict (updatecfgDict[0][0].get('clientCfg'))
for key, value in clientCfg.items():
self.cfg(key, value)
@ -244,7 +244,6 @@ class TDDnode:
# print(updatecfgDict)
isFirstDir = 1
if bool(updatecfgDict) and updatecfgDict[0] and updatecfgDict[0][0]:
print(updatecfgDict[0][0])
for key, value in updatecfgDict[0][0].items():
if key == "clientCfg" and self.remoteIP == "" and not platform.system().lower() == 'windows':
continue
@ -324,7 +323,6 @@ class TDDnode:
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.running = 1
print("dnode:%d is running with %s " % (self.index, cmd))
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
if self.valgrind == 0:
time.sleep(0.1)
@ -358,7 +356,7 @@ class TDDnode:
# break
# elif bkey2 in line:
# popen.kill()
# break
# break
# if time.time() > timeout:
# print(time.time(),timeout)
# tdLog.exit('wait too long for taosd start')
@ -407,7 +405,6 @@ class TDDnode:
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.running = 1
print("dnode:%d is running with %s " % (self.index, cmd))
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
if self.valgrind == 0:
time.sleep(0.1)
@ -655,7 +652,6 @@ class TDDnodes:
def stoptaosd(self, index):
self.check(index)
self.dnodes[index - 1].stoptaosd()
def start(self, index):
self.check(index)

View File

@ -0,0 +1,254 @@
from fabric2 import Connection
from util.log import *
from util.common import *
class TAdapter:
def __init__(self):
self.running = 0
self.deployed = 0
self.remoteIP = ""
self.taosadapter_cfg_dict = {
"debug" : False,
"taosConfigDir" : "",
"port" : 6041,
"logLevel" : "info",
"cors" : {
"allowAllOrigins" : True,
},
"pool" : {
"maxConnect" : 4000,
"maxIdle" : 4000,
"idleTimeout" : "1h"
},
"ssl" : {
"enable" : False,
"certFile" : "",
"keyFile" : "",
},
"log" : {
"path" : "",
"rotationCount" : 30,
"rotationTime" : "24h",
"rotationSize" : "1GB",
"enableRecordHttpSql" : False,
"sqlRotationCount" : 2,
"sqlRotationTime" : "24h",
"sqlRotationSize" : "1GB",
},
"monitor" : {
"collectDuration" : "3s",
"incgroup" : False,
"pauseQueryMemoryThreshold" : 70,
"pauseAllMemoryThreshold" : 80,
"identity" : "",
"writeToTD" : True,
"user" : "root",
"password" : "taosdata",
"writeInterval" : "30s"
}
}
# TODO: add taosadapter env:
# 1. init cfg.toml.dict OK
# 2. dump dict to toml OK
# 3. update cfg.toml.dict OK
# 4. check adapter exists OK
# 5. deploy adapter cfg OK
# 6. adapter start OK
# 7. adapter stop
def init(self, path, remoteIP=""):
self.path = path
self.remoteIP = remoteIP
binPath = get_path() + "/../../../"
binPath = os.path.realpath(binPath)
if path == "":
self.path = os.path.abspath(binPath + "../../")
else:
self.path = os.path.realpath(path)
if self.remoteIP:
try:
self.config = eval(remoteIP)
self.remote_conn = Connection(host=self.config["host"], port=self.config["port"], user=self.config["user"], connect_kwargs={'password':self.config["password"]})
except Exception as e:
tdLog.notice(e)
def update_cfg(self, update_dict :dict):
if not isinstance(update_dict, dict):
return
if "log" in update_dict and "path" in update_dict["log"]:
del update_dict["log"]["path"]
for key, value in update_dict.items():
if key in ["cors", "pool", "ssl", "log", "monitor", "opentsdb", "influxdb", "statsd", "collectd", "opentsdb_telnet", "node_exporter", "prometheus"]:
if isinstance(value, dict):
for k, v in value.items():
self.taosadapter_cfg_dict[key][k] = v
else:
self.taosadapter_cfg_dict[key] = value
def check_adapter(self):
if getPath(tool="taosadapter"):
return False
else:
return True
def remote_exec(self, updateCfgDict, execCmd):
remoteCfgDict = copy.deepcopy(updateCfgDict)
if "log" in remoteCfgDict and "path" in remoteCfgDict["log"]:
del remoteCfgDict["log"]["path"]
remoteCfgDictStr = base64.b64encode(toml.dumps(remoteCfgDict).encode()).decode()
execCmdStr = base64.b64encode(execCmd.encode()).decode()
with self.remote_conn.cd((self.config["path"]+sys.path[0].replace(self.path, '')).replace('\\','/')):
self.remote_conn.run(f"python3 ./test.py -D {remoteCfgDictStr} -e {execCmdStr}" )
def cfg(self, option, value):
cmd = f"echo {option} = {value} >> {self.cfg_path}"
if os.system(cmd) != 0:
tdLog.exit(cmd)
def deploy(self, *update_cfg_dict):
self.log_dir = f"{self.path}/sim/dnode1/log"
self.cfg_dir = f"{self.path}/sim/dnode1/cfg"
self.cfg_path = f"{self.cfg_dir}/taosadapter.toml"
cmd = f"touch {self.cfg_path}"
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.taosadapter_cfg_dict["log"]["path"] = self.log_dir
if bool(update_cfg_dict):
self.update_cfg(update_dict=update_cfg_dict)
if (self.remoteIP == ""):
dict2toml(self.taosadapter_cfg_dict, self.cfg_path)
else:
self.remote_exec(self.taosadapter_cfg_dict, "tAdapter.deploy(update_cfg_dict)")
self.deployed = 1
tdLog.debug(f"taosadapter is deployed and configured by {self.cfg_path}")
def start(self):
bin_path = get_path(tool="taosadapter")
if (bin_path == ""):
tdLog.exit("taosadapter not found!")
else:
tdLog.info(f"taosadapter found: {bin_path}")
if platform.system().lower() == 'windows':
cmd = f"mintty -h never {bin_path} -c {self.cfg_dir}"
else:
cmd = f"nohup {bin_path} -c {self.cfg_path} > /dev/null 2>&1 & "
if self.remoteIP:
self.remote_exec(self.taosadapter_cfg_dict, f"tAdapter.deployed=1\ntAdapter.log_dir={self.log_dir}\ntAdapter.cfg_dir={self.cfg_dir}\ntAdapter.start()")
self.running = 1
else:
os.system(f"rm -rf {self.log_dir}/taosadapter*")
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.running = 1
tdLog.debug(f"taosadapter is running with {cmd} " )
time.sleep(0.1)
key = 'all plugin init finish'
bkey = bytes(key, encoding="utf8")
logFile = self.log_dir + "/taosadapter*"
file_exists = False
i = 0
while (not file_exists):
for file in os.listdir(self.log_dir):
if "taosadapter" in file:
file_exists = True
break
sleep(0.1)
i += 1
if i > 50:
tdLog.notice("log file is too long to create")
break
tailCmdStr = 'tail -f '
popen = subprocess.Popen(
tailCmdStr + logFile,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
# pid = popen.pid
# print('Popen.pid:' + str(pid))
timeout = time.time() + 60 * 2
while True:
line = popen.stdout.readline().strip()
if bkey in line:
popen.kill()
break
if time.time() > timeout:
tdLog.exit('wait too long for taosadapter start')
tdLog.debug("the taosadapter has been started.")
def start_taosadapter(self):
"""
use this method, must deploy taosadapter
"""
bin_path = get_path(tool="taosadapter")
if (bin_path == ""):
tdLog.exit("taosadapter not found!")
else:
tdLog.info(f"taosadapter found: {bin_path}")
if self.deployed == 0:
tdLog.exit("taosadapter is not deployed")
if platform.system().lower() == 'windows':
cmd = f"mintty -h never {bin_path} -c {self.cfg_dir}"
else:
cmd = f"nohup {bin_path} -c {self.cfg_path} > /dev/null 2>&1 & "
if self.remoteIP:
self.remote_exec(self.taosadapter_cfg_dict, f"tAdapter.deployed=1\ntAdapter.log_dir={self.log_dir}\ntAdapter.cfg_dir={self.cfg_dir}\ntAdapter.start()")
self.running = 1
else:
if os.system(cmd) != 0:
tdLog.exit(cmd)
self.running = 1
tdLog.debug(f"taosadapter is running with {cmd} " )
time.sleep(0.1)
def stop(self, force_kill=False):
signal = "-SIGKILL" if force_kill else "-SIGTERM"
if self.remoteIP:
self.remote_exec(self.taosadapter_cfg_dict, "tAdapter.running=1\ntAdapter.stop()")
tdLog.info("stop taosadapter")
return
toBeKilled = "taosadapter"
if self.running != 0:
psCmd = f"ps -ef|grep -w {toBeKilled}| grep -v grep | awk '{{print $2}}'"
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8")
while(processID):
killCmd = f"kill {signal} {processID} > /dev/null 2>&1"
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8")
if not platform.system().lower() == 'windows':
for port in range(6030, 6041):
fuserCmd = f"fuser -k -n tcp {port} > /dev/null"
os.system(fuserCmd)
self.running = 0
tdLog.debug(f"taosadapter is stopped by kill {signal}")
tAdapter = TAdapter()

View File

@ -1,5 +1,7 @@
import datetime
from dataclasses import dataclass, field
from typing import List, Any, Tuple
from util.log import *
from util.sql import *
from util.cases import *
@ -7,22 +9,57 @@ from util.dnodes import *
PRIMARY_COL = "ts"
INT_COL = "c1"
BINT_COL = "c2"
SINT_COL = "c3"
TINT_COL = "c4"
FLOAT_COL = "c5"
DOUBLE_COL = "c6"
BOOL_COL = "c7"
INT_COL = "c_int"
BINT_COL = "c_bint"
SINT_COL = "c_sint"
TINT_COL = "c_tint"
FLOAT_COL = "c_float"
DOUBLE_COL = "c_double"
BOOL_COL = "c_bool"
TINT_UN_COL = "c_utint"
SINT_UN_COL = "c_usint"
BINT_UN_COL = "c_ubint"
INT_UN_COL = "c_uint"
BINARY_COL = "c_binary"
NCHAR_COL = "c_nchar"
TS_COL = "c_ts"
BINARY_COL = "c8"
NCHAR_COL = "c9"
TS_COL = "c10"
NUM_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
CHAR_COL = [BINARY_COL, NCHAR_COL, ]
BOOLEAN_COL = [BOOL_COL, ]
TS_TYPE_COL = [TS_COL, ]
INT_TAG = "t_int"
ALL_COL = [PRIMARY_COL, INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, BINARY_COL, NCHAR_COL, BOOL_COL, TS_COL]
TAG_COL = [INT_TAG]
# insert data args
TIME_STEP = 10000
NOW = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
# init db/table
DBNAME = "db"
STBNAME = f"{DBNAME}.stb1"
CTBNAME = f"{DBNAME}.ct1"
NTBNAME = f"{DBNAME}.nt1"
@dataclass
class DataSet:
ts_data : List[int] = field(default_factory=list)
int_data : List[int] = field(default_factory=list)
bint_data : List[int] = field(default_factory=list)
sint_data : List[int] = field(default_factory=list)
tint_data : List[int] = field(default_factory=list)
int_un_data : List[int] = field(default_factory=list)
bint_un_data: List[int] = field(default_factory=list)
sint_un_data: List[int] = field(default_factory=list)
tint_un_data: List[int] = field(default_factory=list)
float_data : List[float] = field(default_factory=list)
double_data : List[float] = field(default_factory=list)
bool_data : List[int] = field(default_factory=list)
binary_data : List[str] = field(default_factory=list)
nchar_data : List[str] = field(default_factory=list)
NUM_COL = [ INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
CHAR_COL = [ BINARY_COL, NCHAR_COL, ]
BOOLEAN_COL = [ BOOL_COL, ]
TS_TYPE_COL = [ TS_COL, ]
class TDTestCase:
@ -52,12 +89,12 @@ class TDTestCase:
return query_condition
def __join_condition(self, tb_list, filter=PRIMARY_COL, INNER=False):
def __join_condition(self, tb_list, filter=PRIMARY_COL, INNER=False, alias_tb1="tb1", alias_tb2="tb2"):
table_reference = tb_list[0]
join_condition = table_reference
join = "inner join" if INNER else "join"
for i in range(len(tb_list[1:])):
join_condition += f" {join} {tb_list[i+1]} on {table_reference}.{filter}={tb_list[i+1]}.{filter}"
join_condition += f" as {alias_tb1} {join} {tb_list[i+1]} as {alias_tb2} on {alias_tb1}.{filter}={alias_tb2}.{filter}"
return join_condition
@ -103,19 +140,19 @@ class TDTestCase:
return f"select {select_clause} from {from_clause} {where_condition} {group_condition}"
@property
def __join_tblist(self):
def __join_tblist(self, dbname=DBNAME):
return [
# ["ct1", "ct2"],
["ct1", "ct4"],
["ct1", "t1"],
[f"{dbname}.ct1", f"{dbname}.ct4"],
[f"{dbname}.ct1", f"{dbname}.nt1"],
# ["ct2", "ct4"],
# ["ct2", "t1"],
# ["ct4", "t1"],
# ["ct2", "nt1"],
# ["ct4", "nt1"],
# ["ct1", "ct2", "ct4"],
# ["ct1", "ct2", "t1"],
# ["ct1", "ct4", "t1"],
# ["ct2", "ct4", "t1"],
# ["ct1", "ct2", "ct4", "t1"],
# ["ct1", "ct2", "nt1"],
# ["ct1", "ct4", "nt1"],
# ["ct2", "ct4", "nt1"],
# ["ct1", "ct2", "ct4", "nt1"],
]
@property
@ -123,28 +160,29 @@ class TDTestCase:
sqls = []
__join_tblist = self.__join_tblist
for join_tblist in __join_tblist:
for join_tb in join_tblist:
select_claus_list = self.__query_condition(join_tb)
for select_claus in select_claus_list:
group_claus = self.__group_condition( col=select_claus)
where_claus = self.__where_condition( query_conditon=select_claus )
having_claus = self.__group_condition( col=select_claus, having=f"{select_claus} is not null" )
sqls.extend(
(
# self.__gen_sql(select_claus, self.__join_condition(join_tblist), where_claus, group_claus),
self.__gen_sql(select_claus, self.__join_condition(join_tblist), where_claus, having_claus),
self.__gen_sql(select_claus, self.__join_condition(join_tblist), where_claus),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist), group_claus),
self.__gen_sql(select_claus, self.__join_condition(join_tblist), having_claus),
self.__gen_sql(select_claus, self.__join_condition(join_tblist)),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True), where_claus, group_claus),
self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True), where_claus, having_claus),
self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True), where_claus, ),
self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True), having_claus ),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True), group_claus ),
self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True) ),
)
alias_tb = "tb1"
# for join_tb in join_tblist:
select_claus_list = self.__query_condition(alias_tb)
for select_claus in select_claus_list:
group_claus = self.__group_condition( col=select_claus)
where_claus = self.__where_condition( query_conditon=select_claus )
having_claus = self.__group_condition( col=select_claus, having=f"{select_claus} is not null" )
sqls.extend(
(
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, alias_tb1=alias_tb), where_claus, group_claus),
self.__gen_sql(select_claus, self.__join_condition(join_tblist, alias_tb1=alias_tb), where_claus, having_claus),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, alias_tb1=alias_tb), where_claus),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, alias_tb1=alias_tb), group_claus),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, alias_tb1=alias_tb), having_claus),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, alias_tb1=alias_tb)),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True, alias_tb1=alias_tb), where_claus, group_claus),
self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True, alias_tb1=alias_tb), where_claus, having_claus),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True, alias_tb1=alias_tb), where_claus, ),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True, alias_tb1=alias_tb), having_claus ),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True, alias_tb1=alias_tb), group_claus ),
# self.__gen_sql(select_claus, self.__join_condition(join_tblist, INNER=True, alias_tb1=alias_tb) ),
)
)
return list(filter(None, sqls))
def __join_check(self,):
@ -172,7 +210,7 @@ class TDTestCase:
tdSql.error(sql=sql)
break
if len(tblist) == 2:
if "ct1" in tblist or "t1" in tblist:
if "ct1" in tblist or "nt1" in tblist:
self.__join_current(sql, checkrows)
elif where_condition or "not null" in group_condition:
self.__join_current(sql, checkrows + 2 )
@ -187,14 +225,14 @@ class TDTestCase:
tdSql.query(sql=sql)
# tdSql.checkRows(checkrows)
def __test_error(self):
def __test_error(self, dbname=DBNAME):
# sourcery skip: extract-duplicate-method, move-assign-in-block
tdLog.printNoPrefix("==========err sql condition check , must return error==========")
err_list_1 = ["ct1","ct2", "ct4"]
err_list_2 = ["ct1","ct2", "t1"]
err_list_3 = ["ct1","ct4", "t1"]
err_list_4 = ["ct2","ct4", "t1"]
err_list_5 = ["ct1", "ct2","ct4", "t1"]
err_list_1 = [f"{dbname}.ct1", f"{dbname}.ct2", f"{dbname}.ct4"]
err_list_2 = [f"{dbname}.ct1", f"{dbname}.ct2", f"{dbname}.nt1"]
err_list_3 = [f"{dbname}.ct1", f"{dbname}.ct4", f"{dbname}.nt1"]
err_list_4 = [f"{dbname}.ct2", f"{dbname}.ct4", f"{dbname}.nt1"]
err_list_5 = [f"{dbname}.ct1", f"{dbname}.ct2", f"{dbname}.ct4", f"{dbname}.nt1"]
self.__join_check_old(err_list_1, -1)
tdLog.printNoPrefix(f"==========err sql condition check in {err_list_1} over==========")
self.__join_check_old(err_list_2, -1)
@ -208,16 +246,16 @@ class TDTestCase:
self.__join_check_old(["ct2", "ct4"], -1, join_flag=False)
tdLog.printNoPrefix("==========err sql condition check in has no join condition over==========")
tdSql.error( f"select c1, c2 from ct2, ct4 where ct2.{PRIMARY_COL}=ct4.{PRIMARY_COL}" )
tdSql.error( f"select ct2.c1, ct2.c2 from ct2, ct4 where ct2.{INT_COL}=ct4.{INT_COL}" )
tdSql.error( f"select ct2.c1, ct2.c2 from ct2, ct4 where ct2.{TS_COL}=ct4.{TS_COL}" )
tdSql.error( f"select ct2.c1, ct2.c2 from ct2, ct4 where ct2.{PRIMARY_COL}=ct4.{TS_COL}" )
tdSql.error( f"select ct2.c1, ct1.c2 from ct2, ct4 where ct2.{PRIMARY_COL}=ct4.{PRIMARY_COL}" )
tdSql.error( f"select ct2.c1, ct4.c2 from ct2, ct4 where ct2.{PRIMARY_COL}=ct4.{PRIMARY_COL} and c1 is not null " )
tdSql.error( f"select ct2.c1, ct4.c2 from ct2, ct4 where ct2.{PRIMARY_COL}=ct4.{PRIMARY_COL} and ct1.c1 is not null " )
tdSql.error( f"select c1, c2 from {dbname}.ct2, {dbname}.ct4 where ct2.{PRIMARY_COL}=ct4.{PRIMARY_COL}" )
tdSql.error( f"select ct2.c1, ct2.c2 from {dbname}.ct2 as ct2, {dbname}.ct4 as ct4 where ct2.{INT_COL}=ct4.{INT_COL}" )
tdSql.error( f"select ct2.c1, ct2.c2 from {dbname}.ct2 as ct2, {dbname}.ct4 as ct4 where ct2.{TS_COL}=ct4.{TS_COL}" )
tdSql.error( f"select ct2.c1, ct2.c2 from {dbname}.ct2 as ct2, {dbname}.ct4 as ct4 where ct2.{PRIMARY_COL}=ct4.{TS_COL}" )
tdSql.error( f"select ct2.c1, ct1.c2 from {dbname}.ct2 as ct2, {dbname}.ct4 as ct4 where ct2.{PRIMARY_COL}=ct4.{PRIMARY_COL}" )
tdSql.error( f"select ct2.c1, ct4.c2 from {dbname}.ct2 as ct2, {dbname}.ct4 as ct4 where ct2.{PRIMARY_COL}=ct4.{PRIMARY_COL} and c1 is not null " )
tdSql.error( f"select ct2.c1, ct4.c2 from {dbname}.ct2 as ct2, {dbname}.ct4 as ct4 where ct2.{PRIMARY_COL}=ct4.{PRIMARY_COL} and ct1.c1 is not null " )
tbname = ["ct1", "ct2", "ct4", "t1"]
tbname = [f"{dbname}.ct1", f"{dbname}.ct2", f"{dbname}.ct4", f"{dbname}.nt1"]
# for tb in tbname:
# for errsql in self.__join_err_check(tb):
@ -230,124 +268,147 @@ class TDTestCase:
self.__test_error()
def __create_tb(self):
tdSql.prepare()
def __create_tb(self, stb="stb1", ctb_num=20, ntbnum=1, dbname=DBNAME):
create_stb_sql = f'''create table {dbname}.{stb}(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
) tags ({INT_TAG} int)
'''
for i in range(ntbnum):
tdLog.printNoPrefix("==========step1:create table")
create_stb_sql = f'''create table stb1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
) tags (tag1 int)
'''
create_ntb_sql = f'''create table t1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
)
'''
create_ntb_sql = f'''create table {dbname}.nt{i+1}(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
)
'''
tdSql.execute(create_stb_sql)
tdSql.execute(create_ntb_sql)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
{ i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2}
for i in range(ctb_num):
tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )')
def __insert_data(self, rows):
now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
for i in range(rows):
tdSql.execute(
f"insert into ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
)
tdSql.execute(
f"insert into ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
)
tdSql.execute(
f"insert into ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
)
tdSql.execute(
f'''insert into ct1 values
( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } )
( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } )
'''
)
tdSql.execute(
f'''insert into ct4 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
(
{ now_time + 5184000000}, {pow(2,31)-pow(2,15)}, {pow(2,63)-pow(2,30)}, 32767, 127,
{ 3.3 * pow(10,38) }, { 1.3 * pow(10,308) }, { rows % 2 }, "binary_limit-1", "nchar_测试_limit-1", { now_time - 86400000}
)
(
{ now_time + 2592000000 }, {pow(2,31)-pow(2,16)}, {pow(2,63)-pow(2,31)}, 32766, 126,
{ 3.2 * pow(10,38) }, { 1.2 * pow(10,308) }, { (rows-1) % 2 }, "binary_limit-2", "nchar_测试_limit-2", { now_time - 172800000}
)
'''
)
tdSql.execute(
f'''insert into ct2 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
(
{ now_time + 5184000000 }, { -1 * pow(2,31) + pow(2,15) }, { -1 * pow(2,63) + pow(2,30) }, -32766, -126,
{ -1 * 3.2 * pow(10,38) }, { -1.2 * pow(10,308) }, { rows % 2 }, "binary_limit-1", "nchar_测试_limit-1", { now_time - 86400000 }
)
(
{ now_time + 2592000000 }, { -1 * pow(2,31) + pow(2,16) }, { -1 * pow(2,63) + pow(2,31) }, -32767, -127,
{ - 3.3 * pow(10,38) }, { -1.3 * pow(10,308) }, { (rows-1) % 2 }, "binary_limit-2", "nchar_测试_limit-2", { now_time - 172800000 }
)
'''
)
def __data_set(self, rows):
data_set = DataSet()
for i in range(rows):
insert_data = f'''insert into t1 values
( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2},
"binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } )
'''
tdSql.execute(insert_data)
tdSql.execute(
f'''insert into t1 values
( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7200000 }, { pow(2,31) - pow(2,15) }, { pow(2,63) - pow(2,30) }, 32767, 127,
{ 3.3 * pow(10,38) }, { 1.3 * pow(10,308) }, { rows % 2 },
"binary_limit-1", "nchar_测试_limit-1", { now_time - 86400000 }
)
(
{ now_time + 3600000 } , { pow(2,31) - pow(2,16) }, { pow(2,63) - pow(2,31) }, 32766, 126,
{ 3.2 * pow(10,38) }, { 1.2 * pow(10,308) }, { (rows-1) % 2 },
"binary_limit-2", "nchar_测试_limit-2", { now_time - 172800000 }
)
data_set.ts_data.append(NOW + 1 * (rows - i))
data_set.int_data.append(rows - i)
data_set.bint_data.append(11111 * (rows - i))
data_set.sint_data.append(111 * (rows - i) % 32767)
data_set.tint_data.append(11 * (rows - i) % 127)
data_set.int_un_data.append(rows - i)
data_set.bint_un_data.append(11111 * (rows - i))
data_set.sint_un_data.append(111 * (rows - i) % 32767)
data_set.tint_un_data.append(11 * (rows - i) % 127)
data_set.float_data.append(1.11 * (rows - i))
data_set.double_data.append(1100.0011 * (rows - i))
data_set.bool_data.append((rows - i) % 2)
data_set.binary_data.append(f'binary{(rows - i)}')
data_set.nchar_data.append(f'nchar_测试_{(rows - i)}')
return data_set
def __insert_data(self, dbname=DBNAME):
tdLog.printNoPrefix("==========step: start inser data into tables now.....")
data = self.__data_set(rows=self.rows)
# now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
null_data = '''null, null, null, null, null, null, null, null, null, null, null, null, null, null'''
zero_data = "0, 0, 0, 0, 0, 0, 0, 'binary_0', 'nchar_0', 0, 0, 0, 0, 0"
for i in range(self.rows):
row_data = f'''
{data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
{data.bool_data[i]}, '{data.binary_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {data.tint_un_data[i]},
{data.sint_un_data[i]}, {data.int_un_data[i]}, {data.bint_un_data[i]}
'''
)
neg_row_data = f'''
{-1 * data.int_data[i]}, {-1 * data.bint_data[i]}, {-1 * data.sint_data[i]}, {-1 * data.tint_data[i]}, {-1 * data.float_data[i]}, {-1 * data.double_data[i]},
{data.bool_data[i]}, '{data.binary_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {1 * data.tint_un_data[i]},
{1 * data.sint_un_data[i]}, {1 * data.int_un_data[i]}, {1 * data.bint_un_data[i]}
'''
tdSql.execute( f"insert into {dbname}.ct1 values ( {NOW - i * TIME_STEP}, {row_data} )" )
tdSql.execute( f"insert into {dbname}.ct2 values ( {NOW - i * int(TIME_STEP * 0.6)}, {neg_row_data} )" )
tdSql.execute( f"insert into {dbname}.ct4 values ( {NOW - i * int(TIME_STEP * 0.8) }, {row_data} )" )
tdSql.execute( f"insert into {dbname}.nt1 values ( {NOW - i * int(TIME_STEP * 1.2)}, {row_data} )" )
tdSql.execute( f"insert into {dbname}.ct2 values ( {NOW + int(TIME_STEP * 0.6)}, {null_data} )" )
tdSql.execute( f"insert into {dbname}.ct2 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 0.6)}, {null_data} )" )
tdSql.execute( f"insert into {dbname}.ct2 values ( {NOW - self.rows * int(TIME_STEP * 0.29) }, {null_data} )" )
tdSql.execute( f"insert into {dbname}.ct4 values ( {NOW + int(TIME_STEP * 0.8)}, {null_data} )" )
tdSql.execute( f"insert into {dbname}.ct4 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 0.8)}, {null_data} )" )
tdSql.execute( f"insert into {dbname}.ct4 values ( {NOW - self.rows * int(TIME_STEP * 0.39)}, {null_data} )" )
tdSql.execute( f"insert into {dbname}.nt1 values ( {NOW + int(TIME_STEP * 1.2)}, {null_data} )" )
tdSql.execute( f"insert into {dbname}.nt1 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 1.2)}, {null_data} )" )
tdSql.execute( f"insert into {dbname}.nt1 values ( {NOW - self.rows * int(TIME_STEP * 0.59)}, {null_data} )" )
def run(self):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table")
self.__create_tb()
self.__create_tb(dbname=DBNAME)
tdLog.printNoPrefix("==========step2:insert data")
self.rows = 10
self.__insert_data(self.rows)
self.__insert_data(dbname=DBNAME)
tdLog.printNoPrefix("==========step3:all check")
tdSql.query(f"select count(*) from {DBNAME}.ct1")
tdSql.checkData(0, 0, self.rows)
self.all_test()
tdDnodes.stop(1)
tdDnodes.start(1)
tdLog.printNoPrefix("==========step4:cross db check")
dbname1 = "db1"
tdSql.execute(f"create database {dbname1} duration 432000m")
tdSql.execute(f"use {dbname1}")
self.__create_tb(dbname=dbname1)
self.__insert_data(dbname=dbname1)
tdSql.query("select ct1.c_int from db.ct1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts")
tdSql.checkRows(self.rows)
tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts")
tdSql.checkRows(self.rows)
tdSql.query("select ct1.c_int from db.nt1 as ct1 join db1.nt1 as cy1 on ct1.ts=cy1.ts")
tdSql.checkRows(self.rows + 3)
tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.stb1 as cy1 on ct1.ts=cy1.ts")
tdSql.checkRows(self.rows * 3 + 6)
tdSql.query("select count(*) from db.ct1")
tdSql.checkData(0, 0, self.rows)
tdSql.query("select count(*) from db1.ct1")
tdSql.checkData(0, 0, self.rows)
self.all_test()
tdSql.query("select count(*) from db.ct1")
tdSql.checkData(0, 0, self.rows)
tdSql.query("select count(*) from db1.ct1")
tdSql.checkData(0, 0, self.rows)
tdSql.execute(f"flush database {DBNAME}")
tdSql.execute(f"flush database {dbname1}")
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql.execute("use db")
tdSql.query("select count(*) from db.ct1")
tdSql.checkData(0, 0, self.rows)
tdSql.query("select count(*) from db1.ct1")
tdSql.checkData(0, 0, self.rows)
tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test()
tdSql.query("select count(*) from db.ct1")
tdSql.checkData(0, 0, self.rows)
def stop(self):
tdSql.close()

View File

@ -22,11 +22,14 @@ import json
import platform
import socket
import threading
import toml
sys.path.append("../pytest")
from util.log import *
from util.dnodes import *
from util.cases import *
from util.cluster import *
from util.taosadapter import *
import taos
import taosrest
@ -64,12 +67,13 @@ if __name__ == "__main__":
dnodeNums = 1
mnodeNums = 0
updateCfgDict = {}
adapter_cfg_dict = {}
execCmd = ""
queryPolicy = 1
createDnodeNums = 1
restful = False
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:R', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy','createDnodeNums','restful'])
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RD:', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums','queryPolicy','createDnodeNums','restful','adaptercfgupdate'])
for key, value in opts:
if key in ['-h', '--help']:
tdLog.printNoPrefix(
@ -90,6 +94,7 @@ if __name__ == "__main__":
tdLog.printNoPrefix('-Q set queryPolicy in one dnode')
tdLog.printNoPrefix('-C create Dnode Numbers in one cluster')
tdLog.printNoPrefix('-R restful realization form')
tdLog.printNoPrefix('-D taosadapter update cfg dict ')
sys.exit(0)
@ -138,7 +143,7 @@ if __name__ == "__main__":
try:
execCmd = base64.b64decode(value.encode()).decode()
except:
print('updateCfgDict convert fail.')
print('execCmd run fail.')
sys.exit(0)
if key in ['-N', '--dnodeNums']:
@ -156,8 +161,18 @@ if __name__ == "__main__":
if key in ['-R', '--restful']:
restful = True
if key in ['-D', '--adaptercfgupdate']:
try:
adaptercfgupdate = eval(base64.b64decode(value.encode()).decode())
except:
print('adapter cfg update convert fail.')
sys.exit(0)
if not execCmd == "":
tdDnodes.init(deployPath)
if restful:
tAdapter.init(deployPath)
else:
tdDnodes.init(deployPath)
print(execCmd)
exec(execCmd)
quit()
@ -190,6 +205,31 @@ if __name__ == "__main__":
if valgrind:
time.sleep(2)
if restful:
toBeKilled = "taosadapter"
killCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1" % toBeKilled
psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled
processID = subprocess.check_output(psCmd, shell=True)
while(processID):
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(psCmd, shell=True)
for port in range(6030, 6041):
usePortPID = "lsof -i tcp:%d | grep LISTEn | awk '{print $2}'" % port
processID = subprocess.check_output(usePortPID, shell=True)
if processID:
killCmd = "kill -TERM %s" % processID
os.system(killCmd)
fuserCmd = "fuser -k -n tcp %d" % port
os.system(fuserCmd)
tdLog.info('stop taosadapter')
tdLog.info('stop All dnodes')
if masterIp == "":
@ -219,6 +259,7 @@ if __name__ == "__main__":
except Exception as r:
print(r)
updateCfgDictStr = ''
# adapter_cfg_dict_str = ''
if is_test_framework:
moduleName = fileName.replace(".py", "").replace(os.sep, ".")
uModule = importlib.import_module(moduleName)
@ -227,30 +268,44 @@ if __name__ == "__main__":
if ((json.dumps(updateCfgDict) == '{}') and hasattr(ucase, 'updatecfgDict')):
updateCfgDict = ucase.updatecfgDict
updateCfgDictStr = "-d %s"%base64.b64encode(json.dumps(updateCfgDict).encode()).decode()
if ((json.dumps(adapter_cfg_dict) == '{}') and hasattr(ucase, 'taosadapter_cfg_dict')):
adapter_cfg_dict = ucase.taosadapter_cfg_dict
# adapter_cfg_dict_str = f"-D {base64.b64encode(toml.dumps(adapter_cfg_dict).encode()).decode()}"
except Exception as r:
print(r)
else:
pass
if restful:
tAdapter.init(deployPath, masterIp)
tAdapter.stop(force_kill=True)
if dnodeNums == 1 :
tdDnodes.deploy(1,updateCfgDict)
tdDnodes.start(1)
tdCases.logSql(logSql)
if restful:
tAdapter.deploy(adapter_cfg_dict)
tAdapter.start()
if queryPolicy != 1:
queryPolicy=int(queryPolicy)
conn = taos.connect(
host,
config=tdDnodes.getSimCfgPath())
tdSql.init(conn.cursor())
tdSql.execute("create qnode on dnode 1")
tdSql.execute('alter local "queryPolicy" "%d"'%queryPolicy)
tdSql.query("show local variables;")
for i in range(tdSql.queryRows):
if tdSql.queryResult[i][0] == "queryPolicy" :
if int(tdSql.queryResult[i][1]) == int(queryPolicy):
tdLog.success('alter queryPolicy to %d successfully'%queryPolicy)
else :
tdLog.debug(tdSql.queryResult)
tdLog.exit("alter queryPolicy to %d failed"%queryPolicy)
if restful:
conn = taosrest.connect(url=f"http://{host}:6041")
else:
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
cursor = conn.cursor()
cursor.execute("create qnode on dnode 1")
cursor.execute(f'alter local "queryPolicy" "{queryPolicy}"')
cursor.execute("show local variables")
res = cursor.fetchall()
for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy" :
if int(res[i][1]) == int(queryPolicy):
tdLog.success(f'alter queryPolicy to {queryPolicy} successfully')
else:
tdLog.debug(res)
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed")
else :
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums))
dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums,mnodeNums=mnodeNums)
@ -264,13 +319,16 @@ if __name__ == "__main__":
for dnode in tdDnodes.dnodes:
tdDnodes.starttaosd(dnode.index)
tdCases.logSql(logSql)
if restful:
tAdapter.deploy(adapter_cfg_dict)
tAdapter.start()
if not restful:
conn = taos.connect(
host,
config=tdDnodes.getSimCfgPath())
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
else:
conn = taosrest.connect(url=f"http://{host}:6041")
print(tdDnodes.getSimCfgPath(),host)
tdLog.info(tdDnodes.getSimCfgPath(),host)
if createDnodeNums == 1:
createDnodeNums=dnodeNums
else:
@ -285,9 +343,7 @@ if __name__ == "__main__":
conn = None
else:
if not restful:
conn = taos.connect(
host="%s"%(host),
config=tdDnodes.sim.getCfgDir())
conn = taos.connect(host="%s"%(host), config=tdDnodes.sim.getCfgDir())
else:
conn = taosrest.connect(url=f"http://{host}:6041")
if is_test_framework:
@ -314,18 +370,28 @@ if __name__ == "__main__":
ucase = uModule.TDTestCase()
if (json.dumps(updateCfgDict) == '{}'):
updateCfgDict = ucase.updatecfgDict
if (json.dumps(adapter_cfg_dict) == '{}'):
adapter_cfg_dict = ucase.taosadapter_cfg_dict
except:
pass
if restful:
tAdapter.init(deployPath, masterIp)
tAdapter.stop(force_kill=True)
if dnodeNums == 1 :
tdDnodes.deploy(1,updateCfgDict)
tdDnodes.start(1)
tdCases.logSql(logSql)
if restful:
tAdapter.deploy(adapter_cfg_dict)
tAdapter.start()
if queryPolicy != 1:
queryPolicy=int(queryPolicy)
if not restful:
conn = taos.connect(
host,
config=tdDnodes.getSimCfgPath())
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
else:
conn = taosrest.connect(url=f"http://{host}:6041")
# tdSql.init(conn.cursor())
@ -366,10 +432,13 @@ if __name__ == "__main__":
for dnode in tdDnodes.dnodes:
tdDnodes.starttaosd(dnode.index)
tdCases.logSql(logSql)
if restful:
tAdapter.deploy(adapter_cfg_dict)
tAdapter.start()
if not restful:
conn = taos.connect(
host,
config=tdDnodes.getSimCfgPath())
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
else:
conn = taosrest.connect(url=f"http://{host}:6041")
print(tdDnodes.getSimCfgPath(),host)
@ -394,9 +463,7 @@ if __name__ == "__main__":
else:
tdLog.info("Procedures for testing self-deployment")
if not restful:
conn = taos.connect(
host,
config=tdDnodes.getSimCfgPath())
conn = taos.connect(host,config=tdDnodes.getSimCfgPath())
else:
conn = taosrest.connect(url=f"http://{host}:6041")