cluster

Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个.

集群模块处于服务提供者和消费者之间,对于服务消费者来说,集群可向其屏蔽服务提供者集群的情况,使其能够专心进行远程调用.

cluster 继承结构

1
2
3
4
5
6
7
8
9
10
Cluster(com.alibaba.dubbo.rpc.cluster)
MergeableCluster(com.alibaba.dubbo.rpc.cluster.support)
FailfastCluster(com.alibaba.dubbo.rpc.cluster.support)
MockClusterWrapper(com.alibaba.dubbo.rpc.cluster.support.wrapper)
FailsafeCluster(com.alibaba.dubbo.rpc.cluster.support)
ForkingCluster(com.alibaba.dubbo.rpc.cluster.support)
AvailableCluster(com.alibaba.dubbo.rpc.cluster.support)
FailbackCluster(com.alibaba.dubbo.rpc.cluster.support)
FailoverCluster(com.alibaba.dubbo.rpc.cluster.support)
BroadcastCluster(com.alibaba.dubbo.rpc.cluster.support)

这些 Cluster 实现类里均实现了 join() 方法,用于创建其对应的 ClusterInvoker.

Invoker接口

1
2
3
4
5
6
7
8
9
10
Invoker(com.alibaba.dubbo.rpc)
ProviderInvokerWrapper(com.alibaba.dubbo.registry.support)
MockClusterInvoker(com.alibaba.dubbo.rpc.cluster.support.wrapper)
AbstractProxyInvoker(com.alibaba.dubbo.rpc.proxy)
ListenerInvokerWrapper(com.alibaba.dubbo.rpc.listener)
DelegateInvoker(com.alibaba.dubbo.rpc.support)
InvokerWrapper(com.alibaba.dubbo.rpc.protocol)
AbstractClusterInvoker(com.alibaba.dubbo.rpc.cluster.support)
MergeableClusterInvoker(com.alibaba.dubbo.rpc.cluster.support)
AbstractInvoker(com.alibaba.dubbo.rpc.protocol)

这些 Invoker实现类的父类 AbstractClusterInvoker.invoke() 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 @Override
public Result invoke(final Invocation invocation) throws RpcException {
// 校验是否销毁
checkWhetherDestroyed();

// 通过 directory 获得所有服务提供者 Invoker 集合
List<Invoker<T>> invokers = list(invocation);

// 获得 LoadBalance 对象
LoadBalance loadbalance;
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}

// 设置调用编号,若是异步调用
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

// 执行调用
return doInvoke(invocation, invokers, loadbalance);
}

FailoverClusterInvoker

FailoverClusterInvoker 的 doInvoke() 方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试.

在 for 循环内,首先是通过 loadBalance 选择一个 Invoker,然后再通过这个 Invoker.invoke() 方法进行远程调用.如果失败了,记录下异常,并进行重试.重试时会再次调用父类的 list() 方法列举 Invoker.

FailbackClusterInvoker

FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务消费者.

并通过定时任务对失败的调用进行重传,适合执行消息通知等操作.

doInvoke() 逻辑:
1.先检查 可用的 invoker 集合是否为空,若为空,则抛出异常.
2.调用select()通过loadbalance选择一个invoker,并发起rpc调用.
3.若调用失败,则添加到 failed 这个map中,并延迟初始化定时任务每隔5秒遍历一次 failed 里的 invoker,发起调用,若调用成功则从中移除.

FailfastClusterInvoker

FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常.适用于幂等操作,比如新增记录.

doInvoke() 逻辑:
1.先检查 可用的 invoker 集合是否为空,若为空,则抛出异常.
2.调用select()通过loadbalance选择一个invoker,并发起rpc调用.
3.若调用失败,则先判断异常类型 若是 RpcException 或 业务异常,直接抛出.否则包装一层 RpcException 并抛出.

FailsafeClusterInvoker

FailsafeClusterInvoker 是一种失败安全的 Cluster Invoker.

所谓的失败安全是指,当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常.适用于写入审计日志等操作.

doInvoke() 逻辑:
没啥好说的,调用抛异常,捕获 Throwable 异常,打印异常日志,创建 RpcResult 对象返回.

ForkingClusterInvoker

ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者.

只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行.

ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源.

