优化动态数据的热加载功能,并将数据提取方式单独拎出来变成单独的模块

This commit is contained in:
floraachy
2023-06-28 16:54:17 +08:00
parent b98d1da094
commit a96d674518
3 changed files with 279 additions and 65 deletions

View File

@@ -6,13 +6,146 @@
# @Software: PyCharm
# @Desc: 数据处理
# 标准库导入
import random
import re
from string import Template
from jsonpath import jsonpath
from loguru import logger
from datetime import datetime, date, timedelta
# 第三方库导入
from faker import Faker
from string import Template
faker = Faker()
class FakerData:
"""
测试数据生成类
"""
def __init__(self):
self.fk_zh = Faker(locale='zh_CN')
self.faker = Faker()
@classmethod
def generate_random_int(cls, *args) -> int:
"""
:return: 随机数
"""
# 检查是否传入了参数
if not args:
# 没有传参就从5000内随机取一个整数返回
return random.randint(0, 5000)
# 排序参数并获取最小值和最大值
min_val = min(args)
max_val = max(args)
# 生成并返回随机整数
return random.randint(min_val, max_val)
def generate_phone(self, lan="en") -> int:
"""
:return: 随机生成手机号码
"""
if lan == "zh":
phone = self.fk_zh.phone_number()
else:
phone = self.faker.phone_number()
return phone
def generate_id_number(self, lan="en") -> int:
"""
:return: 随机生成身份证号码
"""
if lan == "zh":
id_number = self.fk_zh.ssn()
else:
id_number = self.faker.ssn()
return id_number
def generate_female_name(self, lan="en") -> str:
"""
:return: 女生姓名
"""
if lan == "zh":
female_name = self.fk_zh.name_female()
else:
female_name = self.faker.name_female()
return female_name
def generate_male_name(self, lan="en") -> str:
"""
:return: 男生姓名
"""
if lan == "zh":
male_name = self.fk_zh.name_male()
else:
male_name = self.faker.name_male()
return male_name
def generate_name(self, lan="en") -> str:
"""
:return: 人名
"""
if lan == "zh":
name = self.fk_zh.name()
else:
name = self.faker.name()
return name
def generate_email(self, lan="en") -> str:
"""
:return: 生成邮箱
"""
if lan == "zh":
email = self.fk_zh.email()
else:
email = self.faker.email()
return email
def generate_identifier(self, lan="en"):
"""
:return:生成随机标识满足要求长度为2~100 只能包含数字,字母,下划线(_),中划线(-),英文句号(.),必须以数字和字母开头,不能以下划线/中划线/英文句号开头和结尾
"""
if lan == "zh":
fk = self.fk_zh
else:
fk = self.faker
while True:
identifier = fk.slug() # 生成随机的slug标识
if (
re.match(r'^[a-zA-Z0-9][a-zA-Z0-9_.-]{0,98}[a-zA-Z0-9]$', identifier) and
not (identifier.startswith('_') or identifier.startswith('-') or identifier.startswith('.')) and
not (identifier.endswith('_') or identifier.endswith('.'))
):
return identifier
@classmethod
def generate_time(cls, fmt='%Y-%m-%d %H:%M:%S') -> str:
"""
计算当前时间
:return:
"""
now_time = datetime.now().strftime(fmt)
return now_time
@classmethod
def generate_today_date(cls):
"""获取今日0点整时间"""
_today = date.today().strftime("%Y-%m-%d") + " 00:00:00"
return str(_today)
@classmethod
def generate_time_after_week(cls):
"""获取一周后12点整的时间"""
_time_after_week = (date.today() + timedelta(days=+6)).strftime("%Y-%m-%d") + " 00:00:00"
return _time_after_week
def data_handle(obj, source):
@@ -25,7 +158,26 @@ def data_handle(obj, source):
obj = Template(obj).safe_substitute(source)
# 寻找${python表达式} 将Python表达式eval得出其具体值
for func in re.findall('\\${(.*?)}', obj):
obj = obj.replace('${%s}' % func, eval_data_process(func))
"""
兼容一下如下数据处理faker.name().replace(" ", "").replace(".", "")
faker是FakerData()中的实例变量。
"""
if func.startswith("faker."):
# 英文的faker数据self.faker = Faker()
faker = FakerData().faker
obj = obj.replace('${%s}' % func, eval(func))
elif func.startswith("fk_zh."):
# 中文的faker数据 self.fk_zh = Faker(locale='zh_CN')
fk_zh = FakerData().fk_zh
obj = obj.replace('${%s}' % func, eval(func))
else:
# 这里获取到的func是这样的格式例如random_int() 但实际我们通过FakerData()获取到的方法是这样的格式例如random_int, 所以我们需要进行处理
func_name = func.split("(")[0] if "(" in func else func
if hasattr(FakerData(), func_name) and callable(getattr(FakerData(), func_name)):
# 调用FakerData类的方法获取数据
obj = obj.replace('${%s}' % func, str(getattr(FakerData(), func_name)()))
# 处理其他Python表达式例如${1+1}
obj = obj.replace('${%s}' % func, func)
obj = eval_data_process(obj)
return obj
return eval_data_process(obj)
@@ -73,55 +225,3 @@ def eval_data_process(data):
for key, value in data.items():
data[key] = eval_data_process(eval_data(value))
return data
def json_extractor(obj: dict, expr: str = '.'):
"""
:param obj :json/dict类型数据
:param expr: 表达式, . 提取字典所有内容, $.test_api_case 提取一级字典case $.test_api_case.data 提取case字典下的data
:return result: 提取的结果,未提取到返回 None
"""
try:
result = jsonpath(obj, expr)[0]
logger.debug("\n======================================================\n" \
"-------------Startjson_extractor--------------------\n"
f"提取表达式为: {expr} \n"
f"提取值为: {result}\n"
"=====================================================")
print("提取响应内容成功,提取表达式为: {} 提取值为 {}".format(expr, result))
except Exception as e:
logger.debug("\n======================================================\n" \
"-------------Endjson_extractor--------------------\n"
f"提取表达式为: {expr}\n"
f"提取数据为: {obj}\n"
f"错误信息为:{e}\n"
"=====================================================")
print(f'未提取到内容,请检查表达式是否错误!提取表达式为:{expr} 提取数据为 {obj} 错误信息为:{e}')
result = None
return result
def re_extract(obj: str, expr: str = '.'):
"""
:param obj : 字符串数据
:param expr: 正则表达式
:return result: 提取的结果,未提取到返回 None
"""
try:
result = re.findall(expr, obj)[0]
logger.debug("\n======================================================\n" \
"-------------Startre_extract--------------------\n"
f"提取表达式为: {expr}\n" \
f"提取值为: {result}\n" \
"=====================================================")
print("提取响应内容成功,提取表达式为: {} 提取值为: {}".format(expr, result))
except Exception as e:
logger.debug(f"\n======================================================\n" \
"-------------Endre_extract--------------------\n"
f"提取表达式为: {expr}\n" \
f"提取数据为: {obj}\n" \
f"错误信息为:{e}\n" \
"=====================================================")
print(f'未提取到内容,请检查表达式是否错误!提取表达式为:{expr} 提取数据为 {obj} 错误信息为:{e}')
result = None
return result

