当前位置: 首页 > news >正文

七、Nacos源码系列:Nacos服务发现

目录

一、服务发现

二、getServices():获取服务列表

2.1、获取服务列表

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

3.1、从缓存中获取服务信息

3.2、缓存为空,执行订阅服务

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

3.2.2、订阅服务 

 3.2.3、处理服务信息

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

3.4、筛选满足条件的实例 

3.5、总结图


一、服务发现

在discovery-provider项目的pom.xml中,我们引入了如下依赖:

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2.2.9.RELEASE</version>
</dependency>

SpringCloud很多功能都是基于SpringBoot项目的自动配置原理来扩展实现的,下面我们查看spring-cloud-starter-alibaba-nacos-discovery-2.2.9.RELEASE.jar包路径下的spring.factories的"org.springframework.boot.autoconfigure.EnableAutoConfiguration"自动装配类配置,如下图:

如上图,跟客户端服务发现有关的有两个自动配置类:NacosDiscoveryClientConfiguration和NacosDiscoveryAutoConfiguration。

相关源码如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,CommonsClientAutoConfiguration.class })
// 在NacosDiscoveryAutoConfiguration自动装配类执行完成后才执行
@AutoConfigureAfter(NacosDiscoveryAutoConfiguration.class)
public class NacosDiscoveryClientConfiguration {// 创建DiscoveryClient bean对象@Beanpublic DiscoveryClient nacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {return new NacosDiscoveryClient(nacosServiceDiscovery);}@Bean@ConditionalOnMissingBean@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",matchIfMissing = true)public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,NacosDiscoveryProperties nacosDiscoveryProperties) {return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);}}@Configuration(proxyBeanMethods = false)
// spring.cloud.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnDiscoveryEnabled
// spring.cloud.nacos.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {@Bean@ConditionalOnMissingBeanpublic NacosDiscoveryProperties nacosProperties() {// 匹配配置文件中以“spring.cloud.nacos.discovery”为前缀的那些属性,// 如namespace、username、password、serverAddr等属性return new NacosDiscoveryProperties();}// 创建NacosServiceDiscovery bean对象@Bean@ConditionalOnMissingBeanpublic NacosServiceDiscovery nacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,NacosServiceManager nacosServiceManager) {return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);}}

在NacosDiscoveryAutoConfiguration自动配置类中,创建了一个NacosServiceDiscovery的bean对象,然后在NacosDiscoveryClientConfiguration自动装配时,创建DiscoveryClient的bean对象,传入前面创建的NacosServiceDiscovery对象。

重点关注NacosDiscoveryClient这个类,NacosDiscoveryClient的源码如下:

public class NacosDiscoveryClient implements DiscoveryClient {private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);/*** Nacos Discovery Client Description.*/public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";private NacosServiceDiscovery serviceDiscovery;public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {this.serviceDiscovery = nacosServiceDiscovery;}@Overridepublic String description() {return DESCRIPTION;}@Overridepublic List<ServiceInstance> getInstances(String serviceId) {try {return serviceDiscovery.getInstances(serviceId);}catch (Exception e) {throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, e);}}@Overridepublic List<String> getServices() {try {return serviceDiscovery.getServices();}catch (Exception e) {log.error("get service name from nacos server fail,", e);return Collections.emptyList();}}}

可以看到,NacosDiscoveryClient实现了SpringCloud的DiscoveryClient接口,重点是getInstances()和getServices()方法,而且都是由NacosServiceDiscovery类去实现。

public class NacosServiceDiscovery {// 跟配置文件中以“spring.cloud.nacos.discovery”前缀的属性配置对应上private NacosDiscoveryProperties discoveryProperties;// nacos服务管理器对象private NacosServiceManager nacosServiceManager;public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,NacosServiceManager nacosServiceManager) {this.discoveryProperties = discoveryProperties;this.nacosServiceManager = nacosServiceManager;}/*** 返回指定group和servic的所有实例*/public List<ServiceInstance> getInstances(String serviceId) throws NacosException {// 配置文件中配置的group组名String group = discoveryProperties.getGroup();// namingService(): 通过反射创建一个NacosNamingService对象// 最终会调用NacosNamingService#selectInstances()方法List<Instance> instances = namingService().selectInstances(serviceId, group,true);// 将Instance包装成NacosServiceInstance对象返回return hostToServiceInstanceList(instances, serviceId);}/*** 返回指定group的所有服务名称*/public List<String> getServices() throws NacosException {// 配置文件中配置的group组名String group = discoveryProperties.getGroup();// namingService(): 通过反射创建一个NacosNamingService对象// 最终会调用NamingGrpcClientProxy#getServiceList()方法ListView<String> services = namingService().getServicesOfServer(1,Integer.MAX_VALUE, group);// 返回所有服务名称return services.getData();}public static List<ServiceInstance> hostToServiceInstanceList(List<Instance> instances, String serviceId) {List<ServiceInstance> result = new ArrayList<>(instances.size());for (Instance instance : instances) {ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);if (serviceInstance != null) {result.add(serviceInstance);}}return result;}public static ServiceInstance hostToServiceInstance(Instance instance,String serviceId) {if (instance == null || !instance.isEnabled() || !instance.isHealthy()) {return null;}NacosServiceInstance nacosServiceInstance = new NacosServiceInstance();nacosServiceInstance.setHost(instance.getIp());nacosServiceInstance.setPort(instance.getPort());nacosServiceInstance.setServiceId(serviceId);Map<String, String> metadata = new HashMap<>();metadata.put("nacos.instanceId", instance.getInstanceId());metadata.put("nacos.weight", instance.getWeight() + "");metadata.put("nacos.healthy", instance.isHealthy() + "");metadata.put("nacos.cluster", instance.getClusterName() + "");if (instance.getMetadata() != null) {metadata.putAll(instance.getMetadata());}metadata.put("nacos.ephemeral", String.valueOf(instance.isEphemeral()));nacosServiceInstance.setMetadata(metadata);if (metadata.containsKey("secure")) {boolean secure = Boolean.parseBoolean(metadata.get("secure"));nacosServiceInstance.setSecure(secure);}return nacosServiceInstance;}private NamingService namingService() {return nacosServiceManager.getNamingService();}}

接下来,我们分析前面介绍到的两个重要方法:getInstances(serviceId)和getServices()。

二、getServices():获取服务列表

2.1、获取服务列表

// namingService(): 通过反射创建一个NacosNamingService对象
// NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {try {Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");Constructor constructor = driverImplClass.getConstructor(Properties.class);return (NamingService) constructor.newInstance(properties);} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}
}
//getServices()调用栈大体如下:namingService().getServicesOfServer(1, Integer.MAX_VALUE, group);NacosNamingService#getServicesOfServerclientProxy.getServiceList(pageNo, pageSize, groupName, selector)NamingClientProxyDelegate#getServiceListgrpcClientProxy.getServiceList(pageNo, pageSize, groupName, selector);// 最终会调用NamingGrpcClientProxy#getServiceList()方法
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)throws NacosException {// 构建ServiceListRequest请求(服务列表请求),指定命名空间ID、服务组名ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);if (selector != null) {if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {request.setSelector(JacksonUtils.toJson(selector));}}// 发送服务列表请求给Nacos服务端,接下来由服务端处理ServiceListResponse response = requestToServer(request, ServiceListResponse.class);// 组装返回值出去ListView<String> result = new ListView<>();result.setCount(response.getCount());result.setData(response.getServiceNames());return result;
}

