RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 页面分析
前言
RabbitMQ作为一款常用的消息中间件,在微服务项目中得到大量应用,其本身是微服务中的重点和难点,有不少概念我自己的也是一知半解,本系列博客尝试结合实际应用场景阐述RabbitMQ的应用,分析其为什么使用,并给出怎么用的案例。
本篇博客结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码。(文末有惊喜~)
其他相关的rabbitmq博客文章列表如下:
RabbitMQ基础(1)——生产者消费者模型 & RabbitMQ简介 & Docker版本的安装配置 & RabbitMQ的helloworld + 分模块构建 & 解决大量注册案例
RabbitMQ基础(2)——发布订阅/fanout模式 & topic模式 & rabbitmq回调确认 & 延迟队列(死信)设计
RabbitMQ的Docker版本安装 + 延迟插件安装 & QQ邮箱和阿里云短信验证码的主题模式发送
目录
- 前言
- 引出
- MQ场景:
- 1异步处理
- 2解耦
- 3削峰填谷
- 常见的MQ
- 拆解控制台页面
- 总览页面
- 连接connection
- 队列页面
- 简单模式
- 工具类:建立连接
- 生产者:生产消息
- 消费者:消费消息
- 进行测试
- Ack:项目中必须false
- 工作模式
- 一个生产者,两个消费者
- QOS: 限流
- Tips:队列参数怎么变?
- 队列相关参数
- x-max-length
- x-overflow
- x-max-length-bytes
- 发布者订阅模式
- 生产者:给fanout交换机发消息
- 消费者1:队列q32
- 消费者2:队列q321
- 路由模式
- 生产者:发给交换机,告诉路由键
- 消费者1:根据路由键,接收3种消息
- 消费者2:根据路由键,接收1种消息
- 主题模式(Topic)
- 生产者:主题交换机,带路由键
- 消费者1:通配符的路由键
- 消费者2:统配符
- 总结
引出
1.MQ,消息队列的应用场景,几种MQ简单对比;
2.分析RabbitMQ的浏览器控制台页面;
3.结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码;
MQ场景:
Message Queue 消息队列。
有比较大的负载,而且这些负载不用立刻马上给程序返回结构,可以有一些等待时间。可以用消息队列。
1异步处理
过去的项目,大部分都是同步解决问题。
UserinfoService{register(){//典型的同步//插入数据库//发短信//发邮件}
}
//如果你的操作基本没有任何延时操作,或者瓶颈,没有压力。那么你没有必要用MQ
UserinfoService{register(){//发起的是一个异步请求都不等待对方给我返回的结果//异步插入数据库//异步发短信//异步发邮件}
}
2解耦
库存有没有可能挂了。或者访问量巨大。因为库存慢,导致订单也慢。
解耦。
3削峰填谷
双12的时候,叫好早上的8点。秒杀的时候。
有些瞬间服务器压力是超大的,过了这个瞬间,几乎没有消耗量。
等服务器能够正常的时候,我慢慢执行就行了。
常见的MQ
MQ的前身,就是一个发布者订阅模式。
Kafka RabbitMQ(1W/s) RocketMQ ActiveMQ
Kafka : 10w/S 主要用于日志。
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
拆解控制台页面
总览页面
包括刷新时间间隔,rabbitmq节点,连接端口的信息等
配置可以导出和导入
连接connection
队列页面
包括队列的名字,状态,准备好的消息数量,未确认的消息数量,总计消息数量
简单模式
使用的依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency>
工具类:建立连接
package com.tianju.config;import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 建立连接的工具类*/
public class ConnectionFactory {public static Connection createConnection() throws IOException, TimeoutException {com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();connectionFactory.setHost("192.168.111.130"); // http://192.168.111.130/connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("123");connectionFactory.setVirtualHost("/demo");// amqp://admin@192.168.111.130:5672/return connectionFactory.newConnection();}
}
生产者:生产消息
package com.tianju.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.tianju.config.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String QUEUE_ORDER = "queue_order";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列channel.queueDeclare(QUEUE_ORDER,false,false,false,null);// 发送消息,指定给哪个队列上发消息for (int i = 0; i < 100; i++) {String msg = "hello rabbitmq--"+i;channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());System.out.println("消息发布成功");}connection.close();}
}
消费者:消费消息
package com.tianju.simple;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static String QUEUE_ORDER = "queue_order";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列channel.queueDeclare(QUEUE_ORDER,false,false,false,null);// 队列必须声明,如果不存在,则自动创建// 声明一个消费者DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 回调函数,用来接收消息* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param envelope packaging data for the message* @param properties content header data for the message* @param body the message body (opaque, client-specific byte array),字节数组* @throws IOException*/public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body); // 收到的信息System.out.println("消费者接收到:"+msg);try {Thread.sleep(3000);// 模拟一个耗时操作} catch (InterruptedException e) {throw new RuntimeException(e);}}};// 表明自己是消费者,接收消息/*** autoAck:自动确认设置成 true*/channel.basicConsume(QUEUE_ORDER, true, defaultConsumer);}
}
进行测试
生产者发送100条消息
消费者进行消费,假设突然之间服务宕机了,此时消费了4条消息,理论上还应该有96条消息
打开控制台页面查看,消息全部消失了,出现了数据丢失的情况
!
全部用默认值的情况下,如果发生异常,则消息全部丢失。
消费者一次性拿到了所有的消息。
Ack:项目中必须false
生产者 ==投递消息=》队列
消费者=接受==》队列
ack: false 必须要手工确认。
消费者接到这个消息的时候, 这个消息进入 unack状态。
1:手工确认。消息删除。完成
2: 没手工确认,断开连接或者超时。
3:这个消息重新进入ready状态。等待其他消费者进行消费。
先设置自动ack为false,表示需要手工确认,然后在消费消息的方法中,进行消息的确认。
package com.tianju.simple;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static String QUEUE_ORDER = "queue_order";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列channel.queueDeclare(QUEUE_ORDER,false,false,false,null);// 队列必须声明,如果不存在,则自动创建// 声明一个消费者DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 回调函数,用来接收消息* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param envelope packaging data for the message* @param properties content header data for the message* @param body the message body (opaque, client-specific byte array),字节数组* @throws IOException*/public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {// no work to doString msg = new String(body); // 收到的信息System.out.println("消费者接收到:"+msg);try {Thread.sleep(3000);// 模拟一个耗时操作} catch (InterruptedException e) {throw new RuntimeException(e);}// 消费者代码可能有失败,消息拿到之后,可能还没有处理,就宕机了// 消息确认的代码一定在最后一行// long deliveryTag 消息的下标;// boolean multiple 是否批量确认;channel.basicAck(envelope.getDeliveryTag(),true); // 确认消息,批量确认}};// 表明自己是消费者,接收消息/*** autoAck:自动确认设置成 true* 是否自动确认,* false:不进行自动确认;true:自动确认* 消费过程中可能产生异常* 如果产生异常,则必须进行消费补偿*/channel.basicConsume(QUEUE_ORDER, false, defaultConsumer); // 接收消息}
}
再次模拟宕机的情况,一开始消息全部被放到Unack中,当宕机时,又把消息吐了出来,至少消息没有出现丢失的情况。
工作模式
一个生产者,两个消费者
两个消费者消费的是同一个队列中的消息。
两个消费者上来后,默认是按照平均分配。结果: 有人瞬间干完。有人很久都没干完。
QOS: 限流
可以限制你一次拉取几个。
这样两个消费者,也能够节省服务器CPU了。
// 创建频道Channel channel = connection.createChannel();channel.basicQos(1); /** 1次只能拿1个 **/
当消费的速度太慢,觉得不够,加入其他的消费者。
核心,只有一个消息队列。
消息是轮询的方式,发送给不同的消费者。
Tips:队列参数怎么变?
队列的参数发生变化后,要删除再添加。
// 创建频道Channel channel = connection.createChannel();channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在// 创建队列channel.queueDeclare(QUEUE_ORDER,durable,false,false,null);
队列相关参数
x-max-length
package com.tianju.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String QUEUE_ORDER = "queue_order";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在Map map = new HashMap();map.put("x-max-length", 10); // 设置最大的长度,只接受10个// 创建队列channel.queueDeclare(QUEUE_ORDER,durable,false,false,map);// 发送消息,指定给哪个队列上发消息for (int i = 0; i < 1000; i++) {String msg = "hello rabbitmq--"+i;channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());System.out.println("消息发布成功");}connection.close();}
}
How many (ready) messages a queue can contain before it starts to drop them from its head.
x-overflow
Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values are
drop-head
,reject-publish
orreject-publish-dlx
. The quorum queue type only supportsdrop-head
andreject-publish
.
此时就是前10个了
x-max-length-bytes
package com.tianju.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String QUEUE_ORDER = "queue_order";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();channel.queueDelete(QUEUE_ORDER); // 先删除旧的队列boolean durable = true; // 当前队列中的消息持久化操作,重启之后,消息还在Map map = new HashMap();map.put("x-max-length", 10); // 设置最大的长度,只接受10个map.put("x-overflow", "reject-publish"); // 拒绝发布,变成前10gemap.put("x-max-length-bytes", 4); // 消息的最大字节长度// 创建队列channel.queueDeclare(QUEUE_ORDER,durable,false,false,map);// 发送消息,指定给哪个队列上发消息for (int i = 0; i < 20; i++) {String msg = "53"+i;channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());msg = "hello rabbitmq--"+i;channel.basicPublish("", QUEUE_ORDER, null, msg.getBytes());System.out.println("消息发布成功");}connection.close();}
}
参数
//创建队列boolean durable = true;//当前队列中的消息进行持久化操作 重启之后,消息还在Map map = new HashMap();map.put("x-max-length",10);//设置队列的最大长度map.put("x-overflow","reject-publish");//设置队列的最大长度 drop headmap.put("x-max-length-bytes",4);//设置消息的最大字节长度
// map.put("x-expires",10000);//超时后,直接删除队列channel.queueDeclare(QUQUENAME,durable,false,false,map);
发布者订阅模式
工作模式中,只有一个队列。两个消费者轮询从同一个队列中取数据。
发布者订阅模式: 交换机后面会绑定多个消息队列。每个消息队列都有完整的信息。每个队列后的消费者都会有完整的消息。
交换机:就是一个无意识的广播。行为就是扇出.
场景:
下订单:
1:订单的数据入数据库
2:发送订单的短信
3: 物流
要每个队列都有完整的信息。
生产者:给fanout交换机发消息
package com.tianju.publish;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String QUEUE_ORDER = "queue_order";private static String EXCHANGE = "pet_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建交换机channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个for (int i = 0; i < 100; i++) {channel.basicPublish(EXCHANGE, "", MessageProperties.TEXT_PLAIN, ("hello fanout"+i).getBytes());}}
}
消费者1:队列q32
package com.tianju.publish;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static String EXCHANGE = "pet_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建交换机channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个channel.queueDeclare("q32", false, false, false, null);channel.queueBind("q32", EXCHANGE, ""); // 路由键fanout模式,必须为空,即使写了是无效的channel.basicConsume("q32", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("消费者1:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}
消费者2:队列q321
package com.tianju.publish;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {private static String EXCHANGE = "pet_exchange";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建交换机channel.exchangeDeclare(EXCHANGE, "fanout"); // 扇出,类型只能用这个channel.queueDeclare("q321", false, false, false, null);channel.queueBind("q321", EXCHANGE, ""); // 路由键fanout模式,必须为空,即使写了是无效的channel.basicConsume("q321", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("消费者2:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}
路由模式
允许用不同的路由键来接受不同的信息。
队列会接到完整的全部的信息。
日志系统:
日志级别:
OFF INFO DEBUG ERROR FATAL ALL
如果是致命错误: 立刻给运维发短信。
全部信息: 放到专门的日志系统
Error: 进行发邮件
生产者:发给交换机,告诉路由键
package com.tianju.routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建交换机channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机String msg = "this is fatal message";channel.basicPublish(EXCHANGE,"fatal", MessageProperties.TEXT_PLAIN,msg.getBytes());msg = "this is error message";channel.basicPublish(EXCHANGE,"error", MessageProperties.TEXT_PLAIN,msg.getBytes());msg = "this is debug message";channel.basicPublish(EXCHANGE,"debug", MessageProperties.TEXT_PLAIN,msg.getBytes());}
}
消费者1:根据路由键,接收3种消息
package com.tianju.routing;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.Calendar;
import java.util.concurrent.TimeoutException;public class Consumer {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列// 创建交换机channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机// 队列必须声明,如果不存在,则自动创建channel.queueDeclare("q58", false, false, false, null);channel.queueBind("q58", EXCHANGE, "fatal");channel.queueBind("q58", EXCHANGE, "error");channel.queueBind("q58", EXCHANGE, "debug");channel.basicConsume("q58", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("消费者1:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}
消费者2:根据路由键,接收1种消息
package com.tianju.routing;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列// 创建交换机channel.exchangeDeclare(EXCHANGE, "direct"); // 直接交换机// 队列必须声明,如果不存在,则自动创建channel.queueDeclare("q581", false, false, false, null);channel.queueBind("q581", EXCHANGE, "fatal");channel.basicConsume("q581", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("消费者1:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}
主题模式(Topic)
可以采取通配符模式:
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
#`:匹配0个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.insert.abc
或者 item.insert
item.*
:只能匹配item.insert
场景:
京东: 无锡,河南下订单。退货
==========总部 收到这个订单 .
无锡分销商也有这个消息 *.wuxi
生产者:主题交换机,带路由键
package com.tianju.topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者发送消息* 建立连接--> 创建频道 --> 创建队列 --> 发送消息*/
public class Provider {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建交换机channel.exchangeDeclare(EXCHANGE, "topic"); // 主题模式交换机String msg = "this is wuxi order";channel.basicPublish(EXCHANGE,"order.wuxi", MessageProperties.TEXT_PLAIN,msg.getBytes());msg = "this is henan order";channel.basicPublish(EXCHANGE,"order.henan", MessageProperties.TEXT_PLAIN,msg.getBytes());msg = "this is wuxi back";channel.basicPublish(EXCHANGE,"back.wuxi", MessageProperties.TEXT_PLAIN,msg.getBytes());msg = "this is henan back";channel.basicPublish(EXCHANGE,"back.henan", MessageProperties.TEXT_PLAIN,msg.getBytes());}
}
消费者1:通配符的路由键
package com.tianju.topic;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列// 创建交换机channel.exchangeDeclare(EXCHANGE, "topic"); // 主题交换机// 队列必须声明,如果不存在,则自动创建channel.queueDeclare("q58", false, false, false, null);channel.queueBind("q58", EXCHANGE, "*.wuxi"); // 意味着无锡的订单和退货都能收到channel.basicConsume("q58", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("无锡仓库:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}
消费者2:统配符
package com.tianju.topic;import com.rabbitmq.client.*;
import com.tianju.config.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {private static String EXCHANGE = "exchange_58";public static void main(String[] args) throws IOException, TimeoutException {// 建立连接Connection connection = ConnectionFactory.createConnection();// 创建频道Channel channel = connection.createChannel();// 创建队列// 创建交换机channel.exchangeDeclare(EXCHANGE, "topic"); // 主题交换机// 队列必须声明,如果不存在,则自动创建channel.queueDeclare("q581", false, false, false, null);channel.queueBind("q581", EXCHANGE, "*.*"); // 意味着所有的订单和退货都能收到channel.basicConsume("q581", new DefaultConsumer(channel){public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{// no work to doString msg = new String(body);System.out.println("总部仓库:"+msg);channel.basicAck(envelope.getDeliveryTag(), false);}});}
}
总结
1.MQ,消息队列的应用场景,几种MQ简单对比;
2.分析RabbitMQ的浏览器控制台页面;
3.结合场景来阐述RabbitMQ的几种模式,描述了不同模式的应用场景,并给出相应的代码;
相关文章:
RabbitMQ的5种模式——再探RabbitMQ的模式,简单、工作,发布订阅(广播),路由、主题 页面分析
前言 RabbitMQ作为一款常用的消息中间件,在微服务项目中得到大量应用,其本身是微服务中的重点和难点,有不少概念我自己的也是一知半解,本系列博客尝试结合实际应用场景阐述RabbitMQ的应用,分析其为什么使用࿰…...
初识华为云数据库GaussDB for openGauss
01 前言 GaussDB是华为自主创新研发的分布式关系型数据库。该产品具备企业级复杂事务混合负载能力,同时支持分布式事务,同城跨AZ部署,数据0丢失,支持1000的扩展能力,PB级海量存储。同时拥有云上高可用,高可…...
深圳寄包裹到德国
深圳,作为全球最发达的城市之一,以其高效的物流服务在全球范围内享有盛名。如果你正在寻找一种方式将包裹从深圳寄送到德国,那么本文将为你提供详细的步骤和建议。 第一步:了解国际邮寄的基本信息 首先,你需要了解包裹…...
系统架构师备考倒计时22天(每日知识点)Redis篇
Redis篇 1.Redis与Memcache能力对比 工作MemCacheRedis数据类型简单 key/value 结构丰富的数据结构持久性不支持支持分布式存储客户端哈希分片/一致性哈希多种方式,主从、Sentinel、Cluster 等多线程支持支持支持(Redis5.0及以前版本不支持)内存管理私有内存池/内…...
现有库存(on-hand inventory),库存水平(inventory level),库存位置(inventory position)
库存管理中,这几个名词特别容易混,干脆写一篇博客总结下。 现有库存(on-hand inventory),是指持有的真实库存量 库存水平(inventory level),现有库存减去延迟交付的订单 inventory level on-hand inventory − backorder quant…...
智慧空开让用电更安全、管理更智能——电脑APP远程控制开合闸
安科瑞 崔丽洁 01 什么是低压断路器?低压断路器的定义是:能够接通、承载及分断正常电路条件下的电流,也能在规定的非正常电路条件(过载、短路、特别是短路)下接通、承载一定时间和分断电流的开关电器。 断路器的分类&…...
PyTorch 中张量运算广播
TLDR 右对齐,空补一,从左往右依维运算 [m] [x, y] [m x, m y] 正文 以如下 a b 两个 tensor 计算为例 a torch.tensor([[1],[2],[3], ]) b torch.tensor([[[1, 2, 3],],[[4, 5, 6],],[[7, 8, 9],], ]) # a.shape (3, 1) # b.shape (3, 1, 3)首先…...
Blender:使用立方体制作动漫头像
好久没水文章 排名都掉到1w外了 ~_~ 学习一下blender,看能不能学习一点曲面变形的思路 一些快捷键 ctrl 空格:区域最大化,就是全屏 ctrl alt 空格:也是区域最大化 shift b:框选区域然后最大化显示该范围 shift 空…...
【ppt技巧】ppt里的图片如何提取出来?
之前分享过如何将PPT文件导出成图片,今天继续分享PPT技巧,如何提取出PPT文件里面的图片。 首先,我们将PPT文件的后缀名,修改为rar,将文件改为压缩包文件 然后我们将压缩包文件进行解压 最好是以文件夹的形式解压出来…...
Python学习基础笔记七十三——调试程序
为什么要调试? 我们发现程序运行的结果和我们预期的不符。 程序运行的错误,我们通常叫做bug。 有两种类型的bug:语句错误和逻辑错误。 所谓语句错误,就是执行代码的时候,解释器就可以直接发现的代码错误,…...
BOSHIDA DC电源模块关于电容器的电解液位置
BOSHIDA DC电源模块关于电容器的电解液位置 DC电源模块中的电容器扮演着一个非常重要的角色,它们能够对电路提供稳定的电源电压,同时也可以作为电路中的滤波器,去除电路中的噪声和纹波。在DC电源模块中使用的电容器通常是电解型电容器&#…...
如何实现 Es 全文检索、高亮文本略缩处理(封装工具接口极致解耦)
如何实现 Es 全文检索、高亮文本略缩处理 前言技术选型JAVA 常用语法说明全文检索开发高亮开发Es Map 转对象使用核心代码 Trans 接口(支持父类属性的复杂映射)Trans 接口可优化的点高亮全局配置类如下真实项目落地效果为什么不用 numOfFragments、fragm…...
C++多线程编程(第四章 案例1,C++11和C++17 多核并行计算样例)
目录 4.1手动实现多核base16编码4.1.1 实现base16编码4.1.2无多线程代码4.1.3 C 11多线程代码4.1.4 C 17多线程并发4.1.5 所有测试代码汇总 4.1手动实现多核base16编码 4.1.1 实现base16编码 二进制转换为字符串 一个字节8位,拆分为两个4位字节(最大值…...
获取远程仓库的信息和远程分支的信息
前记: git svn sourcetree gitee github gitlab gitblit gitbucket gitolite gogs 版本控制 | 仓库管理 ---- 系列工程笔记. Platform:Windows 10 Git version:git version 2.32.0.windows.1 Function:获取远程仓库的信息和远…...
QT学习day1
一、思维导图 二、作业:实现登录界面 #include "widget.h" #include<QDebug> #include<QIcon>Widget::Widget(QWidget *parent): QWidget(parent) {/**********************窗口******************///设置窗口图标this->setWindowTitle…...
unity面试八股文 - 框架设计与资源管理
Unity项目框架是如何设计的?有哪些原则 在设计Unity项目框架时,通常会遵循一些基本的原则和步骤。以下是主要的一些原则: 模块化:每个功能都应该被作为一个独立的模块来处理,这样可以方便修改和维护。 低耦合&#x…...
智能网关IOT 2050采集应用
SIMATIC IOT2050 是西门子公司新推出的应用于企业数字化转型的智能边缘计算和云连接网关。 它将云、公司内 IT 和生产连接在一起,专为直接在生产环境中获取、处理和传输数据的工业 IT 解 决方案而设计。例如,它可用于将生产 过程与基于云的机器和生产数据…...
iOS代码混淆-从入门到放弃
目录 1. 什么是iOS代码混淆? 2. iOS自动代码混淆的方法是什么? 3. iOS代码混淆的作用是什么? 4. 怎么样才能做到更好的iOS代码混淆? 总结 参考资料 1. 什么是iOS代码混淆? 代码混淆是指将程序中的方法名、属…...
基于Eigen的位姿转换
位姿中姿态的表示形式有很多种,比如:旋转矩阵、四元数、欧拉角、旋转向量等等。这里基于Eigen实现四种数学形式的相互转换功能。本文利用Eigen实现上述四种形式的相互转换。我这里给出一个SE3(4*4)(先平移、再旋转)的构建方法&…...
Jmeter之Bean shell使用详解
一、什么是Bean Shell BeanShell是一种完全符合Java语法规范的脚本语言,并且又拥有自己的一些语法和方法;BeanShell是一种松散类型的脚本语言(这点和JS类似); BeanShell是用Java写成的,一个小型的、免费的、可以下载的、嵌入式的Java源代码解释器,具有对象脚本语言特性,非常精…...
TCP/IP(八)TCP的连接管理(五)四次握手
一 tcp连接断开 每一个TCP报文的超时重传都由一个特定的内核参数来控制 ① 四次握手的过程 遗留: 谁先发送FIN包,一定是client吗? --> upload和download补充: 主动和被动断开连接的场景 "四次握手过程描述" F --> FIN --> F…...
MyBatis-Plus主键生成策略[MyBatis-Plus系列] - 第491篇
历史文章(文章累计490) 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 …...
Spring——和IoC相关的特性
目录 IoC中Bean的生命周期 实例化(Instantiation) 属性注入(Populate Properties) 初始化(Initialization) 使用(Bean in Use) 销毁(Destruction) Laz…...
在 TensorFlow 中调试
如果调试是消除软件错误的过程,那么编程一定是添加错误的过程。Edsger Dijkstra。来自 https://www.azquotes.com/quote/561997 一、说明 在这篇文章中,我想谈谈 TensorFlow 中的调试。 在之前的一些帖子(此处、此处和此处)中&…...
想要精通算法和SQL的成长之路 - 连续的子数组和
想要精通算法和SQL的成长之路 - 连续的子数组和 前言一. 连续的子数组和1.1 最原始的前缀和1.2 前缀和 哈希表 前言 想要精通算法和SQL的成长之路 - 系列导航 一. 连续的子数组和 原题链接 1.1 最原始的前缀和 如果这道题目,用前缀和来算,我们的思路…...
【C++】头文件chrono
2023年10月16日,周一晚上 当前我只是简单的了解了一下chrono 以后可能会深入了解chrono并更新文章 目录 功能原理头文件chrono中的一些类头文件chrono中的数据类型一个简单的示例程序小实验:证明a的效率比a高 功能 这个chrono头文件是用来处理时间的…...
Python学习六
前言:相信看到这篇文章的小伙伴都或多或少有一些编程基础,懂得一些linux的基本命令了吧,本篇文章将带领大家服务器如何部署一个使用django框架开发的一个网站进行云服务器端的部署。 文章使用到的的工具 Python:一种编程语言&…...
Springboot 集成 WebSocket
WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接…...
谨以此篇,纪念我2023年曲折的计算机保研之路
目录 阶段一:迷茫阶段二:准备个人意愿保研材料准备套磁老师5.1日 浙大线上编程测试5.8日 浙大线上面试 —— 一面5.17日 浙大线上面试——二面5.29日 实验室面试结果5.27日 南开线上面试6.20日 华师电话面试 阶段三:旅途北航CS(6.…...
VSS、VDD、VBAT、VSSA
引言 在学习设计TM32时,发现芯片除了GPIO引脚外还会引出许多引脚,以STM32F407ZGT6为例除了GPIO引脚还会有以下引脚 如VSS、VDD、VBAT、VSSA、NRST、VREF、VDDA、VCAP_1、VCAP_2、PDR_ON这些引脚。他们有何作用,电路设计中应如何连接&#x…...
医院的网络推广方案/seo工作内容
首先,1.x和2.x的生命周期钩子对比: 钩子函数的树状图,红色的是我们可以利用的函数,绿色的是函数解析,蓝色的是函数执行时机 <!DOCTYPE html> <html><head><title></title><script ty…...
wordpress小蜜蜂/安徽网站关键词优化
程序实现功能:用户登陆注册,客房预订,房间查询,房间报修,房间退订,以维修部身份登陆查看房间损坏情况。。。功能简单,适合初学者拿来练手,或者大学有Android程序设计课的同学用来当个…...
修改wordpress语言/seo优化是什么职业
PHP用超级全局变量数组$_FILES来记录文件上传相关信息的,在php文件上传之前,可通过调节php.ini中相关配置指令,来控制上传相关细节。1.file_uploadson/off是否允许通过http方式上传文件2.max_execution_time30允许脚本最大执行时间࿰…...
重庆可做网站 APP/网络快速推广渠道
目录如下参考文献返回规则用户自定义类型测试2参考文献 https://docs.python.org/3/reference/expressions.html#grammar-token-and-test 原文内容: 在bool运算时,以下五种类型被当作 False,其余的都当作True False None numeric zero of al…...
网站首页怎么做ps/百度营消 营销推广
注意:属于转载很多人对于PCB走线的参考平面感到迷惑,经常有人问:对于内层走线,如果走线一侧是VCC,另一侧是GND,那么哪个是参考平面?要弄清楚这个问题,必须对了解传输线的概念。我们知…...
织梦可以做商城网站吗/企业网站seo服务
来源 | https://github.com/Wscats/articles最近在整理 JavaScript 的时候发现遇到了很多面试中常见的面试题,本文主要是在 Github 等各大论坛收录的 JavaScript 相关知识和一些相关面试题时所做的笔记,现在分享给大家,希望对 JavaScript 可以…...