当前位置: 首页 > news >正文

Flink实现实时数据处理

代码如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*-from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes# 初始化执行环境
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(4)
# 必须开启 checkpoint,时间间隔为毫秒,否则不能输出数据
s_env.enable_checkpointing(600000)  # 600000 十分钟env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(s_env, environment_settings=env_settings)# 显式注册 Kafka 连接器
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///opt/flink/lib/flink-connector-kafka-1.16.0.jar")# 创建 source 表
t_env.execute_sql("""
CREATE TABLE source_blog_kafka (ip_address STRING,post_id INT,event_type STRING,proctime AS PROCTIME()  -- 添加处理时间列
) WITH ('connector' = 'kafka','topic' = 'blog_views','properties.bootstrap.servers' = '110.40.130.231:9092','properties.group.id' = 'flink_kafka_hive_blog_consumers','scan.startup.mode' = 'earliest-offset','format' = 'json'
)
""")# 创建 sink 表
t_env.execute_sql("""
CREATE TABLE sink_blog_hive (post_id INT,view_count INT
) WITH ('connector' = 'kafka','topic' = 'blog_view_stats','properties.bootstrap.servers' = '110.40.130.231:9092','format' = 'json'
)
""")# 定义 SQL 查询
t_env.execute_sql("""
INSERT INTO sink_blog_hive
SELECT post_id, CAST(COUNT(*) AS INT) AS view_count
FROM source_blog_kafka
GROUP BY post_id, TUMBLE(proctime, INTERVAL '1' MINUTE)
""")# 启动执行环境
t_env.execute("Blog View Stats")print("Data processing job started.")

展示了一个使用 Apache Flink 进行实时数据处理的例子,该例子从 Kafka 消费数据,对博客文章的浏览量进行统计,并将结果写回到另一个 Kafka 主题。以下是对你提供的代码的一些解释和建议:

解释

  1. 初始化执行环境

    • StreamExecutionEnvironment 和 StreamTableEnvironment 被用来创建流处理环境。set_parallelism(4) 设置了并行度为4,意味着任务可以在四个线程上并行执行。
    • enable_checkpointing(600000) 开启了检查点机制,确保在发生故障时能够恢复状态。这里设置的检查点间隔是十分钟。
  2. 配置 Kafka 连接器

    • 通过 pipeline.jars 参数指定了 Kafka 连接器的 JAR 文件路径,这是为了让 Flink 能够与 Kafka 集群通信。
  3. 定义 Source 和 Sink 表

    • source_blog_kafka 表用于从 Kafka 的 blog_views 主题读取数据,数据格式为 JSON。
    • sink_blog_hive 表用于将处理后的数据写入 Kafka 的 blog_view_stats 主题,同样以 JSON 格式存储。
  4. SQL 查询

    • 使用 SQL 对 source_blog_kafka 表中的数据按 post_id 分组,并计算每分钟内的点击次数,然后将结果插入到 sink_blog_hive 表中。
  5. 启动任务

    • 最后调用 t_env.execute("Blog View Stats") 启动任务执行。

建议

  • 检查点配置:确保 Kafka 连接器版本支持 Flink 的检查点功能,这对于实现精确一次(exactly-once)语义非常重要。
  • 资源管理:确保 Flink 集群有足够的资源来运行这个任务,特别是当并行度设置较高时。
  • 错误处理:考虑添加适当的错误处理逻辑,比如设置重启策略,以便在遇到失败时能够自动恢复。
  • 监控和调试:部署后,利用 Flink 提供的监控工具来跟踪作业的状态和性能,及时发现并解决问题。

相关文章:

Flink实现实时数据处理

代码如下: #!/usr/bin/python # -*- coding: UTF-8 -*-from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes# 初始化执行环境 s_env StreamExecutionEnvironment.get_…...

11.9.2024刷华为

