test:verify the output of the stream with partition by tag and column

This commit is contained in:
chenhaoran 2024-10-30 19:30:09 +08:00 committed by 54liuyao
parent cd0a23c983
commit 9376d78b67
1 changed files with 58 additions and 9 deletions

View File

@ -179,7 +179,7 @@ class TDTestCase:
tag_t1_value = self.tdCom.tag_value_list[0]
where_tag = f"where t1 = {tag_t1_value}"
where_tbname = f'where tbname="{self.ctb_name}"'
print(f"tag: {tag_t1_value}")
# print(f"tag: {tag_t1_value}")
self.stb_stream_des_where_tag_table = (
f"{self.stb_name}_where_tag{self.tdCom.des_table_suffix}"
@ -209,12 +209,21 @@ class TDTestCase:
)
# set partition by tag and column
self.stb_stream_des_partition_tag_table = (
f"{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}"
)
self.stb_stream_des_partition_column1_table = (
f"{self.stb_name}_partition_column1{self.tdCom.des_table_suffix}"
)
self.stb_stream_des_partition_column2_table = (
f"{self.stb_name}_partition_column2{self.tdCom.des_table_suffix}"
)
if partition:
tdLog.info("create stream with partition by tag and tbname ")
partition_elm_new = f"partition by {partition}, t1"
self.tdCom.create_stream(
stream_name=f"{self.stb_name}_partition_tag{self.tdCom.stream_suffix}",
des_table=f"{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}",
des_table=self.stb_stream_des_partition_tag_table,
source_sql=f'select _irowts as irowts, tbname as table_name, t1 as t_t1, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)',
trigger_mode=trigger_mode,
fill_value=fill_value,
@ -248,6 +257,7 @@ class TDTestCase:
if fill_value:
if "value" in fill_value.lower():
fill_value = "VALUE,1,2,3,4,5,6,7,8,9,10,11"
# create stream general table
tdLog.info("create stream general table")
self.tdCom.create_stream(
@ -261,6 +271,9 @@ class TDTestCase:
ignore_update=ignore_update,
)
#sleep 10s wait stream_task status is ready
time.sleep(10)
# insert data
self.tdCom.date_time = self.tdCom.genTs(precision=self.tdCom.precision)[0]
start_time = self.tdCom.date_time
@ -271,7 +284,7 @@ class TDTestCase:
str(self.tdCom.date_time + self.tdCom.dataDict["interval"])
+ f"+{i*10}s"
)
print(ts_value)
# print(ts_value)
if start_force_ts == "0":
start_force_ts = ts_value
ts_cast_delete_value = self.tdCom.time_cast(ts_value)
@ -468,7 +481,7 @@ class TDTestCase:
# check the data
for tbname in [self.stb_name, self.ctb_name, self.tb_name]:
print(tbname)
tdLog.info(f"tbname:{tbname}")
tdSql.query(
f'select _wstart, _wend ,last(ts) from {tbname} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) '
)
@ -477,6 +490,8 @@ class TDTestCase:
if "value" in fill_value.lower():
fill_value = "VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"
if partition == "tbname":
# check data for child table
tdLog.info("check data for child table ")
self.tdCom.check_query_data(
f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts',
f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts',
@ -484,12 +499,15 @@ class TDTestCase:
)
elif tbname == self.stb_name:
if partition == "tbname":
# check data for super table
tdLog.info("check data for super table")
self.tdCom.check_query_data(
f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts',
f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts',
fill_value=fill_value,
)
# check tag and tbname filter
# tag and tbname filter
tdLog.info("check data for tag and tbname filter")
self.tdCom.check_query_data(
f'select irowts, table_name, isfilled, {funciton_name_alias} from {self.stb_stream_des_where_tag_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts',
f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tag} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts',
@ -500,10 +518,28 @@ class TDTestCase:
f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts',
fill_value=fill_value,
)
# check partition by tag and column(c1 or c2)
tdLog.info("check data for partition by tag and column")
self.tdCom.check_query_data(
f'select irowts, table_name, t_t1, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_tag_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by t_t1, irowts',
f'select _irowts as irowts ,tbname as table_name, t1 as t_t1, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} partition by {partition},t1 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by t_t1, irowts',
fill_value=fill_value,
)
self.tdCom.check_query_data(
f'select irowts, table_name, c_c1, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column1_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c1, irowts',
f'select _irowts as irowts ,tbname as table_name, c1 as c_c1, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} partition by {partition},c1 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c1, irowts',
fill_value=fill_value,
)
self.tdCom.check_query_data(
f'select irowts, table_name, c_c2, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column2_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c2, irowts',
f'select _irowts as irowts ,tbname as table_name, c2 as c_c2, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname}, {where_tbname} partition by {partition},c2 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c2, irowts',
fill_value=fill_value,
)
else:
if "value" in fill_value.lower():
fill_value = "VALUE,1"
if partition == "tbname":
# check data for general table
self.tdCom.check_query_data(
f'select irowts, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts',
f'select _irowts as irowts , _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts',
@ -512,19 +548,32 @@ class TDTestCase:
# Recreate a sub-table that meets the filtering "where_tag" and check if the streaming results are automatically included within it."
where_tag_ctbname = f"{self.ctb_name}_where_tag"
where_tag_ctbname_other_tag = f"{self.ctb_name}_where_tag_1"
tag_t1_value_other = abs(tag_t1_value)-1
tdSql.execute(
f"create table {where_tag_ctbname} using {self.stb_name} (t1) tags({tag_t1_value}) "
)
tdSql.execute(
f"create table {where_tag_ctbname_other_tag} using {self.stb_name} (t1) tags({tag_t1_value_other}) "
)
where_tag_timestamp = self.tdCom.genTs(precision=self.tdCom.precision)[0]
where_tag_ts_start_value = str(where_tag_timestamp) + "+2s"
self.tdCom.sinsert_rows(
tbname=where_tag_ctbname, ts_value=where_tag_ts_start_value
)
time.sleep(self.tdCom.dataDict["interval"] + 5)
tdSql.query(
f"select distinct(table_name) from {self.stb_stream_des_where_tag_table} where table_name=\"{where_tag_ctbname}\""
self.tdCom.sinsert_rows(
tbname=where_tag_ctbname_other_tag, ts_value=where_tag_ts_start_value
)
tdSql.checkEqual(tdSql.queryResult[0][0], where_tag_ctbname)
time.sleep(self.tdCom.dataDict["interval"])
for _ in range(self.tdCom.dataDict["interval"]):
tdSql.query(
f"select distinct(table_name) from {self.stb_stream_des_where_tag_table} where table_name=\"{where_tag_ctbname}\""
)
if tdSql.queryRows > 0:
if tdSql.checkDataNotExit(0,0, where_tag_ctbname):
break
else:
time.sleep(1)
if self.delete:
self.tdCom.sdelete_rows(