当前位置: 首页 > news >正文

亿万级海量数据去重软方法

文章目录

  • 原理
  • 案例一
        • 需求:
        • 方法
  • 案例二
        • 需求:
        • 方法:
  • 参考

原理

在大数据分布式计算框架生态下,提升计算效率的方法是尽可能的把计算分布式话、并行化,避免单节点计算过载,把计算分摊到各个节点。这样解释小白能够听懂:比如你有5个桶,怎样轻松地把A池子的水倒入B池子里?

  • 最大并行化,5个桶同时利用,避免count distinct只用一个桶的方法
  • 重复利用化,一次提不动那么多水,不要打肿脸充胖子,一不小心oom,为什么不分几次呢
  • 数据均衡化,5个桶的水不要一个多一个少的,第一个提水的次数变多,第二个某些桶扛不住,俗称数据倾斜

案例一

需求:

计算day_num维度下的uv,自己脑补出海量数据,这里为方便说明,只列举了day_num,一个维度用桶来描绘计算模型,假设数据都是按字典顺序分桶

> select * from event;
+----------------+------------+
| event.day_num  | event.uid  |
+----------------+------------+
| day1           | a          |
| day1           | a          |
| day1           | a          |
| day1           | a          |
| day1           | bb         |
| day1           | bb         |
| day1           | bbb        |
| day1           | ccc        |
| day1           | ccc        |
| day1           | dddd       |
| day1           | eeee       |
| day1           | eeeee      |
| day1           | eeeee      |
| day1           | eeeee      |
+----------------+------------+

方法

  • 原始方法:count(distinct)
select count(distinct(uid))as uv from event group by day_num;

在这里插入图片描述
可以看到所有数据装到一个桶里面,桶已经快装不下了,明显最差

  • 优化一
select size(collect_set(uid)) as uv 
from (select day_num,uid from event group by day_num,uid) tmp 
group by day_num;

在这里插入图片描述
充分利用了桶,最大的实现了并行化,执行虽然分为了两部,但是大大减轻了第一步的负担,面向海量数据的场景去重方面拥有绝对的优势,假如第二步的结果集还是太大了呢?一样会oom扛不住

  • 优化二(推荐👍)

简单说就是转化计算,在一个jvm里面,硬去重的方法都逃不开把所有字符或字符的映射放一个对象里面,通过一定的逻辑获取去重集合,对于分布式海量数据的场景下,这种硬去重的计算仍然会花大量的时间在上图的最后单点去重的步骤,我们可以把去重的逻辑按照一定的规则分桶计算完成,每个桶之间分的数据都不重复,所有桶计算完桶内数据去重的集合大小,最后一步再相加。

创建临时表,其中length(uid) as len_uid是映射字段,uid的长度create table event_tmp as select *,length(uid) as len_uid from event;
select sum(uv_tmp) as uv 
from(select day_num,size(collect_set(uid)) as uv_tmp from event_tmp group by len_uid,day_num) tmp group by day_num

在这里插入图片描述
这里使用uid长度映射字段,实际开发中,你也可以选择首字母、末字母或者其它能想到的属性作为映射字段,分桶分步预聚合的方法,巧妙的把一个集合去重问题最终转化为相加问题,避开了单个jvm去重承受的压力,在海量数据的场景下,这个方法最为使用,推荐用在生产上


案例二

需求:

商品 product 每日总销售记录量级亿 级别起,去重 product 量大概 万 级别。每个商品有一个 state 标识其状态,该状态共3个值,分别为 “0”, “1”,“2”。
统计:
(1) 三个 state 下 product 的总量 pv
(2) 对应 state 下 product 去重后的量 uv
第二个统计每个 state 下有亿级别的 value ,去重时有严重的数据倾斜且数据去重规模很大,亿级别去重至万亿级别

