当前位置: 首页 > news >正文

Reactor响应式流的核心机制——背压机制

响应式流是什么?

       响应式流旨在为无阻塞异步流处理提供一个标准。它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。 

       响应式流模型存在两种基本的实现机制。一种就是传统开发模式下的“拉”模式,即消费者主动从生产者拉取元素;而另一种就是“推”模式,在这种模式下,生产者将元素推送给消费者。相较于“拉”模式,“推”模式下的数据处理的资源利用率更好,下图所示的就是一种典型的推模式处理流程。

       数据流的生产者会持续地生成数据并推送给消费者。这里就引出了流量控制问题,即如果数据的生产者和消费者处理数据的速度是不一致的,我们应该如何确保系统的稳定性呢?

流量控制

生产者生产数据的速率小于消费者的场景

     这种场景对于消费者来说没啥压力,正常消费就好了,这里也就不需要所谓的流量控制了。

生产者生产数据的速率大于消费者的场景

     生产者生产数据的速率大于消费者的场景,应该是我们业务中经常遇到的场景了,这种场景由于消费者处理不过来导致崩溃,业界通常的做法是在生产者与消费者之间加一个队列做缓冲。我们知道队列具有存储与转发的功能,所以可以用它来进行一定的流量控制。

如何对于流量进行很好的控制?这就转变到了如何设计好一个队列了,目前 Java 业界主流的队列有以下三种:

无界队列 :

界队列在原则上是拥有无线大小容量的队列,可以存放生产者产生的所有消息。

  • 优势:确保消费者消费到所有的数据

  • 劣势:系统的回弹性降低,任何一个系统不可能拥有无限的资源,一旦内存等资源耗尽,系统就可能会有崩溃的风险。

有界丢弃队列 :

       为了避免上面无界队列的弊端,有界丢弃队列采用的是如果队列满了,就会采用丢弃后面传入的值,这里可以设置一些丢弃策略,比如说按照优先级或先进先出等。

  • 优势:考虑到资源的限制,适合允许丢消息的业务场景。

  • 劣势:消息重要性很高的场景不建议采取这种队列

有界阻塞队列: 

        像一些支付金融级别的场景,是不允许丢数据的,所以我们引出有界阻塞队列,我们会在队列消息数量达到上限后阻塞生产者,而不是直接丢弃消息。

  • 优势:解决了不允许丢数据的业务场景

  • 劣势:当队列满了的时候,会阻塞生产者停止生产数据,这种场景不可能实现异步操作的。

所以,无论从回弹性、弹性还是即时响应性出发,上述的队列都不是响应式流的上佳解决办法。

背压机制 

       上面说的那几种队列纯“推”模式下的数据流量会有很多不可控制的因素,并不能直接应用,而是需要在“推”模式和“拉”模式之间考虑一定的平衡性,从而优雅地实现流量控制。这就需要引出响应式系统中非常重要的一个概念——背压机制(Backpressure)。 

       什么是背压?简单来说就是下游能够向上游反馈流量请求的机制。通过前面的分析,我们知道如果消费者消费数据的速度赶不上生产者生产数据的速度时,它就会持续消耗系统的资源,直到这些资源被消耗殆尽。

      这个时候,就需要有一种机制使得消费者可以根据自身当前的处理能力通知生产者来调整生产数据的速度,这种机制就是背压。采用背压机制,消费者会根据自身的处理能力来请求数据,而生产者也会根据消费者的能力来生产数据,从而在两者之间达成一种动态的平衡,确保系统的即时响应性。

响应式流规范

   有了背压机制,我们再来看下响应式流是如何基于这种机制去设计的一套规范,规范详情请参考:Reactive Streams

Java API 的响应式流只定义了四个核心接口:

  • Publisher<T>

  • Subscriber<T>

  • Subscription

  • Processor<T,R>

publisher<`T`>

Publisher 代表的就是一种可以生产无限数据的发布者,接口如下:

public interface Publisher<T> {public void subscribe(Subscriber<? super T> s);
}

 可以看到,Publisher 里的 subscribe 方法传入的是 Subscriber 接口,其实这里用的是回调,Publisher 根据收到的请求向当前订阅者 Subscriber 发送元素。

Subscriber<`T`>

Subscriber 代表的是一种可以从发布者那里订阅并接收元素的订阅者,接口如下:

public interface Subscriber<T> {public void onSubscribe(Subscription s);public void onNext(T t);public void onError(Throwable t);public void onComplete();
}

        Subscriber 接口定义的这组方法构成了数据流请求和处理的基本流程,其中,onSubscribe() 从命名上看就是一个回调方法,当发布者的 subscribe() 方法被调用时就会触发这个回调。而在该方法中有一个参数 Subscription,可以把这个 Subscription 看作是一种用于订阅的上下文对象。Subscription 对象中包含了这次回调中订阅者想要向发布者请求的数据个数。

       当订阅关系已经建立,那么发布者就可以调用订阅者的 onNext() 方法向订阅者发送一个数据。这个过程是持续不断的,直到所发送的数据已经达到 Subscription 对象中所请求的数据个数。这时候 onComplete() 方法就会被触发,代表这个数据流已经全部发送结束。而一旦在这个过程中出现了异常,那么就会触发 onError() 方法,我们可以通过这个方法捕获到具体的异常信息进行处理,而数据流也就自动终止了。 

Subscription

Subscription 代表的就是一种订阅上下文对象,它在订阅者和发布者之间进行传输,从而在两者之间形成一种契约关系,接口如下:

public interface Subscription {public void request(long n);public void cancel();
}

这里的 request() 方法用于请求 n 个元素,订阅者可以通过不断调用该方法来向发布者请求数据;而 cancel() 方法显然是用来取消这次订阅。请注意,Subscription 对象是确保生产者和消费者针对数据处理速度达成一种动态平衡的基础,也是流量控制中实现背压机制的关键所在

 Processor<`T,R`> 

Processor 代表的就是订阅者和发布者的处理阶段,Processor 接口继承了 Publisher 和 Subscriber 接口。它用于转换发布者——订阅者管道中的元素。Processor订阅类型 T 的数据元素,接收并转换为类型 R 的数据,并发布变换后的数据。接口如下:

public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}

图显示了处理者在发布者——订阅和管道中作为转换器的作用,可以拥有多个处理者。

总结 

  • 响应式流规范定义的很简洁,但实现起来并不简单,发布者和订阅者之间的所有交互的异步性质以及背压机制使得实现变得复杂。

  • 响应式流规范非常灵活,还可以提供独立的“推”模型和“拉”模型。如果为了实现纯“推”模型,我们可以考虑一次请求足够多的元素;而对于纯“拉”模型,相当于就是在每次调用 Subscriber 的 onNext() 方法时只请求一个新元素。

  • JDK 9 中提供了 Flow 响应式流接口,与响应式流兼容的接口,可以看得出,JDK 团队后续的发展趋势也是想往响应式流这块靠近。

相关文章:

Reactor响应式流的核心机制——背压机制

响应式流是什么&#xff1f; 响应式流旨在为无阻塞异步流处理提供一个标准。它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者&#xff0c;而不需要发布者阻塞&#xff0c;或订阅者有无限制的缓冲区或丢弃。 响应式流模型存在两种基本的实现机制。一种就是传统…...

[数据结构]栈的深入学习-java实现

CSDN的各位uu们你们好,今天千泽带来了栈的深入学习,我们会简单的用代码实现一下栈, 接下来让我们一起进入栈的神奇小世界吧!0.速览文章一、栈的定义1. 栈的概念2. 栈的图解二、栈的模拟实现三.栈的经典使用场景-逆波兰表达式总结一、栈的定义 1. 栈的概念 栈&#xff1a;一种…...

网络编程基础

1 互联网的本质硬件设备有了操作系统&#xff0c;然后装上软件之后我们就能够正常使用了&#xff0c;然后也只能自己使用。像这样&#xff0c;每个人都拥有一台自己的机器&#xff0c;然而彼此孤立。如何才能和大家一起愉快的玩耍&#xff1f;什么是网络&#xff1f;简单来说&a…...

华为OD机试题 - 数列还原(JavaScript)| 机考必刷

更多题库,搜索引擎搜 梦想橡皮擦华为OD 👑👑👑 更多华为OD题库,搜 梦想橡皮擦 华为OD 👑👑👑 更多华为机考题库,搜 梦想橡皮擦华为OD 👑👑👑 华为OD机试题 最近更新的博客使用说明本篇题解:数列还原题目输入输出示例一输入输出Code代码解析版权说明华为O…...

10-Oracle存储过程(创建,修改,使用及管理)

