当前位置: 首页 > news >正文

15.消息队列RabbitMQ

一、基本概念

        RabbitMQ 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

        安装 RabbitMQ 需要先安装 Erlang 环境并配置环境变量,安装完后进入 RabbitMQ 的 sbin 目录运行命令激活控制台界面,访问地址  账号密码均为 guest。

rabbitmq-plugins enable rabbitmq_management

二、用户

  1. 超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
  2. 监控者(monitoring):可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
  3. 策略制定者(policymaker):可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
  4. 普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
  5. 其他:无法登陆管理控制台,通常就是普通的生产者和消费者。

三、工作模式

        RabbitMQ主要有五种工作模式,分别是:

  1. 简单模式(hello world)
  2. 工作队列模式(work queue)
  3. 发布/订阅模式(publish/subscribe)
  4. 路由模式(routing)
  5. 主题模式(topic)

         导入依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version>
</dependency>

        工具类:

public class ConnectionUtil {public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("localhost");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setVirtualHost("vhost");factory.setUsername("guest");factory.setPassword("guest");// 通过工厂获取连接Connection connection = factory.newConnection();return connection;}
}

        1.简单模式(hello world):

//发送信息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 消息内容String message = "Hello World!";channel.basicPublish("", "hello", null, message.getBytes());//关闭通道和连接channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();// 从连接中创建通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列channel.basicConsume("hello", true, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());}
}

        2.工作队列模式(work queue):多个消费者消费同一队列消息。

//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("hello", false, false, false, null);// 同一时刻服务器只会发一条消息给消费者,否则MQ会将所有请求平均发送给所有消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,false表示手动返回完成状态,true表示接收到消息马上自动确认完成channel.basicConsume("hello", false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回确认状态,否则表示使用自动确认模式channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}

        3.发布/订阅模式(publish/subscribe):通过交换机发送消息到多个队列。

//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}

        4.路由模式(routing):通过交换机进行路由匹配发送消息到不同队列。

//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange及类型channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息内容String message = "Hello World!";//指定消息路由channel.basicPublish(EXCHANGE_NAME, "routing", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,并指定多个路由channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing1");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing2");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}

        5.主题模式(topic):通过交换机进行通配符匹配发送消息到不同队列。

//发送消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange及类型channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息内容String message = "Hello World!";//指定消息匹配关键字channel.basicPublish(EXCHANGE_NAME, "topic", null, message.getBytes());channel.close();connection.close();
}//接收消息
public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,并指定多个通配符channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic1.*");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic2.*");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());// 返回完成状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
}

四、Spring整合

        Spring 提供了 RabbitTemplate 类执行消息发送。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:rabbitmq:host: 192.168.88.88port: 5672username: guestpassword: guestvirtual-host: /
