【阻塞队列】阻塞队列的模拟实现及在生产者和消费者模型上的应用
文章目录
- 📄前言
- 一. 阻塞队列初了解
- 🍆1. 什么是阻塞队列?
- 🍅2. 为什么使用阻塞队列?
- 🥦3. Java标准库中阻塞队列的实现
- 二. 阻塞队列的模拟实现
- 🍚1. 实现普通队列
- 🍥2. 实现队列的阻塞功能
- 🧊3. 解除阻塞状态
- 三. 使用模拟的阻塞队列验证生产者和消费者模型
📄前言
本文是对阻塞队列的应用场景的介绍,对阻塞队列的作用以及具体实现的讨论。
一. 阻塞队列初了解
🍆1. 什么是阻塞队列?
阻塞队列是一种带有阻塞功能的“先进先出”线性表。即在一个带有最大容量的队列中,在某时刻队列容量已满时继续入队 或 队列为空时继续出队,就会进入阻塞等待状态,直到队列变为 未满或非空 便解除阻塞状态,继续入队或出队。
🍅2. 为什么使用阻塞队列?
若存在以下简易的分布式系统:
上述分布式系统虽然能完成客户端与服务器端的交互需求,但可能存在以下问题:
- 在正常情况下,用户可以通过客户端想服务器发起请求并获取相应的服务,但假如在某刻服务器A突然出现了故障,与服务器A直接通信的服务器B也可能因此出现故障,导致整个服务瘫痪。
- 若未来想增加 更多的服务器 来处理服务器A发起的请求,则需求对 服务器A 的接口 进行一定的改动,付出一定的时间和人力成本。
- 当某个时刻,很多的客户端同时向 服务器A 发起请求,作为与用户直接交互的服务器,服务器A具备承载这些并发量的能力,但服务器集群中负责其他功能的服务器接收请求的承载能力可能较弱,此时可能造成其他服务器的崩溃。
造成上述现象的原因可以归结为以下两点:
- 模块间的耦合性较高(例如问题1和2)
- 承载能力较弱的模块不具备抗冲击能力。(例如问题3)
上述的解决方法是在服务器之间加入一个阻塞队列,利用生产者和消费者模型解决以上问题。
什么是生产者消费者模型呢?(如下图)
当服务器A接收来自客户端的请求时,不把请求直接发给服务器B,而是将请求数据加入到队列中,服务器B通过队列接收请求并把请求除了的结果返回给A。
当上述分布式系统引入阻塞队列后工作模式如下图所示:
引入阻塞队列的好处:
- 解耦合。当服务器A或服务器B出现问题时,就不会对其他服务器造成直接的影响;当需要添加新的服务器来处理这些请求时,新的服务器也同样只需从队列中取数据,无需对原有服务器的接口(代码)进行任何的改动。
- “削峰填谷”。当服务器A 瞬间接收客户端发来的大量请求时,由于服务器B处理请求的速度较慢,剩余的请求会在阻塞队列里面堆积,虽然客户端获取服务的时间相对增加了,但一定程度上缓解了其他承受并发量能力较弱的服务器的压力。
🥦3. Java标准库中阻塞队列的实现
BlockingQueue的主要方法:
方法演示如下:(使用普通入队方法入队4次,再使用带有阻塞的出队方法出队4次)
public static void main(String[] args) throws InterruptedException {BlockingQueue<Integer> q = new ArrayBlockingQueue<>(3);System.out.println("数据 5 入队状态: " + q.offer(5));System.out.println("数据 6 入队状态: " + q.offer(6));System.out.println("数据 7 入队状态: " + q.offer(7));System.out.println("数据 8 入队状态: " + q.offer(8));System.out.print("队列中的数据: ");System.out.println(q);System.out.println("数据出队: ");for (int i = 0; i < 4; i++) {System.out.print(q.take() + " ");}System.out.println("程序结束 !");
}
可以发现,当调用 take()方法取出队列元素时,因为队列最终为空,程序进入了阻塞状态,没有打印“程序结束”。
二. 阻塞队列的模拟实现
🍚1. 实现普通队列
阻塞队列的关键方法是两个带有阻塞功能的 put() 和 take()方法,而这两个方法是在原有出入队方法上使用 Object类 带有wait()方法 和 notify() 方法让线程进入等待状态 或 唤醒线程。
因此,我们可以先把基础的队列进行实现,随后在原有基础上进行修改。队列可以使用数组(环形队列)或链表两种方式实现,这里我采用数组的方式实现队列。(由于队列的实现方法较为常见,这里直接给出实现代码)
class MyBlockingQueue<E> {private Object[] elem;private int defaultCapacity = 11; // 阻塞队列默认容量private int front; // 记录队头元素位置private int rear; // 记录队尾元素位置private int size; // 用于记录当前队列元素的实际个数public MyBlockingQueue(){this.elem = new Object[defaultCapacity + 1];}public MyBlockingQueue(int capacity) {defaultCapacity = capacity;this.elem = new Object[defaultCapacity + 1];}public boolean offer(E val) {// 判断队列是否已满if (size == defaultCapacity) {return false;}elem[rear] = val;size++;// 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部rear = (rear + 1) % (defaultCapacity + 1);return true;}public E poll() {// 判断队列是否为空if (front == rear) {return null;}Object ret = elem[front];size--;// 如果 front 自增 到达数组末尾,使 front 重新到数组的头部front = (front + 1) % (defaultCapacity + 1);return (E)ret;}
}
🍥2. 实现队列的阻塞功能
当阻塞队列容量已满时,调用 put() 方法会进入阻塞状态,因此在原先 offer()方法判断的基础上,我们需要使用 wait()方法 让线程进入阻塞等待状态,考虑到可能有多个线程同时调用 put()方法,可能会引起线程安全问题,因此我们应在 if()判断条件和整个修改操作上 加锁(或者直接在方法上加锁)。(代码如下)
public void put (E value) throws InterruptedException {// 判断队列是否已满synchronized (this) {if (size == defaultCapacity) {// 队列进入阻塞状态, 直到有元素出队时 解除阻塞this.wait();}queue[rear] = value;size++;// 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部rear = (rear + 1) % (defaultCapacity + 1);}
}
当队列为空时,调用 take() 方法会使线程进入阻塞状态,同理若判空条件成立,我们需要调用 wait() 方法使线程进入阻塞,为防止多个线程在队列即将为空时同时调用 take() 方法引发线程安全问题,我们需要在 if()判断语句 和 整个修改操作 进行加锁操作(或者直接在方法上加锁)。(代码如下)
public E take() throws InterruptedException {// 判断队列是否为空synchronized (this) {if (rear == front) {// 队列进入阻塞状态,直到有新的元素入队时 解除阻塞this.wait();}Object ret = queue[front];// 如果 front 自增 到达数组末尾,使 front 重新到数组的头部front = (front + 1) % (defaultCapacity + 1);size--;return (E)ret;}
}
🧊3. 解除阻塞状态
什么情况下队列会接触阻塞状态呢?
- 当队满时,某个线程从阻塞队列取出一个元素,即执行完出队操作后,需要使用 notify()方法 唤醒因执行 put()方法而阻塞的线程。
- 当队空时,某个线程向队列新增一个元素,即执行完入队操作后,需要使用 notify()方法唤醒因执行 take()方法而阻塞的线程。
对 put()方法和take()方法 修改后代码如下:
public void put (E value) throws InterruptedException {// 判断队列是否已满synchronized (this) {if (size == defaultCapacity) {// 队列进入阻塞状态, 直到有元素出队时 解除阻塞this.wait();}queue[rear] = value;size++;// 如果 rear自增 到达数组末尾,使 rear 重新到数组的头部rear = (rear + 1) % (defaultCapacity + 1);// 此处的 notify 用来唤醒 队列为空时的 waitthis.notify();}
}public E take() throws InterruptedException {// 判断队列是否为空synchronized (this) {if (rear == front) {// 队列进入阻塞状态,直到有新的元素入队时 解除阻塞this.wait();}Object ret = queue[front];// 如果 front 自增 到达数组末尾,使 front 重新到数组的头部front = (front + 1) % (defaultCapacity + 1);size--;// 此处的 notify 用来唤醒 队列为满时的 waitthis.notify();return (E)ret;}
}
三. 使用模拟的阻塞队列验证生产者和消费者模型
为了方便看到效果,我们假设阻塞队列的容量为2,并将生产与消费的数据进行打印。
当生产者与消费者处理数据的频率一样,且生产速率为 次/1s、消费速率为 次/1s 时,程序的生产与消费数据应轮流打印:(模拟代码和程序运行结果如下)
public static void main(String[] args) {MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);// 生产者Thread producer = new Thread(() -> {for (int i = 0; i < 5; i++) {try {myBlockingQueue.put(i);System.out.println("生产了: " + i);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});// 消费者Thread consumer = new Thread(() -> {for (int i = 0; i < 5; i++) {try {int ret = myBlockingQueue.take();System.out.println("消费了: " + ret);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();
}
当生产速率 > 消费速率,且生产速率为 次/1s、消费速率为 次/2s 时:可以预估到,当经过5s后程序会因队满进入阻塞状态,且后续每消费一次伴随着一次生产,为方便观察阻塞情况,我们可以在方法实现的地方加上阻塞日志的提示(模拟代码和程序运行结果如下)
public static void main(String[] args) {MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);Thread producer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {myBlockingQueue.put(i);System.out.println("生产了: " + i);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread consumer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {int ret = myBlockingQueue.take();System.out.println("消费了: " + ret);Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();
}
当生产速率 < 消费速率,且生产速率为 次/2s、消费速率为 次/1s 时:可以预估到,当经过2s后程序会因队满进入阻塞状态,且后续每生产一次伴随着一次消费,为方便观察阻塞情况,我们可以在方法实现的地方加上阻塞日志的提示(模拟代码和程序运行结果如下)
public static void main(String[] args) {MyBlockingQueue<Integer> myBlockingQueue = new MyBlockingQueue<>(2);Thread producer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {myBlockingQueue.put(i);System.out.println("生产了: " + i);Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread consumer = new Thread(() -> {for (int i = 0; i < 10; i++) {try {int ret = myBlockingQueue.take();System.out.println("消费了: " + ret);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});producer.start();consumer.start();
}
以上就是本篇文章的全部内容了,如果这篇文章对你有些许帮助,你的点赞、收藏和评论就是对我最大的支持。
另外,文章可能存在许多不足之处,也希望你可以给我一点小小的建议,我会努力检查并改进。
相关文章:
【阻塞队列】阻塞队列的模拟实现及在生产者和消费者模型上的应用
文章目录 📄前言一. 阻塞队列初了解🍆1. 什么是阻塞队列?🍅2. 为什么使用阻塞队列?🥦3. Java标准库中阻塞队列的实现 二. 阻塞队列的模拟实现🍚1. 实现普通队列🍥2. 实现队列的阻塞功…...
Cocos Creator使用VS Code调试代码配置
创建项目 首先我们先打开cocos创建一个项目 随便添加一个Cube和脚本,然后保存场景: 添加Chrome Debug配置 在Cocos 中选择添加Chrome Debug配置 然后再VS Code中就可以看到有一个cocos launch Chrome: 然后,就可以按快捷键F…...
【投稿优惠|EI优质会议】2024年材料化学与清洁能源国际学术会议(IACMCCE 2024)
【投稿优惠|优质会议】2024年材料化学与清洁能源国际学术会议(IACMCCE 2024) 2024 International Conference Environmental Engineering and Mechatronics Integration(ICEEMI 2024) 一、【会议简介】 随着全球能源需求的不断增长,清洁能源的研究与应用成为了国际…...
ubuntu设置右键打开terminator、code
前言: 这里介绍一种直接右键打开本地目录下的terminator和vscode的方法。 一:右键打开terminator 1.安装terminator sudo apt install terminator 2.安装nautilus-actions filemanager-actions sudo apt-get install nautilus-actions filemanager…...
PHP AES加解密:用代码为数据加上保护的盾牌
在网络世界里,数据的传输和存储是一个敏感而重要的问题。为了保护数据的安全性,加密算法是一项不可或缺的技术。而在PHP中,AES(Advanced Encryption Standard)加解密算法是一种常用的选择。本篇博客将深入解析PHP中的A…...
Socket实现服务器和客户端
Socket 编程是一种用于在网络上进行通信的编程方法,以下代码可以实现在不同主机之间传输数据。 Socket 编程中服务器端和客户端的基本步骤:服务器端步骤: 1.创建 Socket: int serverSocket socket(AF_INET, SOCK_STREAM, 0);…...
智能GPT图书管理系统(SpringBoot2+Vue2)、接入GPT接口,支持AI智能图书馆
☀️技术栈介绍 ☃️前端主要技术栈 技术作用版本Vue提供前端交互2.6.14Vue-Router路由式编程导航3.5.1Element-UI模块组件库,绘制界面2.4.5Axios发送ajax请求给后端请求数据1.2.1core-js兼容性更强,浏览器适配3.8.3swiper轮播图插件(快速实…...
面试经典 150 题 ---- 合并两个有序数组
面试经典 150 题 ---- 合并两个有序数组 合并两个有序数组方法一:直接合并后排序方法二:双指针方法三:逆向双指针 合并两个有序数组 方法一:直接合并后排序 这种方法最简单,直接将 nums2 的数组放到 nums1 数组的尾部…...
防火墙在企业园区出口安全方案中的应用(ENSP实现)
拓扑图 需求: 1、企业出口网关设备必须具备较高的可靠性,为了避免单点故障,要求两台设备形成双机热备状态。当一台设备发生故障时,另一台设备会接替其工作,不会影响业务正常运行。 2、企业从两个ISP租用了两条链路&…...
单片机学习笔记---矩阵键盘密码锁
目录 一,设置密码按键 1.设置密码区域 2.设置输入的数字左移 3.设置记录按键的次数 二,设置确认键 1.密码正确时显示OK 2.密码错误时显示ERR 3.密码错误恢复初始状态重输 三,设置取消键 学了这么久,迫不及待想要做一个密…...
8-小程序数据promise化、共享、分包
小程序API Promise化 wx.requet 官网入口 默认情况下,小程序官方异步API都是基于回调函数实现的 wx.request({method: , url: , data: {},header: {content-type: application/json // 默认值},success (res) {console.log(res.data)},fail () {},complete () { }…...
[HTML]Web前端开发技术18(HTML5、CSS3、JavaScript )HTML5 基础与CSS3 应用——喵喵画网页
希望你开心,希望你健康,希望你幸福,希望你点赞! 最后的最后,关注喵,关注喵,关注喵,佬佬会看到更多有趣的博客哦!!! 喵喵喵,你对我真的…...
Threejs 展示——obj 格式模型导入
文章目录 需求分析1. HTML版本2. Vue 版本 需求 导入obj 格式的模型数据 分析 .obj:Wavefront OBJ 格式,是一种广泛使用的三维模型文件格式。预览 .obj格式文件的软件可点此下载需要准备两种格式的数据,如下所示 1. HTML版本 html <!…...
深入浅出 diffusion(3):pytorch 实现 diffusion 中的 U-Net
导入python包 import mathimport torch import torch.nn as nn import torch.nn.functional as F silu激活函数 class SiLU(nn.Module): # SiLU激活函数staticmethoddef forward(x):return x * torch.sigmoid(x) 归一化设置 def get_norm(norm, num_channels, num_groups)…...
C#使用RabbitMQ-2_详解工作队列模式
简介 🍀RabbitMQ中的工作队列模式是指将任务分配给多个消费者并行处理。在工作队列模式中,生产者将任务发送到RabbitMQ交换器,然后交换器将任务路由到一个或多个队列。消费者从队列中获取任务并进行处理。处理完成后,消费者可以向…...
Day37 56合并区间 738单调递增的数字 968监控二叉树
56 合并区间 给出一个区间的集合,请合并所有重叠的区间。 示例 1: 输入: intervals [[1,3],[2,6],[8,10],[15,18]]输出: [[1,6],[8,10],[15,18]]解释: 区间 [1,3] 和 [2,6] 重叠, 将它们合并为 [1,6]. class Solution { public:vector<vector<int>>…...
【Android】在WSA安卓子系统中进行新实验性功能试用与抓包(2311.4.5.0)
前言 在根据几篇22和23的WSA抓包文章进行尝试时遇到了问题,同时发现新版Wsa的一些实验性功能能优化抓包配置时的一些步骤,因而写下此篇以作记录。 Wsa版本:2311.40000.5.0 本文出现的项目: MagiskOnWSALocal MagiskTrustUserCer…...
【服务器】服务器的管理口和网口
服务器通常会有两种不同类型的网络接口,即管理口(Management Port)和网口(Ethernet Port),它们的作用和用途不同。 一、管理口 管理口通常是用于服务器管理的网络接口,也被称为外带网卡或带外接…...
一个小例子,演示函数指针
结构体里经常看到函数指针的写法,函数指针其实就是函数的名字。但是结构体里你要是直接把一个函数摆上去,那就变成成员变量,就会发生混乱 1. 函数指针 #include <unistd.h> #include <stdio.h>struct Kiwia{void (*func)(int )…...
python12-Python的字符串之使用input获取用户输入
input()函数用于向用户生成一条提示,然后获取用户输入的内容。由于input0函数总会将用户输入的内容放入字符串中,因此用户可以输入任何内容,input()函数总是返回一个字符串。例如如下程序。 # !/usr/bin/env python# -*- coding: utf-8 -*-# @Time : 2024/01# @Author : Lao…...
【代码随想录-数组】移除元素
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老导航 檀越剑指大厂系列:全面总结 jav…...
springboot事务管理
/*spring事务管理注解:Transactional位置:业务(service)层的方法上、类上、接口上作用:将当前方法交给spring进行事务管理,方法执行前,开启事务:成功执行完毕,提交事务:出现常,回滚事务需要在配置文件是加上开启spring事务yml文件…...
数据结构——链式二叉树(2)
目录 🍁一、二叉树的销毁 🍁二、在二叉树中查找某个数,并返回该结点 🍁三、LeetCode——检查两棵二叉树是否相等 🌕(一)、题目链接:100. 相同的树 - 力扣(LeetCode&a…...
spring-boot-starter-validation常用注解
文章目录 一、使用二、常用注解三、Valid or Validated ?四、分组校验1. 分组校验的基本概念2. 定义验证组3. 应用分组到模型4. 在控制器中使用分组5. 总结 一、使用 要使用这些注解,首先确保在你的 Spring Boot 应用的 pom.xml 文件中添加了 spring-bo…...
AF700 NHS 酯,AF 700 Succinimidyl Ester,一种明亮且具有光稳定性的近红外染料
AF700 NHS 酯,AF 700 Succinimidyl Ester,一种明亮且具有光稳定性的近红外染料,AF700-NHS-酯,具有水溶性和 pH 值不敏感性 您好,欢迎来到新研之家 文章关键词:AF700 NHS 酯,AF 700 Succinimid…...
C#常见内存泄漏
背景 在开发中由于对语言特性不了解或经验不足或疏忽,往往会造成一些低级bug。而内存泄漏就是最常见的一个,这个问题在测试过程中,因为操作频次低,而不能完全被暴露出来;而在正式使用时,由于使用次数增加&…...
Xmind安装到指定目录
Xmind安装到指定目录 默认情况下安装包自动引导安装在C盘(注册表默认位置) T1:修改注册表,比较麻烦 T2:安装时命令行指定安装位置,快捷省事 1)下载安装包(exe可执行文件) 2)安装…...
[GXYCTF2019]BabyUpload1
尝试各种文件,黑名单过滤后缀ph,content-type限制image/jpeg 内容过滤<?,木马改用<script languagephp>eval($_POST[cmdjs]);</script> 上传.htaccess将上传的文件当作php解析 蚁剑连接得到flag...
SpringBoot之分页查询的使用
背景 在业务中我们在前端总是需要展示数据,将后端得到的数据进行分页处理,通过pagehelper实现动态的分页查询,将查询页数和分页数通过前端发送到后端,后端使用pagehelper,底层是封装threadlocal得到页数和分页数并动态…...
【shell-10】shell实现的各种kafka脚本
kafka-shell工具 背景日志 log一.启动kafka->(start-kafka)二.停止kafka->(stop-kafka)三.创建topic->(create-topic)四.删除topic->(delete-topic)五.获取topic列表->(list-topic)六. 将文件数据 录入到kafka->(file-to-kafka)七.将kafka数据 下载到文件-&g…...
艺术家个人网站设计/seo云优化方法
该楼层疑似违规已被系统折叠 隐藏此楼查看此楼package test2;public class Person {int age;String name;String gender;public Person(int Age,String Name,String Gender) {this.ageAge;this.nameName;this.genderGender;}public String toString() {return "姓名:"…...
网站首页怎么做全屏swf/济宁网站建设
当一个类有多个实例,但是在实例之间有着相互的关联关系,此时,不建议在实例中新增一个成员属性来描述这种关联关系 据一个实际场景来帮助理解:People类有两个实例:AA和BB,AA是男的,BB 是女的&…...
欧美男女做黄色网站/源码之家
在Oracle 10g中,Flash back家族分为以下成员:Flashback Database | Flashback Drop | Flashback Table | Flashback Query(Flashback Query,Flashback Version Query,Flashback Transaction Query)。 闪回删除表(dropped tabl…...
可以免费浏览的网站/东莞seo外包公司哪家好
event安装包链接 PECL :: Package :: event 使用特定平台可用的最佳 I/O 通知机制有效调度基于 I/O、时间和信号的事件的扩展。 通过I/O调度,提高并发,更高效的利用服务器资源。 wget https://pecl.php.net/get/event-3.0.6.tgz tar -zxvf event-3.0.…...
成都集团网站建设/优化设计答案六年级上册
使用PHPMySqlAjaxjQuery实现省市区三级联动功能要求:写一个省市区(或者年月日)的三级联动,实现地区或时间的下拉选择。实现技术:php ajax实现:省级下拉变化时市下拉区下拉跟着变化,市级下拉变化时区下拉跟着变化。使用…...
wordpress首页友情链接插件/qq群推广方法
前言:上次与大家分享了SSH框架之Struts的知识,今天就接着之前的分享,今天要分享的知识是Ognl。 一、明确目标: 1、弄清楚之前遗留的问题(user中的uname属性有值,而Demo1Action中的uanme属性没有值…...