#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2025/3/15 16:41 # @Author : AngesZhu # @File : data_creation_service.py # @Desc : 数据生成调用方法 import os.path import pandas as pd from service.get_data_demo_service import get_columns_from_channel from service.get_db_config import get_db_config from service.move_excel_service import copy_and_rename_file from service.read_field_config_service import field_configs from utils.compare_values_utils import compare_values from utils.data_file_utils import ExcelHandler from utils.enum_utils import FunctionEnum from utils.mysql_utils import MySQLHandler from utils.postgresql_utils import PostgresHandler from utils.logger_utils import logger # excel数据生成调用类 class DataCreationService: def __init__(self, env): # 获取环境数据库配置信心 self.env = env get_results, self.env_data = get_db_config(env) logger.info(f"初始化获取环境相关配置信息, {self.env_data}") def create_excel(self, channel_name: str, limit: int = 10, **kwargs): logger.info(f"根据渠道 {channel_name} 数据获取并生成导出excel文件") # 获取模版字段 result ,template_data = self._get_columns(channel_name) if not result: logger.error("获取渠道{}字段配置失败 {}".format(channel_name, template_data)) return False, template_data # 获取生成规则,调用函数相关配置,固定值参数配置 result, create_rules, func_temp, fixed_temp = self._field_config(channel_name) if not result: logger.error("获取生成规则失败 {}".format(template_data)) return result, create_rules func_data = self._run_func(func_temp, limit=limit, **kwargs) if not func_data: logger.error("执行函数失败 {}".format(template_data)) return False, func_data elif len(func_data) == 0: return False, "数据获取失败,未查询到数据" logger.info(f"获取数据成功,获取数据{len(func_data)}") # cv模版文件至channel_data,并重命名 template_path = os.path.join(template_data["path"], template_data["name"]) data_path = copy_and_rename_file(template_path, env=self.env, **kwargs) # 数据储存的路径 logger.debug(f"复制的文件储存的路径:{data_path}") # 根据获取的字段模版拼接数据 columns = template_data["columns"] dict_frame = {} for count in range(0, limit): logger.debug(f"index: {count}") data_line = {} for column in columns: # column 模版中字段名称 logger.debug(f"column: {column}") if column in create_rules: config = create_rules[column] # 字段生成规则 logger.debug(f"config: {config}") if "get_func" in config.keys(): logger.debug(f"字段数据为函数执行获取字段,{column}") # 如果函数名称在func_data里,则为数据库执行的函数结果,走dict # 如果字段名称在func_data里,则为调用函数执行走list if "group" in config: _temp_name = "{}_{}".format(config["get_func"], config["group"]) __func_data = func_data[_temp_name] if count>len(__func_data['data'])-1: # 长度和序号的起始值,导致比较时需要在长度的数据上-1 break __index = __func_data['columns'].index(config["field"]) # 获取字段在列表中的索引 if not __func_data['data']: logger.error(f"{config["get_func"]},获取数据内容为空") return False, "获取数据内容为空" data_line[column] = __func_data['data'][count][__index] elif config["get_func"] in func_data: __func_data = func_data[config["get_func"]] if count>len(__func_data['data'])-1: # 长度和序号的起始值,导致比较时需要在长度的数据上-1 break __index = __func_data['columns'].index(config["field"]) # 获取字段在列表中的索引 if not __func_data['data']: logger.error(f"{config["get_func"]},获取数据内容为空") return False, "获取数据内容为空" data_line[column] = __func_data['data'][count][__index] elif config["field"] in func_data: __func_data = func_data[config["field"]] __index = columns.index(column) if "number" in config["parameter"] and "percentage" in config and config["percentage"]: # 如果函数参数配置中,有数字,且数字有小数位要求 data_line[column] = f"{__func_data[count]}%" else: data_line[column] = __func_data[count] logger.debug(f"data_line: {data_line}") else: match config["data_type"]: case "excel_demo": logger.debug(f"字段数据为模版自动填充,{column}") # 当前设计,字段数据为模版自动填充,需要从模版数据中获取 # 获取字段在模版中的序号,获取模版中的数据 __index = columns.index(column) data_line[column] = template_data["values"][count][__index] case "fixed": logger.debug(f"字段数据为固定值字段,{column}") # 当前设计,如果不是函数中获取的,就是固定值 data_line[column] = config["content"] logger.debug(f"data_line: {data_line}") else: logger.error(f"字段 {column} 名称在配置中未找到") logger.error(f"字段配置信息 {create_rules}") return "字段名称配置异常" dict_frame[count] = data_line df = pd.DataFrame.from_dict(columns=columns, data=dict_frame, orient="index") # 写入channel_data中已复制好的excel文件中 data_obj = ExcelHandler(data_path) data_obj.write_excel(df, sheet_name=channel_name, mode='w', index=False) logger.info(f"根据渠道 {channel_name} 数据获取并生成导出excel文件成功,文件地址:{data_path}") def _get_columns(self, channel_name: str): columns = get_columns_from_channel(channel_name) return columns def _get_channel_config(self, channel_name: str): # 根据渠道获取字段配置信息 get_results, data = field_configs(channel_name) logger.info(f"根据渠道获取字段配置信息, {data}") if get_results: return True, data else: return False, data def _field_config(self, channel_name: str): # 获取字段配置信息,并处理字段配置信息 logger.info(f"获取字段配置信息") get_results, field_configs_data =self._get_channel_config(channel_name) if not get_results: return False, field_configs_data # 获取字段生成规则 logger.info(f"拼接整理字段生成规则") create_rules = {} func_temp = {} fixed_temp = {} for field_key, field_info in field_configs_data.get(channel_name).items(): field_info = field_info["info"] create_rules[field_key] = field_info # 判断生成规则中是否含有获取函数 if "get_func" in field_info: logger.debug("字段来源为数据库查询&函数") # 查询字段生成规则中是否有函数调用的函数名 key_temp = field_info["data_type"] # 如果配置不在函数规则中,则新建函数规则配置 if key_temp not in func_temp: func_temp[key_temp] = {} _func_name_temp = "{}_{}".format(field_info["get_func"], field_info["group"]) if "group" in field_info else field_info["get_func"] if _func_name_temp not in func_temp[key_temp]: # 如果类型对应函数配置中,没有对应函数配置, 则拼接函数配置并新增 temp_dict = { 'get_func': field_info["get_func"], 'field': [field_info["field"]] } if "parameter" in field_info: # 如果参数中有parameter,则为函数调用的参数 temp_dict['parameter'] = field_info["parameter"] if "dependency" in field_info: temp_dict['dependency'] = field_info["dependency"] func_temp[field_info["field"]] = temp_dict elif "group" in field_info: # 如果参数中有分组,则为同一函数的多次调用, 函数注册的名称为分组拼接后的名称 func_group = "{}_{}".format(field_info["get_func"], field_info["group"]) temp_dict['func_group'] = func_group func_temp[key_temp][func_group] = temp_dict else: # 如果参数中没有parameter,则为数据库函数的参数 func_temp[key_temp][field_info["get_func"]] = temp_dict else: if "field" in field_info: func_temp[key_temp][_func_name_temp]["field"].append(field_info["field"]) else: if field_info["data_type"] == 'fixed': logger.debug("字段来源为固定值") fixed_temp[field_key] = field_info["content"] logger.debug("字段来源不为数据库查询") logger.debug(f'生成规则: {create_rules}') logger.debug(f'调用函数相关配置: {func_temp}') return True, create_rules, func_temp, fixed_temp def _run_func(self, func_temp, limit, **kwargs): logger.info("执行函数并返回执行后数据") return_data = {} for type_key, type_value in func_temp.items(): logger.info("获取数据库配置并动态创建数据库连接") handler = None run_func = None table_group = None match type_key: case "postgresql": sql_config = self.env_data[type_key] logger.debug(f"数据库配置: {sql_config}") handler = PostgresHandler(**sql_config) table_group = sql_config["table_group"] case "mysql": sql_config = self.env_data[type_key] logger.debug(f"数据库配置: {sql_config}") handler = MySQLHandler(**sql_config) case _: sql_config = {} handler = None for func_key, func_value in type_value.items(): if handler: logger.debug("动态注册函数字段与函数") func_name = func_value["func_group"] if "func_group" in func_value else func_value["get_func"] # 如果函数已注册,则不注册函数,直接执行 if not FunctionEnum.get_func_by_zh_name(func_name): FunctionEnum.register_dynamic_function( func_name, "test_data.field_sqls.sql_run.sql_run" ) logger.debug("已有数据库连接,执行已注册的动态函数获取数据库数据") if "query" in func_value: result, data = FunctionEnum.call_function( func_name, handler, config=func_value["get_func"], limit=limit, table_group=table_group, data_from=sql_config["data_from"] if "data_from" in sql_config else None, **func_value["query"], **kwargs) else: result, data = FunctionEnum.call_function( func_name, handler, config=func_value["get_func"], limit=limit, table_group=table_group, data_from=sql_config["data_from"] if "data_from" in sql_config else None, **kwargs) if result: # 获取目标列的索引 column_indices = [data['columns'].index(col) for col in func_value["field"]] # 提取对应的数据 filtered_data = [] for row in data['data']: filtered_row = tuple(row[i] for i in column_indices) filtered_data.append(filtered_row) return_data[func_name] = {"columns": func_value["field"], "data": filtered_data} else: return False else: func_name = type_value["get_func"] logger.debug(f"动态注册函数字段与函数 {func_name}") # 如果函数已注册,则不注册函数,直接执行 if not FunctionEnum.get_func_by_zh_name(func_name): FunctionEnum.register_dynamic_function( func_name, "test_data.field_sqls.{}.{}".format(type_value["get_func"], type_value["get_func"]) # 动态注册的函数路径 ) logger.debug("当前函数为执行调用,不涉及数据库") result, data = FunctionEnum.call_function(func_name, limit=limit, **type_value["parameter"], **kwargs) if result: """ 根据匹配规则,传输配置参数至调用函数,根据limit数据生成指定条数的数据 """ if func_name in return_data.keys(): logger.debug("如果其他字段配置在other执行结果中已有数据,则添加到已有数据中") return_data[type_value["field"][0]].append(data) else: logger.debug("如果其他字段配置在other执行结果中没有数据,则添加other_func数据") return_data[type_value["field"][0]] = data # 目标格式,[{"column": "", "data": [数据列表]}] else: return False # 第一次数据生成完毕,检查数据之间的依赖关系 - 函数生成的数据,某两个字段之间存在关系 for type_key, type_value in func_temp.items(): logger.debug("循环查找是否有字段配置包含依赖关系") if "dependency" in type_value: __dependency_info = type_value["dependency"] # 当前字段的生成数据 __filed_data = return_data[type_value["field"][0]] # 依赖字段的生成数据 __dependency_data = return_data[__dependency_info["field"]] for index in range(limit): if not compare_values(__filed_data[index], __dependency_data[index], __dependency_info["correlation"]): # 如果数据关系依赖判断失败,则需要重新注册函数执行 # 如果函数已注册,则不注册函数,直接执行 logger.debug(f"如果数据关系依赖判断失败,需要重新注册函数执行") if not FunctionEnum.get_func_by_zh_name(type_value["get_func"]): FunctionEnum.register_dynamic_function( type_value["get_func"], "test_data.field_sqls.{}.{}".format(type_value["get_func"], type_value["get_func"]) # 动态注册的函数路径 ) logger.debug("当前函数为执行调用,不涉及数据库") type_value["parameter"]["start"] = __dependency_data[index] result, data = FunctionEnum.call_function(type_value["get_func"], limit=1, **type_value["parameter"], **kwargs) if result: logger.debug(f"如果重新生成完毕,则更改依赖的值") # 如果重新生成完毕,则更改依赖的值 return_data[type_value["field"][0]][index] = data[0] return return_data # if __name__ == '__main__': # create_obj = DataCreationService("test") # create_obj.create_excel("解放军军供站", business_line='RT', quarter='2025Q1',dt='20250316', limit=10) # create_obj.create_excel("进口品过滤批次号", limit=10) # create_obj.create_excel("类消费品折扣计划", business_line='RT', quarter='2025Q1',dt='20250316',limit=10) # create_obj.create_excel("跨省白名单",dt='20250316',limit=10) # create_obj.create_excel("互联网补偿",dt='20250316',limit=10)