![不仅仅是流计算:Apache Flink实践](https://wfqqreader-1252317822.image.myqcloud.com/cover/768/24045768/b_24045768.jpg)
Apache Flink类型和序列化机制简介
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0144_0001.jpg?sign=1739643184-IY6pYovb73P59L9mRCNobSC2XNfnKR6Q-0-1bca09f87d17f7f50bcc9f0147259e07)
使用Apache Flink(以下简称Flink)编写处理逻辑时,新手总是容易被林林总总的概念所混淆:
为什么Flink有那么多的类型声明方式?
BasicTypeInfo.STRING TYPE INFO、Types.STRING 、Types.STRING()有何区别?
TypeInfoFactory又是什么?
TypeInformation.of和TypeHint是如何使用的呢?
接下来本文将逐步解密Flink的类型和序列化机制。
Flink的类型分类
Flink的类型系统源码位于org.apache.flink.api.common.typeinfo包,让我们对图1深入追踪,看一下类的继承关系图:
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0145_0001.jpg?sign=1739643184-8505e2sBoFi4yBDK2bbq8i7Acc5XF1YZ-0-e673e95ddff584a6512b16ec556a1066)
图1:Flink类型分类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0145_0002.jpg?sign=1739643184-mgeqYrjWsq89BGa7eN1zXPh5e1PimSEe-0-dc52073e3919f01dd0f362601d88277a)
图2:TypeInformation类继承关系图
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0146_0001.jpg?sign=1739643184-24uWIBMhrIxtV2g6lXRDb4YzjolQiJeb-0-baa2ee3cf1ef8d40b2b4d1221b76f4a2)
图3:使用.returns方法声明返回类型
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0147_0001.jpg?sign=1739643184-9OfgC9TMJpMv23z7Z6d3SrIPCeKyy2oO-0-f8d91048f3f3b3884951929376b9cea5)
图4:Flink-ML注册子类类型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0147_0002.jpg?sign=1739643184-PuVQZzrIExA2kQXxkRPlmExYoaD9hgwo-0-7c56ad6b5b7050d20839401b18f56d03)
图5:Flink允许注册自定义类型
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0001.jpg?sign=1739643184-8K0cy79RKF0i9Q9VrhPvZRXqrCuH6Elp-0-1a5c2edc04ec74475684e524834c1382)
图6:class对象作为参数
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0002.jpg?sign=1739643184-c5APwAhSiXW0iIksWKocQJnKDxx9wbYQ-0-8fe4a1428e67236c8e26859aee0560e0)
图7:TypeHint作为参数,保存泛型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0148_0003.jpg?sign=1739643184-7OGJ5DGL8g4GB9K9rVydZGelSYfGtANR-0-9d36fc144d00e571ab8cf9d98b76004e)
图8:BasicTypeInfo快捷方式
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0001.jpg?sign=1739643184-ERzhXPzanSufjllHEIwDv4CrOJUwxAdd-0-a2655922b22760415018ce177edc804a)
图9:使用BasicTypeInfo快捷方式来声明一行(Row)每个字段的类型信息
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0002.jpg?sign=1739643184-LxUB7RXFyS6zF9Tsg5iv8fr7dLczrQiK-0-daca6227e604f5981a5217dea201bb10)
图10:Types类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0149_0003.jpg?sign=1739643184-tD9QwiU5UbU4uEbZlo8QDbLoODwrf8sp-0-42d679aca2bd456d2dd51c5c6c860eff)
图11:flink-table模块的Types类
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0150_0001.jpg?sign=1739643184-sm4uOoFJ7baSU9xM6IPdRmw8hwKAyN1N-0-29ca3e4b5b5a9aec16bc82b7723dae13)
图12:为自定义类提供类型支持(图片未展示全部字段)
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0151_0001.jpg?sign=1739643184-1lPEqA9fKPPMTHz6oht6SndXBKc7e2by-0-a314f96628dedc31ae7a57530c2718af)
图13:Flink自带的TypeSerializer子类概览
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0152_0001.jpg?sign=1739643184-kFZND3N520DmqARHSsirb6D57u1ziHlv-0-3de953395daec5dfd65db69e4adc1286)
图14:为Kryo增加自定义的Serializer
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0152_0002.jpg?sign=1739643184-yxWKNsz2bxp4ueMwxKcHWM44seuiQ9C4-0-680044e57606be9276b276ea67ddafc6)
图15:为Kryo增加自定义的Serializer
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0153_0001.jpg?sign=1739643184-IwiluioAFmSRh8dxOWRR0A7qd74NM6ak-0-9fad2730fb1b79853cad06c4487ad885)
图16:类型信息到内存块
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0153_0002.jpg?sign=1739643184-7TksKrfM2mnKDQKzFZEqef3SFxud6H2f-0-6c6f3f3116b6b3dd929d4adedf869121)
图17:StringSerializer类的serialize()方法
![](https://epubservercos.yuewen.com/B4C696/12862211003466706/epubprivate/OEBPS/Images/figure_0154_0001.jpg?sign=1739643184-RpOlsg7Lr4H6KFyGWkL3XEUwFE6g65k5-0-ac225fb52bc7f9b833d955050217c46e)
图18:String对象的序列化过程
可以看到,图1和图2是一一对应的,TypeInformation类是描述一切类型的公共基类,它和它的所有子类必须可序列化(Serializable),因为类型信息将会伴随Flink的作业提交,被传递给每个执行节点。
由于Flink自己管理内存,采用了一种非常紧凑的存储格式(见官方博文),因而类型信息在整个数据处理流程中属于至关重要的元数据。
TypeExtractror类型提取
Flink内部实现了名为TypeExtractror的类,可以利用方法签名、子类信息等蛛丝马迹,自动提取和恢复类型信息(当然也可以显式声明,即本文所介绍的内容)。
然而由于Java的类型擦除,自动提取并不是总是有效。因而一些情况下(例如通过URLClassLoader动态加载的类),仍需手动处理;例如下图中对DataSet变换时,使用.returns()方法声明返回类型。
这里需要说明一下,returns()接受三种类型的参数:字符串描述的类名(例如"String")、TypeHint(接下来会讲到,用于泛型类型参数)、Java原生Class(例如String.class)等;不过字符串形式的用法即将废弃,如果确实有必要,请使用Class.forName()等方法来解决。
下面是ExecutionEnvironment类的registerType方法,它可以向Flink注册子类信息(Flink认识父类,但不一定认识子类的一些独特特性,因而需要注册),下面是Flink-ML机器学习库代码的例子:
从下图可以看到,如果通过TypeExtractor.createTypeInfo(type)方法获取到的类型信息属于PojoTypeInfo及其子类,那么将其注册到一起;否则统一交给Kryo去处理,Flink并不过问(这种情况下性能会变差)。
声明类型信息的常见手段
通过TypeInformation.of()方法,可以简单地创建类型信息对象。
1.对于非泛型的类,直接传入Class对象即可
2.对于泛型类,需要借助TypeHint来保存泛型类型信息
TypeHint的原理是创建匿名子类,运行时TypeExtractor可以通过getGenericSuperclass(). getActualTypeArguments()方法获取保存的实际类型。
3.预定义的快捷方式
例如BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于String、Boolean、Byte、Short、Integer、Long、Float、Double、Char等基本类型的类型声明,可以直接使用。
例如下面是对Row类型各字段的类型声明,使用方法非常简明,不再需要new XxxTypeInfo<>(很多很多参数)
当然,如果觉得BasicTypeInfo还是太长,Flink还提供了完全等价的Types类(org.apache.flink.api.common.typeinfo.Types):
特别需要注意的是,flink-table模块也有一个Types类(org.apache.flink.table.api.Types),用于table模块内部的类型定义信息,用法稍有不同。使用IDE的自动import时一定要小心:
4.自定义TypeInfo和TypeInfoFactory
通过自定义TypeInfo为任意类提供Flink原生内存管理(而非Kryo),可令存储更紧凑,运行时也更高效。
开发者在自定义类上使用@TypeInfo注解,随后创建相应的TypeInfoFactory并覆盖createTypeInfo方法。
注意需要继承TypeInformation类,为每个字段定义类型,并覆盖元数据方法,例如是否是基本类型(isBasicType)、是否是Tuple(isTupleType)、元数(对于一维的Row类型,等于字段的个数)等等,从而为TypeExtractor提供决策依据。
更多示例,请参考Flink源码的org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
TypeSerializer
Flink自带了很多TypeSerializer子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用:
如果不能满足,那么可以继承TypeSerializer及其子类以实现自己的序列化器。
Kryo序列化
对于Flink无法序列化的类型(例如用户自定义类型,没有registerType,也没有自定义TypeInfo和TypeInfoFactory),默认会交给Kryo处理。
如果Kryo仍然无法处理(例如Guava、Thrift、Protobuf等第三方库的一些类),有以下两种解决方案:
1.可以强制使用Avro来替代Kryo:
env.getConfig().enableForceAvro(); // env代表ExecutionEnvironment对象,下同
2.为Kryo增加自定义的Serializer以增强Kryo的功能:
env.getConfig().addDefaultKryoSerializer(Class<? > type, Class<? extends Serializer<? >> serializerClass
以及
env.getConfig().registerTypeWithKryoSerializer(Class<? > type, T serializer)
如果希望完全禁用Kryo(100% 使用Flink的序列化机制),则可以使用以下设置,但注意一切无法处理的类都将导致异常:
env.getConfig().disableGenericTypes();
类型机制的陷阱与缺陷
金无足赤,人无完人。Flink内置的类型系统虽然强大而灵活,但仍然有一些需要注意的点:
1.Lambda函数的类型提取
由于Flink类型提取依赖于继承等机制,而lambda函数比较特殊,它是匿名的,也没有与之相关的类,所以其类型信息较难获取。
Eclipse的JDT编译器会把lambda函数的泛型签名等信息写入编译后的字节码中,而对于javac等常见的其他编译器,则不会这样做,因而Flink就无法获取具体类型信息了。
2.Kryo的JavaSerializer在Flink下存在Bug
推荐使用org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer而非
com.esotericsoftware.kryo.serializers.JavaSerializer以防止与Flink不兼容。
类型机制与内存管理
下面以StringSerializer为例,来看下Flink是如何紧凑管理内存的:
下面是具体的序列化过程:
可以看到,Flink对于内存管理是非常细致的,层次分明,代码也容易理解。