当前位置: 首页 > news >正文

Flink(java版)

watermark

时间语义和 watermark

注意:数据进入flink的时间:如果用这个作为时间语义就不存在问题,但是开发中往往会用处理时间
作为时间语义这里就需要考虑延时的问题。
如上图,数据从kafka中获取出来,从多个分区中获取,这时候时间肯定有乱序,这时候就需要使用事
件时间。

场景:游戏连续过五关,给予奖励
地铁里面玩游戏,连过三关断网了,二分钟过了八关。这时候是用处理时间还是事件时间呢?
处理时间的优势:牺牲一定的数据准确性,没有延迟

package com.atguigu.apitest.window;/**import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认为当前机器的cpu的最大核数//env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.getConfig().setAutoWatermarkInterval(100);// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型,分配时间戳和watermarkDataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));})// 乱序数据设置时间戳和watermark.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};// 基于事件时间的开窗聚合,统计15秒内温度的最小值SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id").timeWindow(Time.seconds(15)).allowedLateness(Time.minutes(1)).sideOutputLateData(outputTag).minBy("temperature");minTempStream.print("minTemp");minTempStream.getSideOutput(outputTag).print("late");env.execute();}
}
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718211,32.8
sensor_1,1547718212,37.1注意:第一个窗口是[1547718195,1547718210);

sensor_1,1547718213,33
sensor_1,1547718224,32.1
sensor_1,1547718225,31.6
sensor_1,1547718226,21.2
sensor_1,1547718227,33.6第二个窗口大小:第一个窗口是[1547718210,1547718225);

1.理想状态:来一条数据处理一条,每条数据代表对时间推进;如图到5之后就将【0,5)的窗口关闭并输出;2.乱序状态:原因:网络延迟、分布式、分区导致乱序数据产生;网络延迟和分布式处理造成的乱序都是几十毫秒和几百毫秒的范围的差距;这将回造成大多数延迟数据集中在几十毫秒和几百毫秒的范围内;3.解决方案:将时间事件放慢

flink的三重保证:1.设置watermaker将几百毫秒的数据全部输出;2.先输出一个近似的结果,但是不要关闭窗口后面延迟的时间还需要更新;3.当延时时间到了,窗口就关闭了;兜底方案使用侧输出流保证数据不丢失;注意:数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据都已经到
达了,因此,window 的执行也是由 Watermark 触发的。
6 3 2 5 4 1 
比如设置3秒的watermaker:
到达5:说明2秒之前的数据都到齐了,后面2,3都可以输出
到达6:说明3秒之前的数据都到齐了,大于等于3秒的数据才能输出意义:watermark 用来让程序自己平衡延迟和结果正确性:如果设置太大延迟太高,设置太
小数据就不准确,需要通过具体的业务场景去平衡这个值;

watermark 用来让程序自己平衡延迟和结果正确性:如果设置太大延迟太高,设置太小,乱序数据
没有搞定,数据就不准确,需要通过具体的业务场景去平衡这个值;如何找到watermaker:首先要了解乱序程度;
解决方案:通过机器学习构建一个模型,构建当前业务模型中的延迟状态的分布情况;

如图:大部分的延时数据都20ms和80ms之间的范围中,这时候设置80ms就搞定大部分乱序数据;
这时候还有很少的数据,如果对数据准确性要求比较高,这时候就需要设置窗口迟到机制去保证
数据的准备性;最后还有网络延迟的数据还是没有输出这时候就需要添加侧输出流作为兜底方案。

 watermark 生成问题

默认:来一条生产一条watermaker,如果短时间数据量比较大,会造成watermaker都一样造成资
源浪费;周期性添加watermaker:每隔一段时间更新一下watermaker 
周期性时间缺点:实时性不好;数据过于分散会造成资源浪费;如何选择:看数据的分布,过于集中使用周期性生成模式,数据稀疏,使用默认的模型;

状态编程 

需求:我们可以利用 Keyed state,实现这样一个需求: 检测传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警

package com.atguigu.apitest.state;/*** Copyright (c) 2018-2028 尚硅谷 All Rights Reserved* <p>* Project: FlinkTutorial* Package: com.atguigu.apitest.state* Version: 1.0* <p>* Created by wushengran on 2020/11/10 16:33*/import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @ClassName: StateTest3_KeyedStateApplicationCase* @Description:* @Author: wushengran on 2020/11/10 16:33* @Version: 1.0*/
public class StateTest3_KeyedStateApplicationCase {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 定义一个flatmap操作,检测温度跳变,输出报警SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));resultStream.print();env.execute();}// 实现自定义函数类public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>>{// 私有属性,温度跳变阈值private Double threshold;public TempChangeWarning(Double threshold) {this.threshold = threshold;}// 定义状态,保存上一次的温度值private ValueState<Double> lastTempState;@Overridepublic void open(Configuration parameters) throws Exception {lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));}@Overridepublic void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {// 获取状态Double lastTemp = lastTempState.value();// 如果状态不为null,那么就判断两次温度差值if( lastTemp != null ){Double diff = Math.abs( value.getTemperature() - lastTemp );if( diff >= threshold )out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));}// 更新状态lastTempState.update(value.getTemperature());}@Overridepublic void close() throws Exception {lastTempState.clear();}}
}
sensor_1,1547718206,36.3
sensor_1,1547718206,37.9
sensor_1,1547718206,48
sensor_6,1547718201,15.4
sensor_6,1547718201,35
sensor_1,1547718226,36

 状态后端

