#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2025/3/15 21:38
# @Author : AngesZhu
# @File : read_field_config_service.py
# @Desc : Read the field configuration from the YAML files in a directory
from utils.path_utils import PathOperator
from utils.yaml_utils import YAMLOperator
from utils.logger_utils import logger


def field_configs(channel: str = None):
    """
    Load the field configuration of one channel from the YAML files stored
    under ``<base_path>/test_data/field_config``.

    Two layouts are looked up, in this order:

    1. A sub-directory named exactly like ``channel``: every YAML file in it
       is loaded and stored under the value found at its ``field.name`` key.
    2. A single YAML file directly inside ``field_config`` whose file name
       contains ``channel``: every entry of its ``fields`` list is stored
       under the entry's ``["field"]["name"]`` value.

    :param channel: Channel name to look up. When it is ``None`` or empty
        nothing is loaded.
    :return: ``(True, {channel: {field_name: field_config}})`` when a channel
        is given; the inner mapping is empty if no matching file exists.
        ``None`` when no channel is given.
    """
    # The log message is runtime output and is intentionally left untouched.
    logger.info(f"获取渠道{channel} 对应的字段配置信息")

    # Resolve the configuration root and list everything below it.
    path_operator = PathOperator()
    base_path = path_operator.get_parent_path(path_operator.get_full_path())
    field_rules_path = path_operator.join_path(base_path, "test_data", "field_config")
    # NOTE(review): used below as a mapping of directory name ->
    # {"path": ..., "files": [...]}; confirm against PathOperator.
    config_list = path_operator.get_all_paths_recursive(field_rules_path)
    config_data = {}

    if not channel:
        # NOTE: loading every channel at once is currently disabled (see the
        # commented-out code below), so without a channel the function
        # returns None rather than a (flag, data) tuple.
        # for channel_channel, config in config_list.items():
        #     field_data = {}
        #     for file_name in config["files"]:
        #         yaml_operator = YAMLOperator("{}/{}".format(config["path"], file_name))
        #         channel_name = yaml_operator.get_value("field.name")
        #         field_data[channel_name] = yaml_operator.data
        #     config_data[channel_channel] = field_data
        # return True, config_data
        return None

    field_data = {}

    if channel in config_list:
        # Layout 1: the channel has its own directory, one YAML file per field.
        config = config_list[channel]
        for file_name in config["files"]:
            yaml_operator = YAMLOperator("{}/{}".format(config["path"], file_name))
            field_name = yaml_operator.get_value("field.name")
            field_data[field_name] = yaml_operator.data
        config_data[channel] = field_data
        return True, config_data

    # Layout 2: the channel is described by a single file in the root
    # directory. Matching is by substring, and when several file names
    # contain the channel the last one wins.
    matching_name = None
    for candidate in config_list["field_config"]["files"]:
        if channel in candidate:
            matching_name = candidate

    if matching_name:
        yaml_operator = YAMLOperator("{}/{}".format(field_rules_path, matching_name))
        # Guard against a file without a "fields" entry: treat an empty or
        # missing value as "no fields" instead of failing while iterating.
        for entry in yaml_operator.get_value("fields") or []:
            field_data[entry["field"]["name"]] = entry

    config_data[channel] = field_data
    return True, config_data


if __name__ == '__main__':
    data = field_configs("解放军军供站")
    print(data)