当前位置: 首页 > news >正文

Flink1.17实战教程(第五篇:状态管理)

系列文章目录

Flink1.17实战教程(第一篇:概念、部署、架构)
Flink1.17实战教程(第二篇:DataStream API)
Flink1.17实战教程(第三篇:时间和窗口)
Flink1.17实战教程(第四篇:处理函数)
Flink1.17实战教程(第五篇:状态管理)
Flink1.17实战教程(第六篇:容错机制)
Flink1.17实战教程(第七篇:Flink SQL)


文章目录

  • 系列文章目录
  • 1. Flink中的状态
    • 1.1 概述
    • 1.2 状态的分类
  • 2. 按键分区状态(Keyed State)
    • 2.1 值状态(ValueState)
    • 2.2 列表状态(ListState)
    • 2.3 Map状态(MapState)
    • 2.4 归约状态(ReducingState)
    • 2.5 聚合状态(AggregatingState)
    • 2.6 状态生存时间(TTL)
  • 3. 算子状态(Operator State)
    • 3.1 列表状态(ListState)
    • 3.2 联合列表状态(UnionListState)
    • 3.3 广播状态(BroadcastState)
  • 4. 状态后端(State Backends)
    • 4.1 状态后端的分类(HashMapStateBackend/RocksDB)
    • 4.2 如何选择正确的状态后端
    • 4.3 状态后端的配置


1. Flink中的状态

1.1 概述

在这里插入图片描述

1.2 状态的分类

1)托管状态(Managed State)和原始状态(Raw State)
Flink的状态有两种:托管状态(Managed State)原始状态(Raw State)托管状态就是由Flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。
通常我们采用Flink托管状态来实现需求。

2)算子状态(Operator State)和按键分区状态(Keyed State)
接下来我们的重点就是托管状态(Managed State)。
我们知道在Flink中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。
而很多有状态的操作(比如聚合、窗口)都是要先做keyBy进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前key有效,所以状态也应该按照key彼此隔离。在这种情况下,状态的访问方式又会有所不同。
基于这样的想法,我们又可以将托管状态分为两类:算子状态按键分区状态

在这里插入图片描述

在这里插入图片描述
另外,也可以通过富函数类(Rich Function)来自定义Keyed State,所以只要提供了富函数类接口的算子,也都可以使用Keyed State。所以即使是map、filter这样无状态的基本转换算子,我们也可以通过富函数类给它们“追加”Keyed State。比如RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是Keyed State。从这个角度讲,Flink中所有的算子都可以是有状态的
无论是Keyed State还是Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享

2. 按键分区状态(Keyed State)

按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以key为作用范围进行隔离。

需要注意,使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。

2.1 值状态(ValueState)

顾名思义,状态中只保存一个“值”(value)。ValueState本身是一个接口,源码中定义如下:

public interface ValueState<T> extends State {T value() throws IOException;void update(T value) throws IOException;
}

这里的T是泛型,表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态,那么类型就是ValueState。
我们可以在代码中读写值状态,实现对于状态的访问和更新。

  • T value():获取当前状态的值;
  • update(T value):对状态进行更新,传入的参数value就是要覆写的状态值。

在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState的状态描述器构造方法如下:

public ValueStateDescriptor(String name, Class<T> typeClass) {super(name, typeClass, null);
}

这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。
案例需求:检测每种传感器的水位值,如果连续的两个水位值超过10,就输出报警。

public class KeyedValueStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {// TODO 1.定义状态ValueState<Integer> lastVcState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 2.在open方法中,初始化状态// 状态描述器两个参数:第一个参数,起个名字,不重复;第二个参数,存储的类型lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
//                                lastVcState.value();  // 取出 本组 值状态 的数据
//                                lastVcState.update(); // 更新 本组 值状态 的数据
//                                lastVcState.clear();  // 清除 本组 值状态 的数据// 1. 取出上一条数据的水位值(Integer默认值是null,判断)int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();// 2. 求差值的绝对值,判断是否超过10Integer vc = value.getVc();if (Math.abs(vc - lastVc) > 10) {out.collect("传感器=" + value.getId() + "==>当前水位值=" + vc + ",与上一条水位值=" + lastVc + ",相差超过10!!!!");}// 3. 更新状态里的水位值lastVcState.update(vc);}}).print();env.execute();}
}

2.2 列表状态(ListState)

将需要保存的数据,以列表(List)的形式组织起来。在ListState接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。

  • Iterable get():获取当前的列表状态,返回的是一个可迭代类型Iterable;
  • update(List values):传入一个列表values,直接对状态进行覆盖;
  • add(T value):在状态列表中添加一个元素value;
  • addAll(List values):向列表中添加多个元素,以列表values形式传入。

类似地,ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。
案例:针对每种传感器输出最高的3个水位值

public class KeyedListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {ListState<Integer> vcListState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 1.来一条,存到list状态里vcListState.add(value.getVc());// 2.从list状态拿出来(Iterable), 拷贝到一个List中,排序, 只留3个最大的Iterable<Integer> vcListIt = vcListState.get();// 2.1 拷贝到List中List<Integer> vcList = new ArrayList<>();for (Integer vc : vcListIt) {vcList.add(vc);}// 2.2 对List进行降序排序vcList.sort((o1, o2) -> o2 - o1);// 2.3 只保留最大的3个(list中的个数一定是连续变大,一超过3就立即清理即可)if (vcList.size() > 3) {// 将最后一个元素清除(第4个)vcList.remove(3);}out.collect("传感器id为" + value.getId() + ",最大的3个水位值=" + vcList.toString());// 3.更新list状态vcListState.update(vcList);//                                vcListState.get();            //取出 list状态 本组的数据,是一个Iterable
//                                vcListState.add();            // 向 list状态 本组 添加一个元素
//                                vcListState.addAll();         // 向 list状态 本组 添加多个元素
//                                vcListState.update();         // 更新 list状态 本组数据(覆盖)
//                                vcListState.clear();          // 清空List状态 本组数据}}).print();env.execute();}
}

2.3 Map状态(MapState)

把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。对应的MapState<UK, UV>接口中,就会有UK、UV两个泛型,分别表示保存的key和value的类型。同样,MapState提供了操作映射状态的方法,与Map的使用非常类似。

  • get(UK key):传入一个key作为参数,查询对应的value值;
  • put(UK key, UV value):传入一个键值对,更新key对应的value值;
  • putAll(Map<UK, UV> map):将传入的映射map中所有的键值对,全部添加到映射状态中;
  • remove(UK key):将指定key对应的键值对删除;
  • boolean contains(UK key):判断是否存在指定的key,返回一个boolean值。另外,MapState也提供了获取整个映射相关信息的方法;
  • Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对;
  • Iterable keys():获取映射状态中所有的键(key),返回一个可迭代Iterable类型;
  • Iterable values():获取映射状态中所有的值(value),返回一个可迭代Iterable类型;
  • boolean isEmpty():判断映射是否为空,返回一个boolean值。

案例需求:统计每种传感器每种水位值出现的次数。

public class KeyedMapStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {MapState<Integer, Integer> vcCountMapState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 1.判断是否存在vc对应的keyInteger vc = value.getVc();if (vcCountMapState.contains(vc)) {// 1.1 如果包含这个vc的key,直接对value+1Integer count = vcCountMapState.get(vc);vcCountMapState.put(vc, ++count);} else {// 1.2 如果不包含这个vc的key,初始化put进去vcCountMapState.put(vc, 1);}// 2.遍历Map状态,输出每个k-v的值StringBuilder outStr = new StringBuilder();outStr.append("======================================\n");outStr.append("传感器id为" + value.getId() + "\n");for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {outStr.append(vcCount.toString() + "\n");}outStr.append("======================================\n");out.collect(outStr.toString());//                                vcCountMapState.get();          // 对本组的Map状态,根据key,获取value
//                                vcCountMapState.contains();     // 对本组的Map状态,判断key是否存在
//                                vcCountMapState.put(, );        // 对本组的Map状态,添加一个 键值对
//                                vcCountMapState.putAll();  // 对本组的Map状态,添加多个 键值对
//                                vcCountMapState.entries();      // 对本组的Map状态,获取所有键值对
//                                vcCountMapState.keys();         // 对本组的Map状态,获取所有键
//                                vcCountMapState.values();       // 对本组的Map状态,获取所有值
//                                vcCountMapState.remove();   // 对本组的Map状态,根据指定key,移除键值对
//                                vcCountMapState.isEmpty();      // 对本组的Map状态,判断是否为空
//                                vcCountMapState.iterator();     // 对本组的Map状态,获取迭代器
//                                vcCountMapState.clear();        // 对本组的Map状态,清空}}).print();env.execute();}
}

2.4 归约状态(ReducingState)

类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。ReducingState这个接口调用的方法类似于ListState,只不过它保存的只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。
归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍reduce聚合算子时讲到的ReduceFunction,所以状态类型跟输入的数据类型是一样的。

public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}

这里的描述器有三个参数,其中第二个参数就是定义了归约聚合逻辑的ReduceFunction,另外两个参数则是状态的名称和类型。
案例:计算每种传感器的水位和

......
.process(new KeyedProcessFunction<StringWaterSensorInteger>() {private ReducingState<Integer> sumVcState;@Overridepublic void open(Configuration parameters) throws Exception {sumVcState = this.getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("sumVcState"Integer::sumInteger.class));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<Integer> out) throws Exception {sumVcState.add(value.getVc());out.collect(sumVcState.get());}
})

2.5 聚合状态(AggregatingState)

与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。
同样地,AggregatingState接口调用方法也与ReducingState相同,调用.add()方法添加元素时,会直接使用指定的AggregateFunction进行聚合并更新状态。
案例需求:计算每种传感器的平均水位

public class KeyedAggregatingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {AggregatingState<Integer, Double> vcAvgAggregatingState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcAvgAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>("vcAvgAggregatingState",new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {@Overridepublic Tuple2<Integer, Integer> createAccumulator() {return Tuple2.of(0, 0);}@Overridepublic Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);}@Overridepublic Double getResult(Tuple2<Integer, Integer> accumulator) {return accumulator.f0 * 1D / accumulator.f1;}@Overridepublic Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
//                                                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);return null;}},Types.TUPLE(Types.INT, Types.INT)));}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 将 水位值 添加到  聚合状态中vcAvgAggregatingState.add(value.getVc());// 从 聚合状态中 获取结果Double vcAvg = vcAvgAggregatingState.get();out.collect("传感器id为" + value.getId() + ",平均水位值=" + vcAvg);//                                vcAvgAggregatingState.get();    // 对 本组的聚合状态 获取结果
//                                vcAvgAggregatingState.add();    // 对 本组的聚合状态 添加数据,会自动进行聚合
//                                vcAvgAggregatingState.clear();  // 对 本组的聚合状态 清空数据}}).print();env.execute();}
}

2.6 状态生存时间(TTL)

在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。

具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。

配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);stateDescriptor.enableTimeToLive(ttlConfig);

这里用到了几个配置项:

  • .newBuilder()
    状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数,这就是设定的状态生存时间
  • .setUpdateType()
    设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为OnCreateAndWrite。
  • .setStateVisibility()
    设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能继续存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。
    除此之外,TTL配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对RocksDB状态后端使用压缩过滤器(compaction filter)进行后台清理。这里需要注意,目前的TTL设置只支持处理时间
public class StateTTLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));sensorDS.keyBy(r -> r.getId()).process(new KeyedProcessFunction<String, WaterSensor, String>() {ValueState<Integer> lastVcState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 1.创建 StateTtlConfigStateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(5)) // 过期时间5s
//                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入(更新) 更新 过期时间.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入(更新) 更新 过期时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值.build();// TODO 2.状态描述器 启用 TTLValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);this.lastVcState = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {// 先获取状态值,打印 ==》 读取状态Integer lastVc = lastVcState.value();out.collect("key=" + value.getId() + ",状态值=" + lastVc);// 如果水位大于10,更新状态值 ===》 写入状态if (value.getVc() > 10) {lastVcState.update(value.getVc());}}}).print();env.execute();}
}

3. 算子状态(Operator State)

算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State。
算子状态的实际应用场景不如Keyed State多,一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。比如Flink的Kafka连接器中,就用到了算子状态。
当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。
算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。

3.1 列表状态(ListState)

与Keyed State中的ListState一样,将状态表示为一组数据的列表。
与Keyed State中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。
当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的rebanlance数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。
算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。
案例实操:在map算子中计算数据的个数。

public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream("hadoop102", 7777).map(new MyCountMapFunction()).print();env.execute();}// TODO 1.实现 CheckpointedFunction 接口public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {private Long count = 0L;private ListState<Long> state;@Overridepublic Long map(String value) throws Exception {return ++count;}/*** TODO 2.本地变量持久化:将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用** @param context* @throws Exception*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("snapshotState...");// 2.1 清空算子状态state.clear();// 2.2 将 本地变量 添加到 算子状态 中state.add(count);}/*** TODO 3.初始化本地变量:程序启动和恢复时, 从状态中 把数据添加到 本地变量,每个子任务调用一次** @param context* @throws Exception*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("initializeState...");// 3.1 从 上下文 初始化 算子状态state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("state", Types.LONG));// 3.2 从 算子状态中 把数据 拷贝到 本地变量if (context.isRestored()) {for (Long c : state.get()) {count += c;}}}}
}

3.2 联合列表状态(UnionListState)

与ListState类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。
UnionListState的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。
使用方式同ListState,区别在如下标红部分:

state = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));

3.3 广播状态(BroadcastState)

有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。
因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
案例实操:水位超过指定的阈值发送告警,阈值可以动态修改。

public class OperatorBroadcastStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 数据流SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// 配置流(用来广播配置)DataStreamSource<String> configDS = env.socketTextStream("hadoop102", 8888);// TODO 1. 将 配置流 广播MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);// TODO 2.把 数据流 和 广播后的配置流 connectBroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);// TODO 3.调用 processsensorBCS.process(new BroadcastProcessFunction<WaterSensor, String, String>() {/*** 数据流的处理方法: 数据流 只能 读取 广播状态,不能修改* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {// TODO 5.通过上下文获取广播状态,取出里面的值(只读,不能修改)ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);Integer threshold = broadcastState.get("threshold");// 判断广播状态里是否有数据,因为刚启动时,可能是数据流的第一条数据先来threshold = (threshold == null ? 0 : threshold);if (value.getVc() > threshold) {out.collect(value + ",水位超过指定的阈值:" + threshold + "!!!");}}/*** 广播后的配置流的处理方法:  只有广播流才能修改 广播状态* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {// TODO 4. 通过上下文获取广播状态,往里面写数据BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);broadcastState.put("threshold", Integer.valueOf(value));}}).print();env.execute();}
}

4. 状态后端(State Backends)

在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责管理本地状态的存储方式和位置

4.1 状态后端的分类(HashMapStateBackend/RocksDB)

状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend。
(1)哈希表状态后端(HashMapStateBackend)
HashMapStateBackend是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆上。普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。
(2)内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)
RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里
RocksDB的状态数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。
EmbeddedRocksDBStateBackend始终执行的是异步快照,所以不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。

4.2 如何选择正确的状态后端

HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里。
HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。
而RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级。

4.3 状态后端的配置

在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件flink-conf.yaml中指定的,配置的键名称为state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。
(1)配置默认的状态后端
在flink-conf.yaml中,可以使用state.backend来配置默认状态后端。
配置项的可能值为hashmap,这样配置的就是HashMapStateBackend;如果配置项的值是rocksdb,这样配置的就是EmbeddedRocksDBStateBackend。
下面是一个配置HashMapStateBackend的例子:

# 默认状态后端
state.backend: hashmap# 存放检查点的文件路径
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints

这里的state.checkpoints.dir配置项,定义了检查点和元数据写入的目录。

(2)为每个作业(Per-job/Application)单独配置状态后端
通过执行环境设置,HashMapStateBackend

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new HashMapStateBackend());

通过执行环境设置,EmbeddedRocksDBStateBackend。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new EmbeddedRocksDBStateBackend());

需要注意,如果想在IDE中使用EmbeddedRocksDBStateBackend,需要为Flink项目添加依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version>
</dependency>

而由于Flink发行版中默认就包含了RocksDB(服务器上解压的Flink),所以只要我们的代码中没有使用RocksDB的相关内容,就不需要引入这个依赖。

相关文章:

Flink1.17实战教程(第五篇:状态管理)

系列文章目录 Flink1.17实战教程&#xff08;第一篇&#xff1a;概念、部署、架构&#xff09; Flink1.17实战教程&#xff08;第二篇&#xff1a;DataStream API&#xff09; Flink1.17实战教程&#xff08;第三篇&#xff1a;时间和窗口&#xff09; Flink1.17实战教程&…...

ES慢查询分析——性能提升6 倍

问题 生产环境频繁报警。查询跨度91天的数据&#xff0c;请求耗时已经来到了30s。报警的阈值为5s。我们期望值是5s内&#xff0c;大于该阈值的请求&#xff0c;我们认为是慢查询。这些慢查询&#xff0c;最终排查&#xff0c;是因为走到了历史集群上。受到了数据迁移的一定影响…...

[NAND Flash 4.3] 闪存的物理学原理_NAND Flash 的读、写、擦工作原理

依公知及经验整理,原创保护,禁止转载。 专栏 《深入理解NAND Flash》 <<<< 返回总目录 <<<< 2.1.3.1 Flash 的物理学原理与发明历程 经典物理学认为 物体越过势垒,有一阈值能量;粒子能量小于此能量则不能越过,大于此能 量则可以越过。例如骑自行…...

海豚调度 Dolphinscheduler-3.2.0/DolphinScheduler-3.1.9 离线部署 伪集群模式

Dolphinscheduler-3.2.0(离线)伪集群模式 一、依赖(前置准备工作) 1.JDK&#xff1a;版本要求 JDK(1.8),安装并配置 JAVA_HOME 环境变量,并将其下的 bin 目录追加到PATH 环境变量中; 2.数据库&#xff1a;PostgreSQL(8.2.15) 或者MySQL(5.7),两者任选其一即可,如 MySQL 则需要…...

4.33 构建onnx结构模型-Expand

前言 构建onnx方式通常有两种&#xff1a; 1、通过代码转换成onnx结构&#xff0c;比如pytorch —> onnx 2、通过onnx 自定义结点&#xff0c;图&#xff0c;生成onnx结构 本文主要是简单学习和使用两种不同onnx结构&#xff0c; 下面以 Expand 结点进行分析 方式 方法一…...

LeetCode——1599. 经营摩天轮的最大利润

通过万岁&#xff01;&#xff01;&#xff01; 题目&#xff1a;就是一个摩天轮&#xff0c;一共有4个仓位&#xff0c;一个仓位中最多可以做4个人。然后每次上一个人boardingCost钱&#xff0c;但是我们转动1/4圈&#xff0c;需要的成本是runningCost。然后给我们一个数组cu…...

从 MySQL 的事务 到 锁机制 再到 MVCC

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、事务 1.1 含义 1.2 ACID 二、锁机制 2.1 锁分类 2.2 隔离级别 三、MVCC 3.1 介绍 3.2 隔离级别 3.3 原理 四、总结 前…...

PostGreSQL远程连接

1、找到PostGreSQL安装目录&#xff0c;修改“postgresql.conf”配置文件&#xff08;安装路径\data\postgresql.conf&#xff09;。 若不知道安装目录&#xff0c;则通过服务&#xff0c;找到PostGreSQL运行的任务&#xff0c;右击“属性”可以进行查看安装的目录。 进入该目…...

CSS 缩减顶部动画

<template><!-- mouseenter"startAnimation" 表示在鼠标进入元素时触发 startAnimation 方法。mouseleave"stopAnimation" 表示在鼠标离开元素时触发 stopAnimation 方法。 --><!-- 容器元素 --><div class"container" mou…...

开源掌机是什么?

缘起 最近在学习小游戏的开发&#xff0c;偶然发现有一种叫“掌机”的游戏机&#xff0c;可以玩远古的各类游戏机、街机游戏&#xff01;并且价格都还很便宜。这种神器的东西到底是什么&#xff1f;是如何工作的呢&#xff1f;有市场前景吗&#xff1f;带着这些疑问&#xff0…...

基于Wenet长音频分割降噪识别

Wenet是一个流行的语音处理工具&#xff0c;它专注于长音频的处理&#xff0c;具备分割、降噪和识别功能。它的长音频分割降噪识别功能允许对长时间录制的音频进行分段处理&#xff0c;首先对音频进行分割&#xff0c;将其分解成更小的段落或语音片段。接着进行降噪处理&#x…...

mysql基础-表操作

环境&#xff1a; 管理工具&#xff1a;Navicat 数据库版本&#xff1a;5.7.37 mysql的版本&#xff0c;我们可以通过函数&#xff0c;version()进行查看&#xff0c;本次使用的版本如下&#xff1a; 目录 1.管理工具 1.1创建表 1.2.修改表名 1.3.复制表 1.4.删除表 2…...

MySql——1146 - Table‘mysql.proc‘doesn‘t exit是这个

项目场景&#xff1a; 做自己的小项目需要连接mysql数据库 问题描述 点击数据库时报错 1146 - Table’mysql.proc’doesn’t exit 原因分析&#xff1a; 误删原生的mysql数据库 解决方案&#xff1a; 重新安装装部署mysql就好了 注意不要轻易删除原生的东西...

玩转贝启科技BQ3588C开源鸿蒙系统开发板 —— 代码下载(1)

本文主要参考&#xff1a; BQ3588C_代码下载 1. 安装依赖工具 安装命令如下&#xff1a; sudo apt-get update && sudo apt-get install binutils git git-lfs gnupg flexbison gperf build-essential zip curl zlib1g-dev gcc-multilib g-multiliblibc6-dev-i386 l…...

开源预约挂号平台 - 从0到上线

文章目录 开源预约挂号平台 - 从0到上线演示地址源码地址可以学到的技术前端技术后端技术部署上线开发工具其他技术业务功能 项目讲解前端创建项目 - 安装PNPM - 使用VSCODE - 安装插件首页顶部与底部 - 封装组建 - 使用scss左右布局中间内容部分路由 - vue-routerBANNER- 走马…...

Vue3的proxy

vue3.0中,使用proxy替换了原来遍历对象使用Object.defineProperty方法给属性添加set/get    vue的核心能力之一是监听用户定义的状态变化并响应式刷新DOM   vue2是通过替换状态对象属性的getter和setter来实现的,vue3则通过proxy进行   改为proxy后,可以突破vue当前的…...

Vue Router的介绍与引入

在这里是记录我引入Vue Router的全过程&#xff0c;引入方面也最好先看官方文档 一.介绍 Vue Router 是 Vue.js 的官方路由。它与 Vue.js 核心深度集成&#xff0c;让用 Vue.js 构建单页应用变得轻而易举。功能包括&#xff1a; 嵌套路由映射动态路由选择模块化、基于组件的…...

StratifiedKFold解释和代码实现

StratifiedKFold解释和代码实现 文章目录 一、StratifiedKFold是什么&#xff1f;二、 实验数据设置2.1 实验数据生成代码2.2 代码结果 三、实验代码3.1 实验代码3.2 实验结果3.3 结果解释3.4 数据打乱对这种交叉验证的影响。 四、总结 一、StratifiedKFold是什么&#xff1f; …...

四十八----react实战

一、项目中css模块化管理 1、css-loader 以下可以使用styles.xxx方式使用class是因为使用css-loader配置了module。 import styles from ./index.less export const App(){return <div className={styles.xxx}>hello word</div> }//webpack配置 {test:/\.css$/,u…...

三步实现Java的SM2前端加密后端解密

秦医如毒&#xff0c;无药可解。 话不多说&#xff0c;先上需要用到的js文件下载链接 和 jsp前端代码。 第一步&#xff1a;下载两个必备的js文件—— crypto-js.js、sm2.js 。 它们的下载链接如下↓&#xff08;该网页不魔法上网的话会很卡&#xff0c;毕竟github&#x…...

1分钟带你了解golang(go语言)

Golang&#xff1a;也被称为Go语言&#xff0c;是一种开源的编程语言。由Google的Robert Griesemer、Rob Pike和Ken Thompson于2007年开始设计&#xff0c;2009年11月正式对外发布。&#xff08;被誉为21世纪的C语言&#xff09; 像python一样的优雅&#xff0c;有c一样的性能…...

CSS-4

平面转换 整体认识 div {margin: 100px 0;width: 100px;height: 100px;background-color: pink;/* 过渡效果 */transition: all 1s;}/* 当鼠标悬停到div时&#xff0c;进行平面转换 */div:hover {transform: translate(800px) rotate(360deg) scale(2) skew(180deg);}作用&…...

Python为何适合开发AI项目?

Python在人工智能&#xff08;AI&#xff09;项目中的流行和广泛应用归因于多个因素&#xff0c;其中一些主要原因包括&#xff1a; 1、易学易用&#xff1a; Python语法简洁清晰&#xff0c;易于学习和理解。这使得新手能够更容易上手&#xff0c;并且对于处理复杂的AI算法和…...

总结心得:各设计模式使用场景

单例模式&#xff1a;创建单个对象 工厂模式&#xff1a;创建对象交给工厂完成&#xff0c;当需要创建的对象是一系列相互关联或相互依赖的产品族时 原型模式&#xff1a;克隆对象&#xff0c;避免创建初始化开销 建造者模式&#xff1a;创建一个复杂对象&#xff0c;该对象…...

详解Vue3中的事件监听方式

本文主要介绍Vue3中的事件监听方式。 目录 一、v-on指令二、使用符号简写三、事件修饰符四、动态事件名五、常见的监听事件六、自定义事件 在Vue3中&#xff0c;事件监听的方式与Vue2有一些不同。 下面是Vue3中事件监听方式的详细介绍&#xff1a; 一、v-on指令 Vue3中仍然使…...

Unity关于easySave2 easySave3保存数据的操作;包含EasySave3运行报错的解决

关于easySave2 easySave3保存数据的操作&#xff1b;包含EasySave3运行报错的解决 /// 数据存储路径&#xff08;Easy Save的默认储存位置为&#xff1a;Application.persistentDataPath&#xff0c;为了方便我们可以给它指定储存路径&#xff09; #region 存储数据/*/// /// 存…...

2022年全球软件质量效能大会(QECon上海站)-核心PPT资料下载

一、峰会简介 近年来&#xff0c;以云计算、移动互联网、物联网、工业互联网、人工智能、大数据及区块链等新一代信息技术构建的智能化应用和产品出现爆发式增长&#xff0c;突破了对于软件形态的传统认知&#xff0c;正以各种展现方式诠释着对新型智能软件的定义。这也使得对…...

【python报错】UserWarning: train_labels has been renamed targets

UserWarning: train_labels has been renamed targetswarnings.warn(“train_labels has been renamed targets”) 这是一条 Python 警告信息&#xff0c;它表示 train_labels 这个变量已经被重命名为 targets&#xff0c;在将来的版本中可能会移除 train_labels。因此&#x…...

算法专题四:前缀和

前缀和 一.一维前缀和(模板)&#xff1a;1.思路一&#xff1a;暴力解法2.思路二&#xff1a;前缀和思路 二. 二维前缀和(模板)&#xff1a;1.思路一&#xff1a;构造前缀和数组 三.寻找数组的中心下标&#xff1a;1.思路一&#xff1a;前缀和 四.除自身以外数组的乘积&#xff…...

STM32学习笔记十五:WS2812制作像素游戏屏-飞行射击游戏(5)探索动画之帧动画

本章又是个重要的章节——动画。 动画&#xff0c;本质上时一系列静态的画面连续播放&#xff0c;欺骗人眼产生动画效果。这个原理自打十九世纪电影诞生开始&#xff0c;就从来没变过。 我们的游戏中也需要一些动画效果&#xff0c;比如&#xff0c;被击中时的受伤效果&#…...

期末复习(程序设计)

根据字符出现频率排序 【问题描述】 给定一个字符串 s &#xff0c;根据字符出现的 频率 对其进行降序排序。一个字符出现的频率是它出现在字符串中的次数。 返回已排序的字符串。 频率相同的的字符按ascii值降序排序。 s不包含空格、制表符、换行符等特殊字符。 【输入格…...

html-css-js移动端导航栏底部固定+i18n国际化全局

需求&#xff1a;要做一个移动端的仿照小程序的导航栏页面操作&#xff0c;但是这边加上了i18n国家化&#xff0c;由于页面切换的时候会导致国际化失效&#xff0c;所以写了这篇文章 1.效果 切换页面的时候中英文也会跟着改变&#xff0c;不会导致切换后回到默认的语言 2.实现…...

Ubuntu Linux 入门指南:面向初学者

目录 1. Ubuntu Linux 简介 Ubuntu 的由来 Ubuntu 与其他 Linux 发行版的比较 Debian&#xff1a; Fedora&#xff1a; openSUSE&#xff1a; Arch Linux&#xff1a; Linux Mint&#xff1a; 第二部分&#xff1a;安装 Ubuntu 1. 准备安装 系统需求 创建 Ubuntu 启…...

常见算法面试题目

前言 总结一些常见的算法题目&#xff0c;每一个题目写一行思路&#xff0c;方便大家复习。具体题目的来源是下面的网站。 剑指offer 剑指offe2 leetcode200题 leetcode 100题 leetcode150题 leetcode 75题 文章目录 前言二叉树非递归遍历牛客JZ31 栈的压入、弹出序列 (…...

PiflowX组件-JDBCWrite

JDBCWrite组件 组件说明 使用JDBC驱动向任意类型的关系型数据库写入数据。 计算引擎 flink 有界性 Sink: Batch Sink: Streaming Append & Upsert Mode 组件分组 Jdbc 端口 Inport&#xff1a;默认端口 outport&#xff1a;默认端口 组件属性 名称展示名称默…...

算法导论复习题目

这题需要考虑什么呢&#xff1f; 一换元&#xff0c;二要使用主方法猜出结果&#xff0c;三是证明的时候添加一个低阶项来消除 LC检索 C&#xff08;x&#xff09;是从上帝视角来看的成本 对C(x)的一个估计&#xff1a; 由两个部分组成&#xff0c;就相当于由以往的经验对未来…...

HTTPS协议详解

目录 前言 一、HTTPS协议 1、加密是什么 2、为什么要加密 二、常见加密方式 1、对称加密 2、非对称加密 三、数据摘要与数据指纹 1、数据摘要 2、数据指纹 四、HTTPS加密策略探究 1、只使用对称加密 2、只使用非对称加密 3、双方都使用非对称加密 4、对称加密非…...

菜鸟学习vue3笔记-vue3 router回顾

1、路由router pnpm i vue-router2、创建使用环境 1.src下创建 router文件夹、里面创建index.ts文件 //创建一个路由暴露出去//1.引入createRouter import { createRouter, createWebHistory } from "vue-router";// import Home from ../components/Home.vue//…...

Mybatis枚举类型处理和类型处理器

专栏精选 引入Mybatis Mybatis的快速入门 Mybatis的增删改查扩展功能说明 mapper映射的参数和结果 Mybatis复杂类型的结果映射 Mybatis基于注解的结果映射 Mybatis枚举类型处理和类型处理器 再谈动态SQL Mybatis配置入门 Mybatis行为配置之Ⅰ—缓存 Mybatis行为配置…...

2023 NCTF writeup

CRYPTO Sign 直接给了fx,gx&#xff0c;等于私钥给了&#xff0c;直接套代码&#xff0c;具体可以参考&#xff1a; https://0xffff.one/d/1424 fx [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0…...

golang的大杀器协程goroutine

在Golang中&#xff0c;协程&#xff08;Goroutine&#xff09;是轻量级的执行单元&#xff0c;用于实现并发编程。它是Golang语言的重要组成部分&#xff0c;提供了简洁、高效的方式来处理并发任务。 特点&#xff1a; 1&#xff09;轻量级&#xff1a;Go语言的协程是轻量级…...

[Angular] 笔记 9:list/detail 页面以及@Output

1. Output input 好比重力&#xff0c;向下传递数据&#xff0c;list 传给 detail&#xff0c;smart 组件传给 dumb 组件&#xff0c;父组件传给子组件。input 顾名思义&#xff0c;输入数据给组件。 output 与之相反&#xff0c;好比火箭&#xff0c;向上传递数据或事件。ou…...

Linux学习笔记(一)

如果有自己的物理服务器请先查看这篇文章 文章目录 网卡配置Linux基础指令ls:列出目录内容cd(mkdir.rmkdir): 切换文件夹(创建,删除操作)cp:复制文件或目录mv:文件/文件夹移动cat:查看文件vi:文件查看编辑man:查看命令手册more: 查看文件内容less : 查看文件内容 ps: 显示当前进…...

Python 爬虫 教程

python爬虫框架&#xff1a;Scrapyd&#xff0c;Feapder&#xff0c;Gerapy 参考文章&#xff1a; python爬虫工程师&#xff0c;如何从零开始部署ScrapydFeapderGerapy&#xff1f; - 知乎 神器&#xff01;五分钟完成大型爬虫项目 - 知乎 爬虫框架-feapder - 知乎 scrap…...

uniapp原生插件 - android原生插件打包流程 ( 避坑指南一)

【彩带- 避坑知识点】: 当时开发中安卓插件打包成功后&#xff0c;uniapp引用插件aar&#xff0c;用云打包 &#xff0c;总是提示不包含插件。原因是因为module的androidManifest.xml文件没有注册activity。 这一步 很重要&#xff0c;一定要注册。 --------------------------…...

搭建maven私服

maven maven简介 什么是maven&#xff1f; Maven这个单词来自于意第绪语&#xff08;犹太语&#xff09;&#xff0c;意为知识的积累。 Maven项目对象模型(POM)&#xff0c;可以通过一小段描述信息来管理项目的构建&#xff0c;报告和文档的项目管理工具软件。 Maven 除了以…...

EST-100身份证社保卡签批屏按捺终端PC版web版本http协议接口文档,支持web网页开发对接使用

<!DOCTYPE html><html lang"zh-CN"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width,initial-scale1.0"><title>演示DEMO</title><script type"text/…...

基于SpringBoot的毕业论文管理系统

文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于SpringBoot的毕业论文管理系统,java…...

iToF人脸识别

iToF(间接飞行时间)是一种测量光飞行时间的技术,主要应用于人脸识别。 iToF人脸识别技术在哪些场景下会用到 iToF人脸识别技术可以应用于许多场景,以下是一些常见的应用场景: 平安城市:在城市监控系统中,iToF人脸识别技术可以用于实时监控、目标检测和识别,以及异常行为…...

Django开发3

Django开发3 Django开发编辑用户9.靓号管理9.1 表结构9.2 靓号列表9.3 新建靓号9.4 编辑靓号9.5 搜索手机号9.6 分页 10.时间插件11.ModelForm和BootStrap操作 各位小伙伴想要博客相关资料的话关注公众号&#xff1a;chuanyeTry即可领取相关资料&#xff01; Django开发 部门管…...