当前位置: 首页 > news >正文

RabbitMQ消息的可靠传输和防止消息丢失

在Spring Cloud项目中,为了确保RabbitMQ消息的可靠传输和防止消息丢失,需要考虑以下几个方面:

  1. 消息持久化:确保消息在RabbitMQ中持久化。
  2. 队列持久化:确保队列是持久化的。
  3. 发布确认:使用发布确认机制确保消息发送到RabbitMQ。
  4. 消费者确认:确保消费者正确地确认消息。
  5. 重试机制:在消息消费失败时,设置重试机制。

下面详细介绍如何实现这些措施:

1. 添加依赖

确保在你的pom.xml中添加了Spring Boot和RabbitMQ的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 配置RabbitMQ

application.ymlapplication.properties文件中配置RabbitMQ:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlatedpublisher-returns: true

3. 定义配置类

创建一个配置类来配置队列、交换机和绑定:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final String QUEUE_NAME = "myQueue";public static final String EXCHANGE_NAME = "myExchange";public static final String ROUTING_KEY = "myRoutingKey";@Beanpublic Queue myQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}@Beanpublic DirectExchange myExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Binding myBinding(Queue myQueue, DirectExchange myExchange) {return BindingBuilder.bind(myQueue).to(myExchange).with(ROUTING_KEY);}
}

4. 配置消息生产者

确保消息生产者配置了发布确认和消息持久化:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.UUID;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {// 设置发布确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.println("Message delivered successfully: " + correlationData);} else {System.err.println("Failed to deliver message: " + correlationData + ", cause: " + cause);}}});// 设置消息返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.err.println("Returned Message: " + new String(message.getBody()) +", replyCode: " + replyCode + ", replyText: " + replyText +", exchange: " + exchange + ", routingKey: " + routingKey);});}public void sendMessage(String message) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message, correlationData);}
}

5. 配置消息消费者

