大数据Flink(八十八):Interval Join(时间区间 Join)

文章目录
Interval Join(时间区间 Join)
Interval Join(时间区间 Join)
Interval Join 定义(支持 Batch\Streaming):Interval Join 在离线的概念中是没有的。Interval Join 可以让一条流去 Join 另一条流中前后一段时间内的数据。
应用场景:为什么有 Regular Join 还要 Interval Join 呢?刚刚的案例也讲了,Regular Join 会产生回撤流,但是在实时数仓中一般写入的 sink 都是类似于 Kafka 这样的消息队列,然后后面接 clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。所以可以理解 Interval Join 就是用于消灭回撤流的。
Interval Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):
- Inner Interval Join:流任务中,只有两条流 Join 到(满足 Join on 中的条件:两条流的数据在时间区间 + 满足其他等值条件)才输出,输出 +[L, R]
- Left Interval Join:流任务中,左流数据到达之后,如果没有 Join 到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出 +[L, null],如果右流 State 中的数据过期了,就直接从 State 中删除。
- Right Interval Join:和 Left Interval Join 执行逻辑一样,只不过左表和右表的执行逻辑完全相反
- Full Interval Join:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, null],右流过期输出 -[null, R])
可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。
使用示例:
间隔联接需要至少一个等联接谓词和在两侧限制时间的联接条件。可以通过比较两个输入表中相同类型的时间属性(即处理时间或事件时间)的两个适当的范围谓词(<, <=, >=, >)BETWEEN谓词或单个相等谓词来定义这种条件。
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
如果订单在收到订单后四个小时内发货,则上面的示例会将所有订单与其相应的发货合并在一起。

