当前位置: 首页 > news >正文

深入学习RabbitMQ的Direct Exchange(直连交换机)

        RabbitMQ作为一种高性能的消息中间件,在分布式系统中扮演着重要角色。它提供了多种消息传递模式,其中Direct Exchange(直连交换机)是最基础且常用的一种。本文将深入介绍Direct Exchange的原理、应用场景、配置方法以及实践案例,帮助读者更好地理解和使用这一消息传递模式。

 

一、Direct Exchange的原理

        Direct Exchange是RabbitMQ中最简单的消息交换机类型之一。它根据消息的路由键(Routing Key)将消息路由到与之匹配的队列中。每个队列在绑定到交换机时,都会指定一个或多个绑定键(Binding Key)。当消息发送到交换机时,交换机将消息的路由键与所有绑定键进行匹配,并将消息路由到所有匹配成功的队列中。

  1. 消息发送:生产者将消息发送到Direct Exchange时,需要指定一个路由键。
  2. 绑定关系:队列与交换机之间的绑定关系是通过绑定键建立的。每个队列可以绑定到多个交换机,每个交换机也可以绑定多个队列。
  3. 消息路由:交换机根据消息的路由键和队列的绑定键进行匹配,将消息路由到所有匹配成功的队列中。
二、Direct Exchange的应用场景

        Direct Exchange适用于需要精确匹配路由键的场景,特别是在一对一或多对一的消息传递中表现出色。以下是一些典型的应用场景:

  1. 日志处理:根据日志的级别或类型,将日志消息路由到不同的处理队列中。例如,可以将ERROR级别的日志路由到一个错误处理队列,将INFO级别的日志路由到一个信息处理队列。
  2. 任务分发:在任务分发系统中,可以将不同的任务分配给不同的处理队列,每个队列对应一个或多个消费者。这样可以实现任务的并行处理和负载均衡。
  3. 订单处理:在电商系统中,根据订单号将订单消息路由到特定的处理队列,以便进行后续的订单处理流程。这可以确保每个订单都被正确地处理和跟踪。
  4. 消息过滤:在某些情况下,可能需要根据消息的某些属性进行过滤,将符合条件的消息路由到特定的队列中。Direct Exchange可以通过精确匹配路由键来实现这一功能。
三、Direct Exchange的配置方法

        在RabbitMQ中配置Direct Exchange通常涉及以下几个步骤:

  1. 声明交换机:使用RabbitMQ的API或管理界面声明一个Direct Exchange。
  2. 绑定队列:将队列与Direct Exchange绑定,并指定绑定键。这个绑定键将用于匹配消息的路由键。
  3. 发送消息:生产者发送消息到Direct Exchange时,需要指定一个路由键。交换机将根据这个路由键来查找与之匹配的队列。
  4. 接收消息:消费者从绑定的队列中接收消息进行处理。
四、实践案例

        以下是一个使用Spring AMQP和RabbitMQ实现Direct Exchange的示例案例。这个案例将展示如何配置交换机、队列、绑定关系以及发送和接收消息。

1. 配置交换机和队列

        首先,需要在RabbitMQ中声明一个Direct Exchange和一个或多个队列。这里我们使用Spring AMQP的Java配置方式来实现。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_NAME = "direct_exchange";public static final String QUEUE_NAME_ONE = "queue_one";public static final String QUEUE_NAME_TWO = "queue_two";public static final String ROUTING_KEY_ONE = "routing_key_one";public static final String ROUTING_KEY_TWO = "routing_key_two";@Beanpublic Queue queueOne() {return new Queue(QUEUE_NAME_ONE, true);}@Beanpublic Queue queueTwo() {return new Queue(QUEUE_NAME_TWO, true);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(EXCHANGE_NAME);}@Beanpublic Binding bindingOne(Queue queueOne, DirectExchange directExchange) {return BindingBuilder.bind(queueOne).to(directExchange).with(ROUTING_KEY_ONE);}@Beanpublic Binding bindingTwo(Queue queueTwo, DirectExchange directExchange) {return BindingBuilder.bind(queueTwo).to(directExchange).with(ROUTING_KEY_TWO);}
}
2. 发送消息

        接下来,我们编写一个生产者类来发送消息到Direct Exchange。

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MessageProducer {@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(String routingKey, String message) {amqpTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, routingKey, message);}
}
3. 接收消息

        最后,我们编写一个消费者类来接收消息并处理。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageConsumer {@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME_ONE)public void receiveMessageFromQueueOne(String message) {System.out.println("Received message from queue one: " + message);}@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME_TWO)public void receiveMessageFromQueueTwo(String message) {System.out.println("Received message from queue two: " + message);}
}

4. 运行案例

        现在,我们可以运行这个Spring Boot应用程序,并使用MessageProducer来发送消息。例如:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;@Component
