当前位置: 首页 > news >正文

RabbitMQ的6种工作模式

RabbitMQ的6种工作模式

官方文档:

http://www.rabbitmq.com/

https://www.rabbitmq.com/getstarted.html

RabbitMQ 常见的 6 种工作模式:
在这里插入图片描述

1、simple简单模式

在这里插入图片描述

1)、消息产生后将消息放入队列。

2)、消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。

3)、存在的问题:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。

4)、应用场景:聊天(中间有一个过度的服务器)。

5)、代码实现:

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>rabbitmq-java</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency></dependencies></project>

工具类

package com.example;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {// 连接rabbitmq服务,共享一个工厂对象private static ConnectionFactory factory;static {factory=new ConnectionFactory();//设置rabbitmq属性factory.setHost("127.0.0.1");factory.setUsername("zsx242030");factory.setPassword("zsx242030");factory.setVirtualHost("/");factory.setPort(5672);}public static Connection getConnection(){Connection connection=null;try {//获取连接对象connection = factory.newConnection();} catch (Exception e) {e.printStackTrace();}return connection;}
}

消息提供者

package com.example.simple;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//通过通道创建队列,后续所有的操作都是基于channel实现(队列也可以由消费方创建)channel.queueDeclare("queue1", false, false, false, null);//向队列中发送消息channel.basicPublish("", "queue1", null, "Hello RabbitMQ!!!".getBytes());//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者

package com.example.simple;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息(消费的是队列,而不是交换机)channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者获得消息为:Hello RabbitMQ!!!

2、work工作模式(资源的竞争)

在这里插入图片描述

1)、消息产生者将消息放入队列,消费者可以有多个,消费者1,消费者2,同时监听同一个队列。消息被消费,

C1 和 C2 共同争抢当前的消息队列内容,谁先拿到谁负责消费消息。

2)、存在的问题:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关

(synchronized,与同步锁的性能不一样),保证一条消息只能被一个消费者使用。

3)、应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到

消息队列中,空闲的系统自动争抢);对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

4)、代码实现:

消息提供者

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//通过通道创建队列channel.queueDeclare("queue1", false, false, false, null);//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("", "queue1", null, ("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();// connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.work;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//监听队列中的消息channel.basicConsume("queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();// connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!10

3、publish/subscribe发布订阅(共享资源)

在这里插入图片描述

1)、X代表交换机,rabbitMQ 内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消

息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费。

Exchange 有常见以下 3 种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。

  • Direct:定向,把消息交给符合指定 routing key 的队列。

  • Topic:通配符,把消息交给符合 routing pattern (路由模式)的队列。

Exchange (交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者

没有符合路由规则的队列,那么消息会丢失。

2)相关场景:邮件群发,群聊天,广播(广告)。

3)、代码实现:

消息提供者

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建
public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)// 1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("fanout_exchange", "fanout");//通过通道创建队列//channel.queueDeclare("queue1",false,false,false,null);//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("fanout_exchange", "", null, ("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("fanout_queue1", false, false, false, null);//给队列绑定交换机channel.queueBind("fanout_queue1", "fanout_exchange", "");//监听队列中的消息channel.basicConsume("fanout_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.publishsubscribe;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("fanout_queue2", false, false, false, null);//给队列绑定交换机channel.queueBind("fanout_queue2", "fanout_exchange", "");//监听队列中的消息channel.basicConsume("fanout_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10
消费者2获得消息为:Hello RabbitMQ!!!1
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!3
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!5
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!7
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!10

4、routing路由模式

在这里插入图片描述

1)、消息生产者将消息发送给交换机按照路由判断,路由是字符串,当前产生的消息携带路由字符,交换机根据路

由的 key,只能匹配上路由 key 对应的消息队列,对应的消费者才能消费消息。队列与交换机的绑定,不能是任意

绑定了,而是要指定一个 RoutingKey (路由 key)。消息的发送方在向 Exchange 发送消息时,也必须指定消息的

RoutingKey 。Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列

的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

2)、根据业务功能定义路由字符串。

3)、从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

4)、业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可

以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误。

5)、代码实现:

消息提供者

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)// 1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("direct_exchange", "direct");//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("direct_exchange",//设置路由键,符合路由键的队列,才能拿到消息"insert",null,("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("direct_queue1", false, false, false, null);//绑定交换机(routingKey:路由键)channel.queueBind("direct_queue1", "direct_exchange", "select");channel.queueBind("direct_queue1", "direct_exchange", "insert");//监听队列中的消息channel.basicConsume("direct_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.souting;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("direct_queue2", false, false, false, null);//绑定交换机(routingKey:路由键)channel.queueBind("direct_queue2", "direct_exchange", "delete");channel.queueBind("direct_queue2", "direct_exchange", "select");//监听队列中的消息channel.basicConsume("direct_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

5、topic 主题模式(路由模式的一种)

在这里插入图片描述

1)、Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型

Exchange 可以让队列在绑定 Routing key 的时候使用通配符。

2)、Routingkey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如:item.insert。

通配符规则:

# :匹配一个或多个词

*:匹配不多不少恰好1个词

举例:

item.# :能够匹配item.insert.abc或者item.insert

item.* :只能匹配 item.insert

usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

#.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

3)、路由功能添加模糊匹配。

4)、消息产生者产生消息,把消息交给交换机。

5)、交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。

6)、代码实现:

消息提供者

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建public class Provider {public static void main(String[] args) {try {//获取连接对象Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)   //1.参数一:交换机名称    参数二:交换机类型channel.exchangeDeclare("topic_exchange", "topic");//向队列中发送消息for (int i = 1; i <= 10; i++) {channel.basicPublish("topic_exchange",// #:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况)   *(匹配一个单词)"emp.hello world",null,("Hello RabbitMQ!!!" + i).getBytes());}//断开连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者1

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer1 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("topic_queue1", false, false, false, null);//绑定交换机(routingKey:路由键)  #:匹配0-n个单词(之间以.区分,两点之间算一个单词)channel.queueBind("topic_queue1", "topic_exchange", "emp.#");//监听队列中的消息channel.basicConsume("topic_queue1", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}

消息消费者2

package com.example.topic;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer2 {public static void main(String[] args) {try {Connection connection = ConnectionUtil.getConnection();//获取通道对象Channel channel = connection.createChannel();//创建队列channel.queueDeclare("topic_queue2", false, false, false, null);//绑定交换机(routingKey:路由键)  *:匹配1个单词(之间以.区分,两点之间算一个单词)channel.queueBind("topic_queue2", "topic_exchange", "emp.*");//监听队列中的消息channel.basicConsume("topic_queue2", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));}});//消费方不需要关闭连接,保持一直监听队列状态// channel.close();//connection.close();} catch (Exception e) {e.printStackTrace();}}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10

6、RPC

在这里插入图片描述

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1)、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2)、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3)、服务端将RPC方法 的结果发送到RPC响应队列。

4)、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

5)、代码实现:

Client端

package com.example.rpc;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class Client {public static void main(String[] argv) throws IOException, InterruptedException {String message = "Hello World!!!";// 建立一个连接和一个通道,并为回调声明一个唯一的回调队列Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 定义一个临时变量的接受队列名String replyQueueName = channel.queueDeclare().getQueue();// 生成一个唯一的字符串作为回调队列的编号String corrId = UUID.randomUUID().toString();// 发送请求消息,消息使用了两个属性:replyTo和correlationId// 服务端根据replyTo返回结果,客户端根据correlationId判断响应是不是给自己的AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();// 发布一个消息,rpc_queue路由规则channel.basicPublish("", "rpc_queue", props, message.getBytes("UTF-8"));// 由于我们的消费者交易处理是在单独的线程中进行的,因此我们需要在响应到达之前暂停主线程。// 这里我们创建的容量为1的阻塞队列ArrayBlockingQueue,因为我们只需要等待一个响应。final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);// String basicConsume(String queue, boolean autoAck, Consumer callback)channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {//检查它的correlationId是否是我们所要找的那个if (properties.getCorrelationId().equals(corrId)) {//如果是,则响应BlockingQueueresponse.offer(new String(body, "UTF-8"));}}});System.out.println(" 客户端请求的结果:" + response.take());}
}

Server端

package com.example.rpc;import com.example.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;public class Server {public static void main(String[] args) {Connection connection = null;try {connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("rpc_queue", false, false, false, null);channel.basicQos(1);System.out.println("Awaiting RPC requests:");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String response = "";try {response = new String(body, "UTF-8");System.out.println("response (" + response + ")");} catch (RuntimeException e) {System.out.println("错误信息 " + e.toString());} finally {// 返回处理结果队列channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));// 确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。channel.basicAck(envelope.getDeliveryTag(), false);// RabbitMq consumer worker thread notifies the RPC// server owner threadsynchronized (this) {this.notify();}}}};// 取消自动确认boolean autoAck = false;channel.basicConsume("rpc_queue", autoAck, consumer);// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (consumer) {try {consumer.wait();} catch (InterruptedException e) {e.printStackTrace();}}}} catch (Exception e) {e.printStackTrace();} finally {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}
Awaiting RPC requests:
response (Hello World!!!)
response (Hello World!!!)
response (Hello World!!!)# 客戶端发起3次请求
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!

7、发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使

用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将

队列绑定到默认的交换机 。

相关文章:

RabbitMQ的6种工作模式

RabbitMQ的6种工作模式 官方文档&#xff1a; http://www.rabbitmq.com/ https://www.rabbitmq.com/getstarted.html RabbitMQ 常见的 6 种工作模式&#xff1a; 1、simple简单模式 1)、消息产生后将消息放入队列。 2)、消息的消费者监听消息队列&#xff0c;如果队列中…...

MFC第二十六天 CRgn类简介与开发、封装CMemoryDC类并应用开发

文章目录 CRgn类简介与开发CRgn类简介CRgn类区域管理开发CRgn类区域管理与不规则形状的选取 封装CMemoryDC类并应用开发CMemoryDC.h封装CMemoryDC开发游戏透明动画CFlashDlg.hCFlashDlg.cpp 封装CMemoryDC开发游戏动画 附录四大窗口CDC派生类 CRgn类简介与开发 CRgn类简介 CR…...

解决VScode远程服务器时opencv和matplotlib无法直接显示图像的问题

解决VScode远程服务器时opencv和matplotlib无法直接显示图像的问题 1、本方案默认本地已经安装了VScode与MobaXterm2、在服务器端3、在本地端安装MobaXterm4、测试5、opencv显示测试&#xff08;测试过程中需保持MobaXterm开启的状态&#xff09;6、 matplotlib显示测试&#x…...

支付模块功能实现(小兔鲜儿)【Vue3】

支付 渲染基础数据 支付页有俩个关键数据&#xff0c;一个是要支付的钱数&#xff0c;一个是倒计时数据&#xff08;超时不支付商品释放&#xff09; 准备接口 import request from /utils/httpexport const getOrderAPI (id) > {return request({url: /member/order/$…...

php meilisearch demo

# 创建一个meilisearch 使用完自动销毁 docker run -itd --rm -p 7700:7700 getmeili/meilisearch:v1.3docker-compose 参数 version: "3" networks:flyserver:driver: bridge services:search:image: getmeili/meilisearch:v1.3restart: alwaysenvironment:- MEILI…...

芒格之道——查理·芒格股东会讲话1987-2022

你越是认真生活&#xff0c;你的生活就会越美好&#xff01; 这里将读书过程划线的内容摘抄在这里&#xff0c;方便自己回顾。 书分为两部分&#xff0c;我先读了后半部分&#xff0c;而且是从后往前读&#xff0c;到了前半部分&#xff0c;我是从前往后读。书还挺贵&#xff…...

如何运营手游联运平台游戏?

运营手游联运平台游戏需要综合考虑多个方面&#xff0c;包括游戏选择、合作伙伴、市场推广、用户运营等。以下是运营手游联运平台游戏的一些建议&#xff1a; 游戏选择&#xff1a;选择优质的手游&#xff0c;确保游戏的品质和内容能够吸引玩家&#xff0c;满足市场需求。 合…...

vscode连接远程Linux服务器

文章目录 一、环境安装1.1 下载vscode1.2 下载vscode-sever 二、ssh链接2.1 安装Remote-SSH2.2 设置vscode ssh2.3 设置免密登录2.3.1 本地生成公私钥2.3.2 服务器端添加公钥 三、安装插件3.1 vscode安装插件3.1.1 在线安装插件3.1.2.1 下载插件3.1.2.2 安装插件 3.2 vscode-se…...

numpy 转换成 cupy 利用GPU执行 错误

ModuleNotFoundError: No module named cupy._core. routines_sorting 提示缺少包 使用 pyinstaller -D views.py --nocons 可以正常打包出来 但是运行出现报错 说明这个打包工具 忽略了很多 隐式导入的包 解决方法很简单 hiddenimports [fastrlock, fastrlock.rlock, cu…...

力扣:55. 跳跃游戏(Python3)

题目&#xff1a; 给定一个非负整数数组 nums &#xff0c;你最初位于数组的 第一个下标 。 数组中的每个元素代表你在该位置可以跳跃的最大长度。 判断你是否能够到达最后一个下标。 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 链接&#xff1a;力扣 示例&#xf…...

Unity 编辑器资源导入处理函数 OnPreprocessAudio :深入解析与实用案例

Unity 编辑器资源导入处理函数 OnPreprocessAudio 用法 点击封面跳转下载页面 简介 在 Unity 中&#xff0c;资源导入是一个非常重要的环节&#xff0c;它决定了资源在项目中的使用方式和效果。Unity 提供了一系列的资源导入处理函数&#xff0c;其中之一就是 OnPreprocessAud…...

mongodb-win32-x86_64-2008plus-3.4.24-signed.msi

Microsoft Windows [版本 6.1.7601] 版权所有 (c) 2009 Microsoft Corporation。保留所有权利。C:\Users\Administrator>cd C:\MongoDB\Server\3.4\binC:\MongoDB\Server\3.4\bin>C:\MongoDB\Server\3.4\bin>mongod --help Options:General options:-h [ --help ] …...

java的反射

在java语言中&#xff0c;反射机制是指对于处在运行状态的类&#xff0c;都能够获取到这个类的所有属性和方法。对于任意一个对象&#xff0c;都能够调用它的任意一个方法以及访问它的属性&#xff1b;这种通过动态获取类或对象的属性以及方法从而完成调用功能被称为java语言的…...

MySQL — InnoDB 锁

文章目录 锁共享锁和排他锁意向锁记录锁间隙锁临键锁插入意向锁自增锁 锁 加锁是实现数据库并发控制的一个非常重要的技术。当事务在对某个数据对象进行操作前&#xff0c;先向系统发出请求&#xff0c;对其加锁。加锁后事务就对该数据对象有了一定的控制&#xff0c;在该事务…...

首批获得金融级行业云平台认证,天翼云深耕行业云

云计算下半场看什么&#xff1f; 无疑是金融、政务、制造等传统政企用户的上云与用云。随着数字经济发展和产业数字化的提速&#xff0c;上云已是政企用户推动其数字化转型不断深入的重要抓手&#xff0c;成为不可阻挡的趋势。 与互联网用户相比&#xff0c;政企用户上云极为…...

浅谈Python解释器的组成

Python解释器是一个复杂的软件&#xff0c;它可以解释和执行Python代码。以下是Python解释器的主要组成部分&#xff1a; 源代码词法分析器&#xff08;Lexical Analyzer&#xff09;: 这部分的任务是将输入的Python源代码分解成称为"tokens"的基础元素。例如&#x…...

服务限流治理

一、基础概念 1.什么是服务限流&#xff1f; 限流在日常生活中也很常见&#xff0c;比如节假日你去一个旅游景点&#xff0c;为了不把景点撑爆&#xff0c;管理部门通常会在外面设置拦截&#xff0c;限制景点的进入人数&#xff08;等有人出来之后&#xff0c;再放新的人进去…...

机器学习笔记 - 使用CLIP在没有数据的情况下创建图像分类器

想象一下,如果我们现在需要对人们是否戴眼镜进行分类,但您没有数据或资源来训练自定义模型。该怎么办?这里我们了解如何使用预先训练的 CLIP 模型来创建自定义分类器,而无需任何培训。这种方法称为零样本图像分类,它可以对原始 CLIP 模型训练期间未明确看到的类别图像进行…...

42.利用 牛顿迭代法解非线性高维方程组(matlab程序)

1.简述 若向量记号为X,方程组就可以写成F(X)0的形式。 我们知道&#xff0c;对于一元函数的牛顿迭代法求根公式 类似的&#xff0c;对于多元函数求根公式 其中X是向量&#xff0c;是非线性方程组对应的雅可比矩阵。 具体求解的时候&#xff0c;我们可以先通过绘图命令绘制图形…...

我在leetcode用动态规划炒股

事情是这样的&#xff0c;突然兴起的我在letcode刷题 121. 买卖股票的最佳时机122. 买卖股票的最佳时机 II123. 买卖股票的最佳时机 III 以上三题。 1. 121. 买卖股票的最佳时机 1.1. 暴力遍历&#xff0c;两次遍历 1.1.1. 算法代码 public class Solution {public int Ma…...

rust实践-异步并发socket通信

客户端 [package] name = "rust_client" version = "0.1.0" edition = "2021"[dependencies] tokio = {version = "1.14.0", features = ["full"] }use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::…...

SolidUI社区-根据Prompt打造人设

背景 随着文本生成图像的语言模型兴起&#xff0c;SolidUI想帮人们快速构建可视化工具&#xff0c;可视化内容包括2D,3D,3D场景&#xff0c;从而快速构三维数据演示场景。SolidUI 是一个创新的项目&#xff0c;旨在将自然语言处理&#xff08;NLP&#xff09;与计算机图形学相…...

设计模式行为型——观察者模式

目录 什么是观察者模式 观察者模式的实现 观察者模式角色 观察者模式类图 观察者模式举例 观察者模式代码实现 观察者模式的特点 优点 缺点 使用场景 注意事项 实际应用 什么是观察者模式 观察者模式&#xff08;Observer Pattern&#xff09;是一种行为型设计模式…...

Kernel Exception导致手机重启案例分析

和你一起终身学习&#xff0c;这里是程序员Android 经典好文推荐&#xff0c;通过阅读本文&#xff0c;您将收获以下知识点: 一、高温触发 Kernel Exception 重启问题二、解决方案三、提高电池温度方案 一、 高温触发 Kernel Exception 重启问题 手机 电池温度 默认60度以上高温…...

C++入门篇5---模板

相信大家都遇到过这么一种情况&#xff0c;为了满足不同类型的需求&#xff0c;我们要写多个功能相同&#xff0c;参数类型不同的代码&#xff0c;为此&#xff0c;C引入了泛型编程这一概念&#xff0c;而模板就是实现泛型编程的基础&#xff0c;其实本质就是我们写一个类似”模…...

L2CS-Net: 3D gaze estimation

L2CS-Net: Fine-Grained Gaze Estimation in Unconstrained Environments论文解析 摘要1. 简介2. Related Work3. METHOD3.1 Proposed loss function3.2 L2CS-Net 结构3.3 数据集3.4 评价指标 4. 实验4.1 实验结果 论文地址&#xff1a;L2CS-Net: Fine-Grained Gaze Estimation…...

kenernetes/k8s笔试面试

k8s的基础概念 k8s本质是一个容器编排系统&#xff0c;可以管理容器的生命周期&#xff0c;应用部署&#xff0c;更新&#xff0c;维护&#xff0c;应用提供服务&#xff0c;扩容缩容应用&#xff0c;故障自愈。 k8s与docker的关系 docker:是一种轻量级的虚拟化技术。运维层…...

我们真的是在做数据治理吗

我们真的是在做数据治理吗&#xff1f; 什么是数据治理&#xff1f; 数据治理和数据管理有什么区别&#xff1f; 相信即使是考过数据治理工程师的人&#xff0c;面对这2个问题也仍然会有这个疑问。 目前国际和国内对于数据治理没有明确统一的定义&#xff0c;对于数据治理的服…...

聊聊汽车电子的话题

当谈到汽车电子时&#xff0c;有许多有趣的话题可以探讨。以下是一些可能感兴趣的话题&#xff1a; 自动驾驶技术&#xff1a;自动驾驶技术正变得越来越先进&#xff0c;它们如何在汽车中实现&#xff1f;它们将如何改变我们的交通方式以及对道路安全的影响&#xff1f; 电动汽…...

ThinkPHP6企业OA办公系统

有需要请加文章底部Q哦 可远程调试 ThinkPHP6企业OA办公系统 一 介绍 勾股OA基于ThinkPHP6开发&#xff0c;前端Layui&#xff0c;数据库mysql&#xff0c;是一款实用的企业办公系统。可多角色登录&#xff0c;集成了系统设置、人事管理、消息管理、审批管理、日常办公、客户…...