Dubbo 与 Spring 的结合

代码在 dubbo-config 下的 dubbo-config-spring 模块中

Dubbo Config整体类图

META-INF/dubbo.xsd

dubbo.xsd 定义了 dubbo 自定义标签

META-INF/spring.schemas

spring.schemas 指定了 dubbo.xsd 文件的位置.

1
http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd

META-INF/spring.handlers

关联 DubboNamespaceHandler 命名空间处理器和 dubbo.xsd 中的 targetNamespace,即 http://code.alibabatech.com/schema/dubbo

1
http\://code.alibabatech.com/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHand

DubboNamespaceHandler.init()

DubboNamespaceHandler.init(): 注册各个自定义元素的解析器,所有元素都是交给 DubboBeanDefinitionParser 处理的,传入不同的 Class 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void init() { // 注册BeanDefinitionParser用来解析dubbo定义的那些xml元素节点
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));// <dubbo:application name="dubbo-admin" />
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));// <dubbo:registry address="${dubbo.registry.address}" check="false" file="false" />
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));// <dubbo:reference id="registryService" interface="com.alibaba.dubbo.registry.RegistryService" check="false" />
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
// <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" group="g1" ref="demoService" filter="demo" deprecated="false" callbacks="1000" timeout="200000" accesslog="true"> 解析成 ServiceBean
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));

registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser()); // 废弃
}

DubboBeanDefinitionParser

DubboBeanDefinitionParser.parse()中
根据 beanClass 创建 RootBeanDefinition,设置为非懒加载.
若未配置 id 属性,则会根据 name 或 interface 属性生成,并通过判断 BeanRegistry 中是否存在该 id 的 bean,用 则增序列解决重复问题.
通过id注册到BeanRegistry中,并设置 beanDefinition 的 id 属性.
下面就是对 ServiceBean 等元素的各自的属性配置的处理.

在Spring应用上下文启动过程中,加载XML配置文件(比如提供者配置和消费者配置文件)为 Resource 对象,并通过 XmlBeanDefinitionReader 调用BeanDefinitionParser 解析 XML 标签为 BeanDefinition,并注册到 BeanDefinitionRegistry 中.

ServiceBean

ServiceBean 实现了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware 这几个接口.

  1. 实现的 ApplicationContextAware 接口的 setApplicationContext() 方法中,会:
    保存 applicationContext 引用.
    applicationContext 引用 保存到 SpringExtensionFactory 中.

  2. 实现的 InitializingBean 接口的 afterPropertiesSet() 方法中,会: 设置相关属性.

    1
    2
    - ServiceBean.afterPropertiesSet()
    - 从BeanFactory中获取ProviderConfig,ProtocolConfig等,这些bean是之前在Spring bean解析时加载进来的.将这些配置对象赋值到ServiceBean中,当然也有可能在其父类中.
  3. 实现的 ApplicationListener<ContextRefreshedEvent> 接口的 onApplicationEvent(ContextRefreshedEvent event) 方法中,有服务暴露的相关逻辑.

服务本地暴露

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
- ServiceBean.onApplicationEvent()
监听spring容器初始化完成时触发
- ServiceConfig.doExport()根据配置选择立即暴露或延迟暴露
- 暴露服务doExportUrls()
- loadRegistries(true)加载注册中心 URL 数组
- 将xml中配置的<dubbo:registry>解析成url对象(如registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&logger=jcl&pid=52008&qos.port=22222&registry=zookeeper&timestamp=1574666370052)
- doExportUrlsFor1Protocol(protocolConfig, registryURLs)根据不同的协议把服务export到不同的注册中心(如zk)上去
- ServiceConfig.exportLocal()中:服务本地暴露
- 1.通过传入的协议url构造本地injvm协议的url
- 2.通过proxyFactory.getInvoker()获取Invoker
- ProxyFactory$Adaptive.getInvoker()方法
- 1.ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist")获取StubProxyFactoryWrapper 对象
- 2.extension.getInvoker(arg0, arg1, arg2)通过ProxyFactory获取Invoker对象
- StubProxyFactoryWrapper.getInvoker()直接调用内部的JavassistProxyFactory.getInvoker()
- JavassistProxyFactory.getInvoker()方法中
- 1.根据proxy对象创建Wrapper
- 2.根据proxy, type, url创建AbstractProxyInvoker对象,它的doInvoke()方法会调用wrapper类的invokeMethod()方法
- 3.protocol.export()将Invoker转为Exporter
- Protocol$Adaptive.export()生成的代理类
- 1.通过ExtensionLoader获取name为injvm的Protocol拓展对象.吐出的是ProtocolListenerWrapper的实例,包裹了InjvmProtocol
- 2.extension.export(invoker)
- ProtocolFilterWrapper.export()中
- 1.buildInvokerChain()建立带有 Filter 过滤链的 Invoker,再暴露服务
- 2.protocol.export()
- ProtocolListenerWrapper.export()中
- 1.通过InjvmProtocol.export()把invoker暴露为exporter
- InjvmProtocol.export()中
- 1.创建InjvmExporter对象
- 2.获取ExporterListener拓展对象列表
- 3.创建ListenerExporterWrapper对象作为exporter返回

