diff --git a/.gitignore b/.gitignore
index 3182dfd..4b5c12d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,4 +10,7 @@ outputs
Pipfile.lock
test_case/test_auto_case
test_scenario
-config/settings_local.py
\ No newline at end of file
+config/settings_local.py
+env/dev.yml
+env/osredm.yml
+env/test.yml
\ No newline at end of file
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..4af4903
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "common"]
+ path = common
+ url = https://gitlink.org.cn/floraachy/utils.git
diff --git a/common b/common
new file mode 160000
index 0000000..e83e25a
--- /dev/null
+++ b/common
@@ -0,0 +1 @@
+Subproject commit e83e25a98248827db52bc399c0895bdb17944548
diff --git a/env/demo.yaml b/env/demo.yaml
new file mode 100644
index 0000000..d234346
--- /dev/null
+++ b/env/demo.yaml
@@ -0,0 +1,15 @@
+host: h
+wiki_host:
+pms_host:
+remote_url:
+remote_username:
+remote_password:
+green_code:
+env_login:
+env_password:
+env_nickname:
+env_user_id:
+env_super_login:
+env_super_password:
+env_super_nickname:
+env_super_user_id:
\ No newline at end of file
diff --git a/run.py b/run.py
index d482c3a..0312aac 100644
--- a/run.py
+++ b/run.py
@@ -40,12 +40,12 @@ import pytest
from loguru import logger
import click
# 本地应用/模块导入
+from common.files_utils.files_handle import load_yaml_file
from utils.case_generate_utils.case_fun_generate import generate_cases
from utils.report_utils.send_result_handle import send_result
-from config.path_config import REPORT_DIR, LOG_DIR, CONF_DIR, ALLURE_RESULTS_DIR, ALLURE_HTML_DIR, AUTO_CASE_DIR
-from config.settings import LOG_LEVEL, RunConfig, ENV_VARS
-from config.global_vars import GLOBAL_VARS
-from utils.logger_utils.loguru_log import capture_logs
+from settings import REPORT_DIR, LOG_DIR, ENV_DIR, ALLURE_RESULTS_DIR, ALLURE_HTML_DIR, AUTO_CASE_DIR, ALLURE_CONFIG_DIR
+from settings import LOG_LEVEL, GLOBAL_VARS, REPORT, RERUN, RERUN_DELAY, MAX_FAIL
+from common.logger_utils.loguru_log import capture_logs
from utils.report_utils.allure_handle import generate_allure_report
from utils.report_utils.push_allure_report import push_allure_report
@@ -72,10 +72,9 @@ def run(env, m, report):
# ------------------------ 处理一下获取到的参数----------------------------
# # 根据指定的环境参数,将运行环境所需相关配置数据保存到GLOBAL_VARS
- ENV_VARS["common"]["env"] = ENV_VARS[env]["host"]
- GLOBAL_VARS.update(ENV_VARS["common"])
- GLOBAL_VARS.update(ENV_VARS[env])
- logger.info(f"启动后初始化的全局变量:{GLOBAL_VARS}")
+ env_file = os.path.join(ENV_DIR, f"{env}.yml")
+ __env = load_yaml_file(env_file)
+ GLOBAL_VARS.update(__env)
# ------------------------ 自动生成测试用例 ------------------------
# 删除原有的测试用例,以便生成新的测试用例
if os.path.exists(AUTO_CASE_DIR):
@@ -85,8 +84,8 @@ def run(env, m, report):
generate_cases()
# ------------------------ 设置pytest相关参数 ------------------------
- arg_list = [f"--maxfail={RunConfig.max_fail}", f"--reruns={RunConfig.rerun}",
- f"--reruns-delay={RunConfig.reruns_delay}", f'--alluredir={ALLURE_RESULTS_DIR}',
+ arg_list = [f"--maxfail={MAX_FAIL}", f"--reruns={RERUN}",
+ f"--reruns-delay={RERUN_DELAY}", f'--alluredir={ALLURE_RESULTS_DIR}',
'--clean-alluredir']
if m:
arg_list.append(f"-m {m}")
@@ -97,20 +96,20 @@ def run(env, m, report):
if report == "yes":
report_path, attachment_path = generate_allure_report(allure_results=ALLURE_RESULTS_DIR,
allure_report=ALLURE_HTML_DIR,
- windows_title=ENV_VARS["common"]["项目名称"],
- report_name=ENV_VARS["common"]["报告标题"],
+ windows_title=REPORT["项目名称"],
+ report_name=REPORT["报告标题"],
env_info={
"运行环境": GLOBAL_VARS.get("host", None)},
- allure_config_path=os.path.join(CONF_DIR,
- "allure_config"),
+ allure_config_path=ALLURE_CONFIG_DIR,
attachment_path=os.path.join(REPORT_DIR,
f'autotest_report.zip'))
# ------------------------ 发送测试结果 ------------------------
- send_result(report_info=ENV_VARS["common"], report_path=report_path, attachment_path=attachment_path)
+ send_result(report_info=REPORT, report_path=report_path, attachment_path=attachment_path)
+
# 推送allure报告到个人建站仓库
- push_allure_report(allure_report_dir=ALLURE_HTML_DIR, remote_url=GLOBAL_VARS["remote_url"],
- username=GLOBAL_VARS["remote_username"], password=GLOBAL_VARS["remote_password"])
+ # push_allure_report(allure_report_dir=ALLURE_HTML_DIR, remote_url=GLOBAL_VARS["remote_url"],
+ # username=GLOBAL_VARS["remote_username"], password=GLOBAL_VARS["remote_password"])
except Exception as e:
raise e
diff --git a/settings.py b/settings.py
new file mode 100644
index 0000000..b2ce840
--- /dev/null
+++ b/settings.py
@@ -0,0 +1,204 @@
+# -*- coding: utf-8 -*-
+# @Version: Python 3.9
+# @Time : 2023/1/9 17:08
+# @Author : chenyinhua
+# @File : settings.py
+# @Software: PyCharm
+# @Desc: 项目配置文件
+
+import os
+
+# 定义一个全局变量,用于存储运行过程中相关数据
+GLOBAL_VARS = {}
+
+# 定义一个变量。存储自定义的标记markers
+CUSTOM_MARKERS = []
+
+REPORT = {
+ "报告标题": "API 自动化测试报告",
+ "项目名称": "GitLink 确实开源",
+ "tester": "陈银花",
+ "department": "开源中心",
+ "env": ""
+}
+# ------------------------------------ pytest相关配置 ----------------------------------------------------#
+# 失败重跑次数
+RERUN = 0
+
+# 失败重跑间隔时间
+RERUN_DELAY = 5
+
+# 当达到最大失败数,停止执行
+MAX_FAIL = "100"
+
+# ------------------------------------ 配置信息 ----------------------------------------------------#
+# 1 代表 yaml文件,其他数值将不自动生成用例,仅能执行手动编写的用例
+CASE_FILE_TYPE = 1
+
+# 0表示默认不发送任何通知, 1 代表钉钉通知,2 代表企业微信通知, 3 代表邮件通知, 4 代表所有途径都发送通知
+SEND_RESULT_TYPE = 0
+
+# 指定日志收集级别
+LOG_LEVEL = "DEBUG" # 可选值:TRACE DEBUG INFO SUCCESS WARNING ERROR CRITICAL
+"""
+支持的日志级别:
+ TRACE: 最低级别的日志级别,用于详细追踪程序的执行。
+ DEBUG: 用于调试和开发过程中打印详细的调试信息。
+ INFO: 提供程序执行过程中的关键信息。
+ SUCCESS: 用于标记成功或重要的里程碑事件。
+ WARNING: 表示潜在的问题或不符合预期的情况,但不会导致程序失败。
+ ERROR: 表示错误和异常情况,但程序仍然可以继续运行。
+ CRITICAL: 表示严重的错误和异常情况,可能导致程序崩溃或无法正常运行。
+"""
+
+# ------------------------------------ 邮件配置信息 ----------------------------------------------------#
+
+# 发送邮件的相关配置信息
+email = {
+ "user": "****email-user****", # 发件人邮箱
+ "password": "****email-password****", # 发件人邮箱授权码
+ "host": "smtp.qq.com",
+ "to": ["****email-user-1****", "****email-user-2****"] # 收件人邮箱
+}
+
+# ------------------------------------ 邮件通知内容 ----------------------------------------------------#
+email_subject = f"UI自动化报告"
+email_content = """
+ 各位同事, 大家好:
+
+ 自动化用例于 ${start_time} 开始运行,运行时长:${run_time} s, 目前已执行完成。
+ ---------------------------------------------------------------------------------------------------------------
+ 测试人: ${tester}
+ 所属部门: ${department}
+ 项目环境: ${env}
+ ---------------------------------------------------------------------------------------------------------------
+ 执行结果如下:
+ 用例运行总数: ${total} 个
+ 通过用例个数(passed): ${passed} 个
+ 失败用例个数(failed): ${failed} 个
+ 异常用例个数(error): ${broken} 个
+ 跳过用例个数(skipped): ${skipped} 个
+ 失败重试用例个数 * 次数之和(rerun): ${rerun} 个
+ 成 功 率: ${pass_rate} %
+
+ **********************************
+ 附件为具体的测试报告,详细情况可下载附件查看, 非相关负责人员可忽略此消息。谢谢。
+ """
+# ------------------------------------ 钉钉相关配置 ----------------------------------------------------#
+ding_talk = {
+ "webhook_url": "https://oapi.dingtalk.com/robot/send?access_token=********",
+ "secret": "****ding****"
+}
+
+# ------------------------------------ 钉钉通知内容 ----------------------------------------------------#
+ding_talk_title = f"UI自动化报告"
+ding_talk_content = """
+ 各位同事, 大家好:
+
+ ### 自动化用例于 ${start_time} 开始运行,运行时长:${run_time} s, 目前已执行完成。
+ ---------------------------------------------------------------------------------------------------------------
+ #### 测试人: ${tester}
+ #### 所属部门: ${department}
+ #### 项目环境: ${env}
+ ---------------------------------------------------------------------------------------------------------------
+ #### 执行结果如下:
+ - 用例运行总数: ${total} 个
+ - 通过用例个数(passed): ${passed} 个
+ - 失败用例个数(failed): ${failed} 个
+ - 异常用例个数(error): ${broken} 个
+ - 跳过用例个数(skipped): ${skipped} 个
+ - 失败重试用例个数 * 次数之和(rerun): ${rerun} 个
+ - 成 功 率: ${pass_rate} %
+
+ **********************************
+ 附件为具体的测试报告,详细情况可下载附件查看, 非相关负责人员可忽略此消息。谢谢。
+ """
+# ------------------------------------ 企业微信相关配置 ----------------------------------------------------#
+wechat = {
+ "webhook_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=********",
+}
+# ------------------------------------ 企业微信通知内容 ----------------------------------------------------#
+wechat_content = """
+ 各位同事, 大家好:
+
+ ### 自动化用例于 ${start_time} 开始运行,运行时长:${run_time} s, 目前已执行完成。
+ --------------------------------
+ #### 测试人: ${tester}
+ #### 所属部门: ${department}
+ #### 项目环境: ${env}
+ --------------------------------
+ #### 执行结果如下:
+ - 用例运行总数: ${total} 个
+ - 通过用例个数(passed): ${passed} 个
+ - 失败用例个数(failed): ${failed} 个
+ - 异常用例个数(error): ${broken} 个
+ - 跳过用例个数(skipped): ${skipped} 个
+ - 失败重试用例个数 * 次数之和(rerun): ${rerun} 个
+ - 成 功 率: ${pass_rate} %
+
+ **********************************
+ 附件为具体的测试报告,详细情况可下载附件查看, 非相关负责人员可忽略此消息。谢谢。
+ """
+
+# ------------------------------------ 项目路径 ----------------------------------------------------#
+# 项目根目录
+BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+
+# 通用模块目录
+UTILS_DIR = os.path.join(BASE_DIR, "utils")
+
+# 配置文件目录
+ENV_DIR = os.path.join(BASE_DIR, "env")
+
+# 接口数据模块目录
+INTERFACE_DIR = os.path.join(BASE_DIR, "interface")
+
+# gitlink测试数据模块目录
+GITLINK_DIR = os.path.join(INTERFACE_DIR, "gitlink")
+
+# osredm测试数据模块目录
+OSREDM_DIR = os.path.join(INTERFACE_DIR, "osredm")
+
+# glcc测试数据模块目录
+GLCC_DIR = os.path.join(INTERFACE_DIR, "glcc")
+
+# 测试文件模块目录
+FILES_DIR = os.path.join(BASE_DIR, "files")
+
+# 第三方库目录
+LIB_DIR = os.path.join(BASE_DIR, "lib")
+
+# 日志/报告保存目录
+OUT_DIR = os.path.join(BASE_DIR, "outputs")
+if not os.path.exists(OUT_DIR):
+ os.mkdir(OUT_DIR)
+
+# 报告保存目录
+REPORT_DIR = os.path.join(OUT_DIR, "report")
+if not os.path.exists(REPORT_DIR):
+ os.mkdir(REPORT_DIR)
+
+# 报日志保存目录
+LOG_DIR = os.path.join(OUT_DIR, "log")
+if not os.path.exists(LOG_DIR):
+ os.mkdir(LOG_DIR)
+
+# 测试用例模块
+CASE_DIR = os.path.join(BASE_DIR, "test_case")
+
+# 手动生成测试用例模块
+MANUAL_CASE_DIR = os.path.join(CASE_DIR, "test_manual_case")
+if not os.path.exists(MANUAL_CASE_DIR):
+ os.mkdir(MANUAL_CASE_DIR)
+
+# 自动生成测试用例模块
+AUTO_CASE_DIR = os.path.join(CASE_DIR, "test_auto_case")
+
+# Allure报告,测试结果集目录
+ALLURE_RESULTS_DIR = os.path.join(REPORT_DIR, "allure_results")
+
+# Allure报告,HTML测试报告目录
+ALLURE_HTML_DIR = os.path.join(REPORT_DIR, "allure_html")
+
+# Allure报告,配置文件目录
+ALLURE_CONFIG_DIR = os.path.join(LIB_DIR, "allure_config")
diff --git a/utils/assertion_utils/assert_control.py b/utils/assertion_utils/assert_control.py
index 5c2e36b..af74f27 100644
--- a/utils/assertion_utils/assert_control.py
+++ b/utils/assertion_utils/assert_control.py
@@ -16,7 +16,7 @@ from loguru import logger
from utils.models import AssertMethod
from utils.assertion_utils import assert_function
from utils.data_utils.extract_data_handle import json_extractor, re_extract
-from utils.database_utils.mysql_handle import MysqlServer
+from common.database_utils.mysql_handle import MysqlServer
class AssertUtils:
diff --git a/utils/case_generate_utils/case_data_analysis.py b/utils/case_generate_utils/case_data_analysis.py
index a2e530a..2f39a58 100644
--- a/utils/case_generate_utils/case_data_analysis.py
+++ b/utils/case_generate_utils/case_data_analysis.py
@@ -93,7 +93,7 @@ class CaseDataCheck:
case_list = []
for key, values in cases.items():
# 公共配置中的数据,与用例数据不同,需要单独处理
- if key == 'case_info':
+ if key == 'teststeps':
for value in values:
self.case_data = value
self.case_id = value.get("id")
@@ -113,9 +113,8 @@ class CaseDataCheck:
'request_type': self.get_request_type,
'payload': self.case_data.get(TestCaseEnum.PAYLOAD.value[0]),
'files': self.case_data.get(TestCaseEnum.FILES.value[0]),
- "wait_seconds": self.case_data.get(TestCaseEnum.WAIT_SECONDS.value[0]),
- "assert_response": self.case_data.get(TestCaseEnum.ASSERT_RESPONSE.value[0]),
- "assert_sql": self.case_data.get(TestCaseEnum.ASSERT_SQL.value[0]),
+ "think_time": self.case_data.get(TestCaseEnum.THINK_TIME.value[0]),
+ "validate": self.case_data.get(TestCaseEnum.VALIDATE.value[0]),
'extract': self.case_data.get(TestCaseEnum.EXTRACT.value[0]),
"case_dependence": self.case_data.get(TestCaseEnum.CASE_DEPENDENCE.value[0])
}
diff --git a/utils/case_generate_utils/case_fun_generate.py b/utils/case_generate_utils/case_fun_generate.py
index 0ab73c4..ee86a7c 100644
--- a/utils/case_generate_utils/case_fun_generate.py
+++ b/utils/case_generate_utils/case_fun_generate.py
@@ -10,16 +10,10 @@ from string import Template
import datetime
import re
# 第三方库导入
-from xpinyin import Pinyin # 纯 Python 编写的中文字符串拼音转换模块,不需要依赖外部程序和词库。
from loguru import logger
# 本地应用/模块导入
-from utils.files_utils.excel_handle import ExcelHandle
-from utils.files_utils.yaml_handle import YamlHandle
-from utils.files_utils.files_handle import get_files, get_relative_path
-from utils.models import CaseFileType
-from config.settings import CASE_FILE_TYPE
-from config.path_config import INTERFACE_DIR, CASE_TEMPLATE_DIR, AUTO_CASE_DIR
-from config.global_vars import CUSTOM_MARKERS
+from common.files_utils.files_handle import load_yaml_file, get_files, get_relative_path
+from settings import CASE_FILE_TYPE, CUSTOM_MARKERS, AUTO_CASE_DIR, INTERFACE_DIR
from utils.case_generate_utils.case_data_analysis import CaseDataCheck
"""
@@ -29,105 +23,71 @@ from utils.case_generate_utils.case_data_analysis import CaseDataCheck
3. 确认符合规范后,获取所有用例数据,自动生成测试用例方法(PY)
"""
-
-def handle_excel_data(file):
- """
- 读取excel用例数据生成测试用例
- """
- if os.path.isfile(file):
- # 读取excel文件中的用例数据,存储到data中
- data = ExcelHandle(file).read()
- for index, v in enumerate(data):
- excel_case = {}
- """
- 将excel读取到的用例数据,适配allure格式
- 表单名称以短横线隔开的情况下,左边部分作为allure_epic, 右边部分作为allure_feature以及allure_story。
- 否则,表单名称全部作为allure_epic,allure_feature,allure_story
- """
- if "-" in v["sheet_name"]:
- excel_case["case_common"] = {
- "allure_epic": v["sheet_name"].split("-")[0],
- "allure_feature": v["sheet_name"].split("-")[1],
- "allure_story": v["sheet_name"].split("-")[1]
- }
- else:
- excel_case["case_common"] = {
- "allure_epic": v["sheet_name"],
- "allure_feature": v["sheet_name"],
- "allure_story": v["sheet_name"]
- }
- # 处理excel用例数据
- excel_case["case_info"] = v["data"]
- # 检查用例数据是否符合规范
- tested_case = CaseDataCheck().case_process(excel_case)
- # 生成测试方法
- """
- 由于excel涉及到多个表单,每一个表单都会生成一个测试方法。因此会将表单名称的首字母拼接到测试方法上。
- excel名称:test_demo
- 例如表单名称:"GitLink-登录模块" 或 "登录模块",都是取关键字"登录模块"首字母
- 测试文件:test_demo_dl.py
- 测试方法名称: TestDemoDl.test_demo_dl
- """
- pin_yin = Pinyin()
- _name = pin_yin.get_initials(excel_case["case_common"]["allure_feature"], "").lower()
- if os.path.samefile(INTERFACE_DIR, os.path.dirname(file)):
- # 用例文件的直接父级目录是INTERFACE_DIR,则直接在AUTO_CASE_DIR下生成测试用例方法
- gen_case_file(
- filename=os.path.splitext(os.path.basename(file))[0] + "_" + _name,
- case_template_path=CASE_TEMPLATE_DIR,
- case_common=excel_case["case_common"],
- common_dependence=None,
- case_data=tested_case,
- target_case_path=AUTO_CASE_DIR
- )
- else:
- # 用例文件的直接父级目录不是INTERFACE_DIR, 则保留其直接父级目录,再在AUTO_CASE_DIR下生成测试用例方法
- gen_case_file(
- filename=os.path.splitext(os.path.basename(file))[0] + "_" + _name,
- case_template_path=CASE_TEMPLATE_DIR,
- case_common=excel_case["case_common"],
- common_dependence=None,
- case_data=tested_case,
- target_case_path=os.path.join(AUTO_CASE_DIR,
- get_relative_path(file_path=file, directory_path=INTERFACE_DIR))
- )
- return True
- else:
- logger.error(f"{file}不是一个正确的文件路径!")
- return False
+CASE_TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "case_template.txt")
+CONFTEST_TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "conftest_template.txt")
-def handle_yaml_data(file):
+def __load_yaml_data(file):
"""
读取yaml用例数据生成测试用例
"""
if os.path.isfile(file):
# 读取yaml/yml文件中的用例数据,存储到data中
- yaml_data = YamlHandle(file).read_yaml
- # 检查用例数据是否符合规范
- tested_case = CaseDataCheck().case_process(yaml_data)
+ yaml_data = load_yaml_file(file)
+ logger.debug(f"需要处理的文件:{file}")
if os.path.samefile(INTERFACE_DIR, os.path.dirname(file)):
- gen_case_file(
- # 用例文件的直接父级目录是INTERFACE_DIR,则直接在AUTO_CASE_DIR下生成测试用例方法
- filename=os.path.splitext(os.path.basename(file))[0],
- case_template_path=CASE_TEMPLATE_DIR,
- case_common=yaml_data["case_common"],
- common_dependence=yaml_data.get("common_dependence", None),
- case_data=tested_case,
- target_case_path=AUTO_CASE_DIR
- )
+ """# os.path.samefile是 Python 中 os.path 模块提供的一个函数,用于检查两个文件路径是否指向同一个文件。"""
+ if os.path.basename(file) == "init_data.yaml" or os.path.basename(file) == "init_data.yml":
+ """识别到init_data.yaml或者init_data.yml文件,自动生成conftest.py文件"""
+ os.makedirs(os.path.dirname(file), exist_ok=True)
+ logger.debug(f"识别到init_data.yaml或者init_data.yml文件,自动生成conftest.py文件")
+ generate_conftest_file(
+ template_path=CONFTEST_TEMPLATE_DIR,
+ init_data=yaml_data,
+ target_path=AUTO_CASE_DIR
+ )
+ elif os.path.basename(file).startswith("test"):
+ # 检查用例数据是否符合规范
+ tested_case = CaseDataCheck().case_process(yaml_data)
+ gen_case_file(
+ # 用例文件的直接父级目录是INTERFACE_DIR,则直接在AUTO_CASE_DIR下生成测试用例方法
+ filename=os.path.splitext(os.path.basename(file))[0],
+ case_template_path=CASE_TEMPLATE_DIR,
+ config=yaml_data["config"],
+ common_dependence=yaml_data.get("common_dependence", None),
+ case_data=tested_case,
+ target_case_path=AUTO_CASE_DIR
+ )
+ else:
+ logger.error(f"{file}不是以init_data或者test开头的文件")
else:
# 用例文件的直接父级目录不是INTERFACE_DIR, 则保留其直接父级目录,再在AUTO_CASE_DIR下生成测试用例方法
- os.makedirs(os.path.dirname(file), exist_ok=True)
- gen_case_file(
- filename=os.path.splitext(os.path.basename(file))[0],
- case_template_path=CASE_TEMPLATE_DIR,
- case_common=yaml_data["case_common"],
- common_dependence=yaml_data.get("common_dependence", None),
- case_data=tested_case,
- target_case_path=os.path.join(AUTO_CASE_DIR,
- get_relative_path(file_path=file, directory_path=INTERFACE_DIR))
- )
+ if os.path.basename(file) == "init_data.yaml" or os.path.basename(file) == "init_data.yml":
+ """识别到init_data.yaml或者init_data.yml文件,自动生成conftest.py文件"""
+ os.makedirs(os.path.dirname(file), exist_ok=True)
+ logger.debug(f"识别到init_data.yaml或者init_data.yml文件,自动生成conftest.py文件")
+ generate_conftest_file(
+ template_path=CONFTEST_TEMPLATE_DIR,
+ init_data=yaml_data,
+ target_path=os.path.join(AUTO_CASE_DIR,
+ get_relative_path(file_path=file, directory_path=INTERFACE_DIR))
+ )
+
+ elif os.path.basename(file).startswith("test"):
+ # 检查用例数据是否符合规范
+ tested_case = CaseDataCheck().case_process(yaml_data)
+ os.makedirs(os.path.dirname(file), exist_ok=True)
+ gen_case_file(
+ filename=os.path.splitext(os.path.basename(file))[0],
+ case_template_path=CASE_TEMPLATE_DIR,
+ config=yaml_data["config"],
+ common_dependence=yaml_data.get("common_dependence", None),
+ case_data=tested_case,
+ target_case_path=os.path.join(AUTO_CASE_DIR,
+ get_relative_path(file_path=file, directory_path=INTERFACE_DIR))
+ )
+ else:
+ logger.error(f"{file}不是以init_data或者test开头的文件")
return True
else:
logger.error(f"{file}不是一个正确的文件路径!")
@@ -138,141 +98,169 @@ def generate_cases():
"""
根据配置文件,从指定类型文件中读取所有用例数据,并自动生成测试用例
"""
- excel_files = []
- yaml_files = []
- if CASE_FILE_TYPE == CaseFileType.EXCEL.value:
- # 在用例数据"INTERFACE_DIR"目录中寻找后缀是xlsx, xls的文件
- excel_files = get_files(target=INTERFACE_DIR, start="test_", end=".xlsx") \
- + get_files(target=INTERFACE_DIR, start="test_", end=".xls")
- elif CASE_FILE_TYPE == CaseFileType.YAML.value:
- # 在用例数据"INTERFACE_DIR"目录中寻找后缀是yaml, yml的文件
- yaml_files = get_files(target=INTERFACE_DIR, start="test_", end=".yaml") \
- + get_files(target=INTERFACE_DIR, start="test_", end=".yml")
- elif CASE_FILE_TYPE == CaseFileType.ALL.value:
- # 在用例数据"INTERFACE_DIR"目录中寻找后缀是xlsx,xls, yaml, yml的文件
- excel_files = get_files(target=INTERFACE_DIR, start="test_", end=".xlsx") + get_files(target=INTERFACE_DIR,
- start="test_",
- end=".xls")
- yaml_files = get_files(target=INTERFACE_DIR, start="test_", end=".yaml") + get_files(target=INTERFACE_DIR,
- start="test_",
- end=".yml")
- else:
- logger.error(f"{CASE_FILE_TYPE}不在CaseFileType内,不能自动生成用例!")
- # 自动生成测试用例
- for file in excel_files:
- handle_excel_data(file=file)
- for file in yaml_files:
- handle_yaml_data(file=file)
+ try:
+ yaml_files = []
+ if CASE_FILE_TYPE == 1:
+ # 在用例数据"INTERFACE_DIR"目录中寻找后缀是yaml, yml的文件
+ yaml_files = get_files(target=INTERFACE_DIR, start="test_", end=".yaml") \
+ + get_files(target=INTERFACE_DIR, start="test_", end=".yml") \
+ + get_files(target=INTERFACE_DIR, start="init_data", end=".yml") \
+ + get_files(target=INTERFACE_DIR, start="init_data", end=".yaml")
+ else:
+ logger.error(f"{CASE_FILE_TYPE}不在CaseFileType内,不能自动生成用例!")
+ # 自动生成测试用例
+ for file in yaml_files:
+ __load_yaml_data(file=file)
+ except Exception as e:
+ logger.error(f"生成用例时发生错误: {e}")
-def gen_case_file(filename, case_template_path, case_common, common_dependence, case_data, target_case_path):
+def generate_conftest_file(init_data, template_path, target_path):
"""
- 根据测试用例文件(yaml/yml/xlsx/xls),以及事先定义的测试用例模板,实际用例数据,生成测试用例方法(.py)
- :param filename: 测试用例文件(yaml/yml/xlsx/xls)的名称,用作生成测试用例类名,方法名
- :param case_template_path: 测试用例模板的绝对路径
- :param case_common: 用例公共参数
- :param common_dependence: 用例公共依赖
- :param case_data: 实际用例数据
- :param target_case_path: 测试用例方法(.py)的绝对路径
+ 识别到目录中的init_data.yaml或者init_data.yml文件,自动生成conftest.py文件
+ :param init_data: 需要初始化的数据
+ :param template_path: 模板的绝对路径
+ :param target_path: conftest.py的需要生成的目录
"""
- # 如果自动生成用例的目录不存在则自动创建一个
- if not os.path.exists(target_case_path):
+ try:
+ # 如果自动生成用例的目录不存在则自动创建一个
"""
exist_ok=True 是一个可选参数,用于指定在目录已经存在的情况下是否忽略错误。
如果设置为 True,则不论目录是否已存在,os.makedirs 都不会报错;如果设置为 False(默认值),则在目录已存在时会引发 FileExistsError 异常。
"""
- os.makedirs(target_case_path, exist_ok=True)
- # 获取用例数据中的标记
- case_markers = case_common.get("case_markers", []) or []
- # logger.debug(f"从用例中拿到的标记有:{case_markers}, {type(case_markers)}")
- # 先读取用例模板中每一行的内容
- with open(file=case_template_path, mode="r", encoding="utf-8") as f:
- case_template = f.readlines()
- current_case_template = []
- for line_num, content in enumerate(case_template):
- current_case_template.append(content)
- # 这里是预计往 @pytest.mark.parametrize( 这一行的上面插入标记
- if content.strip().startswith('@pytest.mark.parametrize('):
- # 往测试用例模板中插入自定义标记
- # logger.debug(f"获取到的case_markers:{case_markers}, {type(case_markers)}")
- for case_marker in case_markers:
- # 获取符合要求格式的自定义标记名称,并插入到测试模板中
- marker = is_valid_marker(case_marker)
- if marker and isinstance(marker, str):
- # !! 注意这里的4个空格,必须要有4个空格!!
- # current_case_template.append(f" @pytest.mark.{marker}\n")
- # 由于在测试用例中移除了测试类,直接使用测试方法,因此此处不需要空格了,可以直接顶格写
- current_case_template.append(f"@pytest.mark.{marker}\n")
- if marker and isinstance(marker, dict):
- for k, v in marker.items():
+ os.makedirs(target_path, exist_ok=True)
+ # 先读取用例模板中每一行的内容
+ with open(file=template_path, mode="r", encoding="utf-8") as f:
+ current_template = ''.join(f.readlines())
+
+ # 根据模板,生成测试用例方法
+ """
+ string.Template是将一个string设置为模板,通过替换变量的方法,最终得到想要的string。
+ """
+ conftest_content = Template(current_template).safe_substitute(
+ {
+ "init_data": init_data,
+ }
+ )
+ # 基于模板生成conftest.py文件
+ filepath = os.path.join(target_path, 'conftest.py')
+ with open(filepath, "w", encoding="utf-8") as fp:
+ fp.write(conftest_content)
+ logger.debug(f"conftest.py文件创建成功:{filepath}")
+ except Exception as e:
+ logger.error(f"生成conftest.py文件时发生错误: {e}")
+
+
+def gen_case_file(filename, case_template_path, config, common_dependence, case_data, target_case_path):
+ """
+ 根据测试用例文件(yaml/yml),以及事先定义的测试用例模板,实际用例数据,生成测试用例方法(.py)
+ :param filename: 测试用例文件(yaml/yml)的名称,用作生成测试用例类名,方法名
+ :param case_template_path: 测试用例模板的绝对路径
+ :param config: 用例公共参数
+ :param common_dependence: 用例公共依赖
+ :param case_data: 实际用例数据
+ :param target_case_path: 测试用例方法(.py)的需要生成的目录
+ """
+ try:
+ # 如果自动生成用例的目录不存在则自动创建一个
+ if not os.path.exists(target_case_path):
+ """
+ exist_ok=True 是一个可选参数,用于指定在目录已经存在的情况下是否忽略错误。
+ 如果设置为 True,则不论目录是否已存在,os.makedirs 都不会报错;如果设置为 False(默认值),则在目录已存在时会引发 FileExistsError 异常。
+ """
+ os.makedirs(target_case_path, exist_ok=True)
+ # 获取用例数据中的标记
+ pytest_markers = config.get("pytest_markers", []) or []
+ # logger.debug(f"从用例中拿到的标记有:{pytest_markers}, {type(pytest_markers)}")
+ # 先读取用例模板中每一行的内容
+ with open(file=case_template_path, mode="r", encoding="utf-8") as f:
+ case_template = f.readlines()
+ current_case_template = []
+ for line_num, content in enumerate(case_template):
+ current_case_template.append(content)
+ # 这里是预计往 @pytest.mark.parametrize( 这一行的上面插入标记
+ if content.strip().startswith('@pytest.mark.parametrize('):
+ # 往测试用例模板中插入自定义标记
+ # logger.debug(f"获取到的pytest_markers:{pytest_markers}, {type(pytest_markers)}")
+ for case_marker in pytest_markers:
+ # 获取符合要求格式的自定义标记名称,并插入到测试模板中
+ marker = is_valid_marker(case_marker)
+ if marker and isinstance(marker, str):
# !! 注意这里的4个空格,必须要有4个空格!!
- # current_case_template.append(f" @pytest.mark.{k}('{v}')\n")
+ # current_case_template.append(f" @pytest.mark.{marker}\n")
# 由于在测试用例中移除了测试类,直接使用测试方法,因此此处不需要空格了,可以直接顶格写
- current_case_template.append(f"@pytest.mark.{k}('{v}')\n")
- # 将用例数据的名称作为测试用例文件名称, 如test_login_demo
- func_name = filename
- # 方法名test_demo的类名是TestDemo(在测试用例中移除了测试类,直接使用测试方法,所以下行代码注释)
- # class_name = "".join([word.capitalize() for word in func_name.split("_")])
+ current_case_template.append(f"@pytest.mark.{marker}\n")
+ if marker and isinstance(marker, dict):
+ for k, v in marker.items():
+ # !! 注意这里的4个空格,必须要有4个空格!!
+ # current_case_template.append(f" @pytest.mark.{k}('{v}')\n")
+ # 由于在测试用例中移除了测试类,直接使用测试方法,因此此处不需要空格了,可以直接顶格写
+ current_case_template.append(f"@pytest.mark.{k}('{v}')\n")
+ # 将用例数据的名称作为测试用例文件名称, 如test_login_demo
+ func_name = filename
+ # 方法名test_demo的类名是TestDemo(在测试用例中移除了测试类,直接使用测试方法,所以下行代码注释)
+ # class_name = "".join([word.capitalize() for word in func_name.split("_")])
- if common_dependence:
- # 如果存在公共依赖,则对公共依赖进行处理: 识别到模板中的关键字“公共依赖”,往下一行插入pytest的fixture,作用域级别是class
- keyword = "公共依赖"
- indices = [i for i, x in enumerate(current_case_template) if re.search(keyword, x, re.IGNORECASE)]
- common_dependence_template = ['common_dependence = ${common_dependence}\n', '\n', '\n',
- '@pytest.fixture(scope="class", autouse=True)\n',
- 'def background():\n',
- ' dependence_results = dependence_handler.case_dependence_handle(case_dependence=common_dependence.get("setup"), db_info=GLOBAL_VARS["db_info"])\n',
- ' GLOBAL_VARS.update(dependence_results if dependence_results else {})\n',
- ' yield\n',
- ' dependence_results = dependence_handler.case_dependence_handle(case_dependence=common_dependence.get("teardown"), db_info=GLOBAL_VARS["db_info"])\n',
- ' GLOBAL_VARS.update(dependence_results if dependence_results else {})\n',
- '\n', '\n']
+ if common_dependence:
+ # 如果存在公共依赖,则对公共依赖进行处理: 识别到模板中的关键字“公共依赖”,往下一行插入pytest的fixture,作用域级别是class
+ keyword = "公共依赖"
+ indices = [i for i, x in enumerate(current_case_template) if re.search(keyword, x, re.IGNORECASE)]
+ common_dependence_template = ['common_dependence = ${common_dependence}\n', '\n', '\n',
+ '@pytest.fixture(scope="class", autouse=True)\n',
+ 'def background():\n',
+ ' dependence_results = dependence_handler.case_dependence_handle(case_dependence=common_dependence.get("setup"), db_info=GLOBAL_VARS["db_info"])\n',
+ ' GLOBAL_VARS.update(dependence_results if dependence_results else {})\n',
+ ' yield\n',
+ ' dependence_results = dependence_handler.case_dependence_handle(case_dependence=common_dependence.get("teardown"), db_info=GLOBAL_VARS["db_info"])\n',
+ ' GLOBAL_VARS.update(dependence_results if dependence_results else {})\n',
+ '\n', '\n']
- for idx in reversed(indices):
- current_case_template[idx + 1:idx + 1] = common_dependence_template
- # 根据模板,生成测试用例方法
- """
- string.Template是将一个string设置为模板,通过替换变量的方法,最终得到想要的string。
- """
- current_template = ''.join(current_case_template)
- my_case = Template(current_template).safe_substitute(
- {
- "allure_epic": case_common["allure_epic"],
- "allure_feature": case_common["allure_feature"],
- "allure_story": case_common["allure_story"],
- "case_markers": case_markers,
- "common_dependence": common_dependence,
- "case_data": case_data,
- "func_title": func_name,
- # 在测试用例中移除了测试类,直接使用测试方法
- # "class_title": class_name,
- "now": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }
- )
- else:
- # 根据模板,生成测试用例方法
- """
- string.Template是将一个string设置为模板,通过替换变量的方法,最终得到想要的string。
- """
- current_template = ''.join(current_case_template)
- my_case = Template(current_template).safe_substitute(
- {
- "allure_epic": case_common["allure_epic"],
- "allure_feature": case_common["allure_feature"],
- "allure_story": case_common["allure_story"],
- "case_markers": case_markers,
- "case_data": case_data,
- "func_title": func_name,
- # 在测试用例中移除了测试类,直接使用测试方法
- # "class_title": class_name,
- "now": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }
- )
-
- # 将测试用例方法写入py文件中
- with open(os.path.join(target_case_path, func_name + '.py'), "w", encoding="utf-8") as fp:
- fp.write(my_case)
+ for idx in reversed(indices):
+ current_case_template[idx + 1:idx + 1] = common_dependence_template
+ # 根据模板,生成测试用例方法
+ """
+ string.Template是将一个string设置为模板,通过替换变量的方法,最终得到想要的string。
+ """
+ current_template = ''.join(current_case_template)
+ my_case = Template(current_template).safe_substitute(
+ {
+ "epic": config["epic"],
+ "feature": config["feature"],
+ "story": config["story"],
+ "pytest_markers": pytest_markers,
+ "common_dependence": common_dependence,
+ "case_data": case_data,
+ "func_title": func_name,
+ # 在测试用例中移除了测试类,直接使用测试方法
+ # "class_title": class_name,
+ "now": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+ }
+ )
+ else:
+ # 根据模板,生成测试用例方法
+ """
+ string.Template是将一个string设置为模板,通过替换变量的方法,最终得到想要的string。
+ """
+ current_template = ''.join(current_case_template)
+ my_case = Template(current_template).safe_substitute(
+ {
+ "epic": config["epic"],
+ "feature": config["feature"],
+ "story": config["story"],
+ "pytest_markers": pytest_markers,
+ "case_data": case_data,
+ "func_title": func_name,
+ # 在测试用例中移除了测试类,直接使用测试方法
+ # "class_title": class_name,
+ "now": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+ }
+ )
+ # 将测试用例方法写入py文件中
+ with open(os.path.join(target_case_path, func_name + '.py'), "w", encoding="utf-8") as fp:
+ fp.write(my_case)
+ except Exception as e:
+ logger.error(f"生成测试用例文件时发生错误: {e}")
def is_valid_marker(markers):
"""
diff --git a/utils/case_generate_utils/case_template.txt b/utils/case_generate_utils/case_template.txt
index e73fac7..037fb75 100644
--- a/utils/case_generate_utils/case_template.txt
+++ b/utils/case_generate_utils/case_template.txt
@@ -10,7 +10,7 @@
import pytest
import allure
# 本地应用/模块导入
-from config.global_vars import GLOBAL_VARS
+from settings import GLOBAL_VARS
from utils.requests_utils.request_control import RequestControl
from utils.requests_utils.case_dependence import CaseDependenceHandler
@@ -21,9 +21,9 @@ dependence_handler = CaseDependenceHandler(GLOBAL_VARS)
# 用例数据
cases = ${case_data}
-@allure.epic("${allure_epic}")
-@allure.feature("${allure_feature}")
-@allure.story("${allure_story}")
+@allure.epic("${epic}")
+@allure.feature("${feature}")
+@allure.story("${story}")
@pytest.mark.auto
@pytest.mark.parametrize("case", cases, ids=lambda x: x["title"])
def ${func_title}_auto(case):
diff --git a/utils/case_generate_utils/conftest_template.txt b/utils/case_generate_utils/conftest_template.txt
new file mode 100644
index 0000000..9f14c65
--- /dev/null
+++ b/utils/case_generate_utils/conftest_template.txt
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+# @Time : ${DATE} ${TIME}
+# @Author : floraachy
+# @File : ${NAME}.py
+# @Software: ${PRODUCT_NAME}
+# @Desc:
+
+
+# 标准库导入
+# 第三方库导入
+import pytest
+from loguru import logger
+# 本地应用/模块导入
+from settings import GLOBAL_VARS
+from utils.requests_utils.case_dependence import CaseDependenceHandler
+
+@pytest.fixture(scope="module", autouse=True)
+def init_data():
+ """
+ 运行测试之前,初始化数据;
+ 运行测试之后,清理数据
+ """
+ logger.debug(f"打印一下全局变量:{GLOBAL_VARS}")
+ dependence_handler = CaseDependenceHandler(GLOBAL_VARS)
+ init_data = ${init_data}
+ logger.info("Start ----- 开始初始化数据...")
+ if init_data.get("setup"):
+ dependence_results = dependence_handler.case_dependence_handle(
+ case_dependence=init_data["setup"],
+ db_info=GLOBAL_VARS["db_info"])
+ GLOBAL_VARS.update(dependence_results if dependence_results else {})
+ yield
+ if init_data.get("teardown"):
+ dependence_results = dependence_handler.case_dependence_handle(
+ case_dependence=init_data["teardown"],
+ db_info=GLOBAL_VARS["db_info"])
+ GLOBAL_VARS.update(dependence_results if dependence_results else {})
+
+
+ logger.info("End ----- 初始化数据完成!")
\ No newline at end of file
diff --git a/utils/data_utils/data_handle.py b/utils/data_utils/data_handle.py
index 288089f..5754b16 100644
--- a/utils/data_utils/data_handle.py
+++ b/utils/data_utils/data_handle.py
@@ -14,7 +14,7 @@ from string import Template
from requests.cookies import RequestsCookieJar
from requests.utils import dict_from_cookiejar
# 本地应用/模块导入
-from utils.data_utils.faker_handle import FakerData
+from common.data_utils.faker_handle import FakerData
from utils.data_utils.eval_data_handle import eval_data
from utils.data_utils.data_tools import *
diff --git a/utils/data_utils/data_tools.py b/utils/data_utils/data_tools.py
index 46799dc..7c42799 100644
--- a/utils/data_utils/data_tools.py
+++ b/utils/data_utils/data_tools.py
@@ -13,9 +13,9 @@ from datetime import datetime, timedelta
# 第三方库导入
from loguru import logger
# 本地应用/模块导入
-from utils.files_utils.files_handle import file_to_base64, filepath_to_base64, get_files
+from common.files_utils.files_handle import file_to_base64, filepath_to_base64, get_files
from config.path_config import FILES_DIR
-from utils.tools.aes_encrypt_decrypt import Encrypt
+from common.tools.aes_encrypt_decrypt import Encrypt
def zip_test_step(step_id, step_status_id=None):
diff --git a/utils/data_utils/faker_handle.py b/utils/data_utils/faker_handle.py
deleted file mode 100644
index bace43c..0000000
--- a/utils/data_utils/faker_handle.py
+++ /dev/null
@@ -1,280 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/12/5 13:51
-# @Author : floraachy
-# @File : faker_handle
-# @Software: PyCharm
-# @Desc:
-
-
-# 标准库导入
-import random
-import string
-import re
-from datetime import datetime, date, timedelta
-# 第三方库导入
-from faker import Faker
-
-
-# 本地应用/模块导入
-
-
-class FakerData:
- """
- 测试数据生成类
- 官方文档:https://faker.readthedocs.io/en/master/index.html
- """
-
- def __init__(self):
- self.fk_zh = Faker(locale='zh_CN')
- self.faker = Faker()
-
- @classmethod
- def generate_random_int(cls, *args) -> int:
- """
- :return: 随机数
- """
- # 检查是否传入了参数
- if not args:
- # 没有传参,就从5000内随机取一个整数返回
- return random.randint(0, 5000)
-
- # 排序参数并获取最小值和最大值
- min_val = min(args)
- max_val = max(args)
-
- # 生成并返回随机整数
- return random.randint(min_val, max_val)
-
- def generate_catch_phrase(self):
- """
- :return: 生成妙句(口号) (输出结果都是英文)
- """
- return self.faker.catch_phrase()
-
- def generate_phone(self, lan="en") -> int:
- """
- :return: 随机生成手机号码
- """
- if lan == "zh":
- phone = self.fk_zh.phone_number()
- else:
- phone = self.faker.phone_number()
- return phone
-
- def generate_id_number(self, lan="en") -> int:
- """
-
- :return: 随机生成身份证号码
- """
- if lan == "zh":
- id_number = self.fk_zh.ssn()
- else:
- id_number = self.faker.ssn()
- return id_number
-
- def generate_female_name(self, lan="en") -> str:
- """
-
- :return: 女生姓名
- """
- if lan == "zh":
- female_name = self.fk_zh.name_female()
- else:
- female_name = self.faker.name_female()
- return female_name
-
- def generate_male_name(self, lan="en") -> str:
- """
-
- :return: 男生姓名
- """
- if lan == "zh":
- male_name = self.fk_zh.name_male()
- else:
- male_name = self.faker.name_male()
- return male_name
-
- def generate_name(self, lan="en") -> str:
- """
- 生成人名
- :return: 人名
- """
- if lan == "zh":
- name = self.fk_zh.name()
- else:
- name = self.faker.name()
- return name
-
- def generate_company_name(self, lan: str = "en", fix: str = None) -> str:
- """
- 生成公司名
- :param lan: 语言类型,可选:en, zh; zh表示中文,en表示英文,默认是en
- :param fix: 前后缀,可选pre, suf; pre表示公司前缀,suf标识公司后缀
- :return: 公司名
- """
- if lan == "zh":
- if fix == "pre":
- name = self.fk_zh.company_prefix()
- elif fix == "suf":
- name = self.fk_zh.company_suffix()
- else:
- name = self.fk_zh.company()
- else:
- if fix == "pre":
- name = self.faker.company_prefix()
- elif fix == "suf":
- name = self.faker.company_suffix()
- else:
- name = self.faker.company()
-
- return name
-
- def generate_paragraph(self, lan: str = "en", nb: int = 3) -> str:
- """
- 生成段落
- :param lan: 语言类型,可选:en, zh; zh表示中文,en表示英文,默认是en
- :param nb: 段落个数,默认是3个
- """
- if lan == "zh":
- text = self.fk_zh.paragraph(nb_sentences=nb, variable_nb_sentences=True, ext_word_list=None)
- else:
- text = self.faker.paragraph(nb_sentences=nb, variable_nb_sentences=True, ext_word_list=None)
-
- return text
-
- def generate_words(self, lan: str = "en", nb: int = 1) -> str:
-
- """
- 生成词语
- :param lan: 语言类型,可选:en, zh; zh表示中文,en表示英文,默认是en
- :param nb: 词语个数,默认是1个
- """
- if lan == "zh":
- if nb == 1 or nb < 1:
- text = self.fk_zh.word(ext_word_list=None)
- else:
- res = self.fk_zh.words(nb=nb, ext_word_list=None)
- text = "-".join(res)
-
- else:
- if nb == 1 or nb < 1:
- text = self.faker.word(ext_word_list=None)
- else:
- res = self.faker.words(nb=nb, ext_word_list=None)
- text = "-".join(res)
-
- return text
-
- def generate_email(self, lan="en") -> str:
- """
-
- :return: 生成邮箱
- """
- if lan == "zh":
- email = self.fk_zh.email()
- else:
- email = self.faker.email()
- return email
-
- def generate_identifier(self, lan="en", char_len=8):
- """
- :return:生成随机标识,满足要求:长度为2~100(这里长度通过传参控制,默认为8), 只能包含数字,字母,下划线(_),中划线(-),英文句号(.),必须以数字和字母开头,不能以下划线/中划线/英文句号开头和结尾
- """
- if lan == "zh":
- fk = self.fk_zh
- else:
- fk = self.faker
- while True:
- identifier = ''.join(random.choices(string.ascii_letters + string.digits + '_.-', k=char_len)) # 生成指定长度的随机标识
-
- if (
- re.match(r'^[a-zA-Z0-9][a-zA-Z0-9_.-]{0,98}[a-zA-Z0-9]$', identifier) and
- not (identifier.startswith('_') or identifier.startswith('-') or identifier.startswith('.')) and
- not (identifier.endswith('_') or identifier.startswith('-') or identifier.endswith('.'))
- ):
- return identifier
-
- def generate_city(self, lan="en", full: bool = True) -> str:
- """
- :return: 随机生成城市名
- """
- if lan == "zh":
- faker = self.fk_zh
- else:
- faker = self.faker
-
- if full:
- city = faker.city()
- else:
- city = faker.city_name()
-
- return city
-
- def generate_province(self, lan="en") -> str:
- """
- :return: 随机生成城市名
- """
- if lan == "zh":
- faker = self.fk_zh
- else:
- faker = self.faker
-
- return faker.province()
-
- def generate_address(self, lan="en") -> str:
- """
-
- :return: 生成地址
- """
- if lan == "zh":
- address = self.fk_zh.address()
- else:
- address = self.faker.address()
- return address
-
- @classmethod
- def generate_time(cls, fmt='%Y-%m-%d %H:%M:%S', days=0) -> str:
- """
- 根据传入的天数,返回当前时间加上或减去这些天数后的日期和时间,或者仅返回当前时间。
- :return:
- """
- # 获取当前时间
- current_time = datetime.now()
- # 计算增加或减少天数后的时间
- if days != 0:
- future_time = current_time + timedelta(days=days)
- else:
- future_time = current_time
- # 格式化时间
- return future_time.strftime(fmt)
-
- @classmethod
- def generate_today_date(cls, fmt='%Y-%m-%d'):
- """获取今日0点整时间"""
- today = datetime.now().date()
- if fmt == '%Y-%m-%d %H:%M:%S':
- return today.strftime(fmt) + " 00:00:00"
- return today.strftime(fmt)
-
- @classmethod
- def generate_time_after_week(cls, fmt='%Y-%m-%d'):
- """获取一周后12点整的时间"""
- if fmt == '%Y-%m-%d %H:%M:%S':
- return (date.today() + timedelta(days=+6)).strftime(fmt) + " 00:00:00"
- return (date.today() + timedelta(days=+6)).strftime(fmt)
-
- @classmethod
- def remove_special_characters(cls, target: str):
- """
- 移除字符串中的特殊字符。
- 在Python中用replace()函数操作指定字符
- 常用字符unicode的编码范围:
- 数字:\u0030-\u0039
- 汉字:\u4e00-\u9fa5
- 大写字母:\u0041-\u005a
- 小写字母:\u0061-\u007a
- 英文字母:\u0041-\u007a
- """
- pattern = r'([^\u4e00-\u9fa5])'
- result = re.sub(pattern, '', target)
- return result
diff --git a/utils/database_utils/__init__.py b/utils/database_utils/__init__.py
deleted file mode 100644
index 208fee5..0000000
--- a/utils/database_utils/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/11/10 14:47
-# @Author : floraachy
-# @File : __init__.py
-# @Software: PyCharm
-# @Desc:
diff --git a/utils/database_utils/mysql_handle.py b/utils/database_utils/mysql_handle.py
deleted file mode 100644
index 13295b6..0000000
--- a/utils/database_utils/mysql_handle.py
+++ /dev/null
@@ -1,181 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/5/12 11:04
-# @Author : floraachy
-# @File : mysql_handle.py
-# @Software: PyCharm
-# @Desc: 使用pymysql模块连接mysql数据库的公共方法
-
-# 标准库导入
-from typing import Union
-import json
-from datetime import datetime
-# # 第三方库导入
-import pymysql
-from sshtunnel import SSHTunnelForwarder # pip install sshtunnel
-from loguru import logger
-
-
-class MysqlServer:
- """
- 初始化数据库连接(支持通过SSH隧道的方式连接),并指定查询的结果集以字典形式返回
- """
-
- def __init__(self, db_host, db_port, db_user, db_pwd, db_database, ssh=False,
- **kwargs):
- """
- 初始化方法中, 连接mysql数据库, 根据ssh参数决定是否走SSH隧道方式连接mysql数据库
- """
- logger.debug("\n======================================================\n" \
- "-------------数据库配置信息--------------------\n"
- f"db_host: {db_host}\n" \
- f"db_port: {db_port}\n" \
- f"db_user: {db_user}\n" \
- f"db_pwd: {db_pwd}\n" \
- f"db_database: {db_database}\n" \
- f"ssh: {ssh}\n" \
- f"kwargs: {kwargs}\n" \
- "=====================================================")
- self.server = None
- try:
- if ssh:
- self.server = SSHTunnelForwarder(
- ssh_address_or_host=(kwargs.get("ssh_host"), int(kwargs.get("ssh_port"))), # ssh 目标服务器 ip 和 port
- ssh_username=kwargs.get("ssh_user"), # ssh 目标服务器用户名
- ssh_password=kwargs.get("ssh_pwd"), # ssh 目标服务器用户密码
- remote_bind_address=(db_host, db_port), # mysql 服务ip 和 part
- local_bind_address=('127.0.0.1', 5143), # ssh 目标服务器的用于连接 mysql 或 redis 的端口,该 ip 必须为 127.0.0.1
- )
- self.server.start()
- db_host = self.server.local_bind_host # server.local_bind_host 是 参数 local_bind_address 的 ip
- db_port = self.server.local_bind_port # server.local_bind_port 是 参数 local_bind_address 的 port
- # 建立连接
- self.conn = pymysql.connect(host=db_host,
- port=db_port,
- user=db_user,
- password=db_pwd,
- database=db_database,
- charset="utf8",
- cursorclass=pymysql.cursors.DictCursor # 加上pymysql.cursors.DictCursor这个返回的就是字典
- )
- # 创建一个游标对象
- self.cursor = self.conn.cursor()
- except Exception as e:
- logger.error(f"数据库连接失败:{e}")
-
- def __del__(self):
- """
- 在对象销毁前,断开游标,关闭数据库连接
- """
- try:
- # 关闭游标
- self.cursor.close()
- # 关闭数据库链接
- self.conn.close()
- # 如果开启了SSH隧道,则关闭
- if self.server:
- self.server.close()
- except AttributeError as error:
- logger.error("数据库连接失败,失败原因 %s", error)
-
- def query_all(self, sql):
- """
- 查询所有符合sql条件的数据
- :param sql: 执行的sql
- :return: 查询结果
- """
- try:
- self.conn.commit()
- self.cursor.execute(sql)
- data = self.cursor.fetchall()
- logger.debug("\n======================================================\n" \
- "-------------数据库执行结果--------------------\n"
- f"SQL: {sql}\n" \
- f"result: {data}\n" \
- "=====================================================")
- return data
- except Exception as e:
- logger.error(f"{sql} --> 报错: {e}")
- raise e
-
- def query_one(self, sql):
- """
- 查询符合sql条件的数据的第一条数据
- :param sql: 执行的sql
- :return: 返回查询结果的第一条数据
- """
- try:
- self.conn.commit()
- self.cursor.execute(sql)
- data = self.cursor.fetchone()
- logger.debug("\n======================================================\n" \
- "-------------数据库执行结果--------------------\n"
- f"SQL: {sql}\n" \
- f"result: {data}\n" \
- "=====================================================")
- return data
- except Exception as e:
- logger.error(f"{sql} --> 报错: {e}")
- raise e
-
- def insert(self, sql):
- """
- 插入数据
- :param sql: 执行的sql
- """
- try:
- self.cursor.execute(sql)
- # 提交 只要数据库更新就要commit
- self.conn.commit()
- logger.debug("\n======================================================\n" \
- "-------------数据库执行结果--------------------\n"
- f"SQL: {sql}\n" \
- "插入数据成功!\n" \
- "=====================================================")
- except Exception as e:
- logger.error(f"{sql} --> 报错: {e}")
- raise e
-
- def update(self, sql):
- """
- 更新数据
- :param sql: 执行的sql
- """
- try:
- self.cursor.execute(sql)
- # 提交 只要数据库更新就要commit
- self.conn.commit()
- logger.debug("\n======================================================\n" \
- "-------------数据库执行结果--------------------\n"
- f"SQL: {sql}\n" \
- "更新数据成功!\n" \
- "=====================================================")
- except Exception as e:
- logger.error(f"{sql} --> 报错: {e}")
- raise e
-
- def query(self, sql, one=True):
- """
- 根据传值决定查询一条数据还是所有
- :param sql: 查询的SQL语句
- :param one: 默认True. True查一条数据,否则查所有
- :return:
- """
- try:
- if one:
- return self.query_one(sql)
- else:
- return self.query_all(sql)
- except Exception as e:
- logger.error(f"{sql} --> 报错: {e}")
- raise e
-
- def verify(self, result: dict) -> Union[dict, None]:
- """验证结果能否被json.dumps序列化"""
- # 尝试变成字符串,解决datetime 无法被json 序列化问题
- try:
- json.dumps(result)
- except TypeError: # TypeError: Object of type datetime is not JSON serializable
- for k, v in result.items():
- if isinstance(v, datetime):
- result[k] = str(v)
- return result
diff --git a/utils/files_utils/__init__.py b/utils/files_utils/__init__.py
deleted file mode 100644
index 35ea6ab..0000000
--- a/utils/files_utils/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/11/10 14:45
-# @Author : floraachy
-# @File : __init__.py
-# @Software: PyCharm
-# @Desc:
diff --git a/utils/files_utils/excel_handle.py b/utils/files_utils/excel_handle.py
deleted file mode 100644
index 6f9f7cb..0000000
--- a/utils/files_utils/excel_handle.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Version: Python 3.9
-# @Time : 2023/1/9 16:42
-# @Author : chenyinhua
-# @File : handle_excel.py
-# @Software: PyCharm
-# @Desc: 使用openpyxl对excel进行读写操作
-
-# 第三方库导入
-import openpyxl # pip install openpyxl
-
-
-class ExcelHandle:
-
- def __init__(self, filename):
- """
- 初始化用例文件
- :param filename: 文件绝对路径,如:D:\test\test.xlsx
- """
- self.filename = filename
-
- def create_excel(self):
- """
- 创建excel文件,需要指定excel文件的绝对路径,如D:\test\test.xlsx
- """
- # 创建文件对象
- wb = openpyxl.Workbook()
- # 创建excel文件
- wb.save(self.filename)
- return self.filename
-
- def read_sheet(self, sheet, workbook):
- """
- 读取指定表单的内容
- :param sheet: 表单名称
- :param workbook: 工作簿对象
- :return: sheet数据列表
- """
- sheet_data = {
- "sheet_name": sheet,
- "data": []
- }
- sheet = workbook[sheet]
- all_values = list(sheet.values)
- header = all_values[0]
- for i in all_values[1:]:
- sheet_data["data"].append(dict(zip(header, i)))
- return sheet_data
-
- def read(self, sheet=None) -> list:
- """
- 读取excel数据并返回
- :param sheet: 表单名称
- :return: 返回读取的excel数据,是一个列表
- """
- # 创建一个工作簿工作对象(excel文件已存在的情况)
- workbook = openpyxl.open(self.filename)
- # 跟上面那句一个意思 workbook = openpyxl.load_workbook(self.file)
-
- # 获取excel当中所有的sheet,返回的是一个列表
- sheets = workbook.sheetnames
- # 保存从excel中获取到的数据
- results = []
-
- # 如果sheet不为空,则取sheet等于指定sheet
- if sheet:
- results.append(self.read_sheet(sheet, workbook))
- # 如果sheet为空,则读取所有表单数据
- else:
- for sheet in sheets:
- results.append(self.read_sheet(sheet, workbook))
- # 关闭excel
- workbook.close()
- return results
-
- def write(self, row, column, data, sheet_name=None):
- """
- 往excel写入数据
- :param sheet_name: 表单名称
- :param row: 要写入的行
- :param column: 要写入的列
- :param data: 要写入的数据
- :return: None
- """
- workbook = openpyxl.open(self.filename)
- # 获取excel当中所有的sheet,返回的是一个列表
- sheets = workbook.sheetnames
- if sheet_name in sheets:
- sheet = workbook[sheet_name]
- print(f"往表单【{sheet_name}】中写入数据")
- else:
- # 如果表单为空,就默认使用第一个表单
- sheet = workbook.active
- print(f"表单【{sheet_name}】不存在,默认往第一个表单中写入数据")
-
- sheet.cell(row=row, column=column, value=data)
- # 更上面写法效果一样 sheet.cell(row=row, column=column).value = data
-
- # 保存并关闭文件
- workbook.save(self.filename)
- workbook.close()
diff --git a/utils/files_utils/files_handle.py b/utils/files_utils/files_handle.py
deleted file mode 100644
index d294532..0000000
--- a/utils/files_utils/files_handle.py
+++ /dev/null
@@ -1,189 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/3/30 22:34
-# @Author : Flora.Chen
-# @File : files_handle.py
-# @Software: PyCharm
-# @Desc: 处理文件相关操作
-
-# 第三方库导入
-from loguru import logger
-# 标准库导入
-import os
-import zipfile
-import shutil
-import base64
-
-
-def get_files(target, start=None, end=None):
- """
- @param: target: 目标文件绝对路径
- @param: start: 以什么开头,默认为空
- @param: end: 以什么结尾,默认为空
- 获取目录下所有的文件,以列表的形式返回
- """
- if os.path.isfile(target):
- return []
- # files返回j经过处理的文件列表
- files = []
- # dirpath:表示获取的目录的路径,以string形式返回值。
- # dirnames: 包含了当前dirpath路径下所有的子目录名字(不包含目录路径),以列表形式返回值。
- # filenames:包含了当前dirpath路径下所有的非目录子文件的名字(不包含目录路径)。
- for dirpath, dirnames, filenames in os.walk(target):
- for filename in filenames:
- file_path = os.path.abspath(os.path.join(dirpath, filename))
- # 如果"start"和"end"都有值
- if start and end:
- # filename是以"start"且filename是以"end"结尾,则追加到files
- if filename.startswith(start) and filename.endswith(end):
- files.append(file_path)
- # 或者如果"start"有值,filename是以"start"开头,则追加到files
- elif start and (not end):
- if filename.startswith(start):
- files.append(file_path)
- # 或者如果"end"有值,且filename是以"end"结尾,则追加到files
- elif end and (not start):
- if filename.endswith(end):
- files.append(file_path)
- else:
- files.append(file_path)
- # 判断files列表是否为空,不为空则返回files,为空则返回all_files
- return files
-
-
-def get_newest_file(dir_path):
- """
- 获取目录下最新的文件
- """
- if os.path.isfile(dir_path):
- return None
-
- # 获取目录下所有文件
- files = os.listdir(dir_path)
-
- # 按文件修改时间排序
- sorted_files = sorted(
- [(os.path.join(dir_path, file), os.path.getmtime(os.path.join(dir_path, file))) for file in files],
- key=lambda x: x[1],
- reverse=True
- )
-
- # 返回最新文件路径
- return sorted_files[0][0]
-
-
-def zip_file(in_path: str, out_path: str):
- """
- 压缩指定文件夹
- :param in_path: 目标文件夹路径
- :param out_path: 压缩文件保存路径+xxxx.zip
- :return: 无
- """
- # 如果传入的路径是一个目录才进行压缩操作
- if os.path.isdir(in_path):
- logger.debug(f"目标路径:{in_path} 是一个目录,开始进行压缩......")
- # 写入
- zip = zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED)
- for path, dirnames, filenames in os.walk(in_path):
- # 去掉目标跟路径,只对目标文件夹下边的文件及文件夹进行压缩
- fpath = path.replace(in_path, '')
- for filename in filenames:
- zip.write(
- os.path.join(
- path, filename), os.path.join(
- fpath, filename))
- zip.close()
- logger.debug(f"目标路径:{in_path} 压缩完成!, 压缩文件路径:{out_path}")
- else:
- logger.debug(f"目标路径:{in_path} 不是一个目录,请检查!")
-
-
-def delete_dir_file(file_path):
- """
- 删除指定目录下的所有文件
- :param file_path: 目标文件夹路径 (存在多级路径的暂不支持)
- """
- paths = os.listdir(file_path)
- if paths:
- logger.debug(f"目标目录: {file_path} 存在文件或目录,进行删除操作")
- for item in paths:
- path = os.path.join(file_path, item)
- # 如果目标路径是一个文件,使用os.remove删除
- if os.path.isfile(path):
- os.remove(path)
- # 如果目标路径是一个目录,使用os.rmdir删除
- if os.path.isdir(path):
- os.rmdir(path)
- else:
- logger.debug(f"目标目录: {file_path} 不存在文件或目录,不需要删除")
-
-
-def copy_file(src_file_path, dest_dir_path):
- """
- 复制一个文件到另一个目录
- :param: src_file_path: 源文件路径
- :param: dest_dir_path: 目标文件夹路径
-
- """
- # 判断源文件路径是否存在
- if not os.path.isfile(src_file_path):
- return "源文件路径不存在"
-
- # 判断目标文件夹路径是否存在,不存在则创建
- if not os.path.isdir(dest_dir_path):
- os.makedirs(dest_dir_path)
-
- # 复制文件
- try:
- shutil.copy(src_file_path, dest_dir_path)
- return "复制成功"
- except Exception as e:
- return f"复制失败:{e}"
-
-
-def get_file_field(file_path):
- """
- 获取文件名称和二进制内容
- :param: file_path: 文件路径
- """
- # 处理文件绝对路径
- file_name = os.path.basename(file_path)
- # 获取文件二进制内容
- with open(file_path, 'rb') as f:
- file_content = f.read()
- return (file_name, file_content)
-
-
-def get_relative_path(file_path, directory_path):
- """
- os.path.relpath()是Python中os.path模块提供的一个函数,用于计算两个路径之间的相对路径。
- 例如:file_path=data/gitlink/project/test_login_demo.yaml, directory_path=data, 将返回/gitlink/project
- :param: file_path: 文件路径
- :param: directory_path: 相对于目录路径
- """
- # 获取file_path相对于directory_path的相对路径
- relative_path = os.path.relpath(os.path.abspath(file_path), os.path.abspath(directory_path))
- # 如果相对路径中包含文件名,则去除文件名部分并返回
- return os.path.dirname(relative_path)
-
-
-def file_to_base64(file_path):
- """
- 使用Python的标准库base64来读取文件内容并将其转换为base64编码
- """
- if os.path.exists(file_path):
- with open(file_path, "rb") as file:
- encoded_string = base64.b64encode(file.read())
- return encoded_string.decode('utf-8')
- else:
- logger.warning(f"{file_path} 不是一个真实有效的文件路径")
-
-
-def filepath_to_base64(file_path):
- """
- 使用Python的标准库base64来将文件路径并将其转换为base64编码
- """
- if os.path.exists(file_path):
- encoded_string = base64.b64encode(file_path.encode('utf-8'))
- return encoded_string.decode('utf-8')
- else:
- logger.warning(f"{file_path} 不是一个真实有效的文件路径")
diff --git a/utils/files_utils/yaml_handle.py b/utils/files_utils/yaml_handle.py
deleted file mode 100644
index 27aa967..0000000
--- a/utils/files_utils/yaml_handle.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Version: Python 3.9
-# @Time : 2023/1/31 15:22
-# @Author : chenyinhua
-# @File : yaml_handle.py
-# @Software: PyCharm
-# @Desc: 从日志文件中提取响应数据
-
-# 第三方库导入
-import yaml # pip install pyyaml
-from loguru import logger
-
-
-class YamlHandle:
-
- def __init__(self, filename):
- """
- 初始化用例文件
- :param filename: 文件绝对路径,如:D:\test\test.yaml
- """
- self.filename = filename
-
- @property
- def read_yaml(self):
- try:
- with open(file=self.filename, mode="r", encoding="utf-8") as fp:
- return yaml.safe_load(fp.read())
- except FileNotFoundError as e:
- logger.error(f"YAML file ({self.filename}) not found: {e}")
- raise e
- except yaml.YAMLError as e:
- logger.error(f"Error while reading YAML file ({self.filename}): {e}")
- raise e
-
- def write(self, data, mode="a"):
- """
- 往yaml文件中写入数据,默认是追加写入
- :param data: 要写入的数据
- :param mode: 写入模式
- :return:
- """
- try:
- with open(self.filename, mode=mode, encoding="utf-8") as f:
- yaml.dump(data, f)
- except yaml.YAMLError as e:
- logger.error(f"Error while writing to YAML file ({self.filename}): {e}")
- raise e
diff --git a/utils/logger_utils/__init__.py b/utils/logger_utils/__init__.py
deleted file mode 100644
index 5e4fb07..0000000
--- a/utils/logger_utils/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/11/10 14:42
-# @Author : floraachy
-# @File : __init__.py
-# @Software: PyCharm
-# @Desc:
diff --git a/utils/logger_utils/loguru_log.py b/utils/logger_utils/loguru_log.py
deleted file mode 100644
index d0a3934..0000000
--- a/utils/logger_utils/loguru_log.py
+++ /dev/null
@@ -1,68 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Version: Python 3.9
-# @Time : 2023/1/9 17:09
-# @Author : chenyinhua
-# @File : loguru_log.py
-# @Software: PyCharm
-# @Desc: 日志处理
-
-# 标准库导入
-import sys
-# 第三方库导入
-from loguru import logger
-
-
-def capture_logs(filename, level="TRACE", filter_type=None):
- """
- 日志处理
- 文档参考:https://zhuanlan.zhihu.com/p/429452898
- 基本参数释义:
- sink:可以是一个 file 对象,例如 sys.stderr 或 open('file.log', 'w'),也可以是 str 字符串或者 pathlib.Path 对象,即文件路径,也可以是一个方法,可以自行定义输出实现,也可以是一个 logging 模块的 Handler,比如 FileHandler、StreamHandler 等,还可以是 coroutine function,即一个返回协程对象的函数等。
- level:日志输出和保存级别。
- format:日志格式模板。
- filter:一个可选的指令,用于决定每个记录的消息是否应该发送到 sink。
- colorize:格式化消息中包含的颜色标记是否应转换为用于终端着色的 ansi 代码,或以其他方式剥离。 如果没有,则根据 sink 是否为 tty(电传打字机缩写) 自动做出选择。
- serialize:在发送到 sink 之前,是否应首先将记录的消息转换为 JSON 字符串。
- backtrace:格式化的异常跟踪是否应该向上扩展,超出捕获点,以显示生成错误的完整堆栈跟踪。
- diagnose:异常跟踪是否应显示变量值以简化调试。建议在生产环境中设置 False,避免泄露敏感数据。
- enqueue:要记录的消息是否应在到达 sink 之前首先通过多进程安全队列,这在通过多个进程记录到文件时很有用,这样做的好处还在于使日志记录调用是非阻塞的。
- catch:是否应自动捕获 sink 处理日志消息时发生的错误,如果为 True,则会在 sys.stderr 上显示异常消息,但该异常不会传播到 sink,从而防止应用程序崩溃。
- \kwargs:仅对配置协程或文件接收器有效的附加参数
-
- 日志级别,从低到高:
- logger.trace() 等级5
- logger.debug() 等级10
- logger.info() 等级20
- logger.success() 等级25
- logger.warning() 等级30
- logger.error() 等级40
- logger.critical() 等级50
- :param filename: 日志文件名
- :param filter_type: 日志过滤,如:将日志级别为ERROR的单独记录到一个文件中
- :param level: 日志级别设置
- """
- if level.upper() in ["TRACE", "DEBUG", "INFO", "SUCCESS", "WARNING", "ERROR", "CRITICAL"]:
- level = level
- else:
- logger.error(f"level={level}, 值错误\n"
- f"level的可选值是:TRACE DEBUG INFO SUCCESS WARNING ERROR CRITICAL\n"
- f"将默认level=trace收集日志")
- level = "TRACE"
-
- logger.remove() # 清除之前的设置
- dic = dict(sink=filename, # 日志保存路径
- rotation='10 MB',
- retention='3 days',
- format="{time:YYYY-MM-DD HH:mm:ss} | {level} | From {module}.{function}.{line} : {message}", # 日志输出格式
- encoding='utf-8',
- level=level, # 日志级别设置
- enqueue=True
- )
- if filter_type:
- dic["filter"] = lambda x: filter_type in str(x['level']).upper()
-
- logger.add(**dic)
- # 添加控制台输出
- logger.add(sink=sys.stderr,
- level=level,
- format="{time:YYYY-MM-DD HH:mm:ss} | {level} | From {module}.{function}.{line} : {message}")
diff --git a/utils/notify_utils/__init__.py b/utils/notify_utils/__init__.py
deleted file mode 100644
index 9abd276..0000000
--- a/utils/notify_utils/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/11/10 14:46
-# @Author : floraachy
-# @File : __init__.py
-# @Software: PyCharm
-# @Desc:
diff --git a/utils/notify_utils/dingding_bot.py b/utils/notify_utils/dingding_bot.py
deleted file mode 100644
index 2c93f16..0000000
--- a/utils/notify_utils/dingding_bot.py
+++ /dev/null
@@ -1,232 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/5/11 9:47
-# @Author : chenyinhua
-# @File : dingding_bot.py
-# @Software: PyCharm
-# @Desc: 钉钉通知封装
-
-import hmac
-import hashlib
-import base64
-import urllib.parse
-import time
-import urllib.request
-from requests import request
-from loguru import logger
-
-
-class DingTalkBot:
- """
- 钉钉机器人
- """
-
- def __init__(self, webhook_url, secret=None):
- """
- :param secret: 安全设置的加签秘钥
- :param webhook_url: 机器人没有加签的WebHook_url
- """
- # 适配钉钉机器人的加签模式和关键字模式/白名单IP模式
- if secret:
- timestamp = str(round(time.time() * 1000))
- sign = self.get_sign(secret, timestamp)
- self.webhook_url = webhook_url + f'×tamp={timestamp}&sign={sign}' # 最终url,url+时间戳+签名
- else:
- self.webhook_url = webhook_url
-
- self.headers = {
- "Content-Type": "application/json",
- "Charset": "UTF-8"
- }
-
- def get_sign(self, secret, timestamp):
- """
- 根据时间戳 + "sign" 生成密钥
- 把timestamp+"\n"+密钥当做签名字符串,使用HmacSHA256算法计算签名,然后进行Base64 encode,最后再把签名参数再进行urlEncode,得到最终的签名(需要使用UTF-8字符集)。
- :return:
- """
- string_to_sign = f'{timestamp}\n{secret}'.encode('utf-8')
- hmac_code = hmac.new(
- secret.encode('utf-8'),
- string_to_sign,
- digestmod=hashlib.sha256).digest()
-
- sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
- return sign
-
- def send_message(self, payload):
- """
- 发送钉钉消息
- :payload: 请求json数据
- """
- logger.debug("\n======================================================\n" \
- "-------------Start:发送钉钉消息--------------------\n"
- f"Webhook_Url: {self.webhook_url}\n" \
- f"内容: {payload}\n" \
- "=====================================================")
- response = request(
- url=self.webhook_url,
- json=payload,
- headers=self.headers,
- method="POST"
- )
- if response.json().get("errcode") == 0:
- logger.debug("\n======================================================\n" \
- "-------------End:发送钉钉消息--------------------\n"
- f"通过钉钉机器人发送{payload.get('msgtype', '')}消息成功:{response.json()}\n" \
- "=====================================================")
- print(f"通过钉钉机器人发送{payload.get('msgtype', '')}消息成功:{response.json()}")
- return True
- else:
- logger.error(f"通过钉钉机器人发送{payload.get('msgtype', '')}消息失败:{response.text}")
- print(f"通过钉钉机器人发送{payload.get('msgtype', '')}消息失败:{response.text}")
- return False
-
- def send_text(self, content, mobiles=None, is_at_all=False):
- """
- 发送文本消息
- :param content: 发送的内容
- :param mobiles: 被艾特的用户的手机号码,格式是列表,注意需要在content里面添加@人的手机号码
- :param is_at_all: 是否艾特所有人,布尔类型,true为艾特所有人,false为不艾特
- """
- at_mobiles = ""
- if mobiles:
- if isinstance(mobiles, list):
- at_mobiles = mobiles
- is_at_all = False
- for mobile in mobiles:
- content += f"@{mobile}"
- else:
- raise TypeError("mobiles类型错误 不是list类型.")
-
- payload = {
- "msgtype": "text",
- "text": {
- "content": content
- },
- "at": {
- "atMobiles": at_mobiles,
- "isAtAll": is_at_all
- }
- }
- return self.send_message(payload)
-
- def send_link(self, title, text, message_url, pic_url=None):
- """
- 发送链接消息
- :param title: 消息标题
- :param text: 消息内容,如果太长只会部分展示
- :param message_url: 点击消息跳转的url地址
- :param pic_url: 图片url
- """
- payload = {
- "msgtype": "link",
- "link": {
- "title": title,
- "text": text,
- "picUrl": pic_url,
- "messageUrl": message_url
- }
- }
- return self.send_message(payload)
-
- def send_markdown(self, title, text, mobiles=None, is_at_all=False):
- """
- 发送markdown消息
- 目前仅支持md语法的子集,如标题,引用,文字加粗,文字斜体,链接,图片,无序列表,有序列表
- :param title: 消息标题,首屏回话透出的展示内容
- :param text: 消息内容,markdown格式
- :param mobiles: 被艾特的用户的手机号码,格式是列表,注意需要在text里面添加@人的手机号码
- :param is_at_all: 是否艾特所有人,布尔类型,true为艾特所有人,false为不艾特
- """
- at_mobiles = ""
- if mobiles:
- if isinstance(mobiles, list):
- at_mobiles = mobiles
- is_at_all = False
- for mobile in mobiles:
- text += f"@{mobile}"
- else:
- raise TypeError("mobiles类型错误 不是list类型.")
- payload = {
- "msgtype": "markdown",
- "markdown": {
- "title": title,
- "text": text
- },
- "at": {
- "atMobiles": at_mobiles,
- "isAtAll": is_at_all
- }
- }
-
- return self.send_message(payload)
-
- def send_action_card_single(self, title, text, single_title, single_url, btn_orientation=0):
- """
- 发送消息卡片(整体跳转ActionCard类型)
- :param title: 消息标题
- :param text: 消息内容,md格式消息
- :param single_title: 单个按钮的标题
- :param single_url: 点击singleTitle按钮后触发的URL
- :param btn_orientation: 0-按钮竖直排列,1-按钮横向排列
- """
- payload = {
- "msgtype": "actionCard",
- "actionCard": {
- "title": title,
- "text": text,
- "singleTitle": single_title,
- "singleURL": single_url,
- "btnOrientation": btn_orientation,
- }
-
- }
- return self.send_message(payload)
-
- def send_action_card_split(self, title, text, btns, btn_orientation=0):
- """
- 发送消息卡片(独立跳转ActionCard类型)
- :param title: 消息标题
- :param text: 消息内容,md格式消息
- :param btns: 列表嵌套字典类型,"btns": [{"title": "内容不错", "actionURL": "https://www.dingtalk.com/"}, ......]
- :param btn_orientation: 0-按钮竖直排列,1-按钮横向排列
- """
- payload = {
- "msgtype": "actionCard",
- "actionCard": {
- "title": title,
- "text": text,
- "btns": [],
- "btnOrientation": btn_orientation,
- }
-
- }
- for btn in btns:
- payload["actionCard"]["btns"].append({
- "title": btn.get("title"),
- "actionURL": btn.get("action_url")
- })
-
- return self.send_message(payload)
-
- def send_feed_card(self, links_msg):
- """
- 发送多组消息卡片(FeedCard类型)
- :param links_msg: 列表嵌套字典类型,每一个字段包括如下参数:title(单条信息文本), messageURL(点击单条信息后的跳转链接), picURL(单条信息后面图片的url)
- """
- payload = {
- "msgtype": "feedCard",
- "feedCard": {
- "links": []
- }
- }
- for link in links_msg:
- payload["feedCard"]["links"].append(
- {
- "title": link.get("title"),
- "messageURL": link.get("messageURL"),
- "picURL": link.get("picURL")
- }
- )
-
- return self.send_message(payload)
diff --git a/utils/notify_utils/wechat_bot.py b/utils/notify_utils/wechat_bot.py
deleted file mode 100644
index 0af19de..0000000
--- a/utils/notify_utils/wechat_bot.py
+++ /dev/null
@@ -1,175 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/5/11 15:01
-# @Author : chenyinhua
-# @File : wechat_bot.py
-# @Software: PyCharm
-# @Desc: 企业微信机器人
-import os
-from requests import request
-import base64
-import hashlib
-import re
-from loguru import logger
-
-
-class WechatBot:
- """
- 企业微信机器人
- 当前自定义机器人支持文本(text)、markdown(markdown)、图片(image)、图文(news), 文件(file)五种消息类型。
- 机器人的text/markdown类型消息支持在content中使用<@userid>扩展语法来@群成员
- """
-
- def __init__(self, webhook_url):
- """
- :param webhook_url: 机器人的WebHook_url
- """
- self.webhook_url = webhook_url
- self.headers = {
- "Content-Type": "application/json",
- "Charset": "UTF-8"
- }
-
- def send_message(self, payload):
- """
- 发送微信消息
- :payload: 请求json数据
- """
- logger.debug("\n======================================================\n" \
- "-------------Start:发送企业微信消息--------------------\n"
- f"Webhook_Url: {self.webhook_url}\n" \
- f"内容: {payload}\n" \
- "=====================================================")
- response = request(
- url=self.webhook_url,
- json=payload,
- headers=self.headers,
- method="POST"
- )
- if response.json().get("errcode") == 0:
- logger.debug("\n======================================================\n" \
- "-------------End:发送企业微信消息--------------------\n"
- f"通过企业微信发送{payload.get('msgtype', '')}消息成功:{response.json()}\n" \
- "=====================================================")
- print(f"通过企业微信发送{payload.get('msgtype', '')}消息成功:{response.json()}")
- return True
- else:
- logger.error(f"通过企业微信发送{payload.get('msgtype', '')}消息失败:{response.text}")
- print(f"通过企业微信发送{payload.get('msgtype', '')}消息失败:{response.text}")
- return False
-
- def send_text(self, content, mentioned_list=None, mentioned_mobile_list=None):
- """
- 发送文本消息
- :param content: 文本内容,最长不超过2048个字节,必须是utf8编码
- :param mentioned_list: userid的列表,提醒群中的指定成员(@某个成员),@all表示提醒所有人,如果开发者获取不到userid,可以使用mentioned_mobile_list
- :param mentioned_mobile_list: 手机号列表,提醒手机号对应的群成员(@某个成员),@all表示提醒所有人
- """
- payload = {
- "msgtype": "text",
- "text": {
- "content": content,
- "mentioned_list": mentioned_list,
- "mentioned_mobile_list": mentioned_mobile_list
- }
- }
- return self.send_message(payload)
-
- def send_markdown(self, content):
- """
- 发送markdown消息
- 目前支持的markdown语法是如下的子集:
- 1. 标题 (支持1至6级标题,注意#与文字中间要有空格)
- 2. 加粗
- 3. 链接
- 4. 行内代码段(暂不支持跨行)
- 5. 引用
- 6. 字体颜色(只支持3种内置颜色), 绿色(color="info"),灰色(color="comment"),橙红色(color="warning")
- :param content: markdown内容,最长不超过4096个字节,必须是utf8编码
- """
- payload = {
- "msgtype": "markdown",
- "markdown": {
- "content": content
- }
- }
- return self.send_message(payload)
-
- def send_picture(self, image_path):
- """
- 发送图片消息
- :param image_path: 图片的绝对路径
- """
- with open(image_path, "rb") as f:
- image_data = f.read()
- payload = {
- "msgtype": "image",
- "image": {
- "base64": base64.b64encode(image_data).decode("utf-8"), # # 将图片数据转换成Base64编码格式
- "md5": hashlib.md5(image_data).hexdigest() # # 计算图片的MD5值
- }
- }
- return self.send_message(payload)
-
- def send_text_picture(self, articles: list):
- """
- 发送图文消息
- :param articles: 图文消息,一个图文消息支持1到8条图文, 包括如下字段
- 1. title: 标题,不超过128个字节,超过会自动截断
- 2. description: 非必填,描述,不超过512个字节,超过会自动截断
- 3. url: 点击后跳转的链接。
- 4. picurl: 非必填,图文消息的图片链接,支持JPG、PNG格式,较好的效果为大图 1068*455,小图150*150。
- """
- payload = {
- "msgtype": "news",
- "news": {
- "articles": [
- ]
- }
- }
- for article in articles:
- payload["news"]["articles"].append(
- {
- "title": article.get("title"),
- "description": article.get("description", ""),
- "url": article.get("url"),
- "picurl": article.get("picurl", "")
- }
- )
- return self.send_message(payload)
-
- def upload_file(self, file_path):
- """
- 上传文件到企业微信服务器(要求文件大小在5B~20M之间)
- 注意:素材上传得到media_id,该media_id仅三天内有效;media_id只能是对应上传文件的机器人可以使用
- :param file_path: 文件绝对路径
- """
- token_regex = r"key=([\w-]+)"
- match = re.search(token_regex, self.webhook_url)
- token = match.group(1)
- url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={token}&type=file"
- headers = {
- "Content-Type": "multipart/form-data;"
- }
- with open(file_path, "rb") as f:
- files = {"media": (os.path.basename(file_path), f.read())}
- response = request(url=url, method="POST", files=files, headers=headers)
- if response.json().get("errcode") == 0:
- media_id = response.json().get("media_id")
- print(f"上传文件成功,media_id= {media_id}")
- return media_id
- else:
- print(f"上传文件失败:{response.text}")
- return False
-
- def send_file(self, media_id):
- """
- 发送文件
- :param media_id: 文件id,通过下文的文件上传接口获取
- """
- payload = {
- "msgtype": "file",
- "file": {
- "media_id": media_id,
- }
- }
- return self.send_message(payload)
diff --git a/utils/notify_utils/yagmail_bot.py b/utils/notify_utils/yagmail_bot.py
deleted file mode 100644
index db6c396..0000000
--- a/utils/notify_utils/yagmail_bot.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2021/8/14 12:21
-# @Author : Flora.Chen
-# @File : yagmail_bot.py
-# @Software: PyCharm
-# @Desc: 通过第三方模块yagmail发送邮件
-
-# 标准库导入
-import os
-# 第三方库导入
-from loguru import logger
-import yagmail
-
-
-class YagEmailServe:
- def __init__(self, host, user, password):
- """
- user(发件人邮箱), password(邮箱授权码), host(发件人使用的邮箱服务 例如:smtp.163.com)
- """
- self.host = host
- self.user = user
- self.password = password
-
- def send_email(self, info: dict):
- """
- 发送邮件
- :param info:包括,contents(内容), to(收件人列表), subject(邮件标题), attachments(附件列表)
- info = {
- "subject": "",
- "contents": "",
- "to": "",
- "files": ""
- }
- :return:
- """
- try:
- logger.info("\n======================================================\n" \
- "-------------Start:发送邮件--------------------\n"
- f"用户名: {self.user}\n" \
- f"密码: {self.password}\n" \
- f"host: {self.host}\n" \
- f"邮件内容: {info}\n" \
- "=====================================================")
- yag = yagmail.SMTP(
- user=self.user,
- password=self.password,
- host=self.host)
- # 如果存在附件,则与邮件内容一起发送附件,否则仅发送邮件内容
- if info.get("attachments") and os.path.exists(info['attachments']):
- yag.send(
- to=info['to'],
- subject=info['subject'],
- contents=info['contents'],
- attachments=info['attachments'])
- else:
- logger.warning(f"\n请检查邮件内容info是否存在附件,info中应该存在键值:attachments\n"
- f"请检查附件地址是否正确 --> info['attachments'] 应该是一个有效的路径\n"
- f"当前仅发送邮件内容,不发送附件~")
- yag.send(
- to=info['to'],
- subject=info['subject'],
- contents=info['contents'])
- yag.close()
- logger.info("\n======================================================\n" \
- "-------------End:发送邮件--------------------\n"
- "发送邮件成功\n" \
- "=====================================================")
- except Exception as e:
- logger.error(f"发送邮件失败,错误信息: {e}")
diff --git a/utils/report_utils/allure_handle.py b/utils/report_utils/allure_handle.py
index 1210c65..f174e73 100644
--- a/utils/report_utils/allure_handle.py
+++ b/utils/report_utils/allure_handle.py
@@ -13,7 +13,7 @@ import allure
# 本地应用/模块导入
from utils.models import AllureAttachmentType
from utils.report_utils.platform_handle import PlatformHandle
-from utils.files_utils.files_handle import zip_file, copy_file
+from common.files_utils.files_handle import zip_file, copy_file
def allure_title(title: str) -> None:
diff --git a/utils/report_utils/get_results_handle.py b/utils/report_utils/get_results_handle.py
index e3ecbb1..36e34ac 100644
--- a/utils/report_utils/get_results_handle.py
+++ b/utils/report_utils/get_results_handle.py
@@ -11,7 +11,7 @@ import json
# 第三方库导入
from loguru import logger
# 本地应用/模块导入
-from utils.tools.time_handle import timestamp_strftime
+from common.tools.time_handle import timestamp_strftime
def get_test_results_from_from_allure_report(allure_html_path):
diff --git a/utils/report_utils/push_allure_report.py b/utils/report_utils/push_allure_report.py
index 59563ec..e652926 100644
--- a/utils/report_utils/push_allure_report.py
+++ b/utils/report_utils/push_allure_report.py
@@ -12,7 +12,7 @@ import subprocess
import shutil
import os
# 本地应用/模块导入
-from utils.files_utils.files_handle import copy_all_files
+from common.files_utils.files_handle import copy_all_files
"""
subprocess.run: 用于执行系统命令。
@@ -61,8 +61,8 @@ def push_allure_report(allure_report_dir: str, remote_url: str, username: str, p
subprocess.run(["git", "-C", repo_path, "init"], check=True)
print("初始化本地仓库成功")
logger.info("初始化本地仓库成功")
-
- auth_remote_url = f"https://{username}:{password}@{remote_url.split("//")[-1]}"
+ split_url = remote_url.split('//')[-1]
+ auth_remote_url = f"https://{username}:{password}@{split_url}"
print(f"添加远程仓库: {auth_remote_url}")
logger.info(f"添加远程仓库: {auth_remote_url}")
subprocess.run(["git", "-C", repo_path, "remote", "add", "origin", auth_remote_url], check=True)
diff --git a/utils/report_utils/send_result_handle.py b/utils/report_utils/send_result_handle.py
index c0a2945..dbb2edf 100644
--- a/utils/report_utils/send_result_handle.py
+++ b/utils/report_utils/send_result_handle.py
@@ -13,9 +13,9 @@ from config.settings import SEND_RESULT_TYPE, email, ding_talk, wechat, email_su
ding_talk_content, wechat_content
from utils.data_utils.data_handle import data_handle
from utils.report_utils.get_results_handle import get_test_results_from_from_allure_report
-from utils.notify_utils.dingding_bot import DingTalkBot
-from utils.notify_utils.wechat_bot import WechatBot
-from utils.notify_utils.yagmail_bot import YagEmailServe
+from common.notify_utils.dingding_bot import DingTalkBot
+from common.notify_utils.wechat_bot import WechatBot
+from common.notify_utils.yagmail_bot import YagEmailServe
def send_email(user, pwd, host, subject, content, to, attachments):
diff --git a/utils/requests_utils/base_request.py b/utils/requests_utils/base_request.py
index 29ecf02..a70d526 100644
--- a/utils/requests_utils/base_request.py
+++ b/utils/requests_utils/base_request.py
@@ -20,7 +20,7 @@ class BaseRequest:
Request操作封装
"""
- TIMEOUT = 12
+ TIMEOUT = 30
@classmethod
def send_request(cls, req_data):
@@ -131,7 +131,8 @@ class BaseRequest:
)
@classmethod
- def request_type_for_file(cls, method: Text, url: Text, headers: Optional[Dict], fields: Union[Dict, Text, None],
+ def request_type_for_file(cls, method: Text, url: Text, headers: Optional[Dict],
+ fields: Union[Dict, Text, None],
files: Text, **kwargs):
"""
处理 requestType 为 file 类型
diff --git a/utils/requests_utils/case_dependence.py b/utils/requests_utils/case_dependence.py
index 894ee62..aeaab40 100644
--- a/utils/requests_utils/case_dependence.py
+++ b/utils/requests_utils/case_dependence.py
@@ -9,7 +9,7 @@
import allure
from loguru import logger
# 本地应用/模块导入
-from config.path_config import INTERFACE_DIR
+from settings import INTERFACE_DIR
from utils.requests_utils.request_control import RequestControl
from utils.data_utils.data_handle import data_handle
from utils.data_utils.extract_data_handle import json_extractor, re_extract
@@ -19,16 +19,16 @@ from utils.database_utils.mysql_handle import MysqlServer
class CaseDependenceHandler:
"""
- 处理用例依赖,支持接口依赖,环境变量依赖,数据库查询依赖。关键字:env_vars, interface, database,
+ 处理用例依赖,支持接口依赖,环境变量依赖,数据库查询依赖。关键字:variables, interface, database,
先处理环境变量依赖,再处理接口依赖,最后处理数据库查询依赖
"""
def __init__(self, source):
self.source = source
- def handle_env_vars(self, env_vars):
+ def handle_variables(self, variables):
"""处理环境变量依赖"""
- for key, value in env_vars.items():
+ for key, value in variables.items():
new_value = data_handle(value, self.source)
allure_step(f"依赖环境变量 --> {key}={new_value}")
logger.info(f"依赖环境变量 --> {key}={new_value}")
@@ -77,7 +77,7 @@ class CaseDependenceHandler:
def case_dependence_handle(self, case_dependence: dict, db_info: dict = None):
"""
- 处理用例依赖,支持接口依赖,环境变量依赖,SQL依赖。关键字:env_vars, interface, database,
+ 处理用例依赖,支持接口依赖,环境变量依赖,SQL依赖。关键字:variables, interface, database,
先处理环境变量依赖,再处理接口依赖,最后处理SQL依赖
"""
if not case_dependence:
@@ -85,11 +85,11 @@ class CaseDependenceHandler:
allure_step("跳过用例依赖处理")
return self.source
- if case_dependence.get("env_vars"):
- if isinstance(case_dependence["env_vars"], dict):
- self.handle_env_vars(case_dependence["env_vars"])
+ if case_dependence.get("variables"):
+ if isinstance(case_dependence["variables"], dict):
+ self.handle_variables(case_dependence["variables"])
else:
- logger.error("依赖环境变量格式错误,跳过依赖环境变量处理~ --> env_vars仅支持dict格式")
+ logger.error("依赖环境变量格式错误,跳过依赖环境变量处理~ --> variables仅支持dict格式")
if case_dependence.get("interface"):
interfaces = case_dependence["interface"]
diff --git a/utils/requests_utils/request_control.py b/utils/requests_utils/request_control.py
index 7618e44..3032083 100644
--- a/utils/requests_utils/request_control.py
+++ b/utils/requests_utils/request_control.py
@@ -15,15 +15,14 @@ import requests
from requests import Response, utils
from loguru import logger
# 本地应用/模块导入
-from config.path_config import FILES_DIR
+from settings import FILES_DIR
from utils.requests_utils.base_request import BaseRequest
from utils.data_utils.data_handle import data_handle
from utils.data_utils.extract_data_handle import json_extractor, re_extract, response_extract
-from utils.files_utils.files_handle import get_files
-from utils.files_utils.yaml_handle import YamlHandle
+from common.files_utils.files_handle import get_files,load_yaml_file
from utils.assertion_utils.assert_control import AssertHandle
from utils.report_utils.allure_handle import allure_step
-from utils.database_utils.mysql_handle import MysqlServer
+from common.database_utils.mysql_handle import MysqlServer
class RequestControl(BaseRequest):
@@ -44,22 +43,23 @@ class RequestControl(BaseRequest):
logger.debug(f"目标路径是一个目录:{api_file_path}")
api_files = get_files(target=api_file_path, end=".yaml") + get_files(target=api_file_path, end=".yml")
for api_file in api_files:
- api_data.append(YamlHandle(filename=api_file).read_yaml)
+ api_data.append(load_yaml_file(api_file))
elif os.path.isfile(api_file_path):
logger.debug(f"目标路径是一个文件:{api_file_path}")
- api_data.append(YamlHandle(filename=api_file_path).read_yaml)
+ api_data.append(load_yaml_file(api_file_path))
else:
logger.error(f"目标路径错误,请检查!api_file_path={api_file_path}")
return None
for api in api_data:
- matching_api = next((item for item in api["case_info"] if item["id"] == key), None)
- if matching_api:
- logger.info("\n----------匹配到的api----------\n"
- f"类型:{type(matching_api)}"
- f"值:{matching_api}\n")
- return matching_api
+ if api.get("teststeps"):
+ matching_api = next((item for item in api["teststeps"] if item["id"] == key), None)
+ if matching_api:
+ logger.info("\n----------匹配到的api----------\n"
+ f"类型:{type(matching_api)}"
+ f"值:{matching_api}\n")
+ return matching_api
# 在找不到匹配的情况下,返回一个默认值且记录一条错误日志
logger.warning(f"未找到id为{key}的接口, 返回值是None")
@@ -175,7 +175,7 @@ class RequestControl(BaseRequest):
f"请求文件(files): {type(request_data.get('files', None))} || {request_data.get('files', None)}\n" \
f"请求后等待(wait_seconds): {type(request_data.get('wait_seconds', None))} || {request_data.get('wait_seconds', None)}\n" \
f"请求参数(payload): {type(request_data.get('payload', None))} || {request_data.get('payload', None)}\n" \
- f"响应断言(assert_response): {type(request_data.get('assert_response', None))} || {request_data.get('assert_response', None)}\n" \
+ f"响应断言(validate): {type(request_data.get('validate', None))} || {request_data.get('validate', None)}\n" \
f"数据库断言(assert_sql): {type(request_data.get('assert_sql', None))} || {request_data.get('assert_sql', None)}\n" \
f"后置提取参数(extract): {type(request_data.get('extract', None))} || {request_data.get('extract', None)}\n" \
f"用例依赖(case_dependence): {type(request_data.get('case_dependence', None))} || {request_data.get('case_dependence', None)}\n")
@@ -192,7 +192,7 @@ class RequestControl(BaseRequest):
"files": self.files_handle(files=request_data.get("files"), source=source_data),
"wait_seconds": self.wait_seconds_handle(wait_seconds=request_data.get("wait_seconds")),
"payload": data_handle(obj=request_data["payload"], source=source_data),
- "assert_response": data_handle(obj=request_data.get("assert_response"), source=source_data),
+ "validate": data_handle(obj=request_data.get("validate"), source=source_data),
"assert_sql": request_data.get("assert_sql"),
"extract": data_handle(obj=request_data.get("extract"), source=source_data),
"case_dependence": request_data.get("case_dependence")
@@ -210,7 +210,7 @@ class RequestControl(BaseRequest):
f"请求文件(files): {type(new_request_data.get('files', None))} || {new_request_data.get('files', None)}\n" \
f"请求后等待(wait_seconds): {type(new_request_data.get('wait_seconds', None))} || {new_request_data.get('wait_seconds', None)}\n" \
f"请求参数(payload): {type(new_request_data.get('payload', None))} || {new_request_data.get('payload', None)}\n" \
- f"响应断言(assert_response): {type(new_request_data.get('assert_response', None))} || {new_request_data.get('assert_response', None)}\n" \
+ f"响应断言(validate): {type(new_request_data.get('validate', None))} || {new_request_data.get('validate', None)}\n" \
f"数据库断言(assert_sql): {type(new_request_data.get('assert_sql', None))} || {new_request_data.get('assert_sql', None)}\n" \
f"后置提取参数(extract): {type(new_request_data.get('extract', None))} || {new_request_data.get('extract', None)}\n" \
f"用例依赖(case_dependence): {type(new_request_data.get('case_dependence', None))} || {new_request_data.get('case_dependence', None)}\n"
@@ -392,7 +392,7 @@ class RequestControl(BaseRequest):
self.api_step_record(**new_api_data)
# 进行响应断言
- AssertHandle(assert_data=new_api_data["assert_response"], response=response).assert_handle()
+ AssertHandle(assert_data=new_api_data["validate"], response=response).assert_handle()
# 进行响应参数提取,并保存提取后的数据
if new_api_data.get("extract"):
diff --git a/utils/tools/__init__.py b/utils/tools/__init__.py
deleted file mode 100644
index 9abd276..0000000
--- a/utils/tools/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/11/10 14:46
-# @Author : floraachy
-# @File : __init__.py
-# @Software: PyCharm
-# @Desc:
diff --git a/utils/tools/aes_encrypt_decrypt.py b/utils/tools/aes_encrypt_decrypt.py
deleted file mode 100644
index f40bff6..0000000
--- a/utils/tools/aes_encrypt_decrypt.py
+++ /dev/null
@@ -1,77 +0,0 @@
-"""
-@FileName:aes_encrypt_decrypt.py
-@Description:
-@Author:Floraachy
-@Time:2024/11/22 14:48
-"""
-
-"""
-AES 加密最常用的模式就是 ECB模式 和 CBC 模式,当然还有很多其它模式,他们都属于AES加密。ECB模式和CBC 模式俩者区别就是 ECB 不需要 iv偏移量,而CBC需要。
-AES加密使用参数:
-1) 秘钥: 加密的时候用秘钥,解密的时候需要同样的秘钥才能解出来; 数据类型为bytes
-2) 明文: 需要加密的参数; 数据类型为bytes
-3) 模式: aes 加密常用的有 ECB 和 CBC 模式(我只用了这两个模式,还有其他模式);数据类型为aes类内部的枚举量
-4) iv 偏移量: 这个参数在 ECB 模式下不需要,在 CBC 模式下需要;数据类型为bytes
-
-1. 在Python中进行AES加密解密时,所传入的密文、明文、秘钥、iv偏移量、都需要是bytes(字节型)数据。python 在构建aes对象时也只能接受bytes类型数据。
-2.当秘钥,iv偏移量,待加密的明文,字节长度不够16字节或者16字节倍数的时候需要进行补全。
-3. CBC模式需要重新生成AES对象,为了防止这类错误,我写代码无论是什么模式都重新生成AES对象。
-
-【编码模式】
-由于python中的 AES 加密解密,只能接受字节型(bytes)数据。而我们常见的 待加密的明文可能是中文,或者待解密的密文经过base64编码的,这种都需要先进行编码或者解码,然后才能用AES进行加密或解密。
-因此,在python使用AES进行加密或者解密时,都需要先转换成bytes型数据。
-对于中文明文,我们可以使用encode()函数进行编码,将字符串转换成bytes类型数据。解密后,同样是需要decode()函数进行解码的,将字节型数据转换回中文字符(字符串类型)。
-
-【填充模式】
-前面我使用秘钥,还有明文,包括IV向量,都是固定16字节,也就是数据块对齐了。而填充模式就是为了解决数据块不对齐的问题,使用什么字符进行填充就对应着不同的填充模式
-AES补全模式常见有以下几种:
-1)ZeroPadding: 用b’\x00’进行填充,这里的0可不是字符串0,而是字节型数据的b’\x00’
-2)PKCS7Padding: 当需要N个数据才能对齐时,填充字节型数据为N、并且填充N个
-3)PKCS5Padding:与PKCS7Padding相同,在AES加密解密填充方面我没感到什么区别
-4)no padding:当为16字节数据时候,可以不进行填充,而不够16字节数据时同ZeroPadding一样
-
-
-"""
-# 标准库导入
-import base64
-# 第三方模块导入
-from Crypto.Cipher import AES
-
-
-class Encrypt:
- """
- 使用AES-CBC对称加密算法对密码进行加密,填充模式为PKCS7Padding
- """
- def __init__(self, key, iv):
- self.key = key.encode('utf-8')
- self.iv = iv.encode('utf-8')
-
- # @staticmethod
- def pkcs7padding(self, text):
- """明文使用PKCS7填充 """
- bs = 16
- length = len(text)
- bytes_length = len(text.encode('utf-8'))
- padding_size = length if (bytes_length == length) else bytes_length
- padding = bs - padding_size % bs
- padding_text = chr(padding) * padding
- self.coding = chr(padding)
- return text + padding_text
-
- def aes_encrypt(self, content):
- """ AES加密 """
- cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
- # 处理明文
- content_padding = self.pkcs7padding(content)
- # 加密
- encrypt_bytes = cipher.encrypt(content_padding.encode('utf-8'))
- # 重新编码
- result = str(base64.b64encode(encrypt_bytes), encoding='utf-8')
- return result
-
- def aes_decrypt(self, content):
- """AES解密 """
- cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
- content = base64.b64decode(content)
- text = cipher.decrypt(content).decode('utf-8')
- return text.rstrip(self.coding)
diff --git a/utils/tools/func_handle.py b/utils/tools/func_handle.py
deleted file mode 100644
index dbf8257..0000000
--- a/utils/tools/func_handle.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/4/6 11:36
-# @Author : Flora.Chen
-# @File : helper.py
-# @Software: PyCharm
-# @Desc:
-
-def add_docstring(docstring):
- """
- 函数装饰器,它接受一个字符串参数docstring,
- 并返回一个装饰器函数。装饰器函数接受一个函数参数func,
- 并将func的__doc__属性设置为docstring。
- """
- def decorator(func):
- func.__doc__ = docstring
- return func
-
- return decorator
-
-
-class AddCLassDocstring:
- """
- 类装饰器,它接受一个字符串参数docstring,
- 并返回一个装饰器函数。装饰器函数接受一个函数参数func,
- 并将func的__doc__属性设置为docstring。
- """
- def __init__(self, docstring):
- self.docstring = docstring
-
- def __call__(self, func):
- func.__doc__ = self.docstring
- return func
\ No newline at end of file
diff --git a/utils/tools/http_server.py b/utils/tools/http_server.py
deleted file mode 100644
index fc94104..0000000
--- a/utils/tools/http_server.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/5/24 9:34
-# @Author : chenyinhua
-# @File : http_server.py
-# @Software: PyCharm
-# @Desc: 封装的HTTP服务
-
-# 标准库导入
-import http.server
-import socketserver
-import os
-import sys
-# 第三方库导入
-from functools import partial
-
-
-class HttpServer:
- def __init__(self, bind: str = "127.0.0.1", port: int = 8000, directory=os.getcwd()):
- """
- :param bind: 指定地址,如本地主机
- :param port: 自定义端口号, 服务器默认监听端口是 8000
- :param directory: 指定工作目录, 服务器默认工作目录为当前目录
- """
- self.bind = bind
- self.port = port
- self.directory = directory
- args = sys.argv
- for i in range(1, len(args)):
- if args[i] == "-port" and i + 1 < len(args):
- self.port = int(args[i + 1])
- if args[i] == "-dir" and i + 1 < len(args):
- self.directory = args[i + 1]
- if args[i] == "-bind" and i + 1 < len(args):
- self.bind = args[i + 1]
-
- def run(self):
- try:
- with socketserver.TCPServer((self.bind, self.port), partial(http.server.SimpleHTTPRequestHandler,
- directory=self.directory)) as httpd:
- print(
- f"工作目录:{self.directory}\n"
- f"Serving HTTP on {self.bind} port {self.port} \n"
- f"http://{self.bind}:{self.port}/ ..."
- )
- httpd.serve_forever()
- except KeyboardInterrupt:
- print("\nKeyboard interrupt received, exiting.")
- sys.exit(0)
-
-
-if __name__ == '__main__':
- server = HttpServer()
- server.run()
diff --git a/utils/tools/time_handle.py b/utils/tools/time_handle.py
deleted file mode 100644
index 540d54d..0000000
--- a/utils/tools/time_handle.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/5/19 10:14
-# @Author : chenyinhua
-# @File : time_handle.py
-# @Software: PyCharm
-# @Desc:
-
-# 标准库导入
-import time
-
-
-def timestamp_strftime(timestamp, style="%Y-%m-%d %H:%M:%S"):
- """
- 将时间戳转换为指定格式日期
- """
- try:
- if isinstance(timestamp, str):
- timestamp = eval(timestamp)
- return time.strftime(style, time.localtime(float(timestamp / 1000)))
- except Exception as e:
- return f"timestamp或者style格式错误:{e}"
diff --git a/utils/yaml_case_maker/__init__.py b/utils/yaml_case_maker/__init__.py
deleted file mode 100644
index 6836428..0000000
--- a/utils/yaml_case_maker/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/8/16 16:49
-# @Author : chenyinhua
-# @File : __init__.py.py
-# @Software: PyCharm
-# @Desc:
diff --git a/utils/yaml_case_maker/postman_for_yaml.py b/utils/yaml_case_maker/postman_for_yaml.py
deleted file mode 100644
index a109c17..0000000
--- a/utils/yaml_case_maker/postman_for_yaml.py
+++ /dev/null
@@ -1,247 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/8/22 14:13
-# @Author : chenyinhua
-# @File : postman_for_yaml.py
-# @Software: PyCharm
-# @Desc:
-
-# 标准库导入
-import os
-import re
-import json
-from typing import Dict
-# 第三方库导入
-from ruamel import yaml
-
-"""
-相比较于pyyaml, Ruamel可以保持YAML文件的结构和顺序不变。
-安装:pip install ruamel.yaml
-"""
-
-
-class PostmanForYaml:
- """
- 将postman接口文档转为YAML格式用例
- """
-
- def __init__(self, case_dir, postman_path):
- """
- :param case_dir: 用例需要保存的目录
- :param postman_path: 需要读取的swagger文件的路径
- """
- self._data = self.get_postman_json(postman_path)
- self.case_dir = case_dir
-
- def get_postman_json(self, postman_path):
- """
- 获取 postman 中的 json 数据
- :param postman_path: 需要读取的swagger文件的路径
- :return:
- """
- result = []
- try:
- with open(postman_path, "r", encoding='utf-8') as f:
- row_data = json.load(f)
-
- def _parse_api(content):
- nonlocal row_data
- nonlocal result
- if isinstance(content, list):
- for item in content:
- _parse_api(content=item)
- elif isinstance(content, dict):
- if 'item' in content.keys():
- _parse_api(content=content['item'])
- elif 'request' in content.keys():
- # 获取所有接口的相关数据
- yaml_data = {
- "case_common": {
- "allure_epic": self.get_allure_epic(row_data),
- "allure_feature": self.get_allure_feature(content),
- "allure_story": self.get_allure_story(content)
- },
- "case_info": [
- {
- "id": f"case_{self.get_case_id(self.get_url(content)).lower()}_01",
- "title": self.get_title(content),
- "run": False,
- "url": self.get_url(content),
- "severity": None,
- "method": self.get_method(content),
- "headers": self.get_headers(content),
- "cookies": None,
- "request_type": self.get_request_type_payload(content).get("request_type"),
- "payload": self.get_request_type_payload(content).get("payload"),
- "files": self.get_request_type_payload(content).get("files"),
- "extract": None,
- "assert_response": {'eq': {'http_code': 200}},
- "assert_sql": None
-
- }
- ]
- }
- result.append({
- self.get_case_id(self.get_url(content)): yaml_data
- })
- _parse_api(content=row_data)
- return result
- except FileNotFoundError:
- raise FileNotFoundError("文件路径不存在,请重新输入")
-
- def get_allure_epic(self, content):
- """
- 获取 yaml 用例中的 allure_epic
- """
- _allure_epic = content['info']['name']
- return _allure_epic
-
- @classmethod
- def get_allure_feature(cls, content):
- """
- 获取 yaml 用例中的 allure_feature
- 这里直接获取最下级的item.name,因为可以有多级item,不好判断
- """
-
- _allure_feature = content['name']
- return str(_allure_feature)
-
- @classmethod
- def get_allure_story(cls, content):
- """
- 获取 yaml 用例中的 allure_story
- 这里直接获取最下级的item.name,因为可以有多级item,不好判断
- """
- _allure_story = content['name']
- return _allure_story
-
- @classmethod
- def get_case_id(cls, content):
- """
- 获取 case_id, 是根据接口路径生成的
- """
- # 这里接收到的参数content其实是url
- # 移除?后面拼接的值
- url_path = content.split("?")[0]
- # 去除"http://"或"https://"部分
- if url_path.startswith(("http://", "https://")):
- url_path = url_path.split("//", 1)[1]
- # 使用正则表达式匹配并删除第一个斜线以及斜线前的内容
- new_url_path = re.sub(r'^.*?/', '', url_path)
- # 将剩余的斜线替换成_
- _case_id = new_url_path.replace("/", "_")
- return _case_id
-
- @classmethod
- def get_title(cls, content):
- """
- 获取接口的标题
- """
- _get_detail = content.get('name')
- return "测试 " + _get_detail
-
- @classmethod
- def get_url(cls, content):
- """
- 获取接口的url
- """
- request = content.get('request')
- url = request.get('url')
- url_raw = url.get('raw') if url else url
- _url = url_raw.replace('{{', '${').replace('}}', '}')
- # 使用正则表达式匹配":dept"内的内容并替换为${dept}
- _get_url = re.sub(r':(\w+)', r'${\1}', _url)
- return _get_url
-
- @classmethod
- def get_method(cls, content):
- """
- 获取接口的method
- """
- request = content.get('request')
- _get_method = request.get('method', 'GET').upper()
- return _get_method
-
- @classmethod
- def get_headers(cls, content):
- """
- 获取请求头
- """
- _headers = {}
- request = content.get('request')
- if request:
- _headers = request.get('header')
- _headers = {item.get('key'): item.get('value') for item in _headers} if _headers else {}
- auth = request.get('auth')
- if auth:
- auth_type = auth.get('type')
- if auth.get(auth_type):
- auth_value = {item.get('key'): item.get('value') for item in auth.get(auth_type) if
- (item and item.get('key'))}
- _headers.update(auth_value)
- # 如果_headers是{}就返回None
- return None if not _headers else _headers
-
- @classmethod
- def get_request_type_payload(cls, content):
- """
- 获取request_type, 并响应处理payload及file参数
- """
- api = {
- "request_type": 'json',
- "payload": {},
- "files": {"file": []}
- }
- request = content.get('request')
- if request:
- body = request.get('body')
- if body:
- # api接口请求参数类型
- request_mode = body.get('mode')
- if request_mode in ['raw', 'formdata', 'urlencoded']:
- api["request_type"] = 'json' if request_mode == 'raw' else 'data'
- request_data = body.get(request_mode)
- if request_data:
- if request_mode == 'raw':
- api["payload"].update(
- json.loads(request_data.replace('\t', '').replace('\n', '').replace('\r', '')))
- elif request_mode in ['formdata', 'urlencoded']:
- for item in request_data:
- if item['type'] == "text":
- api["payload"][item['key']] = item.get('value', '')
- elif item['type'] == "file":
- api["files"]["file"].append(item.get('src', ''))
- api["request_type"] = "file"
- else:
- raise ValueError("不支持的请求参数类型")
- api["payload"] = None if not api["payload"] else api["payload"]
- api["files"] = None if not api["files"]["file"] else api["files"]
- return api
-
- def yaml_cases(self, data: Dict, file_path: str) -> None:
- """
- 写入 yaml 数据
- :param file_path:
- :param data: 测试用例数据
- :return:
- """
- # 检查目录不存在则创建, 存在则不创建
- os.makedirs(self.case_dir, exist_ok=True)
- _file_name = file_path + '.yaml'
- _file_path = os.path.join(self.case_dir, _file_name)
- if _file_name in os.listdir(self.case_dir):
- data.pop("case_common")
- data = data["case_info"]
- with open(_file_path, "a", encoding="utf-8") as file:
- yaml.dump(data, file, Dumper=yaml.RoundTripDumper, allow_unicode=True)
- file.write('\n')
-
- def write_yaml_handler(self):
- # 获取所有接口的相关数据
- for case in self._data:
- for k, v in case.items():
- self.yaml_cases(data=v, file_path=k)
-
-
-if __name__ == '__main__':
- PostmanForYaml(case_dir=r"C:\1_xinjinyuan_chy\1project\apiautotest\files\postman",
- postman_path=r"C:\1_xinjinyuan_chy\1project\apiautotest\files\Gitlink.postman_collection.json").write_yaml_handler()
diff --git a/utils/yaml_case_maker/swagger_for_yaml.py b/utils/yaml_case_maker/swagger_for_yaml.py
deleted file mode 100644
index 165d0bc..0000000
--- a/utils/yaml_case_maker/swagger_for_yaml.py
+++ /dev/null
@@ -1,197 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Time : 2023/8/15 15:52
-# @Author : chenyinhua
-# @File : swagger_for_yaml.py
-# @Software: PyCharm
-# @Desc:
-
-# 标准库导入
-import os
-import json
-from typing import Dict
-# 第三方库导入
-from jsonpath import jsonpath
-from ruamel import yaml
-
-"""
-相比较于pyyaml, Ruamel可以保持YAML文件的结构和顺序不变。
-安装:pip install ruamel.yaml
-"""
-
-
-class SwaggerForYaml:
- """
- 将swagger接口文档转为YAML格式用例
- """
-
- def __init__(self, case_dir, swagger_path):
- """
- :param case_dir: 用例需要保存的目录
- :param swagger_path: 需要读取的swagger文件的路径
- """
- self._data = self.get_swagger_json(swagger_path)
- self.case_dir = case_dir
-
- def get_swagger_json(self, path):
- """
- 获取 swagger 中的 json 数据
- :param path: 需要读取的swagger文件的路径
- :return:
- """
- try:
- with open(path, "r", encoding='utf-8') as f:
- row_data = json.load(f)
- return row_data
- except FileNotFoundError:
- raise FileNotFoundError("文件路径不存在,请重新输入")
-
- def get_allure_epic(self):
- """
- 获取 yaml 用例中的 allure_epic
- """
- _allure_epic = self._data['info']['title']
- return _allure_epic
-
- @classmethod
- def get_allure_feature(cls, value):
- """
- 获取 yaml 用例中的 allure_feature
- 取的是每一个接口的tags, tag可能是列表格式,例如:"tags": ["组织下可选角色"],这种就处理一下获取第一个元素值
- """
- _allure_feature = value['tags'][0] if isinstance(value['tags'], list) else value['tags']
- return str(_allure_feature)
-
- @classmethod
- def get_allure_story(cls, value):
- """
- 获取 yaml 用例中的 allure_story
- 取的是每一个接口的summary
- """
- _allure_story = value['summary']
- return _allure_story
-
- @classmethod
- def get_case_id(cls, value):
- """
- 获取 case_id, 是根据接口路径生成的
- """
- _case_id = value.replace("/", "_")
- return "case" + _case_id + "_01"
-
- @classmethod
- def get_title(cls, value):
- """
- 获取接口的标题
- """
- _get_detail = value['summary']
- return "测试 " + _get_detail
-
- @classmethod
- def get_headers(cls, value):
- """
- 获取请求头
- """
- _headers = {}
- # 先检查是否存在consumes, 存在则consumes的值作为header的Content-Type
- consumes = jsonpath(obj=value, expr="$.consumes")
- if consumes and consumes != [[]]:
- _headers = {"Content-Type": consumes[0][0]}
- # 再检查parameters是否存在,存在则检查in是否等于header, 存在则header[parameters[name]]=None
- parameters = jsonpath(obj=value, expr="$.parameters")
- if parameters and parameters != [[]]:
- for i in value['parameters']:
- if i['in'] == 'header':
- _headers[i['name']] = None
- # 如果_headers是{}就返回None
- return None if not _headers else _headers
-
- @classmethod
- def get_request_type(cls, value, headers):
- """
- 处理 request_type:需要综合考虑参数的in和header请求类型
- """
- headers_values = list(headers.values()) if isinstance(headers, dict) else str(headers)
- parameters = jsonpath(obj=value, expr="$.parameters")
- if parameters and parameters != [[]]:
- _parameters = value['parameters']
- if _parameters[0]['in'] == 'query':
- return "params"
- else:
- if 'application/x-www-form-urlencoded' in headers_values or 'multipart/form-data' in headers_values:
- return "data"
- elif 'application/json' in headers_values:
- return "json"
- elif 'application/octet-stream' in headers_values:
- return "file"
- else:
- return "data"
-
- @classmethod
- def get_payload(cls, value):
- """
- 处理 payload数据
- """
- _dict = {}
- if jsonpath(obj=value, expr="$.parameters"):
- _parameters = value['parameters']
- for i in _parameters:
- if i['in'] != 'header':
- _dict[i['name']] = None
- else:
- return None
- return None if not _dict else _dict
-
- def yaml_cases(self, data: Dict, file_path: str) -> None:
- """
- 写入 yaml 数据
- :param file_path:
- :param data: 测试用例数据
- :return:
- """
- # 检查目录不存在则创建, 存在则不创建
- os.makedirs(self.case_dir, exist_ok=True)
- _file_name = file_path[1:].replace("/", "_") + '.yaml'
- _file_path = os.path.join(self.case_dir, _file_name)
- if _file_name in os.listdir(self.case_dir):
- data.pop("case_common")
- data = data["case_info"]
- with open(_file_path, "a", encoding="utf-8") as file:
- yaml.dump(data, file, Dumper=yaml.RoundTripDumper, allow_unicode=True)
- file.write('\n')
-
- def write_yaml_handler(self):
- # 获取所有接口的相关数据,key=接口路径, value=接口各项参数
- _api_data = self._data['paths']
- for key, value in _api_data.items():
- for k, v in value.items():
- yaml_data = {
- "case_common": {
- "allure_epic": self.get_allure_epic(),
- "allure_feature": self.get_allure_feature(v),
- "allure_story": self.get_allure_story(v)
- },
- "case_info": [
- {
- "id": self.get_case_id(key + "_" + k),
- "title": self.get_title(v),
- "run": False,
- "url": key,
- "severity": None,
- "method": k,
- "headers": self.get_headers(v),
- "cookies": None,
- "request_type": self.get_request_type(v, self.get_headers(v)),
- "payload": self.get_payload(v),
- "files": None,
- "extract": None,
- "assert_response": {'eq': {'http_code': 200}},
- "assert_sql": None
-
- }
- ]
- }
- self.yaml_cases(data=yaml_data, file_path=key)
-
-
-if __name__ == '__main__':
- SwaggerForYaml(case_dir=r"", swagger_path=r"").write_yaml_handler()