多线程(初阶七:阻塞队列和生产者消费者模型)
目录
一、阻塞队列的简单介绍
二、生产者消费者模型
1、举个栗子:
2、引入生产者消费者模型的意义:
(1)解耦合
(2)削峰填谷
三、模拟实现阻塞队列
1、阻塞队列的简单介绍
2、实现阻塞队列
(1)实现普通队列
(2)加上线程安全
(3)加上阻塞功能
3、运用阻塞队列的生产者消费者模型
都看到这了,点个赞再走吧,谢谢谢谢谢
一、阻塞队列的简单介绍
首先,我们都知道,队列是先进先出的一种数据结构,而阻塞队列,是基于队列,做了一些扩展,在多线程有就非常有意义了
阻塞队列的特性:
(1)是线程安全的
(2)具有阻塞的特性
①当队列满了,这时不能往队列里放数据,就会阻塞等待,等队列的数据出队列后,这时队列没满,才能放数据。
②当队列空了,这时不能拿队列里的数据,就会阻塞等待,等有数据如队列了,这时队列不为空,才能拿数据。
这里,阻塞队列的用处非常大,基于阻塞队列的功能,就可以实现 “生产者消费者模型”。
二、生产者消费者模型
生产者消费者模型是一种很朴素的概念,描述的是一种多线程编程的方法。
1、举个栗子:
一家人包饺子,首先得和面,和完面就要开始擀饺子皮了,然后才开始包饺子,这里,因为一般家里也只有一个擀面杖,所以只能一个人擀饺子皮,其余的家人就会帮忙包饺子,假设一个人擀饺子皮,2个人包饺子,那么擀饺子皮的人就是生产者,包饺子的人就是消费者;这也就是消费者生产者模型。
一个桌子能发饺子皮的数量是有限的,当擀饺子皮的人比较快,桌子放满饺子皮后,就要等包饺子的人,消耗一些饺子皮来包饺子,才能继续擀饺子皮;而这也类似阻塞队列中队里满的情况。当包饺子的人包的比较快,桌子上的饺子皮都没了,就要等擀饺子皮的人擀了饺子皮后才能继续包饺子;而这也类似阻塞队列空的情况。不同的人分工不同,也类似多线程,各自干各自的事情。
2、引入生产者消费者模型的意义:
(1)解耦合
引入生产者消费者模型,就能更好的做到解耦合(把代码的耦合程度,从高降低,就称为解耦合)
在实际开发中,会涉及到 “分布式系统” ,服务器的整个功能不是又一个服务器完成实现的,而是由多个服务器,各自实现各自的一部分功能,再通过网络通信,把这些服务器联系起来,最终完成整个服务器的功能。粗糙流程如图:
而这时,入口服务器与A、B服务器的联系是密切相关的,请求要经过入口服务器,才能传达给A、B服务器,再A、B服务器拿到想要的数据,再返回给入口服务器,通过入口服务器,再把响应传给客户端。如果是这样,那如果请求突然骤升,这时超过入口服务器接收请求的峰值,这时入口服务器就挂了,入口服务器挂了后,A、B服务器拿不到请求,也会挂掉,这就体现了入口服务器和A、B服务器的耦合性比较高。
当我们在入口服务器和A、B服务器之间引入阻塞队列时,如图:
这时,如果入口服务器挂了,但是阻塞队列中还有请求的数据,至少不会因入口服务器挂了,A、B服务器也挂了,这样,入口服务器和A、B服务器的耦合性也就降低了。
(2)削峰填谷
如图,当客户端这边的请求突然骤增时,入口服务器都是比较能抗压的,但是也是有极限的,这时我们引入阻塞队列,就可以把这些请求数据都放进阻塞队列中,形成一个缓冲区,这样,即使外面的请求达到了峰值,也是由阻塞队列来承担,这样就形成了削峰填谷的效果。
注意:这时的阻塞队列,是基于阻塞队列这一数据结构,而实现的服务器程序,所以也叫消息队列
三、模拟实现阻塞队列
1、阻塞队列的简单介绍
在java标准库中,提供了现成的阻塞队列这一数据结构,如图:
是基于队列扩展而来的,队列有的,它也有;我们知道,入队列时可以用offer方法,出队列时可以用poll方法,在阻塞队列中,也有这两个方法,但是这两个方法是不带阻塞功能的;其中,在阻塞队列中,put是在阻塞功能的入队列,take方法是带阻塞功能的出队列。
代码案例:
public class TestDemo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(10);blockingQueue.put("aaa");String s1 = blockingQueue.take();System.out.println("第一个打印:s1 = " + s1);s1 = blockingQueue.take();System.out.println("第二个打印:s1 = " + s1);} }执行结果:
线程卡住不动了,原因是想要第二次出队列时,队列是空的,所以要等队列有元素入队列时,才能出队列,也就是说,这是带有阻塞功能的。
2、实现阻塞队列
阻塞队列是通过循环队列实现的,而队列是依靠数组来实现的,这里的阻塞队列,我们只模拟实现其中的put和take方法。
(1)实现普通队列
代码:
// 为了简单, 不写作泛型的形式. 考虑存储的元素就是单纯的 String class MyBlockingQueue {private String elems[] = null;private int head = 0;//记录头结点private int tail = 0;//记录尾结点private int size = 0;//队列元素个数//构造方法,定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}//入队列public void put(String elem) {//判断容量满了没,满了就不能入队列,要阻塞等待if(size >= this.elems.length) {//阻塞等待,先不写,先实现普通功能的队列return;}//入队列elems[tail] = elem;tail++;//因为是循环队列,所以要判断尾巴有没有超过容量大小下标,超过了就要从0开始了if(tail > elems.length) {tail = 0;}//队列元素要++size++;}//出队列public String take() {String elem = null;//要判断队列是不是空的,空就不能出队列了,要阻塞等待if(size == 0) {//阻塞等待,因为是先实现普通队列的功能,所以后面再补充return null;}elem = elems[head];head++;//因为是循环队列,所以要判断头结点有没有超过容量大小下标,超过了就要0开始了if(head >= elems.length) {head = 0;}//出队列后,队列元素要--size--;return elem;} }每个步骤说明代码中有注释。
测试一下可不可以用,如图:
是可以用的,这样,普通的队列就已经搞好了
(2)加上线程安全
我们想想put和take里面要给哪里上锁,首先,写操作肯定是要加锁的,因为多线程同时执行写操作,肯定是线程不安全的,也就是下面这段代码:
接下来,我们讨论一下这两个代码要不要加锁,以take为例,如图:
我们画一下图,会比较好理解:
如果size = -1,是不符合我们预期的,size最小也只可能是0,不可能是-1,所以我要上面的判断条件也要加锁。
而put也一样,判断条件也要加锁。
代码:
class MyBlockingQueue {Object locker = new Object();private String elems[] = null;private int head = 0;//记录头结点private int tail = 0;//记录尾结点private int size = 0;//队列元素个数//构造方法,定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}//入队列public void put(String elem) {synchronized (locker) {//判断容量满了没,满了就不能入队列,要阻塞等待if(size >= this.elems.length) {//阻塞等待,先不写,先实现普通功能的队列return;}//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//入队列elems[tail] = elem;tail++;//因为是循环队列,所以要判断尾巴有没有超过容量大小下标,超过了就要从0开始了if(tail > elems.length) {tail = 0;}//队列元素要++size++;}}}//出队列public String take() {String elem = null;//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//要判断队列是不是空的,空就不能出队列了,要阻塞等待if(size == 0) {//阻塞等待,因为是先实现普通队列的功能,所以后面再补充return null;}elem = elems[head];head++;//因为是循环队列,所以要判断头结点有没有超过容量大小下标,超过了就要0开始了if(head >= elems.length) {head = 0;}//出队列后,队列元素要--size--;return elem;}} }
(3)加上阻塞功能
我们要加上阻塞功能,就要在这两条件判断上加上wait,我们用locker的对象给他wait,而且wait必须要在synchronized内使用,这里的locker正好能对应上;当这个队列满时,就阻塞等待,等take方法拿走一个数据时,才给他唤醒。
加上阻塞功能后的代码如下:(不是最终代码,里面还是存在线程安全问题)
class MyBlockingQueue {Object locker = new Object();private String elems[] = null;private int head = 0;//记录头结点private int tail = 0;//记录尾结点private int size = 0;//队列元素个数//构造方法,定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}//入队列public void put(String elem) throws InterruptedException {synchronized (locker) {//判断容量满了没,满了就不能入队列,要阻塞等待if (size >= this.elems.length) {//阻塞等待,先不写,先实现普通功能的队列synchronized (locker) {locker.wait();}}//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//入队列elems[tail] = elem;tail++;//因为是循环队列,所以要判断尾巴有没有超过容量大小下标,超过了就要从0开始了if(tail > elems.length) {tail = 0;}//队列元素要++size++;locker.notify();}}}//出队列public String take() throws InterruptedException {String elem = null;//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//要判断队列是不是空的,空就不能出队列了,要阻塞等待if (size == 0) {//阻塞等待,因为是先实现普通队列的功能,所以后面再补充synchronized (locker) {locker.wait();}}elem = elems[head];head++;//因为是循环队列,所以要判断头结点有没有超过容量大小下标,超过了就要0开始了if(head >= elems.length) {head = 0;}//出队列后,队列元素要--size--;locker.notify();return elem;}} }加上阻塞功能的判断语句
与其对应的,一定要记住put和take后要notify,不然就会一直阻塞下去,导致程序动不了,如图:
现在进行代码分析:
当put时,队列满了,就要阻塞等待,等take这队列后,就会唤醒put操作,接着put就能入队列了;如果是take就相反,这是符合我们预期的。如果不满也不空时,每次put和take都会notify一次,这时会有影响吗?答案肯定是否定的,不会有影响,因为就算没有其他线程在等待,唤醒也没有事,不会对程序造成啥影响。而且我们现在的代码,一定是要么满,要么空,要么不满也不空。
但是,如果有两个线程同时put,现在队列是满的,A线程先阻塞,B线程也阻塞,这时有第三个线程take一次,把A线程的wait唤醒了,等A执行到下面的notify,A线程里put的notify就会唤醒B线程里的wait,但是因为A线程put了,和第三个线程的take一取一放抵消了,此时队列还是满的,因为A线程里的put把B线程里的wait唤醒了,这时已经是满了的队列还往里放元素,就造成了线程安全问题。
解决方案:把条件判断if换成while循环语句,不是只判断一次,当有其他线程把wait唤醒后,还要再判断一次这个队列是不是满的或者是空的,如果不是满的或者不是空的,才释放这个wait,不然就要继续wait,这样问题也就解决了。
最终代码:
class MyBlockingQueue {Object locker = new Object();private String elems[] = null;private int head = 0;//记录头结点private int tail = 0;//记录尾结点private int size = 0;//队列元素个数//构造方法,定义队列的容量大小public MyBlockingQueue(int capacity) {this.elems = new String[capacity];}//入队列public void put(String elem) throws InterruptedException {synchronized (locker) {//判断容量满了没,满了就不能入队列,要阻塞等待while (size >= this.elems.length) {//阻塞等待,先不写,先实现普通功能的队列synchronized (locker) {locker.wait();}}//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//入队列elems[tail] = elem;tail++;//因为是循环队列,所以要判断尾巴有没有超过容量大小下标,超过了就要从0开始了if(tail > elems.length) {tail = 0;}//队列元素要++size++;locker.notify();}}}//出队列public String take() throws InterruptedException {String elem = null;//因为这些都是写操作,也有读操作,多线程并发执行时,写操作是线程不安全的,要把这些打包成一个原子,加锁synchronized (locker) {//要判断队列是不是空的,空就不能出队列了,要阻塞等待while (size == 0) {//阻塞等待,因为是先实现普通队列的功能,所以后面再补充synchronized (locker) {locker.wait();}}elem = elems[head];head++;//因为是循环队列,所以要判断头结点有没有超过容量大小下标,超过了就要0开始了if(head >= elems.length) {head = 0;}//出队列后,队列元素要--size--;locker.notify();return elem;}} }我们看看wait内部,可以看到它内部也是会有说明wait可能会提前被唤醒,就要我们多加一次判断,这样,用while会比用if更好,如图:
在实际开发中,生产者消费者模型,往往是多个生产者,多个消费者;这里的生产者和消费者往往不仅仅是一个线程,也可能是一个独立的服务器,甚至是一组服务器程序。
但生产者消费者模型,最核心的部分还是阻塞队列,可以使用synchronized和wait / notify 达到线程安全与阻塞。
3、运用阻塞队列的生产者消费者模型
简单的生产者消费者模型代码:
public class Test {public static void main(String[] args) {BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);//生产者Thread t1 = new Thread(() -> {int n = 1;while (true) {try {queue.put(n);System.out.println("生产者元素:" + n);n++;Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}});//消费者Thread t2 = new Thread(() -> {while (true) {try {int n = queue.take();System.out.println("消费者元素:" + n);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();} }执行结果:
可以看到生产者生产一个,消费者就消费一个,继续等待生产者生产元素后再消费。
都看到这了,点个赞再走吧,谢谢谢谢谢
相关文章:
多线程(初阶七:阻塞队列和生产者消费者模型)
目录 一、阻塞队列的简单介绍 二、生产者消费者模型 1、举个栗子: 2、引入生产者消费者模型的意义: (1)解耦合 (2)削峰填谷 三、模拟实现阻塞队列 1、阻塞队列的简单介绍 2、实现阻塞队列 &#…...
区间价值 --- 题解--动态规划
目录 区间价值 题目描述 输入描述: 输出描述: 输入 输出 备注: 思路: 代码: 区间价值 J-区间价值_牛客竞赛动态规划专题班习题课 (nowcoder.com) 时间限制:C/C 2秒,其他语言4秒 空间限制:C/C 262144K&…...
计算机毕业设计 基于大数据的心脏病患者数据分析管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解
博主介绍:✌从事软件开发10年之余,专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ 🍅文末获取源码联系🍅 👇🏻 精…...
20:kotlin 类和对象 --泛型(Generics)
类可以有类型参数 class Box<T>(t: T) {var value t }要创建类实例,需提供类型参数 val box: Box<Int> Box<Int>(1)如果类型可以被推断出来,可以省略 val box Box(1)通配符 在JAVA泛型中有通配符?、? extends E、? super E&…...
我对迁移学习的一点理解(系列2)
文章目录 我对迁移学习的一点理解 我对迁移学习的一点理解 源域和目标域是相对的概念,指的是在迁移学习任务中涉及到的两个不同的数据集或领域。 源域(Source Domain)通常指的是已经进行过训练和学习的数据集,它被用来提取特征、…...
Spring MVC学习随笔-控制器(Controller)开发详解:控制器跳转与作用域(二)视图模板、静态资源访问
学习视频:孙哥说SpringMVC:结合Thymeleaf,重塑你的MVC世界!|前所未有的Web开发探索之旅 衔接上文Spring MVC学习随笔-控制器(Controller)开发详解:控制器跳转与作用域(一) SpingMVC中…...
原型模式(Prototype Pattern)
1 基本概念 1.1 大佬文章 设计模式是什么鬼(原型) 详解设计模式:原型模式-腾讯云开发者社区-腾讯云 1.2 知识汇总 (1)原型模式:先 new 一个实例,该实例符合需求,之后再根据这个实…...
IM通信技术快速入门:短轮询、长轮询、SSE、WebSocket
文章目录 1. 引言2. 短轮询(Short Polling)2.1 原理2.2 代码示例2.2.1 服务器端(Node.js)2.2.2 客户端(HTML JavaScript) 3. 长轮询(Long Polling)3.1 原理3.2 代码示例3.2.1 服务器…...
04_W5500_TCP_Server
上一节我们完成了TCP_Client实验,这节使用W5500作为服务端与TCP客户端进行通信。 目录 1.W5500服务端要做的: 2.代码分析: 3.测试: 1.W5500服务端要做的: 服务端只需要打开socket,然后监听端口即可。 2…...
入门Redis学习总结
记录之前刚学习Redis 的笔记, 主要包括Redis的基本数据结构、Redis 发布订阅机制、Redis 事务、Redis 服务器相关及采用Spring Boot 集成Redis 实现增删改查基本功能 一:常用命令及数据结构 1.Redis 键(key) # 设置key和value 127.0.0.1:6379> set …...
SpringSecurity6 | 自定义登录页面
✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: Java从入门到精通 ✨特色专栏…...
从单向链表中删除指定值的节点
输入一个单向链表和一个节点的值,从单向链表中删除等于该值的节点,删除后如果链表中无节点则返回空指针。 链表的值不能重复。构造过程,例如输入一行数据为:6 2 1 2 3 2 5 1 4 5 7 2 2则第一个参数6表示输入总共6个节点,第二个参数…...
Vue2与Vue3的语法对比
Vue2与Vue3的语法对比 Vue.js是一款流行的JavaScript框架,通过它可以更加轻松地构建Web用户界面。随着Vue.js的不断发展,Vue2的语法已经在很多应用中得到了广泛应用。而Vue3于2020年正式发布,带来了许多新的特性和改进,同时也带来…...
实时动作识别学习笔记
目录 yowo v2 yowof 判断是在干什么,不能获取细节信息 yowo v2 https://github.com/yjh0410/YOWOv2/blob/master/README_CN.md ModelClipmAPFPSweightYOWOv2-Nano1612.640ckptYOWOv2-Tiny...
5G常用简称
名称缩写全称非周期 信道状态信息参考信号aperidoc CSIAperidoc Channel State Information缓冲区状态报告BSRBuffer Status Report小区特定无线网络标识CS-RNTICell-Specific Radio Network Temporary Identifier主小区组MCGMaster Cell groupMCG的节点MNMasternode主小区PCel…...
自动化测试框架性能测试报告模板
一、项目概述 1.1 编写目的 本次测试报告,为自动化测试框架性能测试总结报告。目的在于总结我们课程所压测的目标系统的性能点、优化历史和可优化方向。 1.2 项目背景 我们公开课的性能测试目标系统。主要是用于我们课程自动化测试框架功能的实现,以及…...
【SpringBoot】解析Springboot事件机制,事件发布和监听
解析Springboot事件机制,事件发布和监听 一、Spring的事件是什么二、使用步骤2.1 依赖处理2.2 定义事件实体类2.3 定义事件监听类2.4 事件发布 三、异步调用3.1 启用异步调用3.2 监听器方法上添加 Async 注解 一、Spring的事件是什么 Spring的事件监听(…...
华为ensp实验——基于全局地址池的DHCP组网实验
目录 前言实验目的实验内容实验结果 前言 该实验基于华为ensp,版本号是1.3.00.100 V100R003C00SPC100,只供学习和参考,不作任何商业用途。 具体的DHCP命令可以看系列文章链接,计算机网络实验(华为eNSP模拟器ÿ…...
如何选择一款安全可靠的跨网安全数据交换系统?
随着网络和数据安全的重视程度增加,为了有效地保护内部的核心数据资产,普遍会采用内外网隔离的策略。像国内的政府机构、金融、能源电力、航空航天、医院等关乎国计民生的行业和领域均已进行了网络的隔离,将内部划分成不同的网段,…...
基于c++版本的数据结构改-python栈和队列思维总结
##栈部分-(叠猫猫) ##抽象数据类型栈的定义:是一种遵循先入后出的逻辑的线性数据结构。 换种方式去理解这种数据结构如果我们在一摞盘子中取到下面的盘子,我们首先要把最上面的盘子依次拿走,才可以继续拿下面的盘子&…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
ffmpeg(四):滤镜命令
FFmpeg 的滤镜命令是用于音视频处理中的强大工具,可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下: ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜: ffmpeg…...
用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
Java数值运算常见陷阱与规避方法
整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
android RelativeLayout布局
<?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"android:gravity&…...
stm32wle5 lpuart DMA数据不接收
配置波特率9600时,需要使用外部低速晶振...
门静脉高压——表现
一、门静脉高压表现 00:01 1. 门静脉构成 00:13 组成结构:由肠系膜上静脉和脾静脉汇合构成,是肝脏血液供应的主要来源。淤血后果:门静脉淤血会同时导致脾静脉和肠系膜上静脉淤血,引发后续系列症状。 2. 脾大和脾功能亢进 00:46 …...

















