blog/docs/code/highPython/nacos.md

21 KiB
Raw Permalink Blame History

title date tags categories author
Python如何使用Nacos 2020-11-04 17:08:14
Python
Nacos
Python
Nacos
Anges黎梦

前言

我看 nacos 主要是因为平台功能多之后,牵涉业务也很多。势必要进行服务拆分,

而拆分之后,需要对服务进行监控,配置中心进行管理。

这种情况下,因为 Python 没有成熟的微服务和分布式架构体系,就需要自己搭建。

多方对比之后,选定 nacos 作为注册中心以及配置中心,搭建微服务架构体系。

什么是 Nacos

详见 【Nacos简介】

Nacos Python SDK

sdk git 地址:

nacos-sdk-python

Nacos 操作

其实他操作很简单git 上的例子也很详细。

这里给大家介绍的是我的微服务架构体系第一版,借由 nacos 做的简单负载均衡以及服务接口调用基于Rest服务

其他 Nacos 操作看 sdk 中的 api 即可,我也给大家总结了一下。

简单架构图:

服务注册

服务注册,其实就是注册一个实例。

但是由于 Python 框架中没有相对应的方案。

这里我们其实要做的是:实例注册 + 持续心跳检测(服务保活)

使用方法为:

# 注册实例
NacosClient.add_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable, healthy)

# 发送心跳
NacosClient.send_heartbeat(service_name, ip, port, cluster_name, weight, metadata)

这样我们的服务就注册成功了,在 nacos 里查看。

服务详细信息:

服务间调用Rest/HTTP

这里就可以直接看一下代码了,当然我也把其他操作都封装成了方法。

当然这也是初步的一个方案,后续要想保证效率和速度,还是需要用其他方法来实现。