方法:

  • GroupBy + RandomIndex + ToSet
    val re = sc.textFile(input).map(line => {val info = line.split("\t")val state = info(0)val productId = info(1)// 全局计数countMap(state).add(1L)// 构建 state + randomIndex + product 的 PairRDD(state + "_" + random.nextInt(100) , productId)}).groupBy(_._1).map(info => {val state = info._1.split("_")(0)// 分治val productSet = info._2.map(kv => {val productId = kv._2productId}).toArray.toSet(state, productSet)}).groupBy(_._1).map(info => {val state = info._1val tmpSet = mutable.HashSet[String]()// 合并info._2.foreach(kv => {tmpSet ++= kv._2})state + ":" + tmpSet.size}).collect()

因为 state 只有 0,1,2 三种可能,所以最后全部压力分摊在 3 个节点上,构造 PairRDD 时可以给 state 加上随机索引,从而将任务分散,获得多个小的 Set 再合并成大 Set 。相当于分治,该方法会将原始数据分为 3 x 100 份,缩减了每个 key 要处理的 productId 的量,最后再去除随机索引再 groupBy 一次,汇总得到结果,执行时间 5 min,优化效果显著。

  • Distinct + GroupBy (推荐👍 )
    上一步方案通过 randomIndex 将数据量分治,减少的百分比和 random 的数值成正比,但是在数据量很大的情况下,分治的每个 key 对应的 value 量还是很大,所以简单的去重执行 5min +,这次将 groupBy 改为 distinct先去重得到 万 级别数据量,再 GroupBy,此时的数据量本机也可轻松完成
def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)val sc: SparkContext = SparkContext.getOrCreate(conf)val rdd1: RDD[String] = sc.parallelize(List("1,spark","0,flink","1,kafka","1,spark","0,hadoop",), 4)val myAccumulator = new MyAccumulatorsc.register(myAccumulator, "myAcc")val rdd2 = rdd1.map(str => {val info: Array[String] = str.split(",")val state: String = info(0)val productId: String = info(1)//累加器 求pvmyAccumulator.add(state)state + "_" + productId}).distinct().map(info => {val str: Array[String] = info.split("_")val state: String = str(0)val productId: String = str(1)(state, productId)}).groupBy(_._1) //(1,CompactBuffer((1,kafka), (1,spark))).map(f => {val state: String = f._1val num: Int = f._2.map(_._2).toSet.size(state, num)})rdd2.foreach(println(_))//输出累加器值(注意在action后)val sentMap: mutable.HashMap[String, Long] = myAccumulator.valueprintln(sentMap.toString())}
}//自定义累加器
class MyAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Long]] {private val hashMap = new mutable.HashMap[String, Long]()override def isZero: Boolean = hashMap.isEmptyoverride def copy(): AccumulatorV2[String, mutable.HashMap[String, Long]] = new MyAccumulatoroverride def reset(): Unit = hashMap.clear()override def add(v: String): Unit = {val l: Long = hashMap.getOrElse(v, 0L)hashMap.update(v, l + 1)}override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Long]]): Unit = {val hashMap1: mutable.HashMap[String, Long] = this.hashMapval hashMap2: mutable.HashMap[String, Long] = other.valuehashMap2.foreach {case (k, v) => {val l: Long = hashMap1.getOrElse(k, 0L)hashMap1.update(k, l + v)}}}override def value: mutable.HashMap[String, Long] = this.hashMap
}
输出:(1,2)(0,2)Map(1 -> 3, 0 -> 2)
  • distinct源码
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {def removeDuplicatesInPartition(partition: Iterator[T]) .......partitioner match {case Some(_) if numPartitions == partitions.length =>mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)}}

partitioner源码是这样声明的:val partitioner: Option[Partitioner] = None
case Some(_) //这句是匹配partitioner不为None
所以最终执行的代码是:
case _ => map(x => (x, null)).reduceByKey((x, ) => x, numPartitions).map(._1)

case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
主要是用到了 reduceByKey ,这个算子会在MapSide进行预聚合的操作。将聚合后的结果传递到reduce端。

参考

https://www.jianshu.com/p/1cdc943bb649

https://blog.csdn.net/BIT_666/article/details/121672715

reduceByKey详见

累加器详见

相关文章:

亿万级海量数据去重软方法

