Dubbo Cluster
cluster
Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个.
集群模块处于服务提供者和消费者之间,对于服务消费者来说,集群可向其屏蔽服务提供者集群的情况,使其能够专心进行远程调用.
cluster 继承结构
1 |
|
这些 Cluster 实现类里均实现了 join() 方法,用于创建其对应的 ClusterInvoker.
Invoker接口
1 |
|
这些 Invoker实现类的父类 AbstractClusterInvoker.invoke() 中
1 |
|
FailoverClusterInvoker
FailoverClusterInvoker 的 doInvoke() 方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试.
在 for 循环内,首先是通过 loadBalance 选择一个 Invoker,然后再通过这个 Invoker.invoke() 方法进行远程调用.如果失败了,记录下异常,并进行重试.重试时会再次调用父类的 list() 方法列举 Invoker.
FailbackClusterInvoker
FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务消费者.
并通过定时任务对失败的调用进行重传,适合执行消息通知等操作.
doInvoke() 逻辑:
1.先检查 可用的 invoker 集合是否为空,若为空,则抛出异常.
2.调用select()通过loadbalance选择一个invoker,并发起rpc调用.
3.若调用失败,则添加到 failed 这个map中,并延迟初始化定时任务每隔5秒遍历一次 failed 里的 invoker,发起调用,若调用成功则从中移除.
FailfastClusterInvoker
FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常.适用于幂等操作,比如新增记录.
doInvoke() 逻辑:
1.先检查 可用的 invoker 集合是否为空,若为空,则抛出异常.
2.调用select()通过loadbalance选择一个invoker,并发起rpc调用.
3.若调用失败,则先判断异常类型 若是 RpcException 或 业务异常,直接抛出.否则包装一层 RpcException 并抛出.
FailsafeClusterInvoker
FailsafeClusterInvoker 是一种失败安全的 Cluster Invoker.
所谓的失败安全是指,当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常.适用于写入审计日志等操作.
doInvoke() 逻辑:
没啥好说的,调用抛异常,捕获 Throwable 异常,打印异常日志,创建 RpcResult 对象返回.
ForkingClusterInvoker
ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者.
只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行.
ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源.
doInvoke() 逻辑:
1.先检查 可用的 invoker 集合是否为空,若为空,则抛出异常.
2.获取并发数 forks,调用 forks 次 select()方法获得 invokers 列表
3.遍历 invokers, 并发调用每个 invoker.invoke(),返回结果添加到 LinkedBlockingQueue 中
4.main线程阻塞在 ref.poll(timeout, TimeUnit.MILLISECONDS) 上等待第一个返回结果
BroadcastClusterInvoker
BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常.
该类通常用于通知所有提供者更新缓存或日志等本地资源信息.
doInvoke() 逻辑:
没啥好说的,循环调用 invokers,调用失败记录 exception.最后有异常就抛出.
directory
主要看 RegistryDirectory 的实现.
RegistryDirectory
RegistryDirectory 实现了 NotifyListener 接口,当注册中心节点信息发生变化后,RegistryDirectory 可以通过此接口方法得到变更信息.
收到变更通知后,RegistryDirectory 可根据配置变更信息刷新 Invoker 列表.
RegistryDirectory 中有几个比较重要的逻辑:
1.Invoker.doList() 逻辑
2.接收服务配置变更的逻辑
3.Invoker 列表的刷新逻辑
doList()逻辑:
没啥好说的,就是从 methodInvokerMap(即 methodInvokerMap) 缓存中获取 methodName 对应的 invokers
接收服务变更通知
RegistryDirectory 是一个动态服务目录,会随注册中心配置的变化进行动态调整.因此 RegistryDirectory 实现了 NotifyListener 接口,通过这个接口获取注册中心变更通知.
1.ZookeeperRegistry.doSubscribe() 订阅时会创建一个 ChildListener,当发生 childChanged 事件时,会回调它的 childChanged() 方法
2.调用ZookeeperRegistry.this.notify(),由于 RegistryDirectory 实现了 NotifyListener 接口,会调用 RegistryDirectory.notify()
3.RegistryDirectory.notify() 调用 refreshInvoker(invokerUrls);
将 url 转成 Invoker,得到 <url, Invoker>
的映射关系.然后进一步进行转换,得到 <methodName, InvokerList>
映射关系.之后进行多组 Invoker 合并操作,并将合并结果赋值给 methodInvokerMap.methodInvokerMap 变量在 doList 方法中会被用到,doList 会对该变量进行读操作,在这里是写操作.当新的 Invoker 列表生成后,还要一个重要的工作要做,就是销毁无用的 Invoker,避免服务消费者调用已下线的服务的服务.
router
服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,即规定了服务消费者可调用哪些服务提供者
.
比如条件路由 ConditionRouter
rule格式 [服务消费者匹配条件] => [服务提供者匹配条件],比如 rule = “host = 2.2.2.2,1.1.1.1,3.3.3.3 => host = 1.2.3.4”
ConditionRouter 构造方法里 先是对路由规则做预处理.处理结果存入 MatchPair.matches 和 MatchPair.mismatches,分别对应 匹配 和 不匹配的条件.最后 whenCondition 保存 消费者匹配条件,thenCondition 保存 提供者地址列表的过滤条件.
ConditionRouter.router(): 服务路由的逻辑.
1.调用 matchWhen 对服务消费者进行匹配,如果匹配失败,直接返回 Invoker 列表
2.如果匹配成功,再对服务提供者进行匹配
ConditionRouterTest
loadBalance
官网loadBalance部分
loadBalance 类的继承结构
1 |
|
RandomLoadBalance
按权重设置随机概率.如果所有invoker权重都相等,则随机取一个.不相等,则按照权重的比例随机取.
LeastActiveLoadBalance
最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。
初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。
ConsistentHashLoadBalance
一致性 Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。