diff --git a/case_utils/assert_handle.py b/case_utils/assert_handle.py index 47c768d..8c6a92d 100644 --- a/case_utils/assert_handle.py +++ b/case_utils/assert_handle.py @@ -10,11 +10,12 @@ from requests import Response from case_utils.data_handle import json_extractor, re_extract from loguru import logger -from case_utils.requests_handle import response_type -from case_utils.data_handle import eval_data_process +from case_utils.request_data_handle import response_type +from common_utils.mysql_handle import MysqlServer +from config.settings import db_info -def assert_result(response: Response, expected: dict) -> None: +def assert_response(response: Response, expected: dict) -> None: """ 断言方法 :param response: 实际响应对象 :param expected: 预期响应内容,从excel中或者yaml读取、或者手动传入,格式如下: @@ -27,9 +28,9 @@ def assert_result(response: Response, expected: dict) -> None: return None """ if expected is None: - logger.info("当前用例无断言!") + logger.info("当前用例无响应断言!") return - logger.info(f"预期结果:{expected}, {type(expected)}") + logger.debug(f"响应断言预期结果:{expected}, {type(expected)}") index = 0 for k, v in expected.items(): # 获取需要断言的实际结果部分 @@ -37,7 +38,6 @@ def assert_result(response: Response, expected: dict) -> None: if _k == "http_code": actual = response.status_code else: - logger.debug("根据响应类型的不同,从响应数据中提取实际结果") if response_type(response) == "json": # 如果响应数据是json格式 actual = json_extractor(response.json(), _k) @@ -45,27 +45,70 @@ def assert_result(response: Response, expected: dict) -> None: # 响应数据不是json格式 actual = re_extract(response.text, _k) index += 1 - # 对预期结果进行数据处理 - _v = eval_data_process(_v) - logger.info(f'第{index}个断言 -|- 预期结果: {_v}, {type(_v)} {k} 实际结果: {actual},{type(actual)}') + logger.info(f'第{index}个响应断言 -|- 预期结果: {_v}, {type(_v)} {k} 实际结果: {actual}, {type(actual)}') try: if k == "eq": # 预期结果 = 实际结果 assert _v == actual - logger.info("断言成功!") + logger.info(f"预期结果: {_v} == 实际结果: {actual}, 断言成功!") elif k == "in": # 实际结果 包含 预期结果 assert _v in actual - logger.info("断言成功!") + logger.info(f"预期结果: {_v} in 实际结果: {actual}, 断言成功!") elif k == "gt": # 预期结果 > 实际结果 (值应该为数值型) - assert actual < _v - logger.info("断言成功!") + assert _v > actual + logger.info(f"预期结果: {_v} > 实际结果: {actual}, 断言成功!") elif k == "lt": # 预期结果 < 实际结果 (值应该为数值型) - assert actual > _v - logger.info("断言成功!") + assert _v < actual + logger.info(f"预期结果: {_v} < 实际结果: {actual}, 断言成功!") elif k == "not": # 预期结果 != 实际结果 - assert actual != _v - logger.info("断言成功!") + assert _v != actual + logger.info(f"预期结果: {_v} != 实际结果: {actual}, 断言成功!") else: - logger.error(f"判断关键字: {k} 错误!") + logger.error(f"判断关键字: {k} 错误!, 目前仅支持如下关键字:eq, in, gt, lt, not") except AssertionError: - logger.error("断言失败") - raise AssertionError(f"第{index}个断言 -|- 预期结果: {_v}, {type(_v)} {k} 实际结果: {actual},{type(actual)}") + logger.error(f"第{index}个响应断言失败 -|- 预期结果: {_v}, {type(_v)} {k} 实际结果: {actual}, {type(actual)}") + raise AssertionError( + f"第{index}个响应断言失败 -|- 预期结果: {_v}, {type(_v)} {k} 实际结果: {actual}, {type(actual)}") + + +def assert_sql(env, expected: dict): + """ + 数据库断言 + :param env: 当前所处环境 + :param expected: 预期结果,从excel中或者yaml读取、或者手动传入,格式如下: + { + 'eq': + {'sql': 'select count(*) from users where user_id=1;', 'len': '1'}, + 'eq': + {'sql': 'select * from users where user_id=1;', '$.username': '${username}'}, + } + """ + if expected is None: + logger.info("当前用例无数据库断言!") + return + try: + # 拿不到数据库配置,则不进行数据库断言 + db = db_info["test" if env.lower() == "test" else "live"] + db_host = db["db_host"] + except KeyError: + logger.error("当前环境无数据库配置,跳过数据库断言!") + return + for k, v in expected.items(): + sql_result = None + for _k, _v in v.items(): + if _k == "sql": + # 查询数据库,获取查询结果 + sql_result = MysqlServer(**db).query_one(_v) + logger.info(f'数据库响应断言 -|- SQL:{_v} || 查询结果:{sql_result}') + try: + if k == "eq": # 预期结果 = 实际结果 + if _k == "len": + assert _v == len(sql_result) + logger.info(f"预期结果: {_v} == 实际结果: {len(sql_result)}, 断言成功!") + # 如果时$.开头,则从数据库查询结果中提取相应的值作为实际结果 + elif _k.startswith("$."): + actual = json_extractor(sql_result, _k) + assert _v == actual + logger.info(f"预期结果: {_v} == 实际结果: {actual}, 断言成功!") + except AssertionError: + logger.error(f"数据库断言失败 -|- 预期结果:{_v} == 实际结果: {len(sql_result)}") + raise AssertionError(f"数据库断言失败 -|-预期结果: {_v} == 实际结果: {len(sql_result)}") diff --git a/common_utils/mysql_handle.py b/common_utils/mysql_handle.py new file mode 100644 index 0000000..c36f590 --- /dev/null +++ b/common_utils/mysql_handle.py @@ -0,0 +1,152 @@ +# -*- coding: utf-8 -*- +# @Time : 2023/5/12 11:04 +# @Author : chenyinhua +# @File : mysql_handle.py +# @Software: PyCharm +# @Desc: 使用pymysql模块连接mysql数据库的公共方法 +import pymysql +from typing import Union +import json +from datetime import datetime +from sshtunnel import SSHTunnelForwarder +from loguru import logger + + +class MysqlServer: + """ + 初始化数据库连接(支持通过SSH隧道的方式连接),并指定查询的结果集以字典形式返回 + """ + + def __init__(self, db_host, db_port, db_user, db_pwd, db_database, ssh=False, + **kwargs): + """ + 初始化方法中, 连接mysql数据库, 根据ssh参数决定是否走SSH隧道方式连接mysql数据库 + """ + self.server = None + if ssh: + self.server = SSHTunnelForwarder( + ssh_address_or_host=(kwargs.get("ssh_host"), kwargs.get("ssh_port")), # ssh 目标服务器 ip 和 port + ssh_username=kwargs.get("ssh_user"), # ssh 目标服务器用户名 + ssh_password=kwargs.get("ssh_pwd"), # ssh 目标服务器用户密码 + remote_bind_address=(db_host, db_port), # mysql 服务ip 和 part + local_bind_address=('127.0.0.1', 5143), # ssh 目标服务器的用于连接 mysql 或 redis 的端口,该 ip 必须为 127.0.0.1 + ) + self.server.start() + db_host = self.server.local_bind_host # server.local_bind_host 是 参数 local_bind_address 的 ip + db_port = self.server.local_bind_port # server.local_bind_port 是 参数 local_bind_address 的 port + # 建立连接 + self.conn = pymysql.connect(host=db_host, + port=db_port, + user=db_user, + password=db_pwd, + database=db_database, + charset="utf8", + cursorclass=pymysql.cursors.DictCursor # 加上pymysql.cursors.DictCursor这个返回的就是字典 + ) + # 创建一个游标对象 + self.cursor = self.conn.cursor() + + def query_all(self, sql): + """ + 查询所有符合sql条件的数据 + :param sql: 执行的sql + :return: 查询结果 + """ + try: + self.conn.commit() + self.cursor.execute(sql) + data = self.cursor.fetchall() + # 关闭数据库链接和隧道 + self.close() + return self.verify(data) + except Exception as e: + logger.error(f"查询所有符合sql条件的数据报错: {e}") + raise e + + def query_one(self, sql): + """ + 查询符合sql条件的数据的第一条数据 + :param sql: 执行的sql + :return: 返回查询结果的第一条数据 + """ + try: + self.conn.commit() + self.cursor.execute(sql) + data = self.cursor.fetchone() + # 关闭数据库链接和隧道 + self.close() + return self.verify(data) + except Exception as e: + logger.error(f"{sql} --> 报错: {e}") + raise e + + def insert(self, sql): + """ + 插入数据 + :param sql: 执行的sql + """ + try: + self.cursor.execute(sql) + # 提交 只要数据库更新就要commit + self.conn.commit() + # 关闭数据库链接和隧道 + self.close() + except Exception as e: + logger.error(f"{sql} --> 报错: {e}") + raise e + + def update(self, sql): + """ + 更新数据 + :param sql: 执行的sql + """ + try: + self.cursor.execute(sql) + # 提交 只要数据库更新就要commit + self.conn.commit() + # 关闭数据库链接和隧道 + self.close() + except Exception as e: + logger.error(f"{sql} --> 报错: {e}") + raise e + + def query(self, sql, one=True): + """ + 根据传值决定查询一条数据还是所有 + :param sql: 查询的SQL语句 + :param one: 默认True. True查一条数据,否则查所有 + :return: + """ + try: + if one: + return self.query_one(sql) + else: + return self.query_all(sql) + except Exception as e: + logger.error(f"{sql} --> 报错: {e}") + raise e + + def close(self): + """ + 断开游标,关闭数据库 + 如果开启了SSH隧道,也关闭 + :return: + """ + # 关闭游标 + self.cursor.close() + # 关闭数据库链接 + self.conn.close() + # 如果开启了SSH隧道,则关闭 + if self.server: + self.server.close() + + def verify(self, result: dict) -> Union[dict, None]: + """验证结果能否被json.dumps序列化""" + # 尝试变成字符串,解决datetime 无法被json 序列化问题 + try: + json.dumps(result) + except TypeError: # TypeError: Object of type datetime is not JSON serializable + for k, v in result.items(): + if isinstance(v, datetime): + result[k] = str(v) + return result diff --git a/config/case_template.txt b/config/case_template.txt index b9e1417..9f3bf28 100644 --- a/config/case_template.txt +++ b/config/case_template.txt @@ -1,8 +1,8 @@ import pytest -from case_utils.assert_handle import assert_result -from case_utils.requests_handle import BaseRequest +from case_utils.assert_handle import assert_response, assert_sql +from common_utils.base_request import BaseRequest from loguru import logger -from case_utils.request_data_handle import RequestPreDataHandle +from case_utils.request_data_handle import RequestPreDataHandle, after_extract, case_data_replace,eval_data_process from pytest_html import extras # 往pytest-html报告中填写额外的内容 from common_utils.func_handle import add_docstring @@ -16,28 +16,38 @@ def case(request): @pytest.mark.${func_title} @pytest.mark.auto -def ${func_title}_auto(case, extra): - """ - - """ +def ${func_title}_auto(case, extra, request): logger.info("-----------------------------START-开始执行用例-----------------------------") logger.debug(f"当前执行的用例数据:{case}") - # 给当前测试方法添加文档注释 - add_docstring(case.get("title", ""))(${func_title}_auto) - if case.get("run", None): - # 获取处理完成的用例数据 - case_data = RequestPreDataHandle(case).request_data_handle() - # 将用例数据显示在pytest-html报告中 - extra.append(extras.text(str(case_data), name="用例数据")) - # 发送请求 - response = BaseRequest.send_request(case_data) - # 将响应数据显示在pytest-html报告中 - extra.append(extras.text(str(response.text), name="响应数据")) - # 进行断言 - assert_result(response, case_data["expected"]) - else: - reason = f"标记了该用例为false,不执行\\n" - logger.warning(f"{reason}") - pytest.skip(reason) - logger.info("------------------------------------------用例执行结束------------------------------------------") + try: + # 获取命令行参数,判断当前处于哪个环境 + env = request.config.getoption("--env") + # 给当前测试方法添加文档注释 + add_docstring(case.get("title", ""))(${func_title}_auto) + if case.get("run", None): + # 处理请求前的用例数据 + case_data = RequestPreDataHandle(case).request_data_handle() + # 将用例数据显示在pytest-html报告中 + extra.append(extras.text(str(case_data), name="用例数据")) + # 发送请求 + response = BaseRequest.send_request(case_data) + # 将响应数据显示在pytest-html报告中 + extra.append(extras.text(str(response.text), name="响应数据")) + # 请求后,提取后置参数作为全局变量 + after_extract(response, case_data["extract"]) + # 从全局变量中获取最新值,替换数据库断言中的参数 + case_data["assert_sql"] = eval_data_process(case_data_replace(case_data["assert_sql"])) + # 进行响应断言 + assert_response(response, case_data["assert_response"]) + # 进行数据库断言 + assert_sql(env, case_data["assert_sql"]) + else: + reason = f"标记了该用例为false,不执行\\n" + logger.warning(f"{reason}") + pytest.skip(reason) + except Exception as e: + logger.error(f"用例执行过程中报错:{e}") + raise e + finally: + logger.info("-----------------------------END-用例执行结束-----------------------------") diff --git a/config/settings.py b/config/settings.py index 6fa4ae2..e0a6e12 100644 --- a/config/settings.py +++ b/config/settings.py @@ -68,4 +68,26 @@ ding_talk = { # ------------------------------------ 企业微信相关配置 ----------------------------------------------------# wechat = { "webhook_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=********", +} + +# ------------------------------------ 数据库相关配置 ----------------------------------------------------# +db_info = { + "test": { + "db_host": "xx.xx.xx.xx", + "db_port": 3306, + "db_user": "root", + "db_pwd": "**********", + "db_database": "test**********", + "ssh": True, + "ssh_host": "xx.xx.xx.xx", + "ssh_port": 3306, + "ssh_user": "root", + "ssh_pwd": "**********" + + }, + "live": { + + + } + } \ No newline at end of file diff --git a/data/test_login.yaml b/data/test_login_demo.yaml similarity index 87% rename from data/test_login.yaml rename to data/test_login_demo.yaml index 77d0c5c..cb8b5a9 100644 --- a/data/test_login.yaml +++ b/data/test_login_demo.yaml @@ -24,6 +24,10 @@ $.user_id: 84953 not: $.user_id: 85390 + assert_sql: + eq: + sql: select count(*) from tokens where user_id=${user_id}; + len: 1 - feature: 登录 title: 用户名正确,密码错误,登录失败 url: /api/accounts/login.json @@ -38,4 +42,5 @@ assert_response: eq: http_code: 200 - $.status: -2 \ No newline at end of file + $.status: -2 + assert_sql: \ No newline at end of file diff --git a/data/test_login_excel.xlsx b/data/test_login_excel_demo.xlsx similarity index 59% rename from data/test_login_excel.xlsx rename to data/test_login_excel_demo.xlsx index f29e5a8..b6613f4 100644 Binary files a/data/test_login_excel.xlsx and b/data/test_login_excel_demo.xlsx differ diff --git a/data/test_new_project.yaml b/data/test_new_project_demo.yaml similarity index 68% rename from data/test_new_project.yaml rename to data/test_new_project_demo.yaml index e095a09..2b14b2c 100644 --- a/data/test_new_project.yaml +++ b/data/test_new_project_demo.yaml @@ -18,4 +18,10 @@ assert_response: eq: http_code: 200 - $.login: ${login} \ No newline at end of file + $.login: ${login} + assert_sql: + eq: + sql: select id, `name`, identifier from projects where user_id=${user_id} ORDER BY created_on DESC; + $.id: ${project_id} + $.name: ${project_name} + $.identifier: ${project_identifier} \ No newline at end of file diff --git a/test_case/test_manual_case/test_login.py b/test_case/test_manual_case/test_login.py deleted file mode 100644 index 4750a90..0000000 --- a/test_case/test_manual_case/test_login.py +++ /dev/null @@ -1,47 +0,0 @@ -# -*- coding: utf-8 -*- -# @Version: Python 3.9 -# @Time : 2023/1/9 16:41 -# @Author : chenyinhua -# @File : test_login.py -# @Software: PyCharm -# @Desc: python脚本编写的测试用例文件 - - -import pytest -import os -from common_utils.yaml_handle import HandleYaml -from config.project_path import DATA_DIR -from case_utils.requests_handle import BaseRequest -from case_utils.assert_handle import assert_result -from loguru import logger -from case_utils.request_data_handle import RequestPreDataHandle -from config.global_vars import GLOBAL_VARS - -# 读取用例数据 -cases = HandleYaml(filename=os.path.join(DATA_DIR, "test_login.yaml")).read_yaml - - -@pytest.mark.test_login -class TestLogin: - """ - 登录模块 - """ - - @pytest.mark.parametrize("case", cases) - def test_login(self, case): - logger.info("-----------------------------START-开始执行用例-----------------------------") - logger.debug(f"当前执行的用例数据:{case}") - # 将用例标题title作为全局变量,方便后续写入到测试报告report.description中 - GLOBAL_VARS["title"] = case.get("title", "") - if case.get("run", None): - # 获取处理完成的用例数据 - case_data = RequestPreDataHandle(case).request_data_handle() - # 发送请求 - response = BaseRequest.send_request(case_data) - # 进行断言 - assert_result(response, case_data["expected"]) - else: - reason = f"标记了该用例为false,不执行\\n" - logger.warning(f"{reason}") - pytest.skip(reason) - logger.info("-----------------------------END-用例执行结束-----------------------------") diff --git a/test_case/test_manual_case/test_login_demo.py b/test_case/test_manual_case/test_login_demo.py new file mode 100644 index 0000000..6d1a30f --- /dev/null +++ b/test_case/test_manual_case/test_login_demo.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# @Version: Python 3.9 +# @Time : 2023/1/9 16:41 +# @Author : chenyinhua +# @File : test_login_demo.py +# @Software: PyCharm +# @Desc: python脚本编写的测试用例文件 + + +import pytest +import os +from common_utils.yaml_handle import HandleYaml +from config.settings import DATA_DIR +from common_utils.base_request import BaseRequest +from case_utils.assert_handle import assert_response, assert_sql +from loguru import logger +from case_utils.request_data_handle import RequestPreDataHandle, after_extract, case_data_replace, eval_data_process +from pytest_html import extras # 往pytest-html报告中填写额外的内容 +from common_utils.func_handle import add_docstring + +# 读取用例数据 +cases = HandleYaml(filename=os.path.join(DATA_DIR, "test_login_demo.yaml")).read_yaml + + +@pytest.mark.test_login_demo +@pytest.mark.parametrize("case", cases) +def test_login_demo(case, extra, request): + logger.info("-----------------------------START-开始执行用例-----------------------------") + logger.debug(f"当前执行的用例数据:{case}") + try: + # 获取命令行参数,判断当前处于哪个环境 + env = request.config.getoption("--env") + # 给当前测试方法添加文档注释 + add_docstring(case.get("title", ""))(test_login) + if case.get("run", None): + # 处理请求前的用例数据 + case_data = RequestPreDataHandle(case).request_data_handle() + # 将用例数据显示在pytest-html报告中 + extra.append(extras.text(str(case_data), name="用例数据")) + # 发送请求 + response = BaseRequest.send_request(case_data) + # 将响应数据显示在pytest-html报告中 + extra.append(extras.text(str(response.text), name="响应数据")) + # 请求后,提取后置参数作为全局变量 + after_extract(response, case_data["extract"]) + # 从全局变量中获取最新值,替换数据库断言中的参数 + case_data["assert_sql"] = eval_data_process(case_data_replace(case_data["assert_sql"])) + # 进行响应断言 + assert_response(response, case_data["assert_response"]) + # 进行数据库断言 + assert_sql(env, case_data["assert_sql"]) + else: + reason = f"标记了该用例为false,不执行\\n" + logger.warning(f"{reason}") + pytest.skip(reason) + except Exception as e: + logger.error(f"用例执行过程中报错:{e}") + raise e + finally: + logger.info("-----------------------------END-用例执行结束-----------------------------")