临沂网站建设推广/饥饿营销的十大案例
前言
在之前我们讲过Spring和Dubbo的集成,我们在服务上标注了@DubboService的注解,然后最终Dubbo会调用到ServiceBean#export方法中,本次我们就来剖析下服务导出的全流程。
一、前置回顾
由于ServiceBean实现了ApplicationListener接口,那么spring会在上下文刷新完成后回调其onApplicationEvent方法,然后在这个方法内会调用export方法,最终会调用到父类也就是ServiceConfig的export方法,来执行服务暴露全流程。
二、服务导出源码剖析
2.1、ServiceConfig#export方法
- 这个方法主要是判断我们是不是需要延迟暴露,可以通过@Service(delay=100),来使用延迟暴露功能,最终都会调用到doExport方法
public synchronized void export() {if (provider != null) {if (export == null) {export = provider.getExport();}if (delay == null) {delay = provider.getDelay();}}if (export != null && !export) {return;}//延迟暴露,如果想使用延迟暴露功能,可以在@Service注解中添加delay属性 -》@Service(delay =1000 )if (delay != null && delay > 0) {delayExportExecutor.schedule(new Runnable() {@Overridepublic void run() {doExport();}}, delay, TimeUnit.MILLISECONDS);} else {//go doExportdoExport();}}
2.2、ServiceConfig#export方法
- 首先判断服务是否暴露过,如果没有暴露则进行服务导出,则把exported设置为true,然后检查interfaceName是否为空,为空抛出异常
- checkDefault检查provider是否为null,为null就创建并把配置信息设置到provider中
- 把provider中的application,module,registries,monitor,protocols等属性赋值给ServiceConfig的相关属性,如果赋值后registries和monitor则再尝试从module和application中获取赋值
- 判断实现的接口类型是否是GenericService,这个就是判断是否是泛化实现,如果是的话,则把interfaceClass设置为GenericService.class,然后generic设置为true
- 如果不是泛化实现,则通过反射创建出来接口的class对象,然后检查我们要导出的方法是否在接口中,检查实现类是否是这个接口的实现类,然后再把generic设置为false,代表不是泛化导出。
- 下面的checkApplication,checkRegistry,checkProtocol方法就是做下参数检查,判断我们刚刚设置的application,protocol…等不能为空。
- 调用doExportUrls进行导出url
- 给已经暴露的服务构造一个ProviderModel,然后缓存起来,这里ProviderModel表示服务提供者模型,里面存储这服务提供者相关信息,ApplicationModel 缓存所有的 ProviderModel。
protected synchronized void doExport() {if (unexported) {throw new IllegalStateException("Already unexported!");}if (exported) {return;}exported = true;if (interfaceName == null || interfaceName.length() == 0) {throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");}checkDefault();if (provider != null) {if (application == null) {application = provider.getApplication();}if (module == null) {module = provider.getModule();}if (registries == null) {registries = provider.getRegistries();}if (monitor == null) {monitor = provider.getMonitor();}if (protocols == null) {protocols = provider.getProtocols();}}if (module != null) {if (registries == null) {registries = module.getRegistries();}if (monitor == null) {monitor = module.getMonitor();}}if (application != null) {if (registries == null) {registries = application.getRegistries();}if (monitor == null) {monitor = application.getMonitor();}}if (ref instanceof GenericService) {interfaceClass = GenericService.class;if (StringUtils.isEmpty(generic)) {generic = Boolean.TRUE.toString();}} else {try { //创建class对象interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}//检查方法是否在接口中checkInterfaceAndMethods(interfaceClass, methods);checkRef(); //检查实现类generic = Boolean.FALSE.toString(); //不是泛化}if (local != null) {if ("true".equals(local)) {local = interfaceName + "Local";}Class<?> localClass;try {localClass = ClassHelper.forNameWithThreadContextClassLoader(local);} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}if (!interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface "+ interfaceName);}}if (stub != null) {if ("true".equals(stub)) {stub = interfaceName + "Stub";}Class<?> stubClass;try {stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}if (!interfaceClass.isAssignableFrom(stubClass)) {throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface "+ interfaceName);}}checkApplication();checkRegistry();checkProtocol();appendProperties(this);checkStub(interfaceClass);checkMock(interfaceClass);if (path == null || path.length() == 0) {path = interfaceName;}//go doExportUrlsdoExportUrls();CodecSupport.addProviderSupportedSerialization(getUniqueServiceName(), getExportedUrls());ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);}
2.2.1、ServiceConfig#checkDefault
- 判断ServiceConfig的provider属性是否为空,为空则new出来一个新的ProviderConfig
- 然后调用appendProperties给provider对象填充其属性
private void checkDefault() {//判断provider如果为空则new一个出来if (provider == null) {provider = new ProviderConfig();}//填充属性到provider中appendProperties(provider);}
2.2.2.1、ServiceConfig#appendProperties
- 首先根据传进来的配置类型拼接出来一个prefix,这里如果是providerConfig那么前缀就是dubbo.provider.
- 遍历providerConfig的所有方法,然后获取方法名,判断方法长度是大于3并且是set开头的,说明是setxxx,并且方法修饰符是public的且只有一个入参且参数类型是基本类型or包装类型,然后满足上述条件继续后续流程
- 首先截取出来需要注入的属性名,例:setName,这里需要注入的属性名称就是name
- 判断providerConfig是否有指定id,我们可以在配置文件中指定id,例:dubbo.protocols.p1.id=dubbo1,dubbo.protocols.p2.id=dubbo2
- 如果有指定id,那么把id也拼接上,拼接出来的配置名是:dubbo.provider.dubbo1.name,这里会根据这个配置名从JVM系统属性中查找有没有,我们可以通过在服务启动时设置VM arguments: -D dubbo.provider.dubbo1.name=111指定
- 如果拼接上id没有获取到配置,那么把id去掉再获取一次,这时候配置名是:dubbo.provider.name,在根据这个配置名从JVM属性中查找有没有
- 如果上面都没有,再查找类中有没有getName的方法,如果有并且getName有返回结果,则找到了属性值
- 如果get方法返回的是空,则再根据配置名:dubbo.provider.dubbo1.name 从properties文件中获取
- 如果没获取到,再根据配置名:dubbo.provider.name 从properties文件中获取
- 如果也没获取到,则再获取一下映射的key,例:dubbo.protocol.name":“dubbo.service.protocol”,然后再根据dubbo.service.protoco再从JVM系统属性中和properties文件中再获取一次
- 如果获取到对应的属性值,则调用其set方法把属性值设置进去
protected static void appendProperties(AbstractConfig config) {if (config == null) {return;}//这里如果我们是providerConfig,prefix就是dubbo.provider.String prefix = "dubbo." + getTagName(config.getClass()) + ".";Method[] methods = config.getClass().getMethods();//遍历providerConfig的所有方法for (Method method : methods) {try {//获取方法名String name = method.getName();//如果方法长度大于3,并且是以set开头,说明是setxxx,并且方法是public的并且方法只有一个入参且方法//的参数类型是原始的也就是java提供的数据类型if (name.length() > 3 && name.startsWith("set") && Modifier.isPublic(method.getModifiers())&& method.getParameterTypes().length == 1 && isPrimitive(method.getParameterTypes()[0])) {//获取到需要注入的属性名,例:setName,这里拿到的属性名就是nameString property = StringUtils.camelToSplitName(name.substring(3, 4).toLowerCase() + name.substring(4), ".");String value = null;//判断config有没有指定ID//例: dubbo.protocols.p1.id=dubbo1,dubbo.protocols.p2.id=dubbo2if (config.getId() != null && config.getId().length() > 0) {//去JVM系统属性中查找有没有 -D dubbo.provider.dubbo1.name属性//这里我们可以通过在服务启动时设置VM argumentsString pn = prefix + config.getId() + "." + property;value = System.getProperty(pn);if (!StringUtils.isBlank(value)) {logger.info("Use System Property " + pn + " to config dubbo");}}//如果上面指定ID没有获取到配置,那么把ID去掉再获取一次if (value == null || value.length() == 0) {//通过dubbo.provider.name获取String pn = prefix + property;value = System.getProperty(pn);if (!StringUtils.isBlank(value)) {logger.info("Use System Property " + pn + " to config dubbo");}}//如果把ID去掉还没获取到if (value == null || value.length() == 0) {Method getter;//查找类中有没有 getName的方法try {getter = config.getClass().getMethod("get" + name.substring(3));} catch (NoSuchMethodException e) {try {getter = config.getClass().getMethod("is" + name.substring(3));} catch (NoSuchMethodException e2) {getter = null;}}//如果有get方法if (getter != null) {//get方法的返回结果是空if (getter.invoke(config) == null) {//如果config中有指定ID,则dubbo.provider.dubbo1.nameif (config.getId() != null && config.getId().length() > 0) {value = ConfigUtils.getProperty(prefix + config.getId() + "." + property);}//如果上面指定id没有获取到,则再次尝试从properties文件中获取if (value == null || value.length() == 0) {value = ConfigUtils.getProperty(prefix + property);}//如果从properties文件中没有获取到if (value == null || value.length() == 0) {//则尝试看有没有给这个服务指定配置String legacyKey = legacyProperties.get(prefix + property);//如果有则再从properties文件中获取这个服务指定的配置/* 尝试获取配置并转换(dubbo.service.max.retry.providers 和dubbo.service.allow.no.provider 两个配置需要转换) */if (legacyKey != null && legacyKey.length() > 0) {value = convertLegacyValue(legacyKey, ConfigUtils.getProperty(legacyKey));}}}}}//如果到这里value不为空,则通过反射调用其set方法把value设置进去if (value != null && value.length() > 0) {method.invoke(config, convertPrimitive(method.getParameterTypes()[0], value));}}} catch (Exception e) {logger.error(e.getMessage(), e);}}}
2.2.2.2、AbstractConfig#getTagName
- 首先获取到class的类名,例:ProviderConfig
- 然后判断如果以指定后缀结尾的则去除掉,SUFFIXES = new String[]{“Config”, “Bean”},这里的ProviderConfig会变成Provider
- 然后把Provider全转成小写的,也就是provider返回
private static String getTagName(Class<?> cls) {String tag = cls.getSimpleName();for (String suffix : SUFFIXES) {if (tag.endsWith(suffix)) {tag = tag.substring(0, tag.length() - suffix.length());break;}}tag = tag.toLowerCase();return tag;}
2.2.2.3、AbstractConfig#isPrimitive
这个方法就是判断类型是不是基本数据类型或者是基本数据的包装类型或者是Object类型
private static boolean isPrimitive(Class<?> type) {return type.isPrimitive()|| type == String.class|| type == Character.class|| type == Boolean.class|| type == Byte.class|| type == Short.class|| type == Integer.class|| type == Long.class|| type == Float.class|| type == Double.class|| type == Object.class;}
2.2.2.4、ConfigUtils#getProperty
- 这里假如我们传入的配置名是dubbo.provider.dubbo1.name,那么会根据这个配置名先从JVM系统属性中获取,获取到了直接返回
- 从JVM系统属性中没有获取到,再从properties文件中获取,这里因为可能属性值存在${}表达式类型,这里需要替换成对应的配置
public static String getProperty(String key, String defaultValue) {//先从JVM系统属性中获取String value = System.getProperty(key);//获取到了直接返回if (value != null && value.length() > 0) {return value;}//获取不到再从properties中获取Properties properties = getProperties();//将$和${}表达式替换成对应的配置return replaceProperty(properties.getProperty(key, defaultValue), (Map) properties);}
2.2.2.5、ConfigUtils#getProperties
- 首先看JVM系统属性中有没有通过 -D dubbo.properties.file = xxx 指定dubbo的properties文件
- 如果JVM系统属性中没有指定,在从环境变量中查找dubbo.properties.file的属性值
- 如果环境变量中也没有则默认使用dubbo.properties
- 最后就是加载配置文件,赋值到对应的属性中
public static Properties getProperties() {if (PROPERTIES == null) {synchronized (ConfigUtils.class) {if (PROPERTIES == null) {//先看系统属性中有没有通过 -D dubbo.properties.file = xxx 指定dubbo的properties文件String path = System.getProperty(Constants.DUBBO_PROPERTIES_KEY);//如果没有指定if (path == null || path.length() == 0) {//则再从环境变量中看有没有指定dubbo.properties.filepath = System.getenv(Constants.DUBBO_PROPERTIES_KEY);//如果环境变量中也没有指定则就使用dubbo.propertiesif (path == null || path.length() == 0) {path = Constants.DEFAULT_DUBBO_PROPERTIES;}}//这里就是读取properties文件赋值给属性PROPERTIES = ConfigUtils.loadProperties(path, false, true);}}}return PROPERTIES;}
2.2.2.6、ConfigUtils#replaceProperty
- 这里就是判断如果属性值包含表达式,则取出来表达式中的内容
- 根据取出来的内容再从系统属性中properties中获取
- 最后替换表达式为对应的配置值返回
public static String replaceProperty(String expression, Map<String, String> params) {//表达式不含$则忽略if (expression == null || expression.length() == 0 || expression.indexOf('$') < 0) {return expression;}// 正则为\$\s*\{?\s*([\._0-9a-zA-Z]+)\s*\}?Matcher matcher = VARIABLE_PATTERN.matcher(expression);StringBuffer sb = new StringBuffer();while (matcher.find()) {String key = matcher.group(1);//首先尝试从系统属性中获取String value = System.getProperty(key);if (value == null && params != null) {//没有再尝试从 properties 中获取value = params.get(key);}if (value == null) {value = "";}//替换表达式为配置值matcher.appendReplacement(sb, Matcher.quoteReplacement(value));}matcher.appendTail(sb);return sb.toString();}
2.2.2.7、legacyProperties
- legacyProperties中包含了很多映射的属性key,例如我们根据dubbo.protocal.name去获取对应的属性值,获取不到,然后可以再根据dubbo.service.protocol再获取一次。
legacyProperties.put("dubbo.protocol.name", "dubbo.service.protocol");legacyProperties.put("dubbo.protocol.host", "dubbo.service.server.host");legacyProperties.put("dubbo.protocol.port", "dubbo.service.server.port");legacyProperties.put("dubbo.protocol.threads", "dubbo.service.max.thread.pool.size");legacyProperties.put("dubbo.consumer.timeout", "dubbo.service.invoke.timeout");legacyProperties.put("dubbo.consumer.retries", "dubbo.service.max.retry.providers");legacyProperties.put("dubbo.consumer.check", "dubbo.service.allow.no.provider");legacyProperties.put("dubbo.service.url", "dubbo.service.address");
2.2.2、AbstractInterfaceConfig#checkInterfaceAndMethods
这个方法就是遍历接口类的所有方法,然后判断暴露的方法名是不是此接口的方法,不是则报错。
protected void checkInterfaceAndMethods(Class<?> interfaceClass, List<MethodConfig> methods) {// 判断接口类是否为空if (interfaceClass == null) {throw new IllegalStateException("interface not allow null!");}//判断接口类是不是一个接口if (!interfaceClass.isInterface()) {throw new IllegalStateException("The interface class " + interfaceClass + " is not a interface!");}//如果方法列表不为空if (methods != null && !methods.isEmpty()) {//遍历对应的方法for (MethodConfig methodBean : methods) {String methodName = methodBean.getName();//方法名判空if (methodName == null || methodName.length() == 0) {throw new IllegalStateException("<dubbo:method> name attribute is required! Please check: <dubbo:service interface=\"" + interfaceClass.getName() + "\" ... ><dubbo:method name=\"\" ... /></<dubbo:reference>");}boolean hasMethod = false;//判断接口中是否包含此方法for (java.lang.reflect.Method method : interfaceClass.getMethods()) {if (method.getName().equals(methodName)) {hasMethod = true;break;}}//不包含抛出异常if (!hasMethod) {throw new IllegalStateException("The interface " + interfaceClass.getName()+ " not found method " + methodName);}}}}
2.2.3、ServiceConfig#checkRef
判断ref引用的对象是不是接口的实现。
private void checkRef() {// reference should not be null, and is the implementation of the given interfaceif (ref == null) {throw new IllegalStateException("ref not allow null!");}//判断ref是否是这个接口的实现if (!interfaceClass.isInstance(ref)) {throw new IllegalStateException("The class "+ ref.getClass().getName() + " unimplemented interface "+ interfaceClass + "!");}}
2.2.4、AbstractInterfaceConfig#checkApplication
- 这里首先判断如果没有 application 配置则尝试从系统属性中获取 dubbo.application.name,如果能获取到则创建一个ApplicationConfig
- 然后调用appendProperties给ApplicationConfig赋值,这个我们之前已经剖析过了
- 判断有没有配置dubbo.service.shutdown.wait,如果配置了设置到系统属性中。
注:这里checkRegistry,checkProtocol都是一样的实现,就不一一分析了
protected void checkApplication() {// 向后兼容if (application == null) {//没有 application 配置则尝试从系统属性中获取String applicationName = ConfigUtils.getProperty("dubbo.application.name");if (applicationName != null && applicationName.length() > 0) {application = new ApplicationConfig();}}//没有在配置文件中配置application,并且系统属性中也没有则抛出异常if (application == null) {throw new IllegalStateException("No such application config! Please add <dubbo:application name=\"...\" /> to your spring config.");}//给application赋值appendProperties(application);//如果配置了 dubbo.service.shutdown.wait,则设置到系统属性中String wait = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY);if (wait != null && wait.trim().length() > 0) {System.setProperty(Constants.SHUTDOWN_WAIT_KEY, wait.trim());} else {wait = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY);if (wait != null && wait.trim().length() > 0) {System.setProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY, wait.trim());}}}
2.2.5、ServiceConfig#checkStub
这里解析这个方法我们首先来看下什么是本地存根,官方解释:
Dubbo官网文档
远程服务后,客户端通常只剩下接口,而实现全在服务器端,但提供方有些时候想在客户端也执行部分逻辑,比如:做 ThreadLocal 缓存,提前验证参数,调用失败后伪造容错数据等等,此时就需要在 API 中带上 Stub,客户端生成 Proxy 实例,会把 Proxy 通过构造函数传给 Stub ,然后把 Stub 暴露给用户,Stub 可以决定要不要去调 Proxy。如下图:
如果需要使用本地存根,可以通过 其中 stub 参数可以 为true ,此时存根类默认为为 {intefaceName}+ Stub;stub 参数 也可以为 存根类路径名。此时存根类为stub 指向的类。
需要注意,存根类有两个条件:
- 存根类也需要实现暴露的服务接口
- 存根类需要一个有参构造函数,入参为服务接口的实现实例。
// stub 为 ture。 存根类为 {intefaceName}+ Stub,即 com.foo.BarServiceStub
<dubbo:service interface="com.foo.BarService" stub="true" />// 存根类为 stub 指定的类 com.foo.BarServiceStub
<dubbo:service interface="com.foo.BarService" stub="com.foo.BarServiceStub" />
package com.foo;
public class BarServiceStub implements BarService {private final BarService barService;// 构造函数传入真正的远程代理对象public BarServiceStub(BarService barService){this.barService = barService;}public String sayHello(String name) {// 此代码在客户端执行, 你可以在客户端做ThreadLocal本地缓存,或预先验证参数是否合法,等等try {return barService.sayHello(name);} catch (Exception e) {// 你可以容错,可以做任何AOP拦截事项return "容错数据";}}
}
有了上面的案例,这里我们看这个逻辑就简单了
- 获取存根类,如果 stub 为 true 则使用 interfaceName + stub 作为存根类,否则反射获取存根类
- 对stub检查,判断是否存在入参为 interface 的构造函数
void checkStub(Class<?> interfaceClass) {//这里local已经过时了,现在都用stub作为本地存根if (ConfigUtils.isNotEmpty(local)) {Class<?> localClass = ConfigUtils.isDefault(local) ? ReflectUtils.forName(interfaceClass.getName() + "Local") : ReflectUtils.forName(local);if (!interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceClass.getName());}try {ReflectUtils.findConstructor(localClass, interfaceClass);} catch (NoSuchMethodException e) {throw new IllegalStateException("No such constructor \"public " + localClass.getSimpleName() + "(" + interfaceClass.getName() + ")\" in local implementation class " + localClass.getName());}}//如果stub不为空if (ConfigUtils.isNotEmpty(stub)) {//如果 stub 为 true 则使用 interfaceName + stub 作为存根类,否则反射获取存根类Class<?> localClass = ConfigUtils.isDefault(stub) ? ReflectUtils.forName(interfaceClass.getName() + "Stub") : ReflectUtils.forName(stub);if (!interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceClass.getName());}try {//对stub检查,是否存在入参为 interface 的构造函数ReflectUtils.findConstructor(localClass, interfaceClass);} catch (NoSuchMethodException e) {throw new IllegalStateException("No such constructor \"public " + localClass.getSimpleName() + "(" + interfaceClass.getName() + ")\" in local implementation class " + localClass.getName());}}}
2.2.6、ServiceConfig#checkMock
这里首先我们要知道mock的作用:
用以实现服务降级,比如某验权服务,当服务提供方全部挂掉后,客户端不抛出异常,而是通过 Mock 数据返回授权失败。
具体使用案例可以看:Dubbo官方文档
- 首先检查有没有mock,如果没有配置接返回
- 获取mock的配置,然后判断mock是不是return方式,使用 return 来返回一个字符串表示的对象,作为 Mock 的返回值,这里就会检查返回值是否合法
- 判断mock是不是throw方式,使用 throw 来返回一个 Exception 对象,作为 Mock 的返回值,这里会检查是不是一个异常
- 如果是在工程中提供mock实现,则会判断本地是否存在mock实现,这里就是判断本地是否存在xxxMock的class,如果我们的接口是BarService,那么会判断是否存在BarServiceMock的实例。
void checkMock(Class<?> interfaceClass) {//没有配置mock直接返回if (ConfigUtils.isEmpty(mock)) {return;}//获取mock的配置方式String normalizedMock = MockInvoker.normalizeMock(mock);if (normalizedMock.startsWith(Constants.RETURN_PREFIX)) {normalizedMock = normalizedMock.substring(Constants.RETURN_PREFIX.length()).trim();try {//检查mock的值是否合法,如果不合法,则抛出异常MockInvoker.parseMockValue(normalizedMock);} catch (Exception e) {throw new IllegalStateException("Illegal mock return in <dubbo:service/reference ... " +"mock=\"" + mock + "\" />");}} else if (normalizedMock.startsWith(Constants.THROW_PREFIX)) {normalizedMock = normalizedMock.substring(Constants.THROW_PREFIX.length()).trim();if (ConfigUtils.isNotEmpty(normalizedMock)) {try {MockInvoker.getThrowable(normalizedMock);} catch (Exception e) {throw new IllegalStateException("Illegal mock throw in <dubbo:service/reference ... " +"mock=\"" + mock + "\" />");}}} else {//判断本地是否存在服务接口mock实现类MockInvoker.getMockObject(normalizedMock, interfaceClass);}}
2.2.6、ServiceConfig#doExportUrls
- 首先获取注册中心列表,因为可能你在配置文件中指定了多个注册中心,所以这里可能会返回多个
- 遍历protocols,调用doExportUrlsFor1Protocol依次暴露服务,因为有可能我们指定了多种protocol,如Dubbo,REST,这里会对每种协议都进行服务暴露。
private void doExportUrls() {//获取当前服务对应的服务注册中心实例List<URL> registryURLs = loadRegistries(true);for (ProtocolConfig protocolConfig : protocols) {//如果服务指定暴露多个协议(Dubbo,REST),则依次暴露服务doExportUrlsFor1Protocol(protocolConfig, registryURLs);}}
2.2.6.1、AbstractInterfaceConfig#loadRegistries
- 检查配置中心不能为空,并且给配置中心做属性注入
- 遍历注册中心列表,判断RegistryConfig的address是否为空,如果为空就设置为0.0.0.0,然后从JVM系统属性中获取dubbo.registry.address配置的值,如果有,那么是最高优的,使用系统属性配置的
- 如果有注册中心地址的话,则把应用信息添加到参数map中,然后在把注册中心配置信息添加到map中,然后继续追加path,dubbo版本,时间戳,pid等信息追加到参数map中。
- 解析address,得到一个注册中心url列表
- 遍历url列表,然后把注册中心的类型添加到参数中,然后设置url的头部信息为registry,然后判断如果是(服务提供者并且register=true 或 null ) 或 (非服务提供者 && subscribe = true 或 null)则把当前url添加到注册中心列表
protected List<URL> loadRegistries(boolean provider) {//检查配置中心类checkRegistry();List<URL> registryList = new ArrayList<URL>();if (registries != null && !registries.isEmpty()) {for (RegistryConfig config : registries) {String address = config.getAddress();if (address == null || address.length() == 0) {//如果为空,就设为0.0.0.0address = Constants.ANYHOST_VALUE;}//从系统变量中获取注册中心地址String sysaddress = System.getProperty("dubbo.registry.address");//这里如果在系统变量中指定了,那么优先级最高if (sysaddress != null && sysaddress.length() > 0) {address = sysaddress;}//如果有注册中心地址的化if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {Map<String, String> map = new HashMap<String, String>();//把应用信息添加到map中appendParameters(map, application);//把注册中心配置信息添加到map中appendParameters(map, config);//把path/ dubbo版本/pid等信息添加到map中map.put("path", RegistryService.class.getName());map.put("dubbo", Version.getProtocolVersion());map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));if (ConfigUtils.getPid() > 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));}if (!map.containsKey("protocol")) {if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {map.put("protocol", "remote");} else {map.put("protocol", "dubbo");}}//解析得到URL列表,address 可能包含多个注册中心 ip//因此解析得到的是一个 URL 列表List<URL> urls = UrlUtils.parseURLs(address, map);for (URL url : urls) {url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());//设置头部信息 registryurl = url.setProtocol(Constants.REGISTRY_PROTOCOL);// 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:// (服务提供者 && register = true 或 null)// || (非服务提供者 && subscribe = true 或 null)if ((provider && url.getParameter(Constants.REGISTER_KEY, true))|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {registryList.add(url);}}}}}return registryList;}
2.2.6.1.1、AbstractInterfaceConfig#appendParameters
- 遍历Config的所有方法
- 判断方法是以get或者is开头的并且不是getClass并且方法的修饰符是public的并且方法不是无参的并且方法的返回类型是基本类型或者包装类或Object
- 如果满足条件2,则判断判断如果方法返回类型是Object 或者 方法上标注了Parameter注解但是Parameter中excluded为true则不处理
- 获取属性名 例: getName or isPerson 就拿到了 name or person
- 判断Parameter注解不为空,并且在注解中指定了key,如果指定了key就使用指定的没有就用属性名作为key
- 通过反射执行方法拿到返回值
- 判断如果返回值不为空则追加到参数中
- 如果parameter注解不为空,并且属性escaped设置为true,则对方法返回值进行编码
- 如果parameter注解不为空,并且属性append设置为true,则进行追加操作把当前值追加到map的键为:default.key的vlaue后面,用逗号拼接
- 如果指定了前缀,则给key拼接上前缀
- 加入到参数列表中
12.如果条件2不满足,则判断如果方法名是getParameters且方法修饰符是public,且方法的返回类型是Map- 如果满足条件12,则调用getParameters方法获取参数map,判断如果参数map不为空,则遍历所有的键值对,给所有的key拼接上前缀
protected static void appendParameters(Map<String, String> parameters, Object config, String prefix) {if (config == null) {return;}Method[] methods = config.getClass().getMethods();for (Method method : methods) {try {String name = method.getName();//判断方法是以get或者is开头的并且不是getClass并且方法的修饰符是public的//并且方法不是无参的并且方法的返回类型是基本类型或者包装类或Objectif ((name.startsWith("get") || name.startsWith("is"))&& !"getClass".equals(name)&& Modifier.isPublic(method.getModifiers())&& method.getParameterTypes().length == 0&& isPrimitive(method.getReturnType())) {Parameter parameter = method.getAnnotation(Parameter.class);//判断如果方法返回类型是Object 或者 方法上标注了Parameter注解但是Parameter中excluded为trueif (method.getReturnType() == Object.class || parameter != null && parameter.excluded()) {continue;}int i = name.startsWith("get") ? 3 : 2;//获取属性名 例: getName or isPerson 就拿到了 name or personString prop = StringUtils.camelToSplitName(name.substring(i, i + 1).toLowerCase() + name.substring(i + 1), ".");String key;//判断Parameter注解不为空,并且在注解中指定了key,如果指定了key就使用指定的没有就用属性名作为keyif (parameter != null && parameter.key().length() > 0) {key = parameter.key();} else {key = prop;}//执行方法拿到返回值Object value = method.invoke(config);String str = String.valueOf(value).trim();//返回值不为空则追加到参数中if (value != null && str.length() > 0) {//如果parameter注解不为空,并且属性escaped设置为true,则对返回值进行编码if (parameter != null && parameter.escaped()) {str = URL.encode(str);}//如果parameter注解不为空,并且属性append设置为true,则进行追加操作//把当前值追加到map的键为:default.key的vlaue后面,用逗号拼接if (parameter != null && parameter.append()) {String pre = parameters.get(Constants.DEFAULT_KEY + "." + key);if (pre != null && pre.length() > 0) {str = pre + "," + str;}pre = parameters.get(key);if (pre != null && pre.length() > 0) {str = pre + "," + str;}}//如果指定了前缀,则给key拼接上前缀if (prefix != null && prefix.length() > 0) {key = prefix + "." + key;}//把键值put到map中parameters.put(key, str);} else if (parameter != null && parameter.required()) {//如果方法上标注了parameter注解并且required属性为true,则抛出异常,因为方法没有返回值throw new IllegalStateException(config.getClass().getSimpleName() + "." + key + " == null");}//如果方法名是getParameters且方法修饰符是public,且方法的返回类型是Map} else if ("getParameters".equals(name)&& Modifier.isPublic(method.getModifiers())&& method.getParameterTypes().length == 0&& method.getReturnType() == Map.class) {//调用getParameters方法获取参数mapMap<String, String> map = (Map<String, String>) method.invoke(config, new Object[0]);//如果参数map不为空,则遍历所有的键值对,给所有的key拼接上前缀if (map != null && map.size() > 0) {String pre = (prefix != null && prefix.length() > 0 ? prefix + "." : "");for (Map.Entry<String, String> entry : map.entrySet()) {parameters.put(pre + entry.getKey().replace('-', '.'), entry.getValue());}}}} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}}
2.2.6.1.2、UrlUtils#parseURLs
- 这里对地址做判断,然后对地址按照分号或分号进行切割
- 遍历切割后的自己,调用parseURL方法解析成url。
public static List<URL> parseURLs(String address, Map<String, String> defaults) {//地址判空if (address == null || address.length() == 0) {return null;}//按照逗号或分号切割地址String[] addresses = Constants.REGISTRY_SPLIT_PATTERN.split(address);//如果切割后的地址为空则返回nullif (addresses == null || addresses.length == 0) {return null; //here won't be empty}List<URL> registries = new ArrayList<URL>();//遍历切割后的地址,调用parseURLfor (String addr : addresses) {registries.add(parseURL(addr, defaults));}return registries;}
2.2.6.1.3、UrlUtils#parseURL
- 根据地址类型,判断是否需要切割,例:
// 第一种,<dubbo:registry address=“zookeeper://10.20.153.10:2181?backup=10.20.153.11:2181,10.20.153.12:2181”/>
// 第二种,<dubbo:registry protocol=“zookeeper” address=“10.20.153.10:2181,10.20.153.11:2181,10.20.153.12:2181”/>- 从参数中获取获得 “protocol” “username” “password” “host” “port” “path” 到
defaultXXX
属性中,因为,在 Dubbo URL 中,这几个是独立的属性,不在Dubbo.parameters
属性中。- 把刚刚那几个属性从参数列表中移除,因为是独立属性
- 调用URL#valueOf创建Dubbo url
- 判断生成的url是否有指定protocol,username,password等属性,如果没有则从参数列表中获取
- 判断地址有没有指定端口,没有指定缺省为9090
- 判断url有没有需要补充的,也就是5,6做的判断,如果有则重新构造URL返回。
public static URL parseURL(String address, Map<String, String> defaults) {if (address == null || address.length() == 0) {return null;}// 以 Zookeeper 注册中心,配置集群的例子如下:// java config : dubbo.registry.address = zookeeper://lanboal:123456@127.0.0.1:2181?login=false// 第一种,<dubbo:registry address="zookeeper://10.20.153.10:2181?backup=10.20.153.11:2181,10.20.153.12:2181"/>// 第二种,<dubbo:registry protocol="zookeeper" address="10.20.153.10:2181,10.20.153.11:2181,10.20.153.12:2181"/>String url;if (address.indexOf("://") >= 0) { //第一行情况url = address;} else {//第二种情况,需要按照,切割String[] addresses = Constants.COMMA_SPLIT_PATTERN.split(address);url = addresses[0];if (addresses.length > 1) {StringBuilder backup = new StringBuilder();for (int i = 1; i < addresses.length; i++) {if (i > 1) {backup.append(",");}backup.append(addresses[i]);}url += "?" + Constants.BACKUP_KEY + "=" + backup.toString();}}// 从 `defaults` 中,获得 "protocol" "username" "password" "host" "port" "path" 到 `defaultXXX` 属性种。// 因为,在 Dubbo URL 中,这几个是独立的属性,不在 `Dubbo.parameters` 属性中。String defaultProtocol = defaults == null ? null : defaults.get("protocol");if (defaultProtocol == null || defaultProtocol.length() == 0) {defaultProtocol = "dubbo";}String defaultUsername = defaults == null ? null : defaults.get("username");String defaultPassword = defaults == null ? null : defaults.get("password");int defaultPort = StringUtils.parseInteger(defaults == null ? null : defaults.get("port"));String defaultPath = defaults == null ? null : defaults.get("path");Map<String, String> defaultParameters = defaults == null ? null : new HashMap<String, String>(defaults);// 需要移除,因为这几个是独立属性。if (defaultParameters != null) {defaultParameters.remove("protocol");defaultParameters.remove("username");defaultParameters.remove("password");defaultParameters.remove("host");defaultParameters.remove("port");defaultParameters.remove("path");}//创建 Dubbo URLURL u = URL.valueOf(url);// 若 `u` 的属性存在非空的情况下,从 `defaultXXX` 属性,赋值到 `u` 的属性中。boolean changed = false; // 是否改变,即从 `defaultXXX` 属性,赋值到 `u` 的属性中。String protocol = u.getProtocol();String username = u.getUsername();String password = u.getPassword();String host = u.getHost();int port = u.getPort();String path = u.getPath();Map<String, String> parameters = new HashMap<String, String>(u.getParameters());// 如果原始url上未指定protocol,username,password, 但有default值,则追加到url上if ((protocol == null || protocol.length() == 0) && defaultProtocol != null && defaultProtocol.length() > 0) {changed = true;protocol = defaultProtocol;}if ((username == null || username.length() == 0) && defaultUsername != null && defaultUsername.length() > 0) {changed = true;username = defaultUsername;}if ((password == null || password.length() == 0) && defaultPassword != null && defaultPassword.length() > 0) {changed = true;password = defaultPassword;}/*if (u.isAnyHost() || u.isLocalHost()) {changed = true;host = NetUtils.getLocalHost();}*/if (port <= 0) {if (defaultPort > 0) {changed = true;port = defaultPort;} else {changed = true;port = 9090; // 如果地址没有端口缺省为9090}}if (path == null || path.length() == 0) {if (defaultPath != null && defaultPath.length() > 0) {changed = true;path = defaultPath;}}if (defaultParameters != null && defaultParameters.size() > 0) {for (Map.Entry<String, String> entry : defaultParameters.entrySet()) {String key = entry.getKey();String defaultValue = entry.getValue();if (defaultValue != null && defaultValue.length() > 0) {String value = parameters.get(key);if (value == null || value.length() == 0) {changed = true;parameters.put(key, defaultValue);}}}}//若改变,创建新的Dubbo URLif (changed) {u = new URL(protocol, username, password, host, port, path, parameters);}return u;}
2.2.6.2、ServiceConfig#doExportUrlsFor1Protocol
- 判断协议如果没有指定则默认使用dubbo协议
- 构造参数map,把一些信息put到map中,如side=provider,dubbo=2.6.0,时间戳,pid等,还有一些配置信息入applicationConfig,providerConfig,这里appendParameters我们前面已经分析过了
- 遍历处理MethodConfig,把方法信息和参数信息添加到参数map中
- 判断是不是泛化调用,如果是就把泛化调用的信息添加到map中,设置methods=*
- 如果不是泛化调用,就找到所有的method,然后将methods=所有的method名拼接起来
- 将token参数设置到map中,如果协议是injvm,则设置notify为false, protocolConfig.setRegister(false);
- 获取host,port,最终根据map中的参数,主机和端口构造出来一个新的URL
- 从url中获取配置的scope,如果这个scope不是none,则继续判断scope是不是不是remote,如果不是则进行本地暴露
- 判断scope是不是不是local的,不是则会进行远程暴露,然后遍历注册中心列表,首先获取监控中心,如果监控中心url不为空则把监控中心添加到url的参数中,然后将proxy_key属性添加到registryURL中,其实这个proxy_key 的值是可以设置的,就是告诉dubbo我用什么来进行生成代理
- 使用proxyFactory生成一个invoker对象,然后再构造一个DelegateProviderMetaDataInvoker,把原始的配置信息和invoker绑定到一起
- 调用protocol属性的export方法进行服务暴露,把返回的Exporter对象添加到导出列表中
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {String name = protocolConfig.getName();if (name == null || name.length() == 0) {name = "dubbo"; //默认协议是dubbo}Map<String, String> map = new HashMap<String, String>();map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); //side 哪一端map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());//版本map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); //timestampif (ConfigUtils.getPid() > 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); //pid}//读取其他配置信息到map,用于后续构造URLappendParameters(map, application);appendParameters(map, module);appendParameters(map, provider, Constants.DEFAULT_KEY);appendParameters(map, protocolConfig);appendParameters(map, this);if (methods != null && !methods.isEmpty()) {for (MethodConfig method : methods) {appendParameters(map, method, method.getName());String retryKey = method.getName() + ".retry";if (map.containsKey(retryKey)) {String retryValue = map.remove(retryKey);if ("false".equals(retryValue)) {map.put(method.getName() + ".retries", "0");}}List<ArgumentConfig> arguments = method.getArguments();if (arguments != null && !arguments.isEmpty()) {for (ArgumentConfig argument : arguments) {// convert argument typeif (argument.getType() != null && argument.getType().length() > 0) {Method[] methods = interfaceClass.getMethods();// visit all methodsif (methods != null && methods.length > 0) {for (int i = 0; i < methods.length; i++) {String methodName = methods[i].getName();// target the method, and get its signatureif (methodName.equals(method.getName())) {Class<?>[] argtypes = methods[i].getParameterTypes();// one callback in the methodif (argument.getIndex() != -1) {if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {appendParameters(map, argument,method.getName() + "." + argument.getIndex());} else {throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :"+ argument.getIndex() + ", type:" + argument.getType());}} else {// multiple callbacks in the methodfor (int j = 0; j < argtypes.length; j++) {Class<?> argclazz = argtypes[j];if (argclazz.getName().equals(argument.getType())) {appendParameters(map, argument, method.getName() + "." + j);if (argument.getIndex() != -1 && argument.getIndex() != j) {throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :"+ argument.getIndex() + ", type:"+ argument.getType());}}}}}}}} else if (argument.getIndex() != -1) {appendParameters(map, argument, method.getName() + "." + argument.getIndex());} else {throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");}}}} // end of methods for}//泛化调用if (ProtocolUtils.isGeneric(generic)) {map.put(Constants.GENERIC_KEY, generic);map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {String revision = Version.getVersion(interfaceClass, version);if (revision != null && revision.length() > 0) {map.put("revision", revision);}String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();if (methods.length == 0) {logger.warn("NO method found in service interface " + interfaceClass.getName());map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));}}if (!ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());} else {map.put(Constants.TOKEN_KEY, token);}}//本地injvmif (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {protocolConfig.setRegister(false);map.put("notify", "false");}// export serviceString contextPath = protocolConfig.getContextpath();if ((contextPath == null || contextPath.length() == 0) && provider != null) {contextPath = provider.getContextpath();}//hostString host = this.findConfigedHosts(protocolConfig, registryURLs, map);Integer port = this.findConfigedPorts(protocolConfig, name, map);URL url = new URL(name, host, port,(contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);}String scope = url.getParameter(Constants.SCOPE_KEY);// don't export when none is configuredif (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// export to local if the config is not remote (export to remote only when config is remote)//本地服务暴露 不是remote就本地暴露,如果不配置scope也进行本地暴露if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {exportLocal(url);}// export to remote if the config is not local (export to local only when config is local)//远程服务暴露 不是localif (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (logger.isInfoEnabled()) {logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);}if (registryURLs != null && !registryURLs.isEmpty()) {//遍历注册中心for (URL registryURL : registryURLs) {url =url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));URL monitorUrl = loadMonitor(registryURL);//获取监控中心if (monitorUrl != null) { //将监控中心添加到url中url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());}if (logger.isInfoEnabled()) {logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url+ " to registry " + registryURL);}// For providers, this is used to enable custom proxy to generate invokerString proxy = url.getParameter(Constants.PROXY_KEY);//配置中有proxy_key 的话就使用配置的if (StringUtils.isNotEmpty(proxy)) { //设置配置的 proxy_keyregistryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);}//invoker 使用ProxyFactory 生成 invoker对象,这里这个invoker其实是一个代理对象Invoker<?> invoker =proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));//创建 DelegateProvoderMetaInvoker对象,就是包装了一下这个invoker,主要是把原始的配置信息跟invoker绑定在一起。DelegateProviderMetaDataInvoker wrapperInvoker =new DelegateProviderMetaDataInvoker(invoker, this);// filter ---->listener --->registryProtocol ( 这里使用了wapper 包装机制)// filter ----> listener ----> dubboProtocol 服务暴露//go export 我们看下这个protocol的默认实现是 SPI("dubbo")Exporter<?> exporter = protocol.export(wrapperInvoker);//添加exporterexporters.add(exporter);}} else {//没有注册中心Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);Exporter<?> exporter = protocol.export(wrapperInvoker);exporters.add(exporter);}}}this.urls.add(url);}
2.2.6.3、ServiceConfig#exportLocal
- 判断protocol是不是injvm,不是则把URL中的protocol改成injvm,然后host改成127.0.0.1,port改成1,这里就是新生成一个URL,把之前url的配置拷贝过来了
- 往congtext中添加一个键值,key是接口的全类名,value是实现类的全类名
- 然后调用proxyFactory#getInvoker生成一个Invoker对象,然后调用protocol#export导出这个对象
private void exportLocal(URL url) {//本地服务暴露//如果protocol不是injvmif (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {//设置protocol是injvmURL local = URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL).setHost(LOCALHOST) //host 是127.0.0.1.setPort(0);//这里往congtext中添加一个键值,key是接口的全类名,value是实现类的全类名StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));//ref:接口实现类 interfaceClass:接口class local:URL//因为我们这里URL中的protocol是injvm,所以回去找对应的实现,也就是InjvmProtocol这个类//这个在参数中会调用 proxyFactory.getInvoker获取到一个invokerExporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");}}
2.2.6.3.1、proxyFactory#getInvoker
这里proxyFactory是我们的一个类属性,他是通过SPI查找其实现,之前我们已经分析过Dubbo的SPI,这里getAdaptiveExtension方法回去获取自适应的实现。
private static final ProxyFactory proxyFactory =ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
这里我们看getProxy方法会首先根据@Adaptive注解从我们的URL中去查找proxy对应的值,因为我们没有指定,就会走到类上面标注的@SPI(“javassist”)默认的实现,javassist实现类。
@SPI("javassist")
public interface ProxyFactory {/*** create proxy.** @param invoker* @return proxy*/@Adaptive({Constants.PROXY_KEY})<T> T getProxy(Invoker<T> invoker) throws RpcException;/*** create proxy.** @param invoker* @return proxy*///是否是泛化调用@Adaptive({Constants.PROXY_KEY})<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;/*** create invoker.** @param <T>* @param proxy* @param type* @param url* @return invoker*///咱们用的getInvoker方法,然后会根据proxy这个属性去咱们的URL中找对应的值,我们现在没有刻意设置这个proxy属性的话,就会走默认,// 也就是@SPI(“javassist”)中的javassist实现类。这块知识数据dubbo spi里面的。@Adaptive({Constants.PROXY_KEY})<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;}
所以上面我们调用proxy#getInvoker方法其实调用到了JavassistProxyFactory#getInvoker方法。
- 这里首先会调用Wrapper#getWrapper帮我们生成一个Wrapper,这个Wrapper会根据你提供的这个类型生成一个获取你这个类中成员变量的方法,设置成员变量的方法,执行你这个类中方法的方法。
- new一个AbstractProxyInvoker返回,并重写其doInvoke方法,在doInvoke方法汇总调用wrapper#invokeMethod方法
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'//这里是如果,接口实现类中有$符号,就是用接口类型,没有$符号,就用实现类的类型final Wrapper wrapper =Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);return new AbstractProxyInvoker<T>(proxy, type, url) {//进行调用@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};}
下面这个类是我们通过arthas查看到帮我们生成的代理对象的结构如下:
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.NoSuchMethodException;
import com.alibaba.dubbo.common.bytecode.NoSuchPropertyException;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import com.alibaba.dubbo.demo.provider.DemoServiceImpl;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;public class Wrapper1 extends Wrapper implements ClassGenerator.DC {public static String[] pns; //字段名public static Map pts; //<字段名,字段类型>public static String[] mns; //方法名public static String[] dmns; //自己方法的名字public static Class[] mts0; //方法参数类型public String[] getPropertyNames() {return pns;}public boolean hasProperty(String string) {return pts.containsKey(string);}public Class getPropertyType(String string) {return (Class)pts.get(string);}public String[] getMethodNames() {return mns;}public String[] getDeclaredMethodNames() {return dmns;}public void setPropertyValue(Object object, String string, Object object2) {try {DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;}catch (Throwable throwable) {throw new IllegalArgumentException(throwable);}throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" filed or setter method in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.").toString());}public Object getPropertyValue(Object object, String string) {try {DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object;}catch (Throwable throwable) {throw new IllegalArgumentException(throwable);}throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" filed or setter method in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.").toString());}// 这个就是Wrapper对象的invokerMethod方法public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {DemoServiceImpl demoServiceImpl;try {demoServiceImpl = (DemoServiceImpl)object;}catch (Throwable throwable) {throw new IllegalArgumentException(throwable);}try {if ("sayHello".equals(string) && arrclass.length == 1) {// 调用DemoServiceImpl实例对象的sayHello方法,并将结果返回return demoServiceImpl.sayHello((String)arrobject[0]);}}catch (Throwable throwable) {throw new InvocationTargetException(throwable);}throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.").toString());}
}
这里我们返回的AbstractProxyInvoker也非常简单,构造参数中会对 proxy 接口实现类,具体提供服务, Class type 接口类型, URL url ,这三个参数进行检验,然后存储。实现接口Invoker的invoke(Invocation invocation)方法,实际上还是子类实现doInvoke方法,这里会调用到wrapper.invokeMethod方法
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {private final T proxy;private final Class<T> type;private final URL url;public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {//参数校验if (proxy == null) {throw new IllegalArgumentException("proxy == null");}if (type == null) {throw new IllegalArgumentException("interface == null");}//proxy 需要是实现typeif (!type.isInstance(proxy)) {throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);}this.proxy = proxy;this.type = type;this.url = url;}@Overridepublic Class<T> getInterface() {return type;}@Overridepublic URL getUrl() {return url;}@Overridepublic boolean isAvailable() {return true;}@Overridepublic void destroy() {}@Overridepublic Result invoke(Invocation invocation) throws RpcException {try {return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));} catch (InvocationTargetException e) {return new RpcResult(e.getTargetException());} catch (Throwable e) {throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);}}//实际调用子类的实现protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;@Overridepublic String toString() {return getInterface() + " -> " + (getUrl() == null ? " " : getUrl().toString());}}
2.2.6.3.2、protocol#export
这里protocol是我们的一个类属性,他是通过SPI查找其实现,之前我们已经分析过Dubbo的SPI,这里getAdaptiveExtension方法回去获取自适应的实现。
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
这里我们看export方法会首先根据@Adaptive注解从我们的URL中去查找protocol对应的值,因为我们在url中指定protocol=injvm,所以会找到InjvmProtocol实现
@SPI("dubbo")
public interface Protocol {int getDefaultPort();//我们可以看到Protocol这个扩展点export方法上是有@Adaptive注解的,然后没有value,根据自适应的规则,没有value则value=类名小写。//这样的话就是从url中获取protocol的值,咱们invoker里面包的url是registryURL,然后对应的protocol也就是注册中心的protocol,// RegistryProtocol这个类。但是在创建扩展实现类的时候dubbo会给我们setter注入与wrapper包装,所以我们拿到的RegistryProtocol最终样子是这样的@Adaptive<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;@Adaptive<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;void destroy();}
这里我们看InjvmProtocol的export方法会直接new一个InjvmExporter返回
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {//这里直接 new InjvmExporter类返回return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);}
最终是将key=接口全类名,value=this(也就是InjvmExporter对象)put到exporterMap中了。
class InjvmExporter<T> extends AbstractExporter<T> {private final String key;private final Map<String, Exporter<?>> exporterMap;//最终是将key=接口全类名,value=this(也就是InjvmExporter对象)put到exporterMap中了。//再回到ServiceConfig的exportLocal方法中。还有最后一句exporters.add(exporter);//这里是将上面生成的InjvmExporter对象缓存了起来。//到这里我们这个服务本地暴露(injvm)就解析完成了。InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {super(invoker);this.key = key;this.exporterMap = exporterMap;exporterMap.put(key, this);}@Overridepublic void unexport() {super.unexport();exporterMap.remove(key);}}
2.2.6.4、protocol#export
这里protocol是我们的一个类属性,他是通过SPI查找其实现,之前我们已经分析过Dubbo的SPI,这里getAdaptiveExtension方法回去获取自适应的实现。
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
这里由于我们在url中的protocol属性是registry,所以这里的实现是RegistryProtocol,但是Dubbo的SPI在创建扩展实现类的时候会给我进行wrapper包装,所以这里拿到的RegistryProtocol是经过几次包装的
Listener -> Filter -> Qos -> RegistryProtocol
2.2.6.4.1、ProtocolListenerWrapper#export
由于我们这里是registry协议,所以会直接会调用下一个的export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {//这里也是判断了一下是否是registry协议,如果是的话就直接进入下一个RegistryProtocol。if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}//这里将服务暴露返回的exporter与自动激活的listener们绑在了一起。return new ListenerExporterWrapper<T>(protocol.export(invoker),Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));}
2.2.6.4.2、ProtocolFilterWrapper#export
由于我们这里是registry协议,所以会直接会调用下一个的export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {//首先判断这个Protocol是不是registry的,如果是的话就直接跳过的了,也就是没它什么事了。//如果不是就要走下面这个,这里调用了 buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)方法,// 其中 Constants.SERVICE_FILTER_KEY是service.filter,Constants.PROVIDER 是provider,我们来看下这个方法。if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));}
2.2.6.4.3、QosProtocolWrapper#export
判断是registry协议的话,就启动Qos服务器,然后继续调用下一个的export方法也就是RegistryProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {//判断是registryif (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {//启动Qos服务器startQosServer(invoker.getUrl());return protocol.export(invoker);}return protocol.export(invoker);}
2.2.6.4.4、RegistryProtocol#export
- 调用doLocalExport进行本地导出
- 从originInvoker获取注册中心对象,然后获取服务提供者的url
- 想本地服务注册表注册服务,是将刚刚塞到本地注册表中的invoker有个reg的属性设置成true表示注册了
- 向注册中心注册当前服务
- 订阅当前服务,当服务实例变更时可以更新本地缓存
- 构造DestroyableExporter返回。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {//export invoker//暴露服务 doLocalExport表示本地启动服务不包括去注册中心注册final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);//获取注册中心URLURL registryUrl = getRegistryUrl(originInvoker);//registry provider//获得注册中心对象final Registry registry = getRegistry(originInvoker);//获取服务提供者的urlfinal URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);//to judge to delay publish whether or notboolean register = registeredProviderUrl.getParameter("register", true);//向本地服务注册表注册服务ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);if (register) { //向注册中心注册自己//go registerregister(registryUrl, registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);}// Subscribe the override data// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);//Ensure that a new exporter instance is returned every time exportreturn new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);}
2.2.6.4.5、RegistryProtocol#doLocalExport
- bounds为export的service缓存,key为通过Invoker获取的providerUrl,value为exporter的包装类ExporterChangeableWrapper。主要解决RMI重复暴露端口冲突的问题,已经暴露的服务不再暴露。
- 构造InvokerDelegete,这里调用getProviderUrl获取url,其实就是从origininvoker的url中找参数export对应的value,对应的value为dubbo,所以我们这里的导出协议是dubbo
- 调用protocol.export方法,这里protocol的实例是DubboProtocol的包装类,共包装了(ProtocolListenerWrapper,ProtocolFilterWrapper,QosProtocolWrapper) 三层
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {String key = getCacheKey(originInvoker);ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);if (exporter == null) { //之前没有暴露过synchronized (bounds) {exporter = (ExporterChangeableWrapper<T>) bounds.get(key);if (exporter == null) {//封装InvokerDelegete 将url封装起来了final Invoker<?> invokerDelegete =new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));exporter =new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);//缓存起来bounds.put(key, exporter);}}}return exporter;}
2.2.6.4.5.1、ProtocolListenerWrapper#export
由于这里我们的协议是dubbo,所以这里会首先调用下一个protocol#export方法,然后把返回的exporter和自动激活的listener们绑在了一起。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {//这里也是判断了一下是否是registry协议,如果是的话就直接进入下一个RegistryProtocol。if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}//这里将服务暴露返回的exporter与自动激活的listener们绑在了一起。return new ListenerExporterWrapper<T>(protocol.export(invoker),Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));}
这里在ListenerExporterWrapper构造方法中会遍历通知listener,告诉他们当前服务暴露完了。
public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {//判断nullif (exporter == null) {throw new IllegalArgumentException("exporter == null");}this.exporter = exporter;this.listeners = listeners;if (listeners != null && !listeners.isEmpty()) {RuntimeException exception = null;//遍历通知for (ExporterListener listener : listeners) {if (listener != null) {try {//通知listener.exported(this);} catch (RuntimeException t) {logger.error(t.getMessage(), t);exception = t;}}}if (exception != null) {throw exception;}}}
2.2.6.4.5.2、ProtocolFilterWrapper#export
- 这里首先调用buildInvokerChain生成一个过滤调用链,最后到真实的invoker
- 继续调用下一个protocol#export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {//首先判断这个Protocol是不是registry的,如果是的话就直接跳过的了,也就是没它什么事了。//如果不是就要走下面这个,这里调用了 buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)方法,// 其中 Constants.SERVICE_FILTER_KEY是service.filter,Constants.PROVIDER 是provider,我们来看下这个方法。if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));}
这里就是根据spi获取所有的,Filter然后用Filter包装起来invoker,调用这个invoker就会先过所有的过滤器,这个后续我们再逐个分析过滤器的实现
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {Invoker<T> last = invoker;//这个方法执行了List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); // ,这里我们本文的主角就登场了,获取了Filter的ExtensionLoader对象,然后调用getActivateExtension方法,参数分别是url,service.filter,provider,接着我们要看下这个方法了List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);if (!filters.isEmpty()) {for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);final Invoker<T> next = last;last = new Invoker<T>() {@Overridepublic Class<T> getInterface() {return invoker.getInterface();}@Overridepublic URL getUrl() {return invoker.getUrl();}@Overridepublic boolean isAvailable() {return invoker.isAvailable();}//exception->moniter->timeout->trace->context->generic->classloader->echo@Overridepublic Result invoke(Invocation invocation) throws RpcException {return filter.invoke(next, invocation);}@Overridepublic void destroy() {invoker.destroy();}@Overridepublic String toString() {return invoker.toString();}};}}return last;}
2.2.6.4.5.3、QosProtocolWrapper#export
这里我们是dubbo协议,所以直接会调用下一个实现,也就是DubboProtocol#export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {//判断是registryif (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {//启动Qos服务器startQosServer(invoker.getUrl());return protocol.export(invoker);}return protocol.export(invoker);}
2.2.6.4.5.4、DubboProtocol#export
- 首先根据url生成一个服务key,然后创建DubboExporter放到exporterMap中
- 调用openServer启动服务器
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {URL url = invoker.getUrl();// export service. key = com.alibaba.dubbo.demo.DemoService:20880String key = serviceKey(url);DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);exporterMap.put(key, exporter);//export an stub service for dispatching eventBoolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackservice) {String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {if (logger.isWarnEnabled()) {logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +"], has set stubproxy support event ,but no stub methods founded."));}} else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);}}//go openServeropenServer(url);optimizeSerialization(url);return exporter;}
2.2.6.4.5.5、DubboProtocol#openServer
- 首先获取地址,然后判断是不是服务端,如果是服务端并且之前没有暴露过,则调用createServer创建服务器
private void openServer(URL url) {// find server. 获得地址 ip:port 192.168.1.104:22880String key = url.getAddress();//client can export a service which's only for server to invoke//客户端可以暴露仅供服务器调用的服务boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);if (isServer) { //判断是否是服务器//查找缓存的服务器ExchangeServer server = serverMap.get(key);if (server == null) { // 没有找到service 就要创建serverserverMap.put(key, createServer(url));} else {// server supports reset, use together with override//支持重置,与覆盖一起使用server.reset(url);}}}
2.2.6.4.5.6、DubboProtocol#createServer
- 首先设置一些参数,如下:。CHANNEL_READONLYEVENT_SENT_KEY为当server关闭时发布readonly事件,HEARTBEAT_KEY为心跳时间,CODEC_KEY为此url的通过DubboCodec进行的编码
- 然后再获取url中我们配置的server类型,默认是netty
- 再通过Exchangers的静态方法创建对应的ExchangeServer
private ExchangeServer createServer(URL url) {// send readonly event when server closes, it's enabled by default//服务器关闭时发送只读事件,默认情况下启用url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());// enable heartbeat by default//60*1000 设置心跳url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));//获取配置的服务器类型,缺省就是使用netty,默认服务器是nettyString str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);//不存在Transports 话,就抛出 不支持的server typeif (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))throw new RpcException("Unsupported server type: " + str + ", url: " + url);//设置 Codec 的类型为dubbourl = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);ExchangeServer server;try {//go bind methodserver = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}str = url.getParameter(Constants.CLIENT_KEY);if (str != null && str.length() > 0) {Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type: " + str);}}return server;}
2.2.6.4.5.7、Exchangers#bind
进行参数校验,然后从url中获取exchanger,缺省是header,然后这里获得的就是HeaderExchanger,然后调用HeaderExchanger#bind方法
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {//验证if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");//go HeaderExchanger -> bindreturn getExchanger(url).bind(url, handler);}public static Exchanger getExchanger(URL url) {//这里可以看到从url中获取exchanger ,缺省是header,然后使用dubbo spi 获取到HeaderExchanger,我们看下HeaderExchanger源码:String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);return getExchanger(type);}public static Exchanger getExchanger(String type) {return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);}
2.2.6.4.5.8、HeaderExchanger#bind
- 这里首先把handler包装成HeaderExchangeHandler,再把HeaderExchangeHandler包装成DecodeHandler。
- 调用Transporters#bind进行bind
- 创建HeaderExchangeServer对server进行增强,HeaderExchangeServer主要增加了心跳处理
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {// 创建一个通信server DecodeHandler << HeaderExchangeHandler << handler//这个Transporters也是门面类,对外统一了bind 与connect。//HeaderExchangeServer维护心跳return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}
2.2.6.4.5.9、Transporters#bind
- 首先进行参数校验,接着判断handler的个数,如果是多个则再把handler包装成ChannelHandlerDispatcher,里面主要是对多个handler进行分发
- 调用getTransporter获取Transporter自适应的扩展点,缺省是netty
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {//验证if (url == null) {throw new IllegalArgumentException("url == null");}if (handlers == null || handlers.length == 0) {throw new IllegalArgumentException("handlers == null");}ChannelHandler handler;//创建handlerif (handlers.length == 1) {handler = handlers[0];} else { //多个channel 对channel进行分发 ChannelHandlerDispatcherhandler = new ChannelHandlerDispatcher(handlers);}//真正服务器 进行bindreturn getTransporter().bind(url, handler);}
public static Transporter getTransporter() {return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();}
2.2.6.4.10、NettyTransporter#bind
这里直接构造了一个NettyServer对象返回
public Server bind(URL url, ChannelHandler listener) throws RemotingException {//在这里直接new了一个NettyServer对象return new NettyServer(url, listener);}
2.2.6.4.11、NettyServer#constructor
调用父类的构造方法,这里又对handler包了多层,这里层次如下
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {return ChannelHandlers.getInstance().wrapInternal(handler, url);}protected static ChannelHandlers getInstance() {return INSTANCE;}static void setTestingChannelHandlers(ChannelHandlers instance) {INSTANCE = instance;}protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {//调用多消息处理器,对心跳消息进行了功能加强return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));}
}
2.2.6.4.12、AbstractServer#constructor
- 先从url中获取一些配置信息,如下
- 调用doOpen开启服务,会调用到子类NettyServer#doOpen
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);//从url中获得本机地址localAddress = getUrl().toInetSocketAddress();//从url中获得绑定的ipString bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());//从url中获得绑定的端口号int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());//判断url中配置anyhost是否为true或者判断host是否为不可用的本机hostif (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {bindIp = NetUtils.ANYHOST;}bindAddress = new InetSocketAddress(bindIp, bindPort);//服务器最大可接受连接数this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);try {//go doOpendoOpen(); //子类实现,真正打开服务器if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export "+ getLocalAddress());}} catch (Throwable t) {throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);}//fixme replace this with better method//获取线程池DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();executor =(ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));}
2.2.6.4.13、AbstractServer#doOpen
这里就是标准的netty启动流程了,boss线程是1一个,worker线程是cpu核心数+1 与32进行比较 取小的那个 也就是最大不超过32,标准的reactor模式,这里服务已经启动起来了。
protected void doOpen() throws Throwable {bootstrap = new ServerBootstrap();bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));//iothreads 默认是cpu核心数+1 与32进行比较 取小的那个 也就是最大不超过32workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),new DefaultThreadFactory("NettyServerWorker", true));final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);channels = nettyServerHandler.getChannels();/*** ChannelOption.SO_REUSEADDR 这个参数表示允许重复使用本地地址和端口,** 比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,** 比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR** 就无法正常使用该端口。*/bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE).childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug.addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder()).addLast("handler", nettyServerHandler);}});// bindChannelFuture channelFuture = bootstrap.bind(getBindAddress());channelFuture.syncUninterruptibly();channel = channelFuture.channel();}
2.2.6.4.6、RegistryProtocol#getRegistry
- 首先从originInvoker里面获取registryUrl,然后通过urll获取registryFactory对象,这个是SPI机制,这里因为我们指定注册中心是zk,所以这里拿到的是ZookeeperRegistryFactory
private Registry getRegistry(final Invoker<?> originInvoker) {//然后接着就是从originInvoker里面获取registryUrl,接着就是获取Registry的对象。我们看下这个getRegistry()方法URL registryUrl = getRegistryUrl(originInvoker);//通过registryFactory 获取Registry,其实这个registryFactory 是RegistryProtocol的一个成员,然后是dubbo spi 自动setter注入进来的。//我们来看一下registryFactory接口源代码return registryFactory.getRegistry(registryUrl);}
@SPI("dubbo")
public interface RegistryFactory {//我们可以看到@Adaptive的值是protocol,咱们上面的url中的protocol值正好是zookeeper,就可以找到ZookeeperRegistryFactory,我们看下ZookeeperRegistryFactory继承图@Adaptive({"protocol"})Registry getRegistry(URL url);}
2.2.6.4.6、ProviderConsumerRegTable#registerProvider
这里面就是维护了两个map,缓存了服务提供者和消费者,就是把这几个参数封装成ProviderInvokerWrapper对象,这个对象就是包装的作用,将这几个参数关联起来了,然后就是获取serviceKey,这个serviceKey就是暴露接口的全类名,然后就是根据serviceKey从providerInvokers 这个map中获取invokers,然后没有就创建,最后就是将包装的这个invoker缓存到invokers中了。
public class ProviderConsumerRegTable {//服务提供者public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();//服务消费者public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) {//服务提供者包装类ProviderInvokerWrapper wrapperInvoker = new ProviderInvokerWrapper(invoker, registryUrl, providerUrl);//唯一的keyString serviceUniqueName = providerUrl.getServiceKey();Set<ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);if (invokers == null) { //没有就创建,设置providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet<ProviderInvokerWrapper>());invokers = providerInvokers.get(serviceUniqueName);}invokers.add(wrapperInvoker);}
}
2.2.6.4.7、ProviderConsumerRegTable#registerProvider
调用ZookeeperRegisty#register注册到注册中上
public void register(URL registryUrl, URL registedProviderUrl) {//这里直接调用ZookeeperRegistry#register 方法Registry registry = registryFactory.getRegistry(registryUrl);registry.register(registedProviderUrl);}
2.2.6.4.7、Registry#subscribe
向注册中心订阅这个url,这里具体的服务注册订阅的逻辑后续单独分析。
相关文章:

Dubbo源码解析-——服务导出
前言 在之前我们讲过Spring和Dubbo的集成,我们在服务上标注了DubboService的注解,然后最终Dubbo会调用到ServiceBean#export方法中,本次我们就来剖析下服务导出的全流程。 一、前置回顾 由于ServiceBean实现了ApplicationListener接口&…...

vue+django+neo4j 基于知识图谱红楼梦问答系统
vuedjangoneo4j 基于知识图谱红楼梦问答系统 项目背景 知识图谱是一种以图谱形式描述客观世界中存在的各种实体、概念及其关系的技术, 广泛应用于智能搜索、自动问答和决策支持等领域. 可视分析技术可以将抽象的知识图谱映射为图形元素, 帮助用户直观地感知和分析数据, 从而提…...

2023年全国最新食品安全管理员精选真题及答案13
百分百题库提供食品安全管理员考试试题、食品安全员考试预测题、食品安全管理员考试真题、食品安全员证考试题库等,提供在线做题刷题,在线模拟考试,助你考试轻松过关。 121.关于食品召回的说法,以下表述不正确的是(&am…...

Keychron K7 Pro 轻薄矮轴机械键盘开箱体验
文章目录1. 拆箱2. 零件3. 外观4. 声音5. 特点5.1 有线 / 无线5.2 RGB背光5.3 轻薄5.4 mac / win / iphone 切换5.5 人体工程学支持5.6 扁平双射PBT键帽5.7 重新设计的稳定器5.8 扁平Gateron(佳达隆)轴体5.9 热插拔5.10 支持 QMK / VIA 改键6. 对比6.1 K7 与 K7 Pro 参数对比6.…...

加油站ai视觉识别系统 yolov7
加油站ai视觉识别系统通过yolov7网络模型深度学习,加油站ai视觉识别算法对现场画面中人员打电话抽烟等违规行为,还有现场出现明火烟雾等危险状态。除此之外,模型算法还可以对卸油时灭火器未正确摆放、人员离岗不在现场、卸油过程静电释放时间…...

【电子学会】2022年12月图形化二级 -- 绘制风车
绘制风车 1. 准备工作 (1)隐藏默认的小猫角色; (2)选择背景:“Xy-grid”。 2. 功能实现 (1)小猫角色的初始位置为(x:0,y:0); (2)线条粗细为…...

【golang/go语言】Go语言代码实践——高复用、易扩展性代码训练
某个项目里有一段老代码写的不是很好,想着能否通过自己掌握的知识,将其改善一下。感兴趣的小伙伴可以通过了解背景和需求,自己试想下该如何实现,如果有更好的方案也欢迎留言讨论。 1. 背景及需求 (1) 背景 假设我们的下游提供了…...

[数据结构与算法(严蔚敏 C语言第二版)]第1章 绪论(学习复习笔记)
1.1 数据结构的研究内容 计算机解决问题的步骤 从具体问题抽象出数学模型设计一个解此数学模型的算法编写程序,进行测试、调试,直到解决问题 计算机解决问题的过程中寻求数学模型的实质是 分析问题,从中提取操作的对象,并找出这些…...

05_Pulsar的主要组件介绍与命令使用、名称空间、Pulsar的topic相关操作、Pulsar Topic(主题)相关操作_高级操作、
1.5.Apache Pulsar的主要组件介绍与命令使用 1.5.1.多租户模式 1.5.1.1. 什么是多租户 1.5.1.2.Pulsar多租户的相关特征_安全性(认证和授权) 1.5.1.3.Pulsar多租户的相关特性_隔离性 1.5.1.4.Pulsar多租户的相关操作 1-获取租户列表 2-创建租户 3-获取配…...

我的终端怎么莫名卡死了?shell下ctrl+s的含义
在终端下面一不小心按下了ctrl s,整个终端就锁住了,不知道原油的同学可能会以为终端卡死了,找不到原因只好关闭终端重新打开,然后下意识还不忘吐槽一句,垃圾ubuntu,动不动卡死。 事实上ctrl s在终端下是…...

【Vue】Vue的简单介绍与基本使用
一、什么是VueVue是一款用于构建用户界面的 JavaScript 框架。它基于标准 HTML、CSS 和 JavaScript 构建,并提供了一套声明式的、组件化的编程模型,帮助你高效地开发用户界面。无论是简单还是复杂的界面,Vue 都可以胜任。1.构建用户界面传统方…...

网络知识篇
网络知识篇 局域网 当多台计算机或设备通过同一物理或逻辑连接(例如以太网或Wi-Fi网络)连接在一起,并且它们可以相互通信时,就构成了一个局域网(Local Area Network,LAN)。 子网划分 为了更…...

python 连接数据库
文章目录同步操作同步连Mysql同步连redis同步连mongodb异步操作异步连mysql异步连redis异步连mongodb同步操作 同步连Mysql python 连接mysql可以使用pymysql、mysqlclient等。 安装: # win pip install pymysql 连接mysql: # __author__ "laufing"…...

一文讲明白一致性hash算法
一致性Hash算法常用来解决数据分片时的数据扩容/缩容的性能问题。 一、业内数据分片用的Hash算法,将节点的hash值对节点数取余。 存取通过key / value的方式对节点取余。 二、数据分片使用hash算法的优缺点: 优点:简单,方便。 缺…...

Java分布式解决方案(一)
随着互联网的不断发展,互联网企业的业务在飞速变化,推动着系统架构也在不断地发生变化。 如今微服务技术越来越成熟,很多企业都采用微服务架构来支撑内部及对外的业务,尤其是在高 并发大流量的电商业务场景下,微服务…...

设备树系统学习(二)设备树的节点和属性
一、节点 1.节点命名格式 格式:<name>[@<unit-address>] name:是一个简单的 ASCII 字符串,长度最多为 31 个字符,节节点是根据它所代表的设备类型来命名的,比如 “gpu” 就表示这个节点是 gpu外设。 unit-address:一般表示设备的地址或寄存器首地址,可以为…...

【数据结构】二叉树的基本操作中的一些易错点
文章目录前言一、求二叉树节点个数二、求树的叶子结点个数三、求树的高度四、二叉树查找值为x的结点总结前言 笔者整理出了一些关于萌新在入门二叉树时容易犯的一些错误,你也来试试自己会不会掉到这些坑里把~ 一、求二叉树节点个数 错误示例: int Tre…...

在线图书借阅网站( Python +Vue 实现)
功能介绍 平台采用B/S结构,后端采用主流的Python语言进行开发,前端采用主流的Vue.js进行开发。 整个平台包括前台和后台两个部分。 前台功能包括:首页、图书详情页、用户中心模块。后台功能包括:总览、借阅管理、图书管理、分类…...

不平衡数据集的建模的技巧和策略
不平衡数据集是指一个类中的示例数量与另一类中的示例数量显著不同的情况。 例如在一个二元分类问题中,一个类只占总样本的一小部分,这被称为不平衡数据集。类不平衡会在构建机器学习模型时导致很多问题。不平衡数据集的主要问题之一是模型可能会偏向多数…...

3. 算法效率
同一个问题的不同算法在性能上的比较,现在的方法主要是算法时间复杂度。算法效率是算法操作(operate)或处理(treat)数据的重复次数最小。 例题选自《编程珠玑》第8章,算法设计技术。 这个问题是一维模式识别(人工智能)中的一个问题。 输入有n个元素的向量,输出连续子向…...

仪表放大器放大倍数分析-运算放大器
仪表放大器是一种非常特殊的精密差分电压放大器,它的主要特点是采用差分输入、具有很高的输入阻抗和共模抑制比,能够有效放大在共模电压干扰下的信号。本文简单分析一下三运放仪表放大器的放大倍数。 一、放大倍数理论分析 三运放仪表放大器的电路结构…...

laravel8多模块、多应用和多应用路由
1、安装多应用模块 composer require nwidart/laravel-modules2、执行命令,config文件夹下生成一个modules.php配置文件 php artisan vendor:publish --provider"Nwidart\Modules\LaravelModulesServiceProvider"3、修改config文件夹下的modules.php&am…...

【Java学习笔记】6.Java 变量类型
Java 变量类型 在Java语言中,所有的变量在使用前必须声明。声明变量的基本格式如下: type identifier [ value][, identifier [ value] ...] ;格式说明:type为Java数据类型。identifier是变量名。可以使用逗号隔开来声明多个同类型变量。 …...

Promise对象状态属性 工作流程 Promise对象的几个属性
Promise 对象状态属性介绍 实例对象中的一个属性 PromiseState pending 1、pending 变为 resolved / fullfilled 成功 2、pending 变为 rejected 失败 说明:只有这2种,且一个promise对象只能改变一次 无论变为成功还是失败,都会有一个结果…...

webgpu思考obj携带属性
今天在搞dbbh.js的时候,想到一个问题,啥问题呢,先看看情况 画2个材质不相同的box的时候 首先开始createCommandEncoder,然后beginRenderPass,分歧就在这里了 第一个box,他有自己的pipeline,第二个也有,那么…...

设计模式(只谈理解,没有代码)
1.什么是设计模式设计模式,是一套被反复使用、多数人知晓的、经过分类编目的、代码设计经验的总结。使用设计模式是为了可重用代码、让代码更容易被他人理解、保证代码可靠性、程序的重用性。2.为什么要学习设计模式看懂源代码:如果你不懂设计模式去看Jd…...

06、Eclipse 中使用 SVN
Eclipse 中使用 SVN1 在 Eclipse 中安装 SVN 客户端插件1.1 在线安装1.2 离线安装2 SVN 在 Eclipse 分享3 检出提交更新3.1 检出3.2 提交3.3 更新4 Eclipse 中 SVN 图标及其含义4.1 ?图标4.2 图标4.3 金色圆柱图标4.4 * 图标5 恢复历史版本5.1 恢复步骤5.2 权限控制…...

Zookeeper3.5.7版本——客户端命令行操作(命令行语法)
目录一、命令行语法二、help命令行语法示例一、命令行语法 命令行语法列表 命令基本语法功能描述help显示所有操作命令ls path使用 ls 命令来查看当前 znode 的子节点 [可监听]-w 监听子节点变化-s 附加次级信息create普通创建-s 含有序列-e 临时(重启或者超时消失…...

2023.03.05 学习周报
文章目录摘要文献阅读1.题目2.摘要3.介绍4.SAMPLING THE OUTPUT5.LOSS FUNCTION DESIGN5.1 ranking loss: Top1 & BPR5.2 VANISHING GRADIENTS5.3 ranking-max loss fuction5.4 BPR-max with score regularization6.实验7.结论深度学习1.相关性1.1 什么是相关性1.2 协方差1…...

java Spring JdbcTemplate配合mysql实现数据批量修改
其实这个操作和批量添加挺像的 调的同一个方法 首先 我们看数据库结构 这是我本地的 mysql 里面有一个test数据库 里面有一张user_list表 然后创建一个java项目 然后 引入对应的JAR包 在src下创建 dao 目录 在下面创建一个接口 叫 BookDao 参考代码如下 package dao;impo…...