当前位置: 首页 > news >正文

4.2、Flink任务怎样读取文件中的数据

目录

1、前言

2、readTextFile(已过时,不推荐使用)

3、readFile(已过时,不推荐使用)

4、fromSource(FileSource) 推荐使用


1、前言

思考: 读取文件时可以设置哪些规则呢?

         1. 文件的格式(txt、csv、二进制...)        

         2. 文件的分隔符(按\n 分割)

         3. 是否需要监控文件变化(一次读取、持续读取)

基于以上规则,Flink为我们提供了非常灵活的 读取文件的方法


2、readTextFile(已过时,不推荐使用)

语法说明:

定义:def readTextFile(filePath: String): DataStream[String]def readTextFile(filePath: String, charsetName: String)功能:1.读取文本格式的文件2.按行读取(\n为分隔符),每行数据被封装为 DataStream 的一个元素3.可以指定字符集(默认为UDF-8)4.文件只会读取一次源码分析:public DataStreamSource<String> readTextFile(String filePath, String charsetName) {// 初始化 TextInputFormat对象TextInputFormat format = new TextInputFormat(new Path(filePath));  // 指定路径过滤器(使用默认过滤器)format.setFilesFilter(FilePathFilter.createDefaultFilter());  // 指定Flink中的数据类型    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; // 指定字符集format.setCharsetName(charsetName);     // 调用 readFile 方法return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); }

代码示例:

    public static void readTextFile() throws Exception {/** TODO 功能说明*   readTextFile(path) - 读取文本文件(一次读取),例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源env.readTextFile("data/1.txt").setParallelism(4).print();// 3.触发程序执行env.execute();}

3、readFile(已过时,不推荐使用)

语法说明:

定义:def readFile[T: TypeInformation](inputFormat: FileInputFormat[T],filePath: String,watchType: FileProcessingMode,interval: Long): DataStream[T] = {val typeInfo = implicitly[TypeInformation[T]] // 隐私转换(将java 数据类型 转换为 Flink数据类型)asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))}参数:inputFormat : 指定 FileInputFormat 实现类(根据文件类型 选择相适应的实例)filePath    : 指定 文件路径watchType   : 指定 读取模式(提供了2个枚举值)PROCESS_ONCE :只读取一次PROCESS_CONTINUOUSLY :按照指定周期扫描文件interval    : 指定 扫描文件的周期(单位为毫秒)功能:按照 指定的 文件格式 和 读取方式 读取数据
FileInputFormat 的实现类
FileInputFormat 的实现类

代码示例:

    public static void readFile() throws Exception {/** TODO 功能说明*    readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。*    readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)*       按照指定的文件输入格式读取(持续的读取)文件* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源String filePath = "data/1.txt";TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定过滤器textInputFormat.setCharsetName("UTF-8"); // 指定编码格式/** readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)* 参数说明:*      @inputFormat : 指定文件输入格式*      @filePath    : 指定文件路径*      @watchType   : 指定监控类型,提供了两种读取策略*            PROCESS_ONCE : 只读取一次*            PROCESS_CONTINUOUSLY :持续读取,监控新增数据*      @interval : 指定连续扫描文件的周期(毫秒)* 重点提示:*      1.如果watchType设置为PROCESS_CONTINUOUSLY时,当一个文件被修改时,将会导致重新读取该*           文件的全部内容,这将会打破`精确一次`的语义* */env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).print();// 3.触发程序执行env.execute();}

4、fromSource(FileSource) 推荐使用

    public static void FileSource() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/1.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "read fileSource").print();// 3.触发程序执行env.execute();}

相关文章:

4.2、Flink任务怎样读取文件中的数据

目录 1、前言 2、readTextFile&#xff08;已过时&#xff0c;不推荐使用&#xff09; 3、readFile&#xff08;已过时&#xff0c;不推荐使用&#xff09; 4、fromSource(FileSource) 推荐使用 1、前言 思考: 读取文件时可以设置哪些规则呢&#xff1f; 1. 文件的格式(tx…...

Effective Java笔记(28)列表优于数组

数组与泛型相比&#xff0c;有两个重要的不同点 。 首先&#xff0c;数组是协变的&#xff08; covariant &#xff09; 。 这个词听起来有点吓人&#xff0c;其实只是表示如果 Sub 为 Super 的子类型&#xff0c;那么数组类型 Sub[ ]就是Super[ ]的子类型。 相反&#xff0c;泛…...

做BI领域的ChatGPT,思迈特升级一站式ABI平台

