4.2、Flink任务怎样读取文件中的数据
目录
1、前言
2、readTextFile(已过时,不推荐使用)
3、readFile(已过时,不推荐使用)
4、fromSource(FileSource) 推荐使用
1、前言
思考: 读取文件时可以设置哪些规则呢?
1. 文件的格式(txt、csv、二进制...)
2. 文件的分隔符(按\n 分割)
3. 是否需要监控文件变化(一次读取、持续读取)
基于以上规则,Flink为我们提供了非常灵活的 读取文件的方法
2、readTextFile(已过时,不推荐使用)
语法说明:
定义:def readTextFile(filePath: String): DataStream[String]def readTextFile(filePath: String, charsetName: String)功能:1.读取文本格式的文件2.按行读取(\n为分隔符),每行数据被封装为 DataStream 的一个元素3.可以指定字符集(默认为UDF-8)4.文件只会读取一次源码分析:public DataStreamSource<String> readTextFile(String filePath, String charsetName) {// 初始化 TextInputFormat对象TextInputFormat format = new TextInputFormat(new Path(filePath)); // 指定路径过滤器(使用默认过滤器)format.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定Flink中的数据类型 TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; // 指定字符集format.setCharsetName(charsetName); // 调用 readFile 方法return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); }
代码示例:
public static void readTextFile() throws Exception {/** TODO 功能说明* readTextFile(path) - 读取文本文件(一次读取),例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源env.readTextFile("data/1.txt").setParallelism(4).print();// 3.触发程序执行env.execute();}
3、readFile(已过时,不推荐使用)
语法说明:
定义:def readFile[T: TypeInformation](inputFormat: FileInputFormat[T],filePath: String,watchType: FileProcessingMode,interval: Long): DataStream[T] = {val typeInfo = implicitly[TypeInformation[T]] // 隐私转换(将java 数据类型 转换为 Flink数据类型)asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))}参数:inputFormat : 指定 FileInputFormat 实现类(根据文件类型 选择相适应的实例)filePath : 指定 文件路径watchType : 指定 读取模式(提供了2个枚举值)PROCESS_ONCE :只读取一次PROCESS_CONTINUOUSLY :按照指定周期扫描文件interval : 指定 扫描文件的周期(单位为毫秒)功能:按照 指定的 文件格式 和 读取方式 读取数据
代码示例:
public static void readFile() throws Exception {/** TODO 功能说明* readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。* readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)* 按照指定的文件输入格式读取(持续的读取)文件* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源String filePath = "data/1.txt";TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定过滤器textInputFormat.setCharsetName("UTF-8"); // 指定编码格式/** readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)* 参数说明:* @inputFormat : 指定文件输入格式* @filePath : 指定文件路径* @watchType : 指定监控类型,提供了两种读取策略* PROCESS_ONCE : 只读取一次* PROCESS_CONTINUOUSLY :持续读取,监控新增数据* @interval : 指定连续扫描文件的周期(毫秒)* 重点提示:* 1.如果watchType设置为PROCESS_CONTINUOUSLY时,当一个文件被修改时,将会导致重新读取该* 文件的全部内容,这将会打破`精确一次`的语义* */env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).print();// 3.触发程序执行env.execute();}
4、fromSource(FileSource) 推荐使用
public static void FileSource() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/1.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "read fileSource").print();// 3.触发程序执行env.execute();}
相关文章:
4.2、Flink任务怎样读取文件中的数据
目录 1、前言 2、readTextFile(已过时,不推荐使用) 3、readFile(已过时,不推荐使用) 4、fromSource(FileSource) 推荐使用 1、前言 思考: 读取文件时可以设置哪些规则呢? 1. 文件的格式(tx…...
Effective Java笔记(28)列表优于数组
数组与泛型相比,有两个重要的不同点 。 首先,数组是协变的( covariant ) 。 这个词听起来有点吓人,其实只是表示如果 Sub 为 Super 的子类型,那么数组类型 Sub[ ]就是Super[ ]的子类型。 相反,泛…...
做BI领域的ChatGPT,思迈特升级一站式ABI平台
8月8日,以「指标驱动 智能决策」为主题,2023 Smartbi V11系列新品发布会在广州丽思卡尔顿酒店开幕。 后疫情时代,BI发展趋势的观察与应对 在发布会上,思迈特CEO吴华夫在开场致辞中表示,当前大环境背景下…...
ELFK——ELK结合filebeat日志分析系统(2)
目录 一、filebeat 二、ELFK 1.原理简介 2.在ELK基础上部署filebeat 一、filebeat Filebeat,轻量级的开源日志文件数据搜集器。通常在需要采集数据的客户端安装 Filebeat,并指定目录与日志格式,Filebeat 就能快速收集数据,并…...
webSocket 协议是什么
webSocket 协议是什么,能简述一下吗? websocket 协议 HTML5 带来的新协议,相对于 http,它是一个持久连接的协议,它利用 http 协议完成握手,然后通过 TCP 连接通道发送消息,使用 websocket 协议可…...
CentOS 7迁移Anolis OS 8
背景:生产环境客户要求操作系统国产化 操作系统:Centos7.9 内核:5.4.108 服务器可以联网,进行在线迁移: # 下载迁移工具软件源 wget https://mirrors.openanolis.cn/anolis/migration/anolis-migration.repo -O /etc/y…...
Transformer 立体视觉 Depth Estimation
1. Intro 立体深度估计具有重要的意义,因为它能够重建三维信息。为此,在左右相机图像之间匹配相应的像素;对应像素位置的差异,即视差,可以用来推断深度并重建3D场景。最近基于深度学习的立体深度估计方法已经显示出有希望的结果,但仍然存在一些挑战。 其中一个挑战涉及使…...
vue去掉所有输入框两边空格,封装指令去空格,支持Vue2和Vue3,ElementUI Input去空格
需求背景 就是页面很多表单输入框,期望在提交的时候,都要把用户两边的空格去掉 ❌使用 vue 的指令 .trim 去掉空格 中间会输入不了空格, 比如我想输入 你好啊 中国, 这中间的空格输入不了,只能变成 你好啊中国 ❌在提交的时候使用…...
认识FFMPEG框架
FFMPEG全称: Fast Forward Moving Picture Experts Group (MPEG:动态图像专家组) ffmpeg相关网站: git://source.ffmpeg.org/ffmpeg.git http://git.videolan.org/?pffmpeg.git https://github.com/FFmpeg/FFmpeg FFMPEG框架基本组件: AVFormat , AVCodec, AVDevice, AVFil…...
Vue3 大屏数字滚动效果
父组件: <template> <div class"homePage"> <NumRoll v-for"(v, i) in numberList" :key"i" :number"v"></NumRoll> </div> </template> <script setup> import { onMounted, r…...
【深度学习注意力机制系列】—— SENet注意力机制(附pytorch实现)
深度学习中的注意力机制(Attention Mechanism)是一种模仿人类视觉和认知系统的方法,它允许神经网络在处理输入数据时集中注意力于相关的部分。通过引入注意力机制,神经网络能够自动地学习并选择性地关注输入中的重要信息ÿ…...
go 函数
go 语言函数 go 函数函数定义Go函数的特点如下函数作为参数可变参数相同类型可变参数不同类型可变参数 return语句作用概述空的return语句空白标识符多个返回值命名返回值 defer 语句作用引申出来的面试题for defer下面是一个使用defer的示例代码输出结果 匿名函数定义匿名函数…...
python之正则表达式
目录 正则表达式 python正则表达式方法 match search findall finditer compile 元字符匹配 元字符 量词 贪婪匹配和惰性匹配 正则表达式的group 语法 案例 正则表达式 正则表达式又称规则表达式,是使用单个字符串来描述、匹配某个句法规则的字符串…...
【LeetCode每日一题】——219.存在重复元素II
文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 简单 三【题目编号】 219.存在重复元素II 四【题目描述】 给你一个…...
篇六:适配器模式:让不兼容变兼容
篇六:“适配器模式:让不兼容变兼容” 开始本篇文章之前先推荐一个好用的学习工具,AIRIght,借助于AI助手工具,学习事半功倍。欢迎访问:http://airight.fun/ 另外有2本不错的关于设计模式的资料,…...
【云原生】Docker-compose中所有模块学习
compose模块 模板文件是使用 Compose 的核心,涉及到的指令关键字也比较多。但大家不用担心,这里面大部分指令跟 docker run 相关参数的含义都是类似的。 默认的模板文件名称为 docker-compose.yml,格式为 YAML 格式。 version: "3&quo…...
广义积分练习
前置知识 无穷限积分瑕积分 练习 计算 ∫ 0 ∞ 1 x ( 1 x ) d x \int_0^{\infty}\dfrac{1}{\sqrt x(1x)}dx ∫0∞x (1x)1dx 解: x 0 \qquad x0 x0为瑕点 \qquad 原式 lim a → 0 lim b → ∞ ∫ a b 1 x ( 1 x ) d x lim a → 0 lim …...
element-ui树形表格,左边勾选,右边显示选中的数据-功能(如动图)
功能如图 功能需求 表格树形表格勾选数据,右边显示对应勾选的数据内容,选中客户,自动勾选所有的店铺(子级),选中其中一个店铺,自动勾选上客户(父级),同时会存在只有客户(下面没有子级的情况&am…...
Android数字价格变化的动画效果的简单实现
原理:使用ValueAnimator属性动画类实现,它通过值的改变手动设置对象的属性值来实现动画效果。直接贴代码: public static void doNumberAnim(TextView tvPrice, float startNumber, float endNumber) {ValueAnimator animator ValueAnimato…...
Win10无法投影关闭3D模式
Win10不小心开启了3D模式,插上投影仪就一闪一闪的,无法正投影 解决办法: 1. 打开注册表工具regedit,删除以下注册表,重启电脑 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\GraphicsDrivers\Configurat…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练
前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...
抖音增长新引擎:品融电商,一站式全案代运营领跑者
抖音增长新引擎:品融电商,一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中,品牌如何破浪前行?自建团队成本高、效果难控;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...
STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
质量体系的重要
质量体系是为确保产品、服务或过程质量满足规定要求,由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面: 🏛️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限,形成层级清晰的管理网络…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...
有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...
Unity UGUI Button事件流程
场景结构 测试代码 public class TestBtn : MonoBehaviour {void Start(){var btn GetComponent<Button>();btn.onClick.AddListener(OnClick);}private void OnClick(){Debug.Log("666");}}当添加事件时 // 实例化一个ButtonClickedEvent的事件 [Formerl…...
MyBatis中关于缓存的理解
MyBatis缓存 MyBatis系统当中默认定义两级缓存:一级缓存、二级缓存 默认情况下,只有一级缓存开启(sqlSession级别的缓存)二级缓存需要手动开启配置,需要局域namespace级别的缓存 一级缓存(本地缓存&#…...
