diff --git a/request_control.py b/request_control.py new file mode 100644 index 0000000..e445772 --- /dev/null +++ b/request_control.py @@ -0,0 +1,438 @@ +# -*- coding: utf-8 -*- +# @Time : 2024/7/1 9:42 +# @Author : floraachy +# @File : request_control.py +# @Software: PyCharm +# @Desc: + +# 标准库导入 +import json +import os +import http.cookiejar +import time +# 第三方库导入 +import requests +from requests import Response, utils +from loguru import logger +# 本地应用/模块导入 +from settings import FILES_DIR +from custom_utils.requests_utils.base_request import BaseRequest +from custom_utils.data_utils.data_handle import data_handle +from custom_utils.data_utils.extract_data_handle import json_extractor, re_extract, response_extract +from common_utils.files_utils.files_handle import get_files, load_yaml_file +from custom_utils.assertion_utils.assert_control import AssertHandle +from custom_utils.report_utils.allure_handle import allure_step +from common_utils.database_utils.mysql_handle import MysqlServer + + +class RequestControl(BaseRequest): + """ + 进行请求,请求后的参数提取处理 + """ + + # --------------------从接口池中获取接口请求数据-------------------- + @staticmethod + def get_api_data(api_file_path: str, key: str = None): + """ + 根据指定的yaml文件路径,以及key值,获取对应的接口 + :param:api_file_path 接口yaml文件路径 + :param:key 对应接口的id + """ + api_data = [] + if os.path.isdir(api_file_path): + logger.trace(f"目标路径是一个目录:{api_file_path}") + api_files = get_files(target=api_file_path, end=".yaml") + get_files(target=api_file_path, end=".yml") + for api_file in api_files: + api_data.append(load_yaml_file(api_file)) + elif os.path.isfile(api_file_path): + logger.trace(f"目标路径是一个文件:{api_file_path}") + api_data.append(load_yaml_file(api_file_path)) + + else: + logger.error(f"目标路径错误,请检查!api_file_path={api_file_path}") + return None + + for api in api_data: + if api.get("teststeps"): + matching_api = next((item for item in api["teststeps"] if item["id"] == key), None) + if matching_api: + logger.debug("\n----------匹配到的api----------\n" + f"类型:{type(matching_api)}" + f"值:{matching_api}\n") + return matching_api + + # 在找不到匹配的情况下,返回一个默认值且记录一条错误日志 + logger.warning(f"未找到id为{key}的接口, 返回值是None") + raise Exception(f"未找到id为{key}的接口, 返回值是None") + + # ---------- 请求之前进行数据处理 --------------------------# + @staticmethod + def url_handle(url: str, source: dict = None): + """ + 用例数据中获取到的url(一般是不带host的,个别特殊的带有host,则不进行处理) + """ + # 检测url中是否存在需要替换的参数,如果存在则进行替换 + url = data_handle(obj=url, source=source) + # 进行url处理,最终得到full_url + host = source.get("host", "") + # 从用例数据中获取url,如果键url不存在,则返回空字符串 + # 如果url是以http开头的,则直接使用该url,不与host进行拼接 + if url.lower().startswith("http"): + full_url = url + else: + # 如果host以/结尾 并且 url以/开头 + if host.endswith("/") and url.startswith("/"): + full_url = host[0:len(host) - 1] + url + # 如果host以/结尾 并且 url不以/开头 + elif host.endswith("/") and (not url.startswith("/")): + full_url = host + url + elif (not host.endswith("/")) and url.startswith("/"): + # 如果host不以/结尾 且 url以/开头,则将host和url拼接起来,组成新的url + full_url = host + url + else: + # 如果host不以/结尾 且 url不以/开头,则将host和url拼接起来的时候增加/,组成新的url + full_url = host + "/" + url + return full_url + + @staticmethod + def cookies_handle(cookies, source: dict = None): + """ + requests模块中,cookies参数要求是Dict or CookieJar object + """ + if not cookies: + return None # 或者返回空字典 {} + + # 通过全局变量替换cookies,得到的是一个str类型 + cookies = data_handle(obj=cookies, source=source) + try: + cookies = json.loads(cookies) + except Exception as e: + cookies = cookies + if isinstance(cookies, dict) or isinstance(cookies, http.cookiejar.CookieJar): + return cookies + else: + logger.error( + f"cookies参数要求是Dict or CookieJar object, 目前cookies类型是:{type(cookies)}, cookies值是:{cookies}") + raise TypeError( + f"cookies参数要求是Dict or CookieJar object, 目前cookies类型是:{type(cookies)}, cookies值是:{cookies}") + + @staticmethod + def headers_handle(headers: dict = None, source: dict = None) -> dict: + """ + headers里面传Cookie,要求Cookie类型是str + + Args: + headers: 请求头字典,默认为空字典 + source: 数据源 + + Returns: + dict: 处理后的请求头字典 + """ + if headers is None: + headers = {} + + # 从用例数据中获取header,处理header + headers = data_handle(obj=headers, source=source) + + # 处理Cookie字段 + if headers.get("Cookie"): + cookies = headers["Cookie"] + if isinstance(cookies, dict): + headers["Cookie"] = '; '.join([f"{key}={value}" for key, value in cookies.items()]) + elif isinstance(cookies, http.cookiejar.CookieJar): + cookies_dict = utils.dict_from_cookiejar(cookies) + headers["Cookie"] = '; '.join([f"{key}={value}" for key, value in cookies_dict.items()]) + # str类型不需要处理,保持原样 + + return headers + + @staticmethod + def files_handle(files: str, source: dict = None): + """ + 格式:接口中文件参数的名称:"文件路径地址" + 例如:{"file": "demo_test_demo.py"} + """ + if not files: + return None # 或者返回空字典 {} + # 处理请求参数files参数 + # 支持文件传递${}关键字,将使用data_handle进行处理 + files = data_handle(obj=files, source=source) + # 将文件处理成绝对路径 + return os.path.join(FILES_DIR, files) + + @staticmethod + def wait_seconds_handle(wait_seconds): + """ + 处理等待时间参数,如果不能转为int类型,则认为是none + """ + if wait_seconds is None: + return None + + try: + return int(wait_seconds) + except TypeError as e: + # 处理类型错误(如传入对象不支持int转换) + logger.debug(f"等待时间参数类型错误: {type(wait_seconds)} -> {e}") + return None + except ValueError as e: + # 处理值错误(如传入"abc"字符串) + logger.debug(f"等待时间参数值错误: {wait_seconds} -> {e}") + return None + + def before_request(self, request_data: dict, source_data: dict = None): + """ + 针请求前,对接口数据进行处理,识别用例数据中的关键字${xxxx},使用全局变量进行替换或者执行关键字中的方法替换为具体值 + """ + try: + logger.trace(f"\n======================================================\n" \ + "-------------用例数据处理前--------------------\n" + f"用例ID: {type(request_data.get('id', None))} || {request_data.get('id', None)}\n" \ + f"用例优先级(severity): {type(request_data.get('severity', None))} || {request_data.get('severity', None)}\n" \ + f"用例标题(title): {type(request_data.get('title', None))} || {request_data.get('title', None)}\n" \ + f"请求路径(url): {type(request_data.get('url', None))} || {request_data.get('url', None)}\n" \ + f"请求方式(method): {type(request_data.get('method', None))} || {request_data.get('method', None)}\n" \ + f"请求头(headers): {type(request_data.get('headers', None))} || {request_data.get('headers', None)}\n" \ + f"请求cookies: {type(request_data.get('cookies', None))} || {request_data.get('cookies', None)}\n" \ + f"请求类型(request_type): {type(request_data.get('request_type', None))} || {request_data.get('request_type', None)}\n" \ + f"请求文件(files): {type(request_data.get('files', None))} || {request_data.get('files', None)}\n" \ + f"请求后等待(wait_seconds): {type(request_data.get('wait_seconds', None))} || {request_data.get('wait_seconds', None)}\n" \ + f"请求参数(payload): {type(request_data.get('payload', None))} || {request_data.get('payload', None)}\n" \ + f"响应断言(validate): {type(request_data.get('validate', None))} || {request_data.get('validate', None)}\n" \ + f"数据库断言(assert_sql): {type(request_data.get('assert_sql', None))} || {request_data.get('assert_sql', None)}\n" \ + f"后置提取参数(extract): {type(request_data.get('extract', None))} || {request_data.get('extract', None)}\n" \ + f"用例依赖(case_dependence): {type(request_data.get('case_dependence', None))} || {request_data.get('case_dependence', None)}\n") + + new_request_data = { + "id": request_data.get("id"), + "severity": request_data.get("severity"), + "title": request_data.get("title"), + "url": self.url_handle(url=request_data.get("url"), source=source_data), + "method": request_data.get("method"), + "headers": self.headers_handle(headers=request_data.get("headers"), source=source_data), + "cookies": self.cookies_handle(cookies=request_data.get("cookies"), source=source_data), + "request_type": request_data.get("request_type"), + "files": self.files_handle(files=request_data.get("files"), source=source_data), + "wait_seconds": self.wait_seconds_handle(wait_seconds=request_data.get("wait_seconds")), + "payload": data_handle(obj=request_data["payload"], source=source_data), + "validate": data_handle(obj=request_data.get("validate"), source=source_data), + "assert_sql": request_data.get("assert_sql"), + "extract": data_handle(obj=request_data.get("extract"), source=source_data), + "case_dependence": request_data.get("case_dependence") + } + + logger.trace("\n-------------用例数据处理后--------------------\n" + f"用例ID: {type(new_request_data.get('id', None))} || {new_request_data.get('id', None)}\n" \ + f"用例优先级(severity): {type(new_request_data.get('severity', None))} || {new_request_data.get('severity', None)}\n" \ + f"用例标题(title): {type(new_request_data.get('title', None))} || {new_request_data.get('title', None)}\n" \ + f"请求路径(url): {type(new_request_data.get('url', None))} || {new_request_data.get('url', None)}\n" \ + f"请求方式(method): {type(new_request_data.get('method', None))} || {new_request_data.get('method', None)}\n" \ + f"请求头(headers): {type(new_request_data.get('headers', None))} || {new_request_data.get('headers', None)}\n" \ + f"请求cookies: {type(new_request_data.get('cookies', None))} || {new_request_data.get('cookies', None)}\n" \ + f"请求类型(request_type): {type(new_request_data.get('request_type', None))} || {new_request_data.get('request_type', None)}\n" \ + f"请求文件(files): {type(new_request_data.get('files', None))} || {new_request_data.get('files', None)}\n" \ + f"请求后等待(wait_seconds): {type(new_request_data.get('wait_seconds', None))} || {new_request_data.get('wait_seconds', None)}\n" \ + f"请求参数(payload): {type(new_request_data.get('payload', None))} || {new_request_data.get('payload', None)}\n" \ + f"响应断言(validate): {type(new_request_data.get('validate', None))} || {new_request_data.get('validate', None)}\n" \ + f"数据库断言(assert_sql): {type(new_request_data.get('assert_sql', None))} || {new_request_data.get('assert_sql', None)}\n" \ + f"后置提取参数(extract): {type(new_request_data.get('extract', None))} || {new_request_data.get('extract', None)}\n" \ + f"用例依赖(case_dependence): {type(new_request_data.get('case_dependence', None))} || {new_request_data.get('case_dependence', None)}\n" + "=====================================================") + logger.trace(new_request_data) + return new_request_data + except Exception as e: + logger.error(f"接口数据处理异常:{e}") + raise f"接口数据处理异常:\n{e}" + + @classmethod + def api_step_record(cls, **kwargs) -> None: + """ + 在allure/logger中记录请求数据 + """ + key = kwargs.get("id") + title = kwargs.get("title") + url = kwargs.get("url") + method = kwargs.get("method") + headers = kwargs.get("headers") + cookies = kwargs.get("cookies") + request_type = kwargs.get("request_type") + payload = kwargs.get("payload") + files = kwargs.get("files") + wait_seconds = kwargs.get("wait_seconds") + status_code = kwargs.get("status_code") + response_result = kwargs.get("response_result") + response_time_seconds = kwargs.get("response_time_seconds") + response_time_millisecond = kwargs.get("response_time_millisecond") + + _res = "\n" + "=" * 80 \ + + "\n-------------发送请求--------------------\n" \ + f"ID: {key}\n" \ + f"标题: {title}\n" \ + f"请求URL: {url}\n" \ + f"请求方式: {method}\n" \ + f"请求头: {headers}\n" \ + f"请求Cookies: {cookies}\n" \ + f"请求关键字: {request_type}\n" \ + f"请求参数: {payload}\n" \ + f"请求文件: {files}\n" \ + f"响应码: {status_code}\n" \ + f"响应数据: {response_result}\n" \ + f"响应耗时: {response_time_seconds} s || {response_time_millisecond} ms\n" \ + + "=" * 80 + logger.debug(_res) + allure_step(f"ID: {key}") + allure_step(f"标题: {title}") + allure_step(f"请求URL: {url}") + allure_step(f"请求方式: {method}") + allure_step(f"请求头: {headers}") + allure_step(f"请求Cookies: {cookies}") + allure_step(f"请求关键字: {request_type}") + allure_step(f"请求参数: {payload}") + allure_step(f"请求文件: {files}") + allure_step(f"请求后等待时间: {wait_seconds}") + allure_step(f"响应码: {status_code}") + allure_step(f"响应结果: {response_result}") + allure_step(f"响应耗时: {response_time_seconds} s || {response_time_millisecond} ms") + + def after_request(self, response: Response, api_data, db_info=None): + """ + 请求结束后提取参数,目前支持从响应数据、数据库、用例数据中提取 + :param response: Response 响应对象 + :param api_data: 接口数据需要提取的参数字典 '{"k1": "$.data"}' 或 '{"k1": "data:(.*?)$"}' + :return: + + """ + extract = api_data.get("extract") + if not extract: + logger.debug(f"断言成功后不需要进行提取操作,extract={extract}") + return None + + logger.debug(f"断言成功后需要进行提取操作,extract={extract}") + + case_results = {} + response_results = {} + database_results = {} + + # 封装三种提取方式 + def extract_data(source_data, patterns): + """ + 采用不同的提取方式对数据进行提取 + :param source_data: 数据来源:Response 响应对象, 用例数据Dict, 数据库查询结果Dict + :param patterns: 提取模式, {提取方式: {提取key: 提取表达式}}, ,例如:{'type_jsonpath: {'login': '$.login'}} + """ + results = {} + items = patterns.items() + for pattern_type, pattern_values in items: + if pattern_type == "type_jsonpath": + for key, expr in pattern_values.items(): + # 如果数据来源是response对象,需要处理成response.json() + if isinstance(source_data, requests.Response): + source_data = response.json() + results[key] = json_extractor(source_data, expr) + elif pattern_type == "type_re": + # 如果数据来源是response对象,需要处理成response.text + if isinstance(source_data, requests.Response): + source_data = response.text + for key, expr in pattern_values.items(): + results[key] = re_extract(str(source_data), expr) + elif pattern_type == "type_response": + for key, attr in pattern_values.items(): + results[key] = response_extract(source_data, attr) + else: + logger.error(f"不支持的提取方式: {pattern_type}") + return results + + for k, v in extract.items(): + """根据不同的数据来源,采取不同方式进行提取""" + if k in ["case"]: + case_results = extract_data(api_data, v) + elif k in ["response"]: + response_results = extract_data(response, v) + elif k in ["database"]: + if "sql" in v.keys(): + mysql = MysqlServer(**db_info) + sql_result = mysql.query_all(v["sql"]) + v.pop("sql") + database_results = extract_data(sql_result, v) + else: + logger.error(f"数据库提取参数必须传入sql") + else: # 兼容以前的写法 + # 通过update方法合并dict + response_results.update(extract_data(response, {k: v})) + + result = {**case_results, **response_results, **database_results} + logger.debug(f"--用例提取结果 --> {case_results} --") + logger.debug(f"--响应提取结果 --> {response_results} --") + logger.debug(f"--数据库提取结果 --> {database_results} --") + + return result + + # -----接口请求流程:获取接口数据 -> 处理接口请求数据 -> 请求接口 -> 接口断言 -> 接口数据提取 -------------- + def api_request_flow(self, request_data: dict = None, global_var: dict = None, api_file_path: str = None, + key: str = None, db_info: dict = None): + """ + 发送请求并进行后置参数提取操作。lll + request_data参数 与 api_file_path,key, 这两个必须传递其中一个 + + :param request_data: 请求数据字典,包含请求所需的所有信息。 + :param global_var: 包含全局变量的字典,这些变量用于替换到请求数据的关键字:${} + :param api_file_path: 接口所在的目录或者文件路径 + :param key: 接口的ID + :param db_info: 数据库连接信息,用于数据库断言/数据库提取数据时连接数据库 + :return: 接口请求数据以及从接口响应提取的参数,字典。 + :raises ValueError: 如果请求数据无效或缺失。 + """ + # 初始化一个变量,保存接口请求参数payload以及通过extract提取的参数 + save_api_data = {} + + if request_data: + api_info = request_data + + elif api_file_path and key: + api_info = self.get_api_data(api_file_path=api_file_path, key=key) + else: + logger.error("请求数据异常") + raise ValueError("请求数据异常") + + new_api_data = self.before_request(request_data=api_info, source_data=global_var) + + response = self.send_request(new_api_data) + + # 根据配置,增加接口请求等待时间。适应部分请求调用后,需要进行内置数据处理的问题 + logger.trace(f"开始等待") + if new_api_data.get("wait_seconds"): + time.sleep(new_api_data["wait_seconds"]) + logger.trace(f"结束等待") + + new_api_data["status_code"] = response.status_code + new_api_data["response_time_seconds"] = round(response.elapsed.total_seconds(), 2) + new_api_data["response_time_millisecond"] = round(response.elapsed.total_seconds() * 1000, 2) + + try: + # 先检查响应内容是否可能为JSON + content_type = response.headers.get('content-type', '').lower() + if 'application/json' in content_type or response.text.strip().startswith(('{', '[')): + new_api_data["response_result"] = response.json() + else: + new_api_data["response_result"] = response.text + except json.JSONDecodeError as e: + logger.debug(f"JSON解析失败,使用文本格式: {e}") + new_api_data["response_result"] = response.text + except Exception as e: + logger.error(f"处理响应数据时发生意外错误: {e}") + new_api_data["response_result"] = f"Error: {str(e)}" + + self.api_step_record(**new_api_data) + # 进行响应断言 + AssertHandle(assert_data=new_api_data["validate"], response=response).assert_handle() + # todo 数据库断言 和 响应断言二选一 + + # 进行响应参数提取,并保存提取后的数据 + if new_api_data.get("extract"): + extract_results = self.after_request(response=response, api_data=new_api_data, db_info=db_info) + save_api_data.update(extract_results) + + # 将接口请求参数payload的值保存到save_api_data中 + save_api_data.update({"_payload": new_api_data["payload"]} if new_api_data.get("payload") else {}) + logger.trace(f"接口请求完成后,接口请求数据payload,响应数据 & 提取数据 save_api_data={save_api_data}") + allure_step(f"接口请求完成后,接口请求数据payload,响应数据 & 提取数据 save_api_data={save_api_data}") + return save_api_data