class NacosUtils:
    """
    @Author: AngesZhu
    @desc: Nacos相关工具方法
    http://www.angeszhu.cn/
    """
    def __init__(self):
        logger.info('实例化Nacos操作方法')
        self.connect_bool = None
        try:
            logger.info('建立Nacos连接address{}namespace{}'.format(SERVER_ADDRESSES, NAMESPACE))
            self.client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)
            self.connect_bool = True
            logger.info('建立Nacos连接成功')
        except Exception as ex:
            self.connect_bool = False
            logger.error("Nacos连接异常{}".format(ex))

    def get_config(self, data_id, group, no_snapshot=None, timeout: int = 10) -> dict:
        """
        获取配置
        :param data_id: data_id Data id.
        :param group: group Group, use DEFAULT_GROUP if no group specified.
        :param no_snapshot: no_snapshot Whether to use local snapshot while server is unavailable.
        :param timeout: timeout Timeout for requesting server in seconds.
        :return: 返回获取的配置信息
        """
        try:
            logger.info("获取配置,{}".format(data_id))
            res = self.client.get_config(data_id=data_id, group=group, no_snapshot=no_snapshot, timeout=timeout)
            logger.info("获取配置成功,{}".format(res))
            return {"code": 200, "data": res, "msg": "获取配置成功"}
        except Exception as ex:
            logger.error("获取配置异常:{}".format(ex))
            return {"code": 110, "data": ex, "msg": "获取配置异常"}

    def publish_config(self, data_id, content, group: str = 'DEFAULT_GROUP', timeout: int = 10) -> dict:
        """
        新增/更新配置
        :param data_id: data_id Data id.
        :param content: param content Config value.
        :param group: param group Group, use "DEFAULT_GROUP" if no group specified.
        :param timeout: param timeout Timeout for requesting server in seconds.
        :return: 返回新增/更新结果
        """
        try:
            logger.info("更新配置,{}".format(data_id))
            res = self.client.publish_config(data_id=data_id, group=group, content=content, timeout=timeout)
            logger.info("更新配置成功,{}".format(res))
            return {"code": 200, "data": res, "msg": "发布配置成功"}
        except Exception as ex:
            logger.error("发布配置异常:{}".format(ex))
            return {"code": 110, "data": ex, "msg": "发布配置异常"}

    def remove_config(self, data_id, group: str = 'DEFAULT_GROUP', timeout: int = 10) -> dict:
        """
        删除配置
        :param data_id: param data_id Data id.
        :param group: param group Group, use "DEFAULT_GROUP" if no group specified.
        :param timeout: param timeout Timeout for requesting server in seconds.
        :return: 返回删除结果
        """
        try:
            logger.info("删除配置,{}".format(data_id))
            res = self.client.remove_config(data_id=data_id, group=group, timeout=timeout)
            logger.info("删除配置成功,{}".format(res))
            return {"code": 200, "data": res, "msg": "删除配置成功"}
        except Exception as ex:
            logger.error("删除配置异常:{}".format(ex))
            return {"code": 110, "data": ex, "msg": "删除配置异常"}

    def register_instance(self, data: str) -> dict:
        """
        注册实例
        :param data: 注册实例中的数据
        :return: 返回注册的结果
        """
        try:
            logger.info("注册服务:{}".format(SERVER_NAME))
            logger.debug("注册信息, 服务:{} IP{}PORT{}, ".format(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME))
            a = self.client.add_naming_instance(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME, 0.1, data, True, True, True)
            logger.info("服务注册成功,注册结果:{}".format(a))
            return {"code": 200, "data": a, "msg": "服务注册成功"}
        except Exception as ex:
            logger.error("服务注册异常:{}".format(ex))
            return {"code": 110, "data": ex, "msg": "服务注册异常"}

    def instance_heart(self):
        """
        发送心跳实例
        :return: 返回注册的结果
        """
        try:
            logger.info("为服务{}发送心跳检测".format(SERVER_NAME))
            self.client.send_heartbeat(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME, 0.1, "{}")
        except Exception as ex:
            logger.info("为服务{}发送心跳检测异常".format(SERVER_NAME))
            logger.error("异常:{}".format(ex))

    def get_instance(self, instance_name) -> dict:
        """
        获取实例信息
        :param instance_name: service_name required Service name to query.
        :return: 返回实例信息
        """
        try:
            logger.info("获取服务信息:{}".format(instance_name))
            instance_info = self.client.list_naming_instance(instance_name)
            logger.info("获取服务信息成功,服务信息:{}".format(instance_info))
            return {"code": 200, "data": instance_info, "msg": "服务信息获取成功"}
        except Exception as ex:
            logger.error("服务信息获取异常:{}".format(ex))
            return {"code": 110, "data": ex, "msg": "服务信息获取异常"}

    def method_invoke(self, instance_name, method, uri, **params) -> dict:
        try:
            logger.info("请求指定服务({})中的方法:{}".format(instance_name, uri))
            logger.info("获取服务信息")
            get_res = self.get_instance(instance_name)
            if get_res["code"] == 110:
                return {"code": 110, "data": get_res["data"], "msg": "服务信息获取异常"}
            instance_info = get_res["data"]

            check_res = self._instance_info_check__(instance_info)
            if check_res["code"] == 110:
                return {"code": 110, "data": check_res["data"], "msg": "服务信息校验异常"}

            choose_res = self._choose_dom(instance_info["hosts"])
            if choose_res["code"] == 110:
                return {"code": 110, "data": choose_res["data"], "msg": "节点选择异常"}
            dom_info = choose_res["data"]

            method = method.upper()
            data_dispose = lambda data: dict(data) if isinstance(data, str) else data.__dict__ if isinstance(data, BaseModel) else data
            re_data = {
                "method": method,
                "url": "http://{}:{}{}".format(dom_info["ip"], dom_info["port"], uri if not params.__contains__("uri") else uri.format(data_dispose(params["uri"]))),
            }
            if params.__contains__("headers"):
                re_data["headers"] = data_dispose(params["headers"])
                if not re_data["headers"].__contains__("Content-Type"):
                    re_data["headers"]["Content-Type"] = "application/json"
            # else:
                # ReHeader["Content-Type"] = "application/json"
                # re_data["headers"] = ReHeader
            if method == "GET":
                if params.__contains__("data"):
                    re_data["params"] = data_dispose(params["data"])
            elif method in ["POST"]:
                if params.__contains__("form"):
                    re_data["form"] = data_dispose(params["form"])
                    re_data["Content-Type"] = "multipart/form-data"
                elif params.__contains__("data"):
                    re_data["data"] = data_dispose(params["data"])
            elif method == "PUT":
                if params.__contains__("data"):
                    re_data["data"] = data_dispose(params["data"])
            elif method in ["DELETE", "PATCH"]:
                if params.__contains__("data"):
                    re_data["json"] = data_dispose(params["data"])

            res = requests.request(**re_data, verify=False)
            if res.status_code != 200:
                logger.error("请求异常,{}".format(res.text))
                return {"code": 110, "data": res.text, "msg": "请求异常"}
            logger.info("请求成功")
            return {"code": 200, "data": res.text, "msg": "请求成功"}
        except Exception as ex:
            logger.error("方法调用异常:{}".format(ex))
            return {"code": 110, "data": ex, "msg": "方法调用异常"}

    def _instance_info_check__(self, info: dict) -> dict:
        """
        实例数据检查
        :param info: 检查的信息
        :return: 返回检查结果
        """
        try:
            logger.info("服务信息校验")
            if "dom" in info.keys() and len(info["hosts"]) != 0:
                if abs(int(info["lastRefTime"]) - time.time()*1000) > 3600*1000:
                    logger.error("无效的服务信息:{}".format(info))
                    return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
                info["hosts"] = [i for i in info["hosts"] if i["enabled"] and i["healthy"]]
                if len(info["hosts"]) == 0:
                    logger.error("无效的服务信息:{}".format(info))
                    return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
                logger.info("服务信息校验通过")
                return {"code": 200, "data": True, "msg": "服务信息校验通过"}
            else:
                logger.error("无效的服务信息:{}".format(info))
                return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
        except Exception as ex:
            logger.error("服务信息校验异常:{}".format(ex))
            return {"code": 110, "data": ex, "msg": "服务信息校验异常"}

    def _choose_dom(self, info: list) -> dict:
        """
        选择节点,负载均衡
        :param info: 节点列表
        :return: 返回选中节点数据
        """
        try:
            logger.info("选择节点,负载均衡,节点数据:{}".format(info))
            b = len(info)-1
            index = random.randint(0, len(info) - 1)
            logger.info("节点选择成功,已选数据:{}".format(info[index]))
            return {"code": 200, "data": info[index], "msg": "节点选择成功"}
        except Exception as ex:
            logger.error("节点选择异常:{}".format(ex))
            return {"code": 110, "data": ex, "msg": "节点选择异常"}