8月8日&#xff0c;以「指标驱动 智能决策」为主题&#xff0c;2023 Smartbi V11系列新品发布会在广州丽思卡尔顿酒店开幕。 ​ 后疫情时代&#xff0c;BI发展趋势的观察与应对 在发布会上&#xff0c;思迈特CEO吴华夫在开场致辞中表示&#xff0c;当前大环境背景下&#xf…...

ELFK——ELK结合filebeat日志分析系统(2)

目录 一、filebeat 二、ELFK 1.原理简介 2.在ELK基础上部署filebeat 一、filebeat Filebeat&#xff0c;轻量级的开源日志文件数据搜集器。通常在需要采集数据的客户端安装 Filebeat&#xff0c;并指定目录与日志格式&#xff0c;Filebeat 就能快速收集数据&#xff0c;并…...

webSocket 协议是什么

webSocket 协议是什么&#xff0c;能简述一下吗&#xff1f; websocket 协议 HTML5 带来的新协议&#xff0c;相对于 http&#xff0c;它是一个持久连接的协议&#xff0c;它利用 http 协议完成握手&#xff0c;然后通过 TCP 连接通道发送消息&#xff0c;使用 websocket 协议可…...

CentOS 7迁移Anolis OS 8

背景&#xff1a;生产环境客户要求操作系统国产化 操作系统&#xff1a;Centos7.9 内核&#xff1a;5.4.108 服务器可以联网&#xff0c;进行在线迁移&#xff1a; # 下载迁移工具软件源 wget https://mirrors.openanolis.cn/anolis/migration/anolis-migration.repo -O /etc/y…...

Transformer 立体视觉 Depth Estimation

1. Intro 立体深度估计具有重要的意义,因为它能够重建三维信息。为此,在左右相机图像之间匹配相应的像素;对应像素位置的差异,即视差,可以用来推断深度并重建3D场景。最近基于深度学习的立体深度估计方法已经显示出有希望的结果,但仍然存在一些挑战。 其中一个挑战涉及使…...

vue去掉所有输入框两边空格,封装指令去空格,支持Vue2和Vue3,ElementUI Input去空格

需求背景 就是页面很多表单输入框&#xff0c;期望在提交的时候&#xff0c;都要把用户两边的空格去掉 ❌使用 vue 的指令 .trim 去掉空格 中间会输入不了空格&#xff0c; 比如我想输入 你好啊 中国, 这中间的空格输入不了&#xff0c;只能变成 你好啊中国 ❌在提交的时候使用…...

认识FFMPEG框架

FFMPEG全称: Fast Forward Moving Picture Experts Group (MPEG:动态图像专家组) ffmpeg相关网站: git://source.ffmpeg.org/ffmpeg.git http://git.videolan.org/?pffmpeg.git https://github.com/FFmpeg/FFmpeg FFMPEG框架基本组件: AVFormat , AVCodec, AVDevice, AVFil…...

Vue3 大屏数字滚动效果

父组件&#xff1a; <template> <div class"homePage"> <NumRoll v-for"(v, i) in numberList" :key"i" :number"v"></NumRoll> </div> </template> <script setup> import { onMounted, r…...

【深度学习注意力机制系列】—— SENet注意力机制(附pytorch实现)

深度学习中的注意力机制&#xff08;Attention Mechanism&#xff09;是一种模仿人类视觉和认知系统的方法&#xff0c;它允许神经网络在处理输入数据时集中注意力于相关的部分。通过引入注意力机制&#xff0c;神经网络能够自动地学习并选择性地关注输入中的重要信息&#xff…...

go 函数

go 语言函数 go 函数函数定义Go函数的特点如下函数作为参数可变参数相同类型可变参数不同类型可变参数 return语句作用概述空的return语句空白标识符多个返回值命名返回值 defer 语句作用引申出来的面试题for defer下面是一个使用defer的示例代码输出结果 匿名函数定义匿名函数…...

python之正则表达式

目录 正则表达式 python正则表达式方法 match search findall finditer compile 元字符匹配 元字符 量词 贪婪匹配和惰性匹配 正则表达式的group 语法 案例 正则表达式 正则表达式又称规则表达式&#xff0c;是使用单个字符串来描述、匹配某个句法规则的字符串…...

【LeetCode每日一题】——219.存在重复元素II

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 简单 三【题目编号】 219.存在重复元素II 四【题目描述】 给你一个…...

篇六:适配器模式:让不兼容变兼容

