Flink任务如何跑起来之 2.算子 StreamOperator
Flink任务如何跑起来之 2.算子 StreamOperator
前文介绍了Transformation创建过程,大多数情况下通过UDF完成DataStream转换中,生成的Transformation实例中,核心逻辑是封装了SimpleOperatorFactory实例。
UDF场景下,DataStream到Transformationg过程中,SimpleOperatorFactory实例的创建过程大致如下伪代码所示。
// 具体的函数实例
Function function = ;
// 将函数实例封装到算子实例中
AbstractUdfStreamOperator operator = new AbstractUdfStreamOperator(function);
// 通过算子实例得到其SimpleOperatorFactory实例
SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator)
这里的UDF可以简单理解为需要我们自己传入对应Function实现类的操作,如map、filter等。
问题:
StreamOperator是什么?
为什么需要将Function封装到StreamOperator中?
1. Flink算子
在应用程序中通过各种各样的Function完成DataStream转换,但是Function仅表示数据处理逻辑,并不关心数据从哪里来到哪里去。
以MapFunction为例,map方法中仅包含对每一条到来数据的具体处理逻辑,并不清楚map方法何时被调用,结果返回到哪。
一个完整的数据处理逻辑应该是获取数据->处理数据->输出数据,在Flink中这个最小的完整逻辑通过算子表示,顶层抽象接口为StreamOperator
。
因此Function作为算子的一部分参与后续的数据加工。
算子包含生命周期、状态和容错管理、数据处理3个方面。设计时分为两条线:
- 生命周期、状态和容错管理,主要是
AbstractStreamOperator
抽象类及其子类实现,以及未来的AbstractStreamOperatorV2
抽象类。 - 数据处理,主要是
OneInputStreamOperator
、TwoInputStreamOperator
和MultipleInputStreamOperator
接口,分别表示单流、双流和多流的数据处理。在接口中定义了数据的处理方法。
StreamOperator完整的顶层抽象如下。
- AbstractStreamOperator,所有流运算的基类。提供了生命周期和属性方法的默认实现。
包含UDF的算子需继承其AbstractUdfStreamOperator子类
对于其具体实现,还必须实现OneInputStreamOperator或TwoInputStreamOperator其中一个。
将来将会使用AbstractStreamOperatorV2替换该基类 - OneInputStreamOperator,支持单流输入的运算符接口,如果要实现自定义运算符,需要使用AbatractUdfStreamOperator作为基类
- TwoInputStreamOperator,支持双流输入的运算符基类。同样需要和AbstractStreamOperator一起使用。
- AbstractStreamOperatorV2,所有流运算符的新基类,旨在取代AbatractUdfStreamOperator。
当前仅仅用于和MultipleInputStreamOperator一起配合使用。
OneInputStreamOperator、TwoInputStreamOperator和MultipleInputStreamOperator分别对应了Tranformation实现类的OneInputTransformation、TwoInputTransformation和AbstractMultipleInputTransformation。
MultipleInputStreamOperator和AbstractStreamOperatorV2是高版本中才加入的。因此,flink中最初仅支持单流或双流的输入,多流场景下需要拆分成单流或双流进行处理。在支持不同输入的流的实现中,梳理数据的方法分别如下
// 单流输入
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT>, Input<IN> {// 处理数据void processElement(StreamRecord<IN> element) throws Exception;
}// 双流输入
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {// 处理双流输入中第一个流上的元素void processElement1(StreamRecord<IN1> element) throws Exception;// 处理双流输入中第二个流上的元素void processElement2(StreamRecord<IN2> element) throws Exception;
}// 多流输入,这里的Input和单流输入继承的Input父类为同一个
public interface MultipleInputStreamOperator<OUT> extends StreamOperator<OUT> {List<Input> getInputs();
}
在AbstractStreamOperator众多子类中,AbstractUdfStreamOperator
抽象类中封装了Function接口,并且其中open、close等算子生命周期等方法,实际上就是调用Function实例的对应方法。
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {// 封装Functionprotected final F userFunction;// 通过Function实现进行算子的实例化public AbstractUdfStreamOperator(F userFunction) {this.userFunction = requireNonNull(userFunction);checkUdfCheckpointingPreconditions();}// 算子生命周期的相关方法,实际上调用Function的方法@Overridepublic void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, new Configuration());}@Overridepublic void finish() throws Exception {super.finish();if (userFunction instanceof SinkFunction) {((SinkFunction<?>) userFunction).finish();}}@Overridepublic void close() throws Exception {super.close();FunctionUtils.closeFunction(userFunction);}
}
常用的实现类基本继承自AbstractUdfStreamOperator抽象类。
单流输入,如map、fliter、source、sink等实现类
sink算子有两个实现类,分别是SinkOperator
和StreamSink<IN>
。二者的关系为SinkOperator
是StreamSink<RowData>
的特例。
双流输入,如concat、intervalJoin等实现类
本文开头提到通过SimpleOperatorFactory.of
方式生成SimpleOperatorFactory
实例,该方法如下
public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {if (operator == null) {return null;} else if (operator instanceof StreamSource&& ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {// 通过addSoure方法添加的Source方式,且SourceFunction为InputFormatSourceFunction的子类return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) operator);} else if (operator instanceof StreamSink&& ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {// 通过addSink方法添加的sink方式,且SinkFunction为OutputFormatSinkFunction的子类return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);} else if (operator instanceof AbstractUdfStreamOperator) {return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOperator) operator);} else {return new SimpleOperatorFactory<>(operator);}
}
得到SimpleOperatorFactory
实例后,在实际执行时,通过其createStreamOperator
方法得到StreamOperator实例。
1.1. 算子生成示例
上述内容偏概念更多一些,通过map为例实际观察Function->StreamOperator->StreamOperatorFactory->Transformation
的过程
// 步骤1,业务代码中使用map操作
DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))// 步骤2,将业务代码中提供的MapFunction封装成StreamMap
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {// 将MapFunction封装成StreamMap,StreamMap为AbstractUdfStreamOperator子类return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}// 步骤3,根据StreamMap获取其对应的SimpleOperatorFactory工厂实例
public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T, R> operator) {// 获取StreamMap对应的StreamOperatorFactory工厂类return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}// 步骤4,将工厂实例传入到Transformation中
protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName,TypeInformation<R> outTypeInfo,StreamOperatorFactory<R> operatorFactory) {OneInputTransformation<T, R> resultTransform =new OneInputTransformation<>(this.transformation,operatorName,// 将StreamOperatorFactory工厂实例,传入到Transformation中operatorFactory,outTypeInfo,environment.getParallelism());@SuppressWarnings({"unchecked", "rawtypes"})SingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator(environment, resultTransform);getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}
在步骤2中,将MapFunction封装成StreamMap,StreamMap是AbstractUdfStreamOperator的子类,并且同时实现了OneInputStreamOperator,进行数据处理逻辑。在处理数据时,实际上是调用MapFunction的map方法完成,即在业务代码中指定的row -> Tuple2.of(row, 1)
的逻辑。
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>implements OneInputStreamOperator<IN, OUT> {// 以下3个属性从父类继承// 函数实例protected final F userFunction;// 结果输出protected transient Output<StreamRecord<OUT>> output;// 默认算子链生成策略protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;public StreamMap(MapFunction<IN, OUT> mapper) {super(mapper);// 实例化StreamMap时,指定ALWAYS的算子链生成策略chainingStrategy = ChainingStrategy.ALWAYS;}@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {// userFunction即MapFunction处理数据时,实质调用MapFunction的map方法。output.collect(element.replace(userFunction.map(element.getValue())));}
}
要在Task中算子才会真正执行,这里仅仅是在逻辑上完成算子的定义。
2. 算子链
Flink中会将多个算子合并到一起,组成算子链从而提高算子的运行效率。同一个算子链意味着将在同一个线程中运行。flink中算子链使用OperatorChain
抽象类表示。
算子的合并策略在ChainingStrateg枚举类中定义,详情如下
/*** StreamOperator 使用的默认值为 HEAD,这意味着算子不链接到其前身。大多数算子使用 ALWAYS 覆盖此操作,这意味着它们将尽可能链接到前身。 */
public enum ChainingStrategy {// 尽可能的将和上游算子链接到一起,大多数算子的默认值ALWAYS,// 当前算子不会上下游算子链接到一起NEVER,// 不会上游算子连接到一起,但是可以和下游算子链接到一起HEAD,// 此运算符将运行在链的头部(与 HEAD 类似,但它还会尝试在可能的情况下链接source。这允许将多输入运算符与多个源链接到一个任务中。HEAD_WITH_SOURCES;public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
}
相关文章:
Flink任务如何跑起来之 2.算子 StreamOperator
Flink任务如何跑起来之 2.算子 StreamOperator 前文介绍了Transformation创建过程,大多数情况下通过UDF完成DataStream转换中,生成的Transformation实例中,核心逻辑是封装了SimpleOperatorFactory实例。 UDF场景下,DataStream到…...
学习笔记——路由网络基础——路由优先级(preference)
1、路由优先级(preference) 路由优先级(preference)代表路由的优先程度。当路由器从多种不同的途径获知到达同一个目的网段的路由(这些路由的目的网络地址及网络掩码均相同)时,路由器会比较这些路由的优先级,优选优先级值最小的路由。 路由来源的优先…...
数据预处理——调整方差、标准化、归一化(Matlab、python)
对数据的预处理: (a)、调整数据的方差; (b)、标准化:将数据标准化为具有零均值和单位方差;(均值方差归一化(Standardization)) (c)、最值归一化,也称为离差标准化,是对原始数据的…...
opencv_特征检测和描述
理解特征 寻找独特的特定模式或特定特征,可以轻松跟踪和比较。 拼图:在图像中搜索这些特征,找到它们,在其他图像中查找相同的特征并对齐它们。而已。 基本上,角被认为是图像中的好特征。 在本单元中,我…...
CID引流电商下的3C产品选品策略深度解析
摘要:随着电商行业的迅猛发展和消费者需求的日益多样化,CID引流电商作为一种新兴的电商模式,逐渐受到了广泛关注。在这一模式下,3C产品作为高客单价、高技术含量的代表品类,其选品策略的制定显得尤为重要。本文将从多…...
DeepSORT(目标跟踪算法)中的状态向量与状态转移矩阵
DeepSORT(目标跟踪算法)中的状态向量与状态转移矩阵 flyfish 状态转移矩阵(State Transition Matrix)F的构造 这篇是一定要看的,拖到文章的最后部分,需要理解状态转移矩阵怎么来的,怎么是这个…...
李宏毅深度学习01——基本概念简介
视频链接 基本概念 Regression(回归): 类似于填空 Classification(分类): 类似于选择 Structure Learning(机器学习): ?? 机器学习找对应函数…...
TcpClient 服务器、客户端连接
TcpClient 服务器 TcpListener 搭建tcp服务器的类,基于socket套接字通信的 1 创建服务器对象 TcpListener server new TcpListener(IPAddress.Parse("127.0.0.1"), 3000); 2 开启服务器 设置最大连接数 server.Start(1000); 3 接收客户端的链接,只能…...
13大最佳工程项目管理系统软件盘点
国内外主流的13款工程项目管理系统软件:Worktile、中建软件、泛微建筑项目管理软件、LiquidPlanner、Wrike、建文软件、广联达、Microsoft Project、泛普软件、Procore、Buildertrend、Fieldwire、Autodesk Construction Cloud。 在快速变化的工程领域,有…...
SpringMVC:拦截器(Interceptor)
1. 简介 拦截器(Interceptor)类似于过滤器(Filter) Spring MVC的拦截器作用是在请求到达控制器之前或之后进行拦截,可以对请求和响应进行一些特定的处理。拦截器可以用于很多场景下: 1. 登录验证…...
【Python】selenium使用find_element时解决【NoSuchWindowException】问题的方法
NoSuchWindowException 是 Selenium WebDriver 中的一种异常,当尝试切换到一个不存在的窗口时,或者在尝试获取窗口句柄时窗口已经关闭或不存在,就会抛出这个异常。 以下是一些解决 NoSuchWindowException 的常见方法: 检查窗口是…...
PTA:7-188 水仙花数
作者 王秀秀 单位 山东交通学院 任务描述 本关任务:输出100到999之间的所有的“水仙花数”。所谓的“水仙花数”是指一个3位数,其各位数字立方和等于该数本身。 例如,153是一个水仙花数,因为 15313 53 33 提示 关键在于对一…...
HTML静态网页成品作业(HTML+CSS+JS)—— 美食企业曹氏鸭脖介绍网页(4个页面)
🎉不定期分享源码,关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 🏷️本套采用HTMLCSS,使用Javacsript代码实现 图片轮播切换,共有4个页面。 二、…...
SCI二区|鲸鱼优化算法(WOA)原理及实现【附完整Matlab代码】
目录 1.背景2.算法原理2.1算法思想 3.结果展示4.参考文献5.代码获取 1.背景 2016年,S Mirjalili受到自然界座头鲸社会行为启发,提出了鲸鱼优化算法(Whale Optimization Algorithm, WOA)。 2.算法原理 WOA模拟了座头鲸的社会行为…...
人脸匹配——OpenCV
人脸匹配 导入所需的库加载dlib的人脸识别模型和面部检测器读取图片并转换为灰度图比较两张人脸选择图片并显示结果比较图片创建GUI界面运行GUI主循环运行显示全部代码 导入所需的库 cv2:OpenCV库,用于图像处理。 dlib:一个机器学习库&#x…...
韩顺平0基础学java——第22天
p441-459 异常exception 选中代码块,快捷键ctraltt6,即trt-catch 如果进行了异常处理,那么即使出现了异常,但是会继续执行 程序过程中发生的异常事件分为两大类: 异常体系图※ 常见的运行异常:类型转换…...
神经网络介绍及教程案例
神经网络介绍及教程&案例 神经网络(Neural Networks)是机器学习和人工智能中的一种关键技术,模仿了人类大脑的工作方式,能够处理复杂的数据和任务。以下是神经网络的一些基础介绍: 基本概念 神经元(N…...
16个不为人知的资源网站,强烈建议收藏!
整理了16个不为人知的资源网站,涵盖了课程学习、办公技能、娱乐休闲、小说音乐等多种资源,强烈建议收藏! #学习网站 1、中国大学MOOC icourse163.org/ 这是一个汇集了国内顶尖大学免费课程资源的平台,众多985工程院校如北京大…...
pandas获取某列最大值的所有数据
第一种方法: 按照某列进行由大到小的排序,然后再进去去重,保留第一个值,最终保留的结果就是最大值的数据 # 由大到小排序 data_frame data_frame.sort_values(bycolumn_a, ascendingFalse)# 按照column_b列去重保留第一条&#…...
App UI 风格展现非凡创意
App UI 风格展现非凡创意...
rocketmq-5.1.2的dleger高可用集群部署
1、背景 原先为5.0.0版本,因检查出有漏洞,升级到5.1.2版本。 【Rocketmq是阿里巴巴在2012年开发的分布式消息中间件,专为万亿级超大规模的消息处理而设计,具有高吞吐量、低延迟、海量堆积、顺序收发等特点。在一定条件下…...
无线网络与物联网技术[1]之近距离无线通信技术
无线网络与物联网技术 近距离无线通信技术WIFIWi-Fi的协议标准Wi-Fi的信道Wi-Fi技术的术语Wi-Fi的组网技术Ad-hoc模式无线接入点-APAP:FAT AP vs FIT AP Wi-Fi的特点与应用Wi-Fi的安全技术 Bluetooth蓝牙技术概论蓝牙的技术协议蓝牙的组网技术微微网piconet(了解)散…...
Codeforces Round 952 (Div. 4)
题解写到博客园了,懒得复制过来了了,放个链接 https://www.cnblogs.com/yxcblogs/p/18243276 推广一下自己记录的算法编程竞赛模板仓库 GitHub - yxc-s/programming-template: This repository contains C programming templates optimized for competi…...
spark MLlib (DataFrame-based) 中的聚类算法Bisecting K-Means、K-Means、Gaussian Mixture
Bisecting K-Means 核心原理: Bisecting K-Means 是一种层次 K-Means 聚类算法,基于 Steinbach、Karypis 和 Kumar 的论文《A comparison of document clustering techniques》,并对 Spark 环境进行了修改和适应。 该算法通过递归地将数据集…...
天降流量于雀巢?元老品牌如何创新营销策略焕新生
大家最近有看到“南京阿姨手冲咖啡”的视频吗?三条雀巢速溶咖啡入杯,当面加水手冲,十元一份售出,如此朴实的售卖方式迅速在网络上走红。而面对这一波天降的热度,雀巢咖啡迅速做出了回应,品牌组特地去到了阿…...
新疆在线测宽仪配套软件实现的9大功能!
在线测宽仪可应用于各种热轧、冷轧板带材的宽度尺寸检测,材质不限,木质、钢制、铁质、金属、纸质、塑料、橡胶等都可以进行无损非接触式的检测,在各式各样的产线应用中,有些厂家,需要更加详尽完备的分析信息࿰…...
考研计组chap3存储系统
目录 一、存储器的基本概念 80 1.按照层次结构 2.按照各种分类 (41)存储介质 (2)存取方式 (3)内存是否可更改 (4)信息的可保存性 (5)读出之后data是否…...
杨氏矩阵和杨辉三角的空间复杂度较小的解题思路
文章目录 题目1 杨氏矩阵题目2 杨辉三角 题目1 杨氏矩阵 有一个数字矩阵,矩阵的每行从左到右是递增的,矩阵从上到下是递增的,请编写程序在这样的矩阵中查找某个数字是否存在。 要求:时间复杂度小于O(N); 思路: 我们可以通过题目…...
【第六篇】SpringSecurity的权限管理
一、权限管理的实现 服务端的各种资源要被SpringSecurity的权限管理控制可以通过注解和标签两种方式来处理。 放开了相关的注解后在Controller中就可以使用相关的注解来控制了 JSR250注解 /*** JSR250*/ @Controller @RequestMapping("/user") public class UserC…...
未来工作场所:数字化转型的无限可能
探索技术如何重塑我们的工作环境与协作方式 引言 在21世纪的第三个十年,数字化转型已不再仅仅是科技公司的专利,它如同一股不可阻挡的潮流,深刻地渗透到了每一个行业的血脉之中。从灵活的远程办公模式到工作流程的智能化重构,技术…...
一些做淘宝优惠券的网站/windows7优化大师官方下载
引子每周我家都会举行一个复盘活动,复盘自己的优点、缺点,干得好的、不好的。在复盘活动上,老公每次只能想到自己的优点,觉得自己特完美。嗯……我的角度看完全不是这样。举个例子,他是典型的双标。如果他加班特别晚&a…...
wordpress承载/安卓手机性能优化软件
在学习汇编语言的时候,状态标志位中CF和OF的区别让我头疼,说实话一直不太明白CF和OF是什么意思,但后来看了王爽的汇编语言就懂了。CF是进位标志位,CF1表示进位或者借位,CF0没有进位或者借位。举个例子一个数是8位&…...
济南网站建设套餐/360竞价推广客服电话
在研究Condition时,发现它的API提供了BoudedBuffer实现,并指出ArrayBlockingQueue就是一个BoudedBuffer的高阶实现。因此深入研究了下BoudedBuffer,其核心思想是:1. 使用一个循环数组2. 定义一个Count,作为put和take的…...
哪里有学习做网站的/1688的网站特色
2016-12-12 11:57:12使用ajax的开发项目过程中,经常需要将json格式的字符串返回到前端,前端解析成js对象(JSON )。如果直接以json的格式返回则方便很多,有时候通过后台直接写到页面中则会以字符串的方式存在,那么就用到了将字符串…...
xdebug wordpress/网络公司排名
windows server 2003服务器上邮件服务器的认证问题关于windows server 2003上如何利用自身的组件来配置邮件服务器的方法网上已经有很多,这里不再讨论,我在这里简单说了server03自带邮件服务器的认证问题。假设计算机有2个账户,1个test1,另一…...
老网站文章突然无收录/站长之家素材网
2021年的互联网行业竞争越来越严峻,面试也是越来越难,一直以来我都想整理一套完美的面试宝典,奈何难抽出时间,这套完整的java后端学习路线以及1000道的Java面试手册我整理了整整1个月,上传到Git上目前star数达到了30K …...