test: modify test case of tsma for clusters
This commit is contained in:
parent
05aade099d
commit
a55d5c5e14
|
@ -1862,6 +1862,55 @@ class TDCom:
|
|||
time.sleep(1)
|
||||
return tbname
|
||||
|
||||
def update_json_file_replica(self, json_file_path, new_replica_value, output_file_path=None):
|
||||
"""
|
||||
Read a JSON file, update the 'replica' value, and write the result back to a file.
|
||||
|
||||
Parameters:
|
||||
json_file_path (str): The path to the original JSON file.
|
||||
new_replica_value (int): The new 'replica' value to be set.
|
||||
output_file_path (str, optional): The path to the output file where the updated JSON will be saved.
|
||||
If not provided, the original file will be overwritten.
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
try:
|
||||
# Read the JSON file and load its content into a Python dictionary
|
||||
with open(json_file_path, 'r', encoding='utf-8') as file:
|
||||
data = json.load(file)
|
||||
|
||||
# Iterate over each item in the 'databases' list to find 'dbinfo' and update 'replica'
|
||||
for db in data['databases']:
|
||||
if 'dbinfo' in db:
|
||||
db['dbinfo']['replica'] = new_replica_value
|
||||
|
||||
# Convert the updated dictionary back into a JSON string with indentation for readability
|
||||
updated_json_str = json.dumps(data, indent=4, ensure_ascii=False)
|
||||
|
||||
# Write the updated JSON string to a file
|
||||
if output_file_path:
|
||||
# If an output file path is provided, write to the new file
|
||||
with open(output_file_path, 'w', encoding='utf-8') as output_file:
|
||||
output_file.write(updated_json_str)
|
||||
else:
|
||||
# Otherwise, overwrite the original file with the updated content
|
||||
with open(json_file_path, 'w', encoding='utf-8') as file:
|
||||
file.write(updated_json_str)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
# Handle JSON decoding error (e.g., if the file is not valid JSON)
|
||||
print(f"JSON decode error: {e}")
|
||||
except FileNotFoundError:
|
||||
# Handle the case where the JSON file is not found at the given path
|
||||
print(f"File not found: {json_file_path}")
|
||||
except KeyError as e:
|
||||
# Handle missing key error (e.g., if 'databases' or 'dbinfo' is not present)
|
||||
print(f"Key error: {e}")
|
||||
except Exception as e:
|
||||
# Handle any other exceptions that may occur
|
||||
print(f"An error occurred: {e}")
|
||||
|
||||
def is_json(msg):
|
||||
if isinstance(msg, str):
|
||||
try:
|
||||
|
@ -1896,4 +1945,7 @@ def dict2toml(in_dict: dict, file:str):
|
|||
with open(file, 'w') as f:
|
||||
toml.dump(in_dict, f)
|
||||
|
||||
|
||||
|
||||
|
||||
tdCom = TDCom()
|
||||
|
|
|
@ -23,6 +23,7 @@ import pandas as pd
|
|||
from util.log import *
|
||||
from util.constant import *
|
||||
import ctypes
|
||||
import random
|
||||
# from datetime import timezone
|
||||
import time
|
||||
|
||||
|
@ -744,5 +745,59 @@ class TDSql:
|
|||
os.makedirs( dir, 755 )
|
||||
tdLog.info("dir: %s is created" %dir)
|
||||
pass
|
||||
|
||||
|
||||
|
||||
def get_db_vgroups(self, db_name:str = "test") -> list:
|
||||
db_vgroups_list = []
|
||||
tdSql.query(f"show {db_name}.vgroups")
|
||||
for result in tdSql.queryResult:
|
||||
db_vgroups_list.append(result[0])
|
||||
vgroup_nums = len(db_vgroups_list)
|
||||
tdLog.debug(f"{db_name} has {vgroup_nums} vgroups :{db_vgroups_list}")
|
||||
tdSql.query("select * from information_schema.ins_vnodes")
|
||||
return db_vgroups_list
|
||||
|
||||
def get_cluseter_dnodes(self) -> list:
|
||||
cluset_dnodes_list = []
|
||||
tdSql.query("show dnodes")
|
||||
for result in tdSql.queryResult:
|
||||
cluset_dnodes_list.append(result[0])
|
||||
self.clust_dnode_nums = len(cluset_dnodes_list)
|
||||
tdLog.debug(f"cluster has {len(cluset_dnodes_list)} dnodes :{cluset_dnodes_list}")
|
||||
return cluset_dnodes_list
|
||||
|
||||
def redistribute_one_vgroup(self, db_name:str = "test", replica:int = 1, vgroup_id:int = 1, useful_trans_dnodes_list:list = [] ):
|
||||
# redisutribute vgroup {vgroup_id} dnode {dnode_id}
|
||||
if replica == 1:
|
||||
dnode_id = random.choice(useful_trans_dnodes_list)
|
||||
redistribute_sql = f"redistribute vgroup {vgroup_id} dnode {dnode_id}"
|
||||
elif replica ==3:
|
||||
selected_dnodes = random.sample(useful_trans_dnodes_list, replica)
|
||||
redistribute_sql_parts = [f"dnode {dnode}" for dnode in selected_dnodes]
|
||||
redistribute_sql = f"redistribute vgroup {vgroup_id} " + " ".join(redistribute_sql_parts)
|
||||
else:
|
||||
raise ValueError(f"Replica count must be 1 or 3,but got {replica}")
|
||||
tdLog.debug(f"redistributeSql:{redistribute_sql}")
|
||||
tdSql.query(redistribute_sql)
|
||||
tdLog.debug("redistributeSql ok")
|
||||
|
||||
def redistribute_db_all_vgroups(self, db_name:str = "test", replica:int = 1):
|
||||
db_vgroups_list = self.get_db_vgroups(db_name)
|
||||
cluset_dnodes_list = self.get_cluseter_dnodes()
|
||||
useful_trans_dnodes_list = cluset_dnodes_list.copy()
|
||||
tdSql.query("select * from information_schema.ins_vnodes")
|
||||
#result: dnode_id|vgroup_id|db_name|status|role_time|start_time|restored|
|
||||
|
||||
for vnode_group_id in db_vgroups_list:
|
||||
print(tdSql.queryResult)
|
||||
for result in tdSql.queryResult:
|
||||
if result[2] == db_name and result[1] == vnode_group_id:
|
||||
tdLog.debug(f"dbname: {db_name}, vgroup :{vnode_group_id}, dnode is {result[0]}")
|
||||
print(useful_trans_dnodes_list)
|
||||
useful_trans_dnodes_list.remove(result[0])
|
||||
tdLog.debug(f"vgroup :{vnode_group_id},redis_dnode list:{useful_trans_dnodes_list}")
|
||||
self.redistribute_one_vgroup(db_name, replica, vnode_group_id, useful_trans_dnodes_list)
|
||||
useful_trans_dnodes_list = cluset_dnodes_list.copy()
|
||||
|
||||
tdSql = TDSql()
|
||||
|
|
|
@ -1205,11 +1205,20 @@ class TDTestCase:
|
|||
self.test_drop_tsma()
|
||||
self.test_tb_ddl_with_created_tsma()
|
||||
|
||||
|
||||
def run(self):
|
||||
self.init_data()
|
||||
self.test_ddl()
|
||||
self.test_query_with_tsma()
|
||||
|
||||
# bug to fix
|
||||
# self.test_flush_query()
|
||||
|
||||
#cluster test
|
||||
cluster_dnode_list = tdSql.get_cluseter_dnodes()
|
||||
clust_dnode_nums = len(cluster_dnode_list)
|
||||
if clust_dnode_nums > 1:
|
||||
self.test_redistribute_vgroups()
|
||||
|
||||
def test_create_tsma(self):
|
||||
function_name = sys._getframe().f_code.co_name
|
||||
tdLog.debug(f'-----{function_name}------')
|
||||
|
@ -1446,7 +1455,9 @@ class TDTestCase:
|
|||
def test_create_tsma_maxlist_function(self):
|
||||
function_name = sys._getframe().f_code.co_name
|
||||
tdLog.debug(f'-----{function_name}------')
|
||||
os.system("taosBenchmark -f 2-query/compa4096_tsma.json -y ")
|
||||
json_file = "2-query/compa4096_tsma.json"
|
||||
tdCom.update_json_file_replica(json_file, self.replicaVar)
|
||||
os.system(f"taosBenchmark -f {json_file} -y ")
|
||||
# max number of list is 4093: 4096 - 3 - 2(原始表tag个数) - 1(tbname)
|
||||
tdSql.execute('use db4096')
|
||||
|
||||
|
@ -1542,6 +1553,28 @@ class TDTestCase:
|
|||
# Invalid function para type
|
||||
tdSql.error('create tsma tsma_illegal on test.meters function(avg(c8)) interval(5m)',-2147473406)
|
||||
|
||||
def test_flush_query(self):
|
||||
tdSql.execute('insert into test.norm_tb (ts,c1_new,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1_new),avg(c2) from test.norm_tb interval(10m);select avg(c1_new),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1)
|
||||
tdSql.execute('flush database test', queryTimes=1)
|
||||
tdSql.query('select count(*) from test.meters', queryTimes=1)
|
||||
tdSql.checkData(0,0,100000)
|
||||
tdSql.query('select count(*) from test.norm_tb', queryTimes=1)
|
||||
tdSql.checkData(0,0,10008)
|
||||
tdSql.execute('flush database test', queryTimes=1)
|
||||
tdSql.query('select count(*) from test.meters', queryTimes=1)
|
||||
tdSql.checkData(0,0,100000)
|
||||
tdSql.query('select count(*) from test.norm_tb', queryTimes=1)
|
||||
tdSql.checkData(0,0,10008)
|
||||
|
||||
def test_redistribute_vgroups(self):
|
||||
tdSql.redistribute_db_all_vgroups('test', self.replicaVar)
|
||||
tdSql.redistribute_db_all_vgroups('db4096', self.replicaVar)
|
||||
|
||||
# def test_replica_dnode(self):
|
||||
|
||||
# def test_split_dnode(self):
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
|
Loading…
Reference in New Issue