RabbitMQ中的Work Queues模式
在现代分布式系统中,消息队列(Message Queue)是实现异步通信和解耦系统的关键组件之一。RabbitMQ 是一个广泛使用的开源消息代理软件,支持多种消息传递模式。其中,Work Queues(工作队列)模式是一种常见的模式,用于在多个消费者之间分配任务,从而实现负载均衡和提高系统的处理能力。下面将详细介绍 RabbitMQ 中的 Work Queues 模式。
1. 什么是 Work Queues 模式?
Work Queues 模式(也称为任务队列模式)是一种消息传递模式,用于在多个消费者之间分配任务。在这种模式下,生产者将任务(消息)发送到队列中,多个消费者从队列中获取任务并进行处理。每个任务只会被一个消费者处理,从而实现负载均衡。
Work Queues 模式的主要优点包括:
- 负载均衡:多个消费者可以并行处理任务,从而提高系统的处理能力。
- 解耦:生产者和消费者之间通过队列进行通信,彼此之间不需要直接交互。
- 异步处理:任务可以异步处理,生产者不需要等待任务完成即可继续执行其他操作。
2. Work Queues 模式的工作原理
2.1 生产者(Producer)
生产者负责将任务(消息)发送到队列中。生产者不需要知道有多少消费者会处理这些任务,只需要将任务发送到队列即可。
2.2 队列(Queue)
队列是消息的缓冲区,用于存储生产者发送的任务。队列可以有多个消费者,但每个任务只会被一个消费者处理。
2.3 消费者(Consumer)
消费者从队列中获取任务并进行处理。多个消费者可以并行处理任务,从而实现负载均衡。消费者可以是独立的进程、线程或服务。
2.4 消息确认(Message Acknowledgment)
为了确保任务能够可靠地处理,RabbitMQ 提供了消息确认机制。消费者在处理完任务后,需要向 RabbitMQ 发送确认消息,告知任务已经处理完成。如果消费者在处理任务时崩溃,RabbitMQ 会将未确认的任务重新分配给其他消费者。
2.5 公平分发(Fair Dispatch)
默认情况下,RabbitMQ 会按顺序将任务分发给消费者。然而,如果某些消费者处理任务的速度较慢,可能会导致任务堆积。为了避免这种情况,可以使用 basicQos
方法设置预取计数(prefetch count),限制每个消费者一次可以获取的任务数量,从而实现更公平的分发。
3. 环境准备
在开始之前,确保你已经安装了以下环境:
- Java 开发环境(JDK 8 或更高版本)
- RabbitMQ 服务器(已启动并运行)
- Maven(用于管理依赖)
3.1 添加依赖
首先,在你的项目中添加 RabbitMQ 客户端库的依赖。如果你使用的是 Maven,可以在 pom.xml
中添加以下依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>
4. 代码案例
4.1 生产者代码
生产者负责将任务发送到队列。以下是一个简单的生产者代码示例:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {private static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4.将消息发送到队列for (int i = 1; i <= 10; i++) {String message = "Task to be processed, " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}}
}
代码解释:
ConnectionFactory
: 用于创建与 RabbitMQ 服务器的连接。Connection
: 表示与 RabbitMQ 服务器的物理连接。Channel
: 表示与 RabbitMQ 服务器的逻辑连接,用于发送和接收消息。queueDeclare
: 声明一个队列,true
表示队列是持久的。basicPublish
: 将消息发布到队列,使用默认交换机(空字符串)。
4.2 消费者代码
消费者负责从队列中获取任务并处理。以下是一个简单的消费者代码示例:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static final String QUEUE_NAME = "work_queues";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1); // 每次只处理一个消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");//手动ack,因为不同的机器处理速度不一样,因此不同的机器会在不同时间应答,这样机器就可以根据实际能力处理了channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}}
}
代码解释:
basicQos(1)
: 设置每次只处理一个消息,确保任务的公平分配。DeliverCallback
: 定义消息处理逻辑,doWork
方法模拟任务处理过程。basicAck
: 确认消息处理完成,从队列中移除消息。
5. 运行示例
- 启动 RabbitMQ 服务器:确保 RabbitMQ 服务器已启动并运行。
- 运行多个消费者:启动多个消费者实例,确保它们连接到同一个队列。
- 运行生产者:启动生产者实例,发送任务到队列。
示例输出:
-
生产者输出:
[x] Sent 'Task to be processed, 1'[x] Sent 'Task to be processed, 2'[x] Sent 'Task to be processed, 3'[x] Sent 'Task to be processed, 4'[x] Sent 'Task to be processed, 5'[x] Sent 'Task to be processed, 6'[x] Sent 'Task to be processed, 7'[x] Sent 'Task to be processed, 8'[x] Sent 'Task to be processed, 9'[x] Sent 'Task to be processed, 10'
-
消费者1输出:
[*] Waiting for messages. To exit press CTRL+C[x] Received 'Task to be processed, 1'[x] Done[x] Received 'Task to be processed, 4'[x] Done[x] Received 'Task to be processed, 6'[x] Done[x] Received 'Task to be processed, 8'[x] Done[x] Received 'Task to be processed, 10'[x] Done
-
消费者2输出:
[*] Waiting for messages. To exit press CTRL+C[x] Received 'Task to be processed, 2'[x] Done[x] Received 'Task to be processed, 3'[x] Done[x] Received 'Task to be processed, 5'[x] Done[x] Received 'Task to be processed, 7'[x] Done[x] Received 'Task to be processed, 9'[x] Done
6. 总结
本文详细介绍了如何在 RabbitMQ 中实现 Work Queues 模式,包括生产者、默认交换机、队列和多个消费者的设计与实现。通过使用 RabbitMQ 的 Java 客户端库,我们可以轻松地实现任务的分配和处理。Work Queues 模式非常适合需要将任务分配给多个消费者处理的场景,如任务调度、日志处理等。
相关文章:
RabbitMQ中的Work Queues模式
在现代分布式系统中,消息队列(Message Queue)是实现异步通信和解耦系统的关键组件之一。RabbitMQ 是一个广泛使用的开源消息代理软件,支持多种消息传递模式。其中,Work Queues(工作队列)模式是一…...
GESP202412 四级【Recamán】题解(AC)
》》》点我查看「视频」详解》》》 [GESP202412 四级] Recamn 题目描述 小杨最近发现了有趣的 Recamn 数列,这个数列是这样生成的: 数列的第一项 a 1 a_1 a1 是 1 1 1;如果 a k − 1 − k a_{k-1}-k ak−1−k 是正整数并且没有在数…...
蓝桥杯新年题解 | 第15届蓝桥杯迎新篇
蓝桥杯新年题解 | 第15届蓝桥杯迎新篇 2024年的蓝桥杯即将拉开序幕!对于许多编程爱好者来说,这不仅是一次展示自我能力的舞台,更是一次学习和成长的机会。作为一名大一新生的小蓝,对蓝桥杯充满了期待,但面对初次参赛的…...
3D 生成重建035-DiffRF直接生成nerf
3D 生成重建035-DiffRF直接生成nerf 文章目录 0 论文工作1 论文方法2 实验结果 0 论文工作 本文提出了一种基于渲染引导的三维辐射场扩散新方法DiffRF,用于高质量的三维辐射场合成。现有的方法通常难以生成具有细致纹理和几何细节的三维模型,并且容易出…...
@SpringBootTest 报错: UnsatisfiedDependencyException
Spring Boot Test 报错: UnsatisfiedDependencyException 在使用 SpringBootTest 测试时,出现 UnsatisfiedDependencyException 报错,原因和解决方法如下。 报错原因分析 1. Spring 存在涉及 Bean 没有被添加 Spring Boot 测试中,默认会加…...
mysql、postgresql、oceanbase调优
一、mysql 1、my.cnf [mysqld_safe] log-error=/data/mysql/log/mysql.log pid-file=/data/mysql/run/mysqld.pid[client] socket=/data/mysql/run/mysql.sock default-character-set=utf8[mysqld] basedir=/usr/local/mysql tmpdir=/data/mysql/tmp datadir=/data/mysql/dat…...
MySQL 数据库事务实践
引言 在现代应用程序开发中,确保数据库操作的完整性和一致性至关重要。MySQL 提供了强大的事务管理功能,允许开发者以原子性、一致性、隔离性和持久性(ACID)的方式处理数据。本文将通过详细的解释和实际示例,带你深入…...
VScode、Windsurf、Cursor 中 R 语言相关快捷键设置
前言 在生物信息学数据分析中,R语言是一个不可或缺的工具。为了提高R语言编程效率,合理设置快捷键显得尤为重要。本文介绍在VSCode Windsurf Cursor 中一些实用的R语言快捷键设置,让非 Rstudio 的 IDE 用起来得心应手😑 操作种…...
tcpdump编译
https://github.com/westes/flex/releases/download/v2.6.4/flex-2.6.4.tar.gz tar -zxvf flex-2.6.4.tar.gz ./configure CFLAGS-D_GNU_SOURCE make sudo make installwget http://ftp.gnu.org/gnu/bison/bison-3.2.1.tar.gz ./configure make sudo make install以上两个库是…...
Linux下禁止root远程登录访问
开始讲故事 Long long ago, Linux远程访问方式有telnet、ssh两种协议;有人可能还会说vnc和rdp协议方式,后面这两种主要是可视化桌面场景下的,并非主流。 时过境迁,telnet因安全性低逐渐被禁用淘汰,最后就…...
算法刷题Day16: BM41 输出二叉树的右视图
题目链接 描述 思路: 递归构造二叉树在Day15有讲到。复习一下,就是使用递归构建左右子树。将中序和前序一分为二。 接下来是找出每一层的最右边的节点,可以利用队列层次遍历。 利用队列长度记录当前层有多少个节点,每次从队列里…...
登录授权的实现:json web token + redis + springboot
文章目录 引言I token实现思路传统JWT TOKEN认证方式改进的JWT TOKEN认证方式redis设计II java代码实现登录接口退出登录接口登录之后接口(token解析和校验)III 常见问题400引言 应用场景: 登录认证 I token实现思路 传统JWT TOKEN认证方式 RESTful API TOKEN认证方式:…...
yolov,coco,voc标记的睡岗检测数据集,可识别在桌子上趴着睡,埋头睡觉,座椅上靠着睡,平躺着睡等多种睡姿的检测,6549张图片
yolov,coco,voc标记的睡岗检测数据集,可识别在桌子上趴着睡,埋头睡觉,座椅上靠着睡,平躺着睡等多种睡姿的检测,6549张图片 数据集分割 6549总图像数 训练组91% 5949图片 有效集9&#x…...
数据库表的CRUD
SQL语句(Structured Query Language)是用于与关系型数据库进行交互的语言。下面是几个常用的SQL语句: 创建表: CREATE TABLE table_name ( column1 datatype, column2 datatype, column3 datatype, ... ); 插入数据: …...
Proxy与Reflect
监听对象操作 在Object中,可以通过defineProperty中的get,set进行监听, Proxy基本使用 有两个参数,第一个是要代理的对象,第二个是捕获器,在不知道捕获器使用哪个之前可以先传个空对象。就会启用默认的捕获…...
【安卓开发】【Android Studio】启动时报错“Unable to access Android SDK add-on list”
一、问题描述 在启动Android Studio时,软件报错:Unable to access Android SDK add-on list,报错截图如下: 二、原因及解决方法 初步推测是由于网络节点延迟,无法接入谷歌导致的。点击Cancel取消即可。...
【C语言篇】C 语言总复习(下):点亮编程思维,穿越代码的浩瀚星河
我的个人主页 我的专栏:C语言,希望能帮助到大家!!!点赞❤ 收藏❤ 在C语言的世界里,结构体和联合体以及文件操作都是非常重要且实用的知识板块,掌握它们能帮助我们更高效地组织数据以及与外部文…...
AI技术架构:从基础设施到应用
人工智能(AI)的发展,正以前所未有的速度重塑我们的世界。了解AI技术架构,不仅能帮助我们看懂 AI 的底层逻辑,还能掌握其对各行业变革的潜力与方向。 一、基础设施层:AI 技术的坚实地基 基础设施层是 AI 技…...
centos7的yum镜像源设置
sudo yum repolist 查看镜像源连接情况,not found即为连接失败 sudo cp -r /etc/yum.repos.d /etc/yum.repos.d.backup 备份镜像源文件 sudo nano /etc/yum.repos.d/CentOS-Base.repo 进入镜像源文件编辑内容 # CentOS-Base.repo # # The mirror system uses the…...
Qt6开发自签名证书的https代理服务器
目标:制作一个具备类似Fiddler、Burpsuit、Wireshark的https协议代理抓包功能,但是集成到自己的app内,这样无需修改系统代理设置,使用QWebengineview通过自建的代理服务器,即可实现https包的实时监测、注入等自定义功能…...
HarmonyOS:多线程并发-Worker
Worker主要作用是为应用程序提供一个多线程的运行环境,可满足应用程序在执行过程中与宿主线程分离,在后台线程中运行一个脚本进行耗时操作,极大避免类似于计算密集型或高延迟的任务阻塞宿主线程的运行。具体接口信息及使用方法详情请见Worker…...
小程序IOS安全区域优化:safe-area-inset-bottom
ios下边有一个小黑线,位于底部的元素会被黑线阻挡 safe-area-inset-bottom 一 用法及作用: IOS全面屏底部有小黑线,位于底部的元素会被黑线阻挡,可以使用以下样式: .model{padding-bottom: constant(safe-area-ins…...
C++ 中多态性在实际项目中的应用场景
C中的多态性是面向对象编程中的一个核心概念,它允许我们在使用基类指针或引用的情况下,调用派生类对象的特定方法。这种特性在实际项目中有着广泛的应用场景,具体包括但不限于以下几个方面: 1.图形图像处理: 在图形图…...
prettier配置
配置 Prettier 在 VSCode 中自动格式化代码的教程 1. 安装 Prettier VSCode 插件 打开 VSCode。点击左侧活动栏的扩展市场图标(或按 Ctrl+Shift+X)。在搜索栏中输入 Prettier - Code formatter。找到插件并点击 Install 安装它。2. 配置 VSCode 设置 确保 VSCode 配置正确,…...
【基于OpenEuler国产操作系统大数据实验环境搭建】
大数据实验环境搭建 一、实验简介1.1 实验内容1.2 环境及其资源规划 二、实验目的三、实验过程3.1 安装虚拟机软件及操作系统3.2 创建安装目录(在主节点上操作)3.2 安装JDK及基本设置(所有节点都需要操作)3.3 安装Hadoop3.4 安装Z…...
期末软件经济学
文章目录 前言复习策略复习名词解释简答题第一章 ppt后记 前言 最近白天都在忙正事,晚上锻炼一下,然后处理一些杂事,现在是晚上十点多,还有一些时间复习一下期末考试。复习到十一点。 复习策略 感觉比较简单,直接刷…...
滑动窗口算法专题
滑动窗口简介 滑动窗口就是利用单调性,配合同向双指针来优化暴力枚举的一种算法。 该算法主要有四个步骤 1. 先进进窗口 2. 判断条件,后续根据条件来判断是出窗口还是进窗口 3. 出窗口 4.更新结果,更新结果这个步骤是不确定的,…...
基于Java的世界时区自动计算及时间生成方法
目录 前言 一、zoneinfo简介 1、zoneinfo是什么 2、zoneinfo有什么 二、在Java中进行时区转换 1、Java与zoneInfo 2、Java展示zoneInfo实例 3、Java获取时区ID 三、Java通过经纬度获取时区 1、通过经度求解偏移 2、通过偏移量计算时间 3、统一的处理算法 四、总结 …...
Excel + Notepad + CMD 命令行批量修改文件名
注意:该方式为直接修改原文件的文件名,不会生成新文件 新建Excel文件 A列:固定为 renB列:原文件名称C列:修改后保存的名称B列、C列,需要带文件后缀,为txt文件就是.txt结尾,为png图片…...
OpenGL 几何着色器高级应用
几何着色器高级应用 概念回顾 几何着色器(Geometry Shader)是 OpenGL 管线中的可选着色器阶段,位于顶点着色器(Vertex Shader) 和光栅化阶段 之间。 其核心功能是基于输入的图元(如点、线或三角形),生成新的图元,或对输入的图元进行修改。 几何着色器的执行是以图元…...
火星免费建网站/360排名优化工具
我想你在结尾处漏了一个缩进,你将每个输出图像保存50次。我把你的密码改成:import os import numpy as np from PIL import Image for j in range(1, 10): filename = str(j) + .jpg print processing, filename, ... im = Image.open(filename) data = np.array(im) for i i…...
网站备案审核制度/今天刚刚发生的新闻
题库来源:安全生产模拟考试一点通公众号小程序 安全生产模拟考试一点通:危险化学品经营单位主要负责人考试题是安全生产模拟考试一点通总题库中生成的一套危险化学品经营单位主要负责人模拟试题,安全生产模拟考试一点通上危险化学品经营单位…...
中国建设委员会官网站/网站交换链接的常见形式
openstack之kvm学习(一) 感谢Cloudman提供的系列基础博客: http://cloudman.blog.51cto.com/10425448/1745873 虚拟化基础-Hypervisor Hypervisor(也称VMM):运行在物理服务器和操作系统之间的中间软件层…...
个人摄影网站/网络营销推广方案步骤
http://blog.sina.com.cn/s/blog_557ee0540100cukp.html熟悉面向对象编程和网络编程的人一定对ActiveX、OLE和COM/DCOM这些概念不会陌生,但是它们之间究竟是什么样的关系,对许多们还是比较模糊的。在具体介绍它们的关系之间,我们还是先明确组…...
湖南关键词优化品牌推荐/杭州seo网站推广
主要记录黄勇的视频讲解。CSDN:https://blog.csdn.net/nunchakushuang/article/list/1? 他的文档大纲为: video3 拓展1:HTTP与HTTPS的区别,各自的原理? 大致的答案: 作者:北京千锋互联科技有限…...
做原材料供应的网站有哪些/seo建站的步骤
现代社会,几乎已经人手一部手机了,有的人甚至同时使用多部,不过随着手机功能越来越多、使用的频率越来越高,手机的续航能力成为了困扰用户的一个问题,电量越来越供给不上,如果是玩游戏的话耗电速度更快&…...