如何确定RocketMQ中消费者的线程大小
背景
随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求进行缓冲,RocketMQ
就是一款非常优秀消息队列的中间件,在互联网领域久经考验,也被各个行业广泛应用。
在上一篇文章中,介绍了RocketMQ
的工作原理使用。物联网中如何使用RockeMQ
如何在配置文件端配置消费者线程大小
当生产者将大量的消息堆积到消息队列中时,需要同步启用消费者去消费队列中的消息,达到动态平衡的效果。
如下代码所示,在消费者类上,会使用@RocketMQMessageListener
注解,并填写必要的属性:consumerThreadNumber
:消费线程、主题、消费组。
@RocketMQMessageListener(// 指定消费线程大小consumeThreadNumber = 16,topic = TOPIC_DEMO,consumerGroup = "consumer_demo_group")
@Component
public class Consumer implements RocketMQListener<MessageExt> {private final String CHARSET = Charset.defaultCharset().name();@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String str = new String(body, Charset.forName(this.CHARSET));System.out.println("消费者消费的消息为: " + str);}
}
其中的topic
、consumerGroup
可以指定一次就不会变啦,但是consumerThreadNumber
会根据机器的性能发生变化;因此需要将其提出放到配置文件中,方便修改,比如"application.yaml
"。
那应该如何实现呢?
其中 consumerThreadNumber = 16
,表明填入的是一个static
的变量,因此如果简单地使用@Value
来进行注入变量是不成功的,因为它只能注入非静态变量。为了能实现从配置文件中读取变量,并转为static
变量,采用了显示set
方式赋值变量的方法。
/**
* 注入mq消费的线程数量
*/
public static int RocketMQThreadSize;@Value("${biz.RocketMQThreadSize}")
public void setRocketMQThreadSize(int threadSize) {
RocketMQThreadSize = threadSize;
}
那在配置文件中就可以配置RocketMQThreadSize
的大小啦。如下在application.yaml
就可以搞定。
biz: RocketMQThreadSize: 200
如何使得自定义的线程大小生效
如上一章所示,可以得到静态的变量,那如何在消费者中生效呢?幸好RocketMQ
提供一个接口可以实现消费者线程的自定义。
在消费者的类需要实现RocketMQPushConsumerLifecycleListener
接口即可,然后在类中实现prepareStart
方法即可。具体如下所示:
@RocketMQMessageListener(topic = TOPIC_DEMO,consumerGroup = "consumer_demo_group")
@Component
public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {private final String CHARSET = Charset.defaultCharset().name();@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String str = new String(body, Charset.forName(this.CHARSET));System.out.println("消费者消费的消息为: " + str);}@Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {// 指定消费线程大小defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);}
}
在prepareStart
方法中,指定一些必要的线程参数
- 最大线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
- 最小线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
并且通过实验和查看源码,其中最大、最小设置一样才会生效。
如何确定合适的线程大小
以上的步骤已经帮忙把线程设置提取出来啦,之后只需在配置文件中修改线程数大小,而不需去代码中修改,避免代码导致系统出现问题。那如何去确定线程的数量大小呢?
线程是计算机执行任务的基本的单位,即消费任务可以交给线程去执行。
当线程数量较少时,CPU
性能不能充分发挥。但是线程数量过的就会导致过多的线程处于等待中,机器的负载升高。因此需要确定适合当前机器的线程数量。
在RocketMQ
线程调优有两个指标可以帮助大致确定消费线程大小:
- 消费者的
TPS
,表明消费者的能力; - 机器负载,分为
CPU
负载和IO
负载,和自身的核心数量有关。
RocketMQ
提供web
界面,可以监测TPS
的大小,这个数量当然是越大越好,但是也要考虑负载。
在服务器输入top
命令就可以看大,当前机器的负载:
分别为1、5、15分钟负载。
在进行压测的时候,需要知道机器的核心数量,监测负载的时候负载的大小就不能超过核心数量。
在测试的时候可以从小到大调节线程数大小,并且关注TPS
和负载。
结尾
以上就是确定消费者线程大小的整个过程,有疑问欢迎留言交流!!!
线程介绍
相关文章:

如何确定RocketMQ中消费者的线程大小
背景 随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求…...

OpenAPI SDK组件之Spring Aop源码拓展
Spring Aop 看这个分享的应该都用过Spring Aop,这里就不再过多介绍了它是什么了。 我抽取了Spring Aop的部分源码,通过它实现请求参数可变拦截,同时apisdk离开Spring框架,仍然可以正常运行。 讲拦截也好,通知也罢&a…...