篇六&#xff1a;“适配器模式&#xff1a;让不兼容变兼容” 开始本篇文章之前先推荐一个好用的学习工具&#xff0c;AIRIght&#xff0c;借助于AI助手工具&#xff0c;学习事半功倍。欢迎访问&#xff1a;http://airight.fun/ 另外有2本不错的关于设计模式的资料&#xff0c…...

【云原生】Docker-compose中所有模块学习

compose模块 模板文件是使用 Compose 的核心&#xff0c;涉及到的指令关键字也比较多。但大家不用担心&#xff0c;这里面大部分指令跟 docker run 相关参数的含义都是类似的。 默认的模板文件名称为 docker-compose.yml&#xff0c;格式为 YAML 格式。 version: "3&quo…...

广义积分练习

前置知识 无穷限积分瑕积分 练习 计算 ∫ 0 ∞ 1 x ( 1 x ) d x \int_0^{\infty}\dfrac{1}{\sqrt x(1x)}dx ∫0∞​x ​(1x)1​dx 解&#xff1a; x 0 \qquad x0 x0为瑕点 \qquad 原式 lim ⁡ a → 0 lim ⁡ b → ∞ ∫ a b 1 x ( 1 x ) d x lim ⁡ a → 0 lim ⁡…...

element-ui树形表格,左边勾选,右边显示选中的数据-功能(如动图)

功能如图 功能需求 表格树形表格勾选数据&#xff0c;右边显示对应勾选的数据内容&#xff0c;选中客户&#xff0c;自动勾选所有的店铺(子级)&#xff0c;选中其中一个店铺&#xff0c;自动勾选上客户(父级)&#xff0c;同时会存在只有客户&#xff08;下面没有子级的情况&am…...

Android数字价格变化的动画效果的简单实现

原理&#xff1a;使用ValueAnimator属性动画类实现&#xff0c;它通过值的改变手动设置对象的属性值来实现动画效果。直接贴代码&#xff1a; public static void doNumberAnim(TextView tvPrice, float startNumber, float endNumber) {ValueAnimator animator ValueAnimato…...

Win10无法投影关闭3D模式

Win10不小心开启了3D模式&#xff0c;插上投影仪就一闪一闪的&#xff0c;无法正投影 解决办法&#xff1a; 1. 打开注册表工具regedit&#xff0c;删除以下注册表&#xff0c;重启电脑 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\GraphicsDrivers\Configurat…...

FFmpeg 编码详细流程

介绍 FFmpeg的 libavcodec 模块完成音视频多媒体的编解码模块。FFmpeg 本身不具有音视频编码的功能和底层能力&#xff0c;只是对各类第三方的编码器API 进行封装调用。老版本的 FFmpeg 将avcodec_encode_video2()作为视频的解码函数 API&#xff0c;将avcodec_encode_audio2(…...

05如何做微服务架构设计

一句话导读 微服务架构设计方法有&#xff1a;领域驱动设计DDD&#xff08;Domain-Driven-Design&#xff09;、12因素应用&#xff08;12-Factor App&#xff09;、事件驱动架构EDA&#xff08;Event-Driven Architecture&#xff09;等等&#xff0c;但是他们都必须遵守微服务…...

安卓开发问题记录:需要常量表达式

问题原因 写代码过程中爆出这个错误&#xff1a;需要常量表达式&#xff0c;定位到switch。 解决方法&#xff1a;把switch case&#xff0c;改成if else 错误源代码&#xff1a; public void onClick(View view) {switch (view.getId()) {case R.id.iv_code:RxCaptcha.build(…...

回归预测 | MATLAB实现基于SVM-RFE-BP支持向量机递归特征消除特征选择算法结合BP神经网络的多输入单输出回归预测

回归预测 | MATLAB实现基于SVM-RFE-BP支持向量机递归特征消除特征选择算法结合BP神经网络的多输入单输出回归预测 目录 回归预测 | MATLAB实现基于SVM-RFE-BP支持向量机递归特征消除特征选择算法结合BP神经网络的多输入单输出回归预测预测效果基本介绍研究内容程序设计参考资料…...

配置root账户ssh免密登录并使用docker-machine构建docker服务

简介 Docker Machine是一种可以在多种平台上快速安装和维护docker运行环境&#xff0c;并支持多种平台&#xff0c;让用户可以在很短时间内在本地或云环境中搭建一套docker主机集群的工具。 使用docker-machine命令&#xff0c;可以启动、审查、停止、重启托管的docker 也可以…...

【力扣周赛】第357场周赛

