Flink SQL
-
进入 JobManager 容器:
docker exec -it 21442d9ca797 /bin/bash -
启动 Flink 的 SQL 客户端:
/opt/flink/bin/sql-client.sh embedded -
尝试创建 Kafka 表:
在启动的 SQL 客户端中,尝试创建一个 Kafka 表,看看是否能够成功:
CREATE TABLE test_kafka_table (message STRING ) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = '110.40.130.231:9092','format' = 'json' );如果没有报错,说明 Kafka 连接器已成功加载。
以下是一个使用 Flink SQL 从 Kafka 读取数据、进行简单聚合计算、并将结果写入 MySQL 和 HDFS 的示例。这个示例假设你已经安装并配置好了 Flink、Kafka、MySQL 和 HDFS。
1. 从 Kafka 读取数据
首先,创建一个 Kafka 表来定义数据源。假设 Kafka 主题名为 user_behavior,包含用户行为数据,每条消息格式为 JSON,包含字段 user_id, item_id, category_id, behavior, ts (时间戳)。
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3),proctime AS PROCTIME(), -- 添加处理时间列WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 设置水印,允许5秒延迟
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','format' = 'json','scan.startup.mode' = 'latest-offset'
);
2. 进行简单的聚合计算
接下来,对用户行为数据进行简单的聚合计算,例如按类别统计每分钟的行为次数。
CREATE VIEW behavior_count AS
SELECTcategory_id,TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start,COUNT(*) as behavior_count
FROM user_behavior
GROUP BY category_id, TUMBLE(ts, INTERVAL '1' MINUTE);
使用了 TUMBLE 函数来创建滚动窗口,按每分钟对数据进行分组,并计算每个类别的行为次数。
3. 将处理后的数据写入 MySQL
为了将上述聚合结果写入 MySQL,首先创建一个 MySQL 表。
CREATE TABLE behavior_summary (category_id BIGINT,window_start TIMESTAMP(3),behavior_count BIGINT,PRIMARY KEY (category_id, window_start) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydatabase','table-name' = 'behavior_summary','username' = 'myuser','password' = 'mypassword'
);
然后,可以INSERT INTO 语句将数据插入到 MySQL 表中。
INSERT INTO behavior_summary
SELECT * FROM behavior_count;
4. 将处理后的数据写入 HDFS
如果想将数据写入 HDFS,先创建一个 HDFS 表。
CREATE TABLE behavior_summary_hdfs (category_id BIGINT,window_start TIMESTAMP(3),behavior_count BIGINT
) WITH ('connector' = 'filesystem','path' = 'hdfs://localhost:9000/user/flink/behavior_summary','format' = 'csv'
);
接着,使用 INSERT INTO 语句将数据写入 HDFS。
INSERT INTO behavior_summary_hdfs
SELECT * FROM behavior_count;
总结
以上步骤展示了如何使用 Flink SQL 从 Kafka 读取数据、进行聚合计算,并将结果分别写入 MySQL 和 HDFS。这是一个基本的流程,根据实际需求,可以调整表结构、连接器配置以及 SQL 查询以适应不同的应用场景。
相关文章:
Flink SQL
进入 JobManager 容器: docker exec -it 21442d9ca797 /bin/bash 启动 Flink 的 SQL 客户端: /opt/flink/bin/sql-client.sh embedded 尝试创建 Kafka 表: 在启动的 SQL 客户端中,尝试创建一个 Kafka 表,看看是否能…...
鸿蒙UI开发——实现环形文字
1、背 景 有朋友提问:您好关于鸿蒙UI想咨询一个问题 如果我想实现展示环形文字是需要通过在Text组件中设置transition来实现么,还是需要通过其他方式来实现。 针对这位粉丝朋友的提问,我们做一下解答。 2、实现环形文字效果 ❓ 什么是环形…...
QT版发送邮件程序
简单的TCP邮箱程序 **教学与实践目的:**学会网络邮件发送的程序设计技术。 1.SMTP协议 邮件传输协议包括 SMTP(简单邮件传输协议,RFC821)及其扩充协议 MIME; 邮件接收协议包括 POP3 和功能更强大的 IMAP 协议。 服务…...
JavaSE:初识Java(学习笔记)
java是高级语言的面向对象语言 .[最贴近生活.最快速分析和设计程序] 一,计算机语言发展历史 二,Java体系结构 1,JavaSE(Java Standard Edition) 标准版,定位在个人计算机上的应用 这个版本是Jav…...
ClickHouse创建分布式表
ClickHouse创建分布式表 当数据量剧增的时候,clickhouse是采用分片的方式进行数据的存储的,类似于redis集群的实现方式。然后想进行统一的查询的时候,因为涉及到多个本地表,可以通过分布式表的方式来提供统一的入口。由于是涉及到…...
Flink转换算子
Apache Flink 是一个用于处理无界和有界数据的开源流处理框架。在 Flink 中,转换(Transformation)是数据流处理的核心组件之一,它们定义了如何从输入数据集生成输出数据集。以下是 Flink 中一些常见的转换算子: Map: 将…...
ThinkBook 14+ 2024 Ubuntu 触控板失效 驱动缺失问题解决
首先我的电脑是thinkbook14 2024,从ubuntu18到ubuntu24,笔者整个都试了一遍,触摸板都没反应,确认不是linux系统内核问题,原因为驱动缺失。 解决步骤: (1)下载驱动,网址如…...
【青牛科技】应用方案 | D75xx-150mA三端稳压器
概 述 D75XX系列是一套三端高电流低压稳压器。它们可以提供 150mA 的输出电流和允许输入电压高达30V。它们有几个固定的输出电压范围为3.0 V至5.0 V。CMOS 技术确保低电压降和低静态电流。 虽然这些设备主要设计为固定电压调节器,但它们可以与外部元件一起使用&…...
WPF之iconfont(字体图标)使用
1,前文: WPF的Xaml是与前端的Html有着高度相似性的标记语言,所以Xaml也可同Html一般轻松使用阿里提供的海量字体图标,从而有效的减少开发工作度。 2,下载字体图标: 登录阿里图标库网iconfont-阿里巴巴矢量…...
08、Java学习-面向对象中级:
Java学习第十二天——面向对象中级: IDEA: 创建完新项目后,再src里面创建.java文件进行编写。 src——存放源码文件(.java文件);out——存放编译后的字节码文件(.class文件) 在I…...
springboot集成onlyoffice(部署+开发)
前言 最近有个项目需求是实现前端页面可以对word文档进行编辑,并且可以进行保存,于是一顿搜索,找到开源第三方onlyoffice,实际上onlyOffice有很多功能,例如文档转化、多人协同编辑文档、文档打印等,我们只用…...
LabVIEW编程基础教学(二)--数据类型
在LabVIEW中,数据类型是非常重要的基本概念,因为它们决定了如何存储和操作数据。掌握这些基础数据类型对于编写有效的程序非常关键。以下是LabVIEW中的基础数据类型介绍: 1. 数值类型(Numeric) 整型(Inte…...
「Mac畅玩鸿蒙与硬件29」UI互动应用篇6 - 多选问卷小应用
本篇将带你实现一个多选问卷小应用,用户可以勾选选项并点击提交按钮查看选择的结果。通过本教程,你将学习如何使用 Checkbox 组件、动态渲染列表、状态管理及用户交互,构建完整的应用程序。 关键词 UI互动应用Checkbox 组件状态管理动态列表…...
Flutter中文字体设置指南:打造个性化的应用体验
在使用Flutter进行开发时,可能会遇到中文字体显示不正常或者字体不符合设计需求的情况。Flutter默认的中文字体往往无法满足某些用户对个性化和美观的需求。今天,我们就来详细探讨如何在Flutter应用中设置中文字体,并结合不同场景提供相应的解…...
git下载慢下载不了?Git国内国外下载地址镜像,git安装视频教程
git安装下载的视频教程在这 3分钟完成git下载和安装,git国内外下载地址镜像,Windows为例_哔哩哔哩_bilibili 一、Git安装包国内和国外下载地址镜像 1.1国外官方下载地址 打开Git的官方网站:Git官网下载页面。在页面上选择对应的系统&…...
安卓属性动画插值器(Interpolator)详解
属性动画(Property Animation)是 Android 中一个强大的动画框架,允许开发者对视图的任意属性(如位置、透明度、尺寸、颜色等)进行平滑的动态变化。插值器(Interpolator)作为属性动画的一部分&am…...
OSPF总结
1.定义及相关信息 (1)全称:Open ShortestPath First,开放式最短路径优先 (2)是一种基于链路状态算法的路由协议 (3)目前针对IPv4协议使用的是OSPF Version2(RFC2328) 目前针对IPv6 协议使用的是 OSPF Version3 ( RFC2740 ) (4)运行 OSPF 路由器之间…...
Spring Boot驱动的多维分类知识管理系统
1 绪论 1.1 研究背景 在这个推荐个性化的时代,采用新技术开发一个多维分类的知识管理系统来分享和展示内容是一个永恒不变的需求。本次设计的多维分类的知识管理系统有管理员和用户两个角色。 管理员可以管理用户信息,知识分类,知识信息等&am…...
CSS教程(七)- 背景
介绍 背景属性可以设置背景颜色、背景图片、背景平铺、背景图片位置、背景图像固定等。 1 背景颜色 属性名:background-color 作用:指定HTML元素的背景色。 取值:英文颜色、16进制、rgb、rgba、transparent(一般为透明&#…...
PNG图片批量压缩exe工具+功能纯净+不改变原始尺寸
小编最近有一篇png图片要批量压缩,大小都在5MB之上,在网上找了半天要么就是有广告,要么就是有毒,要么就是功能复杂,整的我心烦意乱。 于是我自己用python写了一个纯净工具,只能压缩png图片,没任…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
网络六边形受到攻击
大家读完觉得有帮助记得关注和点赞!!! 抽象 现代智能交通系统 (ITS) 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 (…...
基于ASP.NET+ SQL Server实现(Web)医院信息管理系统
医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)
本期内容并不是很难,相信大家会学的很愉快,当然对于有后端基础的朋友来说,本期内容更加容易了解,当然没有基础的也别担心,本期内容会详细解释有关内容 本期用到的软件:yakit(因为经过之前好多期…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机
这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机,因为在使用过程中发现 Airsim 对外部监控相机的描述模糊,而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置,最后在源码示例中找到了,所以感…...
永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器
一、原理介绍 传统滑模观测器采用如下结构: 传统SMO中LPF会带来相位延迟和幅值衰减,并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF),可以去除高次谐波,并且不用相位补偿就可以获得一个误差较小的转子位…...
MySQL 主从同步异常处理
阅读原文:https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主,遇到的这个错误: Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一,通常表示ÿ…...
