From 10cd04349d382988ba52d364143fe2e88a161069 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Tue, 22 Aug 2023 18:57:29 +0800 Subject: [PATCH] test: add stream cases --- tests/pytest/util/common.py | 301 ++++++++++++++++-- tests/pytest/util/sql.py | 19 +- .../8-stream/at_once_interval_ext.py | 7 +- .../8-stream/max_delay_interval_ext.py | 101 ++++++ .../system-test/8-stream/pause_resume_test.py | 154 +++++++++ .../8-stream/window_close_session_ext.py | 10 +- 6 files changed, 561 insertions(+), 31 deletions(-) create mode 100644 tests/system-test/8-stream/max_delay_interval_ext.py create mode 100644 tests/system-test/8-stream/pause_resume_test.py diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index 80053e66d1..f06e5d7e79 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -850,6 +850,28 @@ class TDCom: # stream def create_stream(self, stream_name, des_table, source_sql, trigger_mode=None, watermark=None, max_delay=None, ignore_expired=None, ignore_update=None, subtable_value=None, fill_value=None, fill_history_value=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False, use_except=False): + """create_stream + + Args: + stream_name (str): stream_name + des_table (str): target stable + source_sql (str): stream sql + trigger_mode (str, optional): at_once/window_close/max_delay. Defaults to None. + watermark (str, optional): watermark time. Defaults to None. + max_delay (str, optional): max_delay time. Defaults to None. + ignore_expired (int, optional): ignore expired data. Defaults to None. + ignore_update (int, optional): ignore update data. Defaults to None. + subtable_value (str, optional): subtable. Defaults to None. + fill_value (str, optional): fill. Defaults to None. + fill_history_value (int, optional): 0/1. Defaults to None. + stb_field_name_value (str, optional): existed stb. Defaults to None. + tag_value (str, optional): custom tag. Defaults to None. + use_exist_stb (bool, optional): use existed stb tag. Defaults to False. + use_except (bool, optional): Exception tag. Defaults to False. + + Returns: + str: stream + """ if subtable_value is None: subtable = "" else: @@ -923,20 +945,54 @@ class TDCom: else: return f'create stream if not exists {stream_name} {stream_options} {fill_history} into {des_table}{stb_field_name} {tags} {subtable} as {source_sql} {fill};' + def pause_stream(self, stream_name, if_exist=True, if_not_exist=False): + """pause_stream + + Args: + stream_name (str): stream_name + if_exist (bool, optional): Defaults to True. + if_not_exist (bool, optional): Defaults to False. + """ + if_exist_value = "if exists" if if_exist else "" + if_not_exist_value = "if not exists" if if_not_exist else "" + tdSql.execute(f'pause stream {if_exist_value} {if_not_exist_value} {stream_name}') + + def resume_stream(self, stream_name, if_exist=True, if_not_exist=False, ignore_untreated=False): + """resume_stream + + Args: + stream_name (str): stream_name + if_exist (bool, optional): Defaults to True. + if_not_exist (bool, optional): Defaults to False. + ignore_untreated (bool, optional): Defaults to False. + """ + if_exist_value = "if exists" if if_exist else "" + if_not_exist_value = "if not exists" if if_not_exist else "" + ignore_untreated_value = "ignore untreated" if ignore_untreated else "" + tdSql.execute(f'resume stream {if_exist_value} {if_not_exist_value} {ignore_untreated_value} {stream_name}') def drop_all_streams(self): + """drop all streams + """ tdSql.query("show streams") stream_name_list = list(map(lambda x: x[0], tdSql.queryResult)) for stream_name in stream_name_list: tdSql.execute(f'drop stream if exists {stream_name};') def drop_db(self, dbname="test"): + """drop a db + + Args: + dbname (str, optional): Defaults to "test". + """ if dbname[0].isdigit(): tdSql.execute(f'drop database if exists `{dbname}`') else: tdSql.execute(f'drop database if exists {dbname}') def drop_all_db(self): + """drop all databases + """ tdSql.query("show databases;") db_list = list(map(lambda x: x[0], tdSql.queryResult)) for dbname in db_list: @@ -944,6 +1000,15 @@ class TDCom: tdSql.execute(f'drop database if exists `{dbname}`') def time_cast(self, time_value, split_symbol="+"): + """cast bigint to timestamp + + Args: + time_value (bigint): ts + split_symbol (str, optional): split sympol. Defaults to "+". + + Returns: + _type_: timestamp + """ ts_value = str(time_value).split(split_symbol)[0] if split_symbol in str(time_value): ts_value_offset = str(time_value).split(split_symbol)[1] @@ -952,6 +1017,8 @@ class TDCom: return f'cast({ts_value} as timestamp){split_symbol}{ts_value_offset}' def clean_env(self): + """drop all streams and databases + """ self.drop_all_streams() self.drop_all_db() @@ -966,9 +1033,16 @@ class TDCom: pass def genTs(self, precision="ms", ts="", protype="taosc", ns_tag=None): - """ - protype = "taosc" or "restful" - gen ts and datetime + """generate ts + + Args: + precision (str, optional): db precision. Defaults to "ms". + ts (str, optional): input ts. Defaults to "". + protype (str, optional): "taosc" or "restful". Defaults to "taosc". + ns_tag (_type_, optional): use ns. Defaults to None. + + Returns: + timestamp, datetime: timestamp and datetime """ if precision == "ns": if ts == "" or ts is None: @@ -1004,6 +1078,11 @@ class TDCom: return ts, dt def sgen_column_type_str(self, column_elm_list): + """generage column type str + + Args: + column_elm_list (list): column_elm_list + """ self.column_type_str = "" if column_elm_list is None: self.column_type_str = self.gen_default_column_str() @@ -1024,6 +1103,11 @@ class TDCom: self.column_type_str = self.default_colts_name + " timestamp, " + self.column_type_str.rstrip()[:-1] def sgen_tag_type_str(self, tag_elm_list): + """generage tag type str + + Args: + tag_elm_list (list): tag_elm_list + """ self.tag_type_str = "" if tag_elm_list is None: self.tag_type_str = self.gen_default_tag_str() @@ -1044,7 +1128,14 @@ class TDCom: self.tag_type_str = self.tag_type_str.rstrip()[:-1] if self.need_tagts: self.tag_type_str = self.default_tagts_name + " timestamp, " + self.tag_type_str + def sgen_tag_value_list(self, tag_elm_list, ts_value=None): + """generage tag value str + + Args: + tag_elm_list (list): _description_ + ts_value (timestamp, optional): Defaults to None. + """ if self.need_tagts: self.ts_value = self.genTs()[0] if ts_value is not None: @@ -1071,6 +1162,12 @@ class TDCom: self.tag_value_list = [self.ts_value] + self.tag_value_list def screateDb(self, dbname="test", drop_db=True, **kwargs): + """create database + + Args: + dbname (str, optional): Defaults to "test". + drop_db (bool, optional): Defaults to True. + """ tdLog.info("creating db ...") db_params = "" if len(kwargs) > 0: @@ -1087,6 +1184,21 @@ class TDCom: def screate_stable(self, dbname=None, stbname="stb", use_name="table", column_elm_list=None, tag_elm_list=None, need_tagts=False, count=1, default_stbname_prefix="stb", default_stbname_index_start_num=1, default_column_index_start_num=1, default_tag_index_start_num=1, **kwargs): + """_summary_ + + Args: + dbname (str, optional): Defaults to None. + stbname (str, optional): Defaults to "stb". + use_name (str, optional): stable/table, Defaults to "table". + column_elm_list (list, optional): use for sgen_column_type_str(), Defaults to None. + tag_elm_list (list, optional): use for sgen_tag_type_str(), Defaults to None. + need_tagts (bool, optional): tag use timestamp, Defaults to False. + count (int, optional): stable count, Defaults to 1. + default_stbname_prefix (str, optional): Defaults to "stb". + default_stbname_index_start_num (int, optional): Defaults to 1. + default_column_index_start_num (int, optional): Defaults to 1. + default_tag_index_start_num (int, optional): Defaults to 1. + """ tdLog.info("creating stable ...") if dbname is not None: self.dbname = dbname @@ -1115,6 +1227,21 @@ class TDCom: tdSql.execute(create_stable_sql) def screate_ctable(self, dbname=None, stbname=None, ctbname="ctb", use_name="table", tag_elm_list=None, ts_value=None, count=1, default_varchar_datatype="letters", default_nchar_datatype="letters", default_ctbname_prefix="ctb", default_ctbname_index_start_num=1, **kwargs): + """_summary_ + + Args: + dbname (str, optional): Defaults to None. + stbname (str, optional): Defaults to None. + ctbname (str, optional): Defaults to "ctb". + use_name (str, optional): Defaults to "table". + tag_elm_list (list, optional): use for sgen_tag_type_str(), Defaults to None. + ts_value (timestamp, optional): Defaults to None. + count (int, optional): ctb count, Defaults to 1. + default_varchar_datatype (str, optional): Defaults to "letters". + default_nchar_datatype (str, optional): Defaults to "letters". + default_ctbname_prefix (str, optional): Defaults to "ctb". + default_ctbname_index_start_num (int, optional): Defaults to 1. + """ tdLog.info("creating childtable ...") self.default_varchar_datatype = default_varchar_datatype self.default_nchar_datatype = default_nchar_datatype @@ -1150,6 +1277,13 @@ class TDCom: tdSql.execute(create_stable_sql) def sgen_column_value_list(self, column_elm_list, need_null, ts_value=None): + """_summary_ + + Args: + column_elm_list (list): gen_random_type_value() + need_null (bool): if insert null + ts_value (timestamp, optional): Defaults to None. + """ self.column_value_list = list() self.ts_value = self.genTs()[0] if ts_value is not None: @@ -1180,6 +1314,18 @@ class TDCom: def screate_table(self, dbname=None, tbname="tb", use_name="table", column_elm_list=None, count=1, default_tbname_prefix="tb", default_tbname_index_start_num=1, default_column_index_start_num=1, **kwargs): + """create ctable + + Args: + dbname (str, optional): Defaults to None. + tbname (str, optional): Defaults to "tb". + use_name (str, optional): Defaults to "table". + column_elm_list (list, optional): Defaults to None. + count (int, optional): Defaults to 1. + default_tbname_prefix (str, optional): Defaults to "tb". + default_tbname_index_start_num (int, optional): Defaults to 1. + default_column_index_start_num (int, optional): Defaults to 1. + """ tdLog.info("creating table ...") if dbname is not None: self.dbname = dbname @@ -1205,6 +1351,16 @@ class TDCom: tdSql.execute(create_table_sql) def sinsert_rows(self, dbname=None, tbname=None, column_ele_list=None, ts_value=None, count=1, need_null=False): + """insert rows + + Args: + dbname (str, optional): Defaults to None. + tbname (str, optional): Defaults to None. + column_ele_list (list, optional): Defaults to None. + ts_value (timestamp, optional): Defaults to None. + count (int, optional): Defaults to 1. + need_null (bool, optional): Defaults to False. + """ tdLog.info("stream inserting ...") if dbname is not None: self.dbname = dbname @@ -1245,6 +1401,15 @@ class TDCom: tdSql.execute(insert_sql) def sdelete_rows(self, dbname=None, tbname=None, start_ts=None, end_ts=None, ts_key=None): + """delete rows + + Args: + dbname (str, optional): Defaults to None. + tbname (str, optional): Defaults to None. + start_ts (timestamp, optional): range start. Defaults to None. + end_ts (timestamp, optional): range end. Defaults to None. + ts_key (str, optional): timestamp column name. Defaults to None. + """ if dbname is not None: self.dbname = dbname if tbname is not None: @@ -1271,8 +1436,13 @@ class TDCom: base_del_sql += f'where {ts_col_name} = {start_ts};' tdSql.execute(base_del_sql) - def check_stream_field_type(self, sql, input_function): + """confirm stream field + + Args: + sql (str): input sql + input_function (str): scalar + """ tdSql.query(sql) res = tdSql.queryResult if input_function in ["acos", "asin", "atan", "cos", "log", "pow", "sin", "sqrt", "tan"]: @@ -1301,6 +1471,14 @@ class TDCom: tdSql.checkEqual(res[2][1], "DOUBLE") def round_handle(self, input_list): + """round list elem + + Args: + input_list (list): input value list + + Returns: + _type_: round list + """ tdLog.info("round rows ...") final_list = list() for i in input_list: @@ -1314,6 +1492,14 @@ class TDCom: return final_list def float_handle(self, input_list): + """float list elem + + Args: + input_list (list): input value list + + Returns: + _type_: float list + """ tdLog.info("float rows ...") final_list = list() for i in input_list: @@ -1327,10 +1513,26 @@ class TDCom: return final_list def str_ts_trans_bigint(self, str_ts): + """trans str ts to bigint + + Args: + str_ts (str): human-date + + Returns: + bigint: bigint-ts + """ tdSql.query(f'select cast({str_ts} as bigint)') return tdSql.queryResult[0][0] def cast_query_data(self, query_data): + """cast query-result for existed-stb + + Args: + query_data (list): query data list + + Returns: + list: new list after cast + """ tdLog.info("cast query data ...") col_type_list = self.column_type_str.split(',') tag_type_list = self.tag_type_str.split(',') @@ -1351,6 +1553,14 @@ class TDCom: return nl def trans_time_to_s(self, runtime): + """trans time to s + + Args: + runtime (str): 1d/1h/1m... + + Returns: + int: second + """ if "d" in str(runtime).lower(): d_num = re.findall("\d+\.?\d*", runtime.replace(" ", ""))[0] s_num = float(d_num) * 24 * 60 * 60 @@ -1367,6 +1577,23 @@ class TDCom: return int(s_num) def check_query_data(self, sql1, sql2, sorted=False, fill_value=None, tag_value_list=None, defined_tag_count=None, partition=True, use_exist_stb=False, subtable=None, reverse_check=False): + """confirm query result + + Args: + sql1 (str): select .... + sql2 (str): select .... + sorted (bool, optional): if sort result list. Defaults to False. + fill_value (str, optional): fill. Defaults to None. + tag_value_list (list, optional): Defaults to None. + defined_tag_count (int, optional): Defaults to None. + partition (bool, optional): Defaults to True. + use_exist_stb (bool, optional): Defaults to False. + subtable (str, optional): Defaults to None. + reverse_check (bool, optional): not equal. Defaults to False. + + Returns: + bool: False if failed + """ tdLog.info("checking query data ...") if tag_value_list: dvalue = len(self.tag_type_str.split(',')) - defined_tag_count @@ -1485,6 +1712,16 @@ class TDCom: # tdSql.checkEqual(res1, res2) if not reverse_check else tdSql.checkNotEqual(res1, res2) def check_stream_res(self, sql, expected_res, max_delay): + """confirm stream result + + Args: + sql (str): select ... + expected_res (str): expected result + max_delay (int): max_delay value + + Returns: + bool: False if failed + """ tdSql.query(sql) latency = 0 @@ -1500,18 +1737,27 @@ class TDCom: tdSql.checkEqual(tdSql.queryRows, expected_res) def check_stream(self, sql1, sql2, expected_count, max_delay=None): + """confirm stream + + Args: + sql1 (str): select ... + sql2 (str): select ... + expected_count (int): expected_count + max_delay (int, optional): max_delay value. Defaults to None. + """ self.check_stream_res(sql1, expected_count, max_delay) self.check_query_data(sql1, sql2) def cal_watermark_window_close_session_endts(self, start_ts, watermark=None, session=None): """cal endts for close window - :param start_ts: [start timestamp: self.date_time] - :type start_ts: [epoch time] - :param watermark: [second level and > session] - :type watermark: [s] - :param precision: [default "ms" and only support "ms" now] - :type precision: str, optional + Args: + start_ts (epoch time): self.date_time + watermark (int, optional): > session. Defaults to None. + session (int, optional): Defaults to None. + + Returns: + int: as followed """ if watermark is not None: return start_ts + watermark*self.offset + 1 @@ -1521,14 +1767,13 @@ class TDCom: def cal_watermark_window_close_interval_endts(self, start_ts, interval, watermark=None): """cal endts for close window - :param start_ts: [start timestamp: self.date_time] - :type start_ts: [epoch time] - :param interval: [second level] - :type interval: [s] - :param watermark: [second level and > interval] - :type watermark: [s] - :param precision: [default "ms" and only support "ms" now] - :type precision: str, optional + Args: + start_ts (epoch time): self.date_time + interval (int): [s] + watermark (int, optional): [s]. Defaults to None. + + Returns: + _type_: _description_ """ if watermark is not None: return int(start_ts/self.offset)*self.offset + (interval - (int(start_ts/self.offset))%interval)*self.offset + watermark*self.offset @@ -1536,6 +1781,11 @@ class TDCom: return int(start_ts/self.offset)*self.offset + (interval - (int(start_ts/self.offset))%interval)*self.offset def update_delete_history_data(self, delete): + """update and delete history data + + Args: + delete (bool): True/False + """ self.sinsert_rows(tbname=self.ctb_name, ts_value=self.record_history_ts) self.sinsert_rows(tbname=self.tb_name, ts_value=self.record_history_ts) if delete: @@ -1543,6 +1793,20 @@ class TDCom: self.sdelete_rows(tbname=self.tb_name, start_ts=self.time_cast(self.record_history_ts, "-")) def prepare_data(self, interval=None, watermark=None, session=None, state_window=None, state_window_max=127, interation=3, range_count=None, precision="ms", fill_history_value=0, ext_stb=None): + """prepare stream data + + Args: + interval (int, optional): Defaults to None. + watermark (int, optional): Defaults to None. + session (int, optional): Defaults to None. + state_window (str, optional): Defaults to None. + state_window_max (int, optional): Defaults to 127. + interation (int, optional): Defaults to 3. + range_count (int, optional): Defaults to None. + precision (str, optional): Defaults to "ms". + fill_history_value (int, optional): Defaults to 0. + ext_stb (bool, optional): Defaults to None. + """ self.clean_env() self.dataDict = { "stb_name" : f"{self.case_name}_stb", @@ -1595,6 +1859,5 @@ class TDCom: self.sinsert_rows(tbname=self.tb_name, ts_value=ts_value) if i == 1: self.record_history_ts = ts_value - tdCom = TDCom() diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 2fa21b1983..91aac1929f 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -111,7 +111,7 @@ class TDSql: return self.error_info - def query(self, sql, row_tag=None,queryTimes=10): + def query(self, sql, row_tag=None, queryTimes=10, count_expected_res=None): self.sql = sql i=1 while i <= queryTimes: @@ -120,6 +120,17 @@ class TDSql: self.queryResult = self.cursor.fetchall() self.queryRows = len(self.queryResult) self.queryCols = len(self.cursor.description) + + if count_expected_res is not None: + counter = 0 + while count_expected_res != self.queryResult[0][0]: + self.cursor.execute(sql) + self.queryResult = self.cursor.fetchall() + if counter < queryTimes: + counter += 0.5 + time.sleep(0.5) + else: + return False if row_tag: return self.queryResult return self.queryRows @@ -501,7 +512,8 @@ class TDSql: caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno, self.sql, elm, expect_elm) - tdLog.exit("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) + # tdLog.info("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) + raise Exception("%s(%d) failed: sql:%s, elm:%s != expect_elm:%s" % args) def checkNotEqual(self, elm, expect_elm): if elm != expect_elm: @@ -509,7 +521,8 @@ class TDSql: else: caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno, self.sql, elm, expect_elm) - tdLog.exit("%s(%d) failed: sql:%s, elm:%s == expect_elm:%s" % args) + tdLog.info("%s(%d) failed: sql:%s, elm:%s == expect_elm:%s" % args) + raise Exception def get_times(self, time_str, precision="ms"): caller = inspect.getframeinfo(inspect.stack()[1][0]) diff --git a/tests/system-test/8-stream/at_once_interval_ext.py b/tests/system-test/8-stream/at_once_interval_ext.py index aa9e5029f9..838f1e7c53 100644 --- a/tests/system-test/8-stream/at_once_interval_ext.py +++ b/tests/system-test/8-stream/at_once_interval_ext.py @@ -13,7 +13,8 @@ class TDTestCase: tdSql.init(conn.cursor(), logSql) self.tdCom = tdCom - def at_once_interval_ext(self, interval, partition="tbname", delete=False, fill_value=None, fill_history_value=None, interval_value=None, subtable=None, case_when=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False, use_except=False): + def at_once_interval_ext(self, interval, partition="tbname", delete=False, fill_value=None, fill_history_value=None, subtable=None, case_when=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False, use_except=False): + tdLog.info(f"*** testing stream at_once+interval+exist_stb+custom_tag: interval: {interval}, partition: {partition}, fill_history: {fill_history_value}, delete: {delete}, subtable: {subtable}, stb_field_name_value: {stb_field_name_value}, tag_value: {tag_value} ***") if use_except: if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm or stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm or len(stb_field_name_value.split(",")) == len(self.tdCom.partitial_stb_filter_des_select_elm.split(",")): partitial_tb_source_str = self.tdCom.partitial_ext_tb_source_select_str @@ -33,8 +34,6 @@ class TDTestCase: self.delete = delete self.tdCom.case_name = sys._getframe().f_code.co_name defined_tag_count = len(tag_value.split()) if tag_value is not None else 0 - # if interval_value is None: - # interval_value = f'{self.tdCom.dataDict["interval"]}s' self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value, ext_stb=use_exist_stb) self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "") self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") @@ -60,8 +59,6 @@ class TDTestCase: partition_elm_alias = self.tdCom.partition_expression_alias elif partition == "tbname,t1,c1": partition_elm_alias = f'{self.tdCom.partition_tbname_alias},t1,c1' - partiton_tb = "tbname,c1" - partition_elm_alias_tb = f'{self.tdCom.partition_tbname_alias},c1' else: partition_elm_alias = self.tdCom.partition_tag_alias if subtable: diff --git a/tests/system-test/8-stream/max_delay_interval_ext.py b/tests/system-test/8-stream/max_delay_interval_ext.py new file mode 100644 index 0000000000..653fcd997c --- /dev/null +++ b/tests/system-test/8-stream/max_delay_interval_ext.py @@ -0,0 +1,101 @@ +import sys +import threading +from util.log import * +from util.sql import * +from util.cases import * +from util.common import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + self.tdCom = tdCom + + def watermark_max_delay_interval_ext(self, interval, max_delay, watermark=None, fill_value=None, partition="tbname", delete=False, fill_history_value=None, subtable=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False): + tdLog.info(f"*** testing stream max_delay+interval+exist_stb+custom_tag: interval: {interval}, partition: {partition}, max_delay: {max_delay}, fill_history: {fill_history_value}, subtable: {subtable}, stb_field_name_value: {stb_field_name_value}, tag_value: {tag_value} ***") + if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm or stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm: + partitial_tb_source_str = self.tdCom.partitial_ext_tb_source_select_str + else: + partitial_tb_source_str = self.tdCom.ext_tb_source_select_str + if not stb_field_name_value: + stb_field_name_value = self.tdCom.tb_filter_des_select_elm + self.delete = delete + self.tdCom.case_name = sys._getframe().f_code.co_name + defined_tag_count = len(tag_value.split()) + if watermark is not None: + self.tdCom.case_name = "watermark" + sys._getframe().f_code.co_name + self.tdCom.prepare_data(interval=interval, watermark=watermark, ext_stb=use_exist_stb) + self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "") + self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") + self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") + self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' + self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' + self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' + if subtable: + stb_subtable_value = f'concat(concat("{self.stb_name}_{self.subtable_prefix}", cast(cast(abs(cast({subtable} as int)) as bigint) as varchar(100))), "{self.subtable_suffix}")' if self.subtable else None + else: + stb_subtable_value = None + self.tdCom.date_time = 1658921623245 + if watermark is not None: + watermark_value = f'{self.tdCom.dataDict["watermark"]}s' + else: + watermark_value = None + + max_delay_value = f'{self.tdCom.trans_time_to_s(max_delay)}s' + if fill_value: + if "value" in fill_value.lower(): + fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11' + # create stb/ctb/tb stream + self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ext_stb_stream_des_table, subtable_value=stb_subtable_value, source_sql=f'select _wstart AS wstart, {partitial_tb_source_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value, fill_history_value=fill_history_value, stb_field_name_value=stb_field_name_value, tag_value=tag_value, use_exist_stb=use_exist_stb) + + init_num = 0 + for i in range(self.tdCom.range_count): + if i == 0: + if watermark is not None: + window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark']) + else: + window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval']) + else: + self.tdCom.date_time = window_close_ts + self.tdCom.offset + window_close_ts += self.tdCom.dataDict['interval']*self.tdCom.offset + for num in range(int(window_close_ts/self.tdCom.offset-self.tdCom.date_time/self.tdCom.offset)): + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset) + + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1) + + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) + + if i == 0: + init_num = 2 + i + if watermark is not None: + init_num += 1 + else: + init_num += 1 + time.sleep(int(max_delay.replace("s", ""))) + if tag_value: + tdSql.query(f'select {tag_value} from {self.stb_name}') + tag_value_list = tdSql.queryResult + if not fill_value: + self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts;', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', defined_tag_count=defined_tag_count, tag_value_list=tag_value_list, partition=partition) + + def run(self): + for delete in [True, False]: + for fill_history_value in [0, 1]: + self.watermark_max_delay_interval_ext(interval=random.choice([15]), watermark=random.randint(20, 25), max_delay=f"{random.randint(5, 6)}s", delete=delete, fill_history_value=fill_history_value, partition=None, subtable=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/8-stream/pause_resume_test.py b/tests/system-test/8-stream/pause_resume_test.py new file mode 100644 index 0000000000..f5f1cf07fa --- /dev/null +++ b/tests/system-test/8-stream/pause_resume_test.py @@ -0,0 +1,154 @@ +import threading +from util.log import * +from util.sql import * +from util.cases import * +from util.common import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + self.tdCom = tdCom + + def pause_resume_test(self, interval, partition="tbname", delete=False, fill_history_value=None, pause=True, resume=True, ignore_untreated=False): + tdLog.info(f"*** testing stream pause+resume: interval: {interval}, partition: {partition}, delete: {delete}, fill_history: {fill_history_value}, ignore_untreated: {ignore_untreated} ***") + if_exist_value_list = [None, True] + if_exist = random.choice(if_exist_value_list) + reverse_check = True if ignore_untreated else False + range_count = (self.tdCom.range_count + 3) * 3 + self.delete = delete + self.tdCom.case_name = sys._getframe().f_code.co_name + self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value) + self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "") + self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") + self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") + self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' + self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' + self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' + + if partition == "tbname": + partition_elm_alias = self.tdCom.partition_tbname_alias + elif partition == "c1": + partition_elm_alias = self.tdCom.partition_col_alias + elif partition == "abs(c1)": + partition_elm_alias = self.tdCom.partition_expression_alias + elif partition is None: + partition_elm_alias = '"no_partition"' + else: + partition_elm_alias = self.tdCom.partition_tag_alias + if partition == "tbname" or partition is None: + stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None + ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None + tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", {partition_elm_alias}), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None + else: + stb_subtable_value = f'concat(concat("{self.stb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None + ctb_subtable_value = f'concat(concat("{self.ctb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None + tb_subtable_value = f'concat(concat("{self.tb_name}_{self.tdCom.subtable_prefix}", cast(cast(abs(cast({partition_elm_alias} as int)) as bigint) as varchar(100))), "{self.tdCom.subtable_suffix}")' if self.tdCom.subtable else None + if partition: + partition_elm = f'partition by {partition} {partition_elm_alias}' + else: + partition_elm = "" + self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=stb_subtable_value, fill_history_value=fill_history_value) + self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=ctb_subtable_value, fill_history_value=fill_history_value) + self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_history_value=fill_history_value) + for i in range(range_count): + ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s' + ts_cast_delete_value = self.tdCom.time_cast(ts_value) + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value) + if self.delete and i%2 != 0: + self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=ts_cast_delete_value) + self.tdCom.date_time += 1 + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value) + if self.delete and i%2 != 0: + self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=ts_cast_delete_value) + self.tdCom.date_time += 1 + if partition: + partition_elm = f'partition by {partition}' + else: + partition_elm = "" + # if i == int(range_count/2): + if i > 2 and i % 3 == 0: + for stream_name in [f'{self.stb_name}{self.tdCom.stream_suffix}', f'{self.ctb_name}{self.tdCom.stream_suffix}', f'{self.tb_name}{self.tdCom.stream_suffix}']: + if if_exist is not None: + tdSql.execute(f'pause stream if exists {stream_name}_no_exist') + tdSql.error(f'pause stream if not exists {stream_name}') + tdSql.error(f'pause stream {stream_name}_no_exist') + self.tdCom.pause_stream(stream_name, if_exist) + if pause and not resume and range_count-i <= 3: + time.sleep(self.tdCom.default_interval) + tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {self.stb_name}{self.tdCom.des_table_suffix} order by wstart') + res_after_pause = tdSql.queryResult + if resume: + if i > 2 and i % 3 != 0: + for stream_name in [f'{self.stb_name}{self.tdCom.stream_suffix}', f'{self.ctb_name}{self.tdCom.stream_suffix}', f'{self.tb_name}{self.tdCom.stream_suffix}']: + if if_exist is not None: + tdSql.execute(f'resume stream if exists {stream_name}_no_exist') + tdSql.error(f'resume stream if not exists {stream_name}') + self.tdCom.resume_stream(stream_name, if_exist, None, ignore_untreated) + if pause and not resume: + tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {self.stb_name}{self.tdCom.des_table_suffix} order by wstart') + res_without_resume = tdSql.queryResult + tdSql.checkEqual(res_after_pause, res_without_resume) + else: + for tbname in [self.stb_name, self.ctb_name, self.tb_name]: + if tbname != self.tb_name: + self.tdCom.check_query_data(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} {partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True, reverse_check=reverse_check) + else: + self.tdCom.check_query_data(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix} order by wstart', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} {partition_elm} interval({self.tdCom.dataDict["interval"]}s) order by wstart', sorted=True, reverse_check=reverse_check) + + if self.tdCom.subtable: + for tname in [self.stb_name, self.ctb_name]: + tdSql.query(f'select * from {self.ctb_name}') + ptn_counter = 0 + for c1_value in tdSql.queryResult: + if partition == "c1": + tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;') + elif partition is None: + tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;') + elif partition == "abs(c1)": + abs_c1_value = abs(c1_value[1]) + tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + elif partition == "tbname" and ptn_counter == 0: + tdSql.query(f'select count(*) from `{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}`;') + ptn_counter += 1 + tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) + + tdSql.query(f'select * from {self.tb_name}') + ptn_counter = 0 + for c1_value in tdSql.queryResult: + if partition == "c1": + tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs(c1_value[1])}{self.tdCom.subtable_suffix}`;') + elif partition is None: + tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}no_partition{self.tdCom.subtable_suffix}`;') + elif partition == "abs(c1)": + abs_c1_value = abs(c1_value[1]) + tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}`;') + elif partition == "tbname" and ptn_counter == 0: + tdSql.query(f'select count(*) from `{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}`;') + ptn_counter += 1 + + tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) + + + def run(self): + for delete in [True, False]: + for fill_history_value in [0, 1]: + # pause/resume + self.pause_resume_test(interval=random.randint(10, 15), partition="tbname", ignore_untreated=False, fill_history_value=fill_history_value, delete=delete) + self.pause_resume_test(interval=random.randint(10, 15), partition="tbname", ignore_untreated=True, fill_history_value=fill_history_value, delete=delete) + # self.pause_resume_test(interval=random.randint(10, 15), partition="tbname", resume=False, fill_history_value=fill_history_value, delete=delete) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/8-stream/window_close_session_ext.py b/tests/system-test/8-stream/window_close_session_ext.py index 33990bd821..0fc041e965 100644 --- a/tests/system-test/8-stream/window_close_session_ext.py +++ b/tests/system-test/8-stream/window_close_session_ext.py @@ -13,7 +13,8 @@ class TDTestCase: tdSql.init(conn.cursor(), logSql) self.tdCom = tdCom - def watermark_window_close_session_ext(self, session, watermark, fill_history_value=None, partition=None, subtable=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False): + def watermark_window_close_session_ext(self, session, watermark, fill_history_value=None, partition=None, subtable=None, stb_field_name_value=None, tag_value=None, use_exist_stb=False, delete=False): + tdLog.info(f"*** testing stream window_close+session+exist_stb+custom_tag: session: {session}, partition: {partition}, fill_history: {fill_history_value}, subtable: {subtable}, stb_field_name_value: {stb_field_name_value}, tag_value: {tag_value} ***") if stb_field_name_value == self.tdCom.partitial_stb_filter_des_select_elm or stb_field_name_value == self.tdCom.exchange_stb_filter_des_select_elm: partitial_tb_source_str = self.tdCom.partitial_ext_tb_source_select_str else: @@ -61,15 +62,16 @@ class TDTestCase: self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) if fill_history_value: - self.tdCom.update_delete_history_data(delete=True) + self.tdCom.update_delete_history_data(delete=delete) if tag_value: tdSql.query(f'select {tag_value} from {self.stb_name}') tag_value_list = tdSql.queryResult self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from ext_{self.stb_name}{self.tdCom.des_table_suffix} order by ts', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} session(ts, {self.tdCom.dataDict["session"]}s) order by wstart limit {expected_value};', sorted=True, defined_tag_count=defined_tag_count, tag_value_list=tag_value_list, partition=partition) def run(self): - for fill_history_value in [0, 1]: - self.watermark_window_close_session_ext(session=random.randint(10, 12), watermark=random.randint(20, 25), fill_history_value=fill_history_value, subtable=None, partition=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True) + #! TD-25893 + # self.watermark_window_close_session_ext(session=random.randint(10, 12), watermark=random.randint(20, 25), subtable=None, partition=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True, delete=False, fill_history_value=1) + self.watermark_window_close_session_ext(session=random.randint(10, 12), watermark=random.randint(20, 25), subtable=None, partition=None, stb_field_name_value=self.tdCom.tb_filter_des_select_elm, tag_value=self.tdCom.tag_filter_des_select_elm.split(",")[0], use_exist_stb=True, delete=True) def stop(self): tdSql.close()