Dubbo 与 Spring 的结合 代码在 dubbo-config 下的 dubbo-config-spring 模块中
dubbo.xsd 定义了 dubbo 自定义标签
spring.schemas 指定了 dubbo.xsd 文件的位置.
1 http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
关联 DubboNamespaceHandler 命名空间处理器和 dubbo.xsd 中的 targetNamespace,即 http://code.alibabatech.com/schema/dubbo
1 http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHand
DubboNamespaceHandler.init() DubboNamespaceHandler.init(): 注册各个自定义元素的解析器,所有元素都是交给 DubboBeanDefinitionParser 处理的,传入不同的 Class 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void init () { registerBeanDefinitionParser("application" , new DubboBeanDefinitionParser (ApplicationConfig.class, true )); registerBeanDefinitionParser("module" , new DubboBeanDefinitionParser (ModuleConfig.class, true )); registerBeanDefinitionParser("registry" , new DubboBeanDefinitionParser (RegistryConfig.class, true )); registerBeanDefinitionParser("monitor" , new DubboBeanDefinitionParser (MonitorConfig.class, true )); registerBeanDefinitionParser("provider" , new DubboBeanDefinitionParser (ProviderConfig.class, true )); registerBeanDefinitionParser("consumer" , new DubboBeanDefinitionParser (ConsumerConfig.class, true )); registerBeanDefinitionParser("protocol" , new DubboBeanDefinitionParser (ProtocolConfig.class, true )); registerBeanDefinitionParser("service" , new DubboBeanDefinitionParser (ServiceBean.class, true )); registerBeanDefinitionParser("reference" , new DubboBeanDefinitionParser (ReferenceBean.class, false )); registerBeanDefinitionParser("annotation" , new AnnotationBeanDefinitionParser ()); }
DubboBeanDefinitionParser DubboBeanDefinitionParser.parse()中 根据 beanClass 创建 RootBeanDefinition,设置为非懒加载. 若未配置 id 属性,则会根据 name 或 interface 属性生成,并通过判断 BeanRegistry 中是否存在该 id 的 bean,用 则增序列解决重复问题. 通过id注册到BeanRegistry中,并设置 beanDefinition 的 id 属性. 下面就是对 ServiceBean 等元素的各自的属性配置的处理.
在Spring应用上下文启动过程中,加载XML配置文件(比如提供者配置和消费者配置文件)为 Resource 对象,并通过 XmlBeanDefinitionReader 调用BeanDefinitionParser 解析 XML 标签为 BeanDefinition,并注册到 BeanDefinitionRegistry 中.
ServiceBean ServiceBean 实现了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>
, BeanNameAware 这几个接口.
实现的 ApplicationContextAware 接口的 setApplicationContext() 方法中,会: 保存 applicationContext 引用. applicationContext 引用 保存到 SpringExtensionFactory 中.
实现的 InitializingBean 接口的 afterPropertiesSet() 方法中,会: 设置相关属性.
1 2 - ServiceBean.afterPropertiesSet() - 从BeanFactory中获取ProviderConfig,ProtocolConfig等,这些bean是之前在Spring bean解析时加载进来的.将这些配置对象赋值到ServiceBean中,当然也有可能在其父类中.
实现的 ApplicationListener<ContextRefreshedEvent>
接口的 onApplicationEvent(ContextRefreshedEvent event) 方法中,有服务暴露的相关逻辑.
服务本地暴露 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 - ServiceBean.onApplicationEvent() 监听spring容器初始化完成时触发 - ServiceConfig.doExport()根据配置选择立即暴露或延迟暴露 - 暴露服务doExportUrls() - loadRegistries(true)加载注册中心 URL 数组 - 将xml中配置的<dubbo:registry>解析成url对象(如registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&logger=jcl&pid=52008&qos.port=22222®istry=zookeeper×tamp=1574666370052) - doExportUrlsFor1Protocol(protocolConfig, registryURLs)根据不同的协议把服务export到不同的注册中心(如zk)上去 - ServiceConfig.exportLocal()中:服务本地暴露 - 1.通过传入的协议url构造本地injvm协议的url - 2.通过proxyFactory.getInvoker()获取Invoker - ProxyFactory$Adaptive.getInvoker()方法 - 1.ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist")获取StubProxyFactoryWrapper 对象 - 2.extension.getInvoker(arg0, arg1, arg2)通过ProxyFactory获取Invoker对象 - StubProxyFactoryWrapper.getInvoker()直接调用内部的JavassistProxyFactory.getInvoker() - JavassistProxyFactory.getInvoker()方法中 - 1.根据proxy对象创建Wrapper - 2.根据proxy, type, url创建AbstractProxyInvoker对象,它的doInvoke()方法会调用wrapper类的invokeMethod()方法 - 3.protocol.export()将Invoker转为Exporter - Protocol$Adaptive.export()生成的代理类 - 1.通过ExtensionLoader获取name为injvm的Protocol拓展对象.吐出的是ProtocolListenerWrapper的实例,包裹了InjvmProtocol - 2.extension.export(invoker) - ProtocolFilterWrapper.export()中 - 1.buildInvokerChain()建立带有 Filter 过滤链的 Invoker,再暴露服务 - 2.protocol.export() - ProtocolListenerWrapper.export()中 - 1.通过InjvmProtocol.export()把invoker暴露为exporter - InjvmProtocol.export()中 - 1.创建InjvmExporter对象 - 2.获取ExporterListener拓展对象列表 - 3.创建ListenerExporterWrapper对象作为exporter返回
ServiceConfig.exportLocal()中:服务本地暴露
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void exportLocal (URL url) { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST) .setPort(0 ); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry" ); } }
这个 proxyFactory 是 ProxyFactory$Adaptive 对象,会根据传入的参数创建具体的 proxyFactory 对象. proxyFactory.getInvoker(ref, (Class) interfaceClass, local)传入的参数分别是 服务具体实现类对象ref,服务接口DemoService,本地暴露的URL对象
ProxyFactory$Adaptive.getInvoker()方法
1 2 3 4 5 6 7 8 public com.alibaba.dubbo.rpc.Invoker getInvoker (java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException { if (arg2 == null ) throw new IllegalArgumentException ("url == null" ); com.alibaba.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy" , "javassist" ); if (extName == null ) throw new IllegalStateException ("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])" ); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getInvoker(arg0, arg1, arg2); }
通过 ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist")
获取的是 JavassistProxyFactory 对象,同时 ProxyFactory 有个 Wrapper 实现类 StubProxyFactoryWrapper,会被 SPI AOP 包装到 JavassistProxyFactory 对象 外层,所以最后吐出的是 StubProxyFactoryWrapper 对象.
1 2 3 4 5 6 7 8 9 10 11 12 public <T> Invoker<T> getInvoker (T proxy, Class<T> type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$' ) < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker <T>(proxy, type, url) { @Override protected Object doInvoke (T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
Invoker -> Exporter
ProtocolFilterWrapper.export()方法中
1 2 3 4 5 6 7 8 public <T> Exporter<T> export (Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 private static <T> Invoker<T> buildInvokerChain (final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1 ; i >= 0 ; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker <T>() { @Override public Class<T> getInterface () { return invoker.getInterface(); } @Override public URL getUrl () { return invoker.getUrl(); } @Override public boolean isAvailable () { return invoker.isAvailable(); } @Override public Result invoke (Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } @Override public void destroy () { invoker.destroy(); } @Override public String toString () { return invoker.toString(); } }; } } return last; }
ProtocolListenerWrapper.export()
1 2 3 4 5 6 7 8 9 10 11 12 public <T> Exporter<T> export (Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } Exporter<T> exporter = protocol.export(invoker); List<ExporterListener> listeners = Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)); return new ListenerExporterWrapper <T>(exporter, listeners); }
InjvmProtocol.export()
1 2 3 public <T> Exporter<T> export (Invoker<T> invoker) throws RpcException { return new InjvmExporter <T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }
使用本地暴露的原因 本地调用使用了 injvm 协议,是一个伪协议,它不开启端口,不发起远程调用,只在 JVM 内直接关联,但执行 Dubbo 的 Filter 链.
服务远程暴露 服务远程暴露的总体流程:
1.将 ref 封装为 Invoker
2.将 Invoker 转换为 Exporter
3.启动 Netty 服务端
4.注册 provider 到 zk
5.zk 订阅与通
6.返回 Exporter 实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 - 服务远程暴露 - 使用 ProxyFactory 将 ref 创建成 Invoker 对象(这部分逻辑和本地暴露类似) - ProxyFactory$Adaptive.getInvoker()中 - extension.getInvoker(arg0, arg1, arg2); - StubProxyFactoryWrapper.getInvoker()直接调用内部的JavassistProxyFactory.getInvoker() - JavassistProxyFactory.getInvoker()方法中 - 1.根据传入的 proxy对象(如DemoServiceImpl)的类信息创建对它的包装对象Wrapper - 2.根据proxy, type, url创建AbstractProxyInvoker对象,它的doInvoke()方法会调用wrapper类的invokeMethod()方法 - 首先将AbstractProxyInvoker实例包装成DelegateProviderMetaDataInvoker实例 - 使用 Protocol 暴露 Invoker 对象为 Exporter - Protocol$Adaptive.export()中 - 1.此时从传入的invoker.url.protocol获取到的协议为"registry",获取到的Protocol拓展点实现类为包裹了DubboProtocol 对象的 ProtocolFilterWrapper - 2.extension.export(invoker) - ProtocolFilterWrapper.export()这个类在Invoker基础上添加filter链 - 1.因为传入的协议时"registry",向注册中心暴露服务 - ProtocolListenerWrapper.export()中 - 1.调用RegistryProtocol.export(),向注册中心注册 - RegistryProtocol.export()中 - 1.doLocalExport()本地启动服务 - RegistryProtocol.doLocalExport()中 - 1.尝试从bounds缓存中获取exporter - 2.若1中获取不到则创建exporter并缓存 - 3.根据invoker创建InvokerDelegete委托类,提供 获取 invoker 的方法 - 4.调用Protocol$Adaptive.export()方法 - Protocol$Adaptive.export()这次传入的协议是"dubbo" - ProtocolFilterWrapper.export()中 - 1.buildInvokerChain()构建filter链 - 2.protocol.export()调用DubboProtocol.export() - DubboProtocol.export()中 - 1.通过invoker创建DubboExporter - // 2.参见netty启动流程 - 5.将4中返回的exporter包装成ExporterChangeableWrapper - 获取注册中心,如zk - 向注册中心注册自己 - FailbackRegistry.register(url)
ProtocolFilterWrapper.export()中
1 2 3 4 5 6 7 8 public <T> Exporter<T> export (Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public <T> Exporter<T> export (final Invoker<T> originInvoker) throws RpcException { final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); boolean register = registedProviderUrl.getParameter("register" , true ); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); if (register) { register(registryUrl, registedProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true ); } final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener (overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); return new DestroyableExporter <T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); }
Netty ExchangeHandlerAdapter 定义了与客户端连接成功/断开连接/接受到客户端消息/响应消息,以及创造Invocation的方法
1 2 3 public ExchangeServer bind (URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer (Transporters.bind(url, new DecodeHandler (new HeaderExchangeHandler (handler)))); }
HeaderExchangeHandler: 基于消息头(Header)的信息交换服务器实现类
DecodeHandler: 解码处理器,处理接收到的消息
Netty 启动流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 - netty启动流程 - openServer(url)启动通信服务器(如Netty) - DubboProtocol.openServer()中 - DubboProtocol.createServer()中 - 1.创建ExchangeHandlerAdapter实例,作为请求处理器 - 2.Exchangers.bind(url,requestHandler) - Exchangers.bind()中 - 1.根据url获取Exchanger,返回的是HeaderExchanger对象 - HeaderExchanger.bind() - HeaderExchanger.bind()方法中 - 1. return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))))会对传入的ExchangeHandlerAdapter包装两层,然后使用Transporters.bind()创建NettyServer对象. - Transporters.bind()中 - 1.返回Transporter的适配对象 - 2.调用Transporter$Adaptive.bind() - netty4/NettyTransporter.bind()通过new NettyServer(url, listener)创建nettyServer对象 - NettyServer(url,handler)构造器中会通过ChannelHandlers会传入的DecodeHandler进行包装 - ChannelHandler.wrapInternal()方法中 - 1.获取Dispatcher的适配对象 - 2.调用Dispatcher.dispatch(handler, url) - AllDispatcher.dispatch()中new AllChannelHandler(handler, url) - AllChannelHandler调用父类构造方法WrappedChannelHandler - WrappedChannelHandler()构造方法中 - 1.创建线程池 - 3.把2中的返回结果用HeartbeatHandler和MultiMessageHandler再包装 - NettyServer.doOpen() - 1.创建 - 2.exchanger.bind() - ExchangeServer.createServer() 创建ExchangeServer
Zookeeper 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public ZookeeperRegistry (URL url, ZookeeperTransporter zookeeperTransporter) { super (url); if (url.isAnyHost()) { throw new IllegalStateException ("registry address == null" ); } String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this .root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener () { public void stateChanged (int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
AbstractRegistry:
1.通用的注册,订阅,查询,通知等方法
2.读取和持久化注册数据到文件,以 properties 格式存储
FailbackRegistry: 主要用来做失败重试操作(包括: 注册失败,反注册失败,订阅失败,反订阅失败,通知失败的重试);也提供了供 ZookeeperRegistry 使用的 zk 重连后的恢复工作的方法.
ZookeeperRegistry: 创建 zk 客户端,启动会话;并且调用 FailbackRegistry 实现 zk 重连后的恢复工作.
注册到 zk 流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 - 注册到zk流程 - RegistryProtocol.export() - 1.获得注册中心 URL - 就是把url改为zookeeper://协议开头 - 2.获得注册中心对象(启动zookeeper),通过ZookeeperRegistryFactory实例创建并返回ZookeeperRegistry对象 - AbstractRegistryFactory.getRegistry(url)中 - 1.加锁 从缓存中获得 Registry 对象 - 2.若获取不到,则createRegistry(url)创建 Registry 对象并缓存 - ZookeeperRegistryFactory.createRegistry()中会new ZookeeperRegistry(url, zookeeperTransporter)创建ZookeeperRegistry实例 - 3.获得服务提供者 URL - 4.向注册中心注册服务提供者 - RegistryProtocol.register(registedProviderUrl) - 1.从registryFactory中获取registry - 2.registry.register() - ZookeeperRegistry.doRegister() - 1.通过url得到zk路径,创建临时节点 - 5.向注册中心发起订阅 当前的provider订阅了/dubbo/com.alibaba.dubbo.demo.DemoService/configurators,当其下的子节点发生变化(url)时,通知到provider,使得providerUrl发生变化,则提供者需要重新暴露 - RegistryProtocol.export() - FailbackRegistry.subscribe() - ZookeeperRegistry.doSubscribe() - 创建持久节点/dubbo/com.alibaba.dubbo.demo.DemoService/configurators - 6.返回Exporter