Flink 数据序列化

为 Flink 量身定制的序列化框架

大家都知道现在大数据生态非常火,大多数技术组件都是运行在JVM上的,Flink也是运行在JVM上,基于JVM的数据分析引擎都需要将大量的数据存储在内存中,这就不得不面临JVM的一些问题,比如Java对象存储密度较低等。针对这些问题,最常用的方法就是实现一个显式的内存管理,也就是说用自定义的内存池来进行内存的分配回收,接着将序列化后的对象存储到内存块中。
现在Java生态圈中已经有许多序列化框架,比如说Java serialization, Kryo,Apache Avro等等。但是Flink依然是选择了自己定制的序列化框架,那么到底有什么意义呢?若Flink选择自己定制的序列化框架,对类型信息了解越多,可以在早期完成类型检查,更好的选取序列化方式,进行数据布局,节省数据的存储空间,直接操作二进制数据。
在这里插入图片描述Flink在其内部构建了一套自己的类型系统,Flink现阶段支持的类型分类如图所示,从图中可以看到Flink类型可以分为基础类型Basic、数组Arrays、复合类型Composite、辅助类型Auxiliary、泛型和其它类型GenericFlink支持任意的Java或是Scala类型。不需要像Hadoop一样去实现一个特定的接口org.apache.hadoop.io.WritableFlink能够自动识别数据类型。
在这里插入图片描述
TypeInformation的思维导图如图所示,从图中可以看出,在Flink中每一个具体的类型都对应了一个具体的TypeInformation实现类,例如BasicTypeInformation中的IntegerTypeInformationFractionalTypeInformation都具体的对应了一个TypeInformation。然后还有BasicArrayTypeInformationCompositeType以及一些其它类型,也都具体对应了一个TypeInformation

TypeInformationFlink类型系统的核心类。对于用户自定义的Function来说,Flink需要一个类型信息来作为该函数的输入输出类型,即TypeInfomation。该类型信息类作为一个工具来生成对应类型的序列化器TypeSerializer,并用于执行语义检查,比如当一些字段在作为joingrouping的键时,检查这些字段是否在该类型中存在。

Flink 的序列化过程

Flink序列化过程中,进行序列化操作必须要有序列化器,那么序列化器从何而来?
每一个具体的数据类型都对应一个TypeInformation的具体实现,每一个TypeInformation都会为对应的具体数据类型提供一个专属的序列化器。通过 Flink的序列化过程图可以看到TypeInformation会提供一个createSerialize()方法,通过这个方法就可以得到该类型进行数据序列化操作与反序化操作的对象TypeSerializer
在这里插入图片描述对于大多数数据类型 Flink可以自动生成对应的序列化器,能非常高效地对数据集进行序列化和反序列化,比如,BasicTypeInfoWritableTypeInfo等,但针对GenericTypeInfo类型,Flink会使用Kyro进行序列化和反序列化。其中,TuplePojoCaseClass类型是复合类型,它们可能嵌套一个或者多个数据类型。在这种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。

简单的介绍下Pojo的类型规则,即在满足一些条件的情况下,才会选用Pojo的序列化进行相应的序列化与反序列化的一个操作。即类必须是Public的,且类有一个public的无参数构造函数,该类(以及所有超类)中的所有非静态no-static、非瞬态no-transient字段都是public的(和非最终的final)或者具有公共gettersetter方法,该方法遵循gettersetterJava bean命名约定。当用户定义的数据类型无法识别为POJO类型时,必须将其作为GenericType处理并使用Kryo进行序列化。

Flink自带了很多TypeSerializer子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用,如果内建的数据类型和序列化方式不能满足你的需求,Flink的类型信息系统也支持用户拓展。若用户有一些特殊的需求,只需要实现 TypeInformationTypeSerializerTypeComparator即可定制自己类型的序列化和比较大小方式,来提升数据类型在序列化和比较时的性能。

