#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2025/3/25 13:43 # @Author : AngesZhu # @File : sql_run.py # @Desc : postgresql数据库执行方法封装 from typing import Any from utils.logger_utils import logger from utils.path_utils import PathOperator from utils.yaml_utils import YAMLOperator def sql_run(sql_handler: Any, config: str, table_group: str = None, data_from: str = None, **kwargs) -> Any: # sql配置封装获取 -> 执行 exe_sql = "" results = "" try: # 获取sql配置信息 logger.info("获取sql配置信息,{}".format(config)) config_info = __get_config(config) exe_sql = __montage_sql(config_info, kwargs, table_group, data_from) results = sql_handler.execute_query(exe_sql) logger.debug("Query len:{}".format(len(results["data"]))) logger.info(f"Query Results: {results}") return True, results except Exception as e: logger.error("Error SQL: {}".format(exe_sql)) logger.error("Query Error: {}".format(e)) return False, e # finally: # sql_handler.close() def __get_config(file_name: str) -> dict[str, Any]: path_operator = PathOperator() base_path = path_operator.get_parent_path(path_operator.get_full_path()) # base_path = path_operator.get_full_path() full_path = path_operator.join_path(base_path, "test_data", "field_sqls","sql_config") yaml_operator = YAMLOperator("{}/{}.yaml".format(full_path, file_name)) config_info = yaml_operator.get_value("config") return config_info def __montage_sql(config_info: dict[str, Any], kwargs: dict, table_group: str = None, data_from: str = None) -> str: logger.info("开始拼接并执行{}:{}".format(config_info["type"], config_info["desc"])) logger.debug("拼接额外参数执行sql") optional_sql = "" if "optional_parameters" in config_info and len(config_info["optional_parameters"]) >= 1: o_variables = {} temp_parameters = config_info["optional_parameters"] for o_parameter in temp_parameters: # 如果参数在可变参数内,获取可变参数的键对值 if isinstance(o_parameter, str): if o_parameter in kwargs: o_variables[o_parameter] = kwargs[o_parameter] else: logger.debug(f"查询参数{o_parameter}不在传输列表中") elif isinstance(o_parameter, dict): for key, value in o_parameter.items(): if key in kwargs: o_variables[key]=kwargs[key] else: o_variables[key] = value optional_sql = "and ".join(f"{key}={value}" for key, value in o_variables.items()) must_variables = {} if isinstance(config_info["table"], str): # 如果数据表配置了单表 match config_info["type"]: case "postgresql": if data_from: must_variables["table"] = f'{table_group}.clone_{config_info["table"]}' if data_from == "clone" else f'{table_group}.{config_info["table"]}' else: must_variables["table"] = f'{table_group}.{config_info["table"]}' case "mysql": must_variables["table"] = config_info["table"] elif isinstance(config_info["table"], dict): # 如果数据表配置了多个 match config_info["type"]: case "postgresql": if data_from: for key, table in config_info["table"].items(): must_variables[key] = f'{table_group}.clone_{table}' if data_from == "clone" else f'{table_group}.{table}' else: for key, table in config_info["table"].items(): must_variables[key] = f'{table_group}.{table}' case "mysql": for key, table in config_info["table"].items(): must_variables[key] = table if "must_parameter" in config_info and len(config_info["must_parameter"]) >= 1: for m_parameter in config_info["must_parameter"]: if isinstance(m_parameter, str): if m_parameter in kwargs: must_variables[m_parameter] = kwargs[m_parameter] else: logger.debug(f"查询参数{m_parameter}不在传输列表中") elif isinstance(m_parameter, dict): for key, value in m_parameter.items(): if key in kwargs: must_variables[key]=kwargs[key] else: must_variables[key] = value must_variables["other"] = optional_sql if optional_sql else "" exe_sql = config_info["sql_str"] exe_sql = exe_sql.format(**must_variables) logger.info(f'已拼接好的执行sql:{exe_sql}') return exe_sql if __name__ == '__main__': print(__get_config("get_package_data"))