test: update stream case for subtable
This commit is contained in:
parent
ae7b3179a0
commit
3a9a21b825
|
@ -201,6 +201,9 @@ class TDCom:
|
|||
self.cast_tag_stb_filter_des_select_elm = "ts, t1, t2, t3, t4, cast(t1 as TINYINT UNSIGNED), t6, t7, t8, t9, t10, cast(t2 as varchar(256)), t12, cast(t3 as bool)"
|
||||
self.tag_count = len(self.tag_filter_des_select_elm.split(","))
|
||||
self.state_window_range = list()
|
||||
|
||||
self.custom_col_val = 0
|
||||
self.part_val_list = [1, 2]
|
||||
# def init(self, conn, logSql):
|
||||
# # tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
|
@ -1259,7 +1262,7 @@ class TDCom:
|
|||
default_ctbname_index_start_num += 1
|
||||
tdSql.execute(create_stable_sql)
|
||||
|
||||
def sgen_column_value_list(self, column_elm_list, need_null, ts_value=None):
|
||||
def sgen_column_value_list(self, column_elm_list, need_null, ts_value=None, additional_ts=None, custom_col_index=None, col_value_type=None, force_pk_val=None):
|
||||
"""_summary_
|
||||
|
||||
Args:
|
||||
|
@ -1269,6 +1272,8 @@ class TDCom:
|
|||
"""
|
||||
self.column_value_list = list()
|
||||
self.ts_value = self.genTs()[0]
|
||||
if additional_ts is not None:
|
||||
self.additional_ts = self.genTs(additional_ts=additional_ts)[2]
|
||||
if ts_value is not None:
|
||||
self.ts_value = ts_value
|
||||
|
||||
|
@ -1292,7 +1297,22 @@ class TDCom:
|
|||
for i in range(int(len(self.column_value_list)/2)):
|
||||
index_num = random.randint(0, len(self.column_value_list)-1)
|
||||
self.column_value_list[index_num] = None
|
||||
self.column_value_list = [self.ts_value] + self.column_value_list
|
||||
|
||||
if custom_col_index is not None:
|
||||
if col_value_type == "Random":
|
||||
pass
|
||||
elif col_value_type == "Incremental":
|
||||
self.column_value_list[custom_col_index] = self.custom_col_val
|
||||
self.custom_col_val += 1
|
||||
elif col_value_type == "Part_equal":
|
||||
self.column_value_list[custom_col_index] = random.choice(self.part_val_list)
|
||||
|
||||
self.column_value_list = [self.ts_value] + [self.additional_ts] + self.column_value_list if additional_ts is not None else [self.ts_value] + self.column_value_list
|
||||
if col_value_type == "Incremental" and custom_col_index==1:
|
||||
self.column_value_list[custom_col_index] = self.custom_col_val if force_pk_val is None else force_pk_val
|
||||
if col_value_type == "Part_equal" and custom_col_index==1:
|
||||
self.column_value_list[custom_col_index] = random.randint(0, self.custom_col_val) if force_pk_val is None else force_pk_val
|
||||
|
||||
|
||||
def screate_table(self, dbname=None, tbname="tb", use_name="table", column_elm_list=None,
|
||||
count=1, default_tbname_prefix="tb", default_tbname_index_start_num=1,
|
||||
|
@ -1333,7 +1353,7 @@ class TDCom:
|
|||
default_tbname_index_start_num += 1
|
||||
tdSql.execute(create_table_sql)
|
||||
|
||||
def sinsert_rows(self, dbname=None, tbname=None, column_ele_list=None, ts_value=None, count=1, need_null=False):
|
||||
def sinsert_rows(self, dbname=None, tbname=None, column_ele_list=None, ts_value=None, count=1, need_null=False, custom_col_index=None, col_value_type="random"):
|
||||
"""insert rows
|
||||
|
||||
Args:
|
||||
|
@ -1353,7 +1373,7 @@ class TDCom:
|
|||
if tbname is not None:
|
||||
self.tbname = tbname
|
||||
|
||||
self.sgen_column_value_list(column_ele_list, need_null, ts_value)
|
||||
self.sgen_column_value_list(column_ele_list, need_null, ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
# column_value_str = ", ".join(str(v) for v in self.column_value_list)
|
||||
column_value_str = ""
|
||||
for column_value in self.column_value_list:
|
||||
|
@ -1370,7 +1390,7 @@ class TDCom:
|
|||
else:
|
||||
for num in range(count):
|
||||
ts_value = self.genTs()[0]
|
||||
self.sgen_column_value_list(column_ele_list, need_null, f'{ts_value}+{num}s')
|
||||
self.sgen_column_value_list(column_ele_list, need_null, f'{ts_value}+{num}s', custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
column_value_str = ""
|
||||
for column_value in self.column_value_list:
|
||||
if column_value is None:
|
||||
|
@ -1777,7 +1797,7 @@ class TDCom:
|
|||
self.sdelete_rows(tbname=self.ctb_name, start_ts=self.time_cast(self.record_history_ts, "-"))
|
||||
self.sdelete_rows(tbname=self.tb_name, start_ts=self.time_cast(self.record_history_ts, "-"))
|
||||
|
||||
def prepare_data(self, interval=None, watermark=None, session=None, state_window=None, state_window_max=127, interation=3, range_count=None, precision="ms", fill_history_value=0, ext_stb=None):
|
||||
def prepare_data(self, interval=None, watermark=None, session=None, state_window=None, state_window_max=127, interation=3, range_count=None, precision="ms", fill_history_value=0, ext_stb=None, custom_col_index=None, col_value_type="random"):
|
||||
"""prepare stream data
|
||||
|
||||
Args:
|
||||
|
@ -1840,8 +1860,8 @@ class TDCom:
|
|||
if fill_history_value == 1:
|
||||
for i in range(self.range_count):
|
||||
ts_value = str(self.date_time)+f'-{self.default_interval*(i+1)}s'
|
||||
self.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value)
|
||||
self.sinsert_rows(tbname=self.tb_name, ts_value=ts_value)
|
||||
self.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
self.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
if i == 1:
|
||||
self.record_history_ts = ts_value
|
||||
|
||||
|
@ -1862,6 +1882,18 @@ class TDCom:
|
|||
time.sleep(1)
|
||||
return tbname
|
||||
|
||||
def get_group_id_from_stb(self, stbname):
|
||||
tdSql.query(f'select distinct group_id from {stbname}')
|
||||
cnt = 0
|
||||
while len(tdSql.queryResult) == 0:
|
||||
tdSql.query(f'select distinct group_id from {stbname}')
|
||||
if cnt < self.default_interval:
|
||||
cnt += 1
|
||||
time.sleep(1)
|
||||
else:
|
||||
return False
|
||||
return tdSql.queryResult[0][0]
|
||||
|
||||
def update_json_file_replica(self, json_file_path, new_replica_value, output_file_path=None):
|
||||
"""
|
||||
Read a JSON file, update the 'replica' value, and write the result back to a file.
|
||||
|
|
|
@ -15,9 +15,12 @@ class TDTestCase:
|
|||
|
||||
def at_once_interval(self, interval, partition="tbname", delete=False, fill_value=None, fill_history_value=None, case_when=None):
|
||||
tdLog.info(f"*** testing stream at_once+interval: interval: {interval}, partition: {partition}, fill_history: {fill_history_value}, fill: {fill_value}, delete: {delete}, case_when: {case_when} ***")
|
||||
col_value_type = "Incremental" if partition=="c1" else "random"
|
||||
custom_col_index = 1 if partition=="c1" else None
|
||||
self.tdCom.custom_col_val = 0
|
||||
self.delete = delete
|
||||
self.tdCom.case_name = sys._getframe().f_code.co_name
|
||||
self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value)
|
||||
self.tdCom.prepare_data(interval=interval, fill_history_value=fill_history_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
|
@ -76,15 +79,15 @@ class TDTestCase:
|
|||
for i in range(self.tdCom.range_count):
|
||||
ts_value = str(self.tdCom.date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s'
|
||||
ts_cast_delete_value = self.tdCom.time_cast(ts_value)
|
||||
self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value)
|
||||
self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
if i%2 == 0:
|
||||
self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value)
|
||||
self.tdCom.sinsert_rows(tbname=self.tdCom.ctb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
if self.delete and i%2 != 0:
|
||||
self.tdCom.sdelete_rows(tbname=self.tdCom.ctb_name, start_ts=ts_cast_delete_value)
|
||||
self.tdCom.date_time += 1
|
||||
self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value)
|
||||
self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
if i%2 == 0:
|
||||
self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value)
|
||||
self.tdCom.sinsert_rows(tbname=self.tdCom.tb_name, ts_value=ts_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
if self.delete and i%2 != 0:
|
||||
self.tdCom.sdelete_rows(tbname=self.tdCom.tb_name, start_ts=ts_cast_delete_value)
|
||||
self.tdCom.date_time += 1
|
||||
|
@ -102,6 +105,7 @@ class TDTestCase:
|
|||
|
||||
if self.tdCom.subtable:
|
||||
for tname in [self.stb_name, self.ctb_name]:
|
||||
group_id = self.tdCom.get_group_id_from_stb(f'{tname}_output')
|
||||
tdSql.query(f'select * from {self.ctb_name}')
|
||||
ptn_counter = 0
|
||||
for c1_value in tdSql.queryResult:
|
||||
|
@ -116,11 +120,11 @@ class TDTestCase:
|
|||
tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}')
|
||||
tdSql.query(f'select count(*) from `{tbname}`')
|
||||
elif partition == "tbname" and ptn_counter == 0:
|
||||
tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}')
|
||||
tbname = self.tdCom.get_subtable_wait(f'{tname}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}_{tname}_output_{group_id}')
|
||||
tdSql.query(f'select count(*) from `{tbname}`')
|
||||
ptn_counter += 1
|
||||
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True)
|
||||
|
||||
group_id = self.tdCom.get_group_id_from_stb(f'{self.tb_name}_output')
|
||||
tdSql.query(f'select * from {self.tb_name}')
|
||||
ptn_counter = 0
|
||||
for c1_value in tdSql.queryResult:
|
||||
|
@ -135,7 +139,7 @@ class TDTestCase:
|
|||
tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{abs_c1_value}{self.tdCom.subtable_suffix}')
|
||||
tdSql.query(f'select count(*) from `{tbname}`')
|
||||
elif partition == "tbname" and ptn_counter == 0:
|
||||
tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}')
|
||||
tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}_{self.tb_name}_output_{group_id}')
|
||||
tdSql.query(f'select count(*) from `{tbname}`')
|
||||
ptn_counter += 1
|
||||
|
||||
|
|
|
@ -15,9 +15,12 @@ class TDTestCase:
|
|||
|
||||
def at_once_session(self, session, ignore_expired=None, ignore_update=None, partition="tbname", delete=False, fill_history_value=None, case_when=None, subtable=True):
|
||||
tdLog.info(f"*** testing stream at_once+interval: session: {session}, ignore_expired: {ignore_expired}, ignore_update: {ignore_update}, partition: {partition}, delete: {delete}, fill_history: {fill_history_value}, case_when: {case_when}, subtable: {subtable} ***")
|
||||
col_value_type = "Incremental" if partition=="c1" else "random"
|
||||
custom_col_index = 1 if partition=="c1" else None
|
||||
self.tdCom.custom_col_val = 0
|
||||
self.delete = delete
|
||||
self.tdCom.case_name = sys._getframe().f_code.co_name
|
||||
self.tdCom.prepare_data(session=session, fill_history_value=fill_history_value)
|
||||
self.tdCom.prepare_data(session=session, fill_history_value=fill_history_value, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
self.stb_name = self.tdCom.stb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
self.ctb_name = self.tdCom.ctb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
self.tb_name = self.tdCom.tb_name.replace(f"{self.tdCom.dbname}.", "")
|
||||
|
@ -79,11 +82,11 @@ class TDTestCase:
|
|||
if i == 0:
|
||||
record_window_close_ts = window_close_ts
|
||||
for ts_value in [self.tdCom.date_time, window_close_ts]:
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True)
|
||||
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True)
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
if self.tdCom.update and i%2 == 0:
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True)
|
||||
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True)
|
||||
self.tdCom.sinsert_rows(tbname=self.ctb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
self.tdCom.sinsert_rows(tbname=self.tb_name, ts_value=ts_value, need_null=True, custom_col_index=custom_col_index, col_value_type=col_value_type)
|
||||
if self.delete and i%2 != 0:
|
||||
dt = f'cast({self.tdCom.date_time-1} as timestamp)'
|
||||
self.tdCom.sdelete_rows(tbname=self.ctb_name, start_ts=dt)
|
||||
|
@ -166,6 +169,7 @@ class TDTestCase:
|
|||
self.tdCom.sdelete_rows(tbname=self.tb_name, start_ts=self.tdCom.time_cast(self.tdCom.record_history_ts, "-"))
|
||||
|
||||
if self.tdCom.subtable:
|
||||
group_id = self.tdCom.get_group_id_from_stb(f'{self.ctb_name}_output')
|
||||
tdSql.query(f'select * from {self.ctb_name}')
|
||||
ptn_counter = 0
|
||||
for c1_value in tdSql.queryResult:
|
||||
|
@ -182,11 +186,11 @@ class TDTestCase:
|
|||
tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}')
|
||||
tdSql.query(f'select count(*) from `{tbname}`')
|
||||
elif partition == "tbname" and ptn_counter == 0:
|
||||
tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}')
|
||||
tbname = self.tdCom.get_subtable_wait(f'{self.ctb_name}_{self.tdCom.subtable_prefix}{self.ctb_name}{self.tdCom.subtable_suffix}_{self.ctb_name}_output_{group_id}')
|
||||
tdSql.query(f'select count(*) from `{tbname}`')
|
||||
ptn_counter += 1
|
||||
tdSql.checkEqual(tdSql.queryResult[0][0] > 0, True) if subtable is not None else tdSql.checkEqual(tdSql.queryResult[0][0] >= 0, True)
|
||||
|
||||
group_id = self.tdCom.get_group_id_from_stb(f'{self.tb_name}_output')
|
||||
tdSql.query(f'select * from {self.tb_name}')
|
||||
ptn_counter = 0
|
||||
for c1_value in tdSql.queryResult:
|
||||
|
@ -203,7 +207,7 @@ class TDTestCase:
|
|||
tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{partition_elm_alias}{self.tdCom.subtable_suffix}')
|
||||
tdSql.query(f'select count(*) from `{tbname}`')
|
||||
elif partition == "tbname" and ptn_counter == 0:
|
||||
tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}')
|
||||
tbname = self.tdCom.get_subtable_wait(f'{self.tb_name}_{self.tdCom.subtable_prefix}{self.tb_name}{self.tdCom.subtable_suffix}_{self.tb_name}_output_{group_id}')
|
||||
tdSql.query(f'select count(*) from `{tbname}`')
|
||||
ptn_counter += 1
|
||||
|
||||
|
|
Loading…
Reference in New Issue