序列化就是将数据结构或者对象转换成一个二进制串的过程,在Java里面可以简单地理解成一个byte数组。而反序列化恰恰相反,就是将序列化过程中所生成的二进制串转换成数据结构或者对象的过程。下面就以内嵌型的Tuple3这个对象为例,简述一下它的序列化过程。
[点击并拖拽以移动] ​

Tuple3包含三个层面,一是int类型,一是double类型,还有一个是PersonPerson包含两个字段,一是int型的ID,另一个是 String 类型的name,它在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到Tuple3 会把 int类型通过IntSerializer进行序列化操作,此时int只需要占用四个字节就可以了。根据int占用四个字节,这个能够体现出Flink可序列化过程中的一个优势,即在知道数据类型的前提下,可以更好的进行相应的序列化与反序列化操作。相反,如果采用Java的序列化,虽然能够存储更多的属性信息,但一次占据的存储空间会受到一定的损耗。Person类会被当成一个Pojo对象来进行处理,PojoSerializer序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由MemorySegment去支持。

MemorySegment具有什么作用呢? MemorySegmentFlink中会将对象序列化到预分配的内存块上,它代表1个固定长度的内存,默认大小为32 kbMemorySegment代表Flink中的一个最小的内存分配单元,相当于是Java的一个byte数组。 每条记录都会以序列化的形式存储在一个或多个MemorySegment中。

Flink 序列化的最佳实践

Flink常见的应用场景有四种,即注册子类型、注册自定义序列化器、添加类型提示、手动创建TypeInformation,具体如下:
【1】注册子类型: 如果函数签名只描述了超类型,但是它们实际上在执行期间使用了超类型的子类型,那么让Flink了解这些子类型会大大提高性能。可以在StreamExecutionEnvironmentExecutionEnvironment中调用.registertype (clazz) 注册子类型信息。
【2】注册自定义序列化器: 对于不适用于自己的序列化框架的数据类型,Flink会使用Kryo来进行序列化,并不是所有的类型都与Kryo无缝连接,具体注册方法在下文介绍。
【3】添加类型提示: 有时,当Flink用尽各种手段都无法推测出泛型信息时,用户需要传入一个类型提示TypeHint,这个通常只在Java API中需要。
【4】手动创建TypeInformation 在某些API调用中,这可能是必需的,因为Java的泛型类型擦除导致Flink无法推断数据类型。
其实在大多数情况下,用户不必担心序列化框架和注册类型,因为Flink已经提供了大量的序列化操作,不需要去定义自己的一些序列化器,但是在一些特殊场景下,需要去做一些相应的处理。

实践 - 类型声明: 类型声明去创建一个类型信息的对象是通过哪种方式?通常是用TypeInformation.of()方法来创建一个类型信息的对象,具体说明如下:
【1】对于非泛型类,直接传入class对象即可

PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);

【2】对于泛型类,需要通过TypeHint来保存泛型类型信息

final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});

【3】预定义常量:BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于StringBooleanByteShortIntegerLongFloatDoubleChar等基本类型的类型声明,可以直接使用。而且Flink还提供了完全等价的Typesorg.apache.flink.api.common.typeinfo.Types。特别需要注意的是,flink-table模块也有一个Typesorg.apache.flink.table.api.Types,用于table模块内部的类型定义信息,用法稍有不同。使用IDE 的自动import时一定要小心。
【4】自定义TypeInfoTypeInfoFactory 通过自定义TypeInfo为任意类提供Flink原生内存管理(而非Kryo),使存
储更紧凑,运行时也更高效。需要注意在自定义类上使用@TypeInfo注解,随后创建相应的TypeInfoFactory并覆盖createTypeInfo()方法。

@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0,T1>{public T0 myfield0;public T1 myfield1;
}public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple>{@Overridepublic TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInfomation<?>> genericParameters){return new MyTupleTypeInfo(genericParameters.get("T0").genericParameters.get("T1"));}
}

实践 - 注册子类型

Flink认识父类,但不一定认识子类的一些独特特性,因此需要单独注册子类型。StreamExecutionEnvironmentExecutionEnvironment提供registerType()方法用来向Flink注册子类信息。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerType(typeClass);