Nacos 连接

语法:

client = NacosClient(server_addresses, namespace=your_ns)

代码示例:

# http://www.angeszhu.cn/
logger.info('建立Nacos连接address{}namespace{}'.format(SERVER_ADDRESSES, NAMESPACE))
self.client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)
self.connect_bool = True
  • server_addresses -需要 - NACOS服务器地址如果逗号超过1分离。

  • 名称空间-命名空间。| 默认None

额外的选择

可以通过设置额外的选项 set_options,如下所示:

client.set_options({key}={value})

可配置的选项包括:

  • default_timeout-从服务器获取配置的默认超时,以秒为单位。

  • pull_timeout-长轮询超时(以秒为单位)。

  • pull_config_size-一个轮询过程监听的最大配置项数。

  • callback_thread_num-调用回调的并发性。

  • failover_base-用于存储故障转移配置文件的目录。

  • snapshot_base-用于存储快照配置文件的目录。

  • no_snapshot -禁用默认的快照行为这可以通过PARAM覆盖no_snapshot在GET方法。

  • 代理-Dict代理映射某些环境需要代理访问因此您可以设置此参数从而使HTTP请求通过代理。

Api 参考

获取配置

NacosClient.get_config(data_id, group, timeout, no_snapshot)
  • param data_id数据ID。

  • param 组组DEFAULT_GROUP如果未指定组则使用。

  • param timeout请求服务器的超时时间以秒为单位

  • param no_snapshot服务器不可用时是否使用本地快照。

  • return W按照优先级获取一个配置项的值

  • 步骤1-从本地故障转移目录获取(默认值:)${cwd}/nacos-data/data。

    • 可以预先从快照目录(默认值:)手动复制故障转移目录${cwd}/nacos-data/snapshot。

    • 这有助于抑制已知服务器故障的影响。

  • 步骤2-从一台服务器获取,直到获得价值或尝试所有服务器。

    • 从服务器获取内容后,内容将保存到快照目录。
  • 步骤3-从快照目录获取。

添加观察者

NacosClient.add_config_watchers(data_id, group, cb_list)
  • param data_id数据ID。

  • param 组组DEFAULT_GROUP如果未指定组则使用。

  • param cb_list要添加的回调函数列表。

  • return

将观察者添加到指定的配置项。

  • 一旦更改或删除项目,将调用回调函数。

  • 如果该项目已经存在于服务器中,则回调函数将被调用一次。

  • 允许在一项上进行多个回调并且所有回调函数由并发调用threading.Thread。

  • 从当前进程中调用回调函数。

删除观察者

NacosClient.remove_config_watcher(data_id, group, cb, remove_all)
  • param data_id数据ID。

  • param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。

  • param cb删除回调函数。

  • param remove_all是删除所有出现的回调还是仅删除一次。

  • return

