详解 Flink Table API 和 Flink SQL 之函数
一、系统内置函数
1. 比较函数
API | 函数表达式 | 示例 |
---|---|---|
Table API | ===,>,<,!=,>=,<= | id===1001,age>18 |
SQL | =,>,<,!=,>=,<= | id=‘1001’,age>18 |
2. 逻辑函数
API | 函数表达式 | 示例 |
---|---|---|
Table API | &&,||,!,.isFalse | 1>1 && 2>1,true.isFalse |
SQL | and,or,is false,not | 1>1 and 2>1,true is false |
3. 算术函数
API | 函数表达式 | 示例 |
---|---|---|
Table API | +,-,*,/,n1.power(n2) | 1 + 1,2.power(2) |
SQL | +,-,*,/,power(n1,n2) | 1 + 1,power(2,2) |
4. 字符串函数
API | 函数表达式 | 示例 |
---|---|---|
Table API | string1 + string2,.upperCase(),.lowerCase(),.charLength() | ‘a’ + ‘b’, ‘hello’.upperCase() |
SQL | string1 || string2,upper(),lower(),char_length() | ‘a’ || ‘b’,upper(‘hello’) |
5. 时间函数
API | 函数表达式 | 示例 |
---|---|---|
Table API | .toDate,.toTimestamp,currentTime(),n.days,n.minutes | ‘20230107’.toDate, 2.days,10.minutes |
SQL | Date string,Timestamp string,current_time,interval string range | Date ‘20230107’,interval ‘2’ hour/second/minute/day |
6. 聚合函数
API | 函数表达式 | 示例 |
---|---|---|
Table API | .count,.sum,.sum0 | id.count,age.sum,sum0 表示求和的所有值都为 null 则返回 0 |
SQL | count(),sum(),rank(),row_number() | count(*),sum(age) |
二、用户自定义函数(UDF)
UDF 显著地扩展了查询的表达能力,可以解决一些系统内置函数无法解决的需求。使用步骤为:自定义 UDF 函数类继承 UserDefinedFunction 抽象类;创建 UDF 实例并在环境中调用 registerFunction() 方法注册;在 Table API 或 SQL 中使用
1. 标量函数
Scalar Function,可以将 0、1 或多个标量值,映射到一个新的标量值,一进一出
/**用户自定义标量函数步骤:1.自定义函数类继承 ScalarFunction 抽象类,并在类中定义一个 public 的名为 eval 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestScalarFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的标量函数 hashCode,查询每条数据 id 的hashCode 值//2.创建自定义标量函数实例HashCode hashCode = new HashCode(0.8);//3.在环境中注册函数tableEnv.registerFunction("hashCode", hashCode);//4.使用//4.1 Table APITable resultTable = sensorTable.select("id, ts, hashCode(id)");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义 HashCode 函数,继承 ScalarFunction 类,并定义 eval 方法public static class HashCode extends ScalarFunction {private int factor;public HashCode(int factor) {this.factor = factor;}//该方法必须为 public 且名称必须为 eval,返回值和参数可以自定义public int eval(String str) {return str.hashCode() * factor;}}
}
2. 表函数
Table Function,可以将0、1或多个标量值作为输入参数,可以返回任意数量的行作为输出。一进多出
/**用户自定义表函数步骤:1.自定义函数类继承 TableFunction 抽象类,定义输出泛型,并在类中定义一个 public 的名为 eval,返回值为 void 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestTableFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表函数 Split,将每条数据的 id 分割并按 (word, length) 输出//2.创建自定义表函数实例Split split = new Split("_");//3.在环境中注册函数tableEnv.registerFunction("split", split);//4.使用//4.1 Table API//一进多出函数要配合 joinLateral 使用,侧写表Table resultTable = sensorTable.joinLateral("split(id) as (word, length)").select("id, ts, word, length");//4.2 SQL//一进多出函数要进行 lateral table 的关联,类似于 lateral viewTable resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length from sensor, lateral table(split(id)) as lt(word, length)");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义表函数 Split 继承 TableFunction,定义输出类型为 Tuple2<String, Integer>public static class Split extends TableFunction<Tuple2<String, Integer>> {private String separator = ",";public Split(String separator) {this.separator = separator;}//必须定义一个 public 的返回值为 void 的 eval 方法public void eval(String str) {for(String s : str.split(separator)) {//使用 collect() 方法输出结果collect(new Tuple2<>(s, s.length()));}}}
}
3. 聚合函数
Aggregate Function,可以把一个表中的数据,聚合成一个标量值,多进一出
/**用户自定义聚合函数步骤:1.自定义函数类继承 AggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和getValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的聚合函数 AvgTemp,按 id 分组后求最新的平均温度值//2.创建自定义聚合函数实例AvgTemp avgTemp = new AvgTemp();//3.在环境中注册函数tableEnv.registerFunction("avgTemp", avgTemp);//4.使用//4.1 Table API//聚合函数必须在 groupBy 后的 aggregate 方法使用Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgtemp").select("id, avgtemp");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) from sensor group by id");tableEnv.toRetractStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义聚合函数 AvgTemp 继承 AggregateFunction 类,定义输出类型为 Double,中间累加器类型为 Tuple2<Double, Integer> 并实现方法public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {@Overridepublic Double getValue(Tuple2<Double, Integer> accumulator) {return accumulator.f0 / accumulator.f1;}@Overridepublic Tuple2<Double, Integer> createAccumulator() {return new Tuple2<>(0.0, 0);}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Tuple2<Double, Integer> accumulator, Double temp) {accumulator.f0 += temp;accumulator.f1 += 1;}}
}
4. 表聚合函数
Table Aggregate Function,可以把一个表中数据,聚合为具有多行和多列的结果表,多进多出
/**用户自定义表聚合函数步骤:1.自定义函数类继承 TableAggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和 emitValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表聚合函数 Top2Temp,按 id 分组后求最新的 top2 温度值//2.创建自定义表聚合函数实例Top2Temp top2Temp = new Top2Temp();//3.在环境中注册函数tableEnv.registerFunction("top2Temp", top2Temp);//4.使用//Table API//表聚合函数必须在 groupBy 后的 flatAggregate 方法使用Table resultTable = sensorTable.groupBy("id").flatAggregate("top2Temp(temp) as (temp, irank)").select("id, temp, irank");//表聚合函数目前不能在 SQL 中使用tableEnv.toRetractStream(resultTable, Row.class).print("result");env.execute();}//定义一个 Accumulatorpublic static class Top2TempAcc {private Double highestTemp = Double.MIN_VALUE;private Double secondHighestTemp = Double.MIN_VALUE;public Double getHighestTemp() {return highestTemp;}public void setHighestTemp(Double highestTemp) {this.highestTemp=highestTemp;}public Double getSecondHighestTemp() {return secondHighestTemp;}public void setSecondHighestTemp(Double secondHighestTemp) {this.secondHighestTemp=secondHighestTemp;}}//1.自定义表聚合函数 Top2Temp 继承 TableAggregateFunction 类,定义输出类型为 Tuple2<Double, Integer>,中间累加器类型为自定义的 Top2TempAcc 类并实现方法public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, Integer>, Top2TempAcc> {@Overridepublic void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> out) {out.collect(new Tuple2<>(acc.getHighestTemp(), 1));out.collect(new Tuple2<>(acc.getSecondHighestTemp(), 2));}@Overridepublic Top2TempAcc createAccumulator() {return new Top2TempAcc();}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Top2TempAcc acc, Double temp) {if(acc.getHighestTemp() < temp) {acc.setSecondHighestTemp(acc.getHighestTemp());acc.setHighestTemp(temp);} else if(acc.getSecondHighestTemp() < temp) {acc.setSecondHighestTemp(temp);}}}
}
相关文章:
详解 Flink Table API 和 Flink SQL 之函数
一、系统内置函数 1. 比较函数 API函数表达式示例Table API,>,<,!,>,<id1001,age>18SQL,>,<,!,>,<id‘1001’&…...
rsa加签验签C#和js以及java互通
js实现rsa加签验签 https://github.com/kjur/jsrsasign 11.1.0版本 解压选择需要的版本,这里选择all版本了 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>JS RSA加签验签</title&g…...
C语言中数组和指针的关系
在C语言中,数组和指针之间存在着密切的关系,尽管它们在概念上是不同的。以下是关于C语言中数组和指针关系的一些要点: 数组名作为指针: 在大多数情况下,数组名在表达式中会被当作指向其第一个元素的指针。例如&#x…...
idea 新建一个 JSP(JavaServer Pages)项目
环境设置: 确保你的开发环境中已经安装了 Java 开发工具包(JDK)和一个 Java Web 开发的集成开发环境(IDE),比如 Eclipse、IntelliJ IDEA 或者 NetBeans。你还需要一个 Web 服务器,比如 Apache T…...
【名词解释】Unity中的表格布局组件及其使用示例
Unity中的表格布局组件通常指的是GridLayoutGroup,这是一个在Unity的UI系统中用来布局子对象的组件。它可以帮助开发者将UI元素按照网格的形式进行排列,非常适合创建表格、网格视图等布局。 名词解释: GridLayoutGroup:Unity UI…...
判断当前设备为移动端自适应 平板和pc端为375移动端样式
在libs的setRem.js中: let html document.querySelector("html"); function setRem() {let ui_w 375;let cl_w document.documentElement.clientWidth || document.body.clientWidth;cl_w > 750 ? cl_w 375 : "";html.style.fontSize …...
Science Advances|用于胃部pH监测和早期胃漏检测的生物可吸收无线无源柔性传感器(健康监测/柔性传感/柔性电子)
2024年4月19日,美国西北大学 John A. Rogers和中国科学技术大学吕頔(Di Lu)团队,在《Science Advances》上发布了一篇题为“Bioresorbable, wireless, passive sensors for continuous pH measurements and early detection of gastric leakage”的论文。论文内容如下: 一、…...
C# 使用 webview2 嵌入网页
需求:C#客户端程序, 窗口里嵌入一个web网页,可通过URL跳转的那种。并且,需要将登录的身份验证信息(token)设置到请求头里。 核心代码如下: // 打开按钮的点击事件 private void openBtn_Click(object sen…...
公司面试题总结(五)
25.谈一谈箭头函数与普通函数的区别,箭头函数主要解决什么问题? 箭头函数与普通函数的区别: ⚫ 语法简洁性: ◼ 箭头函数使用>符号定义,省略了 function 关键字,使得语法更为紧凑。 ◼ 对于单行函…...
Flutter笔记:关于WebView插件的用法(上)
Flutter笔记 关于WebView插件的用法(上) - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite:http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:htt…...
计算机毕业设计Python+Django农产品推荐系统 农产品爬虫 农产品商城 农产品大数据 农产品数据分析可视化 PySpark Hadoop Hive
课题研究的意义,国内外研究现状、水平和发展趋势 研究意义21世纪是一个信息爆炸的时代,人们在日常生活中可接触到的信息量非常之巨大。推荐系统逐步发展,其中又以个性化推荐系统最为瞩目。个性化推荐系统的核心在于个性化推荐算法,…...
phpcms仿蚁乐购淘宝客网站模板
phpcms仿蚁乐购网站模板,淘宝客行业模板免费下载,该模板网站很容易吸引访客点击,提升ip流量和pv是非常有利的。本套模板采用现在非常流行的全屏自适应布局设计,且栏目列表以简洁,非常时尚大气。页面根据分辨率大小而自…...
leetcode695 岛屿的最大面积
题目 给你一个大小为 m x n 的二进制矩阵 grid 。 岛屿 是由一些相邻的 1 (代表土地) 构成的组合,这里的「相邻」要求两个 1 必须在 水平或者竖直的四个方向上 相邻。你可以假设 grid 的四个边缘都被 0(代表水)包围着。 岛屿的面积是岛上值…...
小程序无法调用服务端问题排查
1、问题描述 突然有一天线上的小程序不能登录,经查小程序无法调用。经查无法小程序页面无法调用后台服务。 2、排查过程 由于无法登录小程序发布服务器,无法测试小程序前端服务器到服务端网络,并且小程序无法看到日志。所以就得从服务端和网…...
Linux:多线程的操作
多线程操作 进程与线程线程的创建 create_pthread创建线程池给线程传入对象的指针 线程等待 pthread_join退出线程 pthread_exit线程等待参数 retval 与 线程退出参数 retval 线程中断 pthread_cancel获取线程编号 pthread_self线程分离 pthread_detach 进程与线程 进程是资源…...
kunpeng的aarch64架构cpu、openeuler系统、昇腾服务器适配文档转换功能(doc转docx、ppt转pptx)
一、安装flatpak sudo yum install flatpak flatpak remote-add --if-not-exists flathub https://flathub.org/repo/flathub.flatpakrepo二、安装libreoffice flatpak install flathub org.libreoffice.LibreOffice三、使用 对于使用 flatpak 安装的 LibreOffice,不需要手…...
unity 打包PC安装包中常见文件的功能
目录 前言 一、打包好的文件 二、常用文件 1.文件夹XXX_Data 2.文件夹MonoBleedingEdge 3.文件夹XXX_Data内部 三、文件的应用 1.如果你替换了一个图片 2.如果你新增了或减少了图片和资源 3.场景中有变动 4.resources代码加载的资源改了 5.如果你代码替换了 四、作…...
【Ardiuno】实验使用ESP32单片机实现高级web服务器暂时动态图表功能(图文)
接下来,我们继续实验示例代码中的Wifi“高级web服务器”,配置相关的无线密码后,开始实验 #include <WiFi.h> #include <WiFiClient.h> #include <WebServer.h> #include <ESPmDNS.h>const char *ssid "XIAOFE…...
深入浅出服务网格(Service Mesh):现代微服务架构的护航者
什么是服务网格? 服务网格是一种专用于处理微服务间通信的基础设施层,通常以轻量级代理(sidecar)的形式部署在每个服务实例旁边。它主要负责以下几项任务: 服务发现:自动检测和注册服务实例,使…...
node调试
vscode安装插件:JavaScript Debugger (Nightly) 点击后生成一个launch.json文件 打断点,并发送一个请求来执行代码到断点处 按右上的向下箭头,进入源码,进行查看,左边查看变量等值...
docker拉取镜像失败超时的解决方法,docker配置国内镜像源
更换国内源 创建或修改 /etc/docker/daemon.json 文件 安装docker后一般只有 /etc/docker 这个目录 下面并没有 daemon.json 文件 我们直接创建 : vim /etc/docker/daemon.json {"registry-mirrors" : ["https://registry.docker-cn.com"…...
建造气膜结构体育馆需要注意的事项—轻空间
气膜结构体育馆以其快速建造、低成本、灵活性高等优势,越来越受到各类运动场所的青睐。气膜结构利用空气压力支撑膜材,从而形成自持结构,无需传统的钢筋混凝土框架。这类建筑适用于各种气候条件,且可根据需要快速搭建和拆卸。然而…...
使用脚手架创建vue2项目(关闭eslint语法检查 、运行项目时自动打开网址、src文件夹简写方法)
使用脚手架创建vue2项目会默认安装的插件(eslint) 这个插件是检查语法的。 假设我们在main.js中定义了一个变量,没有使用 eslint 就会检测出错误 (事实是我们并没有写错而是eslint 给我们判断是错的,所以这样会很麻烦ÿ…...
谷粒商城实战(036 k8s集群学习2-集群的安装)
Java项目《谷粒商城》架构师级Java项目实战,对标阿里P6-P7,全网最强 总时长 104:45:00 共408P 此文章包含第343p-第p345的内容 k8s 集群安装 kubectl --》命令行操作 要进入服务器 而且对一些不懂代码的产品经理和运维人员不太友好 所以我们使用可视化…...
复旦微FMQL20SM全国产ARM+FPGA核心板,替代xilinx ZYNQ7020系列
FMQL20SM核心板一款全国产工业核心板。基于复旦微FMQL20S400M四核ARM Cortex-A7(PS端) FPGA可编程逻辑资源(PL端)异构多核SoC处理器设计的全国产工业核心板,PS端主频高达1GHz。 核心板简介 FMQL20SM核心板是一款全国…...
NPM常见问题
文章目录 NPM常见问题1. 使用淘宝源安装包出错2. listen EADDRINUSE 服务端口被占用报错3. npm start 启动后过一会崩溃结束:内存溢出4. npm install的时候使用特定的源安装5. npm安装指定版本、最新版本6. npm ERR! cb() never called! 解决7. Unable to authentic…...
二开版视频CMS完整运营源码/新版漂亮APP手机模板/集成员分销功能等
一个二开的影视CMS,直接上传源码至网站根目录,访问网站域名即可安装。 测试环境:Nginx 1.20.1—MySQL 5.6.50–PHP-7.2(安装拓展/fileinfo) 上传源码,访问域名直接安装 后台地址:域名/MDadmi…...
JavaScript的数组排序
天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…...
从Log4j和Fastjson RCE漏洞认识jndi注入
文章目录 前言JNDI注入基础介绍靶场搭建漏洞验证注入工具 log4j RCE漏洞分析漏洞靶场检测工具补丁绕过 Fastjson RCE漏洞分析漏洞靶场检测工具补丁绕过 总结 前言 接着前文的学习《Java反序列化漏洞与URLDNS利用链分析》,想了解为什么 Fastjson 反序列化漏洞的利用…...
7-25 数字三角形问题
7-25 数字三角形问题 分数 10 全屏浏览 作者 夏仁强 单位 贵州工程应用技术学院 给定一个由n行数字组成的数字三角形如下图所示。试设计一个算法,计算出从三角形的顶至底的一条路径,使该路径经过的数字总和最大。 对于给定的由n行数字组成的数字三角…...
郑州小程序网站开发/电商网站分析
文章目录修改虚拟机IP复制网卡的配置Vi编辑器的常用命令实操部分复制网卡的配置Hadoop集群初体验20、secondarynameNode如何辅助管理FSImage与Edits文件 ⭐⭐⭐21、SecondaryNamenode存在的意义是什么? ⭐⭐⭐⭐22、SecondaryNamenode工作的触发因素有哪些ÿ…...
网站如何做收录排行/中国百强县市榜单
2012年5月12日lenky发表评论阅读评论 目前,kvm还不能直接启动ghost镜像,因为qemu支持的磁盘格式里还没有包含ghost,不过可以看到qemu已经支持vmdk,而利用symantec ghost工具恰好又能将ghost镜像转换为vmdk格式,所以虽…...
哪个国家的绘本网站做的好/百度怎么搜索网址打开网页
每篇一句 具备了技术深度,遇到问题可以快速定位并从根本上解决。有了技术深度之后,学习其它技术可以更快,再深入其它技术也就不会害怕 相关阅读 【小家Spring】聊聊Spring中的数据转换:Converter、ConversionService、TypeConvert…...
linux上安装wordpress/seo营销推广全程实例
代理设计模式: 动态代理举例: 接口: 被代理类: 测试:...
网站限制浏览次数是怎么做的/免费推广网站大全集合
如下就是著名额傅里叶变换公式,也是最伟大的数学公式之一我们输入一个有关时间t的函数,就会得到一个有关ω的输出函数,这个公式会告诉我们信号中存在哪些正弦波。为什么这么说呢?如果我们输入一个纯余弦函数或纯正弦波函数&#x…...
建设监督网站首页/seo模板建站
显然,FF已经在日益激烈的新能源造车势力中落后一步。 贾跃亭的FF(法拉第未来)和恒大在2018年最后一天结束了长达半年的“闹剧”,12月31日,恒大健康对外发布公告,表示其与贾跃亭控制的FF终止了原有的投资协…...