接下来,我们看看服务端怎么处理这个服务列表请求的。通过对ServiceListRequest类引用的追踪,我们发现是在com.alibaba.nacos.naming.remote.rpc.handler.ServiceListRequestHandler#handle这个方法中对客户端提交的服务列表请求进行处理的。

// 处理客户端提交的服务列表请求
public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {// ServiceManager.getInstance()通过单例返回一个ServiceManager对象/*** 获取指定命令空间下的所有服务,在ServiceManager中存在一个map保存着每个命名空间中的所有服务。* ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2)* key: namespaceId* value: Set<Service>* 注册实例的时候,就往这个map写入了数据** ServiceManager.getInstance().getSingletons(request.getNamespace())相当于执行:* namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1))*/Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());// 构建响应结果ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());if (!serviceSet.isEmpty()) {// 过滤指定分组的Service,添加groupServiceName,格式如:groupA@@serviceACollection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());// 按分页裁剪serviceNameSetList<String> serviceNameList = ServiceUtil.pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);result.setCount(serviceNameSet.size());result.setServiceNames(serviceNameList);}return result;
}

从源码可以看出,Nacos服务端从ServiceManager管理器中的一个map(namespaceSingletonMaps)中拿出指定命名空间那些Service,并根据筛选条件过滤满足条件的Service,然后组装好groupServiceName(格式如:groupA@@serviceA)并返回。

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

