当前位置: 首页 > news >正文

网站cms系统哪个好用网络营销外包专员

网站cms系统哪个好用,网络营销外包专员,建设个人博客网站制作,wordpress主题4mudiFlink系统知识讲解之#xff1a;Flink内存管理详解 在现阶段#xff0c;大部分开源的大数据计算引擎都是用Java或者是基于JVM的编程语言实现的#xff0c;如Apache Hadoop、Apache Spark、Apache Drill、Apache Flink等。Java语言的好处是不用考虑底层#xff0c;降低了程…Flink系统知识讲解之Flink内存管理详解 在现阶段大部分开源的大数据计算引擎都是用Java或者是基于JVM的编程语言实现的如Apache Hadoop、Apache Spark、Apache Drill、Apache Flink等。Java语言的好处是不用考虑底层降低了程序员的门槛JVM可以对代码进行深度优化对内存资源进行管理自动回收内存。但是自动内存管理的问题在于不可控基于JVM的大数据引擎常常会面临一个问题即在处理海量数据时如何在内存中存储大量的数据。 自主内存管理 Flink从一开始就选择了自主的内存管理避开了JVM内存管理在大数据场景下的问题提升了计算效率。 1.JVM内存管理的不足 当需要将海量数据存储到内存中时就不得不面对JVM存在的几个问题 1有效数据密度低 Java的对象在内存中的存储包含3个主要部分对象头、实例数据、对齐填充部分。32位和64位的虚拟机中对象头分别需要占32bit和64bit。实例数据时实际的数据存储。为了提高效率内存中数据存储不是连续的而是按照8 byte的整数倍进行存储。例如只有一个boolean字段的类实例占16 byte头信息8 byteboolean 1 byte为了对齐达到8的倍数会额外占用7 byte。这就导致在JVM中有效信息的存储密度很低。 2垃圾回收 JVM的内存回收机制的优点和缺点同样明显优点是开发者无需关注资源回收问题可以提高开发效率减少内存泄漏的可能。但是内存回收是不可控的在大数据计算的场景中这个缺点被放大TB、PB级的数据计算需要消耗大量的内存在内存中产生海量的Java对象。一旦出现Full GCGC会达到秒级甚至分钟级直接影响执行效率。 GC带来的中断会使集群中的心跳超时导致节点被踢出集群整个集群进入不稳定状态。虽然通过JVM参数调优可以提升回收效率尽量减少Full GC的发生但是仍然不能避免这个问题精确的调优也确实非常困难。 3OOM问题影响稳定性 OutOfMemoryError是分布式计算框架经常会遇到的问题当JVM中所有对象大小超过分配给JVM的内存大小时就会发生OutOfMemoryError错误导致JVM崩溃分布式框架的健壮性和性能都会受到影响。 4缓存未命中问题 CPU进行计算的时候是从CPU缓存中获取数据而不是直接从内存中获取数据。 CPU有分L1、L2、L3级缓存。L1小一般为32KBL3大能达到32MB。缓存的理论基础是程序局部性原理包括时间局部性和空间局部性最近被CPU访问的数据短期内CPU还要访问时间被CPU访问的数据附近的数据CPU短期内还要访问空间。Java对象在堆上存储的时候并不是连续的所以从内存中读取Java对象时缓存的邻近的内存区域的数据往往不是CPU下一步计算所需要的这就是缓冲未命中。此时CPU需要空转等待从内存中重新读取数据。CPU的速度和内存的速度之间差好几个数量级导致CPU没有充分利用起来。如果数据没有在内存中而是需要从磁盘上加载那么执行效率就会变得惨不忍睹。 2.自主内存管理 因为直接使用JVM做内存管理在大数据场景下可能遇到的诸多问题所以越来越多的大数据计算引擎选择自行管理JVM内存如Spark、Flink、HBase尽量达到C/C一样的性能同时避免OOM的发生。本章主要介绍Flink是如何解决上面的问题的主要内容包括内存管理、定制的序列化工具、缓存友好的数据结构和算法、堆外内存等。 在Flink中Java对象的有效信息被序列化为二进制数据流在内存中连续存储保存在预分配的内存块上内存块叫做MemorySegment。MemorySegment是内存分配的最小单元是一段固定长度的内存默认大小为32KB。同时Flink为其提供了非常高效的读写方法很多运算可以直接操作二进制数据而不需要反序列化即可执行。 MemorySegment可以保存在堆上其内部存储为一个Java byte数组也可以保存在堆外的ByteBuffer中。每条记录都会以序列化的形式存储在一个或多个MemorySegment中。 但使用堆上内存仍然不是完全自主的内存管理还存在以下问题 1超大内存上百GBJVM的启动需要很长时间Full GC可以达到分钟级。使用堆外内存可以将大量的数据保存在堆外极大地减小堆内存避免GC和内存溢出的问题。 2高效的IO操作。堆外内存在写磁盘或网络传输时采用的是零拷贝而堆上内存则至少需要1次内存复制。 3.堆外内存的不足之处 堆外内存提供了更好的性能和更可控的内存管理但是也存在几个问题 1堆上内存的使用、监控、调试简单堆外内存出现问题后的诊断则较为复杂。 2Flink有时需要分配短生命周期的MemorySegment在堆外内存上分配比在堆上内存开销更高。 3在Flink的测试中部分操作在堆外内存上会比堆上内存慢。 同时为了提供效率Flink在计算中采用了DBMS的Sort和Join算法直接操作二进制数据避免数据反复序列化带来的开销。Flink的内部实现更像C/C而非Java。 内存模型 内存布局 TaskManager是Flink中执行计算的核心组件是用来运行用户代码的Java进程。其中大量使用了堆外内存。 Flink TaskManager的简化和详细内存结构如下图所示 简化内存模型 详细内存模型 基于文初提及的使用JVM堆上内存的一些不足之处Flink设计了使用堆外内存的自主内存管理。因此Flink任务进程的总内存Total Process Memory, TPM Flink自身使用的内存Total Flink Memory, TFM JVM运行额外的内存如Metaspace、overhead。其中Flink自身使用的内存TFM包括了JVM堆内存和自主管理的堆外内存堆外内存又包含了托管内存和直接内存。下面分别对这些分类进行介绍 JVM Heap Framework Heap 这部分内存主要由Flink框架自身使用用于存储系统级别的数据结构包括Flink框架在运行期间需要的一些数据结构例如任务的线程栈内存和其他Flink框架的基础设施。例如用于JobManager和TaskManager的RPC消息、管理检查点的元数据等。它是作业执行所必需的基本内存独立于用户程序和运行期间的数据存储。 Task Heap 这部分内存主要用于存储由用户函数创建的Java对象和用户函数操作的数据。例如当执行一个map操作您的函数可能会创建一些新的Java对象这些对象都是在JVM堆内存中创建和管理的。如果Flink的托管内存配置为堆内那么Flink的排序、哈希和状态后端操作也会使用到Task Heap内存。 Off-Heap 托管内存Managed Memory 托管内存是由 Flink 负责分配和管理的本地堆外内存。 以下场景需要使用托管内存 流处理作业中用于 RocksDB State Backend。流处理和批处理作业中用于排序、哈希表及缓存中间结果。流处理和批处理作业中用于在 Python 进程中执行用户自定义函数。 更具体的对于当作业中使用排序、哈希表及缓存中间结果时Flink是如何使用托管内存的排序例如当你需要对一个非常大的数据集进行排序时如果数据无法完全装入内存Flink 就会使用其托管内存来执行外排序。在外排序过程中数据会被分割成可以装入内存的小块每个小块内部进行排序然后将排序后的小块写入磁盘。当所有小块都进行了排序和写入后Flink 会从磁盘读取这些小块执行归并排序直到所有数据都被排序。哈希表Flink 在处理连接Join操作时经常需要使用哈希表来维护到目前为止已经看到的数据记录。如果不能将所有数据装入内存Flink 就会使用托管内存来存储这个哈希表。这样就可以保证即使在处理大规模数据时也能保持良好的性能。缓存中间结果在一些需要多遍扫描数据的算法比如迭代算法中Flink 会缓存数据的中间结果以便下一轮迭代可以重复使用这样可以减少数据重复读取的开销。Flink 托管内存就是用来存储这些中间结果的。 另外当使用 RocksDB 作为状态后端时Flink 托管内存主要被用作 RocksDB 的写缓冲区Write Buffer和读缓存Block Cache从而提高状态访问的速度。 简言之Flink 的托管内存主要用于存储在处理过程中需要存储的中间计算数据和结果以求在充分利用有限内存资源的同时提供尽可能高的处理速度。 直接内存() 直接内存通常指的是被Flink进程直接从操作系统中申请的、不受Java堆内存垃圾回收器管理的内存。 以下场景需要使用直接内存 于网络通信和文件I/O通过网络缓冲池进行数据交换如shuffle数据缓冲以及序列化/反序列化中进行应用。 Flink通过直接内存技术进行数据交换可以有效避免频繁的Java堆内存和本地I/O缓存之间的数据复制利用零拷贝技术从而提高性能。 在一些情况下直接内存也可以利用在某些需要大量内存并希望避免频繁触发垃圾回收的处理中例如当使用RocksDB作为状态后端时RocksDB的本地内存通常是由直接内存提供的这样可以避免状态数据引起的Java堆内存的显著增加从而降低了垃圾收集的开销和提高了性能。 直接内存包括了以下几部分 Framework Off-Heap这部分内存被Flink框架用于框架自身的一些运行需求。比如Flink的一些本地数据结构和算法可能会使用这部分内存进行操作。这部分内存一般不大。Task Off-Heap这部分内存主要用于存储由用户任务产生的、并由Flink以某种形式管理的内存。比如如果你配置了本地状态后端如RocksDB使用堆外内存那么这部分内存将存储状态数据。这部分内存的使用可以避免引起频繁的Java GC操作提高性能。Network此部分内存主要用于网络通信中的缓冲区。Flink通过此缓冲区在TaskManager之间发送和接收数据。这部分内存通常是直接内存不受GC的影响可以有效地进行数据交换和缓冲以提高网络通信的性能。 另外除了Flink使用的总内存Total Flink MemoryTFM外总进程内存Total Process MemoryTPM还包括了JVM元空间Metaspace和其他开销内存overhead。在JVM内存模型中将元空间从堆内存独立出来了所以在上面的内存模型中也元空间也是单独一部分外加一些JVM运行时的额外开销内存例如线程栈、代码缓存、GC回收空间等等。 Flink内存模型分类配置参数 1.Flink使用的内存 1JVM堆上内存 框架堆上内存Framework Heap Memory。Flink框架本身所使用的内存即TaskManager本身所占用的堆上内存不计入slot的资源中。 配置参数taskmanager.memory.framework.heap.size 128MB默认128MB。Task堆上内存Task Heap Memory。Task执行用户代码时所使用的堆上内存。 配置参数taskmanager.memory.task.heap.size 2JVM堆外内存框架堆外内存Framework Off-Heap Memory。Flink框架本身所使用的内存即TaskManager本身所占用的堆外内存不计入slot的资源。 配置参数taskmanager.memory.framework.off-heap.size 128MB默认128MB。Task堆外内存Task Off-Heap Memory。Task执行用户代码时所使用的堆外内存。 配置参数taskmanager.memory.task.off-heap.size 0默认为0.网络缓冲内存Network Memory。网络数据交换所使用的堆外内存大小如网络数据交换缓冲区Network Buffer后面回介绍。 配置参数taskmanager.memory.network.(min/max/fraction)默认min64MBmax1GBfraction0.1。堆外托管内存 Managed Memory。Flink管理的堆外内存。 配置参数taskmanager.memory.managed.[size|fraction]默认fraction 0.4。 2.JVM本身使用的内存 JVM本身直接使用了操作系统的内存。 JVM元空间 JVM元空间所使用的内存 配置参数taskmanager.memory.jvm-metaspace96M默认96MB。JVM执行开销 JVM在执行时自身所需要的内容包括线程栈、IO、编译缓存等所使用的内存。 配置参数taskmanager.memory.jvm-overhead.[min|max|fraction]。默认min192MBmax1GBfraction0.1。总体内存 1Flink使用内存 综上而言Flink使用的内存包括Flink使用的堆上、堆外内存。使用参数taskmanager.memory.flink.size参数进行控制。 2进程使用内存 整个进程所使用的内存包括Flink使用的内存和JVM使用的内存。使用参数taskmanager.memory.process.size进行控制。 JVM内存控制参数如下所示 1JVM堆上内存使用-Xmx和-Xms参数进行控制。 2JVM直接内存使用参数-XX:MaxDirectMemorySize进行控制。对于托管内存使用Unsafe.allocateMemory()申请不受该参数控制。 3JVM Metaspace使用-XX:MaxMetaspaceSize进行控制。 内存计算 目前的实现中在JVM启动之前就需要确定各个内存区块的大小。一旦JVM启动了在TaskManager进程内部就不再重新计算。Flink中有两个地方进行内存大小计算 在Standalone部署模式下内存的计算在启动脚本中实现。在容器环境下Yarn、K8s、Mesos计算在ResourceManager中进行。 在启动脚本与容器环境下的内存大小计算都调用了Flink的Java代码时间保证了所有部署模式下的统一计算好的参数使用-D参数提交给Java进程。 计算时需要配置如下3个参数组合中的至少1个 1Task的堆上内存和托管内存 如果手动配置了网络缓冲区内存大小则使用该参数。如果没有明确配置则使用分配系数fraction ✖️总体Flink使用内存计算网络缓冲区内存大小。 2总体Flink使用内存 如果配置了该选项而没有配置1则从整体Flink内存中划分网络缓冲区内存和托管内存剩余的内存作为Task堆上内存。 如果手动设置了网络缓冲内存则使用其值否则使用默认的分配系统fraction✖️总体Flink内存。 3总体进程使用内存 如果只配置了总体进程使用内存则从整体进程中扣除JVM元空间和JVM执行开销内存剩余的内存作为总体Flink使用内存。 内存数据结构 Flink的内存管理像操作系统管理内存一样将内存划分为内存段、内存页等结构。 内存段 内存段在Flink内存叫做MemorySegment是Flink的内存抽象的最小分配单元。 默认情况下一个MemorySegment对应着一个32KB大小的内存块。这块内存既可以是堆上内存Java的byte数组也可以是堆外内存基于Netty的DirectByteBuffer。 MemorySegment同时也提供了堆二进制数据进行读取和写入的方法。对于Java基本数据类型如short、int、long等MemorySegment内置了方法可以直接返回或者写入数据对于其他类型读取二进制数组byte[]后进行反序列化序列化为二进制数据byte[]后写入。 MemorySegment结构 为了更清晰地理解MemorySegment下面一起看一下MemorySegment的关键属性。 1BYTE_ARRAY_BASE_OFFSET二进制字节数组的起始索引相对于字节数组对象而言。 2LITTLE_ENDIAN判断是否为Little Endian模式的字节存储顺序若不是就是Big Endian模式。 3HeapMemory如果MemorySegment使用堆上内存则表示一个堆上的字节数组byte[]如果MemorySegment使用堆外内存则为null。 4address字节数组对应的相对地址若HeapMemory为null即可能为堆外内存的绝对地址。 5addressLimit标识地址结束位置addresssize。 6size内存段的字节数。 结构类图如下所示 可以看到MemorySegment类定义了一系列的方法来对字节和基本数据类型如int、long、float进行读写的方法例如get和put方法。同时还支持批量操作例如复制和比较操作它提供了copy和compare方法用来对大量数据进行操作。 Flink的MemorySegment主要用于Flink框架对内存的管理和数据的处理主要用在它的网络缓冲、排序算法和内存状态后端等地方以提供高效的内存操作。在设计上MemorySegment主要实现了以下几点 高效的数据访问。 MemorySegment类包含了一个连续的字节数组heapMemory用于存储实际的二进制数据。所有的get和put操作都是在这个字节数组上执行的。使用连续的字节数组的好处正是我们上文提到的可以充分利用程序的局部性原理因此Flink使用MemorySegment作为其最小的内存分配单元保证了读写数据时相邻的数据能够一起被加载到CPU缓存中提升处理性能。高效的内存管理。 Flink通过MemorySegment对内存进行管理保证了Flink程序运行时的内存效率。例如对于Flink的网络缓冲、排序算法和内存状态后端等地方都会使用MemorySegment进行内存的分配和回收。这有助于Flink高效地使用内存而且避免了一些Java内存管理中常见的问题如垃圾收集Garbage Collection过频繁等。**支持堆外内存off-heap memory操作。**这意味着除了在JVM堆内存上操作MemorySegment还能直接在系统的物理内存上进行操作。使用离堆内存可以避免频繁的垃圾回收提高数据处理的性能。 另外需要注意的是MemorySegment抽象类中的heapMemory仅适用于“堆”内存段即哪些将数据存储在Java堆上的内存段。对于“非堆”内存段即哪些将数据存储在Java堆之外的内存段Flink使用java.nio.DirectByteBuffer的字节缓冲区定义在HybridMemorySegment类中来存储和操作数据。 最后再简要谈一下MemorySegment如果觉得理解起来比较抽象的话可以跟其他的一些数据结构类如ArrayList、LinkedList一起来对比理解这些类都是定义的用来表示数据如何在内存中存储和管理的。ArrayList是划分一块连续的内存地址LinkedList是用链表的结构来存储而MemorySegment就是划分一块指定大小的连续内存地址来存储字节数据。上层的模块可以直接对MemorySegment进行操作就相当于对平时对ArrayList、LinkedList这些结构的操作比如插入、排序、比较等。因此MemorySegment就是Flink定义的一种数据结构用来方便地存储、管理和操作内存数据。 字节顺序Big Endian和Little Endian 字节顺序是指字节类型的数据在内存中的存放顺序。不同的CPU架构体系使用不同的存储顺序。PowerPC系统采用Big Endian方式存储数据低地址存放最高有效字节MSB而x86系列则采用Little Endian方式存储数据低地址存放最低有效字节LSB如下图所示 MemorySegment实现 Flink的MemorySegment有堆上和堆外两种实现其类体系结构如图所示 HeapMemorySegment用来分配堆上内存HybridMemorySegment用来分配堆外内存和堆上内存。实际上在2017年之后的Flink中并没有使用HeapMemorySegment而是使用HybridMemorySegment这个类来同时实现堆上和堆外内存的分配。 之所以在后续的版本中只使用HybridMemorySegment涉及了JIT编译优化的问题。如果同时使用了两个类那么在运行的时候每一次调用都需要去查询函数表确定调用哪个子类中的方法无法提前优化。但是如果只使用一个类那么JIT编译时自动识别方法的调用都可以被去虚拟化de-virtualized和内联inlined可以极大地提高性能。调用越频繁优化效果就越好。 内存页 MemorySegment是Flink内存分配的最小单元对于跨MemorySegment保存的数据如果需要上层的使用者需要考虑所有的细节非常繁琐。所以Flink又抽象了一层叫做内存页。内存页是MemorySegment之上的数据访问视图数据读取抽象为DataInputView数据写入抽象为DataOutputView。 有了这一层上层使用者无需关心MemorySegment的细节该层会自动处理跨MemorySegment的读取和写入。 DataInputView DataInputView是从MemorySegment数据读取的抽象视图该视图可用于顺序读取内存的内容。继承自java.io.DataInput提供了从二进制流中读取不同数据类型的方法。如下图所示 InputView中持有多个MemorySegment的饮用MemorySegment[]这一组MemorySegment被视为一个内存页Page可以顺序读取MemorySegment中的数据。DataInputView主要提供了一系列接口用于从数据输入流中读取数据而MemorySegment则主要用于在连续的内存块上进行数据的低层次操作。 在Flink的网络缓冲排序哈希表等操作中MemorySegment用作持有真实数据的内存块。而DataInputView则提供了读取这些数据的接口以方便地从MemorySegment读取所需的数据。 在实际的数据编解码过程中常常需要将DataInputView与MemorySegment一起使用。例如一个典型的使用场景是在网络数据传输中Flink会首先将数据保存在MemorySegment中然后通过实现DataInputView的方式来进行数据的读取和解码。 基本上所有的InputView实现类都继承了AbstractPageInputView抽象类也就是所所有的InputView实现类都支持Page。 DataOutputView DataOutputView是数据写入MemorySegment的抽象视图继承自java.io.DataOutput提供了将不同类型的数据写入二进制流的一系列方法。同样DataOutputView中持有一个或者多个MemorySegment的引用MemorySegment[]这一组MemorySegment被视为一个内存页Page可以顺序地向MemorySegment中写入数据。 DataOutputView的接口继承关系如图所示 在实际的数据编码和写入过程中Flink通常会利用一个DataOutputView的实现将数据写入一个或多个MemorySegment。例如在网络数据发送时Flink会通过实现DataOutputView的方式将数据写入MemorySegment然后将这些MemorySegment添加到网络缓冲区以准备发送。 基本上所有的OutputView实现类都继承了AbstractPageOutputView抽象类也就是说所有的OutputView实现类都支持跨MemorySegment写入。 内存页的使用 对内存的读取写入操作是非常底层的行为对于上层应用DataStream作业而言涉及向MemorySegment写入读取二进制的地方都使用到了DataOutputView和DataInputView而不是直接使用MemorySegment。 例如在flink-table-runtime-blink中BinaryRowSerializer中使用AbstractPagedInputView从MemorySegment中读取二进制数据并转换成BinaryRow使用AbstractPagedOutputView将BinaryRow写入MemorySegment中。 Buffer Task算子处理数据完毕将结果交给下游的时候使用的抽象或者说内存对象是Buffer。Buffer接口是网络层面上传输数据和事件的统一抽象其实现类是NetworkBuffer。 Flink在各个TaskManager之间传递数据时使用的是这一层的抽象。1个NetworkBuffer中包装了1个MemorySegment实际的数据就存储在这个MemorySegment中并引入了一些额外的元数据例如数据大小currentSize属性以及Buffer中包含的数据类型dataType属性等。 此外Buffer还提供了内存的引用计数和递增/递减的方法用于在资源回收时管理内存。 简单来说Buffer是基于MemorySegment的它在MemorySegment上增加了一些用于网络传输和内存管理的额外功能。 Buffer接口的类体系如图所示 Buffer的底层是MemorySegmentBuffer申请和释放由Flink自行管理Flink引入了“引用数”的概念。当有新的Buffer消费者时引用数加1当消费者消费完Buffer时引用数减1最终当引用数变为0时就可以将Buffer释放重用了。 具体来说在Apache Flink中Buffer对象具有一个“引用数Reference Count”的属性它是用来跟踪Buffer实例在系统中被多少组件引用的指标。每当一个组件获取对Buffer的引用时引用数就会增加。当组件完成对Buffer的使用并且不再需要它时就会减少引用数。 这种设计的目的是为了更好地管理和复用内存资源。当Buffer的引用数降为0就表示没有任何组件再使用该Buffer它的内存可以归还给MemorySegment池以便其他组件复用。引用数在内存管理中是一种常见的机制能够避免不必要的对象复制和频繁的内存分配和释放在Flink的Buffer管理中起到了重要的作用。例如在数据移交过程中可能有多个线程或模块同时处理同一个Buffer此时通过引用数可以准确判断什么时候可以安全地释放该Buffer。 NetworkBuffer同时继承了AbstractReferenceCountedByteBuf。 AbstractReferenceCountedByteBuf是Netty中的抽象类通过继承该类Flink中Buffer具备了引用计数的能力并且实现了对MemorySegment的读写。感兴趣的读者可以去了解一下Netty。 Buffer资源池 Buffer资源池在Flink中叫做BufferPool。BufferPool用来管理Buffer包含Buffer的申请、释放、销毁、可用Buffer通知等其实现类是LocalBufferPool。 BufferPool的类体系如图所示 为了方便对BufferPool的管理Flink设计了BufferPoolFactory提供BufferPod的创建和销毁其唯一的实现类是NetworkBufferPool。 每个TaskManager只有一个NetworkBufferPool同一个TaskManager上的Task共享NetworkBufferPool在TaskManager启动的时候就会创建NetworkBufferPool为其分配内存。 NetworkBufferPool持有该TaskManager在进行数据传递时所能够使用的所有内存所以除了作为BufferPool的工厂外还作为Task所需内存段MemorySegment的提供者每个Task的LocalBufferPool所需要的内存都是从NetworkBufferPool申请而来的。 内存管理器 **MemoryManager是Flink中管理托管内存的组件其管理的托管内存只使用堆外内存。**在批处理中用在排序、Hash表和中间结果的缓冲中在流计算中作为RocksDBStateBackend的内存。 在Flink 1.10之前的版本中MemoryManager负责TaskManager的所有内存。1.10版本中MemoryManager的管理范围缩小为Slot级别即为Task管理内容TaskManager为每个Slot分配相同的内容Task不能使用超过其Slot分配的资源。 MemoryManager主要通过内部接口MemoryPool来管理所有的MemorySegment。托管内存的管理相比于Network Buffers的管理更为简单因为不需要Buffer的那一层封装。 内存申请 批处理计算任务中MemorySegment负责为算子申请堆外内存。最终实际申请的是堆外的ByteBuffer代码如下所示 # MemorySegmentFactory类 /*** Allocates an off-heap unsafe memory and creates a new memory segment to represent that* memory.** pCreation of this segment schedules its memory freeing operation when its java wrapping* object is about to be garbage collected, similar to {link* java.nio.DirectByteBuffer#DirectByteBuffer(int)}. The difference is that this memory* allocation is out of option -XX:MaxDirectMemorySize limitation.** param size The size of the off-heap unsafe memory segment to allocate.* param owner The owner to associate with the off-heap unsafe memory segment.* param gcCleanupAction A custom action to run upon calling GC cleaner.* return A new memory segment, backed by off-heap unsafe memory.*/public static MemorySegment allocateOffHeapUnsafeMemory(int size, Object owner, Runnable gcCleanupAction) {long address MemoryUtils.allocateUnsafe(size);ByteBuffer offHeapBuffer MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);Runnable cleaner MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, gcCleanupAction);// 在申请内存的时候同时为该内存片段准备好内存清理return new HybridMemorySegment(offHeapBuffer, owner, false, cleaner);}使用Unsafe申请堆外内存包装为ByteBuffer后再包装为MemorySegment。 流计算任务中MemoryManager更多的作用是管理控制RocksDB的内存使用量通过RocksDB的Block Cache和WriterBufferManager参数来限制参数的具体值从TaskManager的内存配置参数中计算而来。RocksDB自己来负责运行过程中的内存申请和内存释放如下述代码所示 /*** Acquires a shared resource, identified by a type string. If the resource already exists, this* returns a descriptor to the resource. If the resource does not yet exist, the method* initializes a new resource using the initializer function and given size.** pThe resource opaque, meaning the memory manager does not understand its structure.** pThe OpaqueMemoryResource object returned from this method must be closed once not used any* further. Once all acquisitions have closed the object, the resource itself is closed.*/public T extends AutoCloseable OpaqueMemoryResourceT getExternalSharedMemoryResource(String type, LongFunctionWithExceptionT, Exception initializer, long numBytes)throws Exception {// This object identifies the lease in this request. It is used only to identify the release// operation.// Using the object to represent the lease is a bit nicer safer than just using a reference// counter.final Object leaseHolder new Object();final SharedResources.ResourceAndSizeT resource sharedResources.getOrAllocateSharedResource(type, leaseHolder, initializer, numBytes);// 创建资源释放函数final ThrowingRunnableException disposer () - sharedResources.release(type, leaseHolder);return new OpaqueMemoryResource(resource.resourceHandle(), resource.size(), disposer);}内存释放 Flink自行管理内存也就意味着内存的申请和释放都由Flink来负责。触发Java堆外内存释放的行为一般有如下两种 内存使用完毕Task停止正常或异常执行。 在Flink中实现了一个JavaGcCleanerWrapper来进行堆外内存的释放提供了两个Java Cleaner。 LegacyCleanerProvider 该CleanerProvider提供1.8及以下版本JDK的Flink管理的内存的垃圾回收使用sum.misc.Cleaner来释放内存。 Java9CleanerProvider 该CleanerProvider提供1.9及以上版本的JDK的Flink管理的内存的垃圾回收使用java.lang.ref.Cleaner来释放内存。 JavaGcCleanerWrapper会为每个Owner创建一个包含Cleaner的Runnable对象在每个MemorySegment释放内存的时候调用此Cleaner进行内存的释放。 当MemoryManager关闭的时候会对所有申请的MemorySegment进行释放交还给操作系统。 网络缓冲器 网络缓冲器Network Buffer是网络交换数据的封装其对应于MemorySegment内存段当结果分区ResultPartition开始写出数据的时候需要向LocalBufferPool申请Buffer资源使用BufferBuilder将数据写入MemorySegment。当MemorySegment都分配完后则会持续等待Buffer的释放。 BufferBuilder在上游Task中用来向申请到的MemorySegment写入数据。与BufferBuilder相对的是BufferConsumer。BufferConsumer位于下游Task中负责从MemorySegment中读取数据。1个BufferBuilder对应1个BufferConsumer。 内存申请 LocalBufferPool的大小是动态的在最小内存段数量与最大内存段数据之间浮动。使用NetworkBufferPool创建LocalBufferPool时如果该TaskManager的内存无法满足所有Task所需的最小MemorySegment的数量总和则会发生错误。 Buffer申请 结果分区ResultPartition申请Buffer进行数据写入时如下代码所示 LocalBufferPool首先从自身持有的MemorySegment中分配可用的如果没有可用的则从TaskManager的NetworkBuffer中申请如果没有则阻塞等待可用的MemorySegment如下代码所示 MemorySegment申请 申请Buffer本质上来说就是申请MemorySegment如果在LocalBufferPool中则申请新的堆外内存MemorySegment如下代码若是 内存回收 Buffer使用了引用计数机制来判断什么时候可以释放Buffer到可用资源池。每创建一个BufferConsumer就会对Buffer的引用计数1每个Buffer被消费完了就会对Buffer的引用计数-1当Buffer引用计数为0的时候就可以回收了。Buffer回收 前面介绍过Buffer的主要实现类是NetworkBuffer同时继承了AbstractReferenceCountedByteBuf.。当Buffer被消费一次后就会对Buffer的引用计数-1如下代码所示 Buffer回收之后并不会释放MemorySegment此时MemorySegment仍然在LocalBufferPool的资源池中除非TaskManager的级别内存不足才会释放回TaskManager持有的全局资源池。 释放MemorySegment的时候同样要根据MemorySegment的类型来进行并且要在不低于保留内存的情况下将内存释放回内存段中变为可用内存后续申请MemorySegment的时候可以重复利用该内存片段。 MemorySegment释放 当NetworkBufferPool关闭的时候进行内存的释放交还给操作系统。 总结 大数据场景下使用Java的内存管理会带来一系列的问题所以Flink从一开始就选择自主内存管理。为了实现内存管理Flink对内存进行了一系列的抽象内存段MemorySegment是最小的内存分配单元对于跨段的内存访问Flink抽象了DataInputView和DataOutputView可以看作是内存页。 Flink在1.10版本重构了其TaskManager的内存管理模型主要分为堆上内存和堆外内存并简化了内存参数。在计算层面上Flink的内存管理器提供了对内存的申请和释放在数据传输层面上Flink抽象了网络内存缓存Buffer1个Buffer对应一个MemorySegment的申请和释放。
http://www.w-s-a.com/news/465234/

