当前位置: 首页 > news >正文

spark的学习-05

SparkSql

结构化数据与非结构化数据

结构化数据就类似于excel表中的数据(统计的都是结构化的数据)一般都使用sparkSql处理结构化的数据

结构化的文件:JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc

结构化的表:数据库中表的数据:MySQL、Oracle、Hive

我们在sparkcore中导入数据使用的是textFile,而在sparksql中怎么导入数据呢

使用的是DataFrame进行数据的导入

将一些结构化的数据进行sql查询,需要将数据变为表,是表就必须有表结构,表结构就是Schema

一个经典的wordcount案例:

代码如下:(里面有sql和dsl两种写法)

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象spark = SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config("spark.sql.shuffle.partitions", 2).getOrCreate()print(spark)# 创建一个DataFrame对象,读取数据df = spark.read.text("../../datas/wordcount/data.txt")# 创建一个临时表,表名为 wordcountdf.createOrReplaceTempView("wordcount")# 第一种写法,使用sparksqlspark.sql("""with t as ( select word from wordcount lateral view explode(split(value," ")) wordtemp as word),t2 as (select trim(word) word from t where trim(word) != "")select word,count(1) countNum from t2 group by word order by countNum desc""").show()# 第二种写法,使用 dsldf.select(F.explode(F.split("value"," ")).alias("word")) \.where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()#这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")# 还可以这样写df.select(F.explode(F.split("value"," ")).alias("word")) \.where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()spark.stop()

以上的代码还可以使用with进行优化

补充:

with的作用: 我们在创建对象的时候,经常需要关闭(close、stop) 如果忘记关闭,太多对象的话就会影响性能,使用with自动帮我们关闭

什么时候可以使用with呢

源码中有 __enter__ 和 __exit__ 的时候就可以使用with进行优化

优化过后的代码: (此时就不需要在手动stop关闭了)

import osfrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象with SparkSession.builder.master("local[2]").appName("SparkSQL-wordcount案例").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:# 创建一个DataFrame对象,读取数据df = spark.read.text("../../datas/wordcount/data.txt")# 创建一个临时表,表名为 wordcountdf.createOrReplaceTempView("wordcount")# 第一种写法,使用sparksqlspark.sql("""with t as ( select word from wordcount lateral view explode(split(value," ")) wordtemp as word),t2 as (select trim(word) word from t where trim(word) != "")select word,count(1) countNum from t2 group by word order by countNum desc""").show()# 第二种写法,使用 dsldf.select(F.explode(F.split("value"," ")).alias("word")) \.where(" trim(word) != '' ").groupby("word").count().orderBy("count",ascending=False).show()#这里的where(F.trim("word") != "") 还可以写成 where(" trim(word) != '' ")# 还可以这样写df.select(F.explode(F.split("value"," ")).alias("word")) \.where(F.trim("word") != "").groupby(F.col("word")).agg(F.count(F.col("word")).alias("cou")).orderBy(F.col("cou"),ascending=False).show()

一个案例:

需求:统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数。

  • 电影评分数据:datas/movie/ratings.dat【用户id、电影id、评分、评分时间】

数据如下:

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
  • 电影信息数据:datas/movie/movies.dat【电影id、电影名称、分类】

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action

首先,给定的数据不是我们所经常使用的格式化数据,所以需要先将数据进行格式化

可以使用RDD的算子将数据改为我们想要的格式化数据

也可以直接利用sql,将非格式化的数据修改为我们需要的格式的数据

写这个案例我们可以利用前面所学的 RDD 和 sparkSQL一起完成这个案例

使用RDD+SparkSQL

代码如下:

