diff --git a/common_utils/base_request.py b/common_utils/base_request.py index f5d6223..5aca1f1 100644 --- a/common_utils/base_request.py +++ b/common_utils/base_request.py @@ -30,33 +30,50 @@ class BaseRequest: :return: 响应对象 """ try: - logger.info(f"\n======================================================\n" \ - "-------------Start:请求前--------------------\n" - f"用例标题: {req_data.get('title', None)}\n" \ - f"请求路径: {req_data.get('url', None)}\n" \ - f"请求方式: {req_data.get('method', None)}\n" \ - f"请求头: {req_data.get('headers', None)}\n" \ - f"请求Cookies: {req_data.get('cookies', None)}\n" \ - f"请求关键字: {req_data.get('pk', None)}\n" \ - f"请求内容: {req_data.get('payload', None)}\n" \ - f"请求文件: {req_data.get('files', None)}\n" \ - "=====================================================") + logger.debug("\n======================================================\n" \ + "-------------Start:请求前--------------------\n" + f"用例标题: {req_data.get('title', None)}\n" \ + f"请求路径: {req_data.get('url', None)}\n" \ + f"请求方式: {req_data.get('method', None)}\n" \ + f"请求头: {req_data.get('headers', None)}\n" \ + f"请求Cookies: {req_data.get('cookies', None)}\n" \ + f"请求关键字: {req_data.get('pk', None)}\n" \ + f"请求内容: {req_data.get('payload', None)}\n" \ + f"请求文件: {req_data.get('files', None)}\n" \ + "=====================================================") + print("\n======================================================\n" \ + "-------------Start:请求前--------------------\n" + f"用例标题: {req_data.get('title', None)}\n" \ + f"请求路径: {req_data.get('url', None)}\n" \ + f"请求方式: {req_data.get('method', None)}\n" \ + f"请求头: {req_data.get('headers', None)}\n" \ + f"请求Cookies: {req_data.get('cookies', None)}\n" \ + f"请求关键字: {req_data.get('pk', None)}\n" \ + f"请求内容: {req_data.get('payload', None)}\n" \ + f"请求文件: {req_data.get('files', None)}\n" \ + "=====================================================") res = cls.send_api_request( url=req_data.get("url"), method=req_data.get("method").lower(), pk=req_data.get("pk", None), - header=req_data.get("headers"), - payload=req_data.get("payload"), - files=req_data.get("files"), - cookies=req_data.get("cookies") + header=req_data.get("headers", None), + payload=req_data.get("payload", None), + files=req_data.get("files", None), + cookies=req_data.get("cookies", None) ) - logger.info(f"\n======================================================\n" \ - "-------------End:请求后--------------------\n" - f"响应数据: {res.text}\n" \ - f"响应码: {res.status_code}\n" \ - "=====================================================") + logger.debug("\n======================================================\n" \ + "-------------End:请求后--------------------\n" + f"响应数据: {res.text}\n" \ + f"响应码: {res.status_code}\n" \ + "=====================================================") + print("\n======================================================\n" \ + "-------------End:请求后--------------------\n" + f"响应数据: {res.text}\n" \ + f"响应码: {res.status_code}\n" \ + "=====================================================") except requests.exceptions.RequestException as e: logger.error(f"请求出错,{str(e)}") + print(f"请求出错,{str(e)}") raise ValueError(f"请求出错,{str(e)}") return res @@ -72,36 +89,39 @@ class BaseRequest: :param payload: 请求数据,对于不同请求类型,可以为dict,MultipartEncoder等 :param files: 请求上传的文件 :param header: 请求头 + :param cookies: 请求cookies :return: 返回res对象 """ headers = header or {} session = cls.get_session() - if pk.lower() == 'params': - res = session.request(method=method, url=url, params=payload, headers=headers, cookies=cookies) - elif pk.lower() == 'data': + if pk and pk.lower() == 'params': + res = session.request(method=method, url=url, params=payload, headers=headers, cookies=cookies, timeout=5) + elif pk and pk.lower() == 'data': if files: if not isinstance(files, dict): raise ValueError('data参数必须为dict') encoder = MultipartEncoder(fields=files, boundary='------------------------' + str(time.time())) headers['Content-Type'] = encoder.content_type res = session.request(method=method, url=url, data=encoder.to_string(), headers=headers, - cookies=cookies) + cookies=cookies, timeout=5) else: headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8' - res = session.request(method=method, url=url, data=payload, headers=headers, cookies=cookies) - elif pk.lower() == 'json': + res = session.request(method=method, url=url, data=payload, headers=headers, cookies=cookies, timeout=5) + elif pk and pk.lower() == 'json': if files: if not isinstance(files, dict): raise ValueError('json参数必须为dict') encoder = MultipartEncoder(fields=files, boundary='------------------------' + str(time.time())) headers['Content-Type'] = encoder.content_type res = session.request(method=method, url=url, json=encoder.to_string(), headers=headers, - cookies=cookies) + cookies=cookies, timeout=5) else: headers['Content-Type'] = 'application/json' - res = session.request(method=method, url=url, json=payload, headers=headers, cookies=cookies) + res = session.request(method=method, url=url, json=payload, headers=headers, cookies=cookies, timeout=5) else: + logger.error('pk可选关键字为params, json, data') + print('pk可选关键字为params, json, data') raise ValueError('pk可选关键字为params, json, data') return res diff --git a/common_utils/data_handle.py b/common_utils/data_handle.py index a3cd084..9fd0122 100644 --- a/common_utils/data_handle.py +++ b/common_utils/data_handle.py @@ -5,11 +5,11 @@ # @File : data_handle.py # @Software: PyCharm # @Desc: 数据处理 -from loguru import logger from jsonpath import jsonpath import re from faker import Faker from string import Template +from loguru import logger faker = Faker() @@ -21,22 +21,38 @@ def data_replace(content, source): :param source: 需要替换字符串的来源 return content: 替换表达式后的字符串 """ - if content is None: - return None - logger.debug(f"-----Start-----开始进行字符串替换: 初始字符串为:{content}") - if len(content) != 0: - # safe_substitute() 方法会保留没有被替换的占位符,不会抛出 KeyError 异常。 - # 所以,如果 content 中不存在占位符,使用 safe_substitute() 方法进行替换后,得到的结果和原始字符串是一样的。 - content = Template(str(content)).safe_substitute(source) - for func in re.findall('\\${(.*?)}', content): - content = content.replace('${%s}' % func, exec_func(func)) - try: + try: + if content is None: + return None + logger.debug("\n======================================================\n" \ + "-------------Start:数据替换--------------------\n" + f"初始字符串: {content}\n" \ + "=====================================================") + print(f"-----Start-----数据替换: 初始字符串为:{content}") + if len(content) != 0: + # safe_substitute() 方法会保留没有被替换的占位符,不会抛出 KeyError 异常。 + # 所以,如果 content 中不存在占位符,使用 safe_substitute() 方法进行替换后,得到的结果和原始字符串是一样的。 + content = Template(str(content)).safe_substitute(source) + for func in re.findall('\\${(.*?)}', content): content = content.replace('${%s}' % func, exec_func(func)) - except Exception as e: - logger.error(f"-----END-----替换数据时出现了异常:{e}") - return f"-----END-----替换数据时出现了异常:{e}" - logger.debug(f"-----End-----字符串替换完成: 新字符串为:{content}") - return content + try: + content = content.replace('${%s}' % func, exec_func(func)) + except Exception as e: + logger.error(f"\n======================================================\n" \ + "-------------End:数据替换--------------------\n" + f"替换数据时出现了异常:{e}" \ + "=====================================================") + print(f"-----END-----替换数据时出现了异常:{e}") + return e + logger.debug("\n======================================================\n" \ + "-------------End:数据替换--------------------\n" + f"新字符串:{content}\n" \ + "=====================================================") + print(f"-----End-----数据替换完成: 新字符串为:{type(content)} || {content}\n") + return content + except Exception as e: + logger.error(f"进行数据替换时报错:{e}") + print(f"进行数据替换时报错:{e}") def json_extractor(obj: dict, expr: str = '.'): @@ -47,10 +63,17 @@ def json_extractor(obj: dict, expr: str = '.'): """ try: result = jsonpath(obj, expr)[0] - logger.debug("提取响应内容成功,提取表达式为: {} 提取值为 {}".format(expr, result)) + logger.debug("\n======================================================\n" \ + "-------------Start:json_extractor--------------------\n" + f"提取表达式为: {expr} 提取值为 {result}\n" \ + "=====================================================") + print("提取响应内容成功,提取表达式为: {} 提取值为 {}".format(expr, result)) except Exception as e: - logger.exception(e) - logger.exception(f'未提取到内容,请检查表达式是否错误!提取表达式为:{expr} 提取数据为 {obj}') + logger.debug("\n======================================================\n" \ + "-------------End:json_extractor--------------------\n" + f"提取表达式为: {expr} 提取数据为 {obj}\n, 错误信息为:{e}\n" \ + "=====================================================") + print(f'未提取到内容,请检查表达式是否错误!提取表达式为:{expr} 提取数据为 {obj}, 错误信息为:{e}') result = None return result @@ -63,10 +86,18 @@ def re_extract(obj: str, expr: str = '.'): """ try: result = re.findall(expr, obj)[0] + logger.debug("\n======================================================\n" \ + "-------------Start:re_extract--------------------\n" + f"提取表达式为: {expr} 提取值为: {result}\n" \ + "=====================================================") + print("提取响应内容成功,提取表达式为: {} 提取值为: {}".format(expr, result)) except Exception as e: + logger.debug(f"\n======================================================\n" \ + "-------------End:re_extract--------------------\n" + f"提取表达式为: {expr} 提取数据为:{obj}\n, 错误信息为:{e}\n" \ + "=====================================================") + print(f'未提取到内容,请检查表达式是否错误!提取表达式为:{expr} 提取数据为 {obj}, 错误信息为:{e}') result = None - logger.exception(e) - logger.exception(f'未提取到内容,请检查表达式是否错误!提取表达式为:{expr} 提取数据为 {obj}') return result @@ -76,6 +107,10 @@ def exec_func(func) -> str: : return 返回的转换成函数执行的结果,已字符串格式返回 """ result = eval(func) + logger.debug("\n======================================================\n" \ + "-------------exec_func--------------------\n" + f"函数: {func} 执行结果: {result}\n" \ + "=====================================================") return str(result) diff --git a/common_utils/dingding_handle.py b/common_utils/dingding_handle.py index c52e54c..e99fd27 100644 --- a/common_utils/dingding_handle.py +++ b/common_utils/dingding_handle.py @@ -11,8 +11,8 @@ import base64 import urllib.parse import time import urllib.request -from loguru import logger from requests import request +from loguru import logger class DingTalkBot: @@ -58,6 +58,11 @@ class DingTalkBot: 发送钉钉消息 :payload: 请求json数据 """ + logger.debug("\n======================================================\n" \ + "-------------Start:发送钉钉消息--------------------\n" + f"Webhook_Url: {self.webhook_url}\n" \ + f"内容: {payload}\n" \ + "=====================================================") response = request( url=self.webhook_url, json=payload, @@ -65,10 +70,15 @@ class DingTalkBot: method="POST" ) if response.json().get("errcode") == 0: - logger.debug(f"通过钉钉机器人发送{payload.get('msgtype', '')}消息成功:{response.json()}") + logger.debug("\n======================================================\n" \ + "-------------End:发送钉钉消息--------------------\n" + f"通过钉钉机器人发送{payload.get('msgtype', '')}消息成功:{response.json()}\n" \ + "=====================================================") + print(f"通过钉钉机器人发送{payload.get('msgtype', '')}消息成功:{response.json()}") return True else: logger.error(f"通过钉钉机器人发送{payload.get('msgtype', '')}消息失败:{response.text}") + print(f"通过钉钉机器人发送{payload.get('msgtype', '')}消息失败:{response.text}") return False def send_text(self, content, mobiles=None, is_at_all=False): @@ -98,7 +108,7 @@ class DingTalkBot: "isAtAll": is_at_all } } - return self.send_message(payload, "send_text") + return self.send_message(payload) def send_link(self, title, text, message_url, pic_url=None): """ diff --git a/common_utils/excel_handle.py b/common_utils/excel_handle.py index 45c7a71..d2f9780 100644 --- a/common_utils/excel_handle.py +++ b/common_utils/excel_handle.py @@ -7,7 +7,6 @@ # @Desc: 使用openpyxl对excel进行读写操作 import openpyxl -from loguru import logger class ExcelHandle: @@ -29,10 +28,9 @@ class ExcelHandle: wb.save(self.filename) return self.filename - # add by xiahb def read_sheet(self, sheet, workbook): """ - 读取一个sheet的内容 + 读取指定表单的内容 :param sheet: 表单名称 :param workbook: 工作簿对象 :return: sheet数据列表 @@ -88,11 +86,11 @@ class ExcelHandle: sheets = workbook.sheetnames if sheet_name in sheets: sheet = workbook[sheet_name] - logger.info(f"往表单【{sheet_name}】中写入数据") + print(f"往表单【{sheet_name}】中写入数据") else: # 如果表单为空,就默认使用第一个表单 sheet = workbook.active - logger.info(f"表单【{sheet_name}】不存在,默认往第一个表单中写入数据") + print(f"表单【{sheet_name}】不存在,默认往第一个表单中写入数据") sheet.cell(row=row, column=column, value=data) # 更上面写法效果一样 sheet.cell(row=row, column=column).value = data diff --git a/common_utils/files_handle.py b/common_utils/files_handle.py index 3dfba6a..45a4f6f 100644 --- a/common_utils/files_handle.py +++ b/common_utils/files_handle.py @@ -5,7 +5,6 @@ # @Software: PyCharm # @Desc: 处理文件相关操作 import os -from loguru import logger import zipfile import shutil @@ -76,7 +75,7 @@ def zip_file(in_path: str, out_path: str): """ # 如果传入的路径是一个目录才进行压缩操作 if os.path.isdir(in_path): - logger.debug("目标路径是一个目录,开始进行压缩......") + print("目标路径是一个目录,开始进行压缩......") # 写入 zip = zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) for path, dirnames, filenames in os.walk(in_path): @@ -88,9 +87,9 @@ def zip_file(in_path: str, out_path: str): path, filename), os.path.join( fpath, filename)) zip.close() - logger.debug("压缩完成!") + print("压缩完成!") else: - logger.error("目标路径不是一个目录,请检查!") + print("目标路径不是一个目录,请检查!") def delete_dir_file(file_path): @@ -100,7 +99,7 @@ def delete_dir_file(file_path): """ paths = os.listdir(file_path) if paths: - logger.debug("目标目录存在文件或目录,进行删除操作") + print("目标目录存在文件或目录,进行删除操作") for item in paths: path = os.path.join(file_path, item) # 如果目标路径是一个文件,使用os.remove删除 @@ -110,7 +109,7 @@ def delete_dir_file(file_path): if os.path.isdir(path): os.rmdir(path) else: - logger.debug("目标目录不存在文件或目录,不需要删除") + print("目标目录不存在文件或目录,不需要删除") def copy_file(src_file_path, dest_dir_path): @@ -134,5 +133,3 @@ def copy_file(src_file_path, dest_dir_path): return "复制成功" except Exception as e: return f"复制失败:{e}" - - diff --git a/common_utils/mysql_handle.py b/common_utils/mysql_handle.py index 9ddc053..b174d2f 100644 --- a/common_utils/mysql_handle.py +++ b/common_utils/mysql_handle.py @@ -22,6 +22,16 @@ class MysqlServer: """ 初始化方法中, 连接mysql数据库, 根据ssh参数决定是否走SSH隧道方式连接mysql数据库 """ + logger.debug("\n======================================================\n" \ + "-------------数据库配置信息--------------------\n" + f"db_host: {db_host}\n" \ + f"db_port: {db_port}\n" \ + f"db_user: {db_user}\n" \ + f"db_pwd: {db_pwd}\n" \ + f"db_database: {db_database}\n" \ + f"ssh: {ssh}\n" \ + f"kwargs: {kwargs}\n" \ + "=====================================================") self.server = None try: if ssh: @@ -48,6 +58,7 @@ class MysqlServer: self.cursor = self.conn.cursor() except Exception as e: logger.error(f"数据库连接失败:{e}") + print(f"数据库连接失败:{e}") def query_all(self, sql): """ @@ -63,7 +74,8 @@ class MysqlServer: self.close() return self.verify(data) except Exception as e: - logger.error(f"查询所有符合sql条件的数据报错: {e}") + logger.error(f"{sql} --> 报错: {e}") + print(f"{sql} --> 报错: {e}") raise e def query_one(self, sql): @@ -81,6 +93,7 @@ class MysqlServer: return self.verify(data) except Exception as e: logger.error(f"{sql} --> 报错: {e}") + print(f"{sql} --> 报错: {e}") raise e def insert(self, sql): @@ -96,6 +109,7 @@ class MysqlServer: self.close() except Exception as e: logger.error(f"{sql} --> 报错: {e}") + print(f"{sql} --> 报错: {e}") raise e def update(self, sql): @@ -111,6 +125,7 @@ class MysqlServer: self.close() except Exception as e: logger.error(f"{sql} --> 报错: {e}") + print(f"{sql} --> 报错: {e}") raise e def query(self, sql, one=True): @@ -127,6 +142,7 @@ class MysqlServer: return self.query_all(sql) except Exception as e: logger.error(f"{sql} --> 报错: {e}") + print(f"{sql} --> 报错: {e}") raise e def close(self): diff --git a/common_utils/wechat_handle.py b/common_utils/wechat_handle.py index d601a6d..2592288 100644 --- a/common_utils/wechat_handle.py +++ b/common_utils/wechat_handle.py @@ -6,10 +6,10 @@ # @Desc: 企业微信机器人 import os from requests import request -from loguru import logger import base64 import hashlib import re +from loguru import logger class WechatBot: @@ -34,6 +34,11 @@ class WechatBot: 发送微信消息 :payload: 请求json数据 """ + logger.debug("\n======================================================\n" \ + "-------------Start:发送企业微信消息--------------------\n" + f"Webhook_Url: {self.webhook_url}\n" \ + f"内容: {payload}\n" \ + "=====================================================") response = request( url=self.webhook_url, json=payload, @@ -41,13 +46,18 @@ class WechatBot: method="POST" ) if response.json().get("errcode") == 0: - logger.debug(f"通过企业微信发送{payload.get('msgtype', '')}消息成功:{response.json()}") + logger.debug("\n======================================================\n" \ + "-------------End:发送企业微信消息--------------------\n" + f"通过企业微信发送{payload.get('msgtype', '')}消息成功:{response.json()}\n" \ + "=====================================================") + print(f"通过企业微信发送{payload.get('msgtype', '')}消息成功:{response.json()}") return True else: logger.error(f"通过企业微信发送{payload.get('msgtype', '')}消息失败:{response.text}") + print(f"通过企业微信发送{payload.get('msgtype', '')}消息失败:{response.text}") return False - def send_text(self, content, mentioned_list=[], mentioned_mobile_list=[]): + def send_text(self, content, mentioned_list=None, mentioned_mobile_list=None): """ 发送文本消息 :param content: 文本内容,最长不超过2048个字节,必须是utf8编码 @@ -145,10 +155,10 @@ class WechatBot: response = request(url=url, method="POST", files=files, headers=headers) if response.json().get("errcode") == 0: media_id = response.json().get("media_id") - logger.debug(f"上传文件成功,media_id= {media_id}") + print(f"上传文件成功,media_id= {media_id}") return media_id else: - logger.error(f"上传文件失败:{response.text}") + print(f"上传文件失败:{response.text}") return False def send_file(self, media_id): diff --git a/common_utils/yagmail_handle.py b/common_utils/yagmail_handle.py index 93ab071..0aa2c1c 100644 --- a/common_utils/yagmail_handle.py +++ b/common_utils/yagmail_handle.py @@ -4,9 +4,9 @@ # @File : yagmail_handle.py # @Software: PyCharm # @Desc: 通过第三方模块yagmail发送邮件 +from loguru import logger import yagmail -from loguru import logger import os @@ -31,23 +31,37 @@ class YagEmailServe: } :return: """ - logger.debug("------------ 连接邮件服务器 ---------------") - yag = yagmail.SMTP( - user=self.user, - password=self.password, - host=self.host) - logger.debug("------------ 开始发送邮件 ---------------") - # 如果存在附件,则与邮件内容一起发送附件,否则仅发送邮件内容 - if os.path.exists(info['attachments']): - yag.send( - to=info['to'], - subject=info['subject'], - contents=info['contents'], - attachments=info['attachments']) - else: - yag.send( - to=info['to'], - subject=info['subject'], - contents=info['contents']) - logger.debug("------------ 邮件发送完毕,关闭邮件服务器连接 ---------------") - yag.close() + try: + logger.debug("\n======================================================\n" \ + "-------------Start:发送邮件--------------------\n" + f"用户名: {self.user}\n" \ + f"密码: {self.password}\n" \ + f"host: {self.host}\n" \ + f"host: {self.host}\n" \ + f"邮件内容: {info}\n" \ + "=====================================================") + yag = yagmail.SMTP( + user=self.user, + password=self.password, + host=self.host) + # 如果存在附件,则与邮件内容一起发送附件,否则仅发送邮件内容 + if os.path.exists(info['attachments']): + yag.send( + to=info['to'], + subject=info['subject'], + contents=info['contents'], + attachments=info['attachments']) + else: + yag.send( + to=info['to'], + subject=info['subject'], + contents=info['contents']) + yag.close() + logger.debug("\n======================================================\n" \ + "-------------End:发送邮件--------------------\n" + "发送邮件成功\n" \ + "=====================================================") + print("发送邮件成功") + except Exception as e: + logger.error(f"发送邮件失败,错误信息: {e}") + print(f"发送邮件失败,错误信息: {e}") diff --git a/data/test_login_demo.yaml b/data/1test_login_demo.yaml similarity index 67% rename from data/test_login_demo.yaml rename to data/1test_login_demo.yaml index 3b4d4bc..35c9a20 100644 --- a/data/test_login_demo.yaml +++ b/data/1test_login_demo.yaml @@ -6,7 +6,34 @@ case_common: # 用例数据 case_info: - feature: 登录 - title: 用户名密码正确,登录成功 + title: 用户名密码正确,登录成功(不校验数据库) + url: /api/accounts/login.json + method: POST + headers: {"Content-Type": "application/json; charset=utf-8;"} + cookies: + pk: json + payload: { "login": "${login}","password": "${password}","autologin": 1 } + files: + extract: + nickname: $.username + login: $.login + user_id: $.user_id + run: False + assert_response: + eq: + http_code: 200 + $.user_id: ${user_id} + in: + $.login: ${login} + gt: + $.user_id: 84955 + lt: + $.user_id: 84953 + not: + $.user_id: 85390 + assert_sql: +- feature: 登录 + title: 用户名密码正确,登录成功(校验数据库) url: /api/accounts/login.json method: POST headers: {"Content-Type": "application/json; charset=utf-8;"} diff --git a/test_case/test_manual_case/test_demo.py b/test_case/test_manual_case/1test_demo.py similarity index 98% rename from test_case/test_manual_case/test_demo.py rename to test_case/test_manual_case/1test_demo.py index 0885aaf..487091b 100644 --- a/test_case/test_manual_case/test_demo.py +++ b/test_case/test_manual_case/1test_demo.py @@ -15,8 +15,8 @@ import allure from case_utils.allure_handle import allure_title, allure_step # 读取用例数据 -cases = [{"title": "demo case 01", "user": "flora1", "age": 17, "run": True}, - {"title": "demo case 02", "user": "lucy", "age": 17, "run": True}] +cases = [{"title": "demo case 01", "user": "flora1", "age": 17, "run": False}, + {"title": "demo case 02", "user": "lucy", "age": 17, "run": False}] @allure.story("demo模块")