ShenYu网关注册中心之HTTP注册原理
文章目录
- 1、客户端注册流程
- 1.1、读取配置
- 1.1.1、用于注册的 HttpClientRegisterRepository
- 1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener
- 1.2、扫描注解,注册元数据和URI
- 1.2.1、构建URI并写入Disruptor
- 1.2.2、构建元数据并写入Disruptor
- 1.2.3、Disruptor消费数据并向shenyu-admin注册数据
- 2、服务端注册流程
- 2.1、读取配置
- 2.1.1、用于监听的ShenyuClientServerRegisterRepository
- 2.2、注册元数据和URI
- 2.2.1、注册接口接收数据写入Disruptor
- 2.2.2、Disruptor消费数据并持久化
1、客户端注册流程
当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。
1.1、读取配置
该例子是一个springboot项目,所以注册的入口往往在自动装配类中。不妨可以先看下项目的pom文件中引入了什么依赖:
<dependencies><dependency><groupId>org.apache.shenyu</groupId><artifactId>shenyu-spring-boot-starter-client-springmvc</artifactId><version>${project.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
这里面看到就shenyu-spring-boot-starter-client-springmvc是跟ShenYu相关的,所以入口应该就在这个依赖内了,看下这个依赖的项目结构:

发现就是两个配置类,ShenyuSpringMvcClientInfoRegisterConfiguration由于使用了@Configuration(proxyBeanMethods = false),暂时不用关注,重点关注ShenyuSpringMvcClientConfiguration,它是shenyu客户端http注册配置类。
/*** shenyu 客户端http注册配置类*/
@Configuration
// shenyu客户端通用配置类
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {static {VersionUtils.checkDuplicate(ShenyuSpringMvcClientConfiguration.class);}/**** 监听并处理http元数据和URI信息的注册** @param clientConfig 客户端注册配置* @param shenyuClientRegisterRepository 客户端注册类*/@Bean@ConditionalOnMissingBean(ClientRegisterConfiguration.class)// 这里的两个参数是由ShenyuClientCommonBeanConfiguration导入的public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);}
}
通过@Configuration表示这是一个配置类,通过@ImportAutoConfiguration引入ShenyuClientCommonBeanConfiguration配置类。
/*** shenyu客户端通用配置类,创建注册中心客户端通用的bean*/
@Configuration
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuClientCommonBeanConfiguration {/*** 根据注册中心配置通过SPI方式创建客户端注册类*/@Beanpublic ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {return ShenyuClientRegisterRepositoryFactory.newInstance(config);}/*** Shenyu 客户端注册中心配置,读取shenyu.register属性配置*/@Bean@ConfigurationProperties(prefix = "shenyu.register")public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {return new ShenyuRegisterCenterConfig();}/*** Shenyu 客户端配置,读取shenyu.client属性配置*/@Bean@ConfigurationProperties(prefix = "shenyu")public ShenyuClientConfig shenyuClientConfig() {return new ShenyuClientConfig();}
}
ShenyuClientCommonBeanConfiguration是ShenYu客户端的通用配置类,创建了3个通用bean。
- ShenyuClientRegisterRepository:客户端注册类,用于将客户端接口信息注册到注册中心。
- ShenyuRegisterCenterConfig:ShenYu客户端注册中心配置类,读取
shenyu.register属性配置。 - ShenyuClientConfig:ShenYu客户端配置类,读取
shenyu.client属性配置。
1.1.1、用于注册的 HttpClientRegisterRepository
上面生成的ShenyuClientRegisterRepository是用于实现客户端注册的接口,会根据注册中心的配置通过SPI方式创建客户端注册类,每一个注册方式都对应一个实现类。

目前支持7种注册类型:
- Http:HttpClientRegisterRepository
- Apollo:ApolloClientRegisterRepository
- Zookeeper:ZookeeperClientRegisterRepository
- Etcd:EtcdClientRegisterRepository
- Nacos:NacosClientRegisterRepository
- Consul:ConsulClientRegisterRepository
- Polaris:PolarisClientRegisterRepository
public final class ShenyuClientRegisterRepositoryFactory {private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();/*** 根据注册中心类型实例化注册服务*/public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {// 通过SPI方式创建客户端注册类ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());// 初始化对应客户端注册类,比如创建zookeeper client,etcd client,admin平台的token等result.init(shenyuRegisterCenterConfig);ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);return result;}return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());}
}
加载类型通过registerType指定,也就是我们在配置文件中指定的类型:
shenyu:register:registerType: httpserverLists: http://localhost:9095props:username: adminpassword: 123qweQWE$$
这里指定的是http,所以这里创建的就是HttpClientRegisterRepository。props是用于连接注册中心的一些额外属性,比如用户名,密码,命名空间等。
创建对应的注册客户端后,会调用init方法根据shenyu.register下的配置进行初始化:
@Join
public class HttpClientRegisterRepository extends FailbackRegistryRepository {public HttpClientRegisterRepository() {}public HttpClientRegisterRepository(final ShenyuRegisterCenterConfig config) {init(config);}@Overridepublic void init(final ShenyuRegisterCenterConfig config) {// shenyu-admin用户名this.username = config.getProps().getProperty(Constants.USER_NAME);// shenyu-admin密码this.password = config.getProps().getProperty(Constants.PASS_WORD);// shenyu-admin集群地址this.serverList = Lists.newArrayList(Splitter.on(",").split(config.getServerLists()));// 根据上面3个信息请求shenyu-admin获取accessToken,用于后面调用注册接口this.accessToken = Caffeine.newBuilder()//see org.apache.shenyu.admin.config.properties.JwtProperties#expiredSeconds.expireAfterWrite(24L, TimeUnit.HOURS).build(new CacheLoader<String, String>() {@Overridepublic @Nullable String load(@NonNull final String server) throws Exception {try {// 调用shenyu-admin的登录接口(/platform/login),获取accessTokenOptional<?> login = RegisterUtils.doLogin(username, password, server.concat(Constants.LOGIN_PATH));return login.map(String::valueOf).orElse(null);} catch (Exception e) {LOGGER.error("Login admin url :{} is fail, will retry. cause: {} ", server, e.getMessage());return null;}}});}
}
这里主要就是去调shenyu-admin的登录接口获取accessToken,为后面的发送注册数据做准备。其他注册类型的ShenyuClientRegisterRepository也一样,创建各自注册中心的client,连接注册中心,为发送数据做准备。类注解@Join用于SPI的加载。
1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener
回到一开始的ShenyuSpringMvcClientConfiguration配置类:
/*** shenyu 客户端http注册配置类*/
@Configuration
// shenyu客户端通用配置类
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {static {VersionUtils.checkDuplicate(ShenyuSpringMvcClientConfiguration.class);}/**** 监听并处理http元数据和URI信息的注册** @param clientConfig 客户端注册配置* @param shenyuClientRegisterRepository 客户端注册类*/@Bean@ConditionalOnMissingBean(ClientRegisterConfiguration.class)// 这里的两个参数是由ShenyuClientCommonBeanConfiguration导入的public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);}
}
创建了SpringMvcClientEventListener,负责客户端 元数据 和 URI 数据的构建和注册。SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener,而AbstractContextRefreshedEventListener是一个抽象类,它实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。每一种后端服务RPC调用协议都对应了一个监听类。

public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {public SpringMvcClientEventListener(final PropertiesConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {super(clientConfig, shenyuClientRegisterRepository);// client配置Properties props = clientConfig.getProps();// 是否是全部接口都注册this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));// http协议this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);this.addPrefixed = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.ADD_PREFIXED,Boolean.FALSE.toString()));mappingAnnotation.add(ShenyuSpringMvcClient.class);mappingAnnotation.add(RequestMapping.class);}// ...}
SpringMvcClientEventListener的构造函数主要就是调用父类AbstractContextRefreshedEventListener的构造函数,传入客户端配置和客户端注册类,客户端配置指shenyu.client.http下的配置:
shenyu:client:http:props:contextPath: /httpappName: http-appNameport: 8189isFull: false
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {protected static final String PATH_SEPARATOR = "/";// Disruptor 发布器private final ShenyuClientRegisterEventPublisher publisher = ShenyuClientRegisterEventPublisher.getInstance();// ...public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {// 读取 shenyu.client.http 配置信息Properties props = clientConfig.getProps();this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {String errorMsg = "client register param must config the appName or contextPath";LOG.error(errorMsg);throw new ShenyuClientIllegalArgumentException(errorMsg);}this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);this.host = props.getProperty(ShenyuClientConstants.HOST);this.port = props.getProperty(ShenyuClientConstants.PORT);// 开始事件发布,启动 Disruptorpublisher.start(shenyuClientRegisterRepository);} }
取出相关配置信息后,就启动 Disruptor 队列,ShenyuClientRegisterEventPublisher可以看作是一个生产者,用来向队列发送数据
public class ShenyuClientRegisterEventPublisher {private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();private DisruptorProviderManage<DataTypeParent> providerManage;public static ShenyuClientRegisterEventPublisher getInstance() {return INSTANCE;}/*** Start.** @param shenyuClientRegisterRepository shenyuClientRegisterRepository*/public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {// 注册任务工厂类,用于创建注册的任务,客户端使用的是RegisterClientExecutorFactory, // 而在服务端(shenyu-admin)用于处理注册任务的是RegisterServerConsumerExecutor,// 都是用于消费Disruptor数据的任务RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();// 添加元数据订阅器factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));// 添加URI订阅器factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));// 添加ApiDoc订阅器factory.addSubscribers(new ShenyuClientApiDocExecutorSubscriber(shenyuClientRegisterRepository));providerManage = new DisruptorProviderManage<>(factory);// 启动Disruptor队列,并创建消费者providerManage.startup();}/*** 发布事件,向Disruptor队列发数据** @param data the data*/public void publishEvent(final DataTypeParent data) {DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();provider.onData(data);}
}
start方法主要是为队列添加订阅器,会由消费者接收到信息后调用这些订阅器。然后启动启动Disruptor队列,并创建消费者。
public class DisruptorProviderManage<T> {public void startup() {this.startup(false);}public void startup(final boolean isOrderly) {OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());int newConsumerSize = this.consumerSize;EventFactory<DataEvent<T>> eventFactory;if (isOrderly) {newConsumerSize = 1;eventFactory = new OrderlyDisruptorEventFactory<>();} else {eventFactory = new DisruptorEventFactory<>();}Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,size,DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),ProducerType.MULTI,new BlockingWaitStrategy());// 创建消费者@SuppressWarnings("all")QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];for (int i = 0; i < newConsumerSize; i++) {consumers[i] = new QueueConsumer<>(executor, consumerFactory);}// 设置消费者disruptor.handleEventsWithWorkerPool(consumers);disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());// 真正调用disruptor的api启动disruptor.start();RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();// disruptor的生产者provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);} }
这里就是准备Disruptor队列的一些逻辑,就不细讲了,其中QueueConsumer是Disruptor的消费者,后面就是由它接收数据。
1.2、扫描注解,注册元数据和URI
上面说到SpringMvcClientEventListener继承了AbstractContextRefreshedEventListener,而AbstractContextRefreshedEventListener实现了ApplicationListener接口,并重写了onApplicationEvent()方法,当有Spring事件发生后,该方法会执行。
// 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行,算是客户端的执行入口吧
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {context = event.getApplicationContext();// 获取客户端的接口类,比如http就是Controller类,dubbo就是@DubboService类,由子类实现Map<String, T> beans = getBeans(context);if (MapUtils.isEmpty(beans)) {return;}// 保证只注册一次if (!registered.compareAndSet(false, true)) {return;}// 构建URI并写入Disruptor,由子类实现publisher.publishEvent(buildURIRegisterDTO(context, beans));// 构建元数据并写入Disruptorbeans.forEach(this::handle);Map<String, Object> apiModules = context.getBeansWithAnnotation(ApiModule.class);apiModules.forEach((k, v) -> handleApiDoc(v, beans));
}
获取客户端服务的接口类,由具体的子类实现,http就是Controller类,这里对应的子类就是SpringMvcClientEventListener
@Override
protected Map<String, Object> getBeans(final ApplicationContext context) {// Filter outif (Boolean.TRUE.equals(isFull)) {// isFull=true,表示代理整个服务,就不需要注解扫描了,// 直接构建元数据和URI,写入DisruptorgetPublisher().publishEvent(MetaDataRegisterDTO.builder().contextPath(getContextPath()).addPrefixed(addPrefixed).appName(getAppName()).path(PathUtils.decoratorPathWithSlash(getContextPath())).rpcType(RpcTypeEnum.HTTP.getName()).enabled(true).ruleName(getContextPath()).build());LOG.info("init spring mvc client success with isFull mode");// 构建URIpublisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap()));return Collections.emptyMap();}// 否则获取@Controller注解的beanreturn context.getBeansWithAnnotation(Controller.class);
}
这里会判断配置文件中的shenyu.client.http.props.isFull,如果是true,则直接构建一个元数据和URI,写入到Disruptor中,然后返回一个空集合,后续的逻辑就没执行了。如果是false,则从spring容器中获取带@Controller注解的bean返回。
1.2.1、构建URI并写入Disruptor
构建一个URI数据写入到Disruptor,这个也是由子类实现的:
// 构建URI
@Override
protected URIRegisterDTO buildURIRegisterDTO(final ApplicationContext context,final Map<String, Object> beans) {try {return URIRegisterDTO.builder().contextPath(getContextPath()) // shneyu得contextPath.appName(getAppName()) // appName.protocol(protocol) // 服务协议.host(super.getHost()) // 服务host.port(Integer.valueOf(getPort())) // 服务端口.rpcType(RpcTypeEnum.HTTP.getName()) // rpc类型.eventType(EventType.REGISTER) // 事件类型.build();} catch (ShenyuException e) {throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");}
}
可以看出来URI跟接口类没有关系,一个后端服务实例生成一个URI。
1.2.2、构建元数据并写入Disruptor
之后遍历每个接口构建元数据beans.forEach(this::handle)
/*** 构建元数据并写入Disruptor*/
protected void handle(final String beanName, final T bean) {Class<?> clazz = getCorrectedClass(bean);// 获取当前bean的对应shenyu客户端的注解,比如http是@ShenyuSpringMvcClient, // dubbo是@ShenyuDubboClientfinal A beanShenyuClient = AnnotatedElementUtils.findMergedAnnotation(clazz, getAnnotationType());// 获取bean对应的path(类上注解的路径),由子类实现final String superPath = buildApiSuperPath(clazz, beanShenyuClient);// 如果有shenyu客户端注解并且path中包含*,则表示要注册整个类的方法,只需要构建一个类元数据if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {// 由具体的子类构建类元数据写入DisruptorhandleClass(clazz, bean, beanShenyuClient, superPath);return;}// 类上没有shenyu客户端注解(类上没有注解,但方法上有注解,也是可以注册的),// 或者有注解但是path没有包含*,则就要遍历每个方法,为每个需要注册的方法构建方法元数据final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);for (Method method : methods) {// 由具体子类构建方法元数据写入Disruptor,并将每个method对应的元数据对象缓存在当前类里handleMethod(bean, clazz, beanShenyuClient, method, superPath);}
}protected void handleClass(final Class<?> clazz,final T bean,@NonNull final A beanShenyuClient,final String superPath) {publisher.publishEvent(buildMetaDataDTO(bean, beanShenyuClient, pathJoin(contextPath, superPath), clazz, null));
}protected void handleMethod(final T bean,final Class<?> clazz,@Nullable final A beanShenyuClient,final Method method,final String superPath) {// 如果方法上有Shenyu客户端注解,就表示该方法需要注册A methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, getAnnotationType());if (Objects.nonNull(methodShenyuClient)) {final MetaDataRegisterDTO metaData = buildMetaDataDTO(bean, methodShenyuClient,buildApiPath(method, superPath, methodShenyuClient), clazz, method);publisher.publishEvent(metaData);metaDataMap.put(method, metaData);}
}
// 获取接口对应路径,如果shenyu注解上没有,就用@RequestMapping上的路径,
// 但是这个只支持第一个路径
@Override
protected String buildApiSuperPath(final Class<?> clazz, @Nullable final ShenyuSpringMvcClient beanShenyuClient) {if (Objects.nonNull(beanShenyuClient) && StringUtils.isNotBlank(beanShenyuClient.path())) {return beanShenyuClient.path();}RequestMapping requestMapping = AnnotationUtils.findAnnotation(clazz, RequestMapping.class);// Only the first path is supported temporarilyif (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {return requestMapping.path()[0];}return "";
}// springmvc接口上需要有 ShenyuSpringMvcClient 注解,
// 并且包含RequestMapping注解(表示是一个接口),才进行注册
protected void handleMethod(final Object bean, final Class<?> clazz,@Nullable final ShenyuSpringMvcClient beanShenyuClient,final Method method, final String superPath) {final RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(method, RequestMapping.class);ShenyuSpringMvcClient methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, ShenyuSpringMvcClient.class);methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;// 如果有 ShenyuSpringMvcClient 注解并且包含RequestMapping注解(表示是一个接口),则进行注册if (Objects.nonNull(methodShenyuClient) && Objects.nonNull(requestMapping)) {// 构建元数据final MetaDataRegisterDTO metaData = buildMetaDataDTO(bean, methodShenyuClient,// 构建path = contextPath + 类上的路径 + 方法上的路径 buildApiPath(method, superPath, methodShenyuClient), clazz, method);// 发布元数据getPublisher().publishEvent(metaData);getMetaDataMap().put(method, metaData);}
}// path = contextPath + 类上的路径 + 方法上的路径,
// 如果@ShenyuSpringMvcClient注解上的路径不为空,则方法上的路径=@ShenyuSpringMvcClient上的value,
// 否则,方法上的路径=@RequestMapping上的value
@Override
protected String buildApiPath(final Method method, final String superPath,@NonNull final ShenyuSpringMvcClient methodShenyuClient) {String contextPath = getContextPath();if (StringUtils.isNotBlank(methodShenyuClient.path())) {return pathJoin(contextPath, superPath, methodShenyuClient.path());}final String path = getPathByMethod(method);if (StringUtils.isNotBlank(path)) {return pathJoin(contextPath, superPath, path);}return pathJoin(contextPath, superPath);
}
1.2.3、Disruptor消费数据并向shenyu-admin注册数据
上面启动Disruptor的时候说到QueueConsumer实现了WorkHandler接口,是Disruptor的消费者,消费逻辑就在它的onEvent方法中:
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {private final OrderlyExecutor executor;private final QueueConsumerFactory<T> factory;/*** Instantiates a new Queue consumer.** @param executor the executor* @param factory the factory*/public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {this.executor = executor;this.factory = factory;}@Overridepublic void onEvent(final DataEvent<T> t) {if (Objects.nonNull(t)) {// 根据事件类型使用不同的线程池ThreadPoolExecutor executor = orderly(t);// 通过工厂创建队列消费任务 RegisterClientConsumerExecutorQueueConsumerExecutor<T> queueConsumerExecutor = factory.create();// 为消费任务设置数据queueConsumerExecutor.setData(t.getData());t.setData(null);// 放在线程池中执行 消费任务executor.execute(queueConsumerExecutor);}}// ...
}
QueueConsumerExecutor是实现了Runnable的消费任务,它有两个实现:
- RegisterClientConsumerExecutor:客户端消费者任务
- RegisterServerConsumerExecutor:服务端消费者任务
从名字也可以看出,RegisterClientConsumerExecutor负责处理客户端任务,shenyu客户端将元数据和URI写入disruptor后由这个消费者任务来消费数据,执行实际向注册中心注册的操作。RegisterServerConsumerExecutor负责处理服务端(shenyu-admin)任务,服务端从注册中心监听到元数据和URI后写入disruptor,然后由RegisterServerConsumerExecutor任务来消费数据,处理数据入库操作和发布事件。
RegisterClientConsumerExecutor的消费逻辑:
public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {this.subscribers = new EnumMap<>(executorSubscriberMap);}@Overridepublic void run() {// 获取数据final T data = getData();// 根据数据类型获取对应的处理器进行处理,即在disruptor启动的时候添加的订阅器subscribers.get(data.getType()).executor(Lists.newArrayList(data));}// ...
}
根据不同的数据类型使用不同的订阅器执行器去执行,这些订阅器是在disruptor启动的时候设置的。目前注册的数据类型有3种,元数据,URI和API文档。
public enum DataType {/*** Meta data data type enum.*/META_DATA,/*** Uri data type enum.*/URI,/*** Api doc type enum.*/API_DOC,
}
所以相对应的订阅器也分为3类,分别处理元数据,URI和API文档。在客户端和服务端分别有两个,所以一共是6个。

元数据处理
public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {private final ShenyuClientRegisterRepository shenyuClientRegisterRepository;// .../*** 遍历元数据,对数据注册到注册中心*/@Overridepublic void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {// 调用响应注册中心的客户端注册类注册元数据shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);}}
}
遍历数据,然后又将数据委托给ShenyuClientRegisterRepository执行。ShenyuClientRegisterRepository是在一开始读取配置的时候就创建了,是客户端注册类,用来将数据发送到注册中心的类,不同的注册方式有不同的实现类,该示例使用http方式注册(shenyu.register.registerType=http)的实现类是HttpClientRegisterRepository。HttpClientRegisterRepository并没有直接实现ShenyuClientRegisterRepository接口,而是继承FailbackRegistryRepository,FailbackRegistryRepository实现了ShenyuClientRegisterRepository接口,FailbackRegistryRepository本身主要用于对http注册过程中的失败重试。
@Override
public void persistInterface(final MetaDataRegisterDTO metadata) {try {this.doPersistInterface(metadata);} catch (Exception ex) {//If a failure occurs, it needs to be added to the retry list.// 如果注册失败,则添加到重试列表,过一段时间后重新注册logger.warn("Failed to persistInterface {}, cause:{}", metadata, ex.getMessage());this.addFailureMetaDataRegister(metadata);}
}@Override
public void doPersistInterface(final MetaDataRegisterDTO metadata) {// META_PATH = "/shenyu-client/register-metadata"doRegister(metadata, Constants.META_PATH, Constants.META_TYPE);
}// 发送注册数据到admin
private <T> void doRegister(final T t, final String path, final String type) {int i = 0;// admin集群中的每个都要去注册(admin之间没有同步机制么?)for (String server : serverList) {i++;// 拼接上admin的地址和具体的接口路径,构成完整的请求地址String concat = server.concat(path);try {// 调用admin接口的tokenString accessToken = this.accessToken.get(server);if (StringUtils.isBlank(accessToken)) {throw new NullPointerException("accessToken is null");}// 通过工具类发送http请求RegisterUtils.doRegister(GsonUtils.getInstance().toJson(t), concat, type, accessToken);// considering the situation of multiple clusters, we should continue to execute here} catch (Exception e) {LOGGER.error("Register admin url :{} is fail, will retry. cause:{}", server, e.getMessage());if (i == serverList.size()) {throw new RuntimeException(e);}}}
}
http注册方式比较简单,遍历每个admin服务,获取到accessToken后,向/shenyu-client/register-metadata接口地址发起http请求,将数据发送给admin。
URI处理
public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {@Overridepublic void executor(final Collection<URIRegisterDTO> dataList) {for (URIRegisterDTO uriRegisterDTO : dataList) {Stopwatch stopwatch = Stopwatch.createStarted();// 这里的逻辑是为了探测客户端是否已经启动while (true) {try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {break;} catch (IOException e) {long sleepTime = 1000;// maybe the port is delay exposedif (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {LOG.error("host:{}, port:{} connection failed, will retry",uriRegisterDTO.getHost(), uriRegisterDTO.getPort());// If the connection fails for a long time, Increase sleep timeif (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {sleepTime = 10000;}}try {TimeUnit.MILLISECONDS.sleep(sleepTime);} catch (InterruptedException ex) {LOG.error("interrupted when sleep", ex);}}}ShenyuClientShutdownHook.delayOtherHooks();// 向注册中心注册URI数据shenyuClientRegisterRepository.persistURI(uriRegisterDTO);// 优雅停机ShutdownHookManager.get().addShutdownHook(new Thread(() -> {final URIRegisterDTO offlineDTO = new URIRegisterDTO();BeanUtils.copyProperties(uriRegisterDTO, offlineDTO);offlineDTO.setEventType(EventType.OFFLINE);shenyuClientRegisterRepository.offline(offlineDTO);}), 2);}}
}
URI注册逻辑基本相似,只是比元数据多了一步探测客户端时候已经启动完成的操作,保证客户端启动完成后再注册URI,后面的逻辑就跟元数据一样了。
分析到这里就将客户端的注册逻辑分析完了,通过读取自定义的注解信息构造元数据和URI,将数据发到Disruptor队列,然后从队列中消费数据,将消费者放到线程池中去执行,最终通过发送http请求到admin,元数据注册接口是/shenyu-client/register-metadata,URI注册接口是/shenyu-client/register-uri。
2、服务端注册流程
2.1、读取配置
从前面分析到,admin的两个注册接口分别是/shenyu-client/register-metadata和/shenyu-client/register-uri,通过全局搜索发现这两个接口在ShenyuClientHttpRegistryController类。
@RequestMapping("/shenyu-client")
@Join
public class ShenyuClientHttpRegistryController implements ShenyuClientServerRegisterRepository {/*** 注册元数据*/@PostMapping("/register-metadata")@ResponseBodypublic String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {// 直接将元数据发布到Disruptorpublisher.publish(metaDataRegisterDTO);return ShenyuResultMessage.SUCCESS;}/*** 注册URI*/@PostMapping("/register-uri")@ResponseBodypublic String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {// 直接将URI发布到Disruptorpublisher.publish(uriRegisterDTO);return ShenyuResultMessage.SUCCESS;}
}
但是这个类上并没有@RestController注解,那它就不能做为一个bean被spring扫描到,那它是如何创建的呢?
不着急,想要从注册中心中监听到数据(http注册方式可以将admin当作注册中心),自然需要有注册中心配置,来表明使用哪个注册中心。admin的注册中心配置是RegisterCenterConfiguration,我们先看这个配置类:
/*** 注册中心配置类*/
@Configuration
public class RegisterCenterConfiguration {/*** 读取shenyu.register配置*/@Bean@ConfigurationProperties(prefix = "shenyu.register")public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {return new ShenyuRegisterCenterConfig();}/*** 创建用于服务端的注册类,从注册中心中监听数据,然后将数据写入Disruptor队列中*/@Bean(destroyMethod = "close")public ShenyuClientServerRegisterRepository shenyuClientServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig,final List<ShenyuClientRegisterService> shenyuClientRegisterService) {// 从配置中获取注册类型String registerType = shenyuRegisterCenterConfig.getRegisterType();// 根据注册类型通过SPI方式创建对应的ShenyuClientServerRegisterRepositoryShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);// 创建Disruptor发布者RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();// 每种客户端类型(rpc类型)的处理类Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, Function.identity()));// 启动Disruptor,添加元数据和URI的订阅器publisher.start(registerServiceMap);// 初始化注册中心registerRepository.init(publisher, shenyuRegisterCenterConfig);return registerRepository;}
}
该配置类创建了2个bean:
- ShenyuRegisterCenterConfig:shenyu-admin注册中心配置,读取shenyu.register属性配置。
- ShenyuClientServerRegisterRepository:服务端注册类,用于从注册中心中监听数据,然后将数据写入Disruptor队列中。
这里的创建Disruptor发布者,启动Disruptor等逻辑跟在客户端那边的一样,只是类是服务端这边的,就不再分析了。
2.1.1、用于监听的ShenyuClientServerRegisterRepository
上面生成的ShenyuClientServerRegisterRepository是用于实现服务端注册的接口,会根据注册中心的配置通过SPI方式创建注册类,每一个注册方式都对应一个实现类。

目前支持7种注册类型:
- Http:ShenyuClientHttpRegistryController
- Apollo:ApolloClientServerRegisterRepository
- Zookeeper:ZookeeperClientServerRegisterRepository
- Etcd:EtcdClientServerRegisterRepository
- Nacos:NacosClientServerRegisterRepository
- Consul:ConsulClientServerRegisterRepository
- Polaris:PolarisClientServerRegisterRepository
加载类型通过registerType指定,也就是我们在配置文件中指定的类型:
shenyu:register:registerType: http
服务端的注册类型必须跟客户端的注册类型一致,这样服务端才可以监听到注册信息。这里要指定的是http,所以这里创建的就是ShenyuClientHttpRegistryController,这个不就是前面说到的注册接口所在的类么,所以回到前面的ShenyuClientHttpRegistryController。
2.2、注册元数据和URI
2.2.1、注册接口接收数据写入Disruptor
@RequestMapping("/shenyu-client")
@Join
public class ShenyuClientHttpRegistryController implements ShenyuClientServerRegisterRepository {/*** 注册元数据*/@PostMapping("/register-metadata")@ResponseBodypublic String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {// 直接将元数据发布到Disruptorpublisher.publish(metaDataRegisterDTO);return ShenyuResultMessage.SUCCESS;}/*** 注册URI*/@PostMapping("/register-uri")@ResponseBodypublic String registerURI(@RequestBody final URIRegisterDTO uriRegisterDTO) {// 直接将URI发布到Disruptorpublisher.publish(uriRegisterDTO);return ShenyuResultMessage.SUCCESS;}
}
两个注册接口获取到数据后,就直接调用了publisher.publish()方法,把数据发布到Disruptor队列中。
2.2.2、Disruptor消费数据并持久化
QueueConsumer实现了WorkHandler接口,是Disruptor的消费者,消费逻辑就在它的onEvent方法中:
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {private final OrderlyExecutor executor;private final QueueConsumerFactory<T> factory;/*** Instantiates a new Queue consumer.** @param executor the executor* @param factory the factory*/public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {this.executor = executor;this.factory = factory;}@Overridepublic void onEvent(final DataEvent<T> t) {if (Objects.nonNull(t)) {// 根据事件类型使用不同的线程池ThreadPoolExecutor executor = orderly(t);// 通过工厂创建队列消费任务 RegisterServerConsumerExecutorQueueConsumerExecutor<T> queueConsumerExecutor = factory.create();// 为消费任务设置数据queueConsumerExecutor.setData(t.getData());t.setData(null);// 放在线程池中执行 消费任务executor.execute(queueConsumerExecutor);}}// ...
}
分析客户端注册流程的时候说到RegisterServerConsumerExecutor是服务端消费者任务,处理数据入库操作和发布事件。
RegisterServerConsumerExecutor消费逻辑:
public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<Collection<DataTypeParent>> {// 每种数据类型的订阅器执行器private final Map<DataType, ExecutorSubscriber<DataTypeParent>> subscribers;private RegisterServerConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<DataTypeParent>> executorSubscriberMap) {this.subscribers = new HashMap<>(executorSubscriberMap);}@Overridepublic void run() {Collection<DataTypeParent> results = getData().stream().filter(this::isValidData).collect(Collectors.toList());if (CollectionUtils.isEmpty(results)) {return;}// 选择对应的数据类型的订阅器执行器去执行selectExecutor(results).executor(results);}private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {final Optional<DataTypeParent> first = list.stream().findFirst();return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());}// ...
}
根据不同的数据类型使用不同的订阅器执行器去执行,这些订阅器是在disruptor启动的时候设置的。
服务端的订阅器有3个,分别为MetadataExecutorSubscriber,URIRegisterExecutorSubscriber和ApiDocExecutorSubscriber,分别处理元数据,URI和API文档。
元数据的处理
public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {// 每种客户端类型的注册服务private final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService;public MetadataExecutorSubscriber(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {this.shenyuClientRegisterService = shenyuClientRegisterService;}@Overridepublic DataType getType() {return DataType.META_DATA;}@Overridepublic void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {// 遍历元数据metaDataRegisterDTOList.forEach(meta -> {// 根据客户端类型Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType())).ifPresent(shenyuClientRegisterService -> {// 加锁,保证数据顺序执行,防止并发synchronized (shenyuClientRegisterService) {// 处理数据shenyuClientRegisterService.register(meta);}});});}
}
ShenyuClientRegisterService是注册方法接口,它有多个实现类:

- AbstractContextPathRegisterService:抽象类,处理部分公共逻辑;
- AbstractShenyuClientRegisterServiceImpl::抽象类,处理部分公共逻辑;
- ShenyuClientRegisterDivideServiceImpl:divide类,处理http注册类型;
- ShenyuClientRegisterDubboServiceImpl:dubbo类,处理dubbo注册类型;
- ShenyuClientRegisterGrpcServiceImpl:gRPC类,处理gRPC注册类型;
- ShenyuClientRegisterBrpcServiceImpl:bRPC类,处理bRPC注册类型;
- ShenyuClientRegisterMotanServiceImpl:Motan类,处理Motan注册类型;
- ShenyuClientRegisterSofaServiceImpl:Sofa类,处理Sofa注册类型;
- ShenyuClientRegisterSpringCloudServiceImpl:SpringCloud类,处理SpringCloud注册类型;
- ShenyuClientRegisterTarsServiceImpl:Tars类,处理Tars注册类型;
- ShenyuClientRegisterWebSocketServiceImpl:Websocket类,处理Websocket注册类型;
每一种rpc类型都对应一个注册处理类,所以本文是使用ShenyuClientRegisterDivideServiceImpl来处理。
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {@Resourceprivate ApplicationEventPublisher eventPublisher;// 这几个就是操作数据库的service@Resourceprivate SelectorService selectorService;@Resourceprivate MetaDataService metaDataService;@Resourceprivate RuleService ruleService;@Overridepublic String register(final MetaDataRegisterDTO dto) {// 1、注册选择器(可以认为一个服务就是一个选择器)// 选择器执行逻辑,默认情况是空的,需要在控制台另外手动配置// 子类实现String selectorHandler = selectorHandler(dto);// 持久化选择器并发布选择器变更事件(不存在的时候)ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATEString selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);// 2、注册规则(可以认为一个元数据就是一个规则,根据path判断是否同一个)// 规则处理逻辑// 子类实现,,都是直接创建一个各自rpc类型的默认逻辑String ruleHandler = ruleHandler();// 构建规则DTORuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);// 持久化规则并发布规则变更事件(不存在的时候)ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATEruleService.registerDefault(ruleDTO);// 3、注册元数据,并发布元数据变更事件(已存在,发布元数据更新事件,不存在,发布元数据创建事件)// 子类实现registerMetadata(dto);// 4、注册contextPath(只有http,springCloud,webSocket类型才有)String contextPath = dto.getContextPath();if (StringUtils.isNotEmpty(contextPath)) {registerContextPath(dto);}return ShenyuResultMessage.SUCCESS;}}
整个注册处理逻辑可以分为4步:
- 注册选择器,构建选择器,默认情况下一个服务就是一个选择器。之后将选择器插入数据库并发布选择器变更事件。
@Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {// 以contextPath或appName作为选择器名称String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());// 根据选择器名和插件名从数据库中查询选择器SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);// 如果还不存在,就创建一个选择器插入数据库if (Objects.isNull(selectorDO)) {// 构建选择器DTOSelectorDTO selectorDTO = SelectorUtil.buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());selectorDTO.setHandle(selectorHandler);// 注册选择器并发布事件 ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATEreturn registerDefault(selectorDTO);}return selectorDO.getId();
}
- 注册规则,可以认为一个元数据就是一个规则,根据path判断是否同一个。
构建规则:
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {// 构建规则DTORuleDTO ruleDTO = RuleDTO.builder().selectorId(selectorId).name(ruleName).matchMode(MatchModeEnum.AND.getCode()).enabled(Boolean.TRUE).loged(Boolean.TRUE).matchRestful(Boolean.FALSE).sort(1).handle(ruleHandler).build();// 将{xxx}替换成**String conditionPath = this.rewritePath(path);RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder().paramType(ParamTypeEnum.URI.getName()).paramName("/").paramValue(conditionPath).build();// 设置规则条件if (conditionPath.endsWith(AdminConstants.URI_SLASH_SUFFIX)) {ruleConditionDTO.setOperator(OperatorEnum.STARTS_WITH.getAlias());} else if (conditionPath.endsWith(AdminConstants.URI_SUFFIX)) {ruleConditionDTO.setOperator(OperatorEnum.PATH_PATTERN.getAlias());} else if (conditionPath.indexOf("*") > 1) {ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias());} else {ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias());}ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));return ruleDTO;
}
保存规则:
@Override
public String registerDefault(final RuleDTO ruleDTO) {// 选择器下已经存在同名的规则,则直接返回,什么也不干if (Objects.nonNull(ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName()))) {return "";}RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);if (StringUtils.isEmpty(ruleDTO.getId())) {// 插入规则ruleMapper.insertSelective(ruleDO);// 插入规则条件addCondition(ruleDO, ruleDTO.getRuleConditions());}// 发布规则变更事件 ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATEruleEventPublisher.onRegister(ruleDO, ruleDTO.getRuleConditions());return ruleDO.getId();
}
具体的规则设计建议去看官方文档。
3. 注册元数据,直接将注册上来的元数据保存
@Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {if (dto.isRegisterMetaData()) {MetaDataService metaDataService = getMetaDataService();// 根据路径查询元数据时候已存在MetaDataDO exist = metaDataService.findByPath(dto.getPath());// 已存在,就更新,发布元数据更新事件,不存在,就插入,发布元数据创建事件metaDataService.saveOrUpdateMetaData(exist, dto);}
}
4、注册ContextPath,只有http,springCloud,webSocket类型才有。处理的逻辑在AbstractContextPathRegisterService中。
public abstract class AbstractContextPathRegisterService extends AbstractShenyuClientRegisterServiceImpl {@Overridepublic void registerContextPath(final MetaDataRegisterDTO dto) {// 持久化contextPath插件下的选择器并发布选择器变更事件String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");// 创建规则处理逻辑ContextMappingRuleHandle handle = new ContextMappingRuleHandle();handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));handle.setAddPrefixed(dto.getAddPrefixed());// 注册contextPath插件默认的规则,contextPath就是规则名,并发布规则变更事件getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));}
}
● URI的处理
URI数据是由URIRegisterExecutorSubscriber订阅器处理:
public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {@Overridepublic void executor(final Collection<URIRegisterDTO> dataList) {if (CollectionUtils.isEmpty(dataList)) {return;}// 根据rpc类型分类final Map<String, List<URIRegisterDTO>> groupByRpcType = dataList.stream().filter(data -> StringUtils.isNotBlank(data.getRpcType())).collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));for (Map.Entry<String, List<URIRegisterDTO>> entry : groupByRpcType.entrySet()) {// 根据不同rpc类型使用对应的shenyuClientRegisterService处理final String rpcType = entry.getKey();Optional.ofNullable(shenyuClientRegisterService.get(rpcType)).ifPresent(service -> {final List<URIRegisterDTO> list = entry.getValue();// 再以contextPath/appName分类Map<String, List<URIRegisterDTO>> listMap = buildData(list);listMap.forEach((selectorName, uriList) -> {final List<URIRegisterDTO> register = new LinkedList<>();final List<URIRegisterDTO> offline = new LinkedList<>();for (URIRegisterDTO d : uriList) {final EventType eventType = d.getEventType();// 判断是注册类型还是下线类型if (Objects.isNull(eventType) || EventType.REGISTER.equals(eventType)) {// eventType is null, should be old versionsregister.add(d);} else if (EventType.OFFLINE.equals(eventType)) {offline.add(d);}}if (CollectionUtils.isNotEmpty(register)) {// 注册URIservice.registerURI(selectorName, register);}if (CollectionUtils.isNotEmpty(offline)) {// 下线URIservice.offline(selectorName, offline);}});});}}private Map<String, List<URIRegisterDTO>> buildData(final Collection<URIRegisterDTO> dataList) {Map<String, List<URIRegisterDTO>> resultMap = new HashMap<>(8);for (URIRegisterDTO dto : dataList) {String contextPath = dto.getContextPath();String key = StringUtils.isNotEmpty(contextPath) ? contextPath : dto.getAppName();if (StringUtils.isNotEmpty(key)) {if (resultMap.containsKey(key)) {List<URIRegisterDTO> existList = resultMap.get(key);existList.add(dto);resultMap.put(key, existList);} else {resultMap.put(key, Lists.newArrayList(dto));}}}return resultMap;}
}
调到FallbackShenyuClientRegisterService的registerURI()方法
@Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {String result;String key = key(selectorName);try {this.removeFallBack(key);result = this.doRegisterURI(selectorName, uriList);logger.info("Register success: {},{}", selectorName, uriList);} catch (Exception ex) {logger.warn("Register exception: cause:{}", ex.getMessage());result = "";this.addFallback(key, new FallbackHolder(selectorName, uriList));}return result;
}
FallbackShenyuClientRegisterService是用来异常处理的,然后调用doRegisterURI()做真正处理。
@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {if (CollectionUtils.isEmpty(uriList)) {return "";}// 查询对应的选择器SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));if (Objects.isNull(selectorDO)) {throw new ShenyuException("doRegister Failed to execute,wait to retry.");}// 过滤port或host为空的URIList<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());// 由URI构建处理选择器中的handler信息,更新选择器中的handler// 应该就是相当于添加上服务实例信息String handler = buildHandle(validUriList, selectorDO);if (handler != null) {selectorDO.setHandle(handler);SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));selectorData.setHandle(handler);// 更新数据库selectorService.updateSelective(selectorDO);// 发布选择器变更事件eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));}return ShenyuResultMessage.SUCCESS;
}
总结就是admin拿到URI数据后,更新选择器中的handler信息,然后写入到数据库,最后发布事件。
更新的就是这里的信息:

至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor队列,再从中消费数据,根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler。
参考资料:
官方博客
相关文章:
ShenYu网关注册中心之HTTP注册原理
文章目录 1、客户端注册流程1.1、读取配置1.1.1、用于注册的 HttpClientRegisterRepository1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener 1.2、扫描注解,注册元数据和URI1.2.1、构建URI并写入Disruptor1.2.2、构建元数据并写入Disruptor1.2.…...
探索GameFi:区块链与游戏的未来融合
在过去的几年里,区块链技术逐渐渗透到各个领域,为不同行业带来了前所未有的变革。其中,游戏行业成为了一个引人注目的焦点,而这种结合被称为GameFi,即游戏金融。GameFi不仅仅是一个概念,更是一场区块链和游…...
Windows下使用CMake编译lua
Lua 是一个功能强大、高效、轻量级、可嵌入的脚本语言。它支持程序编程、面向对象程序设计、函数式编程、数据驱动编程和数据描述。 Lua的官方网站上只提供了源码,需要使用Make进行编译,具体的编译方法为 curl -R -O http://www.lua.org/ftp/lua-5.4.6.…...
【C语言(十一)】
C语言内存函数 一、memcpy使用和模拟实现 void * memcpy ( void * destination, const void * source, size_t num ); • 函数memcpy从source的位置开始向后复制num个字节的数据到destination指向的内存位置。 • 这个函数在遇到 \0 的时候并不会停下来。 • 如果sourc…...
系统运行占用过高
1、CPU过高的问题排查 示例代码: public class Test { static class MyThread extends Thread { public void run() { // 死循环,消耗CPU int i 0; while (true) { i; } } } public static void main(String args[]) throws InterruptedException { ne…...
HTML---初识CSS
文章目录 前言一、pandas是什么?二、使用步骤 1.引入库2.读入数据总结 一.CSS概念 CSS是层叠样式表(Cascading Style Sheets)的缩写。它是一种用于描述HTML文档外观样式的标记语言。通过CSS,开发者可以在不改变HTML标记结构的情况…...
监控pod 容器外网请求网络带宽,过滤掉内网、基于k8spacket开发、prometheus开发export
首先安装k8spacket 安装k8spacket遇到问题,下载插件一直能不能下载成功,pod不能启动。所有手动下载处理。 helm repo add k8spacket https://k8spacket.github.io/k8spacket-helm-chart helm pull k8spacket/k8spacket打开values.yaml 文件 手动下载插…...
windows下docker环境安装
开启硬件虚拟化技术 win10中开启 Hyper-V Win10 下是否开启硬件虚拟化技术,在控制面板,启用 window 功能,找到 Hyper-V 选项,点勾选确认。如图: Windows 11 家庭中文版新增 Hyper-V选项 注意以下的解决方案来自win1…...
Python小程序 - 表格数值统计
题设:Excel表格中,计算如下图所示不同颜色(蓝、黄、桔)单元格值:各颜色填涂的单元格值的总和条件: - Excle表格中 - 分色标记,单元格有值 - 开始列(当前为D),…...
Unity | Shader基础知识(第一集:unity中最简单的shader)
目录 一、unity的shader 二、创建一个shader(在创建时,选前三种都可以) 三、内容解读 1.shader一直都在 2.我们写shader在写什么 四、没有被干预的shader(最简单的shader) 相关阅读 编写着色器概述 - Unity 手册…...
橘子学K8S01之容器中所谓的隔离
我们一直都在说容器就是一个沙盒,沙盒技术顾名思义就是像一个集装箱一样,把应用(服务,进程之类的)装起来的技术,这样每个进程在自己的沙盒中和其他的沙盒隔离开来,每个沙盒之间存在一个边界使得他们互不干扰࿰…...
利用svm进行模型训练
一、步骤 1、将文本数据转换为特征向量 : tf-idf 2、使用这些特征向量训练SVM模型 二、代码 from sklearn.model_selection import train_test_split from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.svm import SVC from sklearn.m…...
【Docker】WSL 2 上的 Docker 搭建和入门
▒ 目录 ▒ 🛫 导读开发环境 1️⃣ 安装安装Docker Desktop for Windows 2️⃣ 环境配置3️⃣ hello world第一次运行再次运行分析总结 📖 参考资料 🛫 导读 开发环境 版本号描述文章日期2023-12-14操作系统Win11 - 22H222621.2715WSL2 C:…...
pytorch环境配置
1.创建环境 conda create --name pytorch python3.11.5 2.激活环境 source activate pytorch 3.添加国内镜像源: conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/ conda config --add channels https://mirrors.tuna.tsin…...
电子眼+无人机构建平安城市视频防控监控方案
电子眼(也称为监控摄像机)可以通过安装在城市的不同角落,实时监控城市的各个地方。它们可以用于监测交通违法行为、监控公共场所的安全以及实时监测特定区域的活动情况。通过电子眼的应用,可以帮助警方及时发现并响应各类安全事件…...
mysql binlog_ignore_db参数的效果详解
我们知道 binlog 会记录数据库所有执行的 DDL 和 DML 语句(除了数据查询语句select、show等)。 我们可以在mysql配置文件中关闭binlog [mysqld] skip-log-bin注意默认情况下会记录所有库的操作,那么如果我们有另类需求,比如说只让某个库记录 binglog 或排除某个库记录…...
HI3559AV100和FPGA 7K690T的PCIE接口调试记录-续
上文https://blog.csdn.net/fzktongyong/article/details/134963814?spm1001.2014.3001.5501 上一篇文中PCIE实测速度和理论计算有较大偏差,经过尝试后有所提升。 1、提升效果 1)、RC写操作,实测速度817MB/s(410407&…...
vivado约束方法4
时序约束向导 定时约束向导确定合成或上缺少的定时约束实现的设计。它分析了网表、时钟网络连接和现有的定时限制,以便根据《超快设计方法指南》提供建议用于FPGA和SoC(UG949)。以下11涵盖了三类约束页面,然后是摘要。包括以下步…...
LeetBook学习-C语言-数组
1.数组的操作 1.1 读取元素 知道内存地址可以快速访问,时间复杂度为O(1) 1.2 查找元素 从首地址开始,逐个查找,最坏时间复杂度为O(N) 1.3 插入元素 插入元素,首先位置要腾空,而后执行插入操作。 1.4 删除元素 删除掉某…...
23种策略模式之策略模式
23种策略模式之策略模式 文章目录 23种策略模式之策略模式前言优缺点使用场景角色定义UML模拟示例小结 前言 在软件开发中,设计模式是为了解决常见问题而提供的一套可重用的解决方案。策略模式(Strategy Pattern)是其中一种常见的设计模式&a…...
深度学习在微纳光子学中的应用
深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向: 逆向设计 通过神经网络快速预测微纳结构的光学响应,替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...
docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
(二)TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...
蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练
前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...
《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》
在注意力分散、内容高度同质化的时代,情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现,消费者对内容的“有感”程度,正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中࿰…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