import os
import refrom pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSessionif __name__ == '__main__':os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_131'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = r'D:\BigDate\05-Hadoop\software\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 创建spark对象with SparkSession.builder.master("local[2]").appName("MovieTop10").config("spark.sql.shuffle.partitions", 2).getOrCreate() as spark:print(spark)rating_df = spark.sparkContext.textFile("../../datas/movie/ratings.dat").map(lambda line:re.split("::",line)) \.filter(lambda item:len(item) == 4).map(lambda item:(item[0],item[1],item[2],item[3])) \.toDF(["user_id","movie_id","score","score_time"]).createOrReplaceTempView("rating")# spark.sql("""#     select * from rating# """).show()movie_df = spark.sparkContext.textFile("../../datas/movie/movies.dat") \.map(lambda line:(line.split("::")[0],line.split("::")[1],line.split("::")[2])) \.toDF(["movie_id", "movie_name", "movie_categry"]).createOrReplaceTempView("movie")# spark.sql("""#     select * from movie# """).show(truncate=False)#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数spark.sql("""select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_idgroup by m.movie_name having countNum >2000 order by avgRate desc limit 10""").show(truncate=False)# 保留两位小数后,结果可能有重复的,想要获取重复排名也只算一位的可以使用排名函数,dense_rank()spark.sql("""with t as (select m.movie_name,round(avg(r.score),2) avgRate,count(1) countNum from movie m join rating r on m.movie_id = r.movie_idgroup by m.movie_name having countNum >2000),t2 as (select *,dense_rank() over(order by avgRate desc) paiming from t) select * from t2 where paiming <= 10""").show()
复习 排名函数:
1、row_number()

row_number从1开始,按照顺序,生成分组内记录的序列,row_number()的值不会存在重复,当排序的值相同时,按照表中记录的顺序进行排列

效果如下:
98                1
97                2
97                3
96                4
95                5
95                6没有并列名次情况,顺序递增
2、rank()

生成数据项在分组中的排名,排名相等会在名次中留下空位

效果如下:
98                1
97                2
97                2
96                4
95                5
95                5
94                7
有并列名次情况,顺序跳跃递增
3、dense_rank()

生成数据项在分组中的排名,排名相等会在名次中不会留下空位

效果如下:
98                1
97                2
97                2
96                3
95                4
95                4
94                5
有并列名次情况,顺序递增
只使用 SparkSQL:

以上是RDD + sparkSQL的写法, 还可以通过 sparkSQL的写法硬写出来

通过split()方法,根据非格式化数据的分隔符,将数据切成我们需要的DataFrame类型的数据

df1 = spark.read.text("../../datas/movie/movies.dat").createOrReplaceTempView("movie1")
df2 = spark.read.text("../../datas/movie/ratings.dat").createOrReplaceTempView("rating1")#统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数
spark.sql("""with m1 as (select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1),r1 as ( select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1)select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_idgroup by m1.movie_name having countNum >2000 order by avgRote desc limit 10
""").show(truncate=False)# 同样也可以写成排名函数
spark.sql("""with m1 as (select split(value,"::")[0] movie_id,split(value,"::")[1] movie_name,split(value,"::")[2] movie_categary from movie1),r1 as ( select split(value,"::")[0] user_id,split(value,"::")[1] movie_id,split(value,"::")[2] score,split(value,"::")[3] score_time from rating1),t as ( select m1.movie_name,round(avg(r1.score),2) avgRote,count(1) countNum from m1 join r1 on m1.movie_id = r1.movie_idgroup by m1.movie_name having countNum >2000),t2 as ( select *,dense_rank() over(order by avgRote desc) paiming from t)select * from t2 where paiming <= 10
""").show(truncate=False)

相关文章:

spark的学习-05

SparkSql 结构化数据与非结构化数据 结构化数据就类似于excel表中的数据&#xff08;统计的都是结构化的数据&#xff09;一般都使用sparkSql处理结构化的数据 结构化的文件&#xff1a;JSON、CSV【以逗号分隔】、TSV【以制表符分隔】、parquet、orc 结构化的表&#xff1a;…...

SQL注入(SQL Injection)详解

SQL注入&#xff08;SQL Injection&#xff09;是一种代码注入技术&#xff0c;它通过在应用程序的输入字段中插入或“注入”恶意的SQL语句&#xff0c;从而操控后端数据库服务器执行非预期的命令。这种攻击方式常用于绕过应用程序的安全措施&#xff0c;未经授权地访问、修改或…...

深入解析 OpenHarmony 构建系统-2-目录结构与核心组件

引言 OpenHarmony作为一款面向全场景的分布式操作系统,其构建系统在开发过程中扮演着至关重要的角色。本文将详细介绍OpenHarmony构建系统的目录结构和核心组件,帮助开发者更好地理解和使用这一强大的工具。 目录结构概览 以下是OpenHarmony构建系统的目录结构,每个目录和…...

网络安全应急响应(归纳)

