From a55d5c5e14f6c8ca24304506304aca867613a6fc Mon Sep 17 00:00:00 2001
From: chenhaoran
Date: Sun, 14 Apr 2024 12:13:09 +0800
Subject: [PATCH] test: modify test case of tsma for clusters

---
 tests/pytest/util/common.py       | 52 +++++++++++++++++++++++++++++
 tests/pytest/util/sql.py          | 55 +++++++++++++++++++++++++++++++
 tests/system-test/2-query/tsma.py | 37 +++++++++++++++++++--
 3 files changed, 142 insertions(+), 2 deletions(-)

diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py
index 61cb770a10..df50e8031c 100644
--- a/tests/pytest/util/common.py
+++ b/tests/pytest/util/common.py
@@ -1862,6 +1862,55 @@ class TDCom:
         time.sleep(1)
         return tbname
 
+    def update_json_file_replica(self, json_file_path, new_replica_value, output_file_path=None):
+        """
+        Read a JSON file, update the 'replica' value, and write the result back to a file.
+
+        Parameters:
+        json_file_path (str): The path to the original JSON file.
+        new_replica_value (int): The new 'replica' value to be set.
+        output_file_path (str, optional): The path to the output file where the updated JSON will be saved.
+                                          If not provided, the original file will be overwritten.
+
+        Returns:
+        None
+        """
+        try:
+            # Read the JSON file and load its content into a Python dictionary
+            with open(json_file_path, 'r', encoding='utf-8') as file:
+                data = json.load(file)
+
+            # Iterate over each item in the 'databases' list to find 'dbinfo' and update 'replica'
+            for db in data['databases']:
+                if 'dbinfo' in db:
+                    db['dbinfo']['replica'] = new_replica_value
+
+            # Convert the updated dictionary back into a JSON string with indentation for readability
+            updated_json_str = json.dumps(data, indent=4, ensure_ascii=False)
+
+            # Write the updated JSON string to a file
+            if output_file_path:
+                # If an output file path is provided, write to the new file
+                with open(output_file_path, 'w', encoding='utf-8') as output_file:
+                    output_file.write(updated_json_str)
+            else:
+                # Otherwise, overwrite the original file with the updated content
+                with open(json_file_path, 'w', encoding='utf-8') as file:
+                    file.write(updated_json_str)
+
+        except json.JSONDecodeError as e:
+            # Handle JSON decoding error (e.g., if the file is not valid JSON)
+            print(f"JSON decode error: {e}")
+        except FileNotFoundError:
+            # Handle the case where the JSON file is not found at the given path
+            print(f"File not found: {json_file_path}")
+        except KeyError as e:
+            # Handle missing key error (e.g., if 'databases' or 'dbinfo' is not present)
+            print(f"Key error: {e}")
+        except Exception as e:
+            # Handle any other exceptions that may occur
+            print(f"An error occurred: {e}")
+
 def is_json(msg):
     if isinstance(msg, str):
         try:
@@ -1896,4 +1945,7 @@ def dict2toml(in_dict: dict, file:str):
     with open(file, 'w') as f:
         toml.dump(in_dict, f)
 
+
+
+
 tdCom = TDCom()
diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py
index 6b5570c8ee..b46326bb3c 100644
--- a/tests/pytest/util/sql.py
+++ b/tests/pytest/util/sql.py
@@ -23,6 +23,7 @@ import pandas as pd
 from util.log import *
 from util.constant import *
 import ctypes
+import random
 # from datetime import timezone
 import time
 
@@ -744,5 +745,59 @@ class TDSql:
         os.makedirs( dir, 755 )
         tdLog.info("dir: %s is created" %dir)
         pass
