48、Flink 的 Data Source API 详解
a)概述
本节将描述 FLIP-27 中引入的新 Source API 的主要接口。
b)Source
Source API 是一个工厂模式的接口,用于创建以下组件。
- Split Enumerator
- Source Reader
- Split Serializer
- Enumerator Checkpoint Serializer
此外,Source 还提供了 Boundedness【有界】的特性,使 Flink 可以选择合适的模式来运行 Flink 任务。
Source 实现应该是可序列化的,因为 Source 实例会在运行时被序列化并上传到 Flink 集群。
c)SplitEnumerator
SplitEnumerator 典型实现如下:
- SourceReader 的注册处理;
- SourceReader 的失败处理;
- SourceReader 失败时会调用 addSplitsBack() 方法;SplitEnumerator 会收回已经被分配,但尚未被该
SourceReader
确认(acknowledged)的分片。
- SourceReader 失败时会调用 addSplitsBack() 方法;SplitEnumerator 会收回已经被分配,但尚未被该
- SourceEvent 的处理
- SourceEvents 是 SplitEnumerator 和 SourceReader 之间来回传递的自定义事件,可以利用此机制来执行复杂的协调任务。
- 分片的发现以及分配
- SplitEnumerator 可以将分片分配到 SourceReader 从而响应各种事件,包括发现新的分片、新 SourceReader 的注册、SourceReader 的失败处理等。
SplitEnumerator 可以在 SplitEnumeratorContext 的帮助下完成上述工作,SplitEnumeratorContext 会在 SplitEnumerator 创建或者恢复的时候提供给 Source。
SplitEnumeratorContext 允许 SplitEnumerator 检索到 reader 的必要信息并执行协调操作,而在 Source 的实现中会将 SplitEnumeratorContext 传递给 SplitEnumerator 实例。
SplitEnumerator 的实现可以仅采用被动工作方式,仅在其方法被调用时采取协调操作;但是一些 SplitEnumerator 的实现会采取主动的工作方式;例如 SplitEnumerator 定期寻找分片并分配给 SourceReader,这类问题使用 SplitEnumeratorContext 类中的 callAsync() 方法比较方便。
示例:如何在 SplitEnumerator 不需要自己维护线程的条件下实现这一点。
class MySplitEnumerator implements SplitEnumerator<MySplit, MyCheckpoint> {private final long DISCOVER_INTERVAL = 60_000L;/*** 一种发现分片的方法*/private List<MySplit> discoverSplits() {...}@Overridepublic void start() {...enumContext.callAsync(this::discoverSplits, splits -> {Map<Integer, List<MySplit>> assignments = new HashMap<>();int parallelism = enumContext.currentParallelism();for (MySplit split : splits) {int owner = split.splitId().hashCode() % parallelism;assignments.computeIfAbsent(owner, new ArrayList<>()).add(split);}enumContext.assignSplits(new SplitsAssignment<>(assignments));}, 0L, DISCOVER_INTERVAL);...}...
}
d)SourceReader
SourceReader 是一个运行在 Task Manager 上的组件,用于处理来自分片的记录。
SourceReader
提供了一个拉取式的(pull-based)处理接口,Flink 任务会在循环中不断调用 pollNext(ReaderOutput)
轮询来自 SourceReader
的记录,pollNext(ReaderOutput)
方法的返回值指示 SourceReader 的状态。
MORE_AVAILABLE
- SourceReader 有可用的记录。NOTHING_AVAILABLE
- SourceReader 现在没有可用的记录,但是将来可能会有记录可用。END_OF_INPUT
- SourceReader 已经处理完所有记录,到达数据的尾部。即 SourceReader 可以终止任务了。
pollNext(ReaderOutput)
会使用 ReaderOutput
作为参数,为了提高性能且在必要情况下,SourceReader
可以在一次 pollNext() 调用中返回多条记录;例如外部系统的工作粒度为块,而一个块可以包含多个记录,但是 source 只能在块的边界处设置 Checkpoint,此时SourceReader
可以一次将一个块中的所有记录通过 ReaderOutput
发送至下游。
**注意:SourceReader
的实现应该避免在一次 pollNext(ReaderOutput)
的调用中发送多个记录;**因为对 SourceReader
轮询的任务线程工作在一个事件循环(event-loop)中,且不能阻塞。
在创建 SourceReader
时,相应的 SourceReaderContext
会提供给 Source
,而 Source
会将相应的上下文传递给 SourceReader
实例;SourceReader
可以通过 SourceReaderContext
将 SourceEvent
传递给相应的 SplitEnumerator
;Source
的一个典型设计模式是让 SourceReader
发送它们的本地信息给 SplitEnumerator
,后者则会全局性地做出决定。
SourceReader
API 是一个底层(low-level) API,允许用户自行处理分片,并使用自己的线程模型来获取和移交记录;为了帮助实现 SourceReader
,Flink 提供了 SourceReaderBase 类,可以显著减少编写 SourceReader
所需要的工作量。
强烈建议连接器开发人员充分利用 SourceReaderBase
而不是从头开始编写 SourceReader
。
e)Source 使用方法
为了通过 Source
创建 DataStream
,需要将 Source
传递给 StreamExecutionEnvironment
。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Source mySource = new MySource(...);DataStream<Integer> stream = env.fromSource(mySource,WatermarkStrategy.noWatermarks(),"MySourceName");
相关文章:
48、Flink 的 Data Source API 详解
a)概述 本节将描述 FLIP-27 中引入的新 Source API 的主要接口。 b)Source Source API 是一个工厂模式的接口,用于创建以下组件。 Split EnumeratorSource ReaderSplit SerializerEnumerator Checkpoint Serializer 此外,Sou…...
深入解析Java扩展机制:SPI与Spring.factories
目录 Java SPI概述 1.1 什么是SPI?1.2 SPI的工作原理1.3 SPI的优缺点 SPI的应用 2.1 Java标准库中的SPI应用2.2 自定义SPI示例 Spring.factories概述 3.1 什么是spring.factories?3.2 spring.factories的工作原理3.3 spring.factories的优缺点 spring.f…...
Vue2之模板语法
文章目录 1.模板语法1.1 插值语法{{}}可以写什么1.2 指令语法1.2.1 指令概述1.2.2 v-bind指令1.2.3 v-model指令 1.模板语法 1.1 插值语法{{}}可以写什么 (1)在data中声明的 (2)常量 (3)合法的JavaScript…...
java基础练习题
1、一个".java"源文件中是否可以包括多个类?有什么限制? 可以包含多个类。但是只有一个类可以声明为public,且要求声明为public的类的类名与源文件名相同。 2、java的优势? a、跨平台性 b、安全性高 c、简单性 d、…...
unity中通过实现底层接口实现非按钮(图片)的事件监听
编写监听脚本 PEListenter 继承自MonoBehaviour类,并实现了IPointerDownHandler、IPointerUpHandler和IDragHandler接口,按照需求定义需要接收事件(鼠标按下、抬起、拖拽)的回调函数 //监听类(需要挂载在物体上面&am…...
重庆耶非凡科技有限公司的选品师项目加盟靠谱吗?
在当今电子商务的浪潮中,选品师的角色愈发重要。而重庆耶非凡科技有限公司以其独特的选品师项目,在行业内引起了广泛关注。对于想要加盟该项目的人来说,项目的靠谱性无疑是首要考虑的问题。 首先,我们来看看耶非凡科技有限公司的背…...
《青少年编程与数学》课程方案:4、课程策略
《青少年编程与数学》课程方案:4、课程策略 一、工程师思维二、使命感驱动三、价值观引领四、学习现代化五、工作生活化六、与时代共进 《青少年编程与数学》课程策略强调采用工程师思维,避免重复造轮子,培养使命感,通过探索兴趣、…...
用爬虫实现---模拟填志愿
先来说实现逻辑,首先我要获取到这个网站上所有的信息,那么我们就可以开始对元素进行检查 我们发现他的每一个学校信息都有一个对应的属性,并且是相同的,那么我们就可以遍历这个网页中的所有属性一样的开始爬取 在来分析࿰…...
vscode Run Code输出出现中文乱码情况问题解决方案
主要解决方案是通过修改计算机默认的编码格式,来完成的。 chcp 是 Windows 操作系统中的一个命令,用于显示或设置控制台的代码页(code page)。代码页决定了控制台如何解释和显示字符,特别是非 ASCII 字符(例如 Unicode 字符)。 使用方法 显示当前代码页: 输入 chcp 而…...
代码随想录训练营Day30
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、重新安排行程 前言 提示:这里可以添加本文要记录的大概内容: 今天是跟着代码随想录刷题的第30天,主要是复习了回溯算法…...
Swift 序列(Sequence)排序面面俱到 - 从过去到现在(二)
概览 在上篇 Swift 序列(Sequence)排序面面俱到 - 从过去到现在(一)博文中,我们讨论了 Swift 语言中序列和集合元素排序的一些基本知识,我们还给出了以自定义类型中任意属性排序的“康庄大道”。 不过在实际的撸码场景中,我们往往需要的是“多属性”同时参与到排序的考…...
STM32F103C8T6基于HAL库移植uC/OS-III
文章目录 一、建立STM32CubeMX工程二、移植1、 uC/OS-III源码2、移植过程 三、配置相关代码1、bsp.c和bsp.h2、main.c3、修改启动代码4、修改app_cfg.h文件5、修改includes.h文件6、修改lib_cfg.h文件 四、编译与烧录总结参考资料 学习嵌入式实时操作系统(RTOS&…...
微服务学习Day9-分布式事务Seata
文章目录 分布式事务seata引入理论基础CAP定理BASE理论 初识Seata动手实践XA模式AT模式TCC模式SAGA模式 高可用 分布式事务seata 引入 理论基础 CAP定理 BASE理论 初识Seata 动手实践 XA模式 AT模式 TCC模式 Service Slf4j public class AccountTCCServiceImpl implements A…...
vue用vite配置代理解决跨域问题(target、rewrite和changeOrigin的使用场景)
Vite的target、rewrite和changeOrigin的使用场景 1. target 使用场景:target 属性在 Vite 的 vite.config.ts 或 vite.config.js 文件的 server.proxy 配置中指定,用于设置代理服务器应该将请求转发到的目标地址。这通常是一个后端服务的API接口地址。…...
为什么PPT录制没有声音 电脑ppt录屏没有声音怎么办
一、为什么PPT录制没有声音 1.软件问题 我们下载软件的时候可能遇到软件损坏的问题,导致录制没有声音,但其他功能还是可以使用的。我建议使用PPT的隐藏功能,下载插件,在PPT界面的加载项选项卡中就能使用。我推荐一款可以解决录屏…...
JDBC学习笔记(三)高级篇
一、JDBC 优化及工具类封装 1.1 现有问题 1.2 JDBC 工具类封装 V1.0 resources/db.properties配置文件: driverClassNamecom.mysql.cj.jdbc.Driver urljdbc:mysql:///atguigu usernameroot password123456 initialSize10 maxActive20 工具类代码: p…...
c++编译器在什么情况下会提供类的默认构造函数等,与析构函数
我们都知道,在 c 里,编写的简单类,若没有自己编写构造析构函数与 copy 构造函数 与 赋值运算符函数,那么编译器会提供这些函数,并实现简单的语义,比如成员赋值。看 源码时,出现了下图类似的情形…...
SpringBoot3整合Mybatis-Plus3.5.5出现的问题
主要是由于 mybatis-plus 中 mybatis 的整合包版本不够导致的 排除 mybatis-plus 中自带的 mybatis 整合包,单独引入即可 java.lang.IllegalArgumentException: Invalid value type for attribute factoryBeanObjectType: java.lang.Stringat org.springframework.…...
服务器数据恢复—强制上线raid5阵列离线硬盘导致raid不可用的数据恢复案例
服务器数据恢复环境: 某品牌2850服务器中有一组由6块SCSI硬盘组建的raid5磁盘阵列,linux操作系统ext3文件系统。 服务器故障: 服务器运行过程中突然瘫痪。服务器管理员检查阵列后发现raid5阵列中有两块硬盘离线,将其中一块硬盘进行…...
初入阿里云,上手走一波
初入阿里云,上手走一波 一阶:ECSMysqlDMS安装Mysql初始化MysqlMysql操作DMS管理Mysql 二阶:ECSOSS远程连接ECSOSS控制台其他图片服务 三阶:更多搭配操作 可以说个人在日常使用过程中,操作最多的阿里云产品就是阿里云服…...
[C++] 小游戏 斗破苍穹 2.2.1至2.11.5所有版本(中) zty出品
目录 2.8.2 2.9.1 2.10.1 2.10.2 2.10.3 2.10.4 2.10.5 2.8.2 #include<stdio.h> #include<iostream> #include<ctime> #include<bits/stdc.h> #include<time.h> //suiji #include<windows.h> //SLEEP函数 using namespace std; st…...
Javaweb---HTTPS
题记 为了保护数据的隐私性我们引入了HTTPS 加密的方式都有那些呢? 1.对称加密: 加密和解密使用的密钥是同一个密钥 2.非对称加密:有两个密钥(一对),分为公钥和私钥(公钥是公开的,私钥是要藏好的) HTTPS的工作过程(旨在对body和header进行加密) 1.对称加密 上述引出的…...
[已解决]ESP32-C3上传程序成功但没有反应的问题
ESP32-C3上传程序成功但没有反应的问题 ESP32-C3是一款功能强大的微控制器,常用于物联网(IoT)应用的开发和原型设计。然而,有时候在上传程序成功后,设备却没有任何反应,十分让人费解。通过各种尝试已解决这…...
使用 OCLint进行静态代码分析:一个完整的配置示例
文章目录 0. 概述1. 安装 oclint2. oclint配置文件3. 脚本详解3.1 禁用的规则列表3.2 需要启用的规则代码风格代码复杂性命名规范性能安全性其他 4. 检测执行1. 使用 CMake 生成 compile_commands.json2. 运行 Oclint 0. 概述 OCLint是一个静态代码分析工具,通过词…...
【Linux】线程的互斥
一、进程线程间的互斥相关的背景概念 临界资源:多线程执行流共享的资源就叫做临界资源临界区:每一个线程内部,访问临界资源的代码,就叫做临界区互斥:任何时刻,互斥保证有且只有一个执行流进入临界区&#…...
electron如何让你窗口总是显示在最前面【mac解决全屏窗口alwaysOnTop参数不起作用】
你创建了一个使用Electron框架的应用程序,并希望它在以下情况下始终保持可见: 在切换工作区(桌面)时可见在其他应用程序之上显示当其他应用程序全屏显示时,它也显示在顶部当Keynote处于演示模式时,它也能显示在顶部 特别是当Keynote处于演示模式时,要实现这一点比较困难…...
XR和Steam VR项目合并问题
最近有一个项目是用Steam VR开发的,里面部分场景是用VRTK框架做的,还有一部分是用SteamVR SDK自带的Player预制直接开发的。 这样本身没有问题,因为最终都是通过SteamVR SDK处理的,VRTK也管理好了SteamVR的逻辑,并且支…...
uni-app:利用Vue的原型对象Vue.prototype设置全局方法及其引用
一、在main.js中设置方法checkPermission绑定到Vue.prototype 核心代码 Vue.prototype.$checkPermission function(username) {console.log(Checking permission for:, username); }; 完整代码 import App from ./App// 添加 checkPermission 方法到 Vue.prototype 上,检查…...
django接入djangorestframework-simplejwt步骤
版本:django 4.2 python: 3.8 安装 pip install djangorestframework-simplejwtuser子应用models.py文件 from django.db import models from django.contrib.auth.models import AbstractUserclass User(AbstractUser):mobile models.CharField(max_length11, u…...
前端工程化工具系列(十)—— Browserslist:浏览器兼容性配置工具
Browserslist 是一个能够在不同的前端工具间共享目标浏览器的配置,各工具根据该配置进行代码转译等操作。 具体的这些前端工具为:Autoprefixer、Babel、postcss-preset-env、eslint-plugin-compat、stylelint-no-unsupported-browser-features、postcss-…...
百度网盟有哪些网站/google浏览器官网
jquery 对 Json 的各种遍历 概述: JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式,采用完全独立于语言的文本格式,是理想的数据交换格式。同时,JSON是 JavaScript 原生格式,这意味着在 JavaScript 中…...
wordpress判断页面类型/西安今日头条新闻
查询ID是奇数的记录 “JIOUSHU”表:...
ppt做的比较好的网站/网页制作的软件
我们常见的执行js代码都是放入到HTML引入后然后通过HTML文件来执行胡查看代码,显然这是比较麻烦的事情, 如果你的电脑里面安装了node.js,你可以使用node来直接使用node来运行你想要运行的js文件, 具体的操作如图所示:…...
安徽建设委员会网站/百度学术搜索
文章目录一、第一阶段:前三年二、第二阶段:第五年三、第三阶段:第十年总结如果你还没有自己清晰的职业规划,他的建议可以帮助你思考一下自己的将来。 程序员的职业未来分为三个阶段,每个阶段都会遇到一个区分门槛。 程…...
设计制作个人网站/国内网络销售平台有哪些
学习内容简单查询汇总分析复杂查询多表查询如何提高SQL查询效率简单查询创建学校数据库的表查找学生查询姓‘猴’的学生名单查询姓名中最后一个字是‘猴’的学生名单查询姓名中带‘猴’的学生名单select * from student where 姓名 like 猴%;select * from student where 姓名 …...
做算命网站赚钱吗/商品热搜词排行榜
Loadrunner在场景中添加多个负载机报错:Action.c(38): Error -26488: Could not obtain information about submitted解决方法参考文章: (1)Loadrunner在场景中添加多个负载机报错:Action.c(38): Error -26488: Could…...