registerType()方法内部,会使用TypeExtractor来提取类型信息,如上所示,获取到的类型信息属于PojoTypeInfo及其子类,那么需要将其注册到一起,否则统一交给Kryo去处理,Flink并不过问 ( 这种情况下性能会变差 )。

实践 - Kryo 序列化

对于Flink无法序列化的类型(例如用户自定义类型,没有registerType,也没有自定义TypeInfoTypeInfoFactory),默认会交给 Kryo处理,如果Kryo仍然无法处理(例如GuavaThriftProtobuf等第三方库的一些类),有两种解决方案:
【1】强制使用Avro来代替Kryo

env.getConfig().enableForceAvro();

【2】为Kryo增加自定义的Serializer以增强Kryo的功能

env.getConfig().addDefaultKryoSerializer(clazz, serializer);

注:如果希望完全禁用Kryo100%使用Flink的序列化机制),可以通过Kryoenv.getConfig().disableGenericTypes()的方式完成,但注意一切无法处理的类都将导致异常,这种对于调试非常有效。

Flink 通信层的序列化

FlinkTask之间如果需要跨网络传输数据记录, 那么就需要将数据序列化之后写入NetworkBufferPool,然后下层的Task读出之后再进行反序列化操作,最后进行逻辑处理。为了使得记录以及事件能够被写入 Buffer,随后在消费时再从Buffer中读出,
Flink提供了数据记录序列化器RecordSerializer与反序列化器RecordDeserializer以及事件序列化器EventSerializer

Function发送的数据被封装成SerializationDelegate,它将任意元素公开为IOReadableWritable以进行序列化,通过setInstance()来传入要序列化的数据。在Flink通信层的序列化中,有几个问题值得关注,具体如下:
【1】何时确定Function的输入输出类型?
[点击并拖拽以移动] ​

在构建StreamTransformation的时候通过TypeExtractor工具确定Function的输入输出类型。TypeExtractor类可以根据方法签名、子类信息等蛛丝马迹自动提取或恢复类型信息。
【2】何时确定Function的序列化 / 反序列化器?
构造StreamGraph时, 通过TypeInfomationcreateSerializer()方法获取对应类型的序列化器TypeSerializer,并在addOperator()的过程中执行setSerializers() 操作,设置StreamConfigTYPESERIALIZERIN1TYPESERIALIZERIN2TYPESERIALIZEROUT_1属性。
【3】何时进行真正的序列化 / 反序列化操作? 这个过程与TypeSerializer又是怎么联系在一起的呢?
构造StreamGraph时, 通过TypeInfomationcreateSerializer()方法获取对应类型的序列化器TypeSerializer,并在addOperator()的过程中执行setSerializers()操作,设置StreamConfigTYPESERIALIZERIN1TYPESERIALIZERIN2TYPESERIALIZEROUT_1属性。
【4】何时进行真正的序列化 / 反序列化操作? 这个过程与TypeSerializer又是怎么联系在一起的呢?
[点击并拖拽以移动] ​

大家都应该清楚TaskStreamTask两个概念,Task是直接受TaskManager管理和调度的,而Task又会调用StreamTask,而StreamTask中真正封装了算子的处理逻辑。在run()方法中,首先将反序列化后的数据封装成StreamRecord交给算子处理;然后将处理结果通过Collector发送给下游 ( 在构建Collector时已经确定了SerializtionDelegate),并通过RecordWriter写入器将序列化后的结果写入DataOutput;最后序列化的操作交给SerializerDelegate处理,实际还是通过TypeSerializerserialize()方法完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/223436.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python算法例27 对称数

1. 问题描述 对称数是一个旋转180后&#xff08;倒过来&#xff09;看起来与原数相同的数&#xff0c;找到所有长度为n的对称数。 2. 问题示例 给出n2&#xff0c;返回[&#xff02;11&#xff02;&#xff0c;&#xff02;69&#xff02;&#xff0c;&#xff02;88&#x…

【JAVA】分布式链路追踪技术概论