+
+
+
+    def get_db_vgroups(self, db_name:str = "test") -> list:
+        db_vgroups_list = []
+        tdSql.query(f"show {db_name}.vgroups")
+        for result in tdSql.queryResult:
+            db_vgroups_list.append(result[0])
+        vgroup_nums = len(db_vgroups_list)
+        tdLog.debug(f"{db_name} has {vgroup_nums} vgroups :{db_vgroups_list}")
+        tdSql.query("select * from information_schema.ins_vnodes")
+        return db_vgroups_list
+
+    def get_cluseter_dnodes(self) -> list:
+        cluset_dnodes_list = []
+        tdSql.query("show dnodes")
+        for result in tdSql.queryResult:
+            cluset_dnodes_list.append(result[0])
+        self.clust_dnode_nums = len(cluset_dnodes_list)
+        tdLog.debug(f"cluster has {len(cluset_dnodes_list)} dnodes :{cluset_dnodes_list}")
+        return cluset_dnodes_list
+
+    def redistribute_one_vgroup(self, db_name:str = "test", replica:int = 1, vgroup_id:int = 1, useful_trans_dnodes_list:list = [] ):
+        # redistribute vgroup {vgroup_id} dnode {dnode_id}
+        if replica == 1:
+            dnode_id = random.choice(useful_trans_dnodes_list)
+            redistribute_sql = f"redistribute vgroup {vgroup_id} dnode {dnode_id}"
+        elif replica ==3:
+            selected_dnodes = random.sample(useful_trans_dnodes_list, replica)
+            redistribute_sql_parts = [f"dnode {dnode}" for dnode in selected_dnodes]
+            redistribute_sql = f"redistribute vgroup {vgroup_id} " + " ".join(redistribute_sql_parts)
+        else:
+            raise ValueError(f"Replica count must be 1 or 3,but got {replica}")
+        tdLog.debug(f"redistributeSql:{redistribute_sql}")
+        tdSql.query(redistribute_sql)
+        tdLog.debug("redistributeSql ok")
+
+    def redistribute_db_all_vgroups(self, db_name:str = "test", replica:int = 1):
+        db_vgroups_list = self.get_db_vgroups(db_name)
+        cluset_dnodes_list = self.get_cluseter_dnodes()
+        useful_trans_dnodes_list = cluset_dnodes_list.copy()
+        tdSql.query("select * from information_schema.ins_vnodes")
+        #result: dnode_id|vgroup_id|db_name|status|role_time|start_time|restored|
+
+        for vnode_group_id in db_vgroups_list:
+            print(tdSql.queryResult)
+            for result in tdSql.queryResult:
+                if result[2] == db_name and result[1] == vnode_group_id:
+                    tdLog.debug(f"dbname: {db_name}, vgroup :{vnode_group_id}, dnode is {result[0]}")
+                    print(useful_trans_dnodes_list)
+                    useful_trans_dnodes_list.remove(result[0])
+            tdLog.debug(f"vgroup :{vnode_group_id},redis_dnode list:{useful_trans_dnodes_list}")
+            self.redistribute_one_vgroup(db_name, replica, vnode_group_id, useful_trans_dnodes_list)
+            useful_trans_dnodes_list = cluset_dnodes_list.copy()
 
 tdSql = TDSql()
diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py
index 1f914febf6..e46fb4c489 100644
--- a/tests/system-test/2-query/tsma.py
+++ b/tests/system-test/2-query/tsma.py
@@ -1205,11 +1205,20 @@ class TDTestCase:
         self.test_drop_tsma()
         self.test_tb_ddl_with_created_tsma()
 
+
     def run(self):
         self.init_data()
         self.test_ddl()
         self.test_query_with_tsma()
-
+        # bug to fix
+        # self.test_flush_query()
+
+        #cluster test
+        cluster_dnode_list = tdSql.get_cluseter_dnodes()
+        clust_dnode_nums = len(cluster_dnode_list)
+        if clust_dnode_nums > 1:
+            self.test_redistribute_vgroups()
+
     def test_create_tsma(self):
         function_name = sys._getframe().f_code.co_name
         tdLog.debug(f'-----{function_name}------')
@@ -1446,7 +1455,9 @@ class TDTestCase:
     def test_create_tsma_maxlist_function(self):
         function_name = sys._getframe().f_code.co_name
         tdLog.debug(f'-----{function_name}------')
-        os.system("taosBenchmark -f 2-query/compa4096_tsma.json -y ")
+        json_file = "2-query/compa4096_tsma.json"
+        tdCom.update_json_file_replica(json_file, self.replicaVar)
+        os.system(f"taosBenchmark -f {json_file} -y ")
         # max number of list is 4093: 4096 - 3 - 2(原始表tag个数) - 1(tbname)
 
         tdSql.execute('use db4096')
@@ -1542,6 +1553,28 @@ class TDTestCase:
         # Invalid function para type
         tdSql.error('create tsma tsma_illegal on test.meters function(avg(c8)) interval(5m)',-2147473406)
 
+    def test_flush_query(self):
+        tdSql.execute('insert into test.norm_tb (ts,c1_new,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1_new),avg(c2) from test.norm_tb interval(10m);select avg(c1_new),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1)
+        tdSql.execute('flush database test', queryTimes=1)
+        tdSql.query('select count(*) from test.meters', queryTimes=1)
+        tdSql.checkData(0,0,100000)
+        tdSql.query('select count(*) from test.norm_tb', queryTimes=1)
+        tdSql.checkData(0,0,10008)
+        tdSql.execute('flush database test', queryTimes=1)
+        tdSql.query('select count(*) from test.meters', queryTimes=1)
+        tdSql.checkData(0,0,100000)
+        tdSql.query('select count(*) from test.norm_tb', queryTimes=1)
+        tdSql.checkData(0,0,10008)
+
+    def test_redistribute_vgroups(self):
+        tdSql.redistribute_db_all_vgroups('test', self.replicaVar)
+        tdSql.redistribute_db_all_vgroups('db4096', self.replicaVar)
+
+    # def test_replica_dnode(self):
+
+    # def test_split_dnode(self):
+
+
     def stop(self):
         tdSql.close()
         tdLog.success(f"{__file__} successfully executed")