调整gitlink一系列测试用例

This commit is contained in:
floraachy
2024-10-10 13:41:58 +08:00
parent c14029edf6
commit bb41aeee6d
144 changed files with 3799 additions and 2152 deletions

View File

@@ -9,11 +9,7 @@ import random # 导包不能移除否则random.choice这种就不能处理
import json
import re, uuid
import copy
import os
import base64
from datetime import datetime, timedelta
# 第三方库导入
from loguru import logger
from string import Template
from requests.cookies import RequestsCookieJar
from requests.utils import dict_from_cookiejar
@@ -21,8 +17,6 @@ from requests.utils import dict_from_cookiejar
from utils.data_utils.faker_handle import FakerData
from utils.data_utils.eval_data_handle import eval_data
from utils.data_utils.data_tools import *
from utils.files_utils.files_handle import file_to_base64, filepath_to_base64, get_files
from config.path_config import FILES_DIR
class DataHandle:
@@ -33,22 +27,22 @@ class DataHandle:
self.method_list = [method for method in dir(FakerData) if
callable(getattr(FakerData, method)) and not method.startswith("__")]
def process_cookie_jar(self, data):
def process_cookie_jar(self, _data):
"""
将任意数据里的RequestsCookieJar转成dict再转换成JSON 格式的字符串(序列化)
:param data: 待处理的数据
:param _data: 待处理的数据
"""
if isinstance(data, dict):
for key, value in data.items():
data[key] = self.process_cookie_jar(value)
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = self.process_cookie_jar(item)
elif isinstance(data, RequestsCookieJar):
data = json.dumps(dict_from_cookiejar(data))
return data
if isinstance(_data, dict):
for key, value in _data.items():
_data[key] = self.process_cookie_jar(value)
elif isinstance(_data, list):
for i, item in enumerate(_data):
_data[i] = self.process_cookie_jar(item)
elif isinstance(_data, RequestsCookieJar):
data = json.dumps(dict_from_cookiejar(_data))
return _data
def replace_and_store_placeholders(self, pattern, text, resultAsDict=True):
def replace_and_store_placeholders(self, pattern, text, result_as_dict=True):
"""
提取字符串中符合正则表达式的元素同时用一个唯一的uuid来替换原有字符串
例如:
@@ -69,7 +63,7 @@ class DataHandle:
text = replaced_text
replaced_text = re.sub(pattern, replace, text, count=1)
if resultAsDict:
if result_as_dict:
return replaced_text, placeholders
else:
# 构造结果字符串
@@ -95,7 +89,7 @@ class DataHandle:
# 处理一下source检测到里面存在RequestsCookieJar转成dict再转换成JSON 格式的字符串(序列化)。
# 避免传递过来一个RequestsCookieJar替换后变成了'RequestsCookieJar'导致cookies无法使用的问题
source = self.process_cookie_jar(data=source)
source = self.process_cookie_jar(_data=source)
# 如果进来的是字符串,先将各种类型的表达式处理完
if isinstance(obj, str):
@@ -185,158 +179,6 @@ class DataHandle:
return obj
def get_file_content(file_name):
"""
获取文件二进制内容
:param file_name: 文件名称
:return:
"""
file_path = os.path.join(FILES_DIR, file_name)
if os.path.exists(file_path):
# 如果文件是一个真实存在的路径,则返回文件二进制内容
return file_to_base64(file_path=file_path)
else:
# 若文件不存在,则尝试以文件扩展名随机选择一个文件
logger.warning(f"图片不存在,将获取传入文件名后缀,随机取对应类型的文件, 路径:{file_path}")
file_extension = os.path.splitext(file_name)[1]
files = get_files(target=FILES_DIR, end=file_extension)
if files:
# 返回文件二进制内容
return file_to_base64(file_path=random.choice(files))
else:
logger.warning(f"找不到该文件后缀对应的同类型文件,将返回空, 传入的文件名:{file_name}")
return None
def list_to_str(target):
"""
将列表中的元素转换为字符串,并用逗号分隔。
:param target: 要转换为字符串的列表。
:return: 以逗号分隔的字符串。
"""
if isinstance(target, list) and target:
# 过滤掉列表中的None值
filtered_list = [str(item) for item in target if item is not None]
# 使用逗号连接字符串
return ",".join(filtered_list)
else:
return None
def string_to_base64(input_string: str):
"""
将字符串转换为Base64格式
"""
base64_bytes = base64.b64encode(input_string.encode('utf-8'))
base64_string = base64_bytes.decode('utf-8')
return base64_string
def str_to_list(target):
"""
将字符串转换为列表,字符串中以逗号分隔的元素将转换为列表中的元素。
"""
if isinstance(target, str):
return [target]
else:
return target
def none_to_null(target):
"""
'None'转成空字符串
"""
if target == 'None':
return ""
else:
return target
def get_file_base64(file_name):
"""
返回文文件内容的base64编码
"""
file_path = os.path.join(FILES_DIR, file_name)
if os.path.exists(file_path):
# 如果文件是一个真实存在的路径则返回base64编码内容
return file_to_base64(file_path=file_path)
else:
logger.warning(f"找不到该文件,将返回空, 传入的文件名:{file_name}")
return None
def get_filepath_base64(file_name):
"""
返回文件路径的base64编码
"""
file_path = os.path.join(FILES_DIR, file_name)
if os.path.exists(file_path):
# 如果文件是一个真实存在的路径则返回base64编码内容
return filepath_to_base64(file_path=file_path)
else:
logger.warning(f"找不到该文件,将返回空, 传入的文件名:{file_name}")
return None
def get_base64_content(input_string: str):
"""
获取base64编码内容
"""
byte_string = input_string.encode('utf-8')
base64_bytes = base64.b64encode(byte_string)
base64_string = base64_bytes.decode('utf-8')
return base64_string
def base64_decode(encoded_string):
try:
decoded_bytes = base64.b64decode(encoded_string)
decoded_string = decoded_bytes.decode('utf-8')
return decoded_string
except Exception as e:
return f"Error decoding: {str(e)}"
def update_wiki_sidebar(sidebar_content, new_page_name):
"""
获取wiki sideber的base64编码内容将新页面追加到后面再重新编码返回
"""
_sidebar_content = base64_decode(sidebar_content)
new_sidebar_content = _sidebar_content + f"\n[[{new_page_name}]]"
return string_to_base64(new_sidebar_content)
def get_current_week(start_or_end="start"):
"""
获取当前日期,并根据参数返回本周的开始或结束日期。
参数:
- start_or_end: 字符串,指定返回本周的开始日期("start")还是结束日期("end")。
返回:
- 本周开始或结束日期的字符串表示,格式为"月日"(例如:"01月01日")。
"""
# 获取当前日期
today = datetime.today()
# 计算今天是本周的第几天0代表周一1代表周二以此类推
current_weekday = today.weekday()
if start_or_end == "start":
# 计算本周的周一
res = today - timedelta(days=current_weekday)
elif start_or_end == "end":
# 计算本周的周日
res = today - timedelta(days=current_weekday) + timedelta(days=6)
else:
# 如果参数非法,返回当前日期的周一
logger.error(f"Invalid value for start_or_end: {start_or_end}. Defaulting to 'start'.")
res = today - timedelta(days=current_weekday)
return res.strftime("%m月%d")
# 声明data_handle方法这样外部就可以直接import data_handle来使用了
data_handle = DataHandle().data_handle

View File

@@ -6,7 +6,15 @@
# @Desc:
# 标准库导入
import random
import random # 导包不能移除否则random.choice这种就不能处理了
import os
import base64
from datetime import datetime, timedelta
# 第三方库导入
from loguru import logger
# 本地应用/模块导入
from utils.files_utils.files_handle import file_to_base64, filepath_to_base64, get_files
from config.path_config import FILES_DIR
def zip_test_step(step_id, step_status_id=None):
@@ -17,3 +25,176 @@ def zip_test_step(step_id, step_status_id=None):
return [{'id': id, 'stepStatus': execResult} for id, execResult in zip(step_id, step_status_id)]
else:
return [{'id': id, 'stepStatus': random.choice([0, 1, 2, 3, 4])} for id in step_id]
def get_file_content(file_name):
"""
获取文件二进制内容
:param file_name: 文件名称
:return:
"""
file_path = os.path.join(FILES_DIR, file_name)
if os.path.exists(file_path):
# 如果文件是一个真实存在的路径,则返回文件二进制内容
return file_to_base64(file_path=file_path)
else:
# 若文件不存在,则尝试以文件扩展名随机选择一个文件
logger.warning(f"图片不存在,将获取传入文件名后缀,随机取对应类型的文件, 路径:{file_path}")
file_extension = os.path.splitext(file_name)[1]
files = get_files(target=FILES_DIR, end=file_extension)
if files:
# 返回文件二进制内容
return file_to_base64(file_path=random.choice(files))
else:
logger.warning(f"找不到该文件后缀对应的同类型文件,将返回空, 传入的文件名:{file_name}")
return None
def list_to_str(target):
"""
将列表中的元素转换为字符串,并用逗号分隔。
:param target: 要转换为字符串的列表。
:return: 以逗号分隔的字符串。
"""
if isinstance(target, list) and target:
# 过滤掉列表中的None值
filtered_list = [str(item) for item in target if item is not None]
# 使用逗号连接字符串
return ",".join(filtered_list)
elif isinstance(target, str) and target:
return target
else:
return None
def string_to_base64(input_string: str):
"""
将字符串转换为Base64格式
"""
base64_bytes = base64.b64encode(input_string.encode('utf-8'))
base64_string = base64_bytes.decode('utf-8')
return base64_string
def str_to_list(target):
"""
将字符串转换为列表,字符串中以逗号分隔的元素将转换为列表中的元素。
"""
if isinstance(target, str):
return [target]
else:
return target
def none_to_null(target):
"""
'None'转成空字符串
"""
if target == 'None':
return ""
else:
return target
def get_file_base64(file_name):
"""
返回文文件内容的base64编码
"""
file_path = os.path.join(FILES_DIR, file_name)
if os.path.exists(file_path):
# 如果文件是一个真实存在的路径则返回base64编码内容
return file_to_base64(file_path=file_path)
else:
logger.warning(f"找不到该文件,将返回空, 传入的文件名:{file_name}")
return None
def get_filepath_base64(file_name):
"""
返回文件路径的base64编码
"""
file_path = os.path.join(FILES_DIR, file_name)
if os.path.exists(file_path):
# 如果文件是一个真实存在的路径则返回base64编码内容
return filepath_to_base64(file_path=file_path)
else:
logger.warning(f"找不到该文件,将返回空, 传入的文件名:{file_name}")
return None
def get_base64_content(input_string: str):
"""
获取base64编码内容
"""
byte_string = input_string.encode('utf-8')
base64_bytes = base64.b64encode(byte_string)
base64_string = base64_bytes.decode('utf-8')
return base64_string
def base64_decode(encoded_string):
try:
decoded_bytes = base64.b64decode(encoded_string)
decoded_string = decoded_bytes.decode('utf-8')
return decoded_string
except Exception as e:
return f"Error decoding: {str(e)}"
def update_wiki_sidebar(sidebar_content, new_page_name):
"""
获取wiki sideber的base64编码内容将新页面追加到后面再重新编码返回
"""
_sidebar_content = base64_decode(sidebar_content)
new_sidebar_content = _sidebar_content + f"\n[[{new_page_name}]]"
return string_to_base64(new_sidebar_content)
def get_current_week(start_or_end="start"):
"""
获取当前日期,并根据参数返回本周的开始或结束日期。
参数:
- start_or_end: 字符串,指定返回本周的开始日期("start")还是结束日期("end")。
返回:
- 本周开始或结束日期的字符串表示,格式为"月日"(例如:"01月01日")。
"""
# 获取当前日期
today = datetime.today()
# 计算今天是本周的第几天0代表周一1代表周二以此类推
current_weekday = today.weekday()
if start_or_end == "start":
# 计算本周的周一
res = today - timedelta(days=current_weekday)
elif start_or_end == "end":
# 计算本周的周日
res = today - timedelta(days=current_weekday) + timedelta(days=6)
else:
# 如果参数非法,返回当前日期的周一
logger.error(f"Invalid value for start_or_end: {start_or_end}. Defaulting to 'start'.")
res = today - timedelta(days=current_weekday)
return res.strftime("%m月%d")
def split_data(target: str, split_char: str, start_index: int, end_index: int = None):
"""
切割数据,返回指定索引范围内的切割结果
:param target: 要切割的字符串
:param split_char: 切割的方式
:param start_index: 切割结果的起始索引。假如end_index为空则返回指定索引值
:param end_index: 切割结果的结束索引(可选,默认为 None
:return: 指定索引范围内的切割结果
"""
# 参数类型检查
if not isinstance(target, str):
raise ValueError("target must be a string")
if start_index and end_index:
return target.split(split_char)[start_index:end_index]
else:
return target.split(split_char)[start_index]

View File

@@ -10,9 +10,10 @@ import re
# 第三方库导入
from jsonpath import jsonpath
from loguru import logger
from requests import Response
from requests import Response, cookies, utils
# 本地应用/模块导入
from utils.data_utils.data_handle import data_handle
def json_extractor(obj, expr: str = '.'):
@@ -24,11 +25,10 @@ def json_extractor(obj, expr: str = '.'):
"""
try:
result = jsonpath(obj, expr)[0] if len(jsonpath(obj, expr)) == 1 else jsonpath(obj, expr)
result = data_handle(obj=result)
logger.debug(f"\n提取对象:{obj}\n"
f"提取表达式: {expr} \n"
f"提取值类型: {type(result)}\n"
f"提取{result}\n")
f"提取结果{result}\n")
return result
except Exception as e:
logger.debug(f"\n提取对象:{obj}\n"
@@ -48,11 +48,10 @@ def re_extract(obj: str, expr: str = '.'):
# 如果提取后的数据长度为1则取第一个元素返回str否则返回列表
result = re.findall(expr, obj)[0] if len(re.findall(expr, obj)) == 1 else re.findall(expr, obj)
# 由于提取出来的数据都是str格式将eval一样还原数据格式
result = data_handle(obj=result)
logger.debug(f"\n提取对象:{obj}\n"
f"提取表达式: {expr}\n"
f"提取值类型: {type(result)}\n"
f"提取{result}\n")
f"提取结果{result}\n")
return result
except Exception as e:
logger.debug(f"\n提取对象:{obj}\n"
@@ -72,7 +71,10 @@ def response_extract(response: Response, expr: str = '.'):
result = eval(expr)
logger.debug(f"\n提取表达式: {expr}\n"
f"提取值类型: {type(result)}\n"
f"提取{result}\n")
f"提取结果{result}\n")
# 将从Response对象提取的cookiejar对象转换为dict格式 避免后续使用cookies的时候出现类型错误
if isinstance(result, cookies.RequestsCookieJar):
result = utils.dict_from_cookiejar(result)
return result
except Exception as e:
logger.debug(f"\n提取表达式: {expr}\n"