文章目录 HJ31 单词倒排HJ32 密码提取语法知识记录 傻逼OD题目又不全又要收费,看毛线,莫名奇妙 HW这叼机构别搁这儿害人得不得? 我觉得我刷完原来的题目 过一遍华为机考的ED卷出处,就行了 HJ31 单词倒排 游戏本做过了好像 HJ3…...

Chromium 中chrome.system.storage扩展接口定义c++

一、chrome.system.storage 您可以使用 chrome.system.storage API 查询存储设备信息,并在连接和分离可移动存储设备时收到通知。 权限 system.storage 类型 EjectDeviceResultCode 枚举 "success" 移除命令成功执行 - 应用可以提示用户移除设备。…...

【Qt聊天室客户端】登录窗口

1. 验证码 具体实现 登录界面中创建验证码图片空间&#xff0c;并添加到布局管理器中 主要功能概述&#xff08;创建一个verifycodewidget类专门实现验证码操作&#xff09; 详细代码 // 头文件#ifndef VERIFYCODEWIDGET_H #define VERIFYCODEWIDGET_H#include <QWidget>…...

如何显示模型特征权重占比图【数据分析】

可视化模型的特征权重 1、流程 1、导入库: numpy:用于处理数组和矩阵。 matplotlib.pyplot:用于绘图。 sklearn.datasets:用于加载数据集。 sklearn.ensemble.RandomForestClassifier:用于训练随机森林模型。2、加载数据集: 使用load_iris函数加载Iris数据集。3、训练模…...

Ubuntu24安装MySQL

下载deb包&#xff1a; 先更新系统包&#xff1a; sudo apt update sudo apt update -y下载mysql: wget https://dev.mysql.com/get/mysql-apt-config_0.8.17-1_all.deb 安装deb包&#xff1a; sudo dpkg -i mysql-apt-config_0.8.17-1_all.deb目前mysql还没有正式支持Ubun…...

微服务架构面试内容整理-Eureka

Spring Cloud Netflix 是一个为构建基于 Spring Cloud 的微服务应用提供的解决方案,利用 Netflix 的开源组件来实现常见的分布式系统功能。以下是 Spring Cloud Netflix 的一些主要组件和特点: 服务注册与发现:Eureka 是一个 RESTful 服务,用于注册和发现微服务。服务实例在…...

qt QErrorMessage详解

1、概述 QErrorMessage是Qt框架中用于显示错误消息的一个对话框类。它提供了一个简单的模态对话框&#xff0c;用于向用户显示错误或警告消息。QErrorMessage通常用于应用程序中&#xff0c;当需要向用户报告错误但不希望中断当前操作时。它提供了一个标准的错误消息界面&…...

SpringBoot 将多个Excel打包下载

在Spring Boot应用中&#xff0c;如果你需要将多个Excel文件打包成一个ZIP文件并提供下载&#xff0c;你可以使用一些Java库来帮助完成这个任务。这里我将展示如何使用Apache POI来生成Excel文件&#xff0c;以及使用Java.util.zip来创建ZIP文件&#xff0c;并通过Spring Boot的…...

分页存储小总结

知识点: 什么是分页存储? 将内存空间分为一个个大小相等的分区&#xff08;比如&#xff1a;每个分区4KB&#xff09;&#xff0c;每个分区就是一个“页框”&#xff08;页框页帧内存块物理块物理页面&#xff09;。每个页框有一个编号&#xff0c;即“页框号”&#xff08;…...

Star-CCM+应用篇之动力电池温度场仿真操作流程与方法

1 动力电池温度场仿真项目 电池包内模组温度分布、电芯温度分布、温升速率、充电时间等。 2 动力电池温度场仿真分析流程图 图1 电池包热流场分析流程 3 动力电池温度场仿真参数需求 类别...

Spring Boot应用开发:从入门到精通

Spring Boot应用开发&#xff1a;从入门到精通 Spring Boot是Spring框架的一个子项目&#xff0c;旨在简化Spring应用的初始搭建和开发过程。通过自动配置和约定大于配置的原则&#xff0c;Spring Boot使开发者能够快速构建独立的、生产级别的Spring应用。本文将深入探讨Sprin…...

