From 7450a8584d0284dd5ec346c9eb5fbd1107dd93da Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Mon, 21 Aug 2023 19:42:52 +0800 Subject: [PATCH] update --- tests/pytest/util/common.py | 45 ++++- .../8-stream/max_delay_interval.py | 161 ++++++++++++++++++ .../system-test/8-stream/max_delay_session.py | 100 +++++++++++ .../8-stream/partition_interval.py | 105 ++++++++++++ .../8-stream/window_close_session.py | 29 ---- 5 files changed, 410 insertions(+), 30 deletions(-) create mode 100644 tests/system-test/8-stream/max_delay_interval.py create mode 100644 tests/system-test/8-stream/max_delay_session.py create mode 100644 tests/system-test/8-stream/partition_interval.py diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index e6e2822f0e..a512ae605f 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -29,7 +29,7 @@ from util.constant import * from dataclasses import dataclass,field from typing import List from datetime import datetime - +import re @dataclass class DataSet: ts_data : List[int] = field(default_factory=list) @@ -141,6 +141,8 @@ class TDCom: self.default_interval = 5 self.stream_timeout = 12 self.record_history_ts = str() + self.precision = "ms" + self.date_time = self.genTs(precision=self.precision)[0] self.subtable = True self.partition_tbname_alias = "ptn_alias" if self.subtable else "" self.partition_col_alias = "pcol_alias" if self.subtable else "" @@ -172,6 +174,31 @@ class TDCom: self.update = False self.partition_by_downsampling_function_list = ["min(c1)", "max(c2)", "sum(c3)", "first(c4)", "last(c5)", "count(c8)", "spread(c1)", "stddev(c2)", "hyperloglog(c11)", "min(t1)", "max(t2)", "sum(t3)", "first(t4)", "last(t5)", "count(t8)", "spread(t1)", "stddev(t2)"] + + self.stb_data_filter_sql = f'ts >= {self.date_time}+1s and c1 = 1 or c2 > 1 and c3 != 4 or c4 <= 3 and c9 <> 0 or c10 is not Null or c11 is Null or \ + c12 between "na" and "nchar4" and c11 not between "bi" and "binary" and c12 match "nchar[19]" and c12 nmatch "nchar[25]" or c13 = True or \ + c5 in (1, 2, 3) or c6 not in (6, 7) and c12 like "nch%" and c11 not like "bina_" and c6 < 10 or c12 is Null or c8 >= 4 and t1 = 1 or t2 > 1 \ + and t3 != 4 or c4 <= 3 and t9 <> 0 or t10 is not Null or t11 is Null or t12 between "na" and "nchar4" and t11 not between "bi" and "binary" \ + or t12 match "nchar[19]" or t12 nmatch "nchar[25]" or t13 = True or t5 in (1, 2, 3) or t6 not in (6, 7) and t12 like "nch%" \ + and t11 not like "bina_" and t6 <= 10 or t12 is Null or t8 >= 4' + self.tb_data_filter_sql = self.stb_data_filter_sql.partition(" and t1")[0] + + self.filter_source_select_elm = "*" + self.stb_filter_des_select_elm = "ts, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13" + self.partitial_stb_filter_des_select_elm = ",".join(self.stb_filter_des_select_elm.split(",")[:3]) + self.exchange_stb_filter_des_select_elm = ",".join([self.stb_filter_des_select_elm.split(",")[0], self.stb_filter_des_select_elm.split(",")[2], self.stb_filter_des_select_elm.split(",")[1]]) + self.partitial_ext_tb_source_select_str = ','.join(self.downsampling_function_list[0:2]) + self.tb_filter_des_select_elm = self.stb_filter_des_select_elm.partition(", t1")[0] + self.tag_filter_des_select_elm = self.stb_filter_des_select_elm.partition("c13, ")[2] + self.partition_by_stb_output_select_str = ','.join(list(map(lambda x:f'`{x}`', self.partition_by_downsampling_function_list))) + self.partition_by_stb_source_select_str = ','.join(self.partition_by_downsampling_function_list) + self.exchange_tag_filter_des_select_elm = ",".join([self.stb_filter_des_select_elm.partition("c13, ")[2].split(",")[0], self.stb_filter_des_select_elm.partition("c13, ")[2].split(",")[2], self.stb_filter_des_select_elm.partition("c13, ")[2].split(",")[1]]) + self.partitial_tag_filter_des_select_elm = ",".join(self.stb_filter_des_select_elm.partition("c13, ")[2].split(",")[:3]) + self.partitial_tag_stb_filter_des_select_elm = "ts, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, t1, t3, t2, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13" + self.cast_tag_filter_des_select_elm = "t5,t11,t13" + self.cast_tag_stb_filter_des_select_elm = "ts, t1, t2, t3, t4, cast(t1 as TINYINT UNSIGNED), t6, t7, t8, t9, t10, cast(t2 as varchar(256)), t12, cast(t3 as bool)" + self.tag_count = len(self.tag_filter_des_select_elm.split(",")) + self.state_window_range = list() # def init(self, conn, logSql): # # tdSql.init(conn.cursor(), logSql) @@ -1322,6 +1349,22 @@ class TDCom: nl.append(tuple(query_data_l)) return nl + def trans_time_to_s(self, runtime): + if "d" in str(runtime).lower(): + d_num = re.findall("\d+\.?\d*", runtime.replace(" ", ""))[0] + s_num = float(d_num) * 24 * 60 * 60 + elif "h" in str(runtime).lower(): + h_num = re.findall("\d+\.?\d*", runtime.replace(" ", ""))[0] + s_num = float(h_num) * 60 * 60 + elif "m" in str(runtime).lower(): + m_num = re.findall("\d+\.?\d*", runtime.replace(" ", ""))[0] + s_num = float(m_num) * 60 + elif "s" in str(runtime).lower(): + s_num = re.findall("\d+\.?\d*", runtime.replace(" ", ""))[0] + else: + s_num = 60 + return int(s_num) + def check_query_data(self, sql1, sql2, sorted=False, fill_value=None, tag_value_list=None, defined_tag_count=None, partition=True, use_exist_stb=False, subtable=None, reverse_check=False): tdLog.info("checking query data ...") if tag_value_list: diff --git a/tests/system-test/8-stream/max_delay_interval.py b/tests/system-test/8-stream/max_delay_interval.py new file mode 100644 index 0000000000..9306118e30 --- /dev/null +++ b/tests/system-test/8-stream/max_delay_interval.py @@ -0,0 +1,161 @@ +import sys +import threading +from util.log import * +from util.sql import * +from util.cases import * +from util.common import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + self.tdCom = tdCom + + def watermark_max_delay_interval(self, interval, max_delay, watermark=None, fill_value=None, delete=False): + tdLog.info(f"*** testing stream max_delay+interval: interval: {interval}, watermark: {watermark}, fill_value: {fill_value}, delete: {delete} ***") + self.delete = delete + self.case_name = sys._getframe().f_code.co_name + if watermark is not None: + self.case_name = "watermark" + sys._getframe().f_code.co_name + self.tdCom.prepare_data(interval=interval, watermark=watermark) + self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "") + self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") + self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") + self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' + self.tdCom.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' + self.tdCom.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' + self.tdCom.date_time = 1658921623245 + if watermark is not None: + watermark_value = f'{self.tdCom.dataDict["watermark"]}s' + fill_watermark_value = watermark_value + else: + watermark_value = None + fill_watermark_value = "0s" + + max_delay_value = f'{self.tdCom.trans_time_to_s(max_delay)}s' + if fill_value: + if "value" in fill_value.lower(): + fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11' + # create stb/ctb/tb stream + self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value) + self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value) + if fill_value: + if "value" in fill_value.lower(): + fill_value='VALUE,1,2,3,4,5,6,7,8,9,10,11' + self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_value=fill_value) + init_num = 0 + start_time = self.tdCom.date_time + for i in range(self.tdCom.range_count): + if i == 0: + if watermark is not None: + window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark']) + else: + window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval']) + else: + self.tdCom.date_time = window_close_ts + self.tdCom.offset + window_close_ts += self.tdCom.dataDict['interval']*self.tdCom.offset + for num in range(int(window_close_ts/self.tdCom.offset-self.tdCom.date_time/self.tdCom.offset)): + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset) + if not fill_value: + for tbname in [self.stb_stream_des_table, self.tdCom.ctb_stream_des_table, self.tdCom.tb_stream_des_table]: + if tbname != self.tdCom.tb_stream_des_table: + tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}') + else: + tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}') + tdSql.checkEqual(tdSql.queryRows, init_num) + + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts-1) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts-1) + + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts) + + if i == 0: + init_num = 2 + i + if watermark is not None: + init_num += 1 + else: + init_num += 1 + time.sleep(int(max_delay.replace("s", ""))) + if not fill_value: + for tbname in [self.stb_name, self.ctb_name, self.tb_name]: + if tbname != self.tb_name: + self.tdCom.check_query_data(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {tbname} interval({self.tdCom.dataDict["interval"]}s)') + else: + self.tdCom.check_query_data(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {tbname} interval({self.tdCom.dataDict["interval"]}s)') + if fill_value: + history_ts = str(start_time)+f'-{self.tdCom.dataDict["interval"]*(self.tdCom.range_count+2)}s' + start_ts = self.tdCom.time_cast(history_ts, "-") + future_ts = str(self.tdCom.date_time)+f'+{self.tdCom.dataDict["interval"]*(self.tdCom.range_count+2)}s' + end_ts = self.tdCom.time_cast(future_ts) + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=history_ts) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=history_ts) + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=future_ts) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=future_ts) + future_ts_bigint = self.tdCom.str_ts_trans_bigint(future_ts) + if watermark is not None: + window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(future_ts_bigint, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark']) + else: + window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(future_ts_bigint, self.tdCom.dataDict['interval']) + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts) + + if self.tdCom.update: + for i in range(self.tdCom.range_count): + if i == 0: + if watermark is not None: + window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval'], self.tdCom.dataDict['watermark']) + else: + window_close_ts = self.tdCom.cal_watermark_window_close_interval_endts(self.tdCom.date_time, self.tdCom.dataDict['interval']) + else: + self.tdCom.date_time = window_close_ts + self.tdCom.offset + window_close_ts += self.tdCom.dataDict['interval']*self.tdCom.offset + for num in range(int(window_close_ts/self.tdCom.offset-self.tdCom.date_time/self.tdCom.offset)): + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=self.tdCom.date_time+num*self.tdCom.offset) + + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts-1) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts-1) + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts) + if self.delete: + self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=self.tdCom.time_cast(window_close_ts)) + self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(start_time), end_ts=self.tdCom.time_cast(window_close_ts)) + time.sleep(int(max_delay.replace("s", ""))) + for tbname in [self.stb_name, self.ctb_name, self.tb_name]: + if tbname != self.tb_name: + if "value" in fill_value.lower(): + fill_value='VALUE,1,2,3,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11' + self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.fill_stb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts}+{self.tdCom.dataDict["interval"]}s+{fill_watermark_value} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value})', fill_value=fill_value) + else: + if "value" in fill_value.lower(): + fill_value='VALUE,1,2,3,6,7,8,9,10,11' + self.tdCom.check_query_data(f'select wstart, {self.tdCom.fill_tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, {self.tdCom.fill_tb_source_select_str} from {tbname} where ts >= {start_ts} and ts <= {end_ts}+{self.tdCom.dataDict["interval"]}s+{fill_watermark_value} interval({self.tdCom.dataDict["interval"]}s) fill ({fill_value})', fill_value=fill_value) + + + def run(self): + for watermark in [None, random.randint(20, 25)]: + self.watermark_max_delay_interval(interval=random.choice([15]), watermark=watermark, max_delay=f"{random.randint(5, 6)}s") + for fill_value in ["NULL", "PREV", "NEXT", "LINEAR", "VALUE,1,2,3,4,5,6,7,8,9,10,11,1,2,3,4,5,6,7,8,9,10,11"]: + self.watermark_max_delay_interval(interval=random.randint(10, 15), watermark=None, max_delay=f"{random.randint(5, 6)}s", fill_value=fill_value) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/8-stream/max_delay_session.py b/tests/system-test/8-stream/max_delay_session.py new file mode 100644 index 0000000000..874665dcc9 --- /dev/null +++ b/tests/system-test/8-stream/max_delay_session.py @@ -0,0 +1,100 @@ +import sys +import threading +from util.log import * +from util.sql import * +from util.cases import * +from util.common import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + self.tdCom = tdCom + + def watermark_max_delay_session(self, session, watermark, max_delay, fill_history_value=None): + tdLog.info(f"*** testing stream max_delay+session: session: {session}, watermark: {watermark}, max_delay: {max_delay}, fill_history_value: {fill_history_value} ***") + self.tdCom.case_name = sys._getframe().f_code.co_name + if watermark is not None: + self.tdCom.case_name = "watermark" + sys._getframe().f_code.co_name + self.tdCom.prepare_data(session=session, watermark=watermark, fill_history_value=fill_history_value) + self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "") + self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") + self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") + self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' + self.tdCom.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' + self.tdCom.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' + self.tdCom.date_time = self.tdCom.dataDict["start_ts"] + + if watermark is not None: + watermark_value = f'{self.tdCom.dataDict["watermark"]}s' + else: + watermark_value = None + max_delay_value = f'{self.tdCom.trans_time_to_s(max_delay)}s' + self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {self.ctb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value) + self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {self.tb_name} session(ts, {self.tdCom.dataDict["session"]}s)', trigger_mode="max_delay", watermark=watermark_value, max_delay=max_delay_value, fill_history_value=fill_history_value) + init_num = 0 + for i in range(self.tdCom.range_count): + if i == 0: + window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session']) + else: + self.tdCom.date_time = window_close_ts + 1 + window_close_ts = self.tdCom.cal_watermark_window_close_session_endts(self.tdCom.date_time, self.tdCom.dataDict['watermark'], self.tdCom.dataDict['session']) + + if watermark_value is not None: + for ts_value in [self.tdCom.date_time, window_close_ts-1]: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value) + for tbname in [self.tdCom.ctb_stream_des_table, self.tdCom.tb_stream_des_table]: + if tbname != self.tdCom.tb_stream_des_table: + tdSql.query(f'select wstart, {self.tdCom.stb_output_select_str} from {tbname}') + else: + tdSql.query(f'select wstart, {self.tdCom.tb_output_select_str} from {tbname}') + if not fill_history_value: + tdSql.checkEqual(tdSql.queryRows, init_num) + + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts) + if self.tdCom.update and i%2 == 0: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=window_close_ts) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=window_close_ts) + if i == 0: + init_num = 2 + i + else: + init_num += 1 + if watermark_value is not None: + expected_value = init_num + else: + expected_value = i + 1 + + if not fill_history_value: + for tbname in [self.ctb_name, self.tb_name]: + if tbname != self.tb_name: + self.tdCom.check_stream(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)', expected_value, max_delay) + else: + self.tdCom.check_stream(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)', expected_value, max_delay) + else: + self.tdCom.update_delete_history_data(delete=True) + for tbname in [self.ctb_name, self.tb_name]: + if tbname != self.tb_name: + self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.stb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.stb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)') + else: + self.tdCom.check_query_data(f'select wstart, wend-{self.tdCom.dataDict["session"]}s, {self.tdCom.tb_output_select_str} from {tbname}{self.tdCom.des_table_suffix}', f'select _wstart AS wstart, _wend AS wend, {self.tdCom.tb_source_select_str} from {tbname} session(ts, {self.tdCom.dataDict["session"]}s)') + + def run(self): + for fill_history_value in [None, 1]: + for watermark in [None, random.randint(20, 30)]: + self.watermark_max_delay_session(session=random.randint(10, 15), watermark=watermark, max_delay=f"{random.randint(1, 3)}s", fill_history_value=fill_history_value) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/8-stream/partition_interval.py b/tests/system-test/8-stream/partition_interval.py new file mode 100644 index 0000000000..f12cf038e0 --- /dev/null +++ b/tests/system-test/8-stream/partition_interval.py @@ -0,0 +1,105 @@ +import sys +import threading +from util.log import * +from util.sql import * +from util.cases import * +from util.common import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + self.tdCom = tdCom + + def partitionby_interval(self, interval=None, partition_by_elm="tbname", ignore_expired=None): + tdLog.info(f"*** testing stream partition+interval: interval: {interval}, partition_by: {partition_by_elm}, ignore_expired: {ignore_expired} ***") + self.tdCom.case_name = sys._getframe().f_code.co_name + self.tdCom.prepare_data(interval=interval) + self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "") + self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") + self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") + self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' + self.tdCom.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' + self.tdCom.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' + ctb_name_list = list() + for i in range(1, self.tdCom.range_count): + ctb_name = self.tdCom.get_long_name() + ctb_name_list.append(ctb_name) + self.tdCom.screate_ctable(stbname=self.stb_name, ctbname=ctb_name) + if interval is not None: + source_sql = f'select _wstart AS wstart, {self.tdCom.partition_by_stb_source_select_str} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s)' + else: + source_sql = f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm}' + + # create stb/ctb/tb stream + self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=source_sql, ignore_expired=ignore_expired) + # insert data + count = 1 + step_count = 1 + for i in range(1, self.tdCom.range_count): + if i == 1: + record_window_close_ts = self.tdCom.date_time - 15 * self.tdCom.offset + ctb_name = self.tdCom.get_long_name() + self.tdCom.screate_ctable(stbname=self.stb_name, ctbname=ctb_name) + if i % 2 == 0: + step_count += i + for j in range(count, step_count): + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=f'{self.tdCom.date_time}+{j}s') + for ctb_name in ctb_name_list: + self.tdCom.sinsert_rows(tbname=ctb_name, ts_value=f'{self.tdCom.date_time}+{j}s') + count += i + else: + step_count += 1 + for i in range(2): + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=f'{self.tdCom.date_time}+{count}s') + for ctb_name in ctb_name_list: + self.tdCom.sinsert_rows(tbname=ctb_name, ts_value=f'{self.tdCom.date_time}+{count}s') + count += 1 + # check result + for colname in self.tdCom.partition_by_downsampling_function_list: + if "first" not in colname and "last" not in colname: + if interval is not None: + self.tdCom.check_query_data(f'select `{colname}` from {self.stb_name}{self.tdCom.des_table_suffix} order by `{colname}`;', f'select {colname} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s) order by `{colname}`;') + else: + self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name}{self.tdCom.des_table_suffix} order by c1,c2,c3;', f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm} order by c1,c2,c3;') + + if self.tdCom.disorder: + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=record_window_close_ts) + for ctb_name in ctb_name_list: + self.tdCom.sinsert_rows(tbname=ctb_name, ts_value=record_window_close_ts) + if ignore_expired: + if "first" not in colname and "last" not in colname: + for colname in self.tdCom.partition_by_downsampling_function_list: + if interval is not None: + tdSql.query(f'select `{colname}` from {self.stb_name}{self.tdCom.des_table_suffix} order by `{colname}`;') + res1 = tdSql.queryResult + tdSql.query(f'select {colname} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s) order by `{colname}`;') + res2 = tdSql.queryResult + tdSql.checkNotEqual(res1, res2) + else: + self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name}{self.tdCom.des_table_suffix} order by c1,c2,c3;', f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm} order by c1,c2,c3;') + + else: + for colname in self.tdCom.partition_by_downsampling_function_list: + if "first" not in colname and "last" not in colname: + if interval is not None: + self.tdCom.check_query_data(f'select `{colname}` from {self.stb_name}{self.tdCom.des_table_suffix} order by `{colname}`;', f'select {colname} from {self.stb_name} partition by {partition_by_elm} interval({self.tdCom.dataDict["interval"]}s) order by `{colname}`;') + else: + self.tdCom.check_query_data(f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name}{self.tdCom.des_table_suffix} order by c1,c2,c3;', f'select {self.tdCom.stb_filter_des_select_elm} from {self.stb_name} partition by {partition_by_elm} order by c1,c2,c3;') + + def run(self): + for interval in [None, 10]: + for ignore_expired in [0, 1]: + self.partitionby_interval(interval=interval, partition_by_elm="tbname", ignore_expired=ignore_expired) + self.partitionby_interval(interval=10, partition_by_elm="t1") + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/8-stream/window_close_session.py b/tests/system-test/8-stream/window_close_session.py index 8ee097ca10..ddd366b45a 100644 --- a/tests/system-test/8-stream/window_close_session.py +++ b/tests/system-test/8-stream/window_close_session.py @@ -12,34 +12,6 @@ class TDTestCase: tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor(), logSql) self.tdCom = tdCom - self.tdCom.subtable = True - self.tdCom.update = True - self.tdCom.disorder = True - if self.tdCom.disorder: - self.tdCom.update = False - self.tdCom.partition_tbname_alias = "ptn_alias" if self.tdCom.subtable else "" - self.tdCom.partition_col_alias = "pcol_alias" if self.tdCom.subtable else "" - self.tdCom.partition_tag_alias = "ptag_alias" if self.tdCom.subtable else "" - self.tdCom.partition_expression_alias = "pexp_alias" if self.tdCom.subtable else "" - self.stb_name = str() - self.ctb_name = str() - self.tb_name = str() - self.tdCom.des_table_suffix = "_output" - self.tdCom.stream_suffix = "_stream" - self.tdCom.stream_case_when_tbname = "tbname" - self.tdCom.subtable_prefix = "prefix_" if self.tdCom.subtable else "" - self.tdCom.subtable_suffix = "_suffix" if self.tdCom.subtable else "" - self.stb_stream_des_table = str() - self.ctb_stream_des_table = str() - self.tb_stream_des_table = str() - self.downsampling_function_list = ["min(c1)", "max(c2)", "sum(c3)", "first(c4)", "last(c5)", "apercentile(c6, 50)", "avg(c7)", "count(c8)", "spread(c1)", - "stddev(c2)", "hyperloglog(c11)", "timediff(1, 0, 1h)", "timezone()", "to_iso8601(1)", 'to_unixtimestamp("1970-01-01T08:00:00+08:00")', "min(t1)", "max(t2)", "sum(t3)", - "first(t4)", "last(t5)", "apercentile(t6, 50)", "avg(t7)", "count(t8)", "spread(t1)", "stddev(t2)", "hyperloglog(t11)"] - self.tdCom.stb_output_select_str = ','.join(list(map(lambda x:f'`{x}`', self.downsampling_function_list))) - self.tdCom.stb_source_select_str = ','.join(self.downsampling_function_list) - self.tdCom.tb_source_select_str = ','.join(self.downsampling_function_list[0:15]) - self.tdCom.partition_by_downsampling_function_list = ["min(c1)", "max(c2)", "sum(c3)", "first(c4)", "last(c5)", "count(c8)", "spread(c1)", - "stddev(c2)", "hyperloglog(c11)", "min(t1)", "max(t2)", "sum(t3)", "first(t4)", "last(t5)", "count(t8)", "spread(t1)", "stddev(t2)"] def watermark_window_close_session(self, session, watermark, fill_history_value=None, delete=True): tdLog.info(f"*** testing stream window_close+session: session: {session}, watermark: {watermark}, fill_history: {fill_history_value}, delete: {delete} ***") @@ -53,7 +25,6 @@ class TDTestCase: self.stb_stream_des_table = f'{self.stb_name}{self.tdCom.des_table_suffix}' self.ctb_stream_des_table = f'{self.ctb_name}{self.tdCom.des_table_suffix}' self.tb_stream_des_table = f'{self.tb_name}{self.tdCom.des_table_suffix}' - self.tdCom.tb_output_select_str = ','.join(list(map(lambda x:f'`{x}`', self.downsampling_function_list[0:15]))) self.tdCom.date_time = self.tdCom.dataDict["start_ts"] if watermark is not None: watermark_value = f'{self.tdCom.dataDict["watermark"]}s'