springboot整合rabbitmq发布确认高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。
发布确认
发布确认方案
架构
配置文件
在配置文件当中添加 spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.host=43.139.59.23
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";//声明业务 Exchange@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}
}
生产者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);}@GetMapping("sendMessage/{message}")public void sendMessage(@PathVariable String message){//指定消息 id 为 1CorrelationData correlationData1=new CorrelationData("1");String routingKey="key1";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);CorrelationData correlationData2=new CorrelationData("2");routingKey="key2";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);log.info("发送消息内容:{}",message);}
}
回调接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}
}
消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}
结果
回退消息
Mandatory 参数
生产者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);/*** true:* 交换机无法将消息进行路由时,会将该消息返回给生产者* false:* 如果发现消息无法进行路由,则直接丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息交给谁处理rabbitTemplate.setReturnCallback(myCallBack);}@GetMapping("sendMessage")public void sendMessage(String message){//让消息绑定一个 id 值CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",message+"key1",correlationData1);log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key2",message+"key2",correlationData2);log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");}
}
回调接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}//当消息无法路由的时候的回调方法@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, Stringexchange, String routingKey) {log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",newString(message.getBody()),exchange,replyText,routingKey);}
}
结果
接收到被退回的消息
备份交换机
架构
配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";public static final String BACKUP_QUEUE_NAME = "backup.queue";public static final String WARNING_QUEUE_NAME = "warning.queue";// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}//声明备份 Exchange@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认 Exchange 交换机的备份交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){ExchangeBuilder exchangeBuilder =ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备份交换机.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);return (DirectExchange)exchangeBuilder.build();}// 声明警告队列@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系@Beanpublic Binding warningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列@Bean("backQueue")public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@Beanpublic Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}
}
报警消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}
结果
相关文章:
springboot整合rabbitmq发布确认高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。 发布确认 发布确认方案 架构 配置…...
【linux命令讲解大全】010. mapfile命令和tempfile命令的用法及示例
文章目录 mapfile概要主要用途选项参数返回值例子 tempfile补充说明tempfile 命令$$ 变量 从零学 python mapfile 从标准输入读取行并赋值到数组。 概要 mapfile [-d delim] [-n count] [-O origin] [-s count] [-t] [-u fd] [-C callback] [-c quantum] [array] 主要用途 …...
在 Python 中构建卷积神经网络; 从 0 到 9 的手绘数字的灰度图像预测数字
一、说明 为了预测从0到9的数字,我选择了一个基于著名的Kaggle的MNIST数据集的数据集。数据集包含从 <0> 到 <9> 的手绘图数字的灰度图像。在本文中,我将根据像素数据(即数值数据)和卷积神经网络预测数字。 二、 卷积…...
前端分页处理
页面中实现的分页效果,要么后端提供接口,每次点击下一页就调用接口,若不提供接口,分页得前端自己去截取。 方法一:slice方法 slice(参数1,参数2)方法是返回一个新的数组对象,左开右闭 参数1&…...
【C语言】位操作符的一些题目与技巧
初学者在学完位操作符之后,总是不能很好的掌握,因此这篇文章旨在巩固对位操作符的理解与使用。 有的题目可能会比较难以接受,但是看完一定会有收获 目录 位操作符:一些题目:不创建临时变量交换整数整数转换二进制中1的…...
爬虫逆向实战(二十二)--某恩数据电影票房
一、数据接口分析 主页地址:某恩数据 1、抓包 通过抓包可以发现数据接口是API/GetData.ashx 2、判断是否有加密参数 请求参数是否加密? 无请求头是否加密? 无响应是否加密? 通过查看“响应”模块可以发现,响应是…...
火山引擎发布自研视频编解码芯片
2023年8月22日,火山引擎视频云宣布其自研的视频编解码芯片已成功出片。经验证,该芯片的视频压缩效率相比行业主流硬件编码器可提升30%以上,未来将服务于抖音、西瓜视频等视频业务,并将通过火山引擎视频云开放给企业客户。 火山引…...
投递技术类简历的注意事项
简历修改的背景 作为程序员,随着工作年限的增加,要定期的去修改自己的简历中的工作项目,一方面可以促进自己复盘一下工作成果和个人成长,另外也能给自己换工作提供一个前置的便捷性。 注意事项 修改简历的时候有哪些需要注意的…...
每日一题——柱状图中最大的矩形
柱状图中最大的矩形 题目链接 用什么数据结构? 要得到柱状图中最大的矩形,我们就必须要知道对于每一个高度heights[i],他所能勾勒出的矩形最大是多少(即宽度最大是多少)。 而对应到图上我们可以知道,要知…...
Banana Pi推出基于龙芯2K1000LA处理器的信创工业控制开发平台
Banana Pi推出基于龙芯2K1000LA处理器的信创工业控制开发平台:BPI-5202信创工业控制开发平台 BPI-5202 龙芯2K1000LA 信创工业控制开发平台 1.1 工控机的应用场景 物联网的狂潮,既是一场众多的计算机软硬件厂家(也包括通讯方案和产品厂家&…...
springCloud整合Zookeeper的时候调用找不到服务
SpringCloud整合Zookeeper的时候调用找不到服务 首先,我们在注册中心注册了这个服务: 然后我们使用RestTemplate 调用的时候发现失败了:找不到这个服务: 找了很多资料发现这个必须要加上负载才行 BeanLoadBalanced //负载publi…...
【kubernetes】使用kubepshere部署中间件服务
KubeSphere部署中间件服务 入门使用KubeSphere部署单机版MySQL、Redis、RabbitMQ 记录一下搭建过程 (内容学习于尚硅谷云原生课程) 环境准备 VMware虚拟机k8s集群,一主两从,master也作为工作节点;KubeSphere k8skubesphere devops比较占用磁…...
如何从tabbar页面传数据
无论是百度小程序还是微信小程序,app.json中规定的tabbar页面是不支持传参的,例如: <navigator url../service/service?typeid6 openType"switchTab"> 服务项目 </navigator> 上面的navigater跳转有个属性&#…...
软考高级系统架构设计师系列论文七十四:基于构件的软件开发
软考高级系统架构设计师系列论文七十四:基于构件的软件开发 一、构件相关知识点二、摘要三、正文四、总结一、构件相关知识点 软考高级系统架构设计师系列之:面向构件的软件设计,构件平台与典型架构...
图为科技_边缘计算在智能安防领域的作用
边缘计算在智能安防领域发挥着重要的作用。智能安防系统通常需要处理大量的图像、视频和传感器数据,并对其进行实时分析和处理。边缘计算可以将计算和数据处理功能移动到离数据源更接近的地方,例如摄像头、传感器设备或安防终端。 以下是边缘计算在智能…...
Android 13 - Media框架(7)- NuPlayer::Source
Source 在播放器中起着拉流(Streaming)和解复用(demux)的作用,Source 设计的好坏直接影响到播放器的基础功能,我们这一节将会了解 NuPlayer 中的通用 Source(GenericSource)关注本地…...
MySql015——使用子查询
一、创建customers表 ######################## # Create customers table ######################## use study;CREATE TABLE customers (cust_id int NOT NULL AUTO_INCREMENT,cust_name char(50) NOT NULL ,cust_address char(50) NULL ,cust_city char…...
leetcode 355 设计推特
用链表存储用户发送的每一个推特,用堆获取最先的10条动态 class Twitter {Map<Integer,Set<Integer>> followMap;//规定最新的放到最后Map<Integer,Tweet> postMap;//优先队列(堆)PriorityQueue<Tweet> priorityQueue;int time…...
倒数 2 周|期待 2023 Google开发者大会
9 月 6-7 日,中国上海 前沿科技,新知同享 趣味体验,灵感齐聚 技术生态,多元共进 关注官网最新信息,敬请期待大会开幕 2023 Google 开发者大会官网 相信你一定记得,在今年 5 月的 Google I/O 大会上&am…...
代码随想录day57
516最长回文子序列 class Solution { public:int longestPalindromeSubseq(string s) {vector<vector<int>>dp(s.size(),vector<int>(s.size(),0));for(int i0;i<s.size();i)dp[i][i]1;for(int is.size()-1;i>0;i--){for(int ji1;j<s.size();j){if…...
YOLOv5、v8改进:CrissCrossAttention注意力机制
目录 1.简介 2. yolov5添加方法: 2.1common.py构建CrissCrossAttention模块 2.2yolo.py中注册 CrissCrossAttention模块 2.3修改yaml文件。 1.简介 这是ICCV2019的用于语义分割的论文,可以说和CVPR2019的DANet遥相呼应。 和DANet一样,…...
RabbitMQ特性介绍和使用案例
❤ 作者主页:李奕赫揍小邰的博客 ❀ 个人介绍:大家好,我是李奕赫!( ̄▽ ̄)~* 🍊 记得点赞、收藏、评论⭐️⭐️⭐️ 📣 认真学习!!!🎉🎉 文章目录 RabbitMQ特性…...
Ansible 使用 RHEL 系统角色
安装 RHEL 系统角色软件包,并创建符合以下条件的 playbook /home/greg/ansible/timesync.yml 在所有受管节点上运行 使用 timesync 角色 配置该角色,以使用当前有效的 NTP 提供商 配置该角色,以使用时间服务器 172.25.254.254 配置该角色&am…...
重新认识Android中的线程
线程的几种创建方式 new Thread:可复写Thread#run方法。也可以传递Runnable对象,更加灵活。缺点:缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统的资源导致死机或oom。 new Thread(new…...
前端(十五)——GitHub开源一个react封装的图片预览组件
👵博主:小猫娃来啦 👵文章核心:GitHub开源一个react封装的图片预览组件 文章目录 组件开源代码下载地址运行效果展示实现思路使用思路和api实现的功能数据和入口部分代码展示 组件开源代码下载地址 Gitee:点此跳转下载…...
DELL Power Edge R740 安装 OracleLinux-R7-U9-Server
一、准备好 OracleLinux-R7-U9-Server-x86_64-dvd 安装介子: 二、通过 iDRAC挂dvd 安装介子 三、在 iDRAC 开机控制选择虚拟 CD/DCD/ISO 电源控制选择 复位系统(热启动) 四、进入安装阶段 五、配置时区 六、配置磁盘 七、删除之前的旧分区 …...
深入了解OpenStack:创建定制化QCOW2格式镜像的完全指南
OpenStack 创建自定义的QCOW2格式镜像 前言 建议虚机网络配置为 NAT 或 桥接,因为未来 KVM虚机 需要借助 虚机 的外网能力进行联网安装软件包 虚机在启动前,必须在 VMware Workstation 上为其开启虚拟化引擎 虚拟化 Intel VT-x/EPT 或 AMD-V 安装kvm …...
【Java 中级】一文精通 Spring MVC - 数据格式化器(六)
👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO 专家博主 ⛪️ 个人社区&#x…...
Linux内核学习(十二)—— 页高速缓存和页回写(基于Linux 2.6内核)
目录 一、缓存手段 二、Linux 页高速缓存 三、flusher 线程 Linux 内核实现了一个被叫做页高速缓存(page cache)的磁盘缓存,它主要用来减少对磁盘的 I/O 操作。它是通过把磁盘中的数据缓存到内存中,把对磁盘的访问变为对物理内…...
大数据-玩转数据-Flink窗口函数
一、Flink窗口函数 前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种. Reduc…...
网页设计实训报告记录和结果分析/桔子seo
QQ概念版,触摸是王道 转载于:https://www.cnblogs.com/nuddle/archive/2010/05/06/1728535.html...
做网站的工作/靠谱的免费建站
继 2014 年 3 月 Java 8 发布之后,时隔 4 年,2018 年 9 月,Java 11 如期发布,其间间隔了 Java 9 和 Java 10 两个非LTS(Long Term Support)版本。作为最新的LTS版本,相比 Java 8,Java 11 包含了模块系统、改…...
做网站后有人抢注品牌关键字/seo站长综合查询
应用篇:qt写入版本号应用篇:qt写入版本号创建.rc文件在工程文件中载入.rc文件转载需注明出处应用篇:qt写入版本号 在软件开发及交付的过程中,常常需要对已经发布的软件进行技术维护和bug追踪,而这个恰恰依赖于工程师良…...
自己做外贸购物网站/电子商务网站建设论文
1.爬虫导出方法使用: 2.导入数据库: 1.创建表结构: 2.安装数据库驱动:(pip install -i https://pypi.douban.com/simple mysqlclient) 数据库连接: 声明到settings.py: 异步入库: 解决异步插入数据库重复插入问题: itemloader提取信息: 1. 追加修改爬取的值 可以增加多个值: …...
网站空间下载/网络服务电话
【实例简介】Android使用websocket 进行通讯,推送消息给客户端,实现即时通讯。【实例截图】【核心代码】153473androidWebsocketDemo└── androidWebsocketDemo-master├── README.md├── app│ ├── build.gradle│ ├── libs│ │ └…...
北京网站建设公司分享网站改版注意事项/百度seo培训
采样(sample): PCM audio不论是输入还是输出,都包含采样,采样达标声音的一个声道在某个特定时间点的振幅。 很多这样的采样组成了声音。样本是记录音频数据的最基本单位。对于CD audio,每秒有44100个采样。 采样的尺寸从8bit 到64…...