如何确定RocketMQ中消费者的线程大小
背景
随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求进行缓冲,RocketMQ就是一款非常优秀消息队列的中间件,在互联网领域久经考验,也被各个行业广泛应用。
在上一篇文章中,介绍了RocketMQ的工作原理使用。物联网中如何使用RockeMQ
如何在配置文件端配置消费者线程大小
当生产者将大量的消息堆积到消息队列中时,需要同步启用消费者去消费队列中的消息,达到动态平衡的效果。
如下代码所示,在消费者类上,会使用@RocketMQMessageListener注解,并填写必要的属性:consumerThreadNumber:消费线程、主题、消费组。
@RocketMQMessageListener(// 指定消费线程大小consumeThreadNumber = 16,topic = TOPIC_DEMO,consumerGroup = "consumer_demo_group")
@Component
public class Consumer implements RocketMQListener<MessageExt> {private final String CHARSET = Charset.defaultCharset().name();@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String str = new String(body, Charset.forName(this.CHARSET));System.out.println("消费者消费的消息为: " + str);}
}
其中的topic、consumerGroup可以指定一次就不会变啦,但是consumerThreadNumber会根据机器的性能发生变化;因此需要将其提出放到配置文件中,方便修改,比如"application.yaml"。
那应该如何实现呢?
其中 consumerThreadNumber = 16,表明填入的是一个static的变量,因此如果简单地使用@Value来进行注入变量是不成功的,因为它只能注入非静态变量。为了能实现从配置文件中读取变量,并转为static变量,采用了显示set方式赋值变量的方法。
/**
* 注入mq消费的线程数量
*/
public static int RocketMQThreadSize;@Value("${biz.RocketMQThreadSize}")
public void setRocketMQThreadSize(int threadSize) {
RocketMQThreadSize = threadSize;
}
那在配置文件中就可以配置RocketMQThreadSize的大小啦。如下在application.yaml就可以搞定。
biz: RocketMQThreadSize: 200
如何使得自定义的线程大小生效
如上一章所示,可以得到静态的变量,那如何在消费者中生效呢?幸好RocketMQ提供一个接口可以实现消费者线程的自定义。
在消费者的类需要实现RocketMQPushConsumerLifecycleListener接口即可,然后在类中实现prepareStart方法即可。具体如下所示:
@RocketMQMessageListener(topic = TOPIC_DEMO,consumerGroup = "consumer_demo_group")
@Component
public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {private final String CHARSET = Charset.defaultCharset().name();@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String str = new String(body, Charset.forName(this.CHARSET));System.out.println("消费者消费的消息为: " + str);}@Overridepublic void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {// 指定消费线程大小defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);}
}
在prepareStart方法中,指定一些必要的线程参数
- 最大线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize); - 最小线程:
defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);
并且通过实验和查看源码,其中最大、最小设置一样才会生效。
如何确定合适的线程大小
以上的步骤已经帮忙把线程设置提取出来啦,之后只需在配置文件中修改线程数大小,而不需去代码中修改,避免代码导致系统出现问题。那如何去确定线程的数量大小呢?
线程是计算机执行任务的基本的单位,即消费任务可以交给线程去执行。
当线程数量较少时,CPU性能不能充分发挥。但是线程数量过的就会导致过多的线程处于等待中,机器的负载升高。因此需要确定适合当前机器的线程数量。
在RocketMQ线程调优有两个指标可以帮助大致确定消费线程大小:
- 消费者的
TPS,表明消费者的能力; - 机器负载,分为
CPU负载和IO负载,和自身的核心数量有关。
RocketMQ提供web界面,可以监测TPS的大小,这个数量当然是越大越好,但是也要考虑负载。

在服务器输入top命令就可以看大,当前机器的负载:

分别为1、5、15分钟负载。
在进行压测的时候,需要知道机器的核心数量,监测负载的时候负载的大小就不能超过核心数量。
在测试的时候可以从小到大调节线程数大小,并且关注TPS和负载。
结尾
以上就是确定消费者线程大小的整个过程,有疑问欢迎留言交流!!!
线程介绍
相关文章:
如何确定RocketMQ中消费者的线程大小
背景 随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求…...
OpenAPI SDK组件之Spring Aop源码拓展
Spring Aop 看这个分享的应该都用过Spring Aop,这里就不再过多介绍了它是什么了。 我抽取了Spring Aop的部分源码,通过它实现请求参数可变拦截,同时apisdk离开Spring框架,仍然可以正常运行。 讲拦截也好,通知也罢&a…...
蓝桥杯C/C++VIP试题每日一练之龟兔赛跑预测
💛作者主页:静Yu 🧡简介:CSDN全栈优质创作者、华为云享专家、阿里云社区博客专家,前端知识交流社区创建者 💛社区地址:前端知识交流社区 🧡博主的个人博客:静Yu的个人博客…...
为你的Vue2.x老项目安装Vite发动机吧
天下苦webpack久矣,相信作为前端开发者一定经历过在项目迭代时间较长的时候经历漫长等待的这一过程,每一次保存都会浪费掉大量时间,这是webpack这种机制所带来的问题。 于是,尤大为我们带来了新一代前端构建工具:vite…...
ZCMU--5012: 铺设道路(差分思路)
Description 春春是一名道路工程师,负责铺设一条长度为 n 的道路。 铺设道路的主要工作是填平下陷的地表。 整段道路可以看作是 n 块首尾相连的区域,一开始,第 i 块区域下陷的深度为 di。 春春每天可以选择一段连续区间 [L,R]&…...
算法模板总结(自用)
算法模板总结滑动窗口双指针算法数组相关合并两个有序数组左右指针技巧快慢指针技巧字符串相关左右指针反转字符串问题快慢指针替换空格字符问题链表相关快慢双指针删除链表的倒数第N个节点链表相交环形链表链表操作几数之和两数之和四个数组的四数之和三数之和同一数组中四数之…...
【架构师】零基础到精通——架构发展
博客昵称:架构师Cool 最喜欢的座右铭:一以贯之的努力,不得懈怠的人生。 作者简介:一名Coder,软件设计师/鸿蒙高级工程师认证,在备战高级架构师/系统分析师,欢迎关注小弟! 博主小留言…...
C++(20):三路比较运算符
C20增加了三路比较运算符<>(戏称航天飞机运算符),用于对类的比较运算符进行统一的设计。有两种使用方式:默认比较对于某些类,如果按照其成员逐一比较即可决定比较运算符的值,那么可以使用默认的三路运…...
MySQL workbench 字符集、字符序的概念与联系
在数据的存储上,MySQL提供了不同的字符集支持。而在数据的对比操作上,则提供了不同的字符序支持。 MySQL提供了不同级别的设置,包括server级、database级、table级、column级,可以提供非常精准的设置。 什么是字符集、字符序&am…...
DBA之路---数据库启动与关闭过程
DBA之路—数据库启动与关闭过程 1、启动过程 oracle启动的四个状态 shutdown、就是数据库关闭状态。 nomount模式 #启动instance ,读取参数文件、分配sga空间启动后台进程,打开alter日志和其他trace文件startup nomount #该模式下只会创建实例并不加…...
Shell文件包含
和其他语言一样,Shell 也可以包含外部脚本。这样可以很方便的封装一些公用的代码作为一个独立的文件。 一、语法格式 Shell 文件包含的语法格式如下: . filename # 注意点号(.)和文件名中间有一空格 或 source filename 在当前bash环境下读取并执行file…...
计算机网络(六): HTTP,HTTPS,DNS,网页解析全过程
文章目录一、HTTP头部包含的信息通用头部请求头部响应头部实体头部二、Keep-Alive和非Keep-Alive的区别三、HTTP的方法四、HTTP和HTTPS建立连接的过程4.1 HTTP4.2 HTTPS五、HTTP和HTTPS的区别六、HTTPS的加密方式七、cookie和sessionsessioncookie八、HTTP状态码状态码200&…...
Android仿京东金融的数值滚动尺功能
自定义数值滚动尺,这个用的还是挺多的,例如京东金融的通过滚动尺选择金额等,而这次就是高仿京东金融的数值滚动尺。首先看看下效果图,如下:首先先给你们各个变量的含义,以免在后面的讲解中不知变量的意思,代码如下://最…...
Nginx 和 Tomcat 实现负载均衡
Nginx 和 tomcat 实现负载均衡 🏆荣誉认证:51CTO博客专家博主、TOP红人、明日之星;阿里云开发者社区专家博主、技术博主、星级博主。 💻微信公众号:微笑的段嘉许 📌本文由微笑的段嘉许原创! &am…...
【万能排序之qsort、b_sort 、s_sort】
文章目录前言:star:qsort函数函数参数qsort函数的使用:star:模拟实现万冒泡排序函数参数模拟实现b_sort注意点:star:模拟实现万能选择排序函数参数模拟实现s_sort最后前言 我们所熟悉的冒泡排序,选择排序,插入排序,二分排序等都是基于给定的一…...
利用InceptionV3实现图像分类
最近在做一个机审的项目,初步希望实现图像的四分类,即:正常(neutral)、涉政(political)、涉黄(porn)、涉恐(terrorism)。有朋友给推荐了个github上…...
【Java】CAS锁
一、什么是CAS机制(compare and swap) 1.概述 CAS的全称为Compare-And-Swap,直译就是对比交换。是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值。经过调查发现,…...
Linux服务器配置系统安全加固方法
1. SSH空闲超时时间建议为: 600-900 解决方案: 在【/etc/ssh/sshd_config】文件中设置【ClientAliveInterval】设置为600到900之间 vim /etc/ssh/sshd_config #将 ClientAliveInterval 参数值设置为 900 2. 修改检查SSH密码修改最小间隔 解决方案: 在【/etc/login.defs】文件…...
Codeforces Round #850 (Div. 2, based on VK Cup 2022 - Final Round)(A~E)
t宝酱紫喜欢出这种分类讨论的题?!A1. Non-alternating Deck (easy version)给出n张牌,按照题目给的顺序分给两人,问最后两人手中各有几张牌。思路:模拟。AC Code:#include <bits/stdc.h>typedef long…...
qt源码--信号槽
本篇主要从Qt信号槽的连接、断开、调用、对象释放等方面展开; 1.信号建立连接过程 connect有多个重载函数,主要是为了方便使用者,比较常用的有2种方式: a. QObject::connect(&timer, &QTimer::timeout, &loop, &am…...
Linux应用开发之网络套接字编程(实例篇)
服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...
大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...
聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南
文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...
站群服务器的应用场景都有哪些?
站群服务器主要是为了多个网站的托管和管理所设计的,可以通过集中管理和高效资源的分配,来支持多个独立的网站同时运行,让每一个网站都可以分配到独立的IP地址,避免出现IP关联的风险,用户还可以通过控制面板进行管理功…...