doInvoke() 逻辑:
1.先检查 可用的 invoker 集合是否为空,若为空,则抛出异常.
2.获取并发数 forks,调用 forks 次 select()方法获得 invokers 列表
3.遍历 invokers, 并发调用每个 invoker.invoke(),返回结果添加到 LinkedBlockingQueue 中
4.main线程阻塞在 ref.poll(timeout, TimeUnit.MILLISECONDS) 上等待第一个返回结果

BroadcastClusterInvoker

BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常.

该类通常用于通知所有提供者更新缓存或日志等本地资源信息.
doInvoke() 逻辑:
没啥好说的,循环调用 invokers,调用失败记录 exception.最后有异常就抛出.

官网cluster部分

directory

主要看 RegistryDirectory 的实现.

RegistryDirectory

RegistryDirectory 实现了 NotifyListener 接口,当注册中心节点信息发生变化后,RegistryDirectory 可以通过此接口方法得到变更信息.

收到变更通知后,RegistryDirectory 可根据配置变更信息刷新 Invoker 列表.

RegistryDirectory 中有几个比较重要的逻辑:
1.Invoker.doList() 逻辑
2.接收服务配置变更的逻辑
3.Invoker 列表的刷新逻辑

doList()逻辑:
没啥好说的,就是从 methodInvokerMap(即 methodInvokerMap) 缓存中获取 methodName 对应的 invokers

接收服务变更通知
RegistryDirectory 是一个动态服务目录,会随注册中心配置的变化进行动态调整.因此 RegistryDirectory 实现了 NotifyListener 接口,通过这个接口获取注册中心变更通知.

1.ZookeeperRegistry.doSubscribe() 订阅时会创建一个 ChildListener,当发生 childChanged 事件时,会回调它的 childChanged() 方法
2.调用ZookeeperRegistry.this.notify(),由于 RegistryDirectory 实现了 NotifyListener 接口,会调用 RegistryDirectory.notify()
3.RegistryDirectory.notify() 调用 refreshInvoker(invokerUrls);
将 url 转成 Invoker,得到 <url, Invoker> 的映射关系.然后进一步进行转换,得到 <methodName, InvokerList> 映射关系.之后进行多组 Invoker 合并操作,并将合并结果赋值给 methodInvokerMap.methodInvokerMap 变量在 doList 方法中会被用到,doList 会对该变量进行读操作,在这里是写操作.当新的 Invoker 列表生成后,还要一个重要的工作要做,就是销毁无用的 Invoker,避免服务消费者调用已下线的服务的服务.

官网directory部分

router

服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,即规定了服务消费者可调用哪些服务提供者.
比如条件路由 ConditionRouter

rule格式 [服务消费者匹配条件] => [服务提供者匹配条件],比如 rule = “host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4”

ConditionRouter 构造方法里 先是对路由规则做预处理.处理结果存入 MatchPair.matches 和 MatchPair.mismatches,分别对应 匹配 和 不匹配的条件.最后 whenCondition 保存 消费者匹配条件,thenCondition 保存 提供者地址列表的过滤条件.

ConditionRouter.router(): 服务路由的逻辑.
1.调用 matchWhen 对服务消费者进行匹配,如果匹配失败,直接返回 Invoker 列表
2.如果匹配成功,再对服务提供者进行匹配

ConditionRouterTest

官网router部分

loadBalance

官网loadBalance部分
loadBalance 类的继承结构

1
2
3
4
5
AbstractLoadBalance (com.alibaba.dubbo.rpc.cluster.loadbalance)
RoundRobinLoadBalance (com.alibaba.dubbo.rpc.cluster.loadbalance)
LeastActiveLoadBalance (com.alibaba.dubbo.rpc.cluster.loadbalance)
RandomLoadBalance (com.alibaba.dubbo.rpc.cluster.loadbalance)
ConsistentHashLoadBalance (com.alibaba.dubbo.rpc.cluster.loadbalance)

RandomLoadBalance

按权重设置随机概率.如果所有invoker权重都相等,则随机取一个.不相等,则按照权重的比例随机取.

LeastActiveLoadBalance

最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。

初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。

ConsistentHashLoadBalance

一致性 Hash,相同参数的请求总是发到同一提供者。

当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

RoundRobinLoadBalance