test:add testcase for interp in stream

This commit is contained in:
chenhaoran 2024-10-28 09:25:49 +08:00
parent ff46074b19
commit 13086bc66e
2 changed files with 78 additions and 23 deletions

View File

@ -170,6 +170,8 @@ class TDCom:
self.fill_tb_source_select_str = ','.join(self.fill_function_list[0:13])
self.ext_tb_source_select_str = ','.join(self.downsampling_function_list[0:13])
self.stream_case_when_tbname = "tbname"
self.tag_value_str = ""
self.tag_value_list = []
self.update = True
self.disorder = True
@ -202,7 +204,7 @@ class TDCom:
self.cast_tag_stb_filter_des_select_elm = "ts, t1, t2, t3, t4, cast(t1 as TINYINT UNSIGNED), t6, t7, t8, t9, t10, cast(t2 as varchar(256)), t12, cast(t3 as bool)"
self.tag_count = len(self.tag_filter_des_select_elm.split(","))
self.state_window_range = list()
self.custom_col_val = 0
self.part_val_list = [1, 2]
# def init(self, conn, logSql):
@ -754,10 +756,10 @@ class TDCom:
if len(kwargs) > 0:
for param, value in kwargs.items():
ctb_params += f'{param} "{value}" '
tag_value_list = self.gen_tag_value_list(tag_elm_list)
self.tag_value_list = self.gen_tag_value_list(tag_elm_list)
tag_value_str = ""
# tag_value_str = ", ".join(str(v) for v in self.tag_value_list)
for tag_value in tag_value_list:
for tag_value in self.tag_value_list:
if isinstance(tag_value, str):
tag_value_str += f'"{tag_value}", '
else:
@ -1764,6 +1766,7 @@ class TDCom:
bool: False if failed
"""
tdLog.info("checking query data ...")
tdLog.info(f"sq1:{sql1}; sql2:{sql2};")
if tag_value_list:
dvalue = len(self.tag_type_str.split(',')) - defined_tag_count
tdSql.query(sql1)
@ -1799,7 +1802,7 @@ class TDCom:
res2 = self.round_handle(res2)
if not reverse_check:
while res1 != res2:
tdLog.info("query retrying ...")
# tdLog.info("query retrying ...")
new_list = list()
tdSql.query(sql1)
res1 = tdSql.queryResult

View File

@ -14,6 +14,7 @@ class TDTestCase:
self.tdCom = tdCom
def force_window_close(self, interval, partition="tbname", funciton_name="", funciton_name_alias= "",delete=False, fill_value=None, fill_history_value=None, case_when=None, ignore_expired=1, ignore_update=1):
# partition must be tbname, and not NONE.
tdLog.info(f"*** testing stream force_window_close+interp+every: every: {interval}, partition: {partition}, fill_history: {fill_history_value}, fill: {fill_value}, delete: {delete}, case_when: {case_when} ***")
self.tdCom.subtable=False
col_value_type = "Incremental" if partition=="c1" else "random"
@ -26,6 +27,7 @@ class TDTestCase:
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}'
self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}'
self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}'
if partition == "tbname":
@ -68,32 +70,68 @@ class TDTestCase:
if fill_value:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
# create error stream
tdLog.info("create error stream")
sleep(10)
tdSql.error(f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;")
tdSql.error(f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;")
tdSql.error(f"create stream itp_force_error_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;")
tdSql.error(f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c11,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;")
# function name : interp
trigger_mode = "force_window_close"
# # subtable is true
# create stream add :subtable_value=stb_subtable_value or subtable_value=ctb_subtable_value
# no subtable
# create stream super table and child table
tdLog.info("create stream super table and child table")
self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _irowts as irowts, tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.ctb_name} {partition_elm} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
# creat stream set filter of tag and tbname
tdLog.info("create stream with tag and tbname filter")
tag_t1_value = self.tdCom.tag_value_list[0]
where_tag = f'where t1 = {tag_t1_value}'
where_tbname = f'where tbname="{self.ctb_name}"'
print(f"tag: {tag_t1_value}")
self.stb_stream_des_where_tag_table = f'{self.stb_name}_where_tag{self.tdCom.des_table_suffix}'
self.tdCom.create_stream(stream_name=f'{self.stb_name}_where_tag{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_where_tag_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {where_tag} {partition_elm} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
self.stb_stream_des_where_tbname_table = f'{self.stb_name}_where_tbname{self.tdCom.des_table_suffix}'
self.tdCom.create_stream(stream_name=f'{self.stb_name}_where_tbname{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_where_tbname_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {where_tbname} {partition_elm} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
# set partition by tag and column
if partition:
tdLog.info("create stream with partition by tag and tbname ")
partition_elm_new = f'partition by {partition}, t1'
self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_tag{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
partition_elm_new = f'partition by {partition}, c1'
self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_column1{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_column1{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, c1 as c_c1, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
partition_elm_new = f'partition by {partition}, c2'
self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_column2{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_column2{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, c2 as c_c2, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
if fill_value:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11'
# create stream general table
tdLog.info("create stream general table")
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.tb_name} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
self.tdCom.date_time = self.tdCom.genTs(precision=self.tdCom.precision)[0]
start_time = self.tdCom.date_time
time.sleep(1)
start_force_ts = str(0)
for i in range(self.tdCom.range_count):
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
print(ts_value)
if start_force_ts == "0":
start_force_ts = ts_value
ts_cast_delete_value = self.tdCom.time_cast(ts_value)
@ -183,7 +221,13 @@ class TDTestCase:
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=history_ts)
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=future_ts)
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=future_ts)
self.tdCom.date_time = start_time
# get query time range using interval count windows
tdSql.query(f'select _wstart, _wend ,last(ts) from {self.stb_name} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ')
end_new_ts = tdSql.getData(tdSql.queryRows-1, 1)
tdSql.execute(f"insert into {self.ctb_name} (ts,c1) values(\"{end_new_ts}\",-102) ")
tdSql.execute(f"insert into {self.tb_name} (ts,c1) values(\"{end_new_ts}\",-51) ")
for i in range(self.tdCom.range_count):
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
ts_cast_delete_value = self.tdCom.time_cast(ts_value)
@ -194,21 +238,29 @@ class TDTestCase:
if self.delete:
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value)
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value)
# wait for the stream to process the data
time.sleep(self.tdCom.dataDict["interval"]*(final_range_count+2))
# check the data
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
print(tbname)
tdSql.query(f'select _wstart, _wend ,last(ts) from {tbname} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ')
start_new_ts = tdSql.getData(0, 1)
if tbname != self.tb_name:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
if partition == "tbname":
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= {end_ts} order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range({start_force_ts},{end_ts}) every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} where `min(c1)` is not Null order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.tdCom.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value)
# print(self.tdCom.dataDict["interval"]*(final_range_count+2))
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
# check tag and tbname filter
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {self.stb_stream_des_where_tag_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tag} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {self.stb_stream_des_where_tbname_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
else:
if "value" in fill_value.lower():
fill_value='VALUE,1,2,3,6,7,8,9,10,11'
fill_value='VALUE,1'
if partition == "tbname":
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart', fill_value=fill_value)
else:
self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} where `min(c1)` is not Null order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.tdCom.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value)
self.tdCom.check_query_data(f'select irowts, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts , _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
if self.delete:
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=start_ts, end_ts=ts_cast_delete_value)
@ -232,16 +284,16 @@ class TDTestCase:
def run(self):
self.force_window_close(interval=random.randint(10, 15), partition="tbname", funciton_name="interp(c1)", funciton_name_alias="intp_c1" ,delete=False, ignore_update=1, fill_value="PREV")
self.force_window_close(interval=random.randint(10, 15), partition="c1", ignore_update=1)
self.force_window_close(interval=random.randint(10, 15), partition="abs(c1)", ignore_update=1)
self.force_window_close(interval=random.randint(10, 15), partition=None, delete=True)
self.force_window_close(interval=random.randint(10, 15), partition=self.tdCom.stream_case_when_tbname, case_when=f'case when {self.tdCom.stream_case_when_tbname} = tbname then {self.tdCom.partition_tbname_alias} else tbname end')
self.force_window_close(interval=random.randint(10, 15), partition="tbname", fill_history_value=1, fill_value="NULL")
for fill_value in ["NULL", "PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]:
# for fill_value in ["PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]:
self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value)
self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value, delete=True)
self.force_window_close(interval=10, partition="tbname", funciton_name="interp(c1)", funciton_name_alias="intp_c1" ,delete=False, ignore_update=1, fill_value="PREV")
# self.force_window_close(interval=random.randint(10, 15), partition="c1", ignore_update=1)
# self.force_window_close(interval=random.randint(10, 15), partition="abs(c1)", ignore_update=1)
# self.force_window_close(interval=random.randint(10, 15), partition=None, delete=True)
# self.force_window_close(interval=random.randint(10, 15), partition=self.tdCom.stream_case_when_tbname, case_when=f'case when {self.tdCom.stream_case_when_tbname} = tbname then {self.tdCom.partition_tbname_alias} else tbname end')
# self.force_window_close(interval=random.randint(10, 15), partition="tbname", fill_history_value=1, fill_value="NULL")
# for fill_value in ["NULL", "PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]:
# # for fill_value in ["PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]:
# self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value)
# self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value, delete=True)
def stop(self):
tdSql.close()