Rocket面试(五)Rocketmq发生流量控制的情况有哪些?
在使用rocketmq过程中总能看见一下异常
[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5
这是因为Rocketmq出发了流量控制。
触发流量控制就是为了防止Broker压力过大挂掉。主要分为Broker流控,Consumer流控
1.Broker流控
Rocketmq默认采取的是异步刷盘方式,Producer把消息发送到broker后,Broker会把消息暂放在Page Cache中刷盘线程定时的把数据刷到磁盘中
1.1 broker busy
Broker是开启快速失败的,处理逻辑类是BrokerFastFailure,这个类中有一个定时任务用来清理过期的请求,每 10 ms 执行一次,代码如下:
public void start() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {cleanExpiredRequest();}}}, 1000, 10, TimeUnit.MILLISECONDS);
}
1.1.1 Page Cache繁忙
清理过期请求之前会先判断一下Page Cache是否繁忙,如果繁忙就会给Producer返回一个系统繁忙的状态码(code=2,remark="[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d"),那怎么判断Page Cache繁忙呢?
当Broker收到消息后,会放到Page Cache中,这个过程,首先会取一个CommitLog写入锁,如果持有锁的时间超过1s就认为Page Cache繁忙具体代码见 DefaultMessageStore 类 isOSPageCacheBusy 方法。
1.1.2清理过期请求
清理过期请求时,如果请求线程创建的时间与当前系统时间的间隔大于200ms.然后给 Producer 返回一个系统繁忙的状态码(code=2,remark="[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d")
1.2 system busy
这个异常在 NettyRemotingAbstract#processRequestCommand 方法.
1.拒绝请求
如果 NettyRequestProcessor 拒绝了请求,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[REJECTREQUEST]system busy, start flow control for a while")。
那什么情况下请求会被拒绝呢?看下面这段代码:
//SendMessageProcessor类
public boolean rejectRequest() {return this.brokerController.getMessageStore().isOSPageCacheBusy() ||this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}
从代码中可以看到,请求被拒绝的情况有两种可能,一个是 Page Cache 繁忙,另一个是 TransientStorePoolDeficient。跟踪 isTransientStorePoolDeficient 方法,发现判断依据是在开启 transientStorePoolEnable 配置的情况下,是否还有可用的 ByteBuffer。
注意:在开启 transientStorePoolEnable 的情况下,写入消息时会先写入堆外内存(DirectByteBuffer),然后刷入 Page Cache,最后刷入磁盘。而读取消息是从 Page Cache,这样可以实现读写分离,避免读写都在 Page Cache 带来的问题
2.线程拒绝
Broker 收到请求后,会把处理逻辑封装成到 Runnable 中,由线程池来提交执行,如果线程池满了就会拒绝请求(这里线程池中队列的大小默认是 10000,可以通过参数 sendThreadPoolQueueCapacity 进行配置),线程池拒绝后会抛出异常 RejectedExecutionException,程序捕获到异常后,会判断是不是单向请求(OnewayRPC),如果不是,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[OVERLOAD]system busy, start flow control for a while")
判断 OnewayRPC 的代码如下,flag = 2 或者 3 时是单向请求:
public boolean isOnewayRPC() {int bits = 1 << RPC_ONEWAY;return (this.flag & bits) == bits;
}
1.3消息重试
Broker 发生流量控制的情况下,返回给 Producer 系统繁忙的状态码(code=2),Producer 收到这个状态码是不会进行重试的。下面是会进行重试的响应码:
//DefaultMQProducer类
private final Set<Integer> retryResponseCodes = new CopyOnWriteArraySet<Integer>(Arrays.asList(ResponseCode.TOPIC_NOT_EXIST,ResponseCode.SERVICE_NOT_AVAILABLE,ResponseCode.SYSTEM_ERROR,ResponseCode.NO_PERMISSION,ResponseCode.NO_BUYER_ID,ResponseCode.NOT_IN_CURRENT_UNIT
));
2.Consumer流控
DefaultMQPushConsumerImpl 类中有 Consumer 流控的逻辑 。
2.1 缓存消息数量超过阈值
ProcessQueue 保存的消息数量超过阈值(默认 1000,可以配置),源码如下:
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}
2.2缓存消息大小超过阈值
ProcessQueue 保存的消息大小超过阈值(默认 100M,可以配置),源码如下:
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;
}
2.3缓存消息跨度超过阈值
对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值(默认 2000,可以配置)。源代码如下:
if (!this.consumeOrderly) {if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}
}
2.4获取锁失败
对于顺序消费的情况,ProcessQueue加锁失败,也会延迟拉取,这个延迟时间默认是 3s,可以配置。
3.总结
本文介绍了 RocketMQ 发生流量控制的 8 个场景,其中 Broker 4 个场景,Consumer 4 个场景。Broker 的流量控制,本质是对 Producer 的流量控制,最好的解决方法就是给 Broker 扩容,增加 Broker 写入能力。而对于 Consumer 端的流量控制,需要解决 Consumer 端消费慢的问题,比如有第三方接口响应慢或者有慢 SQL。
Broker4种场景:
page cache 繁忙:获取commitlog写入锁超过1s
清理过期请求 如果请求过期请求的时间到当前系统时间超过了200ms
请求拒绝 一种是page cache繁忙 一种是 transientStorePoolEnable模式看是否可用buffer
线程池拒绝 Broker 收到请求后,会把处理逻辑封装成到 Runnable 中,由线程池来提交执行,如果线程池满了就会拒绝请求(这里线程池中队列的大小默认是 10000
Consumer4种场景
消息数量超过阈值1000
消息大小超过阈值100m
缓存消息跨度超过阈值 对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值2000
ProcessQueue加锁失败 也会延迟加载
相关文章:

Rocket面试(五)Rocketmq发生流量控制的情况有哪些?
在使用rocketmq过程中总能看见一下异常 [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5这是因为Rocketmq出发了流量控制。 触发流量控制就是为了防止Broker压力过大挂掉。主要分为Broker流控,Consu…...

Tableau招聘信息数据可视化
获取的招聘信息数据为某招聘网站发布的大数据及数据分析相关岗位,对其他计算机相关岗位的招聘信息数据分析也有一定的参考价值。因为所获取的招聘信息数据数量只有1万左右,实际的招聘信息数量肯定不止1万,所以可能会与实际信息有一定的误差。…...
游戏服务器开发指南(八):合理应对异常
大家好!我是长三月,一位在游戏行业工作多年的老程序员,专注于分享服务器开发相关的文章。 本文是通用程序设计主题下的第二篇。这个主题主要探讨如何编写高效、健壮、易读的游戏业务代码,每篇从一个小点切入。本次讨论的要点是&a…...
【g】聚类算法之K-means算法
聚类算法是一种无监督学习方法,它将相似的数据样本划分为一组,同时将不相似的数据样本划分为另一组。这个过程由计算机自动完成,不需要任何人为的干预。 K-means算法是一种经典的聚类算法,它的主要思想是把数据集分成k个簇&#…...

scala内建控制结构
一、条件表达式 (一)语法格式 - if (条件) 值1 else 值2(二)执行情况 条件为真,结果是值1;条件为假,结果是值2。如果if和else的返回结果同为某种类型,那么条件表达式结果也是那种类…...

Linux SSH命令实战教程,提升你的服务器管理基本功!
前言 大家好,又见面了,我是沐风晓月,本文是专栏【linux基本功-基础命令实战】的第62篇文章。 专栏地址:[linux基本功-基础命令专栏] , 此专栏是沐风晓月对Linux常用命令的汇总,希望能够加深自己的印象&am…...

【Python】Python进阶系列教程-- Python3 CGI编程(二)
文章目录 前言什么是CGI网页浏览CGI架构图Web服务器支持及配置第一个CGI程序HTTP头部CGI环境变量GET和POST方法使用GET方法传输数据简单的表单实例:GET方法使用POST方法传递数据通过CGI程序传递checkbox数据通过CGI程序传递Radio数据通过CGI程序传递 Textarea 数据通…...

do..while、while、for循环反汇编剖析
1、循环语句重要特征提取 循环语句最重要的特点就是执行的过程中会往上跳!!! 箭头往上跳的一般都是循环语句,比如下面的for循环: 2、do..while语句反汇编 #include<iostream> using namespace std; #pragma …...
【代码随想录】刷题Day53
1.最长公共子序列 1143. 最长公共子序列 和之前的一道题目的区别就是这个子序列不需要每个字符相邻。那么条件就变成两种了,一种是当前的字符相同,一种是不同。相同跟之前的条件一样;不同则需要继承上次比较的较大值。if (text1[i - 1] tex…...

MySQL 索引及查询优化总结
一个简单的对比测试 前面的案例中,c2c_zwdb.t_file_count表只有一个自增id,FFileName字段未加索引的sql执行情况如下: 在上图中,typeall,keynull,rows33777。该sql未使用索引,是一个效率非常低…...

什么是AJAX?
AJAX是一种基于Web的技术,它允许Web应用程序在不刷新整个页面的情况下与服务器进行交互。通过AJAX,Web应用程序可以使用JavaScript向服务器发送异步请求并在不干扰用户的情况下更新页面的部分内容。 AJAX是Asynchronous JavaScript and XML的缩写。尽管…...

报表生成器FastReport .Net用户指南:显示数据列、HTML标签
FastReport .Net是一款全功能的Windows Forms、ASP.NET和MVC报表分析解决方案,使用FastReport .NET可以创建独立于应用程序的.NET报表,同时FastReport .Net支持中文、英语等14种语言,可以让你的产品保证真正的国际性。 FastReport.NET官方版…...
bootstrap-dialog弹框,去掉遮盖层,可移动
1.去掉遮盖层的设置data-backdrop"false" <div class"modal fade" id"modal" aria-modal"true" role"dialog" data-backdrop"false" style"width:50%"><div class"modal-dialog modal-l…...

7. user-Agent破解反爬机制
文章目录 1. 为什么要设置反爬机制2. 服务器如何区分浏览器访问和爬虫访问3. 反爬虫机制4. User-Agent是什么5. 如何查询网页的User-Agent6. user-agent信息解析7. 爬虫程序user-agent和浏览器user-agent的区别8. 代码查看爬虫程序的user-agent9. 在代码中加入请求头信息 1. 为…...
3.Nginx+Tomcat负载均衡和动静分离群集
文章目录 NginxTomcat负载均衡和动静分离群集Nginx作用实验七层反向代理nginx动静分离四层反向代理负载均衡 NginxTomcat负载均衡和动静分离群集 Nginx是-款非常优秀的HTTP服务器软件 支持高达50 000个并发连接数的响应拥有强大的静态资源处理能力运行稳定内存、CPU等系统资源…...

数据结构与算法之树结构
目录 为什么要使用树结构树结构基本概念树的种类树的存储与表示常见的一些树的应用场景为什么要使用树结构 线性结构中不论是数组还是链表,他们都存在着诟病;比如查找某个数必须从头开始查,消耗较多的时间。使用树结构,在插入和查找的性能上相对都会比线性结构要好 树结构…...

【python】 用来将对象持久化的 pickle 模块
pickle 模块可以对一个 Python 对象的二进制进行序列化和反序列化。说白了,就是它能够实现任意对象与二进制直接的相互转化,也可以实现对象与文本之间的相互转化。 比如,我程序里有一个 python 对象,我想把它存到磁盘里ÿ…...
【博客654】prometheus配置抓取保护以防止压力过载
prometheus抓取保护配置以防止压力过载 场景 担心您的应用程序指标可能突然激增,以及指标突然激增导致prometheus压力过载 就像生活中的许多事情一样,标签要有节制。当带有用户 ID 或电子邮件地址的标签被添加到指标时,虽然它不太可能结束…...
Backtrader官方中文文档:第十三章Observers观察者
本文档参考backtrader官方文档,是官方文档的完整中文翻译,可作为backtrader中文教程、backtrader中文参考手册、backtrader中文开发手册、backtrader入门资料使用。 本章包含 backtrader 官方Observers章节全部内容,入口 : https://backtrader.com/docu/observers-and-sta…...

算法leetcode|54. 螺旋矩阵(rust重拳出击)
文章目录 54. 螺旋矩阵:样例 1:样例 2:提示: 分析:题解:rust:go:c:python:java:每次循环移动一步:每次循环完成一个顺时针:…...

遍历 Map 类型集合的方法汇总
1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...

iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...

YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...

页面渲染流程与性能优化
页面渲染流程与性能优化详解(完整版) 一、现代浏览器渲染流程(详细说明) 1. 构建DOM树 浏览器接收到HTML文档后,会逐步解析并构建DOM(Document Object Model)树。具体过程如下: (…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...

[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...

QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
管理学院权限管理系统开发总结
文章目录 🎓 管理学院权限管理系统开发总结 - 现代化Web应用实践之路📝 项目概述🏗️ 技术架构设计后端技术栈前端技术栈 💡 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 🗄️ 数据库设…...