diff --git a/tests/system-test/8-stream/force_window_close_interval.py b/tests/system-test/8-stream/force_window_close_interval.py index af06cfdfd3..a93e79f991 100644 --- a/tests/system-test/8-stream/force_window_close_interval.py +++ b/tests/system-test/8-stream/force_window_close_interval.py @@ -179,7 +179,7 @@ class TDTestCase: tag_t1_value = self.tdCom.tag_value_list[0] where_tag = f"where t1 = {tag_t1_value}" where_tbname = f'where tbname="{self.ctb_name}"' - print(f"tag: {tag_t1_value}") + # print(f"tag: {tag_t1_value}") self.stb_stream_des_where_tag_table = ( f"{self.stb_name}_where_tag{self.tdCom.des_table_suffix}" @@ -209,12 +209,21 @@ class TDTestCase: ) # set partition by tag and column + self.stb_stream_des_partition_tag_table = ( + f"{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}" + ) + self.stb_stream_des_partition_column1_table = ( + f"{self.stb_name}_partition_column1{self.tdCom.des_table_suffix}" + ) + self.stb_stream_des_partition_column2_table = ( + f"{self.stb_name}_partition_column2{self.tdCom.des_table_suffix}" + ) if partition: tdLog.info("create stream with partition by tag and tbname ") partition_elm_new = f"partition by {partition}, t1" self.tdCom.create_stream( stream_name=f"{self.stb_name}_partition_tag{self.tdCom.stream_suffix}", - des_table=f"{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}", + des_table=self.stb_stream_des_partition_tag_table, source_sql=f'select _irowts as irowts, tbname as table_name, t1 as t_t1, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, @@ -248,6 +257,7 @@ class TDTestCase: if fill_value: if "value" in fill_value.lower(): fill_value = "VALUE,1,2,3,4,5,6,7,8,9,10,11" + # create stream general table tdLog.info("create stream general table") self.tdCom.create_stream( @@ -261,6 +271,9 @@ class TDTestCase: ignore_update=ignore_update, ) + #sleep 10s wait stream_task status is ready + time.sleep(10) + # insert data self.tdCom.date_time = self.tdCom.genTs(precision=self.tdCom.precision)[0] start_time = self.tdCom.date_time @@ -271,7 +284,7 @@ class TDTestCase: str(self.tdCom.date_time + self.tdCom.dataDict["interval"]) + f"+{i*10}s" ) - print(ts_value) + # print(ts_value) if start_force_ts == "0": start_force_ts = ts_value ts_cast_delete_value = self.tdCom.time_cast(ts_value) @@ -468,7 +481,7 @@ class TDTestCase: # check the data for tbname in [self.stb_name, self.ctb_name, self.tb_name]: - print(tbname) + tdLog.info(f"tbname:{tbname}") tdSql.query( f'select _wstart, _wend ,last(ts) from {tbname} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ' ) @@ -477,6 +490,8 @@ class TDTestCase: if "value" in fill_value.lower(): fill_value = "VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11" if partition == "tbname": + # check data for child table + tdLog.info("check data for child table ") self.tdCom.check_query_data( f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', @@ -484,12 +499,15 @@ class TDTestCase: ) elif tbname == self.stb_name: if partition == "tbname": + # check data for super table + tdLog.info("check data for super table") self.tdCom.check_query_data( f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value, ) - # check tag and tbname filter + # tag and tbname filter + tdLog.info("check data for tag and tbname filter") self.tdCom.check_query_data( f'select irowts, table_name, isfilled, {funciton_name_alias} from {self.stb_stream_des_where_tag_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tag} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', @@ -500,10 +518,28 @@ class TDTestCase: f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value, ) + # check partition by tag and column(c1 or c2) + tdLog.info("check data for partition by tag and column") + self.tdCom.check_query_data( + f'select irowts, table_name, t_t1, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_tag_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by t_t1, irowts', + f'select _irowts as irowts ,tbname as table_name, t1 as t_t1, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} partition by {partition},t1 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by t_t1, irowts', + fill_value=fill_value, + ) + self.tdCom.check_query_data( + f'select irowts, table_name, c_c1, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column1_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c1, irowts', + f'select _irowts as irowts ,tbname as table_name, c1 as c_c1, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} partition by {partition},c1 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c1, irowts', + fill_value=fill_value, + ) + self.tdCom.check_query_data( + f'select irowts, table_name, c_c2, isfilled, {funciton_name_alias} from {self.stb_stream_des_partition_column2_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by c_c2, irowts', + f'select _irowts as irowts ,tbname as table_name, c2 as c_c2, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname}, {where_tbname} partition by {partition},c2 range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by c_c2, irowts', + fill_value=fill_value, + ) else: if "value" in fill_value.lower(): fill_value = "VALUE,1" if partition == "tbname": + # check data for general table self.tdCom.check_query_data( f'select irowts, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts , _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', @@ -512,19 +548,32 @@ class TDTestCase: # Recreate a sub-table that meets the filtering "where_tag" and check if the streaming results are automatically included within it." where_tag_ctbname = f"{self.ctb_name}_where_tag" + where_tag_ctbname_other_tag = f"{self.ctb_name}_where_tag_1" + tag_t1_value_other = abs(tag_t1_value)-1 tdSql.execute( f"create table {where_tag_ctbname} using {self.stb_name} (t1) tags({tag_t1_value}) " ) + tdSql.execute( + f"create table {where_tag_ctbname_other_tag} using {self.stb_name} (t1) tags({tag_t1_value_other}) " + ) where_tag_timestamp = self.tdCom.genTs(precision=self.tdCom.precision)[0] where_tag_ts_start_value = str(where_tag_timestamp) + "+2s" self.tdCom.sinsert_rows( tbname=where_tag_ctbname, ts_value=where_tag_ts_start_value ) - time.sleep(self.tdCom.dataDict["interval"] + 5) - tdSql.query( - f"select distinct(table_name) from {self.stb_stream_des_where_tag_table} where table_name=\"{where_tag_ctbname}\"" + self.tdCom.sinsert_rows( + tbname=where_tag_ctbname_other_tag, ts_value=where_tag_ts_start_value ) - tdSql.checkEqual(tdSql.queryResult[0][0], where_tag_ctbname) + time.sleep(self.tdCom.dataDict["interval"]) + for _ in range(self.tdCom.dataDict["interval"]): + tdSql.query( + f"select distinct(table_name) from {self.stb_stream_des_where_tag_table} where table_name=\"{where_tag_ctbname}\"" + ) + if tdSql.queryRows > 0: + if tdSql.checkDataNotExit(0,0, where_tag_ctbname): + break + else: + time.sleep(1) if self.delete: self.tdCom.sdelete_rows(