当前位置: 首页 > news >正文

Spring Cloud Nacos源码讲解(七)- Nacos客户端服务订阅机制的核心流程

Nacos客户端服务订阅机制的核心流程

​ 说起Nacos的服务订阅机制,大家会觉得比较难理解,那我们就来详细分析一下,那我们先从Nacos订阅的概述说起

Nacos订阅概述

​ Nacos的订阅机制,如果用一句话来描述就是:Nacos客户端通过一个定时任务,每6秒从注册中心获取实例列表,当发现实例发生变化时,发布变更事件,订阅者进行业务处理(更新实例,更改本地缓存)。

​ 以下是订阅方法的主线流程,涉及内容比较多,细节比较复杂,所以这里我们主要学习核心部分。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0Coadaqu-1677029556577)(image-20211025182722346.png)]

定时任务开启

​ 其实订阅本质上就是服务发现的一种方式,也就是在服务发现的时候执行订阅方法,触发定时任务去拉取服务端的数据。

​ NacosNamingService中暴露的许多重载的subscribe,重载的目的就是让大家少写一些参数,这些参数呢,Nacos给默认处理了。最终这些重载方法都会调用到下面这个方法:

@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)throws NacosException {if (null == listener) {return;}String clusterString = StringUtils.join(clusters, ",");changeNotifier.registerListener(groupName, serviceName, clusterString, listener);clientProxy.subscribe(serviceName, groupName, clusterString);
}

​ 这里我们先来看subscribe方法,大家可能有些眼熟它是clientProxy类型调用的方法,实际上就是NamingClientProxyDelegate.subscribe(),所以其实这里和之前的服务发现中调用的是一个方法,这里其实是在做服务列表的查询,所以得出结论查询和订阅都调用了同一个方法

@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);// 定时调度UpdateTaskserviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);// 获取缓存中的ServiceInfoServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result) {// 如果为null,则进行订阅逻辑处理,基于gRPC协议result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}// ServiceInfo本地缓存处理serviceInfoHolder.processServiceInfo(result);return result;
}

​ 但是这里我们要关注这里的任务调度,该方法包含了构建serviceKey、通过serviceKey判断重复、最后添加UpdateTask,而其中的addTask的实现就是发起了一个定时任务:

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);if (futureMap.get(serviceKey) != null) {return;}synchronized (futureMap) {if (futureMap.get(serviceKey) != null) {return;}//构建UpdateTaskScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}
}

​ 定时任务延迟一秒执行:

private synchronized ScheduledFuture<?> addTask(UpdateTask task) {return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

​ 所以在这里我们得出结论,核心为:调用订阅方法和发起定时任务。

定时任务执行内容

​ UpdateTask封装了订阅机制的核心业务逻辑,我们来看一下流程图:

​ 当我们知道了整体流程以后,我们再来看对应源码:

@Override
public void run() {long delayTime = DEFAULT_DELAY;try {// 判断是服务是否订阅和未开启过定时任务,如果订阅过直接不在执行if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);return;}// 获取缓存的service信息ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);// 如果为空if (serviceObj == null) {// 根据serviceName从注册中心服务端获取Service信息serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理本地缓存serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime = serviceObj.getLastRefTime();return;}// 过期服务,服务的最新更新时间小于等于缓存刷新(最后一次拉取数据的时间)时间,从注册中心重新查询if (serviceObj.getLastRefTime() <= lastRefTime) {serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理本地缓存serviceInfoHolder.processServiceInfo(serviceObj);}//刷新更新时间lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}// 下次更新缓存时间设置,默认6秒// TODO multiple time can be configured.delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败数量为0(可能会出现失败情况,没有ServiceInfo,连接失败)resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);} finally {// 下次调度刷新时间,下次执行的时间与failCount有关,failCount=0,则下次调度时间为6秒,最长为1分钟// 即当无异常情况下缓存实例的刷新时间是6秒executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);}
}

​ 业务逻辑最后会计算下一次定时任务的执行时间,通过delayTime来延迟执行。delayTime默认为 1000L * 6,也就是6秒。而在finally里面真的发起下一次定时任务。当出现异常时,下次执行的时间与失败次数有关,但最长不超过1分钟。

总结:

  1. 订阅方法的调用,并进行EventListener的注册,后面UpdateTask要用来进行判断;

  2. 通过委托代理类来处理订阅逻辑,此处与获取实例列表方法使用了同一个方法;

  3. 通过定时任务执行UpdateTask方法,默认执行间隔为6秒,当发生异常时会延长,但不超过1分钟;

  4. UpdateTask方法中会比较本地是否存在缓存,缓存是否过期。当不存在或过期时,查询注册中心,获取最新实例,更新最后获取时间,处理ServiceInfo。

  5. 重新计算定时任务时间,循环执行流程。

相关文章:

Spring Cloud Nacos源码讲解(七)- Nacos客户端服务订阅机制的核心流程

Nacos客户端服务订阅机制的核心流程 ​ 说起Nacos的服务订阅机制&#xff0c;大家会觉得比较难理解&#xff0c;那我们就来详细分析一下&#xff0c;那我们先从Nacos订阅的概述说起 Nacos订阅概述 ​ Nacos的订阅机制&#xff0c;如果用一句话来描述就是&#xff1a;Nacos客…...

【华为OD机试模拟题】用 C++ 实现 - 对称美学(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 获得完美走位(2023.Q1) 文章目录 最近更新的博客使用说明对称美学题目输入输出示例一输入输出说明示例二输入输出说明备注Code使用说明 参加华为od机试,一定要注意不要完全背诵代码࿰...

Go语言内存管理详解-学习笔记

1 自动内存管理 1.1 相关概念 Mutator&#xff1a;业务线程&#xff0c;分配新对象&#xff0c;修改对象指向关系Collector&#xff1a;GC线程&#xff0c;找到存活对象&#xff0c;回收死亡对象的内存空间Serial GC&#xff1a;只有一个collector&#xff08;需要暂停&#…...

Geospatial Data Science (4): Spatial weights

Geospatial Data Science (4): Spatial weights 在本节中,我们将学习空间分析中关键部分之一的来龙去脉:空间权重矩阵。这些是结构化的数字集,用于形式化数据集中观测值之间的地理关系。本质上,给定地理的空间权重矩阵是维度 N N N 乘以 N N N 的正定矩阵,其中...

JUC-Synchronized相关内容

设计同步器的意义多线程编程中&#xff0c;有可能会出现多个线程同时访问同一个共享、可变资源的情况&#xff0c;这个资源我们称之其为临界资源&#xff1b;这种资源可能是&#xff1a;对象、变量、文件等。共享&#xff1a;资源可以由多个线程同时访问可变&#xff1a;资源可…...

【c++】文件操作(文本文件、二进制文件)

文章目录文件操作文本文件写文件读文件二进制文件写文件读文件文件操作 程序运行时产生的数据都属于临时数据&#xff0c;程序一旦运行结束都会被释放&#xff1b; 通过文件可以将数据持久化&#xff1b; c中对文件操作需要包含头文件 文件类型分为两种&#xff1a; 1、文本文…...

带你了解IP报警柱的特点

IP可视报警柱是一款室外防水紧急求助可视对讲终端。安装在学校、广场、道路人流密集和案件高发区域&#xff0c;当发生紧急情况或需要咨询求助时按下呼叫按钮立即可与监控中心值班人员通话&#xff0c;值班人员也可通过前置摄像头了解现场情况并广播喊话。IP可视报警柱的使用特…...

一步步教你电脑变成服务器,tomcat的花生壳设置(原创)

1&#xff0c;首先你去https://console.oray.com/这网站注册个帐号&#xff0c;如果注册成功它会送你一个免费域名&#xff0c;当然不记得也没关系&#xff0c;你记住你注册的 帐号跟密码&#xff0c;然后下载它的软件&#xff08;花生壳动态域名6.0正式版&#xff09;有xp跟li…...

Python 卷积神经网络 ResNet的基本编写方法

ResNet&#xff08;Residual Network&#xff09;是由微软亚洲研究院提出的深度卷积神经网络&#xff0c;它在2015年的ImageNet挑战赛上取得了第一名的好成绩。ResNet最大的特点是使用了残差学习&#xff0c;可以解决深度网络退化问题。在传统的深度神经网络中&#xff0c;随着…...

【索引】什么是索引

&#x1f4d4; 笔记介绍 大家好&#xff0c;千寻简笔记是一套全部开源的企业开发问题记录&#xff0c;毫无保留给个人及企业免费使用&#xff0c;我是作者星辰&#xff0c;笔记内容整理并发布&#xff0c;内容有误请指出&#xff0c;笔记源码已开源&#xff0c;前往Gitee搜索《…...

【算法刷题】动态规划算法题型及方法归纳

动态规划特点 动态规划中每一个状态一定是由上一个状态推导出来&#xff0c;根据这个特点&#xff0c;可以在状态计算过程中&#xff0c;存储某一条件下的数据&#xff0c;当再次遍历该条件时&#xff0c;直接取该条件对应的数据即可&#xff0c;可以避免重复计算&#xff0c;…...

PolarDB数据库的CSN机制

背景 对postgres数据库熟悉的同学会发现在高并发场景下在获取快照处易出现性能瓶颈&#xff0c;其原因在于PG使用全局数组在共享内存中保存所有事务的状态&#xff0c;在获取快照时需要加锁以保证数据一致性。获取快照时需要持有ProcArraryLock共享锁比遍历ProcArray数组中活跃…...

使用kubeadm 部署kubernetes 1.26.1集群 Calico ToR配置

目录 机器信息 升级内核 系统配置 部署容器运行时Containerd 安装crictl客户端命令 配置服务器支持开启ipvs的前提条件 安装 kubeadm、kubelet 和 kubectl 初始化集群 &#xff08;master&#xff09; 安装CNI Calico 集群加入node节点 机器信息 主机名集群角色IP内…...

Servlet笔记(11):Servletcontext对象

1、什么是ServletContext ServletContext是一个全局储存空间&#xff0c;随服务器的生命周期变化&#xff0c; Cookie&#xff0c;Session&#xff0c;ServletContext的区别 Cookie&#xff1a; 存在于客户端的本地文本文件 Session&#xff1a; 存在于服务器的文本文件&#…...

EM算法是什么

EM算法是什么 EM算法&#xff08;Expectation-Maximization Algorithm&#xff09;是一种用于参数估计的迭代算法。它常被用于含有隐变量&#xff08;latent variable&#xff09;的概率模型中&#xff0c;例如高斯混合模型、隐马尔可夫模型等。 EM算法分为两个步骤&#xff…...

C++---线性dp---方格取数(每日一道算法2023.2.25)

注意事项&#xff1a; 本题属于"数字三角形"和"摘花生"两题的进阶版&#xff0c;建议优先看懂那两道&#xff0c;有助理解。 题目&#xff1a; 输入: 8 2 3 13 2 6 6 3 5 7 4 4 14 5 2 21 5 6 4 6 3 15 7 2 14 0 0 0输出&#xff1a; 67#include <cm…...

《第一行代码》 第八章:应用手机多媒体

一&#xff0c;使用通知 第一步&#xff0c;创建项目&#xff0c;书写布局 <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"android:orientation"vertical"android:layout_width"match_parent"android:layout_he…...

C++设计模式(20)——迭代器模式

亦称&#xff1a; Iterator 意图 迭代器模式是一种行为设计模式&#xff0c; 让你能在不暴露集合底层表现形式 &#xff08;列表、 栈和树等&#xff09; 的情况下遍历集合中所有的元素。 问题 集合是编程中最常使用的数据类型之一。 尽管如此&#xff0c; 集合只是一组对…...

戴尔Latitude 3410电脑 Hackintosh 黑苹果efi引导文件

原文来源于黑果魏叔官网&#xff0c;转载需注明出处。硬件型号驱动情况主板戴尔Latitude 3410处理器英特尔酷睿i7-10510U已驱动内存8GB已驱动硬盘SK hynix BC511 NVMe SSD已驱动显卡Intel UHD 620Nvidia GeForce MX230(屏蔽)无法驱动声卡Realtek ALC236已驱动网卡Realtek RTL81…...

一起Talk Android吧(第五百零四回:如何调整组件在约束布局中的位置)

文章目录 背景介绍调整方法一调整方法二经验分享各位看官们大家好,上一回中咱们说的例子是"解决retrofit被混淆后代码出错的问题",这一回中咱们说的例子是" 如何调整组件在约束布局中的位置"。闲话休提,言归正转, 让我们一起Talk Android吧! 背景介绍…...

ssh连不上实验室的物理机了

实验室的电脑&#xff0c;不能在校外用 ssh 连接了 192.168.1.33 是本地地址&#xff0c;掩码16位&#xff0c;图1。 192.168.1.14 是实验室的另一台可以ssh连接的物理机&#xff0c;掩码16。 192.168.0.1 是无线路由器地址。 192.168.0.2 是192.168.1.14上的虚拟机地址&#…...

selinux讲解

Selinux讲解 1、selinux的概述 Selinux的历史 Linux安全性与windows在不开启防御措施的时候是一样的&#xff1b;同样是C2级别的安全防护安全级别评定&#xff1a; D–>C1–>C2–>B1–>B2–>B3–>A1 D级&#xff0c;最低安全性C1级&#xff0c;主存取控制…...

【计算机网络】TCP底层设计交互原理

文章目录1.TCP底层三次握手详细流程2.TCP洪水攻击介绍和ss命令浅析3.Linux服务器TCP洪水攻击入侵案例4.TCP洪水攻击结果分析和解决方案5.TCP底层四次挥手详细流程1.TCP底层三次握手详细流程 TCP的可靠性传输机制&#xff1a;TCP三次我手的流程 一次握手&#xff1a;客户端发送一…...

Kotlin1.8新特性

Kotlin1.8.0新特性 新特性概述 JVM 的新实验性功能&#xff1a;递归复制或删除目录内容提升了 kotlin-reflect 性能新的 -Xdebug 编译器选项&#xff0c;提供更出色的调试体验kotlin-stdlib-jdk7 与 kotlin-stdlib-jdk8 合并为 kotlin-stdlib提升了 Objective-C/Swift 互操作…...

【Java8】

1、接口中默认方法修饰为普通方法 在jdk8之前&#xff0c;interface之中可以定义变量和方法&#xff0c;变量必须是public、static、final的&#xff0c;方法必须是public、abstract的&#xff0c;由于这些修饰符都是默认的。 接口定义方法: public抽象方法需要子类实现 接口定…...

阿里 Java 程序员面试经验分享,附带个人学习笔记、路线大纲

背景经历 当时我工作近5年&#xff0c;明显感觉到了瓶颈期。说句不好听的成了老油条&#xff0c;可以每天舒服的混日子&#xff08;这也有好处&#xff0c;有时间准备面试&#xff09;。这对于个人成长不利&#xff0c;长此以往可能面临大龄失业。所以我觉得需要痛下决心改变一…...

十大算法基础——上(共有20道例题,大多数为简单题)

一、枚举&#xff08;Enumerate&#xff09;算法 定义&#xff1a;就是一个个举例出来&#xff0c;然后看看符不符合条件。 举例&#xff1a;一个数组中的数互不相同&#xff0c;求其中和为0的数对的个数。 for (int i 0; i < n; i)for (int j 0; j < i; j)if (a[i] …...

【PAT甲级题解记录】1018 Public Bike Management (30 分)

【PAT甲级题解记录】1018 Public Bike Management (30 分) 前言 Problem&#xff1a;1018 Public Bike Management (30 分) Tags&#xff1a;dijkstra最短路径 DFS Difficulty&#xff1a;剧情模式 想流点汗 想流点血 死而无憾 Address&#xff1a;1018 Public Bike Managemen…...

SpringCloud————Eureka概述及单机注册中心搭建

Spring Cloud Eureka是Netflix开发的注册发现组件&#xff0c;本身是一个基于REST的服务。提供注册与发现&#xff0c;同时还提供了负载均衡、故障转移等能力。 Eureka组件的三个角色 服务中心服务提供者服务消费者 Eureka Server&#xff1a;服务器端。提供服务的注册和发现…...

原生django raw() 分页

def change_obj_to_dict(self,temp):dict {}dict["wxh_name"] temp.wxh_namedict["types"] temp.typesdict["subject"] temp.subjectdict["ids"] temp.ids# 虽然产品表里没有替代型号&#xff0c;但是通过sql语句的raw()查询可以…...