当前位置: 首页 > news >正文

Java常见限流用法介绍和实现

目录

一、现象

二、工具

​​​​​​1、AtomicInteger,AtomicLong 原子类操作

​​​​​​2、Redis+Lua

​​​​​​3、Google Guava的RateLimiter

1) 使用

2) Demo

3) 优化demo

4、阿里开源的Sentinel

三、算法

1、计数限流

(1)原理

(2)实现

(3)优缺点

2、固定窗口限流

(1)原理

(2)实现

(3)优缺点

3、滑动窗口限流

(1)原理

(2)实现

(3)优缺点

4、漏桶算法

(1)原理

(2)实现

(3)优缺点

5、令牌桶算法

(1)原理

(2)实现

(3)优缺点


一、现象

为什么要限流:用于在高并发环境中保护系统资源,避免因过多请求导致系统崩溃

线上服务运行中,偶尔会遇见如Api服务瞬时请求流量过高,服务被压垮;数据处理服务处理消息队列数据,消费速度过快,导致处理性能下降甚至崩溃。

限流前:数据推送给服务处理时,速度过快,服务未限流,导致CPU突然暴涨达到临界值,处理性能底下

限流后:消费速度平稳,CPU平稳,未超限,内存上涨也未超限

可见限流是非常重要的!

二、工具

​​​​​​1、AtomicInteger,AtomicLong 原子类操作

优点

  1. 性能高:AtomicInteger 和 AtomicLong 基于 CAS(Compare-And-Swap)操作,能够实现高效的并发访问,适用于高并发场景。
  2. 轻量级:这些类作为 Java 标准库的一部分,无需引入额外的依赖,使用简单方便。
  3. 内存操作:AtomicInteger 和 AtomicLong 的操作都在内存中完成,避免了网络开销。

缺点

  1. 单机限流:AtomicInteger 和 AtomicLong 主要适用于单机环境下的限流,对于分布式系统或微服务架构来说,可能需要额外的机制来实现全局限流。
  2. 功能相对单一:AtomicInteger 和 AtomicLong 的功能较为单一,可能无法满足复杂的限流需求。
  3. CAS 的潜在问题:在高并发场景下,CAS 操作可能导致自旋等待,增加 CPU 开销。此外,如果多个线程同时修改同一个值,可能导致性能下降。

​​​​​​2、Redis+Lua

优点

  1. 灵活性高:Redis+Lua 的限流方案允许根据业务需求灵活定制限流规则,如限制特定来源 IP 或 API 的访问频率。
  2. 分布式支持:Redis 作为分布式缓存,使得限流策略可以跨多个实例或节点生效,非常适合微服务架构或分布式系统。
  3. 原子性操作:Lua 脚本在 Redis 中执行时具有原子性,避免了并发访问时可能出现的数据不一致问题。

缺点

  1. 网络开销:虽然 Redis+Lua 减少了部分网络请求,但仍然存在网络 IO 开销,尤其在高并发场景下可能会成为性能瓶颈。
  2. 依赖 Redis:该方案高度依赖 Redis 的稳定性和可用性,一旦 Redis 出现故障,限流策略可能失效。
  3. 学习成本:Redis 和 Lua 的使用需要一定的学习成本,尤其是对于那些不熟悉这两个技术的开发者来说。

​​​​​​3、Google Guava的RateLimiter

优点

  1. 基于令牌桶算法:RateLimiter采用了令牌桶算法进行限流,该算法允许突发流量的处理,当请求空闲时,可以预先生成一部分令牌,从而在新请求到达时无需等待。
  2. 实现简单:RateLimiter的使用相对简单,可以方便地集成到现有系统中。

缺点

  1. 功能相对单一:RateLimiter主要关注限流功能,对于熔断降级等复杂场景的处理能力相对较弱。
  2. 缺乏实时监测和报警机制:RateLimiter没有提供实时的系统监测和报警功能,对于系统问题的发现和解决可能不够及时。

1) 使用

引入pom

<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>27.1-jre</version>
</dependency>

常用API介绍

  • RateLimiter.create(permitsPerSecond):设置当前接口的QPS
  • rateLimiter.tryAcquire(timeout, timeUnit):尝试在一定时间内获取令牌,超时则退出
  • rateLimiter.acquire():尝试获取对应数量令牌,默认一个令牌,如果没有可以用的,则该方法将一直阻塞线程,直到RateLimiter允许获得新的许可证 

2) Demo

/*** @author : YY-帆S* @since : 2024/2/26*/
@Slf4j
@Service
@ConditionalOnProperty(value = "xxxxx", havingValue = "true")
public class ForwardAndCustomDetailConsumer {// 创建RateLimiter,这里设置每秒最多处理300条消息private final RateLimiter rateLimiter = RateLimiter.create(300.0);@KafkaListener(topics = "#{'${xxxxx}'.split(',')}", groupId = "${xxxxx}", containerFactory = "ActCommonKafkaConfig")public void consumer(ConsumerRecord<?, ?> record) {MDCHelper.fillMDC();String value = null;try {// 通过RateLimiter进行限流rateLimiter.acquire();} catch (Exception e) {xxxx;}}
}

3) 优化demo

第二点的demo配置不好变动,策略修改时需要修改代码,重新编译服务

因此优化后通过依赖注入的方式创建RateLimiter实例

1. 创建RateLimit的配置类

/*** @author YY-帆S* @Date 2024/3/8 11:42*/
@Configuration
public class RateLimiterConfig {@Resourceprivate Act71225Properties act71225Properties;@Bean(name = "act72183RateLimiter")public RateLimiter act72183OfflineRateLimiter() {return RateLimiter.create(act71225Properties.getOfflineDataRateLimit());}
}

2.修改RateLimiter实现方式

