Files
apiautotest/case_utils/requests_utils/data_handle.py
floraachy a01db401bf 调整响应数据提取,支持通过yaml用例数据传参提取指定格式的参数
支持3种类型的数据提取:1. 通过jsonpath从response.json()提取数据; 2. 通过正则表达式从response.text提取; 3. 直接从response提取cookies之类;
2023-12-05 14:02:27 +08:00

521 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# @Time : 2023/9/21 17:20
# @Author : chenyinhua
# @File : data_handle.py
# @Software: PyCharm
# @Desc:
# 标准库导入
import random
import json
import re, uuid
from datetime import datetime, date, timedelta
# 第三方库导入
from loguru import logger
from faker import Faker
from string import Template
from requests.cookies import RequestsCookieJar
from requests.utils import dict_from_cookiejar
class FakerData:
"""
测试数据生成类
官方文档https://faker.readthedocs.io/en/master/index.html
"""
def __init__(self):
self.fk_zh = Faker(locale='zh_CN')
self.faker = Faker()
@classmethod
def generate_random_int(cls, *args) -> int:
"""
:return: 随机数
"""
# 检查是否传入了参数
if not args:
# 没有传参就从5000内随机取一个整数返回
return random.randint(0, 5000)
# 排序参数并获取最小值和最大值
min_val = min(args)
max_val = max(args)
# 生成并返回随机整数
return random.randint(min_val, max_val)
def generate_phone(self, lan="en") -> int:
"""
:return: 随机生成手机号码
"""
if lan == "zh":
phone = self.fk_zh.phone_number()
else:
phone = self.faker.phone_number()
return phone
def generate_id_number(self, lan="en") -> int:
"""
:return: 随机生成身份证号码
"""
if lan == "zh":
id_number = self.fk_zh.ssn()
else:
id_number = self.faker.ssn()
return id_number
def generate_female_name(self, lan="en") -> str:
"""
:return: 女生姓名
"""
if lan == "zh":
female_name = self.fk_zh.name_female()
else:
female_name = self.faker.name_female()
return female_name
def generate_male_name(self, lan="en") -> str:
"""
:return: 男生姓名
"""
if lan == "zh":
male_name = self.fk_zh.name_male()
else:
male_name = self.faker.name_male()
return male_name
def generate_name(self, lan="en") -> str:
"""
生成人名
:return: 人名
"""
if lan == "zh":
name = self.fk_zh.name()
else:
name = self.faker.name()
return name
def generate_company_name(self, lan: str = "en", fix: str = None) -> str:
"""
生成公司名
:param lan: 语言类型可选en, zh zh表示中文en表示英文默认是en
:param fix: 前后缀可选pre suf pre表示公司前缀suf标识公司后缀
:return: 公司名
"""
if lan == "zh":
if fix == "pre":
name = self.fk_zh.company_prefix()
elif fix == "suf":
name = self.fk_zh.company_suffix()
else:
name = self.fk_zh.company()
else:
if fix == "pre":
name = self.faker.company_prefix()
elif fix == "suf":
name = self.faker.company_suffix()
else:
name = self.faker.company()
return name
def generate_paragraph(self, lan: str = "en", nb: int = 3) -> str:
"""
生成段落
:param lan: 语言类型可选en, zh zh表示中文en表示英文默认是en
:param nb: 段落个数默认是3个
"""
if lan == "zh":
text = self.fk_zh.paragraph(nb_sentences=nb, variable_nb_sentences=True, ext_word_list=None)
else:
text = self.faker.paragraph(nb_sentences=nb, variable_nb_sentences=True, ext_word_list=None)
return text
def generate_words(self, lan: str = "en", nb: int = 1) -> str:
"""
生成词语
:param lan: 语言类型可选en, zh zh表示中文en表示英文默认是en
:param nb: 词语个数默认是1个
"""
if lan == "zh":
if nb == 1 or nb < 1:
text = self.fk_zh.word(ext_word_list=None)
else:
res = self.fk_zh.words(nb=nb, ext_word_list=None)
text = "-".join(res)
else:
if nb == 1 or nb < 1:
text = self.faker.word(ext_word_list=None)
else:
res = self.faker.words(nb=nb, ext_word_list=None)
text = "-".join(res)
return text
def generate_email(self, lan="en") -> str:
"""
:return: 生成邮箱
"""
if lan == "zh":
email = self.fk_zh.email()
else:
email = self.faker.email()
return email
def generate_identifier(self, lan="en"):
"""
:return:生成随机标识满足要求长度为2~100 只能包含数字,字母,下划线(_),中划线(-),英文句号(.),必须以数字和字母开头,不能以下划线/中划线/英文句号开头和结尾
"""
if lan == "zh":
fk = self.fk_zh
else:
fk = self.faker
while True:
identifier = fk.slug() # 生成随机的slug标识
if (
re.match(r'^[a-zA-Z0-9][a-zA-Z0-9_.-]{0,98}[a-zA-Z0-9]$', identifier) and
not (identifier.startswith('_') or identifier.startswith('-') or identifier.startswith('.')) and
not (identifier.endswith('_') or identifier.endswith('.'))
):
return identifier
@classmethod
def generate_time(cls, fmt='%Y-%m-%d %H:%M:%S') -> str:
"""
计算当前时间
:return:
"""
now_time = datetime.now().strftime(fmt)
return now_time
@classmethod
def generate_today_date(cls, fmt='%Y-%m-%d'):
"""获取今日0点整时间"""
_today = date.today().strftime(fmt) + " 00:00:00"
return str(_today)
@classmethod
def generate_time_after_week(cls, fmt='%Y-%m-%d'):
"""获取一周后12点整的时间"""
_time_after_week = (date.today() + timedelta(days=+6)).strftime(fmt) + " 00:00:00"
return _time_after_week
@classmethod
def remove_special_characters(cls, target: str):
"""
移除字符串中的特殊字符。
在Python中用replace()函数操作指定字符
常用字符unicode的编码范围
数字:\u0030-\u0039
汉字:\u4e00-\u9fa5
大写字母:\u0041-\u005a
小写字母:\u0061-\u007a
英文字母:\u0041-\u007a
"""
pattern = r'([^\u4e00-\u9fa5])'
result = re.sub(pattern, '', target)
return result
class DataHandle:
def __init__(self):
# 实例化FakerData类避免反复实例提高性能。
self.FakerDataClass = FakerData()
# 获取FakerData类所有自定义方法
self.method_list = [method for method in dir(FakerData) if
callable(getattr(FakerData, method)) and not method.startswith("__")]
# 将"[1,2,3]" 或者"{'k':'v'}" -> [1,2,3], {'k':'v'}
def eval_data(self, data):
"""
执行一个字符串表达式,并返回其表达式的值
"""
try:
if hasattr(eval(data), "__call__"):
return data
else:
return eval(data)
except Exception:
return data
def process_cookie_jar(self, data):
"""
将任意数据里的RequestsCookieJar转成dict再转换成JSON 格式的字符串(序列化)
:param data: 待处理的数据
"""
if isinstance(data, dict):
for key, value in data.items():
data[key] = self.process_cookie_jar(value)
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = self.process_cookie_jar(item)
elif isinstance(data, RequestsCookieJar):
data = json.dumps(dict_from_cookiejar(data))
return data
def replace_and_store_placeholders(self, pattern, text, resultAsDict=True):
"""
提取字符串中符合正则表达式的元素同时用一个唯一的uuid来替换原有字符串
例如:
原字符串user_id: ${user_id}, user_name: ${user_name}
替换后的原字符串user_id: e1c6fc74-2f21-49a9-8d0c-de16650c6364, user_name: 50c74155-5cb5-4809-bc5d-277addf8c3e7
暂存的需要被处理的关键字或函数:{'e1c6fc74-2f21-49a9-8d0c-de16650c6364': {0: '${user_id}', 1: 'user_id'}, '50c74155-5cb5-4809-bc5d-277addf8c3e7': {0: '${user_name}', 1: 'user_name'}}
"""
placeholders = {}
def replace(match):
placeholder = str(uuid.uuid4()) # 使用uuid生成唯一的占位符
placeholders[placeholder] = {0: f'${match.group(1)}', 1: match.group(1)} # 将提取到的字符串存储到字典中
return placeholder
# 使用正则表达式进行字符串匹配和替换,同时指定替换次数为 1
replaced_text = re.sub(pattern, replace, text, count=1)
while replaced_text != text:
text = replaced_text
replaced_text = re.sub(pattern, replace, text, count=1)
if resultAsDict:
return replaced_text, placeholders
else:
# 构造结果字符串
result = '{\n'
for key, value in placeholders.items():
result += f" '{key}': {{0: \"{value[0]}\", 1: \"{value[1]}\"}},\n"
result += '}'
return replaced_text, result
def data_handle(self, obj, source=None):
obj = self.eval_data(obj)
return self.data_handle_(obj, source)
def data_handle_(self, obj, source=None):
"""
递归处理字典、列表中的字符串,将${}占位符替换成source中的值
"""
func = {}
keys = {}
source = {} if not source or not isinstance(source, dict) else source
logger.trace(f"source={source}")
# 处理一下source检测到里面存在RequestsCookieJar转成dict再转换成JSON 格式的字符串(序列化)。
# 避免传递过来一个RequestsCookieJar替换后变成了'RequestsCookieJar'导致cookies无法使用的问题
source = self.process_cookie_jar(data=source)
# 如果进来的是字符串,先将各种类型的表达式处理完
if isinstance(obj, str):
# 先把python表达式找出来存着这里会漏掉一些诸如1+1的表达式
pattern = r"\${([^}]+\))}" # 匹配以 "${" 开头、以 ")}" 结尾的字符串,并在括号内提取内容,括号内不能包含"}"字符
obj, func = self.replace_and_store_placeholders(pattern, obj)
# 模板替换
should_eval = 0
if obj.startswith("${") and obj.endswith("}"):
if source.get(obj[2:-1]) and not isinstance(source[obj[2:-1]], str):
should_eval = 1
obj = Template(obj).safe_substitute(source)
if should_eval == 1:
obj = self.eval_data(obj)
if not isinstance(obj, str):
return self.data_handle(obj)
# 再找一遍剩余的${}跟第一步的结果合并提取漏掉的诸如1+1的表达式(在此认为关键字无法替换的都是表达式,最后表达式也无法处理的情况就报错或者原样返回)
pattern = r'\$\{([^}]+)\}' # 定义匹配以"${"开头,"}"结尾的字符串的正则表达式
obj, func_temp = self.replace_and_store_placeholders(pattern, obj)
func.update(func_temp)
# 进行函数调用替换
obj = self.invoke_funcs(obj, func)
if not isinstance(obj, str):
return self.data_handle(obj)
# 直接返回最后的结果
return obj
elif isinstance(obj, list):
for index, item in enumerate(obj):
obj[index] = self.data_handle(item, source)
return obj
elif isinstance(obj, dict):
for key, value in obj.items():
obj[key] = self.data_handle(value, source)
return obj
else:
return obj
def invoke_funcs(self, obj, funcs):
"""
调用方法并将方法返回的结果替换到obj中去
"""
for key, funcs in funcs.items(): # 遍历方法字典调用并替换
func = funcs[1]
# logger.debug("invoke func : ", func)
try:
if "." in func:
if func.startswith("faker."):
# 英文的faker数据self.faker = Faker()
faker = self.FakerDataClass.faker
obj = self.deal_func_res(obj, key, eval(func))
elif func.startswith("fk_zh."):
# 中文的faker数据 self.fk_zh = Faker(locale='zh_CN')
fk_zh = self.FakerDataClass.fk_zh
obj = self.deal_func_res(obj, key, eval(func))
else:
obj = self.deal_func_res(obj, key, eval(func))
else:
func_parts = func.split('(')
func_name = func_parts[0]
func_args_str = ''.join(func_parts[1:])[:-1]
if func_name in self.method_list: # 证明是FakerData类方法
method = getattr(self.FakerDataClass, func_name)
res = eval(f"method({func_args_str})") # 尝试直接调用
logger.debug("==========res type:", type(res), "obj:", obj, "key:", key)
obj = self.deal_func_res(obj, key, res)
else: # 不是FakerData类方法但有可能是 1+1 这样的
obj = self.deal_func_res(obj, key, eval(func))
except:
logger.warning("Warn: --------函数:%s 无法调用成功, 请检查是否存在该函数-------" % func)
obj = obj.replace(key, funcs[0])
return obj
def deal_func_res(self, obj, key, res):
obj = obj.replace(key, str(res))
try:
if not isinstance(res, str):
obj = eval(obj)
except:
msg = f"表达式:{obj},结果值:{res},结果类型:{type(res)}"
logger.warning("Warn: --------无法根据函数返回值eval可能原始的字符串并不是python表达式-------,%s" % msg)
return obj
# 声明data_handle方法这样外部就可以直接import data_handle来使用了
data_handle = DataHandle().data_handle
if __name__ == '__main__':
# 下面是测试代码
print("\n----------测试场景1: 识别${python表达式}这里random方法是需要导入random包的---------------------\n")
data = "选择.gitignore: ${random.choice(['Ada', 'Actionscript', 'Ansible', 'Android', 'Agda'])},开源许可证: ${random.choice(['0BSD', 'AAL', 'AFL-1.1', '389-exception'])}"
new = data_handle(data)
print(new, type(new),
end="\n\n---------------------------------------------------------------------------------------------\n\n")
print("-----------测试场景2识别${python表达式},可以在当前文件导入其他模块,一样可以识别替换---------------------")
# 导入其他方法,也可以直接使用
# from common_utils.time_handle import test_fun_a
# data = "${test_fun_a()}"
# new = data_handle(data)
# print(new, type(new))
print("\n-----------测试场景3识别FakerData类中的方法---------------------\n")
"""
使用FakerData类中的方法可以直接这样写${generate_random_int()} 也可以带上类名:${FakerData().generate_random_int()}
"""
data = {
"age": "${generate_random_int()}",
"message": "Hello, ${FakerData().generate_female_name()}!",
"nested_data": [
"This is ${name}'s data.",
{
"message": "Age: ${generate_random_int()}",
"nested_list": [
"More data: ${FakerData().generate_random_int()}",
]
}
]
}
new = data_handle(data)
print(new, type(new), end="\n\n")
"""
使用FakerData类中的方法, 支持方法传参使用注意参数如果是str格式建议使用单引号
"""
payload = {
"name": "${generate_name(lan='zh')}",
"repository_name": "${generate_name('zh')}",
"desc": '[[1,2,3,4],"${FakerData().generate_random_int()}"]',
"pre": '[[1,2,3,4],${FakerData().generate_name()}]',
"startTime": "${FakerData.generate_time('%Y-%m-%d')}",
}
new = data_handle(payload)
print(new, type(new), end="\n\n")
"""
还可以直接使用FakerData类中的实例属性
"""
data = {
"payload": {
"en_name": "${faker.name()}", # 这里是使用类FakerData里面的实例属性faker
"zh_name": "${fk_zh.name()}", # 这里是使用类FakerData里面的实例属性fk_zh
"url": "/api/accounts/${FakerData.generate_time('%Y-%m-%d')}/login.json",
}
}
new = data_handle(data)
print(new, type(new), end="\n\n")
"""
FakerData类中没有封装random_name这个方法会无法处理
"""
data = '[[1,2,3,4],"${FakerData().random_name()}"]'
new = data_handle(data)
print(new, type(new),
end="\n\n---------------------------------------------------------------------------------------------\n\n")
print("\n-----------测试场景4识别${}进行关键字替换---------------------\n")
user_info = {
"user_id": 104,
"user_name": "flora"
}
data_03 = "user_id: ${user_id}, user_name: ${user_name}"
new = data_handle(data_03, user_info)
print(new, type(new), end="\n\n")
"""
识别${}进行关键字替换时会保留原值的类型。 比如eval('1,2,4')会变成元组(1,2,4)。经过本方法处理,会保留原有格式
"""
data = {
"winner_id": "${winner_id}",
"user_id": "${user_id}",
"time": "${generate_time()}",
"attachment_ids": "${attachment_ids}",
"assigned_id": "${assigned_id}",
"cookies": "${cookies}"
}
source = {
"winner_id": "1,2,4",
"assigned_id": [],
'报告标题': 'UI自动化测试报告', '项目名称': 'GitLink 确实开源', 'tester': '陈银花',
'department': '开源中心', 'env': 'https://testforgeplus.trustie.net',
'host': 'https://testforgeplus.trustie.net', 'login': 'autotest',
'nickname': 'autotest', 'user_id': 106, 'super_login': 'floraachy', 'super_user_id': 103,
'project_id': '59',
'repo_id': '59', 'project_url': '/autotest/auotest',
'attachment_ids': ['85b7f7ff-59e6-4f38-88da-29440aa4fc18', 'ba23f9b1-ad92-476d-ac4d-aba1382a9636'],
'file_name': 'gitlinklogo3.jpg',
'cookies': '{"_educoder_session": "d79e0e75f71cd98a9df2665d405b49e7", "autologin_trustie": "d25b412c26388182a50e8be38e4b9731c4e783ba"}',
}
new = data_handle(obj=data, source=source)
print(new, type(new),
end="\n\n---------------------------------------------------------------------------------------------\n\n")
print("\n-----------测试场景5识别 字符串里面是python表达式的情况---------------------\n")
data = [
"[1,2,3,4]", "1+1", "[1, '1', [1, 2], {'name':'flora', 'age': '1'}]"
]
new = data_handle(data)
print(new, type(new),
end="\n\n---------------------------------------------------------------------------------------------\n\n")