目录 一、概述二、理论 系统排查 系统基本信息 windowsLinux用户信息 WindowsLinux启动项&#xff1a;开机系统在前台或者后台运行的程序&#xff0c;是病毒等实现持久化驻留的常用方法。 WindowsLinux任务计划&#xff1a;由于很多计算机都会自动加载“任务计划”&#xff0c…...

【网络协议栈】网络层(上)网络层的基本理解、IP协议格式、网络层分组(内附手画分析图 简单易懂)

绪论​ “It does not matter how slowly you go as long as you do not stop.”。本章是自上而下的进入网络协议栈的第三个篇幅–网络层–&#xff0c;本章我将带你了解网络层&#xff0c;以及网络层中非常重要的IP协议格式和网络层的分片组装问题&#xff0c;后面将持续更新网…...

数据库类型介绍

1. 关系型数据库&#xff08;RDBMS&#xff09; 关系型数据库是最常见的一类数据库&#xff0c;它们通过表&#xff08;Table&#xff09;来存储数据&#xff0c;表之间通过关系&#xff08;如主键和外键&#xff09;来关联。 • MySQL&#xff1a;开源的关系型数据库管理系统&…...

一步一步从asp.net core mvc中访问asp.net core WebApi

"从asp.net core mvc中访问asp.net core WebApi"看到这个标题是不是觉得很绕口啊&#xff0c;但的确就是要讲一讲这样的访问。前面我们介绍了微信小程序访问asp.net core webapi(感兴趣的童鞋可以看看前面的博文有关WEBAPI的搭建)&#xff0c;这里我们重点不关心如何…...

linux中kubectl命令使用

一.命令介绍 kubectl 是 Kubernetes 集群管理的命令行工具&#xff0c;用于与 Kubernetes API 交互。你可以通过它来管理和操作 Kubernetes 集群中的资源&#xff0c;如 Pod、Deployment、Service 等。下面是如何在不同操作系统上下载和使用 kubectl 的方法。 二.下载 kubect…...

Linux 系统结构

Linux系统一般有4个主要部分&#xff1a;内核、shell、文件系统和应用程序。内核、shell和文件系统一起形成了基本的操作系统结构&#xff0c;它们使得用户可以运行程序、管理文件并使用系统。 1. linux内核 内核是操作系统的核心&#xff0c;具有很多最基本功能&#xff0c;它…...

ESP32-S3设备智能化升级,物联网无线AI语音交互,让生活更加便捷和有趣

在人工智能和物联网技术的推动下&#xff0c;无线AI语音交互技术正在成为智能设备的新选择。这种技术的发展&#xff0c;不仅改变了我们与设备的沟通方式&#xff0c;更开启了一个新的智能交互方案。 想象一下&#xff0c;通过简单的语音指令&#xff0c;就能控制家中的灯光、…...

Python的函数(补充浅拷贝和深拷贝)

一、定义 函数的定义&#xff1a;实现【特定功能】的代码块。 形参&#xff1a;函数定义时的参数&#xff0c;没有实际意义 实参&#xff1a;函数调用/使用时的参数&#xff0c;有实际意义 函数的作用&#xff1a; 简化代码提高代码重用性便于维护和修改提高代码的可扩展性…...

oracle查询字段类型长度等字段信息

1.查询oracle数据库的字符集 SELECT * FROM NLS_DATABASE_PARAMETERS WHERE PARAMETER NLS_CHARACTERSET; 2.查询字段长度类型 SELECT * FROM user_tab_columns WHERE table_name user AND COLUMN_NAME SNAME 请确保将user替换为您想要查询的表名。sname为字段名 这里的字…...

C语言 | Leetcode C语言题解之第559题N叉树的最大深度

题目&#xff1a; 题解&#xff1a; /*** Definition for a Node.* struct Node {* int val;* int numChildren;* struct Node** children;* };*/int maxDepth(struct Node* root) {if (!root) {return 0;}int depth 0;// 创建空队列const int qCap 10e4 1;str…...

光流法(Optical Flow)

一、简介 光流法&#xff08;Optical Flow&#xff09;是一种用于检测图像序列中像素运动的计算机视觉技术。其基于以下假设&#xff1a; 1.亮度恒定性假设&#xff1a;物体在运动过程中&#xff0c;其像素值在不同帧中保持不变。 2.空间和时间上的连续性&#xff1a;相邻像素之…...