目录 1.概述 2.基于日志的实现 2.1.实现思想 2.2.sleuth 2.2.可视化 3.基于agent的实现 4.联系作者 1.概述 当采用分布式架构后&#xff0c;一次请求会在多个服务之间流转&#xff0c;组成单次调用链的服务往往都分散在不同的服务器上。这就会带来一个问题&#xff1a;…

计算机网络 运输层下 | TCP概述 可靠传输 流量控制 拥塞控制 连接管理

文章目录 3 运输层主要协议 TCP 概述3.1 TCP概述 特点3.2 TCP连接RSVP资源预留协议 4 TCP可靠传输4.1 可靠传输工作原理4.1.1 停止等待协议4.1.2 连续ARQ协议 4.2 TCP可靠通信的具体实现4.2.1 以字节为单位的滑动窗口4.2.2 超时重传时间的选择4.2.3 选择确认SACK 5 TCP的流量控…

mac m1芯片 pytorch安装及gpu性能测试

pytorch 使用mac的m1芯片进行模型训练。 #小结&#xff1a;在数据量小和模型参数少&#xff0c;batch_size小时&#xff0c;cpu训练更快&#xff08;原因&#xff1a;每次训练时数据需要放入GPU中&#xff0c;由于batch_size小。数据放入gpu比模型计算时间还长&#xff09; 在…

(Mac上)使用Python进行matplotlib 画图时,中文显示不出来

【问题描述】 ①报错确缺失字体&#xff1a; ②使用matplotlib画图&#xff0c;中文字体显示不出来 【问题思考】 在网上搜了好多&#xff0c;关于使用python进行matplotlib画图字体显示不出来的&#xff0c;但是我试用了下&#xff0c;对我来说都没有。有些仅使用于windows系…

Netty-2-数据编解码

解析编解码支持的原理 以编码为例&#xff0c;要将对象序列化成字节流&#xff0c;你可以使用MessageToByteEncoder或MessageToMessageEncoder类。 这两个类都继承自ChannelOutboundHandlerAdapter适配器类&#xff0c;用于进行数据的转换。 其中&#xff0c;对于MessageToMe…

字符设备驱动开发-注册-设备文件创建

一、字符设备驱动 linux系统中一切皆文件 1、应用层&#xff1a; APP1 APP2 ... fd open("led驱动的文件"&#xff0c;O_RDWR); read(fd); write(); close(); 2、内核层&#xff1a; 对灯写一个驱动 led_driver.c driver_open(); driver_read(); driver_write(…

GoogLeNet(V1)

目录 一、GooLeNet介绍 1、模型设计的motivation 2、Inception块 3、GoogLeNet架构 4、Inception后续变种 5、总结 二、代码实现 1、Inception块 2、GoogLeNet模型 3、训练模型 4、总结 一、GooLeNet介绍 GoogLeNet是由Google团队于2014年提出的深度卷积神经网络架构…

BUG记录 | 使用阿里云OSS实现文件上传后,得到的url无法在浏览器中打开

项目背景 SpringBoot的项目&#xff0c;使用阿里云对象存储OSS对项目中的文件进行存储&#xff0c;所需文件也会通过IDEA中由官方Demo改编而成的工具类作为接口&#xff0c;调用接口后上传 问题描述 使用阿里云OSS实现文件上传后&#xff0c;通过postman测试得到的url无法在…

H266/VVC帧内预测编码

预测编码技术 预测编码&#xff08;Prediction Coding&#xff09;是指利用已编码的一个或多个样本值&#xff0c;根据某种模型或方法&#xff0c;对当前的样本值进行预测&#xff0c;并对样本真实值和预测值之间的差值进行编码。 视频中的每个像素看成一个信源符号&#xff…

【UML】第12篇 序列图(1/2)——基本概念和构成

目录 一、什么是序列图&#xff08;Sequence Diagram&#xff09; 1.1 定义 1.2 主要用途 1.3 序列图和BPMN的区别和联系 二、序列图的构成 2.1 对象 2.2 生命线 2.3 消息 2.4 激活 序列图&#xff0c;是我个人认为的用处最多的一种图。产品和研发的同学&#xff0c;都…

