From da93dc45fc6689d434fe3f62d7e9d43a9ccb8f8c Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Tue, 22 Aug 2023 15:49:45 +0800 Subject: [PATCH] test: update --- tests/pytest/util/common.py | 30 +-- .../8-stream/at_once_interval_ext.py | 212 ++++++++++++++++++ .../8-stream/max_delay_interval.py | 14 +- .../system-test/8-stream/max_delay_session.py | 12 +- .../8-stream/partition_interval.py | 4 +- .../8-stream/window_close_session_ext.py | 81 +++++++ .../8-stream/window_close_state_window.py | 8 +- 7 files changed, 328 insertions(+), 33 deletions(-) create mode 100644 tests/system-test/8-stream/at_once_interval_ext.py create mode 100644 tests/system-test/8-stream/window_close_session_ext.py diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index a512ae605f..80053e66d1 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -166,6 +166,7 @@ class TDCom: self.fill_stb_source_select_str = ','.join(self.fill_function_list) self.fill_tb_output_select_str = ','.join(list(map(lambda x:f'`{x}`', self.fill_function_list[0:13]))) self.fill_tb_source_select_str = ','.join(self.fill_function_list[0:13]) + self.ext_tb_source_select_str = ','.join(self.downsampling_function_list[0:13]) self.stream_case_when_tbname = "tbname" self.update = True @@ -1101,11 +1102,11 @@ class TDCom: self.sgen_column_type_str(column_elm_list) self.sgen_tag_type_str(tag_elm_list) if self.dbname is not None: - self.stb_name = f'{self.dbname}.{stbname}' + stb_name = f'{self.dbname}.{stbname}' else: - self.stb_name = stbname + stb_name = stbname if int(count) <= 1: - create_stable_sql = f'create {use_name} {self.stb_name} ({self.column_type_str}) tags ({self.tag_type_str}) {stb_params};' + create_stable_sql = f'create {use_name} {stb_name} ({self.column_type_str}) tags ({self.tag_type_str}) {stb_params};' tdSql.execute(create_stable_sql) else: for _ in range(count): @@ -1134,13 +1135,13 @@ class TDCom: tag_value_str = tag_value_str.rstrip()[:-1] if dbname is not None: self.dbname = dbname - self.ctb_name = f'{self.dbname}.{ctbname}' + ctb_name = f'{self.dbname}.{ctbname}' else: - self.ctb_name = ctbname + ctb_name = ctbname if stbname is not None: - self.stb_name = stbname + stb_name = stbname if int(count) <= 1: - create_ctable_sql = f'create {use_name} {self.ctb_name} using {self.stb_name} tags ({tag_value_str}) {ctb_params};' + create_ctable_sql = f'create {use_name} {ctb_name} using {stb_name} tags ({tag_value_str}) {ctb_params};' tdSql.execute(create_ctable_sql) else: for _ in range(count): @@ -1191,11 +1192,11 @@ class TDCom: tb_params += f'{param} "{value}" ' self.sgen_column_type_str(column_elm_list) if self.dbname is not None: - self.tb_name = f'{self.dbname}.{tbname}' + tb_name = f'{self.dbname}.{tbname}' else: - self.tb_name = tbname + tb_name = tbname if int(count) <= 1: - create_table_sql = f'create {use_name} {self.tb_name} ({self.column_type_str}) {tb_params};' + create_table_sql = f'create {use_name} {tb_name} ({self.column_type_str}) {tb_params};' tdSql.execute(create_table_sql) else: for _ in range(count): @@ -1580,6 +1581,10 @@ class TDCom: self.date_time = self.genTs(precision=self.precision)[0] self.screateDb(dbname=self.dbname, precision=self.precision) + if ext_stb: + self.screate_stable(dbname=self.dbname, stbname=self.ext_stb_stream_des_table) + self.screate_ctable(dbname=self.dbname, stbname=self.ext_stb_stream_des_table, ctbname=self.ext_ctb_stream_des_table) + self.screate_table(dbname=self.dbname, tbname=self.ext_tb_stream_des_table) self.screate_stable(dbname=self.dbname, stbname=self.stb_name) self.screate_ctable(dbname=self.dbname, stbname=self.stb_name, ctbname=self.ctb_name) self.screate_table(dbname=self.dbname, tbname=self.tb_name) @@ -1590,9 +1595,6 @@ class TDCom: self.sinsert_rows(tbname=self.tb_name, ts_value=ts_value) if i == 1: self.record_history_ts = ts_value - if ext_stb: - self.screate_stable(dbname=self.dbname, stbname=self.ext_stb_stream_des_table) - self.screate_ctable(dbname=self.dbname, stbname=self.ext_stb_stream_des_table, ctbname=self.ext_ctb_stream_des_table) - self.screate_table(dbname=self.dbname, tbname=self.ext_tb_stream_des_table) + tdCom = TDCom() diff --git a/tests/system-test/8-stream/at_once_interval_ext.py b/tests/system-test/8-stream/at_once_interval_ext.py new file mode 100644 index 0000000000..aa9e5029f9 --- /dev/null +++ b/tests/system-test/8-stream/at_once_interval_ext.py @@ -0,0 +1,212 @@ +import sys +import threading +from util.log import * +from util.sql import * +from util.cases import * +from util.common import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + self.tdCom = tdCom + + def at_once_interval_ext(self, interval, partition="tbname", delete=False, fill_value=None, fill_history_value=None, interval_value=None, subtable=None, case_when=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False, use_except=False): + if use_except: + if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm or stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm or len(stb_field_name_value.split(",")) == len(self.tdCom.partitial_stb_filter_des_select_elm.split(",")): + partitial_tb_source_str = self.tdCom.partitial_ext_tb_source_select_str + else: + partitial_tb_source_str = self.tdCom.ext_tb_source_select_str + else: + if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm or stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm: + partitial_tb_source_str = self.tdCom.partitial_ext_tb_source_select_str + else: + partitial_tb_source_str = self.tdCom.ext_tb_source_select_str + + if stb_field_name_value is not None: + if len(stb_field_name_value) == 0: + stb_field_name_value = ",".join(self.tdCom.tb_filter_des_select_elm.split(",")[:5]) + # else: + # stb_field_name_value = self.tdCom.tb_filter_des_select_elm + self.delete = delete + self.tdCom.case_name = sys._getframe().f_code.co_name + defined_tag_count = len(tag_value.split()) if tag_value is not None else 0 + # if interval_value is None: + # interval_value = f'{self.tdCom.dataDict["interval"]}s' + self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value, ext_stb=use_exist_stb) + self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "") + self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") + self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") + self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' + self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' + self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' + + if partition == "tbname": + if case_when: + stream_case_when_partition = case_when + else: + stream_case_when_partition = self.tdCom.partition_tbname_alias + + partition_elm_alias = self.tdCom.partition_tbname_alias + elif partition == "c1": + if case_when: + stream_case_when_partition = case_when + else: + stream_case_when_partition = self.tdCom.partition_col_alias + partition_elm_alias = self.tdCom.partition_col_alias + elif partition == "abs(c1)": + partition_elm_alias = self.tdCom.partition_expression_alias + elif partition == "tbname,t1,c1": + partition_elm_alias = f'{self.tdCom.partition_tbname_alias},t1,c1' + partiton_tb = "tbname,c1" + partition_elm_alias_tb = f'{self.tdCom.partition_tbname_alias},c1' + else: + partition_elm_alias = self.tdCom.partition_tag_alias + if subtable: + if partition == "tbname": + if case_when: + stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", {stream_case_when_partition}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None + else: + stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None + else: + if subtable == "constant": + # stb_subtable_value = f'"{self.tdCom.ext_ctb_stream_des_table}"' + stb_subtable_value = f'"constant_{self.tdCom.ext_ctb_stream_des_table}"' + else: + stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", cast(cast(cast({subtable} as int unsigned) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None + else: + stb_subtable_value = None + if fill_value: + if "value" in fill_value.lower(): + fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11' + # self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ext_stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.ext_tb_source_select_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", fill_value=fill_value, fill_history_value=fill_history_value, stb_field_name_value=stb_field_name_value, tag_value=tag_value, use_exist_stb=use_exist_stb) + if partition: + stream_sql = self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ext_stb_stream_des_table, subtable_value=stb_subtable_value, source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", fill_value=fill_value, fill_history_value=fill_history_value, stb_field_name_value=stb_field_name_value, tag_value=tag_value, use_exist_stb=use_exist_stb, use_except=use_except) + else: + stream_sql = self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ext_stb_stream_des_table, subtable_value=stb_subtable_value, source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", fill_value=fill_value, fill_history_value=fill_history_value, stb_field_name_value=stb_field_name_value, tag_value=tag_value, use_exist_stb=use_exist_stb, use_except=use_except) + if stream_sql: + tdSql.error(stream_sql) + return + start_time = self.tdCom.date_time + if subtable == "constant": + range_count = 1 + else: + range_count = self.tdCom.range_count + + for i in range(range_count): + latency = 0 + tag_value_list = list() + ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s' + ts_cast_delete_value = self.tdCom.time_cast(ts_value) + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value) + if self.delete and i%2 != 0: + self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=ts_cast_delete_value) + self.tdCom.date_time += 1 + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value) + if self.delete and i%2 != 0: + self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=ts_cast_delete_value) + self.tdCom.date_time += 1 + if tag_value: + if subtable == "constant": + tdSql.query(f'select {tag_value} from constant_{self.tdCom.ext_ctb_stream_des_table}') + else: + tdSql.query(f'select {tag_value} from {self.stb_name}') + tag_value_list = tdSql.queryResult + if not fill_value: + if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm: + self.tdCom.check_query_data(f'select {self.tdCom.partitial_stb_filter_des_select_elm } from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts', f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True) + elif stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm: + self.tdCom.check_query_data(f'select {self.tdCom.partitial_stb_filter_des_select_elm } from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts', f'select _wstart AS wstart, cast(max(c2) as tinyint), cast(min(c1) as smallint) from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True) + else: + if partition: + if tag_value == self.tdCom.exchange_tag_filter_des_select_elm: + self.tdCom.check_query_data(f'select {self.tdCom.partitial_tag_stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) order by wstart', defined_tag_count=defined_tag_count, tag_value_list=tag_value_list) + elif tag_value == self.tdCom.cast_tag_filter_des_select_elm: + tdSql.query(f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) order by wstart') + limit_row = tdSql.queryRows + self.tdCom.check_query_data(f'select {self.tdCom.cast_tag_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts', f'select cast(t1 as TINYINT UNSIGNED),cast(t2 as varchar(256)),cast(t3 as bool) from {self.stb_name} order by ts limit {limit_row}') + tdSql.query(f'select t1,t2,t3,t4,t6,t7,t8,t9,t10,t12 from ext_{self.stb_name}{self.tdCom.des_table_suffix};') + while list(set(tdSql.queryResult)) != [(None, None, None, None, None, None, None, None, None, None)]: + tdSql.query(f'select t1,t2,t3,t4,t6,t7,t8,t9,t10,t12 from ext_{self.stb_name}{self.tdCom.des_table_suffix};') + if latency < self.tdCom.default_interval: + latency += 1 + time.sleep(1) + else: + return False + tdSql.checkEqual(list(set(tdSql.queryResult)), [(None, None, None, None, None, None, None, None, None, None)]) + else: + self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} partition by {partition} interval({self.tdCom.dataDict["interval"]}s) order by wstart', defined_tag_count=defined_tag_count, tag_value_list=tag_value_list) + else: + if use_exist_stb and not tag_value: + self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s) order by wstart', defined_tag_count=defined_tag_count, tag_value_list=tag_value_list, partition=partition, use_exist_stb=use_exist_stb) + else: + self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s) order by wstart', defined_tag_count=defined_tag_count, tag_value_list=tag_value_list, partition=partition, subtable=subtable) + + if subtable: + for tname in [self.stb_name]: + tdSql.query(f'select * from {self.ctb_name}') + ptn_counter = 0 + for c1_value in tdSql.queryResult: + if partition == "c1": + tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;') + elif partition == "abs(c1)": + abs_c1_value = abs(c1_value[1]) + tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + elif partition == "tbname" and ptn_counter == 0: + tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;') + ptn_counter += 1 + else: + tdSql.query(f'select cast(cast(cast({c1_value[1]} as int unsigned) as bigint) as varchar(100))') + subtable_value = tdSql.queryResult[0][0] + if subtable == "constant": + return + else: + tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{subtable_value}{self.tdCom.subtable_suffix}`;') + tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) + + def run(self): + self.at_once_interval_ext(interval=random.randint(10, 15), delete=False, fill_history_value=1, partition=None, subtable="constant", stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm, use_exist_stb=True) + for delete in [True, False]: + for fill_history_value in [0, 1]: + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=f'tbname,{self.tdCom.tag_filter_des_select_elm.split(",")[0]},c1', subtable="c1", stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=f'tbname,{self.tdCom.tag_filter_des_select_elm},c1', subtable="c1", stb_field_name_value=None, tag_value=self.tdCom.tag_filter_des_select_elm, use_exist_stb=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=f'tbname,{self.tdCom.tag_filter_des_select_elm},c1', stb_field_name_value=None, tag_value=self.tdCom.tag_filter_des_select_elm, use_exist_stb=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=f'tbname,{self.tdCom.tag_filter_des_select_elm},c1', stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm, use_exist_stb=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=f'tbname,{self.tdCom.tag_filter_des_select_elm.split(",")[0]},c1', subtable="c1", stb_field_name_value=self.tdCom.partitial_stb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=f'tbname,{self.tdCom.tag_filter_des_select_elm.split(",")[0]},c1', subtable="c1", stb_field_name_value=self.tdCom.exchange_stb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=None, subtable=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True) + # self-define tag + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=f'{self.tdCom.tag_filter_des_select_elm}', subtable=None, stb_field_name_value=None, tag_value=self.tdCom.tag_filter_des_select_elm, use_exist_stb=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=f'{self.tdCom.partitial_tag_filter_des_select_elm}', subtable=None, stb_field_name_value=None, tag_value=self.tdCom.partitial_tag_filter_des_select_elm, use_exist_stb=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=f'{self.tdCom.partitial_tag_filter_des_select_elm}', subtable=None, stb_field_name_value=None, tag_value=self.tdCom.exchange_tag_filter_des_select_elm, use_exist_stb=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition="t1 as t5,t2 as t11,t3 as t13", subtable=None, stb_field_name_value=None, tag_value="t5,t11,t13", use_exist_stb=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=None, subtable=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=None, use_exist_stb=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=delete, fill_history_value=fill_history_value, partition=None, subtable=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value="t1", use_exist_stb=True) + # error cases + self.at_once_interval_ext(interval=random.randint(10, 15), partition=f'tbname,{self.tdCom.tag_filter_des_select_elm},c1', stb_field_name_value="", tag_value=self.tdCom.tag_filter_des_select_elm, use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), partition=f'tbname,{self.tdCom.tag_filter_des_select_elm},c1', stb_field_name_value=self.tdCom.tb_filter_des_select_elm.replace("c1","c19"), tag_value=self.tdCom.tag_filter_des_select_elm, use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), partition=f'tbname', subtable="c1", stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), partition=f'tbname,{self.tdCom.tag_filter_des_select_elm},c1', subtable="ttt", stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), partition=f'tbname,{self.tdCom.tag_filter_des_select_elm},c1', subtable="c1", stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=None, use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), partition=f'tbname,{self.tdCom.tag_filter_des_select_elm},c1', subtable="c1", stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value="t15", use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), partition=f'tbname,{self.tdCom.tag_filter_des_select_elm},c1', subtable="c1", stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value="c5", use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), partition=f'tbname,{self.tdCom.tag_filter_des_select_elm.split(",")[0]},c1', subtable="c1", stb_field_name_value="ts,c1,c2,c3", tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), partition=f'tbname,{self.tdCom.tag_filter_des_select_elm.split(",")[0]},c1', subtable="c1", stb_field_name_value="ts,c1", tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), partition=f'tbname,{self.tdCom.tag_filter_des_select_elm.split(",")[0]},c1', subtable="c1", stb_field_name_value="c1,c2,c3", tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=False, fill_history_value=1, partition="t1 as t5,t2 as t11", subtable=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value="t5,t11,t13", use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=False, fill_history_value=1, partition="t1 as t5,t2 as t11,t3 as t14", subtable=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value="t5,t11,t13", use_exist_stb=True, use_except=True) + self.at_once_interval_ext(interval=random.randint(10, 15), delete=False, fill_history_value=1, partition="t1 as t5,t2 as t11,t3 as c13", subtable=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value="t5,t11,c13", use_exist_stb=True, use_except=True) + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/8-stream/max_delay_interval.py b/tests/system-test/8-stream/max_delay_interval.py index 9306118e30..5efc4262a1 100644 --- a/tests/system-test/8-stream/max_delay_interval.py +++ b/tests/system-test/8-stream/max_delay_interval.py @@ -16,7 +16,7 @@ class TDTestCase: def watermark_max_delay_interval(self, interval, max_delay, watermark=None, fill_value=None, delete=False): tdLog.info(f"*** testing stream max_delay+interval: interval: {interval}, watermark: {watermark}, fill_value: {fill_value}, delete: {delete} ***") self.delete = delete - self.case_name = sys._getframe().f_code.co_name + self.tdCom.case_name = sys._getframe().f_code.co_name if watermark is not None: self.case_name = "watermark" + sys._getframe().f_code.co_name self.tdCom.prepare_data(interval=interval, watermark=watermark) @@ -24,8 +24,8 @@ class TDTestCase: self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' - self.tdCom.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' - self.tdCom.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' + self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' + self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' self.tdCom.date_time = 1658921623245 if watermark is not None: watermark_value = f'{self.tdCom.dataDict["watermark"]}s' @@ -40,11 +40,11 @@ class TDTestCase: fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11' # create stb/ctb/tb stream self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value) - self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value) + self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value) if fill_value: if "value" in fill_value.lower(): fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11' - self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value) + self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value) init_num = 0 start_time = self.tdCom.date_time for i in range(self.tdCom.range_count): @@ -63,8 +63,8 @@ class TDTestCase: self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset) self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset) if not fill_value: - for tbname in [self.stb_stream_des_table, self.tdCom.ctb_stream_des_table, self.tdCom.tb_stream_des_table]: - if tbname != self.tdCom.tb_stream_des_table: + for tbname in [self.stb_stream_des_table, self.ctb_stream_des_table, self.tb_stream_des_table]: + if tbname != self.tb_stream_des_table: tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}') else: tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}') diff --git a/tests/system-test/8-stream/max_delay_session.py b/tests/system-test/8-stream/max_delay_session.py index 874665dcc9..1a734e0e61 100644 --- a/tests/system-test/8-stream/max_delay_session.py +++ b/tests/system-test/8-stream/max_delay_session.py @@ -23,8 +23,8 @@ class TDTestCase: self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' - self.tdCom.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' - self.tdCom.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' + self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' + self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' self.tdCom.date_time = self.tdCom.dataDict["start_ts"] if watermark is not None: @@ -32,8 +32,8 @@ class TDTestCase: else: watermark_value = None max_delay_value = f'{self.tdCom.trans_time_to_s(max_delay)}s' - self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value) - self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value) + self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value) + self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value) init_num = 0 for i in range(self.tdCom.range_count): if i == 0: @@ -49,8 +49,8 @@ class TDTestCase: if self.tdCom.update and i%2 == 0: self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value) self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value) - for tbname in [self.tdCom.ctb_stream_des_table, self.tdCom.tb_stream_des_table]: - if tbname != self.tdCom.tb_stream_des_table: + for tbname in [self.ctb_stream_des_table, self.tb_stream_des_table]: + if tbname != self.tb_stream_des_table: tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}') else: tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}') diff --git a/tests/system-test/8-stream/partition_interval.py b/tests/system-test/8-stream/partition_interval.py index f12cf038e0..0424932bf8 100644 --- a/tests/system-test/8-stream/partition_interval.py +++ b/tests/system-test/8-stream/partition_interval.py @@ -21,8 +21,8 @@ class TDTestCase: self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' - self.tdCom.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' - self.tdCom.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' + self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' + self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' ctb_name_list = list() for i in range(1, self.tdCom.range_count): ctb_name = self.tdCom.get_long_name() diff --git a/tests/system-test/8-stream/window_close_session_ext.py b/tests/system-test/8-stream/window_close_session_ext.py new file mode 100644 index 0000000000..33990bd821 --- /dev/null +++ b/tests/system-test/8-stream/window_close_session_ext.py @@ -0,0 +1,81 @@ +import sys +import threading +from util.log import * +from util.sql import * +from util.cases import * +from util.common import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + self.tdCom = tdCom + + def watermark_window_close_session_ext(self, session, watermark, fill_history_value=None, partition=None, subtable=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False): + if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm or stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm: + partitial_tb_source_str = self.tdCom.partitial_ext_tb_source_select_str + else: + partitial_tb_source_str = self.tdCom.ext_tb_source_select_str + if not stb_field_name_value: + stb_field_name_value = self.tdCom.tb_filter_des_select_elm + self.tdCom.case_name = sys._getframe().f_code.co_name + defined_tag_count = len(tag_value.split()) + if watermark is not None: + self.case_name = "watermark" + sys._getframe().f_code.co_name + self.tdCom.prepare_data(session=session, watermark=watermark, fill_history_value=fill_history_value, ext_stb=use_exist_stb) + self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "") + self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") + self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") + self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' + self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' + self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' + self.tdCom.date_time = self.tdCom.dataDict["start_ts"] + if subtable: + stb_subtable_value = f'concat(concat("{self.stb_name}_{self.subtable_prefix}", cast(cast(abs(cast({subtable} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")' if self.subtable else None + else: + stb_subtable_value = None + if watermark is not None: + watermark_value = f'{self.tdCom.dataDict["watermark"]}s' + else: + watermark_value = None + # create stb/ctb/tb stream + self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ext_stb_stream_des_table, source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="window_close", watermark=watermark_value, subtable_value=stb_subtable_value, fill_history_value=fill_history_value, stb_field_name_value=stb_field_name_value, tag_value=tag_value, use_exist_stb=use_exist_stb) + for i in range(self.tdCom.range_count): + if i == 0: + window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session']) + else: + self.tdCom.date_time = window_close_ts + 1 + window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session']) + if watermark_value is not None: + expected_value = i + 1 + for ts_value in [self.tdCom.date_time, window_close_ts-1]: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value) + else: + expected_value = i + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) + + if fill_history_value: + self.tdCom.update_delete_history_data(delete=True) + if tag_value: + tdSql.query(f'select {tag_value} from {self.stb_name}') + tag_value_list = tdSql.queryResult + self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} session(ts, {self.tdCom.dataDict["session"]}s) order by wstart limit {expected_value};', sorted=True, defined_tag_count=defined_tag_count, tag_value_list=tag_value_list, partition=partition) + + def run(self): + for fill_history_value in [0, 1]: + self.watermark_window_close_session_ext(session=random.randint(10, 12), watermark=random.randint(20, 25), fill_history_value=fill_history_value, subtable=None, partition=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/8-stream/window_close_state_window.py b/tests/system-test/8-stream/window_close_state_window.py index d6e6a2c093..4c978cb860 100644 --- a/tests/system-test/8-stream/window_close_state_window.py +++ b/tests/system-test/8-stream/window_close_state_window.py @@ -22,11 +22,11 @@ class TDTestCase: self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' - self.tdCom.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' - self.tdCom.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' + self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' + self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' state_window_col_name = self.tdCom.dataDict["state_window"] - self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} state_window({state_window_col_name})', trigger_mode="window_close") - self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} state_window({state_window_col_name})', trigger_mode="window_close") + self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} state_window({state_window_col_name})', trigger_mode="window_close") + self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} state_window({state_window_col_name})', trigger_mode="window_close") state_window_max = self.tdCom.dataDict['state_window_max'] state_window_value_inmem = 0 sleep_step = 0