626 lines
21 KiB
Markdown
626 lines
21 KiB
Markdown
---
|
||
title: Python如何使用Nacos
|
||
date: 2020-11-04 17:08:14
|
||
tags: [Python, Nacos]
|
||
categories: [Python, Nacos]
|
||
author: Anges黎梦
|
||
---
|
||
|
||
### 前言
|
||
|
||
> 我看 nacos 主要是因为平台功能多之后,牵涉业务也很多。势必要进行服务拆分,
|
||
>
|
||
>而拆分之后,需要对服务进行监控,配置中心进行管理。
|
||
>
|
||
>这种情况下,因为 Python 没有成熟的微服务和分布式架构体系,就需要自己搭建。
|
||
>
|
||
>多方对比之后,选定 nacos 作为注册中心以及配置中心,搭建微服务架构体系。
|
||
|
||
### 什么是 Nacos?
|
||
|
||
详见 [【Nacos简介】](http://www.angeszhu.cn/middleware/nacos.html)
|
||
|
||
### Nacos Python SDK
|
||
|
||
**sdk git 地址:**
|
||
|
||
[nacos-sdk-python](https://github.com/nacos-group/nacos-sdk-python)
|
||
|
||
### Nacos 操作
|
||
|
||
> 其实他操作很简单,git 上的例子也很详细。
|
||
>
|
||
> 这里给大家介绍的是我的微服务架构体系第一版,借由 nacos 做的简单负载均衡以及服务接口调用(基于Rest服务)。
|
||
>
|
||
> 其他 Nacos 操作看 sdk 中的 api 即可,我也给大家总结了一下。
|
||
|
||
**简单架构图:**
|
||
|
||

|
||
|
||
#### 服务注册
|
||
|
||
服务注册,其实就是注册一个实例。
|
||
|
||
但是由于 Python 框架中没有相对应的方案。
|
||
|
||
这里我们其实要做的是:实例注册 + 持续心跳检测(服务保活)
|
||
|
||
使用方法为:
|
||
|
||
```python
|
||
# 注册实例
|
||
NacosClient.add_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable, healthy)
|
||
|
||
# 发送心跳
|
||
NacosClient.send_heartbeat(service_name, ip, port, cluster_name, weight, metadata)
|
||
```
|
||
|
||
这样我们的服务就注册成功了,在 nacos 里查看。
|
||
|
||

|
||
|
||
**服务详细信息:**
|
||
|
||

|
||
|
||
#### 服务间调用(Rest/HTTP)
|
||
|
||

|
||
|
||
这里就可以直接看一下代码了,当然我也把其他操作都封装成了方法。
|
||
|
||
当然这也是初步的一个方案,后续要想保证效率和速度,还是需要用其他方法来实现。
|
||
|
||
```python
|
||
class NacosUtils:
|
||
"""
|
||
@Author: AngesZhu
|
||
@desc: Nacos相关工具方法
|
||
http://www.angeszhu.cn/
|
||
"""
|
||
def __init__(self):
|
||
logger.info('实例化Nacos操作方法')
|
||
self.connect_bool = None
|
||
try:
|
||
logger.info('建立Nacos连接,address:{},namespace:{}'.format(SERVER_ADDRESSES, NAMESPACE))
|
||
self.client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)
|
||
self.connect_bool = True
|
||
logger.info('建立Nacos连接成功')
|
||
except Exception as ex:
|
||
self.connect_bool = False
|
||
logger.error("Nacos连接异常:{}".format(ex))
|
||
|
||
def get_config(self, data_id, group, no_snapshot=None, timeout: int = 10) -> dict:
|
||
"""
|
||
获取配置
|
||
:param data_id: data_id Data id.
|
||
:param group: group Group, use DEFAULT_GROUP if no group specified.
|
||
:param no_snapshot: no_snapshot Whether to use local snapshot while server is unavailable.
|
||
:param timeout: timeout Timeout for requesting server in seconds.
|
||
:return: 返回获取的配置信息
|
||
"""
|
||
try:
|
||
logger.info("获取配置,{}".format(data_id))
|
||
res = self.client.get_config(data_id=data_id, group=group, no_snapshot=no_snapshot, timeout=timeout)
|
||
logger.info("获取配置成功,{}".format(res))
|
||
return {"code": 200, "data": res, "msg": "获取配置成功"}
|
||
except Exception as ex:
|
||
logger.error("获取配置异常:{}".format(ex))
|
||
return {"code": 110, "data": ex, "msg": "获取配置异常"}
|
||
|
||
def publish_config(self, data_id, content, group: str = 'DEFAULT_GROUP', timeout: int = 10) -> dict:
|
||
"""
|
||
新增/更新配置
|
||
:param data_id: data_id Data id.
|
||
:param content: param content Config value.
|
||
:param group: param group Group, use "DEFAULT_GROUP" if no group specified.
|
||
:param timeout: param timeout Timeout for requesting server in seconds.
|
||
:return: 返回新增/更新结果
|
||
"""
|
||
try:
|
||
logger.info("更新配置,{}".format(data_id))
|
||
res = self.client.publish_config(data_id=data_id, group=group, content=content, timeout=timeout)
|
||
logger.info("更新配置成功,{}".format(res))
|
||
return {"code": 200, "data": res, "msg": "发布配置成功"}
|
||
except Exception as ex:
|
||
logger.error("发布配置异常:{}".format(ex))
|
||
return {"code": 110, "data": ex, "msg": "发布配置异常"}
|
||
|
||
def remove_config(self, data_id, group: str = 'DEFAULT_GROUP', timeout: int = 10) -> dict:
|
||
"""
|
||
删除配置
|
||
:param data_id: param data_id Data id.
|
||
:param group: param group Group, use "DEFAULT_GROUP" if no group specified.
|
||
:param timeout: param timeout Timeout for requesting server in seconds.
|
||
:return: 返回删除结果
|
||
"""
|
||
try:
|
||
logger.info("删除配置,{}".format(data_id))
|
||
res = self.client.remove_config(data_id=data_id, group=group, timeout=timeout)
|
||
logger.info("删除配置成功,{}".format(res))
|
||
return {"code": 200, "data": res, "msg": "删除配置成功"}
|
||
except Exception as ex:
|
||
logger.error("删除配置异常:{}".format(ex))
|
||
return {"code": 110, "data": ex, "msg": "删除配置异常"}
|
||
|
||
def register_instance(self, data: str) -> dict:
|
||
"""
|
||
注册实例
|
||
:param data: 注册实例中的数据
|
||
:return: 返回注册的结果
|
||
"""
|
||
try:
|
||
logger.info("注册服务:{}".format(SERVER_NAME))
|
||
logger.debug("注册信息, 服务:{}, IP:{},PORT:{}, ".format(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME))
|
||
a = self.client.add_naming_instance(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME, 0.1, data, True, True, True)
|
||
logger.info("服务注册成功,注册结果:{}".format(a))
|
||
return {"code": 200, "data": a, "msg": "服务注册成功"}
|
||
except Exception as ex:
|
||
logger.error("服务注册异常:{}".format(ex))
|
||
return {"code": 110, "data": ex, "msg": "服务注册异常"}
|
||
|
||
def instance_heart(self):
|
||
"""
|
||
发送心跳实例
|
||
:return: 返回注册的结果
|
||
"""
|
||
try:
|
||
logger.info("为服务{}发送心跳检测".format(SERVER_NAME))
|
||
self.client.send_heartbeat(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME, 0.1, "{}")
|
||
except Exception as ex:
|
||
logger.info("为服务{}发送心跳检测异常".format(SERVER_NAME))
|
||
logger.error("异常:{}".format(ex))
|
||
|
||
def get_instance(self, instance_name) -> dict:
|
||
"""
|
||
获取实例信息
|
||
:param instance_name: service_name required Service name to query.
|
||
:return: 返回实例信息
|
||
"""
|
||
try:
|
||
logger.info("获取服务信息:{}".format(instance_name))
|
||
instance_info = self.client.list_naming_instance(instance_name)
|
||
logger.info("获取服务信息成功,服务信息:{}".format(instance_info))
|
||
return {"code": 200, "data": instance_info, "msg": "服务信息获取成功"}
|
||
except Exception as ex:
|
||
logger.error("服务信息获取异常:{}".format(ex))
|
||
return {"code": 110, "data": ex, "msg": "服务信息获取异常"}
|
||
|
||
def method_invoke(self, instance_name, method, uri, **params) -> dict:
|
||
try:
|
||
logger.info("请求指定服务({})中的方法:{}".format(instance_name, uri))
|
||
logger.info("获取服务信息")
|
||
get_res = self.get_instance(instance_name)
|
||
if get_res["code"] == 110:
|
||
return {"code": 110, "data": get_res["data"], "msg": "服务信息获取异常"}
|
||
instance_info = get_res["data"]
|
||
|
||
check_res = self._instance_info_check__(instance_info)
|
||
if check_res["code"] == 110:
|
||
return {"code": 110, "data": check_res["data"], "msg": "服务信息校验异常"}
|
||
|
||
choose_res = self._choose_dom(instance_info["hosts"])
|
||
if choose_res["code"] == 110:
|
||
return {"code": 110, "data": choose_res["data"], "msg": "节点选择异常"}
|
||
dom_info = choose_res["data"]
|
||
|
||
method = method.upper()
|
||
data_dispose = lambda data: dict(data) if isinstance(data, str) else data.__dict__ if isinstance(data, BaseModel) else data
|
||
re_data = {
|
||
"method": method,
|
||
"url": "http://{}:{}{}".format(dom_info["ip"], dom_info["port"], uri if not params.__contains__("uri") else uri.format(data_dispose(params["uri"]))),
|
||
}
|
||
if params.__contains__("headers"):
|
||
re_data["headers"] = data_dispose(params["headers"])
|
||
if not re_data["headers"].__contains__("Content-Type"):
|
||
re_data["headers"]["Content-Type"] = "application/json"
|
||
# else:
|
||
# ReHeader["Content-Type"] = "application/json"
|
||
# re_data["headers"] = ReHeader
|
||
if method == "GET":
|
||
if params.__contains__("data"):
|
||
re_data["params"] = data_dispose(params["data"])
|
||
elif method in ["POST"]:
|
||
if params.__contains__("form"):
|
||
re_data["form"] = data_dispose(params["form"])
|
||
re_data["Content-Type"] = "multipart/form-data"
|
||
elif params.__contains__("data"):
|
||
re_data["data"] = data_dispose(params["data"])
|
||
elif method == "PUT":
|
||
if params.__contains__("data"):
|
||
re_data["data"] = data_dispose(params["data"])
|
||
elif method in ["DELETE", "PATCH"]:
|
||
if params.__contains__("data"):
|
||
re_data["json"] = data_dispose(params["data"])
|
||
|
||
res = requests.request(**re_data, verify=False)
|
||
if res.status_code != 200:
|
||
logger.error("请求异常,{}".format(res.text))
|
||
return {"code": 110, "data": res.text, "msg": "请求异常"}
|
||
logger.info("请求成功")
|
||
return {"code": 200, "data": res.text, "msg": "请求成功"}
|
||
except Exception as ex:
|
||
logger.error("方法调用异常:{}".format(ex))
|
||
return {"code": 110, "data": ex, "msg": "方法调用异常"}
|
||
|
||
def _instance_info_check__(self, info: dict) -> dict:
|
||
"""
|
||
实例数据检查
|
||
:param info: 检查的信息
|
||
:return: 返回检查结果
|
||
"""
|
||
try:
|
||
logger.info("服务信息校验")
|
||
if "dom" in info.keys() and len(info["hosts"]) != 0:
|
||
if abs(int(info["lastRefTime"]) - time.time()*1000) > 3600*1000:
|
||
logger.error("无效的服务信息:{}".format(info))
|
||
return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
|
||
info["hosts"] = [i for i in info["hosts"] if i["enabled"] and i["healthy"]]
|
||
if len(info["hosts"]) == 0:
|
||
logger.error("无效的服务信息:{}".format(info))
|
||
return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
|
||
logger.info("服务信息校验通过")
|
||
return {"code": 200, "data": True, "msg": "服务信息校验通过"}
|
||
else:
|
||
logger.error("无效的服务信息:{}".format(info))
|
||
return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
|
||
except Exception as ex:
|
||
logger.error("服务信息校验异常:{}".format(ex))
|
||
return {"code": 110, "data": ex, "msg": "服务信息校验异常"}
|
||
|
||
def _choose_dom(self, info: list) -> dict:
|
||
"""
|
||
选择节点,负载均衡
|
||
:param info: 节点列表
|
||
:return: 返回选中节点数据
|
||
"""
|
||
try:
|
||
logger.info("选择节点,负载均衡,节点数据:{}".format(info))
|
||
b = len(info)-1
|
||
index = random.randint(0, len(info) - 1)
|
||
logger.info("节点选择成功,已选数据:{}".format(info[index]))
|
||
return {"code": 200, "data": info[index], "msg": "节点选择成功"}
|
||
except Exception as ex:
|
||
logger.error("节点选择异常:{}".format(ex))
|
||
return {"code": 110, "data": ex, "msg": "节点选择异常"}
|
||
```
|
||
|
||
### Nacos 连接
|
||
|
||
**语法:**
|
||
```python
|
||
client = NacosClient(server_addresses, namespace=your_ns)
|
||
```
|
||
|
||
**代码示例:**
|
||
|
||
```python
|
||
# http://www.angeszhu.cn/
|
||
logger.info('建立Nacos连接,address:{},namespace:{}'.format(SERVER_ADDRESSES, NAMESPACE))
|
||
self.client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)
|
||
self.connect_bool = True
|
||
```
|
||
|
||
- server_addresses -需要 - NACOS服务器地址,如果逗号超过1分离。
|
||
|
||
- 名称空间-命名空间。| 默认:None
|
||
|
||
#### 额外的选择
|
||
|
||
可以通过设置额外的选项 `set_options`,如下所示:
|
||
|
||
```python
|
||
client.set_options({key}={value})
|
||
```
|
||
|
||
**可配置的选项包括:**
|
||
|
||
- default_timeout-从服务器获取配置的默认超时,以秒为单位。
|
||
|
||
- pull_timeout-长轮询超时(以秒为单位)。
|
||
|
||
- pull_config_size-一个轮询过程监听的最大配置项数。
|
||
|
||
- callback_thread_num-调用回调的并发性。
|
||
|
||
- failover_base-用于存储故障转移配置文件的目录。
|
||
|
||
- snapshot_base-用于存储快照配置文件的目录。
|
||
|
||
- no_snapshot -禁用默认的快照行为,这可以通过PARAM覆盖no_snapshot在GET方法。
|
||
|
||
- 代理-Dict代理映射,某些环境需要代理访问,因此您可以设置此参数,从而使HTTP请求通过代理。
|
||
|
||
### Api 参考
|
||
|
||
#### 获取配置
|
||
|
||
```python
|
||
NacosClient.get_config(data_id, group, timeout, no_snapshot)
|
||
```
|
||
|
||
- param data_id数据ID。
|
||
|
||
- param 组组,DEFAULT_GROUP如果未指定组,则使用。
|
||
|
||
- param timeout请求服务器的超时时间(以秒为单位)。
|
||
|
||
- param no_snapshot服务器不可用时是否使用本地快照。
|
||
|
||
- return W按照优先级获取一个配置项的值:
|
||
|
||
- 步骤1-从本地故障转移目录获取(默认值:)${cwd}/nacos-data/data。
|
||
|
||
- 可以预先从快照目录(默认值:)手动复制故障转移目录${cwd}/nacos-data/snapshot。
|
||
|
||
- 这有助于抑制已知服务器故障的影响。
|
||
|
||
- 步骤2-从一台服务器获取,直到获得价值或尝试所有服务器。
|
||
|
||
- 从服务器获取内容后,内容将保存到快照目录。
|
||
|
||
- 步骤3-从快照目录获取。
|
||
|
||
#### 添加观察者
|
||
|
||
```python
|
||
NacosClient.add_config_watchers(data_id, group, cb_list)
|
||
```
|
||
|
||
- param data_id数据ID。
|
||
|
||
- param 组组,DEFAULT_GROUP如果未指定组,则使用。
|
||
|
||
- param cb_list要添加的回调函数列表。
|
||
|
||
- return
|
||
|
||
**将观察者添加到指定的配置项。**
|
||
|
||
- 一旦更改或删除项目,将调用回调函数。
|
||
|
||
- 如果该项目已经存在于服务器中,则回调函数将被调用一次。
|
||
|
||
- 允许在一项上进行多个回调,并且所有回调函数由并发调用threading.Thread。
|
||
|
||
- 从当前进程中调用回调函数。
|
||
|
||
#### 删除观察者
|
||
|
||
```python
|
||
NacosClient.remove_config_watcher(data_id, group, cb, remove_all)
|
||
```
|
||
|
||
- param data_id数据ID。
|
||
|
||
- param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。
|
||
|
||
- param cb删除回调函数。
|
||
|
||
- param remove_all是删除所有出现的回调还是仅删除一次。
|
||
|
||
- return
|
||
|
||
从指定键中删除观察者。
|
||
|
||
#### 发布配置
|
||
|
||
```python
|
||
NacosClient.publish_config(data_id, group, content, timeout)
|
||
```
|
||
|
||
- param data_id数据ID。
|
||
|
||
- param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。
|
||
|
||
- param 内容配置值。
|
||
|
||
- param timeout请求服务器的超时时间(以秒为单位)。
|
||
|
||
- return 如果将引发成功或异常,则为true。
|
||
|
||
**将一个数据项发布到Nacos。**
|
||
|
||
- 如果数据密钥不存在,请首先创建一个。
|
||
|
||
- 如果数据密钥存在,请更新到指定的内容。
|
||
|
||
- 内容不能设置为无,如果需要删除配置项,请使用功能删除。
|
||
|
||
#### 删除配置
|
||
|
||
```python
|
||
NacosClient.remove_config(data_id, group, timeout)
|
||
```
|
||
|
||
- param data_id数据ID。
|
||
|
||
- param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。
|
||
|
||
- param timeout请求服务器的超时时间(以秒为单位)。
|
||
|
||
- return 如果将引发成功或异常,则为true。
|
||
|
||
从Nacos中删除一个数据项。
|
||
|
||
#### 注册实例
|
||
|
||
```python
|
||
NacosClient.add_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable, healthy)
|
||
```
|
||
|
||
- param service_name必 需要注册的服务名称。
|
||
|
||
- param ip 实例的必需IP。
|
||
|
||
- param 端口 所需实例的端口。
|
||
|
||
- param cluster_name要注册的集群。
|
||
|
||
- param weight一个用于负载均衡重量的浮点数。
|
||
|
||
- param 元数据JSON字符串格式或dict格式的额外信息
|
||
|
||
- param enable布尔值,用于确定是否启用实例。
|
||
|
||
- param 正常的布尔值,用于确定实例是否正常。
|
||
|
||
- param 短暂的布尔值,用于确定实例是否短暂。
|
||
|
||
- return 如果将引发成功或异常,则为true。
|
||
|
||
#### 注销实例
|
||
|
||
```python
|
||
NacosClient.remove_naming_instance(service_name, ip, port, cluster_name)
|
||
```
|
||
|
||
- param service_name必 需要注销的服务名称。
|
||
|
||
- param ip 实例的必需IP。
|
||
|
||
- param 端口 所需实例的端口。
|
||
|
||
- param cluster_name要注销的集群。
|
||
|
||
- param 短暂的布尔值,用于确定实例是否短暂。
|
||
|
||
- return 如果将引发成功或异常,则为true。
|
||
|
||
#### 修改实例
|
||
|
||
```python
|
||
NacosClient.modify_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable)
|
||
```
|
||
|
||
- param service_name 必需的服务名称。
|
||
|
||
- param ip 实例的必需IP。
|
||
|
||
- param 端口 所需实例的端口。
|
||
|
||
- param cluster_name集群名称。
|
||
|
||
- param weight一个用于负载均衡重量的浮点数。
|
||
|
||
- param 元数据JSON字符串格式或dict格式的额外信息。
|
||
|
||
- param enable布尔值,用于确定是否启用实例。
|
||
|
||
- param 短暂的布尔值,用于确定实例是否短暂。
|
||
|
||
- return 如果将引发成功或异常,则为true。
|
||
|
||
#### 查询实例
|
||
|
||
```python
|
||
NacosClient.list_naming_instance(service_name, clusters, namespace_id, group_name, healthy_only)
|
||
```
|
||
|
||
- param service_name必 需要查询的服务名称。
|
||
|
||
- param 群集群集名称以逗号分隔。
|
||
|
||
- param namespace_id自定义的组名,默认值blank。
|
||
|
||
- param group_name自定义的组名,默认值DEFAULT_GROUP。
|
||
|
||
- param Healthy_only一个用于查询健康实例或非健康实例的布尔值。
|
||
|
||
- return 实例信息列表,如果将引发成功或异常。
|
||
|
||
#### 查询实例详细信息
|
||
|
||
```python
|
||
NacosClient.get_naming_instance(service_name, ip, port, cluster_name)
|
||
```
|
||
|
||
- param service_name 必需的服务名称。
|
||
|
||
- param ip 实例的必需IP。
|
||
|
||
- param 端口 所需实例的端口。
|
||
|
||
- param cluster_name集群名称。
|
||
|
||
- return 实例信息,如果将引发成功或异常。
|
||
|
||
#### 发送实例节拍
|
||
|
||
```python
|
||
NacosClient.send_heartbeat(service_name, ip, port, cluster_name, weight, metadata)
|
||
```
|
||
|
||
- param service_name 必需的服务名称。
|
||
|
||
- param ip 实例的必需IP。
|
||
|
||
- param 端口 所需实例的端口。
|
||
|
||
- param cluster_name要注册的集群。
|
||
|
||
- param weight一个用于负载均衡重量的浮点数。
|
||
|
||
- param 短暂的布尔值,用于确定实例是否短暂。
|
||
|
||
- param 元数据JSON字符串格式或dict格式的额外信息。
|
||
|
||
- return 如果成功或出现异常,则JSON对象包括服务器建议的拍子间隔。
|
||
|
||
#### 订阅服务实例已更改
|
||
|
||
```python
|
||
NacosClient.subscribe(listener_fn, listener_interval=7, *args, **kwargs)
|
||
```
|
||
|
||
- param listener_fn 必需自定义的侦听器功能。
|
||
|
||
- param listener_interval 监听间隔,默认为7秒。
|
||
|
||
- param service_name 必需订阅的服务名称。
|
||
|
||
- param 群集群集名称以逗号分隔。
|
||
|
||
- param namespace_id自定义的组名,默认值blank。
|
||
|
||
- param group_name自定义的组名,默认值DEFAULT_GROUP。
|
||
|
||
- param Healthy_only一个用于查询健康实例或非健康实例的布尔值。
|
||
|
||
- return
|
||
|
||
#### 退订服务实例已更改
|
||
|
||
```python
|
||
NacosClient.unsubscribe(service_name, listener_name)
|
||
```
|
||
|
||
- param service_name必 需要订阅的服务名称。
|
||
- param listener_name listener_name是自定义的。
|
||
- return
|
||
|
||
#### 停止所有服务订阅
|
||
|
||
```python
|
||
NacosClient.stop_subscribe()
|
||
```
|
||
|
||
- return
|
||
|
||
### 调试模式
|
||
|
||
调试模式(如果用于获取更详细的控制台登录信息)。
|
||
|
||
可以通过以下方式设置调试模式:
|
||
|
||
```python
|
||
NacosClient.set_debugging()
|
||
# only effective within the current process
|
||
```
|
||
|
||
|
||
|
||
|
||
|