--- title: Python如何使用Nacos date: 2020-11-04 17:08:14 tags: [Python, Nacos] categories: [Python, Nacos] author: Anges黎梦 --- ### 前言 > 我看 nacos 主要是因为平台功能多之后,牵涉业务也很多。势必要进行服务拆分, > >而拆分之后,需要对服务进行监控,配置中心进行管理。 > >这种情况下,因为 Python 没有成熟的微服务和分布式架构体系,就需要自己搭建。 > >多方对比之后,选定 nacos 作为注册中心以及配置中心,搭建微服务架构体系。 ### 什么是 Nacos? 详见 [【Nacos简介】](http://www.angeszhu.cn/middleware/nacos.html) ### Nacos Python SDK **sdk git 地址:** [nacos-sdk-python](https://github.com/nacos-group/nacos-sdk-python) ### Nacos 操作 > 其实他操作很简单,git 上的例子也很详细。 > > 这里给大家介绍的是我的微服务架构体系第一版,借由 nacos 做的简单负载均衡以及服务接口调用(基于Rest服务)。 > > 其他 Nacos 操作看 sdk 中的 api 即可,我也给大家总结了一下。 **简单架构图:** ![](https://limeng-blog.oss-cn-hangzhou.aliyuncs.com/code/python-nacos/888.png) #### 服务注册 服务注册,其实就是注册一个实例。 但是由于 Python 框架中没有相对应的方案。 这里我们其实要做的是:实例注册 + 持续心跳检测(服务保活) 使用方法为: ```python # 注册实例 NacosClient.add_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable, healthy) # 发送心跳 NacosClient.send_heartbeat(service_name, ip, port, cluster_name, weight, metadata) ``` 这样我们的服务就注册成功了,在 nacos 里查看。 ![](https://limeng-blog.oss-cn-hangzhou.aliyuncs.com/code/python-nacos/999.png) **服务详细信息:** ![](https://limeng-blog.oss-cn-hangzhou.aliyuncs.com/code/python-nacos/000.png) #### 服务间调用(Rest/HTTP) ![](https://limeng-blog.oss-cn-hangzhou.aliyuncs.com/code/python-nacos/111111.png) 这里就可以直接看一下代码了,当然我也把其他操作都封装成了方法。 当然这也是初步的一个方案,后续要想保证效率和速度,还是需要用其他方法来实现。 ```python class NacosUtils: """ @Author: AngesZhu @desc: Nacos相关工具方法 http://www.angeszhu.cn/ """ def __init__(self): logger.info('实例化Nacos操作方法') self.connect_bool = None try: logger.info('建立Nacos连接,address:{},namespace:{}'.format(SERVER_ADDRESSES, NAMESPACE)) self.client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE) self.connect_bool = True logger.info('建立Nacos连接成功') except Exception as ex: self.connect_bool = False logger.error("Nacos连接异常:{}".format(ex)) def get_config(self, data_id, group, no_snapshot=None, timeout: int = 10) -> dict: """ 获取配置 :param data_id: data_id Data id. :param group: group Group, use DEFAULT_GROUP if no group specified. :param no_snapshot: no_snapshot Whether to use local snapshot while server is unavailable. :param timeout: timeout Timeout for requesting server in seconds. :return: 返回获取的配置信息 """ try: logger.info("获取配置,{}".format(data_id)) res = self.client.get_config(data_id=data_id, group=group, no_snapshot=no_snapshot, timeout=timeout) logger.info("获取配置成功,{}".format(res)) return {"code": 200, "data": res, "msg": "获取配置成功"} except Exception as ex: logger.error("获取配置异常:{}".format(ex)) return {"code": 110, "data": ex, "msg": "获取配置异常"} def publish_config(self, data_id, content, group: str = 'DEFAULT_GROUP', timeout: int = 10) -> dict: """ 新增/更新配置 :param data_id: data_id Data id. :param content: param content Config value. :param group: param group Group, use "DEFAULT_GROUP" if no group specified. :param timeout: param timeout Timeout for requesting server in seconds. :return: 返回新增/更新结果 """ try: logger.info("更新配置,{}".format(data_id)) res = self.client.publish_config(data_id=data_id, group=group, content=content, timeout=timeout) logger.info("更新配置成功,{}".format(res)) return {"code": 200, "data": res, "msg": "发布配置成功"} except Exception as ex: logger.error("发布配置异常:{}".format(ex)) return {"code": 110, "data": ex, "msg": "发布配置异常"} def remove_config(self, data_id, group: str = 'DEFAULT_GROUP', timeout: int = 10) -> dict: """ 删除配置 :param data_id: param data_id Data id. :param group: param group Group, use "DEFAULT_GROUP" if no group specified. :param timeout: param timeout Timeout for requesting server in seconds. :return: 返回删除结果 """ try: logger.info("删除配置,{}".format(data_id)) res = self.client.remove_config(data_id=data_id, group=group, timeout=timeout) logger.info("删除配置成功,{}".format(res)) return {"code": 200, "data": res, "msg": "删除配置成功"} except Exception as ex: logger.error("删除配置异常:{}".format(ex)) return {"code": 110, "data": ex, "msg": "删除配置异常"} def register_instance(self, data: str) -> dict: """ 注册实例 :param data: 注册实例中的数据 :return: 返回注册的结果 """ try: logger.info("注册服务:{}".format(SERVER_NAME)) logger.debug("注册信息, 服务:{}, IP:{},PORT:{}, ".format(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME)) a = self.client.add_naming_instance(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME, 0.1, data, True, True, True) logger.info("服务注册成功,注册结果:{}".format(a)) return {"code": 200, "data": a, "msg": "服务注册成功"} except Exception as ex: logger.error("服务注册异常:{}".format(ex)) return {"code": 110, "data": ex, "msg": "服务注册异常"} def instance_heart(self): """ 发送心跳实例 :return: 返回注册的结果 """ try: logger.info("为服务{}发送心跳检测".format(SERVER_NAME)) self.client.send_heartbeat(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME, 0.1, "{}") except Exception as ex: logger.info("为服务{}发送心跳检测异常".format(SERVER_NAME)) logger.error("异常:{}".format(ex)) def get_instance(self, instance_name) -> dict: """ 获取实例信息 :param instance_name: service_name required Service name to query. :return: 返回实例信息 """ try: logger.info("获取服务信息:{}".format(instance_name)) instance_info = self.client.list_naming_instance(instance_name) logger.info("获取服务信息成功,服务信息:{}".format(instance_info)) return {"code": 200, "data": instance_info, "msg": "服务信息获取成功"} except Exception as ex: logger.error("服务信息获取异常:{}".format(ex)) return {"code": 110, "data": ex, "msg": "服务信息获取异常"} def method_invoke(self, instance_name, method, uri, **params) -> dict: try: logger.info("请求指定服务({})中的方法:{}".format(instance_name, uri)) logger.info("获取服务信息") get_res = self.get_instance(instance_name) if get_res["code"] == 110: return {"code": 110, "data": get_res["data"], "msg": "服务信息获取异常"} instance_info = get_res["data"] check_res = self._instance_info_check__(instance_info) if check_res["code"] == 110: return {"code": 110, "data": check_res["data"], "msg": "服务信息校验异常"} choose_res = self._choose_dom(instance_info["hosts"]) if choose_res["code"] == 110: return {"code": 110, "data": choose_res["data"], "msg": "节点选择异常"} dom_info = choose_res["data"] method = method.upper() data_dispose = lambda data: dict(data) if isinstance(data, str) else data.__dict__ if isinstance(data, BaseModel) else data re_data = { "method": method, "url": "http://{}:{}{}".format(dom_info["ip"], dom_info["port"], uri if not params.__contains__("uri") else uri.format(data_dispose(params["uri"]))), } if params.__contains__("headers"): re_data["headers"] = data_dispose(params["headers"]) if not re_data["headers"].__contains__("Content-Type"): re_data["headers"]["Content-Type"] = "application/json" # else: # ReHeader["Content-Type"] = "application/json" # re_data["headers"] = ReHeader if method == "GET": if params.__contains__("data"): re_data["params"] = data_dispose(params["data"]) elif method in ["POST"]: if params.__contains__("form"): re_data["form"] = data_dispose(params["form"]) re_data["Content-Type"] = "multipart/form-data" elif params.__contains__("data"): re_data["data"] = data_dispose(params["data"]) elif method == "PUT": if params.__contains__("data"): re_data["data"] = data_dispose(params["data"]) elif method in ["DELETE", "PATCH"]: if params.__contains__("data"): re_data["json"] = data_dispose(params["data"]) res = requests.request(**re_data, verify=False) if res.status_code != 200: logger.error("请求异常,{}".format(res.text)) return {"code": 110, "data": res.text, "msg": "请求异常"} logger.info("请求成功") return {"code": 200, "data": res.text, "msg": "请求成功"} except Exception as ex: logger.error("方法调用异常:{}".format(ex)) return {"code": 110, "data": ex, "msg": "方法调用异常"} def _instance_info_check__(self, info: dict) -> dict: """ 实例数据检查 :param info: 检查的信息 :return: 返回检查结果 """ try: logger.info("服务信息校验") if "dom" in info.keys() and len(info["hosts"]) != 0: if abs(int(info["lastRefTime"]) - time.time()*1000) > 3600*1000: logger.error("无效的服务信息:{}".format(info)) return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"} info["hosts"] = [i for i in info["hosts"] if i["enabled"] and i["healthy"]] if len(info["hosts"]) == 0: logger.error("无效的服务信息:{}".format(info)) return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"} logger.info("服务信息校验通过") return {"code": 200, "data": True, "msg": "服务信息校验通过"} else: logger.error("无效的服务信息:{}".format(info)) return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"} except Exception as ex: logger.error("服务信息校验异常:{}".format(ex)) return {"code": 110, "data": ex, "msg": "服务信息校验异常"} def _choose_dom(self, info: list) -> dict: """ 选择节点,负载均衡 :param info: 节点列表 :return: 返回选中节点数据 """ try: logger.info("选择节点,负载均衡,节点数据:{}".format(info)) b = len(info)-1 index = random.randint(0, len(info) - 1) logger.info("节点选择成功,已选数据:{}".format(info[index])) return {"code": 200, "data": info[index], "msg": "节点选择成功"} except Exception as ex: logger.error("节点选择异常:{}".format(ex)) return {"code": 110, "data": ex, "msg": "节点选择异常"} ``` ### Nacos 连接 **语法:** ```python client = NacosClient(server_addresses, namespace=your_ns) ``` **代码示例:** ```python # http://www.angeszhu.cn/ logger.info('建立Nacos连接,address:{},namespace:{}'.format(SERVER_ADDRESSES, NAMESPACE)) self.client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE) self.connect_bool = True ``` - server_addresses -需要 - NACOS服务器地址,如果逗号超过1分离。 - 名称空间-命名空间。| 默认:None #### 额外的选择 可以通过设置额外的选项 `set_options`,如下所示: ```python client.set_options({key}={value}) ``` **可配置的选项包括:** - default_timeout-从服务器获取配置的默认超时,以秒为单位。 - pull_timeout-长轮询超时(以秒为单位)。 - pull_config_size-一个轮询过程监听的最大配置项数。 - callback_thread_num-调用回调的并发性。 - failover_base-用于存储故障转移配置文件的目录。 - snapshot_base-用于存储快照配置文件的目录。 - no_snapshot -禁用默认的快照行为,这可以通过PARAM覆盖no_snapshot在GET方法。 - 代理-Dict代理映射,某些环境需要代理访问,因此您可以设置此参数,从而使HTTP请求通过代理。 ### Api 参考 #### 获取配置 ```python NacosClient.get_config(data_id, group, timeout, no_snapshot) ``` - param data_id数据ID。 - param 组组,DEFAULT_GROUP如果未指定组,则使用。 - param timeout请求服务器的超时时间(以秒为单位)。 - param no_snapshot服务器不可用时是否使用本地快照。 - return W按照优先级获取一个配置项的值: - 步骤1-从本地故障转移目录获取(默认值:)${cwd}/nacos-data/data。 - 可以预先从快照目录(默认值:)手动复制故障转移目录${cwd}/nacos-data/snapshot。 - 这有助于抑制已知服务器故障的影响。 - 步骤2-从一台服务器获取,直到获得价值或尝试所有服务器。 - 从服务器获取内容后,内容将保存到快照目录。 - 步骤3-从快照目录获取。 #### 添加观察者 ```python NacosClient.add_config_watchers(data_id, group, cb_list) ``` - param data_id数据ID。 - param 组组,DEFAULT_GROUP如果未指定组,则使用。 - param cb_list要添加的回调函数列表。 - return **将观察者添加到指定的配置项。** - 一旦更改或删除项目,将调用回调函数。 - 如果该项目已经存在于服务器中,则回调函数将被调用一次。 - 允许在一项上进行多个回调,并且所有回调函数由并发调用threading.Thread。 - 从当前进程中调用回调函数。 #### 删除观察者 ```python NacosClient.remove_config_watcher(data_id, group, cb, remove_all) ``` - param data_id数据ID。 - param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。 - param cb删除回调函数。 - param remove_all是删除所有出现的回调还是仅删除一次。 - return 从指定键中删除观察者。 #### 发布配置 ```python NacosClient.publish_config(data_id, group, content, timeout) ``` - param data_id数据ID。 - param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。 - param 内容配置值。 - param timeout请求服务器的超时时间(以秒为单位)。 - return 如果将引发成功或异常,则为true。 **将一个数据项发布到Nacos。** - 如果数据密钥不存在,请首先创建一个。 - 如果数据密钥存在,请更新到指定的内容。 - 内容不能设置为无,如果需要删除配置项,请使用功能删除。 #### 删除配置 ```python NacosClient.remove_config(data_id, group, timeout) ``` - param data_id数据ID。 - param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。 - param timeout请求服务器的超时时间(以秒为单位)。 - return 如果将引发成功或异常,则为true。 从Nacos中删除一个数据项。 #### 注册实例 ```python NacosClient.add_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable, healthy) ``` - param service_name必 需要注册的服务名称。 - param ip 实例的必需IP。 - param 端口 所需实例的端口。 - param cluster_name要注册的集群。 - param weight一个用于负载均衡重量的浮点数。 - param 元数据JSON字符串格式或dict格式的额外信息 - param enable布尔值,用于确定是否启用实例。 - param 正常的布尔值,用于确定实例是否正常。 - param 短暂的布尔值,用于确定实例是否短暂。 - return 如果将引发成功或异常,则为true。 #### 注销实例 ```python NacosClient.remove_naming_instance(service_name, ip, port, cluster_name) ``` - param service_name必 需要注销的服务名称。 - param ip 实例的必需IP。 - param 端口 所需实例的端口。 - param cluster_name要注销的集群。 - param 短暂的布尔值,用于确定实例是否短暂。 - return 如果将引发成功或异常,则为true。 #### 修改实例 ```python NacosClient.modify_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable) ``` - param service_name 必需的服务名称。 - param ip 实例的必需IP。 - param 端口 所需实例的端口。 - param cluster_name集群名称。 - param weight一个用于负载均衡重量的浮点数。 - param 元数据JSON字符串格式或dict格式的额外信息。 - param enable布尔值,用于确定是否启用实例。 - param 短暂的布尔值,用于确定实例是否短暂。 - return 如果将引发成功或异常,则为true。 #### 查询实例 ```python NacosClient.list_naming_instance(service_name, clusters, namespace_id, group_name, healthy_only) ``` - param service_name必 需要查询的服务名称。 - param 群集群集名称以逗号分隔。 - param namespace_id自定义的组名,默认值blank。 - param group_name自定义的组名,默认值DEFAULT_GROUP。 - param Healthy_only一个用于查询健康实例或非健康实例的布尔值。 - return 实例信息列表,如果将引发成功或异常。 #### 查询实例详细信息 ```python NacosClient.get_naming_instance(service_name, ip, port, cluster_name) ``` - param service_name 必需的服务名称。 - param ip 实例的必需IP。 - param 端口 所需实例的端口。 - param cluster_name集群名称。 - return 实例信息,如果将引发成功或异常。 #### 发送实例节拍 ```python NacosClient.send_heartbeat(service_name, ip, port, cluster_name, weight, metadata) ``` - param service_name 必需的服务名称。 - param ip 实例的必需IP。 - param 端口 所需实例的端口。 - param cluster_name要注册的集群。 - param weight一个用于负载均衡重量的浮点数。 - param 短暂的布尔值,用于确定实例是否短暂。 - param 元数据JSON字符串格式或dict格式的额外信息。 - return 如果成功或出现异常,则JSON对象包括服务器建议的拍子间隔。 #### 订阅服务实例已更改 ```python NacosClient.subscribe(listener_fn, listener_interval=7, *args, **kwargs) ``` - param listener_fn 必需自定义的侦听器功能。 - param listener_interval 监听间隔,默认为7秒。 - param service_name 必需订阅的服务名称。 - param 群集群集名称以逗号分隔。 - param namespace_id自定义的组名,默认值blank。 - param group_name自定义的组名,默认值DEFAULT_GROUP。 - param Healthy_only一个用于查询健康实例或非健康实例的布尔值。 - return #### 退订服务实例已更改 ```python NacosClient.unsubscribe(service_name, listener_name) ``` - param service_name必 需要订阅的服务名称。 - param listener_name listener_name是自定义的。 - return #### 停止所有服务订阅 ```python NacosClient.stop_subscribe() ``` - return ### 调试模式 调试模式(如果用于获取更详细的控制台登录信息)。 可以通过以下方式设置调试模式: ```python NacosClient.set_debugging() # only effective within the current process ```