当前位置: 首页 > news >正文

【RabbitMQ五】——RabbitMQ路由模式(Routing)

RabbitMQ路由模式

  • 前言
  • RabbitMQ模式的基本概念
    • 为什么要使用Rabbitmq 路由模式
    • RabbitMQ路由模式组成元素
  • 路由模式完整代码
    • Pom文件引入RabbtiMQ依赖
    • RabbitMQ工具类
    • 生产者
    • 消费者1
    • 消费者2
    • 运行结果截图

前言

通过本篇博客能够简单使用RabbitMQ的路由模式。
本篇博客主要是博主通过官网以及学习他人的博客总结出的RabbitMQ发布订阅模式。其中如果有误欢迎大家及时指正。

RabbitMQ模式的基本概念

路由模式是根据Routing Key有条件的将消息筛选后发送给消费者,消费者只接受筛选之后的消息
路由模式的核心是:
配置一个类型为direct的交换机,并且需要指定不同的路由键(routing key),把对应的消息从交换机路由到不同的消息队列进行存储,再由对应的消费者进行消费

为什么要使用Rabbitmq 路由模式

由于发布订阅模式是无条件将所有消息分发给所有消费者,路由模式可以根据条件(Routing Key)将消息筛选之后发送给消费者。

应用场景:
例如:有一个股票分析机构,每天都会有一些独家的股票分析报告。对于其他一些应用平台,想要每天都到这家股票分析机构提供的百度的独家股票分析报告,对于另外一些应用平台想要收到谷歌的独家股票分析报告,就可以使用路由模式。

RabbitMQ路由模式组成元素

在这里插入图片描述
P:生产者,向交换机发送消息的是否需要指定routing key
X:交换机,接收生产者发送的消息,需要指定交换机的类型为direct,并且将消息发送给与routing key匹配的队列
C1:消费者1,它所在队列指定了需要routing key为error的信息
C2:消费者2,其所在队列指定了需要routing key 为 info、error、warning 的消息

路由模式完整代码

**业务场景:**生产者为日志分发平台,分发info、warning、error级别的日志,消费者1只接受日志级别为error的日志,消费者2接收全部日志。

Pom文件引入RabbtiMQ依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency>

RabbitMQ工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : RabbitMQUtils* @description : [rabbitmq工具类]* @createTime : [2023/1/17 8:49]* @updateUser : [WangWei]* @updateTime : [2023/1/17 8:49]* @updateRemark : [描述说明本次修改内容]*/
public class RabbitMQUtils {/** @version V1.0* Title: getConnection* @author Wangwei* @description 创建rabbitmq连接* @createTime  2023/1/17 8:52* @param []* @return com.rabbitmq.client.Connection*/public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setPort(5672);factory.setVirtualHost("虚拟主机");factory.setUsername("用户名");factory.setPassword("密码");//创建连接Connection connection=factory.newConnection();return  connection;}/** @version V1.0* Title: getChannel* @author Wangwei* @description 创建信道* @createTime  2023/1/17 8:55* @param []* @return com.rabbitmq.client.Channel*/public static Channel getChannel() throws IOException, TimeoutException {Connection connection=getConnection();Channel channel=connection.createChannel();return channel;}
}

生产者

