From c99c8a8c82f95f1f8ee36b0b80fcf436218b793c Mon Sep 17 00:00:00 2001 From: floraachy <1622042529@qq.com> Date: Wed, 23 Aug 2023 14:58:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81postman.json=E8=BD=AC?= =?UTF-8?q?=E4=B8=BAYAML=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yaml_case_maker/postman_for_yaml.py | 247 ++++++++++++++++++ 1 file changed, 247 insertions(+) create mode 100644 case_utils/yaml_case_maker/postman_for_yaml.py diff --git a/case_utils/yaml_case_maker/postman_for_yaml.py b/case_utils/yaml_case_maker/postman_for_yaml.py new file mode 100644 index 0000000..a109c17 --- /dev/null +++ b/case_utils/yaml_case_maker/postman_for_yaml.py @@ -0,0 +1,247 @@ +# -*- coding: utf-8 -*- +# @Time : 2023/8/22 14:13 +# @Author : chenyinhua +# @File : postman_for_yaml.py +# @Software: PyCharm +# @Desc: + +# 标准库导入 +import os +import re +import json +from typing import Dict +# 第三方库导入 +from ruamel import yaml + +""" +相比较于pyyaml, Ruamel可以保持YAML文件的结构和顺序不变。 +安装:pip install ruamel.yaml +""" + + +class PostmanForYaml: + """ + 将postman接口文档转为YAML格式用例 + """ + + def __init__(self, case_dir, postman_path): + """ + :param case_dir: 用例需要保存的目录 + :param postman_path: 需要读取的swagger文件的路径 + """ + self._data = self.get_postman_json(postman_path) + self.case_dir = case_dir + + def get_postman_json(self, postman_path): + """ + 获取 postman 中的 json 数据 + :param postman_path: 需要读取的swagger文件的路径 + :return: + """ + result = [] + try: + with open(postman_path, "r", encoding='utf-8') as f: + row_data = json.load(f) + + def _parse_api(content): + nonlocal row_data + nonlocal result + if isinstance(content, list): + for item in content: + _parse_api(content=item) + elif isinstance(content, dict): + if 'item' in content.keys(): + _parse_api(content=content['item']) + elif 'request' in content.keys(): + # 获取所有接口的相关数据 + yaml_data = { + "case_common": { + "allure_epic": self.get_allure_epic(row_data), + "allure_feature": self.get_allure_feature(content), + "allure_story": self.get_allure_story(content) + }, + "case_info": [ + { + "id": f"case_{self.get_case_id(self.get_url(content)).lower()}_01", + "title": self.get_title(content), + "run": False, + "url": self.get_url(content), + "severity": None, + "method": self.get_method(content), + "headers": self.get_headers(content), + "cookies": None, + "request_type": self.get_request_type_payload(content).get("request_type"), + "payload": self.get_request_type_payload(content).get("payload"), + "files": self.get_request_type_payload(content).get("files"), + "extract": None, + "assert_response": {'eq': {'http_code': 200}}, + "assert_sql": None + + } + ] + } + result.append({ + self.get_case_id(self.get_url(content)): yaml_data + }) + _parse_api(content=row_data) + return result + except FileNotFoundError: + raise FileNotFoundError("文件路径不存在,请重新输入") + + def get_allure_epic(self, content): + """ + 获取 yaml 用例中的 allure_epic + """ + _allure_epic = content['info']['name'] + return _allure_epic + + @classmethod + def get_allure_feature(cls, content): + """ + 获取 yaml 用例中的 allure_feature + 这里直接获取最下级的item.name,因为可以有多级item,不好判断 + """ + + _allure_feature = content['name'] + return str(_allure_feature) + + @classmethod + def get_allure_story(cls, content): + """ + 获取 yaml 用例中的 allure_story + 这里直接获取最下级的item.name,因为可以有多级item,不好判断 + """ + _allure_story = content['name'] + return _allure_story + + @classmethod + def get_case_id(cls, content): + """ + 获取 case_id, 是根据接口路径生成的 + """ + # 这里接收到的参数content其实是url + # 移除?后面拼接的值 + url_path = content.split("?")[0] + # 去除"http://"或"https://"部分 + if url_path.startswith(("http://", "https://")): + url_path = url_path.split("//", 1)[1] + # 使用正则表达式匹配并删除第一个斜线以及斜线前的内容 + new_url_path = re.sub(r'^.*?/', '', url_path) + # 将剩余的斜线替换成_ + _case_id = new_url_path.replace("/", "_") + return _case_id + + @classmethod + def get_title(cls, content): + """ + 获取接口的标题 + """ + _get_detail = content.get('name') + return "测试 " + _get_detail + + @classmethod + def get_url(cls, content): + """ + 获取接口的url + """ + request = content.get('request') + url = request.get('url') + url_raw = url.get('raw') if url else url + _url = url_raw.replace('{{', '${').replace('}}', '}') + # 使用正则表达式匹配":dept"内的内容并替换为${dept} + _get_url = re.sub(r':(\w+)', r'${\1}', _url) + return _get_url + + @classmethod + def get_method(cls, content): + """ + 获取接口的method + """ + request = content.get('request') + _get_method = request.get('method', 'GET').upper() + return _get_method + + @classmethod + def get_headers(cls, content): + """ + 获取请求头 + """ + _headers = {} + request = content.get('request') + if request: + _headers = request.get('header') + _headers = {item.get('key'): item.get('value') for item in _headers} if _headers else {} + auth = request.get('auth') + if auth: + auth_type = auth.get('type') + if auth.get(auth_type): + auth_value = {item.get('key'): item.get('value') for item in auth.get(auth_type) if + (item and item.get('key'))} + _headers.update(auth_value) + # 如果_headers是{}就返回None + return None if not _headers else _headers + + @classmethod + def get_request_type_payload(cls, content): + """ + 获取request_type, 并响应处理payload及file参数 + """ + api = { + "request_type": 'json', + "payload": {}, + "files": {"file": []} + } + request = content.get('request') + if request: + body = request.get('body') + if body: + # api接口请求参数类型 + request_mode = body.get('mode') + if request_mode in ['raw', 'formdata', 'urlencoded']: + api["request_type"] = 'json' if request_mode == 'raw' else 'data' + request_data = body.get(request_mode) + if request_data: + if request_mode == 'raw': + api["payload"].update( + json.loads(request_data.replace('\t', '').replace('\n', '').replace('\r', ''))) + elif request_mode in ['formdata', 'urlencoded']: + for item in request_data: + if item['type'] == "text": + api["payload"][item['key']] = item.get('value', '') + elif item['type'] == "file": + api["files"]["file"].append(item.get('src', '')) + api["request_type"] = "file" + else: + raise ValueError("不支持的请求参数类型") + api["payload"] = None if not api["payload"] else api["payload"] + api["files"] = None if not api["files"]["file"] else api["files"] + return api + + def yaml_cases(self, data: Dict, file_path: str) -> None: + """ + 写入 yaml 数据 + :param file_path: + :param data: 测试用例数据 + :return: + """ + # 检查目录不存在则创建, 存在则不创建 + os.makedirs(self.case_dir, exist_ok=True) + _file_name = file_path + '.yaml' + _file_path = os.path.join(self.case_dir, _file_name) + if _file_name in os.listdir(self.case_dir): + data.pop("case_common") + data = data["case_info"] + with open(_file_path, "a", encoding="utf-8") as file: + yaml.dump(data, file, Dumper=yaml.RoundTripDumper, allow_unicode=True) + file.write('\n') + + def write_yaml_handler(self): + # 获取所有接口的相关数据 + for case in self._data: + for k, v in case.items(): + self.yaml_cases(data=v, file_path=k) + + +if __name__ == '__main__': + PostmanForYaml(case_dir=r"C:\1_xinjinyuan_chy\1project\apiautotest\files\postman", + postman_path=r"C:\1_xinjinyuan_chy\1project\apiautotest\files\Gitlink.postman_collection.json").write_yaml_handler()