/*** @author : YY-帆S* @since : 2024/2/26*/
@Slf4j
@Service
@ConditionalOnProperty(value = "xxxxx", havingValue = "true")
public class ForwardAndCustomDetailConsumer {// 创建RateLimiter,这里设置每秒最多处理300条消息@Resource(name = "act72183RateLimiter")private RateLimiter rateLimiter; @KafkaListener(topics = "#{'${xxxxx}'.split(',')}", groupId = "${xxxxx}", containerFactory = "ActCommonKafkaConfig")public void consumer(ConsumerRecord<?, ?> record) {MDCHelper.fillMDC();String value = null;try {// 通过RateLimiter进行限流rateLimiter.acquire();} catch (Exception e) {xxxx;}}
}

4、阿里开源的Sentinel

文档:introduction | Sentinel

优点

  1. 功能丰富:Sentinel不仅支持限流功能,还提供了熔断降级、系统保护、热点参数限流等多种应用场景的支持。
  2. 细粒度的控制:Sentinel可以实现细粒度的控制,满足复杂场景下对流量和资源的精确管理需求。
  3. 强大的实时监测和报警机制:Sentinel提供了实时的系统监测和报警功能,可以及时发现并解决系统问题。
  4. 易于扩展和集成:Sentinel为开发者提供了简单易用的扩展接口,支持多种开发框架的集成,方便开发者根据需求进行定制和扩展。

缺点

  1. 学习成本可能较高:由于Sentinel的功能丰富,对于初学者来说可能需要一定的学习成本来熟悉和掌握。
  2. 可能增加系统复杂度:引入Sentinel可能会增加系统的复杂度,特别是在大型项目中,需要仔细规划和管理相关的配置和规则。

三、算法

1、计数限流

(1)原理

系统同时只能处理N个请求,保存一个计数器,开始处理时计数器+1,处理完成后计数器-1

每次请求查看计数器的值,超过阈值就拒绝

(2)实现

  1. 实现类

如下以AtomicInteger 为工具实现计数限流,即同时只能处理N个数据

import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicInteger;/*** @author YY-帆S* @Date 2024/3/25 21:07*/
@Slf4j
public class AtomicCounterRateLimiter {private final AtomicInteger counter;private final int limit;public AtomicCounterRateLimiter(int limit) {this.counter = new AtomicInteger(0);this.limit = limit;}/*** 限流** @return*/public synchronized boolean tryAcquire() {//获取计数器,不超过限制则则返回true,并追加数值if (counter.get() < limit) {counter.incrementAndGet();return true;}return false;}/*** 释放** @return*/public synchronized boolean tryRelease() {//获取计数器,不超过限制则则返回true,并追加数值if (counter.get() > 0) {counter.decrementAndGet();return true;}return false;}
}

