zk实现分布式锁

zk实现分布式锁的两种方式

如果要自己通过 zookeeper 的原生 api 实现一个分布式独占锁的话,有两种方式:

方式1: 创建临时节点,创建失败的client注册watcher

所有需要获取锁的 client 都尝试到 zk 上创建同一个临时节点.创建成功则表示成功获取锁;若创建失败,则对这个节点注册 watcher.

若获得锁的 client 释放锁(自己 delete 掉节点) 或 宕机(zk 会自动移除掉该临时节点),

其他 client 会收到 watcher 通知,再尝试去抢锁.

这种方式的问题: 很明显竞争很大.在节点失效瞬间,如果争锁的 client 较多,会有大量 client 接收 watcher 通知.

方式2: 基于临时顺序节点

创建锁
1.每个 client 都在 zk 同一个父节点上创建一个 临时顺序节点
2.然后每个 client 获取到父节点下的所有节点并排序,判断自己是否是顺序最小的那个节点
2.1.如果是,加锁成功
2.2.如果不是,加锁失败.注册 watcher,只用监听自己之前的那个节点即可.

释放锁
1.client1 删除自己的临时顺序节点
2.后面一个 client2 通过 watcher 感知到节点已经删除,自己是当前最小的那个节点,则获取锁.
这种方式,明显 watcher 压力会小很多.不会出现大量竞争.

curator 实现的分布式锁

很明显之前的思路只是简单的实现.有很多其他的东西没有考虑,比如: 可重入.
看一下 curator 实现的时候,对 zookeeper 分布式锁做了哪些处理.

主要看 InterProcessMutex: 分布式可重入排它锁 的实现.

首先 curator 就是使用上面的方式2: 基于临时顺序节点 实现的分布式锁.

curator创建分布式锁的代码如下:

1
2
3
InterProcessMutex lock = new InterProcessMutex(client, lock_path);
lock.acquire(); // 获取锁
lock.release(); // 释放锁

获取锁流程

简单来说就是获取锁时,先处理是否需要重入.然后创建临时顺序节点.然后获取父节点的所有子节点并排序.若当前节点排在最前面获取锁,否则对前一个节点注册watcher并wait()等待被唤醒,如果传入超时时间,就wait(millisToWait).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- 获取锁
- lock.acquire()一直阻塞直到获取到锁,支持重入.
- InterProcessMutex.internalLock(-1, null)
- 1.获取当前线程对象
- 2.ConcurrentMap<Thread, LockData>类型的threadData对象,key是线程对象.value是LockData类型的对象.lockData.lockCount属性用于保存当前线程的重入计数.如果是当前线程重入,则计数加1
- 3.尝试加锁internals.attemptLock(time, unit, getLockNodeBytes())
- LockInternals.attemptLock()中
- 1.循环
- 1.1.driver.createsTheLock()用这个/curator_recipes_lock_path/lock-前缀创建临时顺序节点
- 1.2.hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
- internalLockLoop()中
- 1.只要没获取到锁,循环
- 1.1.getSortedChildren()比如_c_62f35524-d650-4efc-a121-ea6611b47285-lock-0000000066按照lock-后面的数字排序
- 1.2.predicateResults = driver.getsTheLock()返回是否能拿到锁还是注册watcher
- driver.getsTheLock()逻辑
- 1.获取当前临时节点在childern里的index并校验
- 2.如果是第0个节点getsTheLock设为true表示能获取锁.否则获取当前临时节点的前一个节点设置到pathToWatch里,表示要注册watcher.
- 3.封装成PredicateResults结果返回
- 1.3.若1.2中返回能拿到锁,haveTheLock置为true标识拿到锁,跳出循环.否则加锁,对前一个节点注册watcher并wait()等待被唤醒.

释放锁流程

1
2
3
4
5
6
- 释放锁
- lock.release();
- 1.从threadData中获取当前线程的lockData.lockCount重入计数并减1
- 2.当1中重入计数减少到0则internals.releaseLock(lockData.lockPath)释放锁
- 删除节点deleteOurPath(lockPath)
- 3.从threadData中,删除当前线程的缓存threadData.remove(currentThread)

watcher唤醒

watcher在 获取锁流程 时候注册上去的.在锁释放的时候,会触发watcher逻辑,唤醒wait的线程.

1
2
3
4
5
6
7
8
9
10
11
12
13
// 注册watcher
client.getData().usingWatcher(watcher).forPath(previousSequencePath);

// watcher实例如下
private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
// 此处会唤醒wait()的线程
notifyFromWatcher();
}
};