实际案例:还是刚刚的案例,曝光日志关联点击日志筛选既有曝光又有点击的数据,条件是曝光关联之后发生 4 小时之内的点击,并且补充点击的扩展参数(show inner interval click):
下面为 Inner Interval Join:
Flink SQL> CREATE TABLE show_log_table (log_id BIGINT,show_params STRING,`timestamp` bigint,row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),watermark for row_time as row_time
) WITH ('connector' = 'socket','hostname' = 'node1', 'port' = '8888','format' = 'csv'
);Flink SQL> CREATE TABLE click_log_table (log_id BIGINT,click_params STRING,`timestamp` bigint,row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),watermark for row_time as row_time
)
WITH ('connector' = 'socket','hostname' = 'node1', 'port' = '9999','format' = 'csv'
);Flink SQL> SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '10' SECOND AND click_log_table.row_time;
开启netcat的8888端口,输入数据:
1,hadoop,1658300800 ->2022-07-20 15:06:40
开启netcat的9999端口,输入数据:
1,zhangsan,1658300805 ->2022-07-20 15:06:45
输出结果如下:
s_id s_params c_id c_params1 hadoop 1 zhangsan
9999端口,继续输入数据:
1,zhangsan,1658300811 -> 2022-07-20 15:06:51
输出结果没有反应,因为这个时间:2022-07-20 15:06:51超过了时间区间下限。
如果是 Left Interval Join:
Flink SQL> SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' SECOND AND click_log_table.row_time + INTERVAL '5' SECOND;
开启netcat的8888端口,输入数据:
1,hadoop,1658300800 ->2022-07-20 15:06:40
开启netcat的9999端口,输入数据:
1,zhangsan,1658300805 ->2022-07-20 15:06:45
输出结果如下:
s_id s_params c_id c_params1 hadoop 1 zhangsan
8888端口,继续输入数据:
1,hadoop,1658300801 ->2022-07-20 15:06:41
1,hadoop,1658300811 ->2022-07-20 15:06:51
输出结果如下:
s_id s_params c_id c_params1 hadoop 1 zhangsan1 hadoop 1 zhangsan
2022-07-20 15:06:51这条数据不会有任何的输出,因为已经超过了右表的边界。
如果是 Full Interval Join:
Flink SQL> SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table
FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' SECOND AND click_log_table.row_time + INTERVAL '5' SECOND;
开启netcat的8888端口,输入数据:
1,hadoop,1658300800 ->2022-07-20 15:06:40
1,hadoop,1658300801 ->2022-07-20 15:06:41
开启netcat的9999端口,输入数据:
1,zhangsan,1658300805 ->2022-07-20 15:06:45
1,zhangsan,1658300811 ->2022-07-20 15:06:51
输出结果如下:
s_id s_params c_id c_params1 hadoop 1 zhangs1 hadoop 1 zhangsan
- 关于 Interval Join 的注意事项:
实时 Interval Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
相关文章:
大数据Flink(八十八):Interval Join(时间区间 Join)
文章目录 Interval Join(时间区间 Join) Interval Join(时间区间 Join) Interval Join 定义(支持 Batch\Streaming):Interval Join 在离线的概念中是没有的。Interval Join 可以让一条流去 Jo…...
数字IC笔试千题解--判断题篇(五)
前言 出笔试题汇总,是为了总结秋招可能遇到的问题,做题不是目的,在做题的过程中发现自己的漏洞,巩固基础才是目的。 所有题目结果和解释由笔者给出,答案主观性较强,若有错误欢迎评论区指出,资料…...
Kubernetes(k8s)上搭建一主两从的mysql8集群
Kubernetes上搭建一主两从的mysql8集群 环境准备搭建nfs服务器安装NFS暴露nfs目录开启nfs服务器 安装MySQL集群创建命名空间创建MySQL密码的Secret安装MySQL主节点创建pv和pvc主节点的配置文件部署mysql主节点 安装第一个MySQL Slave节点创建pv和pvc第一个从节点配置文件部署my…...
MySQL备份与恢复
MySQL备份与恢复一、备份1、数据备份的重要性2、数据备份分类2.1 物理备份2.2 逻辑备份 3、数据库备份策略4、常用的备份方法和工具5、数据库上云迁移 二、数据库完全备份1、简介2、物理冷备份与恢复2.1 物理冷备份2.2 备份恢复2.3 补充知识date 3、mysqldump备份与恢复3.1 完全…...
【RTOS学习】单片机中的C语言
🐱作者:一只大喵咪1201 🐱专栏:《RTOS学习》 🔥格言:你只管努力,剩下的交给时间! 本喵默认各位小伙伴都会C语言,我们平时学习C语言都是在Windows环境下学习的࿰…...
确知波束形成matlab仿真
阵列信号处理中的导向矢量 假设一均匀线性阵列,有N个阵元组成,满足:远场、窄带假设。 图1. 均匀线性阵模型 假设信源发射信号,来波方向为 θ \theta θ,第一个阵元接收到的信号为 x ( t ) x(t) x(t),则第…...
并发编程相关面试题
线程基础 线程和进程的区别: ----------------------------------------------------------------------- 创建线程的方式: 1 继承Thread类 2 实现runnable接口 3 实现callable 接口(有返回值的) 4 线程池创建线程 ------…...
Cpp/Qt-day050921Qt
目录 实现使用数据库的登录注册功能 头文件: registrwidget.h: widget.h: 源文件: registrwidget.c: widget.h: 效果图: 思维导图 实现使用数据库的登录注册功能 头文件: registrwidget.h: #ifndef REGISTRWIDGET_H #de…...
视频汇聚/视频云存储/视频监控管理平台EasyCVR分发rtsp流起播慢优化步骤详解
安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快,可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等,以及支持厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…...
ElementUI之登陆+注册->饿了吗完成用户登录界面搭建,axios之get请求,axios之post请求,跨域,注册界面
饿了吗完成用户注册登录界面搭建axios之get请求axios之post请求跨域 1.饿了吗完成用户注册登录界面搭建 将端口号8080改为8081 导入依赖,在项目根目录使用命令npm install element-ui -S,添加Element-UI模块 -g:将依赖下载node_glodal全局依…...
2023华为杯研究生数学建模研赛E题出血脑卒中完整论文(含28个详细预处理数据及结果表格)
大家好呀,从发布赛题一直到现在,总算完成了全国研究生数学建模竞赛(数模研赛)E题完整的成品论文。 本论文可以保证原创,保证高质量。绝不是随便引用一大堆模型和代码复制粘贴进来完全没有应用糊弄人的垃圾半成品论文。…...
Java中的继承是什么?
在Java中,继承是一种面向对象编程的概念,它允许一个类(称为子类或派生类)继承另一个类(称为父类或基类)的属性和方法。通过继承,子类可以获得父类的属性和方法,并且可以添加自己的特…...
Python - flask后端开发笔记
Flask入门 有一篇很全面的博客可以参考:Python Flask Web 框架入门 跨域问题处理 from flask_cors import CORS CORS(app,supports_credentialsTrue,origins[url], # 前端url列表 ) 文件发送 from flask import send_from_directory app.route(/download) …...
Flutter实现PS钢笔工具,实现高精度抠图的效果。
演示: 代码: import dart:ui;import package:flutter/material.dart hide Image; import package:flutter/services.dart; import package:flutter_screenutil/flutter_screenutil.dart; import package:kq_flutter_widgets/widgets/animate/stack.dart…...
苏宁滑块验证
网址:https://passport.suning.com/ids/login总结一下,别被他的表面现象给骗了,这玩意儿,个人认为,腾讯的都没法跟他比!!! 难点:动态混淆,vmp,图片…...
c语言。。。
gcc thread.c -lpthread -o app -fexec-charsetgbkthread.c为当前目录下编写的c代码 代码中引入了<pthread.h>线程库,所以要加上-lpthread -o app 输出.exe的c可执行文件,文件名为app -fexec-charsetgbk 设置编码方式,防止控制台输出中…...
vue-cli创建项目、vue项目目录结(运行vue项目)、ES6导入导出语法、vue项目编写规范
vue-cli创建项目、vue项目目录结构、 ES6导入导出语法、vue项目编写规范 1 vue-cli创建项目 1.1 vue-cli 命令行创建项目 1.2 使用vue-cli-ui创建 2 vue项目目录结构 2.1 运行vue项目 2.2 vue项目的目录结构 3 es6导入导出语法 4 vue项目编写规范 4.1 修改项目 4.2 以后…...
QT读取DLL加载算法
有这样一个场景,我有一个GUI软件,把他想象成PS软件,集成了很多工具。现在我要添加新算法(PS工具),该怎么办? 有三种办法: 第一种我把新算法代码加到项目中,编译整个项目。 第二种,新…...
HTTPX-用于Python的下一代HTTP客户端
1、前言 在使用 Python 进行接口自动化时,大多数都会使用 requests 模块,requests 是一个常用的 HTTP 请求库,可以方便地向网站发送 HTTP 请求,并获取响应结果。 本篇将介绍 Python 的下一代 HTTP 客户端 - HTTPX 2、简介 HTT…...
[LLM+AIGC] 01.应用篇之中文ChatGPT初探及利用ChatGPT润色论文对比浅析(文心一言 | 讯飞星火)
近年来,人工智能技术火热发展,尤其是OpenAI在2022年11月30日发布ChatGPT聊天机器人程序,其使用了Transformer神经网络架构(GPT-3.5),能够基于在预训练阶段所见的模式、统计规律和知识来生成回答,…...
conda相比python好处
Conda 作为 Python 的环境和包管理工具,相比原生 Python 生态(如 pip 虚拟环境)有许多独特优势,尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处: 一、一站式环境管理:…...
【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15
缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下: struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...
Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...
《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...
【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...
WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)
一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解,适合用作学习或写简历项目背景说明。 🧠 一、概念简介:Solidity 合约开发 Solidity 是一种专门为 以太坊(Ethereum)平台编写智能合约的高级编…...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