// 调用栈如下:
// namingService().selectInstances(serviceId, group,true);// NamingService#selectInstances(serviceName, groupName, healthy, true)// NamingService#selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, true)
// getInstances(serviceId)方法最终会调用NacosNamingService#selectInstances()获取实例信息。
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;// 集群名称,使用逗号分隔String clusterString = StringUtils.join(clusters, ",");// 是否订阅,默认是订阅的if (subscribe) {/*** 1.从缓存中获取ServiceInfo* ConcurrentMap<String, ServiceInfo> serviceInfoMap* key:  groupName@@serviceName  或者  groupName@@serviceName@@clusterString* value: ServiceInfo*/// 示例:serviceName=discovery-provider   groupName=DEFAULT_GROUPserviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);// 2.缓存为空,执行订阅服务if (null == serviceInfo) {serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);}} else {// 3.非订阅,通过grpc发送ServiceQueryRequest服务查询请求serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);}// 4.筛选满足条件的实例return selectInstances(serviceInfo, healthy);
}

上述流程的基本逻辑为:

  • 如果是订阅模式,则直接从本地缓存获取服务信息(ServiceInfo),然后从中获取实例列表,这是因为订阅机制会自动同步服务器实例的变化到本地。如果本地缓存中没有,那说明是首次调用,则进行订阅,在订阅完成后会获得到服务信息。
  • 如果是非订阅模式,那就直接请求服务器端,获得服务信息。

3.1、从缓存中获取服务信息

public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());// 组装服务名(带组名):groupName@@serviceName// 例如:DEFAULT_GROUP@@discovery-providerString groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 如果指定了集群,那么key还会加上"@@clusters"String key = ServiceInfo.getKey(groupedServiceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// ConcurrentMap<String, ServiceInfo> serviceInfoMap// 从缓存中获取服务信息return serviceInfoMap.get(key);
}

3.2、缓存为空,执行订阅服务

serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);// clientProxy在构造方法中初始化为:NamingClientProxyDelegate
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);

实际上调用的是NamingClientProxyDelegate#subscribe()方法:

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);// 服务名称(带组名)  格式:groupName@@serviceNameString serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);// 如果集群名称非空,key还需要拼接上集群名称String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);// 调度更新,往线程池中提交一个UpdateTask任务serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);// 获取缓存中的服务信息ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result || !isSubscribed(serviceName, groupName, clusters)) {// 缓存中不存在对应的服务信息 或者 SubscriberRedoData还未注册,则执行订阅result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(result);return result;
}