从指定键中删除观察者。

发布配置

NacosClient.publish_config(data_id, group, content, timeout)
  • param data_id数据ID。

  • param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。

  • param 内容配置值。

  • param timeout请求服务器的超时时间以秒为单位

  • return 如果将引发成功或异常则为true。

将一个数据项发布到Nacos。

  • 如果数据密钥不存在,请首先创建一个。

  • 如果数据密钥存在,请更新到指定的内容。

  • 内容不能设置为无,如果需要删除配置项,请使用功能删除。

删除配置

NacosClient.remove_config(data_id, group, timeout)
  • param data_id数据ID。

  • param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。

  • param timeout请求服务器的超时时间以秒为单位

  • return 如果将引发成功或异常则为true。

从Nacos中删除一个数据项。

注册实例

NacosClient.add_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable, healthy)
  • param service_name必 需要注册的服务名称。

  • param ip 实例的必需IP。

  • param 端口 所需实例的端口。

  • param cluster_name要注册的集群。

  • param weight一个用于负载均衡重量的浮点数。

  • param 元数据JSON字符串格式或dict格式的额外信息

  • param enable布尔值用于确定是否启用实例。

  • param 正常的布尔值,用于确定实例是否正常。

  • param 短暂的布尔值,用于确定实例是否短暂。

  • return 如果将引发成功或异常则为true。

注销实例

NacosClient.remove_naming_instance(service_name, ip, port, cluster_name)
  • param service_name必 需要注销的服务名称。

  • param ip 实例的必需IP。

  • param 端口 所需实例的端口。

  • param cluster_name要注销的集群。

  • param 短暂的布尔值,用于确定实例是否短暂。

  • return 如果将引发成功或异常则为true。

修改实例

NacosClient.modify_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable)
  • param service_name 必需的服务名称。

  • param ip 实例的必需IP。

  • param 端口 所需实例的端口。

  • param cluster_name集群名称。

  • param weight一个用于负载均衡重量的浮点数。

  • param 元数据JSON字符串格式或dict格式的额外信息。

  • param enable布尔值用于确定是否启用实例。

  • param 短暂的布尔值,用于确定实例是否短暂。

  • return 如果将引发成功或异常则为true。

查询实例

NacosClient.list_naming_instance(service_name, clusters, namespace_id, group_name, healthy_only)
  • param service_name必 需要查询的服务名称。

  • param 群集群集名称以逗号分隔。

  • param namespace_id自定义的组名默认值blank。

  • param group_name自定义的组名默认值DEFAULT_GROUP。

  • param Healthy_only一个用于查询健康实例或非健康实例的布尔值。

  • return 实例信息列表,如果将引发成功或异常。

查询实例详细信息

NacosClient.get_naming_instance(service_name, ip, port, cluster_name)
  • param service_name 必需的服务名称。

  • param ip 实例的必需IP。

  • param 端口 所需实例的端口。

  • param cluster_name集群名称。

  • return 实例信息,如果将引发成功或异常。

发送实例节拍

NacosClient.send_heartbeat(service_name, ip, port, cluster_name, weight, metadata)
  • param service_name 必需的服务名称。

  • param ip 实例的必需IP。

  • param 端口 所需实例的端口。

  • param cluster_name要注册的集群。

  • param weight一个用于负载均衡重量的浮点数。

  • param 短暂的布尔值,用于确定实例是否短暂。

  • param 元数据JSON字符串格式或dict格式的额外信息。

  • return 如果成功或出现异常则JSON对象包括服务器建议的拍子间隔。

订阅服务实例已更改

NacosClient.subscribe(listener_fn, listener_interval=7, *args, **kwargs)
  • param listener_fn 必需自定义的侦听器功能。

  • param listener_interval 监听间隔默认为7秒。

  • param service_name 必需订阅的服务名称。

  • param 群集群集名称以逗号分隔。

  • param namespace_id自定义的组名默认值blank。

  • param group_name自定义的组名默认值DEFAULT_GROUP。

  • param Healthy_only一个用于查询健康实例或非健康实例的布尔值。

  • return

退订服务实例已更改

NacosClient.unsubscribe(service_name, listener_name)
  • param service_name必 需要订阅的服务名称。
  • param listener_name listener_name是自定义的。
  • return

停止所有服务订阅

NacosClient.stop_subscribe()
  • return

调试模式

调试模式(如果用于获取更详细的控制台登录信息)。

可以通过以下方式设置调试模式:

NacosClient.set_debugging()
# only effective within the current process