     2. 测试类

import org.junit.Test;/*** @author YY-帆S* @Date 2024/3/25 21:27*/
public class AtomicCounterRateLimiterTest {@Testpublic void testLimit() {//创建一个限流器,允许同时只能处理10个请求AtomicCounterRateLimiter atomicCounterRateLimiter = new AtomicCounterRateLimiter(10);// 模拟15个请求,每个请求间隔100毫秒for (int i = 0; i < 15; i++) {new Thread(() -> {// 尝试获取许可if (atomicCounterRateLimiter.tryAcquire()) {System.out.println(Thread.currentThread().getName() + " 获得了许可,执行操作。");// 模拟请求之间的间隔try {Thread.sleep((long) (Math.random() * 100));} catch (InterruptedException e) {e.printStackTrace();}atomicCounterRateLimiter.tryRelease();} else {System.out.println(Thread.currentThread().getName() + " 请求被拒绝。");}}).start();}}}

执行结果:

Thread-0 获得了许可,执行操作。
Thread-1 获得了许可,执行操作。
Thread-4 获得了许可,执行操作。
Thread-2 获得了许可,执行操作。
Thread-3 获得了许可,执行操作。
Thread-5 获得了许可,执行操作。
Thread-7 获得了许可,执行操作。
Thread-6 获得了许可,执行操作。
Thread-8 获得了许可,执行操作。
Thread-9 获得了许可,执行操作。
Thread-10 请求被拒绝。
Thread-11 请求被拒绝。
Thread-13 请求被拒绝。
Thread-14 请求被拒绝。
Thread-12 请求被拒绝。

(3)优缺点

优点:简单,代码好实现,单机可用Atomic 等原子类、分布式集群可以用Redis

缺点:扛不住突发性的流量,假设阈值=1w,即服务器可以同时处理1w个请求,当1w个请求在1s内同时涌进来时,服务有可能扛不住

2、固定窗口限流

(1)原理

固定窗口限流是在计数限流的概念上,加上了时间窗口的概念,计数器每过一个时间窗口就重置为0,限流规则如在N秒内只允许处理M个请求

(2)实现

如下以Redis+Lua为工具实现固定时间窗口限流,即 N秒内同时只能处理M个数据

import com.bigo.web.yummy.center.dao.redis.RedisDao;
import com.bigo.web.yummy.inner.Application;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;/*** @author YY-帆S* @Date 2024/3/25 18:30*/
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RedisFixedTimeRateLimit {@ResourceRedisDao redisDao;private boolean tryAcquire() {String key = "limit:test";//同时只能处理10个请求int limitCount = 10;//1s内int second = 1;String luaCode = "local key = KEYS[1]\n" +"local limit = tonumber(ARGV[1])\n" +"local expire_time = ARGV[2]\n" +"local is_exists = redis.call(\"EXISTS\", key)\n" +"if is_exists == 1 then\n" +"    if redis.call(\"INCR\", key) > limit then\n" +"        return false\n" +"    else\n" +"        return true\n" +"    end\n" +"else\n" +"    redis.call(\"INCRBY\", key, 1)\n" +"    redis.call(\"EXPIRE\", key, expire_time)\n" +"    return true\n" +"end";List<String> keys = new ArrayList<>();keys.add(key);List<String> values = new ArrayList<>();values.add(String.valueOf(limitCount));values.add(String.valueOf(second));RedisTemplate<String, String> redisTemplate = redisDao.getLongCodis().getRedisTemplate();DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(luaCode, Boolean.class);return redisTemplate.execute(redisScript, keys, String.valueOf(limitCount), String.valueOf(second));}@Testpublic void testLimit() {// 模拟15个请求for (int i = 0; i < 15; i++) {new Thread(() -> {// 尝试获取许可if (tryAcquire()) {System.out.println(Thread.currentThread().getName() + " 获得了许可,执行操作。");} else {System.out.println(Thread.currentThread().getName() + " 请求被拒绝。");}}).start();}}
}

(3)优缺点

优点:简单,代码好实现,单机可用Atomic 等原子类、分布式集群可以用Redis

缺点:限流机制不够平滑,如每秒允许请求100个请求,在第一毫秒内就请求了100个请求,此后都开始限流,导致剩余窗口内的所有请求都会被拒绝
又如在最后1个毫秒内请求了100个请求,下一个毫秒开始新的时间窗口,计数清0,此时又涌入了100个请求,虽说固定时间窗口内没有超过阈值,但是全局看来,这两个毫秒内就涌入了200个请求,对于限流100的概念是不可接受的

3、滑动窗口限流

(1)原理

在固定时间窗口的基础上进行优化,对大的时间窗口进行划分,每个小窗口对应大窗口中的不同时间点,每个窗口独立计数。随时间的变化,小窗口随之平移,并且重置/舍弃过期的小窗口,每个小窗口的计数器相加,不超过大窗口的限流limit,即限流阈值之内。

(2)实现

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;/*** @author YY-帆S* @Date 2024/3/26 15:04*/
public class SlidingWindowRateLimiter {private int windowSize; //时间窗口大小, Unit: sprivate int slotNum; //用于统计的子窗口数量,默认为10private int slotTime; //子窗口的时间长度, Unit: msprivate int limit; //限流阈值/*** 存放子窗口统计结果的数组* note: counters[0]记为数组左边, counters[size-1]记为数组右边*/private AtomicInteger[] counters;private long lastTime;//初始化public SlidingWindowRateLimiter(int windowSize, int limit, int slotNum) {this.windowSize = windowSize;this.limit = limit;this.slotNum = slotNum;// 计算子窗口的时间长度: 时间窗口 / 子窗口数量this.slotTime = windowSize * 1000 / slotNum;this.lastTime = System.currentTimeMillis();this.counters = new AtomicInteger[slotNum];resetCounters();}public SlidingWindowRateLimiter(int windowSize, int limit) {this(windowSize, limit, 10);}private void resetCounters() {for (int i = 0; i < counters.length; i++) {counters[i] = new AtomicInteger(0); // 每个数组元素都是一个新的AtomicInteger实例}}/*** 限流请求* @return*/public synchronized boolean tryAcquire() {long currentTime = System.currentTimeMillis();// 计算滑动数, 子窗口统计时所对应的时间范围为左闭右开区间, 即[a,b)int slideNum = (int) Math.floor((currentTime - lastTime) / slotTime);// 滑动窗口slideWindow(slideNum);// 统计滑动后的数组之和int sum = Arrays.stream(counters).mapToInt(AtomicInteger::get).sum();// 以达到当前时间窗口的请求阈值, 故被限流直接返回falseif (sum >= limit) {return false;} else {    // 未达到限流, 故返回truecounters[slotNum - 1].incrementAndGet();return true;}}/*** 将数组元素全部向左移动num个位置** @param num*/private void slideWindow(int num) {if (num == 0) {return;}// 数组中所有元素都会被移出, 故直接全部清零if (num >= slotNum) {resetCounters();} else {// 对于a[0]~a[num-1]而言, 向左移动num个位置后, 则直接被移出了// 故从a[num]开始移动即可for (int index = num; index < slotNum; index++) {// 计算a[index]元素向左移动num个位置后的新位置索引int newIndex = index - num;counters[newIndex] = counters[index];counters[index].getAndSet(0);}}// 更新时间lastTime = lastTime + num * slotTime;}public static void main(String[] args) throws InterruptedException {//例子:5s内只能有50个请求SlidingWindowRateLimiter rateLimiter = new SlidingWindowRateLimiter(5, 50);int allNum = 3;  // 请求总数int passNum = 0; // 通过数int blockNum = 0; // 被限流数//模拟连续请求for (int i = 0; i < allNum; i++) {if (rateLimiter.tryAcquire()) {passNum++;} else {blockNum++;}}System.out.println("请求总数: " + allNum + ", 通过数: " + passNum + ", 被限流数: " + blockNum);// 延时以准备下一次测试Thread.sleep(5000);allNum = 100;passNum = 0;blockNum = 0;//模拟连续请求for (int i = 0; i < allNum; i++) {if (rateLimiter.tryAcquire()) {passNum++;} else {blockNum++;}}System.out.println("请求总数: " + allNum + ", 通过数: " + passNum + ", 被限流数: " + blockNum);}
}

 执行结果:

请求总数: 3, 通过数: 3, 被限流数: 0
请求总数: 100, 通过数: 50, 被限流数: 50

(3)优缺点

优点:避免了固定窗口算法可能出现的窗口切换时的流量峰值,使得流量控制更为平滑
缺点:

  1. 对时间区间精度要求越高,算法所需的空间容量越大,需要更多的计算和存储资源
  2. 还是存在限流不够平滑的问题。例如:限流是每秒100个,在第一毫秒发送了100个请求,达到限流,剩余窗口时间的请求都将会被拒绝

4、漏桶算法

(1)原理

该算法使用“桶”来比喻,不断有水(请求)进入桶内并以固定速率进行处理,模拟桶中的“泄漏”,当加水速度>漏水速度时,直到某一个时刻,存储桶己满,新的请求将被丢弃,直到有可用空间。
 

(2)实现

import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicInteger;/*** @author YY-帆S* @Date 2024/3/26 22:46*/
@Slf4j
public class LeakyBucketRateLimiter {private AtomicInteger bucketLevel; // 当前桶中的请求数量private int capacity; // 桶的容量private long leakRate; // 漏水速率,单位:请求/秒private long lastLeakTime; // 上一次漏水的时间戳public LeakyBucketRateLimiter(int capacity, long leakRate) {this.capacity = capacity;this.leakRate = leakRate;this.bucketLevel = new AtomicInteger(0);this.lastLeakTime = System.currentTimeMillis();}public synchronized boolean tryAcquire() {// 获取当前时间long currentTime = System.currentTimeMillis();//流出时间long elapsedTime = currentTime - lastLeakTime;//计算流出的水量 = (当前时间 - 上次时间) * 出水速率long leaked = (long) (elapsedTime * (leakRate / 1000.0));//只有有流出水才更新时间戳,不然会漏不出水if (leaked > 0) {//计算桶内水量 = 桶内当前水量 - 流出的水量int newLevel = Math.max(0, bucketLevel.get() - (int) leaked);bucketLevel.set(newLevel);//更新上次漏水时间戳lastLeakTime = currentTime;}// 尝试将请求加入桶中if (bucketLevel.get() < capacity) {bucketLevel.incrementAndGet();return true;} else {return false;}}public static void main(String[] args) throws InterruptedException {LeakyBucketRateLimiter limiter = new LeakyBucketRateLimiter(1, 1); // 容量为1,漏水速率为1请求/秒// 模拟发送请求for (int i = 0; i < 20; i++) {new Thread(() -> {if (limiter.tryAcquire()) {log.info(Thread.currentThread().getName() + " 获得了许可,执行操作。");} else {log.info(Thread.currentThread().getName() + " 请求被拒绝。");}}).start();//模拟执行时间Thread.sleep(500);}}}

 例子是1s/1个请求,执行结果一致

23:05:15.200 INFO - Thread-0 获得了许可,执行操作。
23:05:15.705 INFO - Thread-1 请求被拒绝。
23:05:16.215 INFO - Thread-2 获得了许可,执行操作。
23:05:16.724 INFO - Thread-3 请求被拒绝。
23:05:17.233 INFO - Thread-4 获得了许可,执行操作。
23:05:17.741 INFO - Thread-5 请求被拒绝。
23:05:18.252 INFO - Thread-6 获得了许可,执行操作。
23:05:18.762 INFO - Thread-7 请求被拒绝。
23:05:19.273 INFO - Thread-8 获得了许可,执行操作。
23:05:19.785 INFO - Thread-9 请求被拒绝。
23:05:20.299 INFO - Thread-10 获得了许可,执行操作。
23:05:20.813 INFO - Thread-11 请求被拒绝。
23:05:21.327 INFO - Thread-12 获得了许可,执行操作。
23:05:21.840 INFO - Thread-13 请求被拒绝。
23:05:22.353 INFO - Thread-14 获得了许可,执行操作。
23:05:22.867 INFO - Thread-15 请求被拒绝。
23:05:23.382 INFO - Thread-16 获得了许可,执行操作。
23:05:23.896 INFO - Thread-17 请求被拒绝。
23:05:24.411 INFO - Thread-18 获得了许可,执行操作。
23:05:24.925 INFO - Thread-19 请求被拒绝。

(3)优缺点

优点:

  1. 平滑流量输出:漏桶算法可以有效地平滑网络上的突发流量,为网络提供一个稳定的流量输出。通过将流量注入到漏桶中,并根据桶的漏水速率来控制流量的输出,可以确保流量的平稳性。
  2. 防止流量冲击:由于漏桶具有缓存功能,当流量突发超过设定阈值时,超出的部分可以被暂存在桶中/直接丢弃,从而避免了流量冲击对系统造成的压力。

缺点:

  1. 灵活性相对较差:漏桶算法的速率是恒定的,不能根据实际需要动态调整。这可能导致在某些情况下,系统无法充分利用网络资源,造成一定的资源浪费。
  2. 无法应对突发流量:由于漏桶的出口速度是固定的,在面对突发流量时,即使是在流量较小的情况下,仍然是以固定速率处理,也无法以更快的速度处理请求

5、令牌桶算法

(1)原理

        与漏桶算法相反,系统以固定的速率往桶里放入令牌,称为令牌桶,如果有请求需要这个令牌,这可以从桶里拿一个,拿到了令牌即允许放行,直到令牌被拿完即令牌不足,则请求需等待或被丢弃。

        令牌的数量与时间和发放速率强相关,随着时间流逝,系统会不断往桶里放入更多的令牌,如果放令牌的速度比拿令牌的速度快,则令牌桶最终会被放满。

(2)实现

推荐用Google-Guava的RateLimiter,比较成熟,如上第二点工具有例子,但是Google-Guava的RateLimiter是基于单机的限流工具。

如下基于Redis+lua实现一个令牌桶限流算法,可以实现分布式集群限流的目的。

  1. 实现代码

lua限流脚本和初始化脚本

-- KEYS[1]: 令牌桶的key,格式为 "rate_limiter:{bucket_key}"
-- ARGV[1]: 请求的令牌数量
-- ARGV[2]: 当前时间戳(可选,用于避免Redis服务器与客户端时间不同步的问题)local key = KEYS[1]  --令牌桶的key
local tokens_requested = tonumber(ARGV[1]) --请求的令牌数量
local now = tonumber(ARGV[2]) or redis.call('TIME')[1] --当前时间戳(可选,用于避免Redis服务器与客户端时间不同步的问题)-- 获取当前桶中的令牌数和上次刷新时间
local ratelimit_info = redis.call("HMGET", key, "last_refreshed", "current_permits", "capacity", "rate")
local last_refreshed = ratelimit_info[1]
local current_permits = tonumber(ratelimit_info[2])
local capacity = tonumber(ratelimit_info[3])  --桶的容量
local fill_rate = tonumber(ratelimit_info[4]) --每秒添加的令牌数量(即速率)-- 初始化当前桶容量
local local_curr_permits = capacity;-- 如果上次刷新时间不存在,则初始化为当前时间
if (last_refreshed == nil or type(last_refreshed) =='boolean') thenlast_refreshed = nowredis.call("HMSET", key, "last_refreshed", now);
else--计算该时间间隔内加入了多少令牌local reverse_permits = (now - last_refreshed) * fill_rateif (reverse_permits > 0) thenredis.call("HMSET", key, "last_refreshed", now);end--计算当前总共有多少令牌,期望值local expect_curr_permits = reverse_permits + current_permits-- 与桶最大容量做对比local_curr_permits = math.min(expect_curr_permits, capacity);
endif (local_curr_permits >= tokens_requested) thenlocal_curr_permits = local_curr_permits - tokens_requestedredis.call("HMSET", key, "current_permits", local_curr_permits);return 1 -- 表示成功获取令牌
elseredis.call("HMSET", key, "current_permits", local_curr_permits);return 0 -- 表示获取令牌失败
end------ 初始化lua ------
local result = 1
redis.pcall("HMSET", KEYS[1],"last_refreshed", ARGV[1],"current_permits", ARGV[2],"capacity", ARGV[3],"rate", ARGV[4])
return result

 Redis+Lua限流核心逻辑:

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.ArrayList;
import java.util.List;/*** @author YY-帆S* @Date 2024/3/27 11:17*/
@Slf4j
public class TokenBucketRateLimiter {//Redis缓存key前缀private static final String KEY_PREFIX = "TokenRateLimiter:";/*** 初始化lua脚本*/private static final String rateLimitInitLuaCode = "local result = true\n" +"redis.pcall(\"HMSET\", KEYS[1],\n" +"        \"capacity\", ARGV[1],\n" +"        \"rate\", ARGV[2])\n" +"return true";/*** 令牌桶锁lua脚本*/private static final String rateLimitLuaCode = "" +"-- KEYS[1]: 令牌桶的key,格式为 \"rate_limiter:{bucket_key}\"\n" +"-- ARGV[1]: 请求的令牌数量\n" +"-- ARGV[2]: 当前时间戳(可选,用于避免Redis服务器与客户端时间不同步的问题)\n" +"\n" +"local key = KEYS[1]  --令牌桶的key\n" +"local tokens_requested = tonumber(ARGV[1]) --请求的令牌数量\n" +"local now = tonumber(ARGV[2]) or redis.call('TIME')[1] --当前时间戳(可选,用于避免Redis服务器与客户端时间不同步的问题)\n" +"\n" +"-- 获取当前桶中的令牌数和上次刷新时间\n" +"local ratelimit_info = redis.call(\"HMGET\", key, \"last_refreshed\", \"current_permits\", \"capacity\", \"rate\")\n" +"local last_refreshed = ratelimit_info[1]\n" +"local current_permits = tonumber(ratelimit_info[2])\n" +"local capacity = tonumber(ratelimit_info[3])  --桶的容量\n" +"local fill_rate = tonumber(ratelimit_info[4]) --每秒添加的令牌数量(即速率)\n" +"\n" +"-- 初始化当前桶容量\n" +"local local_curr_permits = capacity;\n" +"\n" +"-- 如果上次刷新时间不存在,则初始化为当前时间\n" +"if (last_refreshed == nil or type(last_refreshed) =='boolean') then\n" +"    last_refreshed = now\n" +"    redis.call(\"HMSET\", key, \"last_refreshed\", now);\n" +"else\n" +"    --计算该时间间隔内加入了多少令牌\n" +"    local reverse_permits = (now - last_refreshed)* fill_rate\n" +"    if (reverse_permits > 0) then\n" +"        redis.call(\"HMSET\", key, \"last_refreshed\", now);\n" +"    end\n" +"\n" +"    --计算当前总共有多少令牌,期望值\n" +"    local expect_curr_permits = reverse_permits + current_permits\n" +"    -- 与桶最大容量做对比\n" +"    local_curr_permits = math.min(expect_curr_permits, capacity);\n" +"end\n" +"\n" +"if(local_curr_permits >= tokens_requested) then\n" +"    local_curr_permits = local_curr_permits - tokens_requested\n" +"    redis.call(\"HMSET\", key, \"current_permits\", local_curr_permits);\n" +"    return true -- 表示成功获取令牌\n" +"else\n" +"    redis.call(\"HMSET\", key, \"current_permits\", local_curr_permits);\n" +"    return false -- 表示获取令牌失败\n" +"end\n";RedisTemplate redisTemplate;//初始化构造器public TokenBucketRateLimiter(RedisTemplate redisTemplate, int permitsPerSecond, String bucketKey) {//初始化构造信息List<String> keys = new ArrayList<>();keys.add(getRateLimiterKey(bucketKey));this.redisTemplate = redisTemplate;DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(rateLimitInitLuaCode, Boolean.class);this.redisTemplate.execute(redisScript, keys, String.valueOf(permitsPerSecond), String.valueOf(permitsPerSecond));}//构造令牌桶缓存keypublic String getRateLimiterKey(String bucketKey) {return KEY_PREFIX + bucketKey;}//默认一次拿一个令牌public boolean tryAcquire(String bucketKey) {return tryAcquire(1, bucketKey);}/*** 核心逻辑,获取令牌桶数据* @param request* @param bucketKey* @return*/public boolean tryAcquire(int request, String bucketKey) {DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(rateLimitLuaCode, Boolean.class);List<String> keys = new ArrayList<>();keys.add(getRateLimiterKey(bucketKey));return (boolean) this.redisTemplate.execute(redisScript, keys, String.valueOf(request), String.valueOf(System.currentTimeMillis() / 1000));}
}

        2. 测试代码

import com.bigo.web.live.TimeUtil;
import com.bigo.web.yummy.center.dao.redis.RedisDao;
import com.bigo.web.yummy.inner.Application;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;/*** @author YY-帆S* @Date 2024/3/27 13:25*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class TokenBucketRateLimiterTest {@ResourceRedisDao redisDao;@Testpublic void testTokenBucketRateLimiter() throws InterruptedException {//令牌桶String bucketKey = "yyfsTest";// 每秒新增3个桶TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(redisDao.getLongCodis().getRedisTemplate(), 3, bucketKey);// 模拟发送请求int allNum = 4;  // 请求总数//模拟连续请求for (int i = 0; i < allNum; i++) {if (limiter.tryAcquire(bucketKey)) {System.out.println(TimeUtil.parseTimestampToString("yyyy-MM-dd HH:mm:ss", System.currentTimeMillis()) + " 获得了许可,执行操作。");} else {System.out.println(TimeUtil.parseTimestampToString("yyyy-MM-dd HH:mm:ss", System.currentTimeMillis()) + " 请求被拒绝。");}}// 延时以准备下一次测试Thread.sleep(5000);allNum = 20;for (int i = 0; i < allNum; i++) {if (limiter.tryAcquire(bucketKey)) {System.out.println(TimeUtil.parseTimestampToString("yyyy-MM-dd HH:mm:ss", System.currentTimeMillis()) + " 获得了许可,执行操作。");} else {System.out.println(TimeUtil.parseTimestampToString("yyyy-MM-dd HH:mm:ss", System.currentTimeMillis()) + " 请求被拒绝。");}Thread.sleep(200);}}
}

初始化结果

执行过程结果

执行输出结果

(3)优缺点

优点:

  1. 应对突发流量:令牌桶算法允许流量突发,当桶满时,系统能以最大的速度处理请求
  2. 灵活性:算法允许根据实际需求调整令牌生成速率和令牌桶大小等参数
  3. 限制平均速度:长期运行的服务,数据处理速度最终会动态平衡,限制在预定义的平均速率,即生成令牌的速率

缺点:

  1. 导致过载的可能性:要控制令牌的产生速度,如果令牌产生的速度过快,可能会导致大量的突发流量,这可能会使网络或服务过载。
  2. 内存资源限制:令牌桶需要一定的存储空间来保存令牌,可能会导致内存资源的浪费。且对于特别频繁的请求,令牌桶算法可能会占用较多的计算资源,增加系统负担。
  3. 实现稍复杂:相比于计数器等其他限流算法,令牌桶算法的实现稍微复杂一些

参考文档:

https://zhuanlan.zhihu.com/p/494782784

https://blog.csdn.net/weixin_45583158/article/details/135664278

相关文章:

Java常见限流用法介绍和实现

目录 一、现象 二、工具 ​​​​​​1、AtomicInteger,AtomicLong 原子类操作 ​​​​​​2、RedisLua ​​​​​​3、Google Guava的RateLimiter 1&#xff09; 使用 2&#xff09; Demo 3&#xff09; 优化demo 4、阿里开源的Sentinel 三、算法 1、计数限流 &…...

算法——图论:判断二分图(染色问题)

题目&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 方法一&#xff1a;并查集 class Solution { public:vector<int>father;int find(int x){if (father[x] ! x)father[x] find(father[x]);return father[x];}void add(int x1, int x2){int fa1 find(x1), f…...

三步提升IEDA下载速度——修改IDEA中镜像地址

找到IDEA的本地安装地址 D:\tool\IntelliJ IDEA 2022.2.4\plugins\maven\lib\maven3\conf 搜索阿里云maven仓库 复制https://developer.aliyun.com/mvn/guide中红框部分代码 这里也是一样的&#xff1a; <mirror><id>aliyunmaven</id><mirrorOf>*&…...

CentOS7 RPM升级支持BBR TCP/CC的内核版本

列出安装的内核 rpm -qa kernel # yum list installed kernel 删除已安装内核 sudo dnf remove kernel-4.0.4-301.fc22.x86_64 安装内核 rpm --import https://www.elrepo.org/RPM-GPG-KEY-elrepo.org rpm -Uvh http://www.elrepo.org/elrepo-release-7.0-2.el7.elrepo.noar…...

文本向量模型BGE与BGE-M3

BGE模型 BGE模型对应的技术报告为《C-Pack: Packaged Resources To Advance General Chinese Embedding》 训练数据 为了训练BGE向量模型&#xff0c;构建了C-MTP数据集&#xff0c;它包括了用来训练文本向量的文本对数据&#xff08;问答对、两个同义句子、相同主题的两个文…...

【黑马头条】-day04自媒体文章审核-阿里云接口-敏感词分析DFA-图像识别OCR-异步调用MQ

文章目录 day4学习内容自媒体文章自动审核今日内容 1 自媒体文章自动审核1.1 审核流程1.2 内容安全第三方接口1.3 引入阿里云内容安全接口1.3.1 添加依赖1.3.2 导入aliyun模块1.3.3 注入Bean测试 2 app端文章保存接口2.1 表结构说明2.2 分布式id2.2.1 分布式id-技术选型2.2.2 雪…...

新能源充电桩站场AI视频智能分析烟火检测方案及技术特点分析

新能源汽车充电起火的原因多种多样&#xff0c;涉及技术、设备、操作等多个方面。从技术层面来看&#xff0c;新能源汽车的电池管理系统可能存在缺陷&#xff0c;导致电池在充电过程中出现过热、短路等问题&#xff0c;从而引发火灾。在设备方面&#xff0c;充电桩的设计和生产…...

springboot集成logback-spring.xml文件

彩色日志日志分debug和error文件输出&#xff0c;方便开发人员运维日志限制最大保管天数日志限制总量大小占用量GB日志限制单个文件大小MB日志显示最大保留天数屏蔽没用的日志 <?xml version"1.0" encoding"UTF-8"?> <!--~ Copyright (c) 2020…...

centos7 安装 nginx

一、yum 方式安装 1.安装yum工具 sudo yum install yum-utils 2. 安装epel yum install epel-release 3.安装nginx&#xff1a; yum install nginx 4.查看版本 nginx -v 5.设置开机自启动 systemctl enable nginx nginx 常用命令&#xff1a; 1&#xff09;启动nginx …...

29. UE5 RPG应用GamplayAbility

前面几篇文章&#xff0c;总算把GE给更新完了&#xff0c;GE的基础应用也算讲清楚了。接下来&#xff0c;我们将更新GA的相应的课程了&#xff0c;首先&#xff0c;这一篇先对GA做一个简单的介绍&#xff0c;然后实现一下如何实现给角色应用一个GA。 简介 GamplayAbility 简称…...

http和https的区别!

HTTP 明文传输&#xff0c;数据都是未加密的&#xff0c;安全性较差&#xff0c;HTTPS&#xff08;SSLHTTP&#xff09; 数据传输过程是加密的&#xff0c;安全性较好。 使用 HTTPS 协议需要到 CA&#xff08;Certificate Authority&#xff0c;数字证书认证机构&#xff09; …...

使用AOP实现打印日志

首先创建annotation.SystemLog类&#xff1a; package com.gjh.annotation;import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;Target(ElementType.METHOD…...

2024年新算法-冠豪猪优化算法(CPO),CPO-RF-Adaboost,CPO优化随机森林RF-Adaboost回归预测-附代码

冠豪猪优化算法&#xff08;CPO&#xff09;是一种基于自然界中猪群觅食行为启发的优化算法。该算法模拟了猪群在寻找食物时的集群行为&#xff0c;通过一系列的迭代过程来优化目标函数&#xff0c;以寻找最优解。在这个算法中&#xff0c;猪被分为几个群体&#xff0c;每个群体…...

浅谈高阶智能驾驶-NOA领航辅助的技术与发展

浅谈高阶智能驾驶-NOA领航辅助的技术与发展 附赠自动驾驶学习资料和量产经验&#xff1a;链接 2019年在国内首次试驾特斯拉NOA领航辅助驾驶的时候&#xff0c;当时兴奋的觉得未来已来;2020年在试驾蔚来NOP领航辅助驾驶的时候&#xff0c;顿时不敢小看国内新势力了;现在如果哪家…...

大模型 智能体 智能玩具 智能音箱 构建教程 wukong-robot

视频演示 10:27 一、背景 继上文《ChatGPT+小爱音响能擦出什么火花?》可以看出大伙对AI+硬件的结合十分感兴趣,但上文是针对市场智能音响的AI植入,底层是通过轮询拦截,算是hack兼容,虽然官方有提供开发者接口,也免不了有许多局限性(比如得通过特定指令唤醒),不利于我…...

Clickhouse-表引擎探索之MergeTree

引言 前文曾说过&#xff0c;Clickhouse是一个强大的数据库Clickhouse-一个潜力无限的大数据分析数据库系统 其中一个强大的点就在于支持各类表引擎以用于不同的业务场景。 MergeTree MergeTree系列的引擎被设计用于插入极大量的数据到一张表当中。数据可以以数据片段的形式一…...

网络电视盒子哪个好?小编分享电视盒子品牌排行榜

电视盒子使用频率高&#xff0c;功能丰富&#xff0c;价格划算&#xff0c;是我们日常不可或缺的部分&#xff0c;小编经常会被问到与电视盒子相关的问题&#xff0c;考虑到很多朋友并不了解网络电视盒子哪个好&#xff0c;这次我来分享业内权威电视盒子品牌排行榜&#xff0c;…...

开源模型应用落地-baichuan2模型小试-入门篇(三)

一、前言 相信您已经学会了如何在Windows环境下以最低成本、无需GPU的情况下运行baichuan2大模型。现在,让我们进一步探索如何在Linux环境下,并且拥有GPU的情况下运行baichuan2大模型,以提升性能和效率。 二、术语 2.1. CentOS CentOS是一种基于Linux的自由开源操作…...

景联文科技高质量大模型训练数据汇总!

3月25日&#xff0c;2024年中国发展高层论坛年会上&#xff0c;国家数据局局长刘烈宏在“释放数据要素价值&#xff0c;助力可持续发展”的演讲中表示&#xff0c;中国10亿参数规模以上的大模型数量已超100个。 当前&#xff0c;国内AI大模型发展仍面临诸多困境。其中&#xff…...

【python】正则表达式

文章目录 正则表达式对象re.RegexObjectre.MatchObject符号说明匹配基础匹配?=、?<=、?!、?<!字符类re模块编译正则表达式compile 函数匹配字符串re.matchre.searchre.findall...

学习vue3第十二节(组件的使用与类型)

1、组件的作用用途 目的&#xff1a; 提高代码的复用度&#xff0c;和便于维护&#xff0c;通过封装将复杂的功能代码拆分为更小的模块&#xff0c;方便管理&#xff0c; 当我们需要实现相同的功能时&#xff0c;我们只需要复用已经封装好的组件&#xff0c;而不需要重新编写相…...

flume配置文件后不能跟注释!!

先总结&#xff1a;Flume配置文件后面&#xff0c;不能跟注释&#xff0c;可以单起一行写注释 报错代码&#xff1a; [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows. org.apache.flume.EventDel…...

【docker】Dockerfile自定义镜像

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;中间件 ⛺️稳中求进&#xff0c;晒太阳 1.Dockerfile自定义镜像 常见的镜像在DockerHub就能找到&#xff0c;但是我们自己写的项目就必须自己构建镜像了。 而要自定义镜像&#xff0c;就…...

webpack项目打包console git分支、打包时间等信息 exec

相关链接 MDN toLocaleString child_process Node.js strftime 格式 代码 buildinfo.js const { execSync, exec } require("child_process"); // exec: 在 Windows 执行 bat 和 cmd 脚本// execSync 同步 // exec 异步// exec 使用方法 // exec(git show -s,…...

Linux centos7离线搭建FTP

1、下载、安装ftp 下载ftp安装包&#xff0c;可以从rpm下载站搜索合适的版本&#xff0c;使用wget命令下载。 wget https://mirrors.aliyun.com/centos/7/os/x86_64/Packages/vsftpd-3.0.2-28.el7.x86_64.rpm 安装&#xff1a; rpm -ivh vsftpd-3.0.2-28.el7.x86_64.rpm 2…...

关于GPT-SoVITS语音合成的效果展示(西游之西天送葬团)

目录 使用效果总结合成效果展示 使用效果总结 使用的是2024年03月21日22点28分更新的版本。 使用起来很方便&#xff0c;从它“自带界面”这点就能看出&#xff0c;易于使用也是目的之一&#xff0c;而且从训练到推理的每个步骤都能在界面中完成。 集成了多个实用工具&#…...

如何安装OceanBase的OBD

选择一&#xff1a;借助 all-in-one 安装包安装 OBD&#xff08;推荐&#xff09; OceanBase 社区版的all-in-one安装包是一个集成了多种工具的一键式安装包。它包含了数据库软件本身&#xff0c;以及OBD、OBProxy、OBClient&#xff0c;自4.1版本起&#xff0c;还额外加入了O…...

Unity 读写Excel打包后无法运行可能的解决方案

读写Excel打包后无法运行可能的解决方案 &#x1f4a1;.适用于NPOI、EPPlus。 &#x1f4a1;.下载 资源包&#x1f448;,解压后把dll放到Assets目录中再重新打包即可。...

算法沉淀 —— 深度搜索(dfs)

算法沉淀 —— 深度搜索&#xff08;dfs&#xff09; 一、计算布尔二叉树的值二、求根节点到叶节点数字之和三、二叉树剪枝四、验证二叉搜索树五、二叉搜索树中第K小的元素 一、计算布尔二叉树的值 【题目链接】&#xff1a;2331. 计算布尔二叉树的值 【题目】&#xff1a; …...

#设计模式#3.1用做松鼠桂鱼来理解抽象工厂(对象创建型模式)

概念&#xff1a;xx工厂&#xff0c;xx产品 区分 工厂是动作&#xff0c;产品是结果&#xff08;菜品&#xff09; 概念&#xff1a;抽象xx&#xff0c;具体xx 区分 抽象产品&#xff1a;“中式菜品” 具体产品&#xff1a;“麻婆豆腐”、“宫保鸡丁” 抽象工厂&#xff1a;“…...

wordpress 发布文章主动推送百度/泰州seo外包公司

从螺杆压缩机的结构来说&#xff0c;转子是负责在转动过程中完成气体压缩的核心零件&#xff0c;转子和转子之间&#xff0c;以及转子与壳体之间必然存在间隙。通过这一间隙&#xff0c;气体会从高压侧向低压侧流动&#xff0c;这就形成了螺杆压缩机的内泄漏。如此&#xff0c;…...

环保业网站建设的策划/提高工作效率的工具

冷水江免费上门监控安装店 [sw888lsa]、系统集成&#xff1a;楼宇自控、电话交换机、机房工程、监控系统、防盗报警、公共广播、门禁系统、楼宇对讲、一卡通、停车管理、消防系统。、网络维护&#xff1a;WIFI覆盖&#xff0c;机房建设&#xff0c;机房维护&#xff0c;服务器维…...

电子商务网站规划的原则/郑州seo多少钱

常规A纸尺寸大小&#xff1a;&#xff08;单位&#xff1a;毫米&#xff09;A6纸尺寸&#xff1a;105148&#xff1b;&#xff08;64开纸&#xff09;A5纸尺寸&#xff1a;148210&#xff1b;&#xff08;32开纸&#xff09;A4纸尺寸&#xff1a;210297&#xff1b;&#xff08…...

教怎么做ppt的网站/域名备案官网

创建一个不显示实际进度的循环滚动进度条&#xff0c;类似于Windows XP启动界面的进度条 新建一个MFC项目TestProgress&#xff0c;编辑界面&#xff0c;添加一个进度条&#xff0c;两个按钮&#xff0c;实现点击start进度条循环滚动&#xff0c;点击stop进度条消失 右键进度条…...

网站开发费入什么科目/网页制作网站

Kotlin注解之JvmName 我们首先定义给 Int定义一个扩展函数。 fun Int.compare(num: Int): Int {return if (this > num) {this} else num } 在Kotlin代码中我们可以直接调用&#xff1a; println(2.compare(1)) 但是在 Java代码中我们的调用方式变为&#xff1a; public s…...

绵阳网站建设优化/深圳seo公司助力网络营销飞跃

之前文章中我们讲到&#xff0c;java中实现同步的方式是使用synchronized block。在java 5中&#xff0c;Locks被引入了&#xff0c;来提供更加灵活的同步控制。本文将会深入的讲解Lock的使用。Lock和Synchronized Block的区别我们在之前的Synchronized Block的文章中讲到了使用…...