政府网站建设项目背景/全网整合营销外包
前言
上一文中,我们从官方的图示了解到Nacos的服务数据结构。但我关心的是,Nacos2.x不是重构了吗?怎么还是这种数据结构?我推测,必然是为了对Nacos1.x的兼容,实际存储应该不是这样的。于是,沿着这个问题出发我们一起来翻一下源码。
从NamingService的使用开始
在扎入源码之前,我们需要回忆一下,我们是怎么使用Nacos的?
- 构建NamingService
NamingService serviceRegistry = NacosFactory.createNamingService(properties);
实际上,这个动作的背后,意味着我们连接了Nacos服务端。 - 注册服务
serviceRegistry.registerInstance(serviceName, groupName, instance);
- 查询服务
serviceRegistry.getAllInstances(serviceName, groupName, List.of(clusterName));
因此,我们就沿着这几个操作,摸一摸源码。
!!!高能警告!!!
没有耐心看源码的同学,可以直接翻到总结,直接看结论。
构建NamingService
客户端
// com.alibaba.nacos.client.naming.NacosNamingService/*** 初始化方法* <p>由NacosNamingService构造器调用,用于初始NamingService</p>*/private void init(Properties properties) throws NacosException {final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);// 省略...// 创建客户端this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);}// com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate/*** NamingClientProxyDelegate构造器*/ public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, NacosClientProperties properties,InstancesChangeNotifier changeNotifier) throws NacosException {// 省略...// 初始化了两个客户端,一个是Http,另一个是Grpc。不过,在注册实例时,如果该实例为临时实例,则使用Grpc,因此我们重点关注Grpcthis.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,serviceInfoHolder);}// com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy/*** NamingGrpcClientProxy构造器*/ public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {// 省略...// 创建RPC客户端this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);this.redoService = new NamingGrpcRedoService(this);// 启动客户端start(serverListFactory, serviceInfoHolder);}// com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxyprivate void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {rpcClient.serverListFactory(serverListFactory);rpcClient.registerConnectionListener(redoService);rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));// 启动客户端rpcClient.start();NotifyCenter.registerSubscriber(this);}// com.alibaba.nacos.common.remote.client.RpcClient#start/*** 启动客户端*/public final void start() throws NacosException {// 控制只启动一次boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);if (!success) {return;}// 创建一个只有2个线程的定时任务线程池clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.remote.worker");t.setDaemon(true);return t;});// 提交-处理连接事件的TaskclientEventExecutor.submit(() -> {while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {ConnectionEvent take;take = eventLinkedBlockingQueue.take();if (take.isConnected()) {notifyConnected();} else if (take.isDisConnected()) {notifyDisConnected();}}});// 提交-心跳任务clientEventExecutor.submit(() -> {while (true) {// 由于这里有一大堆逻辑,省略。// 1. 超过时间间隔,发起心跳请求// 1.1 心跳请求失败,记录当前状态为不健康,并记录上下文。// 1.2 检查当前配置的推荐的Nacos服务器是否在服务器列表中。在,则尝试重新连接推荐的服务器。});// connect to server, try to connect to server sync retryTimes times, async starting if failed.// 连接服务端,尝试retryTimes次,同步地连接服务端,如果依然失败,则改为异步连接。Connection connectToServer = null;rpcClientStatus.set(RpcClientStatus.STARTING);int startUpRetryTimes = rpcClientConfig.retryTimes();while (startUpRetryTimes > 0 && connectToServer == null) {try {startUpRetryTimes--;ServerInfo serverInfo = nextRpcServer();// 连接服务器connectToServer = connectToServer(serverInfo);} catch (Throwable e) {LoggerUtils.printIfWarnEnabled(LOGGER,"[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",rpcClientConfig.name(), e.getMessage(), startUpRetryTimes, e);}}if (connectToServer != null) {this.currentConnection = connectToServer;rpcClientStatus.set(RpcClientStatus.RUNNING);eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));} else {switchServerAsync();}registerServerRequestHandler(new ConnectResetRequestHandler());// register client detection request.// 注册客户端检测请求处理器,用于响应服务端的探测registerServerRequestHandler(request -> {if (request instanceof ClientDetectionRequest) {return new ClientDetectionResponse();}return null;});}
服务端-处理连接请求
服务端的源码首先我们得找到GrpcServer
@Overridepublic void startServer() throws Exception {// 1. 创建请求处理器注册器final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();// 2. 注册请求处理器,并封装拦截器器。封装后,有点类似于SpringMVC的HandlerAdapteraddServices(handlerRegistry, new GrpcConnectionInterceptor());NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());// 省略server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry).compressorRegistry(CompressorRegistry.getDefaultInstance()).decompressorRegistry(DecompressorRegistry.getDefaultInstance()).addTransportFilter(new AddressTransportFilter(connectionManager)).keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS).keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS).permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS).build();// 启动服务server.start();}private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {// unary common call register.// 通用调用注册final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder().setType(MethodDescriptor.MethodType.UNARY).setFullMethodName(MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_SERVICE_NAME,GrpcServerConstants.REQUEST_METHOD_NAME)).setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();// 定义服务器调用处理器。核心处理逻辑可就在这lambda表达式定义的匿名内部类里了。也只有一个方法:// grpcCommonRequestAcceptor.request(request, responseObserver)final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(GrpcServerConstants.REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build();handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));// bi stream register.// bi流式调用服务,主要是连接请求、连接断开// 核心处理逻辑:// grpcBiStreamRequestAcceptor.requestBiStream(responseObserver)final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall((responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder().setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME,GrpcServerConstants.REQUEST_BI_STREAM_METHOD_NAME)).setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build())).setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));}
处理连接请求:
// com.alibaba.nacos.core.remote.grpc.GrpcBiStreamRequestAcceptor@Overridepublic StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {final String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();final Integer localPort = GrpcServerConstants.CONTEXT_KEY_CONN_LOCAL_PORT.get();final int remotePort = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_PORT.get();String remoteIp = GrpcServerConstants.CONTEXT_KEY_CONN_REMOTE_IP.get();String clientIp = "";@Overridepublic void onNext(Payload payload) {// 处理连接请求clientIp = payload.getMetadata().getClientIp();traceDetailIfNecessary(payload);Object parseObj;// 省略...// 检查if (parseObj instanceof ConnectionSetupRequest) {ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;// 设置label,省略// 构建ConnectionConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());metaInfo.setTenant(setUpRequest.getTenant());// 第三个参数Channel,是发生网路数据的关键Connection connection = new GrpcConnection(metaInfo, responseObserver, GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());connection.setAbilities(setUpRequest.getAbilities());boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();// 注册连接, 重点在 “或” 条件上// connectionManager.registerif (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {//Not register to the connection manager if current server is over limit or server is starting.// 如果当前服务器已超限制,或者服务器还在启动过程中,则注册失败。connection.request(new ConnectResetRequest(), 3000L);connection.close();}} // 省略。。。}// 省略。。。};return streamObserver;}
这里出现了我们接触到的第一个概念:Connection-连接,他有个属性ConnectionMeta,记录连接相关的信息。当需要发起请求时,他会将这些信息设置到Request中,然后通过GrpcUtils转换成Payload发出请求
继续看com.alibaba.nacos.core.remote.ConnectionManager#register
public synchronized boolean register(String connectionId, Connection connection) {if (connection.isConnected()) {String clientIp = connection.getMetaInfo().clientIp;// 省略入参检查// 注册客户端connections.put(connectionId, connection);// 登记客户端IPif (!connectionForClientIp.containsKey(clientIp)) {connectionForClientIp.put(clientIp, new AtomicInteger(0));}connectionForClientIp.get(clientIp).getAndIncrement();// 通知客户端连接ListenerclientConnectionEventListenerRegistry.notifyClientConnected(connection);return true;}return false;}
此处出现第一个Manager:ConnectionManager。用来管理所有客户端的连接。登记连接后,调用了所有的Listener的clientConnected方法。其中,有个ConnectionBasedClientManager,看名字就知道,可能是负责管理客户端的。
// > ConnectionBasedClientManager#clientConnected(com.alibaba.nacos.core.remote.Connection)// > ConnectionBasedClientManager#clientConnected(java.lang.String, com.alibaba.nacos.naming.core.v2.client.ClientAttributes)// ConnectionBasedClientManager@Overridepublic boolean clientConnected(String clientId, ClientAttributes attributes) {String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);// 通过ClientFactory创建客户端// 从以上的两行代码,我们通过ClientConstants.CONNECTION_TYPE就知道工厂是ConnectionBasedClientFactory,对应的客户端自然是ConnectionBasedClientreturn clientConnected(clientFactory.newClient(clientId, attributes));}@Overridepublic boolean clientConnected(final Client client) {// 登记客户端clients.computeIfAbsent(client.getClientId(), s -> {return (ConnectionBasedClient) client;});return true;}
至此,我们又发现一个新概念:Client-客户端。由Grpc连接的客户端,都由ConnectionBasedClientManager进行管理。
小结
概念 | 类 | 管理者 |
---|---|---|
连接 | com.alibaba.nacos.core.remote.Connection | ConnectionManager |
客户端 | com.alibaba.nacos.naming.core.v2.client.Client | ClientManager |
注册实例
客户端
我们重点看看
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {// 创建请求。每个Request在Nacos服务端都由对应的HandlerInstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,NamingRemoteConstants.REGISTER_INSTANCE, instance);requestToServer(request, Response.class);redoService.instanceRegistered(serviceName, groupName);}
服务端
我们前面说服务端启动时,说这个是负责处理通用请求的:
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
我们就顺着往下看
// com.alibaba.nacos.core.remote.grpc.GrpcRequestAcceptor#request@Overridepublic void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {String type = grpcRequest.getMetadata().getType();// 省略如下内容:// 检查服务是否已启动// 如果是客户端对服务端的健康检查,则直接响应// ----------------------------// 从对应的请求处理器RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);// 省略:no handler found. 的异常处理// ----------------------------String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();// 省略:检查连接是否正常.Object parseObj = null;parseObj = GrpcUtils.parse(grpcRequest);// 省略:转换异常、无效请求异常Request request = (Request) parseObj;// 从ConnectionManager获取到对应的ConnectionConnection connection = connectionManager.getConnection(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());// 组装RequestMetaRequestMeta requestMeta = new RequestMeta();requestMeta.setClientIp(connection.getMetaInfo().getClientIp());requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());requestMeta.setClientVersion(connection.getMetaInfo().getVersion());requestMeta.setLabels(connection.getMetaInfo().getLabels());connectionManager.refreshActiveTime(requestMeta.getConnectionId());// 调用requestHandler处理请求Response response = requestHandler.handleRequest(request, requestMeta);Payload payloadResponse = GrpcUtils.convert(response);traceIfNecessary(payloadResponse, false);responseObserver.onNext(payloadResponse);responseObserver.onCompleted();}
这些便是通用请求处理的核心逻辑。现在我们便来看InstanceRequest的处理com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler
@Override@Secured(action = ActionTypes.WRITE)public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {case NamingRemoteConstants.REGISTER_INSTANCE:return registerInstance(service, request, meta);case NamingRemoteConstants.DE_REGISTER_INSTANCE:return deregisterInstance(service, request, meta);default:throw new NacosException}}private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)throws NacosException {// 1. 注册实例clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());// 2. 发布事件:RegisterInstanceTraceEventNotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),request.getInstance().getIp(), request.getInstance().getPort()));return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}// 注册实例:
// com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl#registerInstance@Overridepublic void registerInstance(Service service, Instance instance, String clientId) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);// 从ServiceManager获取已注册服务。而我们当前是要注册实例,所以,这个方法肯定还内含玄机Service singleton = ServiceManager.getInstance().getSingleton(service);// 省略:如果获取到的是持久化实例,意味着当前注册临时实例冲突了,返回异常。Client client = clientManager.getClient(clientId);InstancePublishInfo instanceInfo = getPublishInfo(instance);// 记录当前客户端发布的实例client.addServiceInstance(singleton, instanceInfo);client.setLastUpdatedTime();client.recalculateRevision();// 发布服务注册事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}// com.alibaba.nacos.naming.core.v2.ServiceManager/*** Get singleton service. Put to manager if no singleton.* 获取单例服务(单例意味着整个应用只有一个对象),如果不存在,则注册到Manager*/public Service getSingleton(Service service) {// 如果不存在就注册singletonRepository.computeIfAbsent(service, key -> {// 发布服务元信息数据事件。不过该事件对于持久实例才有用处。NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, false));return service;});Service result = singletonRepository.get(service);// 将服务登记到namespacenamespaceSingletonMaps.computeIfAbsent(result.getNamespace(), namespace -> new ConcurrentHashSet<>());namespaceSingletonMaps.get(result.getNamespace()).add(result);return result;}// 再看看ClientOperationEvent.ClientRegisterServiceEvent// > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#onEvent// > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation// > com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#addPublisherIndexes// 登记发布服务的客户端private void addPublisherIndexes(Service service, String clientId) {publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());publisherIndexes.get(service).add(clientId);NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}
小结
我们总结一下,以上涉及到的概念。
概念 | 类 | 管理者 | 描述 |
---|---|---|---|
服务 | com.alibaba.nacos.naming.core.v2.pojo.Service | ServiceManager |
除了这个概念,实际上我们还看到Client的内部结构:
AbstractClient:
- 记录客户端发布的服务:ConcurrentHashMap<Service, InstancePublishInfo> publishers
- 记录客户端订阅的服务:ConcurrentHashMap<Service, Subscriber> subscribers
这个点其实要到订阅服务请求才会分析到,但为了信息不会太分散,所以就放到一起了。
ClientServiceIndexesManager
- 客户端索引管理者。这里的索引指的是,通过Service快速找到客户端,只是客户端有ClientManager,如果这里再存一份也不合适,不利于数据维护。因此这里存的是clientId。估计也是如此,他才叫客户端索引管理者。
查询和订阅实例
> com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>)> com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(java.lang.String, java.lang.String, java.util.List<java.lang.String>, boolean)> com.alibaba.nacos.client.naming.NacosNamingService#getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe)> com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#subscribe> com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doSubscribepublic ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {// 重点SubscribeServiceRequest,看服务端代码需要知道是什么请求SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, true);SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);redoService.subscriberRegistered(serviceName, groupName, clusters);return response.getServiceInfo();}
服务端
// com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler@Override@Secured(action = ActionTypes.READ)public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String serviceName = request.getServiceName();String groupName = request.getGroupName();String app = request.getHeader("app", "unknown");String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);Service service = Service.newService(namespaceId, groupName, serviceName, true);// 订阅者Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),namespaceId, groupedServiceName, 0, request.getClusters());// 服务信息,这里有几个参数是需要通过方法来获取的// 重点是:serviceStorage.getData(service)// 而这个方法也是个重要的方法,过滤cluster、健康实例,并执行自动保护机制,都是他实现的ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,true, subscriber.getIp());if (request.isSubscribe()) {// 订阅服务clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());// 发布订阅事件NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));} else {// 取消订阅clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));}return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);}// > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getData// > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getPushData// > com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getAllInstancesFromIndex// > com.alibaba.nacos.naming.utils.ServiceUtil#selectInstancesWithHealthyProtection(com.alibaba.nacos.api.naming.pojo.ServiceInfo, com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata, java.lang.String, boolean, boolean, java.lang.String)// > com.alibaba.nacos.naming.utils.ServiceUtil#doSelectInstances// 上面是调用路径,这里把生产数据的方法重点捞出来// ServiceStorage的数据生产public ServiceInfo getPushData(Service service) {ServiceInfo result = emptyServiceInfo(service);if (!ServiceManager.getInstance().containSingleton(service)) {return result;}Service singleton = ServiceManager.getInstance().getSingleton(service);result.setHosts(getAllInstancesFromIndex(singleton));// 从ServiceManager拿到服务的实例信息,并登记到ServiceStorage#serviceDataIndexes中serviceDataIndexes.put(singleton, result);return result;}private List<Instance> getAllInstancesFromIndex(Service service) {Set<Instance> result = new HashSet<>();Set<String> clusters = new HashSet<>();for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {// 获取实例信息Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);if (instancePublishInfo.isPresent()) {InstancePublishInfo publishInfo = instancePublishInfo.get();//If it is a BatchInstancePublishInfo type, it will be processed manually and added to the instance listif (publishInfo instanceof BatchInstancePublishInfo) {BatchInstancePublishInfo batchInstancePublishInfo = (BatchInstancePublishInfo) publishInfo;List<Instance> batchInstance = parseBatchInstance(service, batchInstancePublishInfo, clusters);result.addAll(batchInstance);} else {Instance instance = parseInstance(service, instancePublishInfo.get());result.add(instance);clusters.add(instance.getClusterName());}}}// cache clusters of this service// 缓存集群信息serviceClusterIndex.put(service, clusters);return new LinkedList<>(result);}private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {// 通过客户端ID,获取到Client,进而从其获取客户端发布的服务。Client client = clientManager.getClient(clientId);if (null == client) {return Optional.empty();}return Optional.ofNullable(client.getInstancePublishInfo(service));}
从查询实例这里,我们看到有个数据存储:ServiceStorage。重要的是,这个虽然叫存储,但是实际上里面的数据却是从别处获取来的。来源于:ServiceManager、ServiceIndexesManager、ClientManager。从这个角度说,更像是个缓存。
总结
上面的整了一堆源代码,容易看烦了。感兴趣的,可以根据上面的源码深入看看。为了方便大家,我画了图给大家:
为了让大家重点看到数据生产过程:
从图中,我们可以看到,nacos2.x的数据结构并不像官方的Service->Group->Instance。而是按照Connection、Client、Service分别通过对应的管理器进行管理。此外,为了避免数据多处存储,还有ClientServiceIndexesManager作为Client和Service的桥梁。
除此之外,还有ServiceStorage,作为数据缓存。不过,当我们深入了解ServiceStorage时,会发现他的数据一致性/数据的更新,是在给订阅服务的客户端定时推送时通过调用com.alibaba.nacos.naming.core.v2.index.ServiceStorage#getPushData
来实现的。个人认为这是有可以优化空间的,他们完全可以通过各种事件来监听实例的生死来更新数据。
总而言之,如果不算ServiceStorage这个缓存,那么数据主要存在于一下的Manager中:
ConnectionManager、ClientManager、ServiceManager、ClientServiceIndexesManager。
到这里,可能有同学就有疑问了。那么Group、Cluster这些概念去哪了呢?
这些概念都成为了属性/字段了。
com.alibaba.nacos.naming.core.v2.pojo.Service#group
com.alibaba.nacos.api.naming.pojo.Instance#clusterName
即使在ServiceStorage封装ServiceInfo时,他们也是作为属性来存储的。通过ServiceUtil来过滤目标实例。
最后,提醒大家一下,我们这里只是分析了临时实例。是最常用的场景。当然,如果我们用Nacos的持久实例,SpringCloud也就自然支持了持久实例。不过,咱们不深究了,感兴趣的同学,可以顺着往下挖一挖持久实例。
后记
这种深度刨析源码、深挖一个技术细节的实现,太费时间、也太费篇幅了。我自己都感觉差点把整个nacos的源码都搬上来了。莫见怪。。。
关于nacos的一致性协议,就不在这里聊了,这个东西得单独倒腾,还要与其他分布式中间件相互对比,还有理论。。
下次,咱们先往后聊OpenFeign。
推荐
Nacos的实现原理在官网有一电子书《Nacos架构&原理》,想要了解顶层设计原理的同学,建议看看。
相关文章:

【探索SpringCloud】服务发现-Nacos服务端数据结构和模型
前言 上一文中,我们从官方的图示了解到Nacos的服务数据结构。但我关心的是,Nacos2.x不是重构了吗?怎么还是这种数据结构?我推测,必然是为了对Nacos1.x的兼容,实际存储应该不是这样的。于是,沿着…...

基于简单的信息变换实现自然语言模型
题目:基于简单的信息变换实现自然语言模型 摘要:在自然语言处理中,自然语言模型是至关重要的。本论文提出了一种基于简单的信息变换实现自然语言模型的方法。该方法将输入信息进行一系列的信息变换,如分割、属性、等效替换、增加删除等变换,与原始信息进行比较,得知信息是…...

低配版消息队列,redis——Stream
消息队列实现:群发效果 redis原有的消息队列数据不会持久化,所以它加了一个Stream数据结构。 添加数据:XADD s1(组名) *(*为自动生成) key val 取出数据: XREAD COUNT 1(取出个数) BLOCK 1(队列为空时等待时间0为一直等) STREA…...

【OpenCV入门】第五部分——图像运算
文章结构 掩模图像的加法运算图像的位运算按位与运算按位或运算按位取反运算按位异或运算图像位运算的运用 合并图像加权和覆盖 掩模 当计算机处理图像时,有些内容需要处理,有些内容不需要处理。能够覆盖原始图像,仅暴露原始图像“感兴趣区域…...

【Seata】00 - Seata Server 部署(Windows、Docker 基于 Jpom)
文章目录 前言参考目录版本说明Windows 部署 seata-server1:下载压缩包2:文件存储模式3:db 存储模式3.1:建表3.2:修改配置文件3.3:启动脚本4:源码部署 Docker 部署 seata-server (基…...

菜鸟教程第一天
1、Java变量类型 1.1 成员变量可以声明在使用前或者使用后。这句话怎么理解? 大家都知道,如果你在代码中使用这个变量,但是在此之前没有声明这个变量,是会报错的。如果是局部变量,在使用前,必须得声明并初…...

数据结构--5.2马踏棋盘算法(骑士周游问题)
题目渊源: 马踏棋盘问题(又称骑士周游问题或骑士漫游问题)是算法设计的经典问题之一。 题目要求: 国际象棋的棋盘为8*8的方格棋盘,现将“马”放在任意指定的方格中,按照“马”走棋的规则将“马”进行移动。…...

如何使用CSS实现一个响应式图片幻灯片(Responsive Image Slider)效果?
聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 响应式图片幻灯片⭐ HTML结构⭐ CSS样式⭐ JavaScript交互⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅!这个…...

Linux学习之lvm删除
umount /mnt/logicvolumntest卸载挂载。 lvremove /dev/vgname/my_lv可以删除逻辑卷,其中vgname是指定逻辑卷所在的卷组名称,my_lv是逻辑卷的名称。 注意:使用lvremove命令会永久删除逻辑卷和其中的数据,因此请在使用之前进行适当…...

bazel介绍以及其发展历史
简介 Bazel Google开源的,是一款与 Make、Maven 和 Gradle 类似的开源构建和测试工具。 它使用人类可读的高级构建语言。Bazel 支持多种语言的项目,可为多个平台构建输出。Bazel支持任意大小的构建目标,并支持跨多个代码库和大量用户的大型代…...

固定资产管理分析怎么写?
对企业内的固定资产进行全面的统计和分析,包括设备、装修、维修等方面的信息,有助于企业进行资产管理和风险控制。 通过该软件,用户可以实现对资产的跟踪和管理,如实时监测设备的使用情况,提高设备利用率和维护效率…...

【项目源码】一套基于springboot+Uniapp框架开发的智慧医院3D人体导诊系统源码
智慧医院3D人体导诊系统源码 开发语言:java 开发工具:IDEA 前端框架:Uniapp 后端框架:springboot 数 据 库:mysql 移 动 端:微信小程序、H5 “智慧导诊”以人工智能手段为依托,为…...

可能的二分法 -- 二分图判定【DFS、BFS分别实现】
886. 可能的二分法 class PossibleBipartition:"""可能的二分法「其实考察的就是二分图的判定」用dfs和bfs 两种方法分别实现https://leetcode.cn/problems/possible-bipartition/"""def __init__(self):self.success Trueself.color []self.…...

六级翻译备考
classical 经典的 Chinese literature 中国文学 朝代dynasty 统治 rule 社会稳定 steady society 治理有序 orderly governance 伟大的greatest 时代 times或者periods 被人们描绘成人类历史上伴随着治理有序,社会稳定的最伟大的时代之一 more and more越来越多 …...

Vue框架--Vue中的数据绑定
Vue中有两种数据绑定的方式 1.单向数据绑定(v-band):数据只能够从data流向页面 2.双向数据绑定(v-model):数据不仅仅能够从data流向页面,也可以从页面流向data。 备注: 1.双向绑定一般都应用在表单类元素上。(如:input、select等有value属性值的标签上) 2.…...

Unity——热更新浅析
热更新的思想从本质上来讲,要考虑一些问题。例如,一个完整的游戏最多可以有多大比例的资源通过网络加载?能否让尽可能多的资源通过网络加载? 通过网络加载有很多好处,不仅可以极大减小安装包的体积,而且有…...

IMPLEMENT_DYNCREATE的分析
一、介绍 IMPLEMENT_DYNCREATE 是一个宏(macro),通常与DECLARE_DYNCREATE宏一起在MFC框架中使用。它的作用是为一个派生自 CObject 的MFC类提供运行时类型信息(RTTI)和对象的动态创建支持。 具体来说,IMP…...

Java实现根据短连接获取1688商品详情数据,1688淘口令接口,1688API接口封装方法
要通过1688的API获取商品详情数据,您可以使用1688开放平台提供的接口来实现。以下是一种使用Java编程语言实现的示例,展示如何通过1688开放平台API获取商品详情属性数据接口: 首先,确保您已注册成为1688开放平台的开发者…...

ABAP FICO 凭证替代 凭证校验
凭证校验 1.T-CODE--->GGX2--->GBLR-->ZRGGBR000 2.将程序RGGBR000 复制为ZRGGBR000 3.GGB0--》财务会计--》凭证抬头或者行项目维护检验规则 4.OB28 维护特定的公司代码和调用点和确认,活动等级设置为1 5.GGB4-->激活校验 凭证替代 1.T-CODE--->GG…...

项目验收有哪些流程?
验收流程 科技计划项目验收/课题验收测试服务的被测对象是国家重大专项、科研课题的软件成果物,可以是一个模块、软件或系统等,也可以是软件套件或软件原型等。测试范围主要来源于课题的合同书/可行吧报告/申报书中的技术指标要求。所出具的科技项目验收…...

C++,类的继承
一、继承的基本概念 继承使得C能够从已有的类派生出新的类,而派生类继承了原有类的特征,包括方法。被继承者称为父类或基类,继承者称为子类或派生类。 继承的目的: 实现代码的重用性建立父类和子类之间的联系在实现多态的时候&a…...

作业33333333
一、正向解析 在开启DNS服务之前先将防火墙关闭。 systemctl stop firewalld.service 开启DNS服务,需要开启端先进行安装主软件,以及配置包管理软件 配置包管理软件一般会跟随主软件一起安装,如果没有手动安装一次就可以了。 安装完成之后&…...

Spring Cloud--从零开始搭建微服务基础环境【二】
😀前言 本篇博文是关于Spring Cloud–从零开始搭建微服务基础环境【二】,希望你能够喜欢 🏠个人主页:晨犀主页 🧑个人简介:大家好,我是晨犀,希望我的文章可以帮助到大家,…...

算法工程题(中序遍历)
* 题意说明: * 给定一个二叉树的根节点 root ,返回 它的 中序 遍历 。 * * 示例 1: * 输入:root [1,null,2,3] * 输出:[1,3,2] * * 示例 2: * 输入:root [] * 输出:[] * *…...

jsch网页版ssh
使用依赖 implementation com.jcraft:jsch:0.1.55Server端代码 import com.jcraft.jsch.Channel; import com.jcraft.jsch.JSch; import com.jcraft.jsch.Session; import java.io.InputStream; import java.io.OutputStream; import java.util.concurrent.TimeUnit; import o…...

教程i.MX8MPlus开发板SPI转CAN操作
飞凌嵌入式OKMX8MP-C核心板有两路原生CAN总线,但用户在开发产品时可能需要用到更多的CAN,这该如何解决呢?今天小编将为大家介绍一种SPI转CAN的方法,供各位工程师小伙伴参考。 说明 OKMX8MP-C核心板有两路原生的SPI总线,…...

Docker中容器的随机命名方式
使用 docker 创建容器时,如果没有用 --name 指定,docker 会为用户选择一个名称, 格式是两个带有下划线的单词,如xxx_yyyy 其相关的实现在此处 pkg/namesgenerator/names-generator.go[1] 源码中有两个数组,第一个是一个…...

大数据Flink实时计算技术
1、架构 2、应用场景 Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。在启用高可用选项的情况下,它不存在单点失效问题。事实证明&#…...

数学中的自由与我们的生活
数学中的这些自由可以帮助我们养成很多优秀的品格。具体来说,知识的自由使我们变得足智多谋,让我们可以根据问题的具体情况选择恰当的工具和方法。探索的自由使我们在集体讨论时敢于大声发言,积极提问,让我们在为探索发现而欢呼雀…...

8 python的迭代器和生成器
概述 在上一节,我们介绍了Python的模块和包,包括:什么是模块、导入模块、自定义模块、__name__、什么是包、创建包、导入包等内容。在这一节中,我们将介绍Python的迭代器和生成器。在Python中,迭代器是一个非常重要的概…...