当前位置: 首页 > news >正文

Flink Source 详解

Flink Source 详解

原文

flip-27
FLIP-27 介绍了新版本Source 接口定义及架构

相比于SourceFunction,新版本的Source更具灵活性,原因是将“splits数据获取”与真“正数据获取”逻辑进行了分离
在这里插入图片描述

重要部件

Source 作为工厂类,会创建以下两个重要部件

  1. SplitEnumerator

    • 通过createEnumerator创建

    • SplitEnumerator 响应request split请求

      • handleSplitRequest
    • 工作在SourceCoordinator (官方描述如下),可以理解为在JobMaster上运行一个单线程的逻辑,所以需要跟在worker上的reader通过rpc通信

      Where to run the enumerator
      There was a long discussion about where to run the enumerator which we documented in the appendix. The final approach we took was very similar to option 3 with a few differences. The approach is following.Each SplitEnumerator will be encapsulated in one SourceCoordinator. If there are multiple sources, multiple SourceCoordinator will there be. The SourceCoordinators will run in the JobMaster, but not as part of the ExecutionGraph. In this FLIP, we propose to failover the entire execution graph when the SplitEnumerator fails. A finer grained enumerator failover will be proposed in a later FLIP.
      
  2. SourceReader

    • 通过createReader创建

    • 工作在worker

    • 由于单独实现SourceReader过于复杂,官方抽象了3种比较通用的模型供开发者使用,MySqlSourceReader就是继承了SingleThreadMultiplexSourceReaderBase

      1. Sequential Single Split (File, database query, most bounded splits)
      2. Multi-split multiplexed (Kafka, Pulsar, Pravega, …)
      3. Multi-split multi-threaded (Kinesis, …)
        在这里插入图片描述

      在这里插入图片描述

      在这里插入图片描述

    • 使用了抽象后的类,开发者的关注点集中在实现一个SplitReader

      public interface SplitReader<E, SplitT extends SourceSplit> {RecordsWithSplitIds<E> fetch() throws InterruptedException;void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);void wakeUp();
      }
      
      1. fetch 获取数据,这里是包含了split信息的record
      2. 响应split改变
      3. 唤醒
  3. RecordEmitter

    1. The RecordsWithSplitIds returned by the SplitReader will be passed to an RecordEmitter one by one.
    2. The RecordEmitter is responsible for the following:
      • Convert the raw record type into the eventual record type
      • Provide an event time timestamp for the record that it processes.
    3. 在 emitRecord 方法中实现

由于通信使用mail风格的rpc(单线程串行),所以响应函数需要保证非阻塞,所以后面可以看到无论enumerator还是reader的最终响应都是在异步线程池中

Non-blocking progress methods, to it supports running in an actor/mailbox/dispatcher style operator

MysqlSource 举例

以flink cdc中的MysqlSource来举例分析

  1. MysqlSource
    • 通过 createEnumerator 创建 MySqlSourceEnumerator

      • 初始化调用start
        • 调用splitAssigner.open()
          • splitAssigner 是获取/分配split动作的真正实现
            • 创建异步线程,填充remainingSplits
      • handleSplitRequest 响应空闲worker的请求
        • assignSplits
          • splitAssigner.getNext()
            • 从 remainingSplits 拿一个可用的split
      • 调用 context.assignSplit 发送 AddSplitEvent
      • MySqlSourceEnumerator 中 splitAssigner 的实现说明
        • splitAssigner 默认实现是 MySqlHybridSplitAssigner
          • hybrid的含义,启动分为两个步骤 1. 读取全量数据 2. 全量数据读取完毕后读取增量数据。将两种模式混合在一起被称为hybird。所以MySqlSnapshotSplitAssigner可以创建两种split
            1. 通过MySqlSnapshotSplitAssigner创建存量数据的split
              • 在读取存量数据时通过chunkSplitter切分为多个split,之后分发给多个reader并行读取
                • chunkSplitter 通过 chunkKey 的范围将存量数据切分
                • 用户可以手动设置chunkKey,否则使用主key作为chunkKey,切分split
            2. 通过 createBinlogSplit 创建增量数据的split
              • 只assign一次binlog的split
              • 只能分发给一个reader,所以在进入增量模式后flink实际所有并行度上只有一个source有数据
                在这里插入图片描述
    • 通过 createReader 创建 MySqlSourceReader

      • 创建 SingleThreadFetcherManager 传入 elementQueue splitReaderSupplier
        • elementQueue: io线程和主线程公用队列,io线程写,主线程读
        • splitReaderSupplier: split reader的工厂
        • SingleThreadFetcherManager 启动后创建线程池
      • sourceOperator 收到 AddSplitEvent 调用 sourceReader.addSplits 这里 sourceReader 是 MySqlSourceReader
        • readerBase 中会调用 splitFetcherManager.addSplits(splits);
          • 由于使用的是 SingleThreadFetcherManager,所以addSplits中永远看到只同时存在一个fetcher
            • fetcher 初始化时加入默认任务 FetchTask 构造的时候传入 elementQueue 传入构造好的 splitReader
            • fetcher addSplits时加入任务 AddSplitsTask
            • fetcher 启动时调用 startFetcher
              • 调用 executors.submit(fetcher); 提交到线程池
              • 线程池中运行 runOnce
                • FetchTask 调用 splitReader.fetch() 获取records 写入 elementQueue
      • 主线程 SourceReaderBase 中的 pollNext 会被框架调用
        • 调用 getNextFetch
          • elementsQueue.poll() 取得 records
            在这里插入图片描述

其他

在Flink CDC 3.0 中

Flink Composer 中使用 WatermarkStrategy.noWatermarks()

 return env.fromSource(sourceProvider.getSource(),WatermarkStrategy.noWatermarks(),sourceDef.getName().orElse(generateDefaultSourceName(sourceDef)),new EventTypeInfo()).setParallelism(sourceParallelism);

很合理,因为pipeline的定义中不会出现聚合函数 window函数

相关文章:

Flink Source 详解

Flink Source 详解 原文 flip-27 FLIP-27 介绍了新版本Source 接口定义及架构 相比于SourceFunction&#xff0c;新版本的Source更具灵活性&#xff0c;原因是将“splits数据获取”与真“正数据获取”逻辑进行了分离 重要部件 Source 作为工厂类&#xff0c;会创建以下两…...

2024年了,TCP分析工具有哪些?

TCP分析工具广泛应用于网络调试、性能分析和协议学习。以下是一些常用的TCP分析工具&#xff0c;它们各有特点&#xff0c;适用于不同的场景&#xff1a; Wireshark - 这是一个非常强大的网络协议分析器&#xff0c;支持图形界面&#xff0c;可以捕获和分析TCP流量&#xff0c;…...

SRP 实现 Cook-Torrance BRDF

写的很乱&#xff01; BRDF&#xff08;Bidirectional Reflectance Distribution Function&#xff09;全称双向反射分布函数。辐射量单位非常多&#xff0c;这里为方便直观理解&#xff0c;会用非常不严谨的光照强度来解释说明。 BRDF光照模型&#xff0c;上反射率公式&#…...

MySQL慢日志

慢查询日志顾名思义就是查询慢的sql语句可以记录到一个日志文件里&#xff0c;至于有多慢才会被记录&#xff0c;默认是10秒&#xff0c;但也可以通过系统配置来更改&#xff0c;慢日志在做系统优化时是一个非常好用的工具 #是否开启慢日志 show variables like slow_query_log…...

Flutter网络通信-封装Dio

前言 dio 是一个强大的 Dart HTTP 请求库&#xff0c;支持全局配置、Restful API、FormData、拦截器、 请求取消、Cookie 管理、文件上传/下载、超时以及自定义适配器等。 Dio的pub地址为&#xff1a;dio | Dart package 封装要求 能够使用get、post、put、patch、delete、…...

matlab 读取csv

需要跳过第一行表头等信息 1、读取整个文件 csvread(FILENAME)%文件路径 文件名2、指定起始位置 csvread(FILENAME, R, C)%从文件的第R行和第C列开始读取数据 逗号分开3、指定数据范围 csvread(FILENAME, R, C, [R1 C1 R2 C2])%读取从(R1, C1)到(R2, C2)范围内的数据注意&am…...

网络层9——虚拟专用网VPN和网络地址转换NAT

目录 一、为什么有虚拟专用网&#xff1f; 二、如何理解“虚拟专用网”&#xff1f; 三、IP隧道技术实现虚拟专用网 四、网络地址变换 一、为什么有虚拟专用网&#xff1f; 第一&#xff0c;IPv4只有32位&#xff0c;最多有40亿个全球唯一的IP地址数量不够&#xff0c;无法…...

开源科学工程技术软件介绍 – EDA工具KLayout

link 今天向各位知友介绍的 KLayout是一款由德国团队开发的开源EDA工具。 KLayout是使用C开发的&#xff0c;用户界面基于Qt。它支持Windows、MacOS和Linux操作系统。安装程序可以从下面的网址下载&#xff1a; https://www.klayout.de/build.html KLayout图形用户界面&…...

【网络安全】Cookie SameSite属性

未经许可,不得转载。 文章目录 背景CSRF 攻击SameSite 属性StrictLaxNone背景 为了有效防止 CSRF 攻击并保护用户隐私,Chrome 从 51 版本开始引入了 SameSite 属性,专门用于限制第三方 Cookie 的使用,进而减少安全风险。 CSRF 攻击 跨站请求伪造(CSRF)攻击是指恶意网站…...

Linux 命令 | 每日一学,文本处理三剑客之awk命令实践

[ 知识是人生的灯塔&#xff0c;只有不断学习&#xff0c;才能照亮前行的道路 ] 0x00 前言简述 描述&#xff1a;前面作者已经介绍了文本处理三剑客中的 grep 与 sed 文本处理工具&#xff0c;今天将介绍其最后一个且非常强大的 awk 文本处理输出工具&#xff0c;它可以非常方便…...

RabbitMQ的工作队列在Spring Boot中实现(详解常⽤的⼯作模式)

上文着重介绍RabbitMQ 七种工作模式介绍RabbitMQ 七种工作模式介绍_rabbitmq 工作模式-CSDN博客 本篇讲解如何在Spring环境下进⾏RabbitMQ的开发.&#xff08;只演⽰部分常⽤的⼯作模式&#xff09; 目录 引⼊依赖 一.工作队列模式 二.Publish/Subscribe(发布订阅模式) …...

【web前端笔记】vue3 + vite的前端项目中,使用import.meta.glob()方法实现全局注册组件的通用代码

目录 1.1、如何读取所有文件 1.2、通用代码 1.3、在main.js引入 这篇文章介绍一下,在vue3和vite搭建的项目中,如何将【src/components】目录下所有的【*.vue】文件,当做一个组件全局注册到Vue对象里面。 1.1、如何读取所有文件 在vue3和vite搭建的项目里面,它给我们提…...

保险行业建立知识管理系统:提高效率和安全性的策略

在保险行业&#xff0c;知识管理系统&#xff08;KMS&#xff09;的建立对于提高工作效率和保障数据安全性至关重要。保险公司需要在复杂的生态系统中航行&#xff0c;这个生态系统由不断发展的法规、错综复杂的保单和投保人不断变化的需求所定义。以下是一些关键策略&#xff…...

小程序如何完成订阅

小程序如何完成订阅 参考相关文档实践问题处理授权弹窗不再触发引导用户重新授权 参考相关文档 微信小程序实现订阅消息推送的实现步骤 发送订阅消息 小程序订阅消息&#xff08;用户通过弹窗订阅&#xff09;开发指南 实践 我们需要先选这一个模板&#xff0c;具体流程参考…...

JS学习日记(jQuery库)

前言 今天先更新jQuery库的介绍&#xff0c;它是一个用来帮助快速开发的工具 介绍 jQuery是一个快速&#xff0c;小型且功能丰富的JavaScript库&#xff0c;jQuery设计宗旨是“write less&#xff0c;do more”&#xff0c;即倡导写更少的代码&#xff0c;做更多的事&#xf…...

Uni-APP+Vue3+鸿蒙 开发菜鸟流程

参考文档 文档中心 运行和发行 | uni-app官网 AppGallery Connect DCloud开发者中心 环境要求 Vue3jdk 17 Java Downloads | Oracle 中国 【鸿蒙开发工具内置jdk17&#xff0c;本地不使用17会报jdk版本不一致问题】 开发工具 HBuilderDevEco Studio【目前只下载这一个就…...

Linux的基本用法

Linux的基本用法涵盖多个方面&#xff0c;包括用户登录、系统操作、文件和目录管理、系统工具使用等。以下是对Linux基本用法的详细介绍&#xff1a; 一、用户登录与系统操作 用户登录 普通用户登录&#xff1a;选择用户名并输入密码。超级用户&#xff08;root&#xff09;登…...

如何找出爬取网站的来源IP呢?

1.背景 最近网站数据库性能很不稳定&#xff0c;查询性能在某段时间很慢&#xff0c;服务器CPU也很高&#xff0c;平常时间很低&#xff0c;感觉被爬虫恶意搞了&#xff0c;因此我分析了一下最近的nginx访问日志 2.方法 找出访问量最大20个ip [root100 nginx]# cat liuhaih…...

Java爬虫(Jsoup)详解

文章目录 Java爬虫&#xff08;Jsoup&#xff09;详解一、引言二、Jsoup 快速入门1、Jsoup 简介1.1、添加依赖 2、解析 HTML 文档2.1、解析 HTML 字符串2.2、从 URL 加载 Document2.3、解析 body 片断 三、数据抽取1、使用 DOM 方法遍历文档3.1、获取元素 2、使用选择器语法查找…...

力扣周赛:第424场周赛

&#x1f468;‍&#x1f393;作者简介&#xff1a;爱好技术和算法的研究生 &#x1f30c;上期文章&#xff1a;力扣周赛&#xff1a;第422场周赛 &#x1f4da;订阅专栏&#xff1a;力扣周赛 希望文章对你们有所帮助 第一道题模拟题&#xff0c;第二道题经典拆分数组/线段树都…...

基于算法竞赛的c++编程(28)结构体的进阶应用

结构体的嵌套与复杂数据组织 在C中&#xff0c;结构体可以嵌套使用&#xff0c;形成更复杂的数据结构。例如&#xff0c;可以通过嵌套结构体描述多层级数据关系&#xff1a; struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 &#xff08;一&#xff09;概念解析 TRS&#xff08;Total Return Swap&#xff09;收益互换是一种金融衍生工具&#xff0c;指交易双方约定在未来一定期限内&#xff0c;基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

Docker 本地安装 mysql 数据库

Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker &#xff1b;并安装。 基础操作不再赘述。 打开 macOS 终端&#xff0c;开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...

安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖

在Vuzix M400 AR智能眼镜的助力下&#xff0c;卢森堡罗伯特舒曼医院&#xff08;the Robert Schuman Hospitals, HRS&#xff09;凭借在无菌制剂生产流程中引入增强现实技术&#xff08;AR&#xff09;创新项目&#xff0c;荣获了2024年6月7日由卢森堡医院药剂师协会&#xff0…...

JavaScript 数据类型详解

JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型&#xff08;Primitive&#xff09; 和 对象类型&#xff08;Object&#xff09; 两大类&#xff0c;共 8 种&#xff08;ES11&#xff09;&#xff1a; 一、原始类型&#xff08;7种&#xff09; 1. undefined 定…...

C++ 设计模式 《小明的奶茶加料风波》

&#x1f468;‍&#x1f393; 模式名称&#xff1a;装饰器模式&#xff08;Decorator Pattern&#xff09; &#x1f466; 小明最近上线了校园奶茶配送功能&#xff0c;业务火爆&#xff0c;大家都在加料&#xff1a; 有的同学要加波霸 &#x1f7e4;&#xff0c;有的要加椰果…...

【C++】纯虚函数类外可以写实现吗?

1. 答案 先说答案&#xff0c;可以。 2.代码测试 .h头文件 #include <iostream> #include <string>// 抽象基类 class AbstractBase { public:AbstractBase() default;virtual ~AbstractBase() default; // 默认析构函数public:virtual int PureVirtualFunct…...

十九、【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建

【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建 前言准备工作第一部分:回顾 Django 内置的 `User` 模型第二部分:设计并创建 `Role` 和 `UserProfile` 模型第三部分:创建 Serializers第四部分:创建 ViewSets第五部分:注册 API 路由第六部分:后端初步测…...

SQL Server 触发器调用存储过程实现发送 HTTP 请求

文章目录 需求分析解决第 1 步:前置条件,启用 OLE 自动化方式 1:使用 SQL 实现启用 OLE 自动化方式 2:Sql Server 2005启动OLE自动化方式 3:Sql Server 2008启动OLE自动化第 2 步:创建存储过程第 3 步:创建触发器扩展 - 如何调试?第 1 步:登录 SQL Server 2008第 2 步…...