Rancher的安装

1. 概览 1.1 用户界面优势 Rancher 提供了一个直观的图形用户界面&#xff08;GUI&#xff09;。对于不熟悉 Kubernetes 复杂的命令行操作&#xff08;如使用kubectl&#xff09;的用户来说&#xff0c;通过 Rancher 的界面可以方便地进行资源管理。例如&#xff0c;用户可以在…...

【Linux】获得同一子网下当前在线设备IP/Latency/MAC 通过nmap指定CIDR扫描当前在线设备

【Linux】获得同一子网下当前在线设备IP/Latency/MAC 通过nmap指定CIDR扫描当前在线设备 通过路由器的后台&#xff0c;查看当前在线设备&#xff0c;受到网卡版本的影响&#xff0c;有时会有部分设备看不见MAC和分配的IP。此时&#xff0c;可以借助命令行工具扫描子网下所有连…...

Ubuntu22.04安装DataEase

看到DataEase的驾驶舱&#xff0c;感觉比PowerBI要好用一点&#xff0c;于是搭建起来玩玩。Dataease推荐的操作系统是Ubuntu22.04/Centos 7。 下载了Ubuntu22.04和DataEase 最新版本的离线安装包 一.安装ubuntu22.04 在安装的时候&#xff0c;没有顺手设置IP地址信息&#xff…...

Taro React-Native IOS 打包发布

http网络请求不到 配置 fix react-native facebook::flipper::SocketCertificateProvider‘ (aka ‘int‘) is not a function or func_rn运行debug提示flipper-CSDN博客 Xcode 15&#xff08;iOS17&#xff09;编译适配报错_no template named function in namespace std-CS…...

【卷积神经网络CNN】基于深度学习动物图像识别系统(完整系统源码+数据库+开发笔记+详细部署教程+启动教程)✅

目录 【卷积神经网络CNN】基于深度学习动物图像识别系统&#xff08;完整系统源码数据库开发笔记详细部署教程启动教程&#xff09;✅ 一、项目背景 二、项目目标 三、项目创新点 四、项目功能 五、开发技术介绍 六、数据库设计 七、启动步骤 八、项目功能展示 九、开…...

图像处理椒盐噪声

椒盐噪声&#xff0c;也称为脉冲噪声&#xff0c;是图像中经常见到的一种噪声。它是一种随机出现的白点或者黑点&#xff0c;可能是亮的区域有黑色像素或是在暗的区域有白色像素&#xff08;或是两者皆有&#xff09;。这些白点和黑点会在图像中随机分布&#xff0c;导致图像中…...

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?

一、核心优势&#xff1a;专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发&#xff0c;是一款收费低廉但功能全面的Windows NAS工具&#xff0c;主打“无学习成本部署” 。与其他NAS软件相比&#xff0c;其优势在于&#xff1a; 无需硬件改造&#xff1a;将任意W…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

《Playwright:微软的自动化测试工具详解》

Playwright 简介:声明内容来自网络&#xff0c;将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具&#xff0c;支持 Chrome、Firefox、Safari 等主流浏览器&#xff0c;提供多语言 API&#xff08;Python、JavaScript、Java、.NET&#xff09;。它的特点包括&a…...

Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!

一、引言 在数据驱动的背景下&#xff0c;知识图谱凭借其高效的信息组织能力&#xff0c;正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合&#xff0c;探讨知识图谱开发的实现细节&#xff0c;帮助读者掌握该技术栈在实际项目中的落地方法。 …...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

Typeerror: cannot read properties of undefined (reading ‘XXX‘)

最近需要在离线机器上运行软件&#xff0c;所以得把软件用docker打包起来&#xff0c;大部分功能都没问题&#xff0c;出了一个奇怪的事情。同样的代码&#xff0c;在本机上用vscode可以运行起来&#xff0c;但是打包之后在docker里出现了问题。使用的是dialog组件&#xff0c;…...

Mobile ALOHA全身模仿学习

一、题目 Mobile ALOHA&#xff1a;通过低成本全身远程操作学习双手移动操作 传统模仿学习&#xff08;Imitation Learning&#xff09;缺点&#xff1a;聚焦与桌面操作&#xff0c;缺乏通用任务所需的移动性和灵活性 本论文优点&#xff1a;&#xff08;1&#xff09;在ALOHA…...