Flink 离线计算
文章目录
- 一、样例一:读 csv 文件生成 csv 文件
- 二、样例二:读 starrocks 写 starrocks
- 三、样例三:DataSet、Table Sql 处理后写入 StarRocks
- 四、遇到的坑
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.9.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.9.1</version></dependency><!--使用Java编程语言支持DataStream / DataSet API的Table&SQL API--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><!--表程序规划器和运行时--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency>
一、样例一:读 csv 文件生成 csv 文件
参考:(3)Flink学习- Table API & SQL编程
import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;public class SQLWordCount {public static void main(String[] args) throws Exception {// 1、获取执行环境 ExecutionEnvironment (批处理用这个对象)ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
// DataSet<WC> input = env.fromElements(
// WC.of("hello", 1),
// WC.of("hqs", 1),
// WC.of("world", 1),
// WC.of("hello", 1)
// );// 注册数据集
// tEnv.registerDataSet("WordCount", input, "word, frequency");// 2、加载数据源到 DataSetDataSet<Student> csv = env.readCsvFile("D:\\tmp\\data.csv").ignoreFirstLine().pojoType(Student.class, "name", "age");// 3、将DataSet装换为TableTable students = bTableEnv.fromDataSet(csv);bTableEnv.registerTable("student", students);// 4、注册student表Table result = bTableEnv.sqlQuery("select name,age from student");result.printSchema();DataSet<Student> dset = bTableEnv.toDataSet(result, Student.class);System.out.println("count-->" + dset.count());dset.print();// 5、sink输出CsvTableSink sink1 = new CsvTableSink("D:\\tmp\\result.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);String[] fieldNames = {"name", "age"};TypeInformation[] fieldTypes = {Types.STRING, Types.INT};bTableEnv.registerTableSink("CsvOutPutTable", fieldNames, fieldTypes, sink1);result.insertInto("CsvOutPutTable");env.execute("SQL-Batch");}@Datapublic static class Student {private String name;private int age;}
}
准备测试文件 data.csv:
name,age
zhangsan,23
lisi,43
wangwu,12
运行程序后会生成 D:\\tmp\\result.csv 文件。
二、样例二:读 starrocks 写 starrocks
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes = {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 方式一DataSource s = env.createInput(jdbcInputFormat);s.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("insert into student values(?, ?)").finish());// 方式二
// DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
//
// dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
// .setDrivername("com.mysql.jdbc.Driver")
// .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
// .setUsername("root").setPassword("")
// .setQuery("insert into student values(?, ?)")
// .finish()
// );env.execute("SQL-Batch");}
}
数据准备:
CREATE TABLE student (name STRING,age INT
) ENGINE=OLAP
DUPLICATE KEY(`name`)
DISTRIBUTED BY RANDOM
PROPERTIES (
"compression" = "LZ4",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
"replication_num" = "1"
);insert into student values('zhangsan', 23);
参考:
flink 读取mysql源 JDBCInputFormat、自定义数据源
flink1.10中三种数据处理方式的连接器说明
flink读写MySQL的两种方式
注意:如果运行 java -cp flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.xiaoqiang.app.SQLWordCount 时报错:Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer}
解决:报错:Flink Could not resolve substitution to a value: ${akka.stream.materializer}
<build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>flink.KafkaDemo1</mainClass></transformer>--><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
三、样例三:DataSet、Table Sql 处理后写入 StarRocks
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes = {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);DataSet<Row> dataSource = env.createInput(jdbcInputFormat);dataSource.print();Table students = bTableEnv.fromDataSet(dataSource);bTableEnv.registerTable("student", students);Table result = bTableEnv.sqlQuery("select name, age from (select f0 as name, f1 as age from student) group by name, age");result.printSchema();DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);dset.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("insert into student values(?, ?)").finish());env.execute("SQL-Batch");}
}
四、遇到的坑
坑1:Bang equal '!=' is not allowed under the current SQL conformance level
解决:将 sql 中的 != 修改为 <>
坑2:java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
解释:在最后一行代码 env.execute() 执行的时候,没有新的数据接收器被定义,对于 Flink 批处理而前一行代码 result.print() 已经触发了代码的执行和输出,所以再执行 env.execute(),就是多余的了,因此报了上面的异常。
解决方法:去掉最后一行代码 env.execute(); 就可以了。
相关文章:
Flink 离线计算
文章目录 一、样例一:读 csv 文件生成 csv 文件二、样例二:读 starrocks 写 starrocks三、样例三:DataSet、Table Sql 处理后写入 StarRocks四、遇到的坑 <dependency><groupId>org.apache.flink</groupId><artifactId&…...
Git | 理解团队合作中Git分支的合并操作
合并操作 团队合作中Git分支的合并操作分支合并过程1.创建feature/A分支的过程2. 创建分支feature/A-COPY3.合并分支查看代码是否改变 团队合作中Git分支的合并操作 需求 假设团队项目中的主分支是main,团队成员A基于主分支main创建了feature/A,而我又在团队成员A创…...
C++多态的实现原理
【欢迎关注编码小哥,学习更多实用的编程方法和技巧】 1、类的继承 子类对象在创建时会首先调用父类的构造函数 父类构造函数执行结束后,执行子类的构造函数 当父类的构造函数有参数时,需要在子类的初始化列表中显式调用 Child(int i) : …...
[极客大挑战 2019]PHP--详细解析
信息搜集 想查看页面源代码,但是右键没有这个选项。 我们可以ctrlu或者在url前面加view-source:查看: 没什么有用信息。根据页面的hint,我们考虑扫一下目录看看能不能扫出一些文件. 扫到了备份文件www.zip,解压一下查看网站源代码…...
map用于leetcode
//第一种map方法 function groupAnagrams(strs) {let map new Map()for (let str of strs) {let key str ? : str.split().sort().join()if (!map.has(key)) {map.set(key, [])}map.get(key).push(str)} //此时map为Map(3) {aet > [ eat, tea, ate ],ant > [ tan,…...
CommonJS 和 ES Modules 的 区别
CommonJS 和 ES Modules 的 区别 1. CommonJS 和 ES Modules 区别?1.1 语法差异CommonJS:ES Modules: 1.2. 加载机制CommonJS:ES Modules: 1.3. 运行时行为CommonJS:ES Modules: 1.4. 兼容性和使用场景Com…...
科技为翼 助残向新 高德地图无障碍导航规划突破1.5亿次
今年12月03日是第33个国际残疾人日。在当下科技发展日新月异的时代,如何让残障人士共享科技红利、平等地参与社会生活,成为当前社会关注的热点。 中国有超过8500万残障人士,其中超过2400万为肢残人群,视力障碍残疾人数超过1700万…...
Flink四大基石之Time (时间语义) 的使用详解
目录 一、引言 二、Time 的分类及 EventTime 的重要性 Time 分类详述 EventTime 重要性凸显 三、Watermark 机制详解 核心原理 Watermark能解决什么问题,如何解决的? Watermark图解原理 举例 总结 多并行度的水印触发 Watermark代码演示 需求 代码演示ÿ…...
Spring WebFlux与Spring MVC
Spring WebFlux 是对 Spring Boot 项目中传统 Spring MVC 部分的一种替代选择,主要是为了解决现代 Web 应用在高并发和低延迟场景下的性能瓶颈。 1.WebFlux 是对 Spring MVC 的替代 架构替代: Spring MVC 使用的是基于 Servlet 规范的阻塞式模型…...
【深度学习基础】一篇入门模型评估指标(分类篇)
🌈 个人主页:十二月的猫-CSDN博客 🔥 系列专栏: 🏀深度学习_十二月的猫的博客-CSDN博客 💪🏻 十二月的寒冬阻挡不了春天的脚步,十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前言 2. 模…...
D80【 python 接口自动化学习】- python基础之HTTP
day80 requests请求加入headers 学习日期:20241126 学习目标:http定义及实战 -- requests请求加入headers 学习笔记: requests请求加入headers import requestsurlhttps://movie.douban.com/j/search_subjects params{"type":…...
⽂件操作详解
⽬录 一 文件操作的引入 1 为什么使⽤⽂件? 2 什么是⽂件? 3 文件分类(1 从⽂件功能的⻆度来分类:程序⽂件/数据⽂件 2根据数据的组织形式:为⽂本⽂件/⼆进制⽂件) 二 ⽂件的打开和关闭 1 …...
双高(高比例新能源发电和高比例电力电子设备)系统宽频振荡研究现状
1 为什么会形成双高电力系统 (1)新能源发电比例增加 双碳计划,新能源革命,可再生能源逐步代替传统化石能源,未来新能源发电将成为最终能源需求的主要来源。 (2)电力电子设备数量增加 为了实…...
TorchMoji使用教程/环境配置(2024)
TorchMoji使用教程/环境配置(2024) TorchMoji简介 这是一个基于pytorch库,用于将文本分类成不同的多种emoji表情的库,适用于文本的情感分析 配置流程 从Anaconda官网根据提示安装conda git拉取TorchMoji git clone https://gi…...
使用 Python 中的 TripoSR 根据图像创建 3D 对象
使用 Python 中的 TripoSR 根据图像创建 3D 对象 1. 效果图2. 步骤图像到 3D 对象设置环境导入必要的库设置设备创建计时器实用程序上传并准备图像处理输入图像生成 3D 模型并渲染下载.stl 文件展示结果3. 源码4. 遇到的问题及解决参考这篇博客将引导如何使用Python 及 TripoSR…...
Spring 框架中AOP(面向切面编程)和 IoC(控制反转)
在 Spring 框架中,AOP(面向切面编程)和 IoC(控制反转)是两个核心概念,它们分别负责不同的功能。下面我将通过通俗易懂的解释来帮助你理解这两个概念。 IoC(控制反转) IoC 是 Inver…...
电机瞬态分析基础(7):坐标变换(3)αβ0变换,dq0变换
1. 三相静止坐标系与两相静止坐标系的坐标变换―αβ0坐标变换 若上述x、y坐标系在空间静止不动,且x轴与A轴重合,即,如图1所示,则为两相静止坐标系,常称为坐标系,考虑到零轴分量,也称为αβ0坐标…...
Open3D (C++) 生成任意3D椭圆点云
目录 一、算法原理1、几何参数2、数学公式二、代码实现三、结果展示一、算法原理 1、几何参数 在三维空间中,椭圆由以下参数定义: 椭圆中心点 c = ( x 0 , y 0 , z...
5.利用Pandas以及Numpy进行数据清洗
1、缺失值处理 import pandas as pd import numpy as np#创建一张7行5列带有缺失值的表,表中的数据0-100随机生成,索引是python1. df pd.DataFrame(datanp.random.randint(0,100,size(7,5)), index [i for i in pythonl])df.iloc[2,3] Nonedf.iloc[4…...
@Bean注解详细介绍以及应用
目录 一、概念二、应用(一)代码示例1、首先创建一个简单的 Java 类User2、然后创建一个配置类AppConfig3、在其他组件中使用Bean创建的 bean4、通过 Spring 的ApplicationContext来获取UserService并调用其方法 (二)bean的方法名详…...
IDEA运行Tomcat出现乱码问题解决汇总
最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…...
conda相比python好处
Conda 作为 Python 的环境和包管理工具,相比原生 Python 生态(如 pip 虚拟环境)有许多独特优势,尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处: 一、一站式环境管理:…...
Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
Reasoning over Uncertain Text by Generative Large Language Models
https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...
免费PDF转图片工具
免费PDF转图片工具 一款简单易用的PDF转图片工具,可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件,也不需要在线上传文件,保护您的隐私。 工具截图 主要特点 🚀 快速转换:本地转换,无需等待上…...
Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...
Web中间件--tomcat学习
Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机,它可以执行Java字节码。Java虚拟机是Java平台的一部分,Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...
