当前位置: 首页 > news >正文

SpringBoot(三十九)SpringBoot集成RabbitMQ实现流量削峰添谷

前边我们有具体的学习过RabbitMQ的安装和基本使用的情况。

但是呢,没有演示具体应用到项目中的实例。

这里使用RabbitMQ来实现流量的削峰添谷。

一:添加pom依赖

<!--rabbitmq-需要的 AMQP 依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二:yml配置

spring:
#配置rabbitmq 服务器
rabbitmq:virtual-host: /host: 1.15.157.156port: 5672username: xxxxxpassword: xxxxx# 开启发布确认机制#SIMPLE,     // 使用 RabbitTemplate#waitForConfirms() 或 waitForConfirmsOrDie()#CORRELATED, // 使用 CorrelationData 关联确认与发送的消息#NONE        // 不启用发布确认publisher-confirm-type: correlated# publisher-confirms 消息的可靠投递, confirm 确认模式 默认为false#publisher-confirms: true# 添加发布确认返回, return 回退模式 默认为falsepublisher-returns: true### listenerlistener:# 每次从队列中预取5条消息prefetch: 20# 最小消费者数量concurrency: 1# 最大的消费者数量max-concurrency: 10simple:# 设置预取数量为1  每次取一个prefetch: 1# manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack,虽灵活但会提高编码复杂度。# auto:自动 ack,没有异常则返回 ack;抛出异常则返回 nack,消息重新入队,一直到没有异常为止,也可以设置最大重试次数,超过次数后发送到专门收集错误消息的队列进一步处理# none:关闭ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除(消息投递是不可靠的,可能丢失)acknowledge-mode: manual# 失败重试retry:# 开启消费者失败重试enabled: true# 初始的失败等待时长为1秒initial-interval: 1000# 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmultiplier: 3# 最大重试次数max-attempts: 4# true无状态;false有状态。如果业务中包含事务,这里改为falsestateless: true

具体的配置都有对应的注释,参照即可。

三:编写config配置类

package com.modules.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;@Configuration
public class RabbitMQConfig
{@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String userName;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.listener.prefetch}")private int prefetch;@Value("${spring.rabbitmq.listener.concurrency}")private int concurrentConsumers;@Value("${spring.rabbitmq.listener.max-concurrency}")private int maxConcurrentConsumers;/*** 链接RabbitMQ* @return*/@Beanpublic ConnectionFactory connectionDirectFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(userName);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(true); //必须要设置return connectionFactory;}/*** 配置RabbitMQ参数* @return*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitDirectListenerContainerFactory(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionDirectFactory());//设置最小并发的消费者数量factory.setConcurrentConsumers(concurrentConsumers);//设置最大并发的消费者数量factory.setMaxConcurrentConsumers(maxConcurrentConsumers);//限流,单位时间内消费多少条记录factory.setPrefetchCount(prefetch);// json转消息//factory.setMessageConverter(new Jackson2JsonMessageConverter());//设置rabbit 确认消息的模式,默认是自动确认//factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//设置rabbit 确认消息的模式,默认是自动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}/*** 回调函数* @param connectionFactory* @return*/@Beanpublic RabbitTemplate createDirectRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Manatory,才能触发回调函数,无论消息推送结果怎么样都会强制调用回调函数rabbitTemplate.setMandatory(true);// 设置确认发送到交换机的回调函数 =》 消息推送到server,但是在server里找不到交换机 / 消息推送到sever,交换机和队列啥都没找到 / 消息推送到server,找到交换机了,但是没找到队列 / 消息推送成功rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(ack){System.out.println("发送者消息确认成功!");}else{System.out.println("发送者消息确认是呗,考虑重发:"+cause);}//System.out.println("相关数据:"+correlationData);//System.out.println("确认情况:"+ack);//System.out.println("原因:"+cause);//System.out.println("===============================");});//设置确认消息已发送到队列的回调  =》 消息推送到server,找到交换机了,但是没找到队列 触发这个回调函数rabbitTemplate.setReturnsCallback(returnedMessage -> {System.out.println("交换机为:"+returnedMessage.getExchange());System.out.println("返回消息为:"+returnedMessage.getMessage());System.out.println("路由键为:"+returnedMessage.getRoutingKey());System.out.println("回应消息为:"+returnedMessage.getReplyText());System.out.println("回应代码为:"+returnedMessage.getReplyCode());System.out.println("===============================");});return rabbitTemplate;}@BeanQueue trafficSpikedQueue(){return new Queue("trafficSpikedQueue", true);}@BeanDirectExchange trafficSpikedExchange(){return new DirectExchange("trafficSpikedExchange");}@BeanBinding binding(Queue trafficSpikedQueue, DirectExchange trafficSpikedExchange){return BindingBuilder.bind(trafficSpikedQueue).to(trafficSpikedExchange).with("trafficSpikedKey");}//*/
}

