Redis实现分布式锁

要考虑的点

  1. 互斥(独占,只能有一个客户端获取锁)
  2. 不能死锁
  3. 容错(只要大部分 redis 节点创建了这把锁就认为成功获取到锁)
  4. 支持重入,超时获取锁等特性

redis实现分布式锁

1.使用set

1.1.加锁

1
SET key randomValue NX PX 30000

NX: (not exists)表示只有 key 不存在的时候才会设置成功.(如果此时 redis 中存在这个 key,那么设置失败,返回 nil)
PX 30000: 30s 后 key 失效,意味着 30s 后锁自动释放.

随机数 randomValue 可以使用 时间戳 + 客户端编号 的方式 生成随机数.

1.2.释放锁

1
2
3
4
5
6
-- 删除锁的时候,找到 key 对应的 value,跟自己传过去的 value 做比较,如果是一样的才删除.保证解的是自己加的锁
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end

如果 key 对应的 value 一致,则删除这个 key.

通过这个方式释放锁是为了避免 Client 释放了其他 Client 申请的锁.

问题

这种加的是单机的 redis 锁,会有单点故障问题.

1.单点故障
如果是 redis 单实例,那就是单点故障.

如果是 redis 主从架构,即使有 redis 主从异步复制.如果 master 节点挂了(key 就没有了),key 还没同步到 slave 节点,此时 slave 节点 切换为 master 节点,别人就可以 set key,从而拿到锁.

2.超时释放锁的问题
1.client A 获得锁成功
2.client B 等待 A 释放锁
3.client A 超时没释放锁(比如 full gc)
4.redis 删除掉 key,释放锁
5.client B 获得锁成功
6.client B 做修改操作
7.client A 恢复做修改操作,覆盖了 client B 的修改

在上面基础上 加 乐观锁

在lua脚本中,获取锁成功的同时,使用redis.call("INCRBY", key, "1")返回一个单调递增的version.
这样假设 A 超时锁被 redis server 释放掉,此时 B 获得锁更新数据成功.A试图更新数据时判断A的版本号低于现在数据的版本号,则不允许A更改.

2.RedLock 实现

RedLock 原理

假设有 N 个 Redis master 节点,不考虑主从复制.
像在 Redis 单实例上一样获取 和 释放锁.
假设有 5 台 Redis 实例,这样同时宕机的概率很低.

获取锁流程:
1.获取当前时间

2.依次尝试从 5 个实例上,使用相同的 key 和 唯一的value 获取锁.
client 设置一个 网络连接 和 超时响应时间,远小于锁的失效时间.(这能避免 Redis 实例挂了,client 一直等待获得锁的响应).
若 server 没有在规定时间内响应,client 应立即尝试去另外一个 Redis 实例上 请求获取锁.

3.client 使用 当前时间 减去 开始获取锁的时间(1中的时间),得到的就是 获取锁使用的时间.当 过半 Redis 节点都获取到锁,且 使用的时间 小于 锁失效时间,锁才算获取成功.

4.若成功获取锁,key的失效时间 等于 有效时间 减去 获取锁使用的时间(3中得到的时间).

5.若获得锁失败(比如 没能在过半Redis实例上获得到锁 或 获得锁的时间已经超过了 有效时间),client需要在所有Redis实例上解锁(不管是否加锁了都解锁).

Redisson 对 RedLock 的实现

  1. 使用api

    1
    2
    3
    4
    5
    RLock redLock = redissonClient.getLock("REDLOCK_KEY");
    // 加锁.500ms获取不到锁则认为获取锁失败.锁的失效时间是 10s
    boolean isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
    // 解锁
    redLock.unlock();
  2. 加锁时的lua逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// RedissonLock.tryLockInnerAsync()方法
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 1.若 加锁的key(hash)不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 通过 hset key uuid+threadId 1 来加锁
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 设置锁(hash)失效时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 2.若 加锁的key(hash)存在 且 uuid+threadId 也存在
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 重入计数加1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 重新设置(hash)失效时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 3.返回锁(hash)的失效时间
"return redis.call('pttl', KEYS[1]);",
// KEYS[1]锁key, ARGV[1]锁失效时间, ARGV[2] uuid+threadId
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

1.加锁的hash不存在.创建hash,通过给 hash 的 key(uuid+threadId) 设置 value 为1.返回 nil表示加锁成功.
2.加锁的hash存在 且 key 也存在,表示当前线程重入的情况,重入计数加1.
3.走到这表示获取锁失败,返回锁的失效时间.

  1. 解锁时的lua逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 若锁key不存在,说明锁已释放.直接执行 publish 命令 发布释放锁消息 并返回 1
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 向channel(即redisson_lock__channel:{lockName})发送一条消息
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 下面都是锁key存在的情况
// 锁key存在但是 field(id+threadId) 不匹配,说明不是当前线程加的锁,不管,返回 nil
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 若锁key存在 且 field(id+threadId) 匹配,说明就是当前线程加的锁,重入计数减 1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入计数减1后count>0,说明没完全释放掉锁.重新设置失效时间,返回0
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
// 重入计数减为0的情况,完全释放锁,删除key解锁,publish解锁消息,返回1
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// KEYS[1]锁key, KEYS[2] channelName, ARGV[1]解锁消息 值为0, ARGV[2]锁失效时间, ARGV[3] id+threadId
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

1.若锁key不存在,说明锁已释放.直接执行 publish 命令 发布释放锁消息 并返回 1
2.锁key存在但是 field(id+threadId) 不匹配,说明不是当前线程加的锁,不管,返回 nil
3.若锁key存在 且 field(id+threadId) 匹配,说明就是当前线程加的锁,重入计数减 1
3.1.重入计数减1后count>0,说明没完全释放掉锁.重新设置失效时间,返回0
3.2.重入计数减为0的情况,完全释放锁,删除key解锁,publish解锁消息,返回1

@see

Redisson项目官方介绍wiki

Redis实现分布式锁官网

Redisson实现分布式锁官方wiki