Flink 流式读写文件、文件夹
文章目录
- 一、flink 流式读取文件夹、文件
- 二、flink 写入文件系统——StreamFileSink
- 三、查看完整代码
一、flink 流式读取文件夹、文件
Apache Flink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);
StreamExecutionEnvironment.readFile()接收如下参数:
- FileInputFormat参数,负责读取文件中的内容。
- 文件路径。如果文件路径指向单个文件,那么将会读取这个文件。如果路径指向一个文件夹,FileInputFormat将会扫描文件夹中所有的文件。
- PROCESS_CONTINUOUSLY将会周期性的扫描文件,以便扫描到文件新的改变。
- 30000L表示多久扫描一次监听的文件。
FileInputFormat是一个特定的InputFormat,用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径,然后为所有匹配到的文件创建所谓的input splits。一个input split将会定义文件上的一个范围,一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后,这些splits可以分发到不同的读任务,这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个input split,读取被split定义的文件范围,然后返回对应的数据。
DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。
在Flink 1.7中,Flink提供了一些类,这些类继承了FileInputFormat,并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件,而CsvInputFormat使用逗号分隔符来读取文件。
二、flink 写入文件系统——StreamFileSink
该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。
streamFileSink中输出的文件,其生命周期会经历3中状态:
- in-progress Files 当前文件正在写入中
- Pending Files 当处于 In-progress 状态的文件关闭closed了,就变为 Pending 状态
- Finished Files 在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 !
数据文件格式是行式存储格式
BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();
其中特别说明了,如果使用 FileSink 在 STREAMING 模式的时候,必须开启 checkpoint,不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态,不能保证整个写入流程的安全性。
所以在我们上述的示例中,我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后:
将数据以列式存储的格式输出到文件中
三、查看完整代码
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;import java.time.ZoneId;
import java.util.concurrent.TimeUnit;public class WordTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.设置CK&状态后端env.setStateBackend(new FsStateBackend("hdfs://nameservice1/tmp/kafka_test/data/chatgpt/mnbvc/checkpoint"));env.enableCheckpointing(1000*60*3);// 每 ** ms 开始一次 checkpointenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次env.getCheckpointConfig().setCheckpointTimeout(1000*60*5);// Checkpoint 必须在** ms内完成,否则就会被抛弃env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** msenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);// 允许两个连续的 checkpoint 错误env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略:重启3次,间隔10s// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);String sourcePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_com";String savePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_filter_01";TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();source.map(line -> JSONObject.parseObject(line)).filter(line -> line.getString("text").length() > 200 && line.getInteger("id") % 7 == 0).map(line -> JSON.toJSONString(line)).addSink(fileSink);env.execute();}
}
相关文章:
Flink 流式读写文件、文件夹
文章目录 一、flink 流式读取文件夹、文件二、flink 写入文件系统——StreamFileSink三、查看完整代码 一、flink 流式读取文件夹、文件 Apache Flink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示: StreamExe…...
【SA8295P 源码分析】64 - QNX 与 Android GVM 显示 Dump 图片方法汇总
【SA8295P 源码分析】64 - QNX 与 Android GVM 显示 Dump 图片方法汇总 一、QNX侧1.1 surfacedump 功能1.2 screenshot 功能二、Android GVM 侧2.1 screencap -p 导出 PNG 图片2.2 screencap 不加 -p 参数,导出 RGB32 图片2.3 dumpsys SurfaceFlinger --display-id 方法系列文…...
字符串旋转(1)
目录 编辑 题目要求😍: 题目内容❤: 题目分析📚: 主函数部分📕:编辑 方法一🐒: 方法二🐒🐒: 方法三🐒…...
【SA8295P 源码分析】13 - Android GVM 虚拟机 QUPv3 UART / SPI / I2C功能配置及透传配置
【SA8295P 源码分析】13 - Android GVM 虚拟机 QUPv3 UART / SPI / I2C功能配置及透传配置 一、QUP v3 介绍二、QUP v3 UART 功能配置2.1 TrustZone 域 Uart 资源权限配置:以 QUPV3_0_SE2 为例2.2 QNX Host 域关闭 Uart 资源:以 QUPV3_0_SE2 为例2.3 Android Kernel 域使能 U…...
STM32 F103C8T6学习笔记10:OLED显示屏GIF动图取模—简易时钟—动图手表的制作~
今日尝试做一款有动图的OLED实时时钟,本文需要现学一个OLED的GIF动图取模 其余需要的知识点有不会的可以去我 STM32 F103C8T6学习笔记 系列专栏自己查阅把,闲话不多,直接开肝~~~ 文章提供源码,测试工程下载,测试效…...
大数据课程K3——Spark的常用案例
文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 掌握Spark的常用案例——WordCount; ⚪ 掌握Spark的常用案例——求平均值; ⚪ 掌握Spark的常用案例——求最大值和最小值; ⚪ 掌握Spark的常用案例——TopK; ⚪ 掌握Spark的常用案例…...
85-最大矩阵
题目 给定一个仅包含 0 和 1 、大小为 rows x cols 的二维二进制矩阵,找出只包含 1 的最大矩形,并返回其面积。 示例 1: 输入:matrix [[“1”,“0”,“1”,“0”,“0”],[“1”,“0”,“1”,“1”,“1”],[“1”,“1”,“1”,…...
8.3 【C语言】通过指针引用数组
8.3.1 数组元素的指针 所谓数组元素的指针就是数组元素的地址。 可以用一个指针变量指向一个数组元素。例如: int a[10]{1,3,5,7,9,11,13,15,17,19}; int *p; p&a[0]; 引用数组元素可以用下标法,也可以用指针法…...
基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】
文章目录 一、PostgreSQL作为数据来源(source),由flink读取1.postgre安装与配置2.flink安装与配置3.flink cdc postgre配置3.1 postgre配置(for flink cdc)3.2 flink cdc postgres的jar包下载 4.flink cdc postgre测试…...
Vue-5.编译器Idea
Vue专栏(帮助你搭建一个优秀的Vue架子) Vue-1.零基础学习Vue Vue-2.Nodejs的介绍和安装 Vue-3.Vue简介 Vue-4.编译器VsCode Vue-5.编译器Idea Vue-6.编译器webstorm Vue-7.命令创建Vue项目 Vue-8.Vue项目配置详解 Vue-9.集成(.editorconfig、…...
qiuzhiji3
本篇想介绍一下慧与,这里的工作氛围和企业文化令人难忘,希望更多人了解它 也想探讨一下不同的文化铸就的不同企业,究竟有哪些差别。 本篇将从我个人角度出发描述慧与。 2022/3/16至2023/7/31 本篇初次写于2023年8月20日 说起来在毕业之前那段…...
JVM——垃圾回收(垃圾回收算法+分代垃圾回收+垃圾回收器)
1.如何判断对象可以回收 1.1引用计数法 只要一个对象被其他对象所引用,就要让该对象的技术加1,某个对象不再引用其,则让它计数减1。当计数变为0时就可以作为垃圾被回收。 有一个弊端叫做循环引用,两个的引用计数都是1ÿ…...
QT TLS initialization failed问题(已解决) QT基础入门【网络编程】openssl
问题: qt.network.ssl: QSslSocket::connectToHostEncrypted: TLS initialization failed 这个问题的出现主要是使用了https请求:HTTPS ≈ HTTP + SSL,即有了加密层的HTTP 所以Qt 组件库需要OpenSSL dll 文件支持HTTPS 解决: 1.加入以下两行代码获取QT是否支持opensll以…...
SpringMVC之获取请求参数
文章目录 前言一、通过ServletAPI获取二、通过控制器方法的形参获取请求参数三、注解1.RequestParam2.RequestHeader3.CookieValue前面的代码总和:4.通过POJO获取请求参数 三、解决获取请求参数的乱码问题总结 前言 下面用到了thymeleaf,不知道的可以看…...
【无标题】QT应用编程: QtCreator配置Git版本控制(码云)
QT应用编程: QtCreator配置Git版本控制(码云) 感谢:DS小龙哥的文章,这篇主要参考小龙哥的内容。 https://cloud.tencent.com/developer/article/1930531?areaSource102001.15&traceIdW2mKALltGu5f8-HOI8fsN Qt Creater 自带了git支持。但是一直没…...
JVM面试题-2
1、有哪几种垃圾回收器,各自的优缺点是什么? 垃圾回收器主要分为以下几种:Serial、ParNew、Parallel Scavenge、Serial Old、Parallel Old、CMS、G1; Serial:单线程的收集器,收集垃圾时,必须stop the worl…...
kafka安装说明以及在项目中使用
一、window 安装 1.1、下载安装包 下载kafka 地址,其中官方版内置zk, kafka_2.12-3.4.0.tgz其中这个名称的意思是 kafka3.4.0 版本 ,所用语言 scala 版本为 2.12 1.2、安装配置 1、解压刚刚下载的配置文件,解压后如下&#x…...
二叉树搜索
✅<1>主页:我的代码爱吃辣📃<2>知识讲解:数据结构——二叉搜索树☂️<3>开发环境 :Visual Studio 2022💬<4>前言:在之前的我们已经学过了普通二叉树,了解了基本的二叉树…...
【先进PID控制算法(ADRC,TD,ESO)加入永磁同步电机发电控制仿真模型研究(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
k8s集群生产环境的问题处理
2 k8s上的服务均无法访问 执行命令kubectl get pods -ALL,k8s集群中的服务均是running状态 1 kuboard 网页无法访问 kuboard无法通过浏览器访问,但是查看端口是被占用的...
serve : 无法将“serve”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。
1、在学习webpack打包的时候,需要 serve用来启动开发服务器来部署代码查看效果的。安装完之后运行出现以下错误: 2、使用命令查看安装目录: npm list -g我们已经安装过了 3、解决: 我们看到上图路径在:C:\Users\qiy…...
【LVS】2、部署LVS-DR群集
LVS-DR数据包的流向分析 1.客户端发送请求到负载均衡器,请求的数据报文到达内核空间; 2.负载均衡服务器和正式服务器在同一个网络中,数据通过二层数据链路层来传输; 3.内核空间判断数据包的目标IP是本机VIP,此时IP虚…...
设计模式 -- 单例模式(传统面向对象与JavaScript 的对比实现)
单例模式 – 传统面向对象与JavaScript 的对比实现 文章目录 单例模式 -- 传统面向对象与JavaScript 的对比实现传统的面向对象的实现定义实现思路初级实现缺点 透明的单例模式实现目的(实现效果)实现缺点 用代理实现单例模式优点 JavaScript 中的单例模…...
YOLOX算法调试记录
YOLOX是在YOLOv3基础上改进而来,具有与YOLOv5相媲美的性能,其模型结构如下: 由于博主只是要用YOLOX做对比试验,因此并不需要对模型的结构太过了解。 先前博主调试过YOLOv5,YOLOv7,YOLOv8,相比而言,YOLOX的环…...
基于小程序的汽车俱乐部系统的设计与实现(论文+源码)_kaic
目录 前 言 1 系统概述 1.1 系统主要功能 1.2 开发及运行环境 2 系统分析和总体设计 2.1 需求分析 2.2 可行性分析 2.3 设计目标 2.4 项目规划 2.5 系统开发语言简介 2.6 系统功能模块图 3 系统数据库设计 3.1 数据库开发工具简介 3.2 数据库需求分析 3.3 数据库…...
ProgrammingArduino物联网
programming_arduino_ed2 IO 延时闪灯 void setup() {pinMode(13, OUTPUT); }void loop() {digitalWrite(13, HIGH);delay(500);digitalWrite(13, LOW);delay(500); }// sketch 03-02 加入变量 int ledPin 13; int delayPeriod 500;void setup() {pinMode(ledPin, OUTPUT)…...
SSM框架的学习与应用(Spring + Spring MVC + MyBatis)-Java EE企业级应用开发学习记录(第一天)Mybatis的学习
SSM框架的学习与应用(Spring Spring MVC MyBatis)-Java EE企业级应用开发学习记录(第一天)Mybatis的学习 一、当前的主流框架介绍(这就是后期我会发出来的框架学习) Spring框架 Spring是一个开源框架,是为了解决企业应用程序开发复杂…...
Programming abstractions in C阅读笔记: p118-p122
《Programming Abstractions In C》学习第49天,p118-p122,总结如下: 一、技术总结 1.随机数 (1)seed p119,“The initial value–the value that is used to get the entire process start–is call a seed for the random ge…...
2023国赛数学建模思路 - 案例:ID3-决策树分类算法
文章目录 0 赛题思路1 算法介绍2 FP树表示法3 构建FP树4 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 算法介绍 FP-Tree算法全称是FrequentPattern Tree算法,就是频繁模…...
selenium 选定ul-li下拉选项中某个指定选项
场景:selenium的下拉选项是ul-li模式,选定某个指定的选项。 from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC # 显示等待def select_li(self, text, *ul_locator):"…...
知名网络公司有哪些/济宁seo推广
先打出nand表0 nand 011 nand 100 nand 111 nand 01容易发现(!a)a nand a然后(a&b)!(a nand b)然后(a|b)!((!a)&(!b))然后(a^b)(a|b)&(a nand b)所以通过nand我们可以实现任意一种位运算所以每一位我们想得到0/1都是可以的按道理[L,R]中符合位数要求的数都能得到然…...
wap网站制作工具/如何创建一个网址
Scala集合的mutable和immutable解释概述集合API概述概述 Scala 集合类系统地区分了可变的和不可变的集合。可变集合可以在适当的地方被更新或扩展。这意味着你可以修改,添加,移除一个集合的元素。而不可变集合类,相比之下,永远不会…...
去哪里找做网站的/百姓网
在网页开发中经常会碰到获取同种类型的 标签 的值得问题,比如下面的两种情况. 当需要批量获取同种标签的指定值时,新人就会碰上一点小麻烦. 比如 idproblem1的demo var list1$("#problem1").children();//获取到problem1指定的对象数组 console.log(list1);//打印到…...
seo网站推广电话/自己开平台怎么弄啊
西南交《计算机绘图(机械)》在线作业二一、单选题(共 11 道试题,共 77 分。)1. 世界坐标系原点的位置,将( )。. 始终在绘图区域的左下角. 由Usion命令设定. 由Limits命令对绘图界限的设置来确定. 由ZOOM命令设定正确答案:2. 在使用单行文本命令书写“45”…...
网站建设运营费用包括哪些/网络营销好找工作吗
阅读本文大概需要 3.5 分钟。 本篇是设计模式系列的开篇,虽然之前也写过相应的文章,但是因为种种原因后来断掉了,而且发现之前写的内容也很渣,不够系统。 所以现在打算重写,加上距离现在也有一段时间了,也算…...
wordpress浏览统计/中国足球世界排名
查看linux的重启记录: last | grep reboot 与之类似可以查看Linux的关机记录: last | grep shutdown...