当前位置: 首页 > news >正文

FlinkSQL之temporary join开发

2705c43b7f4f8013c53321f32a9a2fed.gif

在实时开发中,双流join获取目标对应时刻的属性时,经常使用temporary join。笔者在流量升级的实时迭代中,需要让流量日志精准的匹配上浏览时间里对应的商品属性,使用temporary join开发过程中踩坑不少,将一些经验沉淀在此文中,供各位同学参考与交流。

ffcd1b37f760b4430542917b5e7f8f77.png

背景介绍

关于实时flinkSQL的双流join的背景知识可以先阅读以下文章:

https://www.51cto.com/article/713922.html

目前我们有一条流量日志明细的TT流A,以及一条商品标签的TT流B,在flink中对A流和B流进行双流join类似于将A流关联一个hbase维表。temporary join有以下特点:

1. 单流驱动:虽然是双流join,但数据下发只由一条流驱动。

2. 需要定义versioned table,versioned table记录了每个时刻的属性信息,双流join时被动查询。类似于银行汇率表,在货币兑换的时候需要参考兑换时刻的汇率。

3. 查询携带时间版本信息:temporary join携带由两条流的watermark触发,因此查询到的属性是对应时间内的属性。

48804d99d95cf0aa49670aa4181b12e5.png

图片来源:孙金城, 《Blink 漫谈系列 - Temporal Table JOIN》

应用场景&实例分享

当需要根据实时汇率*货币金额计算总金额,实时商品价格*成交件数计算总成交金额时,经常会使用temporary join获取实时的汇率和价格信息。在笔者的流量升级业务迭代中,我们需要获取实时的商品标签,因此需要定义商品标签的versioned table,写法如下:

