当前位置: 首页 > news >正文

【基础篇】七、Flink核心概念

文章目录

  • 1、并行度
  • 2、并行度的设置
  • 3、算子链
  • 4、禁用算子链
  • 5、任务槽
  • 6、任务槽和并行度的关系

1、并行度

要处理的数据量很多时,可以把一个算子的操作(比如前面demo里的flatMap、sum),"复制"多份到多个节点,数据来了以后可以到任意一个节点执行。即将一个算子任务拆分成多个并行的子任务,再分发到不同的节点上执行,实现真正的并行计算。(好绕口,就是把一个活儿让好几个Task节点共同去做)

在Flink执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行

在这里插入图片描述

某一个算子的子任务的个数被称之为其并行度(parallelism)。 一条流水线上,几个人在同时干着打螺丝,几个人在同时处理着焊电路板。同一个程序,不同的算子,可以有不同的并行度。一个流程序的并行度,可以认为就是其所有算子中最大的并行度。如上图,source、map、window、sink四个算子,sink为1,其余为2,则这个流处理程序的并行度为2。

2、并行度的设置

方式一:代码中设置

算子后跟着调用setParallelism()方法为某一个算子设置并行度

stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);   //map算子并行度为2

执行环境对象后面调setParallelism()方法设置全局并行度,对所有算子生效

env.setParallelism(2);

一般不设全局,会导致无法动态扩容。

方式二:提交应用时指令中设置

-p参数来指定当前应用程序执行的并行度,类似上面的全局设置

bin/flink run –p 2 –c com.plat.SocketStreamWordCount  ./FlinkDemo-1.0-SNAPSHOT.jar

这种和Web控制台设置一个意思:

在这里插入图片描述

方式三:配置文件中设置

在集群的配置文件flink-conf.yaml中直接更改默认并行度:

parallelism.default: 2

这个设置对于整个集群上提交的所有作业有效,初始值为1。当代码中没设置、提交时没指定,就用这个配置文件的。在开发环境中,没有配置文件,默认并行度就是当前机器的CPU核心数

在这里插入图片描述

最后,本地调试想看控制台界面,可创建本地环境执行对象,用于本地调试:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createlocalEnvironmentWithwebuI(new Configuration());

访问localhosr:8081,socket是特殊的,只能是1,改不了,其余算子均为4。

在这里插入图片描述
最后,这几种方式的优先级为:代码中为某算子单独设定 > 代码中执行环境对象全局设置 > 提交时指定 > 配置文件

3、算子链

一个数据流,数据在各种算子之间传输的形式可能是一对一(one-to-one)的直通(forwarding),也可能是打乱的重分区(redistributing)。

在这里插入图片描述

一对一(One-to-one,forwarding)

如上图,source算子读完数据后,可以直接发给map算子接着处理。map 算子的子任务,看到的元素个数和顺序跟source 算子的子任务产生的完全一样,即一对一,一个算子的task和一个算子的task数据一样。特点是:

  • 数据不需要重新分区
  • 数据不需要调整顺序

重分区(Redistributing)

和一对一的直流相反,此时数据的分区会发生改变,如图中,map完数据后,直接keyBy/window(注意keyBy自身不是算子),按key分组了。也就是每一个算子的子任务task,会根据某些规则,把数据发送到不同的下游task,从而引起了数据重分区。

合并算子链

在Flink中,并行度相同一对一(one to one)算子操作,可以直接连接在一起形成一个大的任务(task),每个task又会被一个线程执行,即算子链。合并的条件:

  • 两算子并行度相等(子任务数量一样)
  • 两算子为one to one的直流关系

在这里插入图片描述

上图中Source和map之间满足了算子链的要求,所以可以直接合并在一起,形成了一个任务;因为并行度为2,所以合并后的任务也有两个并行子任务。这样,这个数据流图所表示的作业最终会有5个任务,由5个线程并行执行合并算子链的机制,可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。

4、禁用算子链