文章目录原理案例一需求:方法案例二需求:方法:参考原理 在大数据分布式计算框架生态下,提升计算效率的方法是尽可能的把计算分布式话、并行化,避免单节点计算过载,把计算分摊到各个节点。这样解释小白能够…...

记录--手摸手带你撸一个拖拽效果

这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助 前言 最近看见一个拖拽效果的视频(抖音:艾恩小灰灰),看好多人评论说跟着敲也没效果,还有就是作者也不回复大家提出的一些疑问,本着知其然必要知其所以然…...

python高德地图+58租房网站平台源码

wx供重浩:创享日记 对话框发送:python地图 免费获取完整源码源文件说明文档配置教程等 在PyCharm中运行《高德地图58租房》即可进入如图1所示的高德地图网页。 具体的操作步骤如下: (1)打开地图网页后,在编…...

ubuntu 将jupyter-lab保存为桌面快捷方式和favourites

ubuntu: 将jupyter-lab保存为桌面快捷方式和favourites desktop shortcut 建立一个新的desktop文件 cd ~/Desktop touch Jupyter-lab.desktop将文件修改成如下: [Desktop Entry] Version1.0 NameJupyterlab CommentBack up your data with one click Exec/home/cjb/…...

Java 类和对象简介

类是对象的抽象,是一组具有相同特性(属性,事物的状态信息)和行为(事物能做什么)的事物的集合,可以看做一类事物的模板。 对象是类的实例化,是具体的事物。 比如:人类和…...

时间复杂度的计算

个人主页:平行线也会相交 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 平行线也会相交 原创 收录于专栏【数据结构初阶(C实现)】 文章目录123456789时间复杂度(就是一个函数)的计算,…...

站内信箱系统的设计与实现

技术:Java、JSP等摘要:在经济全球化和信息技术成为发展迅速的今时今日,人们通过电子邮件收发进行信息传递已经成为主流。随着互联网和网络办公的发展,电子邮件正在被广泛应用在人们的日常生活中。跟据调查研究统计,在全…...

systemV共享内存

systemV共享内存 共享内存区是最快的IPC形式。共享内存的大小一般是4KB的整数倍,因为系统分配共享内存是以4KB为单位的(Page)!4KB也是划分内存块的基本单位。 之前学的管道,是通过文件系统来实现让不同的进程看到同一…...

Python基础之if逻辑判断

在学习if语句之前,我们先学习一种数据类型,布尔类型(bool),在if语句中,我们需要通过判断条件是否为真或者假,才进入下面的语句块执行。 一、布尔类型(bool) 布尔类型&a…...

实现pdf文件预览

前言 工作上接到的一个任务,实现pdf的在线预览,其实uniapp中已经有对应的api:uni.openDocument(OBJECT)(新开页面打开文档,支持格式:doc, xls, ppt, pdf, docx, xlsx, pptx。)**实现了相关功能…...

【java】alibaba Fastjson --全解史上最快的JSON解析库

文章目录前序Fastjson 简介Fastjson 的优点速度快使用广泛测试完备使用简单功能完备下载和使用将 Java 对象转换为 JSON 格式JSONField创建 JSON 对象JSON 字符串转换为 Java 对象使用 ContextValueFilter 配置 JSON 转换使用 NameFilter 和 SerializeConfigFastjson 处理日期F…...

绝对零基础的C语言科班作业(期末模拟考试)(十道编程题)

编程题(共10题; 共100.0分)(给猛男妙妙屋更一篇模拟考试)模拟1(输出m到n的素数)从键盘输入两个整数[m,n], 输出m和n之间的所有素数。 输入样例:3,20输出样例:…...

按位与为零的三元组[掩码+异或的作用]

掩码异或的作用前言一、按位与为零的三元组二、统计分组1、map统计分组2、异或掩码总结参考资料前言 当a b 0时,我们能够很清楚的知道b是个什么值,b 0 - a -a,如果当a & b 0时,我们能够很清楚的知道b是什么值吗&#xf…...

C++基础篇(一)-- 简单入门

C 语言是在优化 C 语言的基础上为支持面向对象的程序设计而研制的一个通用目的的程序设计语言。在后来的持续研究中,C 增加了许多新概念,例如虚函数、重载、继承、标准模板库、异常处理、命名空间等。 C 语言的特点主要表现在两个方面:全面兼…...

