当前位置: 首页 > news >正文

Flink中的状态管理

一.Flink中的状态

1.1 概述

在Flink中,算子任务可以分为有状态无状态两种状态。

无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。例如MapFilterFlatMap都是属于无状态算子。 

有状态的算子任务,就是除了当前数据外,还需要一些其他的数据来得到计算结果。这里的其他数据就是所谓的“状态”。例如聚合函数、窗口函数都属于有状态算子

1.2 状态的分类

1.2.1 托管状态(Managed State)和原始状态(Raw State) 

Flink的状态有两种,托管状态(Managed State原始状态(Raw State托管状态就是由Flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现,我们只要调接口就可以;而原始状态则是自定义的,所有的状态具体管理则需要自行实现。

一般使用托管状态即可。后面的所有内容也仅是基于托管状态的。

1.2.2 算子状态(Operator State)和按键分区状态(Keyed State)

在Flink中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。

而很多有状态的操作(比如聚合、窗口)都是要先做keyBy进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前key有效,所以状态也应该按照key彼此隔离。在这种情况下,状态的访问方式又会有所不同。

基于这样的想法,又可以将托管状态分为两类:算子状态按键分区状态经过KeyBy操作后的状态则被称为"按键分区状态(Keyed State)",否则就是“算子状态(Operator State)”。

算子状态

算子状态的状态作用范围为当前算子任务实例-即每个task(分区)间状态不共享。

算子状态可以用在所有算子上,使用时与本地变量没什么区别,在使用时需实现checkpoint接口。假如使用新的Source架构,则需要继承SourceReaderBase抽象类。 

按键分区状态

按键分区状态只有在KeyBy后才能使用,因为状态是根据输入流中定义的键(Key)来维护和访问的。每个Key分区间状态不共享

二.按键分区状态(Keyed State

按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以key为作用范围进行隔离。

2.1 值状态(ValueState

顾名思义,状态中只保存一个“值”(value)。ValueState<T>本身是一个接口,源码如下: 

@PublicEvolving
public interface ValueState<T> extends State {T value() throws IOException;void update(T var1) throws IOException;
}

这里的T是泛型,表示值状态数据类型。

对值的操作主要有以下:

// 获取当前状态值
T value()// 更新/覆盖状态值
update(T value)

在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState的状态描述器构造方法如下:

public ValueStateDescriptor(String name, Class<T> typeClass) {super(name, typeClass, null);
}

这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。

案例:检测每种传感器的水位值,如果连续的两个水位值差值超过10,就输出报警。

public class KeyedValueStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L));// 对传感器做KeyBySingleOutputStreamOperator<String> process = sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {// 定义状态,用于每组当前的水位线ValueState<Integer> lastVcState;// 必须在open方法中,初始化状态@Overridepublic void open(Configuration parameters) throws Exception {// 初始化值状态,需传入值状态描述器(唯一的名称,值的类型)lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastVc", Types.INT));}@Overridepublic void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {// 1.取出上一条水位线int lastVc = lastVcState.value() == null ? value.getVc() : lastVcState.value();// 2.判断是否超过10if (Math.abs(value.getVc() - lastVc) > 10) {out.collect("传感器:" + value.getId() + ",上一次水位线:" + lastVc + ",当前水位线:" + value.getVc() + ",触发报警(相差超过10)!!!");}// 更新当前状态lastVcState.update(value.getVc());}});process.print();env.execute();}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,13
s1,5,9
s1,6,22
s2,9,10
s2,10,23

输出:

传感器:s1,上一次水位线:1,当前水位线:13,触发报警(相差超过10)!!!
传感器:s1,上一次水位线:9,当前水位线:22,触发报警(相差超过10)!!!
传感器:s2,上一次水位线:10,当前水位线:23,触发报警(相差超过10)!!!

如果不使用状态存储,则需要定义HashMap存储每个Key的水位线,没有状态高效。

2.2 列表状态(ListState

将需要保存的数据,以列表(List)的形式组织起来。在ListState<T>接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。 

对 List 状态的操作主要有以下:

// 获取当前的列表状态,返回的是一个可迭代类型Iterable<T>
Iterable<T> get()// 传入一个列表values,直接对状态进行覆盖
update(List<T> values)// 向列表中添加多个元素,以列表values形式传入
addAll(List<T> values)

类似地,ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。

案例:针对每种传感器输出最高的3个水位值。

public class KeyedListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L));// 对传感器做KeyBySingleOutputStreamOperator<String> process = sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {// 定义 ListStateListState<Integer> vcListState;// 初始化 ListState@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcListState = getRuntimeContext().getListState(new ListStateDescriptor<>("vcListState",Types.INT));}@Overridepublic void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {// 将当前水位线存入 ListStatevcListState.add(value.getVc());// 将 ListState (迭代器)中的值取出,拷贝到 List 中,List<Integer> vcList = new ArrayList<Integer>();for (Integer vc : vcListState.get()) {vcList.add(vc);}// 排序vcList.sort(((o1, o2) -> o2 - o1));// 取前三if (vcList.size() > 3) {vcList.remove(3);}out.collect("当前传感器:"+value.getId()+",最大的3个水位线为:"+vcList.toString());// 更新 ListStatevcListState.update(vcList);}});process.print();env.execute();}
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,4,4
s1,5,3
s1,6,6
s2,5,6
s3,6,5
s2,4,7

输出:

当前传感器:s1,最大的3个水位线为:[1]
当前传感器:s1,最大的3个水位线为:[4, 1]
当前传感器:s1,最大的3个水位线为:[4, 3, 1]
当前传感器:s1,最大的3个水位线为:[6, 4, 3]
当前传感器:s2,最大的3个水位线为:[6]
当前传感器:s3,最大的3个水位线为:[5]
当前传感器:s2,最大的3个水位线为:[7, 6]

2.3 Map状态(MapState

把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。使用与Map非常类似。

对Map状态的操作主要有以下:

// 根据key查询mapState中的value
UV get(UK key)// 向mapState中put一个键值对
put(UK key, UV value)// 向mapState中put多个键值对
putAll(Map<UK, UV> map)// 将指定key对应的键值对删除
remove(UK key)// 判断是否存在指定的key
boolean contains(UK key)// 获取映射状态中所有的键值对
Iterable<Map.Entry<UK, UV>> entries()// 获取映射状态中所有的键(key),返回一个可迭代Iterable类型
Iterable<UK> keys()// 获取映射状态中所有的值(value),返回一个可迭代Iterable类型
Iterable<UV> values()// 判断映射是否为空
boolean isEmpty()

案例:统计每种传感器每种水位值出现的次数。

public class KeyedMapStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L));// 对传感器做KeyBySingleOutputStreamOperator<String> process = sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {// 定义Map状态,键为vc(Integer),值为count(Integer)MapState<Integer,Integer> vcCountMapState;// 初始化Map状态@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer,Integer>("vcCountMapState",Types.INT,Types.INT));}@Overridepublic void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {// 判断map状态中是否存在该vc,存在则count+1,否则put进map状态Integer vc = value.getVc();if(vcCountMapState.contains(value.getVc())){Integer vcCount = vcCountMapState.get(vc);vcCountMapState.put(vc , ++vcCount);}else{vcCountMapState.put(vc , 1);}StringBuilder outStr = new StringBuilder();outStr.append("传感器:"+value.getId()+",下的所有水位线及出现次数:\n");// 遍历该key下的所有键值for (Map.Entry<Integer, Integer> entry : vcCountMapState.entries()) {outStr.append("vc="+entry.getKey()+",count="+entry.getValue()+"\n");}outStr.append("------------------------------------------------------");out.collect(outStr.toString());}});process.print();env.execute();}
}

输入:

[root@VM-55-27-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,1
s2,1,1
s1,4,1

输出:

传感器:s1,下的所有水位线及出现次数:
vc=1,count=1
------------------------------------------------------
传感器:s1,下的所有水位线及出现次数:
vc=1,count=1
vc=2,count=1
------------------------------------------------------
传感器:s1,下的所有水位线及出现次数:
vc=1,count=2
vc=2,count=1
------------------------------------------------------
传感器:s2,下的所有水位线及出现次数:
vc=1,count=1
------------------------------------------------------
传感器:s1,下的所有水位线及出现次数:
vc=1,count=3
vc=2,count=1
------------------------------------------------------

2.4 归约状态(ReducingState

类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来

与之前不同的是,在归约状态描述器中需要传入ReduceFunction实现具体的归约逻辑。

对归约状态的操作主要有以下:

// 把新数据和之前的状态进行归约,并用得到的结果更新状态。
add(IN)// 获取归约状态中的值
OUT get()

案例:计算每种传感器的水位和。

public class KeyedReducingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L));// 对传感器做KeyBySingleOutputStreamOperator<String> process = sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {// 定义Reducing状态ReducingState<Integer> vcSumReducingState;// 初始化Reducing状态(需要传入ReduceFunction实现具体的归约逻辑)@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcSumReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<>("vcSumReducingState",// 归约逻辑 (两数相加)(v1, v2) -> v1+v2,Types.INT));}@Overridepublic void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {vcSumReducingState.add(value.getVc());out.collect("传感器:"+value.getId()+",水位线总值为:"+vcSumReducingState.get());}});process.print();env.execute();}
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,3,3
s2,4,4
s2,5,5
s1,6,6
s3,7,7

输出:

传感器:s1,水位线总值为:1
传感器:s1,水位线总值为:3
传感器:s1,水位线总值为:6
传感器:s2,水位线总值为:4
传感器:s2,水位线总值为:9
传感器:s1,水位线总值为:12
传感器:s3,水位线总值为:7

2.5 聚合状态(AggregatingState

与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。并且允许输入、输出、中间累加器类型可以不一致。

对聚合状态的操作主要有以下:

// 向聚合状态中添加元素
add(IN)// 从聚合状态中获取结果
OUT get()

案例:计算每种传感器的平均水位。

public class KeyedAggregatingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L));// 对传感器做KeyBySingleOutputStreamOperator<String> process = sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {// 定义聚合状态AggregatingState<Integer,Double> vcAvgAggState;// 初始化聚合状态@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcAvgAggState =getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<>("vcAvgAggState",// 聚合逻辑new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {// 初始化累加器@Overridepublic Tuple2<Integer, Integer> createAccumulator() {return Tuple2.of(0, 0);}// 累加逻辑 (水位相加,次数+1)@Overridepublic Tuple2<Integer, Integer> add(Integer integer, Tuple2<Integer, Integer> accumulator) {return Tuple2.of(accumulator.f0 + integer , accumulator.f1 + 1);}// 结果 水位 / 次数@Overridepublic Double getResult(Tuple2<Integer, Integer> accumulator) {return (accumulator.f0 * 1D) / accumulator.f1;}@Overridepublic Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> integerIntegerTuple2, Tuple2<Integer, Integer> acc1) {return null;}},Types.TUPLE(Types.INT, Types.INT)));}@Overridepublic void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {vcAvgAggState.add(value.getVc());out.collect("传感器:"+value.getId()+",平均水位为:"+vcAvgAggState.get());}});process.print();env.execute();}
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2
s1,4,4
s2,5,5
s2,6,6
s1,7,7

输出:

传感器:s1,平均水位为:1.0
传感器:s1,平均水位为:1.5
传感器:s1,平均水位为:2.3333333333333335
传感器:s2,平均水位为:5.0
传感器:s2,平均水位为:5.5
传感器:s1,平均水位为:3.5

2.6 状态生存时间(TTL)

随着Flink程序的运行,状态所消耗的存储空间也会随之增长,如果不限制则可能会导致存储空间耗尽。可以使用 .clear() 方法清除状态,但是不够灵活。

可以在状态描述器中通过.enableTimeToLive()方法启动TTL功能,并创建一个StateTtlConfig配置对象。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);// 开启TTL
stateDescriptor.enableTimeToLive(ttlConfig);

 主要的配置项:

.newBuilder() :状态TTL配置的构造器方法,需传入Time参数,设定状态过期时间
.setUpdateType():设置更新类型,什么时机进行更新失效时间(重置失效时间)

        OnCreateAndWrite :创建状态和更改状态(写操作)时更新失效时间

        OnReadAndWrite:无论读写操作都会更新失效时间

.setStateVisibility()设置状态的可见性

        NeverReturnExpired:表示从不返回过期值

        ReturnExpireDefNotCleanedUp:如果过期状态还存在,则返回

示例代码:

public class StateTTLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L));SingleOutputStreamOperator<String> process = sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {ValueState<Integer> lastVcState;@Overridepublic void open(Configuration parameters) throws Exception {// 创建 StateTtlConfigStateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(10)) // 状态存活时间为10s.updateTtlOnCreateAndWrite()  // 创建/更新状态时重置存活时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期状态.build();// 启用 TTLValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVc", Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);lastVcState = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context ctx, Collector<String> out) throws Exception {out.collect("传感器:"+value.getId()+",当前状态值:"+lastVcState.value());lastVcState.update(value.getVc());}});process.print();env.execute();}
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
...间隔10s
s1,2,2
s1,3,3
s1,4,4
s1,5,5

输出:

传感器:s1,当前状态值:null
传感器:s1,当前状态值:1
传感器:s1,当前状态值:null
传感器:s1,当前状态值:3
传感器:s1,当前状态值:4

三.算子状态(Operator State

算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。(每个算子子任务共享一个算子状态,子任务间不共享)

算子状态的实际应用场景不如Keyed State,一般用在Source或Sink等与外部系统连接的算子上,一般使用不多。

当算子并行度发生变化时,算子状态也支持在并行的算子子任务实例间做重新分配,根据状态的类型不同,重组分配的方案也会不同。

算子状态也支持不同的结构类型,主要有三种:ListStateUnionListStateBroadcastState

3.1 列表状态(ListState)

与Keyed State中的ListState一样,将状态表示为一组数据的列表

与Keyed State中的列表状态的区别是,在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。

当算子并行度进行缩放调整时,算子的状态列表将会被全部收集收集起来,再通过轮询的方式重新依次分配给新的所有并行任务。

算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。

案例:在map算子中计算数据的个数。

/*** 在map算子中计算数据的个数*/
public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 全局算子并行度为2env.setParallelism(2);env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyCountMapFunction()).print();env.execute();}// 实现 CheckPointedFunction 接口public static class MyCountMapFunction implements MapFunction<String,Long>, CheckpointedFunction {// 定义本地变量private Long count = 0L;// 定义算子状态private ListState<Long> state;// Map算子逻辑@Overridepublic Long map(String s) throws Exception {return ++count;}/*** 状态快照:用于将本地变量持久化至算子状态中,,开启checkpoint时才会调用* @param context the context for drawing a snapshot of the operator* @throws Exception*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("调用了snapshotState方法...");// 清空状态state.clear();// 将本地变量存入状态中state.add(count);}/**、* 初始化本地变量:程序启动和恢复时,从状态中把数据添加到本地变量,每个子任务调用一次* @param context the context for initializing the operator* @throws Exception*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("调用了initializeState方法...");// 从上下文中获取算子状态state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("list-state", Types.LONG));// 从算子状态中将数据拷贝至本地变量if (context.isRestored()) { // 判断是否初始化成功for (Long v : state.get()) {count += v;}}}}
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
a
b
c
d
e
f
g

输出:

调用了initializeState方法...
调用了initializeState方法...
1> 1
2> 1
1> 2
2> 2
1> 3
2> 3
1> 4

3.2 联合列表状态(UnionListState

与ListState类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。

在并行度进行缩放调整时,联合列表与普通列表不同,联合列表会将所有并行子任务的列表状态收集起来,并直接向所有并行子任务广播完整的列表。如果列表中状态项太多则不推荐使用联合里欸包状态。

使用上也与ListState类似,只需要在实现CheckpointedFunction类的initializeState方法时,通过上下文获取算子状态使用 .getUnionListState() 即可,其他与ListState无异。

state = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>("list-state", Types.LONG));

3.3 广播状态(BroadcastState)

有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)

在并行度进行缩放操作时,由于是全局状态,也不会造成影响。

案例:水位超过指定的阈值发送告警,阈值可以动态修改

/*** 水位超过指定的阈值发送告警,阈值可以动态修改。*/
public class OperatoBroadcastStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 数据流SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("xxx.xxx.xxx.xxx", 1234).map(new MyMapFunctionImpl());// 配置流:用于广播配置(阈值配置将发往这条流)DataStreamSource<String> configDS = env.socketTextStream("xxx.xxx.xxx.xxx", 4321);// 将配置流进行广播MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);// 将数据流和广播后的配置流使用connect进行连接BroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);// 调用processsensorBCS.process(new BroadcastProcessFunction<WaterSensor, String, String>() {/*** 数据流的处理逻辑,可以通过上下文读取广播状态(只读)* @param value The stream element.* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(WaterSensor value, BroadcastProcessFunction<WaterSensor, String, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {//  通过上下文获取广播状态的值(阈值)ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);// 未从广播状态中读到值则设置默认值Integer threshold = broadcastState.get("threshold") != null ? broadcastState.get("threshold"): 0;if(value.getVc() > threshold){out.collect("传感器:"+ value.getId()+",当前水位为:"+ value.getVc()+",触发了阈值:"+threshold);}}/*** 配置广播流的处理逻辑,可以通过上下文可以往广播状态写入值* @param value The stream element.* @param ctx* @param out* @throws Exception*/@Overridepublic void processBroadcastElement(String value, BroadcastProcessFunction<WaterSensor, String, String>.Context ctx, Collector<String> out) throws Exception {// 读取流中的阈值,写入广播状态中BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);broadcastState.put("threshold" , Integer.valueOf(value));}}).print();env.execute();}
}

输入:

// 数据流:
[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,2

输出:

2> 传感器:s1,当前水位为:1,触发了阈值:0
1> 传感器:s1,当前水位为:2,触发了阈值:0

输入:

// 广播配置流:
[root@VM-55-24-centos ~]# nc -lk 4321
10
// 数据流:
[root@VM-55-24-centos ~]# nc -lk 1234
s1,7,7
s1,11,11

输出:

1> 传感器:s1,当前水位为:11,触发了阈值:10

简单来说,就是一条流广播后专门读取配置,与普通的数据流进行连结,然后广播流将配置加载到广播状态中,这样普通的数据流就能够在不重启程序的情况下通过上下文动态读取配置。

应用场景:MySQL定义中一张配置表,定义一条配置流读取MySQL中的binlog,配置表如有修改,就将相应的配置广播出去,更改数据库即可实现线上程序动态配置。

四.状态后端(State Backends

在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)状态后端主要负责管理本地状态的存储方式和位置

4.1 状态后端的分类(HashMapStateBackend/RocksDB)

状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend。

4.1.1 哈希表状态后端(HashMapStateBackend

HashMapStateBackend是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在TaskmanagerJVM堆上。普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。

4.1.2 内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)

RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里

RocksDB的状态数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。

EmbeddedRocksDBStateBackend始终执行的是异步快照(快照时不会阻塞任务),所以不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。

4.2 如何选择正确的状态后端

HashMapStateBackendEmbeddedRocksDBStateBackend
存储介质Taskmanager的JVM堆内存Taskmanager的JVM的文件磁盘
读写速度
  • 由此可以看出,虽然HashMapStateBackend的读写速度快,但是使用的是Taskmanager的JVM堆内存,如果存储的状态较大,则可能会将Taskmanager的内存耗尽。
  • EmbeddedRocksDBStateBackend则存在Taskmanager的本地磁盘中,可以存储大的状态,不过牺牲了一定的读写速度。

4.3 状态后端的配置

在默认配置下,应用程序使用的默认状态后端是由集群配置文件flink-conf.yaml中指定的配置名称为state.backend,可修改为hashmap或rocksdb。除此之外,还可以在提交作业时通过参数设置状态后端、以及在代码中指定。

4.3.1 配置默认的状态后端

flink-conf.yaml中,可以使用state.backend来配置默认状态后端

配置项的可能值为hashmap,这样配置的就是HashMapStateBackend;如果配置项的值是rocksdb,这样配置的就是EmbeddedRocksDBStateBackend。

# 默认状态后端
state.backend: hashmap

4.2.2 为每个作业(Per-job/Application)单独配置状态后端

  • 通过执行环境设置 hashMapStateBackend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置状态后端为HashMap
HashMapStateBackend hashMapStateBackend = new HashMapStateBackend();
env.setStateBackend(hashMapStateBackend);
  • 通过执行环境设置 EmbeddedRocksDBStateBackend

在IDE使用EmbeddedRocksDBStateBackend则需要导入以下依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version>
</dependency>

设置 EmbeddedRocksDBStateBackend 状态后端

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置状态后端为RocksDB
EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
env.setStateBackend(embeddedRocksDBStateBackend);

4.2.3 提交参数设置状态后端

[root@VM-55-24-centos flink-1.17.0]# 
bin/flink run -m localhost:1234 -D state.backend=rocksdb -c com.xxx.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar

-D :指定状态后端

相关文章:

Flink中的状态管理

一.Flink中的状态 1.1 概述 在Flink中&#xff0c;算子任务可以分为有状态和无状态两种状态。 无状态的算子任务只需要观察每个独立事件&#xff0c;根据当前输入的数据直接转换输出结果。例如Map、Filter、FlatMap都是属于无状态算子。 而有状态的算子任务&#xff0c;就…...

【linux】线程互斥

线程互斥 1.线程互斥2.可重入VS线程安全3.常见锁的概念 喜欢的点赞&#xff0c;收藏&#xff0c;关注一下把&#xff01; 1.线程互斥 到目前为止我们学了线程概念&#xff0c;线程控制接下来我们进行下一个话题&#xff0c;线程互斥。 有没有考虑过这样的一个问题&#xff0c…...

机器学习原理到Python代码实现之LinearRegression

Linear Regression 线性回归模型 该文章作为机器学习的第一篇文章&#xff0c;主要介绍线性回归模型的原理和实现方法。 更多相关工作请参考&#xff1a;Github 算法介绍 线性回归模型是一种常见的机器学习模型&#xff0c;用于预测一个连续的目标变量&#xff08;也称为响应变…...

Hive SQL / SQL

1. 建表 & 拉取表2. 插入数据 insert select3. 查询3.1 查询语句语法/顺序3.2 关系操作符3.3 聚合函数3.4 where3.5 分组聚合3.6 having 筛选分组后结果3.7 显式类型转换 & select产生指定值的列 4. join 横向拼接4.1 等值连接 & 不等值连接4.2 两表连接4.2.1 内连…...

程序媛的mac修炼手册--MacOS系统更新升级史

啊&#xff0c;我这个口罩三年从未感染过新冠的天选免疫王&#xff0c;却被支原体击倒&#x1f637;大意了&#xff0c;前几天去医院体检&#xff0c;刚检查完出医院就摘口罩了&#x1f926;大伙儿还是要注意戴口罩&#xff0c;保重身体啊&#xff01;身体欠恙&#xff0c;就闲…...

【数据库原理】(9)SQL简介

一.SQL 的发展历史 起源&#xff1a;SQL 起源于 1970 年代&#xff0c;由 IBM 的研究员 Edgar F. Codd 提出的关系模型概念演化而来。初期&#xff1a;Boyce 和 Chamberlin 在 IBM 开发了 SQUARE 语言的原型&#xff0c;后发展成为 SQL。这是为了更好地利用和管理关系数据库。…...

第二百五十二回

文章目录 概念介绍实现方法示例代码 我们在上一章回中介绍了如何在页面中添加图片相关的内容&#xff0c;本章回中将介绍如何给组件添加阴影.闲话休提&#xff0c;让我们一起Talk Flutter吧。 概念介绍 我们在本章回中介绍的阴影类似影子&#xff0c;只是它不像影子那么明显&a…...

Leetcode 3701 · Find Nearest Right Node in Binary Tree (遍历和BFS好题)

3701 Find Nearest Right Node in Binary TreePRE Algorithms This topic is a pre-release topic. If you encounter any problems, please contact us via “Problem Correction”, and we will upgrade your account to VIP as a thank you. Description Given a binary t…...

网站被攻击了,接入CDN对比直接使用高防服务器有哪些优势

网站是互联网行业中经常被攻击的目标之一。攻击是许多站长最害怕遇到的情况。当用户访问一个网站&#xff0c;页面半天打不开&#xff0c;响应缓慢&#xff0c;或者直接打不开&#xff0c;多半是会直接走开&#xff0c;而不是等待继续等待相应。针对网站攻击的防护&#xff0c;…...

location常用属性和方法

目录 Location 对象 Location 对象属性 Location 对象方法 location.assign() location.replace() location.reload() Location 对象 Location 对象包含有关当前 URL 的信息。Location 对象是 Window 对象的一个部分&#xff0c;可通过 window.location 属性来访问。 L…...

二分图

目录 二分图 染色法判定二分图 匈牙利算法 二分图 二分图&#xff0c;又叫二部图&#xff0c;将所有点分成两个集合&#xff0c;使得所有边只出现在集合之间的点之间&#xff0c;而集合内部的点之间没有边。二分图当且仅当图中没有奇数环。只要图中环的边数没奇数个数的&am…...

[VUE]3-路由

目录 路由 Vue-Router1、Vue-Router 介绍2、路由配置3、嵌套路由3.1、简介3.2、实现步骤3.3、⭐注意事项 4、⭐router-view标签详解 ​&#x1f343;作者介绍&#xff1a;双非本科大三网络工程专业在读&#xff0c;阿里云专家博主&#xff0c;专注于Java领域学习&#xff0c;擅…...

Kafka(六)消费者

目录 Kafka消费者1 配置消费者bootstrap.serversgroup.idkey.deserializervalue.deserializergroup.instance.idfetch.min.bytes1fetch.max.wait.msfetch.max.bytes57671680 (55 mebibytes)max.poll.record500max.partition.fetch.bytessession.timeout.ms45000 (45 seconds)he…...

RK3399平台入门到精通系列讲解(实验篇)共享工作队列的使用

🚀返回总目录 文章目录 一、工作队列相关接口函数1.1、初始化函数1.2、调度/取消调度工作队列函数二、信号驱动 IO 实验源码2.1、Makefile2.2、驱动部分代码工作队列是实现中断下半部分的机制之一,是一种用于管理任务的数据结构或机制。它通常用于多线程,多进程或分布式系统…...

STM32 基于 MPU6050 的飞行器姿态控制设计与实现

基于STM32的MPU6050姿态控制设计是无人机、飞行器等飞行器件开发中的核心技术之一。在本文中&#xff0c;我们将介绍如何利用STM32和MPU6050实现飞行器的姿态控制&#xff0c;并提供相应的代码示例。 1. 硬件连接及库配置 首先&#xff0c;我们需要将MPU6050连接到STM32微控制…...

大数据平台Bug Bash大扫除最佳实践

一、背景 随着越来越多的"新人"在日常工作以及大促备战中担当大任&#xff0c;我们发现仅了解自身系统业务已不能满足日常系统开发运维需求。为此&#xff0c;大数据平台部门组织了一次Bug Bash活动&#xff0c;既能提升自己对兄弟产品的理解和使用&#xff0c;又能…...

JavaScript 中的数组过滤

在构建动态和交互式程序时&#xff0c;您可能需要添加一些交互式功能。例如&#xff0c;用户单击按钮以筛选一长串项目。 您可能还需要处理大量数据&#xff0c;以仅返回与指定条件匹配的项目。 在本文中&#xff0c;您将学习如何使用两种主要方法在 JavaScript 中过滤数组。…...

随机森林(Random Forest)

随机森林&#xff08;Random Forest&#xff09;是一种集成学习方法&#xff0c;通过组合多个决策树来提高模型的性能和鲁棒性。随机森林在每个决策树的训练过程中引入了随机性&#xff0c;包括对样本和特征的随机选择&#xff0c;以提高模型的泛化能力。以下是随机森林的基本原…...

本地引入Element UI后导致图标显示异常

引入方式 npm 安装 推荐使用 npm 的方式安装&#xff0c;它能更好地和 webpack 打包工具配合使用。 npm i element-ui -SCDN 目前可以通过 unpkg.com/element-ui 获取到最新版本的资源&#xff0c;在页面上引入 js 和 css 文件即可开始使用。 <!-- 引入样式 --> <…...

UE5.1_UMG序列帧动画制作

UE5.1_UMG序列帧动画制作 UMG序列帧动画制作相对比较简单&#xff0c;不像视频帧需要创建媒体播放器那么复杂&#xff0c;以下简要说明&#xff1a; 1. 事件函数 2. 准备序列帧装入数组 3. 构造调用事件函数 4. 预览 序列帧UMG0105 5. 完成&#xff01;按需配置即可。...

总结HarmonyOS的技术特点

HarmonyOS是华为自主研发的面向全场景的分布式操作系统。它的技术特点主要体现在以下几个方面&#xff1a; 分布式架构&#xff1a;HarmonyOS采用了分布式架构设计&#xff0c;通过组件化和小型化等方法&#xff0c;支持多种终端设备按需弹性部署&#xff0c;能够适配不同类别的…...

从0到1入门C++编程——04 类和对象之封装、构造函数、析构函数、this指针、友元

文章目录 一、封装二、项目文件拆分三、构造函数和析构函数1.构造函数的分类及调用2.拷贝函数调用时机3.构造函数调用规则4.深拷贝与浅拷贝5.初始化列表6.类对象作为类成员7.静态成员 四、C对象模型和this指针1.类的对象大小计算2.this指针3.空指针访问成员函数4.const修饰成员…...

Robot Operating System 2: Design, Architecture, and Uses In The Wild

Robot Operating System 2: Design, Architecture, and Uses In The Wild (机器人操作系统 2&#xff1a;设计、架构和实际应用) 摘要&#xff1a;随着机器人在广泛的商业用例中的部署&#xff0c;机器人革命的下一章正在顺利进行。即使在无数的应用程序和环境中&#xff0c;也…...

TinyEngine 服务端正式开源啦!!!

背景介绍 TinyEngine 低代码引擎介绍 随着企业对于低代码开发平台的需求日益增长&#xff0c;急需一个通用的解决方案来满足各种低代码平台的开发需求。正是在这种情况下&#xff0c;低代码引擎应运而生。它是一种通用的开发框架&#xff0c;通过对低代码平台系统常用的功能进…...

网页设计与制作web前端设计html+css+js成品。电脑网站制作代开发。vscodeDrea 【企业公司宣传网站(HTML静态网页项目实战)附源码】

网页设计与制作web前端设计htmlcssjs成品。电脑网站制作代开发。vscodeDrea 【企业公司宣传网站&#xff08;HTML静态网页项目实战&#xff09;附源码】 https://www.bilibili.com/video/BV1Hp4y1o7RY/?share_sourcecopy_web&vd_sourced43766e8ddfffd1f1a1165a3e72d7605...

Avalonia学习(二十)-登录界面演示

今天开始继续Avalonia练习。 本节&#xff1a;演示实现登录界面 在网上看见一个博客&#xff0c;展示Avalonia实现&#xff0c;仿照GGTalk&#xff0c;我实现了一下&#xff0c;感觉是可以的。将测试的数据代码效果写下来。主要是样式使用&#xff0c;图片加载方式。 只有前…...

Spring依赖注入的魔法:深入DI的实现原理【beans 五】

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 Spring依赖注入的魔法&#xff1a;深入DI的实现原理【beans 五】 前言DI的基本概念基本概念&#xff1a;为什么使用依赖注入&#xff1a; 构造器注入构造器注入的基本概念&#xff1a;示例&#xff1a…...

【学习笔记】1、数字逻辑概论

1.1 数字信号 数字信号&#xff0c;在时间和数值上均是离散的。数字信号的表达方式&#xff1a;二值数字逻辑和逻辑电平描述的数字波形。 &#xff08;1&#xff09; 数字波形的两种类型 数值信号又称为“二值信号”。数字波形又称为“二值位形图”。 什么是一拍 一定的时…...

设置代理IP地址对网络有什么影响?爬虫代理IP主要有哪些作用?

在互联网的广泛应用下&#xff0c;代理IP地址成为了一种常见的网络技术。代理IP地址可以改变用户的上网行为&#xff0c;进而影响网络访问的速度和安全性。本篇文章将探讨设置代理IP地址对网络的影响&#xff0c;以及爬虫代理IP的主要作用。 首先&#xff0c;让我们来了解一下代…...

聊聊jvm的mapped buffer的统计

序 本文主要研究一下jvm的mapped buffer的统计 示例 private void writeDirectBuffer() {// 分配一个256MB的直接缓冲区ByteBuffer buffer ByteBuffer.allocateDirect(256 * 1024 * 1024);// 填充数据Random random new Random();while (buffer.remaining() > 4) {buff…...

手机怎么制作h5作品/搜索引擎优化的作用是什么

。。。。认真吧。。。转载于:https://www.cnblogs.com/lalawu/p/3553173.html...

网站的后台登录注册怎么做/个人免费网站建设

一、构造函数初始化列表 推荐在构造函数初始化列表中进行初始化 构造函数的执行分为两个阶段 初始化段 普通计算段 &#xff08;一&#xff09;、对象成员及其初始化 C Code 12345678910111213141516171819202122232425262728293031323334353637383940#include <iostream&…...

温州模板网站建站/百度seo高级优化

对于Java中的常量这一主题&#xff0c;似乎存在许多混淆。有些人使用整数或字符串来定义常量&#xff0c;而另一些人则使用枚举。.我还遇到了在它们自己的接口中定义的常量--其中使用常量的类必须实现接口。此策略通常称为接口常量设计模式。在本文中&#xff0c;我们将查看在J…...

做网站容易吗/央视新闻的新闻

一&#xff0e;WITH AS的含义 WITH AS短语&#xff0c;也叫做子查询部分&#xff08;subquery factoring&#xff09;&#xff0c;可以让你做很多事情&#xff0c;定义一个SQL片断&#xff0c;该SQL片断会被整个SQL语句所用到。 其实就是把一大堆重复用到的SQL语句放在with as …...

web前端培训十大坑爹/深圳seo优化推广公司

第六章 TCP与UDP 6.1 传输层的作用 6.1.1 传输层定义 IP层负责将数据包从发送端传输到接收端&#xff0c;而传输层负责建立发送端与接收端的连接&#xff0c;并判断发送端的发送程序与接收端的接收程序&#xff08;通过端口号&#xff09;。在TCP层中&#xff0c;通常称为客户…...

免费网站代码/seo快速排名优化公司

1、前言 指纹模块和树莓派实现的网上的资料比较少。所以把自己研究成功了的指纹模块给大家分享一下。现在目前市场上指纹模块比较多&#xff0c;大多还支持二次开发&#xff0c;但是难易度不同&#xff0c;一般价格高一点的相对开发简单&#xff0c;很多实现方法都已封装好了。…...