diff --git a/Pipfile b/Pipfile
index 80c2d0c..8ebc376 100644
--- a/Pipfile
+++ b/Pipfile
@@ -20,6 +20,7 @@ xpinyin = "==0.7.6"
yagmail = "==0.15.293"
pytest-repeat = "==0.9.3"
jsonpath = "*"
+pytest = "==8.0.2"
[dev-packages]
diff --git a/README.md b/README.md
index 10e1a77..2e9a0d1 100644
--- a/README.md
+++ b/README.md
@@ -46,22 +46,24 @@
## 三、依赖库
```
-allure-pytest = "==2.9.45"
-click = "==8.1.7"
-faker = "==21.0.0"
-loguru = "==0.7.2"
-openpyxl = "==3.1.2"
-pydantic = "==2.5.2"
-pymysql = "==1.1.0"
-pytest-rerunfailures = "==12.0"
-pyyaml = "==6.0.1"
-requests-toolbelt = "==1.0.0"
-"ruamel.yaml" = "==0.18.5"
-sshtunnel = "==0.4.0"
-xpinyin = "==0.7.6"
-yagmail = "==0.15.293"
-pytest-repeat = "==0.9.3"
-jsonpath = "*"
+allure_python_commons==2.9.45
+click==8.1.7
+Faker==21.0.0
+jsonpath==0.82.2
+loguru==0.7.2
+openpyxl==3.1.2
+pydantic==2.7.1
+PyMySQL==1.1.0
+pytest==8.0.2 Pytest 8.2.0 和 8.1.0 的几个版本会破坏 allure 的 listener 导致执行报错,也就是版本不兼容,需把pytest 回退到8.0.2就不报错啦!
+PyYAML==6.0.1
+PyYAML==6.0.1
+Requests==2.31.0
+requests_toolbelt==1.0.0
+ruamel.base==1.0.0
+sshtunnel==0.4.0
+xpinyin==0.7.6
+yagmail==0.15.293
+
```
@@ -74,24 +76,37 @@ https://gitlink.org.cn/floraachy/apiautotest.git
2. 本地电脑搭建好 python环境,我使用的python版本是3.9。包括allure测试报告所需的java环境(安装jdk)。
-3. 安装环境依赖包
-1) 方法一:使用pipenv管理所有依赖
+3. 安装依赖包
+方法一:使用pipenv管理依赖包
```
-# 安装pipenv, 这个需要全局安装。建议在项目根目录下执行命令安装
+注意: 如果不熟悉pipenv, 可以跳过该步骤。
+```
+
+1) 安装pipenv
+```
+# 建议在项目根目录下执行命令安装
pip install pipenv
-
-# 使用pipenv管理安装环境依赖包(必须在项目根目录下执行)。
-pipenv install
-
-# 注意:使用pipenv install会自动安装Pipfile里面的依赖包,该依赖包仅安装在虚拟环境里,不安装在测试机(本机)。
-
```
-2方法二:使用requirements.txt一键安装所有依赖
+2) 使用pipenv管理安装环境依赖包:pipenv install (必须在项目根目录下执行)
+```
+ 注意:使用pipenv install会自动安装Pipfile里面的依赖包,该依赖包仅安装在虚拟环境里,不安装在测试机。
+```
+
+方法二:直接将依赖包安装在本机
```
pip install -r requirements.txt
+```
-# 注意:这种方式安装,会将依赖包全部安装在测试机(本机)
+扩展: 使用 pipreqs 生成 requirements.txt
+```
+# 安装
+pip install pipreqs
+# 在当前目录生成
+pipreqs . --encoding=utf8 --force
+注意 --encoding=utf8 为使用utf8编码,不然可能会报UnicodeDecodeError: 'gbk' codec can't decode byte 0xae in position 406: illegal multibyte sequence 的错误。
+
+--force 强制执行,当 生成目录下的requirements.txt存在时覆盖。
```
如上环境都已经搭建好了,包括框架依赖包也都安装好了。
@@ -113,8 +128,12 @@ pip install -r requirements.txt
4)指定日志收集级别,由LOG_LEVEL控制
### 2. 修改全局变量,增加测试数据 `config.settings.py`
-1) ENV_VARS["common"]是一些公共参数,如报告标题,报告名称,测试者,测试部门。后续会显示在测试报告上。如果还有其他,可自行添加
-2)ENV_VARS["test"]是保存test环境的一些测试数据。ENV_VARS["live"]是保存live环境的一些测试数据。如果还有其他环境可以继续增加,例如增加ENV_VARS["dev"] = {"host": "", ......}
+1)确认RunConfig的各项参数,可以调整失败重跑次数`rerun`, 失败重跑间隔时间`reruns_delay`,当达到最大失败数,停止执行`max_fail`
+2)确认测试完成后是否发送测试结果,由SEND_RESULT_TYPE控制,并填充对应邮件/钉钉/企业微信配置信息
+3)指定日志收集级别,由LOG_LEVEL控制
+4) 配置测试相关数据:
+ENV_VARS["common"]是一些公共参数,如报告标题,报告名称,测试者,测试部门。后续会显示在测试报告上。如果还有其他,可自行添加
+ENV_VARS["test"]是保存test环境的一些测试数据。ENV_VARS["live"]是保存live环境的一些测试数据。如果还有其他环境可以继续增加,例如增加ENV_VARS["dev"] = {"host": "", ......}
### 3. 删除框架中的示例用例数据
1)删除 `interface`目录下所有的YAML和EXCEL文件(每一个文件都保存的接口测试用例)
@@ -213,7 +232,7 @@ case_info: # 具体的用例数据,是以列表的形式进行管理
cookies: response.cookies
```
-### 6. 断言方式
+### 7. 断言方式
以下是支持的几种断言方式:
| 断言方式 | 说明 |
| ------------ | ------------ |
@@ -234,7 +253,7 @@ case_info: # 具体的用例数据,是以列表的形式进行管理
| startswith | 以什么开头,判断实际结果是否是以预期结果开头的 |
| endswith | 以什么结尾,判断实际结果是否是以预期结果结尾的 |
-### 7. 响应断言说明
+### 8. 响应断言说明
#### 断言状态码
如果想要断言接口响应码,直接这样写即可:
参考示例:
@@ -275,7 +294,7 @@ case_info: # 具体的用例数据,是以列表的形式进行管理
type_jsonpath: $.user_id
```
-### 8. 数据库断言说明
+### 9. 数据库断言说明
数据库断言的参数说明:
```
断言标识(自定义,不为空即可,没有实际的意义):
@@ -309,7 +328,7 @@ case_info: # 具体的用例数据,是以列表的形式进行管理
expect_value: ${user_id}
assert_type: contains
```
-### 9. 用例依赖说明
+### 10. 用例依赖说明
目前用例依赖仅支持接口依赖和环境变量依赖。数据库依赖后续会补充。
依赖的配置说明:
```
@@ -339,7 +358,7 @@ case_dependence: 用例依赖,为空时表示没有依赖
env_page_content: ${generate_paragraph}
```
-### 10. Excel用例单独说明
+### 11. Excel用例单独说明
框架支持excel多表单自动生成测试用例,每一个表单作为一个测试用例模块。
例如:
excel表格名称是:test_demo.xlsx
@@ -366,10 +385,11 @@ excel表单2名称是:示例模块
测试用例方法:test_demo_slmk_auto
## 六、运行自动化测试
-### 1. 激活已存在的虚拟环境
+### 方式一:使用pipenv管理虚拟环境
+#### 1. 激活已存在的虚拟环境
- (如果不存在会创建一个):pipenv shell (必须在项目根目录下执行)
-### 2. 运行
+#### 2. 运行
```
在pycharm>terminal或者电脑命令窗口,进入项目根路径,执行如下命令(如果依赖包是安装在虚拟环境中,需要先启动虚拟环境)。
> python run.py 默认在test环境运行测试用例, 生成allure测试报告
@@ -378,6 +398,18 @@ excel表单2名称是:示例模块
> python run.py -env=test 在test环境运行测试用例
> python run.py -report=no 在test环境下允许测试用例,不生成allure测试报告
```
+或者上述步骤可以合并为:pipenv run python run.py
+
+### 方式二:依赖包安装在本机
+```
+在pycharm>terminal或者电脑命令窗口,进入项目根路径,执行如下命令(如果依赖包是安装在虚拟环境中,需要先启动虚拟环境)。
+ > python run.py 默认在test环境运行测试用例, 生成allure测试报告
+ > python run.py -m demo 在test环境仅运行打了标记demo用例,生成allure测试报告
+ > python run.py -env live 在live环境运行测试用例
+ > python run.py -env=test 在test环境运行测试用例
+ > python run.py -report=no 在test环境下允许测试用例,不生成allure测试报告
+```
+
注意:
- 如果pycharm.interpreter拥有了框架所需的所有依赖包,可以通过pycharm直接在`run.py`中右键运行
@@ -451,5 +483,5 @@ excel表单2名称是:示例模块
## 赞赏
-如果这个库有帮助到你并且你很想支持库的后续开发和维护,那么你可以扫描下方二维码随意打赏我,我将不胜感激~
-
\ No newline at end of file
+如果这个库有帮助到你并且你很想支持库的后续开发和维护,那么你可以扫描下方二维码随意打赏我,我将不胜感激~
+
\ No newline at end of file
diff --git a/test_case/conftest.py b/test_case/conftest.py
index f20d767..d070c0d 100644
--- a/test_case/conftest.py
+++ b/test_case/conftest.py
@@ -16,8 +16,7 @@ from loguru import logger
from config.global_vars import GLOBAL_VARS
from config.path_config import GITLINK_DIR
from utils.report_utils.allure_handle import allure_title
-from utils.requests_utils.api_workflow import get_api_data, api_work_flow
-from utils.requests_utils.case_dependence import case_dependence_handle
+from utils.requests_utils.request_control import RequestControl
@pytest.fixture(scope="function", autouse=True)
@@ -64,8 +63,8 @@ def gitlink_login():
:return:
"""
# 请求登录接口
- login_api = get_api_data(os.path.join(GITLINK_DIR, "test_gitlink_login.yaml"), "gitlink_login_01")
- res = api_work_flow(login_api, GLOBAL_VARS)
+ res = RequestControl().api_request_flow(api_file_path=os.path.join(GITLINK_DIR, "test_gitlink_login.yaml"),
+ key="gitlink_login_01", global_var=GLOBAL_VARS)
GLOBAL_VARS.update(res)
@@ -75,8 +74,7 @@ def get_oauth_token():
获取oauth_token, 用于在接口的headers里面传递{Authorization:Bearer {{token}}}
注意:oauth_token适用于application通过客户端的方式登录平台。application不同于正常注册平台的用户,是没有用户名和密码的。
"""
- login_oauth_token_api = get_api_data(os.path.join(GITLINK_DIR, "login_oauth_token.yaml"),
- "gitlink_login_oauth_token_01")
+ res = RequestControl().api_request_flow(api_file_path=os.path.join(GITLINK_DIR, "login_oauth_token.yaml"),
+ key="gitlink_login_oauth_token_01", global_var=GLOBAL_VARS)
- res = api_work_flow(login_oauth_token_api, GLOBAL_VARS)
- GLOBAL_VARS.update(res)
\ No newline at end of file
+ GLOBAL_VARS.update(res)
diff --git a/utils/case_generate_utils/case_fun_generate.py b/utils/case_generate_utils/case_fun_generate.py
index 197bf7c..45dc891 100644
--- a/utils/case_generate_utils/case_fun_generate.py
+++ b/utils/case_generate_utils/case_fun_generate.py
@@ -217,9 +217,11 @@ def gen_case_file(filename, case_template_path, case_common, common_dependence,
common_dependence_template = ['common_dependence = ${common_dependence}\n', '\n', '\n',
'@pytest.fixture(scope="class", autouse=True)\n',
'def background():\n',
- ' case_dependence_handle(case_dependence=common_dependence.get("setup", None), source=GLOBAL_VARS)\n',
+ ' dependence_results = case_dependence_handle(case_dependence=common_dependence.get("setup", None), source=GLOBAL_VARS)\n',
+ ' GLOBAL_VARS.update(dependence_results if dependence_results else {})\n',
' yield\n',
- ' case_dependence_handle(case_dependence=common_dependence.get("teardown", None), source=GLOBAL_VARS)',
+ ' dependence_results = case_dependence_handle(case_dependence=common_dependence.get("teardown", None), source=GLOBAL_VARS)\n',
+ ' GLOBAL_VARS.update(dependence_results if dependence_results else {})\n',
'\n', '\n']
for idx in reversed(indices):
diff --git a/utils/case_generate_utils/case_template.txt b/utils/case_generate_utils/case_template.txt
index 75a9ed9..5d674da 100644
--- a/utils/case_generate_utils/case_template.txt
+++ b/utils/case_generate_utils/case_template.txt
@@ -7,8 +7,7 @@ import pytest
import allure
# 本地应用/模块导入
from config.global_vars import GLOBAL_VARS
-from utils.requests_utils.request_control import RequestPreDataHandle, RequestHandle, after_request_extract
-from utils.assertion_utils.assert_control import AssertHandle
+from utils.requests_utils.request_control import RequestControl
from utils.requests_utils.case_dependence import case_dependence_handle
# 公共依赖
@@ -22,25 +21,21 @@ class ${class_title}Auto:
@allure.story("${allure_story}")
@pytest.mark.auto
- @pytest.mark.parametrize("case", cases, ids=["{}".format(case["title"]) for case in cases])
+ @pytest.mark.parametrize("case", cases, ids=lambda x: x["title"])
def ${func_title}_auto(self, case):
+ from loguru import logger
# 前置依赖处理
- if case.get("case_dependence"):
- case_dependence_handle(case_dependence=case["case_dependence"].get("setup", None), source=GLOBAL_VARS)
- # 处理请求前的用例数据
- case_data = RequestPreDataHandle(request_data=case, global_var=GLOBAL_VARS).request_data_handle()
- # 发送请求
- response = RequestHandle(case_data=case_data, global_var=GLOBAL_VARS).http_request()
- # 进行响应断言
- AssertHandle(assert_data=case_data["assert_response"], response=response).assert_handle()
- # 进行数据库断言
- AssertHandle(assert_data=case_data["assert_sql"], db_info=GLOBAL_VARS["db_info"]).assert_handle()
- # 断言成功后进行参数提取
- res = after_request_extract(response, case_data.get("extract", None))
- for k, v in res.items():
- GLOBAL_VARS[k] = v
+ if case.get("case_dependence") and case["case_dependence"].get("setup"):
+ dependence_results = case_dependence_handle(case_dependence=case["case_dependence"]["setup"],
+ source=GLOBAL_VARS)
+ GLOBAL_VARS.update(dependence_results if dependence_results else {})
+ # 处理请求前的用例数据 -> 发送请求 -> 响应/数据库断言 -> 断言成功后进行参数提取
+ res = RequestControl().api_request_flow(request_data=case, global_var=GLOBAL_VARS)
+ GLOBAL_VARS.update(res)
# 后置依赖处理
- if case.get("case_dependence"):
- case_dependence_handle(case_dependence=case["case_dependence"].get("teardown", None), source=GLOBAL_VARS)
+ if case.get("case_dependence") and case["case_dependence"].get("teardown"):
+ dependence_results = case_dependence_handle(case_dependence=case["case_dependence"]["teardown"],
+ source=GLOBAL_VARS)
+ GLOBAL_VARS.update(dependence_results if dependence_results else {})
diff --git a/utils/requests_utils/api_workflow.py b/utils/requests_utils/api_workflow.py
deleted file mode 100644
index 3fa9b9a..0000000
--- a/utils/requests_utils/api_workflow.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/11/8 16:53
-# @Author : floraachy
-# @File : api_workflow.py
-# @Software: PyCharm
-# @Desc:
-
-
-# 标准库导入
-import os
-# 第三方库导入
-from loguru import logger
-# 本地应用/模块导入
-from utils.files_utils.yaml_handle import YamlHandle
-from utils.requests_utils.request_control import RequestPreDataHandle, RequestHandle, after_request_extract
-from utils.files_utils.files_handle import get_files
-from utils.data_utils.eval_data_handle import eval_data
-from utils.assertion_utils.assert_control import AssertHandle
-from config.global_vars import GLOBAL_VARS
-
-
-def get_api_data(api_file_path: str, key: str = None):
- """
- 根据指定的yaml文件路径,以及key值,获取对应的接口
- :param:api_file_path 接口yaml文件路径
- :param:key 对应接口的id
- """
- api_data = []
- if os.path.isdir(api_file_path):
- logger.debug(f"目标路径是一个目录:{api_file_path}")
- api_files = get_files(target=api_file_path, end=".yaml") + get_files(target=api_file_path, end=".yml")
- for api_file in api_files:
- api_data.append(YamlHandle(filename=api_file).read_yaml)
- elif os.path.isfile(api_file_path):
- logger.debug(f"目标路径是一个文件:{api_file_path}")
- api_data.append(YamlHandle(filename=api_file_path).read_yaml)
-
- else:
- logger.error(f"目标路径错误,请检查!api_file_path={api_file_path}")
- return None
-
- for api in api_data:
- matching_api = next((item for item in api["case_info"] if item["id"] == key), None)
- if matching_api:
- logger.info("\n----------匹配到的api----------\n"
- f"类型:{type(matching_api)}"
- f"值:{matching_api}\n")
- return matching_api
-
- # 在找不到匹配的情况下,返回一个默认值且记录一条错误日志
- logger.warning(f"未找到id为{key}的接口, 返回值是None")
- raise Exception(f"未找到id为{key}的接口, 返回值是None")
-
-
-def api_work_flow(req_data: dict, source: dict) -> dict:
- """
- 请求过程:请求前用例数据处理,发送请求,断言,参数提取
- :param:req_data 接口请求数据
- :param:source 全局变量,保存接口相关变量的实际值, 例如接口中的${login},会从source中找到key=login的元素进行替换
- """
- logger.debug(f"\n----------------api_work_flow-----------------\n"
- f"接口请求数据:{req_data}\n"
- f"全局变量:{source}\n")
- if req_data:
- extract_result = {}
- api_data = RequestPreDataHandle(request_data=req_data, global_var=source).request_data_handle()
- # 发送请求
- response = RequestHandle(case_data=api_data, global_var=source).http_request()
- # 进行响应断言
- AssertHandle(assert_data=api_data["assert_response"], response=response).assert_handle()
- # 断言成功后进行参数提取
- res = after_request_extract(response, api_data["extract"])
- for k, v in res.items():
- extract_result[k] = eval_data(v)
- return extract_result
- else:
- logger.error(f"接口请求数据不能为空!\n"
- f"req_data = {req_data}")
- raise f"接口请求数据不能为空!\nreq_data = {req_data}"
diff --git a/utils/requests_utils/case_dependence.py b/utils/requests_utils/case_dependence.py
index f1b3dfb..fa1e9b9 100644
--- a/utils/requests_utils/case_dependence.py
+++ b/utils/requests_utils/case_dependence.py
@@ -9,10 +9,10 @@
import allure
from loguru import logger
# 本地应用/模块导入
-from utils.requests_utils.api_workflow import get_api_data, api_work_flow
+from utils.requests_utils.request_control import RequestControl
from utils.data_utils.data_handle import data_handle
from config.path_config import INTERFACE_DIR
-from config.global_vars import GLOBAL_VARS
+from utils.report_utils.allure_handle import allure_step
def case_dependence_handle(case_dependence, source):
@@ -20,41 +20,50 @@ def case_dependence_handle(case_dependence, source):
处理用例依赖,支持接口依赖,环境变量依赖,SQL依赖。关键字:interface, sql, env_vars
先处理环境变量依赖,再处理接口依赖,最后处理SQL依赖
"""
+ results = {}
if case_dependence is None:
logger.debug(f"跳过用例依赖处理 --> case_dependence={case_dependence}")
- return
+ allure_step(f"跳过用例依赖处理 --> case_dependence={case_dependence}")
+ return results
# 环境变量处理
if case_dependence.get("env_vars"):
if isinstance(case_dependence["env_vars"], dict):
for key, value in case_dependence["env_vars"].items():
- new_value = data_handle(value, GLOBAL_VARS)
+ new_value = data_handle(value, source)
with allure.step(f"依赖环境变量 --> {key}={new_value}"):
- GLOBAL_VARS.update({key: new_value})
+ results.update({key: new_value})
else:
+ allure_step(f"依赖环境变量格式错误,跳过依赖环境变量处理~ --> env_vars仅支持dict格式")
logger.warning(f"依赖环境变量格式错误,跳过依赖环境变量处理~ --> env_vars仅支持dict格式")
else:
+ allure_step(f"依赖环境变量为空,跳过依赖环境变量处理~")
logger.warning(f"依赖环境变量为空,跳过依赖环境变量处理~")
if case_dependence.get("interface"):
interfaces = case_dependence["interface"]
if isinstance(interfaces, str):
- api_data = get_api_data(api_file_path=INTERFACE_DIR, key=interfaces)
+ api_data = RequestControl().get_api_data(api_file_path=INTERFACE_DIR, key=interfaces)
with allure.step(f"依赖接口:{api_data['title']}({interfaces})"):
- result = api_work_flow(req_data=api_data, source=source)
- GLOBAL_VARS.update(result)
+ result = RequestControl().api_request_flow(request_data=api_data, global_var=source)
+ results.update(result)
elif isinstance(interfaces, list):
for interface in interfaces:
- api_data = get_api_data(api_file_path=INTERFACE_DIR, key=interface)
+ api_data = RequestControl().get_api_data(api_file_path=INTERFACE_DIR, key=interface)
with allure.step(f"依赖接口:{api_data['title']}({interface})"):
- result = api_work_flow(req_data=api_data, source=source)
- GLOBAL_VARS.update(result)
+ result = RequestControl().api_request_flow(request_data=api_data, global_var=source)
+ results.update(result)
else:
+ allure_step(f"依赖接口格式错误,跳过依赖接口处理~ --> interface 仅支持str和list格式")
logger.warning(f"依赖接口格式错误,跳过依赖接口处理~ --> interface 仅支持str和list格式")
else:
+ allure_step(f"依赖接口为空,跳过依赖接口处理~")
logger.warning(f"依赖接口为空,跳过依赖接口处理~")
# 依赖SQL处理
if case_dependence.get("sql"):
+ allure_step(f"依赖SQL暂不支持,跳过依赖SQL处理~")
logger.warning(f"暂时不支持依赖sql处理,后续更新")
pass
+
+ return results
diff --git a/utils/requests_utils/request_control.py b/utils/requests_utils/request_control.py
index f535291..75c2a05 100644
--- a/utils/requests_utils/request_control.py
+++ b/utils/requests_utils/request_control.py
@@ -1,97 +1,76 @@
# -*- coding: utf-8 -*-
-# @Version: Python 3.9
-# @Time : 2023/1/31 14:31
-# @Author : chenyinhua
-# @File : request_control.py
+# @Time : 2024/7/1 9:42
+# @Author : floraachy
+# @File : request_control
# @Software: PyCharm
-# @Desc: 处理request请求前后的用例数据
+# @Desc:
# 标准库导入
import json
import os
import http.cookiejar
-import copy
-import time
# 第三方库导入
from requests import Response
from loguru import logger
# 本地应用/模块导入
+from config.path_config import FILES_DIR
from utils.requests_utils.base_request import BaseRequest
from utils.data_utils.data_handle import data_handle
from utils.data_utils.extract_data_handle import json_extractor, re_extract, response_extract
+from utils.files_utils.files_handle import get_files
+from utils.files_utils.yaml_handle import YamlHandle
+from utils.assertion_utils.assert_control import AssertHandle
from utils.report_utils.allure_handle import allure_step
-from config.path_config import FILES_DIR
-# ---------------------------------------- 请求前的数据处理----------------------------------------#
-
-class RequestPreDataHandle:
+class RequestControl(BaseRequest):
"""
- 请求前处理用例数据
+ 进行请求,请求后的参数提取处理
"""
- def __init__(self, request_data: dict, global_var: dict):
- logger.debug(f"\n======================================================\n" \
- "-------------Start:处理用例数据前--------------------\n"
- f"用例标题(title): {type(request_data.get('title', None))} || {request_data.get('title', None)}\n" \
- f"用例优先级(severity): {type(request_data.get('severity', None))} || {request_data.get('severity', None)}\n" \
- f"请求域名(host): {type(request_data.get('host', None))} || {request_data.get('host', None)}\n" \
- f"请求路径(url): {type(request_data.get('url', None))} || {request_data.get('url', None)}\n" \
- f"请求方式(method): {type(request_data.get('method', None))} || {request_data.get('method', None)}\n" \
- f"请求头(headers): {type(request_data.get('headers', None))} || {request_data.get('headers', None)}\n" \
- f"请求cookies: {type(request_data.get('cookies', None))} || {request_data.get('cookies', None)}\n" \
- f"请求类型(request_type): {type(request_data.get('request_type', None))} || {request_data.get('request_type', None)}\n" \
- f"请求参数(payload): {type(request_data.get('payload', None))} || {request_data.get('payload', None)}\n" \
- f"请求文件(files): {type(request_data.get('files', None))} || {request_data.get('files', None)}\n" \
- f"请求后等待(wait_seconds): {type(request_data.get('wait_seconds', None))} || {request_data.get('wait_seconds', None)}\n" \
- f"响应断言(assert_response): {type(request_data.get('assert_response', None))} || {request_data.get('assert_response', None)}\n" \
- f"数据库断言(assert_sql): {type(request_data.get('assert_sql', None))} || {request_data.get('assert_sql', None)}\n" \
- f"后置提取参数(extract): {type(request_data.get('extract', None))} || {request_data.get('extract', None)}\n" \
- f"用例依赖(case_dependence): {type(request_data.get('case_dependence', None))} || {request_data.get('case_dependence', None)}\n" \
- "=====================================================")
- self.request_data = copy.deepcopy(request_data)
- self.global_var = global_var
-
- def request_data_handle(self):
+ # --------------------从接口池中获取接口请求数据--------------------
+ def get_api_data(self, api_file_path: str, key: str = None):
"""
- 针对用例数据进行处理,识别用例数据中的关键字${xxxx},使用全局变量进行替换或者执行关键字中的方法替换为具体值
+ 根据指定的yaml文件路径,以及key值,获取对应的接口
+ :param:api_file_path 接口yaml文件路径
+ :param:key 对应接口的id
"""
- self.url_handle()
- self.method_handle()
- self.headers_handle()
- self.cookies_handle()
- self.payload_handle()
- self.files_handle()
- self.wait_seconds_handle()
- self.assert_handle()
- self.extract_handle()
- logger.debug(f"\n======================================================\n" \
- "-------------End:处理用例数据后--------------------\n"
- f"用例标题(title): {type(self.request_data.get('title', None))} || {self.request_data.get('title', None)}\n" \
- f"用例优先级(severity): {type(self.request_data.get('severity', None))} || {self.request_data.get('severity', None)}\n" \
- f"请求路径(url): {type(self.request_data.get('url', None))} || {self.request_data.get('url', None)}\n" \
- f"请求方式(method): {type(self.request_data.get('method', None))} || {self.request_data.get('method', None)}\n" \
- f"请求头(headers): {type(self.request_data.get('headers', None))} || {self.request_data.get('headers', None)}\n" \
- f"请求cookies: {type(self.request_data.get('cookies', None))} || {self.request_data.get('cookies', None)}\n" \
- f"请求类型(request_type): {type(self.request_data.get('request_type', None))} || {self.request_data.get('request_type', None)}\n" \
- f"请求参数(payload): {type(self.request_data.get('payload', None))} || {self.request_data.get('payload', None)}\n" \
- f"请求文件(files): {type(self.request_data.get('files', None))} || {self.request_data.get('files', None)}\n" \
- f"请求后等待(wait_seconds): {type(self.request_data.get('wait_seconds', None))} || {self.request_data.get('wait_seconds', None)}\n" \
- f"响应断言(assert_response): {type(self.request_data.get('assert_response', None))} || {self.request_data.get('assert_response', None)}\n" \
- f"数据库断言(assert_sql): {type(self.request_data.get('assert_sql', None))} || {self.request_data.get('assert_sql', None)}\n" \
- f"后置提取参数(extract): {type(self.request_data.get('extract', None))} || {self.request_data.get('extract', None)}\n" \
- f"用例依赖(case_dependence): {type(self.request_data.get('case_dependence', None))} || {self.request_data.get('case_dependence', None)}\n" \
- "=====================================================")
- return self.request_data
+ api_data = []
+ if os.path.isdir(api_file_path):
+ logger.debug(f"目标路径是一个目录:{api_file_path}")
+ api_files = get_files(target=api_file_path, end=".yaml") + get_files(target=api_file_path, end=".yml")
+ for api_file in api_files:
+ api_data.append(YamlHandle(filename=api_file).read_yaml)
+ elif os.path.isfile(api_file_path):
+ logger.debug(f"目标路径是一个文件:{api_file_path}")
+ api_data.append(YamlHandle(filename=api_file_path).read_yaml)
- def url_handle(self):
+ else:
+ logger.error(f"目标路径错误,请检查!api_file_path={api_file_path}")
+ return None
+
+ for api in api_data:
+ matching_api = next((item for item in api["case_info"] if item["id"] == key), None)
+ if matching_api:
+ logger.info("\n----------匹配到的api----------\n"
+ f"类型:{type(matching_api)}"
+ f"值:{matching_api}\n")
+ return matching_api
+
+ # 在找不到匹配的情况下,返回一个默认值且记录一条错误日志
+ logger.warning(f"未找到id为{key}的接口, 返回值是None")
+ raise Exception(f"未找到id为{key}的接口, 返回值是None")
+
+ # ---------- 请求之前进行数据处理 --------------------------#
+ @staticmethod
+ def url_handle(url: str, source: dict = None):
"""
用例数据中获取到的url(一般是不带host的,个别特殊的带有host,则不进行处理)
"""
# 检测url中是否存在需要替换的参数,如果存在则进行替换
- url = data_handle(obj=self.request_data.get("url", None), source=self.global_var)
+ url = data_handle(obj=url, source=source)
# 进行url处理,最终得到full_url
- host = self.global_var.get("host", "")
+ host = source.get("host", "")
# 从用例数据中获取url,如果键url不存在,则返回空字符串
# 如果url是以http开头的,则直接使用该url,不与host进行拼接
if url.lower().startswith("http"):
@@ -109,103 +88,132 @@ class RequestPreDataHandle:
else:
# 如果host不以/结尾 且 url不以/开头,则将host和url拼接起来的时候增加/,组成新的url
full_url = host + "/" + url
- self.request_data["url"] = full_url
+ return full_url
- def method_handle(self):
- # TODO 暂时不需要处理,后续有需要在处理
- pass
-
- def cookies_handle(self):
+ @staticmethod
+ def cookies_handle(cookies, source: dict = None):
"""
requests模块中,cookies参数要求是Dict or CookieJar object
"""
- cookies = self.request_data.get("cookies", None)
-
- # 从用例数据中获取cookies, 处理cookies
if cookies:
+ # 从用例数据中获取cookies, 处理cookies
# 通过全局变量替换cookies,得到的是一个str类型
- cookies = data_handle(obj=cookies, source=self.global_var)
+ cookies = data_handle(obj=cookies, source=source)
try:
cookies = json.loads(cookies)
except Exception as e:
cookies = cookies
if isinstance(cookies, dict) or isinstance(cookies, http.cookiejar.CookieJar):
- self.request_data["cookies"] = cookies
+ return cookies
else:
logger.error(
f"cookies参数要求是Dict or CookieJar object, 目前cookies类型是:{type(cookies)}, cookies值是:{cookies}")
raise TypeError(
f"cookies参数要求是Dict or CookieJar object, 目前cookies类型是:{type(cookies)}, cookies值是:{cookies}")
- def headers_handle(self):
+ @staticmethod
+ def headers_handle(headers: dict, source: dict = None):
"""
headers里面传cookies,要求cookies类型是str
"""
- headers = self.request_data.get("headers", None)
-
- # 从用例数据中获取header, 处理header
if headers:
- self.request_data["headers"] = data_handle(obj=headers, source=self.global_var)
+ # 从用例数据中获取header, 处理header
+ headers = data_handle(obj=headers, source=source)
# 如果请求头中有cookies,需要进行单独处理
- if self.request_data["headers"].get("cookies", None):
- cookies = self.request_data["headers"]["cookies"]
+ if headers.get("cookies", None):
+ cookies = headers["cookies"]
if isinstance(cookies, dict):
# 如果是字典类型,就转成字符串
- self.request_data["headers"]["cookies"] = json.dumps(cookies)
+ headers["cookies"] = json.dumps(cookies)
else:
- self.request_data["headers"]["cookies"] = cookies
+ headers["cookies"] = cookies
+ return headers
- def payload_handle(self):
- # 处理请求参数payload
- payload = self.request_data.get("payload", None)
- if payload:
- self.request_data["payload"] = data_handle(obj=payload, source=self.global_var)
-
- def files_handle(self):
+ @staticmethod
+ def files_handle(files: str, source: dict = None):
"""
格式:接口中文件参数的名称:"文件路径地址"
例如:{"file": "demo_test_demo.py"}
"""
- # 处理请求参数files参数
- files = self.request_data.get("files", None)
if files:
+ # 处理请求参数files参数
# 支持文件传递${}关键字,将使用data_handle进行处理
- files = data_handle(obj=files, source=self.global_var)
+ files = data_handle(obj=files, source=source)
# 将文件处理成绝对路径
- self.request_data["files"] = os.path.join(FILES_DIR, files)
+ return os.path.join(FILES_DIR, files)
- def wait_seconds_handle(self):
+ @staticmethod
+ def wait_seconds_handle(wait_seconds):
"""
处理等待时间参数,如果不能转为int类型,则认为是none
"""
- wait_seconds = self.request_data.get("wait_seconds", None)
try:
- self.request_data["wait_seconds"] = int(wait_seconds)
+ return int(wait_seconds)
except:
- self.request_data["wait_seconds"] = None
- def assert_handle(self):
- # 处理响应断言参数
- assert_response = self.request_data.get("assert_response", None)
- if assert_response:
- self.request_data["assert_response"] = data_handle(obj=assert_response, source=self.global_var)
- # 由于数据库断言里面的变量需要请求响应后进行提取,因此目前不进行处理
+ return None
- def extract_handle(self):
- # 处理提取参数
- extract = self.request_data.get("extract", None)
- if extract:
- self.request_data["extract"] = data_handle(obj=extract, source=self.global_var)
+ def before_request(self, request_data: dict, source_data: dict = None):
+ """
+ 针请求前,对接口数据进行处理,识别用例数据中的关键字${xxxx},使用全局变量进行替换或者执行关键字中的方法替换为具体值
+ """
+ try:
+ logger.debug(f"\n======================================================\n" \
+ "-------------用例数据处理前--------------------\n"
+ f"用例ID: {type(request_data.get('id', None))} || {request_data.get('id', None)}\n" \
+ f"用例优先级(severity): {type(request_data.get('severity', None))} || {request_data.get('severity', None)}\n" \
+ f"用例标题(title): {type(request_data.get('title', None))} || {request_data.get('title', None)}\n" \
+ f"请求路径(url): {type(request_data.get('url', None))} || {request_data.get('url', None)}\n" \
+ f"请求方式(method): {type(request_data.get('method', None))} || {request_data.get('method', None)}\n" \
+ f"请求头(headers): {type(request_data.get('headers', None))} || {request_data.get('headers', None)}\n" \
+ f"请求cookies: {type(request_data.get('cookies', None))} || {request_data.get('cookies', None)}\n" \
+ f"请求类型(request_type): {type(request_data.get('request_type', None))} || {request_data.get('request_type', None)}\n" \
+ f"请求文件(files): {type(request_data.get('files', None))} || {request_data.get('files', None)}\n" \
+ f"请求后等待(wait_seconds): {type(request_data.get('wait_seconds', None))} || {request_data.get('wait_seconds', None)}\n" \
+ f"请求参数(payload): {type(request_data.get('payload', None))} || {request_data.get('payload', None)}\n" \
+ f"响应断言(assert_response): {type(request_data.get('assert_response', None))} || {request_data.get('assert_response', None)}\n" \
+ f"数据库断言(assert_sql): {type(request_data.get('assert_sql', None))} || {request_data.get('assert_sql', None)}\n" \
+ f"后置提取参数(extract): {type(request_data.get('extract', None))} || {request_data.get('extract', None)}\n" \
+ f"用例依赖(case_dependence): {type(request_data.get('case_dependence', None))} || {request_data.get('case_dependence', None)}\n")
+ new_request_data = {
+ "id": request_data.get("id"),
+ "severity": request_data.get("severity"),
+ "title": request_data.get("title"),
+ "url": self.url_handle(url=request_data.get("url"), source=source_data),
+ "method": request_data.get("method"),
+ "headers": self.headers_handle(headers=request_data.get("headers"), source=source_data),
+ "cookies": self.cookies_handle(cookies=request_data.get("cookies"), source=source_data),
+ "request_type": request_data.get("request_type"),
+ "files": self.files_handle(files=request_data.get("files"), source=source_data),
+ "wait_seconds": self.wait_seconds_handle(wait_seconds=request_data.get("wait_seconds")),
+ "payload": data_handle(obj=request_data["payload"], source=source_data),
+ "assert_response": data_handle(obj=request_data.get("assert_response"), source=source_data),
+ "assert_sql": request_data.get("assert_sql"),
+ "extract": data_handle(obj=request_data.get("extract"), source=source_data),
+ "case_dependence": request_data.get("case_dependence"),
+ }
-# ---------------------------------------- 进行请求,请求后的参数提取处理----------------------------------------#
-class RequestHandle:
- """
- 进行请求,请求后的参数提取处理
- """
-
- def __init__(self, case_data: dict, global_var: dict):
- self.case_data = case_data
- self.global_var = global_var
+ logger.debug("\n-------------用例数据处理后--------------------\n"
+ f"用例ID: {type(new_request_data.get('id', None))} || {new_request_data.get('id', None)}\n" \
+ f"用例优先级(severity): {type(new_request_data.get('severity', None))} || {new_request_data.get('severity', None)}\n" \
+ f"用例标题(title): {type(new_request_data.get('title', None))} || {new_request_data.get('title', None)}\n" \
+ f"请求路径(url): {type(new_request_data.get('url', None))} || {new_request_data.get('url', None)}\n" \
+ f"请求方式(method): {type(new_request_data.get('method', None))} || {new_request_data.get('method', None)}\n" \
+ f"请求头(headers): {type(new_request_data.get('headers', None))} || {new_request_data.get('headers', None)}\n" \
+ f"请求cookies: {type(new_request_data.get('cookies', None))} || {new_request_data.get('cookies', None)}\n" \
+ f"请求类型(request_type): {type(new_request_data.get('request_type', None))} || {new_request_data.get('request_type', None)}\n" \
+ f"请求文件(files): {type(new_request_data.get('files', None))} || {new_request_data.get('files', None)}\n" \
+ f"请求后等待(wait_seconds): {type(new_request_data.get('wait_seconds', None))} || {new_request_data.get('wait_seconds', None)}\n" \
+ f"请求参数(payload): {type(new_request_data.get('payload', None))} || {new_request_data.get('payload', None)}\n" \
+ f"响应断言(assert_response): {type(new_request_data.get('assert_response', None))} || {new_request_data.get('assert_response', None)}\n" \
+ f"数据库断言(assert_sql): {type(new_request_data.get('assert_sql', None))} || {new_request_data.get('assert_sql', None)}\n" \
+ f"后置提取参数(extract): {type(new_request_data.get('extract', None))} || {new_request_data.get('extract', None)}\n" \
+ f"用例依赖(case_dependence): {type(new_request_data.get('case_dependence', None))} || {new_request_data.get('case_dependence', None)}\n"
+ "=====================================================")
+ return new_request_data
+ except Exception as e:
+ logger.error(f"接口数据处理异常:{e}")
+ raise f"接口数据处理异常:\n{e}"
@classmethod
def api_step_record(cls, **kwargs) -> None:
@@ -257,79 +265,91 @@ class RequestHandle:
allure_step(f"响应结果: {response_result}")
allure_step(f"响应耗时: {response_time_seconds} s || {response_time_millisecond} ms")
- def http_request(self):
+ @staticmethod
+ def after_request(response: Response, extract):
"""
- 发送请求并进行后置参数提取操作
+ 从响应数据中提取请求后的参数,并保存到全局变量中
+ :param response: playwright APIResponse 响应对象
+ :param extract: 需要提取的参数字典 '{"k1": "$.data"}' 或 '{"k1": "data:(.*?)$"}'
+ :return:
+
"""
- response = BaseRequest.send_request(self.case_data)
- # 根据配置,增加接口请求等待时间。适应部分调用调用后,需要进行内置数据处理的问题
- logger.debug(f"开始等待")
- if self.case_data["wait_seconds"]:
- time.sleep(self.case_data["wait_seconds"])
- logger.debug(f"结束等待")
- self.case_data["status_code"] = response.status_code
- self.case_data["response_time_seconds"] = round(response.elapsed.total_seconds(), 2)
- self.case_data["response_time_millisecond"] = round(response.elapsed.total_seconds() * 1000, 2)
+ logger.info(f"断言成功后,参数提取表达式extract: {extract}")
+ json_result = {}
+ re_result = {}
+ response_result = {}
+ if extract:
+ if extract.get("type_jsonpath"):
+ # 如果响应数据是json格式,则将按照json方式对后置提取参数进行处理
+ res = response.json()
+ for k, v in extract["type_jsonpath"].items():
+ json_result[k] = json_extractor(res, v)
+ logger.debug(f"--从response.json()中通过jsonpath方式提取到的结果 --> {json_result}")
+ if extract.get("type_re"):
+ # 如果响应数据是str格式,则将按照str方式对后置提取参数进行处理
+ res = response.text
+ for k, v in extract["type_re"].items():
+ re_result[k] = data_handle(obj=re_extract(res, v))
+ logger.debug(f"--从response.text中通过正则表达式提取到的结果 --> {re_result} ")
+
+ if extract.get("type_response"):
+ for k, v in extract["type_response"].items():
+ response_result[k] = response_extract(response, v)
+ logger.debug(f"--从response中提取到的结果 --> {re_result}")
+ result = {**json_result, **re_result, **response_result}
+ logger.info(f"--参数提取结果extract --> {result} --")
+ allure_step(f"参数提取结果extract:{result}")
+ return result
+
+ # -----接口请求流程:获取接口数据 -> 处理接口请求数据 -> 请求接口 -> 接口断言 -> 接口数据提取 --------------
+ def api_request_flow(self, request_data: dict = None, global_var: dict = None, api_file_path: str = None,
+ key: str = None):
+ """
+ 发送请求并进行后置参数提取操作。
+ request_data参数 与 api_file_path,key, 这两个必须传递其中一个
+
+ :param request_data: 请求数据字典,包含请求所需的所有信息。
+ :param global_var: 包含全局变量的字典,这些变量用于替换到请求数据的关键字:${}
+ :param api_file_path: 接口所在的目录或者文件路径
+ :param key: 接口的ID
+ :return: 接口请求数据以及从接口响应提取的参数,字典。
+ :raises ValueError: 如果请求数据无效或缺失。
+ """
+ # 初始化一个变量,保存接口请求参数payload以及通过extract提取的参数
+ save_api_data = {}
+
+ if request_data:
+ api_info = request_data
+
+ elif api_file_path and key:
+ api_info = self.get_api_data(api_file_path=api_file_path, key=key)
+ else:
+ logger.error("请求数据异常")
+ raise ValueError("请求数据异常")
+
+ new_api_data = self.before_request(request_data=api_info, source_data=global_var)
+
+ response = self.send_request(new_api_data)
+
+ new_api_data["status_code"] = response.status_code
+ new_api_data["response_time_seconds"] = round(response.elapsed.total_seconds(), 2)
+ new_api_data["response_time_millisecond"] = round(response.elapsed.total_seconds() * 1000, 2)
try:
- self.case_data["response_result"] = response.json()
+ new_api_data["response_result"] = response.json()
except:
- self.case_data["response_result"] = response.text
+ new_api_data["response_result"] = response.text()
- self.api_step_record(**self.case_data)
+ self.api_step_record(**new_api_data)
+ # 进行响应断言
+ AssertHandle(assert_data=new_api_data["assert_response"], response=response).assert_handle()
- # 处理数据库断言 - 从全局变量中获取最新值,替换数据库断言中的参数
- if self.case_data.get('assert_sql', None):
- self.case_data["assert_sql"] = data_handle(obj=self.case_data["assert_sql"], source=self.global_var)
- # 处理请求里面的files,使得日志以及allure中写入的是文件,而不是文件二进制内容
- if self.case_data.get('files', None):
- files = self.case_data["files"]
- if isinstance(files, dict):
- dict_values = list(files.values())[0]
- _file = os.path.join(FILES_DIR, dict_values[0])
- logger.info(
- f"\n请求文件(files): {type(_file)} || {_file}\n")
- return response
+ # 进行响应参数提取,并保存提取后的数据
+ extract_results = self.after_request(response=response, extract=new_api_data.get("extract"))
+ save_api_data.update(extract_results)
-
-# ---------------------------------------- 请求后的参数提取处理----------------------------------------#
-def after_request_extract(response: Response, extract):
- """
- 从响应数据中提取请求后的参数,并保存到全局变量中
- :param response: request 响应对象
- :param extract: 需要提取的参数字典 '{"k1": "$.data"}' 或 '{"k1": "data:(.*?)$"}'
- :return:
- """
- logger.debug(f"\n================================================================================\n" \
- "-------------Start: 提取表达式--------------------\n"
- f"后置提取参数(原): {extract}\n")
- json_result = {}
- re_result = {}
- response_result = {}
- if extract:
- if extract.get("type_jsonpath"):
- # 如果响应数据是json格式,则将按照json方式对后置提取参数进行处理
- res = response.json()
- for k, v in extract["type_jsonpath"].items():
- json_result[k] = json_extractor(res, v)
- logger.debug("\n-------------从response.json()中通过jsonpath方式提取到的结果--------------------\n"
- f"后置提取参数(新): {json_result}\n")
- if extract.get("type_re"):
- # 如果响应数据是str格式,则将按照str方式对后置提取参数进行处理
- res = response.text
- for k, v in extract["type_re"].items():
- re_result[k] = data_handle(obj=re_extract(res, v))
- logger.debug("\n-------------从response.text中通过正则表达式提取到的结果--------------------\n"
- f"后置提取参数(新): {re_result}\n")
-
- if extract.get("type_response"):
- for k, v in extract["type_response"].items():
- response_result[k] = response_extract(response, v)
- logger.debug("-------------从response中提取到的结果--------------------\n"
- f"后置提取参数(新): {re_result}\n")
- result = {**json_result, **re_result, **response_result}
- logger.info("\n-------------End:所有提取到的结果--------------------\n"
- f"后置提取参数(新): {result}\n" \
- "================================================================================")
- allure_step(f"参数提取结果:{result}")
- return result
+ # 将接口请求参数payload的值保存到save_api_data中
+ save_api_data.update({"_payload": new_api_data["payload"]} if new_api_data.get("payload") else {})
+ logger.debug(f"接口请求完成后,接口请求数据payload,响应数据 & 提取数据 save_api_data={save_api_data}")
+ allure_step(f"接口请求完成后,接口请求数据payload,响应数据 & 提取数据 save_api_data={save_api_data}")
+ return save_api_data