Flink默认会按照算子链的原则进行链接合并,但有的场景不适合合并,比如:

  • 两个算子串在一起,它们的子任务task搭配形成n组(n为并行度),每组共用一个线程,但如果两个算子本身计算任务都很重,那就不适合串一起,就像两个脾气都差的人合租,此时应该断开算子链
  • 当出现错误,需要定位问题是哪个算子时,就要禁用算子链

全局禁用算子链:

//env为执行环境对象
env.disableOperatorChaining();

disableChaining方法可只给某个算子设置禁用算子链,那它和它前后的算子就都不能再组成算子链(控制台上UI会显示Forward,表明本来是一对一的算子链关系)

.map(word -> Tuple2.of(word, 1L)).disableChaining();

在这里插入图片描述

startNewChain方法,从当前算子开始新链,即只和前面的算子断开,和后面的算子能串一起的话还是会串

// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()

5、任务槽

Flink中每一个TaskManager都是一个JVM进程,它可以启动多个独立的线程,来并行执行子任务。但每个TaskManager总的计算资源有限,并行任务越多,每个线程能分到的可用资源就越少,为了限制TaskManager能并行处理的最大任务数,提出任务槽(task slots)的概念,对TaskManager上对每个任务运行所占用的资源做出明确的划分。一锅饭,能盛6碗,谁来都夹一筷子,谁都吃不饱,因此,锅前放6个碗,也就是分为6碗饭,来一个人,就端走一碗,端没了别人就去其他锅,分到饭的六个人,也不用和别人抢,且能吃饱。这个碗就是任务槽。

在这里插入图片描述

比如一个TaskManager上有三个slot,那就把这个TaskManager的内存资源分为三份,一个插槽一份。如此,在插槽上执行一个子任务时,就相当于划定了一块内存给这个子任务专款专用,不需要和其他子任务去竞争内存资源。前面提到的合并成算子链后的5个子任务,两个TaskManager就可实现,如上图。

任务槽数量的设置

在flink安装目录的conf/flink-conf.yaml配置文件中,可以设置每个TaskManager的slot数量,默认是1个slot。

taskmanager.numberOfTaskSlots: 8

slot目前仅仅用来隔离内存,不会涉及CPU的隔离在具体应用时,可以将slot数量配置为机器的CPU核心数,尽量避免不同任务之间对CPU的竞争。这也是开发环境默认并行度设为机器CPU数量的原因。(这就像合租,内存就像卧室,厕所就像CPU,三个人,三间房,但一个厕所也够用,类比CPU时间片和线程切换)

子任务task对任务槽的共享

上面讲到,一人一碗饭,一个子任务一个插槽。而插槽的共享,就是放宽了政策,不同类型的算子,它们的并行子任务允许放到同一个插槽上并行执行(注意,依旧并行)。如下图,两个TaskManager,6个插槽,每个插槽上的子任务对应的算子种类都不一样。

在这里插入图片描述

如上图所示,只要属于同一个作业,那么对于不同任务节点(算子)的并行子任务,就可以放到同一个slot上执行。所以对于第一个任务节点source→map,它的6个并行子任务必须分到不同的slot上,而第二个任务节点keyBy/window/apply的并行子任务却可以和第一个任务节点共享slot。slot共享的好处在于:

  • 活儿大致平均分配到了所有的TaskManager
  • slot有好几种算子的子任务,组合起来就是一个完整的作业管道或者流。此时,即使某个TaskManager宕机,其他节点也不受影响,作业继续执行

如果不希望默认的slot共享,比如需要让某个算子的task独享一个slot,就可以设置slot共享组

.map(word -> Tuple2.of(word, 1L)).slotSharingGroup("taskTest");

只有属于同一个slot共享组的子任务,才会开启slot共享,这个组默认是default,不同slot共享组之间的任务是完全隔离的,必须分配到不同的slot上。

6、任务槽和并行度的关系

  • 任务槽slot是一个静态概念,表示最大的并发上限。假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有9个task slot,表示集群最多能并行执行同一算子的9个子任务。
  • 并行度是一个动态概念,表示实际运行占用了几个。比如并行度为4,即这个算子有4个子任务task,需要放在4个插槽上。此时,并行度为4,slot为9。