ServiceConfig.exportLocal()中:服务本地暴露

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { // 协议不以injvm开头
// 1.创建本地 injvm URL
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL) // injvm
.setHost(LOCALHOST) // 本地
.setPort(0); // 端口=0
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
// 2.使用 ProxyFactory 创建 Invoker 对象
// 3.使用 Protocol 暴露 Invoker 对象,将 Invoker 转成 exporter
Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
// 添加到 `exporters`
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}

这个 proxyFactory 是 ProxyFactory$Adaptive 对象,会根据传入的参数创建具体的 proxyFactory 对象.
proxyFactory.getInvoker(ref, (Class) interfaceClass, local)传入的参数分别是 服务具体实现类对象ref,服务接口DemoService,本地暴露的URL对象

ProxyFactory$Adaptive.getInvoker()方法

1
2
3
4
5
6
7
8
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {// 获取Invoker.参数 arg0 为 ref 对象, arg1 为 服务接口(如 DemoService), args 为 服务暴露的 URL
if (arg2 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");// 从url参数中获取 ProxyFactory 拓展使用的方式,默认是javassist
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); // 获取 ProxyFactory 接口 的拓展实现(默认使用的实现类为 JavassistProxyFactory)
return extension.getInvoker(arg0, arg1, arg2);// extension为StubProxyFactoryWrapper实例,包裹了JavassistProxyFactory对象
}

通过 ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist") 获取的是 JavassistProxyFactory 对象,同时 ProxyFactory 有个 Wrapper 实现类 StubProxyFactoryWrapper,会被 SPI AOP 包装到 JavassistProxyFactory 对象 外层,所以最后吐出的是 StubProxyFactoryWrapper 对象.

1
2
3
4
5
6
7
8
9
10
11
12
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 1.根据传入的 proxy 对象的类信息创建对它的包装对象 Wrapper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {// 2.返回Invoker对象实例,这个invoker对象invoke()方法可以根据传入的invocation对象中包含的方法名,方法参数来调用proxy对象 返回调用结果
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}

Invoker -> Exporter

ProtocolFilterWrapper.export()方法中

1
2
3
4
5
6
7
8
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 注册中心
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker); // 向注册中心发布服务的时候并不会组装 filter 调用链
}
// 建立带有 Filter 过滤链的 Invoker,再暴露服务
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 创建带 Filter 链的 Invoker 对象
*
* @param invoker Invoker 对象
* @param key 获取 URL 参数名
* @param group 分组
* @param <T> 泛型
* @return Invoker 对象
*/
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 获得匹配激活的过滤器数组
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
// 倒序循环 Filter,创建带 Filter 链的 Invoker 对象
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {

@Override
public Class<T> getInterface() {
return invoker.getInterface();
}

@Override
public URL getUrl() {
return invoker.getUrl();
}

@Override
public boolean isAvailable() {
return invoker.isAvailable();
}

@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}

@Override
public void destroy() {
invoker.destroy();
}

@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}

ProtocolListenerWrapper.export()