主要逻辑有下面三个,分析如下。

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {if (!asyncQuerySubscribeService) {return;}// 组装key   格式:groupName@@serviceName@@clustersString serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);// Map<String, ScheduledFuture<?>> futureMap = new HashMap<>()// futureMap用于保存UpdateTask线程池任务的执行结果if (futureMap.get(serviceKey) != null) {return;}synchronized (futureMap) {// double check双重检查,如果非空,直接返回,也就是相同的groupName@@serviceName@@clusters,只会存在一个UpdateTask任务if (futureMap.get(serviceKey) != null) {return;}// 往线程池中添加一个更新任务// UpdateTask实现了Runnable接口,需要关注其run()方法ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}
}private synchronized ScheduledFuture<?> addTask(UpdateTask task) {// 延迟1s执行UpdateTaskreturn executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

可以看到,使用了一个map来保存线程池任务的响应,延迟1s执行调度更新任务。我们看下UpdateTask的源码:

public class UpdateTask implements Runnable {long lastRefTime = Long.MAX_VALUE;private boolean isCancel;private final String serviceName;private final String groupName;private final String clusters;private final String groupedServiceName;private final String serviceKey;/*** the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty*/private int failCount = 0;public UpdateTask(String serviceName, String groupName, String clusters) {this.serviceName = serviceName;this.groupName = groupName;this.clusters = clusters;this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);}@Overridepublic void run() {long delayTime = DEFAULT_DELAY;try {if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);isCancel = true;return;}ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (serviceObj == null) {// 使用grpc向服务端发送ServiceQueryRequest服务查询请求serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime = serviceObj.getLastRefTime();return;}if (serviceObj.getLastRefTime() <= lastRefTime) {// 使用grpc向服务端发送ServiceQueryRequest服务查询请求serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(serviceObj);}lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {// 记录失败次数incFailCount();return;}// TODO multiple time can be configured.delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败次数resetFailCount();} catch (NacosException e) {handleNacosException(e);} catch (Throwable e) {handleUnknownException(e);} finally {if (!isCancel) {// 注意:延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),TimeUnit.MILLISECONDS);}}}private void handleNacosException(NacosException e) {incFailCount();int errorCode = e.getErrCode();if (NacosException.SERVER_ERROR == errorCode) {handleUnknownException(e);}NAMING_LOGGER.warn("Can't update serviceName: {}, reason: {}", groupedServiceName, e.getErrMsg());}private void handleUnknownException(Throwable throwable) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, throwable);}private void incFailCount() {int limit = 6;if (failCount == limit) {return;}failCount++;}private void resetFailCount() {failCount = 0;}
}

run()方法主要逻辑就是使用grpc向服务端发送ServiceQueryRequest服务查询请求,然后处理服务信息,获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件。

这里有重试机制,最多重试6次,延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)。

3.2.2、订阅服务 

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);}// 缓存SubscriberRedoData重做数据,定时使用redoData重新订阅,// 具体实现在RedoScheduledTask(由NamingGrpcRedoService定时调度),最终调用的也是NamingGrpcClientProxy#doSubscribe// 缓存重做数据保存在map中:private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);// 使用grpc发送服务订阅请求return doSubscribe(serviceName, groupName, clusters);
}public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);// private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();synchronized (subscribes) {subscribes.put(key, redoData);}
}

订阅服务首先会缓存SubscriberRedoData重做数据,实际上就是保存在一个map中,后续可以定时使用SubscriberRedoData重做数据来重新订阅,然后使用grpc发送服务订阅请求。

我们来看下如何订阅服务的。

public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {// 构建一个SubscribeServiceRequest客户端订阅请求// 服务端处理代码: com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler.handleSubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,true);// grpc请求Nacos服务端进行订阅SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);// 标记SubscriberRedoData重做数据为已订阅redoService.subscriberRegistered(serviceName, groupName, clusters);return response.getServiceInfo();
}

通过grpc向Nacos服务端发起一个订阅请求,服务端真正的处理是在:com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler#handle()方法。

public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String serviceName = request.getServiceName();String groupName = request.getGroupName();String app = request.getHeader("app", "unknown");String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 构建一个Service服务,指定为临时实例Service service = Service.newService(namespaceId, groupName, serviceName, true);// 构建Subscriber订阅者对象Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),namespaceId, groupedServiceName, 0, request.getClusters());// serviceStorage.getData(service): 从缓存中获取serviceInfo// metadataManager.getServiceMetadata(service).orElse(null): 从内存(map)获取ServiceMetadata// ServiceUtil.selectInstancesWithHealthyProtection(): 仅包含有保护机制的健康实例ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,true, subscriber.getIp());if (request.isSubscribe()) {// 订阅服务clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));} else {// 取消订阅服务clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));}return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}

我们重点关注订阅服务的方法:

public void subscribeService(Service service, Subscriber subscriber, String clientId) {Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}// 添加到订阅者列表中,实际上就是保存在map中// 订阅者列表: protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);client.addServiceSubscriber(singleton, subscriber);client.setLastUpdatedTime();// 发布客户端订阅事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

