加入收藏 | 设为首页 | 会员中心 | 我要投稿 江门站长网 (https://www.0750zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

深度解析 Flink 是如何管理好内存的?

发布时间:2019-08-22 14:51:10 所属栏目:教程 来源:zhisheng翻译
导读:副标题#e# 前言 如今,许多用于分析大型数据集的开源系统都是用 Java 或者是基于 JVM 的编程语言实现的。最着名的例子是 Apache Hadoop,还有较新的框架,如 Apache Spark、Apache Drill、Apache Flink。基于 JVM 的数据分析引擎面临的一个常见挑战就是如何

Java 生态系统提供了几个库,可以将对象转换为二进制表示形式并返回。常见的替代方案是标准 Java 序列化,Kryo,Apache Avro,Apache Thrift 或 Google 的 Protobuf。Flink 包含自己的自定义序列化框架,以便控制数据的二进制表示。这一点很重要,因为对二进制数据进行操作需要对序列化布局有准确的了解。此外,根据在二进制数据上执行的操作配置序列化布局可以显著提升性能。Flink 的序列化机制利用了这一特性,即在执行程序之前,要序列化和反序列化的对象的类型是完全已知的。

Flink 程序可以处理表示为任意 Java 或 Scala 对象的数据。在优化程序之前,需要识别程序数据流的每个处理步骤中的数据类型。对于 Java 程序,Flink 提供了一个基于反射的类型提取组件,用于分析用户定义函数的返回类型。Scala 程序可以在 Scala 编译器的帮助下进行分析。Flink 使用 TypeInformation 表示每种数据类型。

  • Flink 有如下几种数据类型的 TypeInformations:
  • BasicTypeInfo:所有 Java 的基础类型或 java.lang.String
  • BasicArrayTypeInfo:Java 基本类型构成的数组或 java.lang.String
  • WritableTypeInfo:Hadoop 的 Writable 接口的任何实现
  • TupleTypeInfo:任何 Flink tuple(Tuple1 到 Tuple25)。Flink tuples 是具有类型化字段的固定长度元组的 Java 表示
  • CaseClassTypeInfo:任何 Scala CaseClass(包括 Scala tuples)
  • PojoTypeInfo:任何 POJO(Java 或 Scala),即所有字段都是 public 的或通过 getter 和 setter 访问的对象,遵循通用命名约定
  • GenericTypeInfo:不能标识为其他类型的任何数据类型

每个 TypeInformation 都为它所代表的数据类型提供了一个序列化器。例如,BasicTypeInfo 返回一个序列化器,该序列化器写入相应的基本类型;WritableTypeInfo 的序列化器将序列化和反序列化委托给实现 Hadoop 的 Writable 接口的对象的 write() 和 readFields() 方法;GenericTypeInfo 返回一个序列化器,该序列化器将序列化委托给 Kryo。对象将自动通过 Java 中高效的 Unsafe 方法来序列化到 Flink MemorySegments 支持的 DataOutput。对于可用作键的数据类型,例如哈希值,TypeInformation 提供了 TypeComparators,TypeComparators 比较和哈希对象,并且可以根据具体的数据类型有效的比较二进制并提取固定长度的二进制 key 前缀。

Tuple,Pojo 和 CaseClass 类型是复合类型,它们可能嵌套一个或者多个数据类型。因此,它们的序列化和比较也都比较复杂,一般将其成员数据类型的序列化和比较都交给各自的 Serializers(序列化器) 和 Comparators(比较器)。下图说明了 Tuple3对象的序列化,其中Person 是 POJO 并定义如下:

  1. public class Person { 
  2.     public int id; 
  3.     public String name; 


深度解析 Flink 是如何管理好内存的?

通过提供定制的 TypeInformations、Serializers(序列化器) 和 Comparators(比较器),可以方便地扩展 Flink 的类型系统,从而提高序列化和比较自定义数据类型的性能。

Flink 如何对二进制数据进行操作?

与其他的数据处理框架的 API(包括 SQL)类似,Flink 的 API 也提供了对数据集进行分组、排序和连接等转换操作。这些转换操作的数据集可能非常大。关系数据库系统具有非常高效的算法,比如 merge-sort、merge-join 和 hash-join。Flink 建立在这种技术的基础上,但是主要分为使用自定义序列化和自定义比较器来处理任意对象。在下面文章中我们将通过 Flink 的内存排序算法示例演示 Flink 如何使用二进制数据进行操作。

Flink 为其数据处理操作符预先分配内存,初始化时,排序算法从 MemoryManager 请求内存预算,并接收一组相应的 MemorySegments。这些 MemorySegments 变成了缓冲区的内存池,缓冲区中收集要排序的数据。下图说明了如何将数据对象序列化到排序缓冲区中:

深度解析 Flink 是如何管理好内存的?

排序缓冲区在内部分为两个内存区域:第一个区域保存所有对象的完整二进制数据,第二个区域包含指向完整二进制对象数据的指针(取决于 key 的数据类型)。将对象添加到排序缓冲区时,它的二进制数据会追加到第一个区域,指针(可能还有一个 key)被追加到第二个区域。分离实际数据和指针以及固定长度的 key 有两个目的:它可以有效的交换固定长度的 entries(key 和指针),还可以减少排序时需要移动的数据。如果排序的 key 是可变长度的数据类型(比如 String),则固定长度的排序 key 必须是前缀 key,比如字符串的前 n 个字符。请注意:并非所有数据类型都提供固定长度的前缀排序 key。将对象序列化到排序缓冲区时,两个内存区域都使用内存池中的 MemorySegments 进行扩展。一旦内存池为空且不能再添加对象时,则排序缓冲区将会被完全填充并可以进行排序。Flink 的排序缓冲区提供了比较和交换元素的方法,这使得实际的排序算法是可插拔的。默认情况下, Flink 使用了 Quicksort(快速排序)实现,可以使用 HeapSort(堆排序)。下图显示了如何比较两个对象:

深度解析 Flink 是如何管理好内存的?

(编辑:江门站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读