1
2
3
4
5
6
7
8
9
10
11
12
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 注册中心协议
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 暴露服务,创建 Exporter 对象
Exporter<T> exporter = protocol.export(invoker);
// 获得 ExporterListener 数组
List<ExporterListener> listeners = Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY));
// 创建带 ExporterListener 的 Exporter 对象
return new ListenerExporterWrapper<T>(exporter, listeners);
}

InjvmProtocol.export()

1
2
3
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

使用本地暴露的原因

本地调用使用了 injvm 协议,是一个伪协议,它不开启端口,不发起远程调用,只在 JVM 内直接关联,但执行 Dubbo 的 Filter 链.

服务远程暴露

服务远程暴露的总体流程:

1.将 ref 封装为 Invoker

2.将 Invoker 转换为 Exporter

3.启动 Netty 服务端

4.注册 provider 到 zk

5.zk 订阅与通

6.返回 Exporter 实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
- 服务远程暴露
- 使用 ProxyFactory 将 ref 创建成 Invoker 对象(这部分逻辑和本地暴露类似)
- ProxyFactory$Adaptive.getInvoker()中
- extension.getInvoker(arg0, arg1, arg2);
- StubProxyFactoryWrapper.getInvoker()直接调用内部的JavassistProxyFactory.getInvoker()
- JavassistProxyFactory.getInvoker()方法中
- 1.根据传入的 proxy对象(如DemoServiceImpl)的类信息创建对它的包装对象Wrapper
- 2.根据proxy, type, url创建AbstractProxyInvoker对象,它的doInvoke()方法会调用wrapper类的invokeMethod()方法
- 首先将AbstractProxyInvoker实例包装成DelegateProviderMetaDataInvoker实例
- 使用 Protocol 暴露 Invoker 对象为 Exporter
- Protocol$Adaptive.export()中
- 1.此时从传入的invoker.url.protocol获取到的协议为"registry",获取到的Protocol拓展点实现类为包裹了DubboProtocol 对象的 ProtocolFilterWrapper
- 2.extension.export(invoker)
- ProtocolFilterWrapper.export()这个类在Invoker基础上添加filter链
- 1.因为传入的协议时"registry",向注册中心暴露服务
- ProtocolListenerWrapper.export()中
- 1.调用RegistryProtocol.export(),向注册中心注册
- RegistryProtocol.export()中
- 1.doLocalExport()本地启动服务
- RegistryProtocol.doLocalExport()中
- 1.尝试从bounds缓存中获取exporter
- 2.若1中获取不到则创建exporter并缓存
- 3.根据invoker创建InvokerDelegete委托类,提供 获取 invoker 的方法
- 4.调用Protocol$Adaptive.export()方法
- Protocol$Adaptive.export()这次传入的协议是"dubbo"
- ProtocolFilterWrapper.export()中
- 1.buildInvokerChain()构建filter链
- 2.protocol.export()调用DubboProtocol.export()
- DubboProtocol.export()中
- 1.通过invoker创建DubboExporter
- // 2.参见netty启动流程
- 5.将4中返回的exporter包装成ExporterChangeableWrapper
- 获取注册中心,如zk
- 向注册中心注册自己
- FailbackRegistry.register(url)

ProtocolFilterWrapper.export()中

1
2
3
4
5
6
7
8
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 注册中心
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);// 向注册中心发布服务的时候并不会组装 filter调用链
}
// 建立带有 Filter 过滤链的 Invoker,再暴露服务
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 暴露服务
// export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

// 获得注册中心 URL
URL registryUrl = getRegistryUrl(originInvoker);

// 获得注册中心对象(启动zookeeper)
// registry provider
final Registry registry = getRegistry(originInvoker);

// 获得服务提供者 URL
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

//to judge to delay publish whether or not.判断是否延迟发布
boolean register = registedProviderUrl.getParameter("register", true);

// 向注册中心订阅服务消费者
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

// 向注册中心注册服务提供者(自己)
if (register) {
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); // 标记向本地注册表的注册服务提供者,已经注册
}

// 使用 OverrideListener 对象,订阅配置规则
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
// 创建订阅配置规则的 URL
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
// 创建 OverrideListener 对象,并添加到 `overrideListeners` 中
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 向注册中心,发起订阅
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}

