forked from floraachy/apiautotest
439 lines
23 KiB
Python
439 lines
23 KiB
Python
# -*- coding: utf-8 -*-
|
||
# @Time : 2024/7/1 9:42
|
||
# @Author : floraachy
|
||
# @File : request_control.py
|
||
# @Software: PyCharm
|
||
# @Desc:
|
||
|
||
# 标准库导入
|
||
import json
|
||
import os
|
||
import http.cookiejar
|
||
import time
|
||
# 第三方库导入
|
||
import requests
|
||
from requests import Response, utils
|
||
from loguru import logger
|
||
# 本地应用/模块导入
|
||
from settings import FILES_DIR
|
||
from custom_utils.requests_utils.base_request import BaseRequest
|
||
from custom_utils.data_utils.data_handle import data_handle
|
||
from custom_utils.data_utils.extract_data_handle import json_extractor, re_extract, response_extract
|
||
from common_utils.files_utils.files_handle import get_files, load_yaml_file
|
||
from custom_utils.assertion_utils.assert_control import AssertHandle
|
||
from custom_utils.report_utils.allure_handle import allure_step
|
||
from common_utils.database_utils.mysql_handle import MysqlServer
|
||
|
||
|
||
class RequestControl(BaseRequest):
|
||
"""
|
||
进行请求,请求后的参数提取处理
|
||
"""
|
||
|
||
# --------------------从接口池中获取接口请求数据--------------------
|
||
@staticmethod
|
||
def get_api_data(api_file_path: str, key: str = None):
|
||
"""
|
||
根据指定的yaml文件路径,以及key值,获取对应的接口
|
||
:param:api_file_path 接口yaml文件路径
|
||
:param:key 对应接口的id
|
||
"""
|
||
api_data = []
|
||
if os.path.isdir(api_file_path):
|
||
logger.trace(f"目标路径是一个目录:{api_file_path}")
|
||
api_files = get_files(target=api_file_path, end=".yaml") + get_files(target=api_file_path, end=".yml")
|
||
for api_file in api_files:
|
||
api_data.append(load_yaml_file(api_file))
|
||
elif os.path.isfile(api_file_path):
|
||
logger.trace(f"目标路径是一个文件:{api_file_path}")
|
||
api_data.append(load_yaml_file(api_file_path))
|
||
|
||
else:
|
||
logger.error(f"目标路径错误,请检查!api_file_path={api_file_path}")
|
||
return None
|
||
|
||
for api in api_data:
|
||
if api.get("teststeps"):
|
||
matching_api = next((item for item in api["teststeps"] if item["id"] == key), None)
|
||
if matching_api:
|
||
logger.debug("\n----------匹配到的api----------\n"
|
||
f"类型:{type(matching_api)}"
|
||
f"值:{matching_api}\n")
|
||
return matching_api
|
||
|
||
# 在找不到匹配的情况下,返回一个默认值且记录一条错误日志
|
||
logger.warning(f"未找到id为{key}的接口, 返回值是None")
|
||
raise Exception(f"未找到id为{key}的接口, 返回值是None")
|
||
|
||
# ---------- 请求之前进行数据处理 --------------------------#
|
||
@staticmethod
|
||
def url_handle(url: str, source: dict = None):
|
||
"""
|
||
用例数据中获取到的url(一般是不带host的,个别特殊的带有host,则不进行处理)
|
||
"""
|
||
# 检测url中是否存在需要替换的参数,如果存在则进行替换
|
||
url = data_handle(obj=url, source=source)
|
||
# 进行url处理,最终得到full_url
|
||
host = source.get("host", "")
|
||
# 从用例数据中获取url,如果键url不存在,则返回空字符串
|
||
# 如果url是以http开头的,则直接使用该url,不与host进行拼接
|
||
if url.lower().startswith("http"):
|
||
full_url = url
|
||
else:
|
||
# 如果host以/结尾 并且 url以/开头
|
||
if host.endswith("/") and url.startswith("/"):
|
||
full_url = host[0:len(host) - 1] + url
|
||
# 如果host以/结尾 并且 url不以/开头
|
||
elif host.endswith("/") and (not url.startswith("/")):
|
||
full_url = host + url
|
||
elif (not host.endswith("/")) and url.startswith("/"):
|
||
# 如果host不以/结尾 且 url以/开头,则将host和url拼接起来,组成新的url
|
||
full_url = host + url
|
||
else:
|
||
# 如果host不以/结尾 且 url不以/开头,则将host和url拼接起来的时候增加/,组成新的url
|
||
full_url = host + "/" + url
|
||
return full_url
|
||
|
||
@staticmethod
|
||
def cookies_handle(cookies, source: dict = None):
|
||
"""
|
||
requests模块中,cookies参数要求是Dict or CookieJar object
|
||
"""
|
||
if not cookies:
|
||
return None # 或者返回空字典 {}
|
||
|
||
# 通过全局变量替换cookies,得到的是一个str类型
|
||
cookies = data_handle(obj=cookies, source=source)
|
||
try:
|
||
cookies = json.loads(cookies)
|
||
except Exception as e:
|
||
cookies = cookies
|
||
if isinstance(cookies, dict) or isinstance(cookies, http.cookiejar.CookieJar):
|
||
return cookies
|
||
else:
|
||
logger.error(
|
||
f"cookies参数要求是Dict or CookieJar object, 目前cookies类型是:{type(cookies)}, cookies值是:{cookies}")
|
||
raise TypeError(
|
||
f"cookies参数要求是Dict or CookieJar object, 目前cookies类型是:{type(cookies)}, cookies值是:{cookies}")
|
||
|
||
@staticmethod
|
||
def headers_handle(headers: dict = None, source: dict = None) -> dict:
|
||
"""
|
||
headers里面传Cookie,要求Cookie类型是str
|
||
|
||
Args:
|
||
headers: 请求头字典,默认为空字典
|
||
source: 数据源
|
||
|
||
Returns:
|
||
dict: 处理后的请求头字典
|
||
"""
|
||
if headers is None:
|
||
headers = {}
|
||
|
||
# 从用例数据中获取header,处理header
|
||
headers = data_handle(obj=headers, source=source)
|
||
|
||
# 处理Cookie字段
|
||
if headers.get("Cookie"):
|
||
cookies = headers["Cookie"]
|
||
if isinstance(cookies, dict):
|
||
headers["Cookie"] = '; '.join([f"{key}={value}" for key, value in cookies.items()])
|
||
elif isinstance(cookies, http.cookiejar.CookieJar):
|
||
cookies_dict = utils.dict_from_cookiejar(cookies)
|
||
headers["Cookie"] = '; '.join([f"{key}={value}" for key, value in cookies_dict.items()])
|
||
# str类型不需要处理,保持原样
|
||
|
||
return headers
|
||
|
||
@staticmethod
|
||
def files_handle(files: str, source: dict = None):
|
||
"""
|
||
格式:接口中文件参数的名称:"文件路径地址"
|
||
例如:{"file": "demo_test_demo.py"}
|
||
"""
|
||
if not files:
|
||
return None # 或者返回空字典 {}
|
||
# 处理请求参数files参数
|
||
# 支持文件传递${}关键字,将使用data_handle进行处理
|
||
files = data_handle(obj=files, source=source)
|
||
# 将文件处理成绝对路径
|
||
return os.path.join(FILES_DIR, files)
|
||
|
||
@staticmethod
|
||
def wait_seconds_handle(wait_seconds):
|
||
"""
|
||
处理等待时间参数,如果不能转为int类型,则认为是none
|
||
"""
|
||
if wait_seconds is None:
|
||
return None
|
||
|
||
try:
|
||
return int(wait_seconds)
|
||
except TypeError as e:
|
||
# 处理类型错误(如传入对象不支持int转换)
|
||
logger.debug(f"等待时间参数类型错误: {type(wait_seconds)} -> {e}")
|
||
return None
|
||
except ValueError as e:
|
||
# 处理值错误(如传入"abc"字符串)
|
||
logger.debug(f"等待时间参数值错误: {wait_seconds} -> {e}")
|
||
return None
|
||
|
||
def before_request(self, request_data: dict, source_data: dict = None):
|
||
"""
|
||
针请求前,对接口数据进行处理,识别用例数据中的关键字${xxxx},使用全局变量进行替换或者执行关键字中的方法替换为具体值
|
||
"""
|
||
try:
|
||
logger.trace(f"\n======================================================\n" \
|
||
"-------------用例数据处理前--------------------\n"
|
||
f"用例ID: {type(request_data.get('id', None))} || {request_data.get('id', None)}\n" \
|
||
f"用例优先级(severity): {type(request_data.get('severity', None))} || {request_data.get('severity', None)}\n" \
|
||
f"用例标题(title): {type(request_data.get('title', None))} || {request_data.get('title', None)}\n" \
|
||
f"请求路径(url): {type(request_data.get('url', None))} || {request_data.get('url', None)}\n" \
|
||
f"请求方式(method): {type(request_data.get('method', None))} || {request_data.get('method', None)}\n" \
|
||
f"请求头(headers): {type(request_data.get('headers', None))} || {request_data.get('headers', None)}\n" \
|
||
f"请求cookies: {type(request_data.get('cookies', None))} || {request_data.get('cookies', None)}\n" \
|
||
f"请求类型(request_type): {type(request_data.get('request_type', None))} || {request_data.get('request_type', None)}\n" \
|
||
f"请求文件(files): {type(request_data.get('files', None))} || {request_data.get('files', None)}\n" \
|
||
f"请求后等待(wait_seconds): {type(request_data.get('wait_seconds', None))} || {request_data.get('wait_seconds', None)}\n" \
|
||
f"请求参数(payload): {type(request_data.get('payload', None))} || {request_data.get('payload', None)}\n" \
|
||
f"响应断言(validate): {type(request_data.get('validate', None))} || {request_data.get('validate', None)}\n" \
|
||
f"数据库断言(assert_sql): {type(request_data.get('assert_sql', None))} || {request_data.get('assert_sql', None)}\n" \
|
||
f"后置提取参数(extract): {type(request_data.get('extract', None))} || {request_data.get('extract', None)}\n" \
|
||
f"用例依赖(case_dependence): {type(request_data.get('case_dependence', None))} || {request_data.get('case_dependence', None)}\n")
|
||
|
||
new_request_data = {
|
||
"id": request_data.get("id"),
|
||
"severity": request_data.get("severity"),
|
||
"title": request_data.get("title"),
|
||
"url": self.url_handle(url=request_data.get("url"), source=source_data),
|
||
"method": request_data.get("method"),
|
||
"headers": self.headers_handle(headers=request_data.get("headers"), source=source_data),
|
||
"cookies": self.cookies_handle(cookies=request_data.get("cookies"), source=source_data),
|
||
"request_type": request_data.get("request_type"),
|
||
"files": self.files_handle(files=request_data.get("files"), source=source_data),
|
||
"wait_seconds": self.wait_seconds_handle(wait_seconds=request_data.get("wait_seconds")),
|
||
"payload": data_handle(obj=request_data["payload"], source=source_data),
|
||
"validate": data_handle(obj=request_data.get("validate"), source=source_data),
|
||
"assert_sql": request_data.get("assert_sql"),
|
||
"extract": data_handle(obj=request_data.get("extract"), source=source_data),
|
||
"case_dependence": request_data.get("case_dependence")
|
||
}
|
||
|
||
logger.trace("\n-------------用例数据处理后--------------------\n"
|
||
f"用例ID: {type(new_request_data.get('id', None))} || {new_request_data.get('id', None)}\n" \
|
||
f"用例优先级(severity): {type(new_request_data.get('severity', None))} || {new_request_data.get('severity', None)}\n" \
|
||
f"用例标题(title): {type(new_request_data.get('title', None))} || {new_request_data.get('title', None)}\n" \
|
||
f"请求路径(url): {type(new_request_data.get('url', None))} || {new_request_data.get('url', None)}\n" \
|
||
f"请求方式(method): {type(new_request_data.get('method', None))} || {new_request_data.get('method', None)}\n" \
|
||
f"请求头(headers): {type(new_request_data.get('headers', None))} || {new_request_data.get('headers', None)}\n" \
|
||
f"请求cookies: {type(new_request_data.get('cookies', None))} || {new_request_data.get('cookies', None)}\n" \
|
||
f"请求类型(request_type): {type(new_request_data.get('request_type', None))} || {new_request_data.get('request_type', None)}\n" \
|
||
f"请求文件(files): {type(new_request_data.get('files', None))} || {new_request_data.get('files', None)}\n" \
|
||
f"请求后等待(wait_seconds): {type(new_request_data.get('wait_seconds', None))} || {new_request_data.get('wait_seconds', None)}\n" \
|
||
f"请求参数(payload): {type(new_request_data.get('payload', None))} || {new_request_data.get('payload', None)}\n" \
|
||
f"响应断言(validate): {type(new_request_data.get('validate', None))} || {new_request_data.get('validate', None)}\n" \
|
||
f"数据库断言(assert_sql): {type(new_request_data.get('assert_sql', None))} || {new_request_data.get('assert_sql', None)}\n" \
|
||
f"后置提取参数(extract): {type(new_request_data.get('extract', None))} || {new_request_data.get('extract', None)}\n" \
|
||
f"用例依赖(case_dependence): {type(new_request_data.get('case_dependence', None))} || {new_request_data.get('case_dependence', None)}\n"
|
||
"=====================================================")
|
||
logger.trace(new_request_data)
|
||
return new_request_data
|
||
except Exception as e:
|
||
logger.error(f"接口数据处理异常:{e}")
|
||
raise f"接口数据处理异常:\n{e}"
|
||
|
||
@classmethod
|
||
def api_step_record(cls, **kwargs) -> None:
|
||
"""
|
||
在allure/logger中记录请求数据
|
||
"""
|
||
key = kwargs.get("id")
|
||
title = kwargs.get("title")
|
||
url = kwargs.get("url")
|
||
method = kwargs.get("method")
|
||
headers = kwargs.get("headers")
|
||
cookies = kwargs.get("cookies")
|
||
request_type = kwargs.get("request_type")
|
||
payload = kwargs.get("payload")
|
||
files = kwargs.get("files")
|
||
wait_seconds = kwargs.get("wait_seconds")
|
||
status_code = kwargs.get("status_code")
|
||
response_result = kwargs.get("response_result")
|
||
response_time_seconds = kwargs.get("response_time_seconds")
|
||
response_time_millisecond = kwargs.get("response_time_millisecond")
|
||
|
||
_res = "\n" + "=" * 80 \
|
||
+ "\n-------------发送请求--------------------\n" \
|
||
f"ID: {key}\n" \
|
||
f"标题: {title}\n" \
|
||
f"请求URL: {url}\n" \
|
||
f"请求方式: {method}\n" \
|
||
f"请求头: {headers}\n" \
|
||
f"请求Cookies: {cookies}\n" \
|
||
f"请求关键字: {request_type}\n" \
|
||
f"请求参数: {payload}\n" \
|
||
f"请求文件: {files}\n" \
|
||
f"响应码: {status_code}\n" \
|
||
f"响应数据: {response_result}\n" \
|
||
f"响应耗时: {response_time_seconds} s || {response_time_millisecond} ms\n" \
|
||
+ "=" * 80
|
||
logger.debug(_res)
|
||
allure_step(f"ID: {key}")
|
||
allure_step(f"标题: {title}")
|
||
allure_step(f"请求URL: {url}")
|
||
allure_step(f"请求方式: {method}")
|
||
allure_step(f"请求头: {headers}")
|
||
allure_step(f"请求Cookies: {cookies}")
|
||
allure_step(f"请求关键字: {request_type}")
|
||
allure_step(f"请求参数: {payload}")
|
||
allure_step(f"请求文件: {files}")
|
||
allure_step(f"请求后等待时间: {wait_seconds}")
|
||
allure_step(f"响应码: {status_code}")
|
||
allure_step(f"响应结果: {response_result}")
|
||
allure_step(f"响应耗时: {response_time_seconds} s || {response_time_millisecond} ms")
|
||
|
||
def after_request(self, response: Response, api_data, db_info=None):
|
||
"""
|
||
请求结束后提取参数,目前支持从响应数据、数据库、用例数据中提取
|
||
:param response: Response 响应对象
|
||
:param api_data: 接口数据需要提取的参数字典 '{"k1": "$.data"}' 或 '{"k1": "data:(.*?)$"}'
|
||
:return:
|
||
|
||
"""
|
||
extract = api_data.get("extract")
|
||
if not extract:
|
||
logger.debug(f"断言成功后不需要进行提取操作,extract={extract}")
|
||
return None
|
||
|
||
logger.debug(f"断言成功后需要进行提取操作,extract={extract}")
|
||
|
||
case_results = {}
|
||
response_results = {}
|
||
database_results = {}
|
||
|
||
# 封装三种提取方式
|
||
def extract_data(source_data, patterns):
|
||
"""
|
||
采用不同的提取方式对数据进行提取
|
||
:param source_data: 数据来源:Response 响应对象, 用例数据Dict, 数据库查询结果Dict
|
||
:param patterns: 提取模式, {提取方式: {提取key: 提取表达式}}, ,例如:{'type_jsonpath: {'login': '$.login'}}
|
||
"""
|
||
results = {}
|
||
items = patterns.items()
|
||
for pattern_type, pattern_values in items:
|
||
if pattern_type == "type_jsonpath":
|
||
for key, expr in pattern_values.items():
|
||
# 如果数据来源是response对象,需要处理成response.json()
|
||
if isinstance(source_data, requests.Response):
|
||
source_data = response.json()
|
||
results[key] = json_extractor(source_data, expr)
|
||
elif pattern_type == "type_re":
|
||
# 如果数据来源是response对象,需要处理成response.text
|
||
if isinstance(source_data, requests.Response):
|
||
source_data = response.text
|
||
for key, expr in pattern_values.items():
|
||
results[key] = re_extract(str(source_data), expr)
|
||
elif pattern_type == "type_response":
|
||
for key, attr in pattern_values.items():
|
||
results[key] = response_extract(source_data, attr)
|
||
else:
|
||
logger.error(f"不支持的提取方式: {pattern_type}")
|
||
return results
|
||
|
||
for k, v in extract.items():
|
||
"""根据不同的数据来源,采取不同方式进行提取"""
|
||
if k in ["case"]:
|
||
case_results = extract_data(api_data, v)
|
||
elif k in ["response"]:
|
||
response_results = extract_data(response, v)
|
||
elif k in ["database"]:
|
||
if "sql" in v.keys():
|
||
mysql = MysqlServer(**db_info)
|
||
sql_result = mysql.query_all(v["sql"])
|
||
v.pop("sql")
|
||
database_results = extract_data(sql_result, v)
|
||
else:
|
||
logger.error(f"数据库提取参数必须传入sql")
|
||
else: # 兼容以前的写法
|
||
# 通过update方法合并dict
|
||
response_results.update(extract_data(response, {k: v}))
|
||
|
||
result = {**case_results, **response_results, **database_results}
|
||
logger.debug(f"--用例提取结果 --> {case_results} --")
|
||
logger.debug(f"--响应提取结果 --> {response_results} --")
|
||
logger.debug(f"--数据库提取结果 --> {database_results} --")
|
||
|
||
return result
|
||
|
||
# -----接口请求流程:获取接口数据 -> 处理接口请求数据 -> 请求接口 -> 接口断言 -> 接口数据提取 --------------
|
||
def api_request_flow(self, request_data: dict = None, global_var: dict = None, api_file_path: str = None,
|
||
key: str = None, db_info: dict = None):
|
||
"""
|
||
发送请求并进行后置参数提取操作。lll
|
||
request_data参数 与 api_file_path,key, 这两个必须传递其中一个
|
||
|
||
:param request_data: 请求数据字典,包含请求所需的所有信息。
|
||
:param global_var: 包含全局变量的字典,这些变量用于替换到请求数据的关键字:${}
|
||
:param api_file_path: 接口所在的目录或者文件路径
|
||
:param key: 接口的ID
|
||
:param db_info: 数据库连接信息,用于数据库断言/数据库提取数据时连接数据库
|
||
:return: 接口请求数据以及从接口响应提取的参数,字典。
|
||
:raises ValueError: 如果请求数据无效或缺失。
|
||
"""
|
||
# 初始化一个变量,保存接口请求参数payload以及通过extract提取的参数
|
||
save_api_data = {}
|
||
|
||
if request_data:
|
||
api_info = request_data
|
||
|
||
elif api_file_path and key:
|
||
api_info = self.get_api_data(api_file_path=api_file_path, key=key)
|
||
else:
|
||
logger.error("请求数据异常")
|
||
raise ValueError("请求数据异常")
|
||
|
||
new_api_data = self.before_request(request_data=api_info, source_data=global_var)
|
||
|
||
response = self.send_request(new_api_data)
|
||
|
||
# 根据配置,增加接口请求等待时间。适应部分请求调用后,需要进行内置数据处理的问题
|
||
logger.trace(f"开始等待")
|
||
if new_api_data.get("wait_seconds"):
|
||
time.sleep(new_api_data["wait_seconds"])
|
||
logger.trace(f"结束等待")
|
||
|
||
new_api_data["status_code"] = response.status_code
|
||
new_api_data["response_time_seconds"] = round(response.elapsed.total_seconds(), 2)
|
||
new_api_data["response_time_millisecond"] = round(response.elapsed.total_seconds() * 1000, 2)
|
||
|
||
try:
|
||
# 先检查响应内容是否可能为JSON
|
||
content_type = response.headers.get('content-type', '').lower()
|
||
if 'application/json' in content_type or response.text.strip().startswith(('{', '[')):
|
||
new_api_data["response_result"] = response.json()
|
||
else:
|
||
new_api_data["response_result"] = response.text
|
||
except json.JSONDecodeError as e:
|
||
logger.debug(f"JSON解析失败,使用文本格式: {e}")
|
||
new_api_data["response_result"] = response.text
|
||
except Exception as e:
|
||
logger.error(f"处理响应数据时发生意外错误: {e}")
|
||
new_api_data["response_result"] = f"Error: {str(e)}"
|
||
|
||
self.api_step_record(**new_api_data)
|
||
# 进行响应断言
|
||
AssertHandle(assert_data=new_api_data["validate"], response=response).assert_handle()
|
||
# todo 数据库断言 和 响应断言二选一
|
||
|
||
# 进行响应参数提取,并保存提取后的数据
|
||
if new_api_data.get("extract"):
|
||
extract_results = self.after_request(response=response, api_data=new_api_data, db_info=db_info)
|
||
save_api_data.update(extract_results)
|
||
|
||
# 将接口请求参数payload的值保存到save_api_data中
|
||
save_api_data.update({"_payload": new_api_data["payload"]} if new_api_data.get("payload") else {})
|
||
logger.trace(f"接口请求完成后,接口请求数据payload,响应数据 & 提取数据 save_api_data={save_api_data}")
|
||
allure_step(f"接口请求完成后,接口请求数据payload,响应数据 & 提取数据 save_api_data={save_api_data}")
|
||
return save_api_data
|