rabbitMq举例
新来个技术总监,把 RabbitMQ 讲的那叫一个透彻,佩服!
生产者
代码举例
public String sendMsg(final String exchangeName,final String routingKey,final String msg) {}
/*** 发送消息* @param exchangeName exchangeName* @param routingKey routingKey* @param msg mq message* @return msgId*/public String sendMsg(final String exchangeName,final String routingKey,final String msg) {final CorrelationData correlationDataInfo = new CorrelationData();final String msgId = String.join(":", "yl:dps", exchangeName, routingKey,String.valueOf(IdUtil.getSnowflake(1, 3).nextId()));correlationDataInfo.setId(msgId);//send over callback lograbbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(correlationData!=null) {final String id = correlationData.getId();if (ack) {log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData));} else {log.error("消息投递失败,消息Id[{}] [{}]", id, cause);}}else {log.error("消息投递失败,correlationData为null!");}});rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);
分析代码
这段代码的功能是向 RabbitMQ 发送消息,并且在发送消息后通过回调函数来确认消息的投递状态。下面是对代码的详细分析:
1. 方法签名
java
public String sendMsg(final String exchangeName, final String routingKey, final String msg)
- 该方法接收三个参数:
exchangeName
:消息发送的交换机名称。routingKey
:消息的路由键,用来确定消息将被路由到哪个队列。msg
:待发送的消息内容。
2. CorrelationData 创建与消息 ID 设置
java
final CorrelationData correlationDataInfo = new CorrelationData(); final String msgId = String.join(":", "yl:dps", exchangeName, routingKey, String.valueOf(IdUtil.getSnowflake(1, 3).nextId())); correlationDataInfo.setId(msgId);
CorrelationData
:RabbitMQ 提供的一个类,用于设置和传递消息的相关信息,主要用来在消息确认回调中标识消息。- 生成
msgId
:msgId
是通过拼接一个前缀"yl:dps"
、交换机名exchangeName
、路由键routingKey
,以及通过IdUtil.getSnowflake(1, 3).nextId()
生成的雪花算法(Snowflake)ID来创建的唯一标识符。这个 ID 用来唯一标识这条消息。 setId(msgId)
:将生成的msgId
设置到correlationDataInfo
对象中。
3. 设置回调函数
java
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if(correlationData != null) { final String id = correlationData.getId(); if (ack) { log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData)); } else { log.error("消息投递失败,消息Id[{}] [{}]", id, cause); } } else { log.error("消息投递失败,correlationData为null!"); } });
setConfirmCallback
:通过rabbitTemplate.setConfirmCallback()
方法设置消息确认回调函数。在消息发送之后,RabbitMQ 会调用该回调函数来通知消息是否成功投递到交换机。- 参数说明:
correlationData
:携带消息相关信息的对象(这里是correlationDataInfo
),包含了消息的 ID 等。ack
:布尔值,表示消息是否成功投递(true
表示成功,false
表示失败)。cause
:如果ack
为false
,cause
会包含失败的原因。
- 回调内容:
- 如果
correlationData
不为空:- 如果
ack
为true
,则表示消息投递成功,记录日志。 - 如果
ack
为false
,则表示消息投递失败,记录失败日志,并包含失败原因。
- 如果
- 如果
correlationData
为null
,记录错误日志。
- 如果
4. 发送消息
java
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);
convertAndSend
:该方法用于将消息发送到指定的交换机(exchangeName
)、路由键(routingKey
)和消息内容(msg
)。同时,将correlationDataInfo
(包含msgId
)附带发送,以便在回调中进行确认和记录。rabbitTemplate
是RabbitTemplate
对象,用来发送和接收消息。它是 Spring AMQP 的核心组件,封装了与 RabbitMQ 的交互。
5. 返回消息 ID
return msgId;
- 返回生成的消息 ID。这个 ID 可以在外部被用于追踪消息投递的状态。
总结:
这段代码的主要作用是:
- 生成一个唯一的消息 ID,包含了交换机名、路由键和通过 Snowflake 算法生成的 ID。
- 设置消息确认回调,当消息发送后,RabbitMQ 会通过回调通知消息是否成功投递。
- 调用
rabbitTemplate.convertAndSend()
方法向指定的交换机发送消息,同时附带CorrelationData
(包含消息 ID)。 - 返回生成的消息 ID,以便后续追踪或处理。
关键点:
- 消息确认机制:使用
ConfirmCallback
确保消息是否成功投递,并且通过日志记录投递状态。 - 消息 ID:通过 Snowflake 算法生成一个唯一的消息 ID,可以用于追踪和日志记录。
- 回调机制:确保生产者能在消息投递过程中获得详细的确认信息。
消费者
代码举例
@RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${rabbitmq.yl.x.queue}", durable = "true"),exchange = @Exchange(value = "${rabbitmq.yl.x.exchange}"),key = "${rabbitmq.yl.x.routing.key}"))public void stateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {log.info("MQ-收到【x】状态变更消息:{} ", payload);final String objDeliveryTag = String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG));if (NumberUtil.isNumber(objDeliveryTag)) {final long deliveryTag= Long.parseLong(objDeliveryTag);log.info("MQ-收到【x】MQ消息进行消费【执行ack】[{}]", deliveryTag);channel.basicAck(deliveryTag,true);}if(CharSequenceUtil.isNotBlank(payload)) {}}
在这个 Java 方法 leaseStateChange
中,通常是在使用 Spring AMQP 框架与 RabbitMQ 集成时出现的消费消息的处理方法。下面逐个解析这个方法中的各个部分:
方法签名解析
public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException
1. public void leaseStateChange
public
: 表明这个方法是公开的,可以被外部调用。void
: 表明这个方法没有返回值。leaseStateChange
: 这是方法的名称。可以推测这个方法是用来处理与“租赁状态变化”相关的消息的。
2. @Payload String payload
-
@Payload
: 这是 Spring AMQP 框架中的一个注解,用于将消息体的内容绑定到方法参数中。在这个例子中,payload
是一个String
类型的参数,代表从 RabbitMQ 队列中接收到的消息内容。通常,这个消息体是通过 JSON 或其他格式的字符串传递的。-
注解的作用:
@Payload
使得 Spring 能够自动将消息的主体部分注入到方法参数payload
中。比如,如果消息体是一个 JSON 格式的字符串,Spring 会将其直接赋值给payload
参数。 -
示例: 假设接收到的消息体是
"{"state": "active", "leaseId": "12345"}"
,payload
将会是该字符串。
-
3. @Headers Map<String, Object> headers
-
@Headers
: 这是另一个 Spring AMQP 注解,用来将消息的头部信息注入到方法参数中。RabbitMQ 消息不仅有消息体(payload),还可能包含一些头信息(比如消息的发送时间、路由信息等)。-
注解的作用:
@Headers
会将消息头部的内容绑定到headers
参数,这个参数是一个Map<String, Object>
类型,其中键是头部的名称,值是相应的值。头部信息常常用于传递一些附加信息(例如消息的优先级、发送者标识等)。 -
示例: 如果消息头包含如下信息:
{"correlationId": "abc123", "messageType": "leaseUpdate"}
那么
headers
将会是一个Map
,其内容是:{"correlationId": "abc123", "messageType": "leaseUpdate"}
-
4. Channel channel
-
Channel
: 这是 RabbitMQ 的核心概念之一。Channel
代表一个与 RabbitMQ 服务的连接通道,允许你在该通道上进行消息的消费、确认等操作。-
作用: 在 Spring AMQP 中,
Channel
通常用来进行消息的确认(acknowledge)操作,或者处理消息处理失败时的重新排队等任务。你可以使用Channel
来手动确认消息,或者控制消息是否成功消费。 -
示例: 如果在消息处理过程中出现异常,消费者可能需要通过
channel.basicNack()
方法来拒绝该消息并可能重新入队。
-
5. throws IOException
throws IOException
: 表明这个方法可能会抛出IOException
异常。RabbitMQ 的消息操作可能会遇到 I/O 错误,因此需要在方法签名中声明可能抛出此异常。通常,这类异常会发生在与 RabbitMQ 的连接中断、消息传输过程失败时等。
Spring AMQP 消费者代码示例
假设这是一个处理来自某个队列的消息的方法,下面是该方法的使用场景和完整代码示例:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.MessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;import java.io.IOException;
import java.util.Map;@Component
public class LeaseStateChangeListener {// 监听指定队列的消息@RabbitListener(queues = "leaseStateQueue")public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {try {// 处理消息体System.out.println("Received message: " + payload);// 获取消息头部信息String correlationId = (String) headers.get("correlationId");String messageType = (String) headers.get("messageType");System.out.println("CorrelationId: " + correlationId + ", MessageType: " + messageType);// 模拟处理业务逻辑processLeaseStateChange(payload);// 确认消息已成功消费channel.basicAck(headers.hashCode(), false); // 手动确认消息} catch (Exception e) {// 异常处理,拒绝消息并重新入队System.err.println("Error processing message: " + e.getMessage());channel.basicNack(headers.hashCode(), false, true); // 拒绝并重新入队}}private void processLeaseStateChange(String payload) {// 假设这里是处理租赁状态更新的业务逻辑// 比如将消息解析为对象,进行租赁状态更新System.out.println("Processing lease state change for payload: " + payload);}
}
解析
-
@RabbitListener: 注解的作用是声明这个方法是一个 RabbitMQ 消息的消费者,并且该方法监听
leaseStateQueue
队列。当有消息到达这个队列时,这个方法会被调用。 -
消息体 (
payload
): 这个方法会接收到一个消息体,@Payload
注解将该消息的内容(通常是 JSON 格式的字符串)自动绑定到方法参数payload
上。 -
消息头 (
headers
): 使用@Headers
注解将消息的头部信息绑定到headers
参数上,Map<String, Object>
类型。你可以从中获取如correlationId
和messageType
等附加信息。 -
Channel: 这个参数用于消息的确认、拒绝等操作。在成功处理完消息后,调用
channel.basicAck()
来确认消息,表示该消息已经被成功消费。如果处理失败,调用channel.basicNack()
拒绝该消息,并可以选择是否重新入队。
总结
- 该方法是一个 RabbitMQ 消费者,用于从指定的队列中消费消息。
- 通过
@Payload
获取消息体内容,使用@Headers
获取消息头信息。 - 使用
Channel
来确认消息的处理状态。 - 使用
@RabbitListener
注解自动监听队列,并处理消费的消息。
这种方式非常适合处理队列中的业务逻辑,并能够灵活处理消息的确认、拒绝等操作。
消息怎么知道发给哪一个队列
先看队列与交换机怎么绑定的
先创建队列,然后绑定到交换机
RabbitMQ系列-6.如何通过控制台创建交换机、队列、死信队列、延迟队列 - 简书
相关文章:
rabbitMq举例
新来个技术总监,把 RabbitMQ 讲的那叫一个透彻,佩服! 生产者 代码举例 public String sendMsg(final String exchangeName,final String routingKey,final String msg) {} /*** 发送消息* param exchangeName exchangeName* param routin…...
奇怪的知识又增加了:ESP32下的Lisp编程=>ULisp--Lisp for microcontrollers
ESP32下有MicroPython,那么我就在想,有Lisp语言支持吗?答案是果然有!有ULisp,专门为MCU设计的Lisp! 网址:uLisp - Lisp for microcontrollers 介绍:用于微控制器的 Lisp 适用于 Ar…...
渗透测试之信息收集
免责声明:使用本教程或工具,用户必须遵守所有适用的法律和法规,并且用户应自行承担所有风险和责任。 文章目录 1. 基础信息收集2. 网络资产发现3. 网站和应用信息4. 技术栈识别5. 安全漏洞和配置6. 移动应用分析7.Google语法常见Google使用场…...
基本分页存储管理
一、实验目的 目的:熟悉并掌握基本分页存储管理的思想及其实现方法,熟悉并掌握基本分页存储管理的分配和回收方式。 任务:模拟实现基本分页存储管理方式下内存空间的分配和回收。 二、实验内容 1、实验内容 内存空间的初始化——可以由用户输…...
SQLServer到MySQL的数据高效迁移方案分享
SQL Server数据集成到MySQL的技术案例分享 在企业级数据管理中,跨平台的数据集成是一个常见且关键的任务。本次我们将探讨如何通过轻易云数据集成平台,将巨益OMS系统中的退款单明细表从SQL Server高效、安全地迁移到MySQL数据库中。具体方案名称为“7--…...
软考:工作后再考的性价比分析
引言 在当今的就业市场中,软考(软件设计师、系统分析师等资格考试)是否值得在校学生花费时间和精力去准备?本文将从多个角度深入分析软考在不同阶段的性价比,帮助大家做出明智的选择。 一、软考的价值与局限性 1.1 …...
shell编程(完结)
shell编程(完结) 声明! 学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下,如涉及侵权马上删除文章 笔记只是方便各位师傅的学习和探讨,文章所提到的网站以及内容,只做学习交流,其…...
UNIX数据恢复—UNIX系统常见故障问题和数据恢复方案
UNIX系统常见故障表现: 1、存储结构出错; 2、数据删除; 3、文件系统格式化; 4、其他原因数据丢失。 UNIX系统常见故障解决方案: 1、检测UNIX系统故障涉及的设备是否存在硬件故障,如果存在硬件故障…...
adb连接逍遥安卓模拟器失败的问题解决方案
1、逍遥安卓模拟器进入系统应用,设置-关于平板电脑-版本号,连续点击3次以上,直到提示进入开发者模式,返回设置界面,进入【开发者选项】-【USB调试】开启,之后重启模拟器再次adb尝试连接。 2、android stud…...
【昇腾】NPU ID:物理ID、逻辑ID、芯片映射关系
起因: https://www.hiascend.com/document/detail/zh/Atlas%20200I%20A2/23.0.0/re/npu/npusmi_013.html npu-smi info -l查询所有NPU设备: [naienotebook-npu-bd130045-55bbffd786-lr6t8 DCNN]$ npu-smi info -lTotal Count : 1NPU…...
Three.js曲线篇 8.管道漫游
目录 创建样条曲线 创建管道 透视相机漫游 完整代码 大家不要被这个“管道漫游”这几个字所蒙骗了,学完后大家就知道这个知识点有多脏了。我也是误入歧途,好奇了一下“管道漫游”。好了,现在就给大家展示一下为啥这个只是点脏了。 我也废话…...
scala基础_数据类型概览
Scala 数据类型 下表列出了 Scala 支持的数据类型: 类型类别数据类型描述Scala标准库中的实际类基本类型Byte8位有符号整数,数值范围为 -128 到 127scala.Byte基本类型Short16位有符号整数,数值范围为 -32768 到 32767scala.Short基本类型I…...
【LeetCode刷题之路】622.设计循环队列
LeetCode刷题记录 🌐 我的博客主页:iiiiiankor🎯 如果你觉得我的内容对你有帮助,不妨点个赞👍、留个评论✍,或者收藏⭐,让我们一起进步!📝 专栏系列:LeetCode…...
暂停一下,给Next.js项目配置一下ESLint(Next+tailwind项目)
前提 之前开自己的GitHub项目,想着不是团队项目,偷懒没有配置eslint,后面发现还是不行。eslint的存在可以帮助我们规范代码格式,同时 ctrl s保存立即调整代码格式是真的很爽。 除此之外,团队使用eslint也是好处颇多…...
Windows系统磁盘与分区之详解(Detailed Explanation of Windows System Disks and Partitions)
Windows系统磁盘与分区知识详解 在日常使用Windows操作系统的过程中,我们常常会接触到磁盘管理,磁盘分区等操作.然而,许多人可能并不完全理解磁盘和分区的运作原理以及如何高效管理它们. 本篇文章将探讨Windows系统中关于磁盘和分区的各种知识,帮助大家更好地理解磁盘以及分区…...
顺序表的使用,对数据的增删改查
主函数: 3.c #include "3.h"//头文件调用 SqlListptr sql_cerate()//创建顺序表函数 {SqlListptr ptr(SqlListptr)malloc(sizeof(SqlList));//在堆区申请连续的空间if(NULLptr){printf("创建失败\n");return NULL;//如果没有申请成功ÿ…...
XDMA与FPGA:高效数据传输的艺术
XDMA与FPGA:高效数据传输的艺术 引言 在现代计算系统中,数据传输的效率直接影响系统的整体性能。特别是在涉及到高速数据处理的领域,如高性能计算(HPC)、实时视频处理和大数据分析等,如何高效地在主机与F…...
#思科模拟器通过服务配置保障无线网络安全Radius
演示拓扑图: 搭建拓扑时要注意: 只能连接它的Ethernet接口,不然会不通 MAC地址绑定 要求 :通过配置MAC地址过滤禁止非内部员工连接WiFi 打开无线路由器GUI界面,点开下图页面,配置路由器无线网络MAC地址过…...
浅谈Python库之pillow
一、pillow的介绍 Pillow是Python Imaging Library (PIL) 的一个分支,它是一个强大的图像处理库,用于打开、操作和保存许多不同图像文件格式。Pillow提供了广泛的文件格式支持、强大的图像处理能力和广泛的文件格式兼容性。它是PIL的一个友好的分支&…...
Android通过okhttp下载文件(本文案例 下载mp4到本地,并更新到相册)
使用步骤分为两步 第一步导入 okhttp3 依赖 第二步调用本文提供的 utils 第一步这里不做说明了,直接提供第二步复制即用 DownloadUtil 中 download 为下载文件 参数说明 这里主要看你把 destFileName 下载文件名称定义为什么后缀,比如我定义为 .mp4 下…...
计算机网络从诞生之初到至今的发展历程
前言 "上网",相信大家对这个动词已经不再陌生,网 通常指的是网络;在 2024 年的今天,网络已经渗透到了每个人的生活中,成为其不可或缺的一部分;你此时此刻在看到我的博客,就是通过网络…...
Kudu 源码编译-aarch架构 1.17.1版本
跟着官方文档编译 第一个问题:在make阶段时会报的问题: kudu/src/kudu/util/block_bloom_filter.cc:210:3: error: ‘vst1q_u32_x2’ was not declared in this scope kudu/src/kudu/util/block_bloom_filter.cc:436:5: error: ‘vst1q_u8_x2’ was no…...
SEC_ASA 第二天作业
拓扑 按照拓扑图配置 NTP,Server端为 Outside路由器,Client端为 ASA,两个设备的 NTP传输使用MD5做校验。(安全 V4 LAB考点) 提示:Outside路由器作为 Server端要配置好正确的时间和时区,ASA防…...
操作系统(5)进程
一、定义与特点 定义:进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。 特点: 动态性:进程是动态创建的,有它自身的生命周期,…...
6_Sass 选择器函数 --[CSS预处理]
Sass 提供了一系列的选择器函数,用于操作和组合CSS选择器。这些函数可以帮助你更灵活地创建样式规则,并且可以减少重复代码。以下是几个常用的选择器函数及其用法: 1. selector-append($selector1, $selector2...) selector-append($select…...
考研数学【线性代数基础box(数二)】
本文是对数学二线性代数基础进行总结,一些及极其简单的被省略了,代数的概念稀碎,不如高数关联性高,所以本文仅供参考,做题请从中筛选! 本文为初稿,后面会根据刷题和自己的理解继续更新 第一章…...
ModbusTcp获取数据
ModbusTcp获取数据 记录一个用 pymodbus 库来获取数据的代码。 注意: 1.读取寄存器地址是16进制的。2.大小端转换通过代码知道原理。读取数据时,切记频率别太高,否则会出现连接被关闭问题。 from pymodbus.client.sync import ModbusTcpCli…...
java 知识点:注解及使用
注解 大多数时候,我们会使用注解,而不是自定义注解。注解给谁用?编译器 、给解析程序用注解不是程序的一部分,可以理解为注解就是一个标签 主要的作用有以下四方面: 生成文档,通过代码里标识的元数据生成…...
AI预测体彩排3采取888=3策略+和值012路+胆码+通杀1码测试12月13日升级新模型预测第156弹
经过100多期的测试,当然有很多彩友也一直在观察我每天发的预测结果,得到了一个非常有价值的信息,那就是9码定位的命中率非常高,已到达90%的命中率,这给喜欢打私菜的朋友提供了极高价值的预测结果~当然了,大…...
faiss数据库检索不稳定
faiss数据检索不稳定 def build_faiss_index(embeddings_vector):dim np.shape(embeddings_vector)[-1]index faiss.index_factory(dim, HNSW64, faiss.METRIC_INNER_PRODUCT)index.add(embeddings_vector)return index这个代码不稳定,构建的索引召回结果可能会不…...
wordpress centos查看目录/百度不能搜的十大禁词
NEW关注Tech逆向思维视频号最新视频→【世界各国加班程度到底怎么样?】3月30日消息,苹果公司内部备忘录显示,如果iPhone等苹果设备在GSMA设备注册数据库中被标记为丢失或被盗,苹果零售商店和苹果授权服务提供商将会收到提醒&#…...
公司经营范围 网站开发/长春seo排名收费
文件 FilecreateNewFile()delete()exists() 文件存不存在mkdir() 创建文件夹mkdirs() 创建文件夹list() 列出全部文件isDirectory()RandomAccessFile(File,"rw")File.separatorFile.pathSeparator文件流InputStream--FileInputStream(f)OutputStream--FileOutputStre…...
企业网站的优点和缺点/北京搜索优化排名公司
參考链接:http://www.cnblogs.com/Esfog/p/DissolveShader.html 效果图: 从颜色变化来说,有三种,一种是纹理颜色。一种是纹理与黑边的混合颜色,一种是透明(用discard处理。不绘制像素)。还须要一张纹理(随意)ÿ…...
动态网站建设 毕业答辩/seo网站推广建站服务商
Ndo 的配置方式非常灵活,可以有如下的几种方式 1: 默认的配置方式-单数据库(支持解析SqlClientDriver,OledbDriver,OracleClientDriver,JetDriver,OdbcDriver,SqlOledbDriver,OracleOledbDriver,如果是其它数据源可以通过后面的其它方式)Ndo自动在appSettings 的配置…...
做响应式网站制作/青岛百度推广seo价格
标签:今天看了关于依赖、关联、聚合和组合的理解,原文如下:又温习了一遍,上学时,学的面向对象分析与设计,肯定当时很清楚这几个的关系。但今天读了这个以后,感觉还是有收获。简单整理如下&#…...
备案时暂时关闭网站/怎样写营销策划方案
以下文章摘录自: 《机器学习观止——核心原理与实践》 京东: https://item.jd.com/13166960.html 当当:http://product.dangdang.com/29218274.html (由于博客系统问题,部分公式、图片和格式有可能存在显示问题,请…...