七、Nacos源码系列:Nacos服务发现
目录
一、服务发现
二、getServices():获取服务列表
2.1、获取服务列表
2.2、总结图
三、getInstances(serviceId):获取服务实例列表
3.1、从缓存中获取服务信息
3.2、缓存为空,执行订阅服务
3.2.1、调度更新,往线程池中提交一个UpdateTask任务
3.2.2、订阅服务
3.2.3、处理服务信息
3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求
3.4、筛选满足条件的实例
3.5、总结图
一、服务发现
在discovery-provider项目的pom.xml中,我们引入了如下依赖:
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2.2.9.RELEASE</version>
</dependency>
SpringCloud很多功能都是基于SpringBoot项目的自动配置原理来扩展实现的,下面我们查看spring-cloud-starter-alibaba-nacos-discovery-2.2.9.RELEASE.jar包路径下的spring.factories的"org.springframework.boot.autoconfigure.EnableAutoConfiguration"自动装配类配置,如下图:
如上图,跟客户端服务发现有关的有两个自动配置类:NacosDiscoveryClientConfiguration和NacosDiscoveryAutoConfiguration。
相关源码如下:
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,CommonsClientAutoConfiguration.class })
// 在NacosDiscoveryAutoConfiguration自动装配类执行完成后才执行
@AutoConfigureAfter(NacosDiscoveryAutoConfiguration.class)
public class NacosDiscoveryClientConfiguration {// 创建DiscoveryClient bean对象@Beanpublic DiscoveryClient nacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {return new NacosDiscoveryClient(nacosServiceDiscovery);}@Bean@ConditionalOnMissingBean@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",matchIfMissing = true)public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,NacosDiscoveryProperties nacosDiscoveryProperties) {return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);}}@Configuration(proxyBeanMethods = false)
// spring.cloud.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnDiscoveryEnabled
// spring.cloud.nacos.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {@Bean@ConditionalOnMissingBeanpublic NacosDiscoveryProperties nacosProperties() {// 匹配配置文件中以“spring.cloud.nacos.discovery”为前缀的那些属性,// 如namespace、username、password、serverAddr等属性return new NacosDiscoveryProperties();}// 创建NacosServiceDiscovery bean对象@Bean@ConditionalOnMissingBeanpublic NacosServiceDiscovery nacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,NacosServiceManager nacosServiceManager) {return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);}}
在NacosDiscoveryAutoConfiguration自动配置类中,创建了一个NacosServiceDiscovery的bean对象,然后在NacosDiscoveryClientConfiguration自动装配时,创建DiscoveryClient的bean对象,传入前面创建的NacosServiceDiscovery对象。
重点关注NacosDiscoveryClient这个类,NacosDiscoveryClient的源码如下:
public class NacosDiscoveryClient implements DiscoveryClient {private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);/*** Nacos Discovery Client Description.*/public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";private NacosServiceDiscovery serviceDiscovery;public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {this.serviceDiscovery = nacosServiceDiscovery;}@Overridepublic String description() {return DESCRIPTION;}@Overridepublic List<ServiceInstance> getInstances(String serviceId) {try {return serviceDiscovery.getInstances(serviceId);}catch (Exception e) {throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, e);}}@Overridepublic List<String> getServices() {try {return serviceDiscovery.getServices();}catch (Exception e) {log.error("get service name from nacos server fail,", e);return Collections.emptyList();}}}
可以看到,NacosDiscoveryClient实现了SpringCloud的DiscoveryClient接口,重点是getInstances()和getServices()方法,而且都是由NacosServiceDiscovery类去实现。
public class NacosServiceDiscovery {// 跟配置文件中以“spring.cloud.nacos.discovery”前缀的属性配置对应上private NacosDiscoveryProperties discoveryProperties;// nacos服务管理器对象private NacosServiceManager nacosServiceManager;public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,NacosServiceManager nacosServiceManager) {this.discoveryProperties = discoveryProperties;this.nacosServiceManager = nacosServiceManager;}/*** 返回指定group和servic的所有实例*/public List<ServiceInstance> getInstances(String serviceId) throws NacosException {// 配置文件中配置的group组名String group = discoveryProperties.getGroup();// namingService(): 通过反射创建一个NacosNamingService对象// 最终会调用NacosNamingService#selectInstances()方法List<Instance> instances = namingService().selectInstances(serviceId, group,true);// 将Instance包装成NacosServiceInstance对象返回return hostToServiceInstanceList(instances, serviceId);}/*** 返回指定group的所有服务名称*/public List<String> getServices() throws NacosException {// 配置文件中配置的group组名String group = discoveryProperties.getGroup();// namingService(): 通过反射创建一个NacosNamingService对象// 最终会调用NamingGrpcClientProxy#getServiceList()方法ListView<String> services = namingService().getServicesOfServer(1,Integer.MAX_VALUE, group);// 返回所有服务名称return services.getData();}public static List<ServiceInstance> hostToServiceInstanceList(List<Instance> instances, String serviceId) {List<ServiceInstance> result = new ArrayList<>(instances.size());for (Instance instance : instances) {ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);if (serviceInstance != null) {result.add(serviceInstance);}}return result;}public static ServiceInstance hostToServiceInstance(Instance instance,String serviceId) {if (instance == null || !instance.isEnabled() || !instance.isHealthy()) {return null;}NacosServiceInstance nacosServiceInstance = new NacosServiceInstance();nacosServiceInstance.setHost(instance.getIp());nacosServiceInstance.setPort(instance.getPort());nacosServiceInstance.setServiceId(serviceId);Map<String, String> metadata = new HashMap<>();metadata.put("nacos.instanceId", instance.getInstanceId());metadata.put("nacos.weight", instance.getWeight() + "");metadata.put("nacos.healthy", instance.isHealthy() + "");metadata.put("nacos.cluster", instance.getClusterName() + "");if (instance.getMetadata() != null) {metadata.putAll(instance.getMetadata());}metadata.put("nacos.ephemeral", String.valueOf(instance.isEphemeral()));nacosServiceInstance.setMetadata(metadata);if (metadata.containsKey("secure")) {boolean secure = Boolean.parseBoolean(metadata.get("secure"));nacosServiceInstance.setSecure(secure);}return nacosServiceInstance;}private NamingService namingService() {return nacosServiceManager.getNamingService();}}
接下来,我们分析前面介绍到的两个重要方法:getInstances(serviceId)和getServices()。
二、getServices():获取服务列表
2.1、获取服务列表
// namingService(): 通过反射创建一个NacosNamingService对象
// NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {try {Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");Constructor constructor = driverImplClass.getConstructor(Properties.class);return (NamingService) constructor.newInstance(properties);} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}
}
//getServices()调用栈大体如下:namingService().getServicesOfServer(1, Integer.MAX_VALUE, group);NacosNamingService#getServicesOfServerclientProxy.getServiceList(pageNo, pageSize, groupName, selector)NamingClientProxyDelegate#getServiceListgrpcClientProxy.getServiceList(pageNo, pageSize, groupName, selector);// 最终会调用NamingGrpcClientProxy#getServiceList()方法
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)throws NacosException {// 构建ServiceListRequest请求(服务列表请求),指定命名空间ID、服务组名ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);if (selector != null) {if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {request.setSelector(JacksonUtils.toJson(selector));}}// 发送服务列表请求给Nacos服务端,接下来由服务端处理ServiceListResponse response = requestToServer(request, ServiceListResponse.class);// 组装返回值出去ListView<String> result = new ListView<>();result.setCount(response.getCount());result.setData(response.getServiceNames());return result;
}
接下来,我们看看服务端怎么处理这个服务列表请求的。通过对ServiceListRequest类引用的追踪,我们发现是在com.alibaba.nacos.naming.remote.rpc.handler.ServiceListRequestHandler#handle这个方法中对客户端提交的服务列表请求进行处理的。
// 处理客户端提交的服务列表请求
public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {// ServiceManager.getInstance()通过单例返回一个ServiceManager对象/*** 获取指定命令空间下的所有服务,在ServiceManager中存在一个map保存着每个命名空间中的所有服务。* ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2)* key: namespaceId* value: Set<Service>* 注册实例的时候,就往这个map写入了数据** ServiceManager.getInstance().getSingletons(request.getNamespace())相当于执行:* namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1))*/Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());// 构建响应结果ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());if (!serviceSet.isEmpty()) {// 过滤指定分组的Service,添加groupServiceName,格式如:groupA@@serviceACollection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());// 按分页裁剪serviceNameSetList<String> serviceNameList = ServiceUtil.pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);result.setCount(serviceNameSet.size());result.setServiceNames(serviceNameList);}return result;
}
从源码可以看出,Nacos服务端从ServiceManager管理器中的一个map(namespaceSingletonMaps)中拿出指定命名空间那些Service,并根据筛选条件过滤满足条件的Service,然后组装好groupServiceName(格式如:groupA@@serviceA)并返回。
2.2、总结图
三、getInstances(serviceId):获取服务实例列表
// 调用栈如下:
// namingService().selectInstances(serviceId, group,true);// NamingService#selectInstances(serviceName, groupName, healthy, true)// NamingService#selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, true)
// getInstances(serviceId)方法最终会调用NacosNamingService#selectInstances()获取实例信息。
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;// 集群名称,使用逗号分隔String clusterString = StringUtils.join(clusters, ",");// 是否订阅,默认是订阅的if (subscribe) {/*** 1.从缓存中获取ServiceInfo* ConcurrentMap<String, ServiceInfo> serviceInfoMap* key: groupName@@serviceName 或者 groupName@@serviceName@@clusterString* value: ServiceInfo*/// 示例:serviceName=discovery-provider groupName=DEFAULT_GROUPserviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);// 2.缓存为空,执行订阅服务if (null == serviceInfo) {serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);}} else {// 3.非订阅,通过grpc发送ServiceQueryRequest服务查询请求serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);}// 4.筛选满足条件的实例return selectInstances(serviceInfo, healthy);
}
上述流程的基本逻辑为:
- 如果是订阅模式,则直接从本地缓存获取服务信息(ServiceInfo),然后从中获取实例列表,这是因为订阅机制会自动同步服务器实例的变化到本地。如果本地缓存中没有,那说明是首次调用,则进行订阅,在订阅完成后会获得到服务信息。
- 如果是非订阅模式,那就直接请求服务器端,获得服务信息。
3.1、从缓存中获取服务信息
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());// 组装服务名(带组名):groupName@@serviceName// 例如:DEFAULT_GROUP@@discovery-providerString groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 如果指定了集群,那么key还会加上"@@clusters"String key = ServiceInfo.getKey(groupedServiceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// ConcurrentMap<String, ServiceInfo> serviceInfoMap// 从缓存中获取服务信息return serviceInfoMap.get(key);
}
3.2、缓存为空,执行订阅服务
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);// clientProxy在构造方法中初始化为:NamingClientProxyDelegate
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
实际上调用的是NamingClientProxyDelegate#subscribe()方法:
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);// 服务名称(带组名) 格式:groupName@@serviceNameString serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);// 如果集群名称非空,key还需要拼接上集群名称String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);// 调度更新,往线程池中提交一个UpdateTask任务serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);// 获取缓存中的服务信息ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result || !isSubscribed(serviceName, groupName, clusters)) {// 缓存中不存在对应的服务信息 或者 SubscriberRedoData还未注册,则执行订阅result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(result);return result;
}
主要逻辑有下面三个,分析如下。
3.2.1、调度更新,往线程池中提交一个UpdateTask任务
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {if (!asyncQuerySubscribeService) {return;}// 组装key 格式:groupName@@serviceName@@clustersString serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);// Map<String, ScheduledFuture<?>> futureMap = new HashMap<>()// futureMap用于保存UpdateTask线程池任务的执行结果if (futureMap.get(serviceKey) != null) {return;}synchronized (futureMap) {// double check双重检查,如果非空,直接返回,也就是相同的groupName@@serviceName@@clusters,只会存在一个UpdateTask任务if (futureMap.get(serviceKey) != null) {return;}// 往线程池中添加一个更新任务// UpdateTask实现了Runnable接口,需要关注其run()方法ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}
}private synchronized ScheduledFuture<?> addTask(UpdateTask task) {// 延迟1s执行UpdateTaskreturn executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
可以看到,使用了一个map来保存线程池任务的响应,延迟1s执行调度更新任务。我们看下UpdateTask的源码:
public class UpdateTask implements Runnable {long lastRefTime = Long.MAX_VALUE;private boolean isCancel;private final String serviceName;private final String groupName;private final String clusters;private final String groupedServiceName;private final String serviceKey;/*** the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty*/private int failCount = 0;public UpdateTask(String serviceName, String groupName, String clusters) {this.serviceName = serviceName;this.groupName = groupName;this.clusters = clusters;this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);}@Overridepublic void run() {long delayTime = DEFAULT_DELAY;try {if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);isCancel = true;return;}ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (serviceObj == null) {// 使用grpc向服务端发送ServiceQueryRequest服务查询请求serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime = serviceObj.getLastRefTime();return;}if (serviceObj.getLastRefTime() <= lastRefTime) {// 使用grpc向服务端发送ServiceQueryRequest服务查询请求serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(serviceObj);}lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {// 记录失败次数incFailCount();return;}// TODO multiple time can be configured.delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败次数resetFailCount();} catch (NacosException e) {handleNacosException(e);} catch (Throwable e) {handleUnknownException(e);} finally {if (!isCancel) {// 注意:延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),TimeUnit.MILLISECONDS);}}}private void handleNacosException(NacosException e) {incFailCount();int errorCode = e.getErrCode();if (NacosException.SERVER_ERROR == errorCode) {handleUnknownException(e);}NAMING_LOGGER.warn("Can't update serviceName: {}, reason: {}", groupedServiceName, e.getErrMsg());}private void handleUnknownException(Throwable throwable) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, throwable);}private void incFailCount() {int limit = 6;if (failCount == limit) {return;}failCount++;}private void resetFailCount() {failCount = 0;}
}
run()方法主要逻辑就是使用grpc向服务端发送ServiceQueryRequest服务查询请求,然后处理服务信息,获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件。
这里有重试机制,最多重试6次,延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)。
3.2.2、订阅服务
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);}// 缓存SubscriberRedoData重做数据,定时使用redoData重新订阅,// 具体实现在RedoScheduledTask(由NamingGrpcRedoService定时调度),最终调用的也是NamingGrpcClientProxy#doSubscribe// 缓存重做数据保存在map中:private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);// 使用grpc发送服务订阅请求return doSubscribe(serviceName, groupName, clusters);
}public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);// private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();synchronized (subscribes) {subscribes.put(key, redoData);}
}
订阅服务首先会缓存SubscriberRedoData重做数据,实际上就是保存在一个map中,后续可以定时使用SubscriberRedoData重做数据来重新订阅,然后使用grpc发送服务订阅请求。
我们来看下如何订阅服务的。
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {// 构建一个SubscribeServiceRequest客户端订阅请求// 服务端处理代码: com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler.handleSubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,true);// grpc请求Nacos服务端进行订阅SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);// 标记SubscriberRedoData重做数据为已订阅redoService.subscriberRegistered(serviceName, groupName, clusters);return response.getServiceInfo();
}
通过grpc向Nacos服务端发起一个订阅请求,服务端真正的处理是在:com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler#handle()方法。
public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String serviceName = request.getServiceName();String groupName = request.getGroupName();String app = request.getHeader("app", "unknown");String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 构建一个Service服务,指定为临时实例Service service = Service.newService(namespaceId, groupName, serviceName, true);// 构建Subscriber订阅者对象Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),namespaceId, groupedServiceName, 0, request.getClusters());// serviceStorage.getData(service): 从缓存中获取serviceInfo// metadataManager.getServiceMetadata(service).orElse(null): 从内存(map)获取ServiceMetadata// ServiceUtil.selectInstancesWithHealthyProtection(): 仅包含有保护机制的健康实例ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,true, subscriber.getIp());if (request.isSubscribe()) {// 订阅服务clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));} else {// 取消订阅服务clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));}return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}
我们重点关注订阅服务的方法:
public void subscribeService(Service service, Subscriber subscriber, String clientId) {Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}// 添加到订阅者列表中,实际上就是保存在map中// 订阅者列表: protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);client.addServiceSubscriber(singleton, subscriber);client.setLastUpdatedTime();// 发布客户端订阅事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}
将service添加到订阅者列表中,然后发布客户端订阅事件,这个在之前分析过,客户端订阅事件是在com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation进行处理,
核心逻辑就是将服务添加到ClientServiceIndexesManager的subscriberIndexes订阅者列表中:
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
3.2.3、处理服务信息
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}// 获取老的服务ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {//empty or error push, just ignorereturn oldService;}// 重新存入客户端缓存中serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 对比下服务信息是否发生变更boolean changed = isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());if (changed) {NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),JacksonUtils.toJson(serviceInfo.getHosts()));// 如果发生改变,发送实例变更事件,处理源码在:InstancesChangeNotifierNotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));// 同步serviceInfo数据到本地文件DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;
}
3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求
真正执行的是com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#queryInstancesOfService():
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,boolean healthyOnly) throws NacosException {return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,boolean healthyOnly) throws NacosException {// 构建服务查询请求// Nacos服务端处理是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler.handleServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);request.setCluster(clusters);request.setHealthyOnly(healthyOnly);request.setUdpPort(udpPort);// 通过grpc请求Nacos服务端处理QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);return response.getServiceInfo();
}
queryInstancesOfService()核心就是构建了一个服务查询请求,通过grpc请求Nacos服务端,接下来我们直接看服务端的处理代码,具体是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler#handle。
public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String groupName = request.getGroupName();String serviceName = request.getServiceName();Service service = Service.newService(namespaceId, groupName, serviceName);String cluster = null == request.getCluster() ? "" : request.getCluster();boolean healthyOnly = request.isHealthyOnly();// 从缓存中获取serviceInfoServiceInfo result = serviceStorage.getData(service);// 从内存(map)获取ServiceMetadataServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);// 获取有保护机制的健康实例result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,meta.getClientIp());return QueryServiceResponse.buildSuccessResponse(result);
}
3.4、筛选满足条件的实例
private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {List<Instance> list;if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {return new ArrayList<>();}// 遍历所有实例,直接移除掉不满足条件的实例Iterator<Instance> iterator = list.iterator();while (iterator.hasNext()) {Instance instance = iterator.next();// 筛选出健康、启用、权重大于0的实例if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {iterator.remove();}}return list;
}
3.5、总结图
相关文章:
七、Nacos源码系列:Nacos服务发现
目录 一、服务发现 二、getServices():获取服务列表 2.1、获取服务列表 2.2、总结图 三、getInstances(serviceId):获取服务实例列表 3.1、从缓存中获取服务信息 3.2、缓存为空,执行订阅服务 3.2.1、调度更新,往线程池中…...
Vue源码系列讲解——模板编译篇【一】(综述)
目录 1. 前言 2. 什么是模板编译 3. 整体渲染流程 4. 模板编译内部流程 4.1 抽象语法树AST 4.2 具体流程 5. 总结 1. 前言 在前几篇文章中,我们介绍了Vue中的虚拟DOM以及虚拟DOM的patch(DOM-Diff)过程,而虚拟DOM存在的必要条件是得先有VNode&…...
【机器学习】数据清洗之识别异常点
🎈个人主页:甜美的江 🎉欢迎 👍点赞✍评论⭐收藏 🤗收录专栏:机器学习 🤝希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共同学习、交流进步…...
MacOS 制作 TF 卡/ U 盘镜像
最近有张老的 TF 卡没办法直接拷贝里面的数据,于是打算利用 dd 工具直接全卡拷贝为镜像再分析里面的数据 在终端中,输入以下命令来列出所有磁盘设备: diskutil list这将显示Mac上所有的磁盘设备。你需要找到TF卡对应的设备,它通…...
怎么用postman调用webservice(反推SoapUI)
<soapenv:Envelope xmlns:soapenv“http://schemas.xmlsoap.org/soap/envelope/” xmlns:lis“LisDataTrasen”> soapenv:Header/ soapenv:Body lis:Test lis:test111111111</lis:test> </lis:Test> </soapenv:Body> </soapenv:Envelope> Conten…...
【开源】JAVA+Vue.js实现衣物搭配系统
目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 衣物档案模块2.2 衣物搭配模块2.3 衣物收藏模块 三、系统设计3.1 用例设计3.2 E-R图设计3.3 数据库设计3.3.1 衣物档案表3.3.2 衣物搭配表3.3.3 衣物收藏表 四、系统实现4.1 登录页4.2 衣物档案模块4.3 衣物搭配模块4.4…...
【Flask + AI】接入CHATGLM API 实现翻译接口
【Flask AI】接入CHATGLM API 实现翻译接口 最近的项目中,需要加一个翻译功能,正好chatglm4发布了,于是决定着手用它实现。 https://chatglm.cn 准备 首先,在chatglm开发者中心申请api key,这里不再赘述 其次&…...
并发事务带来的问题及解决方法
引言 在数据库系统中,事务是指一组操作被视为一个逻辑单元,要么全部执行成功,要么全部不执行,保证数据库的一致性和完整性。而并发事务则是指多个事务同时执行的情况。虽然并发事务能够提高系统的性能和吞吐量,但也会…...
CRNN介绍:用于识别图中文本的深度学习模型
CRNN:用于识别图中文本的深度学习模型 CRNN介绍:用于识别图中文本的深度学习模型CRNN的结构组成部分工作原理 CRNN结构分析卷积层(Convolutional Layers)递归层(Recurrent Layers)转录层(Transc…...
机器人运动学林沛群——变换矩阵
对于仅有移动,由上图可知: A P B P A P B o r g ^AP^BP^AP_{B org} APBPAPBorg 对于仅有转动,可得: A P B A R B P ^AP^A_BR^BP APBARBP 将转动与移动混合后,可得: 一个例子 在向量中ÿ…...
阿里云增加数据库访问白名单
阿里云增加数据库访问白名单 概况 我们希望在外网访问数据库时,可能会遇到无法连接的问题,这有可能是被拦截了。这时就需要去查看自己的ip有没有在白名单里面,没有的话就把ip加入到白名单。 路径 阿里云控制台-搜索RDS-进入RDS管理控制台…...
Rust基础拾遗--辅助功能
Rust基础拾遗 前言1.错误处理1.1 panic为什么是 Result 2. create与模块3. 宏4. 不安全代码5. 外部函数 前言 通过Rust程序设计-第二版笔记的形式对Rust相关重点知识进行汇总,读者通读此系列文章就可以轻松的把该语言基础捡起来。 1.错误处理 Rust 中的两类错误处理…...
【数据结构】双向链表(链表实现+测试+原码)
前言 在双向链表之前,如果需要查看单链表来复习一下,链接在这里: http://t.csdnimg.cn/Ib5qS 1.双向链表 1.1 链表的分类 实际中链表的结构非常多样,以下情况组合起来就有8种链表结构: 1.1.1 单向或者双向 1.1.2 …...
ChatGPT 3.5与4.0:深入解析技术进步与性能提升的关键数据
大家好,欢迎来到我的博客!今天我们将详细比较两个引人注目的ChatGPT版本——3.5和4.0,通过一些关键数据来深入解析它们之间的差异以及4.0版本的技术进步。 1. 模型规模与参数 ChatGPT 3.5: 参数数量:约1.7亿个模型层数…...
前端JavaScript篇之ajax、axios、fetch的区别
目录 ajax、axios、fetch的区别AjaxAxiosFetch总结注意 ajax、axios、fetch的区别 在Web开发中,ajax、axios和fetch都是用于与服务器进行异步通信的技术,但它们在实现方式和功能上有所不同。 Ajax 定义与特点:Ajax是一种在无需重新加载整个…...
【PyTorch][chapter 15][李宏毅深度学习][Neighbor Embedding-LLE]
前言: 前面讲的都是线性降维,本篇主要讨论一下非线性降维. 流形学习(mainfold learning)是一类借鉴了拓扑流行概念的降维方法. 如上图,欧式距离上面 A 点跟C点更近,距离B 点较远 但是从图形拓扑结构来看, …...
在JSP中实现JAVABEAN
在JSP中实现JAVABEAN 问题陈述 创建Web应用程序以连接数据库并检索作者名、地址、城市、州及邮政编码等与作者的详细信息。JavaBean组件应接受作者ID、驱动程序名及URL作为参数。信息要从authors表中检索。 解决方案 要解决上述问题,需要执行以下任务: 创建Web应用程序。创…...
智能优化算法 | Matlab实现飞蛾扑火(MFO)(内含完整源码)
文章目录 效果一览文章概述源码设计参考资料效果一览 文章概述 智能优化算法 | Matlab实现飞蛾扑火(MFO)(内含完整源码) 源码设计 %%%% clear all clc SearchAgents_no=100; % Number of search ag...
LSF 主机状态 unreach 分析
在LSF集群运行过程中,有主机状态变为 unreach。熟悉LSF的朋友都知道主机状态为 unreach 表示主机上的 SBD 服务中断服务了,但其它服务 LIM 和 RES 还在正常运行。 影响分析 那么主机上的 SBD 服务中断的影响是什么呢? 我们需要先明白 SBD …...
SpringBoot日志
自定义日志 导入的是slf4j的Logger类 package app.controller;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.GetMapping;RestController pu…...
006集——where语句进行属性筛选——arcgis
在arcgis中, dBASE 文件除了 WHERE 语句以外,不支持 其它 SQL 命令。选择窗口如下: 首先,我们了解下什么是where语句。 WHERE语句是SQL语言中使用频率很高的一种语句。它的作用是从数据库表中选择一些特定的记录行来进行操作。WHE…...
《动手学深度学习(PyTorch版)》笔记8.3
注:书中对代码的讲解并不详细,本文对很多细节做了详细注释。另外,书上的源代码是在Jupyter Notebook上运行的,较为分散,本文将代码集中起来,并加以完善,全部用vscode在python 3.9.18下测试通过&…...
静态时序分析:建立时间分析
静态时序分析https://blog.csdn.net/weixin_45791458/category_12567571.html?spm1001.2014.3001.5482 在静态时序分析中,建立时间检查约束了触发器时钟引脚(时钟路径)和输入数据引脚(数据路径)之间的时序关系&#x…...
深入探究 HTTP 简化:httplib 库介绍
✏️心若有所向往,何惧道阻且长 文章目录 简介特性主要类介绍httplib::Server类httplib::Client类httplib::Request类httplib::Response类 示例服务器客户端 总结 简介 在当今的软件开发中,与网络通信相关的任务变得日益普遍。HTTP(Hypertext…...
ARP欺骗攻击利用之抓取https协议的用户名与密码
1.首先安装sslstrip 命令执行:apt-get install sslstrip 2.启动arp欺骗 arpspoof -i ech0 -t 192.168.159.148 192.168.159.2 arpspoof -i ech0(网卡) -t 目标机ip 本地局域网关 3.命令行输入: vim /etc/ettercap/etter.conf进入配置文件 找到下红框的内容&a…...
<s-table>、<a-table>接收后端数据
对于 中的 <template #bodyCell“{ column, record }”> : <s-tableref"table":columns"columns":data"loadData":alert"options.alert.show"bordered:row-key"(record) > record.id":tool-config&…...
[数学]高斯消元
介绍 用处:求解线性方程组 加减消元法和代入消元法 这里引用了高斯消元解线性方程组----C实现_c用高斯消元法解线性方程组-CSDN博客 改成了自己常用的形式: int gauss() {int c, r; // column, rowfor (c 1, r 1; c < n; c ){int maxx r; //…...
【Linux】gdb调试与make/makefile工具
目录 导读 1. make/Makefile 1.1 引入 1.2 概念 1.3 语法规则 1.4 示例 2. Linux调试器-gdb 2.1 引入 2.2 概念 2.3 使用 导读 我们在上次讲了Linux编辑器gcc\g的使用,今天我们就来进一步的学习如何调试,以及makefile这个强大的工具。 1. mak…...
使用Arcgis裁剪
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、掩膜提取二、随意裁剪三、裁剪 前言 因为从网站下载的是全球气候数据,而我们需要截取成中国部分,需要用到Arcgis的裁剪工具 一、掩…...
sheng的学习笔记-网络爬虫scrapy框架
基础知识: scrapy介绍 何为框架,就相当于一个封装了很多功能的结构体,它帮我们把主要的结构给搭建好了,我们只需往骨架里添加内容就行。scrapy框架是一个为了爬取网站数据,提取数据的框架,我们熟知爬虫总…...
平乡县网站建设/打开app下载
一、安装docker 1、Docker 要求 CentOS 系统的内核版本高于 3.10 ,查看本页面的前提条件来验证你的CentOS 版本是否支持 Docker 。 通过 uname -r 命令查看你当前的内核版本 $ uname -r 2、使用 root 权限登录 Centos。确保 yum 包更新到最新。 $ sudo yum upda…...
北京建设网站公司哪家好/足球排名最新排名世界
Shallow Convolutional Neural Network for Implicit Discourse Relation Recognition 略读,科普,1h Motivation 浅层卷积神经网络进行隐式篇章关系识别,浅层结构减轻了过拟合问题,而卷积和非线性操作有助于保持我们的模型的识别和…...
可以用自己的电脑做网站主机/windows优化大师win10
1.先交VS2012的ISO通过Ultraiso载入2.DOS命中输入 I:\vs_ultimate.exe /uninstall /force 或 I:vs_ultimate.exe /uninstall /force 都OK,参数运行卸载的(在此之前应先手动卸载vs的各种外来插件或组件,然后再卸载vssdk、vs,我按相…...
深圳网站制作的公司网络服务/搜狗站长平台打不开
原文:https://automatetheboringstuff.com/2e/chapter3/ 您已经熟悉了前几章中的print()、input()和len()函数。Python 提供了几个这样的内置函数,但是您也可以编写自己的函数。函数就像一个程序中的一个小程序。 为了更好地理解函数是如何工作的&#…...
网页微信能不能传文件/企业seo优化
require 的使用方法如 require("MyRequireFile.php"); 。这个函数通常放在 PHP 程序的最前面,PHP 程序在执行前,就会先读入 require 所指定引入的文件,使它变成 PHP 程序网页的一部份。常用的函数,亦可以这个方法将它引…...
用什么技术来做网站/天津seo网站排名优化公司
参考:https://blog.csdn.net/wangxueming/article/details/73294777 使用方式,使用场景 standard:普通创建 ----------- 普通activity singleTop:顶上不是target Activity,new 一个 ----------需要展示推送过来的消息 singletask:顶上不是targetActivit…...