【分布式锁】Redisson分布式锁的使用(推荐使用)
文章目录
- 前言
- 一、常见分布式锁方案对比
- 二、分布式锁需满足四个条件
- 三、什么是Redisson?
- 官网和官方文档
- Redisson使用
- 四、Redisson 分布式重入锁用法
- Redisson 支持单点模式、主从模式、哨兵模式、集群模式
- 自己先思考下,如果要手写一个分布式锁组件,怎么做?
- 五、加锁&解锁Lua脚本
- 1、加锁Lua脚本
- 脚本入参
- 脚本内容
- 脚本解读
- 2、解锁Lua脚本
- 脚本入参
- 脚本内容
- 脚本解读
- 六、源码解读
- 加锁流程源码
- 加锁过程小结
- 解锁流程源码
- 加锁&解锁流程串起来
- 概括下整个流程
- 七、其它相关
- 八、redlock算法
- 用 Redisson 实现分布式锁(红锁 RedissonRedLock)及源码分析(实现三)
- Redisson 实现redlock算法源码分析(RedLock)
- 加锁核心代码
- 参考文献
前言
在某些场景中,多个进程必须以互斥的方式独占共享资源,这时用分布式锁是最直接有效的。
随着技术快速发展,数据规模增大,分布式系统越来越普及,一个应用往往会部署在多台机器上(多节点),在有些场景中,为了保证数据不重复,要求在同一时刻,同一任务只在一个节点上运行,即保证某一方法同一时刻只能被一个线程执行。在单机环境中,应用是在同一进程下的,只需要保证单进程多线程环境中的线程安全性,通过 JAVA 提供的 volatile、ReentrantLock、synchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。而在多机部署环境中,不同机器不同进程,就需要在多进程下保证线程的安全性了。因此,分布式锁应运而生。
以往的工作中看到或用到几种实现方案,有基于zk的,也有基于redis的。由于实现上逻辑不严谨,线上时不时会爆出几个死锁case。那么,究竟什么样的分布式锁实现,才算是比较好的方案?
一、常见分布式锁方案对比
分类 | 方案 | 实现原理 | 优点 | 缺点 |
基于数据库 | 基于mysql 表唯一索引 | 1.表增加唯一索引 2.加锁:执行insert语句,若报错,则表明加锁失败 3.解锁:执行delete语句 | 完全利用DB现有能力,实现简单 | 1.锁无超时自动失效机制,有死锁风险 2.不支持锁重入,不支持阻塞等待 3.操作数据库开销大,性能不高 |
基于MongoDB findAndModify原子操作 | 1.加锁:执行findAndModify原子命令查找document,若不存在则新增 2.解锁:删除document | 实现也很容易,较基于MySQL唯一索引的方案,性能要好很多 | 1.大部分公司数据库用MySQL,可能缺乏相应的MongoDB运维、开发人员 2.锁无超时自动失效机制 | |
基于分布式协调系统 | 基于ZooKeeper | 1.加锁:在/lock目录下创建临时有序节点,判断创建的节点序号是否最小。若是,则表示获取到锁;否,则则watch /lock目录下序号比自身小的前一个节点 2.解锁:删除节点 | 1.由zk保障系统高可用 2.Curator框架已原生支持系列分布式锁命令,使用简单 | 需单独维护一套zk集群,维保成本高 |
基于缓存 | 基于redis命令 | 1. 加锁:执行setnx,若成功再执行expire添加过期时间 2. 解锁:执行delete命令 | 实现简单,相比数据库和分布式系统的实现,该方案最轻,性能最好 | 1.setnx和expire分2步执行,非原子操作;若setnx执行成功,但expire执行失败,就可能出现死锁 2.delete命令存在误删除非当前线程持有的锁的可能 3.不支持阻塞等待、不可重入 |
基于redis Lua脚本能力 | 1. 加锁:执行SET lock_name random_value EX seconds NX 命令 2. 解锁:执行Lua脚本,释放锁时验证random_value -- ARGV[1]为random_value, KEYS[1]为lock_name if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end | 同上;实现逻辑上也更严谨,除了单点问题,生产环境采用用这种方案,问题也不大。 | 不支持锁重入,不支持阻塞等待 |
表格中对比了几种常见的方案,redis+lua基本可应付工作中分布式锁的需求。然而,当偶然看到redisson分布式锁实现方案(传送门),相比以上方案,redisson保持了简单易用、支持锁重入、支持阻塞等待、Lua脚本原子操作,不禁佩服作者精巧的构思和高超的编码能力。下面就来学习下redisson这个牛逼框架,是怎么实现的。
二、分布式锁需满足四个条件
首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了,即不能误解锁。
- 具有容错性。只要大多数Redis节点正常运行,客户端就能够获取和释放锁。
三、什么是Redisson?
官网和官方文档
官网:https://redisson.org/
官方文档: https://github.com/redisson/redisson/wiki
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。
其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。
Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
Redisson使用
- 加入jar包的依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>2.7.0</version>
</dependency>
- 配置Redisson
public class RedissonManager {private static Config config = new Config();//声明redisso对象private static Redisson redisson = null;//实例化redissonstatic{config.useClusterServers()// 集群状态扫描间隔时间,单位是毫秒.setScanInterval(2000)//cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用).addNodeAddress("redis://127.0.0.1:6379" ).addNodeAddress("redis://127.0.0.1:6380").addNodeAddress("redis://127.0.0.1:6381").addNodeAddress("redis://127.0.0.1:6382").addNodeAddress("redis://127.0.0.1:6383").addNodeAddress("redis://127.0.0.1:6384");//得到redisson对象redisson = (Redisson) Redisson.create(config);}//获取redisson对象的方法public static Redisson getRedisson(){return redisson;}
}
- 锁的获取和释放
public class DistributedRedisLock {//从配置类中获取redisson对象private static Redisson redisson = RedissonManager.getRedisson();private static final String LOCK_TITLE = "redisLock_";//加锁public static boolean acquire(String lockName){//声明key对象String key = LOCK_TITLE + lockName;//获取锁对象RLock mylock = redisson.getLock(key);//加锁,并且设置锁过期时间3秒,防止死锁的产生 uuid+threadIdmylock.lock(2,3,TimeUtil.SECOND);//加锁成功return true;}//锁的释放public static void release(String lockName){//必须是和加锁时的同一个keyString key = LOCK_TITLE + lockName;//获取所对象RLock mylock = redisson.getLock(key);//释放锁(解锁)mylock.unlock();
- 业务逻辑中使用分布式锁
public String discount() throws IOException{String key = "lock001";//加锁DistributedRedisLock.acquire(key);//执行具体业务逻辑dosoming//释放锁DistributedRedisLock.release(key);//返回结果return soming;}
四、Redisson 分布式重入锁用法
Redisson 支持单点模式、主从模式、哨兵模式、集群模式
这里以单点模式为例:
// 1.构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:5379").setPassword("123456").setDatabase(0);
// 2.构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 3.获取锁对象实例(无法保证是按线程的顺序获取到)
RLock rLock = redissonClient.getLock(lockKey);
try {/*** 4.尝试获取锁* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)*/boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);if (res) {//成功获得锁,在这里处理业务}
} catch (Exception e) {throw new RuntimeException("aquire lock fail");
}finally{//无论如何, 最后都要解锁rLock.unlock();
}
redisson这个框架重度依赖了Lua脚本和Netty,代码很牛逼,各种Future及FutureListener的异步、同步操作转换。
自己先思考下,如果要手写一个分布式锁组件,怎么做?
- 肯定要定义2个接口:加锁、解锁;大道至简,redisson的作者就是在加锁和解锁的执行层面采用Lua脚本,逼格高,而且重要有原子性保证啊。
- 当然,redisson的作者毕竟牛逼,加锁和解锁过程中还巧妙地利用了redis的发布订阅功能,后面会讲到。下面先对加锁和解锁Lua脚本了解下。
五、加锁&解锁Lua脚本
加锁、解锁Lua脚本是redisson分布式锁实现最重要的组成部分。首先不看代码,先研究下Lua脚本都是什么逻辑
1、加锁Lua脚本
脚本入参
参数 | 示例值 | 含义 |
---|---|---|
KEY个数 | 1 | KEY个数 |
KEYS[1] | my_first_lock_name | 锁名 |
ARGV[1] | 60000 | 持有锁的有效时间:毫秒 |
ARGV[2] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | 唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID |
脚本内容
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
if (redis.call('exists', KEYS[1]) == 0) thenredis.call('hset', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;-- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) thenredis.call('hincrby', KEYS[1], ARGV[2], 1);redis.call('pexpire', KEYS[1], ARGV[1]);return nil;
end;-- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
return redis.call('pttl', KEYS[1]);
脚本解读
Q:返回nil、返回剩余过期时间有什么目的?
A:当且仅当返回nil,才表示加锁成功;客户端需要感知加锁是否成功的结果
2、解锁Lua脚本
脚本入参
参数 | 示例值 | 含义 |
---|---|---|
KEY个数 | 2 | KEY个数 |
KEYS[1] | my_first_lock_name | 锁名 |
KEYS[2] | redisson_lock__channel:{my_first_lock_name} | 解锁消息PubSub频道 |
ARGV[1] | 0 | redisson定义0表示解锁消息 |
ARGV[2] | 30000 | 设置锁的过期时间;默认值30秒 |
ARGV[3] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | 唯一标识;同加锁流程 |
脚本内容
-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) thenredis.call('publish', KEYS[2], ARGV[1]);return 1;
end;-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) thenreturn nil;
end; -- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then -- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
else -- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1;
end;return nil;
脚本解读
Q1:广播解锁消息有什么用?
A:是为了通知其他争抢锁阻塞住的线程,从阻塞中解除,并再次去争抢锁。
Q2:返回值0、1、nil有什么不一样?
A:当且仅当返回1,才表示当前请求真正触发了解锁Lua脚本;但客户端又并不关心解锁请求的返回值,好像没什么用?
六、源码解读
加锁流程源码
读加锁源码时,可以把tryAcquire(leaseTime, unit, threadId)
方法直接视为执行加锁Lua脚本。直接进入org.redisson.RedissonLock#tryLock(long, long, java.util.concurrent.TimeUnit)
源码
@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 获取锁能容忍的最大等待时长long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();final long threadId = Thread.currentThread().getId();// 【核心点1】尝试获取锁,若返回值为null,则表示已获取到锁Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}// 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间time -= (System.currentTimeMillis() - current);if (time <= 0) {acquireFailed(threadId);return false;}current = System.currentTimeMillis();// 【核心点2】订阅解锁消息,见org.redisson.pubsub.LockPubSub#onMessage/*** 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:* 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争* 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败* 当 this.await返回true,进入循环尝试获取锁*/final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);//await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future)if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {@Overridepublic void operationComplete(Future<RedissonLockEntry> future) throws Exception {if (subscribeFuture.isSuccess()) {unsubscribe(subscribeFuture, threadId);}}});}acquireFailed(threadId);return false;}// 订阅成功try {// 还可以容忍的等待时长=获取锁能容忍的最大等待时长 - 执行完上述操作流逝的时间time -= (System.currentTimeMillis() - current);if (time <= 0) {// 超出可容忍的等待时长,直接返回获取锁失败acquireFailed(threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();// 尝试获取锁;如果锁被其他线程占用,就返回锁剩余过期时间【同上】ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= (System.currentTimeMillis() - currentTime);if (time <= 0) {acquireFailed(threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();// 【核心点3】根据锁TTL,调整阻塞等待时长;// 注意:这里实现非常巧妙,1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁;//2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。if (ttl >= 0 && ttl < time) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= (System.currentTimeMillis() - currentTime);if (time <= 0) {acquireFailed(threadId);return false;}}} finally {// 取消解锁消息的订阅unsubscribe(subscribeFuture, threadId);}}
接下的再获取锁方法 tryAcquire的实现,真的就是执行Lua脚本!
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {// tryAcquireAsync异步执行Lua脚本,get方法同步获取返回结果return get(tryAcquireAsync(leaseTime, unit, threadId));
}// 见org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) {// 实质是异步执行加锁Lua脚本return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {//先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果if (!future.isSuccess()) {return;}Long ttlRemaining = future.getNow();// lock acquired//如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewalif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;
}// 见org.redisson.RedissonLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
加锁过程小结
-
锁其实也是一种资源,各线程争抢锁操作对应到redisson中就是争抢着去创建一个hash结构,谁先创建就代表谁获得锁;hash的名称为锁名,hash里面内容仅包含一条键值对,键为redisson客户端唯一标识+持有锁线程id,值为锁重入计数;给hash设置的过期时间就是锁的过期时间。放个图直观感受下:
-
加锁流程核心就3步
Step1:尝试获取锁,这一步是通过执行加锁Lua脚本来做;
Step2:若第一步未获取到锁,则去订阅解锁消息,当获取锁到剩余过期时间后,调用信号量方法阻塞住,直到被唤醒或等待超时
Step3:一旦持有锁的线程释放了锁,就会广播解锁消息。于是,第二步中的解锁消息的监听器会释放信号量,获取锁被阻塞的那些线程就会被唤醒,并重新尝试获取锁。
比如 RedissonLock中的变量internalLockLeaseTime,默认值是30000毫秒,还有调用tryLockInnerAsync()传入的一个从连接管理器获取的getLockWatchdogTimeout(),他的默认值也是30000毫秒,这些都和redisson官方文档所说的watchdog机制有关,看门狗,还是很形象的描述这一机制,那么看门狗到底做了什么,为什么这么做,来看下核心代码.
先思考一个问题,假设在一个分布式环境下,多个服务实例请求获取锁,其中服务实例1成功获取到了锁,在执行业务逻辑的过程中,服务实例突然挂掉了或者hang住了,那么这个锁会不会释放,什么时候释放?回答这个问题,自然想起来之前我们分析的lua脚本,其中第一次加锁的时候使用pexpire给锁key设置了过期时间,默认30000毫秒,由此来看如果服务实例宕机了,锁最终也会释放,其他服务实例也是可以继续获取到锁执行业务。但是要是30000毫秒之后呢,要是服务实例1没有宕机但是业务执行还没有结束,所释放掉了就会导致线程问题,这个redisson是怎么解决的呢?这个就一定要实现自动延长锁有效期的机制。
异步执行完lua脚本执行完成之后,设置了一个监听器,来处理异步执行结束之后的一些工作。在操作完成之后会去执行operationComplete方法,先判断这个异步操作有没有执行成功,如果没有成功,直接返回,如果执行成功了,就会同步获取结果,如果ttlRemaining为null,则会执行一个定时调度的方法scheduleExpirationRenewal,回想一下之前的lua脚本,当加锁逻辑
处理结束,返回了一个nil;如此说来 就一定会走定时任务了。来看下定时调度scheduleExpirationRenewal代码
private void scheduleExpirationRenewal(final long threadId) {if (expirationRenewalMap.containsKey(getEntryName())) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itselfscheduleExpirationRenewal(threadId);}}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();}}
首先,会先判断在expirationRenewalMap中是否存在了entryName,这是个map结构,主要还是判断在这个服务实例中的加锁客户端的锁key是否存在,如果已经存在了,就直接返回;第一次加锁,肯定是不存在的,接下来就是搞了一个TimeTask,延迟internalLockLeaseTime/3之后执行,这里就用到了文章一开始就提到奇妙的变量,算下来就是大约10秒钟执行一次,调用了一个异步执行的方法
如图也是调用异步执行了一段lua脚本,首先判断这个锁key的map结构中是否存在对应的key8a9649f5-f5b5-48b4-beaa-d0c24855f9ab:anyLock:1,如果存在,就直接调用pexpire命令设置锁key的过期时间,默认30000毫秒。
OK,现在思路就清晰了,在上面任务调度的方法中,也是异步执行并且设置了一个监听器,在操作执行成功之后,会回调这个方法,如果调用失败会打一个错误日志并返回,更新锁过期时间失败;然后获取异步执行的结果,如果为true,就会调用本身,如此说来又会延迟10秒钟去执行这段逻辑,所以,这段逻辑在你成功获取到锁之后,会每隔十秒钟去执行一次,并且,在锁key还没有失效的情况下,会把锁的过期时间继续延长到30000毫秒,也就是说只要这台服务实例没有挂掉,并且没有主动释放锁,看门狗都会每隔十秒给你续约一下,保证锁一直在你手中。完美的操作。
到现在来说,加锁,锁自动延长过期时间,都OK了,然后就是说在你执行业务,持有锁的这段时间,别的服务实例来尝试加锁又会发生什么情况呢?或者当前客户端的别的线程来获取锁呢?很显然,肯定会阻塞住,我们来通过代码看看是怎么做到的。还是把眼光放到之前分析的那段加锁lua代码上,当加锁的锁key存在的时候并且锁key对应的map结构中当前客户端的唯一key也存在时,会去调用hincrby命令,将唯一key的值自增一,并且会pexpire设置key的过期时间为30000毫秒,然后返回nil,可以想象这里也是加锁成功的,也会继续去执行定时调度任务,完成锁key过期时间的续约,这里呢,就实现了锁的可重入性。
那么当以上这种情况也没有发生呢,这里就会直接返回当前锁的剩余有效期,相应的也不会去执行续约逻辑。此时一直返回到上面的方法,如果加锁成功就直接返回;否则就会进入一个死循环,去尝试加锁,并且也会在等待一段时间之后一直循环尝试加锁,阻塞住,知道第一个服务实例释放锁。对于不同的服务实例尝试会获取一把锁,也和上面的逻辑类似,都是这样实现了锁的互斥。
紧接着,我们来看看锁释放的逻辑,其实也很简单,调用了lock.unlock()方法,跟着代码走流程发现,也是异步调用了一段lua脚本,lua脚本,应该就比较清晰,也就是通过判断锁key是否存在,如果不存在直接返回;否则就会判断当前客户端对应的唯一key的值是否存在,如果不存在就会返回nil;否则,值自增-1,判断唯一key的值是否大于零,如果大于零,则返回0;否则删除当前锁key,并返回1;返回到上一层方法,也是针对返回值进行了操作,如果返回值是1,则会去取消之前的定时续约任务,如果失败了,则会做一些类似设置状态的操作,这一些和解锁逻辑也没有什么关系,可以不去看他。
解锁流程源码
解锁流程相对比较简单,完全就是执行解锁Lua脚本,无额外的代码逻辑,直接看org.redisson.RedissonLock#unlock代码
@Overridepublic void unlock() {// 执行解锁Lua脚本,这里传入线程id,是为了保证加锁和解锁是同一个线程,避免误解锁其他线程占有的锁Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));if (opStatus == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + Thread.currentThread().getId());}if (opStatus) {cancelExpirationRenewal();}}// 见org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}
加锁&解锁流程串起来
上面结合Lua脚本和源码,分别分析了加锁流程和解锁流程。下面升级下挑战难度,模拟下多个线程争抢锁会是怎样的流程。示意图如下,比较关键的三处已用红色字体标注。
概括下整个流程
-
线程A和线程B两个线程同时争抢锁。线程A很幸运,最先抢到了锁。线程B在获取锁失败后,并未放弃希望,而是主动订阅了解锁消息,然后再尝试获取锁,顺便看看没有抢到的这把锁还有多久就过期,线程B就按需阻塞等锁释放。
-
线程A拿着锁干完了活,自觉释放了持有的锁,于此同时广播了解锁消息,通知其他抢锁的线程再来枪;
-
解锁消息的监听者LockPubSub收到消息后,释放自己持有的信号量;线程B就瞬间从阻塞中被唤醒了,接着再抢锁,这次终于抢到锁了!后面再按部就班,干完活,解锁
七、其它相关
Q1:订阅频道名称(如:redisson_lock__channel:{my_first_lock_name})为什么有大括号?
A:
- 在redis集群方案中,如果Lua脚本涉及多个key的操作,则需限制这些key在同一个slot中,才能保障Lua脚本执行的原子性。否则运行会报错Lua script attempted to access a non local key in a cluster node . channel;
- HashTag是用{}包裹key的一个子串,若设置了HashTag,集群会根据HashTag决定key分配到哪个slot;HashTag不支持嵌套,只有第一个左括号{和第一个右括号}里面的内容才当做HashTag参与slot计算;通常,客户端都会封装这个计算逻辑。
// 见org.redisson.cluster.ClusterConnectionManager#calcSlot
@Override
public int calcSlot(String key) {if (key == null) {return 0;}int start = key.indexOf('{');if (start != -1) {int end = key.indexOf('}');key = key.substring(start+1, end);}int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;log.debug("slot {} for {}", result, key);return result;
}
- 在解锁Lua脚本中,操作了两个key:一个是锁名my_lock_name,一个是解锁消息发布订阅频道redisson_lock__channel:{my_first_lock_name},按照上面slot计算方式,两个key都会按照内容my_first_lock_name来计算,故能保证落到同一个slot
Q2:redisson代码几乎都是以Lua脚本方式与redis服务端交互,如何跟踪这些脚本执行过程?
A:启动一个redis客户端终端,执行monitor命令以便在终端上实时打印 redis 服务器接收到的命令;然后debug执行redisson加锁/解锁测试用例,即可看到代码运行过程中实际执行了哪些Lua脚本
eg:上面整体流程示意图的测试用例位:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedissonDistributedLockerTest {private static final Logger log = LoggerFactory.getLogger(RedissonDistributedLocker.class);@Resourceprivate DistributedLocker distributedLocker;private static final ExecutorService executorServiceB = Executors.newSingleThreadExecutor();private static final ExecutorService executorServiceC = Executors.newSingleThreadExecutor();@Testpublic void tryLockUnlockCost() throws Exception {StopWatch stopWatch = new StopWatch("加锁解锁耗时统计");stopWatch.start();for (int i = 0; i < 10000; i++) {String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);Assert.assertTrue(optLocked.isPresent());optLocked.get().unlock();}stopWatch.stop();log.info(stopWatch.prettyPrint());}@Testpublic void tryLock() throws Exception {String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);Assert.assertTrue(optLocked.isPresent());Optional<LockResource> optLocked2 = distributedLocker.tryLock(key, 600000, 600000);Assert.assertTrue(optLocked2.isPresent());optLocked.get().unlock();}/*** 模拟2个线程争抢锁:A先获取到锁,A释放锁后,B再获得锁*/@Testpublic void tryLock2() throws Exception {String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");CountDownLatch countDownLatch = new CountDownLatch(1);Future<Optional<LockResource>> submit = executorServiceB.submit(() -> {countDownLatch.await();log.info("B尝试获得锁:thread={}", currentThreadId());return distributedLocker.tryLock(key, 600000, 600000);});log.info("A尝试获得锁:thread={}", currentThreadId());Optional<LockResource> optLocked = distributedLocker.tryLock(key, 300000, 600000);Assert.assertTrue(optLocked.isPresent());log.info("A已获得锁:thread={}", currentThreadId());countDownLatch.countDown();optLocked.get().unlock();log.info("A已释放锁:thread={}", currentThreadId());Optional<LockResource> lockResource2 = submit.get();Assert.assertTrue(lockResource2.isPresent());executorServiceB.submit(() -> {log.info("B已获得锁:thread={}", currentThreadId());lockResource2.get().unlock();log.info("B已释放锁:thread={}", currentThreadId());});}/*** 模拟3个线程争抢锁:A先获取到锁,A释放锁后,B和C同时争抢锁*/@Testpublic void tryLock3() throws Exception {String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");log.info("A尝试获得锁:thread={}", currentThreadId());Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);if (optLocked.isPresent()) {log.info("A已获得锁:thread={}", currentThreadId());}Assert.assertTrue(optLocked.isPresent());CyclicBarrier cyclicBarrier = new CyclicBarrier(2);Future<Optional<LockResource>> submitB = executorServiceB.submit(() -> {cyclicBarrier.await();log.info("B尝试获得锁:thread={}", currentThreadId());return distributedLocker.tryLock(key, 600000, 600000);});Future<Optional<LockResource>> submitC = executorServiceC.submit(() -> {cyclicBarrier.await();log.info("C尝试获得锁:thread={}", currentThreadId());return distributedLocker.tryLock(key, 600000, 600000);});optLocked.get().unlock();log.info("A已释放锁:thread={}", currentThreadId());CountDownLatch countDownLatch = new CountDownLatch(2);executorServiceB.submit(() -> {log.info("B已获得锁:thread={}", currentThreadId());try {submitB.get().get().unlock();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}log.info("B已释放锁:thread={}", currentThreadId());countDownLatch.countDown();});executorServiceC.submit(() -> {log.info("C已获得锁:thread={}", currentThreadId());try {submitC.get().get().unlock();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}log.info("C已释放锁:thread={}", currentThreadId());countDownLatch.countDown();});countDownLatch.await();}private static Long currentThreadId() {return Thread.currentThread().getId();}@Testpublic void tryLockWaitTimeout() throws Exception {String key = "mock-key:" + UUID.randomUUID().toString();Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 2000);Assert.assertTrue(optLocked.isPresent());Optional<LockResource> optLockResource = CompletableFuture.supplyAsync(() -> {long now = System.currentTimeMillis();Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 1000, 10);long cost = System.currentTimeMillis() - now;log.info("cost={}", cost);return optLockedAgain;}).exceptionally(th -> {log.error("Exception: ", th);return Optional.empty();}).join();Assert.assertTrue(!optLockResource.isPresent());}@Testpublic void tryLockWithLeaseTime() throws Exception {String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();Optional<LockResource> optLocked = distributedLocker.tryLock(key, 3000, 1000);Assert.assertTrue(optLocked.isPresent());// 可重入Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 3000, 1000);Assert.assertTrue(optLockedAgain.isPresent());}/*** 模拟1000个并发请求枪一把锁*/@Testpublic void tryLockWithLeaseTimeOnMultiThread() throws Exception {int totalThread = 1000;String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);AtomicInteger acquiredLockTimes = new AtomicInteger(0);ExecutorService executor = Executors.newFixedThreadPool(totalThread);for (int i = 0; i < totalThread; i++) {executor.submit(new Runnable() {@Overridepublic void run() {tryAcquireLockTimes.getAndIncrement();Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 10000);if (optLocked.isPresent()) {acquiredLockTimes.getAndIncrement();}}});}executor.awaitTermination(15, TimeUnit.SECONDS);Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);Assert.assertTrue(acquiredLockTimes.get() == 1);}@Testpublic void tryLockWithLeaseTimeOnMultiThread2() throws Exception {int totalThread = 100;String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);AtomicInteger acquiredLockTimes = new AtomicInteger(0);ExecutorService executor = Executors.newFixedThreadPool(totalThread);for (int i = 0; i < totalThread; i++) {executor.submit(new Runnable() {@Overridepublic void run() {long now = System.currentTimeMillis();Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10000, 5);long cost = System.currentTimeMillis() - now;log.info("tryAcquireLockTimes={}||wait={}", tryAcquireLockTimes.incrementAndGet(), cost);if (optLocked.isPresent()) {acquiredLockTimes.getAndIncrement();// 主动释放锁optLocked.get().unlock();}}});}executor.awaitTermination(20, TimeUnit.SECONDS);log.info("tryAcquireLockTimes={}, acquireLockTimes={}", tryAcquireLockTimes.get(), acquiredLockTimes.get());Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);Assert.assertTrue(acquiredLockTimes.get() == totalThread);}}public interface DistributedLocker {Optional<LockResource> tryLock(String lockKey, int waitTime);Optional<LockResource> tryLock(String lockKey, int waitTime, int leaseTime);}public interface LockResource {void unlock();}
执行的Lua脚本如下:
加锁:redissonClient.getLock("my_first_lock_name").tryLock(600000, 600000);
解锁:redissonClient.getLock("my_first_lock_name").unlock();
# 线程A
## 1.1.1尝试获取锁 -> 成功
1568357723.205362 [0 127.0.0.1:56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357723.205452 [0 lua] "exists" "my_first_lock_name"
1568357723.208858 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "1"
1568357723.208874 [0 lua] "pexpire" "my_first_lock_name" "600000"# 线程B
### 2.1.1尝试获取锁,未获取到,返回锁剩余过期时间
1568357773.338018 [0 127.0.0.1:56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357773.338161 [0 lua] "exists" "my_first_lock_name"
1568357773.338177 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357773.338197 [0 lua] "pttl" "my_first_lock_name"## 2.1.1.3 添加订阅(非Lua脚本) -> 订阅成功
1568357799.403341 [0 127.0.0.1:56421] "SUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"## 2.1.1.4 再次尝试获取锁 -> 未获取到,返回锁剩余过期时间
1568357830.683631 [0 127.0.0.1:56418] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357830.684371 [0 lua] "exists" "my_first_lock_name"
1568357830.684428 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357830.684485 [0 lua] "pttl" "my_first_lock_name"# 线程A
## 3.1.1 释放锁并广播解锁消息,0代表解锁消息
1568357922.122454 [0 127.0.0.1:56420] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357922.123645 [0 lua] "exists" "my_first_lock_name"
1568357922.123701 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357922.123741 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "-1"
1568357922.123775 [0 lua] "del" "my_first_lock_name"
1568357922.123799 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"# 线程B
## 监听到解锁消息消息 -> 释放信号量,阻塞被解除;4.1.1.1 再次尝试获取锁 -> 获取成功
1568357975.015206 [0 127.0.0.1:56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357975.015579 [0 lua] "exists" "my_first_lock_name"
1568357975.015633 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "1"
1568357975.015721 [0 lua] "pexpire" "my_first_lock_name" "600000"## 4.1.1.3 取消订阅(非Lua脚本)
1568358031.185226 [0 127.0.0.1:56421] "UNSUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"# 线程B
## 5.1.1 释放锁并广播解锁消息
1568358255.551896 [0 127.0.0.1:56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568358255.552125 [0 lua] "exists" "my_first_lock_name"
1568358255.552156 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568358255.552200 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "-1"
1568358255.552258 [0 lua] "del" "my_first_lock_name"
1568358255.552304 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"
需要特别注意的是,RedissonLock 同样没有解决 节点挂掉的时候,存在丢失锁的风险的问题。而现实情况是有一些场景无法容忍的,所以 Redisson 提供了实现了redlock算法的 RedissonRedLock,RedissonRedLock 真正解决了单点失败的问题,代价是需要额外的为 RedissonRedLock 搭建Redis环境。
所以,如果业务场景可以容忍这种小概率的错误,则推荐使用 RedissonLock, 如果无法容忍,则推荐使用 RedissonRedLock。
八、redlock算法
Redis 官网对 redLock 算法的介绍大致如下:
The Redlock algorithm
在分布式版本的算法里我们假设我们有N个Redis master节点,这些节点都是完全独立的,我们不用任何复制或者其他隐含的分布式协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在我们的例子里面我们把N设成5,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。为了取到锁,客户端应该执行以下操作:
-
获取当前Unix时间,以毫秒为单位。
-
依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个尝试从某个Reids实例获取锁的最大等待时间(超过这个时间,则立马询问下一个实例),这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。
-
客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁消耗的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的总耗时小于锁失效时间时,锁才算获取成功。
-
如果取到了锁,key的真正有效时间 = 有效时间(获取锁时设置的key的自动超时时间) - 获取锁的总耗时(询问各个Redis实例的总耗时之和)(步骤3计算的结果)。
-
如果因为某些原因,最终获取锁失败(即没有在至少 “N/2+1 ”个Redis实例取到锁或者“获取锁的总耗时”超过了“有效时间”),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,这样可以防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。
用 Redisson 实现分布式锁(红锁 RedissonRedLock)及源码分析(实现三)
这里以三个单机模式为例,需要特别注意的是他们完全互相独立,不存在主从复制或者其他集群协调机制。
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);/*** 获取多个 RLock 对象*/
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);/*** 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);try {/*** 4.尝试获取锁* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)*/boolean res = redLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);if (res) {//成功获得锁,在这里处理业务}
} catch (Exception e) {throw new RuntimeException("aquire lock fail");
}finally{//无论如何, 最后都要解锁redLock.unlock();
}
最核心的变化就是需要构建多个 RLock ,然后根据多个 RLock 构建成一个 RedissonRedLock,因为 redLock 算法是建立在多个互相独立的 Redis 环境之上的(为了区分可以叫为 Redission node),Redission node 节点既可以是单机模式(single),也可以是主从模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。这就意味着,不能跟以往这样只搭建 1个 cluster、或 1个 sentinel 集群,或是1套主从架构就了事了,需要为 RedissonRedLock 额外搭建多几套独立的 Redission 节点。 比如可以搭建3个 或者5个 Redission节点,具体可看视资源及业务情况而定。
下图是一个利用多个 Redission node 最终 组成 RedLock分布式锁的例子,需要特别注意的是每个 Redission node 是互相独立的,不存在任何复制或者其他隐含的分布式协调机制。
Redisson 实现redlock算法源码分析(RedLock)
加锁核心代码
org.redisson.RedissonMultiLock#tryLock
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long newLeaseTime = -1;if (leaseTime != -1) {newLeaseTime = unit.toMillis(waitTime)*2;}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);/*** 1. 允许加锁失败节点个数限制(N-(N/2+1))*/int failedLocksLimit = failedLocksLimit();/*** 2. 遍历所有节点通过EVAL命令执行lua加锁*/List<RLock> acquiredLocks = new ArrayList<>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;/*** 3.对节点尝试加锁*/try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {// 抛出异常表示获取锁失败lockAcquired = false;}if (lockAcquired) {/***4. 如果获取到锁则添加到已获取锁集合中*/acquiredLocks.add(lock);} else {/*** 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))* 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了* 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功*/if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1 && leaseTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}/*** 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false*/if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}/*** 7.如果逻辑正常执行完则认为最终申请锁成功,返回true*/return true;
}
参考文献
[1]Distributed locks with Redis
[2]Distributed locks with Redis 中文版
[3]SET - Redis
[4]EVAL command
[5] Redisson
[6]Redis分布式锁的正确实现方式
[7]Redlock实现分布式锁
[8]Redisson实现Redis分布式锁
相关文章:

【分布式锁】Redisson分布式锁的使用(推荐使用)
文章目录 前言一、常见分布式锁方案对比二、分布式锁需满足四个条件三、什么是Redisson?官网和官方文档Redisson使用 四、Redisson 分布式重入锁用法Redisson 支持单点模式、主从模式、哨兵模式、集群模式自己先思考下,如果要手写一个分布式锁组件,怎么做ÿ…...

创建XML的三种方式(二)
文章目录 1 使用XmlDocument创建XML文档2 使用XmlTextWriter写XML文档3 使用LINQ to XML 的XDocument类4 小结 本文介绍了在winform中使用C#开发语言来创建XML文档的三种方式,并介绍了各自的优缺点。 方法1是使用 XmlDocument创建XML文档,方法2是使用 …...

十分钟教你搭建类似ChatGPT的安卓应用程序
大家好,我是易安! Chat GPT 是当今著名的人工智能工具,就像聊天机器人一样。Chat GPT会回答发送给它的所有查询。今天,我将通过集成 OpenAI API (ChatGPT)构建一个简单的类似 ChatGPT 的 android 应用程序,我们可以在其…...

问题 E: 起止位置(C++)(二分查找)
目录 1.题目描述 2.AC 1.题目描述 问题 E: 起止位置 时间限制: 1.000 Sec 内存限制: 128 MB提交 状态 题目描述 有n位同学按照年龄从小到大排好队。 王老师想要查询,年龄为x的同学,在队伍中首次出现的位置和最后一次出现的位置;如果队…...

【sop】基于灵敏度分析的有源配电网智能软开关优化配置[升级1](Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...

LeetCode周赛复盘(第345场周赛)
文章目录 1、找出转圈游戏输家1.1 题目链接1.2 题目描述1.3 解题代码1.4 解题思路 2、相邻值的按位异或2.1 题目链接2.2 题目描述2.3 解题代码2.4 解题思路 3、 矩阵中移动的最大次数3.1 题目链接3.2 题目描述3.3 解题代码3.4 解题思路 4、 统计完全连通分量的数量4.1 题目链接…...

Call for Papers丨第三届GLB@KDD‘23 Workshop
鉴于介绍新数据集和Benchmark研究往往需要不同于常规论文的评审标准,计算机视觉和自然语言处理领域,以及最近的NeurIPS会议,都有专门致力于建立新Benchmark数据集和任务的Conference Track。然而在图机器学习领域,我们还没有类似的…...

【多线程】单例模式
目录 饿汉模式 懒汉模式-单线程版 懒汉模式-多线程版 懒汉模式-多线程版(改进) 单例是一种设计模式。 啥是设计模式 ? 设计模式好比象棋中的 " 棋谱 ". 红方当头炮 , 黑方马来跳 . 针对红方的一些走法 , 黑方应招的时候有一些固定的套路. 按照套路来走局势…...

7搜索管理
7搜索管理 7.1 准备环境 7.1.1 创建映射 创建xc_course索引库。 创建如下映射 post:http://localhost:9200/xc_course/doc/_mapping 参考 “资料”–》搜索测试-初始化数据.txt { "properties": { "description": { "type": &…...

在Pytorch中使用Tensorboard
Tensorboard是一款深度学习可视化软件,目前主要使用了它的可视化模型, 可视化模型权重和可视化损失函数功能。 x.1 tensorboard初始化 tensorboard初始化需要导入SummaryWriter包并指定存储位置和开放端口号。 from torch.utils.tensorboard import SummaryWrite…...

[笔记]深入解析Windows操作系统《四》管理机制
文章目录 前言4.1注册表查看和修改注册表注册表用法注册表数据类型注册表逻辑结构HKEY_CURRENT_USERHKEY_USERS 实验:观察轮廓加载和卸载HKEY_CLASSES_ROOTHKEY_LOCAL_MACHINE 实验:离线方式或远程编辑BCDHKEY_CURRENT_CONFIGHKEY_PERFORMANCE_DATA 前言 本章讲述了…...

【小沐学Python】Python实现在线英语翻译功能
文章目录 1、简介2、在线翻译接口2.1 Google Translate API2.2 Microsoft Translator API2.2.1 开发简介2.2.2 开发费用2.2.3 开发API 2.3 百度翻译开放平台 API2.3.1 开发简介2.3.2 开发费用2.3.3 开发API 2.4 Tencent AI 开放平台的翻译 API2.4.1 开发简介2.4.2 开发API 2.5 …...

k8s中pod使用详解
一、前言 在之前k8s组件一篇中,我们谈到了pod这个组件,了解到pod是k8s中资源管理的最小单位,可以说Pod是整个k8s对外提供服务的最基础的个体,有必要对Pod做深入的学习和探究。 二、再看k8s架构图 为了加深对k8s中pod的理解,再来回顾下k8s的完整架构 三、pod特点 结合上面这…...

案例说明:vue中Element UI下拉列表el-option中的key、value、label含义各是什么
可以简单理解为:label 是给用户展示的东西,value是前端往后端传递的真实值 <template><div><el-page-header back"goBack" content"注册"></el-page-header><el-divider></el-divider><el-…...

idea创建javaweb项目步骤超详细(2022最新版本)
目录 前言必读 一、新建文件 1.在idea里面点击文件-新建-项目 2.新建项目-更改名称为自己想要的项目名称-创建 3.右键自己建立的项目-添加框架支持(英文版是Add Framework Support...) 4.勾选Web应用程序-确定 5.建立成功界面 二、配置tomcat 6.…...

「SAP ABAP」OPEN SQL(六)【DELETE语句 | MODIFY语句】
💂作者简介: THUNDER王,一名热爱财税和SAP ABAP编程以及热爱分享的博主。目前于江西师范大学本科在读,同时任汉硕云(广东)科技有限公司ABAP开发顾问。在学习工作中,我通常使用偏后端的开发语言ABAP,SQL进行任务的完成,对SAP企业管理系统,SAP ABAP开发和数据库具有较…...

SpringCloud --- Feign远程调用
一、RestTemplate问题 先来看我们以前利用RestTemplate发起远程调用的代码: 存在下面的问题: 代码可读性差,编程体验不统一参数复杂URL难以维护 Feign是一个声明式的http客户端,官方地址:GitHub - OpenFeign/feign:…...

基于单片机的数字频率计设计
数字频率计概述 数字频率计是计算机、通讯设备、音频视频等科研生产领域不可缺少的测量仪器。它是一种用十进制数字显示被测信号频率的数字测量仪器。它的基本功能是测量正弦信号,方波信号及其他各种单位时间内变化的物理量。在进行模拟、数字电路的设计、安装、调试…...

我看看哪个靓仔还没把Github Copilot用起来?
本人经常分享有价值的生产力工具、技术、好物与书籍,可关注同名公众🐭并设为🌟星标,第一时间获得更新 Github Copilot 是一个AI编程助手,其使用 OpenAI CodeX 在你的编辑器中实时建议代码或给你实现整个功能。 视频版介…...

C++系列一: C++简介
C入门简介 1. C语言的特点2. C编译器3. 第一个 C 程序4. 总结(手稿版) C 是一种高级编程语言,是C语言的扩展和改进版本,由Bjarne Stroustrup于1983年在贝尔实验室为了支持C语言中的面向对象编程而创建。C 既能够进行底层的系统编程…...

信通初试第一:无科研无竞赛一战上岸上海交大819学硕感悟
笔者来自通信考研小马哥23上交819全程班学员 信通初试第一:无科研无竞赛一战上岸上海交大819学硕感悟 原创2023-04-27 11:04通信考研小马哥 笔者来自通信考研小马哥23上交819全程班学员 本人情况: 本人是19届交本,本科成绩很差,…...

Spring —— Spring Boot 配置文件
JavaEE传送门 JavaEE Spring —— Bean 作用域和生命周期 Spring —— Spring Boot 创建和使用 目录 Spring Boot 配置文件Spring Boot 配置文件格式properties配置文件properties 基本语法properties 缺点 yml 配置文件yml 基本语法yml 配置不同类型数据及 nullyml 配置对象…...

Python 网络爬虫与数据采集(一)
Python 网络爬虫与数据采集 第1章 序章 网络爬虫基础1 爬虫基本概述1.1 爬虫是什么1.2 爬虫可以做什么1.3 爬虫的分类1.4 爬虫的基本流程1.4.1 浏览网页的流程1.4.2 爬虫的基本流程 1.5 爬虫与反爬虫1.5.1 爬虫的攻与防1.5.2 常见的反爬与反反爬 1.6 爬虫的合法性与 robots 协议…...

2023年6月DAMA-CDGP数据治理专家认证请尽快报名啦!
目前6月DAMA-CDGP数据治理认证考试开放报名地区有:北京、上海、广州、深圳、长沙、呼和浩特。 目前南京、济南、西安、杭州等地区还在接近开考人数中,打算参加6月考试的朋友们可以抓紧时间报名啦!!! 5月初,…...

STM32+esp8266,让你的STM32开发板连接网络-----esp8266
分享一下,STM32开发板连接网络的第一种方法:连接esp8266。 esp8266与STM32利用串口通信连接,esp8266连接网络,把收到的数据通过串口的方式传输给STM32,之后STM32接收到消息做出对应的反应。 使用到的开发板如图&…...

分布式缓存的基础知识
前言 现代互联网应用中,分布式缓存成为了必不可少的一环。它通过在多台服务器之间共享数据,避免了网络通信的高延迟和低带宽的性能问题。本文将介绍分布式缓存的基础知识,包括缓存机制、常见的缓存策略以及缓存的使用场景。 缓存机制 缓存是…...

Vue3通透教程【七】生命周期函数
文章目录 🌟 写在前面🌟 生命周期钩子函数🌟 组合式API生命周期🌟 写在最后🌟 写在前面 专栏介绍: 凉哥作为 Vue 的忠实 粉丝输出过大量的 Vue 文章,应粉丝要求开始更新 Vue3 的相关技术文章,Vue 框架目前的地位大家应该都晓得,所谓三大框架使用人数最多,公司选…...

《“裸奔”时代的网络防护:如何保护你的隐私和数据安全》
一、引言 在此时此刻,你可能正在使用电子设备阅读这篇文章。你可能在一天中的大部分时间都在与网络世界互动,无论是通过电子邮件、社交媒体、在线购物,还是通过流媒体服务消费内容。然而,你有没有考虑过,当你在享受这些…...

mapreduce优化方法
1)数据输入: 1)合并小文件:在执行mr任务前将小文件进行合并,大量的小文件会产生大量的map任务,增大map任务装载次数,而 任务的装载比较耗时,从而导致 mr 运行较慢。 2)…...

06-nexus搭建Docker私仓
使用nexus创建docker私有仓库 Nexus的安装请参考该文档:https://www.yuque.com/tmfl/pom/uumrx2 Nexus配置Docker仓库步骤; nexus默认docker是失效的,需要 在security --> Realms,将docker配置成Active在 Repository 的 Blo…...