import com.rabbitmq.client.Channel;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : Producer* @description : [生产者]* @createTime : [2023/2/1 9:38]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:38]* @updateRemark : [描述说明本次修改内容]*/
public class Producer {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {//建立连接RabbitMQUtils.getConnection();//声明通道Channel channel = RabbitMQUtils.getChannel();//创建fanout类型交换机并命名为logschannel.exchangeDeclare(EXCHANGE_NAME,"direct");//声明routingKeyString severityInfo="info";String severityError="error";String severityWarning="warning";//循环发送2条消息for (int i = 0; i <2 ; i++) {String msg="路由模式info:"+i;/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish(EXCHANGE_NAME,severityInfo,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}//循环发送2条消息for (int i = 0; i <2 ; i++) {String msg="路由模式error:"+i;/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish(EXCHANGE_NAME,severityError,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}//循环发送2条消息for (int i = 0; i <2 ; i++) {String msg="路由模式warning:"+i;/*推送消息*交换机命名,不填写使用默认的交换机* routingKey -路由键-* props:消息的其他属性-路由头等正文* msg消息正文*/channel.basicPublish(EXCHANGE_NAME,severityWarning,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}}
}

消费者1

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : ConsumerOne* @description : [消费者1]* @createTime : [2023/2/1 9:39]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:39]* @updateRemark : [描述说明本次修改内容]*/
public class ConsumerOne {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {RabbitMQUtils.getConnection();Channel channel = RabbitMQUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"direct");String queueName = channel.queueDeclare().getQueue();//声明routingKey (error)String severityError="error";//交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失//queueName绑定了direct_logs交换机并且绑定了routingKeychannel.queueBind(queueName, EXCHANGE_NAME,severityError );//因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

消费者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author : [WangWei]* @version : [v1.0]* @className : ConsumerTwo* @description : [消费者2]* @createTime : [2023/2/1 9:38]* @updateUser : [WangWei]* @updateTime : [2023/2/1 9:38]* @updateRemark : [描述说明本次修改内容]*/
public class ConsumerTwo {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {RabbitMQUtils.getConnection();Channel channel = RabbitMQUtils.getChannel();//创建fanout类型交换机并命名为logschannel.exchangeDeclare(EXCHANGE_NAME,"direct");//创建了一个非持久的、排他的、自动删除的队列,并生成了一个名称String queueName = channel.queueDeclare().getQueue();//声明routingKey (info,error,warning)String severityInfo="info";String severityError="error";String severityWarning="warning";//交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失//queueName绑定了direct_logs交换机并且绑定了3个routingKeychannel.queueBind(queueName, EXCHANGE_NAME,severityInfo );channel.queueBind(queueName, EXCHANGE_NAME,severityError );channel.queueBind(queueName, EXCHANGE_NAME,severityWarning );//因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}

运行结果截图

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

相关文章:

【RabbitMQ五】——RabbitMQ路由模式(Routing)

RabbitMQ路由模式前言RabbitMQ模式的基本概念为什么要使用Rabbitmq 路由模式RabbitMQ路由模式组成元素路由模式完整代码Pom文件引入RabbtiMQ依赖RabbitMQ工具类生产者消费者1消费者2运行结果截图前言 通过本篇博客能够简单使用RabbitMQ的路由模式。 本篇博客主要是博主通过官网…...

【C语言】宏定义 结构体 枚举变量的用法

目录 一、数据类型 二、C语言宏定义 三、C语言typedef重命名 四、 #define与typedef的区别 五、结构体 六、枚举变量 补充学习一点STM32的必备基础知识 一、数据类型 二、C语言宏定义 关键字&#xff1a;#define 用途&#xff1a;用一个字符串代替一个数字&#xff0c;…...

锁升级之Synchronized

Synchronized JVM系统锁一个对象里如果有多个synchronized方法&#xff0c;同一时刻&#xff0c;只要有一个线程去调用其中的一个synchronized方法&#xff0c;其他线程只能等待&#xff01;锁的是当前对象&#xff0c;对象被锁定后&#xff0c;其他线程都不能访问当前对象的其…...

基于nodejs+vue疫情网课管理系统

疫情网课也都将通过计算机进行整体智能化操作,对于疫情网课管理系统所牵扯的管理及数据保存都是非常多的,例如管理员&#xff1a;首页、个人中心、学生管理、教师管理、班级管理、课程分类管理、课程表管理、课程信息管理、作业信息管理、请假信息管理、上课签到管理、论坛交流…...

Zabbix 构建监控告警平台(三)

Zabbix User parametersZabbix Trigger1.Zabbix User parameters 1.1即自定义KEY 注意&#xff1a;mysql安装在被监测主机 [rootlocalhost ~]# yum -y install mariadb-server mariadb [rootlocalhost ~]# systemctl start mariadb [rootlocalhost ~]# mysqladmin -uroot statu…...

Linux系统之dool命令行工具的基本使用

Linux系统之dool命令行工具的基本使用一、dool命令行工具介绍二、本地系统环境检查1.检查系统版本2.检查系统内核版本三、下载dool软件包1.创建下载目录2.下载dool四、安装dool1.安装python32.安装dool五、dool的命令帮助六、dool的基本使用1.直接使用dool监控系统2.监控cpu和网…...

LeetCode-2335-装满杯子需要的最短总时长

1、堆 我们可以维护一个堆&#xff0c;首先我们将数组中不为0的数全部加入堆中&#xff0c;而后进行循环。当堆不为空时&#xff0c;我们将堆顶元素出堆并减一&#xff0c;而后观察是否还能继续出堆&#xff0c;若能则出堆&#xff0c;否则跳过&#xff0c;最后我们将处理后的…...

npm ERR! code ELIFECYCLE解决方案,npm犯错!myweb@1.0.0构建脚本失败。

1.问题npm ERR! code ELIFECYCLEnpm ERR! errno 1npm ERR! myweb1.0.0 build: webpack --config config/webpack.config.jsnpm ERR! Exit status 1npm ERR!npm ERR! Failed at the myweb1.0.0 build script.npm犯错!代码ELIFECYCLEnpm犯错!errno 1npm犯错!myweb1.0.0 build: we…...

最小二乘支持向量机”在学习偏微分方程 (PDE) 解方面的应用(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 本代码说明了“最小二乘支持向量机”在学习偏微分方程 &#xff08;PDE&#xff09; 解方面的应用。提供了一个示例&#xff0c…...

ISYSTEM调试实践8-winIDEA Analyzer功能1

前面几篇介绍了ISYSTEM的基本调试界面和功能&#xff0c;相比我之前用过的IDE&#xff0c;除了几种断点方式和脚本功能以外&#xff0c;应该都是比较简单&#xff0c;稍微操作一下就可以直接上手&#xff0c;后续我将介绍winIDEA的Analyzer 功能。 1 Analyzer简介 iSYSTEM An…...

每日学术速递2.11

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.IR、cs.MM 1.A Comprehensive Survey on Multimodal Recommender Systems: Taxonomy, Evaluation, and Future Directions 标题&#xff1a;关于多模态推荐系统的综合调查&#xff1a;分…...

宝塔搭建实战php开源likeadmin通用管理admin端vue3源码(二)

大家好啊&#xff0c;我是测评君&#xff0c;欢迎来到web测评。 上一期给大家分享了server端的部署方式&#xff0c;今天来给大家分享admin端在本地搭建&#xff0c;与打包发布到宝塔的方法。感兴趣的朋友可以自行下载学习。 技术架构 vscode node16 vue3 elementPlus vit…...

网络基础-虚拟化工具-网桥

系列文章目录 本系列文章主要是回顾和学习工作中常用的网络基础命令&#xff0c;在此记录以便于回顾。 该篇文章主要是讲解虚拟化的工具网桥相关的概念和常用命令 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录系…...

剑指 Offer 14- II. 剪绳子 II

剑指 Offer 14- II. 剪绳子 II 给你一根长度为 n 的绳子&#xff0c;请把绳子剪成整数长度的 m 段&#xff08;m、n都是整数&#xff0c;n>1并且m>1&#xff09;&#xff0c;每段绳子的长度记为 k[0],k[1]…k[m - 1] 。请问 k[0]k[1]…*k[m - 1] 可能的最大乘积是多少&a…...

English Learning - Day55 作业打卡 2023.2.9 周四

English Learning - Day55 作业打卡 2023.2.9 周四引言1. Jim 在看电视的时候他的老婆正在做饭。2. 他刚睡着电话就响了。3. 我正在想事情&#xff0c;这时忽然有人从后面抓我胳膊。4. 我们总是边吃火锅边唱歌。5. 他一听说出了事故&#xff0c;马上就来了现场。6. He entered …...

pixhawk2.4.8-地面站配置-APM固件

文章目录一、硬件准备二、软件准备1 已实飞测试2 MP地面站 任意版本下载&#xff1a;3 APM固件 任意版本下载&#xff1a;三、飞控校准1 刷固件2 机架选择3 加速度计校准4 指南针校准5 遥控器校准6 飞行模式7 紧急断电&无头模式8 基础参数设置9 电流计校准10 电调校准11 起…...

golang 通道类型

文章目录一、什么是通道类型二、通道产生的原因三、声明channel四、创建channel五、channel相关操作1、发送值2、接收值3、关闭通道3.1 注意3.2 特点四、通道类型1、无缓冲通道2、有缓冲通道五、单向通道一、什么是通道类型 Go 语言中的通道&#xff08;channel&#xff09;是一…...

并发、并行、吞吐量、延迟、响应时间 含义理解

并发、并行、吞吐量、延迟、响应时间 知识点了解 1. 响应时间(RT) 理解&#xff1a;响应时间是指系统对请求作出响应的时间。例如一个正在运行的服务&#xff0c;服务内程序接受到参数请求开始&#xff0c;到程序计算完&#xff0c;并将结果返回出去结束&#xff0c;这段时间…...

HTTP 和 HTTPS 的区别

文章目录前言一、HTTP 与 HTTPS 的基本概念HTTPHTTPS二、HTTP 和 HTTPS协议的区别前言 浏览网站时&#xff0c;我们会发现网址有两种格式&#xff0c;一种以http://开头&#xff0c;一种https://开头。好像这两种格式差别不大&#xff0c;只多了一个s&#xff0c;实际上他们有…...

微搭低代码从入门到精通07-基础布局组件

低码开发不同于传统开发&#xff0c;传统开发我们通常需要编写前端代码和后端代码。前端代码由HTML、CSS和JavaScript组成&#xff0c;后端代码我们通常要用后端语言比如Java来编写接口。 低码开发的特点是可视化开发&#xff0c;在编辑器中通过组件的拖拽来完成页面的编制。如…...

RestClient

什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端&#xff0c;它允许HTTP与Elasticsearch 集群通信&#xff0c;而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级&#xff…...

云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地

借阿里云中企出海大会的东风&#xff0c;以**「云启出海&#xff0c;智联未来&#xff5c;打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办&#xff0c;现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...

无法与IP建立连接,未能下载VSCode服务器

如题&#xff0c;在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈&#xff0c;发现是VSCode版本自动更新惹的祸&#xff01;&#xff01;&#xff01; 在VSCode的帮助->关于这里发现前几天VSCode自动更新了&#xff0c;我的版本号变成了1.100.3 才导致了远程连接出…...

C++ 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

【JavaWeb】Docker项目部署

引言 之前学习了Linux操作系统的常见命令&#xff0c;在Linux上安装软件&#xff0c;以及如何在Linux上部署一个单体项目&#xff0c;大多数同学都会有相同的感受&#xff0c;那就是麻烦。 核心体现在三点&#xff1a; 命令太多了&#xff0c;记不住 软件安装包名字复杂&…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

C# 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

力扣-35.搜索插入位置

题目描述 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

CSS | transition 和 transform的用处和区别

省流总结&#xff1a; transform用于变换/变形&#xff0c;transition是动画控制器 transform 用来对元素进行变形&#xff0c;常见的操作如下&#xff0c;它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...