将service添加到订阅者列表中,然后发布客户端订阅事件,这个在之前分析过,客户端订阅事件是在com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation进行处理,

核心逻辑就是将服务添加到ClientServiceIndexesManager的subscriberIndexes订阅者列表中:

private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

 3.2.3、处理服务信息

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}// 获取老的服务ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {//empty or error push, just ignorereturn oldService;}// 重新存入客户端缓存中serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 对比下服务信息是否发生变更boolean changed = isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());if (changed) {NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),JacksonUtils.toJson(serviceInfo.getHosts()));// 如果发生改变,发送实例变更事件,处理源码在:InstancesChangeNotifierNotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));// 同步serviceInfo数据到本地文件DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;
}

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

真正执行的是com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#queryInstancesOfService():

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,boolean healthyOnly) throws NacosException {return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,boolean healthyOnly) throws NacosException {// 构建服务查询请求// Nacos服务端处理是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler.handleServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);request.setCluster(clusters);request.setHealthyOnly(healthyOnly);request.setUdpPort(udpPort);// 通过grpc请求Nacos服务端处理QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);return response.getServiceInfo();
}

queryInstancesOfService()核心就是构建了一个服务查询请求,通过grpc请求Nacos服务端,接下来我们直接看服务端的处理代码,具体是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler#handle。

public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String groupName = request.getGroupName();String serviceName = request.getServiceName();Service service = Service.newService(namespaceId, groupName, serviceName);String cluster = null == request.getCluster() ? "" : request.getCluster();boolean healthyOnly = request.isHealthyOnly();// 从缓存中获取serviceInfoServiceInfo result = serviceStorage.getData(service);// 从内存(map)获取ServiceMetadataServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);// 获取有保护机制的健康实例result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,meta.getClientIp());return QueryServiceResponse.buildSuccessResponse(result);
}

3.4、筛选满足条件的实例 

private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {List<Instance> list;if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {return new ArrayList<>();}// 遍历所有实例,直接移除掉不满足条件的实例Iterator<Instance> iterator = list.iterator();while (iterator.hasNext()) {Instance instance = iterator.next();// 筛选出健康、启用、权重大于0的实例if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {iterator.remove();}}return list;
}

3.5、总结图

 

相关文章:

七、Nacos源码系列:Nacos服务发现

目录 一、服务发现 二、getServices()&#xff1a;获取服务列表 2.1、获取服务列表 2.2、总结图 三、getInstances(serviceId)&#xff1a;获取服务实例列表 3.1、从缓存中获取服务信息 3.2、缓存为空&#xff0c;执行订阅服务 3.2.1、调度更新&#xff0c;往线程池中…...

Vue源码系列讲解——模板编译篇【一】(综述)

目录 1. 前言 2. 什么是模板编译 3. 整体渲染流程 4. 模板编译内部流程 4.1 抽象语法树AST 4.2 具体流程 5. 总结 1. 前言 在前几篇文章中&#xff0c;我们介绍了Vue中的虚拟DOM以及虚拟DOM的patch(DOM-Diff)过程&#xff0c;而虚拟DOM存在的必要条件是得先有VNode&…...

【机器学习】数据清洗之识别异常点

&#x1f388;个人主页&#xff1a;甜美的江 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;机器学习 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进步…...

MacOS 制作 TF 卡/ U 盘镜像

最近有张老的 TF 卡没办法直接拷贝里面的数据&#xff0c;于是打算利用 dd 工具直接全卡拷贝为镜像再分析里面的数据 在终端中&#xff0c;输入以下命令来列出所有磁盘设备&#xff1a; diskutil list这将显示Mac上所有的磁盘设备。你需要找到TF卡对应的设备&#xff0c;它通…...

怎么用postman调用webservice(反推SoapUI)

<soapenv:Envelope xmlns:soapenv“http://schemas.xmlsoap.org/soap/envelope/” xmlns:lis“LisDataTrasen”> soapenv:Header/ soapenv:Body lis:Test lis:test111111111</lis:test> </lis:Test> </soapenv:Body> </soapenv:Envelope> Conten…...

