Spark 7:Spark SQL 函数定义
SparkSQL 定义UDF函数


方式1语法:
udf对象 = sparksession.udf.register(参数1,参数2,参数3)
参数1:UDF名称,可用于SQL风格
参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
方式2语法:
udf对象 = F.udf(参数1, 参数2)
参数1:被注册成UDF的方法名
参数2:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
其中F是:
from pyspark.sql import functions as F
其中,被注册成UDF的方法名是指具体的计算方法,如:
def add(x, y): x + y
add就是将要被注册成UDF的方法名
# coding:utf8
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 构建一个RDDrdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])df = rdd.toDF(["num"])# TODO 1: 方式1 sparksession.udf.register(), DSL和SQL风格均可以使用# UDF的处理函数def num_ride_10(num):return num * 10# 参数1: 注册的UDF的名称, 这个udf名称, 仅可以用于 SQL风格# 参数2: UDF的处理逻辑, 是一个单独的方法# 参数3: 声明UDF的返回值类型, 注意: UDF注册时候, 必须声明返回值类型, 并且UDF的真实返回值一定要和声明的返回值一致# 返回值对象: 这是一个UDF对象, 仅可以用于 DSL 语法# 当前这种方式定义的UDF, 可以通过参数1的名称用于SQL风格, 通过返回值对象用户DSL风格udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())# SQL风格中使用# selectExpr 以SELECT的表达式执行, 表达式 SQL风格的表达式(字符串)# select方法, 接受普通的字符串字段名, 或者返回值是Column对象的计算df.selectExpr("udf1(num)").show()# DSL 风格中使用# 返回值UDF对象 如果作为方法使用, 传入的参数 一定是Column对象df.select(udf2(df['num'])).show()# TODO 2: 方式2注册, 仅能用于DSL风格udf3 = F.udf(num_ride_10, IntegerType())df.select(udf3(df['num'])).show()df.selectExpr("udf3(num)").show()

# coding:utf8
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 构建一个RDDrdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])df = rdd.toDF(["line"])# 注册UDF, UDF的执行函数定义def split_line(data):return data.split(" ") # 返回值是一个Array对象# TODO1 方式1 构建UDFudf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))# DLS风格df.select(udf2(df['line'])).show()# SQL风格df.createTempView("lines")spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)# TODO 2 方式2的形式构建UDFudf3 = F.udf(split_line, ArrayType(StringType()))df.select(udf3(df['line'])).show(truncate=False)

# coding:utf8
import string
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 假设 有三个数字 1 2 3 我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回# 比如传入1 我们返回 {"num":1, "letters": "a"}rdd = sc.parallelize([[1], [2], [3]])df = rdd.toDF(["num"])# 注册UDFdef process(data):return {"num": data, "letters": string.ascii_letters[data]}"""UDF的返回值是字典的话, 需要用StructType来接收"""udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\add("letters", StringType(), nullable=True))df.selectExpr("udf1(num)").show(truncate=False)df.select(udf1(df['num'])).show(truncate=False)

SparkSQL 使用窗口函数


