做网站需要用socket吗/营销网络的建设有哪些
文章目录
- 一、安装
- 二、入门
- 1.分类
- 2.核心概念
- 3.工作原理
- 4.六大模式
- 三、模式一:"Hello World!"
- 1.依赖
- 2.生产者代码
- 3.消费者代码
- 四、模式二:Work Queues
- 1.工作原理
- 2.工具类代码:连接工厂
- 3.消费者代码
- 4.生产者代码
- 5.分发策略
- 不公平分发
- 预取值
- 五、保障消息不丢失的措施
- 1.消息应答
- 修改消费者代码
- 2.持久化
- 3.发布确认
- 单个发布确认(同步)
- 批量发布确认(同步)
- 异步发布确认
- 三种方式对比
- 六、模式三:Publish/Subscribe
- 消费者代码
- 生产者代码
- 七、模式四:Routing
- 修改消费者代码
参考:
官网
教程
本博客完整demo仓库:跳转
- 本文的几个名词解释:
消息队列=工作队列=mq=RabbitMQ
生产者=消息的发送方
消费者=工作线程=消息的消费方
exchange=交换机
binding=绑定
一、安装
建议使用docker安装,方便、快速、易上手。
# 查找镜像
docker search rabbitmq# pull最新版rabbitmq的镜像
docker pull rabbitmq# 运行 15672:15672和5673:5672 是外:内端口映射,15672是管理端口,5673是通信端口
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5673:5672 rabbitmq# 查看容器id
docker ps -a# 设置容器开机自启
docker update --restart=always 容器id# 进入容器内部 rabbit是上面--name参数值
docker exec -it rabbit bash# 开启插件管理
rabbitmq-plugins enable rabbitmq_management
访问:http://ip地址:15672
二、入门
MQ即Message Queue,消息队列,本质上是一个FIFO的队列。也是一种跨进程的通信机制,用于消息的发送方、接收方的逻辑解耦和物理解耦。
- 三大特点:
- 流量削峰:对访问进行排队,限制上限。缺点是访问的速度会稍微下降,优点是服务器不会宕机。
- 应用解耦:不使用MQ,当一个子系统异常时,请求将无法正常完成;有了MQ起了一定的延时作用,并暂时保存请求,等到子系统恢复正常就可以处理请求。
- 异步:A有一个调用B的操作,但是B处理的很慢,此时A调用完了B可以去干别的,等B执行完了会发一条消息给MQ,MQ将消息转发给A。
1.分类
1 ActiveMQ
单机吞吐量万级,时效性毫秒级,可用性高(主从架构),消息很难丢失。
缺点:太老了,官方维护很少2 Kafka
现在使用很多,大数据的杀手锏,吞吐量为百万级TPS,在数据的采集、传输、存储的过程中发挥着作用。可靠性高(数据有备份),分布式。
缺点:单机超过64个队列服务器性能会迅速降低,实时性取决于轮询间隔,消息失败不支持重试,社区更新较慢。
适用:大数据量的大型项目。3 RocketMQ
基于kafka的改进,单机吞吐量十万级,分布式架构,可靠性很高,解决了Kafka队列增多后服务器性能迅速下降的问题。
缺点:支持的客户端语言不多,仅有java和c++,其中c++不成熟,社区活跃度一般。
适用:可靠性要求高,比如金融互联网。4 RabbitMQ
吞吐量达到万级,支持多种语言,erlang语言提供了高并发性能,社区活跃度高。
缺点:商业版收费
适用:数据量没那么大的中小型项目,时效性、并发性都很好。
2.核心概念
1 生产者
2 消费者
3 交换机
4 队列
3.工作原理
一些说明:
connection:生产者/消费者和RabbitMQ之间的TCP连接
channel:信道,创建一次connection的代价较大,因此就创建一个connection,其中有多个channel(逻辑连接,轻量级的connection),每次通信只占用一个channel。
4.六大模式
第五种我不讲,说白了3、4、5的具体工作步骤都差别不大,只是绑定路由、发送消息有细微的差别而已。
第六种这篇博客也不说,下一篇再更。
三、模式一:“Hello World!”
这是RabbitMQ中最简单的一个模式,通过它来上手消息队列是不错的。
首先确保服务器上的rabbitMQ已经跑起来了:
然后用idea创建一个spring boot项目,我命名为了hello-world。然后配置好了项目的maven仓库。
1.依赖
<!-- rabbitMQ客户端依赖--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!-- 操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency>
2.生产者代码
package tracy.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {//队列名称public static final String QUEUE_NAME = "hello-world";//生产消息public static void main(String[] args) throws TimeoutException, IOException {//1 创建一个连接工厂ConnectionFactory factory=new ConnectionFactory();//2 设置工厂ip等信息 连接rabbitMQfactory.setHost("rabbitMQ的ip地址");factory.setPort(5673);//通信端口factory.setUsername("guest");//默认的用户名factory.setPassword("guest");//默认的密码//3 创建连接Connection connection=factory.newConnection();//4 获取信道Channel channel=connection.createChannel();//5 生成一个队列,参数挨个为:队列名,是否持久化,是否允许多消费者消费,其他参数channel.queueDeclare(QUEUE_NAME,false,false,false,null);//6 发消息,参数挨个为:交换机,路由的key值,其他参数,发送的消息channel.basicPublish("",QUEUE_NAME,null,"hello world111".getBytes());}
}
运行main方法之后,访问http://ip:15672/
管理端即可看到生产的消息:
3.消费者代码
package tracy.helloworld;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {//队列名称public static final String QUEUE_NAME = "hello-world";//接收消息public static void main(String[] args) throws TimeoutException, IOException {//1 创建一个连接工厂ConnectionFactory factory=new ConnectionFactory();//2 设置工厂ip等信息 连接rabbitMQfactory.setHost("rabbitMQ的ip");factory.setPort(5673);//通信端口factory.setUsername("guest");//默认的用户名factory.setPassword("guest");//默认的密码//3 创建连接Connection connection=factory.newConnection();//4 获取信道Channel channel=connection.createChannel();//5 消费消息DeliverCallback deliverCallback=(consumerTag, message)->{System.out.println(new String(message.getBody()));};CancelCallback cancelCallback=consumerTag-> System.out.println("消费被中断");//参数挨个为:队列,消费成功后是否要自动应答,成功消费的回调,未成功消费的回调channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
同时执行生产者、消费者的main方法:
消费成功。
四、模式二:Work Queues
这种模式主要针对有大量资源密集型任务频繁提交的情况,创建的工作队列将会对提交的任务进行弹出和分发,让后台的一个或多个工作线程来处理这些任务,从而达到缓解高并发的目的。
下面通过实战来学习这种模式。
1.工作原理
- 轮询机制
由于工作队列中的消息只能被消费一次,因此消息队列会对消费者们进行轮询,为了公平挨个分配消息。
2.工具类代码:连接工厂
考虑到在模式一中生产者、消费者中有一段代码是重复的:从创建连接工厂->获取信道,为了更好地提高代码的简洁性和复用性,可以将这部分代码抽取出来。
package tracy.workqueues.utils;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class MQutils {//获取一个channelpublic static Channel getChannel() throws IOException, TimeoutException {//1 创建一个连接工厂ConnectionFactory factory=new ConnectionFactory();//2 设置工厂ip等信息 连接rabbitMQfactory.setHost("rabbitMQ的ip");factory.setPort(5673);//通信端口factory.setUsername("guest");//默认的用户名factory.setPassword("guest");//默认的密码//3 创建连接Connection connection=factory.newConnection();//4 获取信道return connection.createChannel();}
}
3.消费者代码
基于多线程实现消费者:
package tracy.workqueues;import com.rabbitmq.client.*;
import tracy.workqueues.utils.MQutils;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消费者 工作线程
public class Consumer implements Runnable{//队列名称public static final String QUEUE_NAME="queue2.0";@Overridepublic void run() {try {Channel channel = MQutils.getChannel();//消费消息DeliverCallback deliverCallback=(consumerTag, message)->{System.out.println(Thread.currentThread().getName()+":"+new String(message.getBody()));};//参数挨个为:队列,消费成功后是否要自动应答,成功消费的回调,未成功消费的回调channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag-> {});} catch (Exception e) {e.printStackTrace();}}//接收消息public static void main(String[] args) throws TimeoutException, IOException {//启动两个消费者/工作线程Runnable runnable= new Consumer();new Thread(runnable).start();new Thread(runnable).start();}
}
4.生产者代码
通过循环的方式发送多条消息:
package tracy.workqueues;import com.rabbitmq.client.Channel;
import tracy.workqueues.utils.MQutils;import java.io.IOException;
import java.util.concurrent.TimeoutException;//生产者
public class Producer {//队列名称public static final String QUEUE_NAME="queue2.0";//生产消息public static void main(String[] args) throws TimeoutException, IOException {Channel channel=MQutils.getChannel();//生成一个队列,参数挨个为:队列名,是否持久化,是否允许多消费者消费,其他参数channel.queueDeclare(QUEUE_NAME,false,false,false,null);//发消息,参数挨个为:交换机,路由的key值,其他参数,发送的消息//发送大量消息for(int i=0;i<20;++i){String message="message"+i;channel.basicPublish("",QUEUE_NAME,null,message.getBytes());}System.out.println("生产者发送完成!");}
}
- 测试:
启动生产者和消费者之后得到执行结果:
虽然消息的消费看上去是乱序的,这是由于多线程的异步导致的,但仔细看就会发现两个线程是交替消费的。
5.分发策略
不公平分发
前面的分发机制都是轮询分发,实际上这样的机制并不公平,因为不同的消费者处理消息的能力不一定是一样的,有的配置好一些,消费就会更快。因此需要引入不公平分发机制。
- 修改消费者代码:
添加channel.basicQos(1);
package tracy.workqueues;import com.rabbitmq.client.*;
import tracy.workqueues.utils.MQutils;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消费者 工作线程
public class Consumer implements Runnable{//队列名称public static final String QUEUE_NAME="queue2.0";@Overridepublic void run() {try {Channel channel = MQutils.getChannel();//设置分发策略为不公平分发channel.basicQos(1);//消费消息DeliverCallback deliverCallback=(consumerTag, message)->{System.out.println(Thread.currentThread().getName()+":"+new String(message.getBody()));//接受成功后 确认应答,第二个参数表示是否批量应答,这里是false表明只确认当前消息channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//参数挨个为:队列,消费成功后是否要自动应答,成功消费的回调,未成功消费的回调channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag-> {});//第二个参数改成了false} catch (Exception e) {e.printStackTrace();}}//接收消息public static void main(String[] args) throws TimeoutException, IOException {//启动两个消费者/工作线程Runnable runnable= new Consumer();new Thread(runnable).start();new Thread(runnable).start();}
}
在这种机制下,消费能力强的消费者会被分发更多消息。
预取值
说白了就是按比例分发的策略,比如指定10条消息分6条给消费者1,分4条给消费者2。
将消费者中 channel.basicQos(1);
中的1改成大于1的数字就行了。
五、保障消息不丢失的措施
1 消息应答:消息从队列发送出去之后可能会丢失,通过开启消息应答来保障
2 持久化:将队列、消息保存到磁盘上
3 发布确认:消息发送到队列前消息可能会丢失,通过开启发布确认来保障
1.消息应答
上面的模式有一个问题:消费者在消费消息的过程中宕机了,会造成该任务消息的丢失。为了解决这种任务的丢失,引入消息应答机制。
简单来说:消息应答就是,消费者消费完一个消息之后告诉mq它已经消费完了,mq再把队列中的该消息删除,这样就不会导致消息的丢失。
- 自动应答(默认,但是不建议):
自动应答说白了就是,只要消费者接收到了消息就进行应答,但它后面能否被成功消费完就不管了。这种方式牺牲了一定的可靠性,但能保证高吞吐量。
- 手动应答:
Channel.basicAck 肯定确认,mq会认为该消息消费成功并删除该消息,支持批量应答
Channel.basicNack 否定确认,mq会认为该消息消费不成功,支持批量应答
Channel.basicReject 否定确认,mq会认为该消息消费不成功,不支持批量应答
没有被确认应答的消息会自动重新入队,因此消息不会丢失。
修改消费者代码
package tracy.workqueues;import com.rabbitmq.client.*;
import tracy.workqueues.utils.MQutils;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消费者 工作线程
public class Consumer implements Runnable{//队列名称public static final String QUEUE_NAME="queue2.0";@Overridepublic void run() {try {Channel channel = MQutils.getChannel();//消费消息DeliverCallback deliverCallback=(consumerTag, message)->{System.out.println(Thread.currentThread().getName()+":"+new String(message.getBody()));//接受成功后 确认应答,第二个参数表示是否批量应答,这里是false表明只确认当前消息channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//参数挨个为:队列,消费成功后是否要自动应答,成功消费的回调,未成功消费的回调channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag-> {});} catch (Exception e) {e.printStackTrace();}}//接收消息public static void main(String[] args) throws TimeoutException, IOException {//启动两个消费者/工作线程Runnable runnable= new Consumer();new Thread(runnable).start();new Thread(runnable).start();}
}
2.持久化
- 队列的持久化
修改生产者中声明队列的语句:
//生成一个队列,参数挨个为:队列名,是否持久化,是否允许多消费者消费,其他参数
//第二个参数改为true表示将队列持久化
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
- 消息的持久化:
修改生产者中发布消息的语句:
//第三个参数声明为MessageProperties.PERSISTENT_TEXT_PLAIN表明将消息进行保存到磁盘上
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
不过持久化不是一定成功,因为原本是保存到内存中,设置之后将保存到磁盘上,降低了丢失的可能性。
3.发布确认
单个发布确认(同步)
发一条确认一条,确认了上一条再发下一条,效率不高。
- 生产者代码:
package tracy.workqueues;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import tracy.workqueues.utils.MQutils;import java.io.IOException;
import java.util.concurrent.TimeoutException;//生产者
public class Producer {//队列名称public static final String QUEUE_NAME="queue3.0";//生产消息public static void main(String[] args) throws TimeoutException, IOException, InterruptedException {Channel channel=MQutils.getChannel();//开启发布确认channel.confirmSelect();//声明队列时,开启队列持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);//记录发布前后耗时long start=System.currentTimeMillis();for(int i=0;i<1000;++i){String message="message"+i;//发布消息时开启消息 持久化channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());//单个发布确认 每发布一条 都等待确认channel.waitForConfirms();}long end=System.currentTimeMillis();System.out.println("生产者发送完成,耗时:"+(end-start)/1000.0d+"s");}
}
- 消费者代码:
package tracy.workqueues;import com.rabbitmq.client.*;
import tracy.workqueues.utils.MQutils;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消费者 工作线程
public class Consumer implements Runnable{//队列名称public static final String QUEUE_NAME="queue3.0";@Overridepublic void run() {try {Channel channel = MQutils.getChannel();channel.basicQos(1);DeliverCallback deliverCallback=(consumerTag, message)->{System.out.println(Thread.currentThread().getName()+":"+new String(message.getBody()));//确认应答,只确认当前消息channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//关闭自动确认应答channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag-> {});} catch (Exception e) {e.printStackTrace();}}//接收消息public static void main(String[] args) throws TimeoutException, IOException {//启动两个消费者/工作线程Runnable runnable= new Consumer();new Thread(runnable).start();new Thread(runnable).start();}
}
- 运行:
运行生产者、消费者之后,显示生产者发送消息用时20-30s左右。
批量发布确认(同步)
发布一批消息,然后批量确认一次。优点是比单个发布确认快,缺点是一旦这一批消息出了问题,无法得知是哪个消息出了问题,这一批消息都得重发。
- 修改生产者代码:
package tracy.workqueues;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import tracy.workqueues.utils.MQutils;import java.io.IOException;
import java.util.concurrent.TimeoutException;//生产者
public class Producer {//队列名称public static final String QUEUE_NAME="queue3.0";//生产消息public static void main(String[] args) throws TimeoutException, IOException, InterruptedException {...int batch_size=100;//每100条确认一次for(int i=0;i<1000;++i){String message="message"+i;//发布消息时开启消息 持久化channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());//每100条等待确认一次if((i+1)%batch_size==0)channel.waitForConfirms();}...}
}
消费者代码不改变。
- 运行:
多次运行,时间都在1s内,性能的提升非常明显。
异步发布确认
生产者向工作队列发布了消息之后,不用等待确认就继续发布了;而工作队列收到生产者发送过来的消息之后,通过回调函数的形式告诉生产者哪些消息发布成功了,哪些发布失败了,发布失败的需要重新发布。
异步的引入对效率的提升很大。不过这种机制需要给消息进行编号,底层数据结构是基于哈希表。
- 生产者AsynProducer代码:
注意,我把队名给改了,消费者那边也要改一下队名。
package tracy.workqueues;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;
import tracy.workqueues.utils.MQutils;import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;//生产者 基于异步发布
public class AsynProducer {//队列名称public static final String QUEUE_NAME="queue4.0";//生产消息public static void main(String[] args) throws TimeoutException, IOException, InterruptedException {Channel channel=MQutils.getChannel();//开启发布确认channel.confirmSelect();//声明队列时,开启队列持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);//声明一个线程安全的哈希表,用来存放发送失败的消息ConcurrentSkipListMap<Long,String> failedMessage=new ConcurrentSkipListMap<>();//消息监听器,哪些成功了,哪些失败了ConfirmCallback ackCallback=(deliveryTag,multiple)->{//将发布成功的从failedMap中删除,剩下的就是发布未成功的if(multiple){//批量failedMessage.headMap(deliveryTag).clear();}else{//单个failedMessage.remove(deliveryTag);}};ConfirmCallback nackCallback=(deliveryTag,multiple)->{System.out.println("未确认的消息:"+deliveryTag);};//第一个参数为发布成功的回调,第二个为发布失败的回调,需要手动实现channel.addConfirmListener(ackCallback,nackCallback);//记录发布前后耗时long start=System.currentTimeMillis();for(int i=0;i<1000;++i){String message="message"+i;//将所有消息存储在failedMap中failedMessage.put(channel.getNextPublishSeqNo(),message);//发布消息时开启消息 持久化channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());}long end=System.currentTimeMillis();System.out.println("生产者发送完成,耗时:"+(end-start)/1000.0d+"s");}
}
- 运行:
快的飞起!异步yyds
三种方式对比
第三种是最快的,性能好,能充分利用CPU资源;第一种是最慢的,性能比较差;第二种介于二者之间,但是出了问题很难定位具体出问题的消息。
六、模式三:Publish/Subscribe
在上一种模式中,我们创建了一个工作队列。工作队列背后的假设是每个消息都是只交付给一名消费者。在这一部分中,我们将做一些事情完全不同的:向多个消费者传递消息。此模式称为发布/订阅。
这种模式需要指定exchange的类型为fanout,exchange将把收到的消息广播到所有队列中。
消费者代码
两个消费者除了上面的三个常量有所不同,下面的代码都是一样的。但是为了模拟真实情况,我还是写了两个消费者。
package tracy.subscribe.fanout;import com.rabbitmq.client.*;
import tracy.subscribe.utils.MQutils;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消费者1
public class Consumer1{public static final String QUEUE_NAME1="q1";//队列名称public static void main(String[] args) throws TimeoutException, IOException {//获取信道Channel channel=MQutils.getChannel();System.out.println("消费者1等待接收消息...");//消费消息DeliverCallback deliverCallback=(consumerTag, message)->{System.out.println(new String(message.getBody()));//消息应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=consumerTag-> System.out.println("消费被中断");//参数挨个为:队列,消费成功后是否要自动应答,成功消费的回调,未成功消费的回调channel.basicConsume(QUEUE_NAME1,false,deliverCallback,cancelCallback);}
}
package tracy.subscribe.fanout;import com.rabbitmq.client.*;
import tracy.subscribe.utils.MQutils;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消费者2
public class Consumer2{public static final String QUEUE_NAME1="q2";//队列名称public static void main(String[] args) throws TimeoutException, IOException {//获取信道Channel channel=MQutils.getChannel();System.out.println("消费者2等待接收消息...");//消费消息DeliverCallback deliverCallback=(consumerTag, message)->{System.out.println(new String(message.getBody()));//消息应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=consumerTag-> System.out.println("消费被中断");//参数挨个为:队列,消费成功后是否要自动应答,成功消费的回调,未成功消费的回调channel.basicConsume(QUEUE_NAME1,false,deliverCallback,cancelCallback);}
}
生产者代码
package tracy.subscribe.fanout;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;
import tracy.subscribe.utils.MQutils;import java.io.IOException;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;//生产者
public class Producer {public static final String EXCHANGE_NAME="fanout_exchange";//交换机名称public static final String QUEUE_NAME1="q1";//队列名称public static final String QUEUE_NAME2="q2";//队列名称public static void main(String[] args) throws TimeoutException, IOException, InterruptedException {//获取信道Channel channel=MQutils.getChannel();//开启发布确认channel.confirmSelect();//声明一个exchange,参数为名称和类型channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明两个队列channel.queueDeclare(QUEUE_NAME1,true,false,false,null);channel.queueDeclare(QUEUE_NAME2,true,false,false,null);//绑定交换机与队列,参数为队列名、交换机名,routingKey,考虑到这是广播模式,不写routingKeychannel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"");channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"");//异步发布确认ConcurrentSkipListMap<Long,String> failedMessage=new ConcurrentSkipListMap<>();ConfirmCallback ackCallback=(deliveryTag, multiple)->{if(multiple){failedMessage.headMap(deliveryTag).clear();}else{failedMessage.remove(deliveryTag);}};ConfirmCallback nackCallback=(deliveryTag,multiple)->{System.out.println("未确认的消息:"+deliveryTag);};channel.addConfirmListener(ackCallback,nackCallback);//发布消息for(int i=0;i<20;++i) {String message="message"+i;//将所有消息存储在failedMap中failedMessage.put(channel.getNextPublishSeqNo(),message);//发布消息时开启消息 持久化channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());}System.out.println("生产者发送完毕");}
}
跑一下看看是否两个消费者都能消费20条消息。
七、模式四:Routing
生产者在发布消息时,通过指定routingKey来把让exchange把消息bingding给指定的队列。
这种模式需要把exchange类型指定为direct。
修改消费者代码
其他的在上一章基础上没有发生变化,主要就是生产者binding、消息发送的过程有一些变化。
package tracy.subscribe;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.MessageProperties;
import tracy.subscribe.utils.MQutils;import java.io.IOException;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;//生产者
public class Producer {public static final String EXCHANGE_NAME="direct_exchange";//交换机名称public static final String QUEUE_NAME1="q1";//队列名称public static final String QUEUE_NAME2="q2";//队列名称public static final String RKEY1_1="orange";//与q1绑定的routingKeypublic static final String RKEY2_1="black";//与q2绑定的routingKeypublic static final String RKEY2_2="green";//与q2绑定的routingKeypublic static void main(String[] args) throws TimeoutException, IOException, InterruptedException {//获取信道Channel channel=MQutils.getChannel();//开启发布确认channel.confirmSelect();//声明一个exchange,参数为名称和类型channel.exchangeDeclare(EXCHANGE_NAME,"direct");//声明两个队列channel.queueDeclare(QUEUE_NAME1,true,false,false,null);channel.queueDeclare(QUEUE_NAME2,true,false,false,null);//绑定交换机与队列,参数为队列名、交换机名,routingKeychannel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,RKEY1_1);channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,RKEY2_1);channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,RKEY2_2);//异步发布确认ConcurrentSkipListMap<Long,String> failedMessage=new ConcurrentSkipListMap<>();ConfirmCallback ackCallback=(deliveryTag, multiple)->{if(multiple){failedMessage.headMap(deliveryTag).clear();}else{failedMessage.remove(deliveryTag);}};ConfirmCallback nackCallback=(deliveryTag,multiple)->{System.out.println("未确认的消息:"+deliveryTag);};channel.addConfirmListener(ackCallback,nackCallback);//发布消息for(int i=0;i<20;++i) {String message="message"+i;//将所有消息存储在failedMap中failedMessage.put(channel.getNextPublishSeqNo(),message);//发布消息时开启消息 持久化if((i+1)%3==0)channel.basicPublish(EXCHANGE_NAME,RKEY1_1, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());else if((i+1)%3==1)channel.basicPublish(EXCHANGE_NAME,RKEY2_1, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());else channel.basicPublish(EXCHANGE_NAME,RKEY2_2, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());}System.out.println("生产者发送完毕");}
}
相关文章:

RabbitMQ核心内容:实战教程(java)
文章目录一、安装二、入门1.分类2.核心概念3.工作原理4.六大模式三、模式一:"Hello World!"1.依赖2.生产者代码3.消费者代码四、模式二:Work Queues1.工作原理2.工具类代码:连接工厂3.消费者代码4.生产者代码5.分发策略不公平分发预…...

RK356x U-Boot研究所(命令篇)3.7 pci与nvme命令的用法
平台U-Boot 版本Linux SDK 版本RK356x2017.09v1.2.3文章目录 一、设备树与config配置二、pci命令的定义三、nvme命令的定义四、pci与nvme命令的用法3.1 pci总线扫描3.2 nvme设备信息3.3 nvme设备读写一、设备树与config配置 RK3568支持PCIe接口,例如ROC-RK3568-PC: 原理图如…...

微信头像昵称获取能力的变化导致了我半年没更新小程序
背景 2022年9月份,微信更改了获取头像昵称的规则,回收了原有 wx.getUserProfile 中的部分能力,为了减小对【微点记账】小程序的影响,长达半年未做任何更新,今天为了增加这个聊天机器人的功能,不得不重新查…...

【深度学习编译器系列】1. 为什么需要深度学习编译器?
本系列是自学深度学习编译器过程中的一些笔记和总结,参考文献在文末。 1. 概述 深度学习(DL)编译器的产生有两方面的因素:深度学习模型的广泛应用,以及深度学习芯片的层出不穷。 一方面,我们现在有非常多…...

数据结构与算法总结整理(超级全的哦!)
数据结构与算法基础大O表示法时间复杂度大O表示法时间复杂度排序:最坏时间复杂度时间复杂度的几条基本计算规则内存工作原理什么是内存内存主要分为三种存储器随机存储器(RAM)只读存储器(ROM)高速缓存(Cach…...

DPDK — MALLOC 堆内存管理组件
目录 文章目录 目录MALLOC 堆内存管理组件rte_malloc() 接口malloc_heap 结构体malloc_elem 结构体内存初始化流程内存申请流程内存释放流程MALLOC 堆内存管理组件 MALLOC(堆内存管理组件)基于 hugetlbfs 内核文件系统来实现,能够从 HugePage 中分配一块连续的物理大页内存…...

分享113个HTML艺术时尚模板,总有一款适合您
分享113个HTML艺术时尚模板,总有一款适合您 113个HTML艺术时尚模板下载链接:https://pan.baidu.com/s/1ReoPNIRjkYov-SjsPo0vhg?pwdjk4a 提取码:jk4a Python采集代码下载链接:采集代码.zip - 蓝奏云 女性化妆用品网页模板 粉…...

2023年美赛C题Wordle预测问题一建模及Python代码详细讲解
相关链接 (1)2023年美赛C题Wordle预测问题一建模及Python代码详细讲解 (2)2023年美赛C题Wordle预测问题二建模及Python代码详细讲解 (3)2023年美赛C题Wordle预测问题三、四建模及Python代码详细讲解 &…...

小米12s ultra,索尼xperia1 iv,数码相机 拍照对比
首先说明所有的测试结果和拍摄数据我放到百度网盘了(地址在结尾) 为什么做这个测试 我一直想知道现在的手机和相机差距有多大,到底差在哪儿? 先说结论: 1.1英寸的手机cmos(2022年) 6年前(2016)的入门款相机(m43画幅) 2.手机 不能换镜头,只能在特定的拍摄距离才能发挥出全…...

C++笔记 模板的进阶知识
目录 1. 非类型模板参数 2.模板的特化 2.1 函数模板的特化 2.2 类模板的特化 2.2.1 全特化 2.2.2 偏特化 3.模板的分离编译 3.1 什么是分离编译? 3.2 模板的分离编译 4.模板的总结 模板的初阶内容:(594条消息) C模板的原理和使用_全貌的博客-CSD…...

基于 Debain11 构建 asp.net core 6.x 的基础运行时镜像
基于 Debain11 构建 asp.net core 6.x 的基础运行时镜像Linux 环境说明Debian 简介Debian 发行版本关于 Debian 11Linux 常用基础工具Dockerfile 中 RUN 指令RUN 语法格式RUN 语义说明编写 Dockerfile 构建 Runtime 基础镜像ASP.NET Core Runtime 基础镜像Dockerfile 编写Windo…...

【无人机路径规划】基于IRM和RRTstar进行无人机路径规划(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...

Spring Boot中使用@Autowire装配接口是怎么回事?
在学习使用Spring Boot框架时候,发现了一个特别的现象UserMapper是一个接口,在另一个类中好像直接使用Autowired装配了一个UserMapper对象???我纳闷了一会儿,接口居然可以直接实例对象吗?根据我…...

23种设计模式介绍(Python示例讲解)
文章目录一、概述二、设计模式七种原则三、设计模式示例讲解1)创建型模式1、工厂模式(Factory Method)【1】简单工厂模式(不属于GOF设计模式之一)【2】工厂方法模式2、抽象工厂模式(AbstractFactory&#x…...

初识Hadoop,走进大数据世界
文章目录数据!数据!遇到的问题Hadoop的出现相较于其他系统的优势关系型数据库网格计算本文章属于Hadoop系列文章,分享Hadoop相关知识。后续文章中会继续分享Hadoop的组件、MapReduce、HDFS、Hbase、Flume、Pig、Spark、Hadoop集群管理系统以及…...

加油站会员管理小程序实战开发教程14 会员充值
我们上篇介绍了会员开卡的业务,开卡是为了创建会员卡的信息。有了会员卡信息后我们就可以给会员进行充值。当然了充值这个业务是由会员自主发起的。 按照我们的产品原型,我们在我的页面以轮播图的形式循环展示当前会员的所有卡信息。这个会员卡信息需要先用变量从数据源读取…...

leetcode 1792. 最大平均通过率
一所学校里有一些班级,每个班级里有一些学生,现在每个班都会进行一场期末考试。给你一个二维数组 classes ,其中 classes[i] [passi, totali] ,表示你提前知道了第 i 个班级总共有 totali 个学生,其中只有 passi 个学…...

15-基础加强-2-xml(约束)枚举注解
文章目录1.xml1.1概述【理解】(不用看)1.2标签的规则【应用】1.3语法规则【应用】1.4xml解析【应用】1.5DTD约束【理解】1.5.1 引入DTD约束的三种方法1.5.2 DTD语法(会阅读,然后根据约束来写)1.6 schema约束【理解】1.6.1 编写schema约束1.6.…...

13:高级篇 - CTK 事件管理机制(signal/slot)
作者: 一去、二三里 个人微信号: iwaleon 微信公众号: 高效程序员 在《12:高级篇 - CTK 事件管理机制(sendEvent/postEvent)》一文中,我们介绍了如何进行插件间通信 - sendEvent()/postEvent() + ctkEventHandler。然而,除了这种方式之外,EventAdmin 还提供了另一种方…...

群晖-第1章-IPV6的DDNS
群晖-第1章-IPV6的DDNS 方案:腾讯云群晖DS920 本文参考群晖ipv6 DDNS-go教程-牧野狂歌,感谢原作者的分享。 这篇文章只记录了我需要的部分,其他的可以查看原文,原文还记录了更多的内容,可能帮到你。 一、购买域名 …...

centos7系统-kubeadm安装k8s集群(v1.26版本)亲测有效,解决各种坑可供参考
文章目录硬件要求可省略的步骤配置虚拟机ip设置阿里镜像源各服务器初始化配置配置主节点的主机名称配置从节点的主机名称配置各节点的Host文件关闭各节点的防火墙关闭selinux永久禁用各节点的交换分区同步各节点的时间将桥接的IPv4流量传递到iptables的链(三台都执行…...

帮助指令 man ,help及文档常用管理指令
帮助指令 man,help 1. man 当我们想要了解某个命令如何使用,及选项的含义是什么以及配置文件的帮助信息时,可以使用 man [命令或配置文件],这样便可以获得到帮助提示信息了。 语法格式:man [命令或者配置文件] 比如…...

电子科技大学操作系统期末复习笔记(五):文件管理
目录 前言 文件管理:基础 基本概念 文件 文件系统 文件系统的实现模型 文件的组成 文件名 文件分类 文件结构 逻辑结构 物理结构 练习题 文件管理:目录 文件控制块FCB FCB:File Control Block FCB信息 目录 基本概念 目…...

SpringBoot+ActiveMQ-发布订阅模式(生产端)
SpringBootActiveMQ-发布订阅模式(生产端)Topic 主题* 消息消费者(订阅方式)消费该消息* 消费生产者将发布到topic中,同时有多个消息消费者(订阅)消费该消息* 这种方式和点对点方式不同…...

Android实例仿真之三
目录 四 Android架构探究 五 大骨架仿真 六 Android实例分析思路拓展 四 Android架构探究 首先,Android系统所带来的好处,就在于它本身代码的开放性,这提供了一个学习、借鉴的平台。这对分析仿真而言,本身就是一大利好…...

关于MySQL的limit优化
1、前提 提示:只适用于InnoDB引擎 2、InnoDB存储特点 它把索引和数据放在了一个文件中,就是聚集索引。这与MyISAM引擎是不一样的。 3、SQL示例 -- 给cve字段建立索引 select * from cnnvd where cveCVE-2022-24808 limit 300000,10;由于M…...

Java-Stream流基本使用
collection.stream将会破坏原有的数据结构,可以通过collect方法收集,可以用Collectors提供的构造器,add等方法构造形成新的数据结构。 HashSet<List<Integer>> rs new HashSet<>(); rs.stream().toList();Collection集合转…...

Liunx(狂神课堂笔记)
一.常用命令 1. cd 切换目录 cd ./* 当前目录cd /* 绝对路径cd .. 返回上一级目录cd ~ 回到当前目录pwd …...

【史上最全面esp32教程】点灯大师篇
文章目录前言ESP32简介认识arduino的两个函数点灯步骤函数介绍LED灯闪烁流水灯总结前言 esp32有很多的功能,例如wifi,蓝牙等,这节我们学习最简单的点灯。 提示:以下是本篇文章正文内容,下面案例可供参考 ESP32简介 …...

【Java 面试合集】内存中为什么要区分栈和堆
内存中为什么要区分栈和堆 1. 概述 嗨,大家好Java 面试合集又来了,今天我们分享的主题很大众化以及普通。无论是Java 语言本身还是别的语言都会有所涉及,但是今天我们从Java角度来讲下 2. 分析 今天我们会从多个方向来分享这个话题 2.1 栈是…...