当前位置: 首页 > news >正文

Flink--API 之Transformation-转换算子的使用解析

目录

一、常用转换算子详解

(一)map 算子

(二)flatMap 算子

(三)filter 算子

(四)keyBy 算子

元组类型

POJO

(五)reduce 算子

二、合并与连接操作

(一)union 算子

(二)connect 算子

三、侧输出流(Side Outputs)

四、总结


        在大数据处理领域,Apache Flink 凭借其强大的流处理和批处理能力备受青睐。而转换算子作为 Flink 编程模型中的关键部分,能够对数据进行灵活多样的处理操作,满足各种复杂业务场景需求。本文将深入介绍 Flink 中常见的转换算子,包括 map、flatMap、filter、keyBy、reduce 等,并结合详细代码示例讲解其使用方法,同时探讨 union、connect 等合并连接操作以及侧输出流等特性,帮助读者全面掌握 Flink 转换算子的精髓。

一、常用转换算子详解

(一)map 算子

功能概述
        map 算子主要用于对输入流中的每个元素进行一对一的转换操作,基于用户自定义的映射逻辑将输入元素转换为新的输出元素。

代码示例
假设我们有一份访问日志数据,格式如下:

86.149.9.216 10001 17/05/2015:10:05:30 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:06:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:07:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:08:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:09:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:10:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:16:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:26:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css

        我们要将其转换为一个 LogBean 对象,包含访问 ip、用户 userId、访问时间戳 timestamp、访问方法 method、访问路径 path 等字段。

假如需要用到日期工具类,可以导入lang3包:

<dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version>
</dependency>

以下是代码实现:


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;public class MapDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip;      // 访问ipint userId;     // 用户idlong timestamp; // 访问时间戳String method;  // 访问方法String path;    // 访问路径}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");//3. transformation-数据处理转换// 此处也可以将数据放入到tuple中,tuple可以支持到tuple25DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split(" ");String ip = arr[0];int userId = Integer.valueOf(arr[1]);String createTime = arr[2];// 如何将一个时间字符串变为时间戳// 17/05/2015:10:05:30/*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = simpleDateFormat.parse(createTime);long timeStamp = date.getTime();*/// 要想使用这个common.lang3 下的工具类,需要导入包Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");long timeStamp = date.getTime();String method = arr[3];String path = arr[4];LogBean logBean = new LogBean(ip, userId, timeStamp, method, path);return logBean;}});//4. sink-数据输出mapStream.print();//5. execute-执行env.execute();}
}

        在上述代码中,通过 map 函数的自定义逻辑,将每行日志字符串按空格拆分后,进行相应字段提取与时间戳转换,最终封装成 LogBean 对象输出。

第二个版本:

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;@Data
@AllArgsConstructor
class LogBean{private String ip;      // 访问ipprivate int userId;     // 用户idprivate long timestamp; // 访问时间戳private String method;  // 访问方法private String path;    // 访问路径
}
public class Demo04 {// 将数据转换为javaBeanpublic static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");//3. transformation-数据处理转换SingleOutputStreamOperator<LogBean> map = streamSource.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split("\\s+");//时间戳转换  17/05/2015:10:06:53String time = arr[2];SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = format.parse(time);long timeStamp = date.getTime();return new LogBean(arr[0],Integer.parseInt(arr[1]),timeStamp,arr[3],arr[4]);}});//4. sink-数据输出map.print();//5. execute-执行env.execute();}
}

(二)flatMap 算子

功能概述
        flatMap 算子可以将输入流中的每个元素转换为零个、一个或多个输出元素。它适用于需要对输入元素进行展开、拆分等操作的场景。

代码示例
        假设有数据格式为“张三,苹果手机,联想电脑,华为平板”这样的文本文件,我们要将其转换为“张三有苹果手机”“张三有联想电脑”“张三有华为平板”等形式。

flatmap.log文件如:

张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板