蓝桥杯C/C++VIP试题每日一练之龟兔赛跑预测
💛作者主页:静Yu 🧡简介:CSDN全栈优质创作者、华为云享专家、阿里云社区博客专家,前端知识交流社区创建者 💛社区地址:前端知识交流社区 🧡博主的个人博客:静Yu的个人博客…...

为你的Vue2.x老项目安装Vite发动机吧
天下苦webpack久矣,相信作为前端开发者一定经历过在项目迭代时间较长的时候经历漫长等待的这一过程,每一次保存都会浪费掉大量时间,这是webpack这种机制所带来的问题。 于是,尤大为我们带来了新一代前端构建工具:vite…...

ZCMU--5012: 铺设道路(差分思路)
Description 春春是一名道路工程师,负责铺设一条长度为 n 的道路。 铺设道路的主要工作是填平下陷的地表。 整段道路可以看作是 n 块首尾相连的区域,一开始,第 i 块区域下陷的深度为 di。 春春每天可以选择一段连续区间 [L,R]&…...

算法模板总结(自用)
算法模板总结滑动窗口双指针算法数组相关合并两个有序数组左右指针技巧快慢指针技巧字符串相关左右指针反转字符串问题快慢指针替换空格字符问题链表相关快慢双指针删除链表的倒数第N个节点链表相交环形链表链表操作几数之和两数之和四个数组的四数之和三数之和同一数组中四数之…...

【架构师】零基础到精通——架构发展
博客昵称:架构师Cool 最喜欢的座右铭:一以贯之的努力,不得懈怠的人生。 作者简介:一名Coder,软件设计师/鸿蒙高级工程师认证,在备战高级架构师/系统分析师,欢迎关注小弟! 博主小留言…...

C++(20):三路比较运算符
C20增加了三路比较运算符<>(戏称航天飞机运算符),用于对类的比较运算符进行统一的设计。有两种使用方式:默认比较对于某些类,如果按照其成员逐一比较即可决定比较运算符的值,那么可以使用默认的三路运…...

MySQL workbench 字符集、字符序的概念与联系
在数据的存储上,MySQL提供了不同的字符集支持。而在数据的对比操作上,则提供了不同的字符序支持。 MySQL提供了不同级别的设置,包括server级、database级、table级、column级,可以提供非常精准的设置。 什么是字符集、字符序&am…...

DBA之路---数据库启动与关闭过程
DBA之路—数据库启动与关闭过程 1、启动过程 oracle启动的四个状态 shutdown、就是数据库关闭状态。 nomount模式 #启动instance ,读取参数文件、分配sga空间启动后台进程,打开alter日志和其他trace文件startup nomount #该模式下只会创建实例并不加…...

Shell文件包含
和其他语言一样,Shell 也可以包含外部脚本。这样可以很方便的封装一些公用的代码作为一个独立的文件。 一、语法格式 Shell 文件包含的语法格式如下: . filename # 注意点号(.)和文件名中间有一空格 或 source filename 在当前bash环境下读取并执行file…...

计算机网络(六): HTTP,HTTPS,DNS,网页解析全过程
文章目录一、HTTP头部包含的信息通用头部请求头部响应头部实体头部二、Keep-Alive和非Keep-Alive的区别三、HTTP的方法四、HTTP和HTTPS建立连接的过程4.1 HTTP4.2 HTTPS五、HTTP和HTTPS的区别六、HTTPS的加密方式七、cookie和sessionsessioncookie八、HTTP状态码状态码200&…...

Android仿京东金融的数值滚动尺功能
自定义数值滚动尺,这个用的还是挺多的,例如京东金融的通过滚动尺选择金额等,而这次就是高仿京东金融的数值滚动尺。首先看看下效果图,如下:首先先给你们各个变量的含义,以免在后面的讲解中不知变量的意思,代码如下://最…...

Nginx 和 Tomcat 实现负载均衡
Nginx 和 tomcat 实现负载均衡 🏆荣誉认证:51CTO博客专家博主、TOP红人、明日之星;阿里云开发者社区专家博主、技术博主、星级博主。 💻微信公众号:微笑的段嘉许 📌本文由微笑的段嘉许原创! &am…...

【万能排序之qsort、b_sort 、s_sort】
文章目录前言:star:qsort函数函数参数qsort函数的使用:star:模拟实现万冒泡排序函数参数模拟实现b_sort注意点:star:模拟实现万能选择排序函数参数模拟实现s_sort最后前言 我们所熟悉的冒泡排序,选择排序,插入排序,二分排序等都是基于给定的一…...

利用InceptionV3实现图像分类
最近在做一个机审的项目,初步希望实现图像的四分类,即:正常(neutral)、涉政(political)、涉黄(porn)、涉恐(terrorism)。有朋友给推荐了个github上…...

