(2023-05-13) 1. 封装Python脚本:使用pymysql+sshtunnel,支持通过SSH隧道方式链接mysql数据库 2. 调整用例参数,支持数据库断言。可以通过assert_sql用例参数进行数据库断言
This commit is contained in:
@@ -10,11 +10,12 @@
|
||||
from requests import Response
|
||||
from case_utils.data_handle import json_extractor, re_extract
|
||||
from loguru import logger
|
||||
from case_utils.requests_handle import response_type
|
||||
from case_utils.data_handle import eval_data_process
|
||||
from case_utils.request_data_handle import response_type
|
||||
from common_utils.mysql_handle import MysqlServer
|
||||
from config.settings import db_info
|
||||
|
||||
|
||||
def assert_result(response: Response, expected: dict) -> None:
|
||||
def assert_response(response: Response, expected: dict) -> None:
|
||||
""" 断言方法
|
||||
:param response: 实际响应对象
|
||||
:param expected: 预期响应内容,从excel中或者yaml读取、或者手动传入,格式如下:
|
||||
@@ -27,9 +28,9 @@ def assert_result(response: Response, expected: dict) -> None:
|
||||
return None
|
||||
"""
|
||||
if expected is None:
|
||||
logger.info("当前用例无断言!")
|
||||
logger.info("当前用例无响应断言!")
|
||||
return
|
||||
logger.info(f"预期结果:{expected}, {type(expected)}")
|
||||
logger.debug(f"响应断言预期结果:{expected}, {type(expected)}")
|
||||
index = 0
|
||||
for k, v in expected.items():
|
||||
# 获取需要断言的实际结果部分
|
||||
@@ -37,7 +38,6 @@ def assert_result(response: Response, expected: dict) -> None:
|
||||
if _k == "http_code":
|
||||
actual = response.status_code
|
||||
else:
|
||||
logger.debug("根据响应类型的不同,从响应数据中提取实际结果")
|
||||
if response_type(response) == "json":
|
||||
# 如果响应数据是json格式
|
||||
actual = json_extractor(response.json(), _k)
|
||||
@@ -45,27 +45,70 @@ def assert_result(response: Response, expected: dict) -> None:
|
||||
# 响应数据不是json格式
|
||||
actual = re_extract(response.text, _k)
|
||||
index += 1
|
||||
# 对预期结果进行数据处理
|
||||
_v = eval_data_process(_v)
|
||||
logger.info(f'第{index}个断言 -|- 预期结果: {_v}, {type(_v)} {k} 实际结果: {actual},{type(actual)}')
|
||||
logger.info(f'第{index}个响应断言 -|- 预期结果: {_v}, {type(_v)} {k} 实际结果: {actual}, {type(actual)}')
|
||||
try:
|
||||
if k == "eq": # 预期结果 = 实际结果
|
||||
assert _v == actual
|
||||
logger.info("断言成功!")
|
||||
logger.info(f"预期结果: {_v} == 实际结果: {actual}, 断言成功!")
|
||||
elif k == "in": # 实际结果 包含 预期结果
|
||||
assert _v in actual
|
||||
logger.info("断言成功!")
|
||||
logger.info(f"预期结果: {_v} in 实际结果: {actual}, 断言成功!")
|
||||
elif k == "gt": # 预期结果 > 实际结果 (值应该为数值型)
|
||||
assert actual < _v
|
||||
logger.info("断言成功!")
|
||||
assert _v > actual
|
||||
logger.info(f"预期结果: {_v} > 实际结果: {actual}, 断言成功!")
|
||||
elif k == "lt": # 预期结果 < 实际结果 (值应该为数值型)
|
||||
assert actual > _v
|
||||
logger.info("断言成功!")
|
||||
assert _v < actual
|
||||
logger.info(f"预期结果: {_v} < 实际结果: {actual}, 断言成功!")
|
||||
elif k == "not": # 预期结果 != 实际结果
|
||||
assert actual != _v
|
||||
logger.info("断言成功!")
|
||||
assert _v != actual
|
||||
logger.info(f"预期结果: {_v} != 实际结果: {actual}, 断言成功!")
|
||||
else:
|
||||
logger.error(f"判断关键字: {k} 错误!")
|
||||
logger.error(f"判断关键字: {k} 错误!, 目前仅支持如下关键字:eq, in, gt, lt, not")
|
||||
except AssertionError:
|
||||
logger.error("断言失败")
|
||||
raise AssertionError(f"第{index}个断言 -|- 预期结果: {_v}, {type(_v)} {k} 实际结果: {actual},{type(actual)}")
|
||||
logger.error(f"第{index}个响应断言失败 -|- 预期结果: {_v}, {type(_v)} {k} 实际结果: {actual}, {type(actual)}")
|
||||
raise AssertionError(
|
||||
f"第{index}个响应断言失败 -|- 预期结果: {_v}, {type(_v)} {k} 实际结果: {actual}, {type(actual)}")
|
||||
|
||||
|
||||
def assert_sql(env, expected: dict):
|
||||
"""
|
||||
数据库断言
|
||||
:param env: 当前所处环境
|
||||
:param expected: 预期结果,从excel中或者yaml读取、或者手动传入,格式如下:
|
||||
{
|
||||
'eq':
|
||||
{'sql': 'select count(*) from users where user_id=1;', 'len': '1'},
|
||||
'eq':
|
||||
{'sql': 'select * from users where user_id=1;', '$.username': '${username}'},
|
||||
}
|
||||
"""
|
||||
if expected is None:
|
||||
logger.info("当前用例无数据库断言!")
|
||||
return
|
||||
try:
|
||||
# 拿不到数据库配置,则不进行数据库断言
|
||||
db = db_info["test" if env.lower() == "test" else "live"]
|
||||
db_host = db["db_host"]
|
||||
except KeyError:
|
||||
logger.error("当前环境无数据库配置,跳过数据库断言!")
|
||||
return
|
||||
for k, v in expected.items():
|
||||
sql_result = None
|
||||
for _k, _v in v.items():
|
||||
if _k == "sql":
|
||||
# 查询数据库,获取查询结果
|
||||
sql_result = MysqlServer(**db).query_one(_v)
|
||||
logger.info(f'数据库响应断言 -|- SQL:{_v} || 查询结果:{sql_result}')
|
||||
try:
|
||||
if k == "eq": # 预期结果 = 实际结果
|
||||
if _k == "len":
|
||||
assert _v == len(sql_result)
|
||||
logger.info(f"预期结果: {_v} == 实际结果: {len(sql_result)}, 断言成功!")
|
||||
# 如果时$.开头,则从数据库查询结果中提取相应的值作为实际结果
|
||||
elif _k.startswith("$."):
|
||||
actual = json_extractor(sql_result, _k)
|
||||
assert _v == actual
|
||||
logger.info(f"预期结果: {_v} == 实际结果: {actual}, 断言成功!")
|
||||
except AssertionError:
|
||||
logger.error(f"数据库断言失败 -|- 预期结果:{_v} == 实际结果: {len(sql_result)}")
|
||||
raise AssertionError(f"数据库断言失败 -|-预期结果: {_v} == 实际结果: {len(sql_result)}")
|
||||
|
||||
152
common_utils/mysql_handle.py
Normal file
152
common_utils/mysql_handle.py
Normal file
@@ -0,0 +1,152 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Time : 2023/5/12 11:04
|
||||
# @Author : chenyinhua
|
||||
# @File : mysql_handle.py
|
||||
# @Software: PyCharm
|
||||
# @Desc: 使用pymysql模块连接mysql数据库的公共方法
|
||||
import pymysql
|
||||
from typing import Union
|
||||
import json
|
||||
from datetime import datetime
|
||||
from sshtunnel import SSHTunnelForwarder
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class MysqlServer:
|
||||
"""
|
||||
初始化数据库连接(支持通过SSH隧道的方式连接),并指定查询的结果集以字典形式返回
|
||||
"""
|
||||
|
||||
def __init__(self, db_host, db_port, db_user, db_pwd, db_database, ssh=False,
|
||||
**kwargs):
|
||||
"""
|
||||
初始化方法中, 连接mysql数据库, 根据ssh参数决定是否走SSH隧道方式连接mysql数据库
|
||||
"""
|
||||
self.server = None
|
||||
if ssh:
|
||||
self.server = SSHTunnelForwarder(
|
||||
ssh_address_or_host=(kwargs.get("ssh_host"), kwargs.get("ssh_port")), # ssh 目标服务器 ip 和 port
|
||||
ssh_username=kwargs.get("ssh_user"), # ssh 目标服务器用户名
|
||||
ssh_password=kwargs.get("ssh_pwd"), # ssh 目标服务器用户密码
|
||||
remote_bind_address=(db_host, db_port), # mysql 服务ip 和 part
|
||||
local_bind_address=('127.0.0.1', 5143), # ssh 目标服务器的用于连接 mysql 或 redis 的端口,该 ip 必须为 127.0.0.1
|
||||
)
|
||||
self.server.start()
|
||||
db_host = self.server.local_bind_host # server.local_bind_host 是 参数 local_bind_address 的 ip
|
||||
db_port = self.server.local_bind_port # server.local_bind_port 是 参数 local_bind_address 的 port
|
||||
# 建立连接
|
||||
self.conn = pymysql.connect(host=db_host,
|
||||
port=db_port,
|
||||
user=db_user,
|
||||
password=db_pwd,
|
||||
database=db_database,
|
||||
charset="utf8",
|
||||
cursorclass=pymysql.cursors.DictCursor # 加上pymysql.cursors.DictCursor这个返回的就是字典
|
||||
)
|
||||
# 创建一个游标对象
|
||||
self.cursor = self.conn.cursor()
|
||||
|
||||
def query_all(self, sql):
|
||||
"""
|
||||
查询所有符合sql条件的数据
|
||||
:param sql: 执行的sql
|
||||
:return: 查询结果
|
||||
"""
|
||||
try:
|
||||
self.conn.commit()
|
||||
self.cursor.execute(sql)
|
||||
data = self.cursor.fetchall()
|
||||
# 关闭数据库链接和隧道
|
||||
self.close()
|
||||
return self.verify(data)
|
||||
except Exception as e:
|
||||
logger.error(f"查询所有符合sql条件的数据报错: {e}")
|
||||
raise e
|
||||
|
||||
def query_one(self, sql):
|
||||
"""
|
||||
查询符合sql条件的数据的第一条数据
|
||||
:param sql: 执行的sql
|
||||
:return: 返回查询结果的第一条数据
|
||||
"""
|
||||
try:
|
||||
self.conn.commit()
|
||||
self.cursor.execute(sql)
|
||||
data = self.cursor.fetchone()
|
||||
# 关闭数据库链接和隧道
|
||||
self.close()
|
||||
return self.verify(data)
|
||||
except Exception as e:
|
||||
logger.error(f"{sql} --> 报错: {e}")
|
||||
raise e
|
||||
|
||||
def insert(self, sql):
|
||||
"""
|
||||
插入数据
|
||||
:param sql: 执行的sql
|
||||
"""
|
||||
try:
|
||||
self.cursor.execute(sql)
|
||||
# 提交 只要数据库更新就要commit
|
||||
self.conn.commit()
|
||||
# 关闭数据库链接和隧道
|
||||
self.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{sql} --> 报错: {e}")
|
||||
raise e
|
||||
|
||||
def update(self, sql):
|
||||
"""
|
||||
更新数据
|
||||
:param sql: 执行的sql
|
||||
"""
|
||||
try:
|
||||
self.cursor.execute(sql)
|
||||
# 提交 只要数据库更新就要commit
|
||||
self.conn.commit()
|
||||
# 关闭数据库链接和隧道
|
||||
self.close()
|
||||
except Exception as e:
|
||||
logger.error(f"{sql} --> 报错: {e}")
|
||||
raise e
|
||||
|
||||
def query(self, sql, one=True):
|
||||
"""
|
||||
根据传值决定查询一条数据还是所有
|
||||
:param sql: 查询的SQL语句
|
||||
:param one: 默认True. True查一条数据,否则查所有
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
if one:
|
||||
return self.query_one(sql)
|
||||
else:
|
||||
return self.query_all(sql)
|
||||
except Exception as e:
|
||||
logger.error(f"{sql} --> 报错: {e}")
|
||||
raise e
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
断开游标,关闭数据库
|
||||
如果开启了SSH隧道,也关闭
|
||||
:return:
|
||||
"""
|
||||
# 关闭游标
|
||||
self.cursor.close()
|
||||
# 关闭数据库链接
|
||||
self.conn.close()
|
||||
# 如果开启了SSH隧道,则关闭
|
||||
if self.server:
|
||||
self.server.close()
|
||||
|
||||
def verify(self, result: dict) -> Union[dict, None]:
|
||||
"""验证结果能否被json.dumps序列化"""
|
||||
# 尝试变成字符串,解决datetime 无法被json 序列化问题
|
||||
try:
|
||||
json.dumps(result)
|
||||
except TypeError: # TypeError: Object of type datetime is not JSON serializable
|
||||
for k, v in result.items():
|
||||
if isinstance(v, datetime):
|
||||
result[k] = str(v)
|
||||
return result
|
||||
@@ -1,8 +1,8 @@
|
||||
import pytest
|
||||
from case_utils.assert_handle import assert_result
|
||||
from case_utils.requests_handle import BaseRequest
|
||||
from case_utils.assert_handle import assert_response, assert_sql
|
||||
from common_utils.base_request import BaseRequest
|
||||
from loguru import logger
|
||||
from case_utils.request_data_handle import RequestPreDataHandle
|
||||
from case_utils.request_data_handle import RequestPreDataHandle, after_extract, case_data_replace,eval_data_process
|
||||
from pytest_html import extras # 往pytest-html报告中填写额外的内容
|
||||
from common_utils.func_handle import add_docstring
|
||||
|
||||
@@ -16,28 +16,38 @@ def case(request):
|
||||
|
||||
@pytest.mark.${func_title}
|
||||
@pytest.mark.auto
|
||||
def ${func_title}_auto(case, extra):
|
||||
"""
|
||||
|
||||
"""
|
||||
def ${func_title}_auto(case, extra, request):
|
||||
logger.info("-----------------------------START-开始执行用例-----------------------------")
|
||||
logger.debug(f"当前执行的用例数据:{case}")
|
||||
# 给当前测试方法添加文档注释
|
||||
add_docstring(case.get("title", ""))(${func_title}_auto)
|
||||
if case.get("run", None):
|
||||
# 获取处理完成的用例数据
|
||||
case_data = RequestPreDataHandle(case).request_data_handle()
|
||||
# 将用例数据显示在pytest-html报告中
|
||||
extra.append(extras.text(str(case_data), name="用例数据"))
|
||||
# 发送请求
|
||||
response = BaseRequest.send_request(case_data)
|
||||
# 将响应数据显示在pytest-html报告中
|
||||
extra.append(extras.text(str(response.text), name="响应数据"))
|
||||
# 进行断言
|
||||
assert_result(response, case_data["expected"])
|
||||
else:
|
||||
reason = f"标记了该用例为false,不执行\\n"
|
||||
logger.warning(f"{reason}")
|
||||
pytest.skip(reason)
|
||||
logger.info("------------------------------------------用例执行结束------------------------------------------")
|
||||
try:
|
||||
# 获取命令行参数,判断当前处于哪个环境
|
||||
env = request.config.getoption("--env")
|
||||
# 给当前测试方法添加文档注释
|
||||
add_docstring(case.get("title", ""))(${func_title}_auto)
|
||||
if case.get("run", None):
|
||||
# 处理请求前的用例数据
|
||||
case_data = RequestPreDataHandle(case).request_data_handle()
|
||||
# 将用例数据显示在pytest-html报告中
|
||||
extra.append(extras.text(str(case_data), name="用例数据"))
|
||||
# 发送请求
|
||||
response = BaseRequest.send_request(case_data)
|
||||
# 将响应数据显示在pytest-html报告中
|
||||
extra.append(extras.text(str(response.text), name="响应数据"))
|
||||
# 请求后,提取后置参数作为全局变量
|
||||
after_extract(response, case_data["extract"])
|
||||
# 从全局变量中获取最新值,替换数据库断言中的参数
|
||||
case_data["assert_sql"] = eval_data_process(case_data_replace(case_data["assert_sql"]))
|
||||
# 进行响应断言
|
||||
assert_response(response, case_data["assert_response"])
|
||||
# 进行数据库断言
|
||||
assert_sql(env, case_data["assert_sql"])
|
||||
else:
|
||||
reason = f"标记了该用例为false,不执行\\n"
|
||||
logger.warning(f"{reason}")
|
||||
pytest.skip(reason)
|
||||
except Exception as e:
|
||||
logger.error(f"用例执行过程中报错:{e}")
|
||||
raise e
|
||||
finally:
|
||||
logger.info("-----------------------------END-用例执行结束-----------------------------")
|
||||
|
||||
|
||||
@@ -68,4 +68,26 @@ ding_talk = {
|
||||
# ------------------------------------ 企业微信相关配置 ----------------------------------------------------#
|
||||
wechat = {
|
||||
"webhook_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=********",
|
||||
}
|
||||
|
||||
# ------------------------------------ 数据库相关配置 ----------------------------------------------------#
|
||||
db_info = {
|
||||
"test": {
|
||||
"db_host": "xx.xx.xx.xx",
|
||||
"db_port": 3306,
|
||||
"db_user": "root",
|
||||
"db_pwd": "**********",
|
||||
"db_database": "test**********",
|
||||
"ssh": True,
|
||||
"ssh_host": "xx.xx.xx.xx",
|
||||
"ssh_port": 3306,
|
||||
"ssh_user": "root",
|
||||
"ssh_pwd": "**********"
|
||||
|
||||
},
|
||||
"live": {
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -24,6 +24,10 @@
|
||||
$.user_id: 84953
|
||||
not:
|
||||
$.user_id: 85390
|
||||
assert_sql:
|
||||
eq:
|
||||
sql: select count(*) from tokens where user_id=${user_id};
|
||||
len: 1
|
||||
- feature: 登录
|
||||
title: 用户名正确,密码错误,登录失败
|
||||
url: /api/accounts/login.json
|
||||
@@ -38,4 +42,5 @@
|
||||
assert_response:
|
||||
eq:
|
||||
http_code: 200
|
||||
$.status: -2
|
||||
$.status: -2
|
||||
assert_sql:
|
||||
Binary file not shown.
@@ -18,4 +18,10 @@
|
||||
assert_response:
|
||||
eq:
|
||||
http_code: 200
|
||||
$.login: ${login}
|
||||
$.login: ${login}
|
||||
assert_sql:
|
||||
eq:
|
||||
sql: select id, `name`, identifier from projects where user_id=${user_id} ORDER BY created_on DESC;
|
||||
$.id: ${project_id}
|
||||
$.name: ${project_name}
|
||||
$.identifier: ${project_identifier}
|
||||
@@ -1,47 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Version: Python 3.9
|
||||
# @Time : 2023/1/9 16:41
|
||||
# @Author : chenyinhua
|
||||
# @File : test_login.py
|
||||
# @Software: PyCharm
|
||||
# @Desc: python脚本编写的测试用例文件
|
||||
|
||||
|
||||
import pytest
|
||||
import os
|
||||
from common_utils.yaml_handle import HandleYaml
|
||||
from config.project_path import DATA_DIR
|
||||
from case_utils.requests_handle import BaseRequest
|
||||
from case_utils.assert_handle import assert_result
|
||||
from loguru import logger
|
||||
from case_utils.request_data_handle import RequestPreDataHandle
|
||||
from config.global_vars import GLOBAL_VARS
|
||||
|
||||
# 读取用例数据
|
||||
cases = HandleYaml(filename=os.path.join(DATA_DIR, "test_login.yaml")).read_yaml
|
||||
|
||||
|
||||
@pytest.mark.test_login
|
||||
class TestLogin:
|
||||
"""
|
||||
登录模块
|
||||
"""
|
||||
|
||||
@pytest.mark.parametrize("case", cases)
|
||||
def test_login(self, case):
|
||||
logger.info("-----------------------------START-开始执行用例-----------------------------")
|
||||
logger.debug(f"当前执行的用例数据:{case}")
|
||||
# 将用例标题title作为全局变量,方便后续写入到测试报告report.description中
|
||||
GLOBAL_VARS["title"] = case.get("title", "")
|
||||
if case.get("run", None):
|
||||
# 获取处理完成的用例数据
|
||||
case_data = RequestPreDataHandle(case).request_data_handle()
|
||||
# 发送请求
|
||||
response = BaseRequest.send_request(case_data)
|
||||
# 进行断言
|
||||
assert_result(response, case_data["expected"])
|
||||
else:
|
||||
reason = f"标记了该用例为false,不执行\\n"
|
||||
logger.warning(f"{reason}")
|
||||
pytest.skip(reason)
|
||||
logger.info("-----------------------------END-用例执行结束-----------------------------")
|
||||
60
test_case/test_manual_case/test_login_demo.py
Normal file
60
test_case/test_manual_case/test_login_demo.py
Normal file
@@ -0,0 +1,60 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Version: Python 3.9
|
||||
# @Time : 2023/1/9 16:41
|
||||
# @Author : chenyinhua
|
||||
# @File : test_login_demo.py
|
||||
# @Software: PyCharm
|
||||
# @Desc: python脚本编写的测试用例文件
|
||||
|
||||
|
||||
import pytest
|
||||
import os
|
||||
from common_utils.yaml_handle import HandleYaml
|
||||
from config.settings import DATA_DIR
|
||||
from common_utils.base_request import BaseRequest
|
||||
from case_utils.assert_handle import assert_response, assert_sql
|
||||
from loguru import logger
|
||||
from case_utils.request_data_handle import RequestPreDataHandle, after_extract, case_data_replace, eval_data_process
|
||||
from pytest_html import extras # 往pytest-html报告中填写额外的内容
|
||||
from common_utils.func_handle import add_docstring
|
||||
|
||||
# 读取用例数据
|
||||
cases = HandleYaml(filename=os.path.join(DATA_DIR, "test_login_demo.yaml")).read_yaml
|
||||
|
||||
|
||||
@pytest.mark.test_login_demo
|
||||
@pytest.mark.parametrize("case", cases)
|
||||
def test_login_demo(case, extra, request):
|
||||
logger.info("-----------------------------START-开始执行用例-----------------------------")
|
||||
logger.debug(f"当前执行的用例数据:{case}")
|
||||
try:
|
||||
# 获取命令行参数,判断当前处于哪个环境
|
||||
env = request.config.getoption("--env")
|
||||
# 给当前测试方法添加文档注释
|
||||
add_docstring(case.get("title", ""))(test_login)
|
||||
if case.get("run", None):
|
||||
# 处理请求前的用例数据
|
||||
case_data = RequestPreDataHandle(case).request_data_handle()
|
||||
# 将用例数据显示在pytest-html报告中
|
||||
extra.append(extras.text(str(case_data), name="用例数据"))
|
||||
# 发送请求
|
||||
response = BaseRequest.send_request(case_data)
|
||||
# 将响应数据显示在pytest-html报告中
|
||||
extra.append(extras.text(str(response.text), name="响应数据"))
|
||||
# 请求后,提取后置参数作为全局变量
|
||||
after_extract(response, case_data["extract"])
|
||||
# 从全局变量中获取最新值,替换数据库断言中的参数
|
||||
case_data["assert_sql"] = eval_data_process(case_data_replace(case_data["assert_sql"]))
|
||||
# 进行响应断言
|
||||
assert_response(response, case_data["assert_response"])
|
||||
# 进行数据库断言
|
||||
assert_sql(env, case_data["assert_sql"])
|
||||
else:
|
||||
reason = f"标记了该用例为false,不执行\\n"
|
||||
logger.warning(f"{reason}")
|
||||
pytest.skip(reason)
|
||||
except Exception as e:
|
||||
logger.error(f"用例执行过程中报错:{e}")
|
||||
raise e
|
||||
finally:
|
||||
logger.info("-----------------------------END-用例执行结束-----------------------------")
|
||||
Reference in New Issue
Block a user