test: add stream cases
parent da93dc45fc
commit 10cd04349d
@@ -850,6 +850,28 @@ class TDCom:
|
|||
|
||||
# stream
|
||||
def create_stream(self, stream_name, des_table, source_sql, trigger_mode=None, watermark=None, max_delay=None, ignore_expired=None, ignore_update=None, subtable_value=None, fill_value=None, fill_history_value=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False, use_except=False):
|
||||
"""create_stream
|
||||
|
||||
Args:
|
||||
stream_name (str): stream_name
|
||||
des_table (str): target stable
|
||||
source_sql (str): stream sql
|
||||
trigger_mode (str, optional): at_once/window_close/max_delay. Defaults to None.
|
||||
watermark (str, optional): watermark time. Defaults to None.
|
||||
max_delay (str, optional): max_delay time. Defaults to None.
|
||||
ignore_expired (int, optional): ignore expired data. Defaults to None.
|
||||
ignore_update (int, optional): ignore update data. Defaults to None.
|
||||
subtable_value (str, optional): subtable. Defaults to None.
|
||||
fill_value (str, optional): fill. Defaults to None.
|
||||
fill_history_value (int, optional): 0/1. Defaults to None.
|
||||
stb_field_name_value (str, optional): field list for writing into an existing stb. Defaults to None.
tag_value (str, optional): custom tag. Defaults to None.
use_exist_stb (bool, optional): write into an existing stb. Defaults to False.
use_except (bool, optional): whether an exception is expected. Defaults to False.

Returns:
str: the generated "create stream" SQL statement
|
||||
"""
|
||||
if subtable_value is None:
|
||||
subtable = ""
|
||||
else:
|
||||
|
@@ -923,20 +945,54 @@ class TDCom:
|
|||
else:
|
||||
return f'create stream if not exists {stream_name} {stream_options} {fill_history} into {des_table}{stb_field_name} {tags} {subtable} as {source_sql} {fill};'
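# Usage sketch, as a test case would call this helper (tdCom is the module-level
# instance created at the end of this file; table and stream names are illustrative).
from util.common import tdCom

tdCom.create_stream(
    stream_name="stb_stream",
    des_table="stb_stream_dest",
    source_sql="select _wstart as wstart, count(*) as cnt from stb interval(5s)",
    trigger_mode="at_once",        # at_once / window_close / max_delay
    fill_history_value=1,          # also process data that existed before the stream
)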
|
||||
|
||||
def pause_stream(self, stream_name, if_exist=True, if_not_exist=False):
|
||||
"""pause_stream
|
||||
|
||||
Args:
|
||||
stream_name (str): stream_name
|
||||
if_exist (bool, optional): add "if exists". Defaults to True.
if_not_exist (bool, optional): add "if not exists". Defaults to False.
|
||||
"""
|
||||
if_exist_value = "if exists" if if_exist else ""
|
||||
if_not_exist_value = "if not exists" if if_not_exist else ""
|
||||
tdSql.execute(f'pause stream {if_exist_value} {if_not_exist_value} {stream_name}')
|
||||
|
||||
def resume_stream(self, stream_name, if_exist=True, if_not_exist=False, ignore_untreated=False):
|
||||
"""resume_stream
|
||||
|
||||
Args:
|
||||
stream_name (str): stream_name
|
||||
if_exist (bool, optional): add "if exists". Defaults to True.
if_not_exist (bool, optional): add "if not exists". Defaults to False.
ignore_untreated (bool, optional): add "ignore untreated" (skip data written while paused). Defaults to False.
|
||||
"""
|
||||
if_exist_value = "if exists" if if_exist else ""
|
||||
if_not_exist_value = "if not exists" if if_not_exist else ""
|
||||
ignore_untreated_value = "ignore untreated" if ignore_untreated else ""
|
||||
tdSql.execute(f'resume stream {if_exist_value} {if_not_exist_value} {ignore_untreated_value} {stream_name}')
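# Usage sketch (assumes a stream named "stb_stream" already exists): pause it, then
# resume it and discard whatever was written while it was paused.
from util.common import tdCom

tdCom.pause_stream("stb_stream")                           # pause stream if exists stb_stream
tdCom.resume_stream("stb_stream", ignore_untreated=True)   # resume stream if exists ignore untreated stb_stream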
|
||||
|
||||
def drop_all_streams(self):
|
||||
"""drop all streams
|
||||
"""
|
||||
tdSql.query("show streams")
|
||||
stream_name_list = list(map(lambda x: x[0], tdSql.queryResult))
|
||||
for stream_name in stream_name_list:
|
||||
tdSql.execute(f'drop stream if exists {stream_name};')
|
||||
|
||||
def drop_db(self, dbname="test"):
|
||||
"""drop a db
|
||||
|
||||
Args:
|
||||
dbname (str, optional): Defaults to "test".
|
||||
"""
|
||||
if dbname[0].isdigit():
|
||||
tdSql.execute(f'drop database if exists `{dbname}`')
|
||||
else:
|
||||
tdSql.execute(f'drop database if exists {dbname}')
|
||||
|
||||
def drop_all_db(self):
|
||||
"""drop all databases
|
||||
"""
|
||||
tdSql.query("show databases;")
|
||||
db_list = list(map(lambda x: x[0], tdSql.queryResult))
|
||||
for dbname in db_list:
|
||||
|
@@ -944,6 +1000,15 @@ class TDCom:
|
|||
tdSql.execute(f'drop database if exists `{dbname}`')
|
||||
|
||||
def time_cast(self, time_value, split_symbol="+"):
|
||||
"""cast bigint to timestamp
|
||||
|
||||
Args:
|
||||
time_value (bigint): ts
|
||||
split_symbol (str, optional): split symbol. Defaults to "+".

Returns:
str: SQL cast expression
|
||||
"""
|
||||
ts_value = str(time_value).split(split_symbol)[0]
|
||||
if split_symbol in str(time_value):
|
||||
ts_value_offset = str(time_value).split(split_symbol)[1]
|
||||
|
@@ -952,6 +1017,8 @@ class TDCom:
|
|||
return f'cast({ts_value} as timestamp){split_symbol}{ts_value_offset}'
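# Sketch of the expression time_cast builds for a "+"-offset input (grounded in the
# return statement above; the no-offset branch is not shown in this hunk).
from util.common import tdCom

expr = tdCom.time_cast("1658921623245+10s")
# expr == "cast(1658921623245 as timestamp)+10s", ready to embed in a delete/select clause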
|
||||
|
||||
def clean_env(self):
|
||||
"""drop all streams and databases
|
||||
"""
|
||||
self.drop_all_streams()
|
||||
self.drop_all_db()
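# Usage sketch: one call resets the environment; streams are dropped first, then every database.
from util.common import tdCom

tdCom.clean_env()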
|
||||
|
||||
|
@@ -966,9 +1033,16 @@ class TDCom:
|
|||
pass
|
||||
|
||||
def genTs(self, precision="ms", ts="", protype="taosc", ns_tag=None):
|
||||
"""
|
||||
protype = "taosc" or "restful"
|
||||
gen ts and datetime
|
||||
"""generate ts
|
||||
|
||||
Args:
|
||||
precision (str, optional): db precision. Defaults to "ms".
|
||||
ts (str, optional): input ts. Defaults to "".
|
||||
protype (str, optional): "taosc" or "restful". Defaults to "taosc".
|
||||
ns_tag (optional): use ns granularity. Defaults to None.
|
||||
|
||||
Returns:
|
||||
timestamp, datetime: timestamp and datetime
|
||||
"""
|
||||
if precision == "ns":
|
||||
if ts == "" or ts is None:
|
||||
|
@@ -1004,6 +1078,11 @@ class TDCom:
|
|||
return ts, dt
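# Usage sketch: genTs yields a numeric timestamp plus the matching datetime object, so
# tests can both insert the value and reason about it ("tb" is an illustrative table name).
from util.common import tdCom

ts, dt = tdCom.genTs(precision="ms")
insert_sql = f"insert into tb values ({ts}, 1)"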
|
||||
|
||||
def sgen_column_type_str(self, column_elm_list):
|
||||
"""generage column type str
|
||||
|
||||
Args:
|
||||
column_elm_list (list): column_elm_list
|
||||
"""
|
||||
self.column_type_str = ""
|
||||
if column_elm_list is None:
|
||||
self.column_type_str = self.gen_default_column_str()
|
||||
|
@@ -1024,6 +1103,11 @@ class TDCom:
|
|||
self.column_type_str = self.default_colts_name + " timestamp, " + self.column_type_str.rstrip()[:-1]
|
||||
|
||||
def sgen_tag_type_str(self, tag_elm_list):
|
||||
"""generage tag type str
|
||||
|
||||
Args:
|
||||
tag_elm_list (list): tag_elm_list
|
||||
"""
|
||||
self.tag_type_str = ""
|
||||
if tag_elm_list is None:
|
||||
self.tag_type_str = self.gen_default_tag_str()
|
||||
|
@@ -1044,7 +1128,14 @@ class TDCom:
|
|||
self.tag_type_str = self.tag_type_str.rstrip()[:-1]
|
||||
if self.need_tagts:
|
||||
self.tag_type_str = self.default_tagts_name + " timestamp, " + self.tag_type_str
|
||||
|
||||
def sgen_tag_value_list(self, tag_elm_list, ts_value=None):
|
||||
"""generage tag value str
|
||||
|
||||
Args:
|
||||
tag_elm_list (list): _description_
|
||||
ts_value (timestamp, optional): Defaults to None.
|
||||
"""
|
||||
if self.need_tagts:
|
||||
self.ts_value = self.genTs()[0]
|
||||
if ts_value is not None:
|
||||
|
@@ -1071,6 +1162,12 @@ class TDCom:
|
|||
self.tag_value_list = [self.ts_value] + self.tag_value_list
|
||||
|
||||
def screateDb(self, dbname="test", drop_db=True, **kwargs):
|
||||
"""create database
|
||||
|
||||
Args:
|
||||
dbname (str, optional): Defaults to "test".
|
||||
drop_db (bool, optional): Defaults to True.
|
||||
"""
|
||||
tdLog.info("creating db ...")
|
||||
db_params = ""
|
||||
if len(kwargs) > 0:
|
||||
|
@@ -1087,6 +1184,21 @@ class TDCom:
|
|||
def screate_stable(self, dbname=None, stbname="stb", use_name="table", column_elm_list=None, tag_elm_list=None,
|
||||
need_tagts=False, count=1, default_stbname_prefix="stb", default_stbname_index_start_num=1,
|
||||
default_column_index_start_num=1, default_tag_index_start_num=1, **kwargs):
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
dbname (str, optional): Defaults to None.
|
||||
stbname (str, optional): Defaults to "stb".
|
||||
use_name (str, optional): stable/table, Defaults to "table".
|
||||
column_elm_list (list, optional): use for sgen_column_type_str(), Defaults to None.
|
||||
tag_elm_list (list, optional): use for sgen_tag_type_str(), Defaults to None.
|
||||
need_tagts (bool, optional): tag use timestamp, Defaults to False.
|
||||
count (int, optional): stable count, Defaults to 1.
|
||||
default_stbname_prefix (str, optional): Defaults to "stb".
|
||||
default_stbname_index_start_num (int, optional): Defaults to 1.
|
||||
default_column_index_start_num (int, optional): Defaults to 1.
|
||||
default_tag_index_start_num (int, optional): Defaults to 1.
|
||||
"""
|
||||
tdLog.info("creating stable ...")
|
||||
if dbname is not None:
|
||||
self.dbname = dbname
|
||||
|
@@ -1115,6 +1227,21 @@ class TDCom:
|
|||
tdSql.execute(create_stable_sql)
|
||||
|
||||
def screate_ctable(self, dbname=None, stbname=None, ctbname="ctb", use_name="table", tag_elm_list=None, ts_value=None, count=1, default_varchar_datatype="letters", default_nchar_datatype="letters", default_ctbname_prefix="ctb", default_ctbname_index_start_num=1, **kwargs):
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
dbname (str, optional): Defaults to None.
|
||||
stbname (str, optional): Defaults to None.
|
||||
ctbname (str, optional): Defaults to "ctb".
|
||||
use_name (str, optional): Defaults to "table".
|
||||
tag_elm_list (list, optional): use for sgen_tag_type_str(), Defaults to None.
|
||||
ts_value (timestamp, optional): Defaults to None.
|
||||
count (int, optional): ctb count, Defaults to 1.
|
||||
default_varchar_datatype (str, optional): Defaults to "letters".
|
||||
default_nchar_datatype (str, optional): Defaults to "letters".
|
||||
default_ctbname_prefix (str, optional): Defaults to "ctb".
|
||||
default_ctbname_index_start_num (int, optional): Defaults to 1.
|
||||
"""
|
||||
tdLog.info("creating childtable ...")
|
||||
self.default_varchar_datatype = default_varchar_datatype
|
||||
self.default_nchar_datatype = default_nchar_datatype
|
||||
|
@@ -1150,6 +1277,13 @@ class TDCom:
|
|||
tdSql.execute(create_stable_sql)
|
||||
|
||||
def sgen_column_value_list(self, column_elm_list, need_null, ts_value=None):
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
column_elm_list (list): gen_random_type_value()
|
||||
need_null (bool): if insert null
|
||||
ts_value (timestamp, optional): Defaults to None.
|
||||
"""
|
||||
self.column_value_list = list()
|
||||
self.ts_value = self.genTs()[0]
|
||||
if ts_value is not None:
|
||||
|
@@ -1180,6 +1314,18 @@ class TDCom:
|
|||
def screate_table(self, dbname=None, tbname="tb", use_name="table", column_elm_list=None,
|
||||
count=1, default_tbname_prefix="tb", default_tbname_index_start_num=1,
|
||||
default_column_index_start_num=1, **kwargs):
|
||||
"""create ctable
|
||||
|
||||
Args:
|
||||
dbname (str, optional): Defaults to None.
|
||||
tbname (str, optional): Defaults to "tb".
|
||||
use_name (str, optional): Defaults to "table".
|
||||
column_elm_list (list, optional): Defaults to None.
|
||||
count (int, optional): Defaults to 1.
|
||||
default_tbname_prefix (str, optional): Defaults to "tb".
|
||||
default_tbname_index_start_num (int, optional): Defaults to 1.
|
||||
default_column_index_start_num (int, optional): Defaults to 1.
|
||||
"""
|
||||
tdLog.info("creating table ...")
|
||||
if dbname is not None:
|
||||
self.dbname = dbname
|
||||
|
@@ -1205,6 +1351,16 @@ class TDCom:
|
|||
tdSql.execute(create_table_sql)
|
||||
|
||||
def sinsert_rows(self, dbname=None, tbname=None, column_ele_list=None, ts_value=None, count=1, need_null=False):
|
||||
"""insert rows
|
||||
|
||||
Args:
|
||||
dbname (str, optional): Defaults to None.
|
||||
tbname (str, optional): Defaults to None.
|
||||
column_ele_list (list, optional): Defaults to None.
|
||||
ts_value (timestamp, optional): Defaults to None.
|
||||
count (int, optional): Defaults to 1.
|
||||
need_null (bool, optional): Defaults to False.
|
||||
"""
|
||||
tdLog.info("stream inserting ...")
|
||||
if dbname is not None:
|
||||
self.dbname = dbname
|
||||
|
@@ -1245,6 +1401,15 @@ class TDCom:
|
|||
tdSql.execute(insert_sql)
|
||||
|
||||
def sdelete_rows(self, dbname=None, tbname=None, start_ts=None, end_ts=None, ts_key=None):
|
||||
"""delete rows
|
||||
|
||||
Args:
|
||||
dbname (str, optional): Defaults to None.
|
||||
tbname (str, optional): Defaults to None.
|
||||
start_ts (timestamp, optional): range start. Defaults to None.
|
||||
end_ts (timestamp, optional): range end. Defaults to None.
|
||||
ts_key (str, optional): timestamp column name. Defaults to None.
|
||||
"""
|
||||
if dbname is not None:
|
||||
self.dbname = dbname
|
||||
if tbname is not None:
|
||||
|
@@ -1271,8 +1436,13 @@ class TDCom:
|
|||
base_del_sql += f'where {ts_col_name} = {start_ts};'
|
||||
tdSql.execute(base_del_sql)
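# Usage sketch mirroring the pattern used by the stream test cases below: insert a row
# at a "+"-offset timestamp, then delete exactly that row again (only start_ts given,
# so the generated delete uses "where ts = ..."). "ctb" is an illustrative table name.
from util.common import tdCom

ts_value = "1658921623245+10s"
tdCom.sinsert_rows(tbname="ctb", ts_value=ts_value)
tdCom.sdelete_rows(tbname="ctb", start_ts=tdCom.time_cast(ts_value))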
|
||||
|
||||
|
||||
def check_stream_field_type(self, sql, input_function):
|
||||
"""confirm stream field
|
||||
|
||||
Args:
|
||||
sql (str): input sql
|
||||
input_function (str): scalar
|
||||
"""
|
||||
tdSql.query(sql)
|
||||
res = tdSql.queryResult
|
||||
if input_function in ["acos", "asin", "atan", "cos", "log", "pow", "sin", "sqrt", "tan"]:
|
||||
|
@@ -1301,6 +1471,14 @@ class TDCom:
|
|||
tdSql.checkEqual(res[2][1], "DOUBLE")
|
||||
|
||||
def round_handle(self, input_list):
|
||||
"""round list elem
|
||||
|
||||
Args:
|
||||
input_list (list): input value list
|
||||
|
||||
Returns:
|
||||
list: rounded value list
|
||||
"""
|
||||
tdLog.info("round rows ...")
|
||||
final_list = list()
|
||||
for i in input_list:
|
||||
|
@@ -1314,6 +1492,14 @@ class TDCom:
|
|||
return final_list
|
||||
|
||||
def float_handle(self, input_list):
|
||||
"""float list elem
|
||||
|
||||
Args:
|
||||
input_list (list): input value list
|
||||
|
||||
Returns:
|
||||
list: float value list
|
||||
"""
|
||||
tdLog.info("float rows ...")
|
||||
final_list = list()
|
||||
for i in input_list:
|
||||
|
@@ -1327,10 +1513,26 @@ class TDCom:
|
|||
return final_list
|
||||
|
||||
def str_ts_trans_bigint(self, str_ts):
|
||||
"""trans str ts to bigint
|
||||
|
||||
Args:
|
||||
str_ts (str): human-date
|
||||
|
||||
Returns:
|
||||
bigint: bigint-ts
|
||||
"""
|
||||
tdSql.query(f'select cast({str_ts} as bigint)')
|
||||
return tdSql.queryResult[0][0]
|
||||
|
||||
def cast_query_data(self, query_data):
|
||||
"""cast query-result for existed-stb
|
||||
|
||||
Args:
|
||||
query_data (list): query data list
|
||||
|
||||
Returns:
|
||||
list: new list after cast
|
||||
"""
|
||||
tdLog.info("cast query data ...")
|
||||
col_type_list = self.column_type_str.split(',')
|
||||
tag_type_list = self.tag_type_str.split(',')
|
||||
|
@@ -1351,6 +1553,14 @@ class TDCom:
|
|||
return nl
|
||||
|
||||
def trans_time_to_s(self, runtime):
|
||||
"""trans time to s
|
||||
|
||||
Args:
|
||||
runtime (str): 1d/1h/1m...
|
||||
|
||||
Returns:
|
||||
int: second
|
||||
"""
|
||||
if "d" in str(runtime).lower():
|
||||
d_num = re.findall(r"\d+\.?\d*", runtime.replace(" ", ""))[0]
|
||||
s_num = float(d_num) * 24 * 60 * 60
|
||||
|
@@ -1367,6 +1577,23 @@ class TDCom:
|
|||
return int(s_num)
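# Worked example for the "d" branch shown above (the h/m/s branches follow the same
# pattern per the docstring): "1d" normalizes to 86400 seconds.
from util.common import tdCom

assert tdCom.trans_time_to_s("1d") == 86400
max_delay_value = f"{tdCom.trans_time_to_s('5s')}s"   # how the max_delay tests below normalize values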
|
||||
|
||||
def check_query_data(self, sql1, sql2, sorted=False, fill_value=None, tag_value_list=None, defined_tag_count=None, partition=True, use_exist_stb=False, subtable=None, reverse_check=False):
|
||||
"""confirm query result
|
||||
|
||||
Args:
|
||||
sql1 (str): select ....
|
||||
sql2 (str): select ....
|
||||
sorted (bool, optional): if sort result list. Defaults to False.
|
||||
fill_value (str, optional): fill. Defaults to None.
|
||||
tag_value_list (list, optional): Defaults to None.
|
||||
defined_tag_count (int, optional): Defaults to None.
|
||||
partition (bool, optional): Defaults to True.
|
||||
use_exist_stb (bool, optional): Defaults to False.
|
||||
subtable (str, optional): Defaults to None.
|
||||
reverse_check (bool, optional): check that the results are NOT equal. Defaults to False.
|
||||
|
||||
Returns:
|
||||
bool: False if failed
|
||||
"""
|
||||
tdLog.info("checking query data ...")
|
||||
if tag_value_list:
|
||||
dvalue = len(self.tag_type_str.split(',')) - defined_tag_count
|
||||
|
@@ -1485,6 +1712,16 @@ class TDCom:
|
|||
# tdSql.checkEqual(res1, res2) if not reverse_check else tdSql.checkNotEqual(res1, res2)
|
||||
|
||||
def check_stream_res(self, sql, expected_res, max_delay):
|
||||
"""confirm stream result
|
||||
|
||||
Args:
|
||||
sql (str): select ...
|
||||
expected_res (str): expected result
|
||||
max_delay (int): max_delay value
|
||||
|
||||
Returns:
|
||||
bool: False if failed
|
||||
"""
|
||||
tdSql.query(sql)
|
||||
latency = 0
|
||||
|
||||
|
@@ -1500,18 +1737,27 @@ class TDCom:
|
|||
tdSql.checkEqual(tdSql.queryRows, expected_res)
|
||||
|
||||
def check_stream(self, sql1, sql2, expected_count, max_delay=None):
|
||||
"""confirm stream
|
||||
|
||||
Args:
|
||||
sql1 (str): select ...
|
||||
sql2 (str): select ...
|
||||
expected_count (int): expected_count
|
||||
max_delay (int, optional): max_delay value. Defaults to None.
|
||||
"""
|
||||
self.check_stream_res(sql1, expected_count, max_delay)
|
||||
self.check_query_data(sql1, sql2)
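# Usage sketch: first confirm the destination table reaches the expected row count
# (check_stream_res, retried up to max_delay), then compare it row by row against the
# source query (check_query_data). Names are illustrative; tdSql must hold a connection.
from util.common import tdCom

tdCom.check_stream(
    "select wstart, cnt from stb_stream_dest order by wstart",
    "select _wstart as wstart, count(*) as cnt from stb interval(5s) order by wstart",
    expected_count=4,
)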
|
||||
|
||||
def cal_watermark_window_close_session_endts(self, start_ts, watermark=None, session=None):
|
||||
"""cal endts for close window
|
||||
|
||||
Args:
start_ts (epoch time): self.date_time
watermark (int, optional): second-level value, > session. Defaults to None.
session (int, optional): second-level value. Defaults to None.

Note:
only "ms" precision is supported now.

Returns:
int: window-close end timestamp
|
||||
"""
|
||||
if watermark is not None:
|
||||
return start_ts + watermark*self.offset + 1
|
||||
|
@@ -1521,14 +1767,13 @@ class TDCom:
|
|||
def cal_watermark_window_close_interval_endts(self, start_ts, interval, watermark=None):
|
||||
"""cal endts for close window
|
||||
|
||||
Args:
start_ts (epoch time): self.date_time
interval (int): second-level value
watermark (int, optional): second-level value, > interval. Defaults to None.

Note:
only "ms" precision is supported now.

Returns:
int: window-close end timestamp
|
||||
"""
|
||||
if watermark is not None:
|
||||
return int(start_ts/self.offset)*self.offset + (interval - (int(start_ts/self.offset))%interval)*self.offset + watermark*self.offset
|
||||
|
@@ -1536,6 +1781,11 @@ class TDCom:
|
|||
return int(start_ts/self.offset)*self.offset + (interval - (int(start_ts/self.offset))%interval)*self.offset
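# Usage sketch (assumption: self.offset is 1000, i.e. the ms multiplier): the helper
# aligns start_ts down to a whole second, moves to the next interval boundary, and
# adds the watermark, giving the first ts at which the interval window may close.
from util.common import tdCom

close_ts = tdCom.cal_watermark_window_close_interval_endts(1658921623245, interval=15, watermark=20)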
|
||||
|
||||
def update_delete_history_data(self, delete):
|
||||
"""update and delete history data
|
||||
|
||||
Args:
|
||||
delete (bool): True/False
|
||||
"""
|
||||
self.sinsert_rows(tbname=self.ctb_name, ts_value=self.record_history_ts)
|
||||
self.sinsert_rows(tbname=self.tb_name, ts_value=self.record_history_ts)
|
||||
if delete:
|
||||
|
@@ -1543,6 +1793,20 @@ class TDCom:
|
|||
self.sdelete_rows(tbname=self.tb_name, start_ts=self.time_cast(self.record_history_ts, "-"))
|
||||
|
||||
def prepare_data(self, interval=None, watermark=None, session=None, state_window=None, state_window_max=127, interation=3, range_count=None, precision="ms", fill_history_value=0, ext_stb=None):
|
||||
"""prepare stream data
|
||||
|
||||
Args:
|
||||
interval (int, optional): Defaults to None.
|
||||
watermark (int, optional): Defaults to None.
|
||||
session (int, optional): Defaults to None.
|
||||
state_window (str, optional): Defaults to None.
|
||||
state_window_max (int, optional): Defaults to 127.
|
||||
interation (int, optional): Defaults to 3.
|
||||
range_count (int, optional): Defaults to None.
|
||||
precision (str, optional): Defaults to "ms".
|
||||
fill_history_value (int, optional): Defaults to 0.
|
||||
ext_stb (bool, optional): Defaults to None.
|
||||
"""
|
||||
self.clean_env()
|
||||
self.dataDict = {
|
||||
"stb_name" : f"{self.case_name}_stb",
|
||||
|
@@ -1595,6 +1859,5 @@ class TDCom:
|
|||
self.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
|
||||
if i == 1:
|
||||
self.record_history_ts = ts_value
|
||||
|
||||
|
||||
tdCom = TDCom()
|
||||
|
|
|
@@ -111,7 +111,7 @@ class TDSql:
|
|||
return self.error_info
|
||||
|
||||
|
||||
def query(self, sql, row_tag=None,queryTimes=10):
|
||||
def query(self, sql, row_tag=None, queryTimes=10, count_expected_res=None):
|
||||
self.sql = sql
|
||||
i=1
|
||||
while i <= queryTimes:
|
||||
|
@@ -120,6 +120,17 @@ class TDSql:
|
|||
self.queryResult = self.cursor.fetchall()
|
||||
self.queryRows = len(self.queryResult)
|
||||
self.queryCols = len(self.cursor.description)
|
||||
|
||||
if count_expected_res is not None:
|
||||
counter = 0
|
||||
while count_expected_res != self.queryResult[0][0]:
|
||||
self.cursor.execute(sql)
|
||||
self.queryResult = self.cursor.fetchall()
|
||||
if counter < queryTimes:
|
||||
counter += 0.5
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
return False
|
||||
if row_tag:
|
||||
return self.queryResult
|
||||
return self.queryRows
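# Usage sketch for the new count_expected_res parameter (assumes the module-level tdSql
# from util.sql is connected): the query is re-run every 0.5s until its first cell equals
# the expected value; after roughly queryTimes seconds it gives up and returns False.
from util.sql import tdSql

rows = tdSql.query("select count(*) from stb_stream_dest", count_expected_res=10)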
|
||||
|
@@ -501,7 +512,8 @@ class TDSql:
|
|||
|
||||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||
args = (caller.filename, caller.lineno, self.sql, elm, expect_elm)
|
||||
tdLog.exit("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
|
||||
# tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
|
||||
raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args)
|
||||
|
||||
def checkNotEqual(self, elm, expect_elm):
|
||||
if elm != expect_elm:
|
||||
|
@@ -509,7 +521,8 @@ class TDSql:
|
|||
else:
|
||||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||
args = (caller.filename, caller.lineno, self.sql, elm, expect_elm)
|
||||
tdLog.exit("%s(%d) failed: sql:%s, elm:%s == expect_elm:%s" % args)
|
||||
tdLog.info("%s(%d) failed: sql:%s, elm:%s == expect_elm:%s" % args)
|
||||
raise Exception
|
||||
|
||||
def get_times(self, time_str, precision="ms"):
|
||||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||
|
|
|
@@ -13,7 +13,8 @@ class TDTestCase:
|
|||
tdSql.init(conn.cursor(), logSql)
|
||||
self.tdCom = tdCom
|
||||
|
||||
def at_once_interval_ext(self, interval, partition="tbname", delete=False, fill_value=None, fill_history_value=None, interval_value=None, subtable=None, case_when=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False, use_except=False):
|
||||
def at_once_interval_ext(self, interval, partition="tbname", delete=False, fill_value=None, fill_history_value=None, subtable=None, case_when=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False, use_except=False):
|
||||
tdLog.info(f"*** testing stream at_once+interval+exist_stb+custom_tag: interval: {interval}, partition: {partition}, fill_history: {fill_history_value}, delete: {delete}, subtable: {subtable}, stb_field_name_value: {stb_field_name_value}, tag_value: {tag_value} ***")
|
||||
if use_except:
|
||||
if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm or stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm or len(stb_field_name_value.split(",")) == len(self.tdCom.partitial_stb_filter_des_select_elm.split(",")):
|
||||
partitial_tb_source_str = self.tdCom.partitial_ext_tb_source_select_str
|
||||
|
@@ -33,8 +34,6 @@ class TDTestCase:
|
|||
self.delete = delete
|
||||
self.tdCom.case_name = sys._getframe().f_code.co_name
|
||||
defined_tag_count = len(tag_value.split()) if tag_value is not None else 0
|
||||
# if interval_value is None:
|
||||
# interval_value = f'{self.tdCom.dataDict["interval"]}s'
|
||||
self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value, ext_stb=use_exist_stb)
|
||||
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
|
@@ -60,8 +59,6 @@ class TDTestCase:
|
|||
partition_elm_alias = self.tdCom.partition_expression_alias
|
||||
elif partition == "tbname,t1,c1":
|
||||
partition_elm_alias = f'{self.tdCom.partition_tbname_alias},t1,c1'
|
||||
partiton_tb = "tbname,c1"
|
||||
partition_elm_alias_tb = f'{self.tdCom.partition_tbname_alias},c1'
|
||||
else:
|
||||
partition_elm_alias = self.tdCom.partition_tag_alias
|
||||
if subtable:
|
||||
|
|
|
@@ -0,0 +1,101 @@
|
|||
import sys
|
||||
import threading
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.common import *
|
||||
|
||||
class TDTestCase:
|
||||
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
self.tdCom = tdCom
|
||||
|
||||
def watermark_max_delay_interval_ext(self, interval, max_delay, watermark=None, fill_value=None, partition="tbname", delete=False, fill_history_value=None, subtable=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False):
|
||||
tdLog.info(f"*** testing stream max_delay+interval+exist_stb+custom_tag: interval: {interval}, partition: {partition}, max_delay: {max_delay}, fill_history: {fill_history_value}, subtable: {subtable}, stb_field_name_value: {stb_field_name_value}, tag_value: {tag_value} ***")
|
||||
if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm or stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm:
|
||||
partitial_tb_source_str = self.tdCom.partitial_ext_tb_source_select_str
|
||||
else:
|
||||
partitial_tb_source_str = self.tdCom.ext_tb_source_select_str
|
||||
if not stb_field_name_value:
|
||||
stb_field_name_value = self.tdCom.tb_filter_des_select_elm
|
||||
self.delete = delete
|
||||
self.tdCom.case_name = sys._getframe().f_code.co_name
|
||||
defined_tag_count = len(tag_value.split())
|
||||
if watermark is not None:
|
||||
self.tdCom.case_name = "watermark" + sys._getframe().f_code.co_name
|
||||
self.tdCom.prepare_data(interval=interval, watermark=watermark, ext_stb=use_exist_stb)
|
||||
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
|
||||
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
|
||||
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
|
||||
if subtable:
|
||||
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.subtable_prefix}", cast(cast(abs(cast({subtable} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")' if self.subtable else None
|
||||
else:
|
||||
stb_subtable_value = None
|
||||
self.tdCom.date_time = 1658921623245
|
||||
if watermark is not None:
|
||||
watermark_value = f'{self.tdCom.dataDict["watermark"]}s'
|
||||
else:
|
||||
watermark_value = None
|
||||
|
||||
max_delay_value = f'{self.tdCom.trans_time_to_s(max_delay)}s'
|
||||
if fill_value:
|
||||
if "value" in fill_value.lower():
|
||||
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
|
||||
# create stb/ctb/tb stream
|
||||
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ext_stb_stream_des_table, subtable_value=stb_subtable_value, source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value, fill_history_value=fill_history_value, stb_field_name_value=stb_field_name_value, tag_value=tag_value, use_exist_stb=use_exist_stb)
|
||||
|
||||
init_num = 0
|
||||
for i in range(self.tdCom.range_count):
|
||||
if i == 0:
|
||||
if watermark is not None:
|
||||
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark'])
|
||||
else:
|
||||
window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'])
|
||||
else:
|
||||
self.tdCom.date_time = window_close_ts + self.tdCom.offset
|
||||
window_close_ts += self.tdCom.dataDict['interval']*self.tdCom.offset
|
||||
for num in range(int(window_close_ts/self.tdCom.offset-self.tdCom.date_time/self.tdCom.offset)):
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
|
||||
if self.tdCom.update and i%2 == 0:
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset)
|
||||
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1)
|
||||
if self.tdCom.update and i%2 == 0:
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1)
|
||||
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
|
||||
if self.tdCom.update and i%2 == 0:
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
|
||||
|
||||
if i == 0:
|
||||
init_num = 2 + i
|
||||
if watermark is not None:
|
||||
init_num += 1
|
||||
else:
|
||||
init_num += 1
|
||||
time.sleep(int(max_delay.replace("s", "")))
|
||||
if tag_value:
|
||||
tdSql.query(f'select {tag_value} from {self.stb_name}')
|
||||
tag_value_list = tdSql.queryResult
|
||||
if not fill_value:
|
||||
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts;', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', defined_tag_count=defined_tag_count, tag_value_list=tag_value_list, partition=partition)
|
||||
|
||||
def run(self):
|
||||
for delete in [True, False]:
|
||||
for fill_history_value in [0, 1]:
|
||||
self.watermark_max_delay_interval_ext(interval=random.choice([15]), watermark=random.randint(20, 25), max_delay=f"{random.randint(5, 6)}s", delete=delete, fill_history_value=fill_history_value, partition=None, subtable=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@@ -0,0 +1,154 @@
|
|||
import threading
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.common import *
|
||||
|
||||
class TDTestCase:
|
||||
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
self.tdCom = tdCom
|
||||
|
||||
def pause_resume_test(self, interval, partition="tbname", delete=False, fill_history_value=None, pause=True, resume=True, ignore_untreated=False):
|
||||
tdLog.info(f"*** testing stream pause+resume: interval: {interval}, partition: {partition}, delete: {delete}, fill_history: {fill_history_value}, ignore_untreated: {ignore_untreated} ***")
|
||||
if_exist_value_list = [None, True]
|
||||
if_exist = random.choice(if_exist_value_list)
|
||||
reverse_check = True if ignore_untreated else False
|
||||
range_count = (self.tdCom.range_count + 3) * 3
|
||||
self.delete = delete
|
||||
self.tdCom.case_name = sys._getframe().f_code.co_name
|
||||
self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value)
|
||||
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
|
||||
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
|
||||
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
|
||||
|
||||
if partition == "tbname":
|
||||
partition_elm_alias = self.tdCom.partition_tbname_alias
|
||||
elif partition == "c1":
|
||||
partition_elm_alias = self.tdCom.partition_col_alias
|
||||
elif partition == "abs(c1)":
|
||||
partition_elm_alias = self.tdCom.partition_expression_alias
|
||||
elif partition is None:
|
||||
partition_elm_alias = '"no_partition"'
|
||||
else:
|
||||
partition_elm_alias = self.tdCom.partition_tag_alias
|
||||
if partition == "tbname" or partition is None:
|
||||
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
|
||||
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
|
||||
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
|
||||
else:
|
||||
stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
|
||||
ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
|
||||
tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None
|
||||
if partition:
|
||||
partition_elm = f'partition by {partition} {partition_elm_alias}'
|
||||
else:
|
||||
partition_elm = ""
|
||||
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=stb_subtable_value, fill_history_value=fill_history_value)
|
||||
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=ctb_subtable_value, fill_history_value=fill_history_value)
|
||||
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_history_value=fill_history_value)
|
||||
for i in range(range_count):
|
||||
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
|
||||
ts_cast_delete_value = self.tdCom.time_cast(ts_value)
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
|
||||
if self.tdCom.update and i%2 == 0:
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
|
||||
if self.delete and i%2 != 0:
|
||||
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=ts_cast_delete_value)
|
||||
self.tdCom.date_time += 1
|
||||
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
|
||||
if self.tdCom.update and i%2 == 0:
|
||||
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
|
||||
if self.delete and i%2 != 0:
|
||||
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=ts_cast_delete_value)
|
||||
self.tdCom.date_time += 1
|
||||
if partition:
|
||||
partition_elm = f'partition by {partition}'
|
||||
else:
|
||||
partition_elm = ""
|
||||
# if i == int(range_count/2):
|
||||
if i > 2 and i % 3 == 0:
|
||||
for stream_name in [f'{self.stb_name}{self.tdCom.stream_suffix}', f'{self.ctb_name}{self.tdCom.stream_suffix}', f'{self.tb_name}{self.tdCom.stream_suffix}']:
|
||||
if if_exist is not None:
|
||||
tdSql.execute(f'pause stream if exists {stream_name}_no_exist')
|
||||
tdSql.error(f'pause stream if not exists {stream_name}')
|
||||
tdSql.error(f'pause stream {stream_name}_no_exist')
|
||||
self.tdCom.pause_stream(stream_name, if_exist)
|
||||
if pause and not resume and range_count-i <= 3:
|
||||
time.sleep(self.tdCom.default_interval)
|
||||
tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {self.stb_name}{self.tdCom.des_table_suffix} order by wstart')
|
||||
res_after_pause = tdSql.queryResult
|
||||
if resume:
|
||||
if i > 2 and i % 3 != 0:
|
||||
for stream_name in [f'{self.stb_name}{self.tdCom.stream_suffix}', f'{self.ctb_name}{self.tdCom.stream_suffix}', f'{self.tb_name}{self.tdCom.stream_suffix}']:
|
||||
if if_exist is not None:
|
||||
tdSql.execute(f'resume stream if exists {stream_name}_no_exist')
|
||||
tdSql.error(f'resume stream if not exists {stream_name}')
|
||||
self.tdCom.resume_stream(stream_name, if_exist, None, ignore_untreated)
|
||||
if pause and not resume:
|
||||
tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {self.stb_name}{self.tdCom.des_table_suffix} order by wstart')
|
||||
res_without_resume = tdSql.queryResult
|
||||
tdSql.checkEqual(res_after_pause, res_without_resume)
|
||||
else:
|
||||
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
|
||||
if tbname != self.tb_name:
|
||||
self.tdCom.check_query_data(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} {partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True, reverse_check=reverse_check)
|
||||
else:
|
||||
self.tdCom.check_query_data(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} {partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True, reverse_check=reverse_check)
|
||||
|
||||
if self.tdCom.subtable:
|
||||
for tname in [self.stb_name, self.ctb_name]:
|
||||
tdSql.query(f'select * from {self.ctb_name}')
|
||||
ptn_counter = 0
|
||||
for c1_value in tdSql.queryResult:
|
||||
if partition == "c1":
|
||||
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;')
|
||||
elif partition is None:
|
||||
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;')
|
||||
elif partition == "abs(c1)":
|
||||
abs_c1_value = abs(c1_value[1])
|
||||
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;')
|
||||
elif partition == "tbname" and ptn_counter == 0:
|
||||
tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;')
|
||||
ptn_counter += 1
|
||||
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
|
||||
|
||||
tdSql.query(f'select * from {self.tb_name}')
|
||||
ptn_counter = 0
|
||||
for c1_value in tdSql.queryResult:
|
||||
if partition == "c1":
|
||||
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;')
|
||||
elif partition is None:
|
||||
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;')
|
||||
elif partition == "abs(c1)":
|
||||
abs_c1_value = abs(c1_value[1])
|
||||
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;')
|
||||
elif partition == "tbname" and ptn_counter == 0:
|
||||
tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}`;')
|
||||
ptn_counter += 1
|
||||
|
||||
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
|
||||
|
||||
|
||||
def run(self):
|
||||
for delete in [True, False]:
|
||||
for fill_history_value in [0, 1]:
|
||||
# pause/resume
|
||||
self.pause_resume_test(interval=random.randint(10, 15), partition="tbname", ignore_untreated=False, fill_history_value=fill_history_value, delete=delete)
|
||||
self.pause_resume_test(interval=random.randint(10, 15), partition="tbname", ignore_untreated=True, fill_history_value=fill_history_value, delete=delete)
|
||||
# self.pause_resume_test(interval=random.randint(10, 15), partition="tbname", resume=False, fill_history_value=fill_history_value, delete=delete)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@@ -13,7 +13,8 @@ class TDTestCase:
|
|||
tdSql.init(conn.cursor(), logSql)
|
||||
self.tdCom = tdCom
|
||||
|
||||
def watermark_window_close_session_ext(self, session, watermark, fill_history_value=None, partition=None, subtable=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False):
|
||||
def watermark_window_close_session_ext(self, session, watermark, fill_history_value=None, partition=None, subtable=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False, delete=False):
|
||||
tdLog.info(f"*** testing stream window_close+session+exist_stb+custom_tag: session: {session}, partition: {partition}, fill_history: {fill_history_value}, subtable: {subtable}, stb_field_name_value: {stb_field_name_value}, tag_value: {tag_value} ***")
|
||||
if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm or stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm:
|
||||
partitial_tb_source_str = self.tdCom.partitial_ext_tb_source_select_str
|
||||
else:
|
||||
|
@@ -61,15 +62,16 @@ class TDTestCase:
|
|||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts)
|
||||
|
||||
if fill_history_value:
|
||||
self.tdCom.update_delete_history_data(delete=True)
|
||||
self.tdCom.update_delete_history_data(delete=delete)
|
||||
if tag_value:
|
||||
tdSql.query(f'select {tag_value} from {self.stb_name}')
|
||||
tag_value_list = tdSql.queryResult
|
||||
self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} session(ts, {self.tdCom.dataDict["session"]}s) order by wstart limit {expected_value};', sorted=True, defined_tag_count=defined_tag_count, tag_value_list=tag_value_list, partition=partition)
|
||||
|
||||
def run(self):
|
||||
for fill_history_value in [0, 1]:
|
||||
self.watermark_window_close_session_ext(session=random.randint(10, 12), watermark=random.randint(20, 25), fill_history_value=fill_history_value, subtable=None, partition=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True)
|
||||
#! TD-25893
|
||||
# self.watermark_window_close_session_ext(session=random.randint(10, 12), watermark=random.randint(20, 25), subtable=None, partition=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True, delete=False, fill_history_value=1)
|
||||
self.watermark_window_close_session_ext(session=random.randint(10, 12), watermark=random.randint(20, 25), subtable=None, partition=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True, delete=True)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()