当前位置: 首页 > news >正文

Spark 3.1.1 shuffle fetch 导致shuffle错位的问题

背景

最近从数据仓库小组那边反馈了一个问题,一个SQL任务出来的结果不正确,重新运行一次之后就没问题了,具体的SQL如下:

select col1,count(1) as cnt
from table1
where dt = '20230202' 
group by col1
having count(1) > 1

这个问题是偶发的,在其运行的日志中会发现如下三类日志:

FetchFailed 
TaskKilled (another attempt succeeded)
ERROR (org.apache.spark.network.shuffle.RetryingBlockFetcher:231) - Failed to fetch block shuffle_4865_2481
283_286, and will not retry (3 retries)

最终在各种同事的努力下,找到了一个Jira:SPARK-34534

分析

直接切入主题,找到对应的类OneForOneBlockFetcher,该类会被NettyBlockTransferService(没开启ESS)和ExternalBlockStoreClient(开启ESS)调用,其中start方法:

public void start() {client.sendRpc(message.toByteBuffer(), new RpcResponseCallback() {@Overridepublic void onSuccess(ByteBuffer response) {try {streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);// Immediately request all chunks -- we expect that the total size of the request is// reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].for (int i = 0; i < streamHandle.numChunks; i++) {if (downloadFileManager != null) {client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),new DownloadCallback(i));} else {client.fetchChunk(streamHandle.streamId, i, chunkCallback);}}} catch (Exception e) {logger.error("Failed while starting block fetches after success", e);failRemainingBlocks(blockIds, e);}}@Overridepublic void onFailure(Throwable e) {logger.error("Failed while starting block fetches", e);failRemainingBlocks(blockIds, e);}});}

其中的message的初始化在构造方法中:

 if (!transportConf.useOldFetchProtocol() && isShuffleBlocks(blockIds)) {this.message = createFetchShuffleBlocksMsg(appId, execId, blockIds);} else {this.message = new OpenBlocks(appId, execId, blockIds);}

其中transportConf.useOldFetchProtocol 也就是 spark.shuffle.useOldFetchProtocol配置(默认是false),如果是shuffle block的话,就会运行到:createFetchShuffleBlocksMsg方法,对于为什么存在这么一个判断,具体参考SPARK-27665
关键的就是 createFetchShuffleBlocksMsg 方法:
这个方法的作用就是: 构建一个FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIdArr, batchFetchEnabled) 对象,其中里面的值
如图:
在这里插入图片描述
其中这里有一点需要注意:

 long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));

这里面对MapIdReduceId 进行了重组(在获得streamHandle的时候内部会根据reduceIdArr构建blocks索引,下文中会说到)会导致和成员变量blockIds的顺序不一致,为什么两者不一致会导致问题呢?
原因在于任务的fetch失败会导致重新进行fetch,如下:

  client.fetchChunk(streamHandle.streamId, i, chunkCallback);

chunkCallback的代码如下:

private class ChunkCallback implements ChunkReceivedCallback {@Overridepublic void onSuccess(int chunkIndex, ManagedBuffer buffer) {// On receipt of a chunk, pass it upwards as a block.listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);}@Overridepublic void onFailure(int chunkIndex, Throwable e) {// On receipt of a failure, fail every block from chunkIndex onwards.String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);failRemainingBlocks(remainingBlockIds, e);}}

String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length),此处的chunckIndex就是shuffle blocks的索引下标,也就是下文中numBlockIds组成的数组下标,
但是这个和createFetchShuffleBlocksMsg输出的顺序是不一致的,所以如果发生问题重新fetch的时候,数据有错位,具体可以看:
ShuffleBlockFetcherIterator中的

    if (req.size > maxReqSizeShuffleToMem) {shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,blockFetchingListener, this)} else {shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,blockFetchingListener, null)}

其中blockFetchingListener回调方法onBlockFetchSuccess会把fetch的block数据和shuffleBlockId一一对应上

ESS端构建blocks的信息

在start方法中,client.sendRpc向对应的ESS发送对应的请求shuffle数据信息,ESS会重新构建blocks的信息,组成StreamHandle(streamId, numBlockIds)返回给请求端:
具体为ExternalBlockHandler的handleMessage方法:

if (msgObj instanceof FetchShuffleBlocks) {FetchShuffleBlocks msg = (FetchShuffleBlocks) msgObj;checkAuth(client, msg.appId);numBlockIds = 0;if (msg.batchFetchEnabled) {numBlockIds = msg.mapIds.length;} else {for (int[] ids: msg.reduceIds) {numBlockIds += ids.length;}}streamId = streamManager.registerStream(client.getClientId(),new ShuffleManagedBufferIterator(msg), client.getChannel());
。。。
callback.onSuccess(new StreamHandle(streamId, numBlockIds).toByteBuffer());

这里的numBlockIds就是OneForOneBlockFetcher中的streamHandle.numChunks
如图:在这里插入图片描述

没有开启ESS端的构建blocks的信息

这里和上面的一样,只不过对应的方法为NettyBlockRpcServerreceive:

      case fetchShuffleBlocks: FetchShuffleBlocks =>val blocks = fetchShuffleBlocks.mapIds.zipWithIndex.flatMap { case (mapId, index) =>if (!fetchShuffleBlocks.batchFetchEnabled) {fetchShuffleBlocks.reduceIds(index).map { reduceId =>blockManager.getLocalBlockData(ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapId, reduceId))}} else {val startAndEndId = fetchShuffleBlocks.reduceIds(index)if (startAndEndId.length != 2) {throw new IllegalStateException(s"Invalid shuffle fetch request when batch mode " +s"is enabled: $fetchShuffleBlocks")}Array(blockManager.getLocalBlockData(ShuffleBlockBatchId(fetchShuffleBlocks.shuffleId, mapId, startAndEndId(0), startAndEndId(1))))}}val numBlockIds = if (fetchShuffleBlocks.batchFetchEnabled) {fetchShuffleBlocks.mapIds.length} else {fetchShuffleBlocks.reduceIds.map(_.length).sum}val streamId = streamManager.registerStream(appId, blocks.iterator.asJava,client.getChannel)logTrace(s"Registered streamId $streamId with $numBlockIds buffers")responseContext.onSuccess(new StreamHandle(streamId, numBlockIds).toByteBuffer)

这里的numBlockIds就是OneForOneBlockFetcher中的streamHandle.numChunks
如图:
在这里插入图片描述
所以在以上两种情况下,只要有重新fetch数据的操作,就会存在数据的错位,导致数据的不准确

解决

直接git cherry-pick对应的commit就行:

git cherry-pick 4e438196114eff2e1fc4dd726fdc1bda1af267da

相关文章:

Spark 3.1.1 shuffle fetch 导致shuffle错位的问题

背景 最近从数据仓库小组那边反馈了一个问题,一个SQL任务出来的结果不正确&#xff0c;重新运行一次之后就没问题了&#xff0c;具体的SQL如下&#xff1a; select col1,count(1) as cnt from table1 where dt 20230202 group by col1 having count(1) > 1这个问题是偶发…...

2月第2周榜单丨飞瓜数据B站UP主排行榜(哔哩哔哩平台)发布!

飞瓜轻数发布2023年2月6日-2月12日飞瓜数据UP主排行榜&#xff08;B站平台&#xff09;&#xff0c;通过充电数、涨粉数、成长指数三个维度来体现UP主账号成长的情况&#xff0c;为用户提供B站号综合价值的数据参考&#xff0c;根据UP主成长情况用户能够快速找到运营能力强的B站…...

Jdk19 动态编译 Java源码为 Class 文件

动态编译 Java 源码为 Class一.背景1.Jdk 版本2.需求二.Java 源码动态编译实现1.Maven 依赖2.源码包装类3.Java 文件对象封装类4.文件管理器封装类5.类加载器6.类编译器三.动态编译测试1.普通测试类2.接口实现类3.测试四.用动态编译 Class 替换 SpringBoot 的 Bean&#xff08;…...

安装 GPU 版本的 tensorflow 完整版本

前言&#xff1a; 之前安装的 CPU 版本的 tensorflow 一直出问题&#xff0c;索性就直接安装 GPU 版本的 tensorflow 了&#xff08;有了GPU 就不能浪费&#xff09;。 安装过程&#xff1a; 1&#xff09;看自己有无 GPU&#xff0c;找到对应 GPU 的版本&#xff1a;任务管理…...

BOM编程-设置地址栏上的URL

