当前位置: 首页 > news >正文

RabbitMq 使用说明

1. 声明交换机和队列,以及交换机和队列绑定

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class CeshiQueue {//延迟交换机fanoutpublic static final String DELAY_EXCHANGE_NAME = "delay_exchange_name";//延时队列Apublic static final String DELAY_QUEUE_A_NAME = "delay_queue_a_name";//延时队列Bpublic static final String DELAY_QUEUE_B_NAME = "delay_queue_b_name";//延时队列A路由Keypublic static final String DELAY_ROUTING_KEY_A_NAME = "delay_routing_key_a_name";//延时队列B路由Keypublic static final String DELAY_ROUTING_KEY_B_NAME = "delay_routing_key_b_name";//死信交换机public static final String DEAD_LETTER_EXCHANGE_NAME = "dead_letter_exchange_name";//死信队列Apublic static final String DEAD_LETTER_QUEUE_A_NAME = "dead_letter_queue_a_name";//死信队列Bpublic static final String DEAD_LETTER_QUEUE_B_NAME = "dead_letter_queue_b_name";//死信队列A路由Keypublic static final String DEAD_LETTER_ROUTING_KEY_A_NAME = "dead_letter_routing_key_a_name";//死信队列B路由Keypublic static final String DEAD_LETTER_ROUTING_KEY_B_NAME = "dead_letter_routing_key_b_name";/*** 声明延时交换机 fanout** @return*/@Bean(DELAY_EXCHANGE_NAME)public Exchange DELAY_EXCHANGE_NAME() {// return ExchangeBuilder.fanoutExchange(DELAY_EXCHANGE_NAME).durable(true).build();return ExchangeBuilder.directExchange(DELAY_EXCHANGE_NAME).durable(true).build();}/*** 声明死信交换机 direct** @return*/@Bean(DEAD_LETTER_EXCHANGE_NAME)public Exchange DEAD_LETTER_EXCHANGE_NAME() {return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE_NAME).durable(true).build();}/*** 声明延时队列A* 并绑定死信交换机 和 死信队列A  根据路由key** @return*/@Bean(DELAY_QUEUE_A_NAME)public Queue DELAY_QUEUEA_NAME() {Map map = new HashMap();map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_NAME);//设置此队列延时时间 6秒map.put("x-message-ttl", 6000);return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(map).build();}/*** 声明延时队列B* 并绑定死信交换机 和 死信队列B  根据路由key** @return*/@Bean(DELAY_QUEUE_B_NAME)public Queue DELAY_QUEUEB_NAME() {Map map = new HashMap();map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_NAME);//设置此队列延时时间 12秒map.put("x-message-ttl", 12000);return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(map).build();}/*** 声明死信队列A** @return*/@Bean(DEAD_LETTER_QUEUE_A_NAME)public Queue DEAD_LETTER_QUEUEA_NAME() {return new Queue(DEAD_LETTER_QUEUE_A_NAME);}/*** 声明死信队列B** @return*/@Bean(DEAD_LETTER_QUEUE_B_NAME)public Queue DEAD_LETTER_QUEUEB_NAME() {return new Queue(DEAD_LETTER_QUEUE_B_NAME);}/*** 延时队列A绑定交换机** @param queue* @param exchange* @return*/@Beanpublic Binding delayQueueABinding(@Qualifier(DELAY_QUEUE_A_NAME) Queue queue, @Qualifier(DELAY_EXCHANGE_NAME) Exchange exchange) {
//        return BindingBuilder.bind(queue).to(exchange).with("").noargs();return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY_A_NAME).noargs();}/*** 延时队列B绑定交换机** @param queue* @param exchange* @return*/@Beanpublic Binding delayQueueBBinding(@Qualifier(DELAY_QUEUE_B_NAME) Queue queue, @Qualifier(DELAY_EXCHANGE_NAME) Exchange exchange) {// return BindingBuilder.bind(queue).to(exchange).with("").noargs();return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY_B_NAME).noargs();}/*** 死信队列A绑定交换机* @param queue* @param exchange* @return*/@Beanpublic Binding deadLetterQueueABinding(@Qualifier(DEAD_LETTER_QUEUE_A_NAME)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_A_NAME).noargs();}/*** 死信队列B绑定交换机* @param queue* @param exchange* @return*/@Beanpublic Binding deadLetterQueueBBinding(@Qualifier(DEAD_LETTER_QUEUE_B_NAME)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_B_NAME).noargs();}}

2. 生产者发送消息

import com.core.rabbitmq.queue.CeshiQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Slf4j
@RestController
@RequestMapping("/provider")
public class ProviderController {@AutowiredRabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法@GetMapping("/ceshiDelay1")public void ceshi() {System.out.println("provider ================" + new Date());//发送消息Map map = new HashMap();map.put("name", "老六");map.put("age", "18");//第一个参数发送给哪个交换机   第二个路由key  我们延时交换机是fanout所以路由key为空   第三个发送对象
//        rabbitTemplate.convertAndSend(CeshiQueue.DELAY_EXCHANGE_NAME, "", map);rabbitTemplate.convertAndSend(CeshiQueue.DELAY_EXCHANGE_NAME, CeshiQueue.DELAY_ROUTING_KEY_A_NAME, map
//                ,message -> {
//                    message.getMessageProperties().setExpiration(1000 * 6 + "");
//                    message.getMessageProperties().setDelay(6 * 1000);
//                    return message;
//                }););System.out.println("订单提交成功,请及时付款");}

3. 消费者消费消息

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;
import java.util.Map;/*** 小程序订单超时未支付关闭订单** @date 2023/03/03*/
@Component
@Slf4j
//@RequiredArgsConstructor
public class ConsumerListener {@RabbitListener(queues = CeshiQueue.DEAD_LETTER_QUEUE_A_NAME)public void process2(Map order, Message message, @Headers Map<String, Object> headers, Channel channel) {log.info("消費者 订单号消息", order);long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("订单 延时队列,消费者 订单号消息【{}】", order);log.info("订单 延时队列,消费者 订单号消息【{}】", headers);log.info("订单 延时队列,消费者 订单号消息【{}】", message);log.info("订单 延时队列,消费者 订单号消息【{}】", channel);log.info("订单 延时队列,消费者 订单号消息【{}】", deliveryTag);System.out.println(new Date() + "消费者 执行结束...." + message);try {      // 如果没有错误,则执行此步,表示成功签收; 手动签收,第一个参数表示消息的deliveryTag,第二个参数表示是否允许多条消息同时被签收channel.basicAck(deliveryTag, false);} catch (Exception e) {// requeue为true则重新入队列,否则丢弃或者进入死信队列  /*** @param deliveryTag 消息序号* @param multiple 是否批量处理(true:批量拒绝所有小于deliveryTag的消息;false:只处理当前消息)* @param requeue 拒绝是否重新入队列 (true:消息重新入队;false:禁止消息重新入队)*/channel.basicReject(deliveryTag, false);log.error("订单  超时支付异常,系统重新关闭订单【{}】", deliveryTag);}}
}

        注意:生产者发送延迟队列里,用延迟路由key,想要得到延迟效果,需要用死信队列来监听,如果用延迟队列监听就得不到延迟效果,延迟队列就会立即受到消息(初学者会遇到的坑)。

4. BasicReject 

        拒收,是接收端在收到消息的时候响应给RabbitMQ服务的一种命令,告诉服务器不应该由我处理,或者拒绝处理,扔掉。 接收端在发送reject命令的时候可以选择是否要重新放回queue中。如果没有其他接收者监控这个queue的话,要注意一直无限循环发送的危险。

BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicReject(ea.DeliveryTag, false);

        BasicReject方法第一个参数是消息的DeliveryTag,对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序来表示:1,2,3,4 等等。第二个参数是是否放回queue中,requeue。

        BasicReject一次只能拒绝接收一个消息,而BasicNack方法可以支持一次0个或多个消息的拒收,并且也可以设置是否requeue。

channel.BasicNack(3, true, false);

        在第一个参数DeliveryTag中如果输入3,则消息DeliveryTag小于等于3的,这个Channel的,都会被拒收。

5. RabbitMQ 消息确认机制ACK

ack 机制保证的是broker和消费者之间的可靠性

ack 表示的是消费端收到消息后的确认方式,有三种确认方式

        自动确认:acknowledge="none"(默认)
        手动确认:acknowledge="manual"
        根据异常情况确认:acknowledge="auto"(这种方式使用麻烦,不作讲解)
自动确认的解释:

        当消息一旦被 Consumer 接收到,则自动确认收到,并将相应消息从 RabbitMQ 的消息缓存中移除
手动确认的解释:

        在实际业务处理中,很可能消费端收到消息后业务处理出现异常,那么该消息就会丢失;
        如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck() 手动签收;如果出现异常,则调用 channel.basicNack()方法,让 broker 自动重新发送消息给消费端。

rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景_出一个场景问mq的用法(延迟、死信)

rabbitmq-BasicReject

RabbitMQ ACK消息确认机制 快速入门

相关文章:

RabbitMq 使用说明

1. 声明交换机和队列&#xff0c;以及交换机和队列绑定 import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.spr…...

Vue(10-20)

1Vue赋值方式 Object.defineProperty <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" conten…...

C++-对四个智能指针:shared_ptr,unique_ptr,weak_ptr,auto_ptr的理解

回答如下&#xff1a; C的智能指针是一种特殊类型的“指针”&#xff0c;其主要目的是自动跟踪内存分配和释放&#xff0c;以避免程序中出现内存泄露或空悬指针等问题&#xff0c;主要采用的技术是&#xff1a;借助于类的生命周期&#xff0c;当超出了类的作用域时&#xff0c…...

uni-app中使用vue3语法详解

全局创建 app.use(createPina()).mount 全局方法 通过app.config.globalProperties.xxx可以创建 这里我们写了一个字符串翻转的全局方法 main.js里面添加一个全局方法 不要忘了加$ 否则会报错 // #ifdef VUE3 //导入创建app import { createSSRApp } from vue //导入创建ap…...

三十四、MongoDB PHP

PHP 语言可是使用 mongo.so ( Windows 下是 mongo.dll ) 扩展访问 MongoDB 数据库 MongoDB PHP 在各平台上的安装及驱动包下载请查看: PHP 安装 MongoDB 扩展驱动 如果你使用的是 PHP7&#xff0c;请移步&#xff1a; PHP7 MongoDB 安装与使用 PHP 连接 MongoDB 和 选择一个…...

浅拷贝和深拷贝的区别

浅拷贝和深拷贝 总结&#xff1a;浅拷贝对象数据共享&#xff0c;深拷贝是一个完全独立的对象&#xff0c;因此对象数据不共享。 浅拷贝&#xff08;Shallow Copy&#xff09; 浅拷贝是指创建一个新的对象&#xff0c;但是该新对象只是原始对象的一个副本。具体而言&#xf…...

6个常用Pycharm插件推荐,老手100%都用过

人生苦短 我用python 有些插件是下载后需要重启Pycharm才生效的 免费领源码、安装包&#xff1a;扣扣qun 903971231 PyCharm 本身已经足够优秀&#xff0c; 就算不使用插件&#xff0c; 也可以吊打市面上 90%的 Python 编辑器。 如果硬要我推荐几款实用的话&#xff0c; 那么…...

TCP的11种状态

CLOSED状态&#xff1a;初始状态&#xff0c;表示TCP连接是“关闭的”或者“未打开的”LISTEN状态&#xff1a;表示服务端的某个端口正处于监听状态&#xff0c;正在等待客户端连接的到来SYN_SENT状态&#xff1a;当客户端发送SYN请求建立连接之后&#xff0c;客户端处于SYN_SE…...

new 指令简单过程 / 类加载简单过程初始化

例子&#xff1a;Person p new Person(“张三”,”23”); 因为new用到person.class,所以先找到person.class文件&#xff0c;并且加载到内存中&#xff08;如果有父类先加载父类&#xff09;执行static块以及static变量的初始化&#xff08;如果有父类先初始化父类&#xff0…...

Asan基本原理及试用

概述 Asan是Google专门为C/C开发的内存错误探测工具&#xff0c;其具有如下功能 使用已释放内存&#xff08;野指针&#xff09;√堆内存越界&#xff08;读写&#xff09;√栈内存越界&#xff08;读写&#xff09;√全局变量越界&#xff08;读写&#xff09;函数返回局部变…...

深度学习应用技巧4-模型融合:投票法、加权平均法、集成模型法

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下&#xff0c;深度学习中的模型融合。它是将多个深度学习模型或其预测结果结合起来&#xff0c;以提高模型整体性能的一种技术。 深度学习中的模型融合技术&#xff0c;也叫做集成学习&#xff0c;是指同时使用多个…...

【并发编程】深入理解Java内存模型及相关面试题

文章目录优秀引用1、引入2、概述3、JMM内存模型的实现3.1、简介3.2、原子性3.3、可见性3.4、有序性4、相关面试题4.1、你知道什么是Java内存模型JMM吗&#xff1f;4.2、JMM和volatile他们两个之间的关系是什么&#xff1f;4.3、JMM有哪些特性/能说说JMM的三大特性吗&#xff1f…...

C++编程语言STL之queue介绍

本文主要介绍C编程语言的STL&#xff08;Standard Template Library&#xff09;中queue&#xff08;队列&#xff09;的相关知识&#xff0c;同时通过示例代码介绍queue的常见用法。1 概述适配器&#xff08;adaptor&#xff09;是STL中的一个通用概念。容器、迭代器和函数都有…...

ACO优化蚁群算法

%% 蚁群算法&#xff08;ant colony optimization,ACO&#xff09; %清空变量 clear close all clc [ graph ] createGraph(); figure subplot(1,3,1) drawGraph( graph); %% 初始化参数 maxIter 100; antNo 50; tau0 10 * 1 / ( graph.n * mean( graph.edges(:) …...

SwiftUI 常用组件和属性(SwiftUI初学笔记)

本文为初学SwiftUI笔记。记录SwiftUI常用的组件和属性。 组件 共有属性(View的属性) Image("toRight").resizable().background(.red) // 背景色.shadow(color: .black, radius: 2, x: 9, y: 15) //阴影.frame(width: 30, height: 30) // 宽高 可以只设置宽或者高.…...

Centos 中设置代理的两种方法

Centos 中设置代理的两种方法 在使用局域网时&#xff0c;有时在局域网内只有一台电脑可以进行上网&#xff0c;其他电脑只能通过配置代理的方式来上网&#xff0c;在Windows系统中设置代理上网相对简单&#xff0c;如果只需上网的话&#xff0c;只需在浏览器中找到网络连接&am…...

高速PCB设计指南系列(一)

第一篇 PCB布线 在PCB设计中&#xff0c;布线是完成产品设计的重要步骤&#xff0c;可以说前面的准备工作都是为它而做的&#xff0c; 在整个PCB中&#xff0c;以布线的设计过程限定最高&#xff0c;技巧最细、工作量最大。PCB布线有单面布线、 双面布线及多层布线。布线的方…...

云端IDE:TitanIDE v2.6.0 正式发布

原文作者&#xff1a;行云创新技术总监 邓冰寒 引言 云原生集成开发环境 TitanIDE v2.6.0 正式发布了&#xff0c;一起来看看都有那些全新的体验吧&#xff01; TitanIDE 是一款云IDE, 也称 CloudIDE&#xff0c;作为数字化时代研发体系不可或缺的一环&#xff0c;和企业建设…...

【Python】tqdm 模块

import mathfrom tqdm import tqdm, trange# 计算阶乘 results_1 []for i in range(6666):results_1.append(math.factorial(i))这是一个循环计算阶乘的程序&#xff0c;我们不知道程序运行的具体情况&#xff0c;如果能加上一个程序运行过程的进度条&#xff0c;那可就太有趣…...

论文阅读:Adversarial Cross-Modal Retrieval对抗式跨模式检索

Adversarial Cross-Modal Retrieval 对抗式跨模式检索 跨模态检索研究的核心是学习一个共同的子空间&#xff0c;不同模态的数据可以直接相互比较。本文提出了一种新的对抗性跨模态检索&#xff08;ACMR&#xff09;方法&#xff0c;它在对抗性学习的基础上寻求有效的共同子空间…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

C++初阶-list的底层

目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)

HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

JVM虚拟机:内存结构、垃圾回收、性能优化

1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...

Kafka入门-生产者

生产者 生产者发送流程&#xff1a; 延迟时间为0ms时&#xff0c;也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于&#xff1a;异步发送不需要等待结果&#xff0c;同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解

在 C/C 编程的编译和链接过程中&#xff0c;附加包含目录、附加库目录和附加依赖项是三个至关重要的设置&#xff0c;它们相互配合&#xff0c;确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中&#xff0c;这些概念容易让人混淆&#xff0c;但深入理解它们的作用和联…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...

接口自动化测试:HttpRunner基础

相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具&#xff0c;支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议&#xff0c;涵盖接口测试、性能测试、数字体验监测等测试类型…...

【JVM】Java虚拟机(二)——垃圾回收

目录 一、如何判断对象可以回收 &#xff08;一&#xff09;引用计数法 &#xff08;二&#xff09;可达性分析算法 二、垃圾回收算法 &#xff08;一&#xff09;标记清除 &#xff08;二&#xff09;标记整理 &#xff08;三&#xff09;复制 &#xff08;四&#xff…...