【开源】JAVA+Vue.js实现衣物搭配系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 衣物档案模块2.2 衣物搭配模块2.3 衣物收藏模块 三、系统设计3.1 用例设计3.2 E-R图设计3.3 数据库设计3.3.1 衣物档案表3.3.2 衣物搭配表3.3.3 衣物收藏表 四、系统实现4.1 登录页4.2 衣物档案模块4.3 衣物搭配模块4.4…...

【Flask + AI】接入CHATGLM API 实现翻译接口

【Flask AI】接入CHATGLM API 实现翻译接口 最近的项目中&#xff0c;需要加一个翻译功能&#xff0c;正好chatglm4发布了&#xff0c;于是决定着手用它实现。 https://chatglm.cn 准备 首先&#xff0c;在chatglm开发者中心申请api key&#xff0c;这里不再赘述 其次&…...

并发事务带来的问题及解决方法

引言 在数据库系统中&#xff0c;事务是指一组操作被视为一个逻辑单元&#xff0c;要么全部执行成功&#xff0c;要么全部不执行&#xff0c;保证数据库的一致性和完整性。而并发事务则是指多个事务同时执行的情况。虽然并发事务能够提高系统的性能和吞吐量&#xff0c;但也会…...

CRNN介绍:用于识别图中文本的深度学习模型

CRNN&#xff1a;用于识别图中文本的深度学习模型 CRNN介绍&#xff1a;用于识别图中文本的深度学习模型CRNN的结构组成部分工作原理 CRNN结构分析卷积层&#xff08;Convolutional Layers&#xff09;递归层&#xff08;Recurrent Layers&#xff09;转录层&#xff08;Transc…...

机器人运动学林沛群——变换矩阵

对于仅有移动&#xff0c;由上图可知&#xff1a; A P B P A P B o r g ^AP^BP^AP_{B org} APBPAPBorg​ 对于仅有转动&#xff0c;可得&#xff1a; A P B A R B P ^AP^A_BR^BP APBA​RBP 将转动与移动混合后&#xff0c;可得&#xff1a; 一个例子 在向量中&#xff…...

阿里云增加数据库访问白名单

阿里云增加数据库访问白名单 概况 我们希望在外网访问数据库时&#xff0c;可能会遇到无法连接的问题&#xff0c;这有可能是被拦截了。这时就需要去查看自己的ip有没有在白名单里面&#xff0c;没有的话就把ip加入到白名单。 路径 阿里云控制台-搜索RDS-进入RDS管理控制台…...

Rust基础拾遗--辅助功能

Rust基础拾遗 前言1.错误处理1.1 panic为什么是 Result 2. create与模块3. 宏4. 不安全代码5. 外部函数 前言 通过Rust程序设计-第二版笔记的形式对Rust相关重点知识进行汇总&#xff0c;读者通读此系列文章就可以轻松的把该语言基础捡起来。 1.错误处理 Rust 中的两类错误处理…...

【数据结构】双向链表(链表实现+测试+原码)

前言 在双向链表之前&#xff0c;如果需要查看单链表来复习一下&#xff0c;链接在这里&#xff1a; http://t.csdnimg.cn/Ib5qS 1.双向链表 1.1 链表的分类 实际中链表的结构非常多样&#xff0c;以下情况组合起来就有8种链表结构&#xff1a; 1.1.1 单向或者双向 1.1.2 …...

ChatGPT 3.5与4.0:深入解析技术进步与性能提升的关键数据

大家好&#xff0c;欢迎来到我的博客&#xff01;今天我们将详细比较两个引人注目的ChatGPT版本——3.5和4.0&#xff0c;通过一些关键数据来深入解析它们之间的差异以及4.0版本的技术进步。 1. 模型规模与参数 ChatGPT 3.5&#xff1a; 参数数量&#xff1a;约1.7亿个模型层数…...

前端JavaScript篇之ajax、axios、fetch的区别