Job运行时,必须插槽slot的数量必须大于等于并行度,否则任务运行失败:NoResourceAvailableException:could not acquire the minimun required resources 。注意Yarn等模式部署时,会动态申请TaskManager,申请的TM的数量 = job并行度 /每个TM的slot的数量,向上取整。

比如,某算子并行度为10,即它有10个task要放在不同的插槽上,此时插槽有9个,那就不能运行,而不是9个跑完再让第十个执行。再比如,一个Flink程序中定义了4个算子:

source→ flatmap→ reduce→ sink

前提: flink-conf.yaml中taskmanager.numberOfTaskSlots数量为3(建议为CPU核心数),假设TaskManager数量也为3,即插槽有3*3=9个

Case1:并行度parallelism.default=1

在这里插入图片描述

分析:4种算子,并行度为1 ⇒ 其中两个形成算子链算一个 ⇒ 三个子任务 ⇒ 同一作业的不同种类的算子的任务,共享任务槽 ⇒ 总共占用一个插槽,剩8个可用

Case2:全局并行度为2

在这里插入图片描述

分析:三种算子,并行度为2 ⇒ 其中两个形成算子链算一个 ⇒六个子任务 ⇒ 插槽共享 ⇒ 总共占用2个插槽,剩7个可用 ⇒ 计算机资源利用不充分,设置合适的并行度才能提高效率

Case3:全局并行度为9

分析: 并行度为9 ⇒ 一种算子有9个子任务 ⇒ 插槽共享 ⇒ 占九个

在这里插入图片描述

Case4:全局set为9,但sink算子set为1

在这里插入图片描述

分析: 并行度为9 ⇒ 一种算子有9个子任务 ⇒ 29 + 11 = 19个子任务 ⇒ 插槽共享

最后,可以看到,整个流处理程序的并行度,就是所有算子并行度的最大值,因为这代表了程序运行所需要的插槽slot的数量。

相关文章:

【基础篇】七、Flink核心概念

文章目录 1、并行度2、并行度的设置3、算子链4、禁用算子链5、任务槽6、任务槽和并行度的关系 1、并行度 要处理的数据量很多时,可以把一个算子的操作(比如前面demo里的flatMap、sum),"复制"多份到多个节点&#xff0c…...

06-Scala面向对象

面向对象编程 ​ Scala是一门完全面向对象的语言,摒弃了Java中很多不是面向对象的语法。 ​ 虽然如此,但其面向对象思想和 Java的面向对象思想还是一致的 Scala包 1)基本语法 Scala中基本的package包语法和 Java 完全一致 例如&#xf…...

【设计模式】单例模式、“多例模式”的实现以及对单例的一些思考

文章目录 1.概述2.单例模式实现代码2.1.饿汉式单例2.2.懒汉式单例2.3.双检锁单例2.4.静态内部类单例2.5.枚举单例 3.对单例的一些思考3.1.是否需要严格的禁止单例被破坏?3.2.懒汉式真的比饿汉式更佳吗?3.3.单例存在的问题 4.其他作用范围的单例模式4.1.线…...

idea 2022 一个工作空间下导入git项目 后 无法导入第二个git项目

idea 2022 一个工作空间下导入git项目 后 无法导入第二个git项目 如图所示 我导入了一个git项目后,菜单栏出现了一个git按钮 找不到 导入git项目的按钮了 方式1、 通过idea设置 打开全局设置 如下图 把git先改为none,保存 保存后就可以看到 VCS按钮 导入…...

泛在电力物联网的关键技术与未来发展策略-安科瑞黄安南

摘要: 文章分析了泛在电力物联网的内涵及其主要特征,针对泛在电力物联网的建设目标、基本构架以及关键技术与未来发展策略进行综合探讨,期待得到专业人士的指点。 关键词: 泛在电力物联网, 网络规划, 网络发展 随着能源革命的不…...

iWall:支持自定义的Mac动态壁纸软件

iWall Mac是一款动态壁纸软件,它可以使用任何格式的漂亮视频(无须转换)、图片、动画、Flash、gif、swf、程序、网页、网站做为您的动态壁纸、动态桌面,并且可以进行交互。 这款软件功能多、使用简单、体积小巧、不占用资源、运行…...

【Docker 内核详解】namespace 资源隔离(四):Mount namespace Network namespace

【Docker 内核详解 - namespace 资源隔离】系列包含: namespace 资源隔离(一):进行 namespace API 操作的 4 种方式namespace 资源隔离(二):UTS namespace & IPC namespacenamespace 资源隔…...

STM32简介

STM32是ST公司基于ARM Cortex-M内核开发的32位微控制器,常应用在嵌入式领域如: 智能车(用stm32做寻迹小车,读取光电传感器或者摄像头数据,然后驱动电机前进和转弯); 无人机(用stm3…...

Yum安装JDK11

一、安装命令 : yum install java-11-openjdk二、执行以下命令来查看 JDK 11 的安装信息: yum list installed | grep java-11-openjdk三、找到 JDK 11 的软件包名称(使用以下命令来查询软件包的安装位置): rpm -ql…...

[HNCTF 2022 WEEK2]ez_ssrf题目解析

这题主要是引入ssrf这个漏洞攻击,本质上没有更深入的考察 本题是需要我们去伪造一个ssrf的请求头去绕过 题目开始给了我们信息让我们去访问index.php fsockopen函数触发ssrf fsockopen() 函数建立与指定主机和端口的 socket 连接。然后,它将传入的 bas…...

OpenFOAM: twoPhaseEulerFoam解读

twoPhaseEulerFoam全解读之一(转载) 本系列将对OpenFOAM-2.1.1 中的 twoPhaseEulerFoam 求解器进行完全解读,共分三部分:方程推导,代码解读,补充说明。本篇进行方程推导,详细介绍如果从双流体模型出发得到 twoPhaseEu…...

ffmpeg跨平台arm编译-ubuntu

目录 1. 安装必要的编译器2. 安装必要的依赖项3. 配置编译选项4. 编译安装 1. 安装必要的编译器 32位系统: sudo apt-get update sudo apt-get install gcc-arm-linux-gnueabihf sudo apt-get install g-arm-linux-gnueabihf64位系统: sudo apt-get u…...

Vue 网络处理 - axios 异步请求的使用,请求响应拦截器

目录 一、axiox 1.1、axios 简介 1.2、axios 基本使用 1.2.1、下载核心 js 文件. 1.2.2、发送 GET 异步请求 1.2.3、发送 POST 异步请求 1.2.4、发送 GET、POST 请求最佳实践 1.3、请求响应拦截器 1.3.1、拦截器解释 1.3.2、请求拦截器的使用 1.3.3、响应拦截器的使用…...

单目3D目标检测——MonoDLE 模型训练 | 模型推理

本文分享 MonoDLE 的模型训练、模型推理、可视化3D检测结果。 模型原理,参考我这篇博客:【论文解读】单目3D目标检测 MonoDLE(CVPR2021)_一颗小树x的博客-CSDN博客 源码地址:https://github.com/xinzhuma/monodle 目…...

CSS悬停卡片翻转明信片效果源码附注释

运行效果演示: HTML页面代码: <!DOCTYPE html> <html lang="en" > <head>...

使用kaliber与imu_utils进行IMU、相机+IMU联合标定

目录 1 标定工具编译 1.1 IMU标定工具 imu_utils 1.2 相机标定工具 kaliber 2 标定数据录制 3 开始标定 3.1 IMU标定 3.2 相机标定 3.3 相机IMU联合标定 4 将参数填入ORBSLAM的文件中 1 标定工具编译 1.1 IMU标定工具 imu_utils 标定IMU我们使用imu_utils软件进行标定…...

统一观测丨使用 Prometheus 监控 SQL Server 最佳实践

作者&#xff1a;啃唯 SQL Server 简介 SQL Server 是什么&#xff1f; Microsoft SQL Server 是 Microsoft 推出的关系型数据库解决方案&#xff0c;支持企业 IT 环境中的各种事务处理、商业智能和分析应用程序。Microsoft SQL Server 是市场领先的数据库技术之一。 SQL S…...

最短无序连续子数组

题目链接 最短无序连续子数组 题目描述 注意点 找出符合题意的 最短 子数组&#xff0c;并输出它的长度-100000 < nums[i] < 100000 解答思路 本题的数组可以分为三段&#xff0c;左段中段和右段&#xff0c;如下图所示 观察规律可知&#xff0c;左段元素始终比中段…...

更新 | 持续开源迅为RK3568驱动指南第十二篇-GPIO子系统

《iTOP-RK3568开发板驱动开发指南》更新&#xff0c;本次更新内容对应的是驱动&#xff08;第十二期_GPIO子系统-全新升级&#xff09;视频&#xff0c;后续资料会不断更新&#xff0c;不断完善&#xff0c;帮助用户快速入门&#xff0c;大大提升研发速度。 文档教程更新至第十…...

centos7安装erlang23.3.4.11及rabbitmq3.9.16版本

rpm包有系统版本要求&#xff0c;el是Red Hat Enterprise Linux(EL)的缩写。 EL7是Red Hat 7.x&#xff0c;Centos 7.x EL8是Red Hat 8.x, Centos 8.x 所以我们在安装erlang及rabbitmq时需要选择与自己的服务器相对应的rpm包 # rabbitmq的rpm安装包 https://github.com/rabbi…...

VMware和Debian下载

文章目录 ⭐️写在前面的话⭐️一、VMware二、Debain三、建立虚拟机&#x1f680; 先看后赞&#xff0c;养成习惯&#xff01;&#x1f680;&#x1f680; 先看后赞&#xff0c;养成习惯&#xff01;&#x1f680; ⭐️写在前面的话⭐️ CSDN主页&#xff1a;程序员好冰 目前在…...

mysql面试题48:MySQL中 Innodb的事务与日志的实现方式

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官: Innodb的事务与日志的实现方式 以下是InnoDB事务和日志的实现方式的详细说明: 事务日志(Transaction Log): InnoDB使用事务日志来保证事务的…...

数据结构 优先级队列(堆)

数据结构 优先级队列(堆) 文章目录 数据结构 优先级队列(堆)1. 优先级队列1.1 概念 2. 优先级队列的模拟实现2.1 堆的概念2.2 堆的存储方式2.3 堆的创建2.3.1 堆向下调整2.3.2 堆的创建2.3.3 建堆的时间复杂度 2.4 堆的插入与删除2.4.1 堆的插入2.4.2 堆的删除 2.5 用堆模拟实现…...

如何在edge浏览器中给PDF添加文字批注

我用的edge浏览器是目前最新版的&#xff08;一般自动更新到最新版&#xff09; 最近&#xff0c;我喜欢用edge浏览器查看PDF&#xff0c;节省电脑资源&#xff0c;快捷且方便。 但edge对PDF的标注种类较少&#xff0c;主要是划线和涂色&#xff0c;文字批注功能尚未出现在工具…...

集成学习的小九九

集成学习&#xff08;Ensemble Learning&#xff09;是一种机器学习的方法&#xff0c;通过结合多个基本模型的预测结果来进行决策或预测。集成学习的目标是通过组合多个模型的优势&#xff0c;并弥补单个模型的不足&#xff0c;从而提高整体性能。 集成学习的主要策略 在集成…...

深入理解Scrapy

Scrapy是什么 An open source and collaborative framework for extracting the data you need from websites. In a fast, simple, yet extensible way. Scrapy是适用于Python的一个快速、简单、功能强大的web爬虫框架&#xff0c;通常用于抓取web站点并从页面中提取结构化的数…...

想做WMS仓库管理系统,找了好久才找到云表

公司内部仓库管理原方式均基于人工电子表格管理方式来实现收发存管理&#xff0c;没有流程化管理&#xff0c;无法保证数据的准确性和及时性&#xff0c;同时现场操作和数据核对会出现不同步的情况&#xff0c;无法提高仓库的运作效率&#xff0c;因此&#xff0c;我们基于云表…...

公司销售个人号如何管理?

微信管理系统可以帮助企业解决哪些问题呢&#xff1f; 一、解决聊天记录监管问题 1.聊天记录的保存&#xff0c;让公司的管理者可以随时查看公司任意销售与客户的聊天记录&#xff0c;不用一个一个员工逐一去看&#xff0c;方便管理&#xff1b; 2.敏感词监控&#xff0c;管理者…...

COLE HERSEE 48408 工业4.0、制造业X和元宇宙

COLE HERSEE 48408 工业4.0、制造业X和元宇宙 需要数据来释放工业4.0的全部潜力——价值链中的所有公司都可以访问大量数据。一个新的互联数据生态系统旨在提供解决方案:制造业x。 在德国联邦经济事务和气候行动部以及BDI、VDMA和ZVEI贸易协会的密切合作下&#xff0c;实施制…...

【Vue基础-数字大屏】加载动漫效果

一、需求描述 当网页正在加载而处于空白页面状态时&#xff0c;可以在该页面上显示加载动画提示。 二、步骤代码 1、全局下载npm install -g json-server npm install -g json-server 2、在src目录下新建文件夹mock&#xff0c;新建文件data.json存放模拟数据 {"one&…...

如何优化移动端网站/网站关键词优化费用

终止正在运行的matlab文件&#xff0c;需要命令窗口按快捷键&#xff0c;有三种快捷键可以选择&#xff1a;  一&#xff1a;    ctrl c  二&#xff1a;    ctrlbreak  三&#xff1a;    ctrlaltbreak如果是在服务器上跑的代码的话,按完快捷键之后有时候需…...

wordpress移动新闻/百度广告投放平台

一说到反射&#xff0c;很多人都想到了性能&#xff0c;更有甚者直接说“慎用反射&#xff0c;遗患无穷”&#xff0c;“用反射&#xff0c;感觉怎么像是退步啊&#xff5e;”&#xff0c;看到这种言论&#xff0c;直接把反射妖魔化了&#xff0c;如果这种言论长此以往&#xf…...

湖南品牌网站建站可定制/本网站三天换一次域名

2019独角兽企业重金招聘Python工程师标准>>> hdfs优点&#xff1a; -高容错性&#xff1a;多副本&#xff1b;副本丢失后可以自动恢复-适合批处理&#xff1a;移动计算而非数据&#xff1b;数据位置暴露给计算框架-适合大数据库处理&#xff1a;TB,PB量级数据处理&a…...

python web网站开发/媒体发稿平台

C程序设计实验报告 实验项目&#xff1a; 1、利用复化梯形公式计算定积分2、计算Ackerman函数3、编写计算x的y次幂的递归函数getpower(int x,int y)&#xff0c;并在主程序中实现输入输出4、编写计算学生年龄的递归函数5、编写递归函数实现Ackman函数 姓名&#xff1a;张时锋 …...

专业网站制作公司/交换友情链接的注意事项

昨天&#xff0c;中国教育学会会长顾明远来杭州&#xff0c;参加杭州师范大学主办的第九届亚洲比较教育学会年会。会上&#xff0c;他对本报记者确认&#xff0c;全国执行高考新方案&#xff0c;不会在2016年&#xff0c;而是2017年。语文、数学在新高考里&#xff0c;份量加重…...

建设社团网站的可行性分析/必应站长平台

配置嘉里项目本地rabbitmq服务的流程&#xff1a; 1. 登录本地rabbit服务 输入 http://localhost:15672/ &#xff0c;输入用户名、密码&#xff0c;登录本地rabbit服务 2.创建 crm-user 和 kip-user 两个用户 创建crm-user 用户&#xff1a; 同上创建kip-user 用户&…...