Netty

ExchangeHandlerAdapter 定义了与客户端连接成功/断开连接/接受到客户端消息/响应消息,以及创造Invocation的方法

1
2
3
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

HeaderExchangeHandler: 基于消息头(Header)的信息交换服务器实现类

DecodeHandler: 解码处理器,处理接收到的消息

Netty 启动流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- netty启动流程
- openServer(url)启动通信服务器(如Netty)
- DubboProtocol.openServer()中
- DubboProtocol.createServer()中
- 1.创建ExchangeHandlerAdapter实例,作为请求处理器
- 2.Exchangers.bind(url,requestHandler)
- Exchangers.bind()中
- 1.根据url获取Exchanger,返回的是HeaderExchanger对象
- HeaderExchanger.bind()
- HeaderExchanger.bind()方法中
- 1. return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))))会对传入的ExchangeHandlerAdapter包装两层,然后使用Transporters.bind()创建NettyServer对象.
- Transporters.bind()中
- 1.返回Transporter的适配对象
- 2.调用Transporter$Adaptive.bind()
- netty4/NettyTransporter.bind()通过new NettyServer(url, listener)创建nettyServer对象
- NettyServer(url,handler)构造器中会通过ChannelHandlers会传入的DecodeHandler进行包装
- ChannelHandler.wrapInternal()方法中
- 1.获取Dispatcher的适配对象
- 2.调用Dispatcher.dispatch(handler, url)
- AllDispatcher.dispatch()中new AllChannelHandler(handler, url)
- AllChannelHandler调用父类构造方法WrappedChannelHandler
- WrappedChannelHandler()构造方法中
- 1.创建线程池
- 3.把2中的返回结果用HeartbeatHandler和MultiMessageHandler再包装
- NettyServer.doOpen()
- 1.创建
- 2.exchanger.bind()
- ExchangeServer.createServer() 创建ExchangeServer

Zookeeper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获得 Zookeeper 根节点
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); // `url.parameters.group` 参数值
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 创建 Zookeeper Client
zkClient = zookeeperTransporter.connect(url);
// 添加 StateListener 对象.该监听器,在重连时,调用恢复方法.
zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}

AbstractRegistry:

  • 1.通用的注册,订阅,查询,通知等方法
  • 2.读取和持久化注册数据到文件,以 properties 格式存储

FailbackRegistry:
主要用来做失败重试操作(包括: 注册失败,反注册失败,订阅失败,反订阅失败,通知失败的重试);也提供了供 ZookeeperRegistry 使用的 zk 重连后的恢复工作的方法.

ZookeeperRegistry:
创建 zk 客户端,启动会话;并且调用 FailbackRegistry 实现 zk 重连后的恢复工作.

注册到 zk 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
- 注册到zk流程
- RegistryProtocol.export()
- 1.获得注册中心 URL
- 就是把url改为zookeeper://协议开头
- 2.获得注册中心对象(启动zookeeper),通过ZookeeperRegistryFactory实例创建并返回ZookeeperRegistry对象
- AbstractRegistryFactory.getRegistry(url)中
- 1.加锁 从缓存中获得 Registry 对象
- 2.若获取不到,则createRegistry(url)创建 Registry 对象并缓存
- ZookeeperRegistryFactory.createRegistry()中会new ZookeeperRegistry(url, zookeeperTransporter)创建ZookeeperRegistry实例
- 3.获得服务提供者 URL
- 4.向注册中心注册服务提供者
- RegistryProtocol.register(registedProviderUrl)
- 1.从registryFactory中获取registry
- 2.registry.register()
- ZookeeperRegistry.doRegister()
- 1.通过url得到zk路径,创建临时节点
- 5.向注册中心发起订阅
当前的provider订阅了/dubbo/com.alibaba.dubbo.demo.DemoService/configurators,当其下的子节点发生变化(url)时,通知到provider,使得providerUrl发生变化,则提供者需要重新暴露
- RegistryProtocol.export()
- FailbackRegistry.subscribe()
- ZookeeperRegistry.doSubscribe()
- 创建持久节点/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
- 6.返回Exporter