Merge pull request '重构并优化了data_handle方法' (#1) from data_handle into master

This commit is contained in:
floraachy
2023-09-22 16:52:27 +08:00

View File

@@ -1,14 +1,13 @@
# -*- coding: utf-8 -*-
# @Version: Python 3.9
# @Time : 2023/2/1 14:16
# @Time : 2023/9/21 17:20
# @Author : chenyinhua
# @File : data_handle.py
# @Software: PyCharm
# @Desc: 数据处理
# @Desc:
# 标准库导入
import random
import re
import re, uuid
from datetime import datetime, date, timedelta
# 第三方库导入
from faker import Faker
@@ -147,80 +146,228 @@ class FakerData:
_time_after_week = (date.today() + timedelta(days=+6)).strftime("%Y-%m-%d") + " 00:00:00"
return _time_after_week
@classmethod
def remove_special_characters(cls, target: str):
"""
移除字符串中的特殊字符。
在Python中用replace()函数操作指定字符
常用字符unicode的编码范围
数字:\u0030-\u0039
汉字:\u4e00-\u9fa5
大写字母:\u0041-\u005a
小写字母:\u0061-\u007a
英文字母:\u0041-\u007a
"""
pattern = r'([^\u4e00-\u9fa5])'
result = re.sub(pattern, '', target)
return result
def data_handle(obj, source):
"""
递归处理字典、列表中的字符串,将${}占位符替换成source中的值
"""
obj = eval_data_process(obj)
if isinstance(obj, str):
# 寻找${} 在source中找到对应的关键字进行替换如obj=${user_id}, 去寻找source中对应键user_id的值假设user_id=1使得obj=1
obj = Template(obj).safe_substitute(source)
# 寻找${python表达式} 将Python表达式eval得出其具体值
for func in re.findall('\\${(.*?)}', obj):
"""
兼容一下如下数据处理faker.name().replace(" ", "").replace(".", "")
faker是FakerData()中的实例变量。
"""
if func.startswith("faker."):
# 英文的faker数据self.faker = Faker()
faker = FakerData().faker
obj = obj.replace('${%s}' % func, eval(func))
elif func.startswith("fk_zh."):
# 中文的faker数据 self.fk_zh = Faker(locale='zh_CN')
fk_zh = FakerData().fk_zh
obj = obj.replace('${%s}' % func, eval(func))
class DataHandle:
def __init__(self):
# 实例化FakerData类避免反复实例提高性能。
self.FakerDataClass = FakerData()
# 获取FakerData类所有自定义方法
self.method_list = [method for method in dir(FakerData) if
callable(getattr(FakerData, method)) and not method.startswith("__")]
# 将"[1,2,3]" 或者"{'k':'v'}" -> [1,2,3], {'k':'v'}
def eval_data(self, data):
"""
执行一个字符串表达式,并返回其表达式的值
"""
try:
if hasattr(eval(data), "__call__"):
return data
else:
# 这里获取到的func是这样的格式例如random_int() 但实际我们通过FakerData()获取到的方法是这样的格式例如random_int, 所以我们需要进行处理
func_name = func.split("(")[0] if "(" in func else func
if hasattr(FakerData(), func_name) and callable(getattr(FakerData(), func_name)):
# 调用FakerData类的方法获取数据
obj = obj.replace('${%s}' % func, str(getattr(FakerData(), func_name)()))
# 处理其他Python表达式例如${1+1}
obj = obj.replace('${%s}' % func, eval_data_process(func))
obj = eval_data_process(obj)
return eval_data_process(obj)
elif isinstance(obj, list):
for index, item in enumerate(obj):
obj[index] = data_handle(item, source)
return obj
elif isinstance(obj, dict):
for key, value in obj.items():
obj[key] = data_handle(value, source)
return obj
else:
return obj
# 将"[1,2,3]" 或者"{'k':'v'}" -> [1,2,3], {'k':'v'}
def eval_data(data):
"""
执行一个字符串表达式,并返回其表达式的值
"""
try:
if hasattr(eval(data), "__call__"):
return eval(data)
except Exception:
return data
def replace_and_store_placeholders(self, pattern, text):
"""
提取字符串中符合正则表达式的元素同时用一个唯一的uuid来替换原有字符串
例如:
原字符串user_id: ${user_id}, user_name: ${user_name}
替换后的原字符串user_id: e1c6fc74-2f21-49a9-8d0c-de16650c6364, user_name: 50c74155-5cb5-4809-bc5d-277addf8c3e7
暂存的需要被处理的关键字或函数:{'e1c6fc74-2f21-49a9-8d0c-de16650c6364': {0: '${user_id}', 1: 'user_id'}, '50c74155-5cb5-4809-bc5d-277addf8c3e7': {0: '${user_name}', 1: 'user_name'}}
"""
placeholders = {}
def replace(match):
placeholder = str(uuid.uuid4()) # 使用uuid生成唯一的占位符
placeholders[placeholder] = {0: match.group(0), 1: match.group(1)} # 将提取到的字符串存储到字典中
return placeholder
# 使用正则表达式进行字符串匹配和替换,同时指定替换次数为 1
replaced_text = re.sub(pattern, replace, text, count=1)
while replaced_text != text:
text = replaced_text
replaced_text = re.sub(pattern, replace, text, count=1)
return replaced_text, placeholders
def data_handle(self, obj, source=None):
"""
递归处理字典、列表中的字符串,将${}占位符替换成source中的值
"""
func = {}
keys = {}
if not source or not isinstance(source, dict):
print("source为空或者source不是字典格式都将认为是{}")
source = {}
# 如果进来的是字符串,先将各种类型的表达式处理完
if isinstance(obj, str):
# 先把python表达式找出来存着这里会漏掉一些诸如1+1的表达式
pattern = r"\${([^}]+\))}" # 匹配以 "${" 开头、以 ")}" 结尾的字符串,并在括号内提取内容,括号内不能包含"}"字符
obj, func = self.replace_and_store_placeholders(pattern, obj)
# 再把关键字替换的找出来存着这里会将1+1这样的表达式存起来
pattern = r'\$\{([^}]+)\}' # 定义匹配以"${"开头,"}"结尾的字符串的正则表达式
obj, keys = self.replace_and_store_placeholders(pattern, obj)
# 接着处理表达式和关键字替换,先进行关键字替换
keys = eval(Template(str(keys)).safe_substitute(source)) # 替换并转为字典
for key, value in keys.items(): # 遍历字典替换
obj = obj.replace(key, value[0])
# 再找一遍剩余的${}跟第一步的结果合并提取漏掉的诸如1+1的表达式(在此认为关键字无法替换的都是表达式,最后表达式也无法处理的情况就报错或者原样返回)
obj, func_temp = self.replace_and_store_placeholders(pattern, obj)
func.update(func_temp)
# 进行函数调用替换
obj = self.invoke_funcs(obj, func)
# 把处理后的结果eval一下
return self.eval_data(obj)
elif isinstance(obj, list):
for index, item in enumerate(obj):
obj[index] = self.data_handle(item, source)
return obj
elif isinstance(obj, dict):
for key, value in obj.items():
obj[key] = self.data_handle(value, source)
return obj
else:
return eval(data)
except Exception:
return data
return obj
def invoke_funcs(self, obj, funcs):
"""
调用方法并将方法返回的结果替换到obj中去
"""
for key, funcs in funcs.items(): # 遍历方法字典调用并替换
func = funcs[1]
# print("invoke func : ", func)
try:
if "." in func:
if func.startswith("faker."):
# 英文的faker数据self.faker = Faker()
faker = self.FakerDataClass.faker
obj = obj.replace(key, eval(func))
elif func.startswith("fk_zh."):
# 中文的faker数据 self.fk_zh = Faker(locale='zh_CN')
fk_zh = self.FakerDataClass.fk_zh
obj = obj.replace(key, eval(func))
else:
obj = obj.replace(key, str(eval(func)))
else:
if func[:-2] in self.method_list: # 证明是FakerData类方法
obj = obj.replace(key, str(getattr(self.FakerDataClass, func[:-2])()))
else: # 不是FakerData类方法但有可能是 1+1 这样的
obj = obj.replace(key, str(eval(func)))
except:
print("Warn: --------函数:%s 无法调用成功-------" % func)
obj = obj.replace(key, funcs[0])
pass
return obj
# 声明data_handle方法这样外部就可以直接import data_handle来使用了
data_handle = DataHandle().data_handle
if __name__ == '__main__':
# 下面是测试代码
source = {
"user_id": 1,
"user_name": "flora",
"name": "test",
"age": 17
}
# 需要识别${python表达式}这里random方法是需要导入random包的
data_01 = "选择.gitignore: ${random.choice(['Ada', 'Actionscript', 'Ansible', 'Android', 'Agda'])},开源许可证: ${random.choice(['0BSD', 'AAL', 'AFL-1.1', '389-exception'])}"
new = data_handle(data_01)
print(new)
# 需要识别 字符串里面是python表达式的情况
data_02 = "[1,2,3,4]"
new = data_handle(data_02)
print(new)
data_03 = "1+1"
new = data_handle(data_03)
print(new)
data_04 = "[1, '1', [1, 2], {'name':'flora', 'age': '1'}]"
new = data_handle(data_04)
print(new)
data_05 = "user_id: ${user_id}, user_name: ${user_name}"
new = data_handle(data_05, source)
print(new)
# 需要识别自定义的函数,同时支持多种,下面两种写法有细微差别
data_06 = "Hello, ${generate_female_name()}! Random number: ${generate_random_int()}"
new = data_handle(data_06)
print(new)
data_07 = "Hello, ${generate_female_name()}! Random number: ${FakerData().generate_random_int()}"
data_07_09 = "Hello, ${FakerData().generate_female_name()}! Random number: ${FakerData.generate_random_int()}"
new = data_handle(data_07)
print(new)
new = data_handle(data_07_09)
print(new)
data_08 = {
"payload": {
"startTime": "${FakerData.generate_time('%Y-%m-%d')}",
"common2": "${faker.name()}", # 这里是使用类FakerData里面的实例属性faker
"url": "/api/accounts/${FakerData.generate_time('%Y-%m-%d')} / login.json",
"fragement": {
"startTime": "${FakerData.generate_time('%Y-%m-%d')}",
"common2": "${faker.name()}", # 这里是使用类FakerData里面的实例属性faker
"url": "/api/accounts/${FakerData.generate_time('%Y-%m-%d')} / login.json"
}
}
}
new = data_handle(data_08)
print(new)
data_09 = "/api/accounts/${FakerData.generate_time('%Y-%m-%d')}/login.json"
new = data_handle(data_09)
print(new)
# FakerData类中没有封装random_name这个方法会无法处理
data_10 = '[[1,2,3,4],${FakerData().random_name()}]'
new = data_handle(data_10)
print(new)
# 这个用到了source
data_11 = {
"age": "${generate_random_int()}.",
"message": "Hello, ${FakerData().generate_female_name()}! Your age is ${age}. Random number: ${FakerData().generate_random_int()}",
"nested_data": [
"This is ${name}'s data.",
{
"message": "Age: ${age}.",
"nested_list": [
"More data: ${generate_random_int()}",
]
}
]
}
new = data_handle(data_11, source)
print(new)
# 导入其他方法,也可以直接使用
from common_utils.time_handle import test_fun_a
data = "${test_fun_a()}"
new = data_handle(data)
print(new)
def eval_data_process(data):
"""
将数据中的字符串表达式处理后更新其值为表达式
"""
# 如果目标数据是字符串直接尝试eval
if isinstance(data, str):
data = eval_data(data)
# 如果目标数据是列表遍历列表的每一个数据再用递归的方法处理每一个item
if isinstance(data, list):
for index, item in enumerate(data):
data[index] = eval_data_process(eval_data(item))
# 如果目标数据是字典遍历字典的每一个值再用递归的方法处理每一个value
elif isinstance(data, dict):
for key, value in data.items():
data[key] = eval_data_process(eval_data(value))
return data