CREATE TEMPORARY TABLE `tag_ri` (`id` VARCHAR,`tag` VARCHAR,`time` VARCHAR,`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0) --定义watermark
) WITH ('connector' = 'tt','router' = '******','topic' = 'tag_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8'
);--定义version table
CREATE TEMPORARY VIEW `tag`
AS
SELECT `id`, `tag`, `time`, `ts`
FROM (    SELECT `id`, `tag`, `time`, `ts`, ROW_NUMBER() OVER (PARTITION BY `id` --关联主键ORDER BY `time` DESC) AS `rownum`FROM `tag_ri`)
WHERE `rownum` = 1;

同上我们也需要定义流量日志明细流的watermark,并进行双流join

CREATE TEMPORARY TABLE `log_ri` (`id` VARCHAR,`time` VARCHAR,......`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0)
) WITH ('connector' = 'tt','router' = '******','topic' = 'log_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8',
);select `a`.`id`,......,`b`.`tag`
from  (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`

结果如下:

--商品标签信息
12:00> SELECT * FROM tag_ri;id              tag(商品标签)
=======      =======================  t1                 A12:30> SELECT * FROM tag_ri;id              tag(商品标签)
=======      =======================t1                 B--流量明细日志查询 t1商品共三条明细
SELECT * FROM log_ri;id              time
=======          ========t1              12:00       t1              12:15       t1               12:30       --执行temporary join
select `a`.`id`,`a`.`time`,`b`.`tag`
from  (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`id               time              tag(商品标签)
=======          ========        =======================t1              12:00                   At1              12:15                   At1              12:30                   B
开发经验
  稀疏数据处理

由于temporary join是由两条流的watermark触发,如果versioned table是一条稀疏的流(在一段时间内无数据流入),那么join可能存在等待不下发数据的现象,可以通过设置参数 set table.exec.source.idle-timeout = 10s ,可以让A流数据不进行等待,具体参数介绍可以参考:

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout

  数据延迟下发
  • 问题

在实际开发中,我们发现temporay join后数据一直等待不下发,整点才会进行下发的现象。

8f9f7e4845d6e74bf7cb6e93c76e09a0.jpeg

  • 原因分析

我们结合SQL语法,对TT日志进行回流分析:代码逻辑是四路source union后, join 定义的versioned table

select a.*,b.tag
from
(
select * from source_1 
union all 
select * from source_2
union all 
select * from source_3
union all 
select * from source_4
) a
temporay join 
b流

source_4会在整点流入少部分当前小时59分钟的数据,而temporay join 是由两边的watermark所触发,所以会有a流等待b流的时间到达当前小时59分钟后再触发的现象。

ccf9a4c7f6f2b304cbb642b493d40e34.jpeg

  • 解法

对source_4中log_time>当前时间的部分,做temporary join时将log_time置为当前时间,该问题就解决了。

7b3d2f9b26327c4027ac08d723febb8d.png

总结

1. 在单流驱动的双流join场景中,temporary join是一种常见的处理方式。

2. temporary join由两条流的watermark触发,需要对两条流的watermark进行预处理,防止数据稀疏和数据抢跑等现象影响数据下发。

5f2cad15473f0438ff03df1f6df8861d.png

参考资料

  • https://www.51cto.com/article/713922.html

  • https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout

af16dc18390c18a051935ea9c0e343b9.png

团队介绍

我们是淘天集团-业务技术-商家数据团队,专注于开发和维护生意参谋这一全渠道、全链路、一站式的数据平台,同时也负责品牌数据银行和策略中心两大产品。旨在为商家提供全面的数据服务,包括但不限于经营分析、市场洞察、客群洞察等,以帮助商家提高商业决策效率。

¤ 拓展阅读 ¤

3DXR技术 | 终端技术 | 音视频技术

服务端技术 | 技术质量 | 数据算法

相关文章:

FlinkSQL之temporary join开发

在实时开发中,双流join获取目标对应时刻的属性时,经常使用temporary join。笔者在流量升级的实时迭代中,需要让流量日志精准的匹配上浏览时间里对应的商品属性,使用temporary join开发过程中踩坑不少,将一些经验沉淀在…...

第二十六节 直方图均衡化

图像直方图均衡化 图像直方图均衡化可以增强图像增强,对输入图像进行直方图均衡化处理,提升后续对象检测的准确率在Opencv人脸检测的代码演示中已经很常见了,此外对医学影像图像与卫星遥感图像也经常通过直方图均衡化来提升图像质量 Opencv…...

工单管理用什么工具好?8款推荐清单

本文推荐的8款项目工单管理系统有:1. PingCode; 2.Worktile; 3.Teambition; 4.致远OA; 5.TAPD; 6.Gitee; 7.Wrike; 8.Trello。 很多企业在处理项目工单时,依然依赖电子邮件、Excel表格,甚至是手动记录。这样做不仅效率低下,还容易导致工单遗漏…...

工地安全新突破:AI视频监控提升巡检与防护水平

在建筑工地和其他劳动密集型行业,工人的安全一直是管理工作的重中之重。为了确保工地的安全管理更加高效和智能化,AI视频监控卫士。通过人工智能技术,系统不仅能实时监控,还能自动识别工地现场的安全隐患,为工地管理者…...

World of Warcraft [CLASSIC][80][the Ulduar]

Ulduar 奥杜尔副本介绍 奥杜尔共计14个BOSS,通常说的10H就是10个苦难模式就是全通,9H就是除了【观察者奥尔加隆】,特别说明开启【观察者奥尔加隆】,是需要打掉困难模式4个守护者的。 所以人们经常说的类似“10H 观察者”、“10H…...

python实现数据库的增删改查功能,图形化版本

import tkinter from tkinter import * import psycopg2 from tkinter import messagebox#连接信息 t_conn{"dbname": "d1","user": "u1","password": "123qqq...A","port": "15400","h…...

pipeline开发笔记

pipeline开发笔记 jenkins常用插件Build Authorization Token Root配置GitLab的webhooks(钩子)配置构建触发器--示例 piblish over sshBlue OceanWorkspace Cleanup PluginGit插件PipelineLocalization: Chinese (Simplified) --中文显示Build Environment Plugin 显示构建过程…...

spark读取parquet文件

源码 parquet文件读取的入口是FileSourceScanExec,用parquet文件生成对应的RDD 非bucket文件所以走createNonBucketedReadRDD方法。 createNonBucketedReadRDD 过程: 确定文件分割参数 openCostInBytes4M 相关参数spark.sql.files.openCostInBytes4M…...

redis详细教程(1.String类型)

Redis 的 String 类型内部使用了一种叫做 SDS(Simple Dynamic String)的结构。SDS 的设计比传统的 C 语言字符串更加高效和安全,主要特点如下: 头部信息:SDS 的头部包含了一些元数据,比如字符串的长度、剩…...

用友U8接口-库存管理(7)

概括 本文的操作需要正确部署U8API主要讲述库存管理接口的使用,以产成品入库单作为说明,其他单据接口都是大同小异的!许多时候先在ERP做个单,然后仿造ERP单据参数,构造接口JSON参数是不错的做法。 获取Token访问令牌…...

Spring Boot HikariCP数据库连接池入门

1. 概述 在我们的项目中,数据库连接池基本是必不可少的组件。在目前数据库连接池的选型中,主要是 Druid ,为监控而生的数据库连接池。HikariCP ,号称性能最好的数据库连接池。 至于怎么选择,两者都非常优秀&#x…...

Docker快速上手教程:MacOS系统【安装/配置/使用/原理】全链路速通

背景 最近换了个 Macbook Air M3, 写个人项目需要用到 Docker,配置过程有一点点坎坷,还是得记录下避免重蹈覆辙。 什么。为什么是买 Air 而不是 Pro Max? 因为码农的钱也是钱啊。 这里我不会先讲原理,我认为工程的事情都是先看到现象,有了概念的轮廓,才应该去研究原理,…...

【JavaSE】认识String类,了解,进阶到熟练掌握

#1024程序员节 | 征文# 下面就让博主带领大家一起解决心中关于String类的疑问吧~~~ 1.字符串构造: 第一种和第二种(有一定的区别,在常量池上) public static void main(String[] args) { // 使用常量串构造 String s1 "h…...

vue3 vben-admin 窗口大小更改后 echarts尺寸变为 100px的问题

问题描述: 当切换切换tab 并且窗口尺寸更改时, echarts的尺寸因为父元素为 0, 自动设置为 100px 网上查找资料的结果: 1,使用vue 中的 v-if 来重新设置dom树 缺点: 频繁操作dom树结构, 极其消耗性能 优点: 自适应展示 2,设置固定宽高 缺点: 不能自适应展示, 无需消耗额外…...

Web应用框架-Django应用基础(3)-Jinja2

1.创建姓名模板 username里的数据发生改变&#xff0c;页面中渲染的数据发生改变&#xff0c;该效果称为动态数据 #hello/views:def hello_user(request):username000html <!DOCTYPE html><html lang"en"><head><meta charset"UTF-8&quo…...

js(深浅拷贝,节流防抖,this指向,改变this指向的方法)

一、深浅拷贝 1.基本数据类型和引用数据类型的区别&#xff1a; 1. 基本数据类型的变量存储的是值 引用数据类型的变量存储的是地址值 2. 基本数据类型的变量存储的值在栈内存 引用数据类型的变量存储的值在堆内存 3. 基本数据类型的变量存储的是值和值之间相互不影响 引用数据…...

香橙派5(RK3588)使用npu加速yolov5推理的部署过程

香橙派5使用npu加速yolov5推理的部署过程 硬件环境 部署过程 模型训练(x86主机) 在带nvidia显卡(最好)的主机上进行yolo的配置与训练, 获取最终的best.pt模型文件, 详见另一篇文档 模型转换(x86主机) 下载airockchip提供的yolov5(从pt到onnx) 一定要下这个版本的yolov5, …...

基于MWORKS的蓝桥杯「智能装备数字化建模大赛」正式发布,首期培训本周六开启

为强化装备数字化人才培养&#xff0c;推动装备数字化技术快速发展&#xff0c;第十六届蓝桥杯全国软件和信息技术专业人才大赛设置专项赛暨智能装备数字化建模大赛&#xff0c;使用MWORKS作为参赛软件。关于参赛软件授权、技术支持与培训、教材与案例开发支持、成果转化培训及…...

021、深入解析前端请求拦截器

目录 深入解析前端请求拦截器&#xff1a; 1. 引言 2. 核心实现与基础概念 2.1 基础拦截器实现 2.2 响应拦截器配置 3. 实际应用场景 3.1 完整的用户认证系统 3.2 文件上传系统 3.3 API请求缓存系统 3.4 请求重试机制 3.5 国际化处理 4. 性能优化实践 4.1 请求合并…...

windows中的tracert命令

在 Windows 操作系统中&#xff0c;tracert&#xff08;全称 Trace Route&#xff09;是一个用于确定 IP 数据包到达目标主机所经过的路径的命令行工具。它通过发送具有不同生存时间&#xff08;TTL&#xff09;的 ICMP&#xff08;Internet Control Message Protocol&#xff…...

【玩儿】Java 数字炸弹小游戏(控制台版)+ IO 数据存储

Java 数字炸弹小游戏&#xff08;控制台版&#xff09; IO 数据存储 数字炸弹小游戏概述功能实现实体类User.java 玩家信息实体类GameRecode.java 游戏记录实体类 自定义异常AccountLockedException.java 账号锁定异常PasswordErrorException.java 密码错误异常UnknowAccountEx…...

今日头条躺赚流量:自动化新闻爬取和改写脚本

构建一个自动化的新闻爬取和改写系统&#xff0c;实现热点新闻的自动整理和发布&#xff0c;需要分为以下几个模块&#xff1a;新闻爬取、信息解析与抽取、内容改写、自动发布。以下是每个模块的详细实现步骤和代码示例&#xff1a; 1. 新闻爬取模块 目标&#xff1a;从新闻网…...

日常实习与暑期实习详解

日常实习与暑期实习详解 问了下正在实习的同学&#xff0c;发现天要塌了–才知道日常实习是没有笔试的 1. 实习的定义 1.1 日常实习 日常实习是企业长期招聘的实习岗位&#xff0c;通常没有时间限制。企业会在需要时进行招聘&#xff0c;招聘对象包括在校大学生和大一、大二的…...

Git的原理和使用(六)

本文主要讲解企业级开发模型 1. 引入 交付软件的流程&#xff1a;开发->测试->发布上线 上面三个过程可以详细划分为一下过程&#xff1a;规划、编码、构建、测试、发 布、部署和维护 最初&#xff0c;程序⽐较简单&#xff0c;⼯作量不⼤&#xff0c;程序员⼀个⼈可以完…...

Elasticsearch 中的高效按位匹配

作者&#xff1a;来自 Elastic Alexander Marquardt 探索在 Elasticsearch 中编码和匹配二进制数据的六种方法&#xff0c;包括术语编码&#xff08;我喜欢的方法&#xff09;、布尔编码、稀疏位位置编码、具有精确匹配的整数编码、具有脚本按位匹配的整数编码以及使用 ESQL 进…...

LSTM,全称长短期记忆网络(Long Short-Term Memory),是一种特殊的循环神经网络(RNN)结构

关于lstm超参数设置&#xff0c;每个参数都有合适的范围&#xff0c;超过这个范围则lstm训练不再有效&#xff0c;loss不变&#xff0c;acc也不变 LSTM&#xff0c;全称长短期记忆网络&#xff08;Long Short-Term Memory&#xff09;&#xff0c;是一种特殊的循环神经网络&am…...

导出问题处理

问题描述 测试出来一个问题&#xff0c;使用地市的角色&#xff0c;导出数据然后超过了20w的数据&#xff0c;提示报错&#xff0c;我还以为是偶然的问题&#xff0c;然后是发现是普遍的问题&#xff0c;本地环境复现了&#xff0c;然后是&#xff0c;这个功能是三套角色&…...

通过cv库智能切片 把不同的分镜切出来 自媒体抖音快手混剪

用 手机自动化脚本&#xff0c;从自媒体上获取视频&#xff0c;一个商品对应几百个视频&#xff0c;我们把这几百个视频下载下来&#xff0c;进行分镜 视频切片&#xff0c;从自媒体上下载视频&#xff0c;通过cv库用直方图识别每个镜头进行切片。 下载多个图片进行视频的伪原…...

【机器学习】——numpy教程

文章目录 1.numpy简介2.初始化numpy3.ndarry的使用3.1numpy的属性3.2numpy的形状3.3ndarray的类型 4numpy生成数组的方法4.1生成0和1数组4.2从现有的数组生成4.3生成固定范围的数组4.4生成随机数组 5.数组的索引、切片6.数组的形状修改7.数组的类型修改8.数组的去重9.ndarray的…...

多线程——线程的状态

线程状态的意义 ‌线程状态的意义在于描述线程在执行过程中的不同阶段和条件&#xff0c;帮助开发者更好地管理和调度线程资源。 线程的多种状态 线程的状态是一个枚举类型&#xff08;Thread.State&#xff09;&#xff0c;可以通过线程名.getState&#xff08;&#xff09…...

网站建设销售销售流程/友情链接建立遵循的原则包括

注意&#xff1a; 有环无环的时候注意看箭头方向 不要看见成环就当成是有环了 _____________________________________________________________________________ 出度&#xff0c;入度&#xff1a; 无向图&#xff1a; 直接看图说&#xff1a;从1到0也可以 从0到1也可以 …...

广州专门做网站/seo推广排名公司

.col-lg-*: //*:代表列数&#xff0c;一行12列 //表示在屏幕大于等于≥1200px时按照这个规律展示 如&#xff1a; .col-lg-2 //表示在屏幕大于等于≥1200px时占2列而其他的按照表中的信息对照即可 也可以自己去bootstrap官网查找&#xff1a; 网址:https://v3.bootcss.com/cs…...

网站建设功能报价表/关键词挖掘排名

本文内容如有错误、不足之处&#xff0c;欢迎技术爱好者们一同探讨&#xff0c;在本文下面讨论区留言&#xff0c;感谢。 描述&#xff1a;单链表是一种链式存取的数据结构&#xff0c;用一组地址任意的存储单元存放线性表中的数据元素。链表中的数据是以结点来表示的&#xf…...

网站html5自适应/网络营销的三大基础

&nbsp Non-Breaking SPace 转载于:https://blog.51cto.com/algona/728303...

宝鸡住房和城乡建设局网站/国内搜索引擎排名

单向链表 单向链表最简单的实现形式就是由多个节点的集合共同构成一个线性序列。每个节点存储一个对象的引用&#xff0c;这个引用指向序列中的一个元素&#xff0c;即存储指向列表中的下一个节点 链表的第一个节点和最后一个结点分别为列表的头节点和尾节点 实现单向链表的代…...

建设项目查询官网/东莞网站优化公司

Debian/Ubuntu第一步&#xff0c;获取软件gpg --keyserver hkp://keys.gnupg.net --recv-keys 1C4CBDCDCD2EFD2Agpg -a --export CD2EFD2A | sudo apt-key add -添加下列内容到 /etc/apt/sources.list &#xff0c;替换 VERSION 为你的 Linux 分发版名称deb http://repo.percon…...