目录 ajax、axios、fetch的区别AjaxAxiosFetch总结注意 ajax、axios、fetch的区别 在Web开发中&#xff0c;ajax、axios和fetch都是用于与服务器进行异步通信的技术&#xff0c;但它们在实现方式和功能上有所不同。 Ajax 定义与特点&#xff1a;Ajax是一种在无需重新加载整个…...

【PyTorch][chapter 15][李宏毅深度学习][Neighbor Embedding-LLE]

前言&#xff1a; 前面讲的都是线性降维&#xff0c;本篇主要讨论一下非线性降维. 流形学习&#xff08;mainfold learning&#xff09;是一类借鉴了拓扑流行概念的降维方法. 如上图,欧式距离上面 A 点跟C点更近&#xff0c;距离B 点较远 但是从图形拓扑结构来看&#xff0c; …...

在JSP中实现JAVABEAN

在JSP中实现JAVABEAN 问题陈述 创建Web应用程序以连接数据库并检索作者名、地址、城市、州及邮政编码等与作者的详细信息。JavaBean组件应接受作者ID、驱动程序名及URL作为参数。信息要从authors表中检索。 解决方案 要解决上述问题,需要执行以下任务: 创建Web应用程序。创…...

智能优化算法 | Matlab实现飞蛾扑火(MFO)(内含完整源码)

文章目录 效果一览文章概述源码设计参考资料效果一览 文章概述 智能优化算法 | Matlab实现飞蛾扑火(MFO)(内含完整源码) 源码设计 %%%% clear all clc SearchAgents_no=100; % Number of search ag...

LSF 主机状态 unreach 分析

在LSF集群运行过程中&#xff0c;有主机状态变为 unreach。熟悉LSF的朋友都知道主机状态为 unreach 表示主机上的 SBD 服务中断服务了&#xff0c;但其它服务 LIM 和 RES 还在正常运行。 影响分析 那么主机上的 SBD 服务中断的影响是什么呢&#xff1f; 我们需要先明白 SBD …...

SpringBoot日志

自定义日志 导入的是slf4j的Logger类 package app.controller;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.GetMapping;RestController pu…...

006集——where语句进行属性筛选——arcgis

在arcgis中&#xff0c; dBASE 文件除了 WHERE 语句以外&#xff0c;不支持 其它 SQL 命令。选择窗口如下&#xff1a; 首先&#xff0c;我们了解下什么是where语句。 WHERE语句是SQL语言中使用频率很高的一种语句。它的作用是从数据库表中选择一些特定的记录行来进行操作。WHE…...

《动手学深度学习(PyTorch版)》笔记8.3

注&#xff1a;书中对代码的讲解并不详细&#xff0c;本文对很多细节做了详细注释。另外&#xff0c;书上的源代码是在Jupyter Notebook上运行的&#xff0c;较为分散&#xff0c;本文将代码集中起来&#xff0c;并加以完善&#xff0c;全部用vscode在python 3.9.18下测试通过&…...

静态时序分析:建立时间分析

静态时序分析https://blog.csdn.net/weixin_45791458/category_12567571.html?spm1001.2014.3001.5482 在静态时序分析中&#xff0c;建立时间检查约束了触发器时钟引脚&#xff08;时钟路径&#xff09;和输入数据引脚&#xff08;数据路径&#xff09;之间的时序关系&#x…...

深入探究 HTTP 简化:httplib 库介绍

✏️心若有所向往&#xff0c;何惧道阻且长 文章目录 简介特性主要类介绍httplib::Server类httplib::Client类httplib::Request类httplib::Response类 示例服务器客户端 总结 简介 在当今的软件开发中&#xff0c;与网络通信相关的任务变得日益普遍。HTTP&#xff08;Hypertext…...

ARP欺骗攻击利用之抓取https协议的用户名与密码

1.首先安装sslstrip 命令执行&#xff1a;apt-get install sslstrip 2.启动arp欺骗 arpspoof -i ech0 -t 192.168.159.148 192.168.159.2 arpspoof -i ech0(网卡) -t 目标机ip 本地局域网关 3.命令行输入: vim /etc/ettercap/etter.conf进入配置文件 找到下红框的内容&a…...

<s-table>、<a-table>接收后端数据