@Configuration
public class MQConfig {@Beanpublic Exchange exchange1(){return ExchangeBuilder.fanoutExchange("fanout").build();}@Beanpublic Exchange exchange2(){return ExchangeBuilder.directExchange("direct").build();}@Beanpublic Queue queue1(){return QueueBuilder.durable("hello1").build();}@Beanpublic Queue queue2(){return QueueBuilder.durable("hello2").build();}@Beanpublic Binding binding1(Exchange exchange1,Queue queue1){return BindingBuilder.bind(queue1).to(exchange1).with("key1").noargs();}@Beanpublic Binding binding2(Exchange exchange2,Queue queue2){return BindingBuilder.bind(queue2).to(exchange2).with("key2").noargs();}
}
@Component
//定义队列并绑定
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "hello", durable = "true", autoDelete = "true"),exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT), key = "key"), ackMode = "MANUAL")
public class MyListener {@RabbitHandlerpublic void consume(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException {//手动返回状态if () {// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费channel.basicAck(deliveryTag, false);} else {// 第三个参数true,表示这个消息会重新进入队列channel.basicNack(deliveryTag, false, true);}}
}

相关文章:

15.消息队列RabbitMQ

一、基本概念 RabbitMQ 是一个开源的AMQP实现&#xff0c;服务器端用Erlang语言编写&#xff0c;支持多种客户端&#xff0c;如&#xff1a;Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等&#xff0c;支持AJAX。用于在分布式系统中存储转发消息&#xf…...

并发编程之死锁问题介绍

一、本文概览 死锁问题在并发编程中是一个非常致命的问题&#xff0c;问题一旦产生&#xff0c;只能通过重启机器、修改代码来修复问题&#xff0c;下面我们通过一小段文章内容介绍下死锁以及如何死锁的预防 二、什么是死锁&#xff1f; 在介绍死锁之前&#xff0c;先来明确下什…...

【python学习笔记】:SQL常用脚本(一)

1、行转列的用法PIVOT CREATE table test (id int,name nvarchar(20),quarter int,number int) insert into test values(1,N苹果,1,1000) insert into test values(1,N苹果,2,2000) insert into test values(1,N苹果,3,4000) insert into test values(1,N苹果,4,5000) insert…...

Spring是怎么解决循环依赖的

1.什么是循环依赖&#xff1a; 这里给大家举个简单的例子&#xff0c;相信看了上一篇文章大家都知道了解了spring的生命周期创建流程。那么在Spring在生命周期的哪一步会出现循环依赖呢&#xff1f; 第一阶段&#xff1a;实例化阶段 Instantiation 第二阶段&#xff1a;属性赋…...

HTML创意动画代码

目录1、动态气泡背景2、创意文字3、旋转立方体1、动态气泡背景 <!DOCTYPE html> <html> <head><title>Bubble Background</title><style>body {margin: 0;padding: 0;height: 100vh;background: #222;display: flex;flex-direction: colum…...

软工第一次个人作业——阅读和提问

软工第一次个人作业——阅读和提问 项目内容这个作业属于哪个课程2023北航敏捷软件工程这个作业的要求在哪里个人作业-阅读和提问我在这个课程的目标是体验敏捷开发过程&#xff0c;掌握一些开发技能&#xff0c;为进一步发展作铺垫这个作业在哪个具体方面帮助我实现目标对本课…...

urho3d的自定义文件格式

Urho3D尽可能使用现有文件格式&#xff0c;仅在绝对必要时才定义自定义文件格式。当前使用的自定义文件格式有&#xff1a; 二进制模型格式&#xff08;.mdl&#xff09; Model geometry and vertex morph data byte[4] Identifier "UMDL" or "UMD2" …...

spark第一章:环境安装

系列文章目录 spark第一章&#xff1a;环境安装 文章目录系列文章目录前言一、文件准备1.文件上传2.文件解压3.修改配置4.启动环境二、历史服务器1.修改配置2.启动历史服务器总结前言 spark在大数据环境的重要程度就不必细说了&#xff0c;直接开始吧。 一、文件准备 1.文件…...

MySQL---存储过程与存储函数的相关概念

MySQL—存储过程与存储函数的相关概念 存储函数和存储过程的主要区别&#xff1a; 存储函数一定会有返回值的存储过程不一定有返回值 存储过程和函数能后将复杂的SQL逻辑封装在一起&#xff0c;应用程序无需关注存储过程和函数内部复杂的SQL逻辑&#xff0c;而只需要简单地调…...

PMP值得考吗?

第一&#xff0c;PMP的价值体现 1、PMP是管理岗位必考证书。 多数企业会选择优先录用持PMP证书的管理人才&#xff0c;PMP成为管理岗位的必考证书。PMP在很多外企和国内中大型企业非常受重视&#xff0c;中石油、中海油、华为等等都会给内部员工做培训。 这些机构对项目管理…...

Quartus 报错汇总(持续更新...)

1、Error (10663): Verilog HDL Port Connection error at top_rom.v(70): output or inout port "stcp" must be connected to a structural net expression输出变量stcp在原设计文件中已经定义为reg型&#xff0c;在实例化时不能再定义为reg型&#xff0c;而应该是…...

Netty权威指南总结(一)

一、为什么选择Netty&#xff1a;API使用简单&#xff0c;开发门槛低&#xff0c;屏蔽了NIO通信的底层细节。功能强大&#xff0c;预制了很多种编解码功能&#xff0c;支持主流协议。定制能力强&#xff0c;可以通过ChannelHandler对通信框架进行灵活地拓展。性能高、成熟、稳定…...

Elasticsearch:如何轻松安全地对实时 Elasticsearch 索引重新索引你的数据

在很多的时候&#xff0c;由于一些需求&#xff0c;我们不得不修改索引的映射&#xff0c;也即 mapping&#xff0c;这个时候我们需要重新索引&#xff08;reindex&#xff09;来把之前的数据索引到新的索引中。槽糕的是&#xff0c;我们的这个索引还在不断地收集实时数据&…...

【算法笔记】前缀和与差分

第一课前缀和与差分 算法是解决问题的方法与步骤。 在看一个算法是否优秀时&#xff0c;我们一般都要考虑一个算法的时间复杂度和空间复杂度。 现在随着空间越来越大&#xff0c;时间复杂度成为了一个算法的重要指标&#xff0c;那么如何估计一个算法的时间复杂度呢&#xf…...

python实战应用讲解-【实战应用篇】函数式编程-八皇后问题(附示例代码)

目录 知识储备-迭代器相关模块 itertools 模块 创建新的迭代器 根据最短输入序列长度停止的迭代器...

【Servlet篇】如何解决Request请求中文乱码的问题?

前言 前面一篇文章我们探讨了 Servlet 中的 Request 对象&#xff0c;Request 请求对象中封装了请求数据&#xff0c;使用相应的 API 就可以获取请求参数。 【Servlet篇】一文带你读懂 Request 对象 也许有小伙伴已经发现了前面的方式获取请求参数时&#xff0c;会出现中文乱…...

SpringBoot:SpringBoot简介与快速入门(1)

SpringBoot快速入门1. SpringBoot简介2. SpringBoot快速入门2.1 创建SpringBoot项目&#xff08;必须联网&#xff0c;要不然创建失败&#xff0c;在模块3会讲到原因&#xff09;2.2 编写对应的Controller类2.3 启动测试3. Spring官网构建工程4. SpringBoot工程快速启动4.1 为什…...

RabbitMQ学习(十一):RabbitMQ 集群

一、集群1.1 为什么要使用集群前面我们介绍了如何安装及运行 RabbitMQ 服务&#xff0c;不过这些是单机版的&#xff0c;无法满足目前真实应用的 要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况&#xff0c;该怎么办&#xff1f;单台 RabbitMQ 服务器可以…...

学渣适用版——Transformer理论和代码以及注意力机制attention的学习

参考一篇玩具级别不错的代码和案例 自注意力机制 注意力机制是为了transform打基础。 参考这个自注意力机制的讲解流程很详细&#xff0c; 但是学渣一般不知道 key&#xff0c;query&#xff0c;value是啥。 结合B站和GPT理解 注意力机制是一种常见的神经网络结构&#xff0…...

网上这么多IT的培训机构,我们该怎么选?

说实话&#xff0c;千万不要把这个答案放在网上来找&#xff0c;因为你只能得到别人觉得合适的或者机构的广告&#xff1b;当然个人的培训经历可以听一听的&#xff0c;毕竟不靠谱的机构也有&#xff0c;比如让你交一两万去上线上课程或者一百号来人坐一起看视频&#xff0c;这…...

数据结构与算法—跳表(skiplist)

目录 前言 跳表 查询时间分析 1、时间复杂度 o(logn) 2、空间复杂度O(n) 动态插入和删除 跳表动态更新 跳表与红黑树比较 跳表实现 前言 二分查找用的数组 链表可不可以实现二分查找呢&#xff1f; 跳表 各方面性能比较优秀的动态数据结构&#xff0c;可以支持快速…...

【C++】5.C/C++内存管理

1.C/C内存管理 int globalVar 1; static int staticGlobalVar 1; void Test() {static int staticVar 1;int localVar 1;int num1[10] {1, 2, 3, 4};char char2[] "abcd";char* pChar3 "abcd";int* ptr1 (int*)malloc(sizeof (int)*4);int* ptr2 …...

一文让你彻底理解关于消息队列的使用

一、消息队列概述 消息队列中间件是分布式系统中重要的组件&#xff0c;主要解决应用解耦&#xff0c;异步消息&#xff0c;流量削锋等问题&#xff0c;实现高性能&#xff0c;高可用&#xff0c;可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ&#xff0c;Rabbit…...

条件期望3

条件期望例题—连续发生的事情 连续地做二项实验, 每一次成功概率为p. 当连续k次成功时, 停止实验. 求停止实验时做的总实验次数的期望. 解: 错误解法 设NkN_kNk​为停止实验时做的总实验次数, 则 E[Nk]E[E[Nk∣Nk−1]]∑jk−1∞E[Nk∣Nk−1j]\begin{split} E[N_k] & E[E…...

第四届蓝桥杯省赛 C++ B组 - 翻硬币

✍个人博客&#xff1a;https://blog.csdn.net/Newin2020?spm1011.2415.3001.5343 &#x1f4da;专栏地址&#xff1a;蓝桥杯题解集合 &#x1f4dd;原题地址&#xff1a;翻硬币 &#x1f4e3;专栏定位&#xff1a;为想参加蓝桥杯的小伙伴整理常考算法题解&#xff0c;祝大家都…...

linux shell 入门学习笔记14 shell脚本+数学计算

概念 把复杂的命令执行过程&#xff0c;通过逻辑代码&#xff0c;组成一个脚本文件的方式就叫做shell脚本。 shebang #! /bin/bash #! /bin/perl #! /bin/python执行脚本的方式 source my_first.sh . my_first.shbash my_first.sh ./my_first.sh变量引用 ${var} 取出变量结果 …...

ESP32设备驱动-MAX30100心率监测传感器驱动

MAX30100心率监测传感器驱动 1、MAX30100介绍 MAX30100 是一款集成脉搏血氧饱和度和心率监测传感器解决方案。 它结合了两个 LED、一个光电探测器、优化的光学器件和低噪声模拟信号处理,以检测脉搏血氧饱和度和心率信号。 MAX30100 采用 1.8V 和 3.3V 电源供电,可通过软件…...

RTD2169芯片停产|完美替代RTD2169芯片|CS5260低BOM成本替代RTD2169方案设计

RTD2169芯片停产|完美替代RTD2169芯片|CS5260低BOM成本替代RTD2169方案设计 瑞昱的RTD2169芯片目前已经停产了&#xff0c; 那么之前用RTD2169来设计TYPEC转VGA方案的产品&#xff0c;该如何生产这类产品&#xff1f;且RTD2169芯片价格较贵&#xff0c;芯片封装尺寸是QFN40&…...

urho3d数据库

只有在启用以下两个构建选项之一时&#xff0c;数据库子系统才会构建到Urho3D库中&#xff1a;Urho3D_Database_ODBC和Urho3D-Database_SQLITE。当两个选项都启用时&#xff0c;URHO3D_DATABASE_ODBC优先。这些构建选项决定子系统将使用哪个数据库API。ODBC DB API更适用于本地…...

141. 周期

Powered by:NEFU AB-IN Link 文章目录141. 周期题意思路代码141. 周期 题意 一个字符串的前缀是从第一个字符开始的连续若干个字符&#xff0c;例如 abaab 共有 5个前缀&#xff0c;分别是 a&#xff0c;ab&#xff0c;aba&#xff0c;abaa&#xff0c;abaab。 我们希望知道一…...