当前位置: 首页 > news >正文

分布式锁之redis实现

docker安装redis

拉取镜像

docker pull redis:6.2.6

查看镜像

87d429bb8dfa467baedf8733e62ac37b.png

启动容器并挂载目录

需要挂在的data和redis.conf自行创建即可

docker run --restart always -d -v /usr/local/docker/redis/redis.conf:/usr/local/etc/redis/redis.conf -v /usr/local/docker/redis/data:/data --name redis -p 6379:6379 redis:6.2.6 redis-server /usr/local/etc/redis/redis.conf

查看运行状态 

不要忘记开放端口6379

b8ff2272d9354ac39d198b5819e62aef.png

进入容器内部使用redis-cli

docker exec -it 13829d3f335a /bin/bashredis-cli

[可选]用密码登录 

修改redis.conf配置文件,设置 requirepass xxxxx

spring boot 集成redis

添加依赖

      <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

添加redis配置 


server.port= 10010spring.datasource.driver-class-name= com.mysql.cj.jdbc.Driver
spring.datasource.url= jdbc:mysql://39.106.53.30:3306/lock_db?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root123456spring.redis.host=39.106.53.30
spring.redis.port=6379

使用StringRedisTemplate

如果直接使用RedisTemplate使用的序列化器是jdk的,存的是二进制,使用StringRedisTemplate默认初始化序列化器就是String类型

    public StringRedisTemplate() {this.setKeySerializer(RedisSerializer.string());this.setValueSerializer(RedisSerializer.string());this.setHashKeySerializer(RedisSerializer.string());this.setHashValueSerializer(RedisSerializer.string());}

redis演示超卖问题

执行票数存入redis指令

set ticket 5000

 编写代码演示超卖问题

/*** @Author sl*/
@Servicepublic class TicketServiceImpl implements TicketService {@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic  void sellTicket(){//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}}
}

 5000请求压测,结果为4895,发生了超卖问题

a7fc4d7f1ee548169fd7f08b0fc6ca0a.png
e1d1f4af1d704938b8188b32b6ccbc36.png
redis解决超卖问题 

解决方案

解决方案

  •         本地jvm锁(这种情况仅限单机,不做介绍)
  •         redis乐观锁 watch  multi exec(性能低)
  •         分布式锁(redis+lua手动实现或者通过redission实现)

redis乐观锁实现 

watch: 监控一个或者多个key,如果这些key在提交事务(exec)之前被其他用户修改过,那么事务将执行失败,需要重新获取最新数据重头操作

multi: 开启事务,使用该命令,标记一个事务块的开始,redis会将这些操作放入队列中

exec: 执行事务

720c867464d145e68e27ad877dc0f155.png

