blog/docs/code/highPython/nacos.md

626 lines
21 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
title: Python如何使用Nacos
date: 2020-11-04 17:08:14
tags: [Python, Nacos]
categories: [Python, Nacos]
author: Anges黎梦
---
### 前言
> 我看 nacos 主要是因为平台功能多之后,牵涉业务也很多。势必要进行服务拆分,
>
>而拆分之后,需要对服务进行监控,配置中心进行管理。
>
>这种情况下,因为 Python 没有成熟的微服务和分布式架构体系,就需要自己搭建。
>
>多方对比之后,选定 nacos 作为注册中心以及配置中心,搭建微服务架构体系。
### 什么是 Nacos
详见 [【Nacos简介】](http://www.angeszhu.cn/middleware/nacos.html)
### Nacos Python SDK
**sdk git 地址:**
[nacos-sdk-python](https://github.com/nacos-group/nacos-sdk-python)
### Nacos 操作
> 其实他操作很简单git 上的例子也很详细。
>
> 这里给大家介绍的是我的微服务架构体系第一版,借由 nacos 做的简单负载均衡以及服务接口调用基于Rest服务
>
> 其他 Nacos 操作看 sdk 中的 api 即可,我也给大家总结了一下。
**简单架构图:**
![](https://limeng-blog.oss-cn-hangzhou.aliyuncs.com/code/python-nacos/888.png)
#### 服务注册
服务注册,其实就是注册一个实例。
但是由于 Python 框架中没有相对应的方案。
这里我们其实要做的是:实例注册 + 持续心跳检测(服务保活)
使用方法为:
```python
# 注册实例
NacosClient.add_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable, healthy)
# 发送心跳
NacosClient.send_heartbeat(service_name, ip, port, cluster_name, weight, metadata)
```
这样我们的服务就注册成功了,在 nacos 里查看。
![](https://limeng-blog.oss-cn-hangzhou.aliyuncs.com/code/python-nacos/999.png)
**服务详细信息:**
![](https://limeng-blog.oss-cn-hangzhou.aliyuncs.com/code/python-nacos/000.png)
#### 服务间调用Rest/HTTP
![](https://limeng-blog.oss-cn-hangzhou.aliyuncs.com/code/python-nacos/111111.png)
这里就可以直接看一下代码了,当然我也把其他操作都封装成了方法。
当然这也是初步的一个方案,后续要想保证效率和速度,还是需要用其他方法来实现。
```python
class NacosUtils:
"""
@Author: AngesZhu
@desc: Nacos相关工具方法
http://www.angeszhu.cn/
"""
def __init__(self):
logger.info('实例化Nacos操作方法')
self.connect_bool = None
try:
logger.info('建立Nacos连接address{}namespace{}'.format(SERVER_ADDRESSES, NAMESPACE))
self.client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)
self.connect_bool = True
logger.info('建立Nacos连接成功')
except Exception as ex:
self.connect_bool = False
logger.error("Nacos连接异常{}".format(ex))
def get_config(self, data_id, group, no_snapshot=None, timeout: int = 10) -> dict:
"""
获取配置
:param data_id: data_id Data id.
:param group: group Group, use DEFAULT_GROUP if no group specified.
:param no_snapshot: no_snapshot Whether to use local snapshot while server is unavailable.
:param timeout: timeout Timeout for requesting server in seconds.
:return: 返回获取的配置信息
"""
try:
logger.info("获取配置,{}".format(data_id))
res = self.client.get_config(data_id=data_id, group=group, no_snapshot=no_snapshot, timeout=timeout)
logger.info("获取配置成功,{}".format(res))
return {"code": 200, "data": res, "msg": "获取配置成功"}
except Exception as ex:
logger.error("获取配置异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "获取配置异常"}
def publish_config(self, data_id, content, group: str = 'DEFAULT_GROUP', timeout: int = 10) -> dict:
"""
新增/更新配置
:param data_id: data_id Data id.
:param content: param content Config value.
:param group: param group Group, use "DEFAULT_GROUP" if no group specified.
:param timeout: param timeout Timeout for requesting server in seconds.
:return: 返回新增/更新结果
"""
try:
logger.info("更新配置,{}".format(data_id))
res = self.client.publish_config(data_id=data_id, group=group, content=content, timeout=timeout)
logger.info("更新配置成功,{}".format(res))
return {"code": 200, "data": res, "msg": "发布配置成功"}
except Exception as ex:
logger.error("发布配置异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "发布配置异常"}
def remove_config(self, data_id, group: str = 'DEFAULT_GROUP', timeout: int = 10) -> dict:
"""
删除配置
:param data_id: param data_id Data id.
:param group: param group Group, use "DEFAULT_GROUP" if no group specified.
:param timeout: param timeout Timeout for requesting server in seconds.
:return: 返回删除结果
"""
try:
logger.info("删除配置,{}".format(data_id))
res = self.client.remove_config(data_id=data_id, group=group, timeout=timeout)
logger.info("删除配置成功,{}".format(res))
return {"code": 200, "data": res, "msg": "删除配置成功"}
except Exception as ex:
logger.error("删除配置异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "删除配置异常"}
def register_instance(self, data: str) -> dict:
"""
注册实例
:param data: 注册实例中的数据
:return: 返回注册的结果
"""
try:
logger.info("注册服务:{}".format(SERVER_NAME))
logger.debug("注册信息, 服务:{} IP{}PORT{}, ".format(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME))
a = self.client.add_naming_instance(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME, 0.1, data, True, True, True)
logger.info("服务注册成功,注册结果:{}".format(a))
return {"code": 200, "data": a, "msg": "服务注册成功"}
except Exception as ex:
logger.error("服务注册异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "服务注册异常"}
def instance_heart(self):
"""
发送心跳实例
:return: 返回注册的结果
"""
try:
logger.info("为服务{}发送心跳检测".format(SERVER_NAME))
self.client.send_heartbeat(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME, 0.1, "{}")
except Exception as ex:
logger.info("为服务{}发送心跳检测异常".format(SERVER_NAME))
logger.error("异常:{}".format(ex))
def get_instance(self, instance_name) -> dict:
"""
获取实例信息
:param instance_name: service_name required Service name to query.
:return: 返回实例信息
"""
try:
logger.info("获取服务信息:{}".format(instance_name))
instance_info = self.client.list_naming_instance(instance_name)
logger.info("获取服务信息成功,服务信息:{}".format(instance_info))
return {"code": 200, "data": instance_info, "msg": "服务信息获取成功"}
except Exception as ex:
logger.error("服务信息获取异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "服务信息获取异常"}
def method_invoke(self, instance_name, method, uri, **params) -> dict:
try:
logger.info("请求指定服务({})中的方法:{}".format(instance_name, uri))
logger.info("获取服务信息")
get_res = self.get_instance(instance_name)
if get_res["code"] == 110:
return {"code": 110, "data": get_res["data"], "msg": "服务信息获取异常"}
instance_info = get_res["data"]
check_res = self._instance_info_check__(instance_info)
if check_res["code"] == 110:
return {"code": 110, "data": check_res["data"], "msg": "服务信息校验异常"}
choose_res = self._choose_dom(instance_info["hosts"])
if choose_res["code"] == 110:
return {"code": 110, "data": choose_res["data"], "msg": "节点选择异常"}
dom_info = choose_res["data"]
method = method.upper()
data_dispose = lambda data: dict(data) if isinstance(data, str) else data.__dict__ if isinstance(data, BaseModel) else data
re_data = {
"method": method,
"url": "http://{}:{}{}".format(dom_info["ip"], dom_info["port"], uri if not params.__contains__("uri") else uri.format(data_dispose(params["uri"]))),
}
if params.__contains__("headers"):
re_data["headers"] = data_dispose(params["headers"])
if not re_data["headers"].__contains__("Content-Type"):
re_data["headers"]["Content-Type"] = "application/json"
# else:
# ReHeader["Content-Type"] = "application/json"
# re_data["headers"] = ReHeader
if method == "GET":
if params.__contains__("data"):
re_data["params"] = data_dispose(params["data"])
elif method in ["POST"]:
if params.__contains__("form"):
re_data["form"] = data_dispose(params["form"])
re_data["Content-Type"] = "multipart/form-data"
elif params.__contains__("data"):
re_data["data"] = data_dispose(params["data"])
elif method == "PUT":
if params.__contains__("data"):
re_data["data"] = data_dispose(params["data"])
elif method in ["DELETE", "PATCH"]:
if params.__contains__("data"):
re_data["json"] = data_dispose(params["data"])
res = requests.request(**re_data, verify=False)
if res.status_code != 200:
logger.error("请求异常,{}".format(res.text))
return {"code": 110, "data": res.text, "msg": "请求异常"}
logger.info("请求成功")
return {"code": 200, "data": res.text, "msg": "请求成功"}
except Exception as ex:
logger.error("方法调用异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "方法调用异常"}
def _instance_info_check__(self, info: dict) -> dict:
"""
实例数据检查
:param info: 检查的信息
:return: 返回检查结果
"""
try:
logger.info("服务信息校验")
if "dom" in info.keys() and len(info["hosts"]) != 0:
if abs(int(info["lastRefTime"]) - time.time()*1000) > 3600*1000:
logger.error("无效的服务信息:{}".format(info))
return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
info["hosts"] = [i for i in info["hosts"] if i["enabled"] and i["healthy"]]
if len(info["hosts"]) == 0:
logger.error("无效的服务信息:{}".format(info))
return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
logger.info("服务信息校验通过")
return {"code": 200, "data": True, "msg": "服务信息校验通过"}
else:
logger.error("无效的服务信息:{}".format(info))
return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
except Exception as ex:
logger.error("服务信息校验异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "服务信息校验异常"}
def _choose_dom(self, info: list) -> dict:
"""
选择节点,负载均衡
:param info: 节点列表
:return: 返回选中节点数据
"""
try:
logger.info("选择节点,负载均衡,节点数据:{}".format(info))
b = len(info)-1
index = random.randint(0, len(info) - 1)
logger.info("节点选择成功,已选数据:{}".format(info[index]))
return {"code": 200, "data": info[index], "msg": "节点选择成功"}
except Exception as ex:
logger.error("节点选择异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "节点选择异常"}
```
### Nacos 连接
**语法:**
```python
client = NacosClient(server_addresses, namespace=your_ns)
```
**代码示例:**
```python
# http://www.angeszhu.cn/
logger.info('建立Nacos连接address{}namespace{}'.format(SERVER_ADDRESSES, NAMESPACE))
self.client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)
self.connect_bool = True
```
- server_addresses -需要 - NACOS服务器地址如果逗号超过1分离。
- 名称空间-命名空间。| 默认None
#### 额外的选择
可以通过设置额外的选项 `set_options`,如下所示:
```python
client.set_options({key}={value})
```
**可配置的选项包括:**
- default_timeout-从服务器获取配置的默认超时,以秒为单位。
- pull_timeout-长轮询超时(以秒为单位)。
- pull_config_size-一个轮询过程监听的最大配置项数。
- callback_thread_num-调用回调的并发性。
- failover_base-用于存储故障转移配置文件的目录。
- snapshot_base-用于存储快照配置文件的目录。
- no_snapshot -禁用默认的快照行为这可以通过PARAM覆盖no_snapshot在GET方法。
- 代理-Dict代理映射某些环境需要代理访问因此您可以设置此参数从而使HTTP请求通过代理。
### Api 参考
#### 获取配置
```python
NacosClient.get_config(data_id, group, timeout, no_snapshot)
```
- param data_id数据ID。
- param 组组DEFAULT_GROUP如果未指定组则使用。
- param timeout请求服务器的超时时间以秒为单位
- param no_snapshot服务器不可用时是否使用本地快照。
- return W按照优先级获取一个配置项的值
- 步骤1-从本地故障转移目录获取(默认值:)${cwd}/nacos-data/data。
- 可以预先从快照目录(默认值:)手动复制故障转移目录${cwd}/nacos-data/snapshot。
- 这有助于抑制已知服务器故障的影响。
- 步骤2-从一台服务器获取,直到获得价值或尝试所有服务器。
- 从服务器获取内容后,内容将保存到快照目录。
- 步骤3-从快照目录获取。
#### 添加观察者
```python
NacosClient.add_config_watchers(data_id, group, cb_list)
```
- param data_id数据ID。
- param 组组DEFAULT_GROUP如果未指定组则使用。
- param cb_list要添加的回调函数列表。
- return
**将观察者添加到指定的配置项。**
- 一旦更改或删除项目,将调用回调函数。
- 如果该项目已经存在于服务器中,则回调函数将被调用一次。
- 允许在一项上进行多个回调并且所有回调函数由并发调用threading.Thread。
- 从当前进程中调用回调函数。
#### 删除观察者
```python
NacosClient.remove_config_watcher(data_id, group, cb, remove_all)
```
- param data_id数据ID。
- param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。
- param cb删除回调函数。
- param remove_all是删除所有出现的回调还是仅删除一次。
- return
从指定键中删除观察者。
#### 发布配置
```python
NacosClient.publish_config(data_id, group, content, timeout)
```
- param data_id数据ID。
- param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。
- param 内容配置值。
- param timeout请求服务器的超时时间以秒为单位
- return 如果将引发成功或异常则为true。
**将一个数据项发布到Nacos。**
- 如果数据密钥不存在,请首先创建一个。
- 如果数据密钥存在,请更新到指定的内容。
- 内容不能设置为无,如果需要删除配置项,请使用功能删除。
#### 删除配置
```python
NacosClient.remove_config(data_id, group, timeout)
```
- param data_id数据ID。
- param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。
- param timeout请求服务器的超时时间以秒为单位
- return 如果将引发成功或异常则为true。
从Nacos中删除一个数据项。
#### 注册实例
```python
NacosClient.add_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable, healthy)
```
- param service_name必 需要注册的服务名称。
- param ip 实例的必需IP。
- param 端口 所需实例的端口。
- param cluster_name要注册的集群。
- param weight一个用于负载均衡重量的浮点数。
- param 元数据JSON字符串格式或dict格式的额外信息
- param enable布尔值用于确定是否启用实例。
- param 正常的布尔值,用于确定实例是否正常。
- param 短暂的布尔值,用于确定实例是否短暂。
- return 如果将引发成功或异常则为true。
#### 注销实例
```python
NacosClient.remove_naming_instance(service_name, ip, port, cluster_name)
```
- param service_name必 需要注销的服务名称。
- param ip 实例的必需IP。
- param 端口 所需实例的端口。
- param cluster_name要注销的集群。
- param 短暂的布尔值,用于确定实例是否短暂。
- return 如果将引发成功或异常则为true。
#### 修改实例
```python
NacosClient.modify_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable)
```
- param service_name 必需的服务名称。
- param ip 实例的必需IP。
- param 端口 所需实例的端口。
- param cluster_name集群名称。
- param weight一个用于负载均衡重量的浮点数。
- param 元数据JSON字符串格式或dict格式的额外信息。
- param enable布尔值用于确定是否启用实例。
- param 短暂的布尔值,用于确定实例是否短暂。
- return 如果将引发成功或异常则为true。
#### 查询实例
```python
NacosClient.list_naming_instance(service_name, clusters, namespace_id, group_name, healthy_only)
```
- param service_name必 需要查询的服务名称。
- param 群集群集名称以逗号分隔。
- param namespace_id自定义的组名默认值blank。
- param group_name自定义的组名默认值DEFAULT_GROUP。
- param Healthy_only一个用于查询健康实例或非健康实例的布尔值。
- return 实例信息列表,如果将引发成功或异常。
#### 查询实例详细信息
```python
NacosClient.get_naming_instance(service_name, ip, port, cluster_name)
```
- param service_name 必需的服务名称。
- param ip 实例的必需IP。
- param 端口 所需实例的端口。
- param cluster_name集群名称。
- return 实例信息,如果将引发成功或异常。
#### 发送实例节拍
```python
NacosClient.send_heartbeat(service_name, ip, port, cluster_name, weight, metadata)
```
- param service_name 必需的服务名称。
- param ip 实例的必需IP。
- param 端口 所需实例的端口。
- param cluster_name要注册的集群。
- param weight一个用于负载均衡重量的浮点数。
- param 短暂的布尔值,用于确定实例是否短暂。
- param 元数据JSON字符串格式或dict格式的额外信息。
- return 如果成功或出现异常则JSON对象包括服务器建议的拍子间隔。
#### 订阅服务实例已更改
```python
NacosClient.subscribe(listener_fn, listener_interval=7, *args, **kwargs)
```
- param listener_fn 必需自定义的侦听器功能。
- param listener_interval 监听间隔默认为7秒。
- param service_name 必需订阅的服务名称。
- param 群集群集名称以逗号分隔。
- param namespace_id自定义的组名默认值blank。
- param group_name自定义的组名默认值DEFAULT_GROUP。
- param Healthy_only一个用于查询健康实例或非健康实例的布尔值。
- return
#### 退订服务实例已更改
```python
NacosClient.unsubscribe(service_name, listener_name)
```
- param service_name必 需要订阅的服务名称。
- param listener_name listener_name是自定义的。
- return
#### 停止所有服务订阅
```python
NacosClient.stop_subscribe()
```
- return
### 调试模式
调试模式(如果用于获取更详细的控制台登录信息)。
可以通过以下方式设置调试模式:
```python
NacosClient.set_debugging()
# only effective within the current process
```