确保消息消费者配置了消息确认机制:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class MessageConsumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void handleMessage(String message, Channel channel, Message message) throws Exception {try {// 处理消息System.out.println("Received Message: " + message);// 消息确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 消费失败,重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
}

6. 启用重试机制

在Spring Cloud Stream中启用重试机制:

spring:cloud:stream:bindings:input:destination: myQueueconsumer:retry:max-attempts: 5backOffPolicy:initialInterval: 1000multiplier: 2.0maxInterval: 10000

7. 测试

测试消息生产和消费,确保消息在各种情况下都不会丢失,包括网络故障、RabbitMQ服务器重启等。

总结

通过以上步骤,你可以在Spring Cloud项目中使用RabbitMQ并确保消息不会丢失。关键在于:

  1. 消息和队列的持久化:确保消息和队列都是持久化的。
  2. 发布确认:启用发布确认回调机制,确保消息被正确地发送到RabbitMQ。
  3. 消费者确认:确保消费者正确地确认消息。
  4. 重试机制:在消费失败时启用重试机制,以确保消息最终能够被成功处理。

通过这些配置,可以显著提高消息传输的可靠性,防止消息丢失。

相关文章:

RabbitMQ消息的可靠传输和防止消息丢失

在Spring Cloud项目中&#xff0c;为了确保RabbitMQ消息的可靠传输和防止消息丢失&#xff0c;需要考虑以下几个方面&#xff1a; 消息持久化&#xff1a;确保消息在RabbitMQ中持久化。队列持久化&#xff1a;确保队列是持久化的。发布确认&#xff1a;使用发布确认机制确保消…...

.net8系列-图文并茂手把手教你使用Nlog记录.net日志

Nlog是什么&#xff1f; NLog是一个为.NET平台设计的灵活且免费的日志记录库。它适用于包括.NET Framework、.NET Core和.NET Standard在内的多种.NET环境。 Nlog有什么好处或者特点&#xff1f; 配置灵活&#xff1a;NLog允许开发者通过配置文件&#xff08;通常是NLog.conf…...

课时158:脚本发布_简单脚本_远程执行

2.1.3 远程执行 学习目标 这一节&#xff0c;我们从 基础知识、简单实践、小结 三个方面来学习 基础知识 简介 有时候&#xff0c;我们需要通过远程方式到另外一台主机进行脚本的执行 格式&#xff1a;ssh 远程主机登录用户名远程主机ip地址 "执行命令"效果 [r…...

3dmax2025能用云渲染吗?2025最新云渲染渲染100使用方法

3dmax2025还没用上云渲染&#xff1f;简单3步用上云渲染。 第一步&#xff0c;打开浏览器搜索渲染100&#xff0c;并进入下载客户端并安装 第二步&#xff0c;打开已安装的客户端进行安装&#xff0c;点击登录&#xff0c;未登录注册个账号即可&#xff08;注册账号时邀请码填…...

从零开始学GeoServer源码(一)(搭建开发环境Win10+IDEA23.3.5+jdk11+geoserver2.24.x)

搭建开发环境 参考资料 0、基础环境准备0.1、idea0.2、jdk0.3、源码 1、导入工程2、配置启动环境2.1、打开新增配置面板2.2、配置工作目录2.2.1、从常用配置中选择2.2.2、直接粘贴 2.3最终效果 3、调整源码3.1、添加maven引用3.2、注释无效代码3.3、删除测试代码 4、修改运行端…...

分类模型:MATLAB判别分析

1. 判别分析简介 判别分析&#xff08;Discriminant Analysis&#xff09; 是一种统计方法&#xff0c;用于在已知分类的样本中构建分类器&#xff0c;并根据特征变量对未知类别的样本进行分类。常见的判别分析方法包括线性判别分析&#xff08;Linear Discriminant Analysis, …...

生产 的mybatisplus 日志输入到日志文件

默认是输出到控制台.不输出到日志文件 输出到日志文件.需要修改配置 第一步. logging:config: classpath:logback-wshoto.xml第二步 mybatis-plus:configuration:cache-enabled: truedefault-executor-type: reuselog-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl第三步…...

八分钟生成一篇两万字的文章演示——《基于灰色预测的人口预测模型》

文章目录 工具使用 《基于灰色预测的人口预测模型》-全文由AI一次性生成文献综述研究方法模型开发灰色预测模型的数学构建参数估计模型验证 案例研究案例研究描述数据收集与预处理灰色预测模型的应用 文献综述研究方法模型开发灰色预测模型的数学构建参数估计模型验证 案例研究…...

golang 透明底图转白底

url : ""var s []byte// 请求线上图片s GetUrl(url)// 处理透明底图转白底img, _, err : image.Decode(bytes.NewReader(s))if err ! nil {fmt.Println("读取图片失败")}bounds : img.Bounds()dst : image.NewNRGBA(bounds)draw.Draw(dst, bounds, image.…...

【一】【网络使用小知识】使用aria2软件结合Windows PowerShell命令行快速下载文件

下载aria2软件 点击进入网址,aria2下载网址. 下载windows版本. 通过Windows PowerShell命令行使用aria2软件下载文件 通用下载文件命令行代码 aria2软件完整路径 -x 16 -s 32 -d 下载目录(文件夹) -o 文件名 下载链接路径示例,用aria2下载qq 找到aria2应用的直接地址,结合…...

报错:C1189#error: The <experimental/filesystem> header providing 解决方案

今天开发过程中&#xff0c;需要使用文件系统experimental/filesystem&#xff0c;报错C1189#error: The &#xff1c;experimental/filesystem&#xff1e; header providing &#xff0c;通过以下解决方案&#xff0c;成功运行程序。 目录 一、打开项目下的属性 二、选择C/…...

Elixir学习笔记——速构(函数式编程基础)

在 Elixir 中&#xff0c;循环遍历 Enumerable 是很常见的&#xff0c;通常会过滤掉一些结果并将值映射到另一个列表中。 速构是此类构造的语法糖&#xff1a;它们将这些常见任务分组为 for 特殊形式。 例如&#xff0c;我们可以将一串整数映射到它们的平方值&#xff1a; 速构…...

开源项目大合集(热门)

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…...

【JVM】JVisualVM的介绍、使用和GC过程

VisualVM介绍 VisualVM 是Netbeans的profile子项目&#xff0c;已在JDK6.0 update 7 中自带&#xff0c;能够监控线程&#xff0c;内存情况&#xff0c;查看方法的CPU时间和内存中的对 象&#xff0c;已被GC的对象&#xff0c;反向查看分配的堆栈(如100个String对象分别由哪几…...

个人在家如何获取World Scientific文献的经验分享

今天有位同学求助一篇World Scientific文献&#xff0c;他的学校虽然有这个数据库&#xff0c;但订购的该数据库资源内容有限&#xff0c;这位同学所需的文献不在学校订购范围内所以下载不了。今天小编就分享一个在家就可获取各个数据库文献的方法。本文以这篇求助文献为例&…...

Java 收集常见面试题

set和list的区别&#xff1f;给定一系列字符串&#xff0c;从集合的set和list中查询&#xff0c;如何查询出相关的数据&#xff1f; 在Java中&#xff0c;Set和List都是用于存储对象的集合 Set&#xff1a; 不允许包含重复的元素。 没有顺序&#xff08;即不保证元素的迭代顺序…...

JS 严格模式和正常模式的区别

严格模式使用"use strict"; 作用&#xff1a; 消除 Javascript 语法的一些不合理、不严谨之处&#xff0c;减少一些怪异行为;消除代码运行的一些不安全之处&#xff0c;保证代码运行的安全&#xff1b;提高编译器效率&#xff0c;增加运行速度&#xff1b;为未来新…...

9种编程语言的对比分析

在当今的软件开发领域&#xff0c;编程语言扮演着至关重要的角色。不同的编程语言各有其特点和适用场景&#xff0c;选择合适的编程语言能够提高开发效率和软件质量。本文将对十种常见的编程语言进行对比分析&#xff0c;帮助读者了解它们的优缺点和适用场景。 Java 特点&…...

模拟14位相机输出Verilog代码

1 代码 `timescale 1ns / 1psmodule simulate_camera_out (input clk,input rest_n,output camera_clk, //像素时钟output [13:0] camera_data, //像素值数据output [19:0] pixel_xy, //此时输出的像素值坐标output reg frame_valid //帧有效信号,1代表帧有效0代表帧无效…...

Linux远程访问及控制

SSH远程管理 SSH(Secure Shell)是一种安全通道协议&#xff0c;主要用来实现字符界面的远程登录、远程复制等功能。SSH 协议对通信双方的数据传输进行了加密处理&#xff0c;其中包括用户登录时输入的用户口令。与早期的 Telent(远程登录)、RSH(Remote Shell&#xff0c;远程执…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法&#xff1a;netstat [选项] 功能&#xff1a;查看网络状态 常用选项&#xff1a; n 拒绝显示别名&#…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序

一、开发环境准备 ​​工具安装​​&#xff1a; 下载安装DevEco Studio 4.0&#xff08;支持HarmonyOS 5&#xff09;配置HarmonyOS SDK 5.0确保Node.js版本≥14 ​​项目初始化​​&#xff1a; ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用

1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

es6+和css3新增的特性有哪些

一&#xff1a;ECMAScript 新特性&#xff08;ES6&#xff09; ES6 (2015) - 革命性更新 1&#xff0c;记住的方法&#xff0c;从一个方法里面用到了哪些技术 1&#xff0c;let /const块级作用域声明2&#xff0c;**默认参数**&#xff1a;函数参数可以设置默认值。3&#x…...

算术操作符与类型转换:从基础到精通

目录 前言&#xff1a;从基础到实践——探索运算符与类型转换的奥秘 算术操作符超级详解 算术操作符&#xff1a;、-、*、/、% 赋值操作符&#xff1a;和复合赋值 单⽬操作符&#xff1a;、--、、- 前言&#xff1a;从基础到实践——探索运算符与类型转换的奥秘 在先前的文…...

ThreadLocal 源码

ThreadLocal 源码 此类提供线程局部变量。这些变量不同于它们的普通对应物&#xff0c;因为每个访问一个线程局部变量的线程&#xff08;通过其 get 或 set 方法&#xff09;都有自己独立初始化的变量副本。ThreadLocal 实例通常是类中的私有静态字段&#xff0c;这些类希望将…...