Java Resilience4j-RateLimiter学习
一. 介绍
Resilience4j-RateLimiter 是 Resilience4j 中的一个限流模块,我们对 Resilience4j 的 CircuitBreaker、Retry 已经有了一定的了解,现在来学习 RateLimiter 限流器;
引入依赖;
<dependency><groupId>io.github.resilience4j</groupId><artifactId>resilience4j-ratelimiter</artifactId><!--jdk17对应的版本--><version>2.2.0</version>
</dependency>
二. 配置项
和 Retry 类似,RateLimiter 中也有一些配置项,对应 RateLimiterConfig 类的配置项;
RateLimiter 的配置项相比 CircuitBreaker、Retry 来说非常少,我们看下它的几个配置项;
- limitRefreshPeriod:刷新限流的时间;
- limitForPeriod:在刷新周期内的最大允许请求数,也就是最大 permission 数;
- timeoutDuration:获取 permission 的最大等待时间,超过此时间的话则认为无法获取到 permission,需要进行限流;
三. 简单使用
我们模拟在一个主线程中循序执行逻辑,看是否触发限流,以及触发几次限流;
public class TestRateLimiter01 {public static void main(String[] args) {// 创建一个限流配置RateLimiterConfig config = RateLimiterConfig.custom().limitRefreshPeriod(Duration.ofSeconds(1)) // 每秒刷新限流.limitForPeriod(10) // 每秒允许的最大请求数.timeoutDuration(Duration.ofMillis(200)) // 获取 permission 的最大等待时间,200ms.build();RateLimiterRegistry registry = RateLimiterRegistry.custom().withRateLimiterConfig(config).build();RateLimiter rateLimiter = registry.rateLimiter("myRateLimiter");for (int i = 0; i < 23; i++) {try {rateLimiter.executeRunnable(() -> System.out.println("--" + System.currentTimeMillis()));} catch (RequestNotPermitted ex) {System.out.println("发生了限流" + System.currentTimeMillis());}}}
}
打印如下:
--1723888206695
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
--1723888206699
发生了限流1723888206903
发生了限流1723888207104
发生了限流1723888207309
发生了限流1723888207512
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
--1723888207698
可以看到出现了 4 次限流;
分析:
- 由于获取 permission 的最大等待时间是 200ms,permission 的刷新周期是 1 s,也就是 1000 ms;且我们只有一个主线程;前 10 次顺利执行后,所剩的 permission 为 0;
- 第 11 次请求,到下一个周期大概还有 800 ms,大于我们获取 permission 的最大等待时间 200 ms,此时获取不到 permission,阻塞等待 200 ms,并限流;以此类推;
- 第 15 次请求,到下一个周期大概还有 180 ms,小于我们获取 permission 的最大等待时间 200 ms,此时能够获取到 permission,需要阻塞等待 180 ms,等待下一个周期的到来;
- 第 16 - 23 次请求,正常调用;
四. 限流算法
我们先看官网的这张图;

Resilience4j 总共有两种实现:
- 基于 Java 信号量(Semaphore-Based Rate Limiter)
- 基于原子计数器(Atomic Rate Limiter)
原子计数器(Atomic Rate Limiter)是默认的实现,我们看 AtomicRateLimiter,有时间的话再了解基于信号量的算法;
上图就是 AtomicRateLimiter 的实现示意图,它通过 AtomicReference 管理其状态。 其中,AtomicRateLimiter.State 是不可变的,并且具有以下字段:
- activeCycle:上一次调用使用的周期号;
- activePermissions:上次调用后的可用权限数;如果可以保留某些权限,则可以为负;
- nanosToWait:等待上一次呼叫的等待许可的纳秒数;
主要逻辑是:
- 将时间分成相等的部分,称为循环;在任何时候,我们都可以通过计算 currentTime / cyclePeriod 来确定当前周期;
- 如果我们知道限制器最后一次使用的当前周期数和周期,那么我们实际上可以计算出应该在限制器中出现多少个权限;
- 经过此计算后,如果可用权限还不够,我们可以通过减少当前权限并计算我们等待它出现的时间来判断执行权限保留;
- 经过所有计算后,我们可以产生一个新的限制器状态并将其存储在 AtomicReference 中;
五. 分析
1. executeRunnable()
我们直接从 RateLimiter.executeRunnable() 入手;
// ------------------------------------- RateLimiter ------------------------------------
default void executeRunnable(Runnable runnable) {// permits 为 1,即每次请求都获取一个 permitexecuteRunnable(1, runnable);
}// ------------------------------------- RateLimiter ------------------------------------
default void executeRunnable(int permits, Runnable runnable) {decorateRunnable(this, permits, runnable).run();
}// ------------------------------------- RateLimiter ------------------------------------
static Runnable decorateRunnable(RateLimiter rateLimiter, int permits, Runnable runnable) {return decorateCheckedRunnable(rateLimiter, permits, runnable::run).unchecked();
}// ------------------------------------- RateLimiter ------------------------------------
static CheckedRunnable decorateCheckedRunnable(RateLimiter rateLimiter, int permits,CheckedRunnable runnable) {return () -> {// 1. 等待获取 permissionwaitForPermission(rateLimiter, permits);try {// 2. 执行 runnablerunnable.run();// rateLimiter.onSuccess() 和 onError() 是统计用的,可以先不看rateLimiter.onSuccess();} catch (Exception exception) {rateLimiter.onError(exception);throw exception;}};
}
先等待获取 permission,只有获取到 permission 的情况下才能执行 runnable;waitForPermission() 是核心方法;
2. waitForPermission()
// ------------------------------------- RateLimiter ------------------------------------
static void waitForPermission(final RateLimiter rateLimiter, int permits) {// 1. 调用 rateLimiter.acquirePermission(permits) 来获取 permits 数量的 permission// 默认使用的 RateLimiter 是 AtomicRateLimiter,我们主要分析 AtomicRateLimiterboolean permission = rateLimiter.acquirePermission(permits);if (Thread.currentThread().isInterrupted()) {throw new AcquirePermissionCancelledException();}// 2. 如果获取失败,此时需要限流,抛出 RequestNotPermitted 异常if (!permission) {throw RequestNotPermitted.createRequestNotPermitted(rateLimiter);}
}
3. acquirePermission()
获取 permission 调用的是 RateLimiter.acquirePermission(int permits),我们主要看 AtomicRateLimiter(令牌桶限流);
// ------------------------------------- AtomicRateLimiter ------------------------------------
public boolean acquirePermission(final int permits) {// 1. timeoutInNacnos 为获取 permission 的最大等待时间long timeoutInNanos = state.get().config.getTimeoutDuration().toNanos();// 2. 获取下一个状态State modifiedState = updateStateWithBackOff(permits, timeoutInNanos);// 3. 看是否能获取到 permission,获取到返回 true,获取不到返回 falseboolean result = waitForPermissionIfNecessary(timeoutInNanos, modifiedState.nanosToWait);// 4. 发布事件publishRateLimiterAcquisitionEvent(result, permits);// 返回获取结果return result;
}
我们主要看第 2 步和第 3 步;
3.1 updateStateWithBackOff()
updateStateWithBackOff() 主要用于更新 State,通过 CAS 的方式更新 State;
// ------------------------------------- AtomicRateLimiter ------------------------------------
private AtomicRateLimiter.State updateStateWithBackOff(long timeoutInNanos) {AtomicRateLimiter.State prev;AtomicRateLimiter.State next;do {prev = (AtomicRateLimiter.State)this.state.get();// 执行 calculateNextState()next = this.calculateNextState(timeoutInNanos, prev);} while(!this.compareAndSet(prev, next));return next;
}
calculateNextState() 比较复杂,逻辑如下:
// ------------------------------------- AtomicRateLimiter ------------------------------------
private AtomicRateLimiter.State calculateNextState(long timeoutInNanos, AtomicRateLimiter.State activeState) {// 每个时间段对应纳秒数,由配置文件中的 limitRefreshPeriodInMillis 计算而来long cyclePeriodInNanos = activeState.config.getLimitRefreshPeriodInNanos();//每个时间段内可执行次数,对应配置文件中的limitForPeriodint permissionsPerCycle = activeState.config.getLimitForPeriod();// 计算从本类初始化到现在的纳秒数long currentNanos = this.currentNanoTime();// 计算当前 cycle 数 long currentCycle = currentNanos / cyclePeriodInNanos;long nextCycle = activeState.activeCycle;int nextPermissions = activeState.activePermissions;// 1. 如果已经进入后续的 cycle,重置 nextCycle 和 nextPermissions 值// nextPermissions 需要通过计算得到// 这是因为 activeState.activePermissions 会有赊账的情况,可能会存在负值// 所以 nextPermissions = Long.min(nextPermissions + nextState, permissionsPerCycle)long nextNanosToWait;if(nextCycle != currentCycle) {nextNanosToWait = currentCycle - nextCycle;long nextState = nextNanosToWait * permissionsPerCycle;nextCycle = currentCycle;nextPermissions = Long.min(nextPermissions + nextState, permissionsPerCycle);}// 2. 计算所需等待时间nextNanosToWait = this.nanosToWaitForPermission(cyclePeriodInNanos, permissionsPerCycle, nextPermissions, currentNanos, currentCycle);// 3. 需要根据 nextNanosToWait 和 timeoutInNanos 做对比// 所需时间和超时时间做对比,判断能否在能及时执行完AtomicRateLimiter.State nextState1 = this.reservePermissions(activeState.config, timeoutInNanos, nextCycle, nextPermissions, nextNanosToWait);return nextState1;
}
3.1.1 nanosToWaitForPermission()
我们看下 nanosToWaitForPermission() 的实现,逻辑为判断是否还有可用执行次数,如果还有次数则直接返回 0,表示不需要等待时间;
否则计算总共需要等待的时间,如果所需的 permits 过大,可能会导致需要等待很多个 cycle;对于我们正常使用来说,permits 一般都为 1,这里一般最多等待 nanosToNextCycle,即到下一个时间周期的剩余时间;
// ------------------------------------- AtomicRateLimiter ------------------------------------
private long nanosToWaitForPermission(final int permits, final long cyclePeriodInNanos,final int permissionsPerCycle,final int availablePermissions, final long currentNanos, final long currentCycle) {if (availablePermissions >= permits) {return 0L;}long nextCycleTimeInNanos = (currentCycle + 1) * cyclePeriodInNanos;long nanosToNextCycle = nextCycleTimeInNanos - currentNanos;int permissionsAtTheStartOfNextCycle = availablePermissions + permissionsPerCycle;int fullCyclesToWait = divCeil(-(permissionsAtTheStartOfNextCycle - permits),permissionsPerCycle);// 一般等待时间都为 nanosToNextCyclereturn (fullCyclesToWait * cyclePeriodInNanos) + nanosToNextCycle;
}
下述为了解内容;
1、如果 permits 过大,示例如下,需要等待一个周期 + nanosToNextCycle;
availablePermissions = 2
permits = 20
permissionsAtTheStartOfNextCycle = 2+10 = 12
fullCyclesToWait = divCeil (-(12-20), 10) = divCeil(8, 10) = 1
2、如果我们设置的 timeoutInNanos 过大,比如为 6 秒,可能会出现赊账严重,示例如下,需要等待两个周期 + nanosToNextCycle;所以我们尽量不要设置 timeoutInNanos 过大;
availablePermissions = -22
permits = 1
permissionsAtTheStartOfNextCycle = -22+10 = -12
fullCyclesToWait = divCeil (-(-12-1), 10) = divCeil(13, 10) = 2
3.1.2 reservePermissions()
我们再来看下 reservePermissions() 的实现;
根据 nextNanosToWait 和 timeoutInNanos 做对比,将所需时间和超时时间做对比,判断能否在能及时执行完;
- timeoutInNanos >= nanosToWait:能及时执行完,可用次数 permission-1,同时更新 cycle、nanosToWait;返回新的 State 对象;
- timeoutInNanos < nanosToWait:不能及时执行完,permission 不变,同时更新 cycle、nanosToWait;返回新的 State 对象;
// ------------------------------------- AtomicRateLimiter ------------------------------------
private State reservePermissions(final RateLimiterConfig config, final int permits,final long timeoutInNanos,final long cycle, final int permissions, final long nanosToWait) {boolean canAcquireInTime = timeoutInNanos >= nanosToWait;int permissionsWithReservation = permissions;if (canAcquireInTime) {permissionsWithReservation -= permits;}return new State(config, cycle, permissionsWithReservation, nanosToWait);
}
3.2 waitForPermissionIfNecessary()
// ------------------------------------- AtomicRateLimiter ------------------------------------
private boolean waitForPermissionIfNecessary(final long timeoutInNanos,final long nanosToWait) {boolean canAcquireImmediately = nanosToWait <= 0;boolean canAcquireInTime = timeoutInNanos >= nanosToWait;// 1. nanosToWait == 0 的情况,表示立即获取到了 permission,返回 trueif (canAcquireImmediately) {return true;}// 2. timeoutInNanos >= nanosToWait,表示需要等待 nacosToWait 到下一个时间周期// 调用线程会在此处阻塞等待 nanosToWait 时间,等待完成后返回 trueif (canAcquireInTime) {return waitForPermission(nanosToWait);}// 3. timeoutInNanos < nanosToWait,超过我们指定的获取 permission 的最大等待时间// 调用线程会在此处阻塞等待 timeoutInNanos 时间,等待完成后返回 false,表示获取失败,需要限流waitForPermission(timeoutInNanos);return false;
}
相关文章:
Java Resilience4j-RateLimiter学习
一. 介绍 Resilience4j-RateLimiter 是 Resilience4j 中的一个限流模块,我们对 Resilience4j 的 CircuitBreaker、Retry 已经有了一定的了解,现在来学习 RateLimiter 限流器; 引入依赖; <dependency><groupId>io.g…...
Nginx--地址重写Rewrite
一、什么是Rewrite Rewrite对称URL Rewrite,即URL重写,就是把传入Web的请求重定向到其他URL的过程 URL Rewrite最常见的应用是URL伪静态化,是将动态页面显示为静态页面方式的一种技术。比如http://www.123.com/news/index.php?id123 使用U…...
webflux源码解析(1)-主流程
目录 1.关键实例的创建1.1 实例创建1.2 初始化 2.处理请求的关键流程2.1 从ReactorHttpHandlerAdapter开始2.1 DispatcherHandler的初始化2.2查找mapping handler2.3 处理请求(执行handler)2.4 返回结果处理 3.webflux的配置装配参考: WebFlux是Spring 5.0框架推出的…...
ipad作为扩展屏的最简单方式
将iPad用作扩展屏幕有几种简单而有效的方法。以下是几种常见的方式: 1. Sidecar(苹果官方功能) 适用设备:iPad和Mac(macOS Catalina及以上版本)。功能:Sidecar 是苹果官方的功能,可…...
【卡码网Python基础课 17.判断集合成员】
目录 题目描述与分析一、集合二、集合的常用方法三、代码编写 题目描述与分析 题目描述: 请你编写一个程序,判断给定的整数 n 是否存在于给定的集合中。 输入描述: 有多组测试数据,第一行有一个整数 k,代表有 k 组测…...
生物研究新范式!AI语言模型在生物研究中的应用
–https://doi.org/10.1038/s41592-024-02354-y 留意更多内容,欢迎关注微信公众号:组学之心 Language models for biological research: a primer 研究团队及研究单位 James Zou–Department of Biomedical Data Science, Stanford University, Stan…...
python语言day08 属性装饰器和property函数 异常关键字 约束
属性装饰器: 三个装饰器实现对私有化属性_creat_time的get,set,del方法; 三个装饰器下的方法名都一样,通过message.creat_time的不同操作实现调用get,set,del方法。 __inti__: 创建并…...
day01JS-数据类型-01
1. 浏览器内核 通常所谓的浏览器内核也就是浏览器所采用的渲染引擎,渲染引擎决定了浏览器如何显示网页的内容以及页面的格式信息。不同的浏览器内核对网页编写语法的解释也有不同,因此同一网页在不同的内核的浏览器里的渲染(显示)…...
MATLAB 手动实现一种高度覆盖值提取建筑物点云的方法(74)
专栏往期文章,包含本章 MATLAB 手动实现一种高度覆盖值提取建筑物点云的方法(74) 一、算法介绍二、算法实现1.代码2.效果总结一、算法介绍 手动实现一种基于高度覆盖值的建筑物点云提取方法,适用于高大的城市建筑物,比只利用高度提取建筑物的方法更加稳定和具有价值,主要…...
git的下载与安装(Windows)
Git是一个开源的分布式版本控制系统(Distributed Version Control System,简称DVCS),它以其高效、灵活和强大的功能,在现代软件开发中扮演着至关重要的角色。 git官网:Git (git-scm.com) 1.进入git官网 2…...
腾讯云AI代码助手 —— 编程新体验,智能编码新纪元
阅读导航 引言一、开发环境介绍1. 支持的编程语言2. 支持的集成开发环境(IDE) 二、腾讯云AI代码助手使用实例1. 开发环境配置2. 代码补全功能使用💻自动生成单句代码💻自动生成整个代码块 3. 技术对话3. 规范/修复错误代码4. 智能…...
使用 ESP32 和 TFT 屏幕显示实时天气信息 —— 基于 OpenWeatherMap API
实时监测环境数据是一个非常常见的应用场景,例如气象站、智能家居等。这篇博客将带你使用 ESP32 微控制器和一个 TFT 屏幕,实时显示当前城市的天气信息。通过 OpenWeatherMap API,我们能够获取诸如温度、天气情况以及经纬度等详细的天气数据&…...
高阶数据结构——B树
1. 常见的搜索结构 以上结构适合用于数据量相对不是很大,能够一次性存放在内存中,进行数据查找的场景。如果数据量很大,比如有100G数据,无法一次放进内存中,那就只能放在磁盘上了,如果放在磁盘上࿰…...
Vue2中watch与Vue3中watch对比和踩坑
上一节说到了 computed计算属性对比 ,虽然计算属性在大多数情况下更合适,但有时也需要一个自定义的侦听器。这就是为什么 Vue 通过 watch 选项提供了一个更通用的方法,来响应数据的变化。当需要在数据变化时执行异步或开销较大的操作时&#…...
在Java程序中执行Linux命令
在Java中执行Linux命令通常涉及到使用Java的运行时类 (java.lang.Runtime) 或者 ProcessBuilder 类来启动一个外部进程 1. 使用 Runtime.exec() Runtime.exec() 方法可以用来执行一个外部程序。它返回一个 Process 对象,可以通过这个对象与外部程序交互࿰…...
微信小程序在不同移动设备上的差异导致原因
在写小程序的时候用了rpx自适应单位,但是还是出现了在不同机型上布局不统一的问题,在此记录一下在首页做一个输入框,在测试的时候,这个输入框在不同的机型上到处跑,后来排查了很久都不知道为什么会这样 解决办法是后 …...
快速体验fastllm安装部署并支持AMD ROCm推理加速
序言 fastllm是纯c实现,无第三方依赖的高性能大模型推理库。 本文以国产海光DCU为例,在AMD ROCm平台下编译部署fastllm以实现LLMs模型推理加速。 测试平台:曙光超算互联网平台SCNet GPU/DCU:异构加速卡AI 显存64GB PCIE&#…...
报错:java: javacTask: 源发行版 8 需要目标发行版 1.8
程序报错: Executing pre-compile tasks... Loading Ant configuration... Running Ant tasks... Running before tasks Checking sources Copying resources... [gulimail-coupon] Copying resources... [gulimail-common] Parsing java… [gulimail-common] java…...
【数据结构篇】~单链表(附源码)
【数据结构篇】~链表 链表前言链表的实现1.头文件2.源文件 链表前言 链表是一种物理存储结构上非连续、非顺序的存储结构,数据元素的逻辑顺序是通过链表中的指针链接次序实现的。 1、链式机构在逻辑上是连续的,在物理结构上不一定连续 2、结点一般是从…...
旋转图像(LeetCode)
题目 给定一个 n n 的二维矩阵 matrix 表示一个图像。请你将图像顺时针旋转 90 度。 你必须在 原地 旋转图像,这意味着你需要直接修改输入的二维矩阵。请不要 使用另一个矩阵来旋转图像。 解题 def rotate(matrix):n len(matrix)# 矩阵转置for i in range(n):for…...
Linux 文件类型,目录与路径,文件与目录管理
文件类型 后面的字符表示文件类型标志 普通文件:-(纯文本文件,二进制文件,数据格式文件) 如文本文件、图片、程序文件等。 目录文件:d(directory) 用来存放其他文件或子目录。 设备…...
Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...
【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...
使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台
🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...
PAN/FPN
import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...
AI语音助手的Python实现
引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...
数据库——redis
一、Redis 介绍 1. 概述 Redis(Remote Dictionary Server)是一个开源的、高性能的内存键值数据库系统,具有以下核心特点: 内存存储架构:数据主要存储在内存中,提供微秒级的读写响应 多数据结构支持&…...
【PX4飞控】mavros gps相关话题分析,经纬度海拔获取方法,卫星数锁定状态获取方法
使用 ROS1-Noetic 和 mavros v1.20.1, 携带经纬度海拔的话题主要有三个: /mavros/global_position/raw/fix/mavros/gpsstatus/gps1/raw/mavros/global_position/global 查看 mavros 源码,来分析他们的发布过程。发现前两个话题都对应了同一…...
表单设计器拖拽对象时添加属性
背景:因为项目需要。自写设计器。遇到的坑在此记录 使用的拖拽组件时vuedraggable。下面放上局部示例截图。 坑1。draggable标签在拖拽时可以获取到被拖拽的对象属性定义 要使用 :clone, 而不是clone。我想应该是因为draggable标签比较特。另外在使用**:clone时要将…...
【阅读笔记】MemOS: 大语言模型内存增强生成操作系统
核心速览 研究背景 研究问题:这篇文章要解决的问题是当前大型语言模型(LLMs)在处理内存方面的局限性。LLMs虽然在语言感知和生成方面表现出色,但缺乏统一的、结构化的内存架构。现有的方法如检索增强生成(RA…...
