FlinkSQL之temporary join开发
在实时开发中,双流join获取目标对应时刻的属性时,经常使用temporary join。笔者在流量升级的实时迭代中,需要让流量日志精准的匹配上浏览时间里对应的商品属性,使用temporary join开发过程中踩坑不少,将一些经验沉淀在此文中,供各位同学参考与交流。
背景介绍
关于实时flinkSQL的双流join的背景知识可以先阅读以下文章:
https://www.51cto.com/article/713922.html
目前我们有一条流量日志明细的TT流A,以及一条商品标签的TT流B,在flink中对A流和B流进行双流join类似于将A流关联一个hbase维表。temporary join有以下特点:
1. 单流驱动:虽然是双流join,但数据下发只由一条流驱动。
2. 需要定义versioned table,versioned table记录了每个时刻的属性信息,双流join时被动查询。类似于银行汇率表,在货币兑换的时候需要参考兑换时刻的汇率。
3. 查询携带时间版本信息:temporary join携带由两条流的watermark触发,因此查询到的属性是对应时间内的属性。
图片来源:孙金城, 《Blink 漫谈系列 - Temporal Table JOIN》
应用场景&实例分享
当需要根据实时汇率*货币金额计算总金额,实时商品价格*成交件数计算总成交金额时,经常会使用temporary join获取实时的汇率和价格信息。在笔者的流量升级业务迭代中,我们需要获取实时的商品标签,因此需要定义商品标签的versioned table,写法如下:
CREATE TEMPORARY TABLE `tag_ri` (`id` VARCHAR,`tag` VARCHAR,`time` VARCHAR,`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0) --定义watermark
) WITH ('connector' = 'tt','router' = '******','topic' = 'tag_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8'
);--定义version table
CREATE TEMPORARY VIEW `tag`
AS
SELECT `id`, `tag`, `time`, `ts`
FROM ( SELECT `id`, `tag`, `time`, `ts`, ROW_NUMBER() OVER (PARTITION BY `id` --关联主键ORDER BY `time` DESC) AS `rownum`FROM `tag_ri`)
WHERE `rownum` = 1;
同上我们也需要定义流量日志明细流的watermark,并进行双流join
CREATE TEMPORARY TABLE `log_ri` (`id` VARCHAR,`time` VARCHAR,......`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0)
) WITH ('connector' = 'tt','router' = '******','topic' = 'log_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8',
);select `a`.`id`,......,`b`.`tag`
from (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`
结果如下:
--商品标签信息
12:00> SELECT * FROM tag_ri;id tag(商品标签)
======= ======================= t1 A12:30> SELECT * FROM tag_ri;id tag(商品标签)
======= =======================t1 B--流量明细日志查询 t1商品共三条明细
SELECT * FROM log_ri;id time
======= ========t1 12:00 t1 12:15 t1 12:30 --执行temporary join
select `a`.`id`,`a`.`time`,`b`.`tag`
from (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`id time tag(商品标签)
======= ======== =======================t1 12:00 At1 12:15 At1 12:30 B
开发经验
▐ 稀疏数据处理
由于temporary join是由两条流的watermark触发,如果versioned table是一条稀疏的流(在一段时间内无数据流入),那么join可能存在等待不下发数据的现象,可以通过设置参数 set table.exec.source.idle-timeout = 10s ,可以让A流数据不进行等待,具体参数介绍可以参考:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout
▐ 数据延迟下发
问题
在实际开发中,我们发现temporay join后数据一直等待不下发,整点才会进行下发的现象。
原因分析
我们结合SQL语法,对TT日志进行回流分析:代码逻辑是四路source union后, join 定义的versioned table
select a.*,b.tag
from
(
select * from source_1
union all
select * from source_2
union all
select * from source_3
union all
select * from source_4
) a
temporay join
b流
source_4会在整点流入少部分当前小时59分钟的数据,而temporay join 是由两边的watermark所触发,所以会有a流等待b流的时间到达当前小时59分钟后再触发的现象。
解法
对source_4中log_time>当前时间的部分,做temporary join时将log_time置为当前时间,该问题就解决了。
总结
1. 在单流驱动的双流join场景中,temporary join是一种常见的处理方式。
2. temporary join由两条流的watermark触发,需要对两条流的watermark进行预处理,防止数据稀疏和数据抢跑等现象影响数据下发。
参考资料
https://www.51cto.com/article/713922.html
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout
团队介绍
我们是淘天集团-业务技术-商家数据团队,专注于开发和维护生意参谋这一全渠道、全链路、一站式的数据平台,同时也负责品牌数据银行和策略中心两大产品。旨在为商家提供全面的数据服务,包括但不限于经营分析、市场洞察、客群洞察等,以帮助商家提高商业决策效率。
¤ 拓展阅读 ¤
3DXR技术 | 终端技术 | 音视频技术
服务端技术 | 技术质量 | 数据算法
相关文章:

FlinkSQL之temporary join开发
在实时开发中,双流join获取目标对应时刻的属性时,经常使用temporary join。笔者在流量升级的实时迭代中,需要让流量日志精准的匹配上浏览时间里对应的商品属性,使用temporary join开发过程中踩坑不少,将一些经验沉淀在…...
第二十六节 直方图均衡化
图像直方图均衡化 图像直方图均衡化可以增强图像增强,对输入图像进行直方图均衡化处理,提升后续对象检测的准确率在Opencv人脸检测的代码演示中已经很常见了,此外对医学影像图像与卫星遥感图像也经常通过直方图均衡化来提升图像质量 Opencv…...

工单管理用什么工具好?8款推荐清单
本文推荐的8款项目工单管理系统有:1. PingCode; 2.Worktile; 3.Teambition; 4.致远OA; 5.TAPD; 6.Gitee; 7.Wrike; 8.Trello。 很多企业在处理项目工单时,依然依赖电子邮件、Excel表格,甚至是手动记录。这样做不仅效率低下,还容易导致工单遗漏…...

工地安全新突破:AI视频监控提升巡检与防护水平
在建筑工地和其他劳动密集型行业,工人的安全一直是管理工作的重中之重。为了确保工地的安全管理更加高效和智能化,AI视频监控卫士。通过人工智能技术,系统不仅能实时监控,还能自动识别工地现场的安全隐患,为工地管理者…...

World of Warcraft [CLASSIC][80][the Ulduar]
Ulduar 奥杜尔副本介绍 奥杜尔共计14个BOSS,通常说的10H就是10个苦难模式就是全通,9H就是除了【观察者奥尔加隆】,特别说明开启【观察者奥尔加隆】,是需要打掉困难模式4个守护者的。 所以人们经常说的类似“10H 观察者”、“10H…...
python实现数据库的增删改查功能,图形化版本
import tkinter from tkinter import * import psycopg2 from tkinter import messagebox#连接信息 t_conn{"dbname": "d1","user": "u1","password": "123qqq...A","port": "15400","h…...

pipeline开发笔记
pipeline开发笔记 jenkins常用插件Build Authorization Token Root配置GitLab的webhooks(钩子)配置构建触发器--示例 piblish over sshBlue OceanWorkspace Cleanup PluginGit插件PipelineLocalization: Chinese (Simplified) --中文显示Build Environment Plugin 显示构建过程…...

spark读取parquet文件
源码 parquet文件读取的入口是FileSourceScanExec,用parquet文件生成对应的RDD 非bucket文件所以走createNonBucketedReadRDD方法。 createNonBucketedReadRDD 过程: 确定文件分割参数 openCostInBytes4M 相关参数spark.sql.files.openCostInBytes4M…...

redis详细教程(1.String类型)
Redis 的 String 类型内部使用了一种叫做 SDS(Simple Dynamic String)的结构。SDS 的设计比传统的 C 语言字符串更加高效和安全,主要特点如下: 头部信息:SDS 的头部包含了一些元数据,比如字符串的长度、剩…...

用友U8接口-库存管理(7)
概括 本文的操作需要正确部署U8API主要讲述库存管理接口的使用,以产成品入库单作为说明,其他单据接口都是大同小异的!许多时候先在ERP做个单,然后仿造ERP单据参数,构造接口JSON参数是不错的做法。 获取Token访问令牌…...
Spring Boot HikariCP数据库连接池入门
1. 概述 在我们的项目中,数据库连接池基本是必不可少的组件。在目前数据库连接池的选型中,主要是 Druid ,为监控而生的数据库连接池。HikariCP ,号称性能最好的数据库连接池。 至于怎么选择,两者都非常优秀&#x…...

Docker快速上手教程:MacOS系统【安装/配置/使用/原理】全链路速通
背景 最近换了个 Macbook Air M3, 写个人项目需要用到 Docker,配置过程有一点点坎坷,还是得记录下避免重蹈覆辙。 什么。为什么是买 Air 而不是 Pro Max? 因为码农的钱也是钱啊。 这里我不会先讲原理,我认为工程的事情都是先看到现象,有了概念的轮廓,才应该去研究原理,…...

【JavaSE】认识String类,了解,进阶到熟练掌握
#1024程序员节 | 征文# 下面就让博主带领大家一起解决心中关于String类的疑问吧~~~ 1.字符串构造: 第一种和第二种(有一定的区别,在常量池上) public static void main(String[] args) { // 使用常量串构造 String s1 "h…...

vue3 vben-admin 窗口大小更改后 echarts尺寸变为 100px的问题
问题描述: 当切换切换tab 并且窗口尺寸更改时, echarts的尺寸因为父元素为 0, 自动设置为 100px 网上查找资料的结果: 1,使用vue 中的 v-if 来重新设置dom树 缺点: 频繁操作dom树结构, 极其消耗性能 优点: 自适应展示 2,设置固定宽高 缺点: 不能自适应展示, 无需消耗额外…...

Web应用框架-Django应用基础(3)-Jinja2
1.创建姓名模板 username里的数据发生改变,页面中渲染的数据发生改变,该效果称为动态数据 #hello/views:def hello_user(request):username000html <!DOCTYPE html><html lang"en"><head><meta charset"UTF-8&quo…...

js(深浅拷贝,节流防抖,this指向,改变this指向的方法)
一、深浅拷贝 1.基本数据类型和引用数据类型的区别: 1. 基本数据类型的变量存储的是值 引用数据类型的变量存储的是地址值 2. 基本数据类型的变量存储的值在栈内存 引用数据类型的变量存储的值在堆内存 3. 基本数据类型的变量存储的是值和值之间相互不影响 引用数据…...

香橙派5(RK3588)使用npu加速yolov5推理的部署过程
香橙派5使用npu加速yolov5推理的部署过程 硬件环境 部署过程 模型训练(x86主机) 在带nvidia显卡(最好)的主机上进行yolo的配置与训练, 获取最终的best.pt模型文件, 详见另一篇文档 模型转换(x86主机) 下载airockchip提供的yolov5(从pt到onnx) 一定要下这个版本的yolov5, …...

基于MWORKS的蓝桥杯「智能装备数字化建模大赛」正式发布,首期培训本周六开启
为强化装备数字化人才培养,推动装备数字化技术快速发展,第十六届蓝桥杯全国软件和信息技术专业人才大赛设置专项赛暨智能装备数字化建模大赛,使用MWORKS作为参赛软件。关于参赛软件授权、技术支持与培训、教材与案例开发支持、成果转化培训及…...

021、深入解析前端请求拦截器
目录 深入解析前端请求拦截器: 1. 引言 2. 核心实现与基础概念 2.1 基础拦截器实现 2.2 响应拦截器配置 3. 实际应用场景 3.1 完整的用户认证系统 3.2 文件上传系统 3.3 API请求缓存系统 3.4 请求重试机制 3.5 国际化处理 4. 性能优化实践 4.1 请求合并…...
windows中的tracert命令
在 Windows 操作系统中,tracert(全称 Trace Route)是一个用于确定 IP 数据包到达目标主机所经过的路径的命令行工具。它通过发送具有不同生存时间(TTL)的 ICMP(Internet Control Message Protocolÿ…...

Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

页面渲染流程与性能优化
页面渲染流程与性能优化详解(完整版) 一、现代浏览器渲染流程(详细说明) 1. 构建DOM树 浏览器接收到HTML文档后,会逐步解析并构建DOM(Document Object Model)树。具体过程如下: (…...
C++中string流知识详解和示例
一、概览与类体系 C 提供三种基于内存字符串的流,定义在 <sstream> 中: std::istringstream:输入流,从已有字符串中读取并解析。std::ostringstream:输出流,向内部缓冲区写入内容,最终取…...

MySQL 8.0 OCP 英文题库解析(十三)
Oracle 为庆祝 MySQL 30 周年,截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始,将英文题库免费公布出来,并进行解析,帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理
引言 Bitmap(位图)是Android应用内存占用的“头号杀手”。一张1080P(1920x1080)的图片以ARGB_8888格式加载时,内存占用高达8MB(192010804字节)。据统计,超过60%的应用OOM崩溃与Bitm…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...

面向无人机海岸带生态系统监测的语义分割基准数据集
描述:海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而,目前该领域仍面临一个挑战,即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

FFmpeg:Windows系统小白安装及其使用
一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】,注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录(即exe所在文件夹)加入系统变量…...

关于easyexcel动态下拉选问题处理
前些日子突然碰到一个问题,说是客户的导入文件模版想支持部分导入内容的下拉选,于是我就找了easyexcel官网寻找解决方案,并没有找到合适的方案,没办法只能自己动手并分享出来,针对Java生成Excel下拉菜单时因选项过多导…...