相关文章:

  • 网站开发多少费用火车头采集wordpress发布时间
  • 有没有做皮艺的网站教育培训网站建设ppt
  • 建设外贸商城网站制作如何建设景区旅游网站
  • 网站建设服务的具体条件怎么建设一个响应式网站
  • 做flash的网站wordpress设置前台投稿
  • 商务网站开发文档迅雷资源做下载网站
  • 无极磁铁网站如何把地图放到自己做的网站上
  • 青浦赵巷网站建设公司网站开发需求文档
  • 苏州网站建设的公司哪家好无锡网站制作那些
  • 装饰公司网站模板科技成果鉴定机构
  • 给公司做的东西放到私人网站上十堰为企业做网站的单位
  • 手机网站建设价钱手机自己做网站
  • 网站建设属于哪种公司电子商务查询网站
  • 工程建设标准强制性条文最新版本网站关键词排名优化应该怎么做
  • 网站网页设计内容品牌高端网站建设公司
  • 网站开发报价 福州中国建筑网官网手机版
  • 网站 图片 自动往右移专门做定制化的网站
  • 最好用的cms手机百度关键词排名 网站优化软件
  • 凉山州城乡规划建设局网站长沙网站建设哪家强
  • 广州网站开发创意设计公司企业自己怎么制作网站首页
  • 曲靖 曲靖网站建设软件(app)开发wordpress 没有远程发布
  • 官方网站开发与定制网站建设技术是干嘛的
  • 昆明网站建设工作室网站菜单导航怎么做的
  • 南京网站做的好的公司猪八戒网站做推广怎么样
  • 建站收费标准福州网站搭建
  • 做防护用品的网站欧美网站建设风格特点
  • 龙华做网站联系电话北京软件开发培训班
  • 做网站运营有前途网站的建设与管理的心得体会
  • 河南网站推广怎么做网页制作免费下载
  • 网站如何屏蔽中国ip商丘网站建设的公司哪家好