15.消息队列RabbitMQ
一、基本概念
RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
安装 RabbitMQ 需要先安装 Erlang 环境并配置环境变量,安装完后进入 RabbitMQ 的 sbin 目录运行命令激活控制台界面,访问地址 账号密码均为 guest。
rabbitmq-plugins enable rabbitmq_management
二、用户
- 超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
- 监控者(monitoring):可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
- 策略制定者(policymaker):可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
- 普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
- 其他:无法登陆管理控制台,通常就是普通的生产者和消费者。
三、工作模式
RabbitMQ主要有五种工作模式,分别是:
- 简单模式(hello world)
- 工作队列模式(work queue)
- 发布/订阅模式(publish/subscribe)
- 路由模式(routing)
- 主题模式(topic)
导入依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version>
</dependency>
工具类:
public class ConnectionUtil {public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("localhost");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("vhost");factory.setUsername("guest");factory.setPassword("guest");// 通过工厂获取连接Connection connection = factory.newConnection();return connection;}
}
1.简单模式(hello world):
//发送信息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 消息内容String message = "Hello World!";channel.basicPublish("", "hello", null, message.getBytes());//关闭通道和连接channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列channel.basicConsume("hello", true, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());}
}
2.工作队列模式(work queue):多个消费者消费同一队列消息。
//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 同一时刻服务器只会发一条消息给消费者,否则MQ会将所有请求平均发送给所有消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,false表示手动返回完成状态,true表示接收到消息马上自动确认完成channel.basicConsume("hello", false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回确认状态,否则表示使用自动确认模式channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
3.发布/订阅模式(publish/subscribe):通过交换机发送消息到多个队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
4.路由模式(routing):通过交换机进行路由匹配发送消息到不同队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange及类型channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "Hello World!";//指定消息路由channel.basicPublish(EXCHANGE_NAME, "routing", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,并指定多个路由channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing1");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing2");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
5.主题模式(topic):通过交换机进行通配符匹配发送消息到不同队列。
//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange及类型channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息内容String message = "Hello World!";//指定消息匹配关键字channel.basicPublish(EXCHANGE_NAME, "topic", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,并指定多个通配符channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic1.*");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic2.*");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}
四、Spring整合
Spring 提供了 RabbitTemplate 类执行消息发送。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:rabbitmq:host: 192.168.88.88port: 5672username: guestpassword: guestvirtual-host: /
@Configuration
public class MQConfig {@Beanpublic Exchange exchange1(){return ExchangeBuilder.fanoutExchange("fanout").build();}@Beanpublic Exchange exchange2(){return ExchangeBuilder.directExchange("direct").build();}@Beanpublic Queue queue1(){return QueueBuilder.durable("hello1").build();}@Beanpublic Queue queue2(){return QueueBuilder.durable("hello2").build();}@Beanpublic Binding binding1(Exchange exchange1,Queue queue1){return BindingBuilder.bind(queue1).to(exchange1).with("key1").noargs();}@Beanpublic Binding binding2(Exchange exchange2,Queue queue2){return BindingBuilder.bind(queue2).to(exchange2).with("key2").noargs();}
}
@Component
//定义队列并绑定
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "hello", durable = "true", autoDelete = "true"),exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT), key = "key"), ackMode = "MANUAL")
public class MyListener {@RabbitHandlerpublic void consume(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException {//手动返回状态if () {// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费channel.basicAck(deliveryTag, false);} else {// 第三个参数true,表示这个消息会重新进入队列channel.basicNack(deliveryTag, false, true);}}
}
相关文章:
15.消息队列RabbitMQ
一、基本概念 RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息…...
并发编程之死锁问题介绍
一、本文概览 死锁问题在并发编程中是一个非常致命的问题,问题一旦产生,只能通过重启机器、修改代码来修复问题,下面我们通过一小段文章内容介绍下死锁以及如何死锁的预防 二、什么是死锁? 在介绍死锁之前,先来明确下什…...
【python学习笔记】:SQL常用脚本(一)
1、行转列的用法PIVOT CREATE table test (id int,name nvarchar(20),quarter int,number int) insert into test values(1,N苹果,1,1000) insert into test values(1,N苹果,2,2000) insert into test values(1,N苹果,3,4000) insert into test values(1,N苹果,4,5000) insert…...
Spring是怎么解决循环依赖的
1.什么是循环依赖: 这里给大家举个简单的例子,相信看了上一篇文章大家都知道了解了spring的生命周期创建流程。那么在Spring在生命周期的哪一步会出现循环依赖呢? 第一阶段:实例化阶段 Instantiation 第二阶段:属性赋…...
HTML创意动画代码
目录1、动态气泡背景2、创意文字3、旋转立方体1、动态气泡背景 <!DOCTYPE html> <html> <head><title>Bubble Background</title><style>body {margin: 0;padding: 0;height: 100vh;background: #222;display: flex;flex-direction: colum…...
软工第一次个人作业——阅读和提问
软工第一次个人作业——阅读和提问 项目内容这个作业属于哪个课程2023北航敏捷软件工程这个作业的要求在哪里个人作业-阅读和提问我在这个课程的目标是体验敏捷开发过程,掌握一些开发技能,为进一步发展作铺垫这个作业在哪个具体方面帮助我实现目标对本课…...
urho3d的自定义文件格式
Urho3D尽可能使用现有文件格式,仅在绝对必要时才定义自定义文件格式。当前使用的自定义文件格式有: 二进制模型格式(.mdl) Model geometry and vertex morph data byte[4] Identifier "UMDL" or "UMD2" …...
spark第一章:环境安装
系列文章目录 spark第一章:环境安装 文章目录系列文章目录前言一、文件准备1.文件上传2.文件解压3.修改配置4.启动环境二、历史服务器1.修改配置2.启动历史服务器总结前言 spark在大数据环境的重要程度就不必细说了,直接开始吧。 一、文件准备 1.文件…...
MySQL---存储过程与存储函数的相关概念
MySQL—存储过程与存储函数的相关概念 存储函数和存储过程的主要区别: 存储函数一定会有返回值的存储过程不一定有返回值 存储过程和函数能后将复杂的SQL逻辑封装在一起,应用程序无需关注存储过程和函数内部复杂的SQL逻辑,而只需要简单地调…...
PMP值得考吗?
第一,PMP的价值体现 1、PMP是管理岗位必考证书。 多数企业会选择优先录用持PMP证书的管理人才,PMP成为管理岗位的必考证书。PMP在很多外企和国内中大型企业非常受重视,中石油、中海油、华为等等都会给内部员工做培训。 这些机构对项目管理…...
Quartus 报错汇总(持续更新...)
1、Error (10663): Verilog HDL Port Connection error at top_rom.v(70): output or inout port "stcp" must be connected to a structural net expression输出变量stcp在原设计文件中已经定义为reg型,在实例化时不能再定义为reg型,而应该是…...
Netty权威指南总结(一)
一、为什么选择Netty:API使用简单,开发门槛低,屏蔽了NIO通信的底层细节。功能强大,预制了很多种编解码功能,支持主流协议。定制能力强,可以通过ChannelHandler对通信框架进行灵活地拓展。性能高、成熟、稳定…...
Elasticsearch:如何轻松安全地对实时 Elasticsearch 索引重新索引你的数据
在很多的时候,由于一些需求,我们不得不修改索引的映射,也即 mapping,这个时候我们需要重新索引(reindex)来把之前的数据索引到新的索引中。槽糕的是,我们的这个索引还在不断地收集实时数据&…...
【算法笔记】前缀和与差分
第一课前缀和与差分 算法是解决问题的方法与步骤。 在看一个算法是否优秀时,我们一般都要考虑一个算法的时间复杂度和空间复杂度。 现在随着空间越来越大,时间复杂度成为了一个算法的重要指标,那么如何估计一个算法的时间复杂度呢…...
python实战应用讲解-【实战应用篇】函数式编程-八皇后问题(附示例代码)
目录 知识储备-迭代器相关模块 itertools 模块 创建新的迭代器 根据最短输入序列长度停止的迭代器...
【Servlet篇】如何解决Request请求中文乱码的问题?
前言 前面一篇文章我们探讨了 Servlet 中的 Request 对象,Request 请求对象中封装了请求数据,使用相应的 API 就可以获取请求参数。 【Servlet篇】一文带你读懂 Request 对象 也许有小伙伴已经发现了前面的方式获取请求参数时,会出现中文乱…...
SpringBoot:SpringBoot简介与快速入门(1)
SpringBoot快速入门1. SpringBoot简介2. SpringBoot快速入门2.1 创建SpringBoot项目(必须联网,要不然创建失败,在模块3会讲到原因)2.2 编写对应的Controller类2.3 启动测试3. Spring官网构建工程4. SpringBoot工程快速启动4.1 为什…...
RabbitMQ学习(十一):RabbitMQ 集群
一、集群1.1 为什么要使用集群前面我们介绍了如何安装及运行 RabbitMQ 服务,不过这些是单机版的,无法满足目前真实应用的 要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ 服务器可以…...
学渣适用版——Transformer理论和代码以及注意力机制attention的学习
参考一篇玩具级别不错的代码和案例 自注意力机制 注意力机制是为了transform打基础。 参考这个自注意力机制的讲解流程很详细, 但是学渣一般不知道 key,query,value是啥。 结合B站和GPT理解 注意力机制是一种常见的神经网络结构࿰…...
网上这么多IT的培训机构,我们该怎么选?
说实话,千万不要把这个答案放在网上来找,因为你只能得到别人觉得合适的或者机构的广告;当然个人的培训经历可以听一听的,毕竟不靠谱的机构也有,比如让你交一两万去上线上课程或者一百号来人坐一起看视频,这…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”
目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...
Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
Go 语言并发编程基础:无缓冲与有缓冲通道
在上一章节中,我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道,它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好࿰…...
【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看
文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...
DiscuzX3.5发帖json api
参考文章:PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下,适配我自己的需求 有一个站点存在多个采集站,我想通过主站拿标题,采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...
CppCon 2015 学习:REFLECTION TECHNIQUES IN C++
关于 Reflection(反射) 这个概念,总结一下: Reflection(反射)是什么? 反射是对类型的自我检查能力(Introspection) 可以查看类的成员变量、成员函数等信息。反射允许枚…...
