Flink 数据类型 TypeInformation信息
Flink
流应用程序处理的是以数据对象表示的事件流。所以在Flink
内部,我么需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink
需要明确知道应用程序所处理的数据类型。并为每个数据类型生成特定的序列化器、反序列化器和比较器。Flink支持非常完善的数据类型,数据类型描述信息都是由TypeInformation
定义,比较常用的TypeInformation
有BasicTypeInfo
、TupleTypeInfo
、CaseClassTypeInfo
、PojoTypeInfo
类等。TypeInformation
主要作用是为了在 Flink系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。Flink
能够支持任意的Java
或Scala
的数据类型,不用像Hadoop
中的org.apache.hadoop.io.Writable
而实现特定的序列化和反序列化接口,从而让用户能够更加容易使用已有的数据结构类型。另外使用TypeInformation
管理数据类型信息,能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样能够及时有效地避免用户在使用Flink
编写应用的过程中的数据类型问题。
原生数据类型
Flink
通过实现BasicTypeInfo
数据类型,能够支持任意Java 原生基本类型(装箱)或String
类型,例如Integer
、String
、Double
等,如以下代码所示,通过从给定的元素集中创建DataStream
数据集。
//创建 Int 类型的数据集
DataStreamSource<Integer> integerDataStreamSource = env.fromElements(1, 2, 3, 4, 5);
//创建 String 的类型的数据集
DataStreamSource<String> stringDataStreamSource = env.fromElements("Java", "Scala");
Flink
实现另外一种TypeInfomation
是BasicArrayTypeInfo
,对应的是Java
基本类型数组(装箱)或String
对象的数组,如下代码通过使用 Array
数组和List
集合创建DataStream
数据集。
List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
//通过 List 集合创建数据集
DataStreamSource<Integer> integerDataStreamSource1 = env.fromCollection(integers);
Java Tuples类型
通过定义TupleTypeInfo
来描述Tuple
类型数据,Flink
在Java
接口中定义了元祖类Tuple
供用户使用。Flink Tuples
是固定长度固定类型的Java Tuple
实现,不支持空值存储。目前支持任意的Flink Java Tuple
类型字段数量上限为25
,如果字段数量超过上限,可以通过继承Tuple
类的方式进行拓展。如下代码所示,创建Tuple
数据类型数据集。
//通过实例化 Tuple2 创建具有两个元素的数据集
DataStreamSource<Tuple2<String, Integer>> tuple2DataStreamSource = env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
//通过实例化 Tuple3 创建具有三个元素的数据集
DataStreamSource<Tuple3<String, Integer, Long>> tuple3DataStreamSource = env.fromElements(new Tuple3<>("a", 1, 3L), new Tuple3<>("b", 2, 3L));
Scala Case Class类型
Flink
通过实现CaseClassTypeInfo
支持任意的Scala Case Class
,包括Scala tuples
类型,支持的字段数量上限为22
,支持通过字段名称和位置索引获取指标,不支持存储空值。如下代码实例所示,定义WordCount Case Class
数据类型,然后通过fromElements
方法创建input
数据集,调用keyBy()
方法对数据集根据 word字段重新分区。
//定义 WordCount Case Class 数据结构
case class WordCount(word: Sring, count: Int)
//通过 fromElements 方法创建数据集
val input = env.fromElements(WordCount("hello", 1),WordCount("word",2))
val keyStream1 = input.keyBy("word")//根据word字段为分区字段,
val keyStream2 = input.keyBy(0)//也可以通过制定position分区
通过使用Scala Tuple
创建DataStream
数据集,其他的使用方式和Case Class
相似。需要注意的是,如果根据名称获取字段,可以使用 Tuple
中的默认字段名称。
//通过实例化Scala Tuple2 创建具有两个元素的数据集
val tupleStream: DataStream[Tuple2[String,Int]] = env.fromElements(("a",1),("b",2));
//使用默认名字段获取字段,表示第一个 tuple字段,相当于下标0
tuple2DataStreamSource.keyBy("_1");
POJOs 类型
POJOs
类可以完成复杂数据结构的定义,Flink
通过实现PojoTypeInfo
来描述任意的POJOs
,包括Java
和Scala
类。在Flink
中使用POJOs
类可以通过字段名称获取字段,例如dataStream.join(otherStream).where("name").equalTo("personName")
,对于用户做数据处理则非常透明和简单,如代码所示。如果在Flink
中使用POJOs
数据类型,需要遵循以下要求:
【1】POJOs
类必须是Public
修饰且必须独立定义,不能是内部类;
【2】POJOs
类中必须含有默认空构造器;
【3】POJOs
类中所有的 Fields
必须是Public
或者具有Public
修饰的getter
和setter
方法;
【4】POJOs
类中的字段类型必须是Flink
支持的。
//类和属性具有 public 修饰
public class Persion{public String name;public Integer age;//具有默认的空构造器public Persion(){}public Persion(String name,Integer age){this.name = name;this.age = age;};
}
定义好POJOs Class
后,就可以在 Flink环境中使用了,如下代码所示,使用fromElements
接口构建Person
类的数据集。POJOs
类仅支持字段名称指定字段,如代码中通过Person name
来指定Keyby
字段。
DataStreamSource<Persion> persionDataStreamSource = env.fromElements(new Persion("zzx", 18), new Persion("fj", 16));
persionData.keyBy("name").sum("age");
Flink Value类型
Value
数据类型实现了org.apache.flink.types.Value
,其中包括read()
和write()
两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。目前Flink
提供了內建的Value
类型有IntValue、DoubleValue
以及StringValue
等,用户可以结合原生数据类型和Value
类型使用。
特殊数据类型
在Flink
中也支持一些比较特殊的数据数据类型,例如Scala
中的List
、Map
、Either
、Option
、Try
数据类型,以及Java中Either
数据类型,还有Hadoop
的Writable
数据类型。如下代码所示,创建Map
和List
类型数据集。这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像POJOs
类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助Types Hint
帮助Flink
推断数据类型信息,关于Tyeps Hmt
介绍可以参考下一小节。
//创建 map 类型数据集
Map map = new HashMap<>();
map.put("name","zzx");
map.put("age",12);
env.fromElements(map);
//创建 List 类型数据集
env.fromElements(Arrays.asList(1,2,3,4,5),Arrays.asList(3,4,5));
TypeInformation信息获取: 通常情况下Flink
都能正常进行数据类型推断,并选择合适的serializers
以及comparators
。但在某些情况下却无法直接做到,例如定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,使得Flink
并不能很容易地获取到数据集中的数据类型信息。同时在Scala API
和Java API
中,Flink
分别使用了不同的方式重构了数据类型信息。
Scala API类型信息
Scala API
通过使用Manifest
和类标签,在编译器运行时获取类型信息,即使是在函数定义中使用了泛型,也不会像Java API
出现类型擦除的问题,这使得Scala API
具有非常精密的类型管理机制。同时在Flink中使用到Scala Macros
框架,在编译代码的过程中推断函数输入参数和返回值的类型信息,同时在Flink
中注册成TypeInformation
以支持上层计算算子使用。
当使用Scala API
开发 Flink
应用,如果使用到Flink
已经通过TypeInformation
定义的数据类型,TypeInformation
类不会自动创建,而是使用隐式参数的方式引入,代码不会直接抛出编码异常,但是当启动Flink
应用程序时就会报could not find implicit value for evidence parameter of type TypeInformation
的错误。这时需要将TypeInformation
类隐式参数引入到当前程序环境中,代码实例如下:
import org.apache.flink.api.scala._
Java API类型信息
由于Java
的泛型会出现类型擦除问题,Flink
通过Java
反射机制尽可能重构类型信息,例如使用函数签名以及子类的信息等。同时类型推断在当输出类型依赖于输入参数类型时相对比较容易做到,但是如果函数的输出类型不依赖于输入参数的类型信息,这个时候就需要借助于类型提示Ctype Himts
来告诉系统函数中传入的参数类型信息和输出参数信息。如代码清单通过在returns
方法中传入TypeHint
实例指定输出参数类型,帮助Flink
系统对输出类型进行数据类型参数的推断和收集。
//定义泛型函数,输入参数 T,O 输出参数为 O
class MyMapFucntion<T,O> implements MapFunction<T,O>{@Overridepublic O map(T t) throws Exception {//定义计算逻辑return null;}
}//通过 List 集合创建数据集
DataStreamSource<Integer> input = env.fromCollection(integers);
input.flatMap(new MyMapFucntion<String,Integer>()).returns(new TypeHint<Integer>() {//通过returns方法指定返回参数类型
})
在使用Java API
定义POJOs
类型数据时,PojoTypeInformation
为POJOs
类中的所有字段创建序列化器,对于标准的类型,例如Integer
、String
、Long
等类型是通过Flink
自带的序列化器进行数据序列化,对于其他类型数据都是直接调用Kryo
序列化工具来进行序列化。通常情况下,如果Kryo
序列化工具无法对POJOs
类序列化时,可以使用Avro
对POJOs
类进行序列化,如下代码通过在ExecutionConfig
中调用 enableForceAvro()
来开启Avro
序列化。
//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启 avro 序列化
env.getConfig().enableForceAvro();
如果用户想使用Kryo
序列化工具来序列化POJOs
所有字段,则在ExecutionConfig
中调用enableForceKryo()
来开启Kryo
序列化。
//获取运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启 Kryo 序列化
env.getConfig().enableForceKryo();
如果默认的Kryo
序列化类不能序列化POJOs
对象,通过调用ExecutionConfig
的addDefaultKryoSerializer()
方法向Kryo
中添加自定义的序列化器。
public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
自定义TypeInformation
除了使用已有的TypeInformation
所定义的数据格式类型之外,用户也可以自定义实现TypeInformation
,来满足的不同的数据类型定义需求。Flink
提供了可插拔的 Type Information Factory
让用户将自定义的TypeInformation
注册到Flink
类型系统中。如下代码所示只需要通过实现org.apache.flink.api.common.typeinfo.TypeInfoFactory
接口,返回相应的类型信息。通过@TypeInfo
注解创建数据类型,定义CustomTuple
数据类型。
@TypeInfo(CustomTypeInfoFactory.class)
public class CustomTuple<T0,T1>{public T0 field0;public T1 field1;
}
然后定义CustomTypeInfoFactory
类继承于TypeInfoFactory
,参数类型指定CustomTuple
。最后重写createTypeInfo
方法,创建的CustomTupleTypeInfo
就是CustomTuple
数据类型TypeInformation
。
public class CustomTypeInfoFactory extends TypeInfoFactory<CustomTuple>{@Overridepublic TypeInfomation<CustomTuple> createTypeInfo(Type t, Map<String,TypeInfoFactory<?>> genericParameters){return new CustomTupleTypeInfo(genericParameters.get("T0"),genericParameters.get("T1");}
}
相关文章:
Flink 数据类型 TypeInformation信息
Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我么需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知…...
基于python的leetcode算法介绍之递归
文章目录 零 算法介绍一 简单示例 辗转相除法Leetcode例题与思路[509. 斐波那契数](https://leetcode.cn/problems/fibonacci-number/)解题思路:题解: [206. 反转链表](https://leetcode.cn/problems/reverse-linked-list/)解题思路:题解&…...
2023年度佳作:AIGC、AGI、GhatGPT、人工智能大语言模型的崛起与挑战
目录 前言 01 《ChatGPT 驱动软件开发》 内容简介 02 《ChatGPT原理与实战》 内容简介 03 《神经网络与深度学习》 04 《AIGC重塑教育》 内容简介 05 《通用人工智能》 目 录 前言 2023年是人工智能大语言模型大爆发的一年,一些概念和英文缩写也在这一…...
Axure的交互以及情形的介绍
一. 交互 1.1 交互概述 通俗来讲就是,谁用了什么方法做了什么事情,主体"谁"对应的就是axure中的元件,"什么方法"对应的就是交互事件,比如单击事件、双击事件,"什么事情"对应的就是交互…...
【MATLAB第84期】基于MATLAB的波形叠加极限学习机SW-ELM代理模型的sobol全局敏感性分析法应用
【MATLAB第84期】基于MATLAB的波形叠加极限学习机SW-ELM代理模型的sobol全局敏感性分析法应用 前言 跟往期sobol区别: 1.sobol计算依赖于验证集样本,无需定义变量上下限。 2.SW-ELM自带激活函数,计算具有phi(x)e^x激…...
米游社区表情包整合网站源码
源码介绍 米游社表情包整合网站源码,来自Github大佬的项目,包含米游兔123枚,米游社 玩家12枚,崩坏 星穹铁道112枚,绝区零218枚,NAP32枚,崩坏RPG62枚,崩坏3-1282枚,原神 …...
easyexcel调用公共导出方法导出数据
easyexcel备忘 Slf4j public class ConditionDownloadUtil {//扫描在xboot 包下所有IService 接口的子类, 每次启动服务后, 重新扫描public final static Class[] classesExtendsIService ClassUtil.scanPackageBySuper("cn.exrick.xboot", IService.class).toArra…...
C语言插入排序算法及代码
一、原理 在待排序的数组里,从数组的第二个数字开始,通过构建有序序列,对于未排序数据,在已排序序列中从后向前扫描,找到相应位置并插入。 二、代码部分 #include<stdio.h> #include<stdlib.h> int ma…...
2023年中国法拍房用户画像和数据分析
法拍房主要平台 法拍房主要平台有3家,分别是阿里、京东和北交互联平台。目前官方认定纳入网络司法拍卖的平台共有7家,其中阿里资产司法拍卖平台的挂拍量最大。 阿里法拍房 阿里法拍房数据显示2017年,全国法拍房9000套;2018年&a…...
Android 清除临时文件,清空缓存
python 代码: import os import shutil import tracebackdef delete_folder(path):if os.path.exists(path):print(f"删除文件夹: {path}")shutil.rmtree(path)print("删除完成")def delete_file(path):if os.path.exists(path):print(f"删…...
Guava限流神器:RateLimiter使用指南
1. 引言 可能有些小伙伴听到“限流”这个词就觉得头大,感觉像是一个既复杂又枯燥的话题。别急,小黑今天就要用轻松易懂的方式,带咱们一探RateLimiter的究竟。 想象一下,当你去超市排队结账时,如果收银台开得越多&…...
【六大排序详解】开篇 :插入排序 与 希尔排序
插入排序 与 希尔排序 六大排序之二 插入排序 与 希尔排序1 排序1.1排序的概念 2 插入排序2.1 插入排序原理2.2 排序步骤2.3 代码实现 3 希尔排序3.1 希尔排序原理3.2 排序步骤3.3 代码实现 4 时间复杂度分析 Thanks♪(・ω・)ノ下一篇文章见&am…...
凸优化问题求解
这里写目录标题 1. 线性规划基本定理2.单纯形法2.1 转轴运算 3. 内点法3.1 线性规划的内点法 1. 线性规划基本定理 首先我们指出,线性规划均可等价地化成如下标准形式 { min c T x , s . t A x b , x ⪰ 0 , \begin{align}\begin{cases}\min~c^Tx,\\\mathrm{s.…...
文件操作入门指南
目录 一、为什么使用文件 二、什么是文件 2.1 程序文件 2.2 数据文件 2.3 文件名 三、文件的打开和关闭 3.1 文件指针 3.2 文件的打开和关闭 四、文件的顺序读写 编辑 🌻深入理解 “流”: 🍂文件的顺序读写函数介绍: …...
Axure之交互与情节与一些实例
目录 一.交互与情节简介 二.ERP登录页到主页的跳转 三.ERP的菜单跳转到各个页面的跳转 四.省市联动 五.手机下拉加载 今天就到这里了,希望帮到你哦!!! 一.交互与情节简介 "交互"通常指的是人与人、人与计算机或物体…...
【数据库设计和SQL基础语法】--连接与联接--多表查询与子查询基础(二)
一、子查询基础 1.1 子查询概述 子查询是指在一个查询语句内部嵌套另一个查询语句的过程。子查询可以嵌套在 SELECT、FROM、WHERE 或 HAVING 子句中,用于从数据库中检索数据或执行其他操作。子查询通常返回一个结果集,该结果集可以被包含它的主查询使用…...
Android studio中导入opencv库
具体opencv库的导入流程参考链接:Android Studio开发之路 (五)导入OpenCV以及报错解决 一、出现的错误:NullPointerException: Cannot invoke “java.io.File.toPath()” because “this.mySdkLocation” is null 解决办法&#…...
Linux(1)_基础知识
第一部分 一、Linux系统概述 创始人:芬兰大学大一的学生写的Linux内核,李纳斯托瓦兹。 Linux时unix的类系统; 特点:多用户 多线程的操作系统; 开源操作系统; 开源项目:操作系统,应用…...
网络相关面试题
简述 TCP 连接的过程(淘系) 参考答案: TCP 协议通过三次握手建立可靠的点对点连接,具体过程是: 首先服务器进入监听状态,然后即可处理连接 第一次握手:建立连接时,客户端发送 syn 包…...
Vue2面试题:说一下对跨域的理解?
http请求分为两大类:普通http请求(如百度请求)和ajax请求(跨域是出现在ajax请求) 同源策略:在浏览器发起ajax请求时,当前的网址和被请求的网址协议、域名、端口号必须完全一致,目的是…...
Axure中如何使用交互样式交互事件交互动作情形
🎬 艳艳耶✌️:个人主页 🔥 个人专栏 :《产品经理如何画泳道图&流程图》 ⛺️ 越努力 ,越幸运 目录 一、Axure中交互样式 1、什么是交互样式? 2、交互样式的作用? 3、Axure中如何…...
1112. 迷宫(DFS之连通性模型)
1112. 迷宫 - AcWing题库 一天Extense在森林里探险的时候不小心走入了一个迷宫,迷宫可以看成是由 n∗n 的格点组成,每个格点只有2种状态,.和#,前者表示可以通行后者表示不能通行。 同时当Extense处在某个格点时,他只…...
飞天使-k8s知识点1-kubernetes架构简述
文章目录 名词功能要点 k8s核心要素CNCF 云原生框架简介k8s组建介绍 名词 CI 持续集成, 自动化构建和测试:通过使用自动化构建工具和自动化测试套件,持续集成可以帮助开发人员自动构建和测试他们的代码。这样可以快速检测到潜在的问题,并及早…...
linux中deadline调度原理与代码注释
简介 deadline调度是比rt调度更高优先级的调度,它没有依赖于优先级的概念,而是给了每个实时任务一定的调度时间,这样的好处是:使多个实时任务场景的时间分配更合理,不让一些实时任务因为优先级低而饿死。deadline调度…...
jquery、vue、uni-app、小程序的页面传参方式
jQuery、Vue、Uni-app 和小程序(例如微信小程序)都有它们自己的页面传参方式。下面分别介绍这几种方式的页面传参方式: jQuery: 在jQuery中,页面传参通常是通过URL的查询参数来实现的。例如: <a href"page2…...
ModuleNotFoundError: No module named ‘openai.error‘
ModuleNotFoundError: No module named ‘openai.error’ result self.fn(*self.args, **self.kwargs) File “H:\chatGPTWeb\chatgpt-on-wechat\channel\chat_channel.py”, line 168, in _handle reply self._generate_reply(context) File “H:\chatGPTWeb\chatgpt-on-wec…...
理解pom.xml中的parent标签
✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: 循序渐进学SpringBoot ✨特色专栏&…...
element ui el-avatar 源码解析零基础逐行解析
avatar功能介绍 快捷配置头像的样式 avatar 的参数配置 属性说明参数size尺寸type string 类型 (‘large’,‘medium’,‘small’)number类型 validator 校验shape形状circle (原型) square(方形)icon传入的iconsrc传入的图片st…...
Linux下c语言实现动态库的动态调用
在Linux操作系统下,有时候需要在不重新编译程序的情况下,运行时动态地加载库,这时可以通过Linux操作系统提供的API可以实现,涉及到的API主要有dlopen、dlsym和dlclose。使用时,需要加上头文件#include <dlfcn.h>…...
为什么MCU在ADC采样时IO口有毛刺?
大家在使用MCU内部ADC进行信号采样一个静态电压时,可能在IO口上看到这样的波形。这个时候大家一般会认识是信号源有问题,但仔细观察会发现这个毛刺的频率是和ADC触发频率一样的。 那么为什么MCU在ADC采样时IO口会出现毛刺呢?这个毛刺对结果有…...
如需手机网站建设/网站的营销推广方案
#服务器重启故障、服务器异常死机故障。#温度过高 会自动关机#CPU内存负载过大会宕机或者重启 1、chkconfig acpid offservice acpid stop 2、vi /boot/grub/grub.conf在kernel一行最后加上acpioff noacip#kernel... acpioff noacip 3、reboot转载于:https://blog.51cto.com/ki…...
华夏运用网站/百度手机助手苹果版
linux系统信息查询: 内核版本,分支(发行版)名称,位数,cpu信息等。1、# uname -a (显示系统名、节点名称、操作系统的发行版号、操作系统版本、运行系统的机器 ID 号。)Linux hzhsan2015 2.6.32.12-0.7-default #1 SMP 2010-05-20…...
游戏后端开发/seo外链在线提交工具
使用buildroot构建树莓派4 Linux系统 host配置 使用虚拟机作为编译主机 虚拟化软件:VMware 15.5虚拟网络:NAT虚拟硬盘:默认配置20GCPU/RAM:2x2/8G操作系统:Ubuntu 16.04 Desktop 安装操作系统后配置国内镜像源加速…...
网站用户权限/在线看seo网站
报错提示 1.网上查资料发现原来css-loader和style-loader是被我全局安装了 2.正确做法cnpm install css-loader style-loader --save 3.打包运行成功 转载于:https://www.cnblogs.com/Cavalary/p/7852817.html...
网络运营推广平台/邵阳seo排名
webbench最多可以模拟3万个并发连接去测试网站的负载能力,个人感觉要比Apache自带的ab压力测试工具好,安装使用也特别方便。1、适用系统:Linux2、编译安装:wget http://blog.s135.com/soft/linux/webbench/webbench-1.5.tar.gztar…...
电影下载网站模板/网站推广及seo方案
目录 01 准备演讲内容 02 快速掌握「抑扬顿挫」技巧 03 准备完整的演讲稿 04 害怕在公众面前发言怎么办? 05 如何面对冷场? 06 「好口才人士」具备的7个说话习惯 07 跟柴静学演讲技巧 01 准备演讲内容 02 快速掌握「抑扬顿挫」技巧 03 准备完整的演讲稿 04 害怕在…...