对于 中的 <template #bodyCell“{ column, record }”> &#xff1a; <s-tableref"table":columns"columns":data"loadData":alert"options.alert.show"bordered:row-key"(record) > record.id":tool-config&…...

[数学]高斯消元

介绍 用处&#xff1a;求解线性方程组 加减消元法和代入消元法 这里引用了高斯消元解线性方程组----C实现_c用高斯消元法解线性方程组-CSDN博客 改成了自己常用的形式&#xff1a; int gauss() {int c, r; // column, rowfor (c 1, r 1; c < n; c ){int maxx r; //…...

【Linux】gdb调试与make/makefile工具

目录 导读 1. make/Makefile 1.1 引入 1.2 概念 1.3 语法规则 1.4 示例 2. Linux调试器-gdb 2.1 引入 2.2 概念 2.3 使用 导读 我们在上次讲了Linux编辑器gcc\g的使用&#xff0c;今天我们就来进一步的学习如何调试&#xff0c;以及makefile这个强大的工具。 1. mak…...

使用Arcgis裁剪

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、掩膜提取二、随意裁剪三、裁剪 前言 因为从网站下载的是全球气候数据&#xff0c;而我们需要截取成中国部分&#xff0c;需要用到Arcgis的裁剪工具 一、掩…...

sheng的学习笔记-网络爬虫scrapy框架

基础知识&#xff1a; scrapy介绍 何为框架&#xff0c;就相当于一个封装了很多功能的结构体&#xff0c;它帮我们把主要的结构给搭建好了&#xff0c;我们只需往骨架里添加内容就行。scrapy框架是一个为了爬取网站数据&#xff0c;提取数据的框架&#xff0c;我们熟知爬虫总…...

平乡县网站建设/打开app下载

一、安装docker 1、Docker 要求 CentOS 系统的内核版本高于 3.10 &#xff0c;查看本页面的前提条件来验证你的CentOS 版本是否支持 Docker 。 通过 uname -r 命令查看你当前的内核版本 $ uname -r 2、使用 root 权限登录 Centos。确保 yum 包更新到最新。 $ sudo yum upda…...

北京建设网站公司哪家好/足球排名最新排名世界

Shallow Convolutional Neural Network for Implicit Discourse Relation Recognition 略读&#xff0c;科普&#xff0c;1h Motivation 浅层卷积神经网络进行隐式篇章关系识别&#xff0c;浅层结构减轻了过拟合问题&#xff0c;而卷积和非线性操作有助于保持我们的模型的识别和…...

可以用自己的电脑做网站主机/windows优化大师win10

1.先交VS2012的ISO通过Ultraiso载入2.DOS命中输入 I:\vs_ultimate.exe /uninstall /force 或 I:vs_ultimate.exe /uninstall /force 都OK&#xff0c;参数运行卸载的&#xff08;在此之前应先手动卸载vs的各种外来插件或组件&#xff0c;然后再卸载vssdk、vs&#xff0c;我按相…...

深圳网站制作的公司网络服务/搜狗站长平台打不开

原文&#xff1a;https://automatetheboringstuff.com/2e/chapter3/ 您已经熟悉了前几章中的print()、input()和len()函数。Python 提供了几个这样的内置函数&#xff0c;但是您也可以编写自己的函数。函数就像一个程序中的一个小程序。 为了更好地理解函数是如何工作的&#…...

网页微信能不能传文件/企业seo优化

require 的使用方法如 require("MyRequireFile.php"); 。这个函数通常放在 PHP 程序的最前面&#xff0c;PHP 程序在执行前&#xff0c;就会先读入 require 所指定引入的文件&#xff0c;使它变成 PHP 程序网页的一部份。常用的函数&#xff0c;亦可以这个方法将它引…...

用什么技术来做网站/天津seo网站排名优化公司

参考&#xff1a;https://blog.csdn.net/wangxueming/article/details/73294777 使用方式,使用场景 standard:普通创建 ----------- 普通activity singleTop&#xff1a;顶上不是target Activity,new 一个 ----------需要展示推送过来的消息 singletask:顶上不是targetActivit…...