四:创建生产者

package com.modules.controller.rabbitmq;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TrafficController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/java/traffic")public String sendTrafficMessage(@RequestParam String message){for (int i = 1; i <= 100; i++){// 使用java多线程来模拟多用户并发请求final int temp = i;new Thread(()->{// 给RabbitMQ发送消息rabbitTemplate.convertAndSend("trafficExchange","trafficKey","hello world:"+temp,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException{// System.out.println("发送回调:"+temp);System.out.println(message);return message;}});}).start();}// rabbitTemplate.convertAndSend("trafficSpikedExchange", "trafficSpikedKey", message);return "Message sent";}
}

五:创建消费者

package com.modules.controller.rabbitmq;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.*;
import java.io.IOException;@Component
public class TrafficSpikedConsumer {@RabbitListener(queues = "trafficQueue")public void receiveMessage(Message message, Channel channel) throws InterruptedException, IOException{// 为了演示一个一个消费的情况,这里使用线程暂停来延迟控制台输出Thread.sleep(100);// =========================================// 处理消息,例如写入数据库或进行计算System.out.println("Received message: " + new String(message.getBody()));//System.out.println("channel: " + channel);// =========================================// 成功处理后手动确认消息long deliveryTag = message.getMessageProperties().getDeliveryTag();//System.out.println("deliveryTag:"+deliveryTag);channel.basicAck(deliveryTag, false);}
}

控制台输出的数据比较多。我这里就不做展示了。

PS:我这里测试的时候遇到一个小问题,发现消费者最后消费的数量跟生产者生产的数量对不上。我百思不得其解。这问题出在哪里呢?

后来,我才发现,我测试是在本地做的测试,对应的代码,我服务器端打包的jar里边也有一份,也就是说,我一个生产者,对应两个消费者(本地+服务器)这也是我本地消费者消费的数量跟生产数量不一致的原因。

以上大概就是Springboot集成RabbitMQ实现流量削峰添谷的一个小例子。

通过RabbitMQ的队列机制,可以有效地缓解高峰期的流量压力。

有好的建议,请在下方输入你的评论。

相关文章:

SpringBoot(三十九)SpringBoot集成RabbitMQ实现流量削峰添谷

前边我们有具体的学习过RabbitMQ的安装和基本使用的情况。 但是呢&#xff0c;没有演示具体应用到项目中的实例。 这里使用RabbitMQ来实现流量的削峰添谷。 一&#xff1a;添加pom依赖 <!--rabbitmq-需要的 AMQP 依赖--> <dependency><groupId>org.springfr…...

前端 Vue 3 后端 Node.js 和Express 结合cursor常见提示词结构

cursor 提示词 后端提示词 请为我开发一个基于 Node.js 和Express 框架的 Todo List 后端项目。项目需要实现以下四个 RESTful API 接口&#xff1a; 查询所有待办事项 接口名: GET /api/get-todo功能: 从数据库的’list’集合中查询并返回所有待办事项参数: 无返回: 包含所…...

类和对象(下):点亮编程星河的类与对象进阶之光

再探构造函数 在实现构造函数时&#xff0c;对成员变量进行初始化主要有两种方式&#xff1a; 一种是常见的在函数体内赋值进行初始化&#xff1b;另一种则是通过初始化列表来完成初始化。 之前我们在构造函数中经常采用在函数体内对成员变量赋值的方式来给予它们初始值。例如&…...

42.接雨水

目录 题目过程解法 题目 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 过程 发现有特殊情况就是&#xff0c;最高峰的地方&#xff0c;如果右边小于他&#xff0c;然后再右边也都很小的话&#xff0c…...

使用Java代码操作Kafka(五):Kafka消费 offset API,包含指定 Offset 消费以及指定时间消费

文章目录 1、指定 Offset 消费2、指定时间消费 1、指定 Offset 消费 auto.offset.reset earliest | latest | none 默认是 latest &#xff08;1&#xff09;earliest&#xff1a;自动将偏移量重置为最早的偏移量&#xff0c;–from-beginning &#xff08;2&#xff09;lates…...

Ubuntu安装不同版本的opencv,并任意切换使用

参考&#xff1a; opencv笔记&#xff1a;ubuntu安装opencv以及多版本共存 | 高深远的博客 https://zhuanlan.zhihu.com/p/604658181 安装不同版本opencv及共存、切换并验证。_pkg-config opencv --modversion-CSDN博客 Ubuntu下多版本OpenCV共存和切换_ubuntu20如同时安装o…...

突破内存限制:Mac Mini M2 服务器化实践指南

本篇文章&#xff0c;我们聊聊如何使用 Mac Mini M2 来实现比上篇文章性价比更高的内存服务器使用&#xff0c;分享背后的一些小的思考。 希望对有类似需求的你有帮助。 写在前面 在上文《ThinkPad Redis&#xff1a;构建亿级数据毫秒级查询的平民方案》中&#xff0c;我们…...

【排版教程】Word、WPS 分节符(奇数页等) 自动变成 分节符(下一页) 解决办法

毕业设计排版时&#xff0c;一般要求每章节的起始页为奇数页&#xff0c;空白页不显示页眉和页脚。具体做法如下&#xff1a; 1 Word 在一个章节的内容完成后&#xff0c;在【布局】中&#xff0c;点击【分隔符】&#xff0c;然后选择【奇数页】 这样在下一章节开始的时&…...

【在Linux世界中追寻伟大的One Piece】多线程(二)

目录 1 -> 分离线程 2 -> Linux线程互斥 2.1 -> 进程线程间的互斥相关背景概念 2.2 -> 互斥量mutex 2.3 -> 互斥量的接口 2.4 -> 互斥量实现原理探究 3 -> 可重入VS线程安全 3.1 -> 概念 3.2 -> 常见的线程不安全的情况 3.3 -> 常见的…...

flink学习(8)——窗口函数

增量聚合函数 ——指窗口每进入一条数据就计算一次 例如&#xff1a;要计算数字之和&#xff0c;进去一个12 计算结果为20&#xff0c; 再进入一个7 ——结果为27 reduce aggregate(aggregateFunction) package com.bigdata.day04;public class _04_agg函数 {public static …...

「实战应用」如何用图表控件LightningChart .NET实现散点图?(一)

LightningChart .NET完全由GPU加速&#xff0c;并且性能经过优化&#xff0c;可用于实时显示海量数据-超过10亿个数据点。 LightningChart包括广泛的2D&#xff0c;高级3D&#xff0c;Polar&#xff0c;Smith&#xff0c;3D饼/甜甜圈&#xff0c;地理地图和GIS图表以及适用于科…...

鸿蒙Native使用Demo

DevecoStudio使用Native 今天,给大家带来的是关于DevecoStudio中使用Native进行开发 个人拙见:为什么要使用Native?无论是JS还是TS在复杂的情况下运行速度,肯定不如直接操作内存的C/C的运行速度快,所以,会选择使用Native;这里面的过程是什么?通过映射转化,使用napi提供的接口…...

29.UE5蓝图的网络通讯,多人自定义事件,变量同步

3-9 蓝图的网络通讯、多人自定义事件、变量同步_哔哩哔哩_bilibili 目录 1.网络通讯 1.1玩家Pawn之间的同步 1.2事件同步 1.3UI同步 1.4组播 1.5变量同步 1.网络通讯 1.1玩家Pawn之间的同步 创建一个第三人称项目 将网络模式更改为监听服务器&#xff0c;即将房主作为…...

Scala—列表(可变ListBuffer、不可变List)用法详解

Scala集合概述-链接 大家可以点击上方链接&#xff0c;先对Scala的集合有一个整体的概念&#x1f923;&#x1f923;&#x1f923; 在 Scala 中&#xff0c;列表&#xff08;List&#xff09;分为不可变列表&#xff08;List&#xff09;和可变列表&#xff08;ListBuffer&…...

【论文复现】偏标记学习+图像分类

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀ 偏标记学习图像分类 概述算法原理核心逻辑效果演示使用方式参考文献 概述 本文复现论文 Progressive Identification of True Labels for Pa…...

C嘎嘎探索篇:栈与队列的交响:C++中的结构艺术

C嘎嘎探索篇&#xff1a;栈与队列的交响&#xff1a;C中的结构艺术 前言&#xff1a; 小编在之前刚完成了C中栈和队列&#xff08;stack和queue&#xff09;的讲解&#xff0c;忘记的小伙伴可以去我上一篇文章看一眼的&#xff0c;今天小编将会带领大家吹奏栈和队列的交响&am…...

AIGC-----AIGC在虚拟现实中的应用前景

AIGC在虚拟现实中的应用前景 引言 随着人工智能生成内容&#xff08;AIGC&#xff09;的快速发展&#xff0c;虚拟现实&#xff08;VR&#xff09;技术的应用也迎来了新的契机。AIGC与VR的结合为创造沉浸式体验带来了全新的可能性&#xff0c;这种组合不仅极大地降低了VR内容的…...

Django 路由层

1. 路由基础概念 URLconf (URL 配置)&#xff1a;Django 的路由系统是基于 urls.py 文件定义的。路径匹配&#xff1a;通过模式匹配 URL&#xff0c;并将请求传递给对应的视图处理函数。命名路由&#xff1a;每个路由可以定义一个名称&#xff0c;用于反向解析。 2. 基本路由配…...

《硬件架构的艺术》笔记(八):消抖技术

简介 在电子设备中两个金属触点随着触点的断开闭合便产生了多个信号&#xff0c;这就是抖动。 消抖是用来确保每一次断开或闭合触点时只有一个信号起作用的硬件设备或软件。&#xff08;就是每次断开闭合只对应一个操作&#xff09;。 抖动在某些模拟和逻辑电路中可能产生问…...

Spring 与 Spring MVC 与 Spring Boot三者之间的区别与联系

一.什么是Spring&#xff1f;它解决了什么问题&#xff1f; 1.1什么是Spring&#xff1f; Spring&#xff0c;一般指代的是Spring Framework 它是一个开源的应用程序框架&#xff0c;提供了一个简易的开发方式&#xff0c;通过这种开发方式&#xff0c;将避免那些可能致使代码…...

【算法】连通块问题(C/C++)

目录 连通块问题 解决思路 步骤&#xff1a; 初始化&#xff1a; DFS函数&#xff1a; 复杂度分析 代码实现&#xff08;C&#xff09; 题目链接&#xff1a;2060. 奶牛选美 - AcWing题库 解题思路&#xff1a; AC代码&#xff1a; 题目链接&#xff1a;687. 扫雷 -…...

如何选择黑白相机和彩色相机

我们在选择成像解决方案时黑白相机很容易被忽略&#xff0c;因为许多新相机提供鲜艳的颜色&#xff0c;鲜明的对比度和改进的弱光性能。然而&#xff0c;有许多应用&#xff0c;选择黑白相机将是更好的选择&#xff0c;因为他们产生更清晰的图像&#xff0c;更好的分辨率&#…...

Rust 力扣 - 740. 删除并获得点数

文章目录 题目描述题解思路题解代码题目链接 题目描述 题解思路 首先对于这题我们如果将所有点数装入一个切片f中&#xff0c;该切片f中的i号下标表示所有点数为i的点数之和 那么这题就转换成了打家劫舍这道题&#xff0c;也就是求选择了切片中某个下标的元素后&#xff0c;该…...

OpenCV从入门到精通实战(七)——探索图像处理:自定义滤波与OpenCV卷积核

本文主要介绍如何使用Python和OpenCV库通过卷积操作来应用不同的图像滤波效果。主要分为几个步骤&#xff1a;图像的读取与处理、自定义卷积函数的实现、不同卷积核的应用&#xff0c;以及结果的展示。 卷积 在图像处理中&#xff0c;卷积是一种重要的操作&#xff0c;它通过…...

Docker核心概念总结

本文只是对 Docker 的概念做了较为详细的介绍&#xff0c;并不涉及一些像 Docker 环境的安装以及 Docker 的一些常见操作和命令。 容器介绍 Docker 是世界领先的软件容器平台&#xff0c;所以想要搞懂 Docker 的概念我们必须先从容器开始说起。 什么是容器? 先来看看容器较为…...

环形缓冲区

什么是环形缓冲区 环形缓冲区,也称为循环缓冲区或环形队列,是一种特殊的FIFO(先进先出)数据结构。它使用一块固定大小的内存空间来缓存数据,并通过两个指针(读指针和写指针)来管理数据的读写。当任意一个指针到达缓冲区末尾时,会自动回绕到缓冲区开头,形成一个"环"。…...

jQuery-Word-Export 使用记录及完整修正文件下载 jquery.wordexport.js

参考资料&#xff1a; jQuery-Word-Export导出word_jquery.wordexport.js下载-CSDN博客 近期又需要自己做个 Html2Doc 的解决方案&#xff0c;因为客户又不想要 Html2pdf 的下载了&#xff0c;当初还给我费尽心思解决Html转pdf时中文输出的问题&#xff08;html转pdf文件下载之…...

云服务器部署WebSocket项目

WebSocket是一种在单个TCP连接上进行全双工通信的协议&#xff0c;其设计的目的是在Web浏览器和Web服务器之间进行实时通信&#xff08;实时Web&#xff09; WebSocket协议的优点包括&#xff1a; 1. 更高效的网络利用率&#xff1a;与HTTP相比&#xff0c;WebSocket的握手只…...

C#+数据库 实现动态权限设置

将权限信息存储在数据库中&#xff0c;支持动态调整。根据用户所属的角色、特定的功能模块&#xff0c;动态加载权限” 1. 数据库设计 根据这种需求&#xff0c;可以通过以下表设计&#xff1a; 用户表 (Users)&#xff1a;存储用户信息。角色表 (Roles)&#xff1a;存储角色…...

(原创)Android Studio新老界面UI切换及老版本下载地址

前言 这两天下载了一个新版的Android Studio&#xff0c;发现整个界面都发生了很大改动&#xff1a; 新的界面的一些设置可参考一些博客&#xff1a; Android Studio新版UI常用设置 但是对于一些急着开发的小伙伴来说&#xff0c;没有时间去适应&#xff0c;那么怎么办呢&am…...

无锡网站制作哪家便宜/个人免费开发app

进程管理 1. 进程管理基础 在Linux中&#xff0c;每个执行的**程序&#xff08;代码&#xff09;**都称为一个进程。每个进程都分配一个ID号每一个进程&#xff0c;都会对应一个父进程&#xff0c;而这个父进程可以复制多个子进程。例如www服务器。每个进程都可能以两种方式存…...

做版权保护的网站/百度软件开放平台

基于MATLAB的直流电动机启动的仿真研究.pdf电子技术研发 Electronics R & D10.3969/j.issn.1000-0755.2013.07.003基于MATLAB 的直流电动机启动的仿真研究汪明先 荣 军 吴祥营 陈 敏 刘 凌(湖南理工学院信息与通信工程学院&#xff0c;湖南 岳阳)摘 要&#xff1a;直流电动…...

正规的佛山网站建设价格/免费网站收录入口

目录&#xff1a;导读一、前言二、Monkey简介1、介绍2、adb 的常用命令3、Monkey的常用命令三、Monkey参数详情1、本参数2、发送事件的类型3、约束条件4、调试选项四、Monkey测试命令1、不输入日志2、输入日志五、目的一、前言 随机的命令对APP进行自动化测试&#xff0c;可以…...

做钢结构网站有哪些/免费seo在线优化

我们都知道调用WCF直接在Service References中引用可以远程调用的WCF Url就行了。 但是我们想过没&#xff0c;在Development环境中可以这样做&#xff0c;但是QA、UAT、Production上我们怎么做呢&#xff1f; WCF的通信方式主要有Http和Tcp&#xff0c;这次我们用Http。 好了&…...

奇璐荣获北京十大高端设计公司称号/上海公司网站seo

在Oracle安装目录\10.2\jdbc\lib 下&#xff0c;找到classes12.jar&#xff0c;它就是用于连接Oracle数据库的JDBC驱动程序。将其拷贝到Tomcat的安装目录"Tomcat 6.0\lib"目录下。重启Tomcat&#xff0c;这个驱动程序即可生效。 编写JSP程序 <% page language&quo…...

服装网站建设策划书/外链群发

1、Fragment的产生与介绍 Android运行在各种各样的设备中&#xff0c;有小屏幕的手机&#xff0c;超大屏的平板甚至电视。针对屏幕尺寸的差距&#xff0c;很多情况下&#xff0c;都是先针对手机开发一套app&#xff0c;然后拷贝一份&#xff0c;修改布局以适应什么超级大屏的。…...