21 KiB
| title | date | tags | categories | author | ||||
|---|---|---|---|---|---|---|---|---|
| Python如何使用Nacos | 2020-11-04 17:08:14 |
|
|
Anges黎梦 |
前言
我看 nacos 主要是因为平台功能多之后,牵涉业务也很多。势必要进行服务拆分,
而拆分之后,需要对服务进行监控,配置中心进行管理。
这种情况下,因为 Python 没有成熟的微服务和分布式架构体系,就需要自己搭建。
多方对比之后,选定 nacos 作为注册中心以及配置中心,搭建微服务架构体系。
什么是 Nacos?
详见 【Nacos简介】
Nacos Python SDK
sdk git 地址:
Nacos 操作
其实他操作很简单,git 上的例子也很详细。
这里给大家介绍的是我的微服务架构体系第一版,借由 nacos 做的简单负载均衡以及服务接口调用(基于Rest服务)。
其他 Nacos 操作看 sdk 中的 api 即可,我也给大家总结了一下。
简单架构图:
服务注册
服务注册,其实就是注册一个实例。
但是由于 Python 框架中没有相对应的方案。
这里我们其实要做的是:实例注册 + 持续心跳检测(服务保活)
使用方法为:
# 注册实例
NacosClient.add_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable, healthy)
# 发送心跳
NacosClient.send_heartbeat(service_name, ip, port, cluster_name, weight, metadata)
这样我们的服务就注册成功了,在 nacos 里查看。
服务详细信息:
服务间调用(Rest/HTTP)
这里就可以直接看一下代码了,当然我也把其他操作都封装成了方法。
当然这也是初步的一个方案,后续要想保证效率和速度,还是需要用其他方法来实现。
class NacosUtils:
"""
@Author: AngesZhu
@desc: Nacos相关工具方法
http://www.angeszhu.cn/
"""
def __init__(self):
logger.info('实例化Nacos操作方法')
self.connect_bool = None
try:
logger.info('建立Nacos连接,address:{},namespace:{}'.format(SERVER_ADDRESSES, NAMESPACE))
self.client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)
self.connect_bool = True
logger.info('建立Nacos连接成功')
except Exception as ex:
self.connect_bool = False
logger.error("Nacos连接异常:{}".format(ex))
def get_config(self, data_id, group, no_snapshot=None, timeout: int = 10) -> dict:
"""
获取配置
:param data_id: data_id Data id.
:param group: group Group, use DEFAULT_GROUP if no group specified.
:param no_snapshot: no_snapshot Whether to use local snapshot while server is unavailable.
:param timeout: timeout Timeout for requesting server in seconds.
:return: 返回获取的配置信息
"""
try:
logger.info("获取配置,{}".format(data_id))
res = self.client.get_config(data_id=data_id, group=group, no_snapshot=no_snapshot, timeout=timeout)
logger.info("获取配置成功,{}".format(res))
return {"code": 200, "data": res, "msg": "获取配置成功"}
except Exception as ex:
logger.error("获取配置异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "获取配置异常"}
def publish_config(self, data_id, content, group: str = 'DEFAULT_GROUP', timeout: int = 10) -> dict:
"""
新增/更新配置
:param data_id: data_id Data id.
:param content: param content Config value.
:param group: param group Group, use "DEFAULT_GROUP" if no group specified.
:param timeout: param timeout Timeout for requesting server in seconds.
:return: 返回新增/更新结果
"""
try:
logger.info("更新配置,{}".format(data_id))
res = self.client.publish_config(data_id=data_id, group=group, content=content, timeout=timeout)
logger.info("更新配置成功,{}".format(res))
return {"code": 200, "data": res, "msg": "发布配置成功"}
except Exception as ex:
logger.error("发布配置异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "发布配置异常"}
def remove_config(self, data_id, group: str = 'DEFAULT_GROUP', timeout: int = 10) -> dict:
"""
删除配置
:param data_id: param data_id Data id.
:param group: param group Group, use "DEFAULT_GROUP" if no group specified.
:param timeout: param timeout Timeout for requesting server in seconds.
:return: 返回删除结果
"""
try:
logger.info("删除配置,{}".format(data_id))
res = self.client.remove_config(data_id=data_id, group=group, timeout=timeout)
logger.info("删除配置成功,{}".format(res))
return {"code": 200, "data": res, "msg": "删除配置成功"}
except Exception as ex:
logger.error("删除配置异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "删除配置异常"}
def register_instance(self, data: str) -> dict:
"""
注册实例
:param data: 注册实例中的数据
:return: 返回注册的结果
"""
try:
logger.info("注册服务:{}".format(SERVER_NAME))
logger.debug("注册信息, 服务:{}, IP:{},PORT:{}, ".format(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME))
a = self.client.add_naming_instance(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME, 0.1, data, True, True, True)
logger.info("服务注册成功,注册结果:{}".format(a))
return {"code": 200, "data": a, "msg": "服务注册成功"}
except Exception as ex:
logger.error("服务注册异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "服务注册异常"}
def instance_heart(self):
"""
发送心跳实例
:return: 返回注册的结果
"""
try:
logger.info("为服务{}发送心跳检测".format(SERVER_NAME))
self.client.send_heartbeat(SERVER_NAME, SERVER_IP, SERVER_PORT, CLUSTER_NAME, 0.1, "{}")
except Exception as ex:
logger.info("为服务{}发送心跳检测异常".format(SERVER_NAME))
logger.error("异常:{}".format(ex))
def get_instance(self, instance_name) -> dict:
"""
获取实例信息
:param instance_name: service_name required Service name to query.
:return: 返回实例信息
"""
try:
logger.info("获取服务信息:{}".format(instance_name))
instance_info = self.client.list_naming_instance(instance_name)
logger.info("获取服务信息成功,服务信息:{}".format(instance_info))
return {"code": 200, "data": instance_info, "msg": "服务信息获取成功"}
except Exception as ex:
logger.error("服务信息获取异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "服务信息获取异常"}
def method_invoke(self, instance_name, method, uri, **params) -> dict:
try:
logger.info("请求指定服务({})中的方法:{}".format(instance_name, uri))
logger.info("获取服务信息")
get_res = self.get_instance(instance_name)
if get_res["code"] == 110:
return {"code": 110, "data": get_res["data"], "msg": "服务信息获取异常"}
instance_info = get_res["data"]
check_res = self._instance_info_check__(instance_info)
if check_res["code"] == 110:
return {"code": 110, "data": check_res["data"], "msg": "服务信息校验异常"}
choose_res = self._choose_dom(instance_info["hosts"])
if choose_res["code"] == 110:
return {"code": 110, "data": choose_res["data"], "msg": "节点选择异常"}
dom_info = choose_res["data"]
method = method.upper()
data_dispose = lambda data: dict(data) if isinstance(data, str) else data.__dict__ if isinstance(data, BaseModel) else data
re_data = {
"method": method,
"url": "http://{}:{}{}".format(dom_info["ip"], dom_info["port"], uri if not params.__contains__("uri") else uri.format(data_dispose(params["uri"]))),
}
if params.__contains__("headers"):
re_data["headers"] = data_dispose(params["headers"])
if not re_data["headers"].__contains__("Content-Type"):
re_data["headers"]["Content-Type"] = "application/json"
# else:
# ReHeader["Content-Type"] = "application/json"
# re_data["headers"] = ReHeader
if method == "GET":
if params.__contains__("data"):
re_data["params"] = data_dispose(params["data"])
elif method in ["POST"]:
if params.__contains__("form"):
re_data["form"] = data_dispose(params["form"])
re_data["Content-Type"] = "multipart/form-data"
elif params.__contains__("data"):
re_data["data"] = data_dispose(params["data"])
elif method == "PUT":
if params.__contains__("data"):
re_data["data"] = data_dispose(params["data"])
elif method in ["DELETE", "PATCH"]:
if params.__contains__("data"):
re_data["json"] = data_dispose(params["data"])
res = requests.request(**re_data, verify=False)
if res.status_code != 200:
logger.error("请求异常,{}".format(res.text))
return {"code": 110, "data": res.text, "msg": "请求异常"}
logger.info("请求成功")
return {"code": 200, "data": res.text, "msg": "请求成功"}
except Exception as ex:
logger.error("方法调用异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "方法调用异常"}
def _instance_info_check__(self, info: dict) -> dict:
"""
实例数据检查
:param info: 检查的信息
:return: 返回检查结果
"""
try:
logger.info("服务信息校验")
if "dom" in info.keys() and len(info["hosts"]) != 0:
if abs(int(info["lastRefTime"]) - time.time()*1000) > 3600*1000:
logger.error("无效的服务信息:{}".format(info))
return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
info["hosts"] = [i for i in info["hosts"] if i["enabled"] and i["healthy"]]
if len(info["hosts"]) == 0:
logger.error("无效的服务信息:{}".format(info))
return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
logger.info("服务信息校验通过")
return {"code": 200, "data": True, "msg": "服务信息校验通过"}
else:
logger.error("无效的服务信息:{}".format(info))
return {"code": 110, "data": "无效的服务信息", "msg": "无效的服务信息"}
except Exception as ex:
logger.error("服务信息校验异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "服务信息校验异常"}
def _choose_dom(self, info: list) -> dict:
"""
选择节点,负载均衡
:param info: 节点列表
:return: 返回选中节点数据
"""
try:
logger.info("选择节点,负载均衡,节点数据:{}".format(info))
b = len(info)-1
index = random.randint(0, len(info) - 1)
logger.info("节点选择成功,已选数据:{}".format(info[index]))
return {"code": 200, "data": info[index], "msg": "节点选择成功"}
except Exception as ex:
logger.error("节点选择异常:{}".format(ex))
return {"code": 110, "data": ex, "msg": "节点选择异常"}
Nacos 连接
语法:
client = NacosClient(server_addresses, namespace=your_ns)
代码示例:
# http://www.angeszhu.cn/
logger.info('建立Nacos连接,address:{},namespace:{}'.format(SERVER_ADDRESSES, NAMESPACE))
self.client = nacos.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE)
self.connect_bool = True
-
server_addresses -需要 - NACOS服务器地址,如果逗号超过1分离。
-
名称空间-命名空间。| 默认:None
额外的选择
可以通过设置额外的选项 set_options,如下所示:
client.set_options({key}={value})
可配置的选项包括:
-
default_timeout-从服务器获取配置的默认超时,以秒为单位。
-
pull_timeout-长轮询超时(以秒为单位)。
-
pull_config_size-一个轮询过程监听的最大配置项数。
-
callback_thread_num-调用回调的并发性。
-
failover_base-用于存储故障转移配置文件的目录。
-
snapshot_base-用于存储快照配置文件的目录。
-
no_snapshot -禁用默认的快照行为,这可以通过PARAM覆盖no_snapshot在GET方法。
-
代理-Dict代理映射,某些环境需要代理访问,因此您可以设置此参数,从而使HTTP请求通过代理。
Api 参考
获取配置
NacosClient.get_config(data_id, group, timeout, no_snapshot)
-
param data_id数据ID。
-
param 组组,DEFAULT_GROUP如果未指定组,则使用。
-
param timeout请求服务器的超时时间(以秒为单位)。
-
param no_snapshot服务器不可用时是否使用本地快照。
-
return W按照优先级获取一个配置项的值:
-
步骤1-从本地故障转移目录获取(默认值:)${cwd}/nacos-data/data。
-
可以预先从快照目录(默认值:)手动复制故障转移目录${cwd}/nacos-data/snapshot。
-
这有助于抑制已知服务器故障的影响。
-
-
步骤2-从一台服务器获取,直到获得价值或尝试所有服务器。
- 从服务器获取内容后,内容将保存到快照目录。
-
步骤3-从快照目录获取。
添加观察者
NacosClient.add_config_watchers(data_id, group, cb_list)
-
param data_id数据ID。
-
param 组组,DEFAULT_GROUP如果未指定组,则使用。
-
param cb_list要添加的回调函数列表。
-
return
将观察者添加到指定的配置项。
-
一旦更改或删除项目,将调用回调函数。
-
如果该项目已经存在于服务器中,则回调函数将被调用一次。
-
允许在一项上进行多个回调,并且所有回调函数由并发调用threading.Thread。
-
从当前进程中调用回调函数。
删除观察者
NacosClient.remove_config_watcher(data_id, group, cb, remove_all)
-
param data_id数据ID。
-
param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。
-
param cb删除回调函数。
-
param remove_all是删除所有出现的回调还是仅删除一次。
-
return
从指定键中删除观察者。
发布配置
NacosClient.publish_config(data_id, group, content, timeout)
-
param data_id数据ID。
-
param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。
-
param 内容配置值。
-
param timeout请求服务器的超时时间(以秒为单位)。
-
return 如果将引发成功或异常,则为true。
将一个数据项发布到Nacos。
-
如果数据密钥不存在,请首先创建一个。
-
如果数据密钥存在,请更新到指定的内容。
-
内容不能设置为无,如果需要删除配置项,请使用功能删除。
删除配置
NacosClient.remove_config(data_id, group, timeout)
-
param data_id数据ID。
-
param 组组,如果未指定组,则使用“ DEFAULT_GROUP”。
-
param timeout请求服务器的超时时间(以秒为单位)。
-
return 如果将引发成功或异常,则为true。
从Nacos中删除一个数据项。
注册实例
NacosClient.add_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable, healthy)
-
param service_name必 需要注册的服务名称。
-
param ip 实例的必需IP。
-
param 端口 所需实例的端口。
-
param cluster_name要注册的集群。
-
param weight一个用于负载均衡重量的浮点数。
-
param 元数据JSON字符串格式或dict格式的额外信息
-
param enable布尔值,用于确定是否启用实例。
-
param 正常的布尔值,用于确定实例是否正常。
-
param 短暂的布尔值,用于确定实例是否短暂。
-
return 如果将引发成功或异常,则为true。
注销实例
NacosClient.remove_naming_instance(service_name, ip, port, cluster_name)
-
param service_name必 需要注销的服务名称。
-
param ip 实例的必需IP。
-
param 端口 所需实例的端口。
-
param cluster_name要注销的集群。
-
param 短暂的布尔值,用于确定实例是否短暂。
-
return 如果将引发成功或异常,则为true。
修改实例
NacosClient.modify_naming_instance(service_name, ip, port, cluster_name, weight, metadata, enable)
-
param service_name 必需的服务名称。
-
param ip 实例的必需IP。
-
param 端口 所需实例的端口。
-
param cluster_name集群名称。
-
param weight一个用于负载均衡重量的浮点数。
-
param 元数据JSON字符串格式或dict格式的额外信息。
-
param enable布尔值,用于确定是否启用实例。
-
param 短暂的布尔值,用于确定实例是否短暂。
-
return 如果将引发成功或异常,则为true。
查询实例
NacosClient.list_naming_instance(service_name, clusters, namespace_id, group_name, healthy_only)
-
param service_name必 需要查询的服务名称。
-
param 群集群集名称以逗号分隔。
-
param namespace_id自定义的组名,默认值blank。
-
param group_name自定义的组名,默认值DEFAULT_GROUP。
-
param Healthy_only一个用于查询健康实例或非健康实例的布尔值。
-
return 实例信息列表,如果将引发成功或异常。
查询实例详细信息
NacosClient.get_naming_instance(service_name, ip, port, cluster_name)
-
param service_name 必需的服务名称。
-
param ip 实例的必需IP。
-
param 端口 所需实例的端口。
-
param cluster_name集群名称。
-
return 实例信息,如果将引发成功或异常。
发送实例节拍
NacosClient.send_heartbeat(service_name, ip, port, cluster_name, weight, metadata)
-
param service_name 必需的服务名称。
-
param ip 实例的必需IP。
-
param 端口 所需实例的端口。
-
param cluster_name要注册的集群。
-
param weight一个用于负载均衡重量的浮点数。
-
param 短暂的布尔值,用于确定实例是否短暂。
-
param 元数据JSON字符串格式或dict格式的额外信息。
-
return 如果成功或出现异常,则JSON对象包括服务器建议的拍子间隔。
订阅服务实例已更改
NacosClient.subscribe(listener_fn, listener_interval=7, *args, **kwargs)
-
param listener_fn 必需自定义的侦听器功能。
-
param listener_interval 监听间隔,默认为7秒。
-
param service_name 必需订阅的服务名称。
-
param 群集群集名称以逗号分隔。
-
param namespace_id自定义的组名,默认值blank。
-
param group_name自定义的组名,默认值DEFAULT_GROUP。
-
param Healthy_only一个用于查询健康实例或非健康实例的布尔值。
-
return
退订服务实例已更改
NacosClient.unsubscribe(service_name, listener_name)
- param service_name必 需要订阅的服务名称。
- param listener_name listener_name是自定义的。
- return
停止所有服务订阅
NacosClient.stop_subscribe()
- return
调试模式
调试模式(如果用于获取更详细的控制台登录信息)。
可以通过以下方式设置调试模式:
NacosClient.set_debugging()
# only effective within the current process



