data_factory/service/data_creation_service.py

315 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2025/3/15 16:41
# @Author : AngesZhu
# @File : data_creation_service.py
# @Desc : 数据生成调用方法
import os.path
import pandas as pd
from service.get_data_demo_service import get_columns_from_channel
from service.get_db_config import get_db_config
from service.move_excel_service import copy_and_rename_file
from service.read_field_config_service import field_configs
from utils.compare_values_utils import compare_values
from utils.data_file_utils import ExcelHandler
from utils.enum_utils import FunctionEnum
from utils.mysql_utils import MySQLHandler
from utils.postgresql_utils import PostgresHandler
from utils.logger_utils import logger
# excel数据生成调用类
class DataCreationService:
def __init__(self, env):
# 获取环境数据库配置信心
self.env = env
get_results, self.env_data = get_db_config(env)
logger.info(f"初始化获取环境相关配置信息, {self.env_data}")
def create_excel(self, channel_name: str, limit: int = 10, **kwargs):
logger.info(f"根据渠道 {channel_name} 数据获取并生成导出excel文件")
# 获取模版字段
result ,template_data = self._get_columns(channel_name)
if not result:
logger.error("获取渠道{}字段配置失败 {}".format(channel_name, template_data))
return False, template_data
# 获取生成规则,调用函数相关配置,固定值参数配置
result, create_rules, func_temp, fixed_temp = self._field_config(channel_name)
if not result:
logger.error("获取生成规则失败 {}".format(template_data))
return result, create_rules
func_data = self._run_func(func_temp, limit=limit, **kwargs)
if not func_data:
logger.error("执行函数失败 {}".format(template_data))
return False, func_data
elif len(func_data) == 0:
return False, "数据获取失败,未查询到数据"
logger.info(f"获取数据成功,获取数据{len(func_data)}")
# cv模版文件至channel_data并重命名
template_path = os.path.join(template_data["path"], template_data["name"])
data_path = copy_and_rename_file(template_path, env=self.env, **kwargs) # 数据储存的路径
logger.debug(f"复制的文件储存的路径:{data_path}")
# 根据获取的字段模版拼接数据
columns = template_data["columns"]
dict_frame = {}
for count in range(0, limit):
logger.debug(f"index: {count}")
data_line = {}
for column in columns: # column 模版中字段名称
logger.debug(f"column: {column}")
if column in create_rules:
config = create_rules[column] # 字段生成规则
logger.debug(f"config: {config}")
if "get_func" in config.keys():
logger.debug(f"字段数据为函数执行获取字段,{column}")
# 如果函数名称在func_data里则为数据库执行的函数结果走dict
# 如果字段名称在func_data里则为调用函数执行走list
if "group" in config:
_temp_name = "{}_{}".format(config["get_func"], config["group"])
__func_data = func_data[_temp_name]
if count>len(__func_data['data'])-1: # 长度和序号的起始值,导致比较时需要在长度的数据上-1
break
__index = __func_data['columns'].index(config["field"]) # 获取字段在列表中的索引
if not __func_data['data']:
logger.error(f"{config["get_func"]},获取数据内容为空")
return False, "获取数据内容为空"
data_line[column] = __func_data['data'][count][__index]
elif config["get_func"] in func_data:
__func_data = func_data[config["get_func"]]
if count>len(__func_data['data'])-1: # 长度和序号的起始值,导致比较时需要在长度的数据上-1
break
__index = __func_data['columns'].index(config["field"]) # 获取字段在列表中的索引
if not __func_data['data']:
logger.error(f"{config["get_func"]},获取数据内容为空")
return False, "获取数据内容为空"
data_line[column] = __func_data['data'][count][__index]
elif config["field"] in func_data:
__func_data = func_data[config["field"]]
__index = columns.index(column)
if "number" in config["parameter"] and "percentage" in config and config["percentage"]:
# 如果函数参数配置中,有数字,且数字有小数位要求
data_line[column] = f"{__func_data[count]}%"
else:
data_line[column] = __func_data[count]
logger.debug(f"data_line: {data_line}")
else:
match config["data_type"]:
case "excel_demo":
logger.debug(f"字段数据为模版自动填充,{column}")
# 当前设计,字段数据为模版自动填充,需要从模版数据中获取
# 获取字段在模版中的序号,获取模版中的数据
__index = columns.index(column)
data_line[column] = template_data["values"][count][__index]
case "fixed":
logger.debug(f"字段数据为固定值字段,{column}")
# 当前设计,如果不是函数中获取的,就是固定值
data_line[column] = config["content"]
logger.debug(f"data_line: {data_line}")
else:
logger.error(f"字段 {column} 名称在配置中未找到")
logger.error(f"字段配置信息 {create_rules}")
return "字段名称配置异常"
dict_frame[count] = data_line
df = pd.DataFrame.from_dict(columns=columns, data=dict_frame, orient="index")
# 写入channel_data中已复制好的excel文件中
data_obj = ExcelHandler(data_path)
data_obj.write_excel(df, sheet_name=channel_name, mode='w', index=False)
logger.info(f"根据渠道 {channel_name} 数据获取并生成导出excel文件成功文件地址{data_path}")
def _get_columns(self, channel_name: str):
columns = get_columns_from_channel(channel_name)
return columns
def _get_channel_config(self, channel_name: str):
# 根据渠道获取字段配置信息
get_results, data = field_configs(channel_name)
logger.info(f"根据渠道获取字段配置信息, {data}")
if get_results:
return True, data
else:
return False, data
def _field_config(self, channel_name: str):
# 获取字段配置信息,并处理字段配置信息
logger.info(f"获取字段配置信息")
get_results, field_configs_data =self._get_channel_config(channel_name)
if not get_results:
return False, field_configs_data
# 获取字段生成规则
logger.info(f"拼接整理字段生成规则")
create_rules = {}
func_temp = {}
fixed_temp = {}
for field_key, field_info in field_configs_data.get(channel_name).items():
field_info = field_info["info"]
create_rules[field_key] = field_info
# 判断生成规则中是否含有获取函数
if "get_func" in field_info:
logger.debug("字段来源为数据库查询&函数")
# 查询字段生成规则中是否有函数调用的函数名
key_temp = field_info["data_type"]
# 如果配置不在函数规则中,则新建函数规则配置
if key_temp not in func_temp:
func_temp[key_temp] = {}
_func_name_temp = "{}_{}".format(field_info["get_func"], field_info["group"]) if "group" in field_info else field_info["get_func"]
if _func_name_temp not in func_temp[key_temp]:
# 如果类型对应函数配置中,没有对应函数配置, 则拼接函数配置并新增
temp_dict = {
'get_func': field_info["get_func"], 'field': [field_info["field"]]
}
if "parameter" in field_info:
# 如果参数中有parameter则为函数调用的参数
temp_dict['parameter'] = field_info["parameter"]
if "dependency" in field_info:
temp_dict['dependency'] = field_info["dependency"]
func_temp[field_info["field"]] = temp_dict
elif "group" in field_info:
# 如果参数中有分组,则为同一函数的多次调用, 函数注册的名称为分组拼接后的名称
func_group = "{}_{}".format(field_info["get_func"], field_info["group"])
temp_dict['func_group'] = func_group
func_temp[key_temp][func_group] = temp_dict
else:
# 如果参数中没有parameter则为数据库函数的参数
func_temp[key_temp][field_info["get_func"]] = temp_dict
else:
if "field" in field_info:
func_temp[key_temp][_func_name_temp]["field"].append(field_info["field"])
else:
if field_info["data_type"] == 'fixed':
logger.debug("字段来源为固定值")
fixed_temp[field_key] = field_info["content"]
logger.debug("字段来源不为数据库查询")
logger.debug(f'生成规则: {create_rules}')
logger.debug(f'调用函数相关配置: {func_temp}')
return True, create_rules, func_temp, fixed_temp
def _run_func(self, func_temp, limit, **kwargs):
logger.info("执行函数并返回执行后数据")
return_data = {}
for type_key, type_value in func_temp.items():
logger.info("获取数据库配置并动态创建数据库连接")
handler = None
run_func = None
table_group = None
match type_key:
case "postgresql":
sql_config = self.env_data[type_key]
logger.debug(f"数据库配置: {sql_config}")
handler = PostgresHandler(**sql_config)
table_group = sql_config["table_group"]
case "mysql":
sql_config = self.env_data[type_key]
logger.debug(f"数据库配置: {sql_config}")
handler = MySQLHandler(**sql_config)
case _:
sql_config = {}
handler = None
for func_key, func_value in type_value.items():
if handler:
logger.debug("动态注册函数字段与函数")
func_name = func_value["func_group"] if "func_group" in func_value else func_value["get_func"]
# 如果函数已注册,则不注册函数,直接执行
if not FunctionEnum.get_func_by_zh_name(func_name):
FunctionEnum.register_dynamic_function(
func_name,
"test_data.field_sqls.sql_run.sql_run"
)
logger.debug("已有数据库连接,执行已注册的动态函数获取数据库数据")
if "query" in func_value:
result, data = FunctionEnum.call_function(
func_name, handler, config=func_value["get_func"], limit=limit,
table_group=table_group,
data_from=sql_config["data_from"] if "data_from" in sql_config else None,
**func_value["query"],
**kwargs)
else:
result, data = FunctionEnum.call_function(
func_name, handler, config=func_value["get_func"], limit=limit,
table_group=table_group,
data_from=sql_config["data_from"] if "data_from" in sql_config else None,
**kwargs)
if result:
# 获取目标列的索引
column_indices = [data['columns'].index(col) for col in func_value["field"]]
# 提取对应的数据
filtered_data = []
for row in data['data']:
filtered_row = tuple(row[i] for i in column_indices)
filtered_data.append(filtered_row)
return_data[func_name] = {"columns": func_value["field"], "data": filtered_data}
else:
return False
else:
func_name = type_value["get_func"]
logger.debug(f"动态注册函数字段与函数 {func_name}")
# 如果函数已注册,则不注册函数,直接执行
if not FunctionEnum.get_func_by_zh_name(func_name):
FunctionEnum.register_dynamic_function(
func_name,
"test_data.field_sqls.{}.{}".format(type_value["get_func"], type_value["get_func"])
# 动态注册的函数路径
)
logger.debug("当前函数为执行调用,不涉及数据库")
result, data = FunctionEnum.call_function(func_name, limit=limit, **type_value["parameter"], **kwargs)
if result:
"""
根据匹配规则传输配置参数至调用函数根据limit数据生成指定条数的数据
"""
if func_name in return_data.keys():
logger.debug("如果其他字段配置在other执行结果中已有数据则添加到已有数据中")
return_data[type_value["field"][0]].append(data)
else:
logger.debug("如果其他字段配置在other执行结果中没有数据则添加other_func数据")
return_data[type_value["field"][0]] = data # 目标格式,[{"column": "", "data": [数据列表]}]
else:
return False
# 第一次数据生成完毕,检查数据之间的依赖关系 - 函数生成的数据,某两个字段之间存在关系
for type_key, type_value in func_temp.items():
logger.debug("循环查找是否有字段配置包含依赖关系")
if "dependency" in type_value:
__dependency_info = type_value["dependency"]
# 当前字段的生成数据
__filed_data = return_data[type_value["field"][0]]
# 依赖字段的生成数据
__dependency_data = return_data[__dependency_info["field"]]
for index in range(limit):
if not compare_values(__filed_data[index], __dependency_data[index], __dependency_info["correlation"]):
# 如果数据关系依赖判断失败,则需要重新注册函数执行
# 如果函数已注册,则不注册函数,直接执行
logger.debug(f"如果数据关系依赖判断失败,需要重新注册函数执行")
if not FunctionEnum.get_func_by_zh_name(type_value["get_func"]):
FunctionEnum.register_dynamic_function(
type_value["get_func"],
"test_data.field_sqls.{}.{}".format(type_value["get_func"], type_value["get_func"])
# 动态注册的函数路径
)
logger.debug("当前函数为执行调用,不涉及数据库")
type_value["parameter"]["start"] = __dependency_data[index]
result, data = FunctionEnum.call_function(type_value["get_func"], limit=1, **type_value["parameter"], **kwargs)
if result:
logger.debug(f"如果重新生成完毕,则更改依赖的值")
# 如果重新生成完毕,则更改依赖的值
return_data[type_value["field"][0]][index] = data[0]
return return_data
# if __name__ == '__main__':
# create_obj = DataCreationService("test")
# create_obj.create_excel("解放军军供站", business_line='RT', quarter='2025Q1',dt='20250316', limit=10)
# create_obj.create_excel("进口品过滤批次号", limit=10)
# create_obj.create_excel("类消费品折扣计划", business_line='RT', quarter='2025Q1',dt='20250316',limit=10)
# create_obj.create_excel("跨省白名单",dt='20250316',limit=10)
# create_obj.create_excel("互联网补偿",dt='20250316',limit=10)