大数据-玩转数据-Flink状态编程(上)
一、Flink状态编程
有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。
SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编程的难度。
Flink的状态管理是它的优势之一。
二、什么是状态
在流式计算中有些操作一次处理一个独立的事件(比如解析一个事件), 有些操作却需要记住多个事件的信息(比如窗口操作)。
流式计算分为无状态计算和有状态计算两种情况。
无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。
在简单聚合、窗口聚合、处理函数的应用,都会有状态的身影出现。在Flink这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生故障时能正确地恢复,这就需要一套完整的管理机制来处理所有状态。
三、为什么需要管理状态
下面的几个场景都需要使用流处理的状态功能:
去重: 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
检测: 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
聚合: 对一个时间窗口内的数据进行聚合分析,分析一个小时内水位的情况
更新机器学习模型: 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。
四、Flink中的状态分类
Managed State
状态管理方式 Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩
状态数据结构 Flink提供多种常用数据结构, 例如:ListState, MapState等
使用场景 绝大数Flink算子。
Raw State
状态管理方式 用户自己管理
状态数据结构 字节数组: byte[]
使用场景 所有算子
从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State一般是在已有算子和Managed State不够用时,用户自定义算子时使用。
在我们平时的使用中Managed State已经足够我们使用。
对Managed State继续细分,它又有2种类型
Operator State(算子状态)
Keyed State(键控状态)
Operator State
适用用算子类型: 可用于所有算子: 常用于source, sink,
例如:FlinkKafkaConsumer
状态分配:一个算子的子任务对应一个状态
创建和访问方式: 实现CheckpointedFunction或ListCheckpointed(已经过时)接口
横向扩展 :并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量
支持的数据结构: ListState,UnionListStste和BroadCastState
Keyed State
适用用算子类型: 只能用于用于KeyedStream上的算子
状态分配 :一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State
创建和访问方式:重写RichFunction, 通过里面的RuntimeContext访问w
横向扩展 :并发改变, State随着Key在实例间迁移
支持的数据结构:ValueState, ListState,MapState ReduceState, AggregatingState
五、算子状态的使用
Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。
注意: 算子子任务之间的状态不能互相访问
Operator State的实际应用场景不如Keyed State多,它经常被用在Source或Sink等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证Flink应用的Exactly-Once语义。
Flink为算子状态提供三种基本数据结构:
列表状态(List state),将状态表示为一组数据的列表
联合列表状态(Union list state),也是将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。一种是均匀分配(List state),另外一种是将所有 State 合并为全量 State 再分发给每个实例(Union list state)。
广播状态(Broadcast state)
是一种特殊的算子状态. 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。
三种状态的实现
六、键控状态的使用
键控状态是根据输入数据流中定义的键(key)来维护和访问的,只能用于KeyedStream(keyBy算子处理之后)。相同key的所有数据都会访问相同的状态。
键控状态支持的数据类型
注意:
a)所有的类型都有clear(), 清空当前key的状态
b)这些状态对象仅用于用户与状态进行交互.
c)状态不是必须存储到内存, 也可以存储在磁盘或者任意其他地方
d)从状态获取的值与输入元素的key相关
七、状态后端
状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端。
状态后端主要负责两件事:
本地(taskmanager)的状态管理
将检查点(checkpoint)状态写入远程存储
状态后端的分类及配置
状态后端作为一个可插入的组件, 没有固定的配置, 我们可以根据需要选择一个合适的状态后端。
MemoryStateBackend
内存级别的状态后端(默认),
存储方式:本地状态存储在TaskManager的内存中, checkpoint 存储在JobManager的内存中.
特点:快速, 低延迟, 但不稳定
使用场景: 1. 本地测试 2. 几乎无状态的作业(ETL) 3. JobManager不容易挂, 或者挂了影响不大. 4. 不推荐在生产环境下使用
FsStateBackend
存储方式: 本地状态在TaskManager内存, Checkpoint时, 存储在文件系统(hdfs)中
特点: 拥有内存级别的本地访问速度, 和更好的容错保证
使用场景: 1. 常规使用状态的作业. 例如分钟级别窗口聚合, join等 2. 需要开启HA的作业 3. 可以应用在生产环境中
RocksDBStateBackend
将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储)
存储方式: 1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘) 2. Checkpoint在外部文件系统(hdfs)中.
使用场景: 1. 超大状态的作业, 例如天级的窗口聚合 2. 需要开启HA的作业 3. 对读写状态性能要求不高的作业 4. 可以使用在生产环境
八、案例列表状态
package com.lyh.flink09;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.List;public class state_programe1_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream("hadoop100",9999).map(new MyMapFunctin()).print();env.execute();}public static class MyMapFunctin implements MapFunction<String,Long>, CheckpointedFunction {private Long count = 0L;private ListState<Long> state;@Overridepublic Long map(String value) throws Exception {count++;return count;}// 初始化时会调用这个方法,向本地状态中填充数据. 每个子任务调用一次@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("initialize.....");state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("state",Long.class));for (Long c : state.get()) {count += c;}}// Checkpoint时会调用这个方法,我们要实现具体的snapshot逻辑,比如将哪些本地状态持久化@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("snapshot.....");state.clear();state.add(count);}}
}
九、案例广播状态
package com.lyh.flink09;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;public class state_broad1_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);DataStreamSource<String> dataStream = env.socketTextStream("hadoop100", 9999);DataStreamSource<String> controlStream = env.socketTextStream("hadoop100", 8888);MapStateDescriptor<String, String> stateDescriptor = new MapStateDescriptor<>("state", String.class, String.class);// 广播流BroadcastStream<String> broadcastStream = controlStream.broadcast(stateDescriptor);dataStream.connect(broadcastStream).process(new BroadcastProcessFunction<String, String, String>() {@Overridepublic void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// 从广播状态中取值, 不同的值做不同的业务ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);if ("1".equals(state.get("switch"))) {out.collect("切换到1号配置....");} else if ("0".equals(state.get("switch"))) {out.collect("切换到0号配置....");} else {out.collect("切换到其他配置....");}}@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {BroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);// 把值放入广播状态state.put("switch", value);}}).print();env.execute();}}
相关文章:
大数据-玩转数据-Flink状态编程(上)
一、Flink状态编程 有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。 SparkStreaming在状态管理这块做的不好, 很多时候需要借助于外部存储(例如Redis)来手动管理状态, 增加了编…...
主动获取用户的ColaKey接口
主动获取用户的ColaKey接口 一、主动获取用户的ColaKey接口二、使用步骤1、接口***重要提示:建议使用https协议,当https协议无法使用时再尝试使用http协议***2、请求参数 三、 请求案例和demo1、请求参数例子(POST请求,参数json格式)2、响应返…...
C#写一个UDP程序判断延迟并运行在Centos上
服务端 using System.Net.Sockets; using System.Net;int serverPort 50001; Socket server; EndPoint client new IPEndPoint(IPAddress.Any, 0);//用来保存发送方的ip和端口号CreateSocket();void CreateSocket() {server new Socket(AddressFamily.InterNetwork, SocketT…...
Kafka核心原理第二弹——更新中
架构原理 一、高吞吐机制:Batch打包、缓冲区、acks 1. Kafka Producer怎么把消息发送给Broker集群的? 需要指定把消息发送到哪个topic去 首先需要选择一个topic的分区,默认是轮询来负载均衡,但是如果指定了一个分区key&#x…...
巨人互动|游戏出海H5游戏出海规模如何?
H5游戏出海是指将H5游戏推广和运营扩展到国外市场的行为,它的规模受到多个因素的影响。本文小编讲一些关于H5游戏出海规模的详细介绍。 1、市场规模 H5游戏出海的规模首先取决于目标市场的规模。不同国家和地区的游戏市场规模差异很大,有些市场庞大而成…...
【爬虫】实验项目三:验证码处理与识别
目录 一、实验目的 二、实验预习提示 三、实验内容 实验要求 基本要求: 改进要求A: 改进要求B: 四、实验过程 基本要求 五、源码如下 六、资料 一、实验目的 部分网站可能会使用验证机制来阻止用户无效登录或者是验证用户不是用程…...
广东成人高考报名将于9月14日开始!
截图来自广东省教育考试院官网* 今年的广东成人高考正式报名时间终于确定了! 报名时间:2023年 9 月14—20日 准考证打印时间:考前一周左右 考试时间:2023年10月21—22日 录取时间:2023年12 月中上旬 报名条件: …...
pytorch中文文档学习笔记
先贴上链接 torch - PyTorch中文文档 首先我们需要安装拥有pytorch的环境 conda指令 虚拟环境的一些指令 查看所有虚拟环境 conda info -e 创建新的虚拟环境 conda create -n env_name python3.6 删除已有环境 conda env remove -n env_name 激活某个虚拟环境 activate env…...
element-ui全局导入与按需引入
全局引入 npm i element-ui -S 安装好depencencies里面可以看到安装的element-ui版本 然后 在 main.js 中写入以下内容: import Vue from vue; import ElementUI from element-ui; import element-ui/lib/theme-chalk/index.css; import App from ./App.vue;Vue.…...
go 地址 生成唯一索引v2 --chatGPT
问:golang 函数 getIndex(n,addr,Hlen,Tlen) 返回index。参数n为index的上限,addr为包含大小写字母数字的字符串,Hlen为截取addr头部的长度,Tlen为截取addr尾部的长度 gpt: 你可以编写一个函数来计算根据给定的参数 n、addr、Hlen 和 Tlen …...
JSON XML
JSON(JavaScript Object Notation)和XML(eXtensible Markup Language)是两种常用的数据交换格式,用于在不同系统之间传输和存储数据。 JSON是一种轻量级的数据交换格式,它使用易于理解的键值对的形式表示数…...
2023年MySQL实战核心技术第四篇
七 . 吃透索引:...
cmake编译(qtcreator)mingw下使用的osg3.6.5
官网下载osg3.6.5源码,先不使用依赖库,直接进行编译 如果generate后报错,显示找不到boost必须库,则手动增加路径。然后先在命令行中使用mingw32-make,如果显示不存在,则需要去环境变量里配置一下这个工具的…...
Python钢筋混凝土结构计算.pdf-混凝土强度设计值
计算原理: 需要注意的是,根据不同的规范和设计要求,上述公式可能会有所差异。因此,在进行混凝土强度设计值的计算时,请参考相应的规范和设计手册,以确保计算结果的准确性和合规性。 代码实现: …...
elasticsearch的索引库操作
索引库就类似数据库表,mapping映射就类似表的结构。我们要向es中存储数据,必须先创建“库”和“表”。 mapping映射属性 mapping是对索引库中文档的约束,常见的mapping属性包括: type:字段数据类型,常见的…...
把握市场潮流,溯源一流品质:在抖in新风潮 国货品牌驶过万重山
好原料、好设计、好品质、好服务……这个2023,“国货”二字再度成为服饰行业的发展关键词。以消费热潮为翼,越来越多代表性品类、头部品牌展现出独特价值,迎风而上,在抖音电商掀起一轮轮生意风潮。 一个设问是:在抖音…...
【网络教程】Python如何优雅的分割URL
文章目录 URL分割方法是一种用于解析URL字符串的方法,它可以将URL分解成不同的组成部分,如协议、域名、端口、路径等。在Python中,我们可以使用urllib.parse模块中的urlsplit方法来实现URL分割。 使用方法 下面是一个简单的示例代码,演示了如何使用urlsplit方法解析URL字符…...
1998-2014年工业企业数据库和绿色专利匹配
1998-2014年工业企业数据库绿色专利匹配 1、时间:1998-2014年 2、样本量:470万 3、来源:工业企业数据库、国家知识产权局、WIPO 4、指标: 企业匹配唯一标识码、组织机构代码、企业名称、年份、法定代表人、法定代表人职务、行…...
Python基于Mirai开发的QQ机器人保姆式教程(亲测可用)
在本教程中,我们将使用Python和Mirai来开发一个QQ机器人,本文提供了三个教学视频,包教包会,本文也很贴心贴了代码和相关文件。话不多说,直接开始教学。 目录 一、安装配置MIrai 图片验证码报错: 二、机器…...
算法笔记:堆
【如无特别说明,皆为最小二叉堆】 1 介绍 2 特性 结构性:符合完全二叉树的结构有序性:满足父节点小于子节点(最小化堆)或父节点大于子节点(最大化堆) 3 二叉堆的存储 顺序存储 二叉堆的有序…...
vue3 判断包含某个字符
<img v-if"node.level 1 && checkIfIncludeSubStr(node.label, 人口)"src"/assets/images/icon-convention-01.png" width"16"class"inlineBlock Vmiddle" style"margin-right: 8px;"/>const data reactive…...
MySQL的故事——查询性能优化
查询性能优化 文章目录 查询性能优化一、查询优化器的提示(hint)二、优化特定类型的查询 一、查询优化器的提示(hint) HIGH_PRIORITY和LOW_PRIORITY 这个提示告诉MySQL,当多个语句同时访问某一个表时,哪些语句的优先级相对高些,哪些相对低些…...
在外SSH远程连接macOS服务器【cpolar内网穿透】
文章目录 前言1. macOS打开远程登录2. 局域网内测试ssh远程3. 公网ssh远程连接macOS3.1 macOS安装配置cpolar3.2 获取ssh隧道公网地址3.3 测试公网ssh远程连接macOS 4. 配置公网固定TCP地址4.1 保留一个固定TCP端口地址4.2 配置固定TCP端口地址 5. 使用固定TCP端口地址ssh远程 …...
Nosql数据库服务之redis
Nosql数据库服务之redis 一图详解DB的分支产品 Nosql数据库介绍 是一种非关系型数据库服务,它能解决常规数据库的并发能力,比如传统的数据库的IO与性能的瓶颈,同样它是关系型数据库的一个补充,有着比较好的高效率与高性能。 专…...
当AI遇到IoT:开启智能生活的无限可能
文章目录 1. AI和IoT的融合1.1 什么是人工智能(AI)?1.2 什么是物联网(IoT)?1.3 AI和IoT的融合 2. 智能家居2.1 智能家居安全2.2 智能家居自动化 3. 医疗保健3.1 远程监护3.2 个性化医疗 4. 智能交通4.1 交通…...
Qt5界面Qt Designer上添加资源图片后,ModuleNotFoundError: No module named ‘rcc_rc‘ 的终极解决方案
在网上找了很久都没弄明白,最后还是自己思考解决了。 起因: 用 Qt Designer 添加资源文件作为背景图,编译 \resource\static\qrc> pyuic5 -o .\xx.py .\xx.ui发现在 xx.py 文件末尾中多了一个语句: import rcc_rc然后运行就…...
社群运营怎么做?
社区运营虽然说起来简单,可是实际执行起来却常常发现无从下手。刑天营销曾经做过社区运营的案子,我们也总结一套自己的方法,要做好社群运营,以下的这些问题就不能忽视: 一、做好社区定位 做社区运营,首先…...
Vite,Vue3项目引入dataV报错的解决方法
背景:开发一个大屏项目中,需要是要DataV的那边边框,装饰等,只是DataV是基于vue2的,vue3版的作者还在开发中,于是翻了DataV的源码,发现使用esm方式时是直接引入源码而不经过打包,其源码中使用的vue语法vue3也支持,所以可以直接在vue3中引入使用. vite,vue3项目直接引入DataV 安…...
QT(8.30)常用类与组件,实现登录界面
1.作业: 完成一个登录界面(图片未附带): 头文件: #ifndef WIDGET_H #define WIDGET_H#include <QWidget>#include <QLineEdit>//行编辑器#include<QIcon>//图标#include<QLabel>//标签#include<QPushButton>//按钮#include<QIc…...
【Two Stream network (Tsn)】(二) 阅读笔记
贡献 将深度神经网络应用于视频动作识别的难点,是如何同时利用好静止图像上的 appearance information以及物体之间的运动信息motion information。本文主要有三点贡献: 1.提出了一种融合时间流和空间流的双流网络; 2.证明了直接在光流上训…...
eclipse静态网站开发/网站排名优化教程
一、前言 从昨天起,就给大家分享一些树和森林的代码啦,昨天分享的是求树的深度,今天要给大家分享的是森林的遍历以及求叶子的个数。 对于森林,大家可以做这样的理解,一个深度大于1,根节点子树个数大于1的…...
国内免费建站网站/网站推广软件哪个最好
这道题是求中序遍历的题,可以用回溯法,也可以用迭代吗法 回溯法 # Definition for a binary tree node. # class TreeNode: # def __init__(self, val0, leftNone, rightNone): # self.val val # self.left left # self…...
征婚网站做原油/深圳最好seo
shell 基础练习题 1、编写脚本/root/bin/systeminfo.sh,显示当前主机系统信息,包括主机名,IPv4地址,操作系统版本,内核版本,CPU型号,内存大小,硬盘大小 #!/bin/bash echo -e "\e[1;35mThis…...
镇江丹阳疫情最新情况/郑州企业网站seo
实验室有自己的服务器,同时院里也有集群,我用内网或者外网连接自己的服务器的时候都没什么问题,但是连接集群就一直连接不上,报错如下 vscode Acquiring lock on xxxx省略第一个解决办法 第一个方法是进入到服务器中自己的文件目…...
做高仿包的网站有哪些/百度搜索排名规则
chunlvxiong的博客 题目描述: 给出一个5*5的棋盘,每个骑士可以走日字走到空格位置,问最少几步形成如下局面。 如果最少步数超过15步,输出-1。 思考&分析: 搜索无非也就是深搜或广搜,如果广搜的话由于总…...
国外扁平化网站/编写网页的软件
关于VIEW 登陆后 键盘无法输入的问题。主要因为使用了360防火墙的缘故! PCOIP为VMware私有协议,不在360信任列表。使用RDP方式没有此问题。 解决方法有两种:1.打开360安全卫士,进入木马防火墙————关闭键盘记录防护 ÿ…...