Flink系统知识讲解之:Flink内存管理详解
Flink系统知识讲解之:Flink内存管理详解
在现阶段,大部分开源的大数据计算引擎都是用Java或者是基于JVM的编程语言实现的,如Apache Hadoop、Apache Spark、Apache Drill、Apache Flink等。Java语言的好处是不用考虑底层,降低了程序员的门槛,JVM可以对代码进行深度优化,对内存资源进行管理,自动回收内存。但是自动内存管理的问题在于不可控,基于JVM的大数据引擎常常会面临一个问题,即在处理海量数据时,如何在内存中存储大量的数据。
自主内存管理
Flink从一开始就选择了自主的内存管理,避开了JVM内存管理在大数据场景下的问题,提升了计算效率。
1.JVM内存管理的不足
当需要将海量数据存储到内存中时,就不得不面对JVM存在的几个问题:
(1)有效数据密度低
Java的对象在内存中的存储包含3个主要部分:对象头、实例数据、对齐填充部分。32位和64位的虚拟机中对象头分别需要占32bit和64bit。实例数据时实际的数据存储。为了提高效率,内存中数据存储不是连续的,而是按照8 byte的整数倍进行存储。例如,只有一个boolean字段的类实例占16 byte:头信息8 byte,boolean 1 byte,为了对齐达到8的倍数会额外占用7 byte。这就导致在JVM中有效信息的存储密度很低。
(2)垃圾回收
JVM的内存回收机制的优点和缺点同样明显,优点是开发者无需关注资源回收问题,可以提高开发效率,减少内存泄漏的可能。但是内存回收是不可控的,在大数据计算的场景中,这个缺点被放大,TB、PB级的数据计算需要消耗大量的内存,在内存中产生海量的Java对象。一旦出现Full GC,GC会达到秒级甚至分钟级,直接影响执行效率。
GC带来的中断会使集群中的心跳超时,导致节点被踢出集群,整个集群进入不稳定状态。虽然通过JVM参数调优可以提升回收效率,尽量减少Full GC的发生,但是仍然不能避免这个问题,精确的调优也确实非常困难。
(3)OOM问题影响稳定性
OutOfMemoryError是分布式计算框架经常会遇到的问题,当JVM中所有对象大小超过分配给JVM的内存大小时,就会发生OutOfMemoryError错误,导致JVM崩溃,分布式框架的健壮性和性能都会受到影响。
(4)缓存未命中问题
CPU进行计算的时候,是从CPU缓存中获取数据,而不是直接从内存中获取数据。 CPU有分L1、L2、L3级缓存。L1小,一般为32KB,L3大,能达到32MB。缓存的理论基础是程序局部性原理,包括时间局部性和空间局部性:最近被CPU访问的数据,短期内CPU还要访问(时间);被CPU访问的数据附近的数据,CPU短期内还要访问(空间)。Java对象在堆上存储的时候并不是连续的,所以从内存中读取Java对象时,缓存的邻近的内存区域的数据往往不是CPU下一步计算所需要的,这就是缓冲未命中。此时CPU需要空转等待从内存中重新读取数据。CPU的速度和内存的速度之间差好几个数量级,导致CPU没有充分利用起来。如果数据没有在内存中,而是需要从磁盘上加载,那么执行效率就会变得惨不忍睹。
2.自主内存管理
因为直接使用JVM做内存管理在大数据场景下可能遇到的诸多问题,所以越来越多的大数据计算引擎选择自行管理JVM内存,如Spark、Flink、HBase,尽量达到C/C++一样的性能,同时避免OOM的发生。本章主要介绍Flink是如何解决上面的问题的,主要内容包括内存管理、定制的序列化工具、缓存友好的数据结构和算法、堆外内存等。
在Flink中,Java对象的有效信息被序列化为二进制数据流,在内存中连续存储,保存在预分配的内存块上,内存块叫做MemorySegment。MemorySegment是内存分配的最小单元,是一段固定长度的内存(默认大小为32KB)。同时,Flink为其提供了非常高效的读写方法,很多运算可以直接操作二进制数据,而不需要反序列化即可执行。
MemorySegment可以保存在堆上,其内部存储为一个Java byte数组,也可以保存在堆外的ByteBuffer中。每条记录都会以序列化的形式存储在一个或多个MemorySegment中。
但使用堆上内存,仍然不是完全自主的内存管理,还存在以下问题:
1)超大内存(上百GB)JVM的启动需要很长时间,Full GC可以达到分钟级。使用堆外内存,可以将大量的数据保存在堆外,极大地减小堆内存,避免GC和内存溢出的问题。
2)高效的IO操作。堆外内存在写磁盘或网络传输时采用的是零拷贝,而堆上内存则至少需要1次内存复制。
3.堆外内存的不足之处
堆外内存提供了更好的性能和更可控的内存管理,但是也存在几个问题:
1)堆上内存的使用、监控、调试简单,堆外内存出现问题后的诊断则较为复杂。
2)Flink有时需要分配短生命周期的MemorySegment,在堆外内存上分配比在堆上内存开销更高。
3)在Flink的测试中,部分操作在堆外内存上会比堆上内存慢。
同时为了提供效率,Flink在计算中采用了DBMS的Sort和Join算法,直接操作二进制数据,避免数据反复序列化带来的开销。Flink的内部实现更像C/C++而非Java。
内存模型
内存布局
TaskManager是Flink中执行计算的核心组件,是用来运行用户代码的Java进程。其中,大量使用了堆外内存。
Flink TaskManager的简化和详细内存结构如下图所示:
简化内存模型:
详细内存模型:
基于文初提及的使用JVM堆上内存的一些不足之处,Flink设计了使用堆外内存的自主内存管理。因此,Flink任务进程的总内存(Total Process Memory, TPM)= Flink自身使用的内存(Total Flink Memory, TFM) + JVM运行额外的内存(如Metaspace、overhead)。其中,Flink自身使用的内存(TFM)包括了JVM堆内存和自主管理的堆外内存,堆外内存又包含了托管内存和直接内存。下面分别对这些分类进行介绍:
JVM Heap
Framework Heap
这部分内存主要由Flink框架自身使用,用于存储系统级别的数据结构,包括Flink框架在运行期间需要的一些数据结构,例如任务的线程栈内存和其他Flink框架的基础设施。例如用于JobManager和TaskManager的RPC消息、管理检查点的元数据等。它是作业执行所必需的基本内存,独立于用户程序和运行期间的数据存储。
Task Heap
这部分内存主要用于存储由用户函数创建的Java对象和用户函数操作的数据。例如,当执行一个map操作,您的函数可能会创建一些新的Java对象,这些对象都是在JVM堆内存中创建和管理的。如果Flink的托管内存配置为堆内,那么Flink的排序、哈希和状态后端操作也会使用到Task Heap内存。
Off-Heap
托管内存(Managed Memory)
托管内存是由 Flink 负责分配和管理的本地(堆外)内存。 以下场景需要使用托管内存:
- 流处理作业中用于 RocksDB State Backend。
- 流处理和批处理作业中用于排序、哈希表及缓存中间结果。
- 流处理和批处理作业中用于在 Python 进程中执行用户自定义函数。
更具体的,对于当作业中使用排序、哈希表及缓存中间结果时,Flink是如何使用托管内存的: - 排序:例如,当你需要对一个非常大的数据集进行排序时,如果数据无法完全装入内存,Flink 就会使用其托管内存来执行外排序。在外排序过程中,数据会被分割成可以装入内存的小块,每个小块内部进行排序,然后将排序后的小块写入磁盘。当所有小块都进行了排序和写入后,Flink 会从磁盘读取这些小块,执行归并排序,直到所有数据都被排序。
- 哈希表:Flink 在处理连接(Join)操作时,经常需要使用哈希表来维护到目前为止已经看到的数据记录。如果不能将所有数据装入内存,Flink 就会使用托管内存来存储这个哈希表。这样就可以保证即使在处理大规模数据时也能保持良好的性能。
- 缓存中间结果:在一些需要多遍扫描数据的算法(比如迭代算法)中,Flink 会缓存数据的中间结果,以便下一轮迭代可以重复使用,这样可以减少数据重复读取的开销。Flink 托管内存就是用来存储这些中间结果的。
另外,当使用 RocksDB 作为状态后端时,Flink 托管内存主要被用作 RocksDB 的写缓冲区(Write Buffer)和读缓存(Block Cache),从而提高状态访问的速度。
简言之,Flink 的托管内存主要用于存储在处理过程中需要存储的中间计算数据和结果,以求在充分利用有限内存资源的同时提供尽可能高的处理速度。
直接内存()
直接内存通常指的是被Flink进程直接从操作系统中申请的、不受Java堆内存垃圾回收器管理的内存。 以下场景需要使用直接内存:
- 于网络通信和文件I/O,
- 通过网络缓冲池进行数据交换(如shuffle)
- 数据缓冲以及序列化/反序列化中进行应用。
Flink通过直接内存技术进行数据交换可以有效避免频繁的Java堆内存和本地I/O缓存之间的数据复制(利用零拷贝技术),从而提高性能。
在一些情况下,直接内存也可以利用在某些需要大量内存并希望避免频繁触发垃圾回收的处理中,例如当使用RocksDB作为状态后端时,RocksDB的本地内存通常是由直接内存提供的,这样可以避免状态数据引起的Java堆内存的显著增加,从而降低了垃圾收集的开销和提高了性能。
直接内存包括了以下几部分:
- Framework Off-Heap:这部分内存被Flink框架用于框架自身的一些运行需求。比如,Flink的一些本地数据结构和算法可能会使用这部分内存进行操作。这部分内存一般不大。
- Task Off-Heap:这部分内存主要用于存储由用户任务产生的、并由Flink以某种形式管理的内存。比如,如果你配置了本地状态后端(如RocksDB)使用堆外内存,那么这部分内存将存储状态数据。这部分内存的使用可以避免引起频繁的Java GC操作,提高性能。
- Network:此部分内存主要用于网络通信中的缓冲区。Flink通过此缓冲区在TaskManager之间发送和接收数据。这部分内存通常是直接内存,不受GC的影响,可以有效地进行数据交换和缓冲以提高网络通信的性能。
另外,除了Flink使用的总内存(Total Flink Memory,TFM)外,总进程内存(Total Process Memory,TPM)还包括了JVM元空间(Metaspace)和其他开销内存(overhead)。在JVM内存模型中,将元空间从堆内存独立出来了,所以在上面的内存模型中也元空间也是单独一部分,外加一些JVM运行时的额外开销内存,例如线程栈、代码缓存、GC回收空间等等。
Flink内存模型分类配置参数
1.Flink使用的内存
(1)JVM堆上内存
- 框架堆上内存Framework Heap Memory。Flink框架本身所使用的内存,即TaskManager本身所占用的堆上内存,不计入slot的资源中。
配置参数:taskmanager.memory.framework.heap.size = 128MB,默认128MB。 - Task堆上内存Task Heap Memory。Task执行用户代码时所使用的堆上内存。
配置参数:taskmanager.memory.task.heap.size
(2)JVM堆外内存 - 框架堆外内存Framework Off-Heap Memory。Flink框架本身所使用的内存,即TaskManager本身所占用的堆外内存,不计入slot的资源。
配置参数:taskmanager.memory.framework.off-heap.size = 128MB,默认128MB。 - Task堆外内存Task Off-Heap Memory。Task执行用户代码时所使用的堆外内存。
配置参数:taskmanager.memory.task.off-heap.size = 0,默认为0. - 网络缓冲内存Network Memory。网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区(Network Buffer,后面回介绍)。
配置参数:taskmanager.memory.network.(min/max/fraction),默认min=64MB,max=1GB,fraction=0.1。 - 堆外托管内存 Managed Memory。Flink管理的堆外内存。
配置参数:taskmanager.memory.managed.[size|fraction],默认fraction = 0.4。
2.JVM本身使用的内存
JVM本身直接使用了操作系统的内存。
- JVM元空间
JVM元空间所使用的内存
配置参数:taskmanager.memory.jvm-metaspace=96M,默认96MB。 - JVM执行开销
JVM在执行时自身所需要的内容,包括线程栈、IO、编译缓存等所使用的内存。
配置参数:taskmanager.memory.jvm-overhead.[min|max|fraction]。默认min=192MB,max=1GB,fraction=0.1。 - 总体内存
(1)Flink使用内存
综上而言,Flink使用的内存包括Flink使用的堆上、堆外内存。使用参数taskmanager.memory.flink.size参数进行控制。
(2)进程使用内存
整个进程所使用的内存,包括Flink使用的内存和JVM使用的内存。使用参数taskmanager.memory.process.size进行控制。
JVM内存控制参数如下所示:
1)JVM堆上内存,使用-Xmx和-Xms参数进行控制。
2)JVM直接内存,使用参数-XX:MaxDirectMemorySize进行控制。对于托管内存,使用Unsafe.allocateMemory()申请,不受该参数控制。
3)JVM Metaspace使用-XX:MaxMetaspaceSize进行控制。
内存计算
目前的实现中,在JVM启动之前就需要确定各个内存区块的大小。一旦JVM启动了,在TaskManager进程内部就不再重新计算。Flink中有两个地方进行内存大小计算:
- 在Standalone部署模式下,内存的计算在启动脚本中实现。
- 在容器环境下(Yarn、K8s、Mesos),计算在ResourceManager中进行。
在启动脚本与容器环境下的内存大小计算都调用了Flink的Java代码时间,保证了所有部署模式下的统一,计算好的参数使用-D参数提交给Java进程。
计算时,需要配置如下3个参数组合中的至少1个:
(1)Task的堆上内存和托管内存
如果手动配置了网络缓冲区内存大小,则使用该参数。如果没有明确配置,则使用分配系数fraction ✖️总体Flink使用内存计算网络缓冲区内存大小。
(2)总体Flink使用内存
如果配置了该选项,而没有配置(1),则从整体Flink内存中划分网络缓冲区内存和托管内存,剩余的内存作为Task堆上内存。
如果手动设置了网络缓冲内存,则使用其值,否则使用默认的分配系统fraction✖️总体Flink内存。
(3)总体进程使用内存
如果只配置了总体进程使用内存,则从整体进程中扣除JVM元空间和JVM执行开销内存,剩余的内存作为总体Flink使用内存。
内存数据结构
Flink的内存管理像操作系统管理内存一样,将内存划分为内存段、内存页等结构。
内存段
内存段在Flink内存叫做MemorySegment,是Flink的内存抽象的最小分配单元。 默认情况下,一个MemorySegment对应着一个32KB大小的内存块。这块内存既可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirectByteBuffer)。
MemorySegment同时也提供了堆二进制数据进行读取和写入的方法。对于Java基本数据类型,如short、int、long等,MemorySegment内置了方法,可以直接返回或者写入数据,对于其他类型,读取二进制数组byte[]后进行反序列化,序列化为二进制数据byte[]后写入。
MemorySegment结构
为了更清晰地理解MemorySegment,下面一起看一下MemorySegment的关键属性。
1)BYTE_ARRAY_BASE_OFFSET:二进制字节数组的起始索引,相对于字节数组对象而言。
2)LITTLE_ENDIAN:判断是否为Little Endian模式的字节存储顺序,若不是就是Big Endian模式。
3)HeapMemory:如果MemorySegment使用堆上内存,则表示一个堆上的字节数组(byte[]),如果MemorySegment使用堆外内存,则为null。
4)address:字节数组对应的相对地址(若HeapMemory为null,即可能为堆外内存的绝对地址)。
5)addressLimit:标识地址结束位置(address+size)。
6)size:内存段的字节数。
结构类图如下所示:
可以看到,MemorySegment类定义了一系列的方法来对字节和基本数据类型,如int、long、float进行读写的方法(例如get和put方法)。同时,还支持批量操作,例如复制和比较操作,它提供了copy和compare方法用来对大量数据进行操作。
Flink的MemorySegment主要用于Flink框架对内存的管理和数据的处理,主要用在它的网络缓冲、排序算法和内存状态后端等地方,以提供高效的内存操作。在设计上,MemorySegment主要实现了以下几点:
- 高效的数据访问。 MemorySegment类包含了一个连续的字节数组(heapMemory),用于存储实际的二进制数据。所有的get和put操作都是在这个字节数组上执行的。使用连续的字节数组的好处正是我们上文提到的,可以充分利用程序的局部性原理,因此,Flink使用MemorySegment作为其最小的内存分配单元,保证了读写数据时,相邻的数据能够一起被加载到CPU缓存中,提升处理性能。
- 高效的内存管理。 Flink通过MemorySegment对内存进行管理,保证了Flink程序运行时的内存效率。例如,对于Flink的网络缓冲、排序算法和内存状态后端等地方,都会使用MemorySegment进行内存的分配和回收。这有助于Flink高效地使用内存,而且避免了一些Java内存管理中常见的问题,如垃圾收集(Garbage Collection)过频繁等。
- **支持堆外内存(off-heap memory)操作。**这意味着,除了在JVM堆内存上操作,MemorySegment还能直接在系统的物理内存上进行操作。使用离堆内存可以避免频繁的垃圾回收,提高数据处理的性能。
另外,需要注意的是,MemorySegment抽象类中的heapMemory仅适用于“堆”内存段,即哪些将数据存储在Java堆上的内存段。对于“非堆”内存段,即哪些将数据存储在Java堆之外的内存段,Flink使用java.nio.DirectByteBuffer的字节缓冲区(定义在HybridMemorySegment类中)来存储和操作数据。
最后再简要谈一下MemorySegment,如果觉得理解起来比较抽象的话,可以跟其他的一些数据结构类如ArrayList、LinkedList一起来对比理解,这些类都是定义的用来表示数据如何在内存中存储和管理的。ArrayList是划分一块连续的内存地址,LinkedList是用链表的结构来存储,而MemorySegment就是划分一块指定大小的连续内存地址来存储字节数据。上层的模块可以直接对MemorySegment进行操作,就相当于对平时对ArrayList、LinkedList这些结构的操作(比如插入、排序、比较等)。因此,MemorySegment就是Flink定义的一种数据结构,用来方便地存储、管理和操作内存数据。
字节顺序Big Endian和Little Endian
字节顺序是指字节类型的数据在内存中的存放顺序。不同的CPU架构体系使用不同的存储顺序。PowerPC系统采用Big Endian方式存储数据,低地址存放最高有效字节(MSB),而x86系列则采用Little Endian方式存储数据,低地址存放最低有效字节(LSB),如下图所示:
MemorySegment实现
Flink的MemorySegment有堆上和堆外两种实现,其类体系结构如图所示:
HeapMemorySegment用来分配堆上内存,HybridMemorySegment用来分配堆外内存和堆上内存。实际上在2017年之后的Flink中,并没有使用HeapMemorySegment,而是使用HybridMemorySegment这个类来同时实现堆上和堆外内存的分配。
之所以在后续的版本中只使用HybridMemorySegment,涉及了JIT编译优化的问题。如果同时使用了两个类,那么在运行的时候,每一次调用都需要去查询函数表,确定调用哪个子类中的方法,无法提前优化。但是如果只使用一个类,那么JIT编译时,自动识别方法的调用都可以被去虚拟化(de-virtualized)和内联(inlined),可以极大地提高性能。调用越频繁,优化效果就越好。
内存页
MemorySegment是Flink内存分配的最小单元,对于跨MemorySegment保存的数据,如果需要上层的使用者,需要考虑所有的细节,非常繁琐。所以Flink又抽象了一层,叫做内存页。内存页是MemorySegment之上的数据访问视图,数据读取抽象为DataInputView,数据写入抽象为DataOutputView。 有了这一层,上层使用者无需关心MemorySegment的细节,该层会自动处理跨MemorySegment的读取和写入。
DataInputView
DataInputView是从MemorySegment数据读取的抽象视图,该视图可用于顺序读取内存的内容。继承自java.io.DataInput,提供了从二进制流中读取不同数据类型的方法。如下图所示:
InputView中持有多个MemorySegment的饮用(MemorySegment[]),这一组MemorySegment被视为一个内存页(Page),可以顺序读取MemorySegment中的数据。DataInputView主要提供了一系列接口用于从数据输入流中读取数据,而MemorySegment则主要用于在连续的内存块上进行数据的低层次操作。
在Flink的网络缓冲,排序,哈希表等操作中,MemorySegment用作持有真实数据的内存块。而DataInputView则提供了读取这些数据的接口,以方便地从MemorySegment读取所需的数据。
在实际的数据编解码过程中,常常需要将DataInputView与MemorySegment一起使用。例如,一个典型的使用场景是在网络数据传输中,Flink会首先将数据保存在MemorySegment中,然后通过实现DataInputView的方式,来进行数据的读取和解码。
基本上所有的InputView实现类都继承了AbstractPageInputView抽象类,也就是所所有的InputView实现类都支持Page。
DataOutputView
DataOutputView是数据写入MemorySegment的抽象视图,继承自java.io.DataOutput,提供了将不同类型的数据写入二进制流的一系列方法。同样,DataOutputView中持有一个或者多个MemorySegment的引用(MemorySegment[]),这一组MemorySegment被视为一个内存页(Page),可以顺序地向MemorySegment中写入数据。
DataOutputView的接口继承关系如图所示:
在实际的数据编码和写入过程中,Flink通常会利用一个DataOutputView的实现将数据写入一个或多个MemorySegment。例如,在网络数据发送时,Flink会通过实现DataOutputView的方式,将数据写入MemorySegment,然后将这些MemorySegment添加到网络缓冲区以准备发送。
基本上所有的OutputView实现类都继承了AbstractPageOutputView抽象类,也就是说所有的OutputView实现类都支持跨MemorySegment写入。
内存页的使用
对内存的读取写入操作是非常底层的行为,对于上层应用(DataStream作业)而言,涉及向MemorySegment写入,读取二进制的地方都使用到了DataOutputView和DataInputView,而不是直接使用MemorySegment。
例如,在flink-table-runtime-blink中,BinaryRowSerializer中使用AbstractPagedInputView从MemorySegment中读取二进制数据并转换成BinaryRow,使用AbstractPagedOutputView将BinaryRow写入MemorySegment中。
Buffer
Task算子处理数据完毕,将结果交给下游的时候,使用的抽象或者说内存对象是Buffer。Buffer接口是网络层面上传输数据和事件的统一抽象,其实现类是NetworkBuffer。 Flink在各个TaskManager之间传递数据时,使用的是这一层的抽象。1个NetworkBuffer中包装了1个MemorySegment,实际的数据就存储在这个MemorySegment中,并引入了一些额外的元数据,例如数据大小(currentSize属性)以及Buffer中包含的数据类型(dataType属性)等。
此外,Buffer还提供了内存的引用计数和递增/递减的方法,用于在资源回收时管理内存。
简单来说,Buffer是基于MemorySegment的,它在MemorySegment上增加了一些用于网络传输和内存管理的额外功能。
Buffer接口的类体系如图所示:
Buffer的底层是MemorySegment,Buffer申请和释放由Flink自行管理,Flink引入了“引用数”的概念。当有新的Buffer消费者时,引用数加1,当消费者消费完Buffer时,引用数减1,最终当引用数变为0时,就可以将Buffer释放重用了。
具体来说,在Apache Flink中,Buffer对象具有一个“引用数(Reference Count)”的属性,它是用来跟踪Buffer实例在系统中被多少组件引用的指标。每当一个组件获取对Buffer的引用时,引用数就会增加。当组件完成对Buffer的使用并且不再需要它时,就会减少引用数。
这种设计的目的是为了更好地管理和复用内存资源。当Buffer的引用数降为0,就表示没有任何组件再使用该Buffer,它的内存可以归还给MemorySegment池,以便其他组件复用。引用数在内存管理中是一种常见的机制,能够避免不必要的对象复制和频繁的内存分配和释放,在Flink的Buffer管理中起到了重要的作用。例如,在数据移交过程中,可能有多个线程或模块同时处理同一个Buffer,此时通过引用数可以准确判断什么时候可以安全地释放该Buffer。
NetworkBuffer同时继承了AbstractReferenceCountedByteBuf。
AbstractReferenceCountedByteBuf是Netty中的抽象类,通过继承该类,Flink中Buffer具备了引用计数的能力,并且实现了对MemorySegment的读写。感兴趣的读者可以去了解一下Netty。
Buffer资源池
Buffer资源池在Flink中叫做BufferPool。BufferPool用来管理Buffer,包含Buffer的申请、释放、销毁、可用Buffer通知等,其实现类是LocalBufferPool。
BufferPool的类体系如图所示:
为了方便对BufferPool的管理,Flink设计了BufferPoolFactory,提供BufferPod的创建和销毁,其唯一的实现类是NetworkBufferPool。
每个TaskManager只有一个NetworkBufferPool,同一个TaskManager上的Task共享NetworkBufferPool,在TaskManager启动的时候,就会创建NetworkBufferPool,为其分配内存。
NetworkBufferPool持有该TaskManager在进行数据传递时所能够使用的所有内存,所以除了作为BufferPool的工厂外,还作为Task所需内存段(MemorySegment)的提供者,每个Task的LocalBufferPool所需要的内存都是从NetworkBufferPool申请而来的。
内存管理器
**MemoryManager是Flink中管理托管内存的组件,其管理的托管内存只使用堆外内存。**在批处理中用在排序、Hash表和中间结果的缓冲中,在流计算中作为RocksDBStateBackend的内存。
在Flink 1.10之前的版本中,MemoryManager负责TaskManager的所有内存。1.10版本中,MemoryManager的管理范围缩小为Slot级别,即为Task管理内容,TaskManager为每个Slot分配相同的内容,Task不能使用超过其Slot分配的资源。
MemoryManager主要通过内部接口MemoryPool来管理所有的MemorySegment。托管内存的管理相比于Network Buffers的管理更为简单,因为不需要Buffer的那一层封装。
内存申请
批处理计算任务中,MemorySegment负责为算子申请堆外内存。最终实际申请的是堆外的ByteBuffer,代码如下所示,
# MemorySegmentFactory类
/*** Allocates an off-heap unsafe memory and creates a new memory segment to represent that* memory.** <p>Creation of this segment schedules its memory freeing operation when its java wrapping* object is about to be garbage collected, similar to {@link* java.nio.DirectByteBuffer#DirectByteBuffer(int)}. The difference is that this memory* allocation is out of option -XX:MaxDirectMemorySize limitation.** @param size The size of the off-heap unsafe memory segment to allocate.* @param owner The owner to associate with the off-heap unsafe memory segment.* @param gcCleanupAction A custom action to run upon calling GC cleaner.* @return A new memory segment, backed by off-heap unsafe memory.*/public static MemorySegment allocateOffHeapUnsafeMemory(int size, Object owner, Runnable gcCleanupAction) {long address = MemoryUtils.allocateUnsafe(size);ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);Runnable cleaner =MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, gcCleanupAction);// 在申请内存的时候,同时为该内存片段准备好内存清理return new HybridMemorySegment(offHeapBuffer, owner, false, cleaner);}
使用Unsafe申请堆外内存,包装为ByteBuffer后再包装为MemorySegment。
流计算任务中,MemoryManager更多的作用是管理,控制RocksDB的内存使用量,通过RocksDB的Block Cache和WriterBufferManager参数来限制,参数的具体值从TaskManager的内存配置参数中计算而来。RocksDB自己来负责运行过程中的内存申请和内存释放,如下述代码所示:
/*** Acquires a shared resource, identified by a type string. If the resource already exists, this* returns a descriptor to the resource. If the resource does not yet exist, the method* initializes a new resource using the initializer function and given size.** <p>The resource opaque, meaning the memory manager does not understand its structure.** <p>The OpaqueMemoryResource object returned from this method must be closed once not used any* further. Once all acquisitions have closed the object, the resource itself is closed.*/public <T extends AutoCloseable> OpaqueMemoryResource<T> getExternalSharedMemoryResource(String type, LongFunctionWithException<T, Exception> initializer, long numBytes)throws Exception {// This object identifies the lease in this request. It is used only to identify the release// operation.// Using the object to represent the lease is a bit nicer safer than just using a reference// counter.final Object leaseHolder = new Object();final SharedResources.ResourceAndSize<T> resource =sharedResources.getOrAllocateSharedResource(type, leaseHolder, initializer, numBytes);// 创建资源释放函数final ThrowingRunnable<Exception> disposer =() -> sharedResources.release(type, leaseHolder);return new OpaqueMemoryResource<>(resource.resourceHandle(), resource.size(), disposer);}
内存释放
Flink自行管理内存,也就意味着内存的申请和释放都由Flink来负责。触发Java堆外内存释放的行为一般有如下两种:
- 内存使用完毕
- Task停止(正常或异常)执行。
在Flink中实现了一个JavaGcCleanerWrapper来进行堆外内存的释放,提供了两个Java Cleaner。
LegacyCleanerProvider
该CleanerProvider提供1.8及以下版本JDK的Flink管理的内存的垃圾回收,使用sum.misc.Cleaner来释放内存。
Java9CleanerProvider
该CleanerProvider提供1.9及以上版本的JDK的Flink管理的内存的垃圾回收,使用java.lang.ref.Cleaner来释放内存。
JavaGcCleanerWrapper会为每个Owner创建一个包含Cleaner的Runnable对象,在每个MemorySegment释放内存的时候,调用此Cleaner进行内存的释放。
当MemoryManager关闭的时候会对所有申请的MemorySegment进行释放,交还给操作系统。
网络缓冲器
网络缓冲器(Network Buffer)是网络交换数据的封装,其对应于MemorySegment内存段,当结果分区(ResultPartition)开始写出数据的时候,需要向LocalBufferPool申请Buffer资源,使用BufferBuilder将数据写入MemorySegment。当MemorySegment都分配完后,则会持续等待Buffer的释放。
BufferBuilder在上游Task中,用来向申请到的MemorySegment写入数据。与BufferBuilder相对的是BufferConsumer。BufferConsumer位于下游Task中,负责从MemorySegment中读取数据。1个BufferBuilder对应1个BufferConsumer。
内存申请
LocalBufferPool的大小是动态的,在最小内存段数量与最大内存段数据之间浮动。使用NetworkBufferPool创建LocalBufferPool时,如果该TaskManager的内存无法满足所有Task所需的最小MemorySegment的数量总和,则会发生错误。
Buffer申请
结果分区(ResultPartition)申请Buffer进行数据写入时,如下代码所示:
LocalBufferPool首先从自身持有的MemorySegment中分配可用的,如果没有可用的,则从TaskManager的NetworkBuffer中申请,如果没有,则阻塞等待可用的MemorySegment,如下代码所示:
MemorySegment申请
申请Buffer本质上来说就是申请MemorySegment,如果在LocalBufferPool中,则申请新的堆外内存MemorySegment,如下代码若是:
内存回收
Buffer使用了引用计数机制来判断什么时候可以释放Buffer到可用资源池。每创建一个BufferConsumer,就会对Buffer的引用计数+1,每个Buffer被消费完了,就会对Buffer的引用计数-1,当Buffer引用计数为0的时候就可以回收了。
Buffer回收
前面介绍过Buffer的主要实现类是NetworkBuffer,同时继承了AbstractReferenceCountedByteBuf.。当Buffer被消费一次后,就会对Buffer的引用计数-1,如下代码所示:
Buffer回收之后,并不会释放MemorySegment,此时MemorySegment仍然在LocalBufferPool的资源池中,除非TaskManager的级别内存不足,才会释放回TaskManager持有的全局资源池。
释放MemorySegment的时候,同样要根据MemorySegment的类型来进行,并且要在不低于保留内存的情况下,将内存释放回内存段中,变为可用内存,后续申请MemorySegment的时候,可以重复利用该内存片段。
MemorySegment释放
当NetworkBufferPool关闭的时候进行内存的释放,交还给操作系统。
总结
大数据场景下,使用Java的内存管理会带来一系列的问题,所以Flink从一开始就选择自主内存管理。为了实现内存管理,Flink对内存进行了一系列的抽象,内存段MemorySegment是最小的内存分配单元,对于跨段的内存访问,Flink抽象了DataInputView和DataOutputView,可以看作是内存页。
Flink在1.10版本重构了其TaskManager的内存管理模型,主要分为堆上内存和堆外内存,并简化了内存参数。在计算层面上,Flink的内存管理器提供了对内存的申请和释放,在数据传输层面上,Flink抽象了网络内存缓存Buffer(1个Buffer对应一个MemorySegment)的申请和释放。
相关文章:
Flink系统知识讲解之:Flink内存管理详解
Flink系统知识讲解之:Flink内存管理详解 在现阶段,大部分开源的大数据计算引擎都是用Java或者是基于JVM的编程语言实现的,如Apache Hadoop、Apache Spark、Apache Drill、Apache Flink等。Java语言的好处是不用考虑底层,降低了程…...
使用JMeter模拟多IP发送请求!
你是否曾遇到过这样的场景:使用 JMeter 进行压力测试时,单一 IP 被服务器限流或者屏蔽?这时,如何让 JMeter 模拟多个 IP 发送请求,成功突破测试限制,成为测试工程师必须攻克的难题。今天,我们就…...
【Ubuntu与Linux操作系统:六、软件包管理】
第6章 软件包管理 6.1 Linux软件安装基础 Linux的软件包是以二进制或源码形式发布的程序集合,包含程序文件和元数据。软件包管理器是Linux系统的重要工具,用于安装、更新和卸载软件。 1. 常见的软件包管理器: DEB 系统(如Ubunt…...
【数据结构-堆】力扣1834. 单线程 CPU
给你一个二维数组 tasks ,用于表示 n 项从 0 到 n - 1 编号的任务。其中 tasks[i] [enqueueTimei, processingTimei] 意味着第 i 项任务将会于 enqueueTimei 时进入任务队列,需要 processingTimei 的时长完成执行。 现…...
【前端动效】原生js实现拖拽排课效果
目录 1. 效果展示 2. 效果分析 2.1 关键点 2.2 实现方法 3. 代码实现 3.1 html部分 3.2 css部分 3.3 js部分 3.4 完整代码 4. 总结 1. 效果展示 如图所示,页面左侧有一个包含不同课程(如语文、数学等)的列表,页面右侧…...
C#使用OpenTK绘制3D可拖动旋转图形三棱锥
接上篇,绘制着色矩形 C#使用OpenTK绘制一个着色矩形-CSDN博客 上一篇安装OpenTK.GLControl后,这里可以直接拖动控件GLControl 我们会发现GLControl继承于UserControl //// 摘要:// OpenGL-aware WinForms control. The WinForms designer will always call the default//…...
排序的本质、数据类型及算法选择
排序的本质、数据类型及算法选择 一、排序的本质二、排序的数据类型三、排序算法的选择依据 前两天老金写了篇 “十大排序简介”,有点意犹未尽,这一回老金想把排序连根拔起,从排序的本质说道说道。 一、排序的本质 从字面上理解,…...
Python的列表基础知识点(超详细流程)
目录 一、环境搭建 二、列表 2.1 详情 2.2 列表定义 2.3 列表长度 2.4 列表索引 2.5 切片索引 2.6 添加 2.7 插入 2.8 剔除 2.8.1 pop方法 2.8.2 del方法 2.9 任何数据类型 2.10 拼接 2.10.1 “” 2.10.2 “*” 2.11 逆序 编辑 2.12 计算出现次数 2.13 排序…...
HarmonyOS鸿蒙开发 弹窗及加载中指示器HUD功能实现
HarmonyOS鸿蒙开发 弹窗及加载中指示器HUD功能实现 最近在学习鸿蒙开发过程中,阅读了官方文档,在之前做flutter时候,经常使用overlay,使用OverlayEntry加入到overlayState来做添加悬浮按钮、提示弹窗、加载中指示器、加载失败的t…...
【Ubuntu与Linux操作系统:一、Ubuntu安装与基本使用】
第1章 Ubuntu安装与基本使用 1.1 Linux与Ubuntu Linux是一种开源、类Unix操作系统内核,拥有高稳定性和强大的网络功能。由于其开源性和灵活性,Linux被广泛应用于服务器、嵌入式设备以及桌面环境中。 Ubuntu是基于Debian的一个流行Linux发行版…...
React 元素渲染
React 元素渲染 React 是一个用于构建用户界面的 JavaScript 库,它允许开发人员创建大型应用程序,这些应用程序可以随着时间的推移而高效地更新和渲染。React 的核心概念之一是元素渲染,它描述了如何将 JavaScript 对象转换为 DOM࿰…...
【2024年华为OD机试】 (C卷,100分)- 括号匹配(Java JS PythonC/C++)
一、问题描述 题目描述 给定一个字符串,里边可能包含“()”、“[]”、“{}”三种括号,请编写程序检查该字符串中的括号是否成对出现,且嵌套关系正确。 若括号成对出现且嵌套关系正确,或该字符串中无括号字符,输出&am…...
解锁企业数字化转型新力量:OpenCoze(开源扣子)
在当今数字化浪潮席卷之下,企业对于高效管理和协同运作的需求愈发迫切,而开源技术正逐渐成为众多企业破局的关键利器。今天,想给大家介绍一款极具潜力的开源项目 ——OpenCoze,中文名称 “开源扣子”。 一、OpenCoze 是什么&…...
【网络云SRE运维开发】2025第2周-每日【2025/01/12】小测-【第12章 rip路由协议】理论和实操考试题解析
文章目录 选择题答案及解析理论题答案及解析实操题答案及解析下一步进阶 选择题答案及解析 RIP路由协议是基于哪种算法的动态路由协议? 答案:B. 距离矢量算法解析:链路状态算法用于OSPF等协议;最小生成树算法主要用于生成树协议&…...
【微服务】8、分布式事务 ( XA 和 AT )
文章目录 利用Seata解决分布式事务问题(XA模式)AT模式1. AT模式原理引入2. AT模式执行流程与XA模式对比3. AT模式性能优势及潜在问题4. AT模式数据一致性解决方案5. AT模式一阶段操作总结6. AT模式二阶段操作分析7. AT模式整体特点8. AT模式与XA模式对比…...
CVE-2025-22777 (CVSS 9.8):WordPress | GiveWP 插件的严重漏洞
漏洞描述 GiveWP 插件中发现了一个严重漏洞,该插件是 WordPress 最广泛使用的在线捐赠和筹款工具之一。该漏洞的编号为 CVE-2025-22777,CVSS 评分为 9.8,表明其严重性。 GiveWP 插件拥有超过 100,000 个活跃安装,为全球无数捐赠平…...
TypeScript Jest 单元测试 搭建
NPM TypeScript 项目搭建 创建目录 mkdir mockprojectcd mockproject初始化NPM项目 npm init -y安装TypeScript npm i -D typescript使用VSCode 打开项目 创建TS配置文件tsconfig.json {"compilerOptions": {"target": "es5","module&…...
基于 SSH 的任务调度系统
文末附有完整项目代码 在当今科技飞速发展的时代,任务调度系统的重要性日益凸显。本文将详细介绍一个基于 SSH(SpringStruts2Hibernate)的任务调度系统的设计与实现。 一、系统概述 本系统旨在改变传统人工任务调度方式,通过计算…...
filestream安装使用全套+filebeat的模块用法
1 filestream介绍 官方宣布:输入类型为log在filebeat7.16版本已经弃用了 Filestream 是 Filebeat 中的一种 输入类型(Input),用于处理日志文件的读取。它是为了取代 Filebeat 中传统的 log 输入(Input)设…...
java项目之房屋租赁系统源码(springboot+mysql+vue)
项目简介 房屋租赁系统实现了以下功能: 房屋租赁系统的主要使用者分为: 系统管理:个人中心、房屋信息管理、预约看房管理、合同信息管理、房屋报修管理、维修处理管理、房屋评价管理等模块的查看及相应操作; 房屋信息管理&#…...
sap mm学习笔记
1. 业务流程 2. 组织架构 3. 物料主数据 4.采购主数据 5. 采购管理 6. 库存管理 7.物料主数据 8. 采购申请 ME51N...
代码随想录_链表
代码随想录02 链表 203.移除链表元素 力扣题目链接(opens new window) 题意:删除链表中等于给定值 val 的所有节点。 示例 1: 输入:head [1,2,6,3,4,5,6], val 6 输出:[1,2,3,4,5] 示例 2: 输入:he…...
EF Code 并发控制
【悲观控制】 不推荐用,EF Core 没有封装悲观并发控制的使用,需要使用原生Sql来使用悲观并发控制 一般使用行锁、表锁等排他锁对资源进行锁定,同时只有一个使用者操作被锁定的资源 拿sql server举例,可以使用表所、或者行所解决…...
ceph fs status 输出详解
ceph fs status 命令用于显示 Ceph 文件系统的状态信息,其中各列的含义如下: RANK:元数据服务器(MDS)的等级或标识符。 STATE:MDS 的当前状态,例如 active(活跃)、stan…...
FFmpeg Muxer HLS
使用FFmpeg命令来研究它对HLS协议的支持程度是最好的方法: ffmpeg -h muxerhls Muxer HLS Muxer hls [Apple HTTP Live Streaming]:Common extensions: m3u8.Default video codec: h264.Default audio codec: aac.Default subtitle codec: webvtt. 这里面告诉我…...
如何用SQL语句来查询表或索引的行存/列存存储方式|OceanBase 用户问题集锦
一、问题背景 自OceanBase 4.3.0版本起,支持了列存引擎,允许表和索引以行存、纯列存或行列冗余的形式创建,且这些存储方式可以自由组合。除了使用 show create table命令来查看表和索引的存储类型外,也有用户询问如何通过SQL语句…...
回归预测 | MATLAB实GRU多输入单输出回归预测
回归预测 | MATLAB实GRU多输入单输出回归预测 目录 回归预测 | MATLAB实GRU多输入单输出回归预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 回归预测 | MATLAB实GRU多输入单输出回归预测。使用GRU作为RNN的一种变体来处理时间序列数据。GRU相比传统的RNN有较好的记…...
【OpenGL/Assimp】渲染模型、半透明材质与封装光源
文章目录 渲染成果Assimp库准备:Mesh类修改:透明贴图使用:光源封装:使用方式在如下测试环境中: 渲染成果 Assimp库准备: 从GitHub拉取源码,根据网络教程,借助CMake生成VS工程项目&a…...
pandas与sql对应关系【帮助sql使用者快速上手pandas】
本页旨在提供一些如何使用pandas执行各种SQL操作的示例,来帮助SQL使用者快速上手使用pandas。 目录 SQL语法一、选择SELECT1、选择2、添加计算列 二、连接JOIN ON1、内连接2、左外连接3、右外连接4、全外连接 三、过滤WHERE1、AND2、OR3、IS NULL4、IS NOT NULL5、B…...
Linux WEB漏洞
定义:Linux Web 漏洞是指在基于 Linux 操作系统的 Web 应用程序、Web 服务器软件或者相关的网络服务配置中存在的安全弱点。这些漏洞可能导致攻击者未经授权访问敏感信息、篡改网页内容、执行恶意代码,甚至完全控制服务器。 常见类型及原理 SQL 注入漏…...
网站制作详细报价表/软文营销代理
文章目录重点职责技术团队重点 你对在这里工作最满意的地方是?你为什么留在这家公司? 该职位为何会空缺?公司为什么在招人?(产品发展 / 新产品 / 波动…) 你最开始为什么选择了这家公司?公司如何保证人才…...
wordpress 拍卖 主题/黄山seo推广
因为 equals() 和 hashCode() 方法是相互关联的。当一个类重写了 equals() 方法时,通常也需要重写 hashCode() 方法,以维护一致性。如果两个对象相等,那么它们的 hashCode() 方法应该返回相同的值。因此,在重写 equals() 方法后不…...
如何建网站教程视频/公司网络优化方案
Java类的初始化顺序: 若有继承,先初始化父类中的静态和子类中的静态,再初始化父类和子类的构造方法 静态变量(带有static的变量)或者静态代码区—>类中new对象的代码//构造函数 静态最优先,成员变量次…...
自己做的网站怎样才有网址浏览/橘子seo查询
llvm如何使用vc编译器本系列的其他文章 在“ 使用LLVM框架创建可用的编译器”系列中查看更多文章。 本系列的第一篇文章探讨了LLVM中间表示(IR)。 您是手工制作的“ Hello World”测试程序; 学习了LLVM的一些细微差别,例如类型转…...
wordpress 每日替换/网站优化推广培训
关注、星标嵌入式客栈,精彩及时送达[导读] 你是否被要求写的代码需要0 erros, 0 warnings? 或者你的项目是否需要做静态代码检测?0 erros是一定要做到,而0 wanring有时候会让你抓狂。前面转了一篇数组内包含头文件的写法,看起来对…...
市住房官方建设委网站/今天合肥刚刚发生的重大新闻
练手nginx反向代理和负载均衡apache实战 先说下原理性的 什么是反向代理 用户访问域名 域名的指向到nginx nginx把请求转发到apache apache处理后 返回给用户 整套的逻辑 对于用户来说 就是访问域名 然后返回 没啥感觉。 正向代理 就是用户访问 代理服务器 然后代理服务…...