二级指针的作用 -- 将变量从函数中带出

使用一级指针不能将变量带出 void test(int *p) {static int nub 10; /*使用static是保证函数结束, 变量依然存在, 不然即使将它带出来, 函数结束时这片内存已经被释放了就没有意义了*/p &nub; }int main(void) {int *p NULL;test(p);printf("%d",*p);return …

驱动开发-1

一、驱动课程大纲 内核模块字符设备驱动中断 二、ARM裸机代码和驱动有什么区别&#xff1f; 1、共同点&#xff1a; 都能够操作硬件 2、不同点&#xff1a; 1&#xff09;裸机就是用C语言给对应的寄存器里面写值&#xff0c;驱动是按照一定的套路往寄存器里面写值 2&#xff09…

开关电源厚膜集成电路引脚功能

开关电源厚膜集成电路引脚功能 一、 STR51213、STR50213、STR50103 引脚号 引脚功能 1 接地&#xff0c;内接稳压基准电路 2 开关管基极 3 开关管集电极 4 开关管发射极 5 误差比较电压信号输入&#xff0c;兼待机控制 二、 STR3302、STR3202 引脚号 引脚功能 1内部半…

华为vrrp+mstp+ospf+dhcp+dhcp relay配置案例

1、左边是vlan 10主桥&#xff0c;右边是vlan 20的主桥&#xff0c;并且互为备桥 2、 vlan 10 vrrp网关默认用左边&#xff0c;vlan 20的vrrp 网关默认用右边&#xff0c;对应mstp生成树 3、两边都track检测&#xff0c;不通就把vrrp减掉60&#xff0c;这样就会自动切另一边了 …

四、UART_阻塞发送中断接收

1、开发环境 (1)Keil MDK: V5.38.0.0 (2)MCU: mm320163D7P 2、实验目的&原理图 2.1、实验目的 (1)上位机串口助手给MCU发送信息&#xff0c;MCU串口通过通过串口助手接收后&#xff0c;将接收到的内容通过串口助手发送到上位机。 (2)串口在whil循环中每隔1秒发送一次…

【一起学Rust | 框架篇 | Tauri2.0框架】Tauri2.0环境搭建与项目创建

文章目录 前言一、搭建 Tauri 2.0 开发环境二、创建 Tauri 2.0 项目1.创建项目2.安装依赖4. 编译运行 三、设置开发环境四、项目结构 前言 Tauri在Rust圈内成名已久&#xff0c;凭借Rust的可靠性&#xff0c;使用系统原生的Webview构建更小的App 以及开发人员可以灵活的使用各…

离散型制造企业为什么要注重MES管理系统的实施

离散型制造企业经常面临三个核心问题&#xff1a;生产什么、生产多少以及如何生产。尽管许多企业都实施了ERP系统&#xff0c;但仍然绕不开MES管理系统的话题。本文将从三个方面详细解释为什么离散型企业需要实施MES管理系统。 一、生产线经常出现的问题 在离散型企业中&#…

Blazor 混合开发_MAUI+Vue_WPF+Vue

Blazor 混合开发_MAUIVue_WPFVue 背景混合开发的核心为什么必须使用 wwwroot 文件夹放置 Web 项目文件 创建 MAUI 项目创建 wwwroot 文件夹服务注册创建 _import.razor添加 Main.razor 组件修改 MainPage.xaml 文件 创建 WPF 项目创建 wwwroot 文件夹服务注册创建 _import.razo…

OpenCV利用HSV颜色区间分离不同物体

需求 当前有个需求是从一个场景中将三个不同的颜色的二维码分离出来&#xff0c;如下图所示。 这里有两个思路可以使用 思路一是通过深度学习的方式&#xff0c;训练一个能够识别旋转边界框的模型&#xff0c;但是需要大量的数据进行模型训练&#xff0c;此处缺少训练数据&a…