前端整理 —— javascript 2

1. generator(生成器) 详细介绍 generator 介绍 generator 是 ES6 提供的一种异步编程解决方案,在语法上,可以把它理解为一个状态机,内部封装了多种状态。执行generator,会生成返回一个遍历器对象。返回的…...

Spring-注解注入

一、回顾XML注解 bean 配置 创建 bean public class Student { } 配置 xml bean <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframework.org/schema/beans"xmlns:xsi"http://www.w3.org/2001/XMLSche…...

华为校招机试 - 攻城战(Java JS Python)

目录 题目描述 输入描述 输出描述 用例 题目解析 JavaScript算法源码 Java算法源码...

Docker入门

Docker一、何为DockerDocker是一个开源的应用容器引擎&#xff0c;基于GO语言并遵循从Apache2.0协议开源。Docker可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中&#xff0c;然后在发布到任何流行的Linux机器上&#xff0c;也可以实现虚拟化。容器是完全使…...

时间序列分析 | CNN-LSTM卷积长短期记忆神经网络时间序列预测(Matlab完整程序)

时间序列分析 | CNN-LSTM卷积长短期记忆神经网络时间序列预测(Matlab完整程序) 目录 时间序列分析 | CNN-LSTM卷积长短期记忆神经网络时间序列预测(Matlab完整程序)预测结果模型输出基本介绍完整程序参考资料预测结果 模型输出 layers = 具有以下层的 151 Layer 数组:...

【蒸滴C】C语言结构体入门?看这一篇就够了

目录 一、结构体的定义 二、结构的声明 例子 三、 结构成员的类型 结构体变量的定义和初始化 1.声明类型的同时定义变量p1 2.直接定义结构体变量p2 3.初始化&#xff1a;定义变量的同时赋初值。 4.结构体变量的定义放在结构体的声明之后 5.结构体嵌套初始化 6.结构体…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

多场景 OkHttpClient 管理器 - Android 网络通信解决方案

下面是一个完整的 Android 实现&#xff0c;展示如何创建和管理多个 OkHttpClient 实例&#xff0c;分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

Opencv中的addweighted函数

一.addweighted函数作用 addweighted&#xff08;&#xff09;是OpenCV库中用于图像处理的函数&#xff0c;主要功能是将两个输入图像&#xff08;尺寸和类型相同&#xff09;按照指定的权重进行加权叠加&#xff08;图像融合&#xff09;&#xff0c;并添加一个标量值&#x…...

【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)

1.获取 authorizationCode&#xff1a; 2.利用 authorizationCode 获取 accessToken&#xff1a;文档中心 3.获取手机&#xff1a;文档中心 4.获取昵称头像&#xff1a;文档中心 首先创建 request 若要获取手机号&#xff0c;scope必填 phone&#xff0c;permissions 必填 …...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?

在大数据处理领域&#xff0c;Hive 作为 Hadoop 生态中重要的数据仓库工具&#xff0c;其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式&#xff0c;很多开发者常常陷入选择困境。本文将从底…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)

Aspose.PDF 限制绕过方案&#xff1a;Java 字节码技术实战分享&#xff08;仅供学习&#xff09; 一、Aspose.PDF 简介二、说明&#xff08;⚠️仅供学习与研究使用&#xff09;三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...

Java + Spring Boot + Mybatis 实现批量插入

在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法&#xff1a;使用 MyBatis 的 <foreach> 标签和批处理模式&#xff08;ExecutorType.BATCH&#xff09;。 方法一&#xff1a;使用 XML 的 <foreach> 标签&#xff…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

面试高频问题

文章目录 &#x1f680; 消息队列核心技术揭秘&#xff1a;从入门到秒杀面试官1️⃣ Kafka为何能"吞云吐雾"&#xff1f;性能背后的秘密1.1 顺序写入与零拷贝&#xff1a;性能的双引擎1.2 分区并行&#xff1a;数据的"八车道高速公路"1.3 页缓存与批量处理…...