public class ApplicationRunner implements CommandLineRunner {@Autowiredprivate MessageProducer messageProducer;@Overridepublic void run(String... args) throws Exception {messageProducer.sendMessage(RabbitMQConfig.ROUTING_KEY_ONE, "Hello, queue one!");messageProducer.sendMessage(RabbitMQConfig.ROUTING_KEY_TWO, "Hello, queue two!");}
}

        当应用程序运行时,它会发送两条消息到Direct Exchange。第一条消息将使用routing_key_one作为路由键,因此将被路由到queue_one。第二条消息将使用routing_key_two作为路由键,因此将被路由到queue_two。消费者类将分别接收并处理这两条消息。

总结

        Direct Exchange是RabbitMQ中最简单且常用的消息交换机类型之一。它通过精确匹配路由键将消息路由到与之匹配的队列中。本文深入介绍了Direct Exchange的原理、应用场景、配置方法以及实践案例。通过本文的学习,读者可以更好地理解和使用Direct Exchange来实现消息传递和分发功能。在实际应用中,可以根据具体需求选择合适的消息交换机类型来构建高效、可靠的消息传递系统。

相关文章:

深入学习RabbitMQ的Direct Exchange(直连交换机)

RabbitMQ作为一种高性能的消息中间件,在分布式系统中扮演着重要角色。它提供了多种消息传递模式,其中Direct Exchange(直连交换机)是最基础且常用的一种。本文将深入介绍Direct Exchange的原理、应用场景、配置方法以及实践案例&a…...

HTML实战课堂之启动动画弹窗

一:代码片段讲解 小提示:下面是一个包含启动页和弹窗的完整示例。这个示例包括一个简单的启动页和一个弹窗,当用户点击启动页上的按钮时,会显示弹窗。 1. **HTML结构**: - #startPage:启动页,包…...

将本地的 Git 仓库上传到 GitHub 上(github没有该仓库)

文章目录 步骤 1:在 GitHub 上创建新仓库步骤 2:配置本地仓库步骤 3:添加远程仓库地址步骤 4:推送本地代码到 GitHub验证上传 步骤 1:在 GitHub 上创建新仓库 登录 GitHub: 打开浏览器并访问 GitHub。使用自…...

【Linux】模拟Shell命令行解释器

一、知识补充 1.1 snprintf snprintf() 是 C语言的一个标准库函数&#xff0c;定义在<stdio.h>头文件中。 snprintf() 函数的功能是格式化字符串&#xff0c;并将结果存储在指定的字符数组中。该函数的原型如下&#xff1a; int snprintf(char *str, size_t size, con…...

G-Star Landscape 2.0 重磅发布,助力开源生态再升级

近日&#xff0c;备受行业瞩目的 G-Star Landscape 迎来了其 2.0 版本的发布&#xff0c;这一成果标志着 GitCode 在开源生态建设方面又取得了重要进展。 G-Star Landscape仓库链接&#xff1a; https://gitcode.com/GitCode-official-team/G-Star-landscape 2024 GitCode 开…...

Lianwei 安全周报|2024.1.7

以下是本周「Lianwei周报」&#xff0c;我们总结推荐了本周的政策/标准/指南最新动态、热点资讯和安全事件&#xff0c;保证大家不错过本周的每一个重点&#xff01; 政策/标准/指南最新动态 01 国家发改委等三部门印发《国家数据基础设施建设指引》 国家数据基础设施是从数据…...

ASP.NET Core 实现微服务 - Consul 配置中心

这一次我们继续介绍微服务相关组件配置中心的使用方法。本来打算介绍下携程开源的重型配置中心框架 apollo 但是体系实在是太过于庞大&#xff0c;还是让我爱不起来。因为前面我们已经介绍了使用Consul 做为服务注册发现的组件 &#xff0c;那么干脆继续使用 Consul 来作为配置…...

使用redis的5种常用场景

文章目录 1. 缓存热点数据2. 分布式锁3. 计数器和限流器4. 消息队列5. 会话管理总结 在日常开发工作中&#xff0c;Redis作为一款高性能的内存数据库&#xff0c;凭借其强大的功能特性和卓越的性能表现&#xff0c;已经成为了许多项目中不可或缺的组件。本文将详细介绍Redis在实…...

微信小程序防止重复点击事件

直接写在app.wpy里面&#xff0c;全局可以调用 // 防止重复点击事件preventActive(fn) {const self this;if (this.globalData.PageActive) {this.globalData.PageActive false;if (fn) fn();setTimeout(() > {self.globalData.PageActive true;}, 3000); //设置该时间内…...

PySpark用sort-merge join解决数据倾斜的完整案例