本章内容 1、我们为什么要用存储过程? 2、存储过程是如何定义和维护的? 3、我们如何调用存储过程? 4、存储过程中常用的复合数据处理方式及CTE 5、存储过程如何进行异常处理? 6、存储过程如何进行事务处理? 7、我们应如何优化存储过程? 1、我们为什么要用存储过程…...

创建线程的三种方法

文章目录1、创建一个类实现Runnable接口&#xff0c;并重写run方法。2、创建一个类继承Thread类&#xff0c;并重写run方法。3、实现Callable接口&#xff0c;重写call()方法&#xff0c;这种方式可以通过FutureTask获取任务执行的返回值。4、run()方法和start()方法有什么区别…...

第一章---Pytorch快速入门---第三节---pytorch中的数据操作和预处理

目录 1.高维数组 1.1 回归数据准备 1.2 分类数据准备 2. 图像数据 1.torchvision.datasets模块导入数据并预处理 2.从文件夹中导入数据并进行预处理 pytorch中torch.utils.data模块包含着一些常用的数据预处理的操作&#xff0c;主要用于数据的读取、切分、准备等。 常用…...

【代码随想录训练营】【Day38】第九章|动态规划|理论基础|509. 斐波那契数|70. 爬楼梯|746. 使用最小花费爬楼梯

理论基础 动态规划与贪心的区别并不是学习动态规划所必须了解的&#xff0c;所以并不重要。 想要了解动态规划算法题的特点&#xff0c;可以直接做下面三道入门简单题练练手感&#xff0c;找找感觉&#xff0c;很快就能体会到动态规划的解题思想。 总结成一句话就是&#xf…...

华为OD机试题 - 快递货车(JavaScript)| 机考必刷

更多题库,搜索引擎搜 梦想橡皮擦华为OD 👑👑👑 更多华为OD题库,搜 梦想橡皮擦 华为OD 👑👑👑 更多华为机考题库,搜 梦想橡皮擦华为OD 👑👑👑 华为OD机试题 最近更新的博客使用说明本篇题解:快递货车题目输入输出示例一输入输出Code解题思路版权说明华为O…...

前端——7.图像标签和路径

这篇文章&#xff0c;我们来讲解一下图像标签 目录 1.图像标签 1.1介绍 1.2实际展示 1.3图像标签的属性 1.3.1 alt属性 1.3.2 title属性 1.3.3 width / height 属性 1.3.4 border属性 1.4注意事项 2.文件夹 2.1目录文件夹和根目录 2.2 VSCode打开目录文件夹 3.路…...

java -- stream流

写在前面: stream流一直在使用&#xff0c;但是感觉还不够精通&#xff0c;现在深入研究一下。 stream这个章节中&#xff0c;会用到 函数式接口–lambda表达式–方法引用的相关知识 介绍 是jdk8引进的新特性。 stream流是类似一条流水线一样的操作&#xff0c;每次对数据进…...

【Spring6】| Bean的四种获取方式(实例化)

目录 一&#xff1a;Bean的实例化方式 1. 通过构造方法实例化 2. 通过简单工厂模式实例化 3. 通过factory-bean实例化 4. 通过FactoryBean接口实例化 5. BeanFactory和FactoryBean的区别&#xff08;面试题&#xff09; 6. 使用FactoryBean注入自定义Date 一&#xff1a…...

01: 新手学SpringCloud前需知道的5点

目录 第一点&#xff1a; 什么是微服务架构 第二点&#xff1a;为什么需要学习Spring Cloud 第三点&#xff1a; Spring Cloud 是什么 第四点&#xff1a; SpringCloud的优缺点 1、SpringCloud优点 2、SpringCloud缺点 第五点&#xff1a; SpringCloud由什么组成 1&…...

ubuntu apt安装arm交叉编译工具

查找查找编译目标为32位的gcc-arm交叉编译器命令apt-cache search arm|awk index($1,"arm")!0 {print}|grep gcc-arm\|g-arm #或者 apt-cache search arm|awk index($1,"arm")!0 {print}|grep -E gcc-arm|g\\-arm输出如下g-arm-linux-gnueabihf - GNU C co…...

阿里云一面经历

文章目录 ES 查询方式都有哪些?1 基于词项的查询term & terms 查询Fuzzy QueryWildcard Query2 基于全文的查询Match QueryQuery String QueryMatch Phrase Query3 复合查询Bool QueryElasticsearch 删除原理ES 大文章怎么存arthas 常用命令arthas 排查问题过程arthas 工作…...

