当前位置: 首页 > news >正文

【天衍系列 03】深入理解Flink的Watermark:实时流处理的时间概念与乱序处理

文章目录

  • 01 基本概念
  • 02 工作原理
  • 03 优势与劣势
  • 04 核心组件
  • 05 Watermark 生成器 使用
  • 06 应用场景
  • 07 注意事项
  • 08 案例分析
    • 8.1 窗口统计数据不准
    • 8.2 水印是如何解决延迟与乱序问题?
    • 8.3 详细分析
  • 09 项目实战demo
    • 9.1 pom依赖
    • 9.2 log4j2.properties配置
    • 9.3 Watermark水印作业

01 基本概念

Watermark 是用于处理事件时间的一种机制,用于表示事件时间流的进展。在流处理中,由于事件到达的顺序和延迟,系统需要一种机制来衡量事件时间的进展,以便正确触发窗口操作等。Watermark 就是用来标记事件时间的进展情况的一种特殊数据元素。

02 工作原理

Watermark 的生成方式通常是由系统根据数据流中的事件来自动推断生成的。一般来说,系统会根据事件时间戳和一定的策略来生成 Watermark,以此来表示事件时间的进展。在 Flink 中,通常会有内置的 Watermark 生成器或者用户自定义的生成器来实现这个功能。

当一个 Watermark 被生成后,它会被发送到流处理的所有并行任务中。任务会根据接收到的 Watermark,将小于或等于 Watermark 的事件时间的数据触发相关操作(如窗口计算),以此来确保计算的正确性。

03 优势与劣势

优点:

  • Watermark 可以确保流处理系统正确处理事件时间,避免了由于乱序和延迟引起的计算错误。
  • 可以根据业务需求和数据特征灵活调整 Watermark 生成的策略,以适应不同的场景。
  • Watermark 的引入使得流处理系统更具健壮性,能够处理各种实时数据场景。

缺点:

  • Watermark 的生成可能会带来一定的开销,尤其是在数据量庞大、事件频繁的情况下,可能会对系统性能产生一定影响。
  • 对于某些特殊的场景,例如极端乱序或者延迟过大的情况,Watermark 可能无法完全解决事件时间处理的问题。

04 核心组件

  • Apache Flink中的水印(Watermark)是事件时间处理的核心组件之一,它用于解决无序事件流中的事件时间问题。水印是一种元数据,用于告知系统事件时间流的进度,从而使系统能够在处理延迟的数据时做出正确的决策。

    以下是Flink中水印的核心组件:

    1. Watermark生成器(Watermark Generator)
      • Watermark生成器负责生成水印,并将其插入到数据流中。
      • 水印生成的策略通常与数据源有关。例如,对于有序的数据源,可以根据数据的事件时间直接生成水印;对于无序数据源,则可能需要一些启发式方法来生成水印。
    2. AssignerWithPeriodicWatermarks
      • 这是一个Flink提供的接口,用于在数据流中分配水印。
      • 实现此接口的类需要实现两个方法:extractTimestamp()用于提取事件时间戳,getCurrentWatermark()用于生成当前水印。
    3. AssignerWithPunctuatedWatermarks
      • 与上述相似,但是这个接口适用于在特定条件下(例如特定的事件)生成水印的场景。
    4. Watermark延迟(Watermark Lag)
      • 衡量系统中水印到达事件流的延迟程度。通常,水印到达得越快,系统对事件时间处理的准确性就越高。
    5. Watermark策略(Watermarking Strategy)
      • 这是一个配置项,用于确定水印生成的策略。可以基于固定的时间间隔生成水印,也可以根据事件流的特性进行自适应调整。
    6. Watermark传递和处理
      • Flink通过数据流将水印传递给各个操作符(operators),从而确保水印在整个流处理拓扑中传递。
      • 在处理过程中,水印用于确定事件时间窗口(Event Time Windows)的关闭时机,以及触发一些基于事件时间的操作,如触发窗口计算等。
    7. 处理水印(Handling Watermarks)
      • 在窗口计算等操作中,Flink需要根据水印来判断是否可以触发计算操作,以此保证结果的正确性和完整性。

    水印的核心作用在于解决事件时间处理中的乱序问题,通过适当的水印策略和生成机制,可以有效地处理延迟数据和乱序数据,保证数据处理的准确性和时效性。

05 Watermark 生成器 使用