View File

@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
# @Time : 2023/6/28 14:35
# @Author : chenyinhua
# @File : extract_data_handle.py
# @Software: PyCharm
# @Desc: 提取数据的一些方法
# 标准库导入
import re
# 第三方库导入
from jsonpath import jsonpath
from loguru import logger
def json_extractor(obj: dict, expr: str = '.'):
"""
:param obj :json/dict类型数据
:param expr: 表达式, . 提取字典所有内容, $.test_api_case 提取一级字典case $.test_api_case.data 提取case字典下的data
:return result: 提取的结果,未提取到返回 None
"""
try:
result = jsonpath(obj, expr)[0]
logger.debug("\n======================================================\n" \
"-------------Startjson_extractor--------------------\n"
f"提取表达式为: {expr} \n"
f"提取值为: {result}\n"
"=====================================================")
print("提取响应内容成功,提取表达式为: {} 提取值为 {}".format(expr, result))
except Exception as e:
logger.debug("\n======================================================\n" \
"-------------Endjson_extractor--------------------\n"
f"提取表达式为: {expr}\n"
f"提取数据为: {obj}\n"
f"错误信息为:{e}\n"
"=====================================================")
print(f'未提取到内容,请检查表达式是否错误!提取表达式为:{expr} 提取数据为 {obj} 错误信息为:{e}')
result = None
return result
def re_extract(obj: str, expr: str = '.'):
"""
:param obj : 字符串数据
:param expr: 正则表达式
:return result: 提取的结果,未提取到返回 None
"""
try:
result = re.findall(expr, obj)[0]
logger.debug("\n======================================================\n" \
"-------------Startre_extract--------------------\n"
f"提取表达式为: {expr}\n" \
f"提取值为: {result}\n" \
"=====================================================")
print("提取响应内容成功,提取表达式为: {} 提取值为: {}".format(expr, result))
except Exception as e:
logger.debug(f"\n======================================================\n" \
"-------------Endre_extract--------------------\n"
f"提取表达式为: {expr}\n" \
f"提取数据为: {obj}\n" \
f"错误信息为:{e}\n" \
"=====================================================")
print(f'未提取到内容,请检查表达式是否错误!提取表达式为:{expr} 提取数据为 {obj} 错误信息为:{e}')
result = None
return result

