Flink流批一体计算(24):Flink SQL之mysql维表实时关联
目录
1.维表
2.数据准备
创建源数据
创建维度表
创建Sink表
3.配置任务
Flink SQL创建kafka源表
Flink SQL创建MySQL维表
Flink SQL创建MySQL结果表
编写计算任务
核验数据
1.维表
目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎存储一些维度数据,然后在DataStream API中调用MySQL、Hbase、redis客户端去获取到维度数据进行维度扩充。
本案例采用MySQL创建维表,与创建MySQL sink表语法相同。
2.数据准备
创建源数据
重启kafka,创建Topic: case_kafka_mysql
写入json格式的数据
{"ts": "20201011","id": 8,"price_amt":211}
创建维度表
在MySQL中创建名为product_dim的表
CREATE TABLE `product_dim` (`id` bigint(11) NOT NULL,`coupon_price_amt` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
向数据表插入如下数据:
INSERT INTO `product_dim` VALUES (1, 1);
INSERT INTO `product_dim` VALUES (3, 1);
INSERT INTO `product_dim` VALUES (8, 1);
创建Sink表
在MySQL中创建名为sync_test_3的表
CREATE TABLE `sync_test_3` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`ts` varchar(64) DEFAULT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
3.配置任务
Flink SQL创建kafka源表
create table flink_test_3 (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'case_kafka_mysql','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test3','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '127.0.0.1:2181/kafka');
Flink SQL创建MySQL维表
create table flink_test_3_dim (id BIGINT,coupon_price_amt BIGINT
)
WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'product_dim','username' = 'root','password' = 'Admin','lookup.max-retries' = '3','lookup.cache.max-rows' = 1000);
WITH参数
| 参数 | 说明 | 类型 | 备注 |
| lookup.cache.max-rows | 指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。 | Integer | 默认情况下,维表Cache是未开启的。 |
| lookup.cache.ttl | 指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。 | Duration | 默认情况下,维表Cache是未开启的。你可以设置lookup.cache.max-rows和 lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。 |
| lookup.cache.caching-missing-key | 是否缓存空的查询结果。 | Boolean | 参数取值如下: true(默认值):缓存空的查询结果。 false:不缓存空的查询结果。 |
| lookup.max-retries | 查询数据库失败的最大重试次数。 | Integer | 默认值为3。 |
Flink SQL创建MySQL结果表
CREATE TABLE sync_test_3 (ts string,total_gmv bigint,PRIMARY KEY (ts) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8','table-name' = 'sync_test_3','username' = 'root','password' = 'Admin');
编写计算任务
INSERT INTO sync_test_3
SELECTts,SUM(price_amt - coupon_price_amt) AS total_gmv
FROM(SELECTa.ts as ts,a.price_amt as price_amt,b.coupon_price_amt as coupon_price_amtFROMflink_test_3 as aLEFT JOIN flink_test_3_dim FOR SYSTEM_TIME AS OF a.proctime as bON b.id = a.id)
GROUP BY ts;
核验数据
SELECT id, ts, total_gmv FROM sync_test_3;
相关文章:
Flink流批一体计算(24):Flink SQL之mysql维表实时关联
目录 1.维表 2.数据准备 创建源数据 创建维度表 创建Sink表 3.配置任务 Flink SQL创建kafka源表 Flink SQL创建MySQL维表 Flink SQL创建MySQL结果表 编写计算任务 核验数据 1.维表 目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎…...
鸿蒙(HarmonyOS)应用开发——从网络获取数据(题目答案)
判断题 1.在http模块中,多个请求可以使用同一个httpRequest对象,httpRequest对象可以复用。 错误(False) 2.使用http模块发起网络请求后,可以使用destroy方法中断网络请求。 正确(True) 3.Web组件onConfirm(callback: (event?: { url: …...
力扣:197. 上升的温度(Python3)
题目: 表: Weather ------------------------ | Column Name | Type | ------------------------ | id | int | | recordDate | date | | temperature | int | ------------------------ id 是该表具有唯一值的列。 该表…...
uniApp应用软件在运行时,不符合华为应用市场审核标准。解决方案合集!
(暂时用不到的也建议收藏一下,因为文章持续更新中) 最新更改时间:20023-12-10 第一次做App应用开发相信大家一定都遇到过华为应用市场审核的“驳回”! 有些问题一看就明白可以立马修改,而有一些问题修改意…...
c#编码技巧(十五):新语法糖record深入分析
c#编码技巧(十四):新语法糖record深入分析 从 C# 9 开始新增了一个关键字record,用于封装数据。 record实质是微软提供的一个语法糖,因很多开源项目都用到了这个关键字,说明这个语法糖比较实用。 那么这个record类型和普通class类…...
Java IO流(五)(字符集基础知识简介)
字符集 计算机的存储规则(英文字符) 常见字符集介绍 a.GB2312字符集:1980年发布,1981年5月1日实施的简体中文汉字编码国家标准。收录7445个图形字符,其中包括6763个简体汉字 b.BIG5字符集:台湾地区繁体中…...
周周爱学习之Redis重点总结
redis重点总结 在正常的业务流程中,用户发送请求,然后到缓存中查询数据。如果缓存中不存在数据的话,就会去数据库查询数据。数据库中有的话,就会更新缓存然后返回数据,数据库中也没有的话就会给用户返回一个空。 1.缓…...
免费的SEO外链发布工具,提升排名的利器
互联网已经成为信息传播和商业发展的重要平台。而对于拥有网站的个人、企业来说,如何让自己的网站在搜索引擎中脱颖而出?SEO(Search Engine Optimization)作为提高网站在搜索引擎中排名的关键手段. 什么是SEO外链? S…...
腾讯字节常考的linux命令
1 ps 1.1 ps -ef 有哪些字段 ps -ef 命令在Unix/Linux系统中用于显示当前运行的进程。输出的字段通常包括: UID:启动进程的用户ID。PID:进程ID。PPID:父进程ID。C:CPU利用率。STIME:进程启动时间。TTY&a…...
JAVA后端自学技能实操合集
JAVA后端自学技能实操 内容将会持续更新中,有需要添加什么内容可以再评论区留言,大家一起学习FastDFS使用docker安装FastDFS(linux)集成到springboot项目中 内容将会持续更新中,有需要添加什么内容可以再评论区留言,大家一起学习 FastDFS 组名:文件上传后所在的 st…...
C++ 关联容器
关联容器 关联容器支持高效的关键字查找和访问。 两个主要的关联容器(associative container)类型是 map 和 set。 map 中的元素是一些关键字——值对。 关键字起到索引的作用,值则表示与索引相关联的数据。 set 中的每个元素只包含一个关键…...
ES6之函数新增的扩展
参数 ES6允许为函数的参数设置默认值 function log(x, y World) {console.log(x, y); }console.log(Hello) // Hello World console.log(Hello, China) // Hello China console.log(Hello, ) // Hello函数的形参是默认声明的,不能使用let或const再次声明 functi…...
postgresql安装部署(docker版本)
1.在线部署 创建数据库存储目录 mkdir /home/pgdata创建容器 docker run --name postgresql --restartalways -d -p 5432:5432 -v /home/pgdata:/var/lib/postgresql/data --shm-size10g -e POSTGRES_PASSWORD密码 postgis/postgis:12-3.2-alpine–name为设置容器名称 -d表…...
【Python/Java/C++三种语言】20天拿下华为OD笔试之【位运算】2023B-出错的或电路【欧弟算法】全网注释最详细分类最全的华为OD真题
文章目录 题目描述与示例题目描述输入描述输出描述示例一输入输出说明 示例二输入输出说明 解题思路代码PythonJavaC时空复杂度 华为OD算法/大厂面试高频题算法练习冲刺训练 题目描述与示例 题目描述 某生产门电路的厂商发现某一批次的或门电路不稳定,具体现象为计…...
vscode 编译运行c++ 记录
一、打开文件夹,新建或打开一个cpp文件 二、ctrl shift p 进入 c/c配置 进行 IntelliSense 配置。主要是选择编译器、 c标准, 设置头文件路径等,配置好后会生成 c_cpp_properties.json; 二、编译运行: 1、选中ma…...
错题总结(四)
1.【一维数组】输入10个整数,求平均值 编写一个程序,从用户输入中读取10个整数并存储在一个数组中。然后,计算并输出这些整数的平均值。 int main() {int arr[10];int sum 0;for (int n 0; n < 10; n){scanf("%d", &arr…...
ORACLE使用Mybatis-plus批量插入
ORACLE使用mybatis-plus自带的iservice.saveBatch方法时,会报DML Returing cannot be batch错误: 推测原因是oracle不支持insert into table_name (,) values (,),()的写法。且oracle不会自动生…...
vue,uniapp的pdf等文件在线预览
vue,uniapp文件在线预览方案,用了个稍微偏门一点的方法实现了 通过后端生成文件查看页面,然后前端只要展示这个网页就行,uniapp就用web-view来展示,后台系统就直接window.open()打开就行 示例查看PDF文件,…...
SpringBoot 项目 Jar 包加密,防止反编译
1场景 最近项目要求部署到其他公司的服务器上,但是又不想将源码泄露出去。要求对正式环境的启动包进行安全性处理,防止客户直接通过反编译工具将代码反编译出来。 2方案 第一种方案使用代码混淆 采用proguard-maven-plugin插件 在单模块中此方案还算简…...
DockerFile中途执行出错的解决办法
DockerFile中途执行出错的解决办法 你们是否也曾经因为DockerFile中途执行出错,而对其束手无策?总是对docker避之不及! 但是当下载的源码运用到了docker,dockerFile 执行到一半,报错了怎么办? 现状 那么当DockerFile执行一半出错后,会产生什么结果呢? 如图可知,生成…...
高频面试之3Zookeeper
高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个?3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制(过半机制࿰…...
【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】
1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件(System Property Definition File),用于声明和管理 Bluetooth 模块相…...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...
聊一聊接口测试的意义有哪些?
目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开,首…...
华为OD最新机试真题-数组组成的最小数字-OD统一考试(B卷)
题目描述 给定一个整型数组,请从该数组中选择3个元素 组成最小数字并输出 (如果数组长度小于3,则选择数组中所有元素来组成最小数字)。 输入描述 行用半角逗号分割的字符串记录的整型数组,0<数组长度<= 100,0<整数的取值范围<= 10000。 输出描述 由3个元素组成…...
jdbc查询mysql数据库时,出现id顺序错误的情况
我在repository中的查询语句如下所示,即传入一个List<intager>的数据,返回这些id的问题列表。但是由于数据库查询时ID列表的顺序与预期不一致,会导致返回的id是从小到大排列的,但我不希望这样。 Query("SELECT NEW com…...
Qt的学习(一)
1.什么是Qt Qt特指用来进行桌面应用开发(电脑上写的程序)涉及到的一套技术Qt无法开发网页前端,也不能开发移动应用。 客户端开发的重要任务:编写和用户交互的界面。一般来说和用户交互的界面,有两种典型风格&…...
UE5 音效系统
一.音效管理 音乐一般都是WAV,创建一个背景音乐类SoudClass,一个音效类SoundClass。所有的音乐都分为这两个类。再创建一个总音乐类,将上述两个作为它的子类。 接着我们创建一个音乐混合类SoundMix,将上述三个类翻入其中,通过它管理每个音乐…...
高抗扰度汽车光耦合器的特性
晶台光电推出的125℃光耦合器系列产品(包括KL357NU、KL3H7U和KL817U),专为高温环境下的汽车应用设计,具备以下核心优势和技术特点: 一、技术特性分析 高温稳定性 采用先进的LED技术和优化的IC设计,确保在…...
Android屏幕刷新率与FPS(Frames Per Second) 120hz
Android屏幕刷新率与FPS(Frames Per Second) 120hz 屏幕刷新率是屏幕每秒钟刷新显示内容的次数,单位是赫兹(Hz)。 60Hz 屏幕:每秒刷新 60 次,每次刷新间隔约 16.67ms 90Hz 屏幕:每秒刷新 90 次,…...