Java Stream中 用List集合统计 求和 最大值 最小值 平均值

对集合数据的统计&#xff0c;是开发中常用的功能&#xff0c;掌握好Java Stream提供的方法&#xff0c;避免自己写代码统计&#xff0c;可以提高工作效率。 先造点数据&#xff1a; pigs.add(new Pig(1, "猪爸爸", 31, "M", false)); pigs.add(new Pig(…...

【Linux】多线程---线程控制

进程在前面已经讲过了&#xff0c;所以这次我们来讨论一下多线程。前言&#xff1a;线程的背景进程是Linux中资源及事物管理的基本单位&#xff0c;是系统进行资源分配和调度的一个独立单位。但是实现进程间通信需要借助操作系统中专门的通信机制&#xff0c;但是只这些机制将占…...

秒杀高并发解决方案

秒杀高并发解决方案 1.秒杀/高并发方案-介绍 秒杀/高并发 其实主要解决两个问题&#xff0c;一个是并发读&#xff0c;一个是并发写并发读的核心优化理念是尽量减少用户到 DB 来"读"数据&#xff0c;或者让他们读更少的数据, 并 发写的处理原则也一样针对秒杀系统需…...

【每日一题】蓝桥杯加练 | Day07

文章目录一、三角回文数1、问题描述2、解题思路3、AC代码一、三角回文数 原题链接&#xff1a;三角回文数 1、问题描述 对于正整数 n, 如果存在正整数 k 使得n123⋯k k(k1)2\frac{k(k1)}{2}2k(k1)​ , 则 n 称为三角数。例如, 66066 是一个三角数, 因为 66066123⋯363 。 如果一…...

条件语句(分支语句)——“Python”

各位CSDN的uu们你们好呀&#xff0c;最近总是感觉特别特别忙&#xff0c;但是却又不知道到底干了些什么&#xff0c;好像啥也没有做&#xff0c;还忙得莫名其妙&#xff0c;言归正传&#xff0c;今天&#xff0c;小雅兰的内容还是Python呀&#xff0c;介绍一些顺序结构的知识点…...

论文投稿指南——中文核心期刊推荐(国家财政)

【前言】 &#x1f680; 想发论文怎么办&#xff1f;手把手教你论文如何投稿&#xff01;那么&#xff0c;首先要搞懂投稿目标——论文期刊 &#x1f384; 在期刊论文的分布中&#xff0c;存在一种普遍现象&#xff1a;即对于某一特定的学科或专业来说&#xff0c;少数期刊所含…...

面向数据安全共享的联邦学习研究综述

开放隐私计算 摘 要&#xff1a;跨部门、跨地域、跨系统间的数据共享是充分发挥分布式数据价值的有效途径&#xff0c;但是现阶段日益严峻的数据安全威胁和严格的法律法规对数据共享造成了诸多挑战。联邦学习可以联合多个用户在不传输本地数据的情况下协同训练机器学习模型&am…...

Redis经典五种数据类型底层实现原理解析

