SpringCloud(五)MQ消息队列
MQ
- 概念
- 常见消息模型
- helloworld案例
- 实现
- 实现spring AMQP发送消息
- 实现spring AMQP接收消息
- 工作消息队列
- 实现
- 发布订阅模型
- Fanout Exchange
- 实现
- DirectExchange
- 实现
- TopicExchange
- 实现
- DirectExchange 和FanoutExchange的差异
- DirectExchange 和TopicExchange的差异
- 基于@RabbitListener注解声明队列有 哪些常用注
- 消息转换器
- 注意
- 同步调用
- 异步调用
- 安装
- SpringAMQP
- 特征
概念
MQ(MessageQueue):消息队列,事件驱动架构中的Broker
- channel:操作MQ的工具
- exchange:路由消息到队列
- queue:缓存消息
- virtual host: 虚拟主机,是对queue、exchange等资源逻辑分组
常见消息模型
helloworld案例
角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
实现
实现spring AMQP发送消息
- 在父工程引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
-
在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
- 在publisher服务中编写application.yml,添加mq连接信息
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor
- 在publisher服务中新建一个测试类,编写测试方法
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue(){String queueName = "simple.queue";String message = "hello,spring amqp!";rabbitTemplate.convertAndSend(queueName,message);} }
实现spring AMQP接收消息
- 在父工程引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
-
在consumer服务中编写消费逻辑,监听simple.queue。
- 在consumer服务中编写application.yml,添加mq连接信息
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor
- 在publisher服务中新建一个测试类,编写测试方法
@Component public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");} }
工作消息队列
作用: 提高消息处理速度,避免队列消息堆积。
实现
- 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testWorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello,spring amqp!";for (int i = 0; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}
- 在consumer服务中定义两个消息监听者,都监听simple.queue队列
@RabbitListener(queues = "simple.queue")public void listenWorkQueueMessage1 (String msg) throws InterruptedException{System.out.println("spring 消费者1接收到消息:【"+msg+"】");Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueueMessage2 (String msg) throws InterruptedException{System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色Thread.sleep(200);
- 消费者1每秒处理50条消息,消费者2每秒处理10条消息
- 修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toorlistener:direct:prefetch: 1
发布订阅模型
概念: 与之前模型区别是,允许将同一消息发送给多个消费者。
实现方式: exchange(交换机)
exchange: 负责消息路由,不存储,路由失败则消息丢失
常见exchange类型:
- Fanout:广播
- Direct:路由
- Topic:话题
Fanout Exchange
Fanout Exchange:将接收到的消息路由到每一个跟其绑定的queue
实现
-
在consumer服务中,利用代码声明队列(Queue)、交换机(Exchange),并将两者绑定(Binding)
- SringAMQP提供了声明交换机、队列、绑定关系的API。
- 在consumer服务常见一个类,添加@configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding。
@Configuration public class FanoutConfig {//声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("root.fanout");}//声明第一个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列1和交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);} //第二个同第一个 }
- SringAMQP提供了声明交换机、队列、绑定关系的API。
-
在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.fanout发送消息
@Testpublic void testSendFanoutExchange(){String exchangeName = "root.fanout";String message = "hello,spring amqp!";rabbitTemplate.convertAndSend(exchangeName,message);}
DirectExchange
DirectExchange: 将接收到的消息更具规则路由到指定的Queue,因此称为路由模式(routes)
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
实现
- 利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中编写两个消费者方法,分别监听direct.queue1和direct.queue2
@RabbitListener(bindings = @QueueBinding(value = @Queue(name="direct.queue1"),exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到direct.queue1的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name="direct.queue2"),exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到direct.queue2的消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.direct发送消息
@Testpublic void testSendDirectExchange(){String exchangeName = "root.direct";String message = "hello,red!";rabbitTemplate.convertAndSend(exchangeName,"red",message);}
TopicExchange
TopicExchange: 与DirectExchange类似,区别在于routineKey必须是多个单词的列表,并且以== . ==分割。
Queue与Exchange指定BindingKey时可以使用通配符
# :代指0个或多个单词
*:代指一个单词
实现
- 利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中编写两个消费者方法,分别监听topic.queue1和topic.queue2
@RabbitListener(bindings = @QueueBinding(value = @Queue(name="topic.queue1"),exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到topic.queue1的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name="topic.queue2"),exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到topic.queue2的消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.topic发送消息
@Testpublic void testSendTopicExchange(){String exchangeName = "root.topic";String message = "hello,china.news!";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}
DirectExchange 和FanoutExchange的差异
- FanoutExchange将消息路由给每一个与之绑定的队列
- DirectExchange根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
DirectExchange 和TopicExchange的差异
- TopicExchange的routineKey必须使用多个单词,以== . ==分割
- TopicExchange可以使用通配符
基于@RabbitListener注解声明队列有 哪些常用注
- @Queue
- @Exchange
消息转换器
设置JSON方式序列化:
发送消息
- 在publisher服务引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
- 在publisher服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
接收消息
- 在consumer服务引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
- 在consumer服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
- 定义消费者
@RabbitListener(queues = "object.queue")public void listenObjectQueueMessage(Map<String,Object> msg){System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色}
注意
MessageConverter默认是JDK序列化
接收方和发送方必须使用相同的MessageConverter
同步调用
优点: 时效性高
问题:
- 耦合度高
- 性能下降
- 资源浪费
- 级联失败
异步调用
实现方式:
- 事件驱动(常用)
优势:
- 服务解耦
- 性能提升,吞吐量提高
- 故障隔离。没有强依赖,不担心级联失败问题
- 流量削锋
缺点:
- 依赖Broker的可靠性、安全性、吞吐能力
- 架构复杂、业务没有明显的流程线,不好追踪管理
安装
docker pull rabbitmq:3-managementdocker run \-e RABBITMQ_DEFAULT_USER=root \-e RABBITMQ_DEFAULT_PASS=toor \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
SpringAMQP
AMQP(Advance Message Queuing Protocol):是用于在应用程序或之间传递业务消息的开放标准,该协议与语言平台无关,更符合微服务中独立性的要求
Spring AMQP: 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分:
- spring-amqp:基础抽象
- spring-rabbit:底层默认实现
特征
- 监听器容器,用于异步处理入站消息
- 用于发送和接收消息的RabbitTemplate
- RabbitAdmin用于自动声明队列、交换和绑定
相关文章:
SpringCloud(五)MQ消息队列
MQ概念常见消息模型helloworld案例实现实现spring AMQP发送消息实现spring AMQP接收消息工作消息队列实现发布订阅模型Fanout Exchange实现DirectExchange实现TopicExchange实现DirectExchange 和FanoutExchange的差异DirectExchange 和TopicExchange的差异基于RabbitListener注…...
SQL语法基础汇总
三年前的存稿 默认端口号 3306 超级用户名 root 登录 mysql -uroot -p / mysql -uroot -proot 退出 exit / quit 服务器版本 SELECT VERSION(); 当前日期 SELECT NOW(); 当前用户 SELECT USER(); 备份 mysqldump -uroot -p 数据库名称 > 保存的路径 还原 create database1-…...
惠普星14Pro电脑开机不了显示错误代码界面怎么办?
惠普星14Pro电脑开机不了显示错误代码界面怎么办?有用户电脑开机之后,进入了一个错误界面,里面有一些错误代码。重启电脑之后依然是无法进入到桌面中,那么这个情况怎么去进行解决呢?我们可以重装一个新系统,…...
顺序表的构造及功能
定义顺序表是一种随机存储都结构,其特点是表中的元素的逻辑顺序与物理顺序相同。假设线性表L存储起始位置为L(A),sizeof(ElemType)是每个数据元素所占的存储空间的大小,则线性表L所对应的顺序存储如下图。顺序表的优缺点优点:随机…...
cesium: 绘制线段(008)
第008个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中绘制线段,左键点击开始绘制,右键点击取消绘制 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共139行)相关API参考:专栏目标示例效果 配置方式 1)…...
HTML、CSS学习笔记4(3D转换、动画)
目录 一、空间转换(3D转换) 1.空间位移 语法: 取值:(正负均可) 透视: 2.空间旋转 3.立体呈现 二、动画(animation) 1.动画的使用 先定义动画 再调用定义好的动画 …...
java的分布式锁
什么是分布式锁 分布式锁是指分布式环境下,系统部署在多个机器中,实现多进程分布式互斥的一种锁。为了保证多个进程能看到锁,锁被存在公共存储(比如 Redis、Memcache、数据库等三方存储中),以实现多个进程并…...
17- TensorFlow实现手写数字识别 (tensorflow系列) (项目十七)
项目要点 模型创建: model Sequential()添加卷积层: model.add(Dense(32, activationrelu, input_dim100)) # 第一层需要 input_dim添加dropout: model.add(Dropout(0.2))添加第二次网络: model.add(Dense(512, activationrelu)) # 除了first, 其他层不要输入shape添加输出…...
Polkadot 基础
Polkadot Polkadot联合并保护了一个不断增长的专业区块链生态系统,称为parachains。Polkadot上的应用程序和服务可以安全地跨链通信,形成真正可互操作的去中心化网络的基础。 真正的互操作性 Polkadot支持跨区块链传输任何类型的数据或资产,…...
spring源码编译
spring源码编译1、安装gradle2、拉取源码3、配置gradle文件来源及镜像仓库4、预编译5、验证6、可能遇到的报错6.1、jdk.jfr不存在6.2、checkstyleMain6.3、org.gradle.api.artifacts.result.ComponentSelectionReason.getDescription()Ljava/lang/String6.4、其他jdk࿱…...
防盗链是什么?带你了解什么是防盗链
目录 什么是防盗链 防盗链的定义 防盗链的产生 防盗链的实现 什么是防盗链 防盗链其实就是采用服务器端编程,通过url过滤技术实现的防止盗链的软件。 比如:photo.abc.com/video.mp4 这个下载地址,如果没有装防盗链,别人就能轻…...
Linux基础命令-fdisk管理磁盘分区表
文章目录 fdisk 命令介绍 命令格式 基本参数 1)常用参数 2)fdisk菜单操作说明 创建一个磁盘分区 1)创建分区 2)创建交换分区 参考实例 1) 显示当前分区的信息 2) 显示每个磁盘的分区信息 命令…...
(四)K8S 安装 Nginx Ingress Controller
ingress-nginx 是 Kubernetes 的入口控制器,使用NGINX作为反向代理和负载均衡器 版本介绍 版本1:Ingress NGINX Controller(k8s社区的ingres-nginx) 以 NGINX 开源技术为基础(kubernetes.io),可在GitHub的 kubernet…...
高频面试题
MyISAM和InnoDB是MySQL两种常见的存储引擎,它们之间有以下几点区别: 事务支持:MyISAM不支持事务处理,而InnoDB支持事务处理。 行级锁:MyISAM只支持表级锁,而InnoDB支持行级锁,可以避免并发访问…...
js 字节数组操作,TCP协议组装
js字节数组,进制转换js基础知识数组 Array扩展操作符三个点(...)ArrayBufferslice() 数组复制reduce 对数组中的每个元素执行一个提供的函数,将其结果汇总为单个返回值splice 数组删除,添加,替换js 字节数组转数字以及…...
JavaScript的引入并执行-包含动态引入与静态引入
JavaScript的引入并执行-包含动态引入与静态引入 JavaScript引入方式 html文件需要引入JavaScript代码,才能在页面里使用JavaScript代码。 静态引入 行内式 直接在DOM标签上使用 <!DOCTYPE html> <html lang"en"> <head><meta ch…...
第四阶段01-酷鲨商城项目准备
1. 关于csmall-product项目 这是“酷鲨商城”大项目中的“商品管理”项目,是一个后台管理项目(给管理员,或运营人员使用的项目,并不是普通用户使用的),并且,只会涉及与发布商品可能相关的功能开…...
Uncaught ReferenceError: jQuery is not defined
今天在拉取项目部署到本地的时候遇到了一个问题特此记录一下 (以后闭坑) 我和同事同时拉取了一样的代码,结果同事的页面加载正常而我的页面像被狗啃了一样,知道是js的问题但是不知道问题出在哪里?后来还是同事帮我解决…...
面试阿里测开岗,被面试官针对,当场翻脸,把我的简历还给我,疑似被拉黑...
好家伙,金三银四一到,这奇葩事可真是多,前两天和粉丝聊天,他说前段时间面试阿里的测开岗,最后和面试官干起来了。 我问他为什么,他说没啥,就觉得面试官太装了,就爱问一些虚而不实的…...
2. 驱动开发--驱动开发环境搭建
文章目录前言一、Linux中配置编译环境1.1 linux下安装软件的方法1.2 交叉编译工具链的安装1.2.1 测试是否安装成功1.3 设置环境变量1.3.1 将工具链导出到环境变量1.4 为工具链创建arm-linux-xxx符号链接二、 搭建运行开发环境2.1 tftp网络方式加载内核和设备树文件2.2 nfs网络方…...
《数据库系统概论》学习笔记——第四章 数据库安全
教材为数据库系统概论第五版(王珊) 这一章简单记一下那几条sql的用法和两种存取控制和审计(今年期末考了)吧,不知道有啥好考的 数据库安全性 问题的提出 数据库的一大特点是数据可以共享数据共享必然带来数据库的安全…...
山洪径流过程模拟及洪水危险性评价
目录 1.洪水淹没危险性评价方法及技术讲解 2.GIS水文信息提取与分析(基于ArcGIS软件) 3.洪水淹没模拟水文分析:洪峰流量估算 4.洪水淹没模拟水力学分析:Hec-RAS实例操作 GIS水文分析(ArcHydro、Spatial Anlysist等模块)是流域…...
LeetCode HOT100 (23、32、33)
目录 23、合并K个升序链表 32、最长有效括号 33、搜索旋转排序数组 23、合并K个升序链表 思路:采用顺序合并的方法,用一个变量 ans 来维护以及合并的链表,第 i 次循i 个链表和 ans合并,答案保存到 ans中。 代码: …...
电力监控仪表主要分类
电力监控仪表是电工仪表行业的一个新兴、细分行业,类别属于安装式数字仪表,从模拟指针式仪表和电量变送器演变而来。随着计算机技术的发展,电力监控仪表已应用到电力系统的发、输、变、配、用的各个环节,实现对电网电参量的测量、…...
山野户外定位依赖GPS或者卫星电话就能完成么?
每当有驴友失联的新闻报道,很多的户外“老鸟”和“菜鸟”都在讲:为什么不带卫星电话,不带GPS……云云!提一个小小的问题:如果你拿着卫星电话、GPS或者其他即时通信的其他设备,你就能准定位你所处的位置么&a…...
SAP 应收应付重组配置
应收应付重组是为了使资产负债表真实的反映资产及负债的真实情况,需要对应收、应付账款的余额时行实际调整。即将“应收账款”的贷方余额和“应付账款”的借方余额分别调整至“预收账款”与“预付账款”账户中。 应收应付重组SAP系统是按照公司代码、客户/供应商、…...
算法练习(八)计数质数(素数)
1、问题描述: 给定整数 n ,返回 所有小于非负整数 n 的质数的数量 。 2、示例如下: 3、代码如下: 第一种:比较暴力的算法 class Solution {public int countPrimes(int n) {int count1;if(n<2) return 0;for(in…...
用反射模拟IOC模拟getBean
IOC就是spring的核心思想之一:控制反转。这里不再赘述,看我的文章即可了解:spring基础思想IOC其次就是java的反射,反射机制是spring的重要实现核心,今天我看spring的三级缓存解决循坏引用的问题时,发现一个…...
【Ap AutoSAR入门与实战开发02】-【Ap_s2s模块01】: s2s的背景
总目录链接==>> AutoSAR入门和实战系列总目录 文章目录 1 s2s的背景?2 AUTOSAR 方法应支持车辆的无缝开发2.1 面向服务的ECU的解读2.2 面向信号的ECU的解读2.3 通过网关ECU实现转换1 s2s的背景? Cp AutoSAR基于传统的can,lin,flexray总线的通信,一般是面向信号设…...
C语言数据结构(3)----无头单向非循环链表
目录 1. 链表的概念及结构 2. 链表的分类 3. 无头单向非循环链表的实现(下面称为单链表) 3.1 SListNode* BuySListNode(SLTDateType x) 的实现 3.2 void SListPrint(SListNode* plist) 的实现 3.3 void SListPushBack(SListNode** pplist, SLTDateType x) 的实现 3.4 voi…...
江苏省义务教育标准化建设网站/360指数查询工具
0.Manjaro启动U盘的制作 推荐使用4-16G容量的U盘,避免兼容性问题(U盘太大可能会无法启动)。 用rufus就可以,注意选用DD模式才能成功制作(默认是hyperiso)。 如果在linux环境里,先用sudo fdisk -…...
网站建设与管理专业就业前景/aso优化软件
在c语言中pow()函数是用来求x的y次幂。x、y及函数值都是double型 ,其语法为“double pow(double x, double y)”;其中参数“double x”表示底数;参数“double y”表示指数。pow()函数用来求x的y次幂,x、y及函数值都是double型 &am…...
phicomm怎么做网站/企业关键词排名优化网址
前几天读研时候上铺的同学和我说到了一个问题,就是他们单位的redhat服务器给MySQL服务的数据库文件所在的磁盘空间不够了,对于这个问题我也是没有想过的,在受朋友之托下考虑自己做下复现,由于同学所在单位存放的时全省的交易记录&…...
网站开发流程包括/产品市场营销策划方案
此文已由作者谢蕾授权网易云社区发布。欢迎访问网易云社区,了解更多网易技术产品运营经验。前言我们对于“异常处理”这个词并不陌生,众多框架和库在异常处理方面都提供了便利,但是对于何种处理才是最佳实践,也是众说纷纭。异常处…...
著名的电子商务网站/临沂seo优化
即使在这个网络发达的社会,仍有很多人喜欢带有纸感的明信片。当看到很多漂亮明信片,是不是很心动。其实利用PS就可轻松制作个明信片。那么用PS如何制作明信片?想学习的朋友和小编一起来DIY明信片吧!PS制作明信片步骤:1…...
烟台手机网站建设费用/虞城seo代理地址
熟悉JAVA的车友对于“费罗切”Feroce一定不会陌生。自2015年上市以来,Feroce就凭借炫酷的涂装和亲民的价格坐稳了JAVA中高端公路车产品销量的王座。四年后的今天,JAVA Feroce 2开启全新篇章。渐变涂装、气动碟刹、车首隐藏式走线、UCI认证……相比旧款&a…...