【力扣周赛】第357场周赛 2810. 故障键盘题目描述解题思路 2811. 判断是否能拆分数组题目描述解题思路 2810. 故障键盘 题目描述 描述&#xff1a;你的笔记本键盘存在故障&#xff0c;每当你在上面输入字符 ‘i’ 时&#xff0c;它会反转你所写的字符串。而输入其他字符则可以…...

多线程案例(4)-线程池

文章目录 多线程案例四四、线程池 大家好&#xff0c;我是晓星航。今天为大家带来的是 多线程案例-线程池 相关的讲解&#xff01;&#x1f600; 多线程案例四 四、线程池 线程池是什么 虽然创建线程 / 销毁线程 的开销 想象这么一个场景&#xff1a; 在学校附近新开了一家…...

【数据结构OJ题】轮转数组

原题链接&#xff1a;https://leetcode.cn/problems/rotate-array/ 目录 1. 题目描述 2. 思路分析 3. 代码实现 1. 题目描述 2. 思路分析 1. 方法一&#xff1a;暴力求解&#xff0c;将数组的第一个元素用临时变量tmp存起来&#xff0c;再将数组其他元素往右挪动一步&…...

现代C++中的从头开始深度学习:【4/8】梯度下降

一、说明 在本系列中&#xff0c;我们将学习如何仅使用普通和现代C编写必须知道的深度学习算法&#xff0c;例如卷积、反向传播、激活函数、优化器、深度神经网络等。 在这个故事中&#xff0c;我们将通过引入梯度下降算法来介绍数据中 2D 卷积核的拟合。我们将使用卷积和上一个…...

Yolov5缺陷检测/目标检测 Jetson nx部署Triton server

使用AI目标检测进行缺陷检测时&#xff0c;部署到Jetson上即小巧算力还高&#xff0c;将训练好的模型转为tensorRT再部署到Jetson 上供http或GRPC调用。1 Jetson nx 刷机 找个ubuntu 系统NVIDIA官网下载安装Jetson 的sdkmanager一步步刷机即可。 本文刷的是JetPack 5.1, 其中包…...

传媒公司vi/seo如何去做优化

1.#列表可重复&#xff0c;类型不同&#xff0c;用&#xff3b;&#xff3d;表示 listA [a, b, c, 1, 2] # 遍历list for item in listA:print(item)#元组是只读的&#xff0c;不能修改。元组用“()”表示 tuple1 (1,2,a,4,5,6) for item in tuple1:print(item)#字典定义了键…...

长春建站的费用/百度竞价网站

什么是MybatisMyBatis 本是apache的一个开源项目iBatis, 2010年这个项目由apache software foundation 迁移到了google code&#xff0c;并且改名为MyBatis 。iBATIS一词来源于“internet”和“abatis”的组合&#xff0c;是一个基于Java的持久层框架。iBATIS提供的持久层框架包…...

厦门房地产网站建设/小红书关键词热度查询

K8s已经成为一线大厂分布式平台的标配技术。你是不是还在惆怅怎么掌握它&#xff1f;来这里&#xff0c;大型互联网公司一线工程师亲授&#xff0c;不来虚的&#xff0c;直接上手实战&#xff0c;3天时间带你搭建K8s平台&#xff0c;快速学会K8s&#xff0c;点击下方图片可了解…...

在线免费网站建设平台/网上如何推广自己的产品

一家是守护网络安全&#xff0c;建设网络强国的主力军&#xff0c;一家是系出高校名门&#xff0c;国内量子信息第一股&#xff1b;两家的结合将会碰撞出什么样的火花&#xff0c;非常值得业界期待。在上周&#xff0c;中国电信和科大国盾量子的合资公司正式成立。新公司名为“…...

网站内做二级目录/免费建站哪个最好

很多家长都会遇到上面这样的问题&#xff0c;而且即使你坐在他旁边盯着&#xff0c;一笔一划指导他写&#xff0c;孩子也未必能改变写作业磨蹭的毛病。这到底是怎么回事&#xff1f;孩子写作业慢&#xff0c;一般有以下 7 种原因。条理性差-学习无方法洋洋今年上小学二年级。爸…...

网站首页标题怎么写/网站营销网

题目描述给定一个二维数组&#xff0c;其每一行从左到右递增排序&#xff0c;从上到下也是递增排序。给定一个数&#xff0c;判断这个数是否在该二维数组中。Consider the following matrix:[ [1, 4, 7], [2, 5, 8], [3, 6, 9]]Given target 5, return true.Given…...