Files
apiautotest/case_utils/requests_utils/base_request.py

127 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# @Time : 2023/10/12 13:41
# @Author : chenyinhua
# @File : base_request.py
# @Software: PyCharm
# @Desc: Wrapper around requests.Session for sending API test requests (params / data / json / file upload)
# 标准库导入
import os
import time
# 第三方库导入
import allure
from loguru import logger
import requests # pip install requests
from requests_toolbelt import MultipartEncoder # pip install requests_toolbelt
class BaseRequest:
    """
    Wrapper around ``requests`` for sending API-test requests.

    Every request goes through one class-level ``requests.Session`` so that
    cookies are carried over between requests.
    """
    # Timeout (seconds) passed to every request.
    TIMEOUT = 5
    # Shared session; created on first use by get_session().
    session = None
    # Supported "plain" request types mapped to the Session.request keyword
    # argument that carries the payload.
    _PAYLOAD_KWARGS = {"params": "params", "data": "data", "json": "json"}

    @classmethod
    def get_session(cls):
        """
        Return the shared session, creating it on first call.

        Reusing one ``requests.Session`` keeps state (cookies) across the
        requests sent during a test run.

        :return: the class-level ``requests.Session`` object
        """
        if cls.session is None:
            cls.session = requests.Session()
        return cls.session

    @classmethod
    @allure.step("请求各项参数:{req_data}")
    def send_request(cls, req_data):
        """
        Log the request data, send the request and log the response.

        :param req_data: mapping with the request description. Keys read here:
            ``title``, ``url``, ``method``, ``headers``, ``cookies``,
            ``request_type``, ``payload`` and ``files``.
        :return: the response object returned by ``send_api_request``
        :raises ValueError: if ``method`` is missing/empty, if
            ``send_api_request`` rejects the arguments, or if ``requests``
            raises a ``RequestException`` (chained as ``__cause__``).
        """
        method = req_data.get("method")
        if not method:
            # Fail with a clear message instead of AttributeError on None.lower().
            logger.error("method参数不能为空")
            raise ValueError("method参数不能为空")

        separator = "=" * 80
        try:
            # The upload path is logged from the same "files" key that is
            # actually sent below.
            before = (
                "\n" + separator
                + "\n-------------Start请求前--------------------\n"
                f"标题: {req_data.get('title', None)}\n"
                f"请求路径: {req_data.get('url', None)}\n"
                f"请求方式: {method}\n"
                f"请求头: {req_data.get('headers', None)}\n"
                f"请求Cookies: {req_data.get('cookies', None)}\n"
                f"请求关键字: {req_data.get('request_type', None)}\n"
                f"请求内容: {req_data.get('payload', None)}\n"
                f"请求文件: {req_data.get('files', None)}\n"
                + separator
            )
            logger.info(before)

            response_result = cls.send_api_request(
                url=req_data.get("url"),
                method=method.lower(),
                request_type=req_data.get("request_type", None),
                header=req_data.get("headers", None),
                payload=req_data.get("payload", None),
                files=req_data.get("files", None),
                cookies=req_data.get("cookies", None),
            )

            elapsed_seconds = response_result.elapsed.total_seconds()
            after = (
                "\n" + separator
                + "\n-------------Start请求后--------------------\n"
                f"响应数据: {response_result.text}\n"
                f"响应码: {response_result.status_code}\n"
                f"响应耗时: {round(elapsed_seconds, 2)} s || {round(elapsed_seconds * 1000, 2)} ms\n"
                + separator
            )
            logger.info(after)
            return response_result
        except requests.exceptions.RequestException as e:
            logger.error(f"请求出错,{str(e)}")
            # Keep the original exception attached for debugging.
            raise ValueError(f"请求出错,{str(e)}") from e

    @classmethod
    def send_api_request(cls, url: str, method: str, request_type: str, header=None, payload=None,
                         files=None, cookies=None):
        """
        Send one HTTP request through the shared session.

        :param url: request URL
        :param method: HTTP method
        :param request_type: how the payload is sent; one of ``params``,
            ``json``, ``data`` or ``file`` (case-insensitive)
        :param header: request headers; not modified by this method
        :param payload: request data for ``params`` / ``json`` / ``data``
        :param files: path of the file to upload when request_type is ``file``
        :param cookies: request cookies
        :return: the response object returned by ``Session.request``
        :raises ValueError: if ``request_type`` is empty or unsupported, or if
            ``request_type`` is ``file`` and ``files`` is empty
        """
        if not request_type:
            logger.error("request_type参数不能为空")
            raise ValueError('request_type参数不能为空')

        # Work on a copy: the upload branch adds Content-Type, and that must
        # not leak back into the caller's header dict.
        headers = dict(header) if header else {}
        session = cls.get_session()
        kind = request_type.lower()

        payload_kwarg = cls._PAYLOAD_KWARGS.get(kind)
        if payload_kwarg is not None:
            return session.request(method=method, url=url, headers=headers, cookies=cookies,
                                   timeout=cls.TIMEOUT, **{payload_kwarg: payload})

        if kind == 'file':
            if not files:
                # Raise like the other invalid-input branches rather than
                # returning None, which callers would dereference.
                logger.error("上传的文件不能为空")
                raise ValueError("上传的文件不能为空")
            # Read the multipart body while the file is open, then close the
            # handle before sending.
            with open(files, "rb") as upload:
                encoder = MultipartEncoder(fields={"file": (os.path.basename(files), upload)},
                                           boundary='------------------------' + str(time.time()))
                body = encoder.to_string()
            headers['Content-Type'] = encoder.content_type
            return session.request(method=method, url=url, data=body, headers=headers,
                                   cookies=cookies, timeout=cls.TIMEOUT)

        logger.error("request_type可选关键字为params, json, data, file")
        raise ValueError('request_type可选关键字为params, json, data, file')