假设有两个大表 table1 和 table2 &#xff0c;并通过 sort-merge join 来解决可能的数据倾斜问题。 from pyspark.sql import SparkSession from pyspark.sql.functions import col# 初始化SparkSession spark SparkSession.builder.appName("SortMergeJoinExample&quo…...

sklearn-逻辑回归-制作评分卡

目录 数据集处理 分箱 分多少个箱子合适 分箱要达成什么样的效果 对一个特征进行分箱的步骤 分箱的实现 封装计算 WOE 值和 IV值函数 画IV曲线&#xff0c;判断最佳分箱数量 结论 pd.qcut 执行报错 功能函数封装 判断分箱个数 在银行借贷场景中&#xff0c;评分卡是…...

scrapy爬取图片

scrapy 爬取图片 环境准备 python3.10scrapy pillowpycharm 简要介绍scrapy Scrapy 是一个开源的 Python 爬虫框架&#xff0c;专为爬取网页数据和进行 Web 抓取而设计。它的主要特点包括&#xff1a; 高效的抓取性能&#xff1a;Scrapy 采用了异步机制&#xff0c;能够高效…...

在 Vue 项目中使用地区级联选

在 Vue 项目中使用地区级联选择的完整流程&#xff1a; 1.安装依赖包&#xff0c;这个包提供了中国省市区的完整数据。 npm install element-china-area-data --save 2.导入数据 import { regionData } from element-china-area-data 这个包提供了几种不同的数据格式&#…...

【简博士统计学习方法】第1章:1. 统计学习的定义与分类

自用笔记 1. 统计学习的定义与分类 1.1 统计学习的概念 统计学习&#xff08;Statistical Machine Learning&#xff09;是关于计算机基于数据构建概率统计模型并运用模型对数据进行预测与分析的一门学科。 以计算机和网络为平台&#xff1b;以数据为研究对象&#xff1b;以…...

利用 Python 脚本批量创建空白 Markdown 笔记

文章目录 利用 Python 脚本批量创建空白 Markdown 笔记1 背景介绍2 需求描述3 明确思路4 具体实现4.1. 遍历 toc.md 文件&#xff0c;收集文件名和对应的文件内容4.2. 实现文件批量生成逻辑4.3. 补全缺失的工具函数4.4. 进一步补全工具函数中的工具函数 5 脚本运行6 注意事项 利…...

【Qt】C++11 Lambda表达式

1. 举例 connect(ui->pushButton, &QPushButton::clicked, [](bool checked){//具体代码qDebug() << "Hello" << checked;}); 2. 详情 //完整形式 [ capture ] ( params ) opt -> ret { body; }; capture 是捕获列表params 是参数表opt 是函数…...

怎样提高服务器中的数据传输速度?

服务器中的数据传输速度会影响着用户的体验感&#xff0c;当企业中的数据传输速度出现卡顿或者是过慢时&#xff0c;用户不能及时浏览到所需的内容&#xff0c;给用户造成不好的体验感&#xff0c;那么企业该怎样才能提高服务器中的数据传输速度呢&#xff1f; 服务器之间如何传…...

Vue 封装公告滚动

文章目录 需求分析1. 创建公告组件Notice.vue2. 注册全局组件3. 使用 需求 系统中需要有一个公告展示&#xff0c;且这个公告位于页面上方&#xff0c;每个页面都要看到 分析 1. 创建公告组件Notice.vue 第一种 在你的项目的合适组件目录下&#xff08;比如components目录&a…...

JVM实战—12.OOM的定位和解决

大纲 1.如何对系统的OOM异常进行监控和报警 2.如何在JVM内存溢出时自动dump内存快照 3.Metaspace区域内存溢出时应如何解决(OutOfMemoryError: Metaspace) 4.JVM栈内存溢出时应如何解决(StackOverflowError) 5.JVM堆内存溢出时应该如何解决(OutOfMemoryError: Java heap s…...

【python翻译软件V1.0】

如果不想使用密钥的形式&#xff0c;且需要一个直接可用的中英文翻译功能&#xff0c;可以使用一些免费的公共 API&#xff0c;如 opencc 或其他无需密钥的库&#xff0c;或直接用 requests 获取翻译结果。 其中&#xff0c;我可以给你一个简单的代码示例&#xff0c;使用 tra…...

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站&#xff0c;会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后&#xff0c;网站没有变化的情况。 不熟悉siteground主机的新手&#xff0c;遇到这个问题&#xff0c;就很抓狂&#xff0c;明明是哪都没操作错误&#x…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

前端倒计时误差!

提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

ssc377d修改flash分区大小

1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

Nuxt.js 中的路由配置详解

Nuxt.js 通过其内置的路由系统简化了应用的路由配置&#xff0c;使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...

Linux --进程控制

本文从以下五个方面来初步认识进程控制&#xff1a; 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程&#xff0c;创建出来的进程就是子进程&#xff0c;原来的进程为父进程。…...