ShenYu网关注册中心之Zookeeper注册原理
文章目录
- 1、客户端注册流程
- 1.1、读取配置
- 1.1.1、用于注册的 ZookeeperClientRegisterRepository
- 1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener
- 1.2、扫描注解,注册元数据和URI
- 1.2.1、构建URI并写入Disruptor
- 1.2.2、构建元数据并写入Disruptor
- 1.2.3、Disruptor消费数据并向shenyu-admin注册数据
- 2、服务端注册流程
- 2.1、读取配置
- 2.1.1、用于监听的ShenyuClientServerRegisterRepository
- 2.2、注册元数据和URI
- 2.2.1、监听数据变更并写入Disruptor
- 2.2.2、Disruptor消费数据并持久化
1、客户端注册流程
当客户端启动后,根据相关配置,读取属性信息,然后写入队列。以官方提供的 shenyu-examples-http 为例,开始源码分析。
1.1、读取配置
该例子是一个springboot
项目,所以注册的入口往往在自动装配类中。不妨可以先看下项目的pom文件中引入了什么依赖:
<dependencies><dependency><groupId>org.apache.shenyu</groupId><artifactId>shenyu-spring-boot-starter-client-springmvc</artifactId><version>${project.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
这里面看到就shenyu-spring-boot-starter-client-springmvc
是跟ShenYu
相关的,所以入口应该就在这个依赖内了,看下这个依赖的项目结构:
发现就是两个配置类,ShenyuSpringMvcClientInfoRegisterConfiguration
由于使用了@Configuration(proxyBeanMethods = false)
,暂时不用关注,重点关注ShenyuSpringMvcClientConfiguration
,它是shenyu
客户端http
注册配置类。
/*** shenyu 客户端http注册配置类*/
@Configuration
// shenyu客户端通用配置类
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {static {VersionUtils.checkDuplicate(ShenyuSpringMvcClientConfiguration.class);}/**** 监听并处理http元数据和URI信息的注册** @param clientConfig 客户端注册配置* @param shenyuClientRegisterRepository 客户端注册类*/@Bean@ConditionalOnMissingBean(ClientRegisterConfiguration.class)// 这里的两个参数是由ShenyuClientCommonBeanConfiguration导入的public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);}
}
通过@Configuration
表示这是一个配置类,通过@ImportAutoConfiguration
引入ShenyuClientCommonBeanConfiguration
配置类。
/*** shenyu客户端通用配置类,创建注册中心客户端通用的bean*/
@Configuration
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuClientCommonBeanConfiguration {/*** 根据注册中心配置通过SPI方式创建客户端注册类*/@Beanpublic ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {return ShenyuClientRegisterRepositoryFactory.newInstance(config);}/*** Shenyu 客户端注册中心配置,读取shenyu.register属性配置*/@Bean@ConfigurationProperties(prefix = "shenyu.register")public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {return new ShenyuRegisterCenterConfig();}/*** Shenyu 客户端配置,读取shenyu.client属性配置*/@Bean@ConfigurationProperties(prefix = "shenyu")public ShenyuClientConfig shenyuClientConfig() {return new ShenyuClientConfig();}
}
ShenyuClientCommonBeanConfiguration
是ShenYu
客户端的通用配置类,创建了3个通用bean。
- ShenyuClientRegisterRepository:客户端注册类,用于将客户端接口信息注册到注册中心。
- ShenyuRegisterCenterConfig:
ShenYu
客户端注册中心配置类,读取shenyu.register
属性配置。 - ShenyuClientConfig:
ShenYu
客户端配置类,读取shenyu.client
属性配置。
1.1.1、用于注册的 ZookeeperClientRegisterRepository
上面生成的ShenyuClientRegisterRepository
是用于实现客户端注册的接口,会根据注册中心的配置通过SPI方式创建客户端注册类,每一个注册方式都对应一个实现类。
目前支持7种注册类型:
- Http:HttpClientRegisterRepository
- Apollo:ApolloClientRegisterRepository
- Zookeeper:ZookeeperClientRegisterRepository
- Etcd:EtcdClientRegisterRepository
- Nacos:NacosClientRegisterRepository
- Consul:ConsulClientRegisterRepository
- Polaris:PolarisClientRegisterRepository
public final class ShenyuClientRegisterRepositoryFactory {private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();/*** 根据注册中心类型实例化注册服务*/public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {// 通过SPI方式创建客户端注册类ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());// 初始化对应客户端注册类,比如创建zookeeper client,etcd client,admin平台的token等result.init(shenyuRegisterCenterConfig);ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);return result;}return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());}
}
加载类型通过registerType
指定,也就是我们在配置文件中指定的类型:
shenyu:register:registerType: zookeeperserverLists: http://localhost:2181
这里指定的是zookeeper
,所以这里创建的就是ZookeeperClientRegisterRepository
。
创建对应的注册客户端后,会调用init
方法根据shenyu.register
下的配置进行初始化:
@Join
public class ZookeeperClientRegisterRepository implements ShenyuClientRegisterRepository {@Overridepublic void init(final ShenyuRegisterCenterConfig config) {Properties props = config.getProps();int sessionTimeout = Integer.parseInt(props.getProperty("sessionTimeout", "3000"));int connectionTimeout = Integer.parseInt(props.getProperty("connectionTimeout", "3000"));int baseSleepTime = Integer.parseInt(props.getProperty("baseSleepTime", "1000"));int maxRetries = Integer.parseInt(props.getProperty("maxRetries", "3"));int maxSleepTime = Integer.parseInt(props.getProperty("maxSleepTime", String.valueOf(Integer.MAX_VALUE)));ZookeeperConfig zkConfig = new ZookeeperConfig(config.getServerLists());zkConfig.setBaseSleepTimeMilliseconds(baseSleepTime).setMaxRetries(maxRetries).setMaxSleepTimeMilliseconds(maxSleepTime).setSessionTimeoutMilliseconds(sessionTimeout).setConnectionTimeoutMilliseconds(connectionTimeout);String digest = props.getProperty("digest");if (!StringUtils.isEmpty(digest)) {zkConfig.setDigest(digest);}// 创建zookeeper客户端this.client = new ZookeeperClient(zkConfig);this.client.getClient().getConnectionStateListenable().addListener((c, newState) -> {if (newState == ConnectionState.RECONNECTED) {nodeDataMap.forEach((k, v) -> {if (!client.isExist(k)) {client.createOrUpdate(k, v, CreateMode.EPHEMERAL);LOGGER.info("zookeeper client register uri success: {}", v);}});}});// 启动客户端client.start();}
}
这里主要就是创建zookeeper
的客户端,为后面的发送注册数据做准备。其他注册类型的ShenyuClientRegisterRepository
也一样,创建各自注册中心的client
,连接注册中心,为发送数据做准备。类注解@Join
用于SPI的加载。
1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener
回到一开始的ShenyuSpringMvcClientConfiguration
配置类:
/*** shenyu 客户端http注册配置类*/
@Configuration
// shenyu客户端通用配置类
@ImportAutoConfiguration(ShenyuClientCommonBeanConfiguration.class)
@ConditionalOnProperty(value = "shenyu.register.enabled", matchIfMissing = true, havingValue = "true")
public class ShenyuSpringMvcClientConfiguration {static {VersionUtils.checkDuplicate(ShenyuSpringMvcClientConfiguration.class);}/**** 监听并处理http元数据和URI信息的注册** @param clientConfig 客户端注册配置* @param shenyuClientRegisterRepository 客户端注册类*/@Bean@ConditionalOnMissingBean(ClientRegisterConfiguration.class)// 这里的两个参数是由ShenyuClientCommonBeanConfiguration导入的public SpringMvcClientEventListener springHttpClientEventListener(final ShenyuClientConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {return new SpringMvcClientEventListener(clientConfig.getClient().get(RpcTypeEnum.HTTP.getName()), shenyuClientRegisterRepository);}
}
创建了SpringMvcClientEventListener
,负责客户端 元数据
和 URI
数据的构建和注册。SpringMvcClientEventListener
继承了AbstractContextRefreshedEventListener
,而AbstractContextRefreshedEventListener
是一个抽象类,它实现了ApplicationListener
接口,并重写了onApplicationEvent()
方法,当有Spring事件发生后,该方法会执行。每一种后端服务RPC调用协议都对应了一个监听类。
public class SpringMvcClientEventListener extends AbstractContextRefreshedEventListener<Object, ShenyuSpringMvcClient> {public SpringMvcClientEventListener(final PropertiesConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {super(clientConfig, shenyuClientRegisterRepository);// client配置Properties props = clientConfig.getProps();// 是否是全部接口都注册this.isFull = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.IS_FULL, Boolean.FALSE.toString()));// http协议this.protocol = props.getProperty(ShenyuClientConstants.PROTOCOL, ShenyuClientConstants.HTTP);this.addPrefixed = Boolean.parseBoolean(props.getProperty(ShenyuClientConstants.ADD_PREFIXED,Boolean.FALSE.toString()));mappingAnnotation.add(ShenyuSpringMvcClient.class);mappingAnnotation.add(RequestMapping.class);}// ...}
SpringMvcClientEventListener
的构造函数主要就是调用父类AbstractContextRefreshedEventListener
的构造函数,传入客户端配置和客户端注册类,客户端配置指shenyu.client.http
下的配置:
shenyu:client:http:props:contextPath: /httpappName: http-appNameport: 8189isFull: false
public abstract class AbstractContextRefreshedEventListener<T, A extends Annotation> implements ApplicationListener<ContextRefreshedEvent> {protected static final String PATH_SEPARATOR = "/";// Disruptor 发布器private final ShenyuClientRegisterEventPublisher publisher = ShenyuClientRegisterEventPublisher.getInstance();// ...public AbstractContextRefreshedEventListener(final PropertiesConfig clientConfig,final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {// 读取 shenyu.client.http 配置信息Properties props = clientConfig.getProps();this.appName = props.getProperty(ShenyuClientConstants.APP_NAME);this.contextPath = Optional.ofNullable(props.getProperty(ShenyuClientConstants.CONTEXT_PATH)).map(UriUtils::repairData).orElse("");if (StringUtils.isBlank(appName) && StringUtils.isBlank(contextPath)) {String errorMsg = "client register param must config the appName or contextPath";LOG.error(errorMsg);throw new ShenyuClientIllegalArgumentException(errorMsg);}this.ipAndPort = props.getProperty(ShenyuClientConstants.IP_PORT);this.host = props.getProperty(ShenyuClientConstants.HOST);this.port = props.getProperty(ShenyuClientConstants.PORT);// 开始事件发布,启动 Disruptorpublisher.start(shenyuClientRegisterRepository);} }
取出相关配置信息后,就启动 Disruptor
队列,ShenyuClientRegisterEventPublisher
可以看作是一个生产者,用来向队列发送数据,
public class ShenyuClientRegisterEventPublisher {private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();private DisruptorProviderManage<DataTypeParent> providerManage;public static ShenyuClientRegisterEventPublisher getInstance() {return INSTANCE;}/*** Start.** @param shenyuClientRegisterRepository shenyuClientRegisterRepository*/public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {// 注册任务工厂类,用于创建注册的任务,客户端使用的是RegisterClientExecutorFactory, // 而在服务端(shenyu-admin)用于处理注册任务的是RegisterServerConsumerExecutor,// 都是用于消费Disruptor数据的任务RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();// 添加元数据订阅器factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));// 添加URI订阅器factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));// 添加ApiDoc订阅器factory.addSubscribers(new ShenyuClientApiDocExecutorSubscriber(shenyuClientRegisterRepository));providerManage = new DisruptorProviderManage<>(factory);// 启动Disruptor队列,并创建消费者providerManage.startup();}/*** 发布事件,向Disruptor队列发数据** @param data the data*/public void publishEvent(final DataTypeParent data) {DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();provider.onData(data);}
}
start
方法主要是为队列添加订阅器,会由消费者接收到信息后调用这些订阅器。然后启动启动Disruptor
队列,并创建消费者。
public class DisruptorProviderManage<T> {public void startup() {this.startup(false);}public void startup(final boolean isOrderly) {OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());int newConsumerSize = this.consumerSize;EventFactory<DataEvent<T>> eventFactory;if (isOrderly) {newConsumerSize = 1;eventFactory = new OrderlyDisruptorEventFactory<>();} else {eventFactory = new DisruptorEventFactory<>();}Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,size,DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),ProducerType.MULTI,new BlockingWaitStrategy());// 创建消费者@SuppressWarnings("all")QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];for (int i = 0; i < newConsumerSize; i++) {consumers[i] = new QueueConsumer<>(executor, consumerFactory);}// 设置消费者disruptor.handleEventsWithWorkerPool(consumers);disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());// 真正调用disruptor的api启动disruptor.start();RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();// disruptor的生产者provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);} }
这里就是准备Disruptor
队列的一些逻辑,就不细讲了,其中QueueConsumer
是Disruptor
的消费者,后面就是由它接收数据。
1.2、扫描注解,注册元数据和URI
上面说到SpringMvcClientEventListener
继承了AbstractContextRefreshedEventListener
,而AbstractContextRefreshedEventListener
实现了ApplicationListener
接口,并重写了onApplicationEvent()
方法,当有Spring事件发生后,该方法会执行。
// 当有上下文刷新事件ContextRefreshedEvent发生时,该方法会执行,算是客户端的执行入口吧
@Override
public void onApplicationEvent(@NonNull final ContextRefreshedEvent event) {context = event.getApplicationContext();// 获取客户端的接口类,比如http就是Controller类,dubbo就是@DubboService类,由子类实现Map<String, T> beans = getBeans(context);if (MapUtils.isEmpty(beans)) {return;}// 保证只注册一次if (!registered.compareAndSet(false, true)) {return;}// 构建URI并写入Disruptor,由子类实现publisher.publishEvent(buildURIRegisterDTO(context, beans));// 构建元数据并写入Disruptorbeans.forEach(this::handle);Map<String, Object> apiModules = context.getBeansWithAnnotation(ApiModule.class);apiModules.forEach((k, v) -> handleApiDoc(v, beans));
}
获取客户端服务的接口类,由具体的子类实现,http
就是Controller
类,这里对应的子类就是SpringMvcClientEventListener
@Override
protected Map<String, Object> getBeans(final ApplicationContext context) {// Filter outif (Boolean.TRUE.equals(isFull)) {// isFull=true,表示代理整个服务,就不需要注解扫描了,// 直接构建元数据和URI,写入DisruptorgetPublisher().publishEvent(MetaDataRegisterDTO.builder().contextPath(getContextPath()).addPrefixed(addPrefixed).appName(getAppName()).path(PathUtils.decoratorPathWithSlash(getContextPath())).rpcType(RpcTypeEnum.HTTP.getName()).enabled(true).ruleName(getContextPath()).build());LOG.info("init spring mvc client success with isFull mode");// 构建URIpublisher.publishEvent(buildURIRegisterDTO(context, Collections.emptyMap()));return Collections.emptyMap();}// 否则获取@Controller注解的beanreturn context.getBeansWithAnnotation(Controller.class);
}
这里会判断配置文件中的shenyu.client.http.props.isFull
,如果是true
,则直接构建一个元数据
和URI
,写入到Disruptor
中,然后返回一个空集合,后续的逻辑就没执行了。如果是false
,则从spring容器中获取带@Controller
注解的bean返回。
1.2.1、构建URI并写入Disruptor
构建一个URI
数据写入到Disruptor
,这个也是由子类实现的:
// 构建URI
@Override
protected URIRegisterDTO buildURIRegisterDTO(final ApplicationContext context,final Map<String, Object> beans) {try {return URIRegisterDTO.builder().contextPath(getContextPath()) // shneyu得contextPath.appName(getAppName()) // appName.protocol(protocol) // 服务协议.host(super.getHost()) // 服务host.port(Integer.valueOf(getPort())) // 服务端口.rpcType(RpcTypeEnum.HTTP.getName()) // rpc类型.eventType(EventType.REGISTER) // 事件类型.build();} catch (ShenyuException e) {throw new ShenyuException(e.getMessage() + "please config ${shenyu.client.http.props.port} in xml/yml !");}
}
可以看出来URI
跟接口类没有关系,一个后端服务实例生成一个URI。
1.2.2、构建元数据并写入Disruptor
之后遍历每个接口构建元数据beans.forEach(this::handle)
/*** 构建元数据并写入Disruptor*/
protected void handle(final String beanName, final T bean) {Class<?> clazz = getCorrectedClass(bean);// 获取当前bean的对应shenyu客户端的注解,比如http是@ShenyuSpringMvcClient, // dubbo是@ShenyuDubboClientfinal A beanShenyuClient = AnnotatedElementUtils.findMergedAnnotation(clazz, getAnnotationType());// 获取bean对应的path(类上注解的路径),由子类实现final String superPath = buildApiSuperPath(clazz, beanShenyuClient);// 如果有shenyu客户端注解并且path中包含*,则表示要注册整个类的方法,只需要构建一个类元数据if (Objects.nonNull(beanShenyuClient) && superPath.contains("*")) {// 由具体的子类构建类元数据写入DisruptorhandleClass(clazz, bean, beanShenyuClient, superPath);return;}// 类上没有shenyu客户端注解(类上没有注解,但方法上有注解,也是可以注册的),// 或者有注解但是path没有包含*,则就要遍历每个方法,为每个需要注册的方法构建方法元数据final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);for (Method method : methods) {// 由具体子类构建方法元数据写入Disruptor,并将每个method对应的元数据对象缓存在当前类里handleMethod(bean, clazz, beanShenyuClient, method, superPath);}
}protected void handleClass(final Class<?> clazz,final T bean,@NonNull final A beanShenyuClient,final String superPath) {publisher.publishEvent(buildMetaDataDTO(bean, beanShenyuClient, pathJoin(contextPath, superPath), clazz, null));
}protected void handleMethod(final T bean,final Class<?> clazz,@Nullable final A beanShenyuClient,final Method method,final String superPath) {// 如果方法上有Shenyu客户端注解,就表示该方法需要注册A methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, getAnnotationType());if (Objects.nonNull(methodShenyuClient)) {final MetaDataRegisterDTO metaData = buildMetaDataDTO(bean, methodShenyuClient,buildApiPath(method, superPath, methodShenyuClient), clazz, method);publisher.publishEvent(metaData);metaDataMap.put(method, metaData);}
}
// 获取接口对应路径,如果shenyu注解上没有,就用@RequestMapping上的路径,
// 但是这个只支持第一个路径
@Override
protected String buildApiSuperPath(final Class<?> clazz, @Nullable final ShenyuSpringMvcClient beanShenyuClient) {if (Objects.nonNull(beanShenyuClient) && StringUtils.isNotBlank(beanShenyuClient.path())) {return beanShenyuClient.path();}RequestMapping requestMapping = AnnotationUtils.findAnnotation(clazz, RequestMapping.class);// Only the first path is supported temporarilyif (Objects.nonNull(requestMapping) && ArrayUtils.isNotEmpty(requestMapping.path()) && StringUtils.isNotBlank(requestMapping.path()[0])) {return requestMapping.path()[0];}return "";
}// springmvc接口上需要有 ShenyuSpringMvcClient 注解,
// 并且包含RequestMapping注解(表示是一个接口),才进行注册
protected void handleMethod(final Object bean, final Class<?> clazz,@Nullable final ShenyuSpringMvcClient beanShenyuClient,final Method method, final String superPath) {final RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(method, RequestMapping.class);ShenyuSpringMvcClient methodShenyuClient = AnnotatedElementUtils.findMergedAnnotation(method, ShenyuSpringMvcClient.class);methodShenyuClient = Objects.isNull(methodShenyuClient) ? beanShenyuClient : methodShenyuClient;// 如果有 ShenyuSpringMvcClient 注解并且包含RequestMapping注解(表示是一个接口),则进行注册if (Objects.nonNull(methodShenyuClient) && Objects.nonNull(requestMapping)) {// 构建元数据final MetaDataRegisterDTO metaData = buildMetaDataDTO(bean, methodShenyuClient,// 构建path = contextPath + 类上的路径 + 方法上的路径 buildApiPath(method, superPath, methodShenyuClient), clazz, method);// 发布元数据getPublisher().publishEvent(metaData);getMetaDataMap().put(method, metaData);}
}// path = contextPath + 类上的路径 + 方法上的路径,
// 如果@ShenyuSpringMvcClient注解上的路径不为空,则方法上的路径=@ShenyuSpringMvcClient上的value,
// 否则,方法上的路径=@RequestMapping上的value
@Override
protected String buildApiPath(final Method method, final String superPath,@NonNull final ShenyuSpringMvcClient methodShenyuClient) {String contextPath = getContextPath();if (StringUtils.isNotBlank(methodShenyuClient.path())) {return pathJoin(contextPath, superPath, methodShenyuClient.path());}final String path = getPathByMethod(method);if (StringUtils.isNotBlank(path)) {return pathJoin(contextPath, superPath, path);}return pathJoin(contextPath, superPath);
}
1.2.3、Disruptor消费数据并向shenyu-admin注册数据
上面启动Disruptor
的时候说到QueueConsumer
实现了WorkHandler
接口,是Disruptor
的消费者,消费逻辑就在它的onEvent
方法中:
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {private final OrderlyExecutor executor;private final QueueConsumerFactory<T> factory;public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {this.executor = executor;this.factory = factory;}@Overridepublic void onEvent(final DataEvent<T> t) {if (Objects.nonNull(t)) {// 根据事件类型使用不同的线程池ThreadPoolExecutor executor = orderly(t);// 通过工厂创建队列消费任务 RegisterClientConsumerExecutorQueueConsumerExecutor<T> queueConsumerExecutor = factory.create();// 为消费任务设置数据queueConsumerExecutor.setData(t.getData());t.setData(null);// 放在线程池中执行 消费任务executor.execute(queueConsumerExecutor);}}// ...
}
QueueConsumerExecutor
是实现了Runnable
的消费任务,它有两个实现:
- RegisterClientConsumerExecutor:客户端消费者任务
- RegisterServerConsumerExecutor:服务端消费者任务
从名字也可以看出,RegisterClientConsumerExecutor
负责处理客户端任务,shenyu
客户端将元数据
和URI
写入disruptor
后由这个消费者任务来消费数据,执行实际向注册中心注册的操作。RegisterServerConsumerExecutor
负责处理服务端(shenyu-admin
)任务,服务端从注册中心监听到元数据
和URI
后写入disruptor
,然后由RegisterServerConsumerExecutor
任务来消费数据,处理数据入库操作和发布事件。
RegisterClientConsumerExecutor
的消费逻辑:
public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {this.subscribers = new EnumMap<>(executorSubscriberMap);}@Overridepublic void run() {// 获取数据final T data = getData();// 根据数据类型获取对应的处理器进行处理,即在disruptor启动的时候添加的订阅器subscribers.get(data.getType()).executor(Lists.newArrayList(data));}// ...
}
根据不同的数据类型使用不同的订阅器执行器去执行,这些订阅器是在disruptor
启动的时候设置的。目前注册的数据类型有3种,元数据
,URI
和API文档
。
public enum DataType {/*** Meta data data type enum.*/META_DATA,/*** Uri data type enum.*/URI,/*** Api doc type enum.*/API_DOC,
}
所以相对应的订阅器也分为3类,分别处理元数据,URI和API文档。在客户端和服务端分别有两个,所以一共是6个。
元数据处理
public class ShenyuClientMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {private final ShenyuClientRegisterRepository shenyuClientRegisterRepository;// .../*** 遍历元数据,对数据注册到注册中心*/@Overridepublic void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {for (MetaDataRegisterDTO metaDataRegisterDTO : metaDataRegisterDTOList) {// 调用响应注册中心的客户端注册类注册元数据shenyuClientRegisterRepository.persistInterface(metaDataRegisterDTO);}}
}
遍历数据,然后又将数据委托给ShenyuClientRegisterRepository
执行。ShenyuClientRegisterRepository
是在一开始读取配置的时候就创建了,是客户端注册类,用来将数据发送到注册中心的类,不同的注册方式有不同的实现类,该示例使用zookeeper
方式注册(shenyu.register.registerType=zookeeper
)的实现类是ZookeeperClientRegisterRepository
。
@Override
public void persistInterface(final MetaDataRegisterDTO metadata) {// 后端服务rpc类型String rpcType = metadata.getRpcType();// contextPath = StringUtils.isEmpty(contextPath) ? appName : contextPathString contextPath = ContextPathUtils.buildRealNode(metadata.getContextPath(), metadata.getAppName());// 注册元数据registerMetadata(rpcType, contextPath, metadata);
}private void registerMetadata(final String rpcType,final String contextPath,final MetaDataRegisterDTO metadata) {// 构建元数据节点名称String metadataNodeName = buildMetadataNodeName(metadata);// 构建元数据的整体父路径 /shenyu/register/metadata/${rpcType}/${contextPath}String metaDataPath = RegisterPathConstants.buildMetaDataParentPath(rpcType, contextPath);// 当前元数据在zookeeper中的实际路径,上面两个拼起来String realNode = RegisterPathConstants.buildRealNode(metaDataPath, metadataNodeName);// 防止同一个元数据添加多次synchronized (metadataSet) {if (metadataSet.contains(realNode)) {return;}metadataSet.add(realNode);}// 使用客户端类往zookeeper添加数据,元数据是永久节点client.createOrUpdate(realNode, metadata, CreateMode.PERSISTENT);LOGGER.info("{} zookeeper client register metadata success: {}", rpcType, metadata);
}
client
是shenyu
对zookeeper
操作接口的封装
public void createOrUpdate(final String key, final Object value, final CreateMode mode) {if (value != null) {// 元数据以json字符串形式存储String val = GsonUtils.getInstance().toJson(value);createOrUpdate(key, val, mode);} else {createOrUpdate(key, "", mode);}
}public void createOrUpdate(final String key, final String value, final CreateMode mode) {String val = StringUtils.isEmpty(value) ? "" : value;try {// 使用Curator的API往zookeeper保存数据client.create().orSetData().creatingParentsIfNeeded().withMode(mode).forPath(key, val.getBytes(StandardCharsets.UTF_8));} catch (Exception e) {throw new ShenyuException(e);}
}
注册方式比较简单,将元数据的json文本设置到zookeeper
对应的路径节点中。
URI处理
public class ShenyuClientURIExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {@Overridepublic void executor(final Collection<URIRegisterDTO> dataList) {for (URIRegisterDTO uriRegisterDTO : dataList) {Stopwatch stopwatch = Stopwatch.createStarted();// 这里的逻辑是为了探测客户端是否已经启动while (true) {try (Socket ignored = new Socket(uriRegisterDTO.getHost(), uriRegisterDTO.getPort())) {break;} catch (IOException e) {long sleepTime = 1000;// maybe the port is delay exposedif (stopwatch.elapsed(TimeUnit.SECONDS) > 5) {LOG.error("host:{}, port:{} connection failed, will retry",uriRegisterDTO.getHost(), uriRegisterDTO.getPort());// If the connection fails for a long time, Increase sleep timeif (stopwatch.elapsed(TimeUnit.SECONDS) > 180) {sleepTime = 10000;}}try {TimeUnit.MILLISECONDS.sleep(sleepTime);} catch (InterruptedException ex) {LOG.error("interrupted when sleep", ex);}}}ShenyuClientShutdownHook.delayOtherHooks();// 向注册中心注册URI数据shenyuClientRegisterRepository.persistURI(uriRegisterDTO);// 优雅停机ShutdownHookManager.get().addShutdownHook(new Thread(() -> {final URIRegisterDTO offlineDTO = new URIRegisterDTO();BeanUtils.copyProperties(uriRegisterDTO, offlineDTO);offlineDTO.setEventType(EventType.OFFLINE);shenyuClientRegisterRepository.offline(offlineDTO);}), 2);}}
}
URI注册逻辑跟元数据的一样
@Override
public void persistURI(final URIRegisterDTO registerDTO) {// 后端服务rpc类型String rpcType = registerDTO.getRpcType();// contextPath = StringUtils.isEmpty(contextPath) ? appName : contextPathString contextPath = ContextPathUtils.buildRealNode(registerDTO.getContextPath(), registerDTO.getAppName());// 注册URIregisterURI(rpcType, contextPath, registerDTO);LOGGER.info("{} zookeeper client register uri success: {}", rpcType, registerDTO);
}private synchronized void registerURI(final String rpcType, final String contextPath, final URIRegisterDTO registerDTO) {// uri节点名称 ${ip:port}String uriNodeName = buildURINodeName(registerDTO);// uri父路径 /shenyu/register/uri/{rpcType}/${contextPath}String uriPath = RegisterPathConstants.buildURIParentPath(rpcType, contextPath);// uri的完整路径,上面两个拼起来String realNode = RegisterPathConstants.buildRealNode(uriPath, uriNodeName);// uri节点数据String nodeData = GsonUtils.getInstance().toJson(registerDTO);nodeDataMap.put(realNode, nodeData);// 往zookeeper设置uri数据,uri节点是临时节点client.createOrUpdate(realNode, nodeData, CreateMode.EPHEMERAL);
}
分析到这里就将客户端的注册逻辑分析完了,通过读取自定义的注解信息构造元数据
和URI
,将数据发到Disruptor
队列,然后从队列中消费数据,将数据写到Zookeeper
节点中。Zookeeper
存储结构如下:
shenyu├──regsiter├ ├──metadata├ ├ ├──${rpcType}├ ├ ├ ├────${contextPath}├ ├ ├ ├──${ruleName} : save metadata data of MetaDataRegisterDTO├ ├──uri├ ├ ├──${rpcType}├ ├ ├ ├────${contextPath}├ ├ ├ ├──${ip:port} : save uri data of URIRegisterDTO├ ├ ├ ├──${ip:port}
2、服务端注册流程
2.1、读取配置
客户端是将数据注册到注册中心上,所以服务端(shenyu-admin
)自然也是要从注册中心中监听数据的。注册中心配置类是RegisterCenterConfiguration
,我们先看这个配置类:
/*** 注册中心配置类*/
@Configuration
public class RegisterCenterConfiguration {/*** 读取shenyu.register配置*/@Bean@ConfigurationProperties(prefix = "shenyu.register")public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {return new ShenyuRegisterCenterConfig();}/*** 创建用于服务端的注册类,从注册中心中监听数据,然后将数据写入Disruptor队列中*/@Bean(destroyMethod = "close")public ShenyuClientServerRegisterRepository shenyuClientServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig,final List<ShenyuClientRegisterService> shenyuClientRegisterService) {// 从配置中获取注册类型String registerType = shenyuRegisterCenterConfig.getRegisterType();// 根据注册类型通过SPI方式创建对应的ShenyuClientServerRegisterRepositoryShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);// 创建Disruptor发布者RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();// 每种客户端类型(rpc类型)的处理类Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, Function.identity()));// 启动Disruptor,添加元数据和URI的订阅器publisher.start(registerServiceMap);// 初始化注册中心registerRepository.init(publisher, shenyuRegisterCenterConfig);return registerRepository;}
}
该配置类创建了2个bean:
- ShenyuRegisterCenterConfig:
shenyu-admin
注册中心配置,读取shenyu.register
属性配置。 - ShenyuClientServerRegisterRepository:服务端注册类,用于从注册中心中监听数据,然后将数据写入
Disruptor
队列中。
这里的创建Disruptor
发布者,启动Disruptor
等逻辑跟在客户端那边的一样,只是类是服务端这边的,就不再分析了。
2.1.1、用于监听的ShenyuClientServerRegisterRepository
上面生成的ShenyuClientServerRegisterRepository
是用于实现服务端注册的接口,会根据注册中心的配置通过SPI方式创建注册类,每一个注册方式都对应一个实现类。
目前支持7种注册类型:
- Http:ShenyuClientHttpRegistryController
- Apollo:ApolloClientServerRegisterRepository
- Zookeeper:ZookeeperClientServerRegisterRepository
- Etcd:EtcdClientServerRegisterRepository
- Nacos:NacosClientServerRegisterRepository
- Consul:ConsulClientServerRegisterRepository
- Polaris:PolarisClientServerRegisterRepository
加载类型通过registerType
指定,也就是我们在配置文件中指定的类型:
shenyu:register:registerType: zookeeperserverLists: 127.0.0.1:2181
服务端的注册类型必须跟客户端的注册类型一致,这样服务端才可以监听到注册信息。这里要指定的是zookeeper
,所以这里创建的就是ZookeeperClientServerRegisterRepository
。
初始化ZookeeperClientServerRegisterRepository
时会对zookeeper
进行监听
@Join
public class ZookeeperClientServerRegisterRepository implements ShenyuClientServerRegisterRepository {private ShenyuClientServerRegisterPublisher publisher;private ZookeeperClient client;@Overridepublic void init(final ShenyuClientServerRegisterPublisher publisher, final ShenyuRegisterCenterConfig config) {this.init(config);this.publisher = publisher;Properties props = config.getProps();int sessionTimeout = Integer.parseInt(props.getProperty("sessionTimeout", "3000"));int connectionTimeout = Integer.parseInt(props.getProperty("connectionTimeout", "3000"));int baseSleepTime = Integer.parseInt(props.getProperty("baseSleepTime", "1000"));int maxRetries = Integer.parseInt(props.getProperty("maxRetries", "3"));int maxSleepTime = Integer.parseInt(props.getProperty("maxSleepTime", String.valueOf(Integer.MAX_VALUE)));ZookeeperConfig zkConfig = new ZookeeperConfig(config.getServerLists());zkConfig.setBaseSleepTimeMilliseconds(baseSleepTime).setMaxRetries(maxRetries).setMaxSleepTimeMilliseconds(maxSleepTime).setSessionTimeoutMilliseconds(sessionTimeout).setConnectionTimeoutMilliseconds(connectionTimeout);String digest = props.getProperty("digest");if (!StringUtils.isEmpty(digest)) {zkConfig.setDigest(digest);}// 创建zookeeper客户端this.client = new ZookeeperClient(zkConfig);// 启动客户端client.start();// 初始化订阅initSubscribe();}private void initSubscribe() {// 订阅元数据节点,由于是按rpc类型分类的,所以需要分别监听这几个rpc节点RpcTypeEnum.acquireSupportMetadatas().forEach(rpcTypeEnum -> subscribeMetaData(rpcTypeEnum.getName()));// 订阅URI节点,由于是按rpc类型分类的,所以需要分别监听这几个rpc节点RpcTypeEnum.acquireSupportURIs().forEach(rpcTypeEnum -> subscribeURI(rpcTypeEnum.getName()));}// 订阅URIprivate void subscribeURI(final String rpcType) {// /shenyu/register/uri/${rpcType}String contextPathParent = RegisterPathConstants.buildURIContextPathParent(rpcType);// 添加监听client.addCache(contextPathParent, new URICacheListener());}// 订阅元数据节点private void subscribeMetaData(final String rpcType) {// /shenyu/register/metadata/${rpcType}String contextPathParent = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);// 添加监听client.addCache(contextPathParent, new MetadataCacheListener());}// ...
}
2.2、注册元数据和URI
2.2.1、监听数据变更并写入Disruptor
前面分析到服务端启动初始化的时候,会对zookeeper
节点进行监听,zookeeper
节点结构如下:
shenyu├──regsiter├ ├──metadata├ ├ ├──${rpcType}├ ├ ├ ├────${contextPath}├ ├ ├ ├──${ruleName} : save metadata data of MetaDataRegisterDTO├ ├──uri├ ├ ├──${rpcType}├ ├ ├ ├────${contextPath}├ ├ ├ ├──${ip:port} : save uri data of URIRegisterDTO├ ├ ├ ├──${ip:port}
每一个rpcType
节点都会由一个监听器,当它下面的节点变更的时候,会接收到变更的信息。
abstract static class AbstractRegisterListener implements TreeCacheListener {@Overridepublic final void childEvent(final CuratorFramework client, final TreeCacheEvent event) {ChildData childData = event.getData();if (null == childData) {return;}// 变更的路径String path = childData.getPath();if (Strings.isNullOrEmpty(path)) {return;}event(event.getType(), path, childData);}protected abstract void event(TreeCacheEvent.Type type, String path, ChildData data);
}// 元数据注册监听
class MetadataCacheListener extends AbstractRegisterListener {@Overridepublic void event(final TreeCacheEvent.Type type, final String path, final ChildData data) {// 如果不是"/shenyu/register"开头的路径,则略过if (!path.contains(RegisterPathConstants.ROOT_PATH)) {return;}Optional.ofNullable(data).ifPresent(e -> {String str = new String(data.getData(), StandardCharsets.UTF_8);// 往disruptor写入元数据publishMetadata(str);LOGGER.info("zookeeper register metadata success: {}", str);});}
}// URI注册和下线监听
class URICacheListener extends AbstractRegisterListener {@Overridepublic void event(final TreeCacheEvent.Type type, final String path, final ChildData data) {// 不是叶子节点,即不是URI节点,则略过if (data.getData() == null || data.getData().length == 0) {return;}// 将节点的数据转为URI对象URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(new String(data.getData()), URIRegisterDTO.class);if (uriRegisterDTO == null) {return;}switch (type) {case NODE_ADDED: // 服务注册uriRegisterDTO.setEventType(EventType.REGISTER);// 发布URIpublishRegisterURI(Arrays.asList(uriRegisterDTO));break;case NODE_REMOVED: // 服务下线uriRegisterDTO.setEventType(EventType.OFFLINE);// 往disruptor写入URIpublishRegisterURI(Arrays.asList(uriRegisterDTO));break;default:break;}}
}
监听到元数据
和URI
变更后都是直接写入disruptor
队列。
2.2.2、Disruptor消费数据并持久化
QueueConsumer
实现了WorkHandler
接口,是Disruptor
的消费者,消费逻辑就在它的onEvent
方法中:
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {private final OrderlyExecutor executor;private final QueueConsumerFactory<T> factory;/*** Instantiates a new Queue consumer.** @param executor the executor* @param factory the factory*/public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {this.executor = executor;this.factory = factory;}@Overridepublic void onEvent(final DataEvent<T> t) {if (Objects.nonNull(t)) {// 根据事件类型使用不同的线程池ThreadPoolExecutor executor = orderly(t);// 通过工厂创建队列消费任务 RegisterServerConsumerExecutorQueueConsumerExecutor<T> queueConsumerExecutor = factory.create();// 为消费任务设置数据queueConsumerExecutor.setData(t.getData());t.setData(null);// 放在线程池中执行 消费任务executor.execute(queueConsumerExecutor);}}// ...
}
分析客户端注册流程的时候说到RegisterServerConsumerExecutor
是服务端消费者任务,处理数据入库操作和发布事件。
RegisterServerConsumerExecutor
消费逻辑:
public final class RegisterServerConsumerExecutor extends QueueConsumerExecutor<Collection<DataTypeParent>> {// 每种数据类型的订阅器执行器private final Map<DataType, ExecutorSubscriber<DataTypeParent>> subscribers;private RegisterServerConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<DataTypeParent>> executorSubscriberMap) {this.subscribers = new HashMap<>(executorSubscriberMap);}@Overridepublic void run() {Collection<DataTypeParent> results = getData().stream().filter(this::isValidData).collect(Collectors.toList());if (CollectionUtils.isEmpty(results)) {return;}// 选择对应的数据类型的订阅器执行器去执行selectExecutor(results).executor(results);}private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {final Optional<DataTypeParent> first = list.stream().findFirst();return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());}// ...
}
根据不同的数据类型使用不同的订阅器执行器去执行,这些订阅器是在disruptor
启动的时候设置的。
服务端的订阅器有3个,分别为MetadataExecutorSubscriber
,URIRegisterExecutorSubscriber
和ApiDocExecutorSubscriber
,分别处理元数据
,URI
和API文档
。
元数据的处理
public class MetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {// 每种客户端类型的注册服务private final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService;public MetadataExecutorSubscriber(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {this.shenyuClientRegisterService = shenyuClientRegisterService;}@Overridepublic DataType getType() {return DataType.META_DATA;}@Overridepublic void executor(final Collection<MetaDataRegisterDTO> metaDataRegisterDTOList) {// 遍历元数据metaDataRegisterDTOList.forEach(meta -> {// 根据客户端类型Optional.ofNullable(this.shenyuClientRegisterService.get(meta.getRpcType())).ifPresent(shenyuClientRegisterService -> {// 加锁,保证数据顺序执行,防止并发synchronized (shenyuClientRegisterService) {// 处理数据shenyuClientRegisterService.register(meta);}});});}
}
ShenyuClientRegisterService是注册方法接口,它有多个实现类:
- AbstractContextPathRegisterService:抽象类,处理部分公共逻辑;
- AbstractShenyuClientRegisterServiceImpl::抽象类,处理部分公共逻辑;
- ShenyuClientRegisterDivideServiceImpl:divide类,处理http注册类型;
- ShenyuClientRegisterDubboServiceImpl:dubbo类,处理dubbo注册类型;
- ShenyuClientRegisterGrpcServiceImpl:gRPC类,处理gRPC注册类型;
- ShenyuClientRegisterBrpcServiceImpl:bRPC类,处理bRPC注册类型;
- ShenyuClientRegisterMotanServiceImpl:Motan类,处理Motan注册类型;
- ShenyuClientRegisterSofaServiceImpl:Sofa类,处理Sofa注册类型;
- ShenyuClientRegisterSpringCloudServiceImpl:SpringCloud类,处理SpringCloud注册类型;
- ShenyuClientRegisterTarsServiceImpl:Tars类,处理Tars注册类型;
- ShenyuClientRegisterWebSocketServiceImpl:Websocket类,处理Websocket注册类型;
每一种rpc类型都对应一个注册处理类,本实例后端服务是http
接口,所以是使用ShenyuClientRegisterDivideServiceImpl
来处理。
public abstract class AbstractShenyuClientRegisterServiceImpl extends FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {@Resourceprivate ApplicationEventPublisher eventPublisher;// 这几个就是操作数据库的service@Resourceprivate SelectorService selectorService;@Resourceprivate MetaDataService metaDataService;@Resourceprivate RuleService ruleService;@Overridepublic String register(final MetaDataRegisterDTO dto) {// 1、注册选择器(可以认为一个服务就是一个选择器)// 选择器执行逻辑,默认情况是空的,需要在控制台另外手动配置// 子类实现String selectorHandler = selectorHandler(dto);// 持久化选择器并发布选择器变更事件(不存在的时候)ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATEString selectorId = selectorService.registerDefault(dto, PluginNameAdapter.rpcTypeAdapter(rpcType()), selectorHandler);// 2、注册规则(可以认为一个元数据就是一个规则,根据path判断是否同一个)// 规则处理逻辑// 子类实现,,都是直接创建一个各自rpc类型的默认逻辑String ruleHandler = ruleHandler();// 构建规则DTORuleDTO ruleDTO = buildRpcDefaultRuleDTO(selectorId, dto, ruleHandler);// 持久化规则并发布规则变更事件(不存在的时候)ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATEruleService.registerDefault(ruleDTO);// 3、注册元数据,并发布元数据变更事件(已存在,发布元数据更新事件,不存在,发布元数据创建事件)// 子类实现registerMetadata(dto);// 4、注册contextPath(只有http,springCloud,webSocket类型才有)String contextPath = dto.getContextPath();if (StringUtils.isNotEmpty(contextPath)) {registerContextPath(dto);}return ShenyuResultMessage.SUCCESS;}}
整个注册处理逻辑可以分为4步:
- 注册选择器,构建选择器,默认情况下一个服务就是一个选择器。之后将选择器插入数据库并发布选择器变更事件。
@Override
public String registerDefault(final MetaDataRegisterDTO dto, final String pluginName, final String selectorHandler) {// 以contextPath或appName作为选择器名称String contextPath = ContextPathUtils.buildContextPath(dto.getContextPath(), dto.getAppName());// 根据选择器名和插件名从数据库中查询选择器SelectorDO selectorDO = findByNameAndPluginName(contextPath, pluginName);// 如果还不存在,就创建一个选择器插入数据库if (Objects.isNull(selectorDO)) {// 构建选择器DTOSelectorDTO selectorDTO = SelectorUtil.buildSelectorDTO(contextPath, pluginMapper.selectByName(pluginName).getId());selectorDTO.setHandle(selectorHandler);// 注册选择器并发布事件 ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATEreturn registerDefault(selectorDTO);}return selectorDO.getId();
}
- 注册规则,可以认为一个元数据就是一个规则,根据
path
判断是否同一个。
构建规则:
private RuleDTO buildRpcDefaultRuleDTO(final String selectorId, final MetaDataRegisterDTO metaDataDTO, final String ruleHandler) {return buildRuleDTO(selectorId, ruleHandler, metaDataDTO.getRuleName(), metaDataDTO.getPath());
}private RuleDTO buildRuleDTO(final String selectorId, final String ruleHandler, final String ruleName, final String path) {// 构建规则DTORuleDTO ruleDTO = RuleDTO.builder().selectorId(selectorId).name(ruleName).matchMode(MatchModeEnum.AND.getCode()).enabled(Boolean.TRUE).loged(Boolean.TRUE).matchRestful(Boolean.FALSE).sort(1).handle(ruleHandler).build();// 将{xxx}替换成**String conditionPath = this.rewritePath(path);RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder().paramType(ParamTypeEnum.URI.getName()).paramName("/").paramValue(conditionPath).build();// 设置规则条件if (conditionPath.endsWith(AdminConstants.URI_SLASH_SUFFIX)) {ruleConditionDTO.setOperator(OperatorEnum.STARTS_WITH.getAlias());} else if (conditionPath.endsWith(AdminConstants.URI_SUFFIX)) {ruleConditionDTO.setOperator(OperatorEnum.PATH_PATTERN.getAlias());} else if (conditionPath.indexOf("*") > 1) {ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias());} else {ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias());}ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));return ruleDTO;
}
保存规则:
@Override
public String registerDefault(final RuleDTO ruleDTO) {// 选择器下已经存在同名的规则,则直接返回,什么也不干if (Objects.nonNull(ruleMapper.findBySelectorIdAndName(ruleDTO.getSelectorId(), ruleDTO.getName()))) {return "";}RuleDO ruleDO = RuleDO.buildRuleDO(ruleDTO);if (StringUtils.isEmpty(ruleDTO.getId())) {// 插入规则ruleMapper.insertSelective(ruleDO);// 插入规则条件addCondition(ruleDO, ruleDTO.getRuleConditions());}// 发布规则变更事件 ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATEruleEventPublisher.onRegister(ruleDO, ruleDTO.getRuleConditions());return ruleDO.getId();
}
具体的规则设计建议去看官方文档。
- 注册元数据,直接将注册上来的元数据保存
@Override
protected void registerMetadata(final MetaDataRegisterDTO dto) {if (dto.isRegisterMetaData()) {MetaDataService metaDataService = getMetaDataService();// 根据路径查询元数据时候已存在MetaDataDO exist = metaDataService.findByPath(dto.getPath());// 已存在,就更新,发布元数据更新事件,不存在,就插入,发布元数据创建事件(用于同步网关)metaDataService.saveOrUpdateMetaData(exist, dto);}
}
- 注册
ContextPath
,只有http
,springCloud
,webSocket
类型才有。处理的逻辑在AbstractContextPathRegisterService
中。
public abstract class AbstractContextPathRegisterService extends AbstractShenyuClientRegisterServiceImpl {@Overridepublic void registerContextPath(final MetaDataRegisterDTO dto) {// 持久化contextPath插件下的选择器并发布选择器变更事件String contextPathSelectorId = getSelectorService().registerDefault(dto, PluginEnum.CONTEXT_PATH.getName(), "");// 创建规则处理逻辑ContextMappingRuleHandle handle = new ContextMappingRuleHandle();handle.setContextPath(PathUtils.decoratorContextPath(dto.getContextPath()));handle.setAddPrefixed(dto.getAddPrefixed());// 注册contextPath插件默认的规则,contextPath就是规则名,并发布规则变更事件(用于同步网关)getRuleService().registerDefault(buildContextPathDefaultRuleDTO(contextPathSelectorId, dto, handle.toJson()));}
}
URI的处理
URI
数据是由URIRegisterExecutorSubscriber
订阅器处理:
public class URIRegisterExecutorSubscriber implements ExecutorTypeSubscriber<URIRegisterDTO> {@Overridepublic void executor(final Collection<URIRegisterDTO> dataList) {if (CollectionUtils.isEmpty(dataList)) {return;}// 根据rpc类型分类final Map<String, List<URIRegisterDTO>> groupByRpcType = dataList.stream().filter(data -> StringUtils.isNotBlank(data.getRpcType())).collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));for (Map.Entry<String, List<URIRegisterDTO>> entry : groupByRpcType.entrySet()) {// 根据不同rpc类型使用对应的shenyuClientRegisterService处理final String rpcType = entry.getKey();Optional.ofNullable(shenyuClientRegisterService.get(rpcType)).ifPresent(service -> {final List<URIRegisterDTO> list = entry.getValue();// 再以contextPath/appName分类Map<String, List<URIRegisterDTO>> listMap = buildData(list);listMap.forEach((selectorName, uriList) -> {final List<URIRegisterDTO> register = new LinkedList<>();final List<URIRegisterDTO> offline = new LinkedList<>();for (URIRegisterDTO d : uriList) {final EventType eventType = d.getEventType();// 判断是注册类型还是下线类型(服务实例启动和下线)if (Objects.isNull(eventType) || EventType.REGISTER.equals(eventType)) {// eventType is null, should be old versionsregister.add(d);} else if (EventType.OFFLINE.equals(eventType)) {offline.add(d);}}if (CollectionUtils.isNotEmpty(register)) {// 注册URIservice.registerURI(selectorName, register);}if (CollectionUtils.isNotEmpty(offline)) {// 下线URIservice.offline(selectorName, offline);}});});}}private Map<String, List<URIRegisterDTO>> buildData(final Collection<URIRegisterDTO> dataList) {Map<String, List<URIRegisterDTO>> resultMap = new HashMap<>(8);for (URIRegisterDTO dto : dataList) {String contextPath = dto.getContextPath();String key = StringUtils.isNotEmpty(contextPath) ? contextPath : dto.getAppName();if (StringUtils.isNotEmpty(key)) {if (resultMap.containsKey(key)) {List<URIRegisterDTO> existList = resultMap.get(key);existList.add(dto);resultMap.put(key, existList);} else {resultMap.put(key, Lists.newArrayList(dto));}}}return resultMap;}
}
调到FallbackShenyuClientRegisterService
的registerURI()
方法
@Override
public String registerURI(final String selectorName, final List<URIRegisterDTO> uriList) {String result;String key = key(selectorName);try {this.removeFallBack(key);// 注册uriresult = this.doRegisterURI(selectorName, uriList);logger.info("Register success: {},{}", selectorName, uriList);} catch (Exception ex) {logger.warn("Register exception: cause:{}", ex.getMessage());result = "";this.addFallback(key, new FallbackHolder(selectorName, uriList));}return result;
}
FallbackShenyuClientRegisterService是用来异常处理的,然后调用doRegisterURI()做真正处理。
@Override
public String doRegisterURI(final String selectorName, final List<URIRegisterDTO> uriList) {if (CollectionUtils.isEmpty(uriList)) {return "";}// 查询对应的选择器SelectorDO selectorDO = selectorService.findByNameAndPluginName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));if (Objects.isNull(selectorDO)) {throw new ShenyuException("doRegister Failed to execute,wait to retry.");}// 过滤port或host为空的URIList<URIRegisterDTO> validUriList = uriList.stream().filter(dto -> Objects.nonNull(dto.getPort()) && StringUtils.isNotBlank(dto.getHost())).collect(Collectors.toList());// 由URI构建处理选择器中的handler信息,更新选择器中的handler// 应该就是相当于添加上服务实例信息String handler = buildHandle(validUriList, selectorDO);if (handler != null) {selectorDO.setHandle(handler);SelectorData selectorData = selectorService.buildByName(selectorName, PluginNameAdapter.rpcTypeAdapter(rpcType()));selectorData.setHandle(handler);// 更新数据库selectorService.updateSelective(selectorDO);// 发布选择器变更事件(用于同步给网关)eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE, Collections.singletonList(selectorData)));}return ShenyuResultMessage.SUCCESS;
}
总结就是admin
拿到URI
数据后,更新选择器中的handler
信息,然后写入到数据库,最后发布事件。
更新的就是这里的信息:
至此,服务端注册流程也就分析完了,主要通过对外提供的接口,接受客户端的注册信息,然后写入到Disruptor队列,再从中消费数据,根据接收到的元数据和URI数据更新admin的选择器、规则、元数据和选择器的handler。
参考资料:
官方博客
相关文章:
ShenYu网关注册中心之Zookeeper注册原理
文章目录 1、客户端注册流程1.1、读取配置1.1.1、用于注册的 ZookeeperClientRegisterRepository1.1.2、用于扫描构建 元数据 和 URI 的 SpringMvcClientEventListener 1.2、扫描注解,注册元数据和URI1.2.1、构建URI并写入Disruptor1.2.2、构建元数据并写入Disrupto…...
高级C#技术(二)
前言 本章为高级C#技术的第二节也是最后一节。前一节在下面这个链接 高级C#技术https://blog.csdn.net/qq_71897293/article/details/134930989?spm1001.2014.3001.5501 匿名类型 匿名类型如其名,匿名的没有指定变量的具体类型。 举个例子: 1 创建…...
【性能测试】基础知识篇-压力模型
常见压力模式 并发模式(即虚拟用户模式)和RPS模式(即Requests Per Second,每秒请求数,吞吐量模式)。 本文介绍这两种压力模式的区别,以便根据自身业务场景选择更合适的压力模式。 并发模式 …...
springboot-redis设置定时触发任务详解
最近研究了一下“redis定时触发”,网上查了查资料,这里记录一下。 从Redis 2.8.0开始,Redis加入了发布/订阅模式以及键空间消息提醒(keyspace notification)功能。键空间消息提醒提供了允许客户端通过订阅指定信道获取…...
Video anomaly detection with spatio-temporal dissociation 论文阅读
Video anomaly detection with spatio-temporal dissociation 摘要1.介绍2.相关工作3. Methods3.1. Overview3.2. Spatial autoencoder3.3. Motion autoencoder3.4. Variance attention module3.5. Clustering3.6. The training objective function 4. Experiments5. Conclusio…...
svn 安装
安装系统 ubuntu 22 安装命令: sudo apt-get install subversion 创建第一个工程: 创建版本库、项目 1、先创建svn根目录文件夹 sudo mkdir /home/svn 2、创建项目的目录文件夹 sudo mkdir /home/svn/demo_0 svnadmin create /home/svn/demo_0 配置&a…...
slurm 23.11.0集群 debian 11.5 安装
slurm 23.11.0集群 debian 11.5 安装 用途 Slurm(Simple Linux Utility for Resource Management, http://slurm.schedmd.com/ )是开源的、具有容错性和高度可扩展的Linux集群超级计算系统资源管理和作业调度系统。超级计算系统可利用Slurm对资源和作业进行管理&a…...
ffmpeg可以做什么
用途 FFmpeg是一个功能强大的多媒体处理工具,可以处理音频和视频文件。它是一个开源项目,可在各种操作系统上运行,包括Linux、Windows和Mac OS X等。以下是FFmpeg可以做的一些主要任务: 转换媒体格式:可将一个媒体格式…...
一种缩小数据之间差距的算法
先上代码: /** * 缩小数据之间的差距,但是大小关系不变的方法* param {Array} features */function minMaxData(data) {for (let i 0; i < data.length; i) {const f data[i];const x f[1];const yf[2];//此处5根据实际情况设置const y2 Math.pow(…...
【Axure RP9】动态面板使用------案例:包括轮播图和多方式登入及左侧菜单栏案例
目录 一 动态面板简介 1.1 动态面板是什么 二 轮播图 2.1 轮播图是什么 2.2 轮播图应用场景 2.3 制作实播图 三 多方式登入 3.1多方式登入是什么 3.3 多方式登入实现 四 左侧菜单栏 4.1左侧菜单栏是什么 4.2 左侧菜单栏实现 一 动态面板简介 1.1 动态面板是什么…...
在接口实现类中,加不加@Override的区别
最近的软件构造实验经常需要设计接口,我们知道Override注解是告诉编译器,下面的方法是重写父类的方法,那么单纯实现接口的方法需不需要加Override呢? 定义一个类实现接口,使用idea时,声明implements之后会…...
优质全套SpringMVC教程
三、SpringMVC 在SSM整合中,MyBatis担任的角色是持久层框架,它能帮我们访问数据库,操作数据库 Spring能利用它的两大核心IOC、AOP整合框架 1、SpringMVC简介 1.1、什么是MVC MVC是一种软件架构的思想(不是设计模式-思想就是我们…...
微信小程序---使用npm包安装Vant组件库
在小程序项目中,安装Vant 组件库主要分为如下3步: 注意:如果你的文件中不存在pakage.json,请初始化一下包管理器 npm init -y 1.通过 npm 安装(建议指定版本为1.3.3) 通过npm npm i vant/weapp1.3.3 -S --production 通过y…...
GPT-4V被超越?SEED-Bench多模态大模型测评基准更新
📖 技术报告 SEED-Bench-1:https://arxiv.org/abs/2307.16125 SEED-Bench-2:https://arxiv.org/abs/2311.17092 🤗 测评数据 SEED-Bench-1:https://huggingface.co/datasets/AILab-CVC/SEED-Bench SEED-Bench-2&…...
数据库_mongoDB
1 介绍 MongoDB 是一种 NoSQL 数据库,它将每个数据存储为一个文档,这里的文档类似于 JSON/BSON 对象,具体数据结构由键值(key/value)对组成。字段值可以包含其他文档,数组及文档数组。其数据结构非常松散&…...
Layui实现自定义的table列悬停事件并气泡提示信息
1、概要 使用layui组件实现table的指定列悬停时提示信息,因为layui组件中没有鼠标悬停事件支持,所以需要结合js原生事件来实现这个功能,并结合layui的tips和列的templte属性气泡提示实现效果。 2、效果图 3、代码案例 <!DOCTYPE html&g…...
Tomcat从认识安装到详细使用
文章目录 一.什么是Tomact?二.Tomcat的安装1.下载安装包2.一键下载3.打开Tomcat进行测试4.解决Tomcat中文服务器乱码 三.Tomcat基本使用1.启动与关闭Tomcat2.Tomcat部署项目与浏览器访问项目 四.Tomcat操作中的常见问题1.启动Tomcat后,启动窗口一闪而过?…...
07-Eventing及实践
1 Knative Eventing的相关组件 Knative Eventing具有四个最基本的组件:Sources、Brokers、Triggers 和 Sinks 事件会从Source发送至SinkSink是能够接收传入的事件可寻址(Addressable)或可调用(Callable)资源 Knative S…...
Linux下Netty实现高性能UDP服务
前言 近期笔者基于Netty接收UDP报文进行业务数据统计的功能,因为Netty默认情况下处理UDP收包只能由一个线程负责,无法像TCP协议那种基于主从reactor模型实现多线程监听端口,所以笔者查阅网上资料查看是否有什么方式可以接收UDP收包的性能瓶颈…...
Ubuntu 22.04 Tesla V100s显卡驱动,CUDA,cuDNN,MiniCONDA3 环境的安装
今天来将由《蓝创精英团队》带来一个Ubuntu 显卡环境的安装,主要是想记录下来,方便以后快捷使用。 主要的基础环境 显卡驱动 (nvidia-smi)CUDA (nvidia-smi 可查看具体版本)cuDNN (cuda 深度学习加速库)Conda python环境管理(Miniconda3) Nvidia 驱动…...
FFmpeg转码流程和常见概念
视频格式:mkv,flv,mov,wmv,avi,mp4,m3u8,ts等等 FFmpeg的转码工具,它的处理流程是这样的: 从输入源获得原始的音视频数据,解封装得到压缩封装的音…...
【01】GeoScene生产海图或者电子航道图
1.1 什么是电子海图制图模块 GeoScene海事模块是一个用于管理和制作符合国际水文组织(IHO)S-100系列标准和S-57标准的海事数据的系统。提供了S-100和S-57工具,用于加载基于S-100的要素目录、创建基于S-57传输结构的数据、输入数据、符号化数…...
TWS蓝牙耳机的船运模式
TWS蓝牙耳机的船运模式 是否需要申请加入数字音频系统研究开发交流答疑群(课题组)?可加我微信hezkz17, 本群提供音频技术答疑服务,+群赠送语音信号处理降噪算法,蓝牙耳机音频,DSP音频项目核心开发资料, TWS蓝牙耳机的船运模式是指在将耳机从一个地方运送到另一个地方时,…...
Vue系列之指令 v-html
文章の目录 1、v-html指令2、基本用法写在最后 1、v-html指令 v-html 指令类似于 v-text 指令,它与 v-text 区别在于 v-text 输出的是纯文本,浏览器不会对其再进行html解析,但v-html会将其当html标签解析后输出,类似于 JavaScrip…...
Mac如何安装stable diffusion
今天跟大家一起在Mac电脑上安装下stable diffusion,在midjourney等模型收费的情况下如何用自己的电脑算力用上免费的画图大模型呢?来吧一起实操起来 一、安装homebrew 官网地址:Homebrew — The Missing Package Manager for macOS (or Lin…...
Kubernetes (k8s) 快速认知
应用部署方式 传统部署时代 早期的时候,各个组织是在物理服务器上运行应用程序。缺点 资源分配问题: 无法限制在物理服务器中运行的应用程序资源使用 维护成本问题: 部署多个物理机,维护许多物理服务器的成本很高 虚拟化部署时…...
Electron V28主进程与渲染进程互相通信总结
本文示例采用ElectronVue3TS编写,请读者理顺思路,自行带入自己的项目。 注: 读本文前请先搞懂什么是主进程,什么是渲染进程。 在Electron中有着ipcMain和ipcRenderer、contextBridge模块,以及创建窗口对象上的webCont…...
MySQL主从复制详解
目录 1. 主从复制的工作原理 1.1. 主从复制的角色 1.2. 主从复制的流程 2. 配置MySQL主从复制 2.1. 确保主服务器开启二进制日志 2.2. 设置从服务器 2.3. 连接主从服务器 2.4. 启动复制 3. 主从复制的优化与注意事项 3.1. 优化复制性能 3.2. 注意复制延迟 3.3. 处理…...
verilog基础语法-计数器
概述: 计数器是FPGA开发中最常用的电路,列如通讯中记录时钟个数,跑马灯中时间记录,存储器中地址的控制等等。本节给出向上计数器,上下计数器以及双向计数器案例。 内容 1. 向上计数器 2.向下计数器 3.向上向下计数…...
有SCL,SDA,TRIG,I2C的元器件是什么?在哪找?proteus
寻找方法:...
网页小游戏4933/兰州网络推广关键词优化
JSTL JSTL(JSP Standard Tag Library,JSP标准标签库)是一个不断完善的开放源代码的JSP标签库,是由apache的jakarta小组来维护的。JSTL只能运行在支持JSP1.2和Servlet2.3规 范的容器上,如tomcat 4.x。在JSP 2.0中也是作为标准支持的…...
17一起做网站普宁站/佛山seo联系方式
概念:依赖注入与IOC模式类似工厂模式,是一种解决调用者和被调用者依赖耦合关系的模式;它解决了对象之间的依赖关系,使得对象只依赖IOC/DI容器,不再直接相互依赖,实现松耦合,然后在对象创建时&am…...
网站建设多语种自动翻译插件/免费数据统计网站
引言 某台RealServer down了,怎么办? --- 健康检测LVS本身down了,怎么办?- --LVS冗余Keepalived – LVS管理软件– 健康检测:支持4/7监测; – 主备冗余:采用VRRP协议的HeartBeat; – 如何配置?--- 配置文件 Keepalived –f /et…...
凡科互动游戏怎么修改程序/南昌seo招聘信息
解决方案:不要在角色蓝图里的SkeletalMeshComponent里面勾选EnableperpolyCollision,要找到SkeletalMesh资源蓝图,在资源蓝图里面勾选EnableperpolyCollision。(在UE4官方论坛也有人问过) 分析:这可能是UE…...
自己做短视频网站/seo排名专业公司
前面,我们已经了解了怎么在android app上打开关闭和扫描,搜索wifi,现在,我来写一下怎么通过连接wifi来使app获取到IPCamera摄像头的视频。 一、通过URL获取视频的地址 二、创建输入流 三、解析图片 首先,我是通过抓包软…...
江门企业网站建设/网络营销推广实战宝典
我是在主机上去开服务器上的虚拟机,远端虚拟机是10.64.75.57不开putty的话 用NX可以登进去 也不出现问题 用putty一会就有问题 还有就是putty别的linux(10.64.75.55)就没事,这应该是远端linux的问题 ,但是配置跟别的机…...