【Java】CAS锁
一、什么是CAS机制(compare and swap) 1.概述 CAS的全称为Compare-And-Swap,直译就是对比交换。是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值。经过调查发现,…...

Linux服务器配置系统安全加固方法
1. SSH空闲超时时间建议为: 600-900 解决方案: 在【/etc/ssh/sshd_config】文件中设置【ClientAliveInterval】设置为600到900之间 vim /etc/ssh/sshd_config #将 ClientAliveInterval 参数值设置为 900 2. 修改检查SSH密码修改最小间隔 解决方案: 在【/etc/login.defs】文件…...

Codeforces Round #850 (Div. 2, based on VK Cup 2022 - Final Round)(A~E)
t宝酱紫喜欢出这种分类讨论的题?!A1. Non-alternating Deck (easy version)给出n张牌,按照题目给的顺序分给两人,问最后两人手中各有几张牌。思路:模拟。AC Code:#include <bits/stdc.h>typedef long…...

qt源码--信号槽
本篇主要从Qt信号槽的连接、断开、调用、对象释放等方面展开; 1.信号建立连接过程 connect有多个重载函数,主要是为了方便使用者,比较常用的有2种方式: a. QObject::connect(&timer, &QTimer::timeout, &loop, &am…...

RecycleView详解
listview缓存请看: listview优化和详解RecycleView 和 ListView对比:使用方法上ListView:继承重写 BaseAdapter,自定义 ViewHolder 与 converView优化。RecyclerView: 继承重写 RecyclerView.Adapter 与 RecyclerView.ViewHolder。设置 Layou…...

【算法】最短路算法
😀大家好,我是白晨,一个不是很能熬夜😫,但是也想日更的人✈。如果喜欢这篇文章,点个赞👍,关注一下👀白晨吧!你的支持就是我最大的动力!Ǵ…...

< Linux > 进程间通信
目录 1、进程间通信介绍 进程间通信的概念 进程间通信的本质 进程间通信的分类 2、管道 2.1、什么是管道 2.2、匿名管道 匿名管道的原理 pipe函数 匿名管道使用步骤 2.3、管道的读写规则 2.4、管道的特点 2.5、命名管道 命名管道的原理 使用命令创建命名管道 mkfifo创建命名管…...

学习 Python 之 Pygame 开发魂斗罗(二)
学习 Python 之 Pygame 开发魂斗罗(二)魂斗罗的需求开始编写魂斗罗1. 搭建主类框架2. 设置游戏运行遍历和创建窗口3. 获取窗口中的事件4. 创建角色5. 完成角色更新函数魂斗罗的需求 魂斗罗游戏中包含很多个物体,现在要对这些物体进行总结 类…...

户籍管理系统测试用例
目录 一、根据页面的不同分别设计测试用例 登录页面 用户信息列表 用户编辑页面 用户更新页面 二、根据目的不同分别设计测试用例 一、根据页面的不同分别设计测试用例 上图是针对一个网站的测试,按照页面的不同分别来设计对应的测试用例。 登录页面 用户信息列…...

(三)代表性物质点邻域的变形分析
本文主要内容如下:1. 伸长张量与Cauchy-Green 张量2. 线元长度的改变2.1. 初始/当前构型下的长度比2.2. 主长度比与 Lagrange/Euler 主方向2.3. 初始/当前构型下任意方向的长度比3. 线元夹角的改变4. 面元的改变5. 体元的改变1. 伸长张量与Cauchy-Green 张量 由于变…...

Stream操作流 练习
基础数据:Data AllArgsConstructor NoArgsConstructor public class User {private String name;private int age;private String sex;private String city;private Integer money; static List<User> users new ArrayList<>();public static void m…...

【模拟集成电路】宽摆幅压控振荡器(VCO)设计
鉴频鉴相器设计(Phase Frequency Detector,PFD)前言一、VCO工作原理二、VCO电路设计VCO原理图三、压控振荡器(VCO)测试VCO测试电路图瞬态测试(1)瞬态输出(2)局部放大图&a…...

《英雄编程体验课》第 13 课 | 双指针
文章目录 零、写在前面一、最长不重复子串1、初步分析2、朴素算法3、优化算法二、双指针1、算法定义2、算法描述3、条件1)单调性2)时效性三、双指针的应用1、前缀和问题2、哈希问题3、K 大数问题零、写在前面 该章节节选自 《夜深人静写算法》,主要讲解最基础的枚举算法 ——…...

DS期末复习卷(十)
一、选择题(24分) 1.下列程序段的时间复杂度为( A )。 i0,s0; while (s<n) {ssi;i;} (A) O(n^1/2) (B) O(n ^1/3) © O(n) (D) O(n ^2) 12…xn xn^1/2 2.设某链表中最常用的…...