View File

@@ -18,8 +18,8 @@ case_new_project_demo_01:
request_type: json
payload:
"user_id": ${user_id}
"name": ${faker.name().replace(" ", "").replace(".", "")}
"repository_name": ${faker.name().replace(" ", "").replace(".", "")}
"name": ${generate_name(len='zh')}
"repository_name": ${generate_identifier()}
files:
extract:
project_id: $.id
@@ -34,7 +34,7 @@ case_new_project_demo_01:
case_new_project_demo_02:
feature: 新建项目
title: 正确输入各项必填参数新建项目成功不校验数据库单独传cookies
run: False
run: True
url: /api/projects.json
method: POST
headers:
@@ -43,8 +43,8 @@ case_new_project_demo_02:
request_type: json
payload:
"user_id": ${user_id}
"name": ${faker.name().replace(" ", "").replace(".", "")}
"repository_name": ${faker.name().replace(" ", "").replace(".", "")}
"name": ${generate_name(generate_name(len="zh"))}
"repository_name": ${generate_identifier()}
files:
extract:
project_id: $.id
@@ -67,8 +67,8 @@ case_new_project_demo_03:
request_type: json
payload:
"user_id": ${user_id}
"name": ${faker.name().replace(" ", "").replace(".", "")}
"repository_name": ${faker.name().replace(" ", "").replace(".", "")}
"name": ${generate_name()}
"repository_name": ${generate_identifier()}
files:
extract:
project_id: $.id
@@ -83,4 +83,54 @@ case_new_project_demo_03:
sql: select id,`name`, identifier from projects where user_id=${user_id} ORDER BY created_on DESC;
$.id: ${project_id}
$.name: ${project_name}
$.identifier: ${project_identifier}
$.identifier: ${project_identifier}
case_new_project_demo_04:
feature: 新建项目
title: 正确输入各项必填参数新建项目成功04
run: True
url: /api/projects.json
method: POST
headers:
Content-Type: application/json; charset=utf-8;
cookies: ${login_cookie}
request_type: json
payload:
"user_id": ${user_id}
"name": ${faker.name()}
"repository_name": ${generate_identifier()}
files:
extract:
project_id: $.id
project_name: $.name
project_identifier: $.identifier
assert_response:
eq:
http_code: 200
$.login: ${login}
assert_sql:
case_new_project_demo_05:
feature: 新建项目
title: 正确输入各项必填参数新建项目成功05
run: True
url: /api/projects.json
method: POST
headers:
Content-Type: application/json; charset=utf-8;
cookies: ${login_cookie}
request_type: json
payload:
"user_id": ${user_id}
"name": ${fk_zh.name()}
"repository_name": ${generate_identifier()}
files:
extract:
project_id: $.id
project_name: $.name
project_identifier: $.identifier
assert_response:
eq:
http_code: 200
$.login: ${login}
assert_sql: