当前位置: 首页 > news >正文

Flink1.17实战教程(第六篇:容错机制)

系列文章目录

Flink1.17实战教程(第一篇:概念、部署、架构)
Flink1.17实战教程(第二篇:DataStream API)
Flink1.17实战教程(第三篇:时间和窗口)
Flink1.17实战教程(第四篇:处理函数)
Flink1.17实战教程(第五篇:状态管理)
Flink1.17实战教程(第六篇:容错机制)
Flink1.17实战教程(第七篇:Flink SQL)


文章目录

  • 系列文章目录
  • 1. 检查点(Checkpoint)
    • 1.1 检查点的保存
    • 1.2 从检查点恢复状态
    • 1.3 检查点算法
      • 1.3.1 检查点分界线(Barrier)
      • 1.3.2 分布式快照算法(Barrier对齐的精准一次)
      • 1.3.3 分布式快照算法(Barrier对齐的至少一次)
      • 1.3.4 分布式快照算法(非Barrier对齐的精准一次)
    • 1.4 检查点配置
      • 1.4.1 启用检查点
      • 1.4.2 检查点存储
      • 1.4.3 其它高级配置
      • 1.4.4 通用增量 checkpoint (changelog)
      • 1.4.5 最终检查点
    • 1.5 保存点(Savepoint)
      • 1.5.1 保存点的用途
      • 1.5.2 使用保存点
      • 1.5.3 使用保存点切换状态后端
  • 2. 状态一致性
    • 2.1 一致性的概念和级别
    • 2.2 端到端的状态一致性
  • 3. 端到端精确一次(End-To-End Exactly-Once)
    • 3.1 输入端保证
    • 3.2 输出端保证
    • 3.3 Flink和Kafka连接时的精确一次保证


1. 检查点(Checkpoint)

在Flink中,有一套完整的容错机制来保证故障后的恢复,其中最重要的就是检查点

在这里插入图片描述

1.1 检查点的保存

1)周期性的触发保存
“随时存档”确实恢复起来方便,可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,数据处理的速度就会受到影响。所以在Flink中,检查点的保存是周期性触发的,间隔时间可以进行设置

2)保存的时间点
我们应该在所有任务(算子)都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。
这样做可以实现一个数据被所有任务(算子)完整地处理完,状态得到了保存。
如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。当然这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;kafka就是满足这些要求的一个最好的例子。

3)保存的具体流程
检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。下面我们通过一个具体的例子,来详细描述一下检查点具体的保存过程。
回忆一下我们最初实现的统计词频的程序——word count。这里为了方便,我们直接从数据源读入已经分开的一个个单词,例如这里输入的是:
“hello”,“world”,“hello”,“flink”,“hello”,“world”,“hello”,“flink”…
我们所需要的就是每个任务都处理完“hello”之后保存自己的状态。

1.2 从检查点恢复状态

检查点的保存具体流程:
在这里插入图片描述
处理数据过程发生故障
在这里插入图片描述
重启应用 ==>> 读取检查点,重置状态
在这里插入图片描述
重置偏移量
在这里插入图片描述
继续处理数据
在这里插入图片描述

1.3 检查点算法

在Flink中,采用了基于Chandy-Lamport算法的分布式快照,可以在不暂停整体流处理的前提下,将状态备份保存到检查点。

1.3.1 检查点分界线(Barrier)

借鉴水位线的设计,在数据流中插入一个特殊的数据结构,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。
这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫做检查点的“分界线”(Checkpoint Barrier)

在这里插入图片描述

1.3.2 分布式快照算法(Barrier对齐的精准一次)

watermark指示的是“之前的数据全部到齐了”,而barrier指示的是“之前所有数据的状态更改保存入当前检查点”:它们都是一个“截止时间”的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准。
具体实现上,Flink使用了Chandy-Lamport算法的一种变体,被称为“异步分界线快照”算法。算法的核心就是两个原则:
当上游任务向多个并行下游任务发送barrier时,需要广播出去;
而当多个上游任务向同一个下游任务传递分界线时,需要在下游任务执行“分界线对齐”操作,也就是需要等到所有并行分区的barrier都到齐,才可以开始状态的保存。

1)场景说明
在这里插入图片描述

2)检查点保存算法具体过程为:

1.触发检查点保存在这里插入图片描述
2.分界线向下游传递
在这里插入图片描述
3.分界线对齐
在这里插入图片描述
4.有状态算子将状态保存至持久化存储
在这里插入图片描述

(1)触发检查点:JobManager向Source发送Barrier;
(2)Barrier发送:向下游广播发送;
(3)Barrier对齐:下游需要收到上游所有并行度传递过来的Barrier才做自身状态的保存;
(4)状态保存:有状态的算子将状态保存至持久化。
(5)先处理缓存数据,然后正常继续处理

完成检查点保存之后,任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据,需要先做处理;然后再按照顺序依次处理新到的数据。当JobManager收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。

(补充)由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕。

为了应对这种场景,Barrier对齐中提供了至少一次语义以及Flink 1.11之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据也保存进检查点。这样,当我们遇到一个分区barrier时就不需等待对齐,而是可以直接启动状态的保存了。

1.3.3 分布式快照算法(Barrier对齐的至少一次)

1.触发检查点保存
在这里插入图片描述
2.分界线向下游传递
在这里插入图片描述
3.分界线对齐
在这里插入图片描述
4.有状态算子将状态保存至持久化存储
在这里插入图片描述

1.3.4 分布式快照算法(非Barrier对齐的精准一次)

1.触发检查点保存
在这里插入图片描述
2.分界线向下游传递
在这里插入图片描述
3.非Barrier对齐
在这里插入图片描述
在这里插入图片描述

1.4 检查点配置

检查点的作用是为了故障恢复,我们不能因为保存检查点占据了大量时间、导致数据处理性能明显降低。为了兼顾容错性和处理性能,我们可以在代码中对检查点进行各种配置。

1.4.1 启用检查点

默认情况下,Flink程序是禁用检查点的。如果想要为Flink应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1秒启动一次检查点保存
env.enableCheckpointing(1000);

这里需要传入一个长整型的毫秒数,表示周期性保存检查点的间隔时间。如果不传参数直接启用检查点,默认的间隔周期为500毫秒,这种方式已经被弃用。
检查点的间隔时间是对处理性能和故障恢复速度的一个权衡。如果我们希望对性能的影响更小,可以调大间隔时间;而如果希望故障重启后迅速赶上实时的数据处理,就需要将间隔时间设小一些。

1.4.2 检查点存储

检查点具体的持久化存储位置,取决于“检查点存储”的设置。默认情况下,检查点存储在JobManager的堆内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口。
具体可以通过调用检查点配置的.setCheckpointStorage()来配置,需要传入一个CheckpointStorage的实现类。Flink主要提供了两种CheckpointStorage:作业管理器的堆内存和文件系统。

// 配置存储检查点到JobManager堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));

对于实际生产应用,我们一般会将CheckpointStorage配置为高可用的分布式文件系统(HDFS,S3等)。

1.4.3 其它高级配置

检查点还有很多可以配置的选项,可以通过获取检查点配置(CheckpointConfig)来进行设置。

CheckpointConfig checkpointConfig = env.getCheckpointConfig();

1)常用高级配置

  • 检查点模式(CheckpointingMode)
    设置检查点一致性的保证级别,有“精确一次”(exactly-once)和“至少一次”(at-least-once)两个选项。默认级别为exactly-once,而对于大多数低延迟的流处理程序,at-least-once就够用了,而且处理效率会更高。
  • 超时时间(checkpointTimeout)
    用于指定检查点保存的超时时间,超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数,表示超时时间。
  • 最小间隔时间(minPauseBetweenCheckpoints)
    用于指定在上一个检查点完成之后,检查点协调器最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点,只要距离上一个检查点完成的间隔不够,就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时,实际并发为1。
  • 最大并发检查点数量(maxConcurrentCheckpoints)
    用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同,完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。
  • 开启外部持久化存储(enableExternalizedCheckpoints)
    用于开启检查点的外部持久化,而且默认在作业失败的时候不会自动清理,如果想释放空间需要自己手工清理。里面传入的参数ExternalizedCheckpointCleanup指定了当作业取消的时候外部的检查点该如何清理。
    DELETE_ON_CANCELLATION:在作业取消的时候会自动删除外部检查点,但是如果是作业失败退出,则会保留检查点。
    RETAIN_ON_CANCELLATION:作业取消的时候也会保留外部检查点。
  • 检查点连续失败次数(tolerableCheckpointFailureNumber)
    用于指定检查点连续失败的次数,当达到这个次数,作业就失败退出。默认为0,这意味着不能容忍检查点失败,并且作业将在第一次报告检查点失败时失败。

2)开启非对齐检查点

  • 非对齐检查点(enableUnalignedCheckpoints)
    不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式(CheckpointingMode)必须为exctly-once,并且最大并发的检查点个数为1。
  • 对齐检查点超时时间(alignedCheckpointTimeout)
    该参数只有在启用非对齐检查点的时候有效。参数默认是0,表示一开始就直接用非对齐检查点。如果设置大于0,一开始会使用对齐的检查点,当对齐时间超过该参数设定的时间,则会自动切换成非对齐检查点。

代码中具体设置如下:

public class CheckpointConfigDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);// 代码中用到hdfs,需要导入hadoop依赖、指定访问hdfs的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");// TODO 检查点配置// 1、启用检查点: 默认是barrier对齐的,周期为5s, 精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();// 2、指定检查点的存储位置checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");// 3、checkpoint的超时时间: 默认10分钟checkpointConfig.setCheckpointTimeout(60000);// 4、同时运行中的checkpoint的最大数量checkpointConfig.setMaxConcurrentCheckpoints(1);// 5、最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔,设置了>0,并发就会变成1checkpointConfig.setMinPauseBetweenCheckpoints(1000);// 6、取消作业时,checkpoint的数据 是否保留在外部系统// DELETE_ON_CANCELLATION:主动cancel时,删除存在外部系统的chk-xx目录 (如果是程序突然挂掉,不会删)// RETAIN_ON_CANCELLATION:主动cancel时,外部系统的chk-xx目录会保存下来checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 7、允许 checkpoint 连续失败的次数,默认0--》表示checkpoint一失败,job就挂掉checkpointConfig.setTolerableCheckpointFailureNumber(10);// TODO 开启 非对齐检查点(barrier非对齐)// 开启的要求: Checkpoint模式必须是精准一次,最大并发必须设为1checkpointConfig.enableUnalignedCheckpoints();// 开启非对齐检查点才生效: 默认0,表示一开始就直接用 非对齐的检查点// 如果大于0, 一开始用 对齐的检查点(barrier对齐), 对齐的时间超过这个参数,自动切换成 非对齐检查点(barrier非对齐)checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));env.socketTextStream("hadoop102", 7777).flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1).print();env.execute();}
}

1.4.4 通用增量 checkpoint (changelog)

在 1.15 之前,只有RocksDB 支持增量快照。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。
Rocksdb状态后端启用增量checkpoint:

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

从 1.15 开始,不管hashmap还是rocksdb 状态后端都可以通过开启changelog实现通用的增量checkpoint。

1)执行过程

(1)带状态的算子任务将状态更改写入变更日志(记录状态)
在这里插入图片描述
(2)状态物化:状态表定期保存,独立于检查点
在这里插入图片描述
(3)状态物化完成后,状态变更日志就可以被截断到相应的点
在这里插入图片描述

2)注意事项

(1)目前标记为实验性功能,开启后可能会造成资源消耗增大:
HDFS上保存的文件数变多
消耗更多的IO带宽用于上传变更日志
更多的CPU用于序列化状态更改
TaskManager使用更多内存来缓存状态更改
(2)使用限制:
Checkpoint的最大并发必须为1
从 Flink 1.15 开始,只有文件系统的存储类型实现可用(memory测试阶段)
不支持 NO_CLAIM 模式

3)使用方式
(1)方式一:配置文件指定

state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem 
# 存储 changelog 数据
dstl.dfs.base-path: hdfs://hadoop102:8020/changelog 
execution.checkpointing.max-concurrent-checkpoints: 1
execution.savepoint-restore-mode: CLAIM

(2)方式二:在代码中设置
需要引入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-changelog</artifactId><version>${flink.version}</version><scope>runtime</scope>
</dependency>

开启changelog:

env.enableChangelogStateBackend(true);

1.4.5 最终检查点

如果数据源是有界的,就可能出现部分Task已经处理完所有数据,变成finished状态,不继续工作。从 Flink 1.14 开始,这些finished状态的Task,也可以继续执行检查点。自 1.15 起默认启用此功能,并且可以通过功能标志禁用它:

Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

1.5 保存点(Savepoint)

除了检查点外,Flink还提供了另一个非常独特的镜像保存功能——保存点(savepoint)
从名称就可以看出,这也是一个存盘的备份,它的原理和算法与检查点完全相同,只是多了一些额外的元数据。

1.5.1 保存点的用途

保存点与检查点最大的区别,就是触发的时机。检查点是由Flink自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。
保存点可以当作一个强大的运维工具来使用。我们可以在需要的时候创建一个保存点,然后停止应用,做一些处理调整之后再从保存点重启。它适用的具体场景有:

  • 版本管理和归档存储
  • 更新Flink版本
  • 更新应用程序
  • 调整并行度
  • 暂停应用程序

需要注意的是,保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子ID-状态名称这样的key-value组织起来的,算子ID可以在代码中直接调用SingleOutputStreamOperator的.uid()方法来进行指定:

DataStream<String> stream = env.addSource(new StatefulSource()).uid("source-id").map(new StatefulMapper()).uid("mapper-id").print();

对于没有设置ID的算子,Flink默认会自动进行设置,所以在重新启动应用后可能会导致ID不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定ID。

1.5.2 使用保存点

保存点的使用非常简单,我们可以使用命令行工具来创建保存点,也可以从保存点恢复作业。

(1)创建保存点
要在命令行中为运行的作业创建一个保存点镜像,只需要执行:

bin/flink savepoint :jobId [:targetDirectory]

这里jobId需要填充要做镜像保存的作业ID,目标路径targetDirectory可选,表示保存点存储的路径。
对于保存点的默认路径,可以通过配置文件flink-conf.yaml中的state.savepoints.dir项来设定:

state.savepoints.dir: hdfs:///flink/savepoints

当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置:

env.setDefaultSavepointDir("hdfs:///flink/savepoints");

由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点:

bin/flink stop --savepointPath [:targetDirectory] :jobId

(2)从保存点重启应用
我们已经知道,提交启动一个Flink作业,使用的命令是flink run;现在要从保存点重启一个应用,其实本质是一样的:

bin/flink run -s :savepointPath [:runArgs]

这里只要增加一个-s参数,指定保存点的路径就可以了,其它启动时的参数还是完全一样的,如果是基于yarn的运行模式还需要加上 -yid application-id。我们在第三章使用web UI进行作业提交时,可以填入的参数除了入口类、并行度和运行参数,还有一个“Savepoint Path”,这就是从保存点启动应用的配置。

1.5.3 使用保存点切换状态后端

使用savepoint恢复状态的时候,也可以更换状态后端。但是有一点需要注意的是,不要在代码中指定状态后端了, 通过配置文件来配置或者-D 参数配置。
打包时,服务器上有的就provided,可能遇到依赖问题,报错:javax.annotation.Nullable找不到,可以导入如下依赖:

        <dependency><groupId>com.google.code.findbugs</groupId><artifactId>jsr305</artifactId><version>1.3.9</version></dependency>

(1)提交flink作业

bin/flink run-application -d -t yarn-application -Dstate.backend=hashmap -c com.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar

(2)停止flink作业时,触发保存点
方式一:stop优雅停止并触发保存点,要求source实现StoppableFunction接口

bin/flink stop -p savepoint路径 job-id -yid application-id

方式二:cancel立即停止并触发保存点

bin/flink cancel -s savepoint路径 job-id -yid application-id

案例中source是socket,不能用stop

bin/flink cancel -s hdfs://hadoop102:8020/sp cffca338509ea04f38f03b4b77c8075c -yid application_1681871196375_0001

(3)从savepoint恢复作业,同时修改状态后端

bin/flink run-application -d -t yarn-application -s hdfs://hadoop102:8020/sp/savepoint-267cc0-47a214b019d5 -Dstate.backend=rocksdb -c com.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar

(4)从保存下来的checkpoint恢复作业

bin/flink run-application -d -t yarn-application -Dstate.backend=rocksdb -s hdfs://hadoop102:8020/chk/532f87ef4146b2a2968a1c137d33d4a6/chk-175 -c com.atguigu.checkpoint.SavepointDemo ./FlinkTutorial-1.0-SNAPSHOT.jar

如果停止作业时,忘了触发保存点也不用担心,现在版本的flink支持从保留在外部系统的checkpoint恢复作业,但是恢复时不支持切换状态后端

2. 状态一致性

2.1 一致性的概念和级别

一致性其实就是结果的正确性,一般从数据丢失、数据重复来评估。
流式计算本身就是一个一个来的,所以正常处理的过程中结果肯定是正确的;但在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确,所以主要讨论的就是“状态的一致性”
一般说来,状态一致性有三种级别:

  • 最多一次(At-Most-Once)
  • 至少一次(At-Least-Once)
  • 精确一次(Exactly-Once)

2.2 端到端的状态一致性

我们已经知道检查点可以保证Flink内部状态的一致性,而且可以做到精确一次。那是不是说,只要开启了检查点,发生故障进行恢复,结果就不会有任何问题呢?
没那么简单。在实际应用中,一般要保证从用户的角度看来,最终消费的数据是正确的。而用户或者外部应用不会直接从Flink内部的状态读取数据,往往需要我们将处理结果写入外部存储中。这就要求我们不仅要考虑Flink内部数据的处理转换,还涉及到从外部数据源读取,以及写入外部持久化系统,整个应用处理流程从头到尾都应该是正确的。
所以完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分。这个完整应用的一致性,就叫做“端到端(end-to-end)的状态一致性”,它取决于三个组件中最弱的那一环。一般来说,能否达到at-least-once一致性级别,主要看数据源能够重放数据;而能否达到exactly-once级别,流处理器内部、数据源、外部存储都要有相应的保证机制。

3. 端到端精确一次(End-To-End Exactly-Once)

实际应用中,最难做到、也最希望做到的一致性语义,无疑就是端到端(end-to-end)的“精确一次”。我们知道,对于Flink内部来说,检查点机制可以保证故障恢复后数据不丢(在能够重放的前提下),并且只处理一次,所以已经可以做到exactly-once的一致性语义了。
所以端到端一致性的关键点,就在于输入的数据源端和输出的外部存储端。

在这里插入图片描述

3.1 输入端保证

输入端主要指的就是Flink读取的外部数据源。对于一些数据源来说,并不提供数据的缓冲或是持久化保存,数据被消费之后就彻底不存在了,例如socket文本流。对于这样的数据源,故障后我们即使通过检查点恢复之前的状态,可保存检查点之后到发生故障期间的数据已经不能重发了,这就会导致数据丢失。所以就只能保证at-most-once的一致性语义,相当于没有保证。

想要在故障恢复后不丢数据,外部数据源就必须拥有重放数据的能力。常见的做法就是对数据进行持久化保存,并且可以重设数据的读取位置。一个最经典的应用就是Kafka。在Flink的Source任务中将数据读取的偏移量保存为状态,这样就可以在故障恢复时从检查点中读取出来,对数据源重置偏移量,重新获取数据。

数据源可重放数据,或者说可重置读取数据偏移量,加上Flink的Source算子将偏移量作为状态保存进检查点,就可以保证数据不丢。这是达到at-least-once一致性语义的基本要求,当然也是实现端到端exactly-once的基本要求。

3.2 输出端保证

有了Flink的检查点机制,以及可重放数据的外部数据源,我们已经能做到at-least-once了。但是想要实现exactly-once却有更大的困难:数据有可能重复写入外部系统。

因为检查点保存之后,继续到来的数据也会一一处理,任务的状态也会更新,最终通过Sink任务将计算结果输出到外部系统;只是状态改变还没有存到下一个检查点中。这时如果出现故障,这些数据都会重新来一遍,就计算了两次。我们知道对Flink内部状态来说,重复计算的动作是没有影响的,因为状态已经回滚,最终改变只会发生一次;但对于外部系统来说,已经写入的结果就是泼出去的水,已经无法收回了,再次执行写入就会把同一个数据写入两次。

所以这时,我们只保证了端到端的at-least-once语义。

为了实现端到端exactly-once,我们还需要对外部存储系统、以及Sink连接器有额外的要求。能够保证exactly-once一致性的写入方式有两种:

  • 幂等写入
  • 事务写入

我们需要外部存储系统对这两种写入方式的支持,而Flink也为提供了一些Sink连接器接口。接下来我们进行展开讲解。

1)幂等(Idempotent)写入
所谓“幂等”操作,就是说一个操作可以重复执行很多次,但只导致一次结果更改。也就是说,后面再重复执行就不会对结果起作用了。

这相当于说,我们并没有真正解决数据重复计算、写入的问题;而是说,重复写入也没关系,结果不会改变。所以这种方式主要的限制在于外部存储系统必须支持这样的幂等写入:比如Redis中键值存储,或者关系型数据库(如MySQL)中满足查询条件的更新操作。

需要注意,对于幂等写入,遇到故障进行恢复时,有可能会出现短暂的不一致。因为保存点完成之后到发生故障之间的数据,其实已经写入了一遍,回滚的时候并不能消除它们。如果有一个外部应用读取写入的数据,可能会看到奇怪的现象:短时间内,结果会突然“跳回”到之前的某个值,然后“重播”一段之前的数据。不过当数据的重放逐渐超过发生故障的点的时候,最终的结果还是一致的。

2)事务(Transactional)写入
如果说幂等写入对应用场景限制太多,那么事务写入可以说是更一般化的保证一致性的方式。

输出端最大的问题,就是写入到外部系统的数据难以撤回。而利用事务就可以实现对已写入数据的撤回。

事务是应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤消。事务有四个基本特性:原子性、一致性、隔离性和持久性,这就是著名的ACID。

在Flink流处理的结果写入外部系统时,如果能够构建一个事务,让写入操作可以随着检查点来提交和回滚,那么自然就可以解决重复写入的问题了。所以事务写入的基本思想就是:用一个事务来进行数据向外部系统的写入,这个事务是与检查点绑定在一起的。当Sink任务遇到barrier时,开始保存状态的同时就开启一个事务,接下来所有数据的写入都在这个事务中;待到当前检查点保存完毕时,将事务提交,所有写入的数据就真正可用了。如果中间过程出现故障,状态会回退到上一个检查点,而当前事务没有正常关闭(因为当前检查点没有保存完),所以也会回滚,写入到外部的数据就被撤销了。

具体来说,又有两种实现方式:预写日志(WAL)两阶段提交(2PC)
(1)预写日志(write-ahead-log,WAL)
我们发现,事务提交是需要外部存储系统支持事务的,否则没有办法真正实现写入的回撤。那对于一般不支持事务的存储系统,能够实现事务写入呢?

预写日志(WAL)就是一种非常简单的方式。具体步骤是:
①先把结果数据作为日志(log)状态保存起来
②进行检查点保存时,也会将这些结果数据一并做持久化存储
③在收到检查点完成的通知时,将所有结果一次性写入外部系统。
④在成功写入所有数据后,在内部再次确认相应的检查点,将确认信息也进行持久化保存。这才代表着检查点的真正完成。

我们会发现,这种方式类似于检查点完成时做一个批处理,一次性的写入会带来一些性能上的问题;而优点就是比较简单,由于数据提前在状态后端中做了缓存,所以无论什么外部存储系统,理论上都能用这种方式一批搞定。在Flink中DataStream API提供了一个模板类GenericWriteAheadSink,用来实现这种事务型的写入方式。

需要注意的是,预写日志这种一批写入的方式,有可能会写入失败;所以在执行写入动作之后,必须等待发送成功的返回确认消息。在成功写入所有数据后,在内部再次确认相应的检查点,这才代表着检查点的真正完成。这里需要将确认信息也进行持久化保存,在故障恢复时,只有存在对应的确认信息,才能保证这批数据已经写入,可以恢复到对应的检查点位置。

但这种“再次确认”的方式,也会有一些缺陷。如果我们的检查点已经成功保存、数据也成功地一批写入到了外部系统,但是最终保存确认信息时出现了故障,Flink最终还是会认为没有成功写入。于是发生故障时,不会使用这个检查点,而是需要回退到上一个;这样就会导致这批数据的重复写入。

(2)两阶段提交(two-phase-commit,2PC)
前面提到的各种实现exactly-once的方式,多少都有点缺陷;而更好的方法就是传说中的两阶段提交(2PC)。

顾名思义,它的想法是分成两个阶段:先做“预提交”,等检查点完成之后再正式提交。这种提交方式是真正基于事务的,它需要外部系统提供事务支持。

具体的实现步骤为:
①当第一条数据到来时,或者收到检查点的分界线时,Sink任务都会启动一个事务。
②接下来接收到的所有数据,都通过这个事务写入外部系统;这时由于事务没有提交,所以数据尽管写入了外部系统,但是不可用,是“预提交”的状态。
③当Sink任务收到JobManager发来检查点完成的通知时,正式提交事务,写入的结果就真正可用了。
当中间发生故障时,当前未提交的事务就会回滚,于是所有写入外部系统的数据也就实现了撤回。这种两阶段提交(2PC)的方式充分利用了Flink现有的检查点机制:分界线的到来,就标志着开始一个新事务;而收到来自JobManager的checkpoint成功的消息,就是提交事务的指令。每个结果数据的写入,依然是流式的,不再有预写日志时批处理的性能问题;最终提交时,也只需要额外发送一个确认信息。所以2PC协议不仅真正意义上实现了exactly-once,而且通过搭载Flink的检查点机制来实现事务,只给系统增加了很少的开销。

Flink提供了TwoPhaseCommitSinkFunction接口,方便我们自定义实现两阶段提交的SinkFunction的实现,提供了真正端到端的exactly-once保证。新的Sink架构,使用的是TwoPhaseCommittingSink接口。
不过两阶段提交虽然精巧,却对外部系统有很高的要求。这里将2PC对外部系统的要求列举如下:

  • 外部系统必须提供事务支持,或者Sink任务必须能够模拟外部系统上的事务。
  • 在检查点的间隔期间里,必须能够开启一个事务并接受数据写入。
  • 在收到检查点完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候外部系统关闭事务(例如超时了),那么未提交的数据就会丢失。
  • Sink任务必须能够在进程失败后恢复事务。
  • 提交事务必须是幂等操作。也就是说,事务的重复提交应该是无效的。

可见,2PC在实际应用同样会受到比较大的限制。具体在项目中的选型,最终还应该是一致性级别和处理性能的权衡考量。

3.3 Flink和Kafka连接时的精确一次保证

在流处理的应用中,最佳的数据源当然就是可重置偏移量的消息队列了;它不仅可以提供数据重放的功能,而且天生就是以流的方式存储和处理数据的。所以作为大数据工具中消息队列的代表,Kafka可以说与Flink是天作之合,实际项目中也经常会看到以Kafka作为数据源和写入的外部系统的应用。在本小节中,我们就来具体讨论一下Flink和Kafka连接时,怎样保证端到端的exactly-once状态一致性。
在这里插入图片描述
1)整体介绍
既然是端到端的exactly-once,我们依然可以从三个组件的角度来进行分析:

(1)Flink内部
Flink内部可以通过检查点机制保证状态和处理结果的exactly-once语义。
(2)输入端
输入数据源端的Kafka可以对数据进行持久化保存,并可以重置偏移量(offset)。所以我们可以在Source任务(FlinkKafkaConsumer)中将当前读取的偏移量保存为算子状态,写入到检查点中;当发生故障时,从检查点中读取恢复状态,并由连接器FlinkKafkaConsumer向Kafka重新提交偏移量,就可以重新消费数据、保证结果的一致性了。
(3)输出端
输出端保证exactly-once的最佳实现,当然就是两阶段提交(2PC)。作为与Flink天生一对的Kafka,自然需要用最强有力的一致性保证来证明自己。
也就是说,我们写入Kafka的过程实际上是一个两段式的提交:处理完毕得到结果,写入Kafka时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”。如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。

2)需要的配置
在具体应用中,实现真正的端到端exactly-once,还需要有一些额外的配置:

(1)必须启用检查点
(2)指定KafkaSink的发送级别为DeliveryGuarantee.EXACTLY_ONCE
(3)配置Kafka读取数据的消费者的隔离级别
这里所说的Kafka,是写入的外部系统。预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而Kafka中默认的隔离级别isolation.level是read_uncommitted,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了。所以应该将隔离级别配置
为read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费数据就会有显著的延迟。
(4)事务超时配置
Flink的Kafka连接器中配置的事务超时时间transaction.timeout.ms默认是1小时,而Kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟。所以在检查点保存时间很长时,有可能出现Kafka已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者。

public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 代码中用到hdfs,需要导入hadoop依赖、指定访问hdfs的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");// TODO 1、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// TODO 2.读取kafkaKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setGroupId("atguigu").setTopics("topic_1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkasource = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");/*** TODO 3.写出到Kafka* 精准一次 写入Kafka,需要满足以下条件,缺一不可* 1、开启checkpoint* 2、sink设置保证级别为 精准一次* 3、sink设置事务前缀* 4、sink设置事务超时时间: checkpoint间隔 <  事务超时时间  < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// TODO 3.1 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// TODO 3.2 精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("atguigu-")// TODO 3.3 精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"").build();kafkasource.sinkTo(kafkaSink);env.execute();}
}

后续读取“ws”这个topic的消费者,要设置事务的隔离级别为“读已提交”,如下:

public class KafkaEOSDemo2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用两阶段提交写入的TopicKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setGroupId("atguigu").setTopics("ws").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// TODO 作为 下游的消费者,要设置 事务的隔离级别 = 读已提交.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed").build();env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource").print();env.execute();}
}

相关文章:

Flink1.17实战教程(第六篇:容错机制)

系列文章目录 Flink1.17实战教程&#xff08;第一篇&#xff1a;概念、部署、架构&#xff09; Flink1.17实战教程&#xff08;第二篇&#xff1a;DataStream API&#xff09; Flink1.17实战教程&#xff08;第三篇&#xff1a;时间和窗口&#xff09; Flink1.17实战教程&…...

OpenCV实战 -- 维生素药片的检测记数

文章目录 检测记数原图经过操作开始进行消除粘连性--形态学变换总结实现方法1. 读取图片&#xff1a;2. 形态学处理&#xff1a;3. 二值化&#xff1a;4. 提取轮廓&#xff1a;5. 轮廓筛选和计数&#xff1a; 分水岭算法&#xff1a;逐行解释在基于距离变换的分水岭算法中&…...

【AI】注意力机制与深度学习模型

目录 一、注意力机制 二、了解发展历程 2.1 早期萌芽&#xff1a; 2.2 真正意义的注意力机制&#xff1a; 2.3 2015 年及以后&#xff1a; 2.4 自注意力与 Transformer&#xff1a; 2.5 BERT 与预训练模型&#xff1a; 三、基本框架 1. 打分函数&#xff08;Score Fun…...

HTML5和JS实现新年礼花效果

HTML5和JS实现新年礼花效果 2023兔年再见&#xff0c;2024龙年来临了&#xff01; 祝愿读者朋友们在2024年里&#xff0c;身体健康&#xff0c;心灵愉悦&#xff0c;梦想成真。 下面是用HTML5和JS实现新年礼花效果&#xff1a; 源码如下&#xff1a; <!DOCTYPE html>…...

【owt-server】一些构建项目梳理

【owt-server】清理日志&#xff1a;owt、srs、ffmpeg 【owt】p2p client mfc 工程梳理【m98】webrtc vs2017构建带符号的debug库【OWT】梳理构建的webrtc和owt mfc工程 m79的mfc客户端及owt-client...

Linux shell编程学习笔记38:history命令

目录 0 前言 1 history命令的功能、格式和退出状态1.1 history命令的功能1.2 history命令的格式1.3退出状态2 命令应用实例2.1 history&#xff1a;显示命令历史列表2.2 history -a&#xff1a;将当前会话的命令行历史追加到历史文件~/.bash_history中2.3 history -c&#xf…...

elasticsearch安装教程(超详细)

1.1 创建网络&#xff08;单点部署&#xff09; 因为我们还需要部署 kibana 容器&#xff0c;因此需要让 es 和 kibana 容器互联&#xff0c;所有先创建一个网络&#xff1a; docker network create es-net 1.2.加载镜像 采用的版本为 7.12.1 的 elasticsearch&#xff1b;…...

arkts中@Watch监听的使用

概述 Watch用于监听状态变量的变化&#xff0c;当状态变量变化时&#xff0c;Watch的回调方法将被调用。Watch在ArkUI框架内部判断数值有无更新使用的是严格相等&#xff08;&#xff09;&#xff0c;遵循严格相等规范。当在严格相等为false的情况下&#xff0c;就会触发Watch的…...

【Jmeter】Jmeter基础9-BeanShell介绍

3、BeanShell BeanShell是一种完全符合Java语法规范的脚本语言,并且又拥有自己的一些语法和方法。 3.1、Jmeter中使用的BeanShell 在Jmeter中&#xff0c;除了配置元件&#xff0c;其他类型的元件中都有BeanShell。BeanShell 是一种完全符合Java语法规范的脚本语言,并且又拥…...

详解数组的轮转

&#x1d649;&#x1d65e;&#x1d658;&#x1d65a;!!&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦ &#x1f44f;&#x1f3fb;‧✧̣̥̇:Solitary-walk ⸝⋆ ━━━┓ - 个性标签 - &#xff1a;来于“云”的“羽球人”。…...

html 表格 笔记

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>第二个页面</title><meta name"language" content"cn"> </head> <body><h2 sytle"width:500px;…...

计算机网络【HTTP 面试题】

HTTP的请求报文结构和响应报文结构 HTTP请求报文主要由请求行、请求头、空行、请求正文&#xff08;Get请求没有请求正文&#xff09;4部分组成。 1、请求行 由三部分组成&#xff0c;分别为&#xff1a;请求方法、URL以及协议版本&#xff0c;之间由空格分隔&#xff1b;请…...

linux基于用户身份对资源访问进行控制的解析及过程

linux中用户分为三类 1.超级用户&#xff08;root&#xff09; 拥有至高无上的权限 2.普通用户 人为创建、权限小&#xff0c;权限受到控制 3.程序用户 运行程序的用户&#xff0c;不是给人使用的&#xff0c;给程序使用的&#xff0c;一般不给登录&#xff01; 组账…...

手动创建idea SpringBoot 项目

步骤一&#xff1a; 步骤二&#xff1a; 选择Spring initializer -> Project SDK 选择自己的JDK版本 ->Next 步骤三&#xff1a; Maven POM ->Next 步骤四&#xff1a; 根据JDK版本选择Spring Boot版本 11版本及以上JDK建议选用3.2版本&#xff0c;JDK为11版本…...

【Go语言入门:Go语言的数据结构】

文章目录 3.Go语言的数据结构&#xff1a;3.1. 指针3.2. struct&#xff08;结构体&#xff09;3.3. Map(映射,哈希&#xff09; 3.Go语言的数据结构&#xff1a; 简介&#xff1a; 在Go语言中&#xff0c;数据结构体可以分为四种类型&#xff1a;基础类型、聚合类型、引用类型…...

QT designer的ui文件转py文件之后,实现pycharm中运行以方便修改逻辑,即添加实时模板框架

为PyCharm中的实时模板&#xff0c;你需要遵循以下步骤&#xff1a; 打开PyCharm的设置: 选择 File > Settings&#xff08;在macOS上是 PyCharm > Preferences&#xff09;。 导航到实时模板: 在设置中找到 Editor > Live Templates。 添加新的模板组 (可选): 为了…...

什么是负载均衡?

负载均衡是指在计算机网络领域中&#xff0c;将客户端请求分配到多台服务器上以实现带宽资源共享、优化资源利用率和提高系统性能的技术。负载均衡可以帮助小云有效解决单个服务器容量不足或性能瓶颈的问题&#xff0c;小云通过平衡流量负载&#xff0c;使得多台服务器能够共同…...

Python和Java的优缺点

Python的优点&#xff1a; 简单易学&#xff1a;Python的语法简洁清晰&#xff0c;易于学习和理解。丰富的库和框架&#xff1a;Python拥有庞大的标准库和活跃的开源社区&#xff0c;可以快速使用各种功能强大的库和框架&#xff0c;比如NumPy、Pandas、Django等。可读性强&am…...

AES - 在tiny-AES-c基础上封装了2个应用函数(加密/解密)

文章目录 AES - 在tiny-AES-c基础上封装了2个应用函数(加密/解密)概述增加2个封装函数的AES库aes.haes.c在官方测试程序上改的测试程序(用来测试这2个封装函数)END AES - 在tiny-AES-c基础上封装了2个应用函数(加密/解密) 概述 在github山有个星数很高的AES的C库 tiny-AES-c …...

51和32单片机读取FSR薄膜压力传感器压力变化

文章目录 简介线性电压转换模块51单片机读取DO接线方式51代码实验效果 32单片机读取AO接线方式32代码实验效果 总结 简介 FSR薄膜压力传感器是可以将压力变化转换为电阻变化的一种传感器&#xff0c;单片机可以读取然后作为粗略测量压力&#xff08;仅提供压力变化&#xff0c;…...

【maven】pom.xml 文件详解

有关 maven 其他配置讲解参考 maven 配置文件 setting.xml 详解 pom.xml 文件是 Maven 项目的核心配置文件&#xff0c;其中包含了项目的元数据、构建配置、依赖管理等信息。以下是一个 pom.xml 文件的主要部分&#xff1a; <?xml version"1.0" encoding"U…...

SpringMVC源码解析——DispatcherServlet初始化

在Spring中&#xff0c;ContextLoaderListener只是辅助功能&#xff0c;用于创建WebApplicationContext类型的实例&#xff0c;而真正的逻辑实现其实是在DispatcherServlet中进行的&#xff0c;DispatcherServlet是实现Servlet接口的实现类。Servlet是一个JAVA编写的程序&#…...

搞定Apache Superset

踩雷了无数次终于解决了Superset的一系列问题 现在是北京时间2023年12月27日&#xff0c;亲测有效。 Superset概述 Apache Superset是一个现代的数据探索和可视化平台。它功能强大且十分易用&#xff0c;可对接各种数据源&#xff0c;包括很多现代的大数据分析引擎&#xff…...

【每日试题】java面试之ssm框架

以下是20道常见的SSM&#xff08;SpringSpring MVCMyBatis&#xff09;面试题目和答案&#xff1a; 什么是SSM框架&#xff1f; SSM是指SpringSpring MVCMyBatis的组合&#xff0c;它是Java Web开发中常用的轻量级框架集合。 介绍一下SSM框架各个组件的作用&#xff1f; Sprin…...

Flutter 疑难杂症集合

一. Flutter集成uni小程序sdk 1. 手机连接电脑测试打开uni小程序没问题&#xff0c;打包成apk后debug编译下的apk也没问题&#xff0c;但就是release编译的apk包打不开小程序。 报错情景&#xff1a;点击后页面会闪现一下黑色的背景&#xff0c;然后又跳转回了点击之前的页面。…...

PHP序列化总结1--序列化和反序列化的基础知识

序列化和反序列化的作用 1.序列化&#xff1a;将对象转化成数组或者字符串的形式 2.反序列化&#xff1a;将数组或字符串的形式转化为对象 为什么要进行序列化 这种数据形式中间会有很多空格&#xff0c;不同人有不同的书写情况&#xff0c;可能还会出现换行的情况 为此为了…...

【Linux】 last 命令使用

last 命令 用于检索和展示系统中用户的登录信息。它从/var/log/wtmp文件中读取记录&#xff0c;并将登录信息按时间顺序列出。 著者 Miquel van Smoorenburg 语法 last [-R] [-num] [ -n num ] [-adiox] [ -f file ] [name...] [tty...]last 命令 -Linux手册页 选项及作用…...

Git 分布式版本控制系统(序章1)

第一章 Git 分布式版本控制系统 为什么学Git? 某些企业面试需要掌握Git&#xff0c;同时&#xff0c;也方便管理自己的Qt项目。 一、Git 客户端下载&#xff08;Windows&#xff09; 下载地址 https://gitee.com/all-about-git#git-%E5%A4%A7%E5%85%A8 二、Git 的特点 分支…...

给WordPress网站添加返回顶部按钮

给WordPress网站底部添加一个按钮&#xff0c;点它就可以现实快速返回到顶部。有两种方法可以现实&#xff0c;一种是通过安装相关插件来实现。另外一种方式就是以纯属代码的方式来实现。 给WordPress网站底部添加一个按钮&#xff0c;点它就可以现实快速返回到顶部。有两种方…...

App Inventor 2 接入短信服务,实现短信验证码功能

发送短信验证码功能一般都是基于短信平台提供的sdk进行调用&#xff0c;这里是基于阿里云短信平台进行的开发&#xff0c;阿里云短信平台接入步骤请点此参考。 App Inventor 2拓展提供的函数如下&#xff1a; 主要提供2个函数&#xff0c;生成随机位数的数字随机码 和 发送短信…...

Linux环境grep搜索方法记录

1 grep grep 命令&#xff0c;用来搜索字符串所在位置&#xff0c;可以具体到不同文件&#xff0c;不同行&#xff1b; 在Linux 下&#xff0c;查看命令释义如下 zhaocubuntu2004:~$ grep --help Usage: grep [OPTION]... PATTERNS [FILE]... Search for PATTERNS in each FI…...

C语言-破解密码

题目描述 密码是我们生活中非常重要的东东&#xff0c;我们的那么一点不能说的秘密就全靠它了。哇哈哈. 接下来渊子要在密码之上再加一套密码&#xff0c;虽然简单但也安全。 假设老王原来一个BBS上的密码为zvbo941987,为了方便记忆&#xff0c;他通过一种算法把这个密码变换…...

ffmpeg 解码文件时的时间戳问题

实时流和普通文件 1 实时流 实时流编码时&#xff0c;我们一般不进行b帧编码&#xff0c;但是文件存储时为了减小大小&#xff0c;会增加b帧&#xff0c;实时流只带了I&#xff0c;P帧&#xff0c;那就会好很多 2 普通文件 很多文件带了b帧&#xff0c;所以要使用解码时间去同…...

Java企业电子招投标系统源代码,支持二次开发,采用Spring cloud框架

在数字化采购领域&#xff0c;企业需要一个高效、透明和规范的管理系统。通过采用Spring Cloud、Spring Boot2、Mybatis等先进技术&#xff0c;我们打造了全过程数字化采购管理平台。该平台具备内外协同的能力&#xff0c;通过待办消息、招标公告、中标公告和信息发布等功能模块…...

[python]基于faster whisper实时语音识别语音转文本

语音识别转文本相信很多人都用过&#xff0c;不管是手机自带&#xff0c;还是腾讯视频都附带有此功能&#xff0c;今天简单说下&#xff1a; faster whisper地址&#xff1a; https://github.com/SYSTRAN/faster-whisperhttps://link.zhihu.com/?targethttps%3A//github.com…...

2023纠结中前行? 2024继续还是放下?

喝下2023年的第一口雪碧&#xff0c;没有想像中的那么期待&#xff0c;甜水&#xff0c;放弃吧&#xff1b;还是吃些水果吧&#xff0c;不行吃块肉、喝两口酒~ 关于生活 挣扎了10几年的一颗牙“终于“掉了&#xff0c;几个月时间都在为新牙努力着&#xff1b;”进了医院就不在…...

原型链补充

1.什么是原型对象 函数的独有属性,他用prototype来表示,可以在函数的prototype上挂载一些公用的属性和方法,供实例化对象来访问。 2.__proto__属性 这个属性每一个对象都有,实例化对象就是通过这个属性,来访问原型对象上的属性和方法的。 3.三者之间的关系 1.在构造函数的原型…...

《Linux Nano命令详解:小而强大的文本编辑器》

《Linux Nano命令详解&#xff1a;小而强大的文本编辑器》 引言&#xff1a; 在Linux系统中&#xff0c;文本编辑是开发和系统管理中不可或缺的一部分。虽然有许多强大的文本编辑器可供选择&#xff0c;但Nano以其简单易用、小巧灵活而备受喜爱。本文将深入探讨Nano命令&…...

系列四、Eureka自我保护

一、Eureka自我保护 1.1、故障现象 保护模式主要用于一组客户端和Eureka Server之间存在网络分区场景下的保护。一旦进入保护模式&#xff0c;Eureka Server将会尝试保护其服务注册表中的信息&#xff0c;不再删除服务注册表中的数据&#xff0c;也就是不会注销任何微服务。如…...

C++回调函数-实操(二)

回调通常通过函数指针、函数对象&#xff08;仿函数&#xff09;、Lambda 表达式或者 std::function 来实现。 1、函数指针实现回调 这一方法实现回调比较好记&#xff0c;就记住把函数当作参数传给方法&#xff0c;在方法中调用方法。 #include <iostream>// 回调函数…...

MySQL中常用的用户授权操作

mysql 用户授权 1 &#xff09;概述 让每个应用程序&#xff0c;单独开一个mysql的用户权限所有mysql用户存储在 mysql库的user表中 2 ) 多种用户授权方式示例 show databases; use mysql;select user, authentication_string, host from mysql.user;-- 创建和删除用户 -- c…...

LabVIEW开发智能火灾自动报警系统

LabVIEW开发智能火灾自动报警系统 系统基于LabVIEW虚拟仪器开发&#xff0c;由火灾报警控制器、感温感烟探测器、手动报警器、声光报警器、ZigBee无线通讯节点以及上位机电脑等组成&#xff0c;展示了LabVIEW在智能化火灾预警与控制方面的应用。该系统通过结合二总线协议和Zig…...

Vagrant使用教程

vmware下载地址&#xff1a;https://www.vmware.com/products/workstation-pro/workstation-pro-evaluation.html VirtualBox下载地址&#xff1a;https://www.virtualbox.org/wiki/Downloads Vagrant下载地址&#xff1a;https://developer.hashicorp.com/vagrant/install#…...

【Java】ThreadLocal原理与使用场景

ThreadLocal原理&#xff1a; 字段&#xff1a; //ThreadLocal对象的哈希码 private final int threadLocalHashCode nextHashCode();//生成ThreadLocal对象的哈希码时&#xff0c;需要用到该对象&#xff0c;从0开始 private static AtomicInteger nextHashCode new Atomic…...

软件测试/测试开发丨Linux进阶命令(curl、jq)

1、 curl 接口请求 curl是一个发起请求数据给服务器的工具curl支持的协议FTP、FTPS、HTTP、HTTPS、TFTP、SFTP、Gopher、SCP、Telnet、DICT、FILE、LDAP、LDAPS、IMAP、POP3、SMTP和RTSPcurl是一个非交互的工具 2、 curl 发起 get 请求 -G&#xff1a;使用get请求-d&#xf…...

模式识别与机器学习-SVM(带软间隔的支持向量机)

SVM&#xff08;带软间隔的支持向量机&#xff09; 软间隔思想的由来软间隔的引入 谨以此博客作为复习期间的记录。 软间隔思想的由来 在上一篇博客中&#xff0c;回顾了线性可分的支持向量机,但在实际情况中&#xff0c;很少有完全线性可分的情况&#xff0c;大部分线性可分…...

CentOS 7 firewalld+ipset+定时任务防御ssh暴力破解——筑梦之路

对于暴露在公网上的linux服务器&#xff0c;很容易被暴力破解登陆&#xff0c;为了增强服务器的安全性&#xff0c;因此对于ssh安全加固是很有必要的&#xff0c;这里主要介绍centos7 系统如何使用ipsetfirewalld定时任务来对ssh服务进行安全加固。 定义firewalld ipset fire…...

ElasticSearch的RestClient结合Sniffer提高可用性

一、背景 由于要安装分词器插件&#xff0c;所以需要重启ElasticSearch集群以使得新安装的插件生效 但是在重启集群的过程中&#xff0c;服务端代码却出现了大量错误&#xff0c;如下所示 java.net.ConnectException: Connection refused    at org.elasticsearch.client.R…...

【网络面试(2)】DNS原理-域名和IP地址的查询转换

从上一篇博客我们得知浏览器是如何生成了HTTP消息了&#xff0c;但是浏览器作为应用程序&#xff0c;是不具备向网络中发送请求的能力&#xff0c;而是需要委托给操作系统的内核协议栈来发送请求。在委托协议栈之前&#xff0c;浏览器还要做的一件事情就是将域名转换为IP地址。…...

【PHP】函数array_intersect、array_diff:从数组中取出、去除指定的几个键值

1.从数组中取出 &#xff1a;array_intersect 要从数组中取出指定的几个键值&#xff0c;可以使用 array_intersect_key 函数。以下是一个示例&#xff1a; $array [name > John,age > 30,email > johnexample.com,city > New York ];$keys [name, email];$resu…...