【JAVA项目】基于jspm的【医院病历管理系统】

技术简介&#xff1a;采用jsp技术、MySQL等技术实现。 系统简介&#xff1a;通过标签分类管理等方式&#xff0c;实现管理员&#xff1b;个人中心、医院公告管理、用户管理、科室信息管理、医生管理、出诊信息管理、预约时间段管理、预约挂号管理、门诊病历管理、就诊评价管理、…...

Python中的常见配置文件写法

在软件开发过程中&#xff0c;开发者常常需要利用一些固定的参数或常量。对于这些相对恒定且频繁使用的元素&#xff0c;一种常见的做法是将它们集中存储在一个特定的文件中&#xff0c;以避免在多个模块代码中重复定义&#xff0c;从而维护核心代码的清晰度和整洁性。 具体而…...

语义分割实战——基于PSPnet神经网络动物马分割系统源码

第一步&#xff1a;准备数据 动物马分割数据&#xff0c;总共有328张图片&#xff0c;里面的像素值为0和1&#xff0c;所以看起来全部是黑的&#xff0c;不影响使用 第二步&#xff1a;搭建模型 psp模块的样式如下&#xff0c;其psp的核心重点是采用了步长不同&#xff0c;po…...

Python+Appium编写脚本

一、环境配置 1、安装JDK&#xff0c;版本1.8以上 2、安装Python&#xff0c;版本3.x以上&#xff0c;用来解释python 3、安装node.js&#xff0c;版本^14.17.0 || ^16.13.0 || >18.0.0&#xff0c;用来安装Appimu Server 4、安装npm&#xff0c;版本>8&#xff0c;用…...

RK3288 android7.1 适配 ilitek i2c接口TP

一&#xff0c;Ilitek 触摸屏简介 Ilitek 提供多种型号的触控屏控制器&#xff0c;如 ILI6480、ILI9341 等&#xff0c;采用 I2C 接口。 这些控制器能够支持多点触控&#xff0c;并具有优秀的灵敏度和响应速度。 Ilitek 的触摸屏控制器监测屏幕上的触摸事件。 当触摸发生时&am…...

C++ 越来越像函数式编程了!

C 越来越像函数式编程了 大家好&#xff0c;欢迎来到今天的博客话题。今天我们要聊的是 C 这门老牌的强类型语言是如何一步一步向函数式编程靠拢的。从最早的函数指针&#xff0c;到函数对象&#xff08;Functor&#xff09;&#xff0c;再到 std::function 和 std::bind&…...

maven工程结构说明

1、maven工程文件目录 |-- pom.xml # Maven 项目管理文件 |-- src # 放项目源文件|-- main # 项目主要代码| |-- java # Java 源代码目录| | -- com/example/myapp…...

【GESP】C++一级真题练习(202312)luogu-B3921,小杨的考试

GESP一级真题练习。为2023年12月一级认证真题。逻辑计算问题。 题目题解详见&#xff1a;【GESP】C一级真题练习(202312)luogu-B3921&#xff0c;小杨的考试 | OneCoder 【GESP】C一级真题练习(202312)luogu-B3921&#xff0c;小杨的考试 | OneCoderGESP一级真题练习。为2023…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》

引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

STM32F4基本定时器使用和原理详解

STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

linux 下常用变更-8

1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行&#xff0c;YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID&#xff1a; YW3…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)

升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点&#xff0c;但无自动故障转移能力&#xff0c;Master宕机后需人工切换&#xff0c;期间消息可能无法读取。Slave仅存储数据&#xff0c;无法主动升级为Master响应请求&#xff…...

爬虫基础学习day2

# 爬虫设计领域 工商&#xff1a;企查查、天眼查短视频&#xff1a;抖音、快手、西瓜 ---> 飞瓜电商&#xff1a;京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空&#xff1a;抓取所有航空公司价格 ---> 去哪儿自媒体&#xff1a;采集自媒体数据进…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...