# coding:utf8
# 演示sparksql 窗口函数(开窗函数)
import string
from pyspark.sql import SparkSession
# 导入StructType对象
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder. \appName("create df"). \master("local[*]"). \config("spark.sql.shuffle.partitions", "2"). \getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([('张三', 'class_1', 99),('王五', 'class_2', 35),('王三', 'class_3', 57),('王久', 'class_4', 12),('王丽', 'class_5', 99),('王娟', 'class_1', 90),('王军', 'class_2', 91),('王俊', 'class_3', 33),('王君', 'class_4', 55),('王珺', 'class_5', 66),('郑颖', 'class_1', 11),('郑辉', 'class_2', 33),('张丽', 'class_3', 36),('张张', 'class_4', 79),('黄凯', 'class_5', 90),('黄开', 'class_1', 90),('黄恺', 'class_2', 90),('王凯', 'class_3', 11),('王凯杰', 'class_1', 11),('王开杰', 'class_2', 3),('王景亮', 'class_3', 99)
])
schema = StructType().add("name", StringType()). \add("class", StringType()). \add("score", IntegerType())
df = rdd.toDF(schema)
# 窗口函数只用于SQL风格, 所以注册表先
df.createTempView("stu")
# TODO 聚合窗口
spark.sql("""
SELECT *, AVG(score) OVER() AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER() AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu
# 两个SQL的结果集进行整合而来
spark.sql("""
SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu GROUP BY class
# 两个SQL的结果集进行整合而来
# TODO 排序窗口
spark.sql("""
SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()
# TODO NTILE
spark.sql("""
SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
""").show()
SparkSQL支持UDF和UDAF定义,但在Python中,暂时只能定义UDF
UDF定义支持2种方式, 1:使用SparkSession对象构建. 2: 使用functions包中提供的UDF API构建. 要注意, 方式1可用DSL和SQL风格, 方式2 仅可用于DSL风格
SparkSQL支持窗口函数使用, 常用SQL中的窗口函数均支持, 如聚合窗口\排序窗口\NTILE分组窗口等
相关文章:
Spark 7:Spark SQL 函数定义
SparkSQL 定义UDF函数 方式1语法: udf对象 sparksession.udf.register(参数1,参数2,参数3) 参数1:UDF名称,可用于SQL风格 参数2:被注册成UDF的方法名 参数3:声明UDF的返回值类型 ud…...
ThinkPHP 文件上传 fileSystem 扩展的使用
ThinkPHP 文件上传 ThinkPHP 文件上传 扩展 filesystem一、安装 FileSystem 扩展二、认识 filesystem 配置文件 config/filesystem.php三、上传验证(涉及到验证器的知识点)四、文件上传demo ThinkPHP 文件上传 扩展 filesystem ThinkPHP 为我们 提供了 …...
液体神经网络LLN:通过动态信息流彻底改变人工智能
巴乌米克泰吉 一、说明 在在人工智能领域,神经网络已被证明是解决复杂问题的非常强大的工具。多年来,研究人员不断寻求创新方法来提高其性能并扩展其能力。其中一种方法是液体神经网络(LNN)的概念,这是一个利用动态计算…...
2023年的今天,PMP项目管理认证还值得考吗?
首先我肯定它值得考,PMP认证的教材和考纲都会随着项目管理工具和市场趋势而更新,不用担心会过时。 PMP项目管理认证是什么? 英文全称是Project Management Professional,中文全称叫做项目管理专业人士资格认证。它是由美国项目管…...
【JavaSE专栏91】Java如何主动发起Http、Https请求?
作者主页:Designer 小郑 作者简介:3年JAVA全栈开发经验,专注JAVA技术、系统定制、远程指导,致力于企业数字化转型,CSDN学院、蓝桥云课认证讲师。 主打方向:Vue、SpringBoot、微信小程序 本文讲解了如何使用…...
给oracle逻辑导出clob大字段、大数据量表提提速
文章目录 前言一、大表数据附:查询大表 二、解题思路1.导出排除大表的数据2.rowid切片导出大表数据Linux代码如下(示例):Windows代码如下(示例):手工执行代码如下(示例)&…...
研发规范第九讲:通用类命名规范(重点)
研发规范第九讲:通用类命名规范(重点) 无规范不成方圆。我自己非常注重搭建项目结构的起步过程,应用命名规范、模块的划分、目录(包)的命名,我觉得非常重要,如果做的足够好ÿ…...
python+django+协同过滤算法-基于爬虫的个性化书籍推荐系统(包含报告+源码+开题)
为了提高个性化书籍推荐信息管理的效率;充分利用现有资源;减少不必要的人力、物力和财政支出来实现管理人员更充分掌握个性化书籍推荐信息的管理;开发设计专用系统--基于爬虫的个性化书籍推荐系统来进行管理个性化书籍推荐信息,以…...
系统架构:软件工程
文章目录 资源知识点自顶向下与自底向上形式化方法结构化方法敏捷方法净室软件工程面向服务的方法面向对象的方法快速应用开发螺旋模型软件过程和活动开放式源码开发方法功用驱动开发方法统一过程模型RUP基于构件的软件开发UML 资源 信息系统开发方法 知识点 自顶向下与自底…...
泰迪大数据实训平台产品介绍
大数据产品包括:大数据实训管理平台、大数据开发实训平台、大数据编程实训平台等 大数据实训管理平台 泰迪大数据实训平台从课程管理、资源管理、实训管理等方面出发,主要解决现有实验室无法满足教学需求、传统教学流程和工具低效耗时和内部教学…...
Linux- 文件夹相关的常用指令
1. 统计文件夹下的文件数量 在 Linux 下,有几种方法可以统计文件夹下的文件数量: 使用 ls 和 wc 命令: 这种方式可以统计目录下的直接子文件(不包括子目录里的文件)。 ls -l <目录路径> | wc -l注意:…...
在 macOS 中安装 TensorFlow 1g
tensorflow 需要多大空间 pip install tensorflow pip install tensorflow Looking in indexes: https://pypi.douban.com/simple/ Collecting tensorflowDownloading https://pypi.doubanio.com/packages/1a/c1/9c14df0625836af8ba6628585c6d3c3bf8f1e1101cafa2435eb28a7764…...
数学建模:CRITIC赋权法
🔆 文章首发于我的个人博客:欢迎大佬们来逛逛 CRITIC赋权法 算法流程 构建原始数据矩阵 X X X,他是一个 m ∗ n m * n m∗n 的矩阵, m m m 表示评价对象个数, n n n 表示指标个数对原始数据矩阵进行正向化处理计算…...
Facebook message tag 使用攻略
Messenger 讯息传不出去?无法发送FB 讯息给非好友? 2020年3月,Facebook 为了防止用户被过多的推广或垃圾讯息困扰而更新使用条款,现在商家要用FB传讯息给所有人(包括非好友),应该使用 Facebook …...
气传导耳机哪个品牌比较好?综合表现很不错的气传导耳机推荐
气传导耳机不仅能够提升幸福感还能听到周围环境声,大大提高安全性。如果你在寻找一款高品质的气传导耳机,又不知从何入手时,不要担心,我已经为你精心挑选了四款市面上综合表现很不错的气传导耳机,让你享受更好的音质…...
Rabbitmq的消息转换器
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象 ,只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题: 数据体积过大 有安全漏洞 可读…...
nvidia-docker的使用
拉取镜像 docker pull nvidia/cuda可能出现的问题 问题描述 Error response from daemon: manifest for nvidia/cuda:latest not found: manifest unknown: manifest解决方法: 为找到正确且合适的docker镜像版本 在supported-tags中找到与自己系统对应的cuda版本…...
C++新经典 | C语言
目录 一、基础之查漏补缺 1.float精度问题 2.字符型数据 3.变量初值问题 4.赋值&初始化 5.头文件之<> VS " " 6.逻辑运算 7.数组 7.1 二维数组初始化 7.2 字符数组 8.字符串处理函数 8.1 strcat 8.2 strcpy 8.3 strcmp 8.4 strlen 9.函数 …...
物联网智慧种植农业大棚系统
一、项目背景 智慧农业是是将物联网技术和农业生产箱管理的新型农业,依托部署在农业生产现场的各种传感节点,以物联网网关为通道形成数据传输网络,可以实现控制柜、环境监测传感器、气象监测机器等设备的远程监控,达到及时高校的…...
TabBar组件如何跳转页面?
1、先引入 2、假数据 const tabs [{key: home,title: 首页,icon: <AppOutline />,badge: Badge.dot,},{key: todo,title: 待办,icon: <UnorderedListOutline />,badge: 5,},{key: message,title: 消息,icon: (active: boolean) >active ? <MessageFill /&…...
反向工程与模型迁移:打造未来商品详情API的可持续创新体系
在电商行业蓬勃发展的当下,商品详情API作为连接电商平台与开发者、商家及用户的关键纽带,其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息(如名称、价格、库存等)的获取与展示,已难以满足市场对个性化、智能…...
【WiFi帧结构】
文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成:MAC头部frame bodyFCS,其中MAC是固定格式的,frame body是可变长度。 MAC头部有frame control,duration,address1,address2,addre…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...
基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...
什么是库存周转?如何用进销存系统提高库存周转率?
你可能听说过这样一句话: “利润不是赚出来的,是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业,很多企业看着销售不错,账上却没钱、利润也不见了,一翻库存才发现: 一堆卖不动的旧货…...
P3 QT项目----记事本(3.8)
3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...
spring:实例工厂方法获取bean
spring处理使用静态工厂方法获取bean实例,也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下: 定义实例工厂类(Java代码),定义实例工厂(xml),定义调用实例工厂ÿ…...
HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