状态后端: 1.本地的状态管理(如何存,上下文配置,怎么存,怎么写)  2.做快照容错,如何恢复数据

1. 测试环境:MemoryStateBackend
2. 生产环境:FsStateBackend 
3. 数据非常大时候:RocksDBStateBackend
state.backend: filesystem //默认使用FsStateBackend 
tate.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints 
//配置一个checkpoint的hdfs的存储路径jobmanager.execution.failover-strategy: region //区域化重启state.backend.incremental: false  //增量添加checkpoint

相关文章:

Flink(java版)

watermark 时间语义和 watermark 注意:数据进入flink的时间&#xff1a;如果用这个作为时间语义就不存在问题&#xff0c;但是开发中往往会用处理时间 作为时间语义这里就需要考虑延时的问题。 如上图&#xff0c;数据从kafka中获取出来&#xff0c;从多个分区中获取&#xf…...

什么是动态组件以及使用场景

文章目录 一、vue中的动态组件是什么&#xff1f;有什么用&#xff1f;二、使用demo1.tab页签中的使用2.模拟新闻页demo 三、用keep-alive包裹&#xff0c;保持状态总结 一、vue中的动态组件是什么&#xff1f;有什么用&#xff1f; 动态组件指可以动态切换组件的显示和隐藏。…...

CRM销售管理系统如何提高销售效率

CRM销售管理系统是帮助企业对销售活动进行管理、执行和优化的软件系统。它可以帮助企业提高销售效率&#xff0c;提高客户转化率&#xff0c;实现企业的业绩增长。那么&#xff0c;CRM销售管理系统好用吗&#xff1f; CRM销售管理系统的功能 线索管理&#xff1a; CRM系统可…...

纯小白安卓刷机1

文章目录 常见的英文意思刷机是什么&#xff1f;为什么要刷机&#xff1f;什么是BL锁&#xff08;BootLoader锁&#xff09;&#xff1f;我的机能够刷机吗&#xff1f;什么是Boot镜像/分区&#xff1f;什么是Recovery镜像/分区&#xff08;缩写为rec&#xff09;&#xff1f;什…...

C高级day4循环语句

1&#xff0c;思维导图 运行结果为&#xff1a; 运行结果为&#xff1a;...

Linux 操作系统云服务器安装部署 Tomcat 服务器详细教程

Tomcat 基本概述 Tomcat 服务器是Apache软件基金会&#xff08;Apache Software Foundation&#xff09;的 Jakarta 项目中的一个核心项目&#xff0c;由 Apache、Sun 和其他一些公司及个人共同开发而成。它是一个免费的开放源代码的 Web 应用服务器&#xff0c;属于轻量级应用…...

【易盾点选】

拿官网的点选做个例子吧&#xff0c;比较省事&#xff0c;水一篇~ ​ 官网的接口目前都改成V3了&#xff0c;多了个dt参数&#xff0c;以及加密的一个函数也变动了下 点选坐标在这&#xff0c;加密函数未变&#xff0c;用逗号拼接 整个加密里的函数变了&#xff0c;直接重新…...

vue中打印指定dom元素

和window.print()效果一样&#xff0c;调出打印窗口&#xff0c;只是当前使用的插件是vue-print-nb 官网地址&#xff1a;vue-print-nb vue2中使用 安装插件 npm install vue-print-nb --save导入插件 import Print from vue-print-nb // 全局使用 Vue.use(Print);//or// 单…...

OpenCV(三十六):霍夫直线检测

1.检测直线的霍夫变换原理 2.检测直线函数HoughLines() 检测直线流程: Step1:将参数空间的坐标轴离散化。 Step2:将图像中每个非0像素通过映射关系求取在参数空间通过的方格 Step3:统计参数空间内每个方格出现的次数&#xff0c;选取次数大于某一值的方格作为表示直线的方格…...

文心一言 VS 讯飞星火 VS chatgpt (91)-- 算法导论8.3 4题

四、用go语言&#xff0c;说明如何在 O(n) 时间内&#xff0c;对 0 到 n 3 − 1 n^3-1 n3−1 区间内的n个整数进行排序。 文心一言&#xff1a; 在 O(n) 时间内对 0 到 n^3-1 区间内的 n 个整数进行排序&#xff0c;可以使用基数排序&#xff08;Radix Sort&#xff09;算法…...

keep-alive缓存三级及三级以上路由

需求需要缓存这个出入记录&#xff0c;当tab切换时不重新加载&#xff0c;当刷新页面时&#xff0c;或把这个关闭在重新打开时重新加载如图&#xff1a; &#xff08;我这里用的是芋道源码的前端框架) keep-alive 1、include 包含页面组件name的这些组件页面&#xff0c;会被…...

vite vue项目 运行时 \esbuild\esbuild.exe 缺失 错误码 errno: -4058, code: ‘ENOENT‘,

vite vue项目运行 npm run dev 报错某个模块启动文件丢失信息 D:\PengYe_code\2\vite-vue3-admin>npm run dev> vite-vue3-admin1.0.2 dev > vitenode:events:504throw er; // Unhandled error event^Error: spawn D:\PengYe_code\2\vite-vue3-admin\node_modules\vi…...

favicon.ico网站图标不显示问题 Failed to load resource: net::ERR_FILE_NOT_FOU

上述问题主要由于网站的小图标无法显示导致的&#xff1a;可以检查如下部分&#xff1a; 1、是否存在一个favicon.ico文件在根目录下 2、如果存在&#xff0c;看是否写的相对路径&#xff1a;改为绝对路径 <link rel"shortcut icon" href"../favicon.ico&quo…...

微服务·架构组件之服务注册与发现-Nacos

微服务组件架构之服务注册与发现之Nacos Nacos服务注册与发现流程 服务注册&#xff1a;Nacos 客户端会通过发送REST请求的方式向Nacos Server注册自己的服务&#xff0c;提供自身的元数据&#xff0c;比如ip地址、端口等信息。 Nacos Server接收到注册请求后&#xff0c;就会…...

Linux驱动【day2】

mychrdev.c: #include <linux/init.h> #include <linux/module.h> #include <linux/fs.h> #include<linux/uaccess.h> #include<linux/io.h> #include"head.h" unsigned int major; // 保存主设备号 char kbuf[128]{0}; unsigned int…...

4、Nginx 配置实例-反向代理

文章目录 4、nginx 配置实例-反向代理4.1 反向代理实例一4.1.1 实验代码 4.3 反向代理实例二4.3.1 实验代码 【尚硅谷】尚硅谷Nginx教程由浅入深 志不强者智不达&#xff1b;言不信者行不果。 4、nginx 配置实例-反向代理 4.1 反向代理实例一 实现效果&#xff1a;使用 nginx…...

2023年世界机器人大会回顾

1、前记&#xff1a; 本次记录是我自己去世界机器人博览会参观的一些感受&#xff0c;所有回顾为个人感兴趣部分的机器人产品分享。整个参观下来最大的感受就是科学技术、特别是机器人技术和人工智能毫无疑问地、广泛的应用在我们日常生活的方方面面&#xff0c;在安全巡检、特…...

Mac系统 AndroidStudio Missing essential plugin:org.jetbrains.android报错

打开Android Studio,提示 Missing essential plugin:org.jetbrains.android错误&#xff0c;产生的原因是Kotlin被禁用。 解决的方法是删除disabled_plugins.txt&#xff0c;Mac OS对应的路径为&#xff1a; /Users/xzh/Library/Application Support/Google/AndroidStudio202…...

读书笔记:多Transformer的双向编码器表示法(Bert)-1

多Transformer的双向编码器表示法 Bidirectional Encoder Representations from Transformers&#xff0c;即Bert&#xff1b; 本笔记主要是对谷歌Bert架构的入门学习&#xff1a; 介绍Transformer架构&#xff0c;理解编码器和解码器的工作原理&#xff1b;掌握Bert模型架构…...

第二证券:股利支付率和留存收益率的关系?

股利付出率和留存收益率是股票出资中非常重要的目标&#xff0c;它们可以反映公司的盈余才能和未来开展的潜力。那么&#xff0c;二者之间究竟有什么联系呢&#xff1f; 一、股利付出率和留存收益率的定义 股利付出率是指公司向股东分配的股息占当期净利润的比例&#xff0c;通…...

煤矿虚拟仿真 | 采煤工人VR虚拟现实培训系统

随着科技的发展&#xff0c;虚拟现实(VR)技术已经逐渐渗透到各个行业&#xff0c;其中包括煤矿行业。VR技术可以为煤矿工人提供一个安全、真实的环境&#xff0c;让他们在虚拟环境中进行实际操作和培训&#xff0c;从而提高他们的技能水平和安全意识。 由广州华锐互动开发的采煤…...

buuctf crypto 【[GXYCTF2019]CheckIn】解题记录

1.打开文件&#xff0c;发现密文 2.一眼base64&#xff0c;解密一下 3.解密后的字符串没有什么规律&#xff0c;看了看大佬的wp&#xff0c;是rot47加密&#xff0c;解密一下&#xff08;ROT5、ROT13、ROT18、ROT47位移编码&#xff09;...

微服务05-Docker基本操作

Docker的定义 1.什么是Docker Docker是一个快速交付应用、运行应用的技术&#xff1a; 可以将程序及其依赖、运行环境一起打包为一个镜像&#xff0c;可以迁移到任意Linux操作系统运行时利用沙箱机制形成隔离容器&#xff0c;各个应用互不干扰启动、移除都可以通过一行命令完…...

OpenHarmony创新赛|赋能直播第三期

开放原子开源大赛OpenHarmony创新赛赋能直播间持续邀请众多技术专家一起分享应用开发技术知识&#xff0c;本期推出OpenHarmony应用开发之音视频播放器和三方库的使用和方法&#xff0c;助力开发者掌握多媒体应用技术的开发能力和使用三方库提升应用开发的效率和质量&#xff0…...

docker镜像详解

目录 什么是docker镜像镜像相关命令docker pulldocker imagesdocker searchdocker rmi导出 / 导入镜像 镜像分层镜像摘要镜像摘要的作用分发散列值 什么是docker镜像 Docker镜像是Docker容器的基础组件&#xff0c;它包含了运行一个应用程序所需的一切&#xff0c;包括代码、运…...

二叉树的顺序结构以及堆的实现——【数据结构】

W...Y的主页 &#x1f60a; 代码仓库分享 &#x1f495; 上篇文章&#xff0c;我们认识了什么是树以及二叉树的基本内容、表示方法……接下来我们继续来深入二叉树&#xff0c;感受其中的魅力。 目录 二叉树的顺序结构 堆的概念及结构 堆的实现 堆的创建 堆的初始化与…...

手写一个摸鱼神器:使用python手写一个看小说的脚本,在ide中输出小说内容,同事直呼“还得是你”

文章目录 一、准备python环境二、分析小说网的章节目录三、分析小说网的章节内容四、编写python脚本五、验证一下吧 一、准备python环境 windows从0搭建python3开发环境与开发工具 Python爬虫基础&#xff08;一&#xff09;&#xff1a;urllib库的使用详解 Python爬虫基础&a…...

【Python 实战】---- 实现批量图片的切割

1. 需求场景 在实际开发中&#xff0c;我们会遇到一种很无聊&#xff0c;但是又必须实现的需求&#xff0c;就是比如协议、大量的宣传页面、大量的静态介绍页面、或者大量静态页面&#xff0c;但是页面高度很高&#xff0c;甚至高度可能会达到50000px&#xff0c;但是为了渲染…...

MAYA粒子基础_场

重力场 牛顿场 径向场 均匀场和重力场的区别 空气场 推动物体 阻力场 推动物体 涡流场 湍流场 体积轴场...

趣解设计模式之《我买了宝马,为啥不让我停这?》

〇、小故事 我们怎么识别一辆汽车是宝马品牌的汽车呢&#xff1f;虽然宝马汽车车辆型号非常的多&#xff0c;而且外型也各不相同&#xff0c;但是只要是宝马品牌的汽车&#xff0c;它的车头一定会有宝马汽车的logo&#xff0c;那么这个就是大家最直观去确认一辆车是不是宝马牌…...

怎样建设卡盟网站/网页优化方案

今日通过&#xff1a;13 通过题目编号&#xff1a;1107/1112/1114/1115/1116/1120/1123/1127/1128/1009/1035/1045/1089 任务完成率&#xff1a;75% 预计完成需要时间&#xff1a;2天 预计达到第二名时间&#xff1a;1-2天转载于:https://www.cnblogs.com/wangximing/p/1105562…...

付费视频网站开发/专注网站建设服务机构

请记住,绘画具有破坏性.有可能使用AlphaComposite来实现这个结果,但更简单的解决方案可能是简单的建设性复合形状和涂料.下面的示例创建两个Rectangle,一个是我们要填充的区域,一个是我们要显示的区域,然后从第一个区域中减去第二个(创建窗口),然后将结果绘制在图像顶??部imp…...

做外贸没有网站需要注意什么/搜狗关键词优化软件

一、搜索“无线诊断”应用程序 二、在“无线诊断”应用程序中找“嗅探器”...

有什么正网站做兼职的/厦门seo关键词优化

1 ROS必须的环境变量 执行命令&#xff1a; source /opt/ros/ROSDISTRO/setup.bash (Replace ROSDISTRO with the desired ROS distribution, e.g. indigo.)可以完成设置。 当然通常是添加该命令到&#xff5e;/.bashrc,系统启动自动设置这些必须的环境变量。 注意&#xff1a…...

爱情动做网站推荐/免费测试seo

2019独角兽企业重金招聘Python工程师标准>>> 当设置了父元素的宽度&#xff0c;子元素设置宽度为100%后再在加上子元素上添加padding或margin值就会溢出。举个例子&#xff1a; <!-- 示例 --><!-- html --> <div class"parent"><div…...

怎么知道一个网站是谁做的/全国推广优化网站

目录 问题说明 临时解决方法 VS 2019以及新版本 VS 2017 VS2015 问题说明 所有版本的visual studio 链接团队资源都是空白页面,发现是嵌入式 web 浏览器的问题。...