 乐观锁的代码需要包在SessionCallback中实现

package com.test.lockservice.service.impl;import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.util.List;/*** @Author sl*/
@Servicepublic class TicketServiceImpl implements TicketService {@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic  void sellTicket(){redisTemplate.execute(new SessionCallback<Object>() {@Overridepublic  Object execute(RedisOperations redisOperations) throws DataAccessException {// 开启监听redisOperations.watch("ticket");//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 开启事务redisOperations.multi();Integer integer = Integer.valueOf(ticket);// 扣减票数redisOperations.opsForValue().set("ticket",String.valueOf(--integer));// 提交事务List exec = redisOperations.exec();// 如果获取锁失败 ,重试if(exec == null || exec.size() == 0){try {// 减少锁争抢,避免栈内存溢出Thread.sleep(40);sellTicket();} catch (InterruptedException e) {e.printStackTrace();}}}return null;}});}
}

 1000请求压测,结果为4000,没有发生超卖,但性能极低

b975be5edee2453597b57ccd66d557f1.pngredis实现分布式锁

分布式锁的实现方案中redis的实现主要思想就是独占排他使用,在redis中可以使用setnx命令进行独占排他使用

  • 加锁 setnx 
  • 解锁 del
  • 重试:递归(容易造成栈内存溢出),这里使用循环

 

package com.test.lockservice.service.impl;import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;/*** @Author sl*/
@Servicepublic class TicketServiceImpl implements TicketService {@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic  void sellTicket(){// setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", "111")){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}try {//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {// 解锁操作redisTemplate.delete("lock");}}
}

压测1000,显示无超卖现象 

5629d991544d4b7c9b77472abca1c682.png042eaa1342124922a9d528dc660a16ec.png

添加过期时间防止死锁问题

当前代码存在问题,假如现在有4台服务器争抢锁,编号为1的服务器抢到了锁,但是没来得及释放锁,就宕机啦,其他2,3,4服务器就永远拿不到锁,这就是产生的死锁问题,解决方案是给锁添加过期时间来解决

4affdb3b239141e78d695451512263fb.png

要保证枷锁和设置过期时间具有原子性,否则加了锁,没来得及给过期时间就宕机啦,又会产生死锁问题

expire key 20指令和枷锁指令是两条指令不具有原子性,在这里使用 set key ex 20 nx命令设置过期时间来保证原子性

9d6becfe33a84ebf93433896a4d65bcf.png

添加过期时间和获取锁的原子性

redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)

 // setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}

通过UUID防止误删

因为已经加了过期时间,如果加了3秒过期时间,第一个请求到了第3秒还没执行完毕,锁就失效了,这时第二个请求获取锁,执行1s的时候,第一个请求执行到del指令,就把第二个锁删除掉啦(误删)

解决方案:通过uuid标识是自己的锁,通过判断是自己的锁,在删除

84e258e5fb574c91a78b4244a41923d9.png

添加uuid防止误删

    public  void sellTicket(){String uuid = UUID.randomUUID().toString();// setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}try {//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {// 判断是自己的锁在删除if(uuid.equals(redisTemplate.opsForValue().get("lock"))){redisTemplate.delete("lock");}}}

使用Lua脚本解决防误删的原子性问题

判断和删除锁之间需要保证原子性第一个请求因为如果判断的时候,发现是自己的锁,然后此时锁超过了过期时间,此时,第二个请求获取到锁,第一个请求执行del指令,删除的是第二个请求的锁,所以需要在判断和删除锁之间保持原子性

解决方案:使用Lua脚本保证原子性,Lua脚本将多条命令一次性发给redis,redis单线程的特性可以保证原子性操作

Lua脚本介绍和redis执行Lua脚本 

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能,编译后仅仅一百余K,可以很方便的嵌入别的程序里

菜鸟地址:https://www.runoob.com/lua/lua-variables.html

Lua脚本流程控制和变量定义

--[ 定义全局变量a 局部变量用local a --]
a = 100;
--[ 检查条件 --]
if( a < 20 )
then--[ if 条件为 true 时执行该语句块 --]print("a 小于 20" )
else--[ if 条件为 false 时执行该语句块 --]print("a 大于 20" )
end
print("a 的值为 :", a)

在redis中执行Lua脚本

redis中继承了Lua脚本,lua-time-limit参数现在脚本最长运行时间,默认是5秒,执行指令为:

eval script numkeys key [key ...] arg [arg ...]

numkeys:标识key的数量 不能省略

hello word

eval "return 'hello world'" 0

分支语句KEYS和ARGV必须大写

eval "if KEYS[1]==1 then return KEYS[1] else return  ARGV[1] end" 1 0 3 

4c5257c42e834f25958f88987fdfb151.png解决判断和删除之间的原子性问题

// 如果是自己的锁,则删除,否则返回0为false
if redis.call('get',KEYS[1]) == ARGV[1]
thenreturn redis.call('del',KEYS[1])
elsereturn 0
endkeys:lockargv: uuid
 public  void sellTicket(){String uuid = UUID.randomUUID().toString();// setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}try {//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {String script="if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";this.redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Collections.singletonList("lock"),uuid);}}

 压测1000 显示无超卖现象

daf44ce416a4420294d1bd923028b763.png

4b48b5d2be32427c9ec2bf1033598291.pnghash+Lua解决锁的可重复入问题

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加 锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行

第一个就是锁的重入问题

当前方法a获取锁,在方法之中调用b方法,b方法也需要获取锁,这个时候造成了死锁问题,采用hash+Lua脚本解决

第二个就是锁的自动续期问题:后续会解决续期问题

探讨ReentrantLock的可重入原理

ReentrantLock继承了aqs,aqs是锁的基石

可重入锁加锁流程

  • CAS获取锁,如果没有线程占用锁(state==0),加锁成功并记录当前线程是有锁线程
  • 如果state的值不为0,说明锁已经被占用。则判断当前线程是否是有锁线程,如果是则重入 (state + 1)
  • 否则加锁失败,入队等待

可重入锁解锁流程

  • 判断当前线程是否是有锁线程,不是则抛出异常
  • 对state的值减1之后,判断state的值是否为0,为0则解锁成功,返回true
  • 如果减1后的值不为0,则返回false

hash+Lua实现可重复入锁

参照ReentrantLock中的非公平可重入锁实现分布式可重入锁: hash + lua脚本
加锁

  •     判断锁是否存在 (exists),则直接获取锁 hset key field value
  •     如果锁存在则判断是否自己的锁 (hexists),如果是自己的锁则重入: hincrby key field increment
  •     否则重试:递归 循环
加锁
如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次
数加1if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 
then redis.call('hincrby',KEYS[1], ARGV[1], 1)redis.call('expire',KEYS[1],ARGV[2])return 1
else return 0
endkeys lock
argv uuid 30解锁
判断 hash set 可重入 key 的值是否等于 0
如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
如果为 0 代表 可重入次数被减 1
如果为 1 代表 该可重入 key 解锁成功
1 代表解锁成功,锁被释放
0 代表可重入次数被减 1
null 代表其他线程尝试解锁,解锁失败
if redis.call('hexists',KEYS[1],ARGV[1])==0 
then return nil 
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 
then return 0 
else redis.call('del',KEYS[1]) return 1 
endkeys lock
argv uuid

exists判断lock是否存在,hexists lock uuid 判断filed是否存在

通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1,hincrby命令,如果增加的key filed 不存在则新增并加1

ef39ebc7bb9840cd8169db5e279dc80a.png

加锁工具类

package com.test.lockservice.utils;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Collections;
import java.util.UUID;public class RedisLock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private Integer expire = 30;private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();public RedisLock(StringRedisTemplate redisTemplate, String lockName) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = THREAD_LOCAL.get();if (uuid == null) {this.uuid = UUID.randomUUID().toString();THREAD_LOCAL.set(uuid);}this.expire = expire;}public void lock(){this.lock(expire);}public void lock(Integer expire){this.expire = expire;String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";System.out.println(script);if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){try {// 没有获取到锁,重试Thread.sleep(60);lock(expire);} catch (InterruptedException e) {e.printStackTrace();}}}public void unlock(){String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";/*** 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断* 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功*/Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);// 如果未返回值,代表尝试解其他线程的锁if (result == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);} else if (result == 1) {THREAD_LOCAL.remove();}}
}

测试可重入 

@Overridepublic  void checkAndLock(){RedisLock lock = new RedisLock(redisTemplate, "lock");lock.lock();// 查询票数Ticket ticket = ticketMapper.selectOne(new QueryWrapper<Ticket>().eq("sell_company", "12306"));// 判断不为空和票数大于0if(ticket!=null&& ticket.getCount() > 0){ticket.setCount(ticket.getCount()-1);ticketMapper.updateById(ticket);}// 测试可重入testRepeatEntry();lock.unlock();}public void testRepeatEntry(){RedisLock lock = new RedisLock(redisTemplate, "lock");lock.lock();System.out.println("redis分布式锁测试可重入");lock.unlock();}

 压测1000,未发现超卖问题,并解决可重入的问题

7dfd3282120641e2a34d09da730a07c6.png

d920dd12a6cb4e7dbce13ea0c1b15000.png锁的自动续期

如果在锁还在使用过程中,锁还未使用完,就失效了,也就产生了锁如何自动添加过期时间的问题 

实现方案: 定时器 + Lua脚本定时续期


自动续期if redis.call('hexists',KEYS[1],ARGV[1])==1
thenredis.call('expire',KEYS[1],ARGV[2]) return 1 
else return 0 
end

这里没有选用线程池的原因在于释放锁之后没有取消定时任务的方法,所以选用jdk自带的

Timer作为定时任务 

package com.test.lockservice.utils;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.*;public class RedisLock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private Integer expire = 30;@SuppressWarnings("all")private static final Timer timer = new Timer();private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();public RedisLock(StringRedisTemplate redisTemplate, String lockName) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = THREAD_LOCAL.get();if (uuid == null) {this.uuid = UUID.randomUUID().toString();THREAD_LOCAL.set(uuid);}this.expire = expire;}public void lock(){this.lock(expire);}public void lock(Integer expire){this.expire = expire;String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){try {// 没有获取到锁,重试Thread.sleep(60);lock(expire);} catch (InterruptedException e) {e.printStackTrace();}}// 自动续期renewExpire();}public void unlock(){String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";/*** 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断* 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功*/Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);// 如果未返回值,代表尝试解其他线程的锁if (result == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);} else if (result == 1) {THREAD_LOCAL.remove();}// 释放锁成功this.uuid = null;}@SuppressWarnings("all")private void renewExpire() {String script = "if redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";timer.schedule(new TimerTask() {@Overridepublic void run() {if (uuid != null) {redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), RedisLock.this.uuid, expire.toString());renewExpire();}}},expire * 1000 / 3);}
}

红锁算法 

利用红锁算法解决集群下锁的问题:

  • 1、应用程序获取当前系统时间
  • 2、应用程序以相同的kv值依次从多个redis实例中获取锁,如果某一个节点超过了一定时间(小于过期时间)没有获取到锁,则放弃,尽快从其他节点获取锁,避免一个节点宕机阻塞
  • 3、计算锁的消耗时间= 客户端当前时间-step1中的事件,获取锁的时间小于总的锁定时间,并且半数以上节点获取锁成功,认为获取锁成功
  • 4、如果获取锁失败,对所有节点释放锁

redis分布式锁小结

redis分布式锁最开始采用setnex+Lua脚本的方式,我们发现存在不可重入的问题,于是使用hash+Lua脚本解决可重入问题,并解决了自动续期问题,但是还存在一个重要问题,就是redis集群部署所带来的并发问题,所以使用Redission作为最终的分布式锁解决方案

redis集群状态下的问题:
  • 客户端A从master获取到锁
  • 在master将锁同步到slave之前,master宕掉了
  • slave节点被晋级为master节点
  • 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁

redisson中的分布式锁  

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅 提供了一系列的分布式的Java常用对象,还提供了许多分布式服务
Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上

Redisson引入依赖

 <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.11.2</version>
</dependency>

Redission配置 

package com.test.lockservice.config;import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author sl*/
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){Config config = new Config();
//        config.useClusterServers()config.useSingleServer().setAddress("redis://39.106.53.30:6379").setPassword("12345");return Redisson.create(config);}
}

Redission使用

@Autowiredprivate RedissonClient redissonClient;public void userRedisson(){// 获取锁RLock lock = redissonClient.getLock("lock");try {// 加锁lock.lock();//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {// 解锁lock.unlock();}}

1000并发压测,发现并无超卖问题 

60440ac508af4380ba2ecc5de16754fa.png

c017a7803ee2469c9e89014ef00362c7.png

RLock原理

 RLock对象实现了 java.util.concurrent.locks.Lock 接口,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间 是30秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定

  • RLock 对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出 IllegalMonitorStateException 错误
  • 另外Redisson还通过加锁的方法提供了 leaseTime 的参数来指定加锁的时间。超过这个时间后锁便自动解开了

其实Redisson底层的实现思路同样是hash+Lua脚本的实现方式,在源码中可以看到,下面列举一下加锁的源码

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

 公平锁 

基于Redis的Redisson分布式可重入公平锁也是实现了 java.util.concurrent.locks.Lock 接口的一 种 RLock 对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了 当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队 列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个 线程都处于等待状态,那么后面的线程会等待至少25秒
public void useFairLock() {RLock fairLock = redissonClient.getFairLock("fairLock");
//        fairLock.lock();// 10秒钟以后自动解锁// 无需调用unlock方法手动解锁fairLock.lock(10, TimeUnit.SECONDS);System.out.println("加锁成功"+Thread.currentThread().getName());// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
//        boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
//        fairLock.unlock();}加锁成功http-nio-10010-exec-5
加锁成功http-nio-10010-exec-10

可以看到,公平锁会维护一个队列,按发送顺序依次加锁

22641eac255c4cd78315c6cda75863ee.png 

联锁

   在多个redis实例上获取锁,联锁所有的锁都上锁成功才算成功

  @Overridepublic void useMutiLock() {RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");//联锁所有的锁都上锁成功才算成功RedissonMultiLock redissonMultiLock = new RedissonMultiLock(lock1);redissonMultiLock.lock();System.out.println("业务内容");redissonMultiLock.unlock();}

红锁

在多个节点上加锁,大部分节点获取锁成功就算成功

public void useRedLock() {RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");RedissonRedLock readLock = new RedissonRedLock(lock1);// 红锁在大部分节点上加锁成功就算成功readLock.lock();System.out.println("业务内容");readLock.unlock();}

读写锁

对读和写上锁,RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口,读-读不阻塞

 public void useReadWriteLock() {/*** 读-读 不阻塞 读-写 阻塞 写-写 阻塞* RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口*/RReadWriteLock rwlock = redissonClient.getReadWriteLock("readWrite");// 最常见的读锁rwlock.readLock().lock();// 写锁rwlock.writeLock().lock();// 10秒钟以后自动解锁无需调用unlock方法手动解锁rwlock.readLock().lock(10, TimeUnit.SECONDS);rwlock.writeLock().lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁// boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);rwlock.readLock().unlock();rwlock.writeLock().unlock();}

 信号量

资源限流并发工具类,java.util.concurrent.semaphore是单机版限流,RSemaphore是分布式限流,下面的Semaphore会始终限流3个资源

单机版 

package com.test.lockservice.service.impl;import java.util.concurrent.Semaphore;/*** @Author sl*/
public class SemaphoreTest {public static void main(String[] args) {// 3个有限资源Semaphore semaphore = new Semaphore(3);for (int i = 0; i < 6; i++) {new Thread(()->{try{// 获取资源semaphore.acquire();System.out.println(Thread.currentThread().getName() + "抢到车位");Thread.sleep(1000);System.out.println(Thread.currentThread().getName()  +"离开车位");}catch (Exception e){e.printStackTrace();}finally {// 释放资源semaphore.release();}}).start();}}
}Thread-1抢到车位
Thread-0抢到车位
Thread-4抢到车位
Thread-0离开车位
Thread-1离开车位
Thread-4离开车位
Thread-3抢到车位
Thread-5抢到车位
Thread-2抢到车位
Thread-5离开车位
Thread-3离开车位
Thread-2离开车位

分布式版

  public void useSemaphore() {/*** RSemaphore 采用了与java.util.concurrent.semaphore相似的接口* 资源限流信号量, 3个资源 6个线程, semaphore是单机版限流,RSemaphore是分布式限流*/RSemaphore semaphore = redissonClient.getSemaphore("semaphore");try{semaphore.acquire();}catch(Exception e){e.printStackTrace();}finally {semaphore.release();}}

闭锁(CountDownLatch

CountDownLatch并发工具类,一个线程等待一组线程结束是一个做减法的倒计时器,RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法,

 单机版

package com.test.lockservice.service.impl;import java.util.concurrent.CountDownLatch;/*** @Author sl*/
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <= 6; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName() + "\t上完自习");countDownLatch.countDown();},String.valueOf(i)).start();}// 班长等待所有线程同学走完在锁门countDownLatch.await();System.out.println(Thread.currentThread().getName() + "\t班长离开,锁门");}
}1	上完自习
3	上完自习
4	上完自习
5	上完自习
2	上完自习
6	上完自习
main	班长离开,锁门

顺道介绍一下CyclicBarrier并发工具类,与CountDownLatch正好相反,它做的是加法

package com.test.lockservice.service.impl;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;/*** @Author sl*/
public class CyclicBarrierTest {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{System.out.println("集齐了卡片,开始召唤神龙");});for (int i = 0; i < 7; i++) {String s = String.valueOf(i);new Thread(()->{System.out.println(Thread.currentThread().getName() + "\t 收集到第"+s+"卡片");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}},String.valueOf(i)).start();}}
}0	 收集到第0卡片
6	 收集到第6卡片
2	 收集到第2卡片
1	 收集到第1卡片
5	 收集到第5卡片
4	 收集到第4卡片
3	 收集到第3卡片
集齐了卡片,开始召唤神龙

分布式版

 public void useCountDownLatch() {/*** RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法* 一个线程 等待一组线程完事* 班长等待所有同学走出门口在锁门 CountDownLatch是单机版 RCountDownLatch是分布式版*/RCountDownLatch latch = redissonClient.getCountDownLatch("anyCountDownLatch");latch.trySetCount(6);latch.countDown();try{latch.await();}catch (Exception e){e.printStackTrace();}}

关于zookeeper实现分布式锁,在本专栏zookeeper章节中做了简单介绍,就是创建临时顺序节点,值最小的就是锁。 

 

相关文章:

分布式锁之redis实现

docker安装redis 拉取镜像 docker pull redis:6.2.6 查看镜像 启动容器并挂载目录 需要挂在的data和redis.conf自行创建即可 docker run --restart always -d -v /usr/local/docker/redis/redis.conf:/usr/local/etc/redis/redis.conf -v /usr/local/docker/redis/data:/dat…...

Idea中如何在一个项目中引入其他子模块?

首先在Settings打开Project Structure&#xff0c;然后找到Modules&#xff0c;点击加号点击import module&#xff0c;将需要引进的module引进来。 然后点击Artifacts 可以看到比如说day22…这个是我现在的项目&#xff0c;day16是我需要引入的。那么就在红色横线上面右键点第…...

UDP协议概述

传输层里比较重要的两个协议&#xff0c;一个是 TCP&#xff0c;一个是 UDP。TCP 是面向连接的&#xff0c;UDP 是面向无连接的。 所谓的建立连接&#xff0c;是为了在客户端和服务端维护连接&#xff0c;而建立一定的数据结构来维护双方交互的状态&#xff0c;用这样的数据结…...

Python-tracemalloc-跟踪内存分配

tracemalloc 模块是一个用于对 python 已申请的内存块进行debug的工具。它能提供以下信息: 定位对象分配内存的位置 按文件、按行统计python的内存块分配情况: 总大小、块的数量以及块平均大小。 对比两个内存快照的差异&#xff0c;以便排查内存泄漏 显示前10项 显示内存…...

02 CSS技巧

02 CSS技巧 clip-path 自定义形状&#xff0c;或者使用自带的属性画圆等circle HTML结构 <body><div class"container"></div> </body>CSS结构 使用*polygon*自定义形状 .container {width: 300px;height: 300px;background-color: re…...

Yarn资源调度器

文章目录 一、Yarn资源调度器1、架构2、Yarn工作机制3、HDFS、YARN、MR关系4、作业提交之HDFS&MapReduce 二、Yarn调度器和调度算法1、先进先出调度器&#xff08;FIFO&#xff09;2、容量调度器&#xff08;Capacity Scheduler&#xff09;3、公平调度器&#xff08;Fair …...

android上架备案公钥和md5获取工具

最近很多公司上架遇到了一个问题&#xff0c;就是要提供app的备案证明&#xff0c;现在android上架都需要备案了&#xff0c;但是我们的证书都是通过工具生成的&#xff0c;哪里知道公钥和md5那些东西呢&#xff1f;无论安卓备案还是ios备案都需要提供公钥和md5。 包括ios的备案…...

SpringBoot系列(12):SpringBoot集成log4j2日志配置

最近项目上有使用到log4j2日志模板配置&#xff0c;本文简单总结一下之前的学习笔记&#xff0c;如有纰漏之处&#xff0c;请批评指正。 1. log4j2日志依赖 使用log4j2日志模板时&#xff0c;需要引入相关依赖&#xff0c;下边的两种依赖方式均可。 1.1 使用sl4j依赖时 <…...

HTML事件列表

鼠标事件 属性描述DOMonclick当用户点击某个对象时调用的事件句柄。2oncontextmenu在用户点击鼠标右键打开上下文菜单时触发ondblclick当用户双击某个对象时调用的事件句柄。2onmousedown鼠标按钮被按下。2onmouseenter当鼠标指针移动到元素上时触发。2onmouseleave当鼠标指针…...

并发-Executor框架笔记

Executor框架 jdk5开始&#xff0c;把工作单元与执行机制分离开来&#xff0c;工作单元包括Runable和Callable&#xff0c;执行机制由Executor框架来提供。 Executor框架简介 Executor框架的两级调度模型 Java线程被一对一映射为本地操作系统线程 java线程启动会创建一个本…...

【C进阶】分析 C/C++程序的内存开辟与柔性数组(内有干货)

前言&#xff1a; 本文是对于动态内存管理知识后续的补充&#xff0c;以及加深对其的理解。对于动态内存管理涉及的大部分知识在这篇文章中 ---- 【C进阶】 动态内存管理_Dream_Chaser&#xff5e;的博客-CSDN博客 本文涉及的知识内容主要在两方面&#xff1a; 简单解析C/C程序…...

深入理解 JVM 之——字节码指令与执行引擎

更好的阅读体验 \huge{\color{red}{更好的阅读体验}} 更好的阅读体验 类文件结构 Write Once&#xff0c;Run Anywhere 对于 C 语言从程序到运行需要经过编译的过程&#xff0c;只有经历了编译后&#xff0c;我们所编写的代码才能够翻译为机器可以直接运行的二进制代码&#x…...

C++:vector

目录 一、关于vector 二、vector的相关函数 三、相关函数的使用 ①构造函数 ②size ③[] ​编辑 ④push_back ⑤迭代器iterator ⑥reserve ⑦resize ⑧find ⑨insert ⑩erase ⑪sort 一、关于vector vector比较像数组 观察可知&#xff0c;vector有两个模板参数…...

Android Automotive编译

系统准备 安装系统 准备一台安装Ubuntu系统的机器&#xff08;windows系统的机器可以通过WSL安装ubuntu系统&#xff09; 安装docker 本文使用docker进行编译&#xff0c;因此提前安装docker。参考网络链接安装docker并设置为不使用sudo进行docker操作。 参考链接&#xff…...

什么是50ETF期权开户条件,怎么开期权交易权限?

50ETF期权是指上证50ETF期权&#xff0c;标的物是上证50ETF&#xff0c;代码是&#xff08;510500&#xff09;&#xff0c;期权是一种在上证50ETF基础上进行衍生品交易的金融工具&#xff0c;下文科普什么是50ETF期权开户条件&#xff0c;怎么开期权交易权限&#xff1f;本文来…...

React 从入门到精通——本文来自AI创作助手

React是一个流行的JavaScript库&#xff0c;用于构建用户界面。以下是React入门到精通的步骤&#xff1a; 入门 安装React 你可以在npm上下载React包&#xff0c;也可以使用其他包管理器。首先需要安装node.js&#xff0c;然后使用以下命令安装React&#xff1a; npm insta…...

【51单片机实验笔记】前篇(三) 模块功能封装汇总(持续更新)

文章目录 通用函数public.hpublic.c 延时函数delay.hdelay.c LED模块数码管模块smg.hsmg.c LED点阵模块独立按键模块矩阵按键模块外部中断模块定时器模块串口通讯模块ADC模块PWM模块 通用函数 包含常用头文件&#xff0c;宏定义&#xff0c;自定义类型&#xff0c;函数工具等。…...

Excel VSTO开发4 -其他事件

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 4 其他事件 针对插件的事件主要有Startup、Shutdown这两个事件&#xff0c;在第2节中已经讲解。在开发窗口中&#xff0c;选择对象…...

语音识别数据的采集方法:基本流程数据类型

“人工智能是一种模仿人类功能的产品。数据采集的方法需要针对特定的场景需求。”—–Mark Brayan (澳鹏CEO) 我们一直说&#xff0c;对于一个高质量的人工智能产品离不开高质量的训练数据。对于不同的人工智能我们需要不同的数据对其训练。要采集正确的数据去训练特定的模型才…...

oracle数据库给用户授权DBA权限Oracle查看哪些用户具有DBA权限

oracle数据库给用户授权DBA权限 步骤一&#xff1a;以sysdba身份登录到Oracle数据库 在授予DBA权限之前&#xff0c;我们首先要以sysdba身份登录到Oracle数据库。使用以下命令登录&#xff1a; sqlplus / as sysdba步骤二&#xff1a;创建用户&#xff08;如有用户跳过&#…...

024-从零搭建微服务-系统服务(六)

写在最前 如果这个项目让你有所收获&#xff0c;记得 Star 关注哦&#xff0c;这对我是非常不错的鼓励与支持。 源码地址&#xff08;后端&#xff09;&#xff1a;https://gitee.com/csps/mingyue 源码地址&#xff08;前端&#xff09;&#xff1a;https://gitee.com/csps…...

Arduino驱动TCS3200传感器(颜色传感器篇)

目录 1、传感器特性 2、硬件原理图 3、控制器和传感器连线图 4、驱动程序 TCS3200颜色传感器是一款全彩的颜色检测器,包括了一块TAOS TCS3200RGB感应芯片和4个白色LED灯,TCS3200能在一定的范围内检测和测量几乎所有的可见光。TCS3200有大量的光检测器,每个都有红绿蓝和清…...

基于Matlab实现多个数字水印案例(附上源码+数据集)

数字水印是一种在数字图像或视频中嵌入特定信息的技术&#xff0c;以保护知识产权和防止盗版。在本文中&#xff0c;我们将介绍如何使用Matlab实现数字水印。 文章目录 实现步骤源码数据集下载 实现步骤 首先&#xff0c;我们需要选择一个用于嵌入水印的图像。这可以是原始图像…...

C语言之指针进阶篇(2)

目录 函数指针 函数名和&函数名 函数指针的定义 函数指针的使用 函数指针陷阱 代码1 代码2 注意 函数指针数组定义 函数指针数组的使用 指向函数指针数组的指针 书写 终于军训圆满结束了&#xff0c;首先回顾一下指针进阶篇&#xff08;1&#xff09;主要是…...

C++ 进制转化入门知识(1)

一、什么是进制 进制是一种用来表示数值的系统或方法&#xff0c;它是基于一个特定的基数来工作的。在我们常见的几种进制中&#xff0c;有&#xff1a; 1. **二进制&#xff08;基数 2&#xff09;**&#xff1a; 二进制只用两个数字&#xff1a;0和1。这是计算机内部使用…...

【React】React学习:从初级到高级(四)

React学习[四] 4 应急方案4.1 使用ref引用值4.1.1 给组件添加ref4.1.2 ref和state的不同之处4.1.3 何时使用ref 4.2 使用ref操作DOM4.2.1 获取指向节点的ref4.2.3 使用 ref 回调管理 ref 列表4.2.4 访问另一个组件的DOM节点4.2.5 用 flushSync 同步更新 state 4.3 使用Effect同…...

微信小程序登录问题(思路简略笔记)

配置问题 这是小程序登录问题&#xff0c;必要的两个配置。 流程思路 1. 微信小程序端&#xff0c;会返回一个code。 2. 查看需要返回给微信小程序端的数据。 3. 既然需要返回三个数据&#xff0c;先看openid如何拿到 WX-Login https://api.weixin.qq.com/sns/jscode2ses…...

Go 锁扩展

文章目录 TryLock统计 goroutine数量读写锁读锁写锁常见死锁情况写锁重入写锁中调用读锁循环依赖 TryLock 源码中自带的(我的go是 1.20版本)TryLock 会尝试获取锁&#xff0c;如果获取不到返回false&#xff0c;并不会进行休眠阻塞(和 Lock的主要区别) func (m *Mutex) TryLo…...

Docker的简介及安装

[shouce]http://shouce.jb51.net/docker_practice/栾一峰菜鸟教程参考文献 1 环境配置的难题 软件开发最大的麻烦事之一&#xff0c;就是环境配置。用户计算机的环境都不相同&#xff0c;你怎么知道自家的软件&#xff0c;能在那些机器跑起来&#xff1f; 用户必须保证两件事…...

安卓核心板的不同核心规格及架构介绍

安卓核心板是将核心功能封装的一块电子主板&#xff0c;集成芯片、存储器和功放器件等&#xff0c;并提供标准接口的芯片。 其特点&#xff1a; ● 能跑 Android 等操作系统 强大的功能及丰富的接口 支持 LCD/TP&#xff0c;Audio&#xff0c;Camera&#xff0c;Video&#…...

中文网站建设公司排名/重庆seo网站

以下哪种不是进程的状态 ( B ) A 运行态 B 锁定态 C 睡眠态 D 停止态 fork()的返回值不可能是&#xff08;C) A -1 B 0 C 1 D 大于10000的正整数 常用来进行多任务同步的机制是( B ) A 管道 B 信号量 C 信号 D 共享内存 下列哪个函数无法传递进程结束时的状态 ( A) A close …...

织梦移动网站后缀/企业网页

作为一个百亿级的流量实时分析统计系统怎么能没有 PV / UV 这两经典的超级玛丽亚指标呢&#xff0c;话说五百年前它俩可是鼻祖&#xff0c;咳咳...&#xff0c;不好意思没忍住&#xff0c;回归正文&#xff0c;大猪 在上一篇已经介绍了 小巧高性能ETL程序设计与实现 了&#xf…...

网站建设 东方网景/今日北京新闻

Rust既不能避免一个trait与另一个trait拥有相同名称的方法&#xff0c;也不能阻止为同一类型同时实现这两个trait。甚至可以直接在类型上实现开始已经有的同名方法&#xff01;当然&#xff0c;当调用这些同名方法时&#xff0c;你必须要告诉Rust我们使用哪一个。 下面示例代码…...

做网站编辑是不是也要做推广/上海关键词优化按天计费

中国漫画的需求量在不断增加&#xff0c;而动漫制作成本一直居高不下。究其原因为动漫制作是一个复杂且耗时的过程&#xff0c;需要大量工作人员在不同阶段进行协作。动漫制作过程中&#xff0c;需先创作关键帧草图&#xff0c;接着完成中间动作草图&#xff0c;最后在设计的颜…...

营销网站建设计划书/求职seo

纠结了很久&#xff0c;px、dp、dip、sp有什么区别&#xff1f;纠结有时候用dp和dpi感觉没什么区别之类的问题。 px&#xff08;pixel&#xff09;&#xff1a; 即像素&#xff0c;1px代表屏幕上一个物理的像素点。 px单位不被建议使用&#xff0c;因为同样100px的图片&#xf…...

dedecms做网站全教程/网络营销百度百科

最近看一朋友机器上安装Xmanager之后&#xff0c;用SecureCRT登录后&#xff0c;在字符界面下就可以直接启动图形安装程序&#xff0c;随后自己尝试安装&#xff0c;却出现问题&#xff0c;现将解决方法描述如下&#xff1a; 一、安装了Xmanager软件&#xff0c;在客户端上开启…...