当前位置: 首页 > news >正文

FlinkSQL之temporary join开发

2705c43b7f4f8013c53321f32a9a2fed.gif

在实时开发中,双流join获取目标对应时刻的属性时,经常使用temporary join。笔者在流量升级的实时迭代中,需要让流量日志精准的匹配上浏览时间里对应的商品属性,使用temporary join开发过程中踩坑不少,将一些经验沉淀在此文中,供各位同学参考与交流。

ffcd1b37f760b4430542917b5e7f8f77.png

背景介绍

关于实时flinkSQL的双流join的背景知识可以先阅读以下文章:

https://www.51cto.com/article/713922.html

目前我们有一条流量日志明细的TT流A,以及一条商品标签的TT流B,在flink中对A流和B流进行双流join类似于将A流关联一个hbase维表。temporary join有以下特点:

1. 单流驱动:虽然是双流join,但数据下发只由一条流驱动。

2. 需要定义versioned table,versioned table记录了每个时刻的属性信息,双流join时被动查询。类似于银行汇率表,在货币兑换的时候需要参考兑换时刻的汇率。

3. 查询携带时间版本信息:temporary join携带由两条流的watermark触发,因此查询到的属性是对应时间内的属性。

48804d99d95cf0aa49670aa4181b12e5.png

图片来源:孙金城, 《Blink 漫谈系列 - Temporal Table JOIN》

应用场景&实例分享

当需要根据实时汇率*货币金额计算总金额,实时商品价格*成交件数计算总成交金额时,经常会使用temporary join获取实时的汇率和价格信息。在笔者的流量升级业务迭代中,我们需要获取实时的商品标签,因此需要定义商品标签的versioned table,写法如下:

CREATE TEMPORARY TABLE `tag_ri` (`id` VARCHAR,`tag` VARCHAR,`time` VARCHAR,`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0) --定义watermark
) WITH ('connector' = 'tt','router' = '******','topic' = 'tag_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8'
);--定义version table
CREATE TEMPORARY VIEW `tag`
AS
SELECT `id`, `tag`, `time`, `ts`
FROM (    SELECT `id`, `tag`, `time`, `ts`, ROW_NUMBER() OVER (PARTITION BY `id` --关联主键ORDER BY `time` DESC) AS `rownum`FROM `tag_ri`)
WHERE `rownum` = 1;

同上我们也需要定义流量日志明细流的watermark,并进行双流join

CREATE TEMPORARY TABLE `log_ri` (`id` VARCHAR,`time` VARCHAR,......`ts` AS `TO_TIMESTAMP`(`time`, 'yyyy-MM-dd HH:mm:ss'),WATERMARK FOR `ts` AS `withOffset`(`ts`, 0)
) WITH ('connector' = 'tt','router' = '******','topic' = 'log_ri','lineDelimiter' = '\n','fieldDelimiter' = '\u0001','encoding' = 'utf-8',
);select `a`.`id`,......,`b`.`tag`
from  (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`

结果如下:

--商品标签信息
12:00> SELECT * FROM tag_ri;id              tag(商品标签)
=======      =======================  t1                 A12:30> SELECT * FROM tag_ri;id              tag(商品标签)
=======      =======================t1                 B--流量明细日志查询 t1商品共三条明细
SELECT * FROM log_ri;id              time
=======          ========t1              12:00       t1              12:15       t1               12:30       --执行temporary join
select `a`.`id`,`a`.`time`,`b`.`tag`
from  (SELECT *FROM `log_ri`) AS `a`
LEFT JOIN `tag` FOR SYSTEM_TIME AS OF `a`.`ts` AS `b` ON `a`.`id` = `b`.`id`id               time              tag(商品标签)
=======          ========        =======================t1              12:00                   At1              12:15                   At1              12:30                   B
开发经验
  稀疏数据处理

由于temporary join是由两条流的watermark触发,如果versioned table是一条稀疏的流(在一段时间内无数据流入),那么join可能存在等待不下发数据的现象,可以通过设置参数 set table.exec.source.idle-timeout = 10s ,可以让A流数据不进行等待,具体参数介绍可以参考:

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout

  数据延迟下发
  • 问题

在实际开发中,我们发现temporay join后数据一直等待不下发,整点才会进行下发的现象。

8f9f7e4845d6e74bf7cb6e93c76e09a0.jpeg

  • 原因分析

我们结合SQL语法,对TT日志进行回流分析:代码逻辑是四路source union后, join 定义的versioned table

select a.*,b.tag
from
(
select * from source_1 
union all 
select * from source_2
union all 
select * from source_3
union all 
select * from source_4
) a
temporay join 
b流

source_4会在整点流入少部分当前小时59分钟的数据,而temporay join 是由两边的watermark所触发,所以会有a流等待b流的时间到达当前小时59分钟后再触发的现象。

ccf9a4c7f6f2b304cbb642b493d40e34.jpeg

  • 解法

对source_4中log_time>当前时间的部分,做temporary join时将log_time置为当前时间,该问题就解决了。

7b3d2f9b26327c4027ac08d723febb8d.png

总结

1. 在单流驱动的双流join场景中,temporary join是一种常见的处理方式。

2. temporary join由两条流的watermark触发,需要对两条流的watermark进行预处理,防止数据稀疏和数据抢跑等现象影响数据下发。

5f2cad15473f0438ff03df1f6df8861d.png

参考资料

  • https://www.51cto.com/article/713922.html

  • https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout

af16dc18390c18a051935ea9c0e343b9.png

团队介绍

我们是淘天集团-业务技术-商家数据团队,专注于开发和维护生意参谋这一全渠道、全链路、一站式的数据平台,同时也负责品牌数据银行和策略中心两大产品。旨在为商家提供全面的数据服务,包括但不限于经营分析、市场洞察、客群洞察等,以帮助商家提高商业决策效率。

¤ 拓展阅读 ¤

3DXR技术 | 终端技术 | 音视频技术

服务端技术 | 技术质量 | 数据算法

相关文章:

FlinkSQL之temporary join开发

在实时开发中,双流join获取目标对应时刻的属性时,经常使用temporary join。笔者在流量升级的实时迭代中,需要让流量日志精准的匹配上浏览时间里对应的商品属性,使用temporary join开发过程中踩坑不少,将一些经验沉淀在…...

第二十六节 直方图均衡化

图像直方图均衡化 图像直方图均衡化可以增强图像增强,对输入图像进行直方图均衡化处理,提升后续对象检测的准确率在Opencv人脸检测的代码演示中已经很常见了,此外对医学影像图像与卫星遥感图像也经常通过直方图均衡化来提升图像质量 Opencv…...

工单管理用什么工具好?8款推荐清单

本文推荐的8款项目工单管理系统有:1. PingCode; 2.Worktile; 3.Teambition; 4.致远OA; 5.TAPD; 6.Gitee; 7.Wrike; 8.Trello。 很多企业在处理项目工单时,依然依赖电子邮件、Excel表格,甚至是手动记录。这样做不仅效率低下,还容易导致工单遗漏…...

工地安全新突破:AI视频监控提升巡检与防护水平

在建筑工地和其他劳动密集型行业,工人的安全一直是管理工作的重中之重。为了确保工地的安全管理更加高效和智能化,AI视频监控卫士。通过人工智能技术,系统不仅能实时监控,还能自动识别工地现场的安全隐患,为工地管理者…...

World of Warcraft [CLASSIC][80][the Ulduar]

Ulduar 奥杜尔副本介绍 奥杜尔共计14个BOSS,通常说的10H就是10个苦难模式就是全通,9H就是除了【观察者奥尔加隆】,特别说明开启【观察者奥尔加隆】,是需要打掉困难模式4个守护者的。 所以人们经常说的类似“10H 观察者”、“10H…...

python实现数据库的增删改查功能,图形化版本

import tkinter from tkinter import * import psycopg2 from tkinter import messagebox#连接信息 t_conn{"dbname": "d1","user": "u1","password": "123qqq...A","port": "15400","h…...

pipeline开发笔记

pipeline开发笔记 jenkins常用插件Build Authorization Token Root配置GitLab的webhooks(钩子)配置构建触发器--示例 piblish over sshBlue OceanWorkspace Cleanup PluginGit插件PipelineLocalization: Chinese (Simplified) --中文显示Build Environment Plugin 显示构建过程…...

spark读取parquet文件

源码 parquet文件读取的入口是FileSourceScanExec,用parquet文件生成对应的RDD 非bucket文件所以走createNonBucketedReadRDD方法。 createNonBucketedReadRDD 过程: 确定文件分割参数 openCostInBytes4M 相关参数spark.sql.files.openCostInBytes4M…...

redis详细教程(1.String类型)

Redis 的 String 类型内部使用了一种叫做 SDS(Simple Dynamic String)的结构。SDS 的设计比传统的 C 语言字符串更加高效和安全,主要特点如下: 头部信息:SDS 的头部包含了一些元数据,比如字符串的长度、剩…...

用友U8接口-库存管理(7)

概括 本文的操作需要正确部署U8API主要讲述库存管理接口的使用,以产成品入库单作为说明,其他单据接口都是大同小异的!许多时候先在ERP做个单,然后仿造ERP单据参数,构造接口JSON参数是不错的做法。 获取Token访问令牌…...

Spring Boot HikariCP数据库连接池入门

1. 概述 在我们的项目中,数据库连接池基本是必不可少的组件。在目前数据库连接池的选型中,主要是 Druid ,为监控而生的数据库连接池。HikariCP ,号称性能最好的数据库连接池。 至于怎么选择,两者都非常优秀&#x…...

Docker快速上手教程:MacOS系统【安装/配置/使用/原理】全链路速通

背景 最近换了个 Macbook Air M3, 写个人项目需要用到 Docker,配置过程有一点点坎坷,还是得记录下避免重蹈覆辙。 什么。为什么是买 Air 而不是 Pro Max? 因为码农的钱也是钱啊。 这里我不会先讲原理,我认为工程的事情都是先看到现象,有了概念的轮廓,才应该去研究原理,…...

【JavaSE】认识String类,了解,进阶到熟练掌握

#1024程序员节 | 征文# 下面就让博主带领大家一起解决心中关于String类的疑问吧~~~ 1.字符串构造: 第一种和第二种(有一定的区别,在常量池上) public static void main(String[] args) { // 使用常量串构造 String s1 "h…...

vue3 vben-admin 窗口大小更改后 echarts尺寸变为 100px的问题

问题描述: 当切换切换tab 并且窗口尺寸更改时, echarts的尺寸因为父元素为 0, 自动设置为 100px 网上查找资料的结果: 1,使用vue 中的 v-if 来重新设置dom树 缺点: 频繁操作dom树结构, 极其消耗性能 优点: 自适应展示 2,设置固定宽高 缺点: 不能自适应展示, 无需消耗额外…...

Web应用框架-Django应用基础(3)-Jinja2

1.创建姓名模板 username里的数据发生改变&#xff0c;页面中渲染的数据发生改变&#xff0c;该效果称为动态数据 #hello/views:def hello_user(request):username000html <!DOCTYPE html><html lang"en"><head><meta charset"UTF-8&quo…...

js(深浅拷贝,节流防抖,this指向,改变this指向的方法)

一、深浅拷贝 1.基本数据类型和引用数据类型的区别&#xff1a; 1. 基本数据类型的变量存储的是值 引用数据类型的变量存储的是地址值 2. 基本数据类型的变量存储的值在栈内存 引用数据类型的变量存储的值在堆内存 3. 基本数据类型的变量存储的是值和值之间相互不影响 引用数据…...

香橙派5(RK3588)使用npu加速yolov5推理的部署过程

香橙派5使用npu加速yolov5推理的部署过程 硬件环境 部署过程 模型训练(x86主机) 在带nvidia显卡(最好)的主机上进行yolo的配置与训练, 获取最终的best.pt模型文件, 详见另一篇文档 模型转换(x86主机) 下载airockchip提供的yolov5(从pt到onnx) 一定要下这个版本的yolov5, …...

基于MWORKS的蓝桥杯「智能装备数字化建模大赛」正式发布,首期培训本周六开启

为强化装备数字化人才培养&#xff0c;推动装备数字化技术快速发展&#xff0c;第十六届蓝桥杯全国软件和信息技术专业人才大赛设置专项赛暨智能装备数字化建模大赛&#xff0c;使用MWORKS作为参赛软件。关于参赛软件授权、技术支持与培训、教材与案例开发支持、成果转化培训及…...

021、深入解析前端请求拦截器

目录 深入解析前端请求拦截器&#xff1a; 1. 引言 2. 核心实现与基础概念 2.1 基础拦截器实现 2.2 响应拦截器配置 3. 实际应用场景 3.1 完整的用户认证系统 3.2 文件上传系统 3.3 API请求缓存系统 3.4 请求重试机制 3.5 国际化处理 4. 性能优化实践 4.1 请求合并…...

windows中的tracert命令

在 Windows 操作系统中&#xff0c;tracert&#xff08;全称 Trace Route&#xff09;是一个用于确定 IP 数据包到达目标主机所经过的路径的命令行工具。它通过发送具有不同生存时间&#xff08;TTL&#xff09;的 ICMP&#xff08;Internet Control Message Protocol&#xff…...

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…...

css实现圆环展示百分比,根据值动态展示所占比例

代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...

DAY 47

三、通道注意力 3.1 通道注意力的定义 # 新增&#xff1a;通道注意力模块&#xff08;SE模块&#xff09; class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践

6月5日&#xff0c;2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席&#xff0c;并作《智能体在安全领域的应用实践》主题演讲&#xff0c;分享了在智能体在安全领域的突破性实践。他指出&#xff0c;百度通过将安全能力…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解

在 C/C 编程的编译和链接过程中&#xff0c;附加包含目录、附加库目录和附加依赖项是三个至关重要的设置&#xff0c;它们相互配合&#xff0c;确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中&#xff0c;这些概念容易让人混淆&#xff0c;但深入理解它们的作用和联…...

【Linux】自动化构建-Make/Makefile

前言 上文我们讲到了Linux中的编译器gcc/g 【Linux】编译器gcc/g及其库的详细介绍-CSDN博客 本来我们将一个对于编译来说很重要的工具&#xff1a;make/makfile 1.背景 在一个工程中源文件不计其数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;mak…...

华为OD最新机试真题-数组组成的最小数字-OD统一考试(B卷)

题目描述 给定一个整型数组,请从该数组中选择3个元素 组成最小数字并输出 (如果数组长度小于3,则选择数组中所有元素来组成最小数字)。 输入描述 行用半角逗号分割的字符串记录的整型数组,0<数组长度<= 100,0<整数的取值范围<= 10000。 输出描述 由3个元素组成…...

前端高频面试题2:浏览器/计算机网络

本专栏相关链接 前端高频面试题1&#xff1a;HTML/CSS 前端高频面试题2&#xff1a;浏览器/计算机网络 前端高频面试题3&#xff1a;JavaScript 1.什么是强缓存、协商缓存&#xff1f; 强缓存&#xff1a; 当浏览器请求资源时&#xff0c;首先检查本地缓存是否命中。如果命…...