将接口参数处理,接口请求,接口断言,接口参数提取这些流程整理成一个方法Request_Control, 同时支持将接口请求参数payload以及接口提取的参数保存到GLOBAL_VARS中,方便下一个接口使用上一个接口的数据

This commit is contained in:
floraachy
2024-07-22 16:34:32 +08:00
parent 87673c5e6f
commit 017eb15b6a
8 changed files with 324 additions and 346 deletions

View File

@@ -20,6 +20,7 @@ xpinyin = "==0.7.6"
yagmail = "==0.15.293"
pytest-repeat = "==0.9.3"
jsonpath = "*"
pytest = "==8.0.2"
[dev-packages]

108
README.md
View File

@@ -46,22 +46,24 @@
## 三、依赖库
```
allure-pytest = "==2.9.45"
click = "==8.1.7"
faker = "==21.0.0"
loguru = "==0.7.2"
openpyxl = "==3.1.2"
pydantic = "==2.5.2"
pymysql = "==1.1.0"
pytest-rerunfailures = "==12.0"
pyyaml = "==6.0.1"
requests-toolbelt = "==1.0.0"
"ruamel.yaml" = "==0.18.5"
sshtunnel = "==0.4.0"
xpinyin = "==0.7.6"
yagmail = "==0.15.293"
pytest-repeat = "==0.9.3"
jsonpath = "*"
allure_python_commons==2.9.45
click==8.1.7
Faker==21.0.0
jsonpath==0.82.2
loguru==0.7.2
openpyxl==3.1.2
pydantic==2.7.1
PyMySQL==1.1.0
pytest==8.0.2 Pytest 8.2.0 和 8.1.0 的几个版本会破坏 allure 的 listener 导致执行报错也就是版本不兼容需把pytest 回退到8.0.2就不报错啦!
PyYAML==6.0.1
PyYAML==6.0.1
Requests==2.31.0
requests_toolbelt==1.0.0
ruamel.base==1.0.0
sshtunnel==0.4.0
xpinyin==0.7.6
yagmail==0.15.293
```
@@ -74,24 +76,37 @@ https://gitlink.org.cn/floraachy/apiautotest.git
2. 本地电脑搭建好 python环境我使用的python版本是3.9。包括allure测试报告所需的java环境安装jdk
3. 安装环境依赖包
1) 方法一使用pipenv管理所有依赖
3. 安装依赖包
方法一使用pipenv管理依赖
```
# 安装pipenv, 这个需要全局安装。建议在项目根目录下执行命令安装
注意: 如果不熟悉pipenv 可以跳过该步骤。
```
1) 安装pipenv
```
# 建议在项目根目录下执行命令安装
pip install pipenv
# 使用pipenv管理安装环境依赖包必须在项目根目录下执行
pipenv install
# 注意使用pipenv install会自动安装Pipfile里面的依赖包该依赖包仅安装在虚拟环境里不安装在测试机本机
```
2方法二使用requirements.txt一键安装所有依赖
2) 使用pipenv管理安装环境依赖包pipenv install (必须在项目根目录下执行)
```
注意使用pipenv install会自动安装Pipfile里面的依赖包该依赖包仅安装在虚拟环境里不安装在测试机。
```
方法二:直接将依赖包安装在本机
```
pip install -r requirements.txt
```
# 注意:这种方式安装,会将依赖包全部安装在测试机(本机)
扩展: 使用 pipreqs 生成 requirements.txt
```
# 安装
pip install pipreqs
# 在当前目录生成
pipreqs . --encoding=utf8 --force
注意 --encoding=utf8 为使用utf8编码不然可能会报UnicodeDecodeError: 'gbk' codec can't decode byte 0xae in position 406: illegal multibyte sequence 的错误。
--force 强制执行,当 生成目录下的requirements.txt存在时覆盖。
```
如上环境都已经搭建好了,包括框架依赖包也都安装好了。
@@ -113,8 +128,12 @@ pip install -r requirements.txt
4指定日志收集级别由LOG_LEVEL控制
### 2. 修改全局变量,增加测试数据 `config.settings.py`
1) ENV_VARS["common"]是一些公共参数,如报告标题,报告名称,测试者,测试部门。后续会显示在测试报告上。如果还有其他,可自行添加
2ENV_VARS["test"]是保存test环境的一些测试数据。ENV_VARS["live"]是保存live环境的一些测试数据。如果还有其他环境可以继续增加例如增加ENV_VARS["dev"] = {"host": "", ......}
1确认RunConfig的各项参数可以调整失败重跑次数`rerun` 失败重跑间隔时间`reruns_delay`,当达到最大失败数,停止执行`max_fail`
2确认测试完成后是否发送测试结果由SEND_RESULT_TYPE控制并填充对应邮件/钉钉/企业微信配置信息
3指定日志收集级别由LOG_LEVEL控制
4 配置测试相关数据:
ENV_VARS["common"]是一些公共参数,如报告标题,报告名称,测试者,测试部门。后续会显示在测试报告上。如果还有其他,可自行添加
ENV_VARS["test"]是保存test环境的一些测试数据。ENV_VARS["live"]是保存live环境的一些测试数据。如果还有其他环境可以继续增加例如增加ENV_VARS["dev"] = {"host": "", ......}
### 3. 删除框架中的示例用例数据
1删除 `interface`目录下所有的YAML和EXCEL文件(每一个文件都保存的接口测试用例)
@@ -213,7 +232,7 @@ case_info: # 具体的用例数据,是以列表的形式进行管理
cookies: response.cookies
```
### 6. 断言方式
### 7. 断言方式
以下是支持的几种断言方式:
| 断言方式 | 说明 |
| ------------ | ------------ |
@@ -234,7 +253,7 @@ case_info: # 具体的用例数据,是以列表的形式进行管理
| startswith | 以什么开头,判断实际结果是否是以预期结果开头的 |
| endswith | 以什么结尾,判断实际结果是否是以预期结果结尾的 |
### 7. 响应断言说明
### 8. 响应断言说明
#### 断言状态码
如果想要断言接口响应码,直接这样写即可:
参考示例:
@@ -275,7 +294,7 @@ case_info: # 具体的用例数据,是以列表的形式进行管理
type_jsonpath: $.user_id
```
### 8. 数据库断言说明
### 9. 数据库断言说明
数据库断言的参数说明:
```
断言标识(自定义,不为空即可,没有实际的意义):
@@ -309,7 +328,7 @@ case_info: # 具体的用例数据,是以列表的形式进行管理
expect_value: ${user_id}
assert_type: contains
```
### 9. 用例依赖说明
### 10. 用例依赖说明
目前用例依赖仅支持接口依赖和环境变量依赖。数据库依赖后续会补充。
依赖的配置说明:
```
@@ -339,7 +358,7 @@ case_dependence: 用例依赖,为空时表示没有依赖
env_page_content: ${generate_paragraph}
```
### 10. Excel用例单独说明
### 11. Excel用例单独说明
框架支持excel多表单自动生成测试用例每一个表单作为一个测试用例模块。
例如:
excel表格名称是test_demo.xlsx
@@ -366,10 +385,11 @@ excel表单2名称是示例模块
测试用例方法test_demo_slmk_auto
## 六、运行自动化测试
### 1. 激活已存在的虚拟环境
### 方式一使用pipenv管理虚拟环境
#### 1. 激活已存在的虚拟环境
- 如果不存在会创建一个pipenv shell (必须在项目根目录下执行)
### 2. 运行
#### 2. 运行
```
在pycharm>terminal或者电脑命令窗口进入项目根路径执行如下命令如果依赖包是安装在虚拟环境中需要先启动虚拟环境
> python run.py 默认在test环境运行测试用例, 生成allure测试报告
@@ -378,6 +398,18 @@ excel表单2名称是示例模块
> python run.py -env=test 在test环境运行测试用例
> python run.py -report=no 在test环境下允许测试用例不生成allure测试报告
```
或者上述步骤可以合并为pipenv run python run.py
### 方式二:依赖包安装在本机
```
在pycharm>terminal或者电脑命令窗口进入项目根路径执行如下命令如果依赖包是安装在虚拟环境中需要先启动虚拟环境
> python run.py 默认在test环境运行测试用例, 生成allure测试报告
> python run.py -m demo 在test环境仅运行打了标记demo用例生成allure测试报告
> python run.py -env live 在live环境运行测试用例
> python run.py -env=test 在test环境运行测试用例
> python run.py -report=no 在test环境下允许测试用例不生成allure测试报告
```
注意:
- 如果pycharm.interpreter拥有了框架所需的所有依赖包可以通过pycharm直接在`run.py`中右键运行
@@ -451,5 +483,5 @@ excel表单2名称是示例模块
## 赞赏
如果这个库有帮助到你并且你很想支持库的后续开发和维护,那么你可以扫描下方二维码随意打赏我,我将不胜感激~<br/>
![打赏](https://www.gitlink.org.cn/api/attachments/437395)
如果这个库有帮助到你并且你很想支持库的后续开发和维护,那么你可以扫描下方二维码随意打赏我,我将不胜感激~
<img src="https://www.gitlink.org.cn/api/attachments/437395" style="width: 150px; height: auto;" />

View File

@@ -16,8 +16,7 @@ from loguru import logger
from config.global_vars import GLOBAL_VARS
from config.path_config import GITLINK_DIR
from utils.report_utils.allure_handle import allure_title
from utils.requests_utils.api_workflow import get_api_data, api_work_flow
from utils.requests_utils.case_dependence import case_dependence_handle
from utils.requests_utils.request_control import RequestControl
@pytest.fixture(scope="function", autouse=True)
@@ -64,8 +63,8 @@ def gitlink_login():
:return:
"""
# 请求登录接口
login_api = get_api_data(os.path.join(GITLINK_DIR, "test_gitlink_login.yaml"), "gitlink_login_01")
res = api_work_flow(login_api, GLOBAL_VARS)
res = RequestControl().api_request_flow(api_file_path=os.path.join(GITLINK_DIR, "test_gitlink_login.yaml"),
key="gitlink_login_01", global_var=GLOBAL_VARS)
GLOBAL_VARS.update(res)
@@ -75,8 +74,7 @@ def get_oauth_token():
获取oauth_token 用于在接口的headers里面传递{AuthorizationBearer {{token}}}
注意oauth_token适用于application通过客户端的方式登录平台。application不同于正常注册平台的用户是没有用户名和密码的。
"""
login_oauth_token_api = get_api_data(os.path.join(GITLINK_DIR, "login_oauth_token.yaml"),
"gitlink_login_oauth_token_01")
res = RequestControl().api_request_flow(api_file_path=os.path.join(GITLINK_DIR, "login_oauth_token.yaml"),
key="gitlink_login_oauth_token_01", global_var=GLOBAL_VARS)
res = api_work_flow(login_oauth_token_api, GLOBAL_VARS)
GLOBAL_VARS.update(res)
GLOBAL_VARS.update(res)

View File

@@ -217,9 +217,11 @@ def gen_case_file(filename, case_template_path, case_common, common_dependence,
common_dependence_template = ['common_dependence = ${common_dependence}\n', '\n', '\n',
'@pytest.fixture(scope="class", autouse=True)\n',
'def background():\n',
' case_dependence_handle(case_dependence=common_dependence.get("setup", None), source=GLOBAL_VARS)\n',
' dependence_results = case_dependence_handle(case_dependence=common_dependence.get("setup", None), source=GLOBAL_VARS)\n',
' GLOBAL_VARS.update(dependence_results if dependence_results else {})\n',
' yield\n',
' case_dependence_handle(case_dependence=common_dependence.get("teardown", None), source=GLOBAL_VARS)',
' dependence_results = case_dependence_handle(case_dependence=common_dependence.get("teardown", None), source=GLOBAL_VARS)\n',
' GLOBAL_VARS.update(dependence_results if dependence_results else {})\n',
'\n', '\n']
for idx in reversed(indices):

View File

@@ -7,8 +7,7 @@ import pytest
import allure
# 本地应用/模块导入
from config.global_vars import GLOBAL_VARS
from utils.requests_utils.request_control import RequestPreDataHandle, RequestHandle, after_request_extract
from utils.assertion_utils.assert_control import AssertHandle
from utils.requests_utils.request_control import RequestControl
from utils.requests_utils.case_dependence import case_dependence_handle
# 公共依赖
@@ -22,25 +21,21 @@ class ${class_title}Auto:
@allure.story("${allure_story}")
@pytest.mark.auto
@pytest.mark.parametrize("case", cases, ids=["{}".format(case["title"]) for case in cases])
@pytest.mark.parametrize("case", cases, ids=lambda x: x["title"])
def ${func_title}_auto(self, case):
from loguru import logger
# 前置依赖处理
if case.get("case_dependence"):
case_dependence_handle(case_dependence=case["case_dependence"].get("setup", None), source=GLOBAL_VARS)
# 处理请求前的用例数据
case_data = RequestPreDataHandle(request_data=case, global_var=GLOBAL_VARS).request_data_handle()
# 发送请求
response = RequestHandle(case_data=case_data, global_var=GLOBAL_VARS).http_request()
# 进行响应断言
AssertHandle(assert_data=case_data["assert_response"], response=response).assert_handle()
# 进行数据库断言
AssertHandle(assert_data=case_data["assert_sql"], db_info=GLOBAL_VARS["db_info"]).assert_handle()
# 断言成功后进行参数提取
res = after_request_extract(response, case_data.get("extract", None))
for k, v in res.items():
GLOBAL_VARS[k] = v
if case.get("case_dependence") and case["case_dependence"].get("setup"):
dependence_results = case_dependence_handle(case_dependence=case["case_dependence"]["setup"],
source=GLOBAL_VARS)
GLOBAL_VARS.update(dependence_results if dependence_results else {})
# 处理请求前的用例数据 -> 发送请求 -> 响应/数据库断言 -> 断言成功后进行参数提取
res = RequestControl().api_request_flow(request_data=case, global_var=GLOBAL_VARS)
GLOBAL_VARS.update(res)
# 后置依赖处理
if case.get("case_dependence"):
case_dependence_handle(case_dependence=case["case_dependence"].get("teardown", None), source=GLOBAL_VARS)
if case.get("case_dependence") and case["case_dependence"].get("teardown"):
dependence_results = case_dependence_handle(case_dependence=case["case_dependence"]["teardown"],
source=GLOBAL_VARS)
GLOBAL_VARS.update(dependence_results if dependence_results else {})

View File

@@ -1,79 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2023/11/8 16:53
# @Author : floraachy
# @File : api_workflow.py
# @Software: PyCharm
# @Desc:
# 标准库导入
import os
# 第三方库导入
from loguru import logger
# 本地应用/模块导入
from utils.files_utils.yaml_handle import YamlHandle
from utils.requests_utils.request_control import RequestPreDataHandle, RequestHandle, after_request_extract
from utils.files_utils.files_handle import get_files
from utils.data_utils.eval_data_handle import eval_data
from utils.assertion_utils.assert_control import AssertHandle
from config.global_vars import GLOBAL_VARS
def get_api_data(api_file_path: str, key: str = None):
"""
根据指定的yaml文件路径以及key值获取对应的接口
:param:api_file_path 接口yaml文件路径
:param:key 对应接口的id
"""
api_data = []
if os.path.isdir(api_file_path):
logger.debug(f"目标路径是一个目录:{api_file_path}")
api_files = get_files(target=api_file_path, end=".yaml") + get_files(target=api_file_path, end=".yml")
for api_file in api_files:
api_data.append(YamlHandle(filename=api_file).read_yaml)
elif os.path.isfile(api_file_path):
logger.debug(f"目标路径是一个文件:{api_file_path}")
api_data.append(YamlHandle(filename=api_file_path).read_yaml)
else:
logger.error(f"目标路径错误请检查api_file_path={api_file_path}")
return None
for api in api_data:
matching_api = next((item for item in api["case_info"] if item["id"] == key), None)
if matching_api:
logger.info("\n----------匹配到的api----------\n"
f"类型:{type(matching_api)}"
f"值:{matching_api}\n")
return matching_api
# 在找不到匹配的情况下,返回一个默认值且记录一条错误日志
logger.warning(f"未找到id为{key}的接口, 返回值是None")
raise Exception(f"未找到id为{key}的接口, 返回值是None")
def api_work_flow(req_data: dict, source: dict) -> dict:
"""
请求过程:请求前用例数据处理,发送请求,断言,参数提取
:param:req_data 接口请求数据
:param:source 全局变量,保存接口相关变量的实际值, 例如接口中的${login}会从source中找到key=login的元素进行替换
"""
logger.debug(f"\n----------------api_work_flow-----------------\n"
f"接口请求数据:{req_data}\n"
f"全局变量:{source}\n")
if req_data:
extract_result = {}
api_data = RequestPreDataHandle(request_data=req_data, global_var=source).request_data_handle()
# 发送请求
response = RequestHandle(case_data=api_data, global_var=source).http_request()
# 进行响应断言
AssertHandle(assert_data=api_data["assert_response"], response=response).assert_handle()
# 断言成功后进行参数提取
res = after_request_extract(response, api_data["extract"])
for k, v in res.items():
extract_result[k] = eval_data(v)
return extract_result
else:
logger.error(f"接口请求数据不能为空!\n"
f"req_data = {req_data}")
raise f"接口请求数据不能为空!\nreq_data = {req_data}"

View File

@@ -9,10 +9,10 @@
import allure
from loguru import logger
# 本地应用/模块导入
from utils.requests_utils.api_workflow import get_api_data, api_work_flow
from utils.requests_utils.request_control import RequestControl
from utils.data_utils.data_handle import data_handle
from config.path_config import INTERFACE_DIR
from config.global_vars import GLOBAL_VARS
from utils.report_utils.allure_handle import allure_step
def case_dependence_handle(case_dependence, source):
@@ -20,41 +20,50 @@ def case_dependence_handle(case_dependence, source):
处理用例依赖支持接口依赖环境变量依赖SQL依赖。关键字interface, sql, env_vars
先处理环境变量依赖再处理接口依赖最后处理SQL依赖
"""
results = {}
if case_dependence is None:
logger.debug(f"跳过用例依赖处理 --> case_dependence={case_dependence}")
return
allure_step(f"跳过用例依赖处理 --> case_dependence={case_dependence}")
return results
# 环境变量处理
if case_dependence.get("env_vars"):
if isinstance(case_dependence["env_vars"], dict):
for key, value in case_dependence["env_vars"].items():
new_value = data_handle(value, GLOBAL_VARS)
new_value = data_handle(value, source)
with allure.step(f"依赖环境变量 --> {key}={new_value}"):
GLOBAL_VARS.update({key: new_value})
results.update({key: new_value})
else:
allure_step(f"依赖环境变量格式错误,跳过依赖环境变量处理~ --> env_vars仅支持dict格式")
logger.warning(f"依赖环境变量格式错误,跳过依赖环境变量处理~ --> env_vars仅支持dict格式")
else:
allure_step(f"依赖环境变量为空,跳过依赖环境变量处理~")
logger.warning(f"依赖环境变量为空,跳过依赖环境变量处理~")
if case_dependence.get("interface"):
interfaces = case_dependence["interface"]
if isinstance(interfaces, str):
api_data = get_api_data(api_file_path=INTERFACE_DIR, key=interfaces)
api_data = RequestControl().get_api_data(api_file_path=INTERFACE_DIR, key=interfaces)
with allure.step(f"依赖接口:{api_data['title']}({interfaces})"):
result = api_work_flow(req_data=api_data, source=source)
GLOBAL_VARS.update(result)
result = RequestControl().api_request_flow(request_data=api_data, global_var=source)
results.update(result)
elif isinstance(interfaces, list):
for interface in interfaces:
api_data = get_api_data(api_file_path=INTERFACE_DIR, key=interface)
api_data = RequestControl().get_api_data(api_file_path=INTERFACE_DIR, key=interface)
with allure.step(f"依赖接口:{api_data['title']}({interface})"):
result = api_work_flow(req_data=api_data, source=source)
GLOBAL_VARS.update(result)
result = RequestControl().api_request_flow(request_data=api_data, global_var=source)
results.update(result)
else:
allure_step(f"依赖接口格式错误,跳过依赖接口处理~ --> interface 仅支持str和list格式")
logger.warning(f"依赖接口格式错误,跳过依赖接口处理~ --> interface 仅支持str和list格式")
else:
allure_step(f"依赖接口为空,跳过依赖接口处理~")
logger.warning(f"依赖接口为空,跳过依赖接口处理~")
# 依赖SQL处理
if case_dependence.get("sql"):
allure_step(f"依赖SQL暂不支持跳过依赖SQL处理~")
logger.warning(f"暂时不支持依赖sql处理后续更新")
pass
return results

View File

@@ -1,97 +1,76 @@
# -*- coding: utf-8 -*-
# @Version: Python 3.9
# @Time : 2023/1/31 14:31
# @Author : chenyinhua
# @File : request_control.py
# @Time : 2024/7/1 9:42
# @Author : floraachy
# @File : request_control
# @Software: PyCharm
# @Desc: 处理request请求前后的用例数据
# @Desc:
# 标准库导入
import json
import os
import http.cookiejar
import copy
import time
# 第三方库导入
from requests import Response
from loguru import logger
# 本地应用/模块导入
from config.path_config import FILES_DIR
from utils.requests_utils.base_request import BaseRequest
from utils.data_utils.data_handle import data_handle
from utils.data_utils.extract_data_handle import json_extractor, re_extract, response_extract
from utils.files_utils.files_handle import get_files
from utils.files_utils.yaml_handle import YamlHandle
from utils.assertion_utils.assert_control import AssertHandle
from utils.report_utils.allure_handle import allure_step
from config.path_config import FILES_DIR
# ---------------------------------------- 请求前的数据处理----------------------------------------#
class RequestPreDataHandle:
class RequestControl(BaseRequest):
"""
请求前处理用例数据
进行请求,请求后的参数提取处理
"""
def __init__(self, request_data: dict, global_var: dict):
logger.debug(f"\n======================================================\n" \
"-------------Start处理用例数据前--------------------\n"
f"用例标题(title): {type(request_data.get('title', None))} || {request_data.get('title', None)}\n" \
f"用例优先级(severity): {type(request_data.get('severity', None))} || {request_data.get('severity', None)}\n" \
f"请求域名(host): {type(request_data.get('host', None))} || {request_data.get('host', None)}\n" \
f"请求路径(url): {type(request_data.get('url', None))} || {request_data.get('url', None)}\n" \
f"请求方式(method): {type(request_data.get('method', None))} || {request_data.get('method', None)}\n" \
f"请求头(headers): {type(request_data.get('headers', None))} || {request_data.get('headers', None)}\n" \
f"请求cookies: {type(request_data.get('cookies', None))} || {request_data.get('cookies', None)}\n" \
f"请求类型(request_type): {type(request_data.get('request_type', None))} || {request_data.get('request_type', None)}\n" \
f"请求参数(payload): {type(request_data.get('payload', None))} || {request_data.get('payload', None)}\n" \
f"请求文件(files): {type(request_data.get('files', None))} || {request_data.get('files', None)}\n" \
f"请求后等待(wait_seconds): {type(request_data.get('wait_seconds', None))} || {request_data.get('wait_seconds', None)}\n" \
f"响应断言(assert_response): {type(request_data.get('assert_response', None))} || {request_data.get('assert_response', None)}\n" \
f"数据库断言(assert_sql): {type(request_data.get('assert_sql', None))} || {request_data.get('assert_sql', None)}\n" \
f"后置提取参数(extract): {type(request_data.get('extract', None))} || {request_data.get('extract', None)}\n" \
f"用例依赖(case_dependence): {type(request_data.get('case_dependence', None))} || {request_data.get('case_dependence', None)}\n" \
"=====================================================")
self.request_data = copy.deepcopy(request_data)
self.global_var = global_var
def request_data_handle(self):
# --------------------从接口池中获取接口请求数据--------------------
def get_api_data(self, api_file_path: str, key: str = None):
"""
针对用例数据进行处理,识别用例数据中的关键字${xxxx},使用全局变量进行替换或者执行关键字中的方法替换为具体值
根据指定的yaml文件路径以及key值获取对应的接口
:param:api_file_path 接口yaml文件路径
:param:key 对应接口的id
"""
self.url_handle()
self.method_handle()
self.headers_handle()
self.cookies_handle()
self.payload_handle()
self.files_handle()
self.wait_seconds_handle()
self.assert_handle()
self.extract_handle()
logger.debug(f"\n======================================================\n" \
"-------------End处理用例数据后--------------------\n"
f"用例标题(title): {type(self.request_data.get('title', None))} || {self.request_data.get('title', None)}\n" \
f"用例优先级(severity): {type(self.request_data.get('severity', None))} || {self.request_data.get('severity', None)}\n" \
f"请求路径(url): {type(self.request_data.get('url', None))} || {self.request_data.get('url', None)}\n" \
f"请求方式(method): {type(self.request_data.get('method', None))} || {self.request_data.get('method', None)}\n" \
f"请求头(headers): {type(self.request_data.get('headers', None))} || {self.request_data.get('headers', None)}\n" \
f"请求cookies: {type(self.request_data.get('cookies', None))} || {self.request_data.get('cookies', None)}\n" \
f"请求类型(request_type): {type(self.request_data.get('request_type', None))} || {self.request_data.get('request_type', None)}\n" \
f"请求参数(payload): {type(self.request_data.get('payload', None))} || {self.request_data.get('payload', None)}\n" \
f"请求文件(files): {type(self.request_data.get('files', None))} || {self.request_data.get('files', None)}\n" \
f"请求后等待(wait_seconds): {type(self.request_data.get('wait_seconds', None))} || {self.request_data.get('wait_seconds', None)}\n" \
f"响应断言(assert_response): {type(self.request_data.get('assert_response', None))} || {self.request_data.get('assert_response', None)}\n" \
f"数据库断言(assert_sql): {type(self.request_data.get('assert_sql', None))} || {self.request_data.get('assert_sql', None)}\n" \
f"后置提取参数(extract): {type(self.request_data.get('extract', None))} || {self.request_data.get('extract', None)}\n" \
f"用例依赖(case_dependence): {type(self.request_data.get('case_dependence', None))} || {self.request_data.get('case_dependence', None)}\n" \
"=====================================================")
return self.request_data
api_data = []
if os.path.isdir(api_file_path):
logger.debug(f"目标路径是一个目录:{api_file_path}")
api_files = get_files(target=api_file_path, end=".yaml") + get_files(target=api_file_path, end=".yml")
for api_file in api_files:
api_data.append(YamlHandle(filename=api_file).read_yaml)
elif os.path.isfile(api_file_path):
logger.debug(f"目标路径是一个文件:{api_file_path}")
api_data.append(YamlHandle(filename=api_file_path).read_yaml)
def url_handle(self):
else:
logger.error(f"目标路径错误请检查api_file_path={api_file_path}")
return None
for api in api_data:
matching_api = next((item for item in api["case_info"] if item["id"] == key), None)
if matching_api:
logger.info("\n----------匹配到的api----------\n"
f"类型:{type(matching_api)}"
f"值:{matching_api}\n")
return matching_api
# 在找不到匹配的情况下,返回一个默认值且记录一条错误日志
logger.warning(f"未找到id为{key}的接口, 返回值是None")
raise Exception(f"未找到id为{key}的接口, 返回值是None")
# ---------- 请求之前进行数据处理 --------------------------#
@staticmethod
def url_handle(url: str, source: dict = None):
"""
用例数据中获取到的url(一般是不带host的个别特殊的带有host则不进行处理)
"""
# 检测url中是否存在需要替换的参数如果存在则进行替换
url = data_handle(obj=self.request_data.get("url", None), source=self.global_var)
url = data_handle(obj=url, source=source)
# 进行url处理最终得到full_url
host = self.global_var.get("host", "")
host = source.get("host", "")
# 从用例数据中获取url如果键url不存在则返回空字符串
# 如果url是以http开头的则直接使用该url不与host进行拼接
if url.lower().startswith("http"):
@@ -109,103 +88,132 @@ class RequestPreDataHandle:
else:
# 如果host不以/结尾 且 url不以/开头则将host和url拼接起来的时候增加/组成新的url
full_url = host + "/" + url
self.request_data["url"] = full_url
return full_url
def method_handle(self):
# TODO 暂时不需要处理,后续有需要在处理
pass
def cookies_handle(self):
@staticmethod
def cookies_handle(cookies, source: dict = None):
"""
requests模块中cookies参数要求是Dict or CookieJar object
"""
cookies = self.request_data.get("cookies", None)
# 从用例数据中获取cookies 处理cookies
if cookies:
# 从用例数据中获取cookies 处理cookies
# 通过全局变量替换cookies得到的是一个str类型
cookies = data_handle(obj=cookies, source=self.global_var)
cookies = data_handle(obj=cookies, source=source)
try:
cookies = json.loads(cookies)
except Exception as e:
cookies = cookies
if isinstance(cookies, dict) or isinstance(cookies, http.cookiejar.CookieJar):
self.request_data["cookies"] = cookies
return cookies
else:
logger.error(
f"cookies参数要求是Dict or CookieJar object 目前cookies类型是{type(cookies)} cookies值是{cookies}")
raise TypeError(
f"cookies参数要求是Dict or CookieJar object 目前cookies类型是{type(cookies)} cookies值是{cookies}")
def headers_handle(self):
@staticmethod
def headers_handle(headers: dict, source: dict = None):
"""
headers里面传cookies要求cookies类型是str
"""
headers = self.request_data.get("headers", None)
# 从用例数据中获取header 处理header
if headers:
self.request_data["headers"] = data_handle(obj=headers, source=self.global_var)
# 从用例数据中获取header 处理header
headers = data_handle(obj=headers, source=source)
# 如果请求头中有cookies需要进行单独处理
if self.request_data["headers"].get("cookies", None):
cookies = self.request_data["headers"]["cookies"]
if headers.get("cookies", None):
cookies = headers["cookies"]
if isinstance(cookies, dict):
# 如果是字典类型,就转成字符串
self.request_data["headers"]["cookies"] = json.dumps(cookies)
headers["cookies"] = json.dumps(cookies)
else:
self.request_data["headers"]["cookies"] = cookies
headers["cookies"] = cookies
return headers
def payload_handle(self):
# 处理请求参数payload
payload = self.request_data.get("payload", None)
if payload:
self.request_data["payload"] = data_handle(obj=payload, source=self.global_var)
def files_handle(self):
@staticmethod
def files_handle(files: str, source: dict = None):
"""
格式:接口中文件参数的名称:"文件路径地址"
例如:{"file": "demo_test_demo.py"}
"""
# 处理请求参数files参数
files = self.request_data.get("files", None)
if files:
# 处理请求参数files参数
# 支持文件传递${}关键字将使用data_handle进行处理
files = data_handle(obj=files, source=self.global_var)
files = data_handle(obj=files, source=source)
# 将文件处理成绝对路径
self.request_data["files"] = os.path.join(FILES_DIR, files)
return os.path.join(FILES_DIR, files)
def wait_seconds_handle(self):
@staticmethod
def wait_seconds_handle(wait_seconds):
"""
处理等待时间参数如果不能转为int类型则认为是none
"""
wait_seconds = self.request_data.get("wait_seconds", None)
try:
self.request_data["wait_seconds"] = int(wait_seconds)
return int(wait_seconds)
except:
self.request_data["wait_seconds"] = None
def assert_handle(self):
# 处理响应断言参数
assert_response = self.request_data.get("assert_response", None)
if assert_response:
self.request_data["assert_response"] = data_handle(obj=assert_response, source=self.global_var)
# 由于数据库断言里面的变量需要请求响应后进行提取,因此目前不进行处理
return None
def extract_handle(self):
# 处理提取参数
extract = self.request_data.get("extract", None)
if extract:
self.request_data["extract"] = data_handle(obj=extract, source=self.global_var)
def before_request(self, request_data: dict, source_data: dict = None):
"""
针请求前,对接口数据进行处理,识别用例数据中的关键字${xxxx},使用全局变量进行替换或者执行关键字中的方法替换为具体值
"""
try:
logger.debug(f"\n======================================================\n" \
"-------------用例数据处理前--------------------\n"
f"用例ID: {type(request_data.get('id', None))} || {request_data.get('id', None)}\n" \
f"用例优先级(severity): {type(request_data.get('severity', None))} || {request_data.get('severity', None)}\n" \
f"用例标题(title): {type(request_data.get('title', None))} || {request_data.get('title', None)}\n" \
f"请求路径(url): {type(request_data.get('url', None))} || {request_data.get('url', None)}\n" \
f"请求方式(method): {type(request_data.get('method', None))} || {request_data.get('method', None)}\n" \
f"请求头(headers): {type(request_data.get('headers', None))} || {request_data.get('headers', None)}\n" \
f"请求cookies: {type(request_data.get('cookies', None))} || {request_data.get('cookies', None)}\n" \
f"请求类型(request_type): {type(request_data.get('request_type', None))} || {request_data.get('request_type', None)}\n" \
f"请求文件(files): {type(request_data.get('files', None))} || {request_data.get('files', None)}\n" \
f"请求后等待(wait_seconds): {type(request_data.get('wait_seconds', None))} || {request_data.get('wait_seconds', None)}\n" \
f"请求参数(payload): {type(request_data.get('payload', None))} || {request_data.get('payload', None)}\n" \
f"响应断言(assert_response): {type(request_data.get('assert_response', None))} || {request_data.get('assert_response', None)}\n" \
f"数据库断言(assert_sql): {type(request_data.get('assert_sql', None))} || {request_data.get('assert_sql', None)}\n" \
f"后置提取参数(extract): {type(request_data.get('extract', None))} || {request_data.get('extract', None)}\n" \
f"用例依赖(case_dependence): {type(request_data.get('case_dependence', None))} || {request_data.get('case_dependence', None)}\n")
new_request_data = {
"id": request_data.get("id"),
"severity": request_data.get("severity"),
"title": request_data.get("title"),
"url": self.url_handle(url=request_data.get("url"), source=source_data),
"method": request_data.get("method"),
"headers": self.headers_handle(headers=request_data.get("headers"), source=source_data),
"cookies": self.cookies_handle(cookies=request_data.get("cookies"), source=source_data),
"request_type": request_data.get("request_type"),
"files": self.files_handle(files=request_data.get("files"), source=source_data),
"wait_seconds": self.wait_seconds_handle(wait_seconds=request_data.get("wait_seconds")),
"payload": data_handle(obj=request_data["payload"], source=source_data),
"assert_response": data_handle(obj=request_data.get("assert_response"), source=source_data),
"assert_sql": request_data.get("assert_sql"),
"extract": data_handle(obj=request_data.get("extract"), source=source_data),
"case_dependence": request_data.get("case_dependence"),
}
# ---------------------------------------- 进行请求,请求后的参数提取处理----------------------------------------#
class RequestHandle:
"""
进行请求,请求后的参数提取处理
"""
def __init__(self, case_data: dict, global_var: dict):
self.case_data = case_data
self.global_var = global_var
logger.debug("\n-------------用例数据处理--------------------\n"
f"用例ID: {type(new_request_data.get('id', None))} || {new_request_data.get('id', None)}\n" \
f"用例优先级(severity): {type(new_request_data.get('severity', None))} || {new_request_data.get('severity', None)}\n" \
f"用例标题(title): {type(new_request_data.get('title', None))} || {new_request_data.get('title', None)}\n" \
f"请求路径(url): {type(new_request_data.get('url', None))} || {new_request_data.get('url', None)}\n" \
f"请求方式(method): {type(new_request_data.get('method', None))} || {new_request_data.get('method', None)}\n" \
f"请求头(headers): {type(new_request_data.get('headers', None))} || {new_request_data.get('headers', None)}\n" \
f"请求cookies: {type(new_request_data.get('cookies', None))} || {new_request_data.get('cookies', None)}\n" \
f"请求类型(request_type): {type(new_request_data.get('request_type', None))} || {new_request_data.get('request_type', None)}\n" \
f"请求文件(files): {type(new_request_data.get('files', None))} || {new_request_data.get('files', None)}\n" \
f"请求后等待(wait_seconds): {type(new_request_data.get('wait_seconds', None))} || {new_request_data.get('wait_seconds', None)}\n" \
f"请求参数(payload): {type(new_request_data.get('payload', None))} || {new_request_data.get('payload', None)}\n" \
f"响应断言(assert_response): {type(new_request_data.get('assert_response', None))} || {new_request_data.get('assert_response', None)}\n" \
f"数据库断言(assert_sql): {type(new_request_data.get('assert_sql', None))} || {new_request_data.get('assert_sql', None)}\n" \
f"后置提取参数(extract): {type(new_request_data.get('extract', None))} || {new_request_data.get('extract', None)}\n" \
f"用例依赖(case_dependence): {type(new_request_data.get('case_dependence', None))} || {new_request_data.get('case_dependence', None)}\n"
"=====================================================")
return new_request_data
except Exception as e:
logger.error(f"接口数据处理异常:{e}")
raise f"接口数据处理异常:\n{e}"
@classmethod
def api_step_record(cls, **kwargs) -> None:
@@ -257,79 +265,91 @@ class RequestHandle:
allure_step(f"响应结果: {response_result}")
allure_step(f"响应耗时: {response_time_seconds} s || {response_time_millisecond} ms")
def http_request(self):
@staticmethod
def after_request(response: Response, extract):
"""
发送请求并进行后置参数提取操作
从响应数据中提取请求后的参数,并保存到全局变量中
:param response: playwright APIResponse 响应对象
:param extract: 需要提取的参数字典 '{"k1": "$.data"}''{"k1": "data:(.*?)$"}'
:return:
"""
response = BaseRequest.send_request(self.case_data)
# 根据配置,增加接口请求等待时间。适应部分调用调用后,需要进行内置数据处理的问题
logger.debug(f"开始等待")
if self.case_data["wait_seconds"]:
time.sleep(self.case_data["wait_seconds"])
logger.debug(f"结束等待")
self.case_data["status_code"] = response.status_code
self.case_data["response_time_seconds"] = round(response.elapsed.total_seconds(), 2)
self.case_data["response_time_millisecond"] = round(response.elapsed.total_seconds() * 1000, 2)
logger.info(f"断言成功后参数提取表达式extract: {extract}")
json_result = {}
re_result = {}
response_result = {}
if extract:
if extract.get("type_jsonpath"):
# 如果响应数据是json格式则将按照json方式对后置提取参数进行处理
res = response.json()
for k, v in extract["type_jsonpath"].items():
json_result[k] = json_extractor(res, v)
logger.debug(f"--从response.json()中通过jsonpath方式提取到的结果 --> {json_result}")
if extract.get("type_re"):
# 如果响应数据是str格式则将按照str方式对后置提取参数进行处理
res = response.text
for k, v in extract["type_re"].items():
re_result[k] = data_handle(obj=re_extract(res, v))
logger.debug(f"--从response.text中通过正则表达式提取到的结果 --> {re_result} ")
if extract.get("type_response"):
for k, v in extract["type_response"].items():
response_result[k] = response_extract(response, v)
logger.debug(f"--从response中提取到的结果 --> {re_result}")
result = {**json_result, **re_result, **response_result}
logger.info(f"--参数提取结果extract --> {result} --")
allure_step(f"参数提取结果extract{result}")
return result
# -----接口请求流程:获取接口数据 -> 处理接口请求数据 -> 请求接口 -> 接口断言 -> 接口数据提取 --------------
def api_request_flow(self, request_data: dict = None, global_var: dict = None, api_file_path: str = None,
key: str = None):
"""
发送请求并进行后置参数提取操作。
request_data参数 与 api_file_pathkey 这两个必须传递其中一个
:param request_data: 请求数据字典,包含请求所需的所有信息。
:param global_var: 包含全局变量的字典,这些变量用于替换到请求数据的关键字:${}
:param api_file_path: 接口所在的目录或者文件路径
:param key: 接口的ID
:return: 接口请求数据以及从接口响应提取的参数,字典。
:raises ValueError: 如果请求数据无效或缺失。
"""
# 初始化一个变量保存接口请求参数payload以及通过extract提取的参数
save_api_data = {}
if request_data:
api_info = request_data
elif api_file_path and key:
api_info = self.get_api_data(api_file_path=api_file_path, key=key)
else:
logger.error("请求数据异常")
raise ValueError("请求数据异常")
new_api_data = self.before_request(request_data=api_info, source_data=global_var)
response = self.send_request(new_api_data)
new_api_data["status_code"] = response.status_code
new_api_data["response_time_seconds"] = round(response.elapsed.total_seconds(), 2)
new_api_data["response_time_millisecond"] = round(response.elapsed.total_seconds() * 1000, 2)
try:
self.case_data["response_result"] = response.json()
new_api_data["response_result"] = response.json()
except:
self.case_data["response_result"] = response.text
new_api_data["response_result"] = response.text()
self.api_step_record(**self.case_data)
self.api_step_record(**new_api_data)
# 进行响应断言
AssertHandle(assert_data=new_api_data["assert_response"], response=response).assert_handle()
# 处理数据库断言 - 从全局变量中获取最新值,替换数据库断言中的参数
if self.case_data.get('assert_sql', None):
self.case_data["assert_sql"] = data_handle(obj=self.case_data["assert_sql"], source=self.global_var)
# 处理请求里面的files使得日志以及allure中写入的是文件而不是文件二进制内容
if self.case_data.get('files', None):
files = self.case_data["files"]
if isinstance(files, dict):
dict_values = list(files.values())[0]
_file = os.path.join(FILES_DIR, dict_values[0])
logger.info(
f"\n请求文件(files): {type(_file)} || {_file}\n")
return response
# 进行响应参数提取,并保存提取后的数据
extract_results = self.after_request(response=response, extract=new_api_data.get("extract"))
save_api_data.update(extract_results)
# ---------------------------------------- 请求后的参数提取处理----------------------------------------#
def after_request_extract(response: Response, extract):
"""
从响应数据中提取请求后的参数,并保存到全局变量中
:param response: request 响应对象
:param extract: 需要提取的参数字典 '{"k1": "$.data"}''{"k1": "data:(.*?)$"}'
:return:
"""
logger.debug(f"\n================================================================================\n" \
"-------------Start: 提取表达式--------------------\n"
f"后置提取参数(原): {extract}\n")
json_result = {}
re_result = {}
response_result = {}
if extract:
if extract.get("type_jsonpath"):
# 如果响应数据是json格式则将按照json方式对后置提取参数进行处理
res = response.json()
for k, v in extract["type_jsonpath"].items():
json_result[k] = json_extractor(res, v)
logger.debug("\n-------------从response.json()中通过jsonpath方式提取到的结果--------------------\n"
f"后置提取参数(新): {json_result}\n")
if extract.get("type_re"):
# 如果响应数据是str格式则将按照str方式对后置提取参数进行处理
res = response.text
for k, v in extract["type_re"].items():
re_result[k] = data_handle(obj=re_extract(res, v))
logger.debug("\n-------------从response.text中通过正则表达式提取到的结果--------------------\n"
f"后置提取参数(新): {re_result}\n")
if extract.get("type_response"):
for k, v in extract["type_response"].items():
response_result[k] = response_extract(response, v)
logger.debug("-------------从response中提取到的结果--------------------\n"
f"后置提取参数(新): {re_result}\n")
result = {**json_result, **re_result, **response_result}
logger.info("\n-------------End所有提取到的结果--------------------\n"
f"后置提取参数(新): {result}\n" \
"================================================================================")
allure_step(f"参数提取结果:{result}")
return result
# 将接口请求参数payload的值保存到save_api_data中
save_api_data.update({"_payload": new_api_data["payload"]} if new_api_data.get("payload") else {})
logger.debug(f"接口请求完成后接口请求数据payload响应数据 & 提取数据 save_api_data={save_api_data}")
allure_step(f"接口请求完成后接口请求数据payload响应数据 & 提取数据 save_api_data={save_api_data}")
return save_api_data