Files
apiautotest/utils/requests_utils/request_control.py

412 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# @Time : 2024/7/1 9:42
# @Author : floraachy
# @File : request_control
# @Software: PyCharm
# @Desc:
# 标准库导入
import json
import os
import http.cookiejar
# 第三方库导入
from requests import Response
from loguru import logger
# 本地应用/模块导入
from config.path_config import FILES_DIR
from utils.requests_utils.base_request import BaseRequest
from utils.data_utils.data_handle import data_handle
from utils.data_utils.extract_data_handle import json_extractor, re_extract, response_extract
from utils.files_utils.files_handle import get_files
from utils.files_utils.yaml_handle import YamlHandle
from utils.assertion_utils.assert_control import AssertHandle
from utils.report_utils.allure_handle import allure_step
from utils.database_utils.mysql_handle import MysqlServer
class RequestControl(BaseRequest):
"""
进行请求,请求后的参数提取处理
"""
# --------------------从接口池中获取接口请求数据--------------------
def get_api_data(self, api_file_path: str, key: str = None):
"""
根据指定的yaml文件路径以及key值获取对应的接口
:param:api_file_path 接口yaml文件路径
:param:key 对应接口的id
"""
api_data = []
if os.path.isdir(api_file_path):
logger.debug(f"目标路径是一个目录:{api_file_path}")
api_files = get_files(target=api_file_path, end=".yaml") + get_files(target=api_file_path, end=".yml")
for api_file in api_files:
api_data.append(YamlHandle(filename=api_file).read_yaml)
elif os.path.isfile(api_file_path):
logger.debug(f"目标路径是一个文件:{api_file_path}")
api_data.append(YamlHandle(filename=api_file_path).read_yaml)
else:
logger.error(f"目标路径错误请检查api_file_path={api_file_path}")
return None
for api in api_data:
matching_api = next((item for item in api["case_info"] if item["id"] == key), None)
if matching_api:
logger.info("\n----------匹配到的api----------\n"
f"类型:{type(matching_api)}"
f"值:{matching_api}\n")
return matching_api
# 在找不到匹配的情况下,返回一个默认值且记录一条错误日志
logger.warning(f"未找到id为{key}的接口, 返回值是None")
raise Exception(f"未找到id为{key}的接口, 返回值是None")
# ---------- 请求之前进行数据处理 --------------------------#
@staticmethod
def url_handle(url: str, source: dict = None):
"""
用例数据中获取到的url(一般是不带host的个别特殊的带有host则不进行处理)
"""
# 检测url中是否存在需要替换的参数如果存在则进行替换
url = data_handle(obj=url, source=source)
# 进行url处理最终得到full_url
host = source.get("host", "")
# 从用例数据中获取url如果键url不存在则返回空字符串
# 如果url是以http开头的则直接使用该url不与host进行拼接
if url.lower().startswith("http"):
full_url = url
else:
# 如果host以/结尾 并且 url以/开头
if host.endswith("/") and url.startswith("/"):
full_url = host[0:len(host) - 1] + url
# 如果host以/结尾 并且 url不以/开头
elif host.endswith("/") and (not url.startswith("/")):
full_url = host + url
elif (not host.endswith("/")) and url.startswith("/"):
# 如果host不以/结尾 且 url以/开头则将host和url拼接起来组成新的url
full_url = host + url
else:
# 如果host不以/结尾 且 url不以/开头则将host和url拼接起来的时候增加/组成新的url
full_url = host + "/" + url
return full_url
@staticmethod
def cookies_handle(cookies, source: dict = None):
"""
requests模块中cookies参数要求是Dict or CookieJar object
"""
if cookies:
# 从用例数据中获取cookies 处理cookies
# 通过全局变量替换cookies得到的是一个str类型
cookies = data_handle(obj=cookies, source=source)
try:
cookies = json.loads(cookies)
except Exception as e:
cookies = cookies
if isinstance(cookies, dict) or isinstance(cookies, http.cookiejar.CookieJar):
return cookies
else:
logger.error(
f"cookies参数要求是Dict or CookieJar object 目前cookies类型是{type(cookies)} cookies值是{cookies}")
raise TypeError(
f"cookies参数要求是Dict or CookieJar object 目前cookies类型是{type(cookies)} cookies值是{cookies}")
@staticmethod
def headers_handle(headers: dict, source: dict = None):
"""
headers里面传cookies要求cookies类型是str
"""
if headers:
# 从用例数据中获取header 处理header
headers = data_handle(obj=headers, source=source)
# 如果请求头中有cookies需要进行单独处理
if headers.get("cookies", None):
cookies = headers["cookies"]
if isinstance(cookies, dict):
# 如果是字典类型,就转成字符串
headers["cookies"] = json.dumps(cookies)
else:
headers["cookies"] = cookies
return headers
@staticmethod
def files_handle(files: str, source: dict = None):
"""
格式:接口中文件参数的名称:"文件路径地址"
例如:{"file": "demo_test_demo.py"}
"""
if files:
# 处理请求参数files参数
# 支持文件传递${}关键字将使用data_handle进行处理
files = data_handle(obj=files, source=source)
# 将文件处理成绝对路径
return os.path.join(FILES_DIR, files)
@staticmethod
def wait_seconds_handle(wait_seconds):
"""
处理等待时间参数如果不能转为int类型则认为是none
"""
try:
return int(wait_seconds)
except:
return None
def before_request(self, request_data: dict, source_data: dict = None):
"""
针请求前,对接口数据进行处理,识别用例数据中的关键字${xxxx},使用全局变量进行替换或者执行关键字中的方法替换为具体值
"""
try:
logger.debug(f"\n======================================================\n" \
"-------------用例数据处理前--------------------\n"
f"用例ID: {type(request_data.get('id', None))} || {request_data.get('id', None)}\n" \
f"用例优先级(severity): {type(request_data.get('severity', None))} || {request_data.get('severity', None)}\n" \
f"用例标题(title): {type(request_data.get('title', None))} || {request_data.get('title', None)}\n" \
f"请求路径(url): {type(request_data.get('url', None))} || {request_data.get('url', None)}\n" \
f"请求方式(method): {type(request_data.get('method', None))} || {request_data.get('method', None)}\n" \
f"请求头(headers): {type(request_data.get('headers', None))} || {request_data.get('headers', None)}\n" \
f"请求cookies: {type(request_data.get('cookies', None))} || {request_data.get('cookies', None)}\n" \
f"请求类型(request_type): {type(request_data.get('request_type', None))} || {request_data.get('request_type', None)}\n" \
f"请求文件(files): {type(request_data.get('files', None))} || {request_data.get('files', None)}\n" \
f"请求后等待(wait_seconds): {type(request_data.get('wait_seconds', None))} || {request_data.get('wait_seconds', None)}\n" \
f"请求参数(payload): {type(request_data.get('payload', None))} || {request_data.get('payload', None)}\n" \
f"响应断言(assert_response): {type(request_data.get('assert_response', None))} || {request_data.get('assert_response', None)}\n" \
f"数据库断言(assert_sql): {type(request_data.get('assert_sql', None))} || {request_data.get('assert_sql', None)}\n" \
f"后置提取参数(extract): {type(request_data.get('extract', None))} || {request_data.get('extract', None)}\n" \
f"用例依赖(case_dependence): {type(request_data.get('case_dependence', None))} || {request_data.get('case_dependence', None)}\n")
new_request_data = {
"id": request_data.get("id"),
"severity": request_data.get("severity"),
"title": request_data.get("title"),
"url": self.url_handle(url=request_data.get("url"), source=source_data),
"method": request_data.get("method"),
"headers": self.headers_handle(headers=request_data.get("headers"), source=source_data),
"cookies": self.cookies_handle(cookies=request_data.get("cookies"), source=source_data),
"request_type": request_data.get("request_type"),
"files": self.files_handle(files=request_data.get("files"), source=source_data),
"wait_seconds": self.wait_seconds_handle(wait_seconds=request_data.get("wait_seconds")),
"payload": data_handle(obj=request_data["payload"], source=source_data),
"assert_response": data_handle(obj=request_data.get("assert_response"), source=source_data),
"assert_sql": request_data.get("assert_sql"),
"extract": data_handle(obj=request_data.get("extract"), source=source_data),
"case_dependence": request_data.get("case_dependence"),
}
logger.debug("\n-------------用例数据处理后--------------------\n"
f"用例ID: {type(new_request_data.get('id', None))} || {new_request_data.get('id', None)}\n" \
f"用例优先级(severity): {type(new_request_data.get('severity', None))} || {new_request_data.get('severity', None)}\n" \
f"用例标题(title): {type(new_request_data.get('title', None))} || {new_request_data.get('title', None)}\n" \
f"请求路径(url): {type(new_request_data.get('url', None))} || {new_request_data.get('url', None)}\n" \
f"请求方式(method): {type(new_request_data.get('method', None))} || {new_request_data.get('method', None)}\n" \
f"请求头(headers): {type(new_request_data.get('headers', None))} || {new_request_data.get('headers', None)}\n" \
f"请求cookies: {type(new_request_data.get('cookies', None))} || {new_request_data.get('cookies', None)}\n" \
f"请求类型(request_type): {type(new_request_data.get('request_type', None))} || {new_request_data.get('request_type', None)}\n" \
f"请求文件(files): {type(new_request_data.get('files', None))} || {new_request_data.get('files', None)}\n" \
f"请求后等待(wait_seconds): {type(new_request_data.get('wait_seconds', None))} || {new_request_data.get('wait_seconds', None)}\n" \
f"请求参数(payload): {type(new_request_data.get('payload', None))} || {new_request_data.get('payload', None)}\n" \
f"响应断言(assert_response): {type(new_request_data.get('assert_response', None))} || {new_request_data.get('assert_response', None)}\n" \
f"数据库断言(assert_sql): {type(new_request_data.get('assert_sql', None))} || {new_request_data.get('assert_sql', None)}\n" \
f"后置提取参数(extract): {type(new_request_data.get('extract', None))} || {new_request_data.get('extract', None)}\n" \
f"用例依赖(case_dependence): {type(new_request_data.get('case_dependence', None))} || {new_request_data.get('case_dependence', None)}\n"
"=====================================================")
return new_request_data
except Exception as e:
logger.error(f"接口数据处理异常:{e}")
raise f"接口数据处理异常:\n{e}"
@classmethod
def api_step_record(cls, **kwargs) -> None:
"""
在allure/logger中记录请求数据
"""
key = kwargs.get("id")
title = kwargs.get("title")
url = kwargs.get("url")
method = kwargs.get("method")
headers = kwargs.get("headers")
cookies = kwargs.get("cookies")
request_type = kwargs.get("request_type")
payload = kwargs.get("payload")
files = kwargs.get("files")
wait_seconds = kwargs.get("wait_seconds")
status_code = kwargs.get("status_code")
response_result = kwargs.get("response_result")
response_time_seconds = kwargs.get("response_time_seconds")
response_time_millisecond = kwargs.get("response_time_millisecond")
_res = "\n" + "=" * 80 \
+ "\n-------------发送请求--------------------\n" \
f"ID: {key}\n" \
f"标题: {title}\n" \
f"请求URL: {url}\n" \
f"请求方式: {method}\n" \
f"请求头: {headers}\n" \
f"请求Cookies: {cookies}\n" \
f"请求关键字: {request_type}\n" \
f"请求参数: {payload}\n" \
f"请求文件: {files}\n" \
f"响应码: {status_code}\n" \
f"响应数据: {response_result}\n" \
f"响应耗时: {response_time_seconds} s || {response_time_millisecond} ms\n" \
+ "=" * 80
logger.info(_res)
allure_step(f"ID: {key}")
allure_step(f"标题: {title}")
allure_step(f"请求URL: {url}")
allure_step(f"请求方式: {method}")
allure_step(f"请求头: {headers}")
allure_step(f"请求Cookies: {cookies}")
allure_step(f"请求关键字: {request_type}")
allure_step(f"请求参数: {payload}")
allure_step(f"请求文件: {files}")
allure_step(f"请求后等待时间: {wait_seconds}")
allure_step(f"响应码: {status_code}")
allure_step(f"响应结果: {response_result}")
allure_step(f"响应耗时: {response_time_seconds} s || {response_time_millisecond} ms")
def after_request(self, response: Response, api_data, db_info=None):
"""
请求结束后提取参数,目前支持从响应数据、数据库、用例数据中提取
:param response: Response 响应对象
:param api_data: 接口数据需要提取的参数字典 '{"k1": "$.data"}''{"k1": "data:(.*?)$"}'
:return:
"""
extract = api_data.get("extract")
logger.info(f"断言成功后需要进行提取操作extract={extract}")
case_results = {}
response_results = {}
database_results = {}
default_results = {}
for k, v in extract.items():
if k.lower() == "case":
logger.info(f"数据来源:{k}")
# 将用例数据作为来源
for _k, _v in v.items():
if _k.lower() == "type_jsonpath":
for i, j in _v.items():
case_results[i] = json_extractor(api_data, j)
elif _k.lower() == "type_re":
for i, j in _v.items():
case_results[i] = re_extract(str(api_data), j)
else:
logger.error(f"提取方式: {_k} 错误仅支持type_jsonpath、type_re两种")
logger.info(f"数据来源:{k} 提取结果:{case_results} --")
elif k.lower() == "database":
logger.info(f"数据来源:{k}")
# 将数据库SQL执行结果作为来源
if v.get("sql"):
mysql = MysqlServer(**db_info)
sql_result = mysql.query_all(v["sql"])
v.pop("sql")
else:
sql_result = None
logger.error(f"数据库提取参数必须传入sql")
if sql_result:
for _k, _v in v.items():
if _k.lower() == "type_jsonpath":
for i, j in _v.items():
database_results[i] = json_extractor(sql_result, j)
elif _k.lower() == "type_re":
for i, j in _v.items():
database_results[i] = re_extract(str(sql_result), j)
else:
logger.error(f"提取方式: {_k} 错误仅支持type_jsonpath、type_re两种")
logger.info(f"数据来源:{k} 提取结果:{database_results} --")
elif k.lower() == "response":
logger.info(f"数据来源:{k}")
# 来源=response
for _k, _v in v.items():
if _k.lower() == "type_jsonpath":
for i, j in _v.items():
response_results[i] = json_extractor(response.json(), j)
elif _k.lower() == "type_re":
for i, j in _v.items():
response_results[i] = re_extract(response.text, j)
elif _k.lower() == "type_response":
for i, j in _v.items():
response_results[i] = response_extract(response, j)
else:
logger.error(f"提取方式: {_k} 错误仅支持type_jsonpath、type_re、type_response三种")
logger.info(f"数据来源:{k} 提取结果:{response_results} --")
else:
logger.info(f"数据来源Response对象")
# 直接k=type_jsonpath, type_re, type_response, 来源默认是response
if k.lower() == "type_jsonpath":
for i, j in v.items():
default_results[i] = json_extractor(response.json(), j)
elif k.lower() == "type_re":
for i, j in v.items():
default_results[i] = re_extract(response.text, j)
elif k.lower() == "type_response":
for i, j in v.items():
default_results[i] = response_extract(response, j)
else:
logger.error(
f"数据来源默认是Response对象 提取方式: {k} 错误仅支持type_jsonpath、type_re、type_response三种")
logger.info(f"数据来源Response对象 提取结果:{default_results}")
return {**case_results, **response_results, **database_results, **default_results}
# -----接口请求流程:获取接口数据 -> 处理接口请求数据 -> 请求接口 -> 接口断言 -> 接口数据提取 --------------
def api_request_flow(self, request_data: dict = None, global_var: dict = None, api_file_path: str = None,
key: str = None, db_info: dict = None):
"""
发送请求并进行后置参数提取操作。lll
request_data参数 与 api_file_pathkey 这两个必须传递其中一个
:param request_data: 请求数据字典,包含请求所需的所有信息。
:param global_var: 包含全局变量的字典,这些变量用于替换到请求数据的关键字:${}
:param api_file_path: 接口所在的目录或者文件路径
:param key: 接口的ID
:param db_info: 数据库连接信息,用于数据库断言/数据库提取数据时连接数据库
:return: 接口请求数据以及从接口响应提取的参数,字典。
:raises ValueError: 如果请求数据无效或缺失。
"""
# 初始化一个变量保存接口请求参数payload以及通过extract提取的参数
save_api_data = {}
if request_data:
api_info = request_data
elif api_file_path and key:
api_info = self.get_api_data(api_file_path=api_file_path, key=key)
else:
logger.error("请求数据异常")
raise ValueError("请求数据异常")
new_api_data = self.before_request(request_data=api_info, source_data=global_var)
response = self.send_request(new_api_data)
new_api_data["status_code"] = response.status_code
new_api_data["response_time_seconds"] = round(response.elapsed.total_seconds(), 2)
new_api_data["response_time_millisecond"] = round(response.elapsed.total_seconds() * 1000, 2)
try:
new_api_data["response_result"] = response.json()
except:
new_api_data["response_result"] = response.text()
self.api_step_record(**new_api_data)
# 进行响应断言
AssertHandle(assert_data=new_api_data["assert_response"], response=response).assert_handle()
# 进行响应参数提取,并保存提取后的数据
if new_api_data.get("extract"):
extract_results = self.after_request(response=response, api_data=new_api_data, db_info=db_info)
save_api_data.update(extract_results)
# 将接口请求参数payload的值保存到save_api_data中
save_api_data.update({"_payload": new_api_data["payload"]} if new_api_data.get("payload") else {})
logger.debug(f"接口请求完成后接口请求数据payload响应数据 & 提取数据 save_api_data={save_api_data}")
allure_step(f"接口请求完成后接口请求数据payload响应数据 & 提取数据 save_api_data={save_api_data}")
return save_api_data