test: add stream cases

This commit is contained in:
jiajingbin 2023-08-17 19:49:07 +08:00 committed by Haojun Liao
parent b11975f00c
commit 91269075c7
3 changed files with 1129 additions and 31 deletions

View File

@ -132,6 +132,13 @@ class TDCom:
self.full_type_list = ["tinyint", "smallint", "int", "bigint", "tinyint unsigned", "smallint unsigned", "int unsigned", "bigint unsigned", "float", "double", "binary", "nchar", "bool"] self.full_type_list = ["tinyint", "smallint", "int", "bigint", "tinyint unsigned", "smallint unsigned", "int unsigned", "bigint unsigned", "float", "double", "binary", "nchar", "bool"]
self.white_list = ["statsd", "node_exporter", "collectd", "icinga2", "tcollector", "information_schema", "performance_schema"] self.white_list = ["statsd", "node_exporter", "collectd", "icinga2", "tcollector", "information_schema", "performance_schema"]
self.Boundary = DataBoundary() self.Boundary = DataBoundary()
self.white_list = ["statsd", "node_exporter", "collectd", "icinga2", "tcollector", "information_schema", "performance_schema"]
self.case_name = str()
self.des_table_suffix = "_output"
self.stream_suffix = "_stream"
self.range_count = 5
self.default_interval = 5
self.stream_timeout = 12
# def init(self, conn, logSql): # def init(self, conn, logSql):
# # tdSql.init(conn.cursor(), logSql) # # tdSql.init(conn.cursor(), logSql)
@ -745,40 +752,711 @@ class TDCom:
""" """
return ','.join(map(lambda i: f'{gen_type}{i} {data_type}', range(count))) return ','.join(map(lambda i: f'{gen_type}{i} {data_type}', range(count)))
def is_json(msg): def is_json(msg):
if isinstance(msg, str): if isinstance(msg, str):
try: try:
json.loads(msg) json.loads(msg)
return True return True
except: except:
return False return False
else:
return False
def get_path(tool="taosd"):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else: else:
projPath = selfPath[:selfPath.find("tests")] return False
paths = [] def get_path(tool="taosd"):
for root, dirs, files in os.walk(projPath): selfPath = os.path.dirname(os.path.realpath(__file__))
if ((tool) in files or ("%s.exe"%tool) in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
paths.append(os.path.join(root, tool))
break
if (len(paths) == 0):
return ""
return paths[0]
def dict2toml(in_dict: dict, file:str): if ("community" in selfPath):
if not isinstance(in_dict, dict): projPath = selfPath[:selfPath.find("community")]
return "" else:
with open(file, 'w') as f: projPath = selfPath[:selfPath.find("tests")]
toml.dump(in_dict, f)
paths = []
for root, dirs, files in os.walk(projPath):
if ((tool) in files or ("%s.exe"%tool) in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
paths.append(os.path.join(root, tool))
break
if (len(paths) == 0):
return ""
return paths[0]
def dict2toml(in_dict: dict, file:str):
if not isinstance(in_dict, dict):
return ""
with open(file, 'w') as f:
toml.dump(in_dict, f)
# stream
def create_stream(self, stream_name, des_table, source_sql, trigger_mode=None, watermark=None, max_delay=None, ignore_expired=None, ignore_update=None, subtable_value=None, fill_value=None, fill_history_value=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False, use_except=False):
if subtable_value is None:
subtable = ""
else:
subtable = f'subtable({subtable_value})'
if fill_value is None:
fill = ""
else:
fill = f'fill({fill_value})'
if fill_history_value is None:
fill_history = ""
else:
fill_history = f'fill_history {fill_history_value}'
if use_exist_stb:
if stb_field_name_value is None:
stb_field_name = ""
else:
stb_field_name = f'({stb_field_name_value})'
if tag_value is None:
tags = ""
else:
tags = f'tags({tag_value})'
else:
stb_field_name = ""
tags = ""
if trigger_mode is None:
stream_options = ""
if watermark is not None:
stream_options = f'watermark {watermark}'
if ignore_expired:
stream_options += f" ignore expired {ignore_expired}"
else:
stream_options += f" ignore expired 0"
if ignore_update:
stream_options += f" ignore update {ignore_update}"
else:
stream_options += f" ignore update 0"
if not use_except:
tdSql.execute(f'create stream if not exists {stream_name} trigger at_once {stream_options} {fill_history} into {des_table} {subtable} as {source_sql} {fill};')
return None
else:
return f'create stream if not exists {stream_name} {stream_options} {fill_history} into {des_table} {subtable} as {source_sql} {fill};'
else:
if watermark is None:
if trigger_mode == "max_delay":
stream_options = f'trigger {trigger_mode} {max_delay}'
else:
stream_options = f'trigger {trigger_mode}'
else:
if trigger_mode == "max_delay":
stream_options = f'trigger {trigger_mode} {max_delay} watermark {watermark}'
else:
stream_options = f'trigger {trigger_mode} watermark {watermark}'
if ignore_expired:
stream_options += f" ignore expired {ignore_expired}"
else:
stream_options += f" ignore expired 0"
if ignore_update:
stream_options += f" ignore update {ignore_update}"
else:
stream_options += f" ignore update 0"
if not use_except:
tdSql.execute(f'create stream if not exists {stream_name} {stream_options} {fill_history} into {des_table}{stb_field_name} {tags} {subtable} as {source_sql} {fill};')
return None
else:
return f'create stream if not exists {stream_name} {stream_options} {fill_history} into {des_table}{stb_field_name} {tags} {subtable} as {source_sql} {fill};'
def drop_all_streams(self):
tdSql.query("show streams")
stream_name_list = list(map(lambda x: x[0], tdSql.queryResult))
for stream_name in stream_name_list:
tdSql.execute(f'drop stream if exists {stream_name};')
def drop_db(self, dbname="test"):
if dbname[0].isdigit():
tdSql.execute(f'drop database if exists `{dbname}`')
else:
tdSql.execute(f'drop database if exists {dbname}')
def drop_all_db(self):
tdSql.query("show databases;")
db_list = list(map(lambda x: x[0], tdSql.queryResult))
for dbname in db_list:
if dbname not in self.white_list and "telegraf" not in dbname:
tdSql.execute(f'drop database if exists `{dbname}`')
def time_cast(self, time_value, split_symbol="+"):
ts_value = str(time_value).split(split_symbol)[0]
if split_symbol in str(time_value):
ts_value_offset = str(time_value).split(split_symbol)[1]
else:
ts_value_offset = "0s"
return f'cast({ts_value} as timestamp){split_symbol}{ts_value_offset}'
def clean_env(self):
self.drop_all_streams()
self.drop_all_db()
def set_precision_offset(self, precision):
if precision == "ms":
self.offset = 1000
elif precision == "us":
self.offset = 1000000
elif precision == "ns":
self.offset = 1000000000
else:
pass
def genTs(self, precision="ms", ts="", protype="taosc", ns_tag=None):
"""
protype = "taosc" or "restful"
gen ts and datetime
"""
if precision == "ns":
if ts == "" or ts is None:
ts = time.time_ns()
else:
ts = ts
if ns_tag is None:
dt = ts
else:
dt = datetime.fromtimestamp(ts // 1000000000)
dt = dt.strftime('%Y-%m-%d %H:%M:%S') + '.' + str(int(ts % 1000000000)).zfill(9)
if protype == "restful":
dt = datetime.fromtimestamp(ts // 1000000000)
dt = dt.strftime('%Y-%m-%d %H:%M:%S') + '.' + str(int(ts % 1000000000)).zfill(9)
else:
if ts == "" or ts is None:
ts = time.time()
else:
ts = ts
if precision == "ms" or precision is None:
ts = int(round(ts * 1000))
dt = datetime.fromtimestamp(ts // 1000)
if protype == "taosc":
dt = dt.strftime('%Y-%m-%d %H:%M:%S') + '.' + str(int(ts % 1000)).zfill(3) + '000'
elif protype == "restful":
dt = dt.strftime('%Y-%m-%d %H:%M:%S') + '.' + str(int(ts % 1000)).zfill(3)
else:
pass
elif precision == "us":
ts = int(round(ts * 1000000))
dt = datetime.fromtimestamp(ts // 1000000)
dt = dt.strftime('%Y-%m-%d %H:%M:%S') + '.' + str(int(ts % 1000000)).zfill(6)
return ts, dt
def sgen_column_type_str(self, column_elm_list):
self.column_type_str = ""
if column_elm_list is None:
self.column_type_str = self.gen_default_column_str()
else:
for column_elm in column_elm_list:
if "count" in column_elm:
total_count = int(column_elm["count"])
else:
total_count = 1
if total_count > 0:
for _ in range(total_count):
self.column_type_str += f'{self.default_colname_prefix}{self.default_column_index_start_num} {column_elm["type"]}, '
if column_elm["type"] in ["varchar", "binary", "nchar"]:
self.column_type_str = self.column_type_str.rstrip()[:-1] + f'({column_elm["len"]}), '
self.default_column_index_start_num += 1
else:
continue
self.column_type_str = self.default_colts_name + " timestamp, " + self.column_type_str.rstrip()[:-1]
def sgen_tag_type_str(self, tag_elm_list):
self.tag_type_str = ""
if tag_elm_list is None:
self.tag_type_str = self.gen_default_tag_str()
else:
for tag_elm in tag_elm_list:
if "count" in tag_elm:
total_count = int(tag_elm["count"])
else:
total_count = 1
if total_count > 0:
for _ in range(total_count):
self.tag_type_str += f'{self.default_tagname_prefix}{self.default_tag_index_start_num} {tag_elm["type"]}, '
if tag_elm["type"] in ["varchar", "binary", "nchar"]:
self.tag_type_str = self.tag_type_str.rstrip()[:-1] + f'({tag_elm["len"]}), '
self.default_tag_index_start_num += 1
else:
continue
self.tag_type_str = self.tag_type_str.rstrip()[:-1]
if self.need_tagts:
self.tag_type_str = self.default_tagts_name + " timestamp, " + self.tag_type_str
def sgen_tag_value_list(self, tag_elm_list, ts_value=None):
if self.need_tagts:
self.ts_value = self.genTs()[0]
if ts_value is not None:
self.ts_value = ts_value
if tag_elm_list is None:
self.tag_value_list = list(map(lambda i: self.gen_random_type_value(i, self.default_varchar_length, self.default_varchar_datatype, self.default_nchar_length, self.default_nchar_datatype), self.full_type_list))
else:
for tag_elm in tag_elm_list:
if "count" in tag_elm:
total_count = int(tag_elm["count"])
else:
total_count = 1
if total_count > 0:
for _ in range(total_count):
if tag_elm["type"] in ["varchar", "binary", "nchar"]:
self.tag_value_list.append(self.gen_random_type_value(tag_elm["type"], tag_elm["len"], self.default_varchar_datatype, tag_elm["len"], self.default_nchar_datatype))
else:
self.tag_value_list.append(self.gen_random_type_value(tag_elm["type"], "", "", "", ""))
else:
continue
# if self.need_tagts and self.ts_value is not None and len(str(self.ts_value)) > 0:
if self.need_tagts:
self.tag_value_list = [self.ts_value] + self.tag_value_list
def screateDb(self, dbname="test", drop_db=True, **kwargs):
tdLog.info("creating db ...")
db_params = ""
if len(kwargs) > 0:
for param, value in kwargs.items():
if param == "precision":
db_params += f'{param} "{value}" '
else:
db_params += f'{param} {value} '
if drop_db:
self.drop_db(dbname)
tdSql.execute(f'create database if not exists {dbname} {db_params}')
tdSql.execute(f'use {dbname}')
def screate_stable(self, dbname=None, stbname="stb", use_name="table", column_elm_list=None, tag_elm_list=None,
need_tagts=False, count=1, default_stbname_prefix="stb", default_stbname_index_start_num=1,
default_column_index_start_num=1, default_tag_index_start_num=1, **kwargs):
tdLog.info("creating stable ...")
if dbname is not None:
self.dbname = dbname
self.need_tagts = need_tagts
self.default_stbname_prefix = default_stbname_prefix
self.default_stbname_index_start_num = default_stbname_index_start_num
self.default_column_index_start_num = default_column_index_start_num
self.default_tag_index_start_num = default_tag_index_start_num
stb_params = ""
if len(kwargs) > 0:
for param, value in kwargs.items():
stb_params += f'{param} "{value}" '
self.sgen_column_type_str(column_elm_list)
self.sgen_tag_type_str(tag_elm_list)
if self.dbname is not None:
self.stb_name = f'{self.dbname}.{stbname}'
else:
self.stb_name = stbname
if int(count) <= 1:
create_stable_sql = f'create {use_name} {self.stb_name} ({self.column_type_str}) tags ({self.tag_type_str}) {stb_params};'
tdSql.execute(create_stable_sql)
else:
for _ in range(count):
create_stable_sql = f'create {use_name} {self.dbname}.{default_stbname_prefix}{default_stbname_index_start_num} ({self.column_type_str}) tags ({self.tag_type_str}) {stb_params};'
default_stbname_index_start_num += 1
tdSql.execute(create_stable_sql)
def screate_ctable(self, dbname=None, stbname=None, ctbname="ctb", use_name="table", tag_elm_list=None, ts_value=None, count=1, default_varchar_datatype="letters", default_nchar_datatype="letters", default_ctbname_prefix="ctb", default_ctbname_index_start_num=1, **kwargs):
tdLog.info("creating childtable ...")
self.default_varchar_datatype = default_varchar_datatype
self.default_nchar_datatype = default_nchar_datatype
self.default_ctbname_prefix = default_ctbname_prefix
self.default_ctbname_index_start_num = default_ctbname_index_start_num
ctb_params = ""
if len(kwargs) > 0:
for param, value in kwargs.items():
ctb_params += f'{param} "{value}" '
self.sgen_tag_value_list(tag_elm_list, ts_value)
tag_value_str = ""
# tag_value_str = ", ".join(str(v) for v in self.tag_value_list)
for tag_value in self.tag_value_list:
if isinstance(tag_value, str):
tag_value_str += f'"{tag_value}", '
else:
tag_value_str += f'{tag_value}, '
tag_value_str = tag_value_str.rstrip()[:-1]
if dbname is not None:
self.dbname = dbname
self.ctb_name = f'{self.dbname}.{ctbname}'
else:
self.ctb_name = ctbname
if stbname is not None:
self.stb_name = stbname
if int(count) <= 1:
create_ctable_sql = f'create {use_name} {self.ctb_name} using {self.stb_name} tags ({tag_value_str}) {ctb_params};'
tdSql.execute(create_ctable_sql)
else:
for _ in range(count):
create_stable_sql = f'create {use_name} {self.dbname}.{default_ctbname_prefix}{default_ctbname_index_start_num} using {self.stb_name} tags ({tag_value_str}) {ctb_params};'
default_ctbname_index_start_num += 1
tdSql.execute(create_stable_sql)
def sgen_column_value_list(self, column_elm_list, need_null, ts_value=None):
self.column_value_list = list()
self.ts_value = self.genTs()[0]
if ts_value is not None:
self.ts_value = ts_value
if column_elm_list is None:
self.column_value_list = list(map(lambda i: self.gen_random_type_value(i, self.default_varchar_length, self.default_varchar_datatype, self.default_nchar_length, self.default_nchar_datatype), self.full_type_list))
else:
for column_elm in column_elm_list:
if "count" in column_elm:
total_count = int(column_elm["count"])
else:
total_count = 1
if total_count > 0:
for _ in range(total_count):
if column_elm["type"] in ["varchar", "binary", "nchar"]:
self.column_value_list.append(self.gen_random_type_value(column_elm["type"], column_elm["len"], self.default_varchar_datatype, column_elm["len"], self.default_nchar_datatype))
else:
self.column_value_list.append(self.gen_random_type_value(column_elm["type"], "", "", "", ""))
else:
continue
if need_null:
for i in range(int(len(self.column_value_list)/2)):
index_num = random.randint(0, len(self.column_value_list)-1)
self.column_value_list[index_num] = None
self.column_value_list = [self.ts_value] + self.column_value_list
def screate_table(self, dbname=None, tbname="tb", use_name="table", column_elm_list=None,
count=1, default_tbname_prefix="tb", default_tbname_index_start_num=1,
default_column_index_start_num=1, **kwargs):
tdLog.info("creating table ...")
if dbname is not None:
self.dbname = dbname
self.default_tbname_prefix = default_tbname_prefix
self.default_tbname_index_start_num = default_tbname_index_start_num
self.default_column_index_start_num = default_column_index_start_num
tb_params = ""
if len(kwargs) > 0:
for param, value in kwargs.items():
tb_params += f'{param} "{value}" '
self.sgen_column_type_str(column_elm_list)
if self.dbname is not None:
self.tb_name = f'{self.dbname}.{tbname}'
else:
self.tb_name = tbname
if int(count) <= 1:
create_table_sql = f'create {use_name} {self.tb_name} ({self.column_type_str}) {tb_params};'
tdSql.execute(create_table_sql)
else:
for _ in range(count):
create_table_sql = f'create {use_name} {self.dbname}.{default_tbname_prefix}{default_tbname_index_start_num} ({self.column_type_str}) {tb_params};'
default_tbname_index_start_num += 1
tdSql.execute(create_table_sql)
def sinsert_rows(self, dbname=None, tbname=None, column_ele_list=None, ts_value=None, count=1, need_null=False):
tdLog.info("stream inserting ...")
if dbname is not None:
self.dbname = dbname
if tbname is not None:
self.tb_name = f'{self.dbname}.{tbname}'
else:
if tbname is not None:
self.tb_name = tbname
self.sgen_column_value_list(column_ele_list, need_null, ts_value)
# column_value_str = ", ".join(str(v) for v in self.column_value_list)
column_value_str = ""
for column_value in self.column_value_list:
if column_value is None:
column_value_str += 'Null, '
elif isinstance(column_value, str) and "+" not in column_value and "-" not in column_value:
column_value_str += f'"{column_value}", '
else:
column_value_str += f'{column_value}, '
column_value_str = column_value_str.rstrip()[:-1]
if int(count) <= 1:
insert_sql = f'insert into {self.tb_name} values ({column_value_str});'
tdSql.execute(insert_sql)
else:
for num in range(count):
ts_value = self.genTs()[0]
self.sgen_column_value_list(column_ele_list, need_null, f'{ts_value}+{num}s')
column_value_str = ""
for column_value in self.column_value_list:
if column_value is None:
column_value_str += 'Null, '
elif isinstance(column_value, str) and "+" not in column_value:
column_value_str += f'"{column_value}", '
else:
column_value_str += f'{column_value}, '
column_value_str = column_value_str.rstrip()[:-1]
insert_sql = f'insert into {self.tb_name} values ({column_value_str});'
print(insert_sql)
tdSql.execute(insert_sql)
def sdelete_rows(self, dbname=None, tbname=None, start_ts=None, end_ts=None, ts_key=None):
if dbname is not None:
self.dbname = dbname
if tbname is not None:
self.tb_name = f'{self.dbname}.{tbname}'
else:
if tbname is not None:
self.tb_name = tbname
if ts_key is None:
ts_col_name = self.default_colts_name
else:
ts_col_name = ts_key
base_del_sql = f'delete from {self.tb_name} '
if end_ts is not None:
if ":" in start_ts and "-" in start_ts:
start_ts = f"{start_ts}"
if ":" in end_ts and "-" in end_ts:
end_ts = f"{end_ts}"
base_del_sql += f'where {ts_col_name} between {start_ts} and {end_ts};'
else:
if start_ts is not None:
if ":" in start_ts and "-" in start_ts:
start_ts = f"{start_ts}"
base_del_sql += f'where {ts_col_name} = {start_ts};'
tdSql.execute(base_del_sql)
def check_stream_field_type(self, sql, input_function):
tdSql.query(sql)
res = tdSql.queryResult
if input_function in ["acos", "asin", "atan", "cos", "log", "pow", "sin", "sqrt", "tan"]:
tdSql.checkEqual(res[1][1], "DOUBLE")
tdSql.checkEqual(res[2][1], "DOUBLE")
elif input_function in ["lower", "ltrim", "rtrim", "upper"]:
tdSql.checkEqual(res[1][1], "VARCHAR")
tdSql.checkEqual(res[2][1], "VARCHAR")
tdSql.checkEqual(res[3][1], "NCHAR")
elif input_function in ["char_length", "length"]:
tdSql.checkEqual(res[1][1], "BIGINT")
tdSql.checkEqual(res[2][1], "BIGINT")
tdSql.checkEqual(res[3][1], "BIGINT")
elif input_function in ["concat", "concat_ws"]:
tdSql.checkEqual(res[1][1], "VARCHAR")
tdSql.checkEqual(res[2][1], "NCHAR")
tdSql.checkEqual(res[3][1], "NCHAR")
tdSql.checkEqual(res[4][1], "NCHAR")
elif input_function in ["substr"]:
tdSql.checkEqual(res[1][1], "VARCHAR")
tdSql.checkEqual(res[2][1], "VARCHAR")
tdSql.checkEqual(res[3][1], "VARCHAR")
tdSql.checkEqual(res[4][1], "NCHAR")
else:
tdSql.checkEqual(res[1][1], "INT")
tdSql.checkEqual(res[2][1], "DOUBLE")
def round_handle(self, input_list):
tdLog.info("round rows ...")
final_list = list()
for i in input_list:
tmpl = list()
for j in i:
if type(j) != datetime and type(j) != str:
tmpl.append(round(j, 1))
else:
tmpl.append(j)
final_list.append(tmpl)
return final_list
def float_handle(self, input_list):
tdLog.info("float rows ...")
final_list = list()
for i in input_list:
tmpl = list()
for j_i,j_v in enumerate(i):
if type(j_v) != datetime and j_v is not None and str(j_v).isdigit() and j_i <= 12:
tmpl.append(float(j_v))
else:
tmpl.append(j_v)
final_list.append(tuple(tmpl))
return final_list
def cast_query_data(self, query_data):
tdLog.info("cast query data ...")
col_type_list = self.column_type_str.split(',')
tag_type_list = self.tag_type_str.split(',')
col_tag_type_list = col_type_list + tag_type_list
nl = list()
for query_data_t in query_data:
query_data_l = list(query_data_t)
for i,v in enumerate(query_data_l):
if v is not None:
if " ".join(col_tag_type_list[i].strip().split(" ")[1:]) == "nchar(256)":
tdSql.query(f'select cast("{v}" as binary(256))')
else:
tdSql.query(f'select cast("{v}" as {" ".join(col_tag_type_list[i].strip().split(" ")[1:])})')
query_data_l[i] = tdSql.queryResult[0][0]
else:
query_data_l[i] = v
nl.append(tuple(query_data_l))
return nl
def check_query_data(self, sql1, sql2, sorted=False, fill_value=None, tag_value_list=None, defined_tag_count=None, partition=True, use_exist_stb=False, subtable=None, reverse_check=False):
tdLog.info("checking query data ...")
if tag_value_list:
dvalue = len(self.tag_type_str.split(',')) - defined_tag_count
tdSql.query(sql1)
res1 = tdSql.queryResult
tdSql.query(sql2)
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
new_list = list()
if tag_value_list:
res1 = self.float_handle(res1)
res2 = self.float_handle(res2)
for i,v in enumerate(res2):
if i < len(tag_value_list):
if partition:
new_list.append(tuple(list(v)[:-(dvalue+defined_tag_count)] + list(tag_value_list[i]) + [None]*dvalue))
else:
new_list.append(tuple(list(v)[:-(dvalue+defined_tag_count)] + [None]*len(self.tag_type_str.split(','))))
res2 = new_list
else:
if use_exist_stb:
res1 = self.float_handle(res1)
res2 = self.float_handle(res2)
for i,v in enumerate(res2):
new_list.append(tuple(list(v)[:-(13)] + [None]*len(self.tag_type_str.split(','))))
res2 = new_list
latency = 0
if sorted:
res1.sort()
res2.sort()
if fill_value == "LINEAR":
res1 = self.round_handle(res1)
res2 = self.round_handle(res2)
if not reverse_check:
while res1 != res2:
tdLog.info("query retrying ...")
new_list = list()
tdSql.query(sql1)
res1 = tdSql.queryResult
tdSql.query(sql2)
# res2 = tdSql.queryResult
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
if tag_value_list:
res1 = self.float_handle(res1)
res2 = self.float_handle(res2)
for i,v in enumerate(res2):
if i < len(tag_value_list):
if partition:
new_list.append(tuple(list(v)[:-(dvalue+defined_tag_count)] + list(tag_value_list[i]) + [None]*dvalue))
else:
new_list.append(tuple(list(v)[:-(dvalue+defined_tag_count)] + [None]*len(self.tag_type_str.split(','))))
res2 = new_list
else:
if use_exist_stb:
res1 = self.float_handle(res1)
res2 = self.float_handle(res2)
for i,v in enumerate(res2):
new_list.append(tuple(list(v)[:-(13)] + [None]*len(self.tag_type_str.split(','))))
res2 = new_list
if sorted or tag_value_list:
res1.sort()
res2.sort()
if fill_value == "LINEAR":
res1 = self.round_handle(res1)
res2 = self.round_handle(res2)
if latency < self.stream_timeout:
latency += 0.2
time.sleep(0.2)
else:
if latency == 0:
return False
tdSql.checkEqual(res1, res2)
# tdSql.checkEqual(res1, res2) if not reverse_check else tdSql.checkNotEqual(res1, res2)
else:
while res1 == res2:
tdLog.info("query retrying ...")
new_list = list()
tdSql.query(sql1)
res1 = tdSql.queryResult
tdSql.query(sql2)
# res2 = tdSql.queryResult
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
if tag_value_list:
res1 = self.float_handle(res1)
res2 = self.float_handle(res2)
for i,v in enumerate(res2):
if i < len(tag_value_list):
if partition:
new_list.append(tuple(list(v)[:-(dvalue+defined_tag_count)] + list(tag_value_list[i]) + [None]*dvalue))
else:
new_list.append(tuple(list(v)[:-(dvalue+defined_tag_count)] + [None]*len(self.tag_type_str.split(','))))
res2 = new_list
else:
if use_exist_stb:
res1 = self.float_handle(res1)
res2 = self.float_handle(res2)
for i,v in enumerate(res2):
new_list.append(tuple(list(v)[:-(13)] + [None]*len(self.tag_type_str.split(','))))
res2 = new_list
if sorted or tag_value_list:
res1.sort()
res2.sort()
if fill_value == "LINEAR":
res1 = self.round_handle(res1)
res2 = self.round_handle(res2)
if latency < self.stream_timeout:
latency += 0.2
time.sleep(0.2)
else:
if latency == 0:
return False
tdSql.checkNotEqual(res1, res2)
# tdSql.checkEqual(res1, res2) if not reverse_check else tdSql.checkNotEqual(res1, res2)
def prepare_data(self, interval=None, watermark=None, session=None, state_window=None, state_window_max=127, interation=3, range_count=None, precision="ms", fill_history_value=0, ext_stb=None):
self.clean_env()
self.dataDict = {
"stb_name" : f"{self.case_name}_stb",
"ctb_name" : f"{self.case_name}_ct1",
"tb_name" : f"{self.case_name}_tb1",
"ext_stb_name" : f"ext_{self.case_name}_stb",
"ext_ctb_name" : f"ext_{self.case_name}_ct1",
"ext_tb_name" : f"ext_{self.case_name}_tb1",
"interval" : interval,
"watermark": watermark,
"session": session,
"state_window": state_window,
"state_window_max": state_window_max,
"iteration": interation,
"range_count": range_count,
"start_ts": 1655903478508,
}
if range_count is not None:
self.range_count = range_count
if precision is not None:
self.precision = precision
self.set_precision_offset(self.precision)
self.stb_name = self.dataDict["stb_name"]
self.ctb_name = self.dataDict["ctb_name"]
self.tb_name = self.dataDict["tb_name"]
self.ext_stb_name = self.dataDict["ext_stb_name"]
self.ext_ctb_name = self.dataDict["ext_ctb_name"]
self.ext_tb_name = self.dataDict["ext_tb_name"]
self.stb_stream_des_table = f'{self.stb_name}{self.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.des_table_suffix}'
self.ext_stb_stream_des_table = f'{self.ext_stb_name}{self.des_table_suffix}'
self.ext_ctb_stream_des_table = f'{self.ext_ctb_name}{self.des_table_suffix}'
self.ext_tb_stream_des_table = f'{self.ext_tb_name}{self.des_table_suffix}'
self.date_time = self.genTs(precision=self.precision)[0]
self.screateDb(dbname=self.dbname, precision=self.precision)
self.screate_stable(dbname=self.dbname, stbname=self.stb_name)
self.screate_ctable(dbname=self.dbname, stbname=self.stb_name, ctbname=self.ctb_name)
self.screate_table(dbname=self.dbname, tbname=self.tb_name)
if fill_history_value == 1:
for i in range(self.range_count):
ts_value = str(self.date_time)+f'-{self.default_interval*(i+1)}s'
self.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
if i == 1:
self.record_history_ts = ts_value
if ext_stb:
self.screate_stable(dbname=self.dbname, stbname=self.ext_stb_stream_des_table)
self.screate_ctable(dbname=self.dbname, stbname=self.ext_stb_stream_des_table, ctbname=self.ext_ctb_stream_des_table)
self.screate_table(dbname=self.dbname, tbname=self.ext_tb_stream_des_table)
tdCom = TDCom() tdCom = TDCom()

View File

@ -0,0 +1,240 @@
import sys
import time
import threading
from taos.tmq import Consumer
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
self.subtable = True
self.partition_tbname_alias = "ptn_alias" if self.subtable else ""
self.partition_col_alias = "pcol_alias" if self.subtable else ""
self.partition_tag_alias = "ptag_alias" if self.subtable else ""
self.partition_expression_alias = "pexp_alias" if self.subtable else ""
self.stb_name = str()
self.ctb_name = str()
self.tb_name = str()
self.des_table_suffix = "_output"
self.stream_suffix = "_stream"
self.subtable_prefix = "prefix_" if self.subtable else ""
self.subtable_suffix = "_suffix" if self.subtable else ""
self.stb_stream_des_table = str()
self.ctb_stream_des_table = str()
self.tb_stream_des_table = str()
self.downsampling_function_list = ["min(c1)", "max(c2)", "sum(c3)", "first(c4)", "last(c5)", "apercentile(c6, 50)", "avg(c7)", "count(c8)", "spread(c1)",
"stddev(c2)", "hyperloglog(c11)", "timediff(1, 0, 1h)", "timezone()", "to_iso8601(1)", 'to_unixtimestamp("1970-01-01T08:00:00+08:00")', "min(t1)", "max(t2)", "sum(t3)",
"first(t4)", "last(t5)", "apercentile(t6, 50)", "avg(t7)", "count(t8)", "spread(t1)", "stddev(t2)", "hyperloglog(t11)"]
self.stb_output_select_str = ','.join(list(map(lambda x:f'`{x}`', self.downsampling_function_list)))
self.stb_source_select_str = ','.join(self.downsampling_function_list)
self.tb_source_select_str = ','.join(self.downsampling_function_list[0:15])
def at_once_interval(self, interval, partition="tbname", delete=False, fill_value=None, fill_history_value=None, case_when=None):
tdLog.info(f"testing stream at_once+interval: interval: {interval}, partition: {partition}, fill_history: {fill_history_value}")
self.delete = delete
self.tdCom.case_name = sys._getframe().f_code.co_name
self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value)
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.des_table_suffix}'
self.tb_output_select_str = ','.join(list(map(lambda x:f'`{x}`', self.downsampling_function_list[0:15])))
if partition == "tbname":
if case_when:
stream_case_when_partition = case_when
else:
stream_case_when_partition = self.partition_tbname_alias
partition_elm_alias = self.partition_tbname_alias
elif partition == "c1":
if case_when:
stream_case_when_partition = case_when
else:
stream_case_when_partition = self.partition_col_alias
partition_elm_alias = self.partition_col_alias
elif partition == "abs(c1)":
partition_elm_alias = self.partition_expression_alias
elif partition is None:
partition_elm_alias = '"no_partition"'
else:
partition_elm_alias = self.partition_tag_alias
if partition == "tbname" or partition is None:
if case_when:
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.subtable_prefix}", {stream_case_when_partition}), "{self.subtable_suffix}")' if self.subtable else None
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.subtable_prefix}", {stream_case_when_partition}), "{self.subtable_suffix}")' if self.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.subtable_prefix}", {stream_case_when_partition}), "{self.subtable_suffix}")' if self.subtable else None
else:
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.subtable_prefix}", {partition_elm_alias}), "{self.subtable_suffix}")' if self.subtable else None
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.subtable_prefix}", {partition_elm_alias}), "{self.subtable_suffix}")' if self.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.subtable_prefix}", {partition_elm_alias}), "{self.subtable_suffix}")' if self.subtable else None
else:
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")' if self.subtable else None
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")' if self.subtable else None
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")' if self.subtable else None
if partition:
partition_elm = f'partition by {partition} {partition_elm_alias}'
else:
partition_elm = ""
if fill_value:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.stb_source_select_str} from {self.stb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=stb_subtable_value, fill_value=fill_value, fill_history_value=fill_history_value)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.stb_source_select_str} from {self.ctb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=ctb_subtable_value, fill_value=fill_value, fill_history_value=fill_history_value)
if fill_value:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11'
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tb_source_select_str} from {self.tb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_value=fill_value, fill_history_value=fill_history_value)
start_time = self.tdCom.date_time
for i in range(self.tdCom.range_count):
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
ts_cast_delete_value = self.tdCom.time_cast(ts_value)
self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value)
if i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value)
if self.delete and i%2 != 0:
self.tdCom.sdelete_rows(tbname=self.tdCom.ctb_name, start_ts=ts_cast_delete_value)
self.tdCom.date_time += 1
self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value)
if i%2 == 0:
self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value)
if self.delete and i%2 != 0:
self.tdCom.sdelete_rows(tbname=self.tdCom.tb_name, start_ts=ts_cast_delete_value)
self.tdCom.date_time += 1
if partition:
partition_elm = f'partition by {partition}'
else:
partition_elm = ""
if not fill_value:
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
self.tdCom.check_query_data(f'select wstart, {self.stb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.stb_source_select_str} from {tbname} {partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True)
else:
self.tdCom.check_query_data(f'select wstart, {self.tb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tb_source_select_str} from {tbname} {partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True)
if self.subtable:
for tname in [self.stb_name, self.ctb_name]:
tdSql.query(f'select * from {self.ctb_name}')
ptn_counter = 0
for c1_value in tdSql.queryResult:
if partition == "c1":
tdSql.query(f'select count(*) from `{tname}_{self.subtable_prefix}{abs(c1_value[1])}{self.subtable_suffix}`;')
elif partition is None:
tdSql.query(f'select count(*) from `{tname}_{self.subtable_prefix}no_partition{self.subtable_suffix}`;')
elif partition == "abs(c1)":
abs_c1_value = abs(c1_value[1])
tdSql.query(f'select count(*) from `{tname}_{self.subtable_prefix}{abs_c1_value}{self.subtable_suffix}`;')
elif partition == "tbname" and ptn_counter == 0:
tdSql.query(f'select count(*) from `{tname}_{self.subtable_prefix}{self.ctb_name}{self.subtable_suffix}`;')
ptn_counter += 1
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
tdSql.query(f'select * from {self.tb_name}')
ptn_counter = 0
for c1_value in tdSql.queryResult:
if partition == "c1":
tdSql.query(f'select count(*) from `{self.tb_name}_{self.subtable_prefix}{abs(c1_value[1])}{self.subtable_suffix}`;')
elif partition is None:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.subtable_prefix}no_partition{self.subtable_suffix}`;')
elif partition == "abs(c1)":
abs_c1_value = abs(c1_value[1])
tdSql.query(f'select count(*) from `{self.tb_name}_{self.subtable_prefix}{abs_c1_value}{self.subtable_suffix}`;')
elif partition == "tbname" and ptn_counter == 0:
tdSql.query(f'select count(*) from `{self.tb_name}_{self.subtable_prefix}{self.tb_name}{self.subtable_suffix}`;')
ptn_counter += 1
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
if fill_value:
end_date_time = self.tdCom.date_time
final_range_count = self.tdCom.range_count
history_ts = str(start_time)+f'-{self.tdCom.dataDict["interval"]*(final_range_count+2)}s'
start_ts = self.tdCom.time_cast(history_ts, "-")
future_ts = str(end_date_time)+f'+{self.tdCom.dataDict["interval"]*(final_range_count+2)}s'
end_ts = self.tdCom.time_cast(future_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=future_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=future_ts)
self.tdCom.date_time = start_time
# update
history_ts = str(start_time)+f'-{self.tdCom.dataDict["interval"]*(final_range_count+2)}s'
start_ts = self.tdCom.time_cast(history_ts, "-")
future_ts = str(end_date_time)+f'+{self.tdCom.dataDict["interval"]*(final_range_count+2)}s'
end_ts = self.tdCom.time_cast(future_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=future_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=future_ts)
self.tdCom.date_time = start_time
for i in range(self.tdCom.range_count):
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
ts_cast_delete_value = self.tdCom.time_cast(ts_value)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
self.tdCom.date_time += 1
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
self.tdCom.date_time += 1
if self.delete:
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value)
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value)
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
if partition == "tbname":
self.tdCom.check_query_data(f'select wstart, {self.fill_stb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart', fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.fill_stb_output_select_str} from {tbname}{self.des_table_suffix} where `min(c1)` is not Null order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value)
else:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11'
if partition == "tbname":
self.tdCom.check_query_data(f'select wstart, {self.fill_tb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart', fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.fill_tb_output_select_str} from {tbname}{self.des_table_suffix} where `min(c1)` is not Null order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value)
if self.delete:
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=start_ts, end_ts=ts_cast_delete_value)
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=start_ts, end_ts=ts_cast_delete_value)
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
if tbname != self.tb_name:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
if partition == "tbname":
self.tdCom.check_query_data(f'select wstart, {self.fill_stb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.fill_stb_source_select_str} from {tbname} where ts >= {start_ts.replace("-", "+")} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart', fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.fill_stb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value)
else:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11'
if partition == "tbname":
self.tdCom.check_query_data(f'select wstart, {self.fill_tb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.fill_tb_source_select_str} from {tbname} where ts >= {start_ts.replace("-", "+")} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart', fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.fill_tb_output_select_str} from {tbname}{self.des_table_suffix} order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value)
def run(self):
self.at_once_interval(interval=random.randint(10, 15), partition="tbname", delete=True)
self.at_once_interval(interval=random.randint(10, 15), partition="c1", delete=True)
self.at_once_interval(interval=random.randint(10, 15), partition="abs(c1)", delete=True)
self.at_once_interval(interval=random.randint(10, 15), partition=None, delete=True)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,180 @@
import sys
import time
import threading
from taos.tmq import Consumer
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.tdCom = tdCom
def scalar_function(self, partition="tbname", fill_history_value=None):
tdLog.info(f"testing stream scalar funtion partition: {partition}, fill_history_value: {fill_history_value}")
self.tdCom.case_name = sys._getframe().f_code.co_name
tdLog.info("preparing data ...")
self.tdCom.prepare_data(fill_history_value=fill_history_value)
# return
tdSql.execute('create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20)) tags (t1 int);')
tdSql.execute('create table scalar_ct1 using scalar_stb tags(10);')
tdSql.execute('create table if not exists scalar_tb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20));')
if fill_history_value is None:
fill_history = ""
else:
tdLog.info("inserting fill_history data ...")
fill_history = f'fill_history {fill_history_value}'
for i in range(self.tdCom.range_count):
tdSql.execute(f'insert into scalar_ct1 values ({self.tdCom.date_time}-{i}s, 100, -100.1, "hebei", Null, "Bigdata");')
tdSql.execute(f'insert into scalar_tb values ({self.tdCom.date_time}-{i}s, 100, -100.1, "heBei", Null, "Bigdata");')
# self.tdCom.write_latency(self.case_name)
math_function_list = ["abs", "acos", "asin", "atan", "ceil", "cos", "floor", "log", "pow", "round", "sin", "sqrt", "tan"]
string_function_list = ["char_length", "concat", "concat_ws", "length", "lower", "ltrim", "rtrim", "substr", "upper"]
for math_function in math_function_list:
tdLog.info(f"testing function {math_function} ...")
tdLog.info(f"creating stream for function {math_function} ...")
if math_function in ["log", "pow"]:
tdSql.execute(f'create stream stb_{math_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{math_function}_stb as select ts, {math_function}(c1, 2), {math_function}(c2, 2), c3 from scalar_stb partition by {partition};')
tdSql.execute(f'create stream ctb_{math_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{math_function}_ctb as select ts, {math_function}(c1, 2), {math_function}(c2, 2), c3 from scalar_ct1;')
tdSql.execute(f'create stream tb_{math_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{math_function}_tb as select ts, {math_function}(c1, 2), {math_function}(c2, 2), c3 from scalar_tb;')
else:
tdSql.execute(f'create stream stb_{math_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{math_function}_stb as select ts, {math_function}(c1), {math_function}(c2), c3 from scalar_stb partition by {partition};')
tdSql.execute(f'create stream ctb_{math_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{math_function}_ctb as select ts, {math_function}(c1), {math_function}(c2), c3 from scalar_ct1;')
tdSql.execute(f'create stream tb_{math_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{math_function}_tb as select ts, {math_function}(c1), {math_function}(c2), c3 from scalar_tb;')
self.tdCom.check_stream_field_type(f"describe output_{math_function}_stb", math_function)
self.tdCom.check_stream_field_type(f"describe output_{math_function}_ctb", math_function)
self.tdCom.check_stream_field_type(f"describe output_{math_function}_tb", math_function)
for tbname in ["scalar_ct1", "scalar_tb"]:
tdLog.info(f"function {math_function}: inserting data for tb --- {tbname} ...")
tdSql.execute(f'insert into {tbname} values ({self.tdCom.date_time}, 100, 100.1, "beijing", "taos", "Taos");')
tdSql.execute(f'insert into {tbname} values ({self.tdCom.date_time}+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");')
tdSql.execute(f'insert into {tbname} values ({self.tdCom.date_time}+2s, 0, Null, "hebei", "TDengine", Null);')
for i in range(self.tdCom.range_count):
tdSql.execute(f'insert into scalar_ct1 values ({self.tdCom.date_time}+{i}s, 100, -100.1, "hebei", Null, "Bigdata");')
tdSql.execute(f'insert into scalar_tb values ({self.tdCom.date_time}+{i}s, 100, -100.1, "heBei", Null, "Bigdata");')
if i%2 == 0:
tdLog.info(f"function {math_function}: update testing ...")
tdSql.execute(f'insert into scalar_ct1 values ({self.tdCom.date_time}+{i}s, 50, -50.1, Null, "heBei", "Bigdata1");')
tdSql.execute(f'insert into scalar_tb values ({self.tdCom.date_time}+{i}s, 50, -50.1, Null, "heBei", "Bigdata1");')
else:
tdLog.info(f"function {math_function}: delete testing ...")
dt = f'cast({self.tdCom.date_time-1} as timestamp)'
tdSql.execute(f'delete from scalar_ct1 where ts = {dt};')
tdSql.execute(f'delete from scalar_tb where ts = {dt};')
if fill_history_value:
tdLog.info(f"function {math_function}: disorder testing ...")
tdSql.execute(f'insert into scalar_ct1 values ({self.tdCom.date_time}-{self.tdCom.range_count-1}s, 50, -50.1, Null, "heBei", "Bigdata1");')
tdSql.execute(f'insert into scalar_tb values ({self.tdCom.date_time}-{self.tdCom.range_count-1}s, 50, -50.1, Null, "heBei", "Bigdata1");')
dt = f'cast({self.tdCom.date_time-(self.tdCom.range_count-1)} as timestamp)'
tdSql.execute(f'delete from scalar_ct1 where ts = {dt};')
tdSql.execute(f'delete from scalar_tb where ts = {dt};')
if math_function == "log" or math_function == "pow":
tdLog.info(f"function {math_function}: confirming query result ...")
self.tdCom.check_query_data(f'select `{math_function}(c1, 2)`, `{math_function}(c2, 2)` from output_{math_function}_stb order by ts;', f'select {math_function}(c1, 2), {math_function}(c2, 2) from scalar_stb partition by {partition} order by ts;')
self.tdCom.check_query_data(f'select `{math_function}(c1, 2)`, `{math_function}(c2, 2)` from output_{math_function}_ctb;', f'select {math_function}(c1, 2), {math_function}(c2, 2) from scalar_ct1;')
self.tdCom.check_query_data(f'select `{math_function}(c1, 2)`, `{math_function}(c2, 2)` from output_{math_function}_tb;', f'select {math_function}(c1, 2), {math_function}(c2, 2) from scalar_tb;')
else:
tdLog.info(f"function {math_function}: confirming query result ...")
self.tdCom.check_query_data(f'select `{math_function}(c1)`, `{math_function}(c2)` from output_{math_function}_stb order by ts;', f'select {math_function}(c1), {math_function}(c2) from scalar_stb partition by {partition} order by ts;')
self.tdCom.check_query_data(f'select `{math_function}(c1)`, `{math_function}(c2)` from output_{math_function}_ctb;', f'select {math_function}(c1), {math_function}(c2) from scalar_ct1;')
self.tdCom.check_query_data(f'select `{math_function}(c1)`, `{math_function}(c2)` from output_{math_function}_tb;', f'select {math_function}(c1), {math_function}(c2) from scalar_tb;')
tdSql.execute(f'drop stream if exists stb_{math_function}_stream')
tdSql.execute(f'drop stream if exists ctb_{math_function}_stream')
tdSql.execute(f'drop stream if exists tb_{math_function}_stream')
for string_function in string_function_list:
tdLog.info(f"testing function {string_function} ...")
tdLog.info(f"creating stream for function {string_function} ...")
if string_function == "concat":
tdSql.execute(f'create stream stb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_stb as select ts, {string_function}(c3, c4), {string_function}(c3, c5), {string_function}(c4, c5), {string_function}(c3, c4, c5) from scalar_stb partition by {partition};')
tdSql.execute(f'create stream ctb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_ctb as select ts, {string_function}(c3, c4), {string_function}(c3, c5), {string_function}(c4, c5), {string_function}(c3, c4, c5) from scalar_ct1;')
tdSql.execute(f'create stream tb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_tb as select ts, {string_function}(c3, c4), {string_function}(c3, c5), {string_function}(c4, c5), {string_function}(c3, c4, c5) from scalar_tb;')
elif string_function == "concat_ws":
tdSql.execute(f'create stream stb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_stb as select ts, {string_function}("aND", c3, c4), {string_function}("and", c3, c5), {string_function}("And", c4, c5), {string_function}("AND", c3, c4, c5) from scalar_stb partition by {partition};')
tdSql.execute(f'create stream ctb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_ctb as select ts, {string_function}("aND", c3, c4), {string_function}("and", c3, c5), {string_function}("And", c4, c5), {string_function}("AND", c3, c4, c5) from scalar_ct1;')
tdSql.execute(f'create stream tb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_tb as select ts, {string_function}("aND", c3, c4), {string_function}("and", c3, c5), {string_function}("And", c4, c5), {string_function}("AND", c3, c4, c5) from scalar_tb;')
elif string_function == "substr":
tdSql.execute(f'create stream stb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_stb as select ts, {string_function}(c3, 2), {string_function}(c3, 2, 2), {string_function}(c4, 5, 1), {string_function}(c5, 3, 4) from scalar_stb partition by {partition};')
tdSql.execute(f'create stream ctb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_ctb as select ts, {string_function}(c3, 2), {string_function}(c3, 2, 2), {string_function}(c4, 5, 1), {string_function}(c5, 3, 4) from scalar_ct1;')
tdSql.execute(f'create stream tb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_tb as select ts, {string_function}(c3, 2), {string_function}(c3, 2, 2), {string_function}(c4, 5, 1), {string_function}(c5, 3, 4) from scalar_tb;')
else:
tdSql.execute(f'create stream stb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_stb as select ts, {string_function}(c3), {string_function}(c4), {string_function}(c5) from scalar_stb partition by {partition};')
tdSql.execute(f'create stream ctb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_ctb as select ts, {string_function}(c3), {string_function}(c4), {string_function}(c5) from scalar_ct1;')
tdSql.execute(f'create stream tb_{string_function}_stream trigger at_once ignore expired 0 ignore update 0 {fill_history} into output_{string_function}_tb as select ts, {string_function}(c3), {string_function}(c4), {string_function}(c5) from scalar_tb;')
self.tdCom.check_stream_field_type(f"describe output_{string_function}_stb", string_function)
self.tdCom.check_stream_field_type(f"describe output_{string_function}_ctb", string_function)
self.tdCom.check_stream_field_type(f"describe output_{string_function}_tb", string_function)
for tbname in ["scalar_ct1", "scalar_tb"]:
tdLog.info(f"function {string_function}: inserting data for tb --- {tbname} ...")
tdSql.execute(f'insert into {tbname} values ({self.tdCom.date_time}, 100, 100.1, "beijing", "taos", "Taos");')
tdSql.execute(f'insert into {tbname} values ({self.tdCom.date_time}+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");')
tdSql.execute(f'insert into {tbname} values ({self.tdCom.date_time}+2s, 0, Null, "hebei", "TDengine", Null);')
for i in range(self.tdCom.range_count):
tdSql.execute(f'insert into scalar_ct1 values ({self.tdCom.date_time}+{i}s, 100, -100.1, "hebei", Null, "Bigdata");')
tdSql.execute(f'insert into scalar_tb values ({self.tdCom.date_time}+{i}s, 100, -100.1, "heBei", Null, "Bigdata");')
if i%2 == 0:
tdLog.info(f"function {string_function}: update testing...")
tdSql.execute(f'insert into scalar_ct1 values ({self.tdCom.date_time}+{i}s, 50, -50.1, Null, "heBei", "Bigdata1");')
tdSql.execute(f'insert into scalar_tb values ({self.tdCom.date_time}+{i}s, 50, -50.1, Null, "heBei", "Bigdata1");')
else:
tdLog.info(f"function {string_function}: delete testing ...")
dt = f'cast({self.tdCom.date_time-1} as timestamp)'
tdSql.execute(f'delete from scalar_ct1 where ts = {dt};')
tdSql.execute(f'delete from scalar_tb where ts = {dt};')
if fill_history_value:
tdLog.info(f"function {string_function}: disorder testing ...")
tdSql.execute(f'insert into scalar_ct1 values ({self.tdCom.date_time}-{self.tdCom.range_count-1}s, 50, -50.1, Null, "heBei", "Bigdata1");')
tdSql.execute(f'insert into scalar_tb values ({self.tdCom.date_time}-{self.tdCom.range_count-1}s, 50, -50.1, Null, "heBei", "Bigdata1");')
dt = f'cast({self.tdCom.date_time-(self.tdCom.range_count-1)} as timestamp)'
tdSql.execute(f'delete from scalar_ct1 where ts = {dt};')
tdSql.execute(f'delete from scalar_tb where ts = {dt};')
if string_function == "concat":
tdLog.info(f"function {string_function}: confirming query result ...")
self.tdCom.check_query_data(f'select `{string_function}(c3, c4)`, `{string_function}(c3, c5)`, `{string_function}(c4, c5)`, `{string_function}(c3, c4, c5)` from output_{string_function}_stb order by ts;', f'select {string_function}(c3, c4), {string_function}(c3, c5), {string_function}(c4, c5), {string_function}(c3, c4, c5) from scalar_stb order by ts;')
self.tdCom.check_query_data(f'select `{string_function}(c3, c4)`, `{string_function}(c3, c5)`, `{string_function}(c4, c5)`, `{string_function}(c3, c4, c5)` from output_{string_function}_ctb;', f'select {string_function}(c3, c4), {string_function}(c3, c5), {string_function}(c4, c5), {string_function}(c3, c4, c5) from scalar_ct1;')
self.tdCom.check_query_data(f'select `{string_function}(c3, c4)`, `{string_function}(c3, c5)`, `{string_function}(c4, c5)`, `{string_function}(c3, c4, c5)` from output_{string_function}_tb;', f'select {string_function}(c3, c4), {string_function}(c3, c5), {string_function}(c4, c5), {string_function}(c3, c4, c5) from scalar_tb;')
elif string_function == "concat_ws":
tdLog.info(f"function {string_function}: confirming query result ...")
self.tdCom.check_query_data(f'select `{string_function}("aND", c3, c4)`, `{string_function}("and", c3, c5)`, `{string_function}("And", c4, c5)`, `{string_function}("AND", c3, c4, c5)` from output_{string_function}_stb order by ts;', f'select {string_function}("aND", c3, c4), {string_function}("and", c3, c5), {string_function}("And", c4, c5), {string_function}("AND", c3, c4, c5) from scalar_stb order by ts;')
self.tdCom.check_query_data(f'select `{string_function}("aND", c3, c4)`, `{string_function}("and", c3, c5)`, `{string_function}("And", c4, c5)`, `{string_function}("AND", c3, c4, c5)` from output_{string_function}_ctb;', f'select {string_function}("aND", c3, c4), {string_function}("and", c3, c5), {string_function}("And", c4, c5), {string_function}("AND", c3, c4, c5) from scalar_ct1;')
self.tdCom.check_query_data(f'select `{string_function}("aND", c3, c4)`, `{string_function}("and", c3, c5)`, `{string_function}("And", c4, c5)`, `{string_function}("AND", c3, c4, c5)` from output_{string_function}_tb;', f'select {string_function}("aND", c3, c4), {string_function}("and", c3, c5), {string_function}("And", c4, c5), {string_function}("AND", c3, c4, c5) from scalar_tb;')
elif string_function == "substr":
tdLog.info(f"function {string_function}: confirming query result ...")
self.tdCom.check_query_data(f'select `{string_function}(c3, 2)`, `{string_function}(c3, 2, 2)`, `{string_function}(c4, 5, 1)`, `{string_function}(c5, 3, 4)` from output_{string_function}_stb order by ts;', f'select {string_function}(c3, 2), {string_function}(c3, 2, 2), {string_function}(c4, 5, 1), {string_function}(c5, 3, 4) from scalar_stb order by ts;')
self.tdCom.check_query_data(f'select `{string_function}(c3, 2)`, `{string_function}(c3, 2, 2)`, `{string_function}(c4, 5, 1)`, `{string_function}(c5, 3, 4)` from output_{string_function}_ctb;', f'select {string_function}(c3, 2), {string_function}(c3, 2, 2), {string_function}(c4, 5, 1), {string_function}(c5, 3, 4) from scalar_ct1;')
self.tdCom.check_query_data(f'select `{string_function}(c3, 2)`, `{string_function}(c3, 2, 2)`, `{string_function}(c4, 5, 1)`, `{string_function}(c5, 3, 4)` from output_{string_function}_tb;', f'select {string_function}(c3, 2), {string_function}(c3, 2, 2), {string_function}(c4, 5, 1), {string_function}(c5, 3, 4) from scalar_tb;')
else:
tdLog.info(f"function {string_function}: confirming query result ...")
self.tdCom.check_query_data(f'select `{string_function}(c3)`, `{string_function}(c4)`, `{string_function}(c5)` from output_{string_function}_stb order by ts;', f'select {string_function}(c3), {string_function}(c4), {string_function}(c5) from scalar_stb order by ts;')
self.tdCom.check_query_data(f'select `{string_function}(c3)`, `{string_function}(c4)`, `{string_function}(c5)` from output_{string_function}_ctb;', f'select {string_function}(c3), {string_function}(c4), {string_function}(c5) from scalar_ct1;')
self.tdCom.check_query_data(f'select `{string_function}(c3)`, `{string_function}(c4)`, `{string_function}(c5)` from output_{string_function}_tb;', f'select {string_function}(c3), {string_function}(c4), {string_function}(c5) from scalar_tb;')
tdSql.execute(f'drop stream if exists stb_{string_function}_stream')
tdSql.execute(f'drop stream if exists ctb_{string_function}_stream')
tdSql.execute(f'drop stream if exists tb_{string_function}_stream')
def run(self):
self.scalar_function(partition="tbname", fill_history_value=1)
self.scalar_function(partition="tbname,c1,t1", fill_history_value=1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())