diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index df50e8031c..7bb2f42495 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -201,6 +201,9 @@ class TDCom: self.cast_tag_stb_filter_des_select_elm = "ts, t1, t2, t3, t4, cast(t1 as TINYINT UNSIGNED), t6, t7, t8, t9, t10, cast(t2 as varchar(256)), t12, cast(t3 as bool)" self.tag_count = len(self.tag_filter_des_select_elm.split(",")) self.state_window_range = list() + + self.custom_col_val = 0 + self.part_val_list = [1, 2] # def init(self, conn, logSql): # # tdSql.init(conn.cursor(), logSql) @@ -1259,7 +1262,7 @@ class TDCom: default_ctbname_index_start_num += 1 tdSql.execute(create_stable_sql) - def sgen_column_value_list(self, column_elm_list, need_null, ts_value=None): + def sgen_column_value_list(self, column_elm_list, need_null, ts_value=None, additional_ts=None, custom_col_index=None, col_value_type=None, force_pk_val=None): """_summary_ Args: @@ -1269,6 +1272,8 @@ class TDCom: """ self.column_value_list = list() self.ts_value = self.genTs()[0] + if additional_ts is not None: + self.additional_ts = self.genTs(additional_ts=additional_ts)[2] if ts_value is not None: self.ts_value = ts_value @@ -1292,7 +1297,22 @@ class TDCom: for i in range(int(len(self.column_value_list)/2)): index_num = random.randint(0, len(self.column_value_list)-1) self.column_value_list[index_num] = None - self.column_value_list = [self.ts_value] + self.column_value_list + + if custom_col_index is not None: + if col_value_type == "Random": + pass + elif col_value_type == "Incremental": + self.column_value_list[custom_col_index] = self.custom_col_val + self.custom_col_val += 1 + elif col_value_type == "Part_equal": + self.column_value_list[custom_col_index] = random.choice(self.part_val_list) + + self.column_value_list = [self.ts_value] + [self.additional_ts] + self.column_value_list if additional_ts is not None else [self.ts_value] + self.column_value_list + if col_value_type == "Incremental" and custom_col_index==1: + self.column_value_list[custom_col_index] = self.custom_col_val if force_pk_val is None else force_pk_val + if col_value_type == "Part_equal" and custom_col_index==1: + self.column_value_list[custom_col_index] = random.randint(0, self.custom_col_val) if force_pk_val is None else force_pk_val + def screate_table(self, dbname=None, tbname="tb", use_name="table", column_elm_list=None, count=1, default_tbname_prefix="tb", default_tbname_index_start_num=1, @@ -1333,7 +1353,7 @@ class TDCom: default_tbname_index_start_num += 1 tdSql.execute(create_table_sql) - def sinsert_rows(self, dbname=None, tbname=None, column_ele_list=None, ts_value=None, count=1, need_null=False): + def sinsert_rows(self, dbname=None, tbname=None, column_ele_list=None, ts_value=None, count=1, need_null=False, custom_col_index=None, col_value_type="random"): """insert rows Args: @@ -1353,7 +1373,7 @@ class TDCom: if tbname is not None: self.tbname = tbname - self.sgen_column_value_list(column_ele_list, need_null, ts_value) + self.sgen_column_value_list(column_ele_list, need_null, ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type) # column_value_str = ", ".join(str(v) for v in self.column_value_list) column_value_str = "" for column_value in self.column_value_list: @@ -1370,7 +1390,7 @@ class TDCom: else: for num in range(count): ts_value = self.genTs()[0] - self.sgen_column_value_list(column_ele_list, need_null, f'{ts_value}+{num}s') + self.sgen_column_value_list(column_ele_list, need_null, f'{ts_value}+{num}s', custom_col_index=custom_col_index, col_value_type=col_value_type) column_value_str = "" for column_value in self.column_value_list: if column_value is None: @@ -1777,7 +1797,7 @@ class TDCom: self.sdelete_rows(tbname=self.ctb_name, start_ts=self.time_cast(self.record_history_ts, "-")) self.sdelete_rows(tbname=self.tb_name, start_ts=self.time_cast(self.record_history_ts, "-")) - def prepare_data(self, interval=None, watermark=None, session=None, state_window=None, state_window_max=127, interation=3, range_count=None, precision="ms", fill_history_value=0, ext_stb=None): + def prepare_data(self, interval=None, watermark=None, session=None, state_window=None, state_window_max=127, interation=3, range_count=None, precision="ms", fill_history_value=0, ext_stb=None, custom_col_index=None, col_value_type="random"): """prepare stream data Args: @@ -1840,8 +1860,8 @@ class TDCom: if fill_history_value == 1: for i in range(self.range_count): ts_value = str(self.date_time)+f'-{self.default_interval*(i+1)}s' - self.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value) - self.sinsert_rows(tbname=self.tb_name, ts_value=ts_value) + self.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type) + self.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type) if i == 1: self.record_history_ts = ts_value @@ -1862,6 +1882,18 @@ class TDCom: time.sleep(1) return tbname + def get_group_id_from_stb(self, stbname): + tdSql.query(f'select distinct group_id from {stbname}') + cnt = 0 + while len(tdSql.queryResult) == 0: + tdSql.query(f'select distinct group_id from {stbname}') + if cnt < self.default_interval: + cnt += 1 + time.sleep(1) + else: + return False + return tdSql.queryResult[0][0] + def update_json_file_replica(self, json_file_path, new_replica_value, output_file_path=None): """ Read a JSON file, update the 'replica' value, and write the result back to a file. diff --git a/tests/system-test/8-stream/at_once_interval.py b/tests/system-test/8-stream/at_once_interval.py index 763b88bc69..eb581e84c4 100644 --- a/tests/system-test/8-stream/at_once_interval.py +++ b/tests/system-test/8-stream/at_once_interval.py @@ -15,9 +15,12 @@ class TDTestCase: def at_once_interval(self, interval, partition="tbname", delete=False, fill_value=None, fill_history_value=None, case_when=None): tdLog.info(f"*** testing stream at_once+interval: interval: {interval}, partition: {partition}, fill_history: {fill_history_value}, fill: {fill_value}, delete: {delete}, case_when: {case_when} ***") + col_value_type = "Incremental" if partition=="c1" else "random" + custom_col_index = 1 if partition=="c1" else None + self.tdCom.custom_col_val = 0 self.delete = delete self.tdCom.case_name = sys._getframe().f_code.co_name - self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value) + self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value, custom_col_index=custom_col_index, col_value_type=col_value_type) self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "") self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") @@ -76,15 +79,15 @@ class TDTestCase: for i in range(self.tdCom.range_count): ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s' ts_cast_delete_value = self.tdCom.time_cast(ts_value) - self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value) + self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type) if i%2 == 0: - self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value) + self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type) if self.delete and i%2 != 0: self.tdCom.sdelete_rows(tbname=self.tdCom.ctb_name, start_ts=ts_cast_delete_value) self.tdCom.date_time += 1 - self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value) + self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type) if i%2 == 0: - self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value) + self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type) if self.delete and i%2 != 0: self.tdCom.sdelete_rows(tbname=self.tdCom.tb_name, start_ts=ts_cast_delete_value) self.tdCom.date_time += 1 @@ -102,6 +105,7 @@ class TDTestCase: if self.tdCom.subtable: for tname in [self.stb_name, self.ctb_name]: + group_id = self.tdCom.get_group_id_from_stb(f'{tname}_output') tdSql.query(f'select * from {self.ctb_name}') ptn_counter = 0 for c1_value in tdSql.queryResult: @@ -116,11 +120,11 @@ class TDTestCase: tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}') + tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}_{tname}_output_{group_id}') tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) - + group_id = self.tdCom.get_group_id_from_stb(f'{self.tb_name}_output') tdSql.query(f'select * from {self.tb_name}') ptn_counter = 0 for c1_value in tdSql.queryResult: @@ -135,7 +139,7 @@ class TDTestCase: tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}') tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}_{self.tb_name}_output_{group_id}') tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 diff --git a/tests/system-test/8-stream/at_once_session.py b/tests/system-test/8-stream/at_once_session.py index 6f25e5ad97..cdded9388c 100644 --- a/tests/system-test/8-stream/at_once_session.py +++ b/tests/system-test/8-stream/at_once_session.py @@ -15,9 +15,12 @@ class TDTestCase: def at_once_session(self, session, ignore_expired=None, ignore_update=None, partition="tbname", delete=False, fill_history_value=None, case_when=None, subtable=True): tdLog.info(f"*** testing stream at_once+interval: session: {session}, ignore_expired: {ignore_expired}, ignore_update: {ignore_update}, partition: {partition}, delete: {delete}, fill_history: {fill_history_value}, case_when: {case_when}, subtable: {subtable} ***") + col_value_type = "Incremental" if partition=="c1" else "random" + custom_col_index = 1 if partition=="c1" else None + self.tdCom.custom_col_val = 0 self.delete = delete self.tdCom.case_name = sys._getframe().f_code.co_name - self.tdCom.prepare_data(session=session, fill_history_value=fill_history_value) + self.tdCom.prepare_data(session=session, fill_history_value=fill_history_value, custom_col_index=custom_col_index, col_value_type=col_value_type) self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "") self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "") self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "") @@ -79,11 +82,11 @@ class TDTestCase: if i == 0: record_window_close_ts = window_close_ts for ts_value in [self.tdCom.date_time, window_close_ts]: - self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True) - self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True) + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type) if self.tdCom.update and i%2 == 0: - self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True) - self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True) + self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type) + self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type) if self.delete and i%2 != 0: dt = f'cast({self.tdCom.date_time-1} as timestamp)' self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=dt) @@ -166,6 +169,7 @@ class TDTestCase: self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(self.tdCom.record_history_ts, "-")) if self.tdCom.subtable: + group_id = self.tdCom.get_group_id_from_stb(f'{self.ctb_name}_output') tdSql.query(f'select * from {self.ctb_name}') ptn_counter = 0 for c1_value in tdSql.queryResult: @@ -182,11 +186,11 @@ class TDTestCase: tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}') tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}') + tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}_{self.ctb_name}_output_{group_id}') tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1 tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) if subtable is not None else tdSql.checkEqual(tdSql.queryResult[0][0] >= 0, True) - + group_id = self.tdCom.get_group_id_from_stb(f'{self.tb_name}_output') tdSql.query(f'select * from {self.tb_name}') ptn_counter = 0 for c1_value in tdSql.queryResult: @@ -203,7 +207,7 @@ class TDTestCase: tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}') tdSql.query(f'select count(*) from `{tbname}`') elif partition == "tbname" and ptn_counter == 0: - tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}') + tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}_{self.tb_name}_output_{group_id}') tdSql.query(f'select count(*) from `{tbname}`') ptn_counter += 1