diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index 2542eca69c..1ec6d2cbb9 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -170,6 +170,8 @@ class TDCom: self.fill_tb_source_select_str = ','.join(self.fill_function_list[0:13]) self.ext_tb_source_select_str = ','.join(self.downsampling_function_list[0:13]) self.stream_case_when_tbname = "tbname" + self.tag_value_str = "" + self.tag_value_list = [] self.update = True self.disorder = True @@ -202,7 +204,7 @@ class TDCom: self.cast_tag_stb_filter_des_select_elm = "ts, t1, t2, t3, t4, cast(t1 as TINYINT UNSIGNED), t6, t7, t8, t9, t10, cast(t2 as varchar(256)), t12, cast(t3 as bool)" self.tag_count = len(self.tag_filter_des_select_elm.split(",")) self.state_window_range = list() - + self.custom_col_val = 0 self.part_val_list = [1, 2] # def init(self, conn, logSql): @@ -754,10 +756,10 @@ class TDCom: if len(kwargs) > 0: for param, value in kwargs.items(): ctb_params += f'{param} "{value}" ' - tag_value_list = self.gen_tag_value_list(tag_elm_list) + self.tag_value_list = self.gen_tag_value_list(tag_elm_list) tag_value_str = "" # tag_value_str = ", ".join(str(v) for v in self.tag_value_list) - for tag_value in tag_value_list: + for tag_value in self.tag_value_list: if isinstance(tag_value, str): tag_value_str += f'"{tag_value}", ' else: @@ -1764,6 +1766,7 @@ class TDCom: bool: False if failed """ tdLog.info("checking query data ...") + tdLog.info(f"sq1:{sql1}; sql2:{sql2};") if tag_value_list: dvalue = len(self.tag_type_str.split(',')) - defined_tag_count tdSql.query(sql1) @@ -1799,7 +1802,7 @@ class TDCom: res2 = self.round_handle(res2) if not reverse_check: while res1 != res2: - tdLog.info("query retrying ...") + # tdLog.info("query retrying ...") new_list = list() tdSql.query(sql1) res1 = tdSql.queryResult diff --git a/tests/system-test/8-stream/force_window_close_interval.py b/tests/system-test/8-stream/force_window_close_interval.py index 32c228176c..0cad878895 100644 --- a/tests/system-test/8-stream/force_window_close_interval.py +++ b/tests/system-test/8-stream/force_window_close_interval.py @@ -14,6 +14,7 @@ class TDTestCase: self.tdCom = tdCom def force_window_close(self, interval, partition="tbname", funciton_name="", funciton_name_alias= "",delete=False, fill_value=None, fill_history_value=None, case_when=None, ignore_expired=1, ignore_update=1): + # partition must be tbname, and not NONE. tdLog.info(f"*** testing stream force_window_close+interp+every: every: {interval}, partition: {partition}, fill_history: {fill_history_value}, fill: {fill_value}, delete: {delete}, case_when: {case_when} ***") self.tdCom.subtable=False col_value_type = "Incremental" if partition=="c1" else "random" @@ -26,6 +27,7 @@ class TDTestCase: self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' + self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' if partition == "tbname": @@ -68,32 +70,68 @@ class TDTestCase: if fill_value: if "value" in fill_value.lower(): fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11' + # create error stream tdLog.info("create error stream") sleep(10) tdSql.error(f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;") tdSql.error(f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;") tdSql.error(f"create stream itp_force_error_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 1 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c1,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;") + tdSql.error(f"create stream itp_force_error_1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 0 into itp_force_error_1 as select _irowts,tbname,_isfilled,interp(c11,1) from {self.stb_name} partition by tbname every(5s) fill(prev) ;") # function name : interp trigger_mode = "force_window_close" + + + # # subtable is true # create stream add :subtable_value=stb_subtable_value or subtable_value=ctb_subtable_value + # no subtable + # create stream super table and child table + tdLog.info("create stream super table and child table") self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _irowts as irowts, tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.ctb_name} {partition_elm} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) + + # creat stream set filter of tag and tbname + tdLog.info("create stream with tag and tbname filter") + tag_t1_value = self.tdCom.tag_value_list[0] + where_tag = f'where t1 = {tag_t1_value}' + where_tbname = f'where tbname="{self.ctb_name}"' + print(f"tag: {tag_t1_value}") + + self.stb_stream_des_where_tag_table = f'{self.stb_name}_where_tag{self.tdCom.des_table_suffix}' + self.tdCom.create_stream(stream_name=f'{self.stb_name}_where_tag{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_where_tag_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {where_tag} {partition_elm} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) + self.stb_stream_des_where_tbname_table = f'{self.stb_name}_where_tbname{self.tdCom.des_table_suffix}' + self.tdCom.create_stream(stream_name=f'{self.stb_name}_where_tbname{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_where_tbname_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {where_tbname} {partition_elm} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) + + # set partition by tag and column + if partition: + tdLog.info("create stream with partition by tag and tbname ") + partition_elm_new = f'partition by {partition}, t1' + self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_tag{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_tag{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) + partition_elm_new = f'partition by {partition}, c1' + self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_column1{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_column1{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, c1 as c_c1, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) + partition_elm_new = f'partition by {partition}, c2' + self.tdCom.create_stream(stream_name=f'{self.stb_name}_partition_column2{self.tdCom.stream_suffix}', des_table=f'{self.stb_name}_partition_column2{self.tdCom.des_table_suffix}', source_sql=f'select _irowts as irowts, tbname as table_name, c2 as c_c2, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.stb_name} {partition_elm_new} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) + + if fill_value: if "value" in fill_value.lower(): fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11' + # create stream general table + tdLog.info("create stream general table") self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _irowts as irowts,tbname as table_name, _isfilled as isfilled, {funciton_name} as {funciton_name_alias} from {self.tb_name} every({self.tdCom.dataDict["interval"]}s)', trigger_mode=trigger_mode, fill_value=fill_value, fill_history_value=fill_history_value,ignore_expired=ignore_expired,ignore_update=ignore_update) + + self.tdCom.date_time = self.tdCom.genTs(precision=self.tdCom.precision)[0] start_time = self.tdCom.date_time - time.sleep(1) start_force_ts = str(0) for i in range(self.tdCom.range_count): ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s' + print(ts_value) if start_force_ts == "0": start_force_ts = ts_value ts_cast_delete_value = self.tdCom.time_cast(ts_value) @@ -183,7 +221,13 @@ class TDTestCase: self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=history_ts) self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=future_ts) self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=future_ts) - self.tdCom.date_time = start_time + + # get query time range using interval count windows + tdSql.query(f'select _wstart, _wend ,last(ts) from {self.stb_name} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ') + end_new_ts = tdSql.getData(tdSql.queryRows-1, 1) + tdSql.execute(f"insert into {self.ctb_name} (ts,c1) values(\"{end_new_ts}\",-102) ") + tdSql.execute(f"insert into {self.tb_name} (ts,c1) values(\"{end_new_ts}\",-51) ") + for i in range(self.tdCom.range_count): ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s' ts_cast_delete_value = self.tdCom.time_cast(ts_value) @@ -194,21 +238,29 @@ class TDTestCase: if self.delete: self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value) self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=ts_cast_delete_value) + + # wait for the stream to process the data + time.sleep(self.tdCom.dataDict["interval"]*(final_range_count+2)) + + # check the data for tbname in [self.stb_name, self.ctb_name, self.tb_name]: + print(tbname) + tdSql.query(f'select _wstart, _wend ,last(ts) from {tbname} where ts >= {start_force_ts} and ts <= {end_ts} partition by tbname interval({self.tdCom.dataDict["interval"]}s)fill ({fill_value}) ') + start_new_ts = tdSql.getData(0, 1) if tbname != self.tb_name: if "value" in fill_value.lower(): fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11' if partition == "tbname": - self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= {end_ts} order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range({start_force_ts},{end_ts}) every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value) - else: - self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} where `min(c1)` is not Null order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.tdCom.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value) + # print(self.tdCom.dataDict["interval"]*(final_range_count+2)) + self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value) + # check tag and tbname filter + self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {self.stb_stream_des_where_tag_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tag} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value) + self.tdCom.check_query_data(f'select irowts, table_name, isfilled, {funciton_name_alias} from {self.stb_stream_des_where_tbname_table} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts ,tbname as table_name, _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} {where_tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value) else: if "value" in fill_value.lower(): - fill_value='VALUE,1,2,3,6,7,8,9,10,11' + fill_value='VALUE,1' if partition == "tbname": - self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart', fill_value=fill_value) - else: - self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} where `min(c1)` is not Null order by wstart,`min(c1)`', f'select * from (select _wstart AS wstart, {self.tdCom.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by wstart) where `min(c1)` is not Null order by wstart,`min(c1)`', fill_value=fill_value) + self.tdCom.check_query_data(f'select irowts, isfilled, {funciton_name_alias} from {tbname}{self.tdCom.des_table_suffix} where irowts >= {start_force_ts} and irowts <= "{end_new_ts}" order by irowts', f'select _irowts as irowts , _isfilled as isfilled , {funciton_name} as {funciton_name_alias} from {tbname} partition by {partition} range("{start_new_ts}","{end_new_ts}") every({self.tdCom.dataDict["interval"]}s) fill ({fill_value}) order by irowts', fill_value=fill_value) if self.delete: self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=start_ts, end_ts=ts_cast_delete_value) @@ -232,16 +284,16 @@ class TDTestCase: def run(self): - self.force_window_close(interval=random.randint(10, 15), partition="tbname", funciton_name="interp(c1)", funciton_name_alias="intp_c1" ,delete=False, ignore_update=1, fill_value="PREV") - self.force_window_close(interval=random.randint(10, 15), partition="c1", ignore_update=1) - self.force_window_close(interval=random.randint(10, 15), partition="abs(c1)", ignore_update=1) - self.force_window_close(interval=random.randint(10, 15), partition=None, delete=True) - self.force_window_close(interval=random.randint(10, 15), partition=self.tdCom.stream_case_when_tbname, case_when=f'case when {self.tdCom.stream_case_when_tbname} = tbname then {self.tdCom.partition_tbname_alias} else tbname end') - self.force_window_close(interval=random.randint(10, 15), partition="tbname", fill_history_value=1, fill_value="NULL") - for fill_value in ["NULL", "PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]: - # for fill_value in ["PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]: - self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value) - self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value, delete=True) + self.force_window_close(interval=10, partition="tbname", funciton_name="interp(c1)", funciton_name_alias="intp_c1" ,delete=False, ignore_update=1, fill_value="PREV") + # self.force_window_close(interval=random.randint(10, 15), partition="c1", ignore_update=1) + # self.force_window_close(interval=random.randint(10, 15), partition="abs(c1)", ignore_update=1) + # self.force_window_close(interval=random.randint(10, 15), partition=None, delete=True) + # self.force_window_close(interval=random.randint(10, 15), partition=self.tdCom.stream_case_when_tbname, case_when=f'case when {self.tdCom.stream_case_when_tbname} = tbname then {self.tdCom.partition_tbname_alias} else tbname end') + # self.force_window_close(interval=random.randint(10, 15), partition="tbname", fill_history_value=1, fill_value="NULL") + # for fill_value in ["NULL", "PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]: + # # for fill_value in ["PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]: + # self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value) + # self.at_once_interval(interval=random.randint(10, 15), partition="tbname", fill_value=fill_value, delete=True) def stop(self): tdSql.close()