diff --git a/Pipfile b/Pipfile index 847f07a..4248951 100644 --- a/Pipfile +++ b/Pipfile @@ -19,6 +19,7 @@ jsonpath = "*" pytest = "==6.2.5" pytest-html = "==2.1.1" pytest-rerunfailures = "*" +allure-pytest = "==2.9.45" [dev-packages] diff --git a/README.md b/README.md index 3ac6e7b..465de94 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,17 @@ +## 前言 + + 公司突然要求你做自动化,但是没有代码基础不知道怎么做?或者有自动化基础,但是不知道如何系统性的做自动化, 放在yaml文件中维护,不知道如何处理多业务依赖的逻辑? + + 那么本自动化框架,将为你解决这些问题。 + - 框架主要使用 python 语言编写,结合 pytest 进行二次开发,用户仅需要在 yaml 或者 excel 文件中编写测试用例, 编写成功之后,会自动生成测试用例代码,零基础代码小白,也可以操作。 + - 如果是具备代码基础的,也可以直接通过 py 文件编写测试用例。 + - 使用 pytest-html / Allure 生成报告,并针对测试报告样式进行了调整,使得报告更加美观; + - 测试完成后,支持发送 企业微信通知/ 钉钉通知/ 邮箱通知,灵活配置。 + ## 一、框架介绍 -本框架主要是基于 Python + pytest + pytest-html + loguru + 邮件通知/企业微信通知/钉钉通知 实现的接口自动化框架。 +本框架主要是基于 Python + pytest + pytest-html/Allure + loguru + 邮件通知/企业微信通知/钉钉通知 实现的接口自动化框架。 * git地址: [https://www.gitlink.org.cn/floraachy/apiautotest](https://www.gitlink.org.cn/floraachy/apiautotest) * 项目参与者: floraachy @@ -10,37 +20,32 @@ 对于框架任何问题,欢迎联系我! -## 二、前言 - 公司突然要求你做自动化,但是没有代码基础不知道怎么做?或者有自动化基础,但是不知道如何系统性的做自动化, 放在yaml文件中维护,不知道如何处理多业务依赖的逻辑? - - 那么本自动化框架,将为你解决这些问题。 - - 框架主要使用 python 语言编写,结合 pytest 进行二次开发,用户仅需要在 yaml 或者 excel 文件中编写测试用例, 编写成功之后,会自动生成测试用例代码,零基础代码小白,也可以操作。 - - 如果是具备代码基础的,也可以直接通过 py 文件编写测试用例。 - - 使用 pytest-html 生成报告,并针对测试报告样式进行了调整,使得报告更加美观; - - 测试完成后,支持发送 企业微信通知/ 钉钉通知/ 邮箱通知,灵活配置。 - -## 三、实现功能 +## 二、实现功能 * 通过session会话方式,解决了登录之后cookie关联处理 * 框架天然支持接口动态传参、关联灵活处理 * 测试数据隔离, 实现数据驱动 * 自动生成用例代码: 测试人员在yaml/excel文件中填写好测试用例, 程序可以直接生成用例代码,纯小白也能使用 * 动态多断言: 支持响应断言和数据库断言 +* 多种报告随心选择:框架支持pytest-html以及Allure测试报告,可以动态配置所需报告 * 日志模块: 采用loguru管理日志,可以输出更为优雅,简洁的日志 * 钉钉、企业微信通知: 支持多种通知场景,执行成功之后,可选择发送钉钉、或者企业微信、邮箱通知 * 执行环境一键切换,解决多环境相互影响问题 * 使用pipenv管理虚拟环境和依赖文件,提供了一系列命令和选项来帮助你实现各种依赖和环境管理相关的操作。 -## 四、目录结构 +## 三、目录结构 ``` ├────case_utils/ 测试框架相关工具类 │ ├────__init__.py +│ ├────allure_handle.py 操作allure的相关方法 +│ ├────platform_handle.py 跨平台的支持allure,用于生成allure测试报告 │ ├────assert_handle.py 断言处理, 包括响应断言和数据库断言 │ ├────case_handle.py 根据配置文件,从指定类型文件中读取用例数据,并调用生成用例文件方法,生成用例文件 │ ├────data_handle.py 数据处理 │ ├────request_data_handle.py 针对用例数据进行请求前后的处理 +│ ├────get_results_handle.py 从pytest-html/allure测试报告中获取测试结果 │ └────send_result_handle.py 根据配置文件,从html测试报告中获取测试结果,发送指定类型的通知 ├────common_utils/ 公共的工具类 │ ├────__init__.py @@ -50,8 +55,9 @@ │ ├────excel_handle.py 处理excel │ ├────files_handle.py 处理文件相关操作 │ ├────func_handle.py 函数装饰器 -│ ├────webchat_handle.py 封装企业微信机器人 +│ ├────wechat_handle.py 封装企业微信机器人 │ ├────yagmail_handle.py 封装通过yagmail发送邮件的方法 +│ ├────time_handle.py 封装处理时间操作的一些方法 │ └────yaml_handle.py 处理yaml文件 ├────config/ │ ├────__init__.py @@ -84,7 +90,7 @@ │ │ └────test_login_demo.py ``` -## 五、依赖库 +## 四、依赖库 ``` python_version = "3.9" pymysql = "*" @@ -97,15 +103,16 @@ sshtunnel = "*" yagmail = "*" pyyaml = "*" click = "*" -pytest-html = "==2.1.1" faker = "*" -pytest-rerunfailures = "*" jsonpath = "*" pytest = "==6.2.5" +pytest-html = "==2.1.1" +pytest-rerunfailures = "*" +allure-pytest = "==2.9.45" ``` -## 六、安装教程 +## 五、安装教程 1. 通过Git工具clone代码到本地 或者 直接下载压缩包ZIP 2. 本地电脑搭建好 python环境,我使用的python版本是3.9 diff --git a/case_utils/case_handle.py b/case_utils/case_handle.py index 3a1a311..49b424e 100644 --- a/case_utils/case_handle.py +++ b/case_utils/case_handle.py @@ -37,6 +37,41 @@ def get_excel_data(file_path): return ExcelHandle(file_path).read() +def gen_case_data_from_excel(files): + cases = [] + for file in files: + # 读取excel文件中的用例数据,存储到data中 + data = get_excel_data(file) + for _data in data: + # 将excel读取到的用例数据,适配allure格式 + excel_data = { + 'case_common': {'allure_epic': 'GitLink接口', 'allure_feature': _data["sheet_name"], + 'allure_story': _data["sheet_name"]}, + 'case_info': _data["data"] + } + # 调用gen_case方法生成测试用例, 例如:test_demo.py + gen_case_file(case_file_path=file, case_template_path=CASE_TEMPLATE_DIR, case_data=excel_data, + target_case_path=AUTO_CASE_DIR) + # 将获取到的用例数据统一保存到cases中 + cases.extend([excel_data]) + logger.debug(f"从{file}中读取到的用例数据是:{excel_data}") + return cases + + +def gen_case_data_from_yaml(files): + cases = [] + for file in files: + # 从yaml/yml中读取用例数据 + yaml_data = get_yaml_data(file) + # 调用gen_case方法生成测试用例, 例如:test_demo.py + gen_case_file(case_file_path=file, case_template_path=CASE_TEMPLATE_DIR, case_data=yaml_data, + target_case_path=AUTO_CASE_DIR) + # 将获取到的用例数据统一保存到cases中 + cases.extend([yaml_data]) + logger.debug(f"从{file}中读取到的用例数据是:{yaml_data}") + return cases + + def get_case_data(): """ 根据配置文件,从指定类型文件中读取用例数据,并调用生成用例文件方法,生成用例文件 @@ -46,71 +81,28 @@ def get_case_data(): # 从excel中读取用例数据 if CASE_FILE_TYPE == CaseFileType.EXCEL.value: # 在用例数据"DATA_DIR"目录中寻找后缀是xlsx, xls的文件 - files = get_files(target=DATA_DIR, start="test_", end=".xlsx") + get_files(target=DATA_DIR, start="test_", - end=".xls") - for file in files: - # 读取excel文件中的用例数据,存储到data中 - data = ExcelHandle(file).read() - for _data in data: - # 将excel读取到的用例数据,适配allure格式 - excel_data = { - 'case_common': {'allure_epic': 'GitLink接口', 'allure_feature': _data["sheet_name"], - 'allure_story': _data["sheet_name"]}, - 'case_info': _data["data"] - } - # 调用gen_case方法生成测试用例, 例如:test_demo.py - gen_case_file(case_file_path=file, case_template_path=CASE_TEMPLATE_DIR, case_data=excel_data, - target_case_path=AUTO_CASE_DIR) - # 将获取到的用例数据统一保存到cases中 - cases.extend([excel_data]) - logger.debug(f"从{file}中读取到的用例数据是:{excel_data}") + files = get_files(target=DATA_DIR, start="test_", end=".xlsx") \ + + get_files(target=DATA_DIR, start="test_", end=".xls") + cases = gen_case_data_from_excel(files) return cases # 从yaml中读取用例数据 elif CASE_FILE_TYPE == CaseFileType.YAML.value: # 在用例数据"DATA_DIR"目录中寻找后缀是yaml, yml的文件 - files = get_files(target=DATA_DIR, start="test_", end=".yaml") + get_files(target=DATA_DIR, start="test_", - end=".yml") - for file in files: - # 从yaml/yml中读取用例数据 - yaml_data = get_yaml_data(file) - # 调用gen_case方法生成测试用例, 例如:test_demo.py - gen_case_file(case_file_path=file, case_template_path=CASE_TEMPLATE_DIR, case_data=yaml_data, - target_case_path=AUTO_CASE_DIR) - # 将获取到的用例数据统一保存到cases中 - cases.extend([yaml_data]) - logger.debug(f"从{file}中读取到的用例数据是:{yaml_data}") + files = get_files(target=DATA_DIR, start="test_", end=".yaml") \ + + get_files(target=DATA_DIR, start="test_", end=".yml") + cases = gen_case_data_from_yaml(files) return cases else: # 在用例数据"DATA_DIR"目录中寻找后缀是xlsx,xls, yaml, yml的文件 - files = get_files(target=DATA_DIR, start="test_", end=".xlsx") + get_files(target=DATA_DIR, start="test_", - end=".xls") + get_files( - target=DATA_DIR, start="test_", end=".yaml") + get_files(target=DATA_DIR, start="test_", - end=".yml") - for file in files: - if os.path.splitext(file)[1] == ".xlsx" or os.path.splitext(file)[1] == ".xls": - # 读取excel文件中的用例数据,存储到data中 - data = ExcelHandle(file).read() - for _data in data: - # 将excel读取到的用例数据,适配allure格式 - excel_data = { - 'case_common': {'allure_epic': 'GitLink接口', 'allure_feature': _data["sheet_name"], - 'allure_story': _data["sheet_name"]}, - 'case_info': _data["data"] - } - # 调用gen_case方法生成测试用例, 例如:test_demo.py - gen_case_file(case_file_path=file, case_template_path=CASE_TEMPLATE_DIR, case_data=excel_data, - target_case_path=AUTO_CASE_DIR) - # 将获取到的用例数据统一保存到cases中 - cases.extend([excel_data]) - logger.debug(f"从{file}中读取到的用例数据是:{excel_data}") - else: - # 从yaml/yml中读取用例数据 - yaml_data = get_yaml_data(file) - # 调用gen_case方法生成测试用例, 例如:test_demo.py - gen_case_file(case_file_path=file, case_template_path=CASE_TEMPLATE_DIR, case_data=yaml_data, - target_case_path=AUTO_CASE_DIR) - # 将获取到的用例数据统一保存到cases中 - cases.extend([yaml_data]) + excel_files = get_files(target=DATA_DIR, start="test_", end=".xlsx") \ + + get_files(target=DATA_DIR, start="test_", end=".xls") + yaml_files = get_files(target=DATA_DIR, start="test_", end=".yaml") \ + + get_files(target=DATA_DIR, start="test_", end=".yml") + excel_cases = gen_case_data_from_excel(excel_files) + cases.extend([excel_cases]) + yaml_cases = gen_case_data_from_yaml(yaml_files) + cases.extend([yaml_cases]) + return cases diff --git a/case_utils/get_results_handle.py b/case_utils/get_results_handle.py index d51ea3d..95584ce 100644 --- a/case_utils/get_results_handle.py +++ b/case_utils/get_results_handle.py @@ -24,21 +24,16 @@ def get_test_results_from_pytest_html_report(html_report_path): # -------------- 获取测试环境信息 -------------- environment_info = bs.get_element_by_id("environment").get_text() new_environment_info = [element for element in environment_info.split("\n") if element != ''] + info_mapping = { + "Platform": "platform", + "Python": "python_version", + "开始时间": "start_time", + "项目名称": "project_name", + "项目环境": "project_env" + } for key, value in enumerate(new_environment_info): - if value == "Platform": - test_results['platform'] = new_environment_info[key + 1] - - if value == "Python": - test_results['python_version'] = f"Python {new_environment_info[key + 1]}" - - if value == "开始时间": - test_results['start_time'] = new_environment_info[key + 1] - - if value == "项目名称": - test_results['project_name'] = new_environment_info[key + 1] - - if value == "项目环境": - test_results['project_env'] = new_environment_info[key + 1] + if value in info_mapping: + test_results[info_mapping[value]] = new_environment_info[key + 1] # -------------- 获取测试人员,所属部门,测试用例总数,运行时长 -------------- p_elements = bs.get_elements_by_tag("p") @@ -53,31 +48,22 @@ def get_test_results_from_pytest_html_report(html_report_path): test_results['run_time'] = f"{new_list[4]} 秒" # -------------- 获取具体结果 -------------- - # 通过的用例个数 - passed = bs.select_element('span.passed')[0] - test_results["passed"] = int(passed.get_text().split(" ")[0]) - # 跳过的用例个数 - skipped = bs.select_element('span.skipped')[0] - test_results["skipped"] = int(skipped.get_text().split(" ")[0]) - # 失败的用例个数 - failed = bs.select_element('span.failed')[0] - test_results["failed"] = int(failed.get_text().split(" ")[0]) - # 错误的用例个数 - error = bs.select_element('span.error')[0] - test_results["broken"] = int(error.get_text().split(" ")[0]) - # 预期失败的用例个数 - xfailed = bs.select_element('span.xfailed')[0] - test_results["xfailed"] = int(xfailed.get_text().split(" ")[0]) - # 意外通过的用例个数 - xpassed = bs.select_element('span.xpassed')[0] - test_results["xpassed"] = int(xpassed.get_text().split(" ")[0]) - # 重跑的用例个数 - rerun = bs.select_element('span.rerun')[0] - test_results["rerun"] = int(rerun.get_text().split(" ")[0]) + result_mapping = { + 'passed': 'span.passed', # 通过的用例个数 + 'skipped': 'span.skipped', # 跳过的用例个数 + 'failed': 'span.failed', # 失败的用例个数 + 'broken': 'span.error', # 错误的用例个数 + 'xfailed': 'span.xfailed', # 预期失败的用例个数 + 'xpassed': 'span.xpassed', # 意外通过的用例个数 + 'rerun': 'span.rerun' # 重跑的用例个数 + } + for key, value in result_mapping.items(): + result_element = bs.select_element(value)[0] + test_results[key] = int(result_element.get_text().split(" ")[0]) + # 用例总数 test_results['total'] = test_results["passed"] + test_results["skipped"] + test_results["failed"] + \ - test_results[ - "broken"] + test_results["xfailed"] + test_results["xpassed"] + test_results["broken"] + test_results["xfailed"] + test_results["xpassed"] # 通过率 # 判断运行用例总数大于0 if test_results['total'] > 0: diff --git a/case_utils/request_data_handle.py b/case_utils/request_data_handle.py index 1c52059..b66d0df 100644 --- a/case_utils/request_data_handle.py +++ b/case_utils/request_data_handle.py @@ -63,7 +63,6 @@ class RequestPreDataHandle: f"响应断言: {self.request_data.get('assert_response', None)}\n" \ f"数据库断言: {self.request_data.get('assert_sql', None)}\n" \ "=====================================================") - return self.request_data def url_handle(self): @@ -163,9 +162,9 @@ class RequestHandle: allure_step(step_title="请求参数", content=self.case_data['payload']) allure_step(step_title="请求文件", content=self.case_data['files']) allure_step(step_title="后置提取参数(新)", content=self.case_data['extract']) - allure_step(step_title="请求响应数据", content=self.case_data['assert_response']) allure_step(step_title="响应断言参数", content=self.case_data['assert_response']) - allure_step(step_title="数据库断言", content=self.case_data['assert_response']) + allure_step(step_title="数据库断言", content=self.case_data['assert_sql']) + allure_step(step_title="请求响应数据", content=response.text) return response def after_extract(self, response: Response, extract): diff --git a/case_utils/send_result_handle.py b/case_utils/send_result_handle.py index a785c04..5a1c479 100644 --- a/case_utils/send_result_handle.py +++ b/case_utils/send_result_handle.py @@ -11,11 +11,11 @@ from config.global_vars import NotificationType from config.settings import SEND_RESULT_TYPE, email, ding_talk, wechat, email_subject, email_content, ding_talk_title, \ ding_talk_content, wechat_content from common_utils.dingding_handle import DingTalkBot -from common_utils.webchat_handle import WechatBot +from common_utils.wechat_handle import WechatBot from common_utils.data_handle import data_replace -def send_email(user, pwd, host, subject, contents, to, attachments): +def send_email(user, pwd, host, subject, content, to, attachments): """ 发送邮件 """ @@ -23,7 +23,7 @@ def send_email(user, pwd, host, subject, contents, to, attachments): yag = YagEmailServe(user=user, password=pwd, host=host) info = { "subject": subject, - "contents": contents, + "contents": content, "to": to, "attachments": attachments @@ -33,13 +33,13 @@ def send_email(user, pwd, host, subject, contents, to, attachments): logger.error(f"发送邮件通知异常, 错误信息:{e}") -def send_dingding(webhook_url, secret, title, text): +def send_dingding(webhook_url, secret, title, content): """ 发送钉钉消息 """ try: dingding = DingTalkBot(webhook_url=webhook_url, secret=secret) - res = dingding.send_markdown(title=title, text=text, is_at_all=True) + res = dingding.send_markdown(title=title, text=content, is_at_all=True) if res: logger.info(f"发送钉钉通知成功~") else: @@ -72,35 +72,66 @@ def send_result(results, attachment_path=None): """ 根据用户配置,采取指定方式,发送测试结果 """ - # -----------------------邮件通知内容----------------------- - # 默认不发送任何通知 if SEND_RESULT_TYPE == NotificationType.DEFAULT.value: pass - # 发送邮件通知 - elif SEND_RESULT_TYPE == NotificationType.EMAIL.value: - content = data_replace(content=email_content, source=results) - send_email(user=email.get("user"), pwd=email.get("password"), host=email.get("host"), subject=email_subject, - contents=content, to=email.get("to"), attachments=attachment_path) - # 发送钉钉通知 - elif SEND_RESULT_TYPE == NotificationType.DING_TALK.value: - content = data_replace(content=ding_talk_content, source=results) - send_dingding(webhook_url=ding_talk["webhook_url"], secret=ding_talk["secret"], title=ding_talk_title, - text=content) - # 发送企业微信通知 - elif SEND_RESULT_TYPE == NotificationType.WECHAT.value: - content = data_replace(content=wechat_content, source=results) - send_wechat(webhook_url=wechat["webhook_url"], content=content, attachment=attachment_path) - # 全部渠道都发送通知 + + # 建立发送消息的内容、函数以及参数的映射关系 + notification_mappings = { + NotificationType.EMAIL.value: { + 'sender': send_email, + 'sender_args': { + 'user': email.get("user"), + 'pwd': email.get("password"), + 'host': email.get("host"), + 'subject': email_subject, + 'content': email_content, + 'to': email.get("to"), + 'attachments': attachment_path, + } + }, + NotificationType.DING_TALK.value: { + 'sender': send_dingding, + 'sender_args': { + 'webhook_url': ding_talk["webhook_url"], + 'secret': ding_talk["secret"], + 'title': ding_talk_title, + 'content': ding_talk_content, + } + }, + NotificationType.WECHAT.value: { + 'sender': send_wechat, + 'sender_args': { + 'webhook_url': wechat["webhook_url"], + 'content': wechat_content, + 'attachment': attachment_path, + } + } + } + # 单一渠道发送消息 + if SEND_RESULT_TYPE in notification_mappings: + notification = notification_mappings[SEND_RESULT_TYPE] + # 获取消息内容并替换 + notification['sender_args']['content'] = data_replace(notification['sender_args']['content'], + source=results) + # 获取消息发送函数 + sender = notification['sender'] + # 获取对应消息发送函数的参数 + sender_args = notification['sender_args'] + # 调用消息发送函数 + sender(**sender_args) + # 全渠道发送消息 else: - # 发送邮件 - content = data_replace(content=email_content, source=results) - send_email(user=email.get("user"), pwd=email.get("password"), host=email.get("host"), subject=email_subject, - contents=content, to=email.get("to"), attachments=attachment_path) - # 发送钉钉通知 - content = data_replace(content=ding_talk_content, source=results) - send_dingding(webhook_url=ding_talk["webhook_url"], secret=ding_talk["secret"], title=ding_talk_title, - text=content) - # 发送企业微信 - content = data_replace(content=wechat_content, source=results) - send_wechat(webhook_url=wechat["webhook_url"], content=content, attachment=attachment_path) + # 遍历所有消息发送方式 + for notification in notification_mappings.values(): + # 获取消息内容并替换 + notification['sender_args']['content'] = data_replace(notification['sender_args']['content'], + source=results) + # 获取消息发送函数 + sender = notification['sender'] + # 获取对应消息发送函数的参数 + sender_args = notification['sender_args'] + # 调用消息发送函数 + sender(**sender_args) + + diff --git a/common_utils/dingding_handle.py b/common_utils/dingding_handle.py index 3ea6c87..c52e54c 100644 --- a/common_utils/dingding_handle.py +++ b/common_utils/dingding_handle.py @@ -32,6 +32,7 @@ class DingTalkBot: self.webhook_url = webhook_url + f'×tamp={timestamp}&sign={sign}' # 最终url,url+时间戳+签名 else: self.webhook_url = webhook_url + self.headers = { "Content-Type": "application/json", "Charset": "UTF-8" @@ -52,6 +53,24 @@ class DingTalkBot: sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return sign + def send_message(self, payload): + """ + 发送钉钉消息 + :payload: 请求json数据 + """ + response = request( + url=self.webhook_url, + json=payload, + headers=self.headers, + method="POST" + ) + if response.json().get("errcode") == 0: + logger.debug(f"通过钉钉机器人发送{payload.get('msgtype', '')}消息成功:{response.json()}") + return True + else: + logger.error(f"通过钉钉机器人发送{payload.get('msgtype', '')}消息失败:{response.text}") + return False + def send_text(self, content, mobiles=None, is_at_all=False): """ 发送文本消息 @@ -59,40 +78,27 @@ class DingTalkBot: :param mobiles: 被艾特的用户的手机号码,格式是列表,注意需要在content里面添加@人的手机号码 :param is_at_all: 是否艾特所有人,布尔类型,true为艾特所有人,false为不艾特 """ + at_mobiles = "" if mobiles: if isinstance(mobiles, list): - payload = { - "msgtype": "text", - "text": { - "content": content - }, - "at": { - "atMobiles": mobiles, - "isAtAll": False - } - } + at_mobiles = mobiles + is_at_all = False for mobile in mobiles: - payload["text"]["content"] += f"@{mobile}" + content += f"@{mobile}" else: raise TypeError("mobiles类型错误 不是list类型.") - else: - payload = { - "msgtype": "text", - "text": { - "content": content - }, - "at": { - "atMobiles": "", - "isAtAll": is_at_all - } + + payload = { + "msgtype": "text", + "text": { + "content": content + }, + "at": { + "atMobiles": at_mobiles, + "isAtAll": is_at_all } - response = request(url=self.webhook_url, json=payload, headers=self.headers, method="POST") - if response.json().get("errcode") == 0: - logger.debug(f"send_text发送钉钉消息成功:{response.json()}") - return True - else: - logger.error(f"send_text发送钉钉消息失败:{response.text}") - return False + } + return self.send_message(payload, "send_text") def send_link(self, title, text, message_url, pic_url=None): """ @@ -111,13 +117,7 @@ class DingTalkBot: "messageUrl": message_url } } - response = request(url=self.webhook_url, json=payload, headers=self.headers, method="POST") - if response.json().get("errcode") == 0: - logger.debug(f"send_link发送钉钉消息成功:{response.json()}") - return True - else: - logger.error(f"send_link发送钉钉消息失败:{response.text}") - return False + return self.send_message(payload) def send_markdown(self, title, text, mobiles=None, is_at_all=False): """ @@ -128,42 +128,28 @@ class DingTalkBot: :param mobiles: 被艾特的用户的手机号码,格式是列表,注意需要在text里面添加@人的手机号码 :param is_at_all: 是否艾特所有人,布尔类型,true为艾特所有人,false为不艾特 """ + at_mobiles = "" if mobiles: if isinstance(mobiles, list): - payload = { - "msgtype": "markdown", - "markdown": { - "title": title, - "text": text - }, - "at": { - "atMobiles": mobiles, - "isAtAll": False - } - } + at_mobiles = mobiles + is_at_all = False for mobile in mobiles: - payload["markdown"]["text"] += f" @{mobile}" + text += f"@{mobile}" else: raise TypeError("mobiles类型错误 不是list类型.") - else: - payload = { - "msgtype": "markdown", - "markdown": { - "title": title, - "text": text - }, - "at": { - "atMobiles": "", - "isAtAll": is_at_all - } + payload = { + "msgtype": "markdown", + "markdown": { + "title": title, + "text": text + }, + "at": { + "atMobiles": at_mobiles, + "isAtAll": is_at_all } - response = request(url=self.webhook_url, json=payload, headers=self.headers, method="POST") - if response.json().get("errcode") == 0: - logger.debug(f"send_markdown发送钉钉消息成功:{response.json()}") - return True - else: - logger.error(f"send_markdown发送钉钉消息失败:{response.text}") - return False + } + + return self.send_message(payload) def send_action_card_single(self, title, text, single_title, single_url, btn_orientation=0): """ @@ -185,13 +171,7 @@ class DingTalkBot: } } - response = request(url=self.webhook_url, json=payload, headers=self.headers, method="POST") - if response.json().get("errcode") == 0: - logger.debug(f"send_action_card_single发送钉钉消息成功:{response.json()}") - return True - else: - logger.error(f"send_action_card_single发送钉钉消息失败:{response.text}") - return False + return self.send_message(payload) def send_action_card_split(self, title, text, btns, btn_orientation=0): """ @@ -216,13 +196,8 @@ class DingTalkBot: "title": btn.get("title"), "actionURL": btn.get("action_url") }) - response = request(url=self.webhook_url, json=payload, headers=self.headers, method="POST") - if response.json().get("errcode") == 0: - logger.debug(f"send_action_card_split发送钉钉消息成功:{response.json()}") - return True - else: - logger.error(f"send_action_card_split发送钉钉消息失败:{response.text}") - return False + + return self.send_message(payload) def send_feed_card(self, links_msg): """ @@ -243,10 +218,5 @@ class DingTalkBot: "picURL": link.get("picURL") } ) - response = request(url=self.webhook_url, json=payload, headers=self.headers, method="POST") - if response.json().get("errcode") == 0: - logger.debug(f"send_feed_card发送钉钉消息成功:{response.json()}") - return True - else: - logger.error(f"send_feed_card发送钉钉消息失败:{response.text}") - return False + + return self.send_message(payload) diff --git a/common_utils/excel_handle.py b/common_utils/excel_handle.py index bd0493f..45c7a71 100644 --- a/common_utils/excel_handle.py +++ b/common_utils/excel_handle.py @@ -29,6 +29,25 @@ class ExcelHandle: wb.save(self.filename) return self.filename + # add by xiahb + def read_sheet(self, sheet, workbook): + """ + 读取一个sheet的内容 + :param sheet: 表单名称 + :param workbook: 工作簿对象 + :return: sheet数据列表 + """ + sheet_data = { + "sheet_name": sheet, + "data": [] + } + sheet = workbook[sheet] + all_values = list(sheet.values) + header = all_values[0] + for i in all_values[1:]: + sheet_data["data"].append(dict(zip(header, i))) + return sheet_data + def read(self, sheet=None) -> list: """ 读取excel数据并返回 @@ -46,29 +65,11 @@ class ExcelHandle: # 如果sheet不为空,则取sheet等于指定sheet if sheet: - sheet_data = { - "sheet_name": sheet, - "data": [] - } - sheet = workbook[sheet] - all_values = list(sheet.values) - header = all_values[0] - for i in all_values[1:]: - sheet_data["data"].append(dict(zip(header, i))) - results.append(sheet_data) - # 如果sheet为空,则sheet为第一个表单 + results.append(self.read_sheet(sheet, workbook)) + # 如果sheet为空,则读取所有表单数据 else: for sheet in sheets: - sheet_data = { - "sheet_name": sheet, - "data": [] - } - sheet = workbook[sheet] - all_values = list(sheet.values) - header = all_values[0] - for i in all_values[1:]: - sheet_data["data"].append(dict(zip(header, i))) - results.append(sheet_data) + results.append(self.read_sheet(sheet, workbook)) # 关闭excel workbook.close() return results diff --git a/common_utils/files_handle.py b/common_utils/files_handle.py index 7f8d183..216377d 100644 --- a/common_utils/files_handle.py +++ b/common_utils/files_handle.py @@ -25,21 +25,22 @@ def get_files(target, start=None, end=None): # filenames:包含了当前dirpath路径下所有的非目录子文件的名字(不包含目录路径)。 for dirpath, dirnames, filenames in os.walk(target): for filename in filenames: + file_path = os.path.abspath(os.path.join(dirpath, filename)) # 如果"start"和"end"都有值 if start and end: # filename是以"start"且filename是以"end"结尾,则追加到files if filename.startswith(start) and filename.endswith(end): - files.append(os.path.abspath(os.path.join(dirpath, filename))) + files.append(file_path) # 或者如果"start"有值,filename是以"start"开头,则追加到files elif start and (not end): if filename.startswith(start): - files.append(os.path.abspath(os.path.join(dirpath, filename))) + files.append(file_path) # 或者如果"end"有值,且filename是以"end"结尾,则追加到files elif end and (not start): if filename.endswith(end): - files.append(os.path.abspath(os.path.join(dirpath, filename))) + files.append(file_path) else: - files.append(os.path.abspath(os.path.join(dirpath, filename))) + files.append(file_path) # 判断files列表是否为空,不为空则返回files,为空则返回all_files return files diff --git a/common_utils/webchat_handle.py b/common_utils/wechat_handle.py similarity index 74% rename from common_utils/webchat_handle.py rename to common_utils/wechat_handle.py index 9355c82..d601a6d 100644 --- a/common_utils/webchat_handle.py +++ b/common_utils/wechat_handle.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # @Time : 2023/5/11 15:01 # @Author : chenyinhua -# @File : webchat_handle.py +# @File : wechat_handle.py # @Software: PyCharm # @Desc: 企业微信机器人 import os @@ -29,6 +29,24 @@ class WechatBot: "Charset": "UTF-8" } + def send_message(self, payload): + """ + 发送微信消息 + :payload: 请求json数据 + """ + response = request( + url=self.webhook_url, + json=payload, + headers=self.headers, + method="POST" + ) + if response.json().get("errcode") == 0: + logger.debug(f"通过企业微信发送{payload.get('msgtype', '')}消息成功:{response.json()}") + return True + else: + logger.error(f"通过企业微信发送{payload.get('msgtype', '')}消息失败:{response.text}") + return False + def send_text(self, content, mentioned_list=[], mentioned_mobile_list=[]): """ 发送文本消息 @@ -44,13 +62,7 @@ class WechatBot: "mentioned_mobile_list": mentioned_mobile_list } } - response = request(url=self.webhook_url, method="POST", json=payload, headers=self.headers) - if response.json().get("errcode") == 0: - logger.debug(f"通过企业微信发送文本消息成功:{response.json()}") - return True - else: - logger.error(f"通过企业微信发送文本消息失败:{response.text}") - return False + return self.send_message(payload) def send_markdown(self, content): """ @@ -70,13 +82,7 @@ class WechatBot: "content": content } } - response = request(url=self.webhook_url, method="POST", json=payload, headers=self.headers) - if response.json().get("errcode") == 0: - logger.debug(f"通过企业微信发送md消息成功:{response.json()}") - return True - else: - logger.error(f"通过企业微信发送md消息失败:{response.text}") - return False + return self.send_message(payload) def send_picture(self, image_path): """ @@ -92,13 +98,7 @@ class WechatBot: "md5": hashlib.md5(image_data).hexdigest() # # 计算图片的MD5值 } } - response = request(url=self.webhook_url, method="POST", json=payload, headers=self.headers) - if response.json().get("errcode") == 0: - logger.debug(f"通过企业微信发送图片消息成功:{response.json()}") - return True - else: - logger.error(f"通过企业微信发送图片失败:{response.text}") - return False + return self.send_message(payload) def send_text_picture(self, articles: list): """ @@ -125,13 +125,7 @@ class WechatBot: "picurl": article.get("picurl", "") } ) - response = request(url=self.webhook_url, method="POST", json=payload, headers=self.headers) - if response.json().get("errcode") == 0: - logger.debug(f"通过企业微信发送图文消息成功:{response.json()}") - return True - else: - logger.error(f"通过企业微信发送图文失败:{response.text}") - return False + return self.send_message(payload) def upload_file(self, file_path): """ @@ -168,10 +162,4 @@ class WechatBot: "media_id": media_id, } } - response = request(url=self.webhook_url, method="POST", json=payload, headers=self.headers) - if response.json().get("errcode") == 0: - logger.debug(f"通过企业微信发送文件消息成功:{response.json()}") - return True - else: - logger.error(f"通过企业微信发送文件消息失败:{response.text}") - return False + return self.send_message(payload) diff --git a/config/settings.py b/config/settings.py index 09afb85..eea63cc 100644 --- a/config/settings.py +++ b/config/settings.py @@ -8,7 +8,7 @@ # ------------------------------------ 配置信息 ----------------------------------------------------# # 0代表执行Excel和yaml两种格式的用例, 1 代表 yaml文件,2 用例代表Excel用例 -CASE_FILE_TYPE = 0 +CASE_FILE_TYPE = 1 # 0表示默认不发送任何通知, 1代表钉钉通知,2代表企业微信通知, 3代表邮件通知, 4代表所有途径都发送通知 SEND_RESULT_TYPE = 4 @@ -61,6 +61,7 @@ email = { "host": "smtp.qq.com", "to": ["******", "******"] # 收件人邮箱 } + # ------------------------------------ 邮件通知内容 ----------------------------------------------------# email_subject = f"{ENV_INFO.get('project_name', None)} 接口自动化报告" email_content = """ @@ -89,6 +90,7 @@ ding_talk = { "webhook_url": "https://oapi.dingtalk.com/robot/send?access_token=***********", "secret": "***********" } + # ------------------------------------ 钉钉通知内容 ----------------------------------------------------# ding_talk_title = f"{ENV_INFO.get('project_name', None)} 接口自动化报告" ding_talk_content = """ @@ -141,7 +143,7 @@ wechat_content = """ # ------------------------------------ 数据库相关配置 ----------------------------------------------------# db_info = { "test": { - "db_host": "xx.xx.xx.xx", + "db_host": "xx.xx.xx.xx", "db_port": 3306, "db_user": "root", "db_pwd": "**********", diff --git a/run.py b/run.py index 2676e20..e4a58bb 100644 --- a/run.py +++ b/run.py @@ -9,9 +9,10 @@ 说明: 1、用例创建原则,测试文件名必须以“test”开头,测试函数必须以“test”开头。 2、运行方式: - > python run.py (默认在test环境运行测试用例) + > python run.py (默认在test环境运行测试用例, 报告采用allure) > python run.py -env live 在live环境运行测试用例 > python run.py -env=test 在test环境运行测试用例 + > python run.py -report=pytest-html (默认在test环境运行测试用例, 报告采用pytest-html) """ import os