<!DOCTYPE html> <html> <head> <meta charset"utf-8"> <title>设置地址栏上的URL</title> </head> <body> <script> function go(){ // 获…...

设计模式之原型模式与建造者模式详解和应用

目录1 原型模式1.1 原型模式定义1.2 原型模式的应用场景1.3 原型模式的通用写法&#xff08;浅拷贝&#xff09;1.4 使用序列化实现深度克隆1.5 克隆破坏单例模式1.6 原型模式在源码中的应用1.7 原型模式的优缺点1.8 总结2 建造者模式2.1 建造者模式定义2.2 建造者模式的应用场…...

C语言(函数和递归)

函数是完成特定任务的独立程序代码单元。 目录 一.函数 1.创建一个简单的函数 2.定义带形式参数的函数 3.使用return从函数中返回值 二.递归 一.函数 1.创建一个简单的函数 #include <stdio.h> void print(void); //函数原型 int main(){ print(); //函…...

快乐的shell命令行

快乐的shell命令行 PART1——基础 1.权限 #超级用户权限$普通用户 2.复制粘贴 复制&#xff1a;鼠标左键沿着文本拖动高亮的文本被复制到X管理的缓冲区&#xff08;或者双击一个单词&#xff09;粘贴&#xff1a;鼠标中键 3.简单命令 时间和日期date当前月份的日历cal磁…...

大数据面试题flume篇

1.Flume 的Source&#xff0c;Sink&#xff0c;Channel 的作用&#xff1f;你们Source 是什么类型&#xff1f; 1. 作用 &#xff08;1&#xff09;Source组件是专门用来收集数据的&#xff0c;可以处理各种类型、各种格式的日志数据&#xff0c;包括 avro、thrift、exec、jm…...

零信任-深信服零信任aTrust介绍(5)

​深信服零信任aTrust介绍 深信服是国内领先的互联网信任服务提供商&#xff0c;也是国内首家通过认证的全球信任服务商。深信服零信任是其中一项核心的信任技术&#xff0c;主要针对身份认证、数字签名、数字证书等方面的信任问题。 深信服零信任提供了一种新的安全保护模式…...

UVa 1343 The Rotation Game 旋转游戏 IDA* BFS 路径还原

题目链接&#xff1a;The Rotation Game 题目描述&#xff1a; 给定二十四个整数&#xff0c;这二十四个整数由八个一&#xff0c;八个二&#xff0c;八个三组成&#xff0c;从左到右&#xff0c;从上到下依次描述下图方格中的数字&#xff1a; 例如上图左边对应的输入就是[1,…...

硬件学习 软件Cadence day02 画原理图的基本操作 (键盘快捷键 , 原理图设计流程 , 从开始到导出网表流程)

1. ORCAD Capture cls 界面的快捷键 键盘 按键对应的操作I放大 &#xff08;可以滚轮操作&#xff09;O缩小 &#xff08;可以滚轮操作&#xff09;W画线Esc退出现在的状态 &#xff08;画图界面 右键 End xxx&#xff09;N放置网络标号J放置节点 (控制…...

【python】基于Socket的聊天室Python开发

基于Socket的聊天室Python开发一、Socket简述二、创建服务端Server2.1 创建服务端初始化2.2 监听客户端连接2.3 处理客户端消息三、创建客户端Client3.1 创建服务端初始化3.2 发送消息3.3 接收消息3.3 线程工作3.4 线程工作是不是挺好玩的呢&#xff1f;也可以作为课程设计哦&a…...

2023想转行软件测试的看过来,你想要了解的薪资、前景、岗位方向、学习路线都讲明白了

在过去的一年中&#xff0c;软件测试行业发展迅速&#xff0c;随着数字化技术应用的广泛普及&#xff0c;业界对于软件测试的要求也在持续迭代与增加。 同样的&#xff0c;有市场就有需求&#xff0c;软件测试逐渐成为企业中不可或缺的岗位&#xff0c;作为一个高薪又需求广的…...

TortoiseSVN的使用

基本概念 版本库 SVN保持数据的地方&#xff0c;所有的文件都保存在这个库中&#xff0c;Tortoise访问的就是远程服务器上的Subversion版本库。 工作拷贝 就是工作副本&#xff0c;可将版本库的文件拷贝到本地中&#xff0c;可以任意修改&#xff0c; 不会影响版本库。在你…...

操作系统(day09) -- 连续分配管理方式

连续分配管理方式 单元连续分配 动态分区分配 1.系统要用什么样的数据结构记录内存的使用情况&#xff1f; 两种常用的数据结构 空闲分区表 每个空闲分区对应一个表项。表项中包含分区号、分区大小、分区起始地址等信息空闲分区链 每个分区的起始部分和末尾部分分别设置前向…...

APISpace 带你一起走进西湖美景

俗话说&#xff1a;“上有天堂&#xff0c;下有苏杭”。 “欲把西湖比西子&#xff0c;浓妆艳抹总相宜” 今天我就带大家走进杭州的西湖美景。自古以来&#xff0c;文人歌者面对西湖美景留下千古绝句&#xff0c;还以西湖为背景书写了一段段动人的爱情传说。 天生自带浪漫色…...

傻白探索Chiplet,Design Space Exploration for Chiplet-Assembly-Based Processors(十三)

阅读了Design Space Exploration for Chiplet-Assembly-Based Processors这篇论文&#xff0c;是关于chiplet设计空间探索的&#xff0c;个人感觉核心贡献有两个&#xff1a;1.提出使用整数线性规划算法进行Chiplet的选择&#xff1b;2.基于RE和NRE提出了一个cost模型&#xff…...

系统分析师真题2020试卷相关概念一

对象系统测试的基本概念: 面向对象系统的单元测试包括方法层次的测试、类层次的测试和类树层次的测试。方法层次的测试类似于传统软件测试中对单个函数的测试; 测试技术: 方法层次的测试,单个函数的测试;常用的技术:等价类划分测试、组合功能测试、递归函数的测试和多态…...

20230215_数据库过程_渠道业务计算过程

—20221209 渠道产能 —自有人员工号表 shzc.xc_qdcn_pgtx_opertype —select * from shzc.xc_qdcn_pgtx_opertype for update ; —渠道基础目录 shzc.xc_qdcn_pgtx_qdtype —select * from shzc.xc_qdcn_pgtx_qdtype for update ; SQL_STRING:‘update shzc.xc_qdcn_pgtx_q…...

【C++】Expression的学习笔记

关于不同类别表达式的举例&#xff0c;请参考博文《C 中的值类别》 1. 左值和右值的简单理解 左值对应了具有内存地址的对象&#xff0c;而右值仅仅是临时使用的值对象。&#xff08;引用自博文《C 中的值类别》&#xff09;左值有名称&#xff08;变量或常量名称&#xff09…...

[数据库迁移]-MySQL常见问题

[数据库迁移]-MySQL常见问题 森格 | 2023年2月 介绍&#xff1a;记录在MySQL数据库迁移过程中遇到的问题&#xff0c;以及解决方案。 文章目录[数据库迁移]-MySQL常见问题一、背景二、常见问题2.1 ERROR 20032.2 ERROR 12732.3 ERROR 10712.4 视图权限2.5 ERROR 1062三、总结一…...

C语言编译过程

C语言编译过程1、C语言编译过程2、单c文件编译实践3、多c文件编译实践4、define4.1、不带参宏4.2、带参宏4.3、带参宏和带参函数的区别5、选择性编译ifdef、ifndef、if5.1、#ifdef5.2、#ifndef5.3、#if6、静态库和动态链接库6.1、静态库实践6.1.1、将mylib.c制作成静态库6.1.2、…...

前端学习 ---常用标签

常用标签 1,文本标签 文本标签是双标签&#xff0c;自带加粗效果&#xff0c;有自己对应的文本大小&#xff0c;并且独占一行&#xff0c;有默认间距 一级标签&#xff1a;< h1 > < /h1 > 二级标签&#xff1a;< h2 > < /h2> 三级标签&#xff1a;&l…...

2023年PMP考试难不难?

整个考试的考察方向转向还是比较大的&#xff0c;基本上以“价值传递”和“以人为本”这两个出发点来考察项目经理所需要的能力。 1}新版提纲题目数量的变化 总题量从200道减少到180道&#xff0c;所以答题时间上相对变的宽裕一些。考试时间230分钟&#xff0c;中间有十分钟休…...

Netty 入门

文章目录一、概述1.1 Netty 是什么&#xff1f;1.2 Netty 的地位1.3 Netty 的优势二、Hello World2.1 目标2.2 服务器端2.3 客户端2.4 流程梳理三、组件3.1 EventLoop3.2 演示 NioEventLoop 处理 io 事件3.3 演示 NioEventLoop 处理普通任务3.4 演示 NioEventLoop 处理定时任务…...

收藏|一文掌握数据分析在企业的实际流程

一、数据分析概念 1.1 数据分析 是指用适当的统计分析方法对收集来的大量数据进行分析&#xff0c;将他们加以汇总和理解并消化&#xff0c;以求最大化地开发数据的功能&#xff0c;发挥数据的作用。 1.2 数据分析包括 描述性数据分析&#xff08;初级数据分析&#xff09;…...

100ask_imx6ull 输出PWM

查看PWM对应扩展板的引脚 100ask_imx6ul通过扩展板插槽来验证pwm波&#xff0c;所以这里通过扩展板的原理图及芯片手册可知&#xff0c;gpio4_io20&#xff0c;gpio4_io19分别对应着PWM8和PWM7。 设置设备树 打开官方NXP的工具i.MX pins v6工具&#xff0c;PWM7/PWM8的配置如…...

yolov5编译安卓APP:解决图像上全是检测框

yolov5编译安卓APP&#xff1a;解决图像上全是检测框前言一、第一个YOLOv5 APP1.参考链接2.详细说明3.APP检测时图像上全是框的解决方法二、第二个YOLOv5 APP1.参考链接2.详细说明3.APP检测时图像上全是框的解决方法三、其他1.APK打包2.修改APP图标与名字前言 YOLOv5编译安卓A…...

为什么我们需要地图?

想一想&#xff0c;武侠小说里面。一张藏宝图&#xff0c;引来江湖腥风血雨&#xff0c;要么是武功秘籍&#xff0c;要么是绝世宝剑&#xff0c;要么是富可敌国的财富&#xff0c;只要有了藏宝图&#xff0c;便可曲径通幽&#xff0c;到达彼岸。 由此可见&#xff0c;地图的重…...

wordpress复制数据库结构/互联网营销培训

Ubuntu环境下的“批处理”长时间生活在CLI中&#xff0c;“批处理”绝对能简化不少工作。在Windows环境中&#xff0c;建立一个后缀名为.bat的文件&#xff0c;输入需要的指令&#xff0c;保存之后执行即可&#xff0c;十分方便。其实在Ubuntu中也有类似的功能&#xff0c;而且…...

当前疫情最新情况/北京seo服务行者

1. 什么是锁消除&#xff1f;什么是锁膨胀 锁消除&#xff1a; 对数据进行逃逸分析。对象实例都是存在于线程共享的堆中的&#xff0c;即便是局部变量的对象&#xff0c;也是存在于堆中&#xff0c;但是局部变量对象的引用是存在于方法栈中的&#xff0c;方法栈是线程私有&am…...

资源站建站技术/搜狗收录批量查询

看错误是类没有找到&#xff0c;但是代码中确实有这个类&#xff0c;编译没错&#xff0c;执行的时候报这个异常。 我同事的机器没事&#xff0c;我的有问题。 想了一下差别&#xff0c;就是.classPath文件不一致。 后来进 project - properties-java build path - Order and E…...

全国做膏药的网站有多少家呢/如何开发一款app软件

在今天的Build大会上&#xff0c;微软宣布发布一款同时支持Windows、Mac OS X和Linux平台的原生Visual Studio应用——Visual Studio Code&#xff0c;旨在为所有开发者提供一款专注于代码本身的免费的编辑器。它虽然是Visual Studio家族的一员&#xff0c;但它与传统VS IDE的功…...

哪些b2b网站做游戏机比较好/seo高级优化方法

1、首先他们底层数据结构不一样&#xff0c;ArrayList底层结构是数组&#xff0c;LinkedList底层结构是链表&#xff1b; 2、数据结构决定了&#xff0c;ArrayList在查询上的效率较高&#xff0c;而LinkedList在删除和添加上的效率更高&#xff1b;&#xff08;需要注意的一点是…...

服务器不是自己的做违法网站/企业网站模板免费

在php中不支持多重继承&#xff0c;如果我们向使用多个类的方法而实现代码重用有什么办法么&#xff1f;那就是组合。在一个类中去将另外一个类设置成属性。下面的例子&#xff0c;模拟了多重继承。view sourceprint?0102 class user {03 private $name "tom";04 p…...