Redis中的分布式锁(步步为营)
分布式锁
概述
分布式锁指的是,所有服务中的所有线程都去获取同一把锁,但只有一个线程可以成功的获得锁,其他没有获得锁的线程必须全部等待,直到持有锁的线程释放锁。
分布式锁是可以跨越多个实例,多个进程的锁

分布式锁具备的条件:
- 互斥性:任意时刻,只能有一个客户端持有锁
- 锁超时释放:持有锁超时,可以释放,防止死锁
- 可重入性:一个线程获取了锁之后,可以再次对其请求加锁
- 高可用、高性能:加锁和解锁开销要尽可能低,同时保证高可用
- 安全性:锁只能被持有该锁的服务(或应用)释放。
- 容错性:在持有锁的服务崩溃时,锁仍能得到释放,避免死锁。
分布式锁实现方案
分布式锁都是通过第三方组件来实现的,目前比较流行的分布式锁的解决方案有:
- 数据库,通过数据库可以实现分布式锁,但是在高并发的情况下对数据库压力较大,所以很少使用。
- Redis,借助Redis也可以实现分布式锁,而且Redis的Java客户端种类很多,使用的方法也不尽相同。
- Zookeeper,Zookeeper也可以实现分布式锁,同样Zookeeper也存在多个Java客户端,使用方法也不相同
Redis实现分布式锁
SETNX
基本方案:Redis提供了setXX指令来实现分布式锁
highlighter-
格式: setnx key value 将key 的值设为value ,当且仅当key不存在。 若给定的 key已经存在,则SETNX不做任何动作。

设置分布式锁后,能保证并发安全,但上述代码还存在问题,如果执行过程中出现异常,程序就直接抛出异常退出,导致锁没有释放造成最终死锁的问题。(即使将锁放在finally中释放,但是假如是执行到中途系统宕机,锁还是没有被成功的释放掉,依然会出现死锁现象)
设置超时时间
highlighter- SQL
SET lock_key unique_value NX PX 10000
但是,即使设置了超时时间后,还存在问题。
假设有多个线程,假设设置锁的过期时间10s,线程1上锁后执行业务逻辑的时长超过十秒,锁到期释放锁,线程2就可以获得锁执行,此时线程1执行完删除锁,删除的就是线程2持有的锁,线程3又可以获取锁,线程2执行完删除锁,删除的是线程3的锁,如此往后,这样就会出问题。
让线程只删除自己的锁
解决办法就是让线程只能删除自己的锁,即给每个线程上的锁添加唯一标识(这里UUID实现,基本不会出现重复),删除锁时判断这个标识:

但上述红框中由于判定和释放锁不是原子的,极端情况下,可能判定可以释放锁,在执行删除锁操作前刚好时间到了,其他线程获取锁执行,前者线程删除锁删除的依然是别的线程的锁,所以要让删除锁具有原子性,可以利用redis事务或lua脚本实现原子操作判断+删除
Redis的单条命令操作是原子性的,但是多条命令操作并不是原子性的,因此Lua脚本实现的就是令Redis的多条命令也实现原子操作
redis事务不是原子操作的,详情请看 Redis的事务
但是,可以利用Redis的事务和watch实现的乐观锁 来监视锁的状态
java
@RequestMapping(" /deduct_stock")public String deductStock() {String REDIS_LOCK = "good_lock";// 每个人进来先要进行加锁,key值为"good_lock"String value = UUID.randomUUID().toString().replace("-","");try{// 为key加一个过期时间Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS);// 加锁失败if(!flag){return "抢锁失败!";}System.out.println( value+ " 抢锁成功");String result = template.opsForValue().get("goods:001");int total = result == null ? 0 : Integer.parseInt(result);if (total > 0) {// 如果在此处需要调用其他微服务,处理时间较长。。。int realTotal = total - 1;template.opsForValue().set("goods:001", String.valueOf(realTotal));System.out.println("购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002");return "购买商品成功,库存还剩:" + realTotal + "件, 服务端口为8002";} else {System.out.println("购买商品失败,服务端口为8002");}return "购买商品失败,服务端口为8002";}finally {// 谁加的锁,谁才能删除// 也可以使用redis事务// https://redis.io/commands/set// 使用Lua脚本,进行锁的删除Jedis jedis = null;try{jedis = RedisUtils.getJedis();String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +"then " +"return redis.call('del',KEYS[1]) " +"else " +" return 0 " +"end";Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value));if("1".equals(eval.toString())){System.out.println("-----del redis lock ok....");}else{System.out.println("-----del redis lock error ....");}}catch (Exception e){}finally {if(null != jedis){jedis.close();}}// redis事务
// while(true){
// template.watch(REDIS_LOCK);
// if(template.opsForValue().get(REDIS_LOCK).equalsIgnoreCase(value)){
// template.setEnableTransactionSupport(true);
// template.multi();
// template.delete(REDIS_LOCK);
// List<Object> list = template.exec();
// if(list == null){
// continue;
// }
// }
// template.unwatch();
// break;
// }}}
}
尽管这样,还是会有问题,锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁的释放,但其实此时业务还在执行中,还是应该将业务执行结束之后再释放锁。
续时
因此可以设定,任务不完成,锁就不释放。
可以维护一个定时线程池 ScheduledExecutorService,每隔 2s 去扫描加入队列中的 Task,判断失效时间是否快到了,如果快到了,则给锁续上时间。
那如何判断是否快到失效时间了呢?可以用以下公式:【失效时间】<= 【当前时间】+【失效间隔(三分之一超时)】
java
// 扫描的任务队列
private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue();
/*** 线程池,维护keyAliveTime*/
private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
{// 两秒执行一次「续时」操作SCHEDULER.scheduleAtFixedRate(() -> {// 这里记得加 try-catch,否者报错后定时任务将不会再执行=-=Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();while (iterator.hasNext()) {RedisLockDefinitionHolder holder = iterator.next();// 判空if (holder == null) {iterator.remove();continue;}// 判断 key 是否还有效,无效的话进行移除if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {iterator.remove();continue;}// 超时重试次数,超过时给线程设定中断if (holder.getCurrentCount() > holder.getTryCount()) {holder.getCurrentTread().interrupt();iterator.remove();continue;}// 判断是否进入最后三分之一时间long curTime = System.currentTimeMillis();boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;if (shouldExtend) {holder.setLastModifyTime(curTime);redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());holder.setCurrentCount(holder.getCurrentCount() + 1);}}}, 0, 2, TimeUnit.SECONDS);
}
Redisson
使用Redis + lua方式可能存在的问题
- 不可重入性。同一个线程无法多次获取同一把锁
- 不可重试。获取锁只尝试一次就返回false,没有重试机制
- 超时释放。锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁的释放,存在安全隐患
- 主从一致性。如果Redis是主从集群,主从同步存在延迟,当主机宕机时,从成为了主,但可能存在从此时还未完成同步,因此从上就没有锁标识,此时会出现线程安全问题。
RLock是Redisson分布式锁的最核心接口,继承了concurrent包的Lock接口和自己的RLockAsync接口,RLockAsync的返回值都是RFuture,是Redisson执行异步实现的核心逻辑,也是Netty发挥的主要阵地。
RLock如何加锁解锁,实现可重入性?
从RLock进入,找到RedissonLock类,找到tryLock 方法再继续找到tryAcquireOnceAsync 方法,这是加锁的主要代码(版本不一此处实现有差别,和最新3.15.x有一定出入,但是核心逻辑依然未变。此处以3.13.6为例)
java
// waitTime 等待时间,多久时间内都会在这尝试获取锁
// leaseTime 加锁时是否设置过期时间
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}}
此处出现leaseTime时间判断的2个分支,实际上就是加锁时是否设置过期时间,未设置过期时间(-1)时则会有watchDog 的锁续约 (下文),一个注册了加锁事件的续约任务。我们先来看有过期时间tryLockInnerAsync 部分
evalWriteAsync方法是eval命令执行lua的入口
java
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime);return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
eval命令执行Lua脚本的地方,此处将Lua脚本展开
lua
-- 不存在该key时
if (redis.call('exists', KEYS[1]) == 0) then -- 新增该锁并且hash中该线程id对应的count置1redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 设置过期时间redis.call('pexpire', KEYS[1], ARGV[1]); return nil;
end; -- 存在该key 并且 hash中线程id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 线程重入次数++redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil;
end;
return redis.call('pttl', KEYS[1]);
lua
// keyName KEYS[1] = Collections.singletonList(this.getName()) // leaseTime ARGV[1] = this.internalLockLeaseTime // uuid+threadId组合的唯一值 ARGV[2] = this.getLockName(threadId)
总共3个参数完成了一段逻辑:
- 判断该锁是否已经有对应hash表存在,
- 没有对应的hash表:则set该hash表中一个entry的key为锁名称,value为1,之后设置该hash表失效时间为leaseTime
- 存在对应的hash表:则将该lockName的value执行+1操作,也就是计算进入次数,再设置失效时间leaseTime
- 最后返回这把锁的ttl剩余时间
再看看RLock如何解锁?
看unlock方法,同样查找方法名,一路到unlockInnerAsync
java
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
}
将lua脚本展开
lua
-- 不存在key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;
end;
-- 存在,计数器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then -- 过期时间重设redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
else-- 删除并发布解锁消息redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1;
end;
return nil;
该Lua KEYS有2个Arrays.asList(getName(), getChannelName())
lua
name 锁名称 channelName,用于pubSub发布消息的channel名称
ARGV变量有三个LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
lua
LockPubSub.UNLOCK_MESSAGE,channel发送消息的类别,此处解锁为0 internalLockLeaseTime,watchDog配置的超时时间,默认为30s lockName 这里的lockName指的是uuid和threadId组合的唯一值
具体执行步骤如下:
- 如果该锁不存在则返回nil;
- 如果该锁存在则将其线程的hash key计数器-1,
- 计数器counter>0,重置下失效时间,返回0;否则,删除该锁,发布解锁消息unlockMessage,返回1;
加锁解锁流程总结如下:

总的来说就是通过Hash类型来存储锁的次数:

RLock的锁重试问题
需要分析的是锁重试的,所以,在使用lock.tryLock()方法的时候,不能用无参的。
java
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {return this.tryLock(waitTime, -1L, unit);
}
在调用tryAcquire方法后,返回了一个Long的ttl
java
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {return true;} else {time -= System.currentTimeMillis() - current;if (time <= 0L) {this.acquireFailed(waitTime, unit, threadId);return false;} else {//省略
继续跟着代码进去查看,最后会发现,调用tryLockInnerAsync方法。这个方法就是获取锁的Lua脚本的。
java
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime);return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
这个lua脚本上面提到了。就是 判断,如果获取到锁,返回一个nil.也就是null。如果没有获取到,就调用 pttl,name。其实就是获取当前name锁的剩余有效期。
获取到ttl。如果返回null说获取锁成功,直接返回true.如果返回的不是null,说明需要进行重试操作了。主要是根据时间进行判断的。经过一系列判断后,do,while是真正执行重试相关逻辑的。如下:
java
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);//如果返回null,说明获取到了锁,直接返回if (ttl == null) {return true;} else {//当前时间与进入方法时的时间进行比较//System.currentTimeMillis() - current表示前面获取锁消耗时间time -= System.currentTimeMillis() - current;time是重试锁的等待时间,if (time <= 0L) {//剩余等待时间,如果剩余等待时间<=0,设置获取锁失败。this.acquireFailed(waitTime, unit, threadId);return false;} else {//再次获取当前时间current = System.currentTimeMillis();//刚刚尝试完获取锁失败,如果继续立即尝试一般是获取不到锁的,因此这里选择订阅的方式//订阅当前锁,在unlock释放锁的时候有个:redis.call('publish', KEYS[2], ARGV[1]); 所以这里就订阅了RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);//进行等待RFuture的结果,等多久?等time的时间if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {//time时间过完了还没有等到锁释放的通知if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {//如果等待超时,就取消订阅this.unsubscribe(subscribeFuture, threadId);}});}this.acquireFailed(waitTime, unit, threadId);//返回获取锁失败return false;} else {//到这里表示在tme时间内获得了释放锁的通知boolean var16;try {//检查之前订阅等待的消耗时间time -= System.currentTimeMillis() - current;if (time <= 0L) {//当前的剩余等待时间this.acquireFailed(waitTime, unit, threadId);boolean var20 = false;return var20;}//这里开始进行重试相关逻辑。主要就是当前时间和进入方法时候的时间进行比较do {long currentTime = System.currentTimeMillis();//这里就是第一次重试ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);if (ttl == null) {//null表示获取锁失败var16 = true;return var16;}//再试一次time -= System.currentTimeMillis() - currentTime;if (time <= 0L) {this.acquireFailed(waitTime, unit, threadId);var16 = false;return var16;}currentTime = System.currentTimeMillis();if (ttl >= 0L && ttl < time) { //也不是一直试,等别人释放((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;} while(time > 0L);//时间还充足,继续等待//时间到期了,还没获取到锁,返回失败this.acquireFailed(waitTime, unit, threadId);var16 = false;} finally {this.unsubscribe(subscribeFuture, threadId);}return var16;}}}
}
主要是do while机制进行锁重试的,while会检查时间是否还充足会继续循环。当然这个循环不是直接while(true)的盲等机制,而是利用信号量和订阅的方式实现的,会等别人释放锁,再进行尝试,这种方式对cpu友好
Redisson的超时续约
跟随tryLock代码,在RedissonLock类中的tryAcquireOnceAsync方法中,会看到如下代码:
java
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {//设置了锁过期时间return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);} else {//leaseTime = -1时,即没有设置了锁过期时间RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),//,默认30秒TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);//ttlRemainingFuture完成以后ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {//没有抛异常if (ttlRemaining) {//获取锁成功this.scheduleExpirationRenewal(threadId);//自动更新续期时间的任务调度}}});return ttlRemainingFuture;}
}
- 在使用trylock的时候,如果设置了锁过期时间,就不会执行续命相关逻辑了。
- 其中默认的watchdogTimeout时间是30秒。
java
private void scheduleExpirationRenewal(long threadId) {RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();//获取一个entry,将entry放到map里,getEntryName()就是当前锁名称。//放到map里,即一个锁对应一个entryRedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);if (oldEntry != null) {//表示重入的,第二次放oldEntry.addThreadId(threadId);} else {//表示第一次放entry.addThreadId(threadId);this.renewExpiration();//第一次放,进行续约}}
看门狗机制:在获取锁成功以后,开启一个定时任务,每隔一段时间就会去重置锁的超时时间,以确保锁是在程序执行完unlock手动释放的,不会发生因为业务阻塞,key超时而自动释放的情况。
到期续约方法:
java
private void renewExpiration() {RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());if (ee != null) { //Timeout定时任务,或者叫周期任务Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());if (ent != null) {Long threadId = ent.getFirstThreadId();if (threadId != null) {//执行续命的操作RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);} else {if (res) {RedissonLock.this.renewExpiration();//再次调用}}});}}}//刷新周期, this.internalLockLeaseTime / 3L, 默认释放时间是30秒,除以3就是每10秒更新一次//续命时间为1/3的过期时间,设置续命单位是秒},this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); ee.setTimeout(task);}
}
查看renewExpirationAsync方法源码,其调用了Lua脚本执行续命操作的。
java
protected RFuture<Boolean> renewExpirationAsync(long threadId) {return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
pexpire重置锁的有效期。
总体逻辑如下:
- 开启一个任务,10秒钟后执行
- 开始的这个任务中重置有效期。假设设置的是默认30秒,则重置为30秒
- 更新后又重复步骤1、2
那么什么时候取消这个续约的任务呢?在释放锁unlock时
java
public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise();RFuture<Boolean> future = this.unlockInnerAsync(threadId);future.onComplete((opStatus, e) -> {//取消这个任务this.cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);} else if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);result.tryFailure(cause);} else {result.trySuccess((Object)null);}});return result;
}
multilock解决主从一致性问题
如果Redis是主从集群,主从同步存在延迟,当主机宕机时,从成为了主,但可能存在从此时还未完成同步,因此从上就没有锁标识,此时会出现并发安全问题。
因此redisson提出来了MutiLock锁,使用这把锁就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

使用multilock()方法。必须在所有的节点都获取锁成功,才算成功。 缺点是运维成本高,实现复杂。
java
@Resource private RedissonClient redissonClient; @Resource private RedissonClient2 redissonClient2; @Resource private RedissonClient3 redissonClient3;RLock lock = redissonClient.getMultilock(lock1,lock2,lock3)
总结Redisson
Redisson分布式锁解决前三个问题原理

总结Redisson分布式锁原理:
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能来实现等待、唤醒,获取锁失败的重试机制
- 超时续约:利用watchDog,开启一个定时任务,每隔一段时间(releaseTime/3),重置超时时间。
- 使用multilock: 多个独立的redis节点,必须在所有节点都获取重入锁,才算获取成功;
redLock
不管是redLock,还是redissonLock,两者底层都是通过相同的lua脚本来加锁、释放锁的,所以,两者只是外部形态的不同,底层是一样的。redLock是继承了redissonMultiLock,大部分的逻辑,都是在redissonMultiLock中去实现的,所以源码部分,大部分都是RedissonMultiLock
原理
- redLock的使用,需要有奇数台独立部署的Redis节点
- 在加锁的时候,会分别去N台节点上加锁,如果半数以上的节点加锁成功,就认为当前线程加锁成功

相关文章:
Redis中的分布式锁(步步为营)
分布式锁 概述 分布式锁指的是,所有服务中的所有线程都去获取同一把锁,但只有一个线程可以成功的获得锁,其他没有获得锁的线程必须全部等待,直到持有锁的线程释放锁。 分布式锁是可以跨越多个实例,多个进程的锁 分布…...
CentOS 7安装mysql+JDK+Tomcat完成流程
一.安装mysql 即使是新的linux服务器,也要先验证是否有mysql已经安装,如果有进行卸载原版本,一定要确认是否mysql已不再使用 原安装情况(直接执行命令即可) whereis mysql rpm -qa | grep -i mysql rpm -e perl-DBD-M…...
C++笔记之不同框架中事件循环的核心函数:io_run()、ros_spin()、app_exec()
C笔记之不同框架中事件循环的核心函数:io_run()、ros_spin()、app_exec() code review! 参考笔记 1.qt-C笔记之使用QtConcurrent异步地执行槽函数中的内容,使其不阻塞主界面 2.qt-C笔记之QThread使用 3.qt-C笔记之多线程架构模式:事件信号监…...
C++异常处理
目录 一、异常的概念 二、异常的使用 (1)异常的抛出和捕获 (2)异常的重新抛出 (3)异常安全 (4)异常规范 三、自定义异常体系 四、c标注异常体系 五、异常的优缺点 在之前我们…...
【数据结构】哈希 ---万字详解
unordered系列关联式容器 在C98中,STL提供了底层为红黑树结构的一系列关联式容器,在查询时效率可达到log_2 N,即最差情况下需要比较红黑树的高度次,当树中的节点非常多时,查询效率也不理想。最好 的查询是,…...
4399大数据面试题及参考答案(数据分析和数据开发)
对数据分析的理解 数据分析是一个从数据中提取有价值信息以支持决策的过程。它涵盖了数据收集、清洗、转换、建模和可视化等多个环节。 首先,数据收集是基础。这包括从各种数据源获取数据,例如数据库、文件系统、网络接口等。这些数据源可以是结构化的数据,如关系型数据库中…...
快速理解倒排索引在ElasticSearch中的作用
一.基础概念 定义: 倒排索引是一种数据结构,用来加速文本数据的搜索和检索,和传统的索引方式不同,倒排索引会被每个词汇项与包含该词汇项的文档关联起来,从而去实现快速的全文检索。 举例: 在传统的全文…...
C++趣味编程玩转物联网:基于树莓派Pico控制无源蜂鸣器-实现音符与旋律的结合
无源蜂鸣器是一种多功能的声音输出设备,与有源蜂鸣器相比,它能够通过不同频率的方波生成丰富多样的音调。本项目使用树莓派Pico开发板,通过编程控制无源蜂鸣器播放经典旋律《归来有风》。本文将详细介绍项目实现中的硬件连接、C++代码解析,以及无源蜂鸣器的工作原理。 一、…...
《RuoYi基于SpringBoot+Vue前后端分离的Java快速开发框架学习》系列博客_Part4_三模态融合
系列博客目录 文章目录 系列博客目录目标Step1:之前工作形成子组件Step2:弥补缺失的文本子组件,同时举例如何子组件向父组件传数据Step3:后端代码需要根据上传的文件传给python服务器Step4:python服务器进行分析 目标 实现三模态融合,将文本、图片、音频…...
springboot365高校疫情防控web系统(论文+源码)_kaic
毕 业 设 计(论 文) 题目:高校疫情防控的设计与实现 摘 要 互联网发展至今,无论是其理论还是技术都已经成熟,而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播,搭配信息管理工具可以很好地为…...
STM32 USART串口数据包
单片机学习! 目录 前言 一、数据包 二、HEX数据包 三、文本数据包 四、HEX数据包和文本数据包优缺点 4.1 HEX数据包 4.2 文本数据包 五、HEX数据包接收 六、文本数据包接收 总结 前言 本文介绍了串口数据包收发的思路和流程。 一、数据包 数据包的作用是把一个个单独…...
【LC】3232. 判断是否可以赢得数字游戏
题目描述: 给你一个 正整数 数组 nums。 Alice 和 Bob 正在玩游戏。在游戏中,Alice 可以从 nums 中选择所有个位数 或 所有两位数,剩余的数字归 Bob 所有。如果 Alice 所选数字之和 严格大于 Bob 的数字之和,则 Alice 获胜。如果…...
Linux基础学习--vi与vim
0.绪论 前面的内容基本学完了相关命令行,后面进行shell与shell script的学习。第一部分就是编辑器的学习,之前有写过vi/vim编辑器,但是我看了一下鸟哥这个非常详细,还是打算重头学习一下。 1.vi/vim的使用 一般命令模式(command…...
JavaScript 高级教程:异步编程、面向对象与性能优化
在前两篇教程中,我们学习了 JavaScript 的基础和进阶内容。这篇文章将带领你进入更深层次,学习 JavaScript 的异步编程模型、面向对象编程(OOP),以及性能优化的技巧。这些内容对于构建复杂、流畅的前端应用至关重要。 …...
qt QToolBox详解
1、概述 QToolBox是Qt框架中的一个控件,它提供了一个带标签页的容器,用户可以通过点击标签页标题来切换不同的页面。QToolBox类似于一个带有多页选项卡的控件,但每个“选项卡”都是一个完整的页面,而不仅仅是标签。这使得QToolBo…...
翁知宜荣获“易学名师”与“国学文化传承人”称号
在2024年10月19日举行的北京第六届国学文化传承峰会上,翁知宜老师以其在易学界的卓越成就和对国学文化的传承与发扬,荣获“易学名师”和“国学文化传承人”两项荣誉称号。 翁知宜老师在易经学术竞赛中荣获第一名,其深厚的易学造诣和对玄学学…...
20241128解决Ubuntu20.04安装libwxgtk3.0-dev异常的问题
20241128解决Ubuntu20.04安装libwxgtk3.0-dev异常的问题 2024/11/28 16:17 缘起:中科创达的高通CM6125开发板的Android10的编译环境需要。 安装异常:rootrootrootroot-X99-Turbo:~$ rootrootrootroot-X99-Turbo:~$ sudo apt-get install libwxgtk3.0-de…...
sql分类
SQL(Structured Query Language)是一种用于管理和操作关系数据库管理系统(RDBMS)的编程语言。SQL 可以分为几个主要类别,每个类别都有其特定的用途和功能。以下是 SQL 的主要分类: 1. 数据定义语言&#x…...
stm32里一个定时器可以提供多路信号吗?
在STM32中,一个定时器通常只能提供一组信号(如输出PWM波形、定时中断等)。但是,定时器的多个通道可以提供不同的信号。例如,STM32的定时器可以通过不同的输出通道产生多种PWM信号,每个通道可以配置为不同的…...
Java安全—原生反序列化重写方法链条分析触发类
前言 在Java安全中反序列化是一个非常重要点,有原生态的反序列化,还有一些特定漏洞情况下的。今天主要讲一下原生态的反序列化,这部分内容对于没Java基础的来说可能有点难,包括我。 序列化与反序列化 序列化:将内存…...
FFmpeg 低延迟同屏方案
引言 在实时互动需求激增的当下,无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作,还是游戏直播的画面实时传输,低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架,凭借其灵活的编解码、数据…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...
如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
是否存在路径(FIFOBB算法)
题目描述 一个具有 n 个顶点e条边的无向图,该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序,确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数,分别表示n 和 e 的值(1…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...
Go 并发编程基础:通道(Channel)的使用
在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...
腾讯云V3签名
想要接入腾讯云的Api,必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口,但总是卡在签名这一步,最后放弃选择SDK,这次终于自己代码实现。 可能腾讯云翻新了接口文档,现在阅读起来,清晰了很多&…...