目录总纲redis的k,v键值对新的三大类型五种经典数据类型redisObject结构图示结构讲解数据类型与数据结构关系图示string数据类型三大编码格式SDS详解代码结构为什么要重新设计源码解析三大编码格式hash数据类型ziplist和hashtable编码格式ziplist详解结构剖析ziplist的优势(为什…...

Jackson 返回前端的 Response结果字段大小问题

目录 1、问题产生的背景 2、出现的现象 3、解决方案 4、成果展现 5、总结 6、参考文章 1、问题产生的背景 因为本人最近工作相关的对接外部项目&#xff0c;在我们国内有很多程序员都是使用汉语拼音或者部分字母加上英文复合体定义返回实体VO&#xff0c;这样为了能够符合…...

每天五分钟机器学习:你理解贝叶斯公式吗?

本文重点 贝叶斯算法是机器学习算法中非常经典的算法,也是非常古老的一个算法,但是它至今仍然发挥着重大的作用,本节课程及其以后的专栏将会对贝叶斯算法来做一个简单的介绍。 贝叶斯公式 贝叶斯公式是由联合概率推导而来 其中p(Y|X)称为后验概率,P(Y)称为先验概率…...

C++的入门

C的关键字 C总计63个关键字&#xff0c;C语言32个关键字 命名空间 我们C的就是建立在C语言之上&#xff0c;但是是高于C语言的&#xff0c;将C语言的不足都弥补上了&#xff0c;而命名空间就是为了弥补C语言的不足。 看一下这个例子。在C语言中会报错 #include<stdio.h>…...

数据的存储

类型的意义&#xff1a;使用这个类型开辟内存空间的大小&#xff08;大小决定了使用范围&#xff09;如何看待内存空间视角类型的基本归类整型家族浮点数家族构造类型指针类型空类型整型存储解构:整型在计算机中占用四个字节&#xff0c;整型分为无符号整型和有符号整型在计算机…...

Linux查看UTC时间

先了解一下几个时间概念。 GMT时间&#xff1a;Greenwich Mean Time&#xff0c;格林尼治平时&#xff0c;又称格林尼治平均时间或格林尼治标准时间。是指位于英国伦敦郊区的皇家格林尼治天文台的标准时间。 GMT时间存在较大误差&#xff0c;因此不再被作为标准时间使用。现在…...

SpringBoot修改启动图标(详细步骤)

目录 一、介绍 二、操作步骤 三、介绍Java学习&#xff08;题外话&#xff09; 四、关于基础知识 一、介绍 修改图标就是在资源加载目录&#xff08;resources&#xff09;下放一个banner.txt文件。这样运行加载的时候就会扫描到这个文件&#xff0c;然后启动的时候就会显…...

【每日一题Day143】面试题 17.05. 字母与数字 | 前缀和+哈希表

面试题 17.05. 字母与数字 给定一个放有字母和数字的数组&#xff0c;找到最长的子数组&#xff0c;且包含的字母和数字的个数相同。 返回该子数组&#xff0c;若存在多个最长子数组&#xff0c;返回左端点下标值最小的子数组。若不存在这样的数组&#xff0c;返回一个空数组。…...

wordpress 获取缩略图/网站查询器

LAMP开发可以说非常流行了&#xff0c;稳定安全的Linux系统和apache服务器搭配轻量级的PHP、MYSQL可以说是完美组合。可以在效率和安全性等各个方面都比ASP.NET、JSP等动态语言优胜一筹。这也是php这么流行的原因之一。说到Linux&#xff0c;不得不说这是一个最好用的操作系统&…...

成都大学网站建设特色/广点通官网

这是一份开源安全项目清单&#xff0c;收集了一些比较优秀的开源安全项目&#xff0c;以帮助甲方安全从业人员构建企业安全能力。这些开源项目&#xff0c;每一个都在致力于解决一些安全问题。 项目收集的思路&#xff1a; 一个是关注互联网企业/团队的安全开源项目&#xff0c…...

网页作业怎么做一个网站/高质量外链购买

目录 动机 理解 简介 目的 为什么要进行显著性检验 怎么做显著性检验 白话举例 参考 动机 看到论文中使用显著性分析&#xff0c;于是先了解下什么是显著性检验&#xff0c;于是是通过T检验手段&#xff0c;具体可以看本人另一篇博客 【数学】T检验&#xff08;显著性…...

有没有专门做av字幕的网站/文员短期电脑培训

下面的内容是要给大家介绍eclipse将maven项目打包成jar包的方法&#xff0c;一起来看看整个过程是怎样的吧&#xff0c;希望对你的编程之路可以有所帮助哦。利用eclipse把项目打包成jar&#xff0c;放入服务器执行。1、首先的话在eclipse当中选中项目&#xff0c;右键Run As&am…...

水淼wordpress/上海优化排名网站

一、什么是观察者模式 观察者定义了一种一对多的依赖关系&#xff0c;当一个主题(Subject)对象状态发生变化时&#xff0c;所有依赖它的相关对象都会得到通知并且能够自动更新自己的状态&#xff0c;这些依赖的对象称之为观察者(Observer)对象这类似于发布/订阅模式。 观察者…...

海南美容网站建设/国家免费技能培训平台

--js与PHP同是一种弱类型语言 弱类型语言只是不显示表现 定义变量时系统自动给默认了所以在定义PHP的变量时可以定义变量如图1&#xff1a;---------PHP中的常用语句&#xff1a;$a10; 根据写的值&#xff0c;系统自动生成为int$b"hello";定义一个字符串&#xff1b…...