在 Apache Flink 中,提供了一些内置的 Watermark 生成器,这些生成器可以用于简化在流处理中的 Watermark 管理。以下是一些常用的内置 Watermark 生成器:

  1. BoundedOutOfOrdernessTimestampExtractor:

    • 描述: 这是 Flink 内置的基于有界乱序时间的 Watermark 生成器。

    • 用法: 用户可以通过指定最大允许的乱序时间来创建一个 BoundedOutOfOrdernessTimestampExtractor 实例。通常情况下,用户需要实现 extractTimestamp 方法,从事件中提取事件时间戳。

    • 示例:

      public class MyTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<MyEvent> {public MyTimestampExtractor(Time maxOutOfOrderness) {super(maxOutOfOrderness);}@Overridepublic long extractTimestamp(MyEvent event) {return event.getTimestamp();}
      }
      
  2. AscendingTimestampExtractor:

    • 描述: 这是一个简单的 Watermark 生成器,适用于按照事件时间戳升序排列的数据流。

    • 用法: 用户只需实现 extractAscendingTimestamp 方法,从事件中提取事件时间戳。

    • 示例:

      public class MyAscendingTimestampExtractor extends AscendingTimestampExtractor<MyEvent> {@Overridepublic long extractAscendingTimestamp(MyEvent event) {return event.getTimestamp();}
      }
      
  3. AssignerWithPunctuatedWatermarks:

    • 描述: 这是一种特殊类型的 Watermark 生成器,它可以基于某些事件的属性产生 Watermark。

    • 用法: 用户需要实现 checkAndGetNextWatermark 方法,根据事件的某些属性来判断是否生成 Watermark。

    • 示例:

      public class MyPunctuatedWatermarkAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {@Overridepublic long extractTimestamp(MyEvent element, long previousElementTimestamp) {return element.getTimestamp();}@Overridepublic Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {// 根据 lastElement 的某些属性判断是否生成 Watermarkif (lastElement.getProperty() > threshold) {return new Watermark(extractedTimestamp);}return null; // 如果不生成 Watermark,则返回 null}
      }
      

这些内置的 Watermark 生成器提供了灵活性和方便性,使得在 Flink 中实现基于事件时间的处理变得更加容易。根据具体的业务需求和数据特征,可以选择合适的 Watermark 生成器来确保准确的事件时间处理。

06 应用场景

在Apache Flink 1.18中,水印(Watermark)是事件时间处理的核心组件,用于解决事件时间流处理中的乱序和延迟数据的问题。下面是一些Flink 1.18中集成Watermark水印的应用场景:

  1. 流式窗口操作
    • 在流式处理中,经常需要对事件进行窗口化操作,例如按时间窗口、会话窗口等进行聚合计算。Watermark的到达可以作为触发窗口计算的信号,确保窗口在事件时间上的正确性。这种情况下,Watermark能够确保窗口内的数据已经全部到达,可以进行聚合计算,同时还能够处理延迟的数据。
  2. 处理乱序数据
    • 在实际的数据流中,事件通常不会按照严格的时间顺序到达,可能存在乱序的情况。Watermark可以帮助系统理清事件的先后顺序,确保在事件时间上的正确性。通过适当设置Watermark的生成策略,可以根据数据特性来处理乱序数据,保证数据处理的正确性。
  3. 事件时间窗口计算
    • 在处理事件时间窗口时,Watermark起到了关键作用。它确定了窗口的关闭时机,即在Watermark达到窗口的结束时间时,系统可以安全地关闭该窗口,并对其中的数据进行计算。这确保了窗口计算的正确性,同时也能够处理延迟数据,使得窗口计算能够在数据到达时即时进行。
  4. 处理迟到的数据
    • Watermark还可以用于处理迟到的数据,即已经超过窗口关闭时限但仍然到达的数据。通过设置适当的延迟容忍阈值,可以容忍一定程度的迟到数据,并将其纳入窗口计算中。这样可以提高数据处理的完整性和准确性。
    • 实时数据监控和异常检测
    • 在实时数据流中,通常需要对数据进行实时监控和异常检测。Watermark可以用于确定事件时间的进度,从而实现实时监控和异常检测。例如,可以基于事件时间窗口对数据进行统计分析,发现突发的异常情况,并及时采取相应的措施。

总的来说,Flink 1.18中集成Watermark水印的应用场景涵盖了广泛的实时数据处理领域,包括流式窗口操作、处理乱序数据、事件时间窗口计算、处理迟到的数据以及实时数据监控和异常检测等方面。Watermark作为事件时间处理的核心组件,为Flink提供了处理实时数据流的强大功能,能够确保数据处理的准确性和时效性。

07 注意事项

Apache Flink 中水印(Watermark)的使用是关键的,特别是在处理事件时间(Event Time)数据时。水印是一种机制,用于处理无序事件流,并确保在执行窗口操作时数据的完整性和正确性。以下是在使用 Flink 1.18 中水印的一些注意事项:

  1. 水印生成器(Watermark Generators)的选择
    • Flink 提供了多种内置的水印生成器,如 BoundedOutOfOrdernessTimestampExtractor 和 AscendingTimestampExtractor。
    • BoundedOutOfOrdernessTimestampExtractor 适用于处理带有乱序的数据流,它会为每个事件引入一定的延迟。
    • AscendingTimestampExtractor 适用于处理按事件顺序到达的数据流,它假定数据已经按照事件时间排序。
  2. 水印延迟(Watermark Lag)的设置
    • 设置水印延迟是非常重要的,它决定了 Flink 在处理数据时能够容忍的事件延迟时间。
    • 如果设置的水印延迟过小,可能会导致窗口操作不正确,因为 Flink 认为某些事件已经到达,但实际上它们还没有到达。
    • 如果设置的水印延迟过大,可能会导致窗口操作的延迟增加,因为 Flink 需要等待更长时间以确保数据的完整性。
  3. 数据源的处理
    • 在读取数据源时,确保正确地分配时间戳并生成水印。这通常需要在数据源的读取逻辑中明确指定时间戳和水印生成的逻辑。
  4. 水印与窗口操作的关系
    • 在执行窗口操作(如窗口聚合、窗口计算等)时,水印的生成和处理是至关重要的。
    • 水印确保在触发窗口计算时,Flink 已经收到了窗口结束时间之前的所有数据,从而确保计算结果的准确性。
  5. 定期检查水印生成是否正常
    • 在部署 Flink 作业时,建议定期检查水印的生成情况。可以通过 Flink 的监控界面或日志来查看水印的生成情况,并根据需要调整水印生成的逻辑和设置。
  6. 监控和调试
    • 在使用水印时,需要重点关注作业的监控和调试,以确保水印的生成和处理是符合预期的。
    • 如果发现数据延迟或窗口计算不正确,可以通过监控数据流和日志来定位和解决问题,可能需要调整水印的生成逻辑或调整水印延迟来改善作业的性能和准确性。
  7. 数据倾斜和性能优化
    • 在使用水印时,需要注意数据倾斜可能会影响水印的生成和处理性能。可以通过合理的数据分片和并行处理来减轻数据倾斜带来的影响,从而提高作业的性能和稳定性。

总的来说,水印在 Flink 中的使用是非常重要的,它能够确保在处理事件时间数据时保持数据的完整性和正确性。因此,在设计和部署 Flink 作业时,需要特别注意水印的生成和处理,以确保作业能够正确运行并获得良好的性能表现。

08 案例分析

8.1 窗口统计数据不准

当涉及到事件时间处理时,延迟和乱序是非常常见的情况。下面是一个简单的案例,演示了在事件时间处理中可能遇到的延迟和乱序问题。

假设我们有一个用于监控网站用户访问的实时数据流。每个事件都包含用户ID、访问时间戳和访问的网页URL。我们想要计算每个用户在每小时内访问的不同网页数量。

考虑到网络传输和数据处理可能会引入延迟和乱序,我们的数据流可能如下所示:

Event 1: {UserID: 1, Timestamp: 12:00:05, URL: "example.com/page1"}
Event 2: {UserID: 2, Timestamp: 12:00:10, URL: "example.com/page2"}
Event 3: {UserID: 1, Timestamp: 12:00:15, URL: "example.com/page2"}
Event 4: {UserID: 1, Timestamp: 11:59:58, URL: "example.com/page3"}   <-- 延迟
Event 5: {UserID: 2, Timestamp: 12:00:02, URL: "example.com/page4"}   <-- 乱序

在这个示例中,Event 4由于延迟而晚于其他事件到达,而Event 5由于乱序而在其本应到达的时间之前到达。

如果没有使用水印机制,Flink 可能会错误地将 Event 4 的数据统计到 12:00:00 ~ 12:01:00 的窗口中,这是因为 Flink 默认情况下是根据接收到事件的时间来进行处理的,而不是根据事件实际发生的事件时间。

8.2 水印是如何解决延迟与乱序问题?

在上述案例中,Flink 的水印(Watermark)机制通过指示事件时间的上限,帮助系统确定事件时间窗口的边界。水印本质上是一种元数据,它告知 Flink 在某个时间点之前的数据已经全部到达。

下面简要说明水印如何在案例中发挥作用:

  1. 处理延迟数据
    • 当 Event 4 发生延迟到达时,水印会逐渐推进,最终达到 Event 4 的事件时间戳(11:59:58)。
    • Flink 知道在水印之前的所有数据都已经到达,因此即使 Event 4 晚到,也不会影响窗口的触发。
  2. 处理乱序数据
    • 当 Event 5 由于乱序提前到达时,水印仍然在逐渐推进。
    • Flink 通过水印判断,在当前水印之前的所有数据都已到达,因此可以触发相应的窗口计算。
  3. 窗口触发
    • Flink 会根据水印确定触发窗口的时机。当水印到达某个时间戳时,Flink 知道在该水印之前的数据已经全部到达,可以安全地触发窗口计算。
    • 比如,在水印到达 12:00:05 时,Flink 可以触发 12:00:00 - 12:01:00 的窗口计算,处理这一时段内的数据。

综合来说,水印帮助 Flink 在事件时间处理中正确处理延迟和乱序的数据,确保窗口操作的准确性和完整性。通过逐渐推进水印,系统能够在事件时间轴上有序地进行处理,而不会受到延迟和乱序数据的影响。

8.3 详细分析

假设我们有以下十条乱序的事件数据,每条数据包含事件时间戳和相应的值:

事件时间戳(毫秒)  值
1000               10
2000               15
3000               12
1500               8
2500               18
1200               6
1800               14
4000               20
3500               16
3200               9

我们将使用Watermark来处理这些数据,并进行窗口统计。假设窗口大小为2秒,最大乱序时间为1秒。

使用Watermark前的统计

  1. 当接收到事件时间戳为1000毫秒时,将值10加入窗口。
  2. 当接收到事件时间戳为2000毫秒时,将值15加入窗口。
  3. 当接收到事件时间戳为3000毫秒时,将值12加入窗口。
  4. 当接收到事件时间戳为1500毫秒时,将值8加入窗口。
  5. 当接收到事件时间戳为2500毫秒时,将值18加入窗口。
  6. 当接收到事件时间戳为1200毫秒时,将值6加入窗口。
  7. 当接收到事件时间戳为1800毫秒时,将值14加入窗口。
  8. 当接收到事件时间戳为4000毫秒时,将值20加入窗口。
  9. 当接收到事件时间戳为3500毫秒时,将值16加入窗口。
  10. 当接收到事件时间戳为3200毫秒时,将值9加入窗口。

使用Watermark后的统计

Watermark的计算过程如下: Watermark = max(当前Watermark, 当前事件时间 - 最大乱序时间)

在这个例子中,我们设定最大乱序时间为1秒,即1000毫秒。

  1. 当收到事件时间戳为1000毫秒时,Watermark = max(0, 1000 - 1000) = 0毫秒。
  2. 当收到事件时间戳为2000毫秒时,Watermark = max(0, 2000 - 1000) = 1000毫秒。
  3. 当收到事件时间戳为3000毫秒时,Watermark = max(1000, 3000 - 1000) = 2000毫秒。
  4. 当收到事件时间戳为1500毫秒时,Watermark = max(2000, 1500 - 1000) = 2000毫秒。
  5. 当收到事件时间戳为2500毫秒时,Watermark = max(2000, 2500 - 1000) = 2000毫秒。
  6. 当收到事件时间戳为1200毫秒时,Watermark = max(2000, 1200 - 1000) = 2000毫秒。
  7. 当收到事件时间戳为1800毫秒时,Watermark = max(2000, 1800 - 1000) = 2000毫秒。
  8. 当收到事件时间戳为4000毫秒时,Watermark = max(2000, 4000 - 1000) = 3000毫秒。
  9. 当收到事件时间戳为3500毫秒时,Watermark = max(3000, 3500 - 1000) = 3000毫秒。
  10. 当收到事件时间戳为3200毫秒时,Watermark = max(3000, 3200 - 1000) = 3000毫秒。

Watermark确定了什么时候触发窗口统计。在本例中,当Watermark超过窗口的结束时间时,窗口将被关闭,并进行统计。因此,Watermark确保了即使在乱序数据的情况下,窗口统计也能够按照正确的事件时间顺序进行。

为了更清晰地展示Watermark的影响,以下是每个事件被处理时的Watermark状态和窗口统计的结果:

事件时间戳(毫秒)  值   Watermark    窗口统计结果
1000               10   0            10
2000               15   1000         25
3000               12   2000         27
1500               8    2000         27
2500               18   2000         30
1200               6    2000         30
1800               14   2000         32
4000               20   3000         36
3500               16   3000         36
3200               9    3000         36

这里的窗口统计结果是在Watermark触发时计算的。在Watermark超过窗口结束时间时,窗口会被关闭,并进行统计。

09 项目实战demo

9.1 pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xsy</groupId><artifactId>aurora_flink_connector_file</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>11</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding></properties><!--通用依赖--><dependencies><!--集成日志框架 start--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.17.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.17.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.1</version></dependency><!--集成日志框架 end--><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><!-- flink读取Text File文件依赖 start--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>1.18.0</version></dependency><!-- flink读取Text File文件依赖 end--><!-- flink基础依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.18.0</version></dependency><!-- flink基础依赖 end --></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.aurora.KafkaStreamingJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build>
</project>

9.2 log4j2.properties配置

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

9.3 Watermark水印作业

package com.aurora.demo;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Random;/*** 描述:Flink集成Watermark水印** @author 浅夏的猫* @version 1.0.0* @date 2024-02-08 10:31:40*/
public class WatermarkStreamingJob {private static final Logger logger = LoggerFactory.getLogger(WatermarkStreamingJob.class);public static void main(String[] args) throws Exception {// 创建 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 自定义数据源,每隔1000ms下发一条数据SourceFunction<JSONObject> dataSource = new SourceFunction<>() {private volatile boolean running = true;@Overridepublic void run(SourceContext<JSONObject> sourceContext) throws Exception {while (running) {long timestamp = System.currentTimeMillis();timestamp = timestamp - new Random().nextInt(11) + 10;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");// 格式化日期时间对象为指定格式的字符串String format = formatter.format(dateTime);JSONObject dataObj = new JSONObject();int transId = 8;dataObj.put("userId", "user_" + transId);dataObj.put("timestamp", timestamp);dataObj.put("datetime", format);dataObj.put("url", "example.com/page" + transId);logger.info("数据源url={},用户={},交易时间={},系统时间={}", "example.com/page" + transId, "user_" + transId, format);Thread.sleep(1000);sourceContext.collect(dataObj);}}@Overridepublic void cancel() {running = false;}};//创建水印策略处理事件发生时间TimestampAssignerSupplier<JSONObject> timestampAssignerSupplier = new TimestampAssignerSupplier<JSONObject>() {@Overridepublic TimestampAssigner<JSONObject> createTimestampAssigner(Context context) {return new TimestampAssigner<JSONObject>() {@Overridepublic long extractTimestamp(JSONObject element, long recordTimestamp) {//使用自定义的事件发生时间来做水印,确保窗口统计的是按照我们的时间字段统计,提高准确度,否则默认使用消费时间return element.getLong("timestamp");}};}};//创建数据流env.addSource(dataSource).assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(timestampAssignerSupplier))//按照url分组.keyBy(new KeySelector<JSONObject, Object>() {@Overridepublic Object getKey(JSONObject jsonObject) throws Exception {return jsonObject.getString("url");}}).window(TumblingEventTimeWindows.of(Time.seconds(2))).reduce(new ReduceFunction<JSONObject>() {@Overridepublic JSONObject reduce(JSONObject reduceResult, JSONObject record) throws Exception {logger.info("窗口统计url={},用户流水={},次数={}", reduceResult.getString("url"), reduceResult.getString("userId"), reduceResult.getInteger("urlNum") == null ? 1 : reduceResult.getInteger("urlNum"));int urlNum = reduceResult.getInteger("urlNum") == null ? 1 : reduceResult.getInteger("urlNum");reduceResult.put("urlNum", urlNum + 1);return reduceResult;}}).print();// 执行任务env.execute("WatermarkStreamingJob");}
}

相关文章:

【天衍系列 03】深入理解Flink的Watermark:实时流处理的时间概念与乱序处理

文章目录 01 基本概念02 工作原理03 优势与劣势04 核心组件05 Watermark 生成器 使用06 应用场景07 注意事项08 案例分析8.1 窗口统计数据不准8.2 水印是如何解决延迟与乱序问题&#xff1f;8.3 详细分析 09 项目实战demo9.1 pom依赖9.2 log4j2.properties配置9.3 Watermark水印…...

day07.C++类与对象

一.类与对象的思想 1.1面向对象的特点 封装、继承、多态 1.2类的概念 创建对象的过程也叫类的实例化。每个对象都是类的一个具体实例&#xff08;Instance&#xff09;&#xff0c;拥有类的成员变量和成员函数。由{ }包围&#xff0c;由&#xff1b;结束。 class name{ //类的…...

String讲解

文章目录 String类的重要性常用的方法常用的构造方法String类的比较字符串的查找转化数字转化为字符串字符串转数字 字符串替换字符串的不可变性 字符串拆分字符串截取字符串修改 StringBuilder和StringBuffer String类的重要性 在c/c的学习中我们接触到了字符串&#xff0c;但…...

人群异常聚集监测系统-聚众行为检测与识别算法---豌豆云

聚众识别系统对指定区域进行实时监测&#xff0c;当监测到人群大量聚集、达到设置上限时&#xff0c;立即告警及时疏散。 旅游业作为国民经济战略性支柱产业&#xff0c;随着客流量不断增加&#xff0c;旅游景区和一些旅游城市的管理和服务面临着前所未有的挑战&#xff1a; …...

多模态基础---BERT

1. BERT简介 BERT用于将一个输入的句子转换为word_embedding&#xff0c;本质上是多个Transformer的Encoder堆叠在一起。 其中单个Transformer Encoder结构如下&#xff1a; BERT-Base采用了12个Transformer Encoder。 BERT-large采用了24个Transformer Encoder。 2. BERT的…...

图表示学习 Graph Representation Learning chapter2 背景知识和传统方法

图表示学习 Graph Representation Learning chapter2 背景知识和传统方法 2.1 图统计和核方法2.1.1 节点层次的统计和特征节点的度 节点中心度聚类系数Closed Triangles, Ego Graphs, and Motifs 图层次的特征和图的核节点袋Weisfieler–Lehman核Graphlets和基于路径的方法 邻域…...

OpenMVG(计算两个球形图像之间的相对姿态、细化重建效果)

目录 1 Bundle Adjustment(细化重建效果) 2 计算两个球形图像之间的相对姿态 1 Bundle Adjustment(细化重建效果) 数...

【QT+QGIS跨平台编译】之三十四:【Pixman+Qt跨平台编译】(一套代码、一套框架,跨平台编译)

文章目录 一、Pixman介绍二、文件下载三、文件分析四、pro文件五、编译实践一、Pixman介绍 Pixman是一款开源的软件库,提供了高质量的像素级图形处理功能。它主要用于在图形渲染、合成和转换方面进行优化,可以帮助开发人员在应用程序中实现高效的图形处理。 Pixman的主要特…...

2.17学习总结

tarjan 【模板】缩点https://www.luogu.com.cn/problem/P3387 题目描述 给定一个 &#xfffd;n 个点 &#xfffd;m 条边有向图&#xff0c;每个点有一个权值&#xff0c;求一条路径&#xff0c;使路径经过的点权值之和最大。你只需要求出这个权值和。 允许多次经过一条边或者…...

Unity类银河恶魔城学习记录7-7 P73 Setting sword type源代码

Alex教程每一P的教程原代码加上我自己的理解初步理解写的注释&#xff0c;可供学习Alex教程的人参考 此代码仅为较上一P有所改变的代码 【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili Sword_Skill_Controller.cs using System.Collections; using System.Col…...

安卓版本与鸿蒙不再兼容,鸿蒙开发工程师招疯抢

最近&#xff0c;互联网大厂纷纷开始急招华为鸿蒙开发工程师。这是一个新的信号。在Android和iOS长期霸占市场的今天&#xff0c;鸿蒙的崛起无疑为整个行业带来了巨大的震动。 2023年11月10日&#xff0c;网易更新了高级/资深Android开发工程师岗位&#xff0c;职位要求参与云音…...

《白话C++》第9章 泛型,Page842~844 9.4.2 AutoPtr

源起&#xff1a; C编程中&#xff0c;最容易出的问题之一&#xff0c;就是内存泄露&#xff0c;而new一个对象&#xff0c;却忘了delete它&#xff0c;则是造成内存泄露的主要原因之一 例子一&#xff1a; void foo() {XXXObject* xo new XXXObject;if(!xo->DoSomethin…...

服务流控(Sentinel)

引入依赖 <!-- 必须的 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency><!-- sentinel 核心库 --> <dependency><groupId>com.ali…...

点亮代码之灯,程序员的夜与电脑

在科技的海洋里&#xff0c;程序员是那些驾驶着代码船只&#xff0c;穿梭于虚拟世界的探险家。他们手中的键盘是航行的舵&#xff0c;而那台始终不愿关闭的电脑&#xff0c;便是他们眼中永不熄灭的灯塔。有人说&#xff0c;程序员不喜欢关电脑&#xff0c;这究竟是为什么呢&…...

ClickHouse--07--Integration 系列表引擎

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 Integration 系列表引擎1 HDFS1.1 语法1.2 示例&#xff1a; 2 MySQL2.1 语法2.2 示例&#xff1a; 3 Kafka3.1 语法3.2 示例&#xff1a;3.3 数据持久化方法 Integ…...

前端架构: 脚手架框架之yargs的11种基础核心特性的应用教程

脚手架框架之yargs的基础核心特性与应用 1 &#xff09;概述 yargs 是脚手架当中使用量非常大的一个框架进入它的npm官网: https://www.npmjs.com/package/yargs 目前版本: 17.7.2Weekly Downloads: 71,574,188 (动态数据)最近更新&#xff1a;last month (github)说明这是一个…...

MySQL性能调优篇(6)-主从复制的配置与管理

MySQL数据库主从复制是一种常用的数据复制和高可用性解决方案。它允许将一个MySQL主服务器上的数据自动复制到多个从服务器上&#xff0c;从而提供了数据冗余备份、读写分离等优势。本文将详细介绍MySQL数据库主从复制的配置与管理。 1. 原理概述 MySQL主从复制是基于二进制日…...

Linux第49步_移植ST公司的linux内核第1步_获取linux源码

已知ST公司的linux源码路径&#xff1a; /home/zgq/linux/atk-mp1/stm32mp1-openstlinux-5.4-dunfell-mp1-20-06-24/sources/arm-ostl-linux-gnueabi/linux-stm32mp-5.4.31-r0 1、创建“my_linux”目录 打开第1个终端 输入“ls回车” 输入“cd linux/回车”&#xff0c;切换…...

怎样学习Windows下命令行编写

第一&#xff1a;Windows下命令行指的是cmd和powershell命令行编写 第二&#xff1a;必须要用好help或/?命令&#xff0c;这个命令是最基本的也是最常用的命令列表和语法查看命令 第三&#xff1a;cmd命令使用help查看命令列表或“一串带参数的命令 /?"&#xff08;不…...

数据结构第十六天(二叉树层序遍历/广度优先搜索(BFS)/队列使用)

目录 前言 概述 接口 源码 测试函数 运行结果 往期精彩内容 前言 从前的日色变得慢&#xff0c;车&#xff0c;马&#xff0c;邮件都慢&#xff0c;一生,只够爱一个人。 概述 二叉树的层序遍历可以使用广度优先搜索&#xff08;BFS&#xff09;来实现。具体步骤如下&…...

6.s081 学习实验记录(八)Networking

文章目录 network driver network driver //TODO...

图解贝塞尔曲线生成原理

贝塞尔曲线是一种在计算机图形学中广泛使用的参数曲线&#xff0c;主要用于二维图形应用程序中。它是由法国工程师皮埃尔贝塞尔在1962年提出的&#xff0c;主要用于汽车车身设计。贝塞尔曲线的主要特点是&#xff0c;只要确定了控制点&#xff0c;就可以生成一条平滑的曲线。 …...

租房招聘|在线租房和招聘平台|基于Springboot的在线租房和招聘平台设计与实现(源码+数据库+文档)

在线租房和招聘平台目录 目录 基于Springboot的在线租房和招聘平台设计与实现 一、前言 二、系统功能设计 三、系统实现 1、房屋管理 2、招聘管理 3、平台资讯管理 4、平台资讯类型管理 四、数据库设计 1、实体ER图 六、论文参考 七、最新计算机毕设选题推荐 八、源…...

简单试验:用Excel进行爬虫

文章目录 Excel的版本具体操作实例从网站上爬取工商银行的汇率Excel的版本 office 2016,2019,365这几个版本都可以 具体操作 #mermaid-svg-NlIVMivGoJbdyWW0 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-NlIVMi…...

SQL 精讲-MySql 常用函数,MySQL语句精讲和举例

FORMAT(数值,保留位数) 四舍五入 SELECT *,FORMAT(score/3,2) from studentROUND(数值,保留位数) 四舍五入 SELECT ROUND(score/3,2) from studentCONCAT(字符串 1,字符串 2) 字符串拼接 SELECT CONCAT(customer_name, (,address,)) from mt_customerLEFT(字符串,长度) 截取…...

nlp中如何数据增强

在自然语言处理&#xff08;NLP&#xff09;中&#xff0c;数据增强是一种常用的技术&#xff0c;旨在通过对原始文本进行一系列变换和扩充&#xff0c;生成更多多样化的训练数据。这有助于提高模型的泛化能力和鲁棒性。下面是一些常见的数据增强方法在NLP中的应用&#xff1a;…...

python:xml.etree,用 xmltodict 转换为json数据,生成jstree所需的文件

请参阅&#xff1a;java : pdfbox 读取 PDF文件内书签 或者 python&#xff1a;从PDF中提取目录 请注意&#xff1a;书的目录.txt 编码&#xff1a;UTF-8&#xff0c;推荐用 Notepad 转换编码。 xml 是 python 标准库&#xff0c;在 D:\Python39\Lib\xml\etree pip install …...

C#log4net日志保存到Sqlserver数据库表(16)

要将log4net的日志保存到SQL Server数据库表中&#xff0c;你需要配置log4net使用一个数据库追加器&#xff08;appender&#xff09;&#xff0c;通常是AdoNetAppender。以下是一个示例配置&#xff0c;展示如何将log4net的日志输出配置为写入SQL Server数据库表。 首先&…...

SpringCloud-Nacos集群搭建

本文详细介绍了如何在SpringCloud环境中搭建Nacos集群&#xff0c;为读者提供了一份清晰而详尽的指南。通过逐步演示每个关键步骤&#xff0c;包括安装、配置以及Nginx的负载均衡设置&#xff0c;读者能够轻松理解并操作整个搭建过程。 一、Nacos集群示意图 Nacos&#xff0…...

第十五届蓝桥杯全国软件和信息技术专业人才大赛个人赛(软件赛)软件测试组竞赛规则及说明

第十五届蓝桥杯全国软件和信息技术专业人才大赛个人赛 (软件赛)软件测试组竞赛规则及说明 目录...

【算法与数据结构】496、503、LeetCode下一个更大元素I II

文章目录 一、496、下一个更大元素 I二、503、下一个更大元素II三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、496、下一个更大元素 I 思路分析&#xff1a;本题思路和【算法与数据结构】739、LeetCode每日温度类似…...

当AGI遇到人形机器人

为什么人类对人形机器人抱有执念 人形机器人是一种模仿人类外形和行为的机器人&#xff0c;它的研究和开发有着多方面的目的和意义。 人形机器人可以更好地适应人类的环境和工具。人类的生活和工作空间都是根据人的尺寸和动作来设计的&#xff0c;例如门、楼梯、桌椅、开关等…...

Pytorch卷积层原理和示例 nn.Conv1d卷积 nn.Conv2d卷积

内容列表 一&#xff0c;前提 二&#xff0c;卷积层原理 1.概念 2.作用 3. 卷积过程 三&#xff0c;nn.conv1d 1&#xff0c;函数定义&#xff1a; 2, 参数说明: 3,代码: 4, 分析计算过程 四&#xff0c;nn.conv2d 1, 函数定义 2, 参数&#xff1a; 3, 代码 4, 分析计算过程 …...

Qt 实现无边框窗口1.0

目录 项目需求&#xff1a; 1、没有边框&#xff1b; 2、点击windows系统的状态栏的程序运行图标可实现最大最小化&#xff1b; 3、可以移动窗口&#xff1b; 项目实现&#xff1a; 1、实现 无边框 2、实现 点击windows系统的状态栏的程序运行图标可实现最大最小化 3、实现 窗…...

Flume(二)【Flume 进阶使用】

前言 学数仓的时候发现 flume 落了一点&#xff0c;赶紧补齐。 1、Flume 事务 Source 在往 Channel 发送数据之前会开启一个 Put 事务&#xff1a; doPut&#xff1a;将批量数据写入临时缓冲区 putList&#xff08;当 source 中的数据达到 batchsize 或者 超过特定的时间就会…...

静态时序分析:SDC约束命令set_clock_transition详解

相关阅读 静态时序分析https://blog.csdn.net/weixin_45791458/category_12567571.html?spm1001.2014.3001.5482 在静态时序分析&#xff1a;SDC约束命令create_clock详解一文的最后&#xff0c;我们谈到了针对理想(ideal)时钟&#xff0c;可以使用set_clock_transition命令直…...

web 发展阶段 -- 详解

1. web 发展阶段 当前处于 移动 web 应用阶段。也是个风口&#xff08;当然是针对有能力创业的人来说的&#xff09;&#xff0c;如 抖音、快手就是这个时代的产物。 2. web 发展阶段引出前后端分离的过程 2.1 传统开发方式 2.2 前后端分离模式 衍生自移动 web 应用阶段。 3.…...

车载软件架构 —— Adaptive AUTOSAR软件架构中操作系统

车载软件架构 —— Adaptive AUTOSAR软件架构中操作系统 我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师&#xff08;Wechat&#xff1a;gongkenan2013&#xff09;。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&…...

前缀和算法-截断数组

5057. 截断数组 - AcWing题库 给定一个长度为 n 的正整数数组 a1,a2,…,an 和一个正整数 p。 现在&#xff0c;要将该数组从中间截断&#xff0c;得到两个非空子数组。 我们规定&#xff0c;一个数组的价值等于数组内所有元素之和模 p 的结果。 我们希望&#xff0c;将给定数组…...

Kubernetes实战:Kubernetes中网络插件calico Daemon Sets显示异常红色

目录 一、排查步骤与解决方案1.1、POD排查问题定位1.2、针对问题解决错误1.3、继续针对问题解决错误 一、排查步骤与解决方案 1.1、POD排查问题定位 我的k8s集群由3个节点组成的&#xff0c;calico在每个节点上都有一个pod,通过kubectl get pod -A命令发现有一个pod的READY 为…...

深入探究:JSONCPP库的使用与原理解析

君子不器 &#x1f680;JsonCPP开源项目直达链接 文章目录 简介Json示例小结 JsoncppJson::Value序列化Json::Writer 类Json::FastWriter 类Json::StyledWriter 类Json::StreamWriter 类Json::StreamWriterBuilder 类示例 反序列化Json::Reader 类Json::CharReader 类Json::Ch…...

字节UC伯克利新研究 | Magic-Me:简单有效的主题ID可控视频生成框架

在生成模型领域&#xff0c;针对特定身份&#xff08;ID&#xff09;创建内容已经引起了极大的兴趣。在文本到图像生成&#xff08;T2I&#xff09;领域&#xff0c;以主题驱动的内容生成已经取得了巨大的进展&#xff0c;使图像中的ID可控。然而&#xff0c;将其扩展到视频生成…...

2024免费人像摄影后期处理工具Portraiture4.1

Portraiture作为一款智能磨皮插件&#xff0c;确实为Photoshop和Lightroom用户带来了极大的便利。通过其先进的人工智能算法&#xff0c;它能够自动识别并处理照片中的人物皮肤、头发和眉毛等部位&#xff0c;实现一键式的磨皮美化效果&#xff0c;极大地简化了后期处理的过程。…...

Spring Boot 笔记 010 创建接口_更新用户头像

1.1.1 usercontroller中添加updateAvatar&#xff0c;校验是否为url PatchMapping("updateAvatar")public Result updateAvatar(RequestParam URL String avatarUrl) {userService.updateAvatar(avatarUrl);return Result.success();} 1.1.2 userservice //更新头像…...

认识并使用HttpLoggingInterceptor

目录 一、前情回顾二、HttpLoggingInterceptor1、HttpLoggingInterceptor拦截器是做什么的&#xff1f;2、如何使用HttpLoggingInterceptor&#xff1f;2.1 日志级别2.2 如何看日志&#xff1f;2.2.1 日志级别&#xff1a;BODY2.2.2 日志级别&#xff1a;BASIC2.2.3 日志级别&a…...

内存块与内存池

&#xff08;1&#xff09;在运行过程中&#xff0c;MemoryPool内存池可能会有多个用来满足内存申请请求的内存块&#xff0c;这些内存块是从进程堆中开辟的一个较大的连续内存区域&#xff0c;它由一个MemoryBlock结构体和多个可供分配的内存单元组成&#xff0c;所有内存块组…...

【FPGA开发】HDMI通信协议解析及FPGA实现

本篇文章包含的内容 一、HDMI简介1.1 HDMI引脚解析1.2 HDMI工作原理1.3 DVI编码1.4 TMDS编码 二、并串转换、单端差分转换原语2.1 原语简介2.2 原语&#xff1a;IO端口组件2.3 IOB 输入输出缓冲区2.4 并转串原语OSERDESE22.4.1 OSERDESE2 工作原理2.4.2 OSERDESE2 级联示意图2.…...

[NSSRound#16 Basic]Web

1.RCE但是没有完全RCE 显示md5强比较&#xff0c;然后md5_3随便传 md5_1M%C9h%FF%0E%E3%5C%20%95r%D4w%7Br%15%87%D3o%A7%B2%1B%DCV%B7J%3D%C0x%3E%7B%95%18%AF%BF%A2%00%A8%28K%F3n%8EKU%B3_Bu%93%D8Igm%A0%D1U%5D%83%60%FB_%07%FE%A2&md5_2M%C9h%FF%0E%E3%5C%20%95r%D4w…...

[职场] 会计学专业学什么 #其他#知识分享#职场发展

会计学专业学什么 会计学专业属于工商管理学科下的一个二级学科&#xff0c;本专业培养具备财务、管理、经济、法律等方面的知识和能力&#xff0c;具有分析和解决财务、金融问题的基本能力&#xff0c;能在企、事业单位及政府部门从事会计实务以及教学、科研方面工作的工商管…...

docker (五)-docker存储-数据持久化

将数据存储在容器中&#xff0c;一旦容器被删除&#xff0c;数据也会被删除。同时也会使容器变得越来越大&#xff0c;不方便恢复和迁移。 将数据存储到容器之外&#xff0c;这样删除容器也不会丢失数据。一旦容器故障&#xff0c;我们可以重新创建一个容器&#xff0c;将数据挂…...