【spark-spring boot】学习笔记
目录
- 说明
- RDD学习
- RDD介绍
- RDD案例
- 基于集合创建RDD
- RDD存入外部文件中
- 转换算子 操作
- map 操作
- 说明
- 案例
- flatMap操作
- 说明
- 案例
- filter 操作
- 说明
- 案例
- groupBy 操作
- 说明
- 案例
- distinct 操作
- 说明
- 案例
- sortBy 操作
- 说明
- 案例
- mapToPair 操作
- 说明
- 案例
- mapValues操作
- 说明
- 案例
- groupByKey操作
- 说明
- 案例
- reduceByKey操作
- 说明
- 案例
- sortByKey操作
- 说明
- 案例
- 行动算子 操作
- collect 操作
- 说明
- 案例
- count 操作
- 说明
- 案例
- first操作
- 说明
- 案例
- take操作
- 说明
- 案例
- countByKey操作
- 说明
- 案例
- saveAsTextFile 操作
- 说明
- 案例
- foreach操作
- 说明
- 案例
说明
本文依赖于上一篇文章:【spark学习】 spring boot 整合 spark 的相关内容,请先阅读上一篇文章,将spark需要的环境先配置好,并测试通过之后再进行此文章的阅读和操作。
RDD学习
RDD介绍
想象一下你有一个大大的数据表,里面包含了很多很多的信息。如果你想对这些数据进行操作,比如筛选出符合条件的数据、或者对数据做一些计算,RDD 就是 Spark 用来存储和操作这些数据的一种方式。它有两个基本操作:转换操作(就像是加工数据)和 行动操作(获取结果)
RDD案例
基于集合创建RDD
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;/*** 创建 RDD 算子*/@PostConstructpublic void createRDD() {// 从集合创建 RDDList<String> lists = Arrays.asList("hello", "spark", "hi", "spark", "hadoop");JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDDparallelize.collect().forEach(System.out::println); // 打印}
}
输出结果:
RDD存入外部文件中
将RDD存入外部文件,用于观察文件结果和分区结果。txt文件数等于默认分区数。从结果中看出默认分区为4个。
注意:存放结果的文件夹路径必须没有被创建。
以下案例中,将基于集合生成的RDD保存到saveRddDemo文件夹中。
package www.zhangxiaosan.top.timer;import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;/*** 创建 RDD 算子*/@PostConstructpublic void createRDD() {// 从集合创建 RDDList<String> lists = Arrays.asList("hello", "spark", "hi", "spark", "hadoop");// parallelize(): 创建RDD,RDD中创建默认分区数// parallelize( 元素, 分区数): 创建RDD,RDD中指定分区数JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDDparallelize.collect().forEach(System.out::println); // 打印// 存储的目录文件夹路径。此处为项目中的路径且目录必须为不存在的路径。String fileSavePath="I:\\zhang\\SpringBootSparkDemo\\src\\main\\resources\\saveRddDemo";parallelize.saveAsTextFile(fileSavePath);// 开始将RDD保存到文件中 }
}
运行结果:
在文件夹中存放的是运行程序生成的文件。如下图。
_SUCCESS文件:成功标记
part-XXX 文件:保存的数据文件 ,结果中有4个文件,说明默认分区为4个。
转换算子 操作
转换算子(Transformation)是指那些返回一个新的 RDD 的操作。转换算子不会立即执行计算,而是构建一个执行计划,只有当行动算子(Action)触发计算时,转换算子的操作才会实际执行。转换算子可以用来处理和转换数据,比如映射、过滤、聚合等。
map 操作
说明
map() 方法传入一个函数作为参数。map() 方法会将RDD中的元素逐一进行调用,函数的参数即是RDD的元素。 如:map( 函数( RDD元素 ) )。 map() 方法会返回一个新的RDD。
案例
将RDD中的每个元素都拼接上字符串 “ say hi ”
package www.zhangxiaosan.top.timer;import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;/*** 创建 RDD 算子*/@PostConstructpublic void createRDD() {// 从集合创建 RDDList<String> lists = Arrays.asList("张三", "李四", "王五");JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDD// lambda表达式传入匿名函数,进行元素拼接字符。// item 表示RDD中的元素parallelize = parallelize.map(item -> item + " say hi"); parallelize.collect().forEach(System.out::println); // 打印}
}
输出结果:
flatMap操作
说明
flatMap() 方法传入一个函数作为参数 。flatMap() 方法会将RDD中的元素逐一进行调用,函数的参数即是RDD的元素。 如:flatMap( 函数( RDD元素 ) )。 flatMap() 方法会返回一个新的RDD。
flatMap() 方法 会将 RDD的元素扁平化处理成一个集合。
例如:
假设你有一个箱子,箱子里面放着几个小盒子,每个小盒子里又有一些玩具。flatMap 就是一个工具,它能帮你把每个小盒子里的玩具拿出来,直接放进一个大盒子里,最终把所有玩具放在一个地方。
每个小盒子里的玩具可以不止一个,甚至可能没有玩具(比如有的盒子是空的)。flatMap 会把每个盒子里的玩具都拿出来,放到一个大盒子里,最终得到一个扁平的大盒子,里面是所有玩具。
案例
将集合:[ [1, 2, 3, 4, 5],[hello, spark],[张三, 李四] ]
扁平化处理成:[ 1, 2, 3, 4, 5, hello, spark, 张三, 李四]
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void flatMapDemo() {// 声明集合:[ [1, 2, 3, 4, 5],[hello, spark],[张三, 李四] ]List<List<String>> arrs = Arrays.asList(Arrays.asList("1", "2", "3", "4", "5"),Arrays.asList("hello", "spark"),Arrays.asList("张三", "李四"));//输出声明的集合呢容System.out.println("原始集合打印:");arrs.forEach(item -> System.out.print(" " + item));// 分隔符System.out.println();System.out.println("-----------");System.out.println("flatMap操作后:");// 创建集合的RDDJavaRDD<List<String>> parallelize = javaSparkContext.parallelize(arrs);// flatMap操作List<String> collect = parallelize.flatMap(i -> i.iterator()).collect(); // 打印 flatMap操作 后的集合collect.forEach(item -> System.out.print(" " + item));System.out.println();}
}
filter 操作
说明
filter() 方法传入一个函数作为参数,函数返回值只能为Boolean值。filter() 方法会将RDD中的元素逐一进行调用,函数的参数即是RDD的元素。 如:filter( 函数( RDD元素 ) )。 filter() 方法会返回一个新的RDD。filter() 将RDD元素中满足条件的元素保留,即函数返回为true的元素;RDD中不满足条件的元素,即函数返回为false的元素过滤。
案例
过滤单数,保留双数
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void filterDemo() {// 声明集合List<Integer> arrs = Arrays.asList(-1, 0, 1, 2, 3, 4, 5);System.out.println("原始集合打印:");arrs.forEach(item -> System.out.print(" " + item));// 输出过滤前的数据System.out.println();System.out.println("-----------");System.out.println("filter操作后:");JavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);// 过滤掉 单数 (取模不等于0的数字为单数,否则为偶数)。item % 2 == 0 的数字为双数,返回有true保留List<Integer> collect = parallelize.filter(item -> item % 2 == 0).collect();// 输出过滤后的数据collect.forEach(item -> System.out.print(" " + item));System.out.println();}
}
运行结果:
groupBy 操作
说明
将数据按某些条件进行分组。每个组由键值对组成。
groupBy() 常和以下聚合函数一起使用,来对分组数据进行统计分析。
常用的聚合操作包括:
- count(): 统计每个组的元素数量
- sum(): 计算每个组的元素总和
- avg(): 计算每个组的平均值
- max():计算每个组的最大值
- min(): 计算每个组的最小值
- agg(): 自定义聚合操作,可以结合多个聚合函数
案例
给写生成绩按照分数来分组。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void groupByDemo() {// 定义学生成绩数据List<Tuple2<String, Integer>> students = Arrays.asList(new Tuple2<>("张三", 85),new Tuple2<>("李四", 90),new Tuple2<>("王五", 85),new Tuple2<>("赵六", 95),new Tuple2<>("孙七", 90));// 创建RDDJavaRDD<Tuple2<String, Integer>> parallelize = javaSparkContext.parallelize(students);// 使用 groupBy 进行分组,按成绩分组JavaPairRDD<Object, Iterable<Tuple2<String, Integer>>> integerIterableJavaPairRDD = parallelize.groupBy(tuple -> tuple._2());// 使用 groupBy 按成绩分组。_2表示元组的第二个元素,即分数// 打印结果integerIterableJavaPairRDD.sortByKey().foreach(item -> System.out.println("成绩:" + item._1() + ",学生:" + item._2()));}
}
结果如下:
distinct 操作
说明
对RDD的元素进行分布式去重。返回新的RDD,新的RDD内的元素不重复出现。
案例
将集合中重复的元素去除。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void distinctDemo() {// 声明集合List<Integer> arrs = Arrays.asList(1,2,3,4,5,6,1,4,6);// 创建RDDJavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);// distinct()方法,用于去除重复元素。JavaRDD<Integer> distinct = parallelize.distinct();// 打印结果distinct.collect().forEach(System.out::println);}
}
运行结果:
sortBy 操作
说明
对RDD的元素进行排序,返回新的RDD。
sortBy() 可传入3个参数:
参数1:函数,每个RDD元素都会传入此函数中,此函数定义排序规则。
参数2:boolean值。定义排序顺序。true为升序,false降序。
参数3:Integer 整数,指定分区数量。
案例
将集合中的元素降序排序。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void sortByDemo() {// 创建无序集合List<Integer> arrs = Arrays.asList(7,2,5,4,1,9,6,3,8);// 创建RDDJavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);// 使用 sortBy 进行排序,按元素值排序,并返回一个RDD。JavaRDD<Integer> sortBy = parallelize.sortBy(item -> item, false, 1);// 打印结果sortBy.collect().forEach(System.out::println);}
}
运行结果:
mapToPair 操作
说明
mapToPair()是将一个普通的 RDD 转换为 JavaPairRDD 的一个方法。 JavaPairRDD 中的每个元素都是一个键值对。
mapToPair对RDD的元素进行映射成一个由键值对组成的 RDD(即映射成JavaPairRDD),即将每个元素转换成一个 Tuple2 对象,其中第一个元素是键(key),第二个元素是值(value)。
案例
将集合 [“张三:85”, “李四:90”, “王五:85”, “赵六:95”, “孙七:90”] 中的学生和成绩按照冒号分隔,形成键值对,姓名为键,值为成绩。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void mapToPairDemo() {// 创建包含学生成绩的列表List<String> students = Arrays.asList("张三:85", "李四:90", "王五:85", "赵六:95", "孙七:90");// 将数据并行化为 RDDJavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);// 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对JavaPairRDD<String, Integer> studentsPairRDD = studentsRDD.mapToPair((PairFunction<String, String, Integer>) s -> {// 根据冒号分隔学生姓名和成绩,返回一个 (姓名, 成绩) 的元组String[] parts = s.split(":");// 返回一个 (姓名, 成绩) 的元组return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));});// 打印结果studentsPairRDD.collect().forEach(System.out::println);}
}
运行结果:
mapValues操作
说明
mapValues() 是一个用于处理 JavaPairRDD 的方法。它可以对 RDD 中的每个键值对的 值(value) 进行转换,同时保留原来的 键(key) 不变。
案例
将集合 [“张三:85”, “李四:90”, “王五:85”, “赵六:95”, “孙七:90”] 中的学生和成绩按照冒号分隔,形成键值对,姓名为键,值为成绩。并将成绩在原分数上+5分。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void mapToPairDemo() {// 创建包含学生成绩的列表List<String> students = Arrays.asList("张三:85", "李四:90", "王五:85", "赵六:95", "孙七:90");// 将数据并行化为 RDDJavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);// 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对JavaPairRDD<String, Integer> studentsPairRDD = studentsRDD.mapToPair((PairFunction<String, String, Integer>) s -> {// 根据冒号分隔学生姓名和成绩,返回一个 (姓名, 成绩) 的元组String[] parts = s.split(":");// 返回一个 (姓名, 成绩) 的元组return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));});// 打印结果System.out.println("原始分数:");studentsPairRDD.collect().forEach(System.out::println);System.out.println("\n+5分后:");// item 表示每个键值对中的值,即分数JavaPairRDD<String, Integer> newValeRdd = studentsPairRDD.mapValues(item -> item + 5);// 打印结果newValeRdd.collect().forEach(System.out::println);}
}
运行结果:
groupByKey操作
说明
groupByKey() 是一个用于处理 JavaPairRDD 的方法。它根据键对数据进行分组,将所有具有相同键的元素聚集到一起,生成一个新的 RDD,其中每个键对应一个包含所有相同键值的集合。
每个键对应的值收集到一个 Iterable 容器中,然后返回一个新的 RDD,其中每个键对应一个包含该键的所有值的 Iterable。
案例
将集合 [“张三:85”, “李四:90”, “陈八:95”, “王五:85”, “黄六:92”] 中的学生和成绩按照冒号分隔,形成键值对,成绩为键,值为姓名。并将键值对中的键进行分类,将相同分数的学生归成一组。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void groupByKeyDemo(){// 创建包含学生成绩的列表List<String> students = Arrays.asList("张三:85", "李四:90", "陈八:95", "王五:85", "黄六:92");// 将数据并行化为 RDDJavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);// 使用 mapToPair 将每个元素转换为 (成绩, 姓名) 的键值对JavaPairRDD<Integer, String> studentsPairRDD = studentsRDD.mapToPair(s -> {String[] parts = s.split(":");return new Tuple2<Integer, String>(Integer.parseInt(parts[1]), parts[0]);});// 根据键值对中的键进行分类JavaPairRDD<Integer, Iterable<String>> groupedRDD = studentsPairRDD.groupByKey();// 打印结果groupedRDD.collect().forEach(pair -> {System.out.println("成绩: " + pair._1() + ", 姓名: " + pair._2());});}
}
执行结果:
reduceByKey操作
说明
reduceByKey()用于对 JavaPairRDD 数据进行聚合的一个方法。它根据键对值进行合并,并在分区内进行局部聚合,从而减少了跨节点的数据传输,通常比 groupByKey() 更高效。
案例
计算学生的总分。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void groupByKeyDemo(){// 创建包含学生成绩的列表List<String> students = Arrays.asList("张三:85", "李四:90", "张三:95", "王五:85", "李四:92");// 将数据并行化为 RDDJavaRDD<String> studentsRDD = javaSparkContext.parallelize(students);// 使用 mapToPair 将每个元素转换为 (姓名, 成绩) 的键值对JavaPairRDD<String, Integer> studentsPairRDD = studentsRDD.mapToPair(s -> {String[] parts = s.split(":");return new Tuple2<>(parts[0], Integer.parseInt(parts[1]));});// 使用 groupByKey 按学生姓名进行成绩求和JavaPairRDD<String, Integer> groupedRDD = studentsPairRDD.reduceByKey((a, b) -> a + b);// 打印结果groupedRDD.collect().forEach(pair -> {System.out.println("姓名: " + pair._1() + ", 成绩: " + pair._2());});}
}
运行结果:
sortByKey操作
说明
对 JavaPairRDD 数据按键进行排序的方法。默认为升序。
传入参数为Boolean值:true 升序 ,false降序
案例
将学生成绩按照成绩从高到低降序排序
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void sortByKeyDemo(){// 定义原始数据List<Tuple2<String, Integer>> students = Arrays.asList(new Tuple2<>("张三", 85),new Tuple2<>("李四", 90),new Tuple2<>("王五", 65),new Tuple2<>("赵六", 95),new Tuple2<>("孙七", 90));// 生成RDDJavaPairRDD<String, Integer> studentsPairRDD = javaSparkContext.parallelizePairs(students);// 将学生原集合(姓名,成绩)格式的数据转换为(成绩,姓名)格式的键值对JavaPairRDD<Integer, String> integerStringJavaPairRDD = studentsPairRDD.mapToPair(item -> new Tuple2<>(item._2(), item._1()));// 将(成绩,姓名)格式的键值对按照键降序排序,即按照成绩降序排序JavaPairRDD<Integer, String> stringIntegerJavaPairRDD = integerStringJavaPairRDD.sortByKey(false);// 打印结果stringIntegerJavaPairRDD .collect() .forEach(pair -> {System.out.println("成绩: " + pair._1() + ", 姓名: " + pair._2());});}
}
运行结果:
行动算子 操作
行动算子(Action)是指会触发 Spark 作业的执行,并且会产生一个结果或者副作用的操作。与 转换算子(Transformation)不同,转换算子只会定义数据转换的计算逻辑,而不会立即执行。只有在遇到行动算子时,Spark 才会真正开始计算,并将结果返回给用户或写入外部存储。
当你调用一个行动算子时,Spark 会从头开始执行所有必要的转换操作,并将结果返回给你或者存储到外部系统(如 HDFS、数据库等)。
行动算子通常会返回一个具体的结果,例如一个列表、一个数值,或者在某些情况下,可能会执行一些副作用操作(例如将数据写入磁盘)。
collect 操作
说明
将分布式 RDD 中的所有数据项拉取到本地驱动程序(Driver)中,通常作为一个数组、列表或其他集合类型。因为 collect() 会将整个 RDD 数据集拉到本地,所以如果数据量非常大,可能会导致内存溢出(OutOfMemoryError)。
注意事项:
数据量大时的风险:如果 RDD 中包含的数据量非常大,调用 collect() 会导致所有数据被加载到本地驱动程序的内存中,这可能会导致内存溢出错误(OutOfMemoryError)。因此,建议在数据集非常大的情况下谨慎使用 collect()。
建议:对于大规模数据集,通常会使用其他行动算子(如 take())获取一个数据的子集,避免一次性加载所有数据。
并行计算的代价:尽管 collect() 会将所有数据从分布式环境中拉回到单个节点,但它不会对数据进行额外的计算,只会执行之前定义的转换操作。因此,它本质上是将整个数据集的计算结果从集群中汇总回来。
案例
收集RDD数据并打印。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void collectDemo(){// 定义一个整数集合List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);// 创建RDDJavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);// collect()方法,用于将RDD中的数据收集到Driver端,并返回一个List。List<Integer> collect = parallelize.collect();// 打印结果collect.forEach(item -> System.out.println(item));}
}
执行结果:
count 操作
说明
统计RDD中的元素数量。
案例
统计RDD中的元素数量,并输出
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void countDemo(){// 定义一个整数集合List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);// 创建RDDJavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);// count()方法,统计数量。Long total = parallelize.count();// 打印结果System.out.println("元素数量 = " + total);}
}
运行结果:
first操作
说明
返回RDD中的第一个元素
案例
获取RDD中的第一个元素并打印
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void firstDemo(){// 创建一个集合List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);// 创建RDDJavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);// 获取第一个元素Integer first = parallelize.first();// 打印结果System.out.println("第一个元素 = " + first);}
}
运行结果:
take操作
说明
在RDD中从头获取指定数量的元素,返回获取的元素集合。
案例
获取前3个元素
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void takeDemo(){// 创建一个集合List<Integer> arrs = Arrays.asList(1, 2, 3, 4, 5);// 创建RDDJavaRDD<Integer> parallelize = javaSparkContext.parallelize(arrs);// 获取前三个元素List<Integer> takes = parallelize.take(3);// 打印结果System.out.println("前三个元素 = " + takes);}
}
运行结果:
countByKey操作
说明
统计RDD中每种键的数量。
案例
根据键值对,统计键值对中不同键的数量。并打印。
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import scala.Tuple2;
import javax.annotation.PostConstruct;
import java.util.*;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;@PostConstructpublic void countByKeyDemo(){// 创建一个集合List<Tuple2<String,Integer>> arrs = Arrays.asList(new Tuple2<String,Integer>("张三",15),new Tuple2<String,Integer>("张三",20),new Tuple2<String,Integer>("李四",20),new Tuple2<String,Integer>("李四",30),new Tuple2<String,Integer>("李四",50),new Tuple2<String,Integer>("王五",10));// 创建JavaPairRDD 对象,JavaPairRDD对象的元素为键值对。JavaPairRDD<String, Integer> parallelize = javaSparkContext.parallelizePairs(arrs);// 使用 countByKey() 方法,统计相同键的数量。Map<String, Long> countByKey = parallelize.countByKey();// 打印结果countByKey.forEach((key, value) -> System.out.println("键: " + key + ", 数量: " + value));}
}
运行结果:
saveAsTextFile 操作
说明
将 RDD 的数据以txt文件保存到外部存储系统。传入指定路径,文件会生成到该路径下。
注意
输出路径不能存在:如果输出路径已经存在,saveAsTextFile 会抛出异常
文件分区:Spark 会将每个分区的数据写入一个独立的文件。因此,如果 RDD 有多个分区,它会生成多个文件,每个文件对应一个分区的数据 。 文件名会根据分区编号进行自动命名,通常形式是:part-00***
文本格式:保存时,RDD 中的每个元素会被转换为文本行。默认情况下,Spark 会把 RDD 中的每个元素的 toString() 输出到文件中。
路径支持分布式存储:saveAsTextFile 支持将数据保存到本地文件系统、HDFS、S3 等分布式存储中。路径的格式取决于存储系统的类型。例如,如果要保存到 HDFS,路径应该以 hdfs:// 开头。
案例
将RDD存入外部文件。
以下案例中,将基于集合生成的RDD保存到saveRddDemo文件夹中。具体参考本文中的 RDD存入外部文件中 内容
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;/*** spark 案例*/
@Slf4j
@Component
public class DemoTimer {@AutowiredJavaSparkContext javaSparkContext;@AutowiredSparkSession sparkSession;/*** 创建 RDD 算子*/@PostConstructpublic void createRDD() {// 从集合创建 RDDList<String> lists = Arrays.asList("hello", "spark", "hi", "spark", "hadoop");// parallelize(): 创建RDD,RDD中创建默认分区数// parallelize( 元素, 分区数): 创建RDD,RDD中指定分区数JavaRDD<String> parallelize = javaSparkContext.parallelize(lists); // 创建RDDparallelize.collect().forEach(System.out::println); // 打印// 存储的目录文件夹路径。此处为项目中的路径且目录必须为不存在的路径。String fileSavePath="I:\\zhang\\SpringBootSparkDemo\\src\\main\\resources\\saveRddDemo";parallelize.saveAsTextFile(fileSavePath);// 开始将RDD保存到文件中 }
}
foreach操作
说明
循环遍历RDD中的元素
案例
以上案例中大部分用到此方法,是否用方式看以上案例。
相关文章:
【spark-spring boot】学习笔记
目录 说明RDD学习RDD介绍RDD案例基于集合创建RDDRDD存入外部文件中 转换算子 操作map 操作说明案例 flatMap操作说明案例 filter 操作说明案例 groupBy 操作说明案例 distinct 操作说明案例 sortBy 操作说明案例 mapToPair 操作说明案例 mapValues操作说明案例 groupByKey操作说…...
【Python】九大经典排序算法:从入门到精通的详解(冒泡排序、选择排序、插入排序、归并排序、快速排序、堆排序、计数排序、基数排序、桶排序)
文章目录 1. 冒泡排序(Bubble Sort)2. 选择排序(Selection Sort)3. 插入排序(Insertion Sort)4. 归并排序(Merge Sort)5. 快速排序(Quick Sort)6. 堆排序&…...
【346】Postgres内核 Startup Process 通过 signal 与 postmaster 交互实现 (5)
1. Startup Process 进程 postmaster 初始化过程中, 在进入 ServerLoop() 函数之前,会先通过调用 StartChildProcess() 函数来开启辅助进程,这些进程的目的主要用来完成数据库的 XLOG 相关处理。 如: 核实 pg_wal 和 pg_wal/archive_status 文件是否存在Postgres先前是否发…...
Jmeter中的测试片段和非测试原件
1)测试片段 1--测试片段 功能特点 重用性:将常用的测试元素组合成一个测试片段,便于在多个线程组中重用。模块化:提高测试计划的模块化程度,使测试计划更易于管理和维护。灵活性:可以通过模块控制器灵活地…...
利用 Jsoup 进行高效 Web 抓取与 HTML 处理
Jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过 DOM,CSS 以及类似于 JQuery 的操作方法来取出和操作数据。 官网:https://jsoup.org/ 中文文档:Jsou…...
【Java】二叉树:数据海洋中灯塔式结构探秘(上)
个人主页 🌹:喜欢做梦 二叉树中有一个树,我们可以猜到他和树有关,那我们先了解一下什么是树,在来了解一下二叉树 一🍝、树型结构 1🍨.什么是树型结构? 树是一种非线性的数据结构&…...
微信小程序 WXS 的概念与基本用法教程
微信小程序 WXS 的概念与基本用法教程 引言 在微信小程序的开发中,WXS(WeiXin Script)是一种特殊的脚本语言,旨在解决小程序在逻辑处理和数据处理上的一些限制。WXS 允许开发者在小程序的 WXML 中嵌入 JavaScript 代码,以便实现更复杂的逻辑处理。本文将深入探讨 WXS 的…...
Vue.js 中 v-bind 和 v-model 的用法与异同
简介 在 Vue.js 中,v-bind 和 v-model 是两个非常常用且强大的指令,它们分别用于动态地绑定属性和实现双向数据绑定。理解这两个指令的用法和区别对于构建 Vue.js 应用至关重要。本文将详细介绍 v-bind 和 v-model 的用法,并探讨它们的异同。…...
K8s的水平自动扩容和缩容HPA
HPA全称是Horizontal Pod Autoscaler,翻译成中文是POD水平自动伸缩,HPA可以基于CPU利用率对replication controller、deployment和replicaset中的pod数量进行自动扩缩容(除了CPU利用率也可以基于其他应程序提供的度量指标custom metrics进行自…...
【AI日记】24.11.26 聚焦 kaggle 比赛
【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】 核心工作 1 内容:研究 kaggle 比赛时间:3 小时 核心工作 2 内容:学习 kaggle 比赛 Titanic - Machine Learning from Disaster时间:4 小时备注:这…...
大型语言模型LLM - Finetuning vs Prompting
资料来自台湾大学李宏毅教授机器学课程ML 2023 Spring,如有侵权请通知下架 台大机器学课程ML 2023 Springhttps://speech.ee.ntu.edu.tw/~hylee/ml/2023-spring.php2023/3/10 课程 機器如何生成文句 内容概要 主要探讨了大型语言模型的两种不同期待及其导致的两类…...
【IEEE独立出版 | 厦门大学主办】第四届人工智能、机器人和通信国际会议(ICAIRC 2024,12月27-29日)
第四届人工智能、机器人和通信国际会议(ICAIRC 2024) 2024 4th International Conference on Artificial Intelligence, Robotics, and Communication 重要信息 会议官网:www.icairc.net 三轮截稿时间:2024年11月30日23:59 录…...
【GPT】力量训练是什么,必要吗,有可以替代的方式吗
什么是力量训练? 力量训练是一种通过抵抗力(如重量、阻力带、自身体重等)来刺激肌肉收缩,从而提高肌肉力量、耐力和体积的运动形式。它包括以下常见形式: 自由重量训练:使用哑铃、杠铃、壶铃等。固定器械…...
【03】Selenium+Python 八种定位元素方法
操作元素,需要先查找定位到对应的元素。 查找单个元素:driver.find_element() 返回是一个web element 对象 查找多个元素:driver.find_elements() 返回是一个list对象 By 是 Selenium 中一个非常重要的类,用于定位网页元素。 使…...
笔记:jQuery追加js时会自动加“_时间戳“参数,导致百度统计失败
问题描述: $(document.createElement("script")).attr(id, baidutj).attr(src, https://hm.baidu.com/hm.js?xxx).appendTo(body); 会自动给src加_时间戳的参数? 问题解疑: 【未完待续…】 问题解决: 老老实实按它…...
【PyTorch】(基础二)---- 张量
张量 在 PyTorch 中,张量(Tensor)是核心数据结构,类似于 NumPy 中的数组,但具有更强的计算能力和对 GPU 的支持。 创建 从列表或数组创建 import torch# 从列表创建 tensor_from_list torch.tensor([1, 2, 3, 4])…...
充满智慧的埃塞俄比亚狼
非洲的青山 随着地球温度上升,贝尔山顶峰的冰川消失殆尽,许多野生动物移居到海拔3000米以上的高原上生活,其中就包括埃塞俄比亚狼。埃塞俄比亚狼是埃塞俄比亚特有的动物,总数不到500只,为“濒危”物种。 埃塞俄比亚狼…...
基于STM32设计的智能桌面暖风机(华为云IOT)
一、前言 1.1 项目开发背景 随着智能家居技术的迅猛发展,传统家用电器正逐步向智能化方向转型。暖风机作为冬季广泛使用的取暖设备,其智能化升级不仅能够提高用户的使用体验,还能通过物联网技术实现远程控制和数据监控,赋予其更…...
零基础学安全--云技术基础
目录 学习连接 前言 云技术历史 云服务 公有云服务商 云分类 基础设施即服务(IaaS) 平台即服务(PaaS) 软件即服务(SaaS) 云架构 虚拟化 容器 云架构设计 组件选择 基础设施即代码 集成部署…...
Spring Boot中配置Flink的资源管理
在 Spring Boot 中配置 Flink 的资源管理,需要遵循以下步骤: 添加 Flink 依赖项 在你的 pom.xml 文件中,添加 Flink 和 Flink-connector-kafka 的依赖项。这里以 Flink 1.14 版本为例: <!-- Flink dependencies --><de…...
51单片机从入门到精通:理论与实践指南入门篇(二)
续51单片机从入门到精通:理论与实践指南(一)https://blog.csdn.net/speaking_me/article/details/144067372 第一篇总体给大家在(全局)总体上讲解了一下51单片机,那么接下来几天结束详细讲解,从…...
Notepad++ 替换所有数字给数字加单引号
前言 今天遇到这样一个场景: 要去更新某张表里 code1,2,3,4,5,6 的数据,把它的 name 设置为 ‘张三’ 但是 code在数据库里面的字段类型是 vachar(64),它自身携带索引 原本可以这样写 SQL: update tableA set namezhangsan where code in …...
【CANOE】【Capl】【RS232】控制串口设备
系列文章目录 内置函数,来控制传统的串口设备,比如继电器等 文章目录 系列文章目录前言一、控制串口二、自定义相关的参数RS232Configure**函数语法****函数功能****参数说明****返回值****示例代码** 三、回调函数的使用RS232OnSend**函数语法****函数…...
查找相关题目
1.顺序查找法适合于存储结构为(B )的线性表。 A.散列存储 B.顺序存储或链式存储 C.压缩存储 D.索引存储 顺序查找法的特点 2.适用于折半查找的表的存储方式及元素排列要求为(D ) 。 A.链接方式存储,元素无序 B.链接方式存储࿰…...
《独立开发:Spring 框架的综合应用》
一、Spring 框架概述 Spring 是一个分层的 Java SE/EE full-stack 轻量级开源框架,以 IoC 和 AOP 为内核,具有方便解耦、方便集成优秀框架、降低 Java EE API 使用难度等优点。 Spring 框架因其强大的功能以及卓越的性能而受到众多开发人员的喜爱。它是…...
数据工程流程
** 数据工程流程图** #mermaid-svg-ArT55xCISSfZImy3 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-ArT55xCISSfZImy3 .error-icon{fill:#552222;}#mermaid-svg-ArT55xCISSfZImy3 .error-text{fill:#552222;stroke…...
Linux宝塔部署wordpress网站更换服务器IP后无法访问管理后台和打开网站页面显示错乱
一、背景: wordpress网站搬家,更换服务器IP后,如果没有域名时,使用服务器IP地址无法访问管理后台和打开网站页面显示错乱。 二、解决方法如下: 1.wordpress搬家后,在新服务器上,新建站点时&am…...
区块链知识体系
1. 区块链基础知识 Q: 什么是区块链? A: 区块链是一种去中心化的分布式账本技术,通过加密算法保证数据的不可篡改性和透明性。它由一系列按时间顺序链接的区块组成,每个区块包含一批交易记录。 Q: 区块链的主要特点是什么? 去…...
力扣第 66 题 “加一”
题目描述 给定一个由 非负整数组成的非空数组,表示一个整数。在该整数的基础上加一。 最高位数字在数组的首位,数组中每个元素只存储单个数字。 你可以假设除了整数 0 之外,这个整数不会以零开头。 示例 1: 输入: digits [1,2,3] 输出:…...
C语言数据结构与算法--简单实现队列的入队和出队
(一)队列的基本概念 和栈相反,队列(Queue)是一种先进先出(First In First Out)的线性表。只 允许在表的一端进行插入,而在另一端删除元素,如日常生活中的排队现象。队列中 允许插入的一端叫队尾…...
二维码生成器怎么使用/seo优化服务是什么
2014/7/26 今天是删了英雄联盟的第一天,和平时上课的时间一样起来了。很早去了实验室,里面只有一个师弟。 今天打算好学树分治的,可是觉得应该写个题比较好,然后写了CF的一题,看了,不会,看了人家…...
保定做网站/成功品牌策划案例
题目链接 戳我 \(Solution\) 观察发现如果一个数两边都比他大,删掉他可以保证最优,这个应该是显然的。这个东西用单调栈维护一下,最后剩下的就是个单调递减或单调递增的数列,从小到大排个序取前面\(n-2\)个,\(n\)为数列长度 \(Cod…...
重庆市建设工程交易中心/搜索引擎优化培训免费咨询
2019独角兽企业重金招聘Python工程师标准>>> Zookeeper Watch机制 博客分类: zookeeper Znode发生变化(Znode本身的增加,删除,修改,以及子Znode的变化)可以通过Watch机制通知到客户端。那么要实…...
做校园文化展览的网站/国外网站设计
文章目录切比雪夫近似值是什么常见函数的近似值切比雪夫近似值是什么 计算机计算正弦余弦等函数,都不太可能直接泰勒级数展开,因为太耗费计算资源了。除非特殊需要,一般都是使用切比雪夫近似值计算的。当然更不可能用割圆术,割圆术…...
平台类网站开发/福州短视频seo获客
云顶之弈10.12新版本中的赌赵信阵容要怎么进行搭配比较厉害呢?好多小伙伴们可能还不知道呢,来一起看看10.12赌赵信阵容搭配推荐分享吧!10.12赌赵信阵容怎么搭配前期阵容皇子洛赵信妮蔻前期基本什么强上什么,如果没有强的,可以直接空城&#…...
怎么注册个人的网站/百度指数官网入口
from datetime import datetime, datenow_date 2021-6-24 15:23:29.000227 nowdate now_date.strftime(now_date,"%m/%d/%Y") 想从字符串表示的时刻中仅获取“年/月/日”,但运行以上代码会报错str object has no attribute strftime。 改进方法如下&am…...