网站域名删除时间查询/青岛谷歌推广
文章目录
- 起源与发展
- flink在github上的现状
- 实时计算VS离线计算
- 实时计算
- 离线计算
- 实时计算常用的场景
- 框架流处理流程
- flink电商场景下的业务图示例
- flink中一些重要特性
- 有界数据和无界数据
- 时间语义、水位线
- 事件时间
- 处理时间
- 水位线
- flink窗口
- 概念
- 理想中的数据处理
- 含有延迟数据的数据处理
- Flink存储桶概念
- 窗口类型
- 滚动窗口
- 滑动窗口
- 会话窗口
- 全局窗口
- flink状态管理
- 检查点(Checkpoint)
- 检查点恢复数据过程
- 下载安装
- 入门Demo示例
- pom配置
- Demo代码
- 打包到集群
- 流运行时执行环境
- 任务槽Slot
- 扩展Demo
- 时间窗口Demo
- Table Api Demo
- 对迟到数据处理Demo
起源与发展
Flink
起源于一个叫作 Stratosphere 的项目,它是由 3 所地处柏林的大学和欧洲其他一些大学在 2010~2014 年共同进行的研究项目,由柏林理工大学的教授沃克尔·马尔科(Volker Markl)领衔开发。2014 年 4 月,Stratosphere 的代码被复制并捐赠给了 Apache 软件基金会,Flink
就是在此基础上被重新设计出来的。
在德语中,“flink”
一词表示“快速、灵巧”。项目的 logo 是一只彩色的松鼠,当然了,这不仅是因为 Apache 大数据项目对动物的喜好(是否联想到了 Hadoop、Hive?),更是因为松鼠这种小动物完美地体现了“快速、灵巧”的特点。关于 logo 的颜色,还一个有趣的缘由:柏林当地的松鼠非常漂亮,颜色是迷人的红棕色;而 Apache 软件基金会的 logo,刚好也是一根以红棕色为主的渐变色羽毛。于是,Flink
的松鼠 Logo 就设计成了红棕色,而且拥有一个漂亮的渐变色尾巴,尾巴的配色与 Apache 软件基金会的 logo 一致。这只松鼠色彩炫目,既呼应了 Apache 的风格,似乎也预示着 Flink
未来将要大放异彩。
从命名上,我们也可以看出
Flink
项目对于自身特点的定位,那就是对于大数据处理,要做到快速和灵活。
- 2014 年 8 月,
Flink
第一个版本 0.6 正式发布(至于 0.5 之前的版本,那就是在Stratosphere 名下的了)。与此同时 Fink 的几位核心开发者创办了 Data Artisans 公司,主要做 Fink 的商业应用,帮助企业部署大规模数据处理解决方案。 - 2014 年 12 月,
Flink
项目完成了孵化,一跃成为 Apache 软件基金会的顶级项目。 - 2015 年 4 月,
Flink
发布了里程碑式的重要版本 0.9.0,很多国内外大公司也正是从这时开始关注、并参与到Flink
社区建设的。 - 2019 年 1 月,长期对
Flink
投入研发的阿里巴巴,以 9000 万欧元的价格收购了 Data Artisans 公司;之后又将自己的内部版本 Blink 开源,继而与 8 月份发布的Flink
1.9.0版本进行了合并。自此之后,Flink
被越来越多的人所熟知,成为当前最火的新一代大数据处理框架。 - 2020年:
Flink Forward Asia
和Flink Forward Europe
两个Flink
社区活动成功举办。 - 2021年:
Flink
与 ClickHouse 成为 Apache 流式处理项目的顶级项目。 - 2022年:
Flink 1.15 和 1.16
版本相继发布,引入了许多新特性和改进。
flink在github上的现状
目前已经到了1.16.1的release版本
实时计算VS离线计算
实时计算
- 数据实时到达
- 数据到达次序独立,不受应用系统所控制
- 数据规模大且无法预知容量
- 原始数据一经处理,除非特意保存,否则不能被再次取出处理,或者再次提取数据代价昂贵
离线计算
- 数据量大且时间周期长(一天、一星期、一个月、半年、一年)
- 在大量数据上进行复杂的批量计算操作
- 数据在计算之前已经固定,不再会发生变化
- 能够方便的查询批量计算的结果
实时计算常用的场景
- 实时数据存储:实时数据存储的时候做一些微聚合、过滤某些字段、数据脱敏,组建数据仓库,实时 ETL等
- 实时数据分析:实时数据接入机器学习框架(TensorFlow)或者一些算法进行数据建模、分析,然后动态的给出商品推荐、广告推荐等
- 实时监控告警:金融相关涉及交易、实时风控、行车流量预警、服务器监控告警、应用日志告警等
- 实时数据报表:活动营销时销售额/销售量大屏,TopN 商品等
框架流处理流程
flink电商场景下的业务图示例
flink中一些重要特性
Apache Flink 是一个功能强大的流处理引擎,如果你要在 Flink 上进行研发,下面是一些你必须了解的重要特性:
- 数据流处理和批处理统一:Flink 支持将批处理和流处理统一到一个编程模型中,这使得 Flink 应用程序可以同时处理无界和有界数据集。
- 事件时间处理:Flink 引入了基于事件时间的处理模式,这使得 Flink 应用程序可以处理无序事件数据,并且可以处理延迟事件。
- 窗口操作:Flink 可以对无界数据流进行窗口操作,例如滑动窗口、会话窗口、全局窗口等等。
- 状态管理:Flink 提供了高效的状态管理,可以轻松地维护和访问应用程序的状态。
- 高可用性:Flink 提供了可靠的容错机制,以保证 Flink 应用程序的高可用性和数据完整性。
- 数据源集成:Flink 提供了对多种数据源的支持,例如 Mysql HBase、Elasticsearch、Hive、Kafka、Redis 等等。
- Flink SQL:Flink SQL 允许用户使用 SQL 语言来进行 Flink 应用程序的编写,这使得 Flink 应用程序的编写变得更加简单和直观。
- 深度学习支持:Flink 提供了对深度学习的支持,可以将 TensorFlow、PyTorch 等深度学习框架与 Flink 结合使用,以便在流处理中进行模型训练和推理。
- 可扩展性:Flink 可以轻松地水平扩展,以处理大规模数据集,同时还能保持低延迟。
有界数据和无界数据
Flink 则认为,流处理才是最基本的操作,批处理也可以统一为流处理。在 Flink 的世界观中,万物皆流,实时数据是标准的、没有界限的流,而离线数据则是有界限的流。
- 无界数据流(Unbounded Data Stream)
所谓无界数据流,就是有头没尾,数据的生成和传递会开始但永远不会结束,如上图所示。我们无法等待所有数据都到达,因为输入是无界的,永无止境,数据没有“都到达”的时候。所以对于无界数据流,必须连续处理,也就是说必须在获取数据后立即处理。在处理无界流时,为了保证结果的正确性,我们必须能够做到按照顺序处理数据。
- 有界数据流(Bounded Data Stream)
有界数据流有明确定义的开始和结束,如上图所示,所以我们可以通过获取所有数据来处理有界流。处理有界流就不需要严格保证数据的顺序了,因为总可以对有界数据集进行排序。有界流的处理也就是批处理。
时间语义、水位线
在讲解flink的窗口之前,需要先了解一下flink的时间语义中的处理时间、事件时间以及水位线,
事件时间
事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。
处理时间
处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。
当然处理时间和时间时间可能在处理事件的时候不一样,因网络波动或者其他因素影响到数据的传输导致在事件处理时不一致,如8点产生的数据在8点01s到达,迟到了一秒,当前系统时间为8:01,而事件时间却是8:00
水位线
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
flink窗口
概念
Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。
在 Flink 中, 窗口就是用来处理无界流的核心。我们很容易把窗口想象成一个固定位置的“框”,数据源源不断地流过来,到某个时间点窗口该关闭了,就停止收集数据、触发计算并输出结果。
理想中的数据处理
如下图所示,到到达间隔时间时,触发这个窗口里所有数据的计算
含有延迟数据的数据处理
如下图所示,显示情况中一定会出现因为网络问题数据迟到
的问题,下面定义了一个延迟时间为2s的窗口,这样 0~10 秒的窗口会在时间戳为 12 秒的数据到来之后,才真正关闭计算输出结果,这样就可以正常包含迟到的 9 秒数据了。但是这样一来,0~10 秒的窗口不光包含了迟到的 9 秒数据,连 11 秒和 12 秒的数据也包含进去了。我们为了正确处理迟到数据,结果把早到的数据划分到了错误的窗口——最终结果都是错误的。
Flink存储桶概念
在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗口。相比之下,我们应该把窗口理解成一个“桶”,如下图所示。在 Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。
窗口类型
滚动窗口
滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。
滑动窗口
与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。
会话窗口
会话窗口顾名思义,是基于“会话”(session)来来对数据进行分组的。这里的会话类似Web 应用中 session 的概念,不过并不表示两端的通讯过程,而是借用会话超时失效的机制来描述窗口。简单来说,就是数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来,那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。
全局窗口
这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。
flink状态管理
状态的存储和管理是由 Flink Runtime 负责的,Flink 提供了多种状态后端来存储和管理状态数据,例如内存状态后端、文件系统状态后端、RocksDB 状态后端等等。Flink 还提供了可插拔的状态后端接口,可以自定义状态后端来满足特定的需求。Flink 还提供了高效的状态快照机制,可以在应用程序故障时对状态进行快速恢复,以保证应用程序的正确性和数据的完整性。
检查点(Checkpoint)
有状态流应用中的检查点(checkpoint)
,其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样。
检查点恢复数据过程
这里 Source 任务已经处理完毕,所以偏移量为 5;Map 任务也处理完成了。而 Sum 任务在处理中发生了故障,此时状态并未保存。
1、在算子计算时发生故障
2、应用重启
3、读取检查点,重置状态
4、读取检查点,重制偏移量
5、继续处理数据
下载安装
flink下载页:点击跳转
flink官方文档地址:点击跳转
安装后启动bin目录下./start-cluster.sh文件(⚠️:需要配置java环境变量),访问地址:localhost:8081
入门Demo示例
pom配置
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.13.0</flink.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><!-- 引入 Flink 相关依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- 引入日志管理相关依赖--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>compile</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
Demo代码
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlinkExampleWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();executionEnvironment.setParallelism(4);//使用nc -lk 9999,进行数据流输入,如hello word hello flinkDataStream<String> dataStream = executionEnvironment.socketTextStream("localhost", 9999);dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String data, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] split = data.split("\\s");System.out.println(JSON.toJSON(split));for (String word : split) {collector.collect(Tuple2.of(word, 1));}}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}}).sum(1).print();executionEnvironment.execute();}
}
打包到集群
使用maven对项目package打包,将打好的jar包上传到集群中,可以通过界面上传或者拷贝到机器上直接用指令进行运行
界面上传
点击Submit之后可以取相应位置看输出日志
日志查看
流运行时执行环境
每个 Flink 应用程序都需要一个执行环境
env
。流媒体应用程序需要使用StreamExecutionEnvironment
.应用程序中进行的DataStream API
调用构建了一个附加到StreamExecutionEnvironment
. 当env.execute()
被调用时,这个图被打包并发送到JobManager
,JobManager
将作业并行化并将它的切片分配给任务管理器以供执行。您的作业的每个并行切片都将在 任务槽中执行。
任务槽Slot
每个worker(TaskManager)都是一个
JVM进程
,可以在不同的线程中执行一个或多个子任务。为了控制TaskManager
接受多少任务,它有所谓的任务槽(至少一个)。每个TaskManager
有一个插槽意味着每个任务组都在单独的 JVM 中运行(例如,可以在单独的容器中启动)。拥有多个槽意味着更多的子任务共享同一个 JVM。同一个 JVM 中的任务共享TCP 连接
(通过多路复用)和心跳消息
。它们还可以共享数据集和数据结构
,从而减少每个任务的开销。
扩展Demo
时间窗口Demo
import lombok.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.ArrayList;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Random;import java.sql.Timestamp;public class FlinkWindowTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setAutoWatermarkInterval(100);SingleOutputStreamOperator<UserEvent> eventStream = env.addSource(new CustomSouce()).assignTimestampsAndWatermarks(WatermarkStrategy.<UserEvent>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<UserEvent>() {@Overridepublic long extractTimestamp(UserEvent element, longrecordTimestamp) {return element.getTimestemp();}}));//按照访问url分区,求出每个 url 的访问量SingleOutputStreamOperator<UserViewCount> urlCountStream = eventStream.keyBy(data -> data.getUrl())//滚动窗口:5s间隔.window(TumblingEventTimeWindows.of(Time.seconds(5)))//自定义聚合函数.aggregate(new NewUrlViewCountAgg(), new NewUrlViewCountResult());// 对结果中同一个窗口的统计数据,进行排序处理SingleOutputStreamOperator<String> result = urlCountStream.keyBy(data -> data.windowEnd).process(new TopN(3));result.print("result");env.execute();}//自定义增量聚合public static class NewUrlViewCountAgg implements AggregateFunction<UserEvent, Long, Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(UserEvent value, Long accumulator) {return accumulator + 1L;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return null;}}//自定义全窗口函数,只需要包装窗口信息public static class NewUrlViewCountResult extends ProcessWindowFunction<Long, UserViewCount, String, TimeWindow> {@Overridepublic void process(String url, ProcessWindowFunction<Long, UserViewCount, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<UserViewCount> out) throws Exception {// 结合窗口信息,包装输出内容long currentWindowStartTimeStemp = context.window().getStart();long currentWindowEndTimeStemp = context.window().getEnd();out.collect(new UserViewCount(url, elements.iterator().next(), currentWindowStartTimeStemp, currentWindowEndTimeStemp));}}// 自定义处理函数,排序取 top npublic static class TopN extends KeyedProcessFunction<Long, UserViewCount, String> {// 将 n 作为属性private Integer n;// 定义一个列表状态private ListState<UserViewCount> urlViewCountListState;public TopN(Integer n) {this.n = n;}public TopN() {super();}@Overridepublic void open(Configuration parameters) throws Exception {// 从环境中获取列表状态句柄urlViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<UserViewCount>("url-view-count-list",Types.POJO(UserViewCount.class)));}@Overridepublic void processElement(UserViewCount value, KeyedProcessFunction<Long, UserViewCount, String>.Context ctx, Collector<String> out) throws Exception {// 将 count 数据添加到列表状态中,保存起来urlViewCountListState.add(value);// 注册 window end + 1ms 后的定时器,等待所有数据到齐开始排序ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);}@Overridepublic void onTimer(long timestamp, KeyedProcessFunction<Long, UserViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {// 将数据从列表状态变量中取出,放入 ArrayList,方便排序ArrayList<UserViewCount> urlViewCountArrayList = new ArrayList<>();for (UserViewCount urlViewCount : urlViewCountListState.get()) {urlViewCountArrayList.add(urlViewCount);}// 清空状态,释放资源urlViewCountListState.clear();// 排序urlViewCountArrayList.sort(new Comparator<UserViewCount>() {@Overridepublic int compare(UserViewCount o1, UserViewCount o2) {return o2.count.intValue() - o1.count.intValue();}});// 提取前n名(由函数入口提供),构建输出结果StringBuilder result = new StringBuilder();result.append("========================================\n");result.append("窗口结束时间:" + new Timestamp(timestamp - 1) + "\n");for (int i = 0; i < this.n; i++) {UserViewCount userViewCount = urlViewCountArrayList.get(i);if (null == userViewCount) {break;}result.append("处理数据所在窗口:" + new Timestamp(userViewCount.getWindowStart()) + "~" + new Timestamp(userViewCount.getWindowEnd()));UserViewCount needUserView = userViewCount;String info = " No." + (i + 1) + " "+ "url:" + userViewCount.url + " "+ "浏览量:" + userViewCount.count + "\n";result.append(info);}result.append("========================================\n");out.collect(result.toString());}}@Data@NoArgsConstructor@ToString@AllArgsConstructorpublic static class UserEvent {private String userName;private String url;private Long timestemp;}@Data@NoArgsConstructor@AllArgsConstructor@Builderpublic static class UserViewCount {public String url;public Long count;public Long windowStart;public Long windowEnd;@Overridepublic String toString() {return "UserViewCount{" +"url='" + url + '\'' +", count=" + count +", windowStart=" + new Timestamp(windowStart) +", windowEnd=" + new Timestamp(windowEnd) +'}';}}public static class CustomSouce implements SourceFunction<UserEvent> {// 声明一个布尔变量,作为控制数据生成的标识位private Boolean running = true;@Overridepublic void run(SourceContext<UserEvent> ctx) throws Exception {Random random = new Random(); // 在指定的数据集中随机选取数据String[] users = {"Mary", "Alice", "Bob", "Cary"};String[] urls = {"./home", "./cart", "./fav", "./prod?id=1","./prod?id=2"};while (running) {ctx.collect(new UserEvent(users[random.nextInt(users.length)],urls[random.nextInt(urls.length)],Calendar .getInstance().getTimeInMillis()));// 隔 200 ms生成一个点击事件,方便观测Thread.sleep(200);}}@Overridepublic void cancel() {running = false;}}}
运行结果
Table Api Demo
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class TableApiDemo {public static void main(String[] args) throws Exception {// 获取流执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源SingleOutputStreamOperator<UserEvent> eventStream = env.fromElements(new UserEvent("Alice", "./home", 1000L),new UserEvent("Bob", "./cart", 1000L),new UserEvent("Alice", "./prod?id=1", 5 * 1000L),new UserEvent("Cary", "./home", 60 * 1000L),new UserEvent("Bob", "./prod?id=3", 90 * 1000L),new UserEvent("Alice", "./prod?id=7", 105 * 1000L));// 获取表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 将数据流转换成表Table eventTable = tableEnv.fromDataStream(eventStream);// 用执行 SQL 的方式提取数据Table visitTable = tableEnv.sqlQuery("select url, userName from " + eventTable);// 将表转换成数据流,打印输出tableEnv.toDataStream(visitTable).print();// 执行程序env.execute();}@Data@NoArgsConstructor@ToString@AllArgsConstructorpublic static class UserEvent {private String userName;private String url;private Long timestemp;}}
运行结果
对迟到数据处理Demo
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.example.entity.Event;import java.sql.Timestamp;
import java.time.Duration;public class LateDataProcessExampleDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取 socket 文本流/** 样例数据* Alice ./home 1000* Alice ./home 2000* Alice ./home 10000* Alice ./home 9000* Alice ./cart 12000* Alice ./prod?id=100 15000* Alice ./home 9000* Alice ./home 8000* Alice ./prod?id=200 70000* Alice ./home 8000* Alice ./prod?id=300 72000* Alice ./home 8000*/SingleOutputStreamOperator<Event> stream =env.socketTextStream("localhost", 9999).map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {String[] fields = value.split("\\s");return new Event(fields[0].trim(), fields[1].trim(),Long.valueOf(fields[2].trim()));}})// 方式一:设置 watermark 延迟时间,2 秒钟.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, longrecordTimestamp) {return element.getTimestamp();}}));// 定义侧输出流标签OutputTag<Event> outputTag = new OutputTag<Event>("late") {};SingleOutputStreamOperator<UrlViewCount> result = stream.keyBy(data -> data.getUrl())//设置10s的时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(10)))// 方式二:允许窗口处理迟到数据,设置 1 分钟的等待时间.allowedLateness(Time.minutes(1))// 方式三:将最后的迟到数据输出到侧输出流.sideOutputLateData(outputTag).aggregate(new UrlViewCountAgg(), new UrlViewCountResult());result.print("result");/*** 输出数据* origin data input> Event(user=Alice, url=./prod?id=300, timestamp=72000)* origin data input> Event(user=Alice, url=./home, timestamp=8000)* late data result> Event(user=Alice, url=./home, timestamp=8000)* * 我们设置窗口等待的时间为 1 分钟,所以当时间推进到 10000 + 60 * 1000 = 70000 时,窗* 口就会真正被销毁。此前的所有迟到数据可以直接更新窗口的计算结果,而之后的迟到数据已* 经无法整合进窗口,就只能用侧输出流来捕获了。需要注意的是,这里的“时间”依然是由水* 位线来指示的,所以时间戳为 70000 的数据到来,并不会触发窗口的销毁;当时间戳为 72000* 的数据到来,水位线推进到了 72000 – 2 * 1000 = 70000,此时窗口真正销毁关闭,之后再来的* 迟到数据就会输出到侧输出流了:*/result.getSideOutput(outputTag).print("late data result");// 为方便观察,可以将原始数据也输出stream.print("origin data input");env.execute();}public static class UrlViewCountAgg implements AggregateFunction<Event, Long,Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event value, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return null;}}public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {@Overridepublic void process(String url, Context context, Iterable<Long> elements,Collector<UrlViewCount> out) throws Exception {// 结合窗口信息,包装输出内容Long start = context.window().getStart();Long end = context.window().getEnd();out.collect(new UrlViewCount(url, elements.iterator().next(), start,end));}}@Data@NoArgsConstructor@AllArgsConstructor@Builderpublic static class UrlViewCount {public String url;public Long count;public Long windowStart;public Long windowEnd;@Overridepublic String toString() {return "UrlViewCount{" +"url='" + url + '\'' +", count=" + count +", windowStart=" + new Timestamp(windowStart) +", windowEnd=" + new Timestamp(windowEnd) +'}';}}
}
相关文章:

大数据flink框架入门分享(起源与发展、实时与离线计算、场景、处理流程、相关概念、特性普及、入门Demo)
文章目录起源与发展flink在github上的现状实时计算VS离线计算实时计算离线计算实时计算常用的场景框架流处理流程flink电商场景下的业务图示例flink中一些重要特性有界数据和无界数据时间语义、水位线事件时间处理时间水位线flink窗口概念理想中的数据处理含有延迟数据的数据处…...

由点到面贯穿整个Java泛型理解
泛型概述 Java泛型(generics)是DK5中引入的一个新特性,泛型提供了编译时类型安全监测机制,该机制允许我们在编译时检测到非法的类型数据结构。 泛型的本质就是参数化类型,也就是所操作的数据类型被指定为一个参数。 如我们经常使用的Array…...

北斗RTK高精度定位在AI领域的应用
随着北斗高精度定位技术越来越成熟,通过GNSS高精度定位与机器人结合,越来越多的智能机器人走进我们生活中。像驾培机器人、智能除草机器人、智能巡检机器人、北斗划线机器人等智能机器人已经广泛的投入使用。驾培机器人驾培机器人:通考车安装…...

2023年再不会 IOC 源码,就要被淘汰了
👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神、Kafka从成神到升仙…...

MQ面试题
1、为什么使用消息队列? 其实就是问问你消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这个场景里用消息队列是什么? 面试官问你这个问题,期望的一个回答是说,你们公司有个什么业务场景&…...

pnpm 基本详细使用(安装、卸载、使用)
一、简介 官网地址、GitHub地址、官方安装文档、官方卸载文档。 pnpm 全称 performant npm,意思为 高性能的 npm。pnpm 由 npm/yarn 衍生而来,解决了 npm/yarn 内部潜在的 bug,极大的优化了性能,扩展了使用场景。被誉为 最先进的…...

Kafka生产者的粘性分区算法
分区算法分类 kafka在生产者投递消息时,会根据是否有key采取不用策略来获取分区。 存在key时会根据key计算一个hash值,然后采用hash%分区数的方式获取对应的分区。 而不存在key时采用随机算法选取分区,然后将所有的消息封装到这个batch上直…...

java基础篇
1.基础篇注释注释是在程序指定位置添加的说明性信息注释不参与程序运行,仅起到说明作用单行注释 格式:// 注释信息多行注释 格式:/* 注释信息 */关键字关键字:就是被Java语言赋予了特定含义的单词java中共有53个关键字1.全部有小写…...

Java与Winform进行AES加解密数据传输的工具类与对应关系和示例
场景 AndroidJava中使用Aes对称加密的工具类与使用: AndroidJava中使用Aes对称加密的工具类与使用_霸道流氓气质的博客-CSDN博客 上面讲的Java与安卓进行数据传输时使用AES加解密的示例工具类。 如果Java需要与其他第三方平台比如Winform程序进行数据传递时也需…...

OpenAI模型的API调用与使用-测试(2)
OpenAI模型的API调用与使用-测试(2)1. 参考Quick start搭建一个demo1.1 安装openai包1.2 demo测试11.3 demo测试2参考资料1. 参考Quick start搭建一个demo 1.1 安装openai包 注意关掉科学上网工具,下载openai包 pip install openai安装好后…...

【LeetCode】剑指 Offer 22. 链表中倒数第k个节点 p136 -- Java Version
题目链接:https://leetcode.cn/problems/lian-biao-zhong-dao-shu-di-kge-jie-dian-lcof/ 1. 题目介绍(22. 链表中倒数第k个节点) 输入一个链表,输出该链表中倒数第k个节点。为了符合大多数人的习惯,本题从1开始计数&…...

经典卷积模型回顾7-轻量化模型MobileNet实现图像分类(matlab)
MobileNet是一种轻量级卷积神经网络,适用于较小的设备和低功耗环境。在MATLAB中,可以使用Deep Learning Toolbox进行MobileNet的图像分类训练。 使用预先训练好的MobileNet模型对自定义数据集进行微调训练: matlab % 导入数据集 imds im…...

程序员压力大?用 PyQt 做一个美*女GIF设置桌面,每天都有好心情
嗨害大家好鸭!我是小熊猫~ 要说程序员工作的最大压力不是来自于工作本身, 而是来自于需要不断学习才能更好地完成工作, 因为程序员工作中面对的编程语言是在不断更新的, 同时还要学习熟悉其他语言来提升竞争力… 好了,…...

Shell命令——sed命令
以下内容整理于《linux命令行与shell脚本编程大全【第三版】》一书。 一、简介sed编辑器 1、sed编辑器的本质 sed是stream editor的缩写,中文意思是“流编辑器”。 sed编辑器是一个命令行编辑器,也就是可以在命令行上完成数据的处理(替换、…...

C语言练习 | 初学者经典练习汇(2)
目录 1、编写一个程序从1到100中,所有出现9的个数 2、分数求和 3、10个整形数字中选出最大值 4、打印9*9的乘法口诀 5、字符串逆序 6、计算一个数的每位之和(递归实现) 7、递归实现n的K次方 8、写个冒泡排序,把一个整形数组变成升序。 9、二进制…...

git分支
分支什么是分支在版本控制过程中,同时推进多个任务,为每个任务,我们就可以创建每个任务的单独分支。使用分支意味着程序员可以把自己的工作从开发主线上分离开来,开发自己分支的时候,不会影响主线分支的运行。对于初学…...

Java每天15道面试题 | redisII
1、什么是 Redis?简述它的优缺点? Redis 本质上是一个 Key-Value 类型的内存数据库,很像 memcached,整个数据库统统加载在内存当中进行操作,定期通过异步操作把数据库数据 flush 到硬盘上进行保存。因为是纯内存操作&a…...

浏览器渲染原理
阶段 - Parse 1、解析HTML,浏览器将从服务器获取到的HTML文件之后,会产生一个渲染任务,交给消息队列(EventLoop/MessageLoop)。 2、在事件循环机制的作用下,会将渲染任务交给主线程 3、主线程在获取到渲染…...

华为OD机试题 - 查找单入口空闲区域(JavaScript)| 含思路
华为OD机试题 最近更新的博客使用说明本篇题解:查找单入口空闲区域题目输入输出示例一输入输出说明示例二输入输出说明示例三输入输出说明示例四输入输出说明Code解题思路华为OD其它语言版本<...

制造型企业想要做好数字化改造,要注意以下几点!
很多企业在“工业4.0、智能制造、互联网”等概念满天飞的环境下迷失了方向,不知该如何下手,盲目跟风,看别人投自动化,自己也跟着投,看别人上信息化,自己也跟着上。 其实,智能制造也好ÿ…...

【蓝桥杯集训·每日一题】AcWing 1488. 最短距离
文章目录一、题目1、原题链接2、题目描述二、解题报告1、思路分析2、时间复杂度3、代码详解三、知识风暴Dijkstra算法一、题目 1、原题链接 1488. 最短距离 2、题目描述 有 N 个村庄,编号 1 到 N。 村庄之间有 M 条无向道路,第 i 条道路连接村庄 ai 和村…...

比亚迪:全球最大电动汽车制造商的坎坷成长之路
来源:猛兽财经 作者:猛兽财经 特斯拉(TSLA)首席执行官埃隆马斯克表示,特斯拉最接近的竞争对手可能是一家中国电动汽车公司。猛兽财经认为,沃伦•巴菲特支持的比亚迪(0211)可能是马斯…...

Java开发 - Quartz初体验
前言 在上一篇博客中,我们对单点登录有了初步了解,这也让我们独立做系统有了最基础的保障。但在业务开发中,总是会出现一些定期处理的任务,我们首先想到的是Timer,但由于其调度功能单一,我们实际并不会用它…...

无头盔开发vr XR Device Simulator操作(更新)
1.摄像机(未开启TY键) 平移 上下左右:右键鼠标,移哪去哪 前后:右键快速滚动鼠标滚轮 旋转 XOY平面旋转:右键按住鼠标滚轮滚动鼠标滚轮 XOZ\YOZ平面旋转:右键按住鼠标滚轮移动鼠标 2.左手右手&am…...

《C++代码分析》第二回:函数重载const char* ,char*,const char[],char[]汇编代码上的区别
一、前言 C相比C是支持函数重载的,现在我们详细探讨一下C函数重载与类方法承载。 二、案例代码 我们编译如下代码,同样的我们关闭代码优化,删除符号链接文(.pdb) #include "windows.h" #include "w…...

【学习笔记】深入理解JVM之垃圾回收机制
【学习笔记】深入理解JVM之垃圾回收机制 更多文章首发地址:地址 参考: 《深入理解JAVA虚拟机》第三版 第三章尚硅谷 第134 - 203 集参考文章:https://blog.csdn.net/qq_48435252/article/details/123697193 1、概念 🌻 首先我们…...

49.在ROS中实现local planner(2)- 实现Purepersuit(纯跟踪)算法
48.在ROS中实现local planner(1)- 实现一个可以用的模板实现了一个模板,接下来我们将实现一个简单的纯跟踪控制,也就是沿着固定的路径运动,全局规划已经规划出路径点,基于该路径输出相应的控制速度 1. Pur…...

Allegro如何设通孔Pin和Via的消盘操作指导
Allegro如何设通孔Pin和Via的消盘操作指导 用Allegro做PCB设计的时候,除了可以在光绘设置里面设置内层通孔Pin和Via的消盘,在设计过程中,同样也可以设置消盘效果,以便实时显示,如下图 如何设置,具体操作如下 点击Setup点击Unused Pads Suppression...

Android工厂模式
工厂模式分为三种 :简单工厂模式 、工厂方法模式 、抽象工厂模式 。 目录 简单工厂模式 UML图 实现 使用场景: 优点 : 缺点: 工厂方法模式 UML图 实现 使用场景: 优点: 缺点: 抽象工厂模式 UM…...

神经网络硬件加速器-架构篇
架构设计 常规架构通常包括两种: 1、全流水线架构,顾名思义,将整个神经网络进行平铺,并对每一层进行优化设计,优点:实现高吞吐率和低延时。缺点:消耗大量硬件资源,通常无法跨网络或…...