test:verify the output of the stream with tag filtering is correct when a new child table with matching tag conditions is created
This commit is contained in:
parent
13086bc66e
commit
570b69c30d
|
@ -111,7 +111,7 @@ class TDTestCase:
|
||||||
if partition:
|
if partition:
|
||||||
tdLog.info("create stream with partition by tag and tbname ")
|
tdLog.info("create stream with partition by tag and tbname ")
|
||||||
partition_elm_new = f'partition by {partition}, t1'
|
partition_elm_new = f'partition by {partition}, t1'
|
||||||
self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_tag{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
|
self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_tag{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, t1 as t_t1, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
|
||||||
partition_elm_new = f'partition by {partition}, c1'
|
partition_elm_new = f'partition by {partition}, c1'
|
||||||
self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_column1{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_column1{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, c1 as c_c1, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
|
self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_column1{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_column1{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, c1 as c_c1, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
|
||||||
partition_elm_new = f'partition by {partition}, c2'
|
partition_elm_new = f'partition by {partition}, c2'
|
||||||
|
@ -125,6 +125,7 @@ class TDTestCase:
|
||||||
tdLog.info("create stream general table")
|
tdLog.info("create stream general table")
|
||||||
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.tb_name} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
|
self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.tb_name} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update)
|
||||||
|
|
||||||
|
# insert data
|
||||||
self.tdCom.date_time = self.tdCom.genTs(precision=self.tdCom.precision)[0]
|
self.tdCom.date_time = self.tdCom.genTs(precision=self.tdCom.precision)[0]
|
||||||
start_time = self.tdCom.date_time
|
start_time = self.tdCom.date_time
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
@ -224,9 +225,13 @@ class TDTestCase:
|
||||||
|
|
||||||
# get query time range using interval count windows
|
# get query time range using interval count windows
|
||||||
tdSql.query(f'select _wstart, _wend ,last(ts) from {self.stb_name} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ')
|
tdSql.query(f'select _wstart, _wend ,last(ts) from {self.stb_name} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ')
|
||||||
|
# getData don't support negative index
|
||||||
end_new_ts = tdSql.getData(tdSql.queryRows-1, 1)
|
end_new_ts = tdSql.getData(tdSql.queryRows-1, 1)
|
||||||
|
end_last_but_one_ts = tdSql.getData(tdSql.queryRows-2, 1)
|
||||||
|
#source data include that fill valuse is null and "_isfilled" column of the stream output is false
|
||||||
tdSql.execute(f"insert into {self.ctb_name} (ts,c1) values(\"{end_new_ts}\",-102) ")
|
tdSql.execute(f"insert into {self.ctb_name} (ts,c1) values(\"{end_new_ts}\",-102) ")
|
||||||
tdSql.execute(f"insert into {self.tb_name} (ts,c1) values(\"{end_new_ts}\",-51) ")
|
tdSql.execute(f"insert into {self.tb_name} (ts,c1) values(\"{end_new_ts}\",-51) ")
|
||||||
|
tdSql.execute(f"insert into {self.ctb_name} (ts,c1) values(\"{end_last_but_one_ts}\",NULL) ")
|
||||||
|
|
||||||
for i in range(self.tdCom.range_count):
|
for i in range(self.tdCom.range_count):
|
||||||
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
|
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
|
||||||
|
@ -240,6 +245,7 @@ class TDTestCase:
|
||||||
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value)
|
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value)
|
||||||
|
|
||||||
# wait for the stream to process the data
|
# wait for the stream to process the data
|
||||||
|
# print(self.tdCom.dataDict["interval"]*(final_range_count+2))
|
||||||
time.sleep(self.tdCom.dataDict["interval"]*(final_range_count+2))
|
time.sleep(self.tdCom.dataDict["interval"]*(final_range_count+2))
|
||||||
|
|
||||||
# check the data
|
# check the data
|
||||||
|
@ -247,11 +253,13 @@ class TDTestCase:
|
||||||
print(tbname)
|
print(tbname)
|
||||||
tdSql.query(f'select _wstart, _wend ,last(ts) from {tbname} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ')
|
tdSql.query(f'select _wstart, _wend ,last(ts) from {tbname} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ')
|
||||||
start_new_ts = tdSql.getData(0, 1)
|
start_new_ts = tdSql.getData(0, 1)
|
||||||
if tbname != self.tb_name:
|
if tbname == self.ctb_name:
|
||||||
if "value" in fill_value.lower():
|
if "value" in fill_value.lower():
|
||||||
fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
|
fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11'
|
||||||
if partition == "tbname":
|
if partition == "tbname":
|
||||||
# print(self.tdCom.dataDict["interval"]*(final_range_count+2))
|
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
|
||||||
|
elif tbname == self.stb_name:
|
||||||
|
if partition == "tbname":
|
||||||
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
|
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
|
||||||
# check tag and tbname filter
|
# check tag and tbname filter
|
||||||
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {self.stb_stream_des_where_tag_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tag} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
|
self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {self.stb_stream_des_where_tag_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tag} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
|
||||||
|
@ -262,6 +270,18 @@ class TDTestCase:
|
||||||
if partition == "tbname":
|
if partition == "tbname":
|
||||||
self.tdCom.check_query_data(f'select irowts, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts , _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
|
self.tdCom.check_query_data(f'select irowts, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts , _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value)
|
||||||
|
|
||||||
|
# Recreate a sub-table that meets the filtering "where_tag" and check if the streaming results are automatically included within it."
|
||||||
|
where_tag_ctbname = f"{self.ctb_name}_where_tag"
|
||||||
|
tdSql.execute(f"create table {where_tag_ctbname} using {self.stb_name} (t1) tags({tag_t1_value}) ")
|
||||||
|
where_tag_timestamp = self.tdCom.genTs(precision=self.tdCom.precision)[0]
|
||||||
|
where_tag_ts_start_value = str(where_tag_timestamp)+ '+2s'
|
||||||
|
self.tdCom.sinsert_rows(tbname=where_tag_ctbname, ts_value=where_tag_ts_start_value)
|
||||||
|
time.sleep(self.tdCom.dataDict["interval"]+5)
|
||||||
|
tdSql.query(f'select distinct(table_name) from {self.stb_stream_des_where_tag_table} where table_name="{where_tag_ctbname}"')
|
||||||
|
tdSql.checkEqual(tdSql.queryResult[0][0], where_tag_ctbname)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if self.delete:
|
if self.delete:
|
||||||
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=start_ts, end_ts=ts_cast_delete_value)
|
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=start_ts, end_ts=ts_cast_delete_value)
|
||||||
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=start_ts, end_ts=ts_cast_delete_value)
|
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=start_ts, end_ts=ts_cast_delete_value)
|
||||||
|
|
Loading…
Reference in New Issue