代码如下:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlatMapDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\flatmap.log");//3. transformation-数据处理转换DataStream<String> flatMapStream = fileStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {//张三,苹果手机,联想电脑,华为平板String[] arr = line.split(",");String name = arr[0];for (int i = 1; i < arr.length; i++) {String goods = arr[i];collector.collect(name+"有"+goods);}}});//4. sink-数据输出flatMapStream.print();//5. execute-执行env.execute();}
}

        这里在 flatMap 函数内部,按逗号拆分每行数据,遍历拆分后的数组(除第一个元素作为名称外),通过 collector 将新组合的字符串收集输出。

(三)filter 算子

功能概述
        filter 算子依据用户定义的过滤条件,对输入流元素进行筛选,满足条件的元素继续向下游传递,不满足的则被过滤掉。

代码示例
        读取map算子中的访问日志数据,过滤出访问 IP 是 83.149.9.216 的访问日志,代码实现如下:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Date;public class FilterDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip;      // 访问ipint userId;     // 用户idlong timestamp; // 访问时间戳String method;  // 访问方法String path;    // 访问路径}public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");// 数据处理转换DataStream<String> filterStream = fileStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String line) throws Exception {String ip = line.split(" ")[0];return ip.equals("83.149.9.216");}});// 数据输出filterStream.print();// 执行env.execute();}
}

        在 filter 函数中,通过拆分每行日志获取 IP 地址,并与目标 IP 进行比较决定是否保留该条日志数据。

(四)keyBy 算子

功能概述
        keyBy 算子在流处理中用于对数据按照指定的键进行分组,类似于 SQL 中的 group by,后续可基于分组进行聚合等操作,支持对元组类型 POJO 类型数据按不同方式指定分组键。

流处理中没有groupBy,而是keyBy

KeySelector对象可以支持元组类型,也可以支持POJO[Entry、JavaBean]

元组类型

单个字段keyBy

//用字段位置(已经被废弃)
wordAndOne.keyBy(0)//用字段表达式
wordAndOne.keyBy(v -> v.f0)

多个字段keyBy

//用字段位置
wordAndOne.keyBy(0, 1);//用KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {return Tuple2.of(value.f0, value.f1);}
});

类似于sql中的group by

select sex,count(1) from student group by sex;

group by 后面也可以跟多个字段进行分组,同样 keyBy 也支持使用多个列进行分组 

POJO

public class PeopleCount {private String province;private String city;private Integer counts;public PeopleCount() {}//省略其他代码。。。
}

单个字段keyBy

source.keyBy(a -> a.getProvince());

多个字段keyBy

source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> getKey(PeopleCount value) throws Exception {return Tuple2.of(value.getProvince(), value.getCity());}
});

代码示例
        假设有数据表示不同球类的数量,格式为 Tuple2(球类名称,数量),如 Tuple2.of("篮球", 1) 等,需求是统计篮球、足球各自的总数量。代码如下:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("篮球", 1),Tuple2.of("篮球", 2),Tuple2.of("篮球", 3),Tuple2.of("足球", 3),Tuple2.of("足球", 2),Tuple2.of("足球", 3));// 数据处理转换/*KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});*/KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(tuple -> tuple.f0);keyedStream.sum(1).print();// 执行env.execute();}
}

这里通过 lambda 表达式指定 Tuple2 的第一个元素(球类名称)作为分组键,对数据分组后使用 sum 聚合统计每种球类的数量总和。

pojo演示

package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-13 14:32:52**/
public class Demo07 {@Data@AllArgsConstructorstatic class Ball{private String ballName;private int num;}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//3. transformation-数据处理转换//4. sink-数据输出// 以下演示数据是pojoDataStreamSource<Ball> ballSource = env.fromElements(new Ball("篮球", 1),new Ball("篮球", 2),new Ball("篮球", 3),new Ball("足球", 3),new Ball("足球", 2),new Ball("足球", 3));ballSource.keyBy(ball -> ball.getBallName()).print();ballSource.keyBy(new KeySelector<Ball, String>() {@Overridepublic String getKey(Ball ball) throws Exception {return ball.getBallName();}});//5. execute-执行env.execute();}
}

(五)reduce 算子

功能概述
        reduce 算子可对一个数据集或一个分组进行聚合计算,将多个元素逐步合并为一个最终元素,常用于求和、求最值等场景,sum 底层其实也是基于 reduce 实现。

代码示例
        读取访问日志,统计每个 IP 地址的访问 PV 数量,代码如下:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Date;public class ReduceDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip;      // 访问ipint userId;     // 用户idlong timestamp; // 访问时间戳String method;  // 访问方法String path;    // 访问路径}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");//3. transformation-数据处理转换// 此处也可以将数据放入到tuple中,tuple可以支持到tuple25DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split(" ");String ip = arr[0];int userId = Integer.valueOf(arr[1]);String createTime = arr[2];// 如何将一个时间字符串变为时间戳// 17/05/2015:10:05:30/*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = simpleDateFormat.parse(createTime);long timeStamp = date.getTime();*/// 要想使用这个common.lang3 下的工具类,需要导入包Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");long timeStamp = date.getTime();String method = arr[3];String path = arr[4];LogBean logBean = new LogBean(ip, userId, timeStamp, method, path);return logBean;}});DataStream<Tuple2<String, Integer>> mapStream2 = mapStream.map(new MapFunction<LogBean, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(LogBean logBean) throws Exception {return Tuple2.of(logBean.getIp(), 1);}});//4. sink-数据输出KeyedStream<Tuple2<String,Integer>, String> keyByStream = mapStream2.keyBy(tuple -> tuple.f0);// sum的底层是 reduce// keyByStream.sum(1).print();//  [ ("10.0.0.1",1),("10.0.0.1",1),("10.0.0.1",1) ]keyByStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {// t1 => ("10.0.0.1",10)// t2 => ("10.0.0.1",1)return Tuple2.of(t1.f0,  t1.f1 + t2.f1);}}).print();//5. execute-执行env.execute();}
}

        先将日志数据处理成包含 IP 和计数 1 的 Tuple2 格式,按 IP 分组后,在 reduce 函数中对相同 IP 的计数进行累加,得到每个 IP 的访问 PV 数。

二、合并与连接操作

(一)union 算子

功能概述
        union 算子能够合并多个同类型的流,将多个 DataStream 合并成一个 DataStream,但注意合并时流的类型必须一致,且不会对数据去重

代码示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class UnionConnectDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");DataStreamSource<String> stream2 = env.fromElements("hello", "kong ni qi wa", "看电子书的人");// 合并流DataStream<String> unionStream = stream1.union(stream2);unionStream.print();// 执行env.execute();}
}

上述代码将两个包含字符串元素的流进行合并输出。

(二)connect 算子

功能概述
        connect 算子可连接 2 个不同类型的流,连接后形成 ConnectedStreams,内部两个流保持各自的数据和形式独立,之后需通过自定义处理逻辑(如 CoMapFunction 等)处理后再输出,且处理后的数据类型需相同。

代码示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class UnionConnectDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");DataStreamSource<Long> stream3 = env.fromSequence(1, 10);// 连接流ConnectedStreams<String, Long> connectStream = stream1.connect(stream3);// 处理流DataStream<String> mapStream = connectStream.map(new CoMapFunction<String, Long, String>() {@Overridepublic String map1(String value) throws Exception {return value;}@Overridepublic String map2(Long value) throws Exception {return Long.toString(value);}});// 输出mapStream.print();// 执行env.execute();}
}

        这里连接了一个字符串流和一个长整型序列流,通过 CoMapFunction 分别将字符串按原样、长整型转换为字符串后合并输出。

三、侧输出流(Side Outputs)

功能概述
        侧输出流可根据自定义规则对输入流数据进行分流,将满足不同条件的数据输出到不同的“分支”流中,方便后续针对性处理。

代码示例
以下示例对流中的数据按照奇数和偶数进行分流并获取分流后的数据:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class SideOutputExample {public static void main(String[] args) throws Exception {// 1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 侧道输出流// 从0到100生成一个Long类型的数据流作为示例数据DataStreamSource<Long> streamSource = env.fromSequence(0, 100);// 定义两个标签,用于区分偶数和奇数的侧输出流OutputTag<Long> tag_even = new OutputTag<Long>("偶数", TypeInformation.of(Long.class));OutputTag<Long> tag_odd = new OutputTag<Long>("奇数", TypeInformation.of(Long.class));// 2. source-加载数据// 使用ProcessFunction来处理每个元素,决定将其输出到哪个侧输出流或者主输出流SingleOutputStreamOperator<Long> process = streamSource.process(new ProcessFunction<Long, Long>() {@Overridepublic void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {// value代表每一个数据,判断是否为偶数if (value % 2 == 0) {// 如果是偶数,将其输出到偶数的侧输出流中ctx.output(tag_even, value);} else {// 如果是奇数,将其输出到奇数的侧输出流中ctx.output(tag_odd, value);}}});// 3. 获取奇数的所有数据,从主输出流中获取对应标签(tag_odd)的侧输出流数据DataStream<Long> sideOutput = process.getSideOutput(tag_odd);sideOutput.print("奇数:");// 获取所有偶数数据,同样从主输出流中获取对应标签(tag_even)的侧输出流数据DataStream<Long> sideOutput2 = process.getSideOutput(tag_even);sideOutput2.print("偶数:");// 4. sink-数据输出(这里通过打印展示了侧输出流的数据,实际应用中可对接其他下游操作)// 5. 执行任务env.execute();}
}

在上述代码中:

  • 首先,我们创建了 StreamExecutionEnvironment 来准备 Flink 的执行环境,并设置了运行模式为自动(AUTOMATIC)。
  • 接着,通过 fromSequence 方法生成了一个从 0 到 100 的 Long 类型的数据流作为示例的输入数据。
  • 然后,定义了两个 OutputTag,分别用于标记偶数和奇数的侧输出流,并且指定了对应的类型信息(这里都是 Long 类型)。
  • 在 process 函数中,针对每个输入的元素(value),通过判断其是否能被 2 整除来决定将其输出到对应的侧输出流中。如果是偶数,就通过 ctx.output(tag_even, value) 将其发送到偶数侧输出流;如果是奇数,则通过 ctx.output(tag_odd, value) 发送到奇数侧输出流。
  • 最后,通过 getSideOutput 方法分别获取奇数和偶数侧输出流的数据,并进行打印输出,以此展示了侧输出流的分流及获取数据的完整流程。实际应用场景中,这些侧输出流的数据可以进一步对接不同的业务逻辑进行相应处理,比如奇数流进行一种聚合计算,偶数流进行另一种统计分析等。

        这样,利用侧输出流的特性,我们可以很灵活地根据自定义条件对数据进行分流处理,满足多样化的数据处理需求。

四、总结

        本文围绕 Apache Flink 转换算子展开,旨在助力读者洞悉其核心要点与多样应用,以灵活处理复杂业务数据。

  1. 常用转换算子
    • map:基于自定义逻辑,对输入元素逐一变换,如剖析日志字符串、提取关键信息并转换格式,封装为定制对象输出,契合精细化处理需求。
    • flatMap:将输入元素按需拆分为零个及以上输出元素,凭借拆分、重组操作,挖掘数据深层价值,适配展开、细化数据场景。
    • filter:依设定条件甄别筛选,精准把控数据流向,剔除不符元素,保障下游数据贴合业务关注点。
    • keyBy:类比 SQL 分组,依指定键归拢数据,为聚合奠基,以不同方式适配多元数据类型,实现分类统计。
    • reduce:聚焦数据集或分组,渐进聚合,可求和、求最值等,sum 操作底层亦仰仗于此,高效整合数据得出汇总结果。
  2. 合并与连接操作
    • union:整合同类型多流为一,操作简便,唯需留意类型一致,虽不除重但拓宽数据维度。
    • connect:桥接不同类型流,借自定义逻辑协同处理,统一输出类型,达成跨流数据交互融合。
  3. 侧输出流特性:基于自定义规则巧妙分流,借OutputTag标记、ProcessFunction判定,将数据导向不同 “分支”,按需对接各异业务逻辑,于复杂场景中尽显灵活应变优势,全方位满足数据处理多元化诉求。

相关文章:

Flink--API 之Transformation-转换算子的使用解析

目录 一、常用转换算子详解 &#xff08;一&#xff09;map 算子 &#xff08;二&#xff09;flatMap 算子 &#xff08;三&#xff09;filter 算子 &#xff08;四&#xff09;keyBy 算子 元组类型 POJO &#xff08;五&#xff09;reduce 算子 二、合并与连接操作 …...

每日十题八股-2024年11月27日

1.类型互转会出现什么问题吗&#xff1f; 2.为什么用bigDecimal 不用double &#xff1f; 3.装箱和拆箱是什么&#xff1f; 4.Java为什么要有Integer&#xff1f; 5.Integer相比int有什么优点&#xff1f; 6.那为什么还要保留int类型&#xff1f; 7.说一下 integer的缓存 8.怎么…...

OpenCV截取指定图片区域

import cv2 img cv2.imread(F:/2024/Python/demo1/test1/man.jpg) cv2.imshow(Image, img) # 显示图片 #cv2.waitKey(0) # 等待按键x, y, w, h 500, 100, 200, 200 # 示例坐标 roi img[y:yh, x:xw] # 截取指定区域 cv2.imshow(ROI, roi) cv2.waitKey(0) cv…...

Java部分新特性

模式匹配 instance of 模式匹配 之前写法 public void print(Object o) {if (o instanceof String){String str (String) obj;System.out.println("This is a String of length " s.length());} else {System.out.println("This is not a String");} …...

【SpringBoot】28 API接口防刷(Redis + 拦截器)

Gitee仓库 https://gitee.com/Lin_DH/system 介绍 常用的 API 安全措施包括&#xff1a;防火墙、验证码、鉴权、IP限制、数据加密、限流、监控、网关等&#xff0c;以确保接口的安全性。 常见措施 1&#xff09;防火墙 防火墙是网络安全中最基本的安全设备之一&#xff0c…...

IT运维专家给年轻人一些职业上的建议

运维工作在现代企业中是非常重要的一环,保证系统的稳定性、可用性以及安全性对企业的正常运营至关重要。以下是我给年轻人的一些职业发展建议,希望能够帮助你们在运维领域找到方向并取得成功。 1. 夯实基础,扎实技术功底 精通操作系统与网络:运维工作需要深入理解操作系统…...

Django基础之路由

一.前言 前面我们说了django的安装于基础配置&#xff0c;基础知识点我就细分下来&#xff0c;每天和大家讲一点&#xff0c;今天就要和大家说django的基础知识点了&#xff0c;我们今天先来讲路由&#xff0c;内容不多&#xff0c;希望大家记住 二.传统路由 路由就是前面一个…...

Python实例化中默认值的行为及应用

Python实例化中默认值的行为及应用 适合初学者阅读 本文要点 使用可变对象作为默认参数会导致所有实例共享同一对象&#xff0c;引发意外的数据修改。不可变对象作为默认参数时&#xff0c;每次实例化都会创建新的对象&#xff0c;不会共享数据。推荐使用None作为默认值&…...

【WRF后处理】WRF模拟效果评价及可视化:MB、RMSE、IOA、R

【WRF后处理】模拟效果评价及可视化 准备工作模型评价指标Python实现代码Python处理代码:导入站点及WRF模拟结果可视化图形及评价指标参考在气象和环境建模中(如使用 WRF 模型进行模拟),模型性能评价指标是用于定量评估模拟值与观测值之间偏差和拟合程度的重要工具。 本博客…...

ShenNiusModularity项目源码学习(4:身份认证)

ShenNiusModularity项目有两套启动方式&#xff0c;一种是ShenNius.Admin.Mvc项目启动&#xff0c;该项目为MVC模式&#xff0c;带前台页面&#xff0c;也有后台服务&#xff0c;另一种是ShenNius.Admin.Hosting&#xff0c;该项目启动后仅提供后台服务&#xff0c;供其它前台项…...

python+django自动化部署日志采用‌WebSocket前端实时展示

一、开发环境搭建和配置 # channels是一个用于在Django中实现WebSocket、HTTP/2和其他异步协议的库。 pip install channels#channels-redis是一个用于在Django Channels中使用Redis作为后台存储的库。它可以用于处理#WebSocket连接的持久化和消息传递。 pip install channels…...

flink学习(6)——自定义source和kafka

概述 SourceFunction:非并行数据源(并行度只能1) --接口 RichSourceFunction:多功能非并行数据源(并行度只能1) --类 ParallelSourceFunction:并行数据源(并行度能够>1) --接口 RichParallelSourceFunction:多功能并行数据源(并行度能够>1) --类 【建议使用的】 ——…...

开发常见问题及解决

1.DBeaver 报Public Key Retrieval is not allowed 在使用DBeaver连接数据库时出现“Public Key Retrieval is not allowed”错误&#xff0c;主要是因为数据库连接配置的安全策略导致的。以下是详细的解释和解决方法&#xff1a; 错误原因 这个错误通常出现在连接MySQL数据…...

python excel接口自动化测试框架!

今天采用Excel继续写一个接口自动化测试框架。 设计流程图 这张图是我的excel接口测试框架的一些设计思路。 首先读取excel文件&#xff0c;得到测试信息&#xff0c;然后通过封装的requests方法&#xff0c;用unittest进行测试。 其中&#xff0c;接口关联的参数通过正则进…...

mybatis:You have an error in your SQL syntax;

完整报错You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near false, false, false, false, false, false, false, false, false, false, false, at line 1 SQL: INSERT INTO user …...

使用 Maven 开发 IntelliJ IDEA 插件

使用 Maven 开发 IntelliJ IDEA 插件的完整流程 1. 创建 Maven 项目 1.1 使用 IntelliJ 创建 Maven 项目 打开 IntelliJ IDEA&#xff0c;点击 File > New > Project。选择 Maven&#xff0c;填写项目名称和 GroupId&#xff0c;例如&#xff1a; GroupId: com.exampl…...

Windows修复SSL/TLS协议信息泄露漏洞(CVE-2016-2183) --亲测

漏洞说明&#xff1a; 打开链接&#xff1a;https://docs.microsoft.com/zh-cn/troubleshoot/windows-server/windows-security/restrict-cryptographic-algorithms-protocols-schannel 可以看到&#xff1a; 找到&#xff1a;应通过配置密码套件顺序来控制 TLS/SSL 密码 我们…...

uniapp生命周期:应用生命周期和页面生命周期

文章目录 1.应用的生命周期2.页面的生命周期 1.应用的生命周期 生命周期的概念&#xff1a;一个对象从创建、运行、销毁的整个过程被称为生命周期 生命周期函数&#xff1a;在生命周期中每个阶段会伴随着每一个函数的出发&#xff0c;这些函数被称为生命周期函数 所有页面都…...

基于SSM的婴幼儿用品商城系统+LW示例参考

1.项目介绍 功能模块&#xff1a;管理员&#xff08;产品管理、产品分类、会员管理、订单管理、秒杀活动、文章管理、数据统计等&#xff09;、普通用户&#xff08;登录注册、个人中心、购物车、我的收藏、各类信息查看等&#xff09;技术选型&#xff1a;SSM&#xff0c;jsp…...

【工具变量】城市供应链创新试点数据(2007-2023年)

一、测算方式&#xff1a;参考C刊《经济管理》沈坤荣和乔刚老师&#xff08;2024&#xff09;的做法&#xff0c;使用“供应链创新与应用试点”的政策虚拟变量&#xff08;TreatPost&#xff09;表征。若样本城市为试点城市&#xff0c;则赋值为 1&#xff0c;否则为 0&#xf…...

【carla生成车辆时遇到的问题】carla显示的坐标和carlaworld中提取的坐标y值相反

项目需要重新运行了一下generate_car.py的脚本&#xff0c;发现死活生成不了&#xff0c;研究了半天&#xff0c;发现脚本里面生成车辆的坐标值y和carla_ros_bridge_with_example_ego_vehicle.launch脚本打开的驾驶操控界面里面的y值正好是相反数! y1-y2 因为&#xff0c;我运行…...

Jira使用笔记二 ScriptRunner 验证问题创建角色

背景 最近在对公司Jira工作流改造&#xff0c;收到这么一个要求&#xff1a;某些问题类型只有某些角色可以创建。本来是想通过Jira内建的权限控制来处理的。结果点到权限页面&#xff0c;心都凉透了。 好吧&#xff0c;那只能上脚本了。最终使用ScriptRunner的Simple scripte…...

Java线程的使用

Java中的线程是用来实现多任务并发执行的机制。在Java中&#xff0c;主要有两种方式来创建和使用线程&#xff1a;实现Runnable接口和继承Thread类。 实现Runnable接口&#xff1a; 创建一个类&#xff0c;实现Runnable接口&#xff0c;并重写run()方法。在run()方法中定义线程…...

自动化测试工具Ranorex Studio(四十三)-RANOREXPATH编辑器5

代码示例 下面的代码示例将讲解如何使用Ranorex API来编写代码模块&#xff0c;或者是使用用户代码来扩展录制的模块。 在代码中使用对象库 使用对象库等待UI元素 建立Adapter来访问更多的属性和方法 为对象库元素建立一组Adapter 使用Validate类 强制一个测试用例失败 设置aut…...

超高流量多级缓存架构设计!

文章内容已经收录在《面试进阶之路》&#xff0c;从原理出发&#xff0c;直击面试难点&#xff0c;实现更高维度的降维打击&#xff01; 文章目录 电商-多级缓存架构设计多级缓存架构介绍多级缓存请求流程负载均衡算法的选择轮询负载均衡一致性哈希负载均衡算法选择 应用层 Ngi…...

数据结构(Java)—— ArrayList

1.线性表 线性表&#xff08; linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构&#xff0c;常见的线性表&#xff1a;顺序表、链表、栈、队列... 线性表在逻辑上是线性结构&#xff0c;也就说是连续的一条直线。但是在…...

实习冲刺第三十三天

102.二叉树的层序遍历 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;[[3],[9,20],[15,7]]示例…...

Uniapp开发下拉刷新功能onPullDownRefresh/onReachBottom

文章目录 1.onPullDownRefresh2.onReachBottom 1.onPullDownRefresh 在 js 中定义 onPullDownRefresh 处理函数&#xff08;和onLoad等生命周期函数同级&#xff09;&#xff0c;监听该页面用户下拉刷新事件。 需要在 pages.json 里&#xff0c;找到的当前页面的pages节点&am…...

什么是 C++ 中的函数对象?函数对象与普通函数有什么区别?如何定义和使用函数对象?

1) 什么是 C 中的函数对象&#xff1f;它有什么特点&#xff1f; 在 C 中&#xff0c;函数对象&#xff08;也称为仿函数或 functor&#xff09;是一种重载了 operator() 的对象。这意味着这些对象可以像函数一样被调用。函数对象通常用于需要传递行为&#xff08;即代码&…...

PointNet++论文复现

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…...

网站建设 康盛设计/媒体软文发稿

一、避免或简化排序 应当简化或避免对大型表进行重复的排序。当能够利用索引自动以适当的次序产生输出时&#xff0c;优化器就避免了排序的步骤。以下是一些影响因素&#xff1a;1.索引中不包括一个或几个待排序的列。2.GROUP BY或ORDER BY子句中列的次序与索引的次序不一样。…...

九龙坡区建设二校的网站/神马seo教程

资料下载 coding无法使用浏览器打开&#xff0c;必须用git工具下载&#xff1a; git clone https://e.coding.net/weidongshan/linux/doc_and_source_for_drivers.git视频观看 百问网驱动大全 I2C接口触摸屏驱动分析 参考资料&#xff1a; Linux 5.x内核 Documentation\de…...

青岛市建设局网站停工/广州网络推广培训

[Warning] incompatible implicit declaration of built-in function ‘memset’ 原因是memset第一个参数是void * 类型&#xff0c;我用的是char类型&#xff0c;存在隐式声明 第一想法是强制类型转换&#xff0c;memset((void*)xx, 0, sizeof(xx)); 还是不行&#xff0c;依然…...

年栾洪全单页做网站教程/东莞推广系统

1.返回第一页的10条 select t2.* from (select t1.* from youtable t1 ) t2where rownum<返回的条数2.返回非第一页的10条select t2.* from (select rownum rn,t1.* from youtable t1 where rownum<大数) t2where rn>小数如果想要返回10到20条&#xff0c;则大数为20&…...

好的文化网站模板下载/网站的seo 如何优化

修改COM口的端口号...

专业做网站的公司有没有服务器/运营培训班学费大概多少

转载于:https://blog.51cto.com/xcf007/109359...