Files
floratest56/request_control.py
2026-05-11 08:24:56 +08:00

439 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# @Time : 2024/7/1 9:42
# @Author : floraachy
# @File : request_control.py
# @Software: PyCharm
# @Desc:
# 标准库导入
import json
import os
import http.cookiejar
import time
# 第三方库导入
import requests
from requests import Response, utils
from loguru import logger
# 本地应用/模块导入
from settings import FILES_DIR
from custom_utils.requests_utils.base_request import BaseRequest
from custom_utils.data_utils.data_handle import data_handle
from custom_utils.data_utils.extract_data_handle import json_extractor, re_extract, response_extract
from common_utils.files_utils.files_handle import get_files, load_yaml_file
from custom_utils.assertion_utils.assert_control import AssertHandle
from custom_utils.report_utils.allure_handle import allure_step
from common_utils.database_utils.mysql_handle import MysqlServer
class RequestControl(BaseRequest):
"""
进行请求,请求后的参数提取处理
"""
# --------------------从接口池中获取接口请求数据--------------------
@staticmethod
def get_api_data(api_file_path: str, key: str = None):
"""
根据指定的yaml文件路径以及key值获取对应的接口
:param:api_file_path 接口yaml文件路径
:param:key 对应接口的id
"""
api_data = []
if os.path.isdir(api_file_path):
logger.trace(f"目标路径是一个目录:{api_file_path}")
api_files = get_files(target=api_file_path, end=".yaml") + get_files(target=api_file_path, end=".yml")
for api_file in api_files:
api_data.append(load_yaml_file(api_file))
elif os.path.isfile(api_file_path):
logger.trace(f"目标路径是一个文件:{api_file_path}")
api_data.append(load_yaml_file(api_file_path))
else:
logger.error(f"目标路径错误请检查api_file_path={api_file_path}")
return None
for api in api_data:
if api.get("teststeps"):
matching_api = next((item for item in api["teststeps"] if item["id"] == key), None)
if matching_api:
logger.debug("\n----------匹配到的api----------\n"
f"类型:{type(matching_api)}"
f"值:{matching_api}\n")
return matching_api
# 在找不到匹配的情况下,返回一个默认值且记录一条错误日志
logger.warning(f"未找到id为{key}的接口, 返回值是None")
raise Exception(f"未找到id为{key}的接口, 返回值是None")
# ---------- 请求之前进行数据处理 --------------------------#
@staticmethod
def url_handle(url: str, source: dict = None):
"""
用例数据中获取到的url(一般是不带host的个别特殊的带有host则不进行处理)
"""
# 检测url中是否存在需要替换的参数如果存在则进行替换
url = data_handle(obj=url, source=source)
# 进行url处理最终得到full_url
host = source.get("host", "")
# 从用例数据中获取url如果键url不存在则返回空字符串
# 如果url是以http开头的则直接使用该url不与host进行拼接
if url.lower().startswith("http"):
full_url = url
else:
# 如果host以/结尾 并且 url以/开头
if host.endswith("/") and url.startswith("/"):
full_url = host[0:len(host) - 1] + url
# 如果host以/结尾 并且 url不以/开头
elif host.endswith("/") and (not url.startswith("/")):
full_url = host + url
elif (not host.endswith("/")) and url.startswith("/"):
# 如果host不以/结尾 且 url以/开头则将host和url拼接起来组成新的url
full_url = host + url
else:
# 如果host不以/结尾 且 url不以/开头则将host和url拼接起来的时候增加/组成新的url
full_url = host + "/" + url
return full_url
@staticmethod
def cookies_handle(cookies, source: dict = None):
"""
requests模块中cookies参数要求是Dict or CookieJar object
"""
if not cookies:
return None # 或者返回空字典 {}
# 通过全局变量替换cookies得到的是一个str类型
cookies = data_handle(obj=cookies, source=source)
try:
cookies = json.loads(cookies)
except Exception as e:
cookies = cookies
if isinstance(cookies, dict) or isinstance(cookies, http.cookiejar.CookieJar):
return cookies
else:
logger.error(
f"cookies参数要求是Dict or CookieJar object 目前cookies类型是{type(cookies)} cookies值是{cookies}")
raise TypeError(
f"cookies参数要求是Dict or CookieJar object 目前cookies类型是{type(cookies)} cookies值是{cookies}")
@staticmethod
def headers_handle(headers: dict = None, source: dict = None) -> dict:
"""
headers里面传Cookie要求Cookie类型是str
Args:
headers: 请求头字典,默认为空字典
source: 数据源
Returns:
dict: 处理后的请求头字典
"""
if headers is None:
headers = {}
# 从用例数据中获取header处理header
headers = data_handle(obj=headers, source=source)
# 处理Cookie字段
if headers.get("Cookie"):
cookies = headers["Cookie"]
if isinstance(cookies, dict):
headers["Cookie"] = '; '.join([f"{key}={value}" for key, value in cookies.items()])
elif isinstance(cookies, http.cookiejar.CookieJar):
cookies_dict = utils.dict_from_cookiejar(cookies)
headers["Cookie"] = '; '.join([f"{key}={value}" for key, value in cookies_dict.items()])
# str类型不需要处理保持原样
return headers
@staticmethod
def files_handle(files: str, source: dict = None):
"""
格式:接口中文件参数的名称:"文件路径地址"
例如:{"file": "demo_test_demo.py"}
"""
if not files:
return None # 或者返回空字典 {}
# 处理请求参数files参数
# 支持文件传递${}关键字将使用data_handle进行处理
files = data_handle(obj=files, source=source)
# 将文件处理成绝对路径
return os.path.join(FILES_DIR, files)
@staticmethod
def wait_seconds_handle(wait_seconds):
"""
处理等待时间参数如果不能转为int类型则认为是none
"""
if wait_seconds is None:
return None
try:
return int(wait_seconds)
except TypeError as e:
# 处理类型错误如传入对象不支持int转换
logger.debug(f"等待时间参数类型错误: {type(wait_seconds)} -> {e}")
return None
except ValueError as e:
# 处理值错误(如传入"abc"字符串)
logger.debug(f"等待时间参数值错误: {wait_seconds} -> {e}")
return None
def before_request(self, request_data: dict, source_data: dict = None):
"""
针请求前,对接口数据进行处理,识别用例数据中的关键字${xxxx},使用全局变量进行替换或者执行关键字中的方法替换为具体值
"""
try:
logger.trace(f"\n======================================================\n" \
"-------------用例数据处理前--------------------\n"
f"用例ID: {type(request_data.get('id', None))} || {request_data.get('id', None)}\n" \
f"用例优先级(severity): {type(request_data.get('severity', None))} || {request_data.get('severity', None)}\n" \
f"用例标题(title): {type(request_data.get('title', None))} || {request_data.get('title', None)}\n" \
f"请求路径(url): {type(request_data.get('url', None))} || {request_data.get('url', None)}\n" \
f"请求方式(method): {type(request_data.get('method', None))} || {request_data.get('method', None)}\n" \
f"请求头(headers): {type(request_data.get('headers', None))} || {request_data.get('headers', None)}\n" \
f"请求cookies: {type(request_data.get('cookies', None))} || {request_data.get('cookies', None)}\n" \
f"请求类型(request_type): {type(request_data.get('request_type', None))} || {request_data.get('request_type', None)}\n" \
f"请求文件(files): {type(request_data.get('files', None))} || {request_data.get('files', None)}\n" \
f"请求后等待(wait_seconds): {type(request_data.get('wait_seconds', None))} || {request_data.get('wait_seconds', None)}\n" \
f"请求参数(payload): {type(request_data.get('payload', None))} || {request_data.get('payload', None)}\n" \
f"响应断言(validate): {type(request_data.get('validate', None))} || {request_data.get('validate', None)}\n" \
f"数据库断言(assert_sql): {type(request_data.get('assert_sql', None))} || {request_data.get('assert_sql', None)}\n" \
f"后置提取参数(extract): {type(request_data.get('extract', None))} || {request_data.get('extract', None)}\n" \
f"用例依赖(case_dependence): {type(request_data.get('case_dependence', None))} || {request_data.get('case_dependence', None)}\n")
new_request_data = {
"id": request_data.get("id"),
"severity": request_data.get("severity"),
"title": request_data.get("title"),
"url": self.url_handle(url=request_data.get("url"), source=source_data),
"method": request_data.get("method"),
"headers": self.headers_handle(headers=request_data.get("headers"), source=source_data),
"cookies": self.cookies_handle(cookies=request_data.get("cookies"), source=source_data),
"request_type": request_data.get("request_type"),
"files": self.files_handle(files=request_data.get("files"), source=source_data),
"wait_seconds": self.wait_seconds_handle(wait_seconds=request_data.get("wait_seconds")),
"payload": data_handle(obj=request_data["payload"], source=source_data),
"validate": data_handle(obj=request_data.get("validate"), source=source_data),
"assert_sql": request_data.get("assert_sql"),
"extract": data_handle(obj=request_data.get("extract"), source=source_data),
"case_dependence": request_data.get("case_dependence")
}
logger.trace("\n-------------用例数据处理后--------------------\n"
f"用例ID: {type(new_request_data.get('id', None))} || {new_request_data.get('id', None)}\n" \
f"用例优先级(severity): {type(new_request_data.get('severity', None))} || {new_request_data.get('severity', None)}\n" \
f"用例标题(title): {type(new_request_data.get('title', None))} || {new_request_data.get('title', None)}\n" \
f"请求路径(url): {type(new_request_data.get('url', None))} || {new_request_data.get('url', None)}\n" \
f"请求方式(method): {type(new_request_data.get('method', None))} || {new_request_data.get('method', None)}\n" \
f"请求头(headers): {type(new_request_data.get('headers', None))} || {new_request_data.get('headers', None)}\n" \
f"请求cookies: {type(new_request_data.get('cookies', None))} || {new_request_data.get('cookies', None)}\n" \
f"请求类型(request_type): {type(new_request_data.get('request_type', None))} || {new_request_data.get('request_type', None)}\n" \
f"请求文件(files): {type(new_request_data.get('files', None))} || {new_request_data.get('files', None)}\n" \
f"请求后等待(wait_seconds): {type(new_request_data.get('wait_seconds', None))} || {new_request_data.get('wait_seconds', None)}\n" \
f"请求参数(payload): {type(new_request_data.get('payload', None))} || {new_request_data.get('payload', None)}\n" \
f"响应断言(validate): {type(new_request_data.get('validate', None))} || {new_request_data.get('validate', None)}\n" \
f"数据库断言(assert_sql): {type(new_request_data.get('assert_sql', None))} || {new_request_data.get('assert_sql', None)}\n" \
f"后置提取参数(extract): {type(new_request_data.get('extract', None))} || {new_request_data.get('extract', None)}\n" \
f"用例依赖(case_dependence): {type(new_request_data.get('case_dependence', None))} || {new_request_data.get('case_dependence', None)}\n"
"=====================================================")
logger.trace(new_request_data)
return new_request_data
except Exception as e:
logger.error(f"接口数据处理异常:{e}")
raise f"接口数据处理异常:\n{e}"
@classmethod
def api_step_record(cls, **kwargs) -> None:
"""
在allure/logger中记录请求数据
"""
key = kwargs.get("id")
title = kwargs.get("title")
url = kwargs.get("url")
method = kwargs.get("method")
headers = kwargs.get("headers")
cookies = kwargs.get("cookies")
request_type = kwargs.get("request_type")
payload = kwargs.get("payload")
files = kwargs.get("files")
wait_seconds = kwargs.get("wait_seconds")
status_code = kwargs.get("status_code")
response_result = kwargs.get("response_result")
response_time_seconds = kwargs.get("response_time_seconds")
response_time_millisecond = kwargs.get("response_time_millisecond")
_res = "\n" + "=" * 80 \
+ "\n-------------发送请求--------------------\n" \
f"ID: {key}\n" \
f"标题: {title}\n" \
f"请求URL: {url}\n" \
f"请求方式: {method}\n" \
f"请求头: {headers}\n" \
f"请求Cookies: {cookies}\n" \
f"请求关键字: {request_type}\n" \
f"请求参数: {payload}\n" \
f"请求文件: {files}\n" \
f"响应码: {status_code}\n" \
f"响应数据: {response_result}\n" \
f"响应耗时: {response_time_seconds} s || {response_time_millisecond} ms\n" \
+ "=" * 80
logger.debug(_res)
allure_step(f"ID: {key}")
allure_step(f"标题: {title}")
allure_step(f"请求URL: {url}")
allure_step(f"请求方式: {method}")
allure_step(f"请求头: {headers}")
allure_step(f"请求Cookies: {cookies}")
allure_step(f"请求关键字: {request_type}")
allure_step(f"请求参数: {payload}")
allure_step(f"请求文件: {files}")
allure_step(f"请求后等待时间: {wait_seconds}")
allure_step(f"响应码: {status_code}")
allure_step(f"响应结果: {response_result}")
allure_step(f"响应耗时: {response_time_seconds} s || {response_time_millisecond} ms")
def after_request(self, response: Response, api_data, db_info=None):
"""
请求结束后提取参数,目前支持从响应数据、数据库、用例数据中提取
:param response: Response 响应对象
:param api_data: 接口数据需要提取的参数字典 '{"k1": "$.data"}''{"k1": "data:(.*?)$"}'
:return:
"""
extract = api_data.get("extract")
if not extract:
logger.debug(f"断言成功后不需要进行提取操作extract={extract}")
return None
logger.debug(f"断言成功后需要进行提取操作extract={extract}")
case_results = {}
response_results = {}
database_results = {}
# 封装三种提取方式
def extract_data(source_data, patterns):
"""
采用不同的提取方式对数据进行提取
:param source_data: 数据来源Response 响应对象, 用例数据Dict 数据库查询结果Dict
:param patterns: 提取模式, {提取方式: {提取key: 提取表达式}}, ,例如:{'type_jsonpath: {'login': '$.login'}}
"""
results = {}
items = patterns.items()
for pattern_type, pattern_values in items:
if pattern_type == "type_jsonpath":
for key, expr in pattern_values.items():
# 如果数据来源是response对象需要处理成response.json()
if isinstance(source_data, requests.Response):
source_data = response.json()
results[key] = json_extractor(source_data, expr)
elif pattern_type == "type_re":
# 如果数据来源是response对象需要处理成response.text
if isinstance(source_data, requests.Response):
source_data = response.text
for key, expr in pattern_values.items():
results[key] = re_extract(str(source_data), expr)
elif pattern_type == "type_response":
for key, attr in pattern_values.items():
results[key] = response_extract(source_data, attr)
else:
logger.error(f"不支持的提取方式: {pattern_type}")
return results
for k, v in extract.items():
"""根据不同的数据来源,采取不同方式进行提取"""
if k in ["case"]:
case_results = extract_data(api_data, v)
elif k in ["response"]:
response_results = extract_data(response, v)
elif k in ["database"]:
if "sql" in v.keys():
mysql = MysqlServer(**db_info)
sql_result = mysql.query_all(v["sql"])
v.pop("sql")
database_results = extract_data(sql_result, v)
else:
logger.error(f"数据库提取参数必须传入sql")
else: # 兼容以前的写法
# 通过update方法合并dict
response_results.update(extract_data(response, {k: v}))
result = {**case_results, **response_results, **database_results}
logger.debug(f"--用例提取结果 --> {case_results} --")
logger.debug(f"--响应提取结果 --> {response_results} --")
logger.debug(f"--数据库提取结果 --> {database_results} --")
return result
# -----接口请求流程:获取接口数据 -> 处理接口请求数据 -> 请求接口 -> 接口断言 -> 接口数据提取 --------------
def api_request_flow(self, request_data: dict = None, global_var: dict = None, api_file_path: str = None,
key: str = None, db_info: dict = None):
"""
发送请求并进行后置参数提取操作。lll
request_data参数 与 api_file_pathkey 这两个必须传递其中一个
:param request_data: 请求数据字典,包含请求所需的所有信息。
:param global_var: 包含全局变量的字典,这些变量用于替换到请求数据的关键字:${}
:param api_file_path: 接口所在的目录或者文件路径
:param key: 接口的ID
:param db_info: 数据库连接信息,用于数据库断言/数据库提取数据时连接数据库
:return: 接口请求数据以及从接口响应提取的参数,字典。
:raises ValueError: 如果请求数据无效或缺失。
"""
# 初始化一个变量保存接口请求参数payload以及通过extract提取的参数
save_api_data = {}
if request_data:
api_info = request_data
elif api_file_path and key:
api_info = self.get_api_data(api_file_path=api_file_path, key=key)
else:
logger.error("请求数据异常")
raise ValueError("请求数据异常")
new_api_data = self.before_request(request_data=api_info, source_data=global_var)
response = self.send_request(new_api_data)
# 根据配置,增加接口请求等待时间。适应部分请求调用后,需要进行内置数据处理的问题
logger.trace(f"开始等待")
if new_api_data.get("wait_seconds"):
time.sleep(new_api_data["wait_seconds"])
logger.trace(f"结束等待")
new_api_data["status_code"] = response.status_code
new_api_data["response_time_seconds"] = round(response.elapsed.total_seconds(), 2)
new_api_data["response_time_millisecond"] = round(response.elapsed.total_seconds() * 1000, 2)
try:
# 先检查响应内容是否可能为JSON
content_type = response.headers.get('content-type', '').lower()
if 'application/json' in content_type or response.text.strip().startswith(('{', '[')):
new_api_data["response_result"] = response.json()
else:
new_api_data["response_result"] = response.text
except json.JSONDecodeError as e:
logger.debug(f"JSON解析失败使用文本格式: {e}")
new_api_data["response_result"] = response.text
except Exception as e:
logger.error(f"处理响应数据时发生意外错误: {e}")
new_api_data["response_result"] = f"Error: {str(e)}"
self.api_step_record(**new_api_data)
# 进行响应断言
AssertHandle(assert_data=new_api_data["validate"], response=response).assert_handle()
# todo 数据库断言 和 响应断言二选一
# 进行响应参数提取,并保存提取后的数据
if new_api_data.get("extract"):
extract_results = self.after_request(response=response, api_data=new_api_data, db_info=db_info)
save_api_data.update(extract_results)
# 将接口请求参数payload的值保存到save_api_data中
save_api_data.update({"_payload": new_api_data["payload"]} if new_api_data.get("payload") else {})
logger.trace(f"接口请求完成后接口请求数据payload响应数据 & 提取数据 save_api_data={save_api_data}")
allure_step(f"接口请求完成后接口请求数据payload响应数据 & 提取数据 save_api_data={save_api_data}")
return save_api_data