当前位置: 首页 > news >正文

Spark开发

第一步:创建RDD

Spark提供三种创建RDD方式:** 集合、本地文件、HDFS文件**

  1. 使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造一些测试数据,来测试后面的spark应用程序的流程。
  2. 使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件
  3. 使用HDFS文件创建RDD,是最常用的生产环境的处理方式,主要可以针对HDFS上存储的数据,进行离线批处理操作。
使用集合创建RDD

如果要通过集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上,形成一个分布式的数据集合,也就是一个RDD。相当于,集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以用并行的方式来操作这个分布式数据集合了

	object CreateRddByArrayscala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("CreateRddByArrayscala").setMaster("local")val sc = new SparkContext(conf)//创建集合 driver中执行val arr = Array(1,2,3,4,5)//基于集合创建RDDval rdd =sc.parallelize(arr)//对集合数据求和val sum =rdd.reduce(_ + _)//这行代码再driver中执行println(sum)

** 注意**
val arr = Array(1,2,3,4,5)还有println(sum)代码是在driver进程中执行的,这些代码不会并行执行parallelize还有reduce之类的操作是在worker节点中执行的

使用本地文件和HDFS文件创建RDD

通过SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD,RDD中的每个元素就是文件中的一行文本内容。textFile()方法支持针对目录、压缩文件以及通配符创建RDD

/*** 通过文件创建RDD*/
object CreateRddByFilescala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("CreateRddByArrayscala").setMaster("local")val sc = new SparkContext(conf)var path = "D:\\hello.txt"//path = hdfs://bigdata01:9000/test/hello.txtvar rdd =sc.textFile(path,minPartitions = 2)//获取每一行数据的长度,计算文件内数据的总长度val length = rdd.map(_.length).reduce(_+_)println(length);sc.stop() }
}

** Spark中对RDD的操作**
Spark对RDD的操作可以整体分为两类:Transformation和Action

Transformation可以翻译为转换,表示是针对RDD中数据的转换操作,主要会针对已有的RDD创建一个新的RDD:常见的有map、flatMap、filter等等.
Action可以翻译为执行,表示是触发任务执行的操作,主要对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并且还可以把结果返回给Driver程序.
不管是Transformation里面的操作还是Action里面的操作,我们一般会把它们称之为算子
其中Transformation算子有一个特性:** lazy **
lazy特性在这里指的是,如果一个spark任务中只定义了transformation算子,那么即使你执行这个任务,任务中的算子也不会执行.
只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。
Spark通过lazy这种特性,来进行底层的spark任务执行的优化,避免产生过多中间结果。
Action的特性:执行Action操作才会触发一个Spark 任务的运行,从而触发这个Action之前所有的Transformation的执行

算子 介绍
map       将RDD中的每个元素进行处理,一进一出
filter    对RDD中每个元素进行判断,返回true则保留
flatMap   与map类似,但是每个元素都可以返回一个或多个新元素
groupByKey 根据key进行分组,每个key对应一个Iterable<value>
reduceByKey 对每个相同key对应的value进行reduce操作
sortByKey  对每个相同key对应的value进行排序操作(全局排序)
join     对两个包含<key,value>对的RDD进行join操作
distinct 对RDD中的元素进行全局去重

Transformation操作开发实战

  1. map:对集合中每个元素乘以2
  2. filter:过滤出集合中的偶数
  3. flatMap:将行拆分为单词
  4. groupByKey:对每个大区的主播进行分组
  5. reduceByKey:统计每个大区的主播数量
  6. sortByKey:对主播的音浪收入排序
  7. join:打印每个主播的大区信息和音浪收入
  8. distinct:统计当天开播的大区信息

scala代码如下:

object TransformationOpScala {def main(args: Array[String]): Unit = {val sc=  getSparkContextgroupByKeyOp(sc)}//flatMap:将行拆分为单词def flatMapOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(" good good study","day day up"))dataRdd.flatMap(_.split(" ")).foreach(println(_))}//groupbyKey 对每个大区主播进行分组def groupByKeyOp(sc: SparkContext): Unit = {val dataRdd =sc.parallelize(Array((150001,"us"),(1500002,"CN"),(150003,"CN"),(1500004,"IN")))//需要使用map对tuple中的数据位置进行互换,因为需要把大区作为key进行分组操作dataRdd.map(tup=>(tup._2,tup._1)).groupByKey().foreach(tup=>{//获取大区val area=tup._1println(area+":")//获取同一个大区对应的所有用户idval it = tup._2for(uid <- it){println(uid+" ")}println()})}//filter:过滤出集合中的偶数def filterOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))dataRdd.filter(_ %2 ==0).foreach(println(_))}
//map:对集合中每个元素乘以2def mapOp(sc: SparkContext): Unit = {val dataRdd =  sc.parallelize(Array(1,2,3,4,5))dataRdd.map(_ * 2).foreach(println(_))}private def getSparkContext = {val conf = new SparkConf()conf.setAppName("CreateRddByArrayscala").setMaster("local")new SparkContext(conf)}
}

常用Action介绍

算子 介绍
reduce   将RDD中的所有元素进行聚合操作
collect  将RDD中所有元素获取到本地客户端(Driver)
count    获取RDD中元素总数
take(n)  获取RDD中前n个元素
saveAsTextFile 将RDD中元素保存到文件中,对每个元素调用toString
countByKey     对每个key对应的值进行count计数
foreach        遍历RDD中的每个元素

scala代码:

object ActionOpScala {def main(args: Array[String]): Unit = {val sc =getSparkContext//reduce聚合计算//reduceOp(sc)//collect:获取元素集合//colletOp(sc)// count:获取元素总数//countOp(sc)//saveAsTextFile:保存文件//saveAsTextFileOp(sc)//countByKey:统计相同的key出现多少次//countByKeyOp(sc)//foreach:迭代遍历元素foreachOp(sc)sc.stop()}//foreach:迭代遍历元素def foreachOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))dataRdd.foreach(println(_))}//countByKey:统计相同的key出现多少次def countByKeyOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(("A",1001),("B",1002),("A",1003),("C",1004)))val res = dataRdd.countByKey()for((k,v) <- res){println(k+","+v)}}//saveAsTextFile:保存文件def saveAsTextFileOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))dataRdd.saveAsTextFile("hdfs://bigdata01:9000/out001")}
// count:获取元素总数def countOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))val res = dataRdd.count()println(res)}//collect:获取元素集合def colletOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))//collect 返回的是一个Array数组val res = dataRdd.collect()for(item <- res){println(item)}}
//reduce聚合计算def reduceOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))val num = dataRdd.reduce(_ + _)println(num)}private def getSparkContext = {val conf = new SparkConf()conf.setAppName("CreateRddByArrayscala").setMaster("local")new SparkContext(conf)}
}

相关文章:

Spark开发

第一步&#xff1a;创建RDD Spark提供三种创建RDD方式&#xff1a;** 集合、本地文件、HDFS文件** 使用程序中的集合创建RDD&#xff0c;主要用于进行测试&#xff0c;可以在实际部署到集群运行之前&#xff0c;自己使用集合构造一些测试数据&#xff0c;来测试后面的spark应…...

Tornado异步框架

简介&#xff1a; tornado是Python的web框架。tornado和主流的web服务器框架有明显的区别&#xff1a;它是非阻塞式服务器&#xff0c;而且速度非常快&#xff0c;得力于其非阻塞的方式和epoll的运用tornado可以每秒处理数以千计的连接&#xff08;号称&#xff09; 基本配置 …...

openpnp - error - 吸嘴没下降到板子上, 就将元件松开

文章目录openpnp - error - 吸嘴没下降到板子上, 就将元件松开概述笔记ENDopenpnp - error - 吸嘴没下降到板子上, 就将元件松开 概述 以前用过国内一家openpnp厂家出的设备, 他们家的openpnp是自己改过的. 贴片流程已经走过一遍. 这次还是按照以前记录的笔记, 按照国内那家的…...

【Java】yyyy-MM-dd HH:mm:ss 时间格式 时间戳 全面解读超详细

时间格式 时间格式(协议)描述gg时期或纪元。y不包含纪元的年份。不具有前导零。yy不包含纪元的年份。具有前导零。yyyy包含纪元的四位数的年份。M月份数字。一位数的月份没有前导零。MM月份数字。一位数的月份有一个前导零。MMM月份的缩写名称&#xff0c;在AbbreviatedMonthN…...

快鲸SCRM发布口腔企业私域运营解决方案

口腔企业普遍面临着以下几方面运营痛点问题 1、获客成本居高不下&#xff0c;恶性竞争严重 2、管理系统落后&#xff0c;人员流失严重 3、客户顾虑多、决策时间长 4、老客户易流失&#xff0c;粘性差 以上这些痛点&#xff0c;不得不倒逼口腔企业向精细化运营客户迈进。 …...

Verilog实现组合逻辑电路

在verilog 中可以实现的数字电路主要分为两类----组合逻辑电路和时序逻辑电路。组合逻辑电路比较简单&#xff0c;仅由基本逻辑门组成---如与门、或门和非门等。当电路的输入发生变化时&#xff0c;输出几乎&#xff08;信号在电路中传递时会有一小段延迟&#xff09;立即就发生…...

2023前端菜鸟笔试血泪史html5-one--找到工作前都更新

1.说说对html语义化的理解 什么的HTML语义化&#xff0c;顾名思义&#xff0c;HTML语义化就是可以不通过了解HTML的内容&#xff0c;就可以知道这个部分所代表的的意义。 HTML语义化的意义&#xff1a;在使用HTML标签构建页面时&#xff0c;避免大篇幅的使用无语义的标签。 …...

蓝牙调试工具集合汇总

BLE 该部分主要分享一下常用的蓝牙调试工具&#xff0c;方便后续蓝牙抓包及分析。 目录 1 hciconfig 2 hcitool 3 hcidump 4 hciattach 5 btmon 6 bluetoothd 7 bluetoothctl 1 hciconfig 工具介绍&#xff1a;hciconfig&#xff0c;HCI 设备配置工具 命令格式&…...

Java 获取文件后缀名【一文总结所有方法】

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…...

UML常见图的总结

一、概述 UML&#xff1a;Unified Modeling Language&#xff0c;统一建模语言&#xff0c;支持从需求分析开始的软件开发的全过程。是一个支持模型化和软件系统开发的图形化语言、为软件开发的所有阶段提供模型化和可视化支持&#xff0c;包括由需求分析到规格&#xff0c;到…...

WebRTC系列-工具系列之音频相关工具

文章目录 1. audio_util数据格式转换类2. WavFile文件读写类2.1 读取wav文件2.2 写入wav文件这篇文章主要介绍WebRTC中一些音频工具这些,大部分都在 common_audio目录下,这个文件夹下提供音频的大量算法,包括sinc重采样算法,音频数据格式的转换:例如 float转int16_t格式等…...

7 线性回归及Python实现

1 统计指标 随机变量XXX的理论平均值称为期望: μE(X)\mu E(X)μE(X)但现实中通常不知道μ\muμ, 因此使用已知样本来获取均值 X‾1n∑i1nXi.\overline{X} \frac{1}{n} \sum_{i 1}^n X_i. Xn1​i1∑n​Xi​.方差variance定义为&#xff1a; σ2E(∣X−μ∣2).\sigma^2 E(|…...

适合小团队协作、任务管理、计划和进度跟踪的项目任务管理工具有哪些?

适合小团队协作、任务管理、计划和进度跟踪的项目任务管理工具有哪些? 大家可以参考这个模板&#xff1a;http://s.fanruan.com/irhj8管理项目归根结底在管理人、物&#xff0c;扩展来说便是&#xff1a; 人&#xff1a;员工能力、组织机制&#xff1b; 物&#xff1a;项目内…...

从100%进口到自主可控,从600块降到10块,中科院攻克重要芯片

前言 2月28日&#xff0c;“20多位中科院专家把芯片价格打到10块”冲上微博热搜&#xff0c;据河南省官媒大象新闻报道&#xff0c;热搜中提到的中科院专家所在企业为全球最大的PLC分路器芯片制造商仕佳光子&#xff0c;坐落于河南鹤壁。 为实现芯片技术自主可控自立自强&#…...

关于git的一些基本点总结

1.什么是git? git是一个常用的分布式版本管理工具。 2.git 的常用命令: clone&#xff08;克隆&#xff09;: 从远程仓库中克隆代码到本地仓库 checkout &#xff08;检出&#xff09;:从本地仓库中检出一个仓库分支然后进行修订 add&#xff08;添加&#xff09;: 在提交前…...

PyTorch保姆级安装教程

1 安装CUDA1.1 查找Nvidia适用的CUDA版本桌面右键&#xff0c;【打开 NVIDIA控制面板】查看【系统信息】查看NVIDIA的支持的CUDA的版本&#xff0c;下图可知支持的版本是 10.11.2 下载CUDACUDA下载官方网址https://developer.nvidia.com/cuda-toolkit-archive找到适合的版本下载…...

MySQL 上亿大表如何优化?

背景XX 实例&#xff08;一主一从&#xff09;xxx 告警中每天凌晨在报 SLA 报警&#xff0c;该报警的意思是存在一定的主从延迟。&#xff08;若在此时发生主从切换&#xff0c;需要长时间才可以完成切换&#xff0c;要追延迟来保证主从数据的一致性&#xff09;XX 实例的慢查询…...

Git(狂神课堂笔记)

1.首先去git官网下载我们对应的版本Git - Downloading Package (git-scm.com) 2.安装后我们会发现git文件夹里有三个应用程序&#xff1a; Git Bash&#xff1a;Unix与Linux风格的命令行&#xff0c;使用最多&#xff0c;推荐最多 Git CMD&#xff1a;Windows风格的命令行 G…...

「2」指针进阶,最详细指针和数组难题解题思路

&#x1f436;博主主页&#xff1a;ᰔᩚ. 一怀明月ꦿ ❤️‍&#x1f525;专栏系列&#xff1a;线性代数&#xff0c;C初学者入门训练 &#x1f525;座右铭&#xff1a;“不要等到什么都没有了&#xff0c;才下定决心去做” &#x1f680;&#x1f680;&#x1f680;大家觉不错…...

云服务器是做什么的?云服务器典型的应用场景介绍

云服务器可能是很多企业以及个人上云用户的必选产品了&#xff0c;但是对于初学者或者非专业的用户来说云服务器还是比较陌生的&#xff0c;它到底是干什么的&#xff0c;如此生活中哪些地方可以接触到&#xff0c;这篇文章将详细的介绍云服务器使用的应用场景以及相关的操作 本…...

【论文随笔】Transfer of temporal logic formulas in reinforcement learning

Zhe Xu and Ufuk Topcu. 2019. Transfer of temporal logic formulas in reinforcement learning. In Proceedings of the 28th International Joint Conference on Artificial Intelligence (IJCAI’19). AAAI Press, 4010–4018. 这是一篇将inference和learning结合起来的文章…...

蓝桥杯-货物摆放

蓝桥杯-货物摆放1、题目描述1.1 答案提交1.2 运行限制2、解决方案2.1 方案一&#xff1a;暴力解法(三重循环)2.2 方案二&#xff1a;找出乘机的因子1、题目描述 小蓝有一个超大的仓库&#xff0c;可以摆放很多货物。 现在&#xff0c;小蓝有 n 箱货物要摆放在仓库&#xff0c;每…...

10 种顶流聚类算法 Python 实现(附完整代码)

聚类或聚类分析是无监督学习问题。它通常被用作数据分析技术&#xff0c;用于发现数据中的有趣模式&#xff0c;例如基于其行为的客户群。 有许多聚类算法可供选择&#xff0c;对于所有情况&#xff0c;没有单一的最佳聚类算法。相反&#xff0c;最好探索一系列聚类算法以及每…...

微信小程序第一节 —— 自定义顶部、底部导航栏以及获取胶囊体位置信息。

一、前言 大家好&#xff01;我是 是江迪呀。我们在进行微信小程序开发时&#xff0c;常常需要自定义一些东西&#xff0c;比如自定义顶部导航、自定义底部导航等等。那么知道这些自定义内容的具体位置、以及如何适配不同的机型就变得尤为重要。下面让我以在iPhone机型&#x…...

快速吃透π型滤波电路-LC-RC滤波器

π型滤波器简介 π型滤波器包括两个电容器和一个电感器&#xff0c;它的输入和输出都呈低阻抗。π型滤波有RC和LC两种&#xff0c; 在输出电流不大的情况下用RC&#xff0c;R的取值不能太大&#xff0c;一般几个至几十欧姆&#xff0c;其优点是成本低。其缺点是电阻要消耗一些…...

聊聊混沌工程

这是鼎叔的第五十四篇原创文章。行业大牛和刚毕业的小白&#xff0c;都可以进来聊聊。欢迎关注本专栏和微信公众号《敏捷测试转型》&#xff0c;大量原创思考文章陆续推出。混沌工程是一门新兴学科&#xff0c;它不仅仅只是个技术活动&#xff0c;还包含如何设计能够持续协作的…...

做为骨干网络的分类模型的预训代码安装配置简单记录

一、安装配置环境 1、准备工作 代码地址 GitHub - bubbliiiing/classification-pytorch: 这是各个主干网络分类模型的源码&#xff0c;可以用于训练自己的分类模型。 # 创建环境 conda create -n ptorch1_2_0 python3.6 # 然后启动 conda install pytorch1.2.0 torchvision…...

网络协议(九):应用层(域名、DNS、DHCP)

网络协议系列文章 网络协议(一)&#xff1a;基本概念、计算机之间的连接方式 网络协议(二)&#xff1a;MAC地址、IP地址、子网掩码、子网和超网 网络协议(三)&#xff1a;路由器原理及数据包传输过程 网络协议(四)&#xff1a;网络分类、ISP、上网方式、公网私网、NAT 网络…...

有趣的小知识(三)提升网站速度的秘诀:掌握缓存基础,让你的网站秒开

像MySql等传统的关系型数据库已经不能适用于所有的业务场景&#xff0c;比如电商系统的秒杀场景&#xff0c;APP首页的访问流量高峰场景&#xff0c;很容易造成关系型数据库的瘫痪&#xff0c;随着缓存技术的出现很好的解决了这个问题。 一、缓存的概念&#xff08;什么是缓存…...

SpringCloud之服务拆分和实现远程调用案例

服务拆分对单体架构项目来说&#xff1a;简单方便&#xff0c;高度耦合&#xff0c;扩展性差&#xff0c;适合小型项目。而对于分布式架构来说&#xff1a;低耦合&#xff0c;扩展性好&#xff0c;但架构复杂&#xff0c;难度大。微服务就是一种良好的分布式架构方案&#xff1…...

商丘专业做网站公司/网络营销推广公司

Eclipse January是Java中的一组常见数据结构&#xff0c;包括一个用于处理数字数据的强大库。 随着数据量和复杂性的急剧增加&#xff08;即所谓的“大数据”&#xff09;&#xff0c;Eclipse January提供了一个数字库&#xff0c;可以简化多维数组形式的数据的处理和操作。 库…...

做公司网站怎么做/网络推广网站

原文地址&#xff1a;Oracle查看当前用户权限作者&#xff1a;zhangjianjiekf1、查看当前用户拥有的角色权限信息 select * from role_sys_privs; 2、查看所有用户&#xff1a; select * from all_users; 3、查看当前用户的详细信息&#xff1a; select * from user_users; …...

api网站模板/资源搜索引擎

在网上看了几篇AdaBoost的介绍后&#xff0c;感觉网上介绍的都不好&#xff0c;不能够让人完全理解&#xff0c;因此就下载了一个外国人写的代码&#xff0c;总算透彻的理解了AdaBoost&#xff0c;可以向Transfer开进了&#xff0c;现在分享一下代码&#xff1a;主函数代码[pla…...

天津网站建设 文率科技/怎么弄推广广告

0. 前言 听了实验室6位老师的讲座之后&#xff0c;对老师们的研究内容有了基本的认识。下面将对这6次的内容做个总结。 1. 主动学习 主动学习方法&#xff0c;是指能够从任务出发&#xff0c;通过对任务的理解来制定标准&#xff0c;挑选最关键的样本&#xff0c;使其最有助于…...

英语网站大全免费/市场调研报告怎么写的

使用JavaScript开发IE浏览器本地插件实例 投稿&#xff1a;junjie 字体&#xff1a;[增加 减小] 类型&#xff1a;转载 时间&#xff1a;2015-02-18 我要评论 这篇文章主要介绍了使用JavaScript开发IE浏览器本地插件实例,本文讲解使用JS注册表的方式开发一个IE浏览器本地插件,需…...

简单的网站设计怎么做/海外推广代理商

Hyperledger Fabric笔记2--运行fabric测试网络 1、获取fabric相关源代码 首先&#xff0c;需要在/opt/gopath中新建目录, mkdir -p /opt/gopath/src/github.com/hyperledger/ 其次&#xff0c;下载fabric源码&#xff0c;git clone https://github.com/hyperledger/fabric.g…...