大数据:快速入门Scala+Flink
一、什么是Scala
Scala 是一种多范式编程语言,它结合了面向对象编程和函数式编程的特性。Scala 这个名字是“可扩展语言”(Scalable Language)的缩写,意味着它被设计为能够适应不同规模的项目,从小型脚本到大型分布式系统。
以下是 Scala 的一些主要特点:
-
兼容 Java:Scala 代码可以编译成 Java 字节码,并且可以在任何支持 Java 的平台上运行。这意味着 Scala 可以直接使用大量的 Java 库和框架。
-
简洁性:Scala 提供了一种更加简洁的方式来表达复杂的逻辑。通过模式匹配、类型推断等特性,程序员可以用较少的代码完成更多的工作。
-
函数式编程:Scala 支持函数作为一等公民,允许高阶函数、不可变数据结构和懒惰求值等函数式编程概念。
-
面向对象:Scala 同样支持面向对象编程的所有核心概念,包括类、对象、继承、封装等。
-
类型安全:Scala 有一个强大的静态类型系统,这有助于在编译时捕获错误并提供更好的代码质量。
-
并发模型:Scala 提供了 Actor 模型来处理并发问题,这是通过 Akka 框架实现的,非常适合构建高并发的应用程序。
-
泛型:Scala 对泛型的支持非常强大,提供了更灵活和安全的泛型机制。
-
交互性:Scala 有一个 REPL(读取-求值-打印循环)环境,允许开发者快速测试代码片段。
Scala 被广泛用于开发大规模的数据处理应用、Web 应用以及企业级软件。由于其与 Java 的良好集成,很多公司采用 Scala 来增强他们的 Java 生态系统中的应用程序。例如,Apache Spark 就是用 Scala 编写的,它是一个流行的大数据处理框架。
二、什么是Flink
Apache Flink 是一个开源的流处理框架,它为分布式、高性能、随时可用以及准确的流处理应用程序提供支持。Flink 的核心是一个流数据流引擎,它提供了数据分布、通信和状态管理等功能。Flink 可以处理有界数据(如固定大小的数据集)和无界数据(如持续不断的事件流),这使得它既可以作为批处理也可以作为流处理框架来使用。
以下是 Apache Flink 的一些关键特性:
- 实时处理:Flink 能够实现低延迟的实时数据处理。
- 高吞吐量:设计用于处理大规模数据流,并能够维持高吞吐量。
- 容错性:提供强大的容错机制,确保即使在节点故障的情况下也能保证计算结果的正确性和一致性。
- 精确一次(Exactly-Once)语义:保证每个事件只被处理一次,这对于需要精确结果的应用非常重要。
- 窗口操作:支持基于时间、计数或会话的窗口操作,方便对流数据进行复杂的分析。
- 状态管理:允许用户定义和维护应用状态,这对于实现复杂的业务逻辑是必需的。
- 可扩展性:可以轻松地部署到各种集群环境中,包括 YARN, Kubernetes, Mesos 等。
- API 支持:提供了多种语言的 API,包括 Java 和 Scala,也支持 Python 以及其他语言通过 Table API 或 SQL 接口。
Flink 被广泛应用于实时分析、事件驱动应用、ETL 操作、报警系统等领域。随着大数据和实时数据处理需求的增长,Flink 在业界得到了越来越多的关注和应用。
三、流处理和批处理 的区别
流处理和批处理是数据处理的两种主要方式,它们各自适用于不同的场景,并且有着不同的特点。下面是这两种处理方式的主要区别:
批处理(Batch Processing)
- 定义:批处理是指对固定大小的数据集进行处理的过程,这些数据通常是一次性加载到系统中的。
- 数据特性:处理的是静态的、历史的数据集合,数据在处理之前就已经完全可用。
- 延迟:由于需要收集完整的数据集后才能开始处理,因此批处理通常具有较高的延迟。
- 应用场景:适合于不需要实时响应的场景,如日志分析、报告生成等。
- 容错性:可以实现精确一次(Exactly-Once)语义,保证每个数据项被准确处理一次。
- 资源使用:批处理任务可以在非高峰时段运行,以优化资源使用。
流处理(Stream Processing)
- 定义:流处理是对连续不断的数据流进行即时处理的过程,数据项一旦到达就立即被处理。
- 数据特性:处理的是动态的、实时的数据流,数据是持续产生的。
- 延迟:能够提供非常低的延迟,甚至接近实时,因为数据一到达就可以被处理。
- 应用场景:适用于需要快速反应的场景,如实时监控、在线广告投放、欺诈检测等。
- 容错性:现代流处理框架如 Apache Flink 和 Kafka Streams 也支持精确一次(Exactly-Once)语义,但实现起来比批处理更复杂。
- 资源使用:流处理通常要求更高的计算资源和更复杂的基础设施来保证低延迟和高吞吐量。
混合模式
近年来,随着技术的发展,出现了一些混合处理模式,比如微批处理(Micro-batching),它将数据流分成小批次进行处理,试图结合流处理和批处理的优点。这种模式既保持了较低的延迟,又简化了处理逻辑和状态管理。
选择哪种处理方式取决于具体的应用需求、数据特性和业务目标。例如,如果应用需要基于最新数据做出决策,那么流处理可能更适合;而对于需要定期生成报表或分析大量历史数据的情况,则批处理可能是更好的选择。
四、安装Scala
1、 首先确保jdk1.8安装成功
首先在安装之前,确保本地已经安装了JDK1.5以上的版本,在此博主安装的是1.8版本。并且已经设置了JAVA_HOME 环境变量及JDK的bin目录。
2、下载对应的Scala安装文件scala-2.11.8.zip
接着我们从Scala官网地址 https://www.scala-lang.org/download/all.html 上下载Scala二进制的包。
3. 解压scala-2.11.8.zip
4. 配置Scala的环境变量
- 打开环境变量
右击我的电脑,单击"属性",进入如图所示页面。下面开始配置环境变量,右击【我的电脑】–【属性】–【高级系统设置】–【环境变量】,如图:
- 设置 SCALA_HOME 变量
单击新建,在变量名栏输入:SCALA_HOME: 变量值一栏输入:D:\scala 也就是 Scala 的安装目录,根据个人情况有所不同,如果安装在 C 盘,将 D 改成 C 即可。
- 设置 Path 变量
找到系统变量下的"Path"如图,单击编辑。在"变量值"一栏的最前面添加如下的路径: %SCALA_HOME%\bin;
4. 设置 Classpath 变量
找到找到系统变量下的"Classpath"如图,单击编辑,如没有,则单击"新建":
变量名: ClassPath
变量值: .D:\scala.;
5. 检查
检查环境变量是否设置好了:调出"cmd"检查。单击 【开始】,在输入框中输入cmd,然后"回车",输入 scala,然后回车,如环境变量设置ok,你应该能看到这些信息。
6. 测试
Plugins库有很多插件可联网安装,但可以选择离线安装方式,单击红框,然后选择Scala插件所在的路径确认即可。
注:查看scala插件是否安装成功,这也是第二种查看scala是否安装的方法。
如图所示可在Plugins库列表中搜索到即已完成安装
安装完scala插件后重启IDEA工具使其生效,单击【Restart】
五、大数据案例代码
1、批处理
Maven依赖
首先,确保你的pom.xml
中包含以下依赖(适用于Maven构建):
<dependencies><!-- Flink Core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.14.0</version> <!-- 根据需要替换为您使用的Flink版本 --></dependency><!-- Flink Streaming Kafka Connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.14.0</version></dependency><!-- Oracle JDBC Driver --><dependency><groupId>com.oracle.database.jdbc</groupId><artifactId>ojdbc8</artifactId><version>19.8.0.0</version> <!-- 确保版本与您的Oracle数据库兼容 --></dependency>
</dependencies>
配置Kafka和Oracle
请确保你的Kafka主题已经创建并且你能够通过Kafka消费消息。同时,确保你具有Oracle数据库的访问权限,并且已创建适当的表格以插入数据。
Scala + Flink 程序
下面是一段示例代码,展示了如何从Kafka读取数据并插入到Oracle数据库中。
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.api.environment.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.jdbc.JdbcSink
import java.sql.{Connection, PreparedStatement}
import java.util.Propertiesobject KafkaToOracle {def main(args: Array[String]): Unit = {// 创建StreamExecutionEnvironmentval env = StreamExecutionEnvironment.getExecutionEnvironment// 配置Kafka消费者val kafkaProps = new Properties()kafkaProps.setProperty("bootstrap.servers", "localhost:9092") // Kafka Broker 地址kafkaProps.setProperty("group.id", "test") // 消费者组kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")val kafkaConsumer = new FlinkKafkaConsumer[String]("your_topic", new SimpleStringSchema(), kafkaProps)// 从Kafka读取数据val stream = env.addSource(kafkaConsumer)// 处理和插入数据到Oraclestream.map(record => {// 假设Kafka传来的数据是以逗号分隔的字符串val fields = record.split(",")(fields(0), fields(1)) // 返回元组(字段1,字段2)}).addSink(new JdbcSink[(String, String)]("jdbc:oracle:thin:@your_oracle_host:1521:your_service_name", // Oracle JDBC URL(statement: PreparedStatement, t: (String, String)) => {statement.setString(1, t._1) // 设置字段1statement.setString(2, t._2) // 设置字段2},new JdbcStatementBuilder[(String, String)] {override def accept(t: (String, String), preparedStatement: PreparedStatement): Unit = {preparedStatement.setString(1, t._1)preparedStatement.setString(2, t._2)}}))// 执行任务env.execute("Kafka to Oracle Example")}
}
表结构
假设你在Oracle中有一个名为your_table
的表,结构为:
CREATE TABLE your_table (field1 VARCHAR2(255),field2 VARCHAR2(255)
);
确保表结构与上面代码中的插入逻辑相匹配。
补充说明
- Kafka的Topic: 修改
your_topic
为您实际使用的Topic名称。 - JDBC URL: 确保jdbc连接字符串和凭据是正确的。
- 性能优化: 在生产环境中,可能需要对Flink配置进行调整,例如并行度、检查点设置等。
确保所有依赖项正确并且可以访问Kafka和Oracle数据库后,编译并运行这个程序。它将从Kafka主题读取数据,进行处理后再插入到Oracle表中。
2、流处理
Maven依赖
确保你的pom.xml
中有以下依赖:
<dependencies><!-- Flink Streaming --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.14.0</version> <!-- 根据需要替换为您使用的Flink版本 --></dependency><!-- Flink Streaming Kafka Connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.14.0</version></dependency><!-- Oracle JDBC Driver --><dependency><groupId>com.oracle.database.jdbc</groupId><artifactId>ojdbc8</artifactId><version>19.8.0.0</version></dependency>
</dependencies>
Scala + Flink 程序
以下是从Kafka读取数据并实时插入Oracle数据库的流处理示例代码:
import java.sql.{Connection, PreparedStatement}
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._object KafkaToOracleStreaming {def main(args: Array[String]): Unit = {// 创建 StreamExecutionEnvironmentval env = StreamExecutionEnvironment.getExecutionEnvironment// Kafka配置val kafkaProps = new Properties()kafkaProps.setProperty("bootstrap.servers", "localhost:9092") // Kafka Broker 地址kafkaProps.setProperty("group.id", "test") // 消费者组// 创建Kafka消费者val kafkaConsumer = new FlinkKafkaConsumer[String]("your_topic", new SimpleStringSchema(), kafkaProps)// 从Kafka读取数据流val stream = env.addSource(kafkaConsumer)// 处理数据并插入Oraclestream.map(record => {// 假设Kafka传来的数据是以逗号分隔的字符串val fields = record.split(",")(fields(0), fields(1)) // 返回元组 (字段1, 字段2)}).addSink(new OracleSink)// 执行任务env.execute("Kafka to Oracle Streaming Example")}// 自定义Sink向Oracle插入数据class OracleSink extends RichSinkFunction[(String, String)] {var connection: Connection = _var insertStmt: PreparedStatement = _override def open(parameters: org.apache.flink.configuration.Configuration): Unit = {// 初始化JDBC连接connection = java.sql.DriverManager.getConnection("jdbc:oracle:thin:@your_oracle_host:1521:your_service_name", "username", "password")// 创建插入语句insertStmt = connection.prepareStatement("INSERT INTO your_table (field1, field2) VALUES (?, ?)")}override def invoke(value: (String, String), context: Context): Unit = {// 设置参数值insertStmt.setString(1, value._1)insertStmt.setString(2, value._2)// 执行插入insertStmt.executeUpdate()}override def close(): Unit = {// 关闭连接和语句if (insertStmt != null) insertStmt.close()if (connection != null) connection.close()}}
}
- Kafka消费者: 使用
FlinkKafkaConsumer
从Kafka主题获取数据。 - 数据处理: 每条从Kafka获取的记录在此处被转换为一个元组(字段1, 字段2),假设它们是通过逗号分隔的。
- 自定义Sink:
OracleSink
类继承自RichSinkFunction
,负责与Oracle数据库的连接和数据插入。- 在
open
方法中,建立与Oracle的连接。 - 在
invoke
方法中,执行插入操作。 - 在
close
方法中,确保正确关闭连接和语句。
- 在
- 执行环境: 最后,通过
env.execute("Kafka to Oracle Streaming Example")
来启动Flink流处理任务。
、
六、项目部署
Scala+Flink 打包以后依旧是jar 通过Java程序的方式部署即可
相关文章:

大数据:快速入门Scala+Flink
一、什么是Scala Scala 是一种多范式编程语言,它结合了面向对象编程和函数式编程的特性。Scala 这个名字是“可扩展语言”(Scalable Language)的缩写,意味着它被设计为能够适应不同规模的项目,从小型脚本到大型分布式…...

侧边菜单的展开和折叠
环境准备:Vue3Element-UI Plus <script setup> import {ref} from "vue";// 是否折叠菜单,默认折叠 const isCollapse ref(true)</script><template><el-container><el-aside><el-menu:collapse"isCo…...

自动化办公-Python中的for循环
for 循环是 Python 中用于迭代(遍历)序列(如列表、元组、字典、集合、字符串)或其他可迭代对象的控制结构。它允许您逐一访问序列中的每个元素,并对其执行操作。以下是对 for 循环的详细介绍,包括语法、使用…...

Python_itertools
itertools itertools.count(start, step) 返回一个无限迭代器,从指定的start开始,每次增加step。 import itertools # 从1开始,每次增加1,输出前5个数 for i in itertools.count(1, 1):if i > 5:breakprint(i)运行结果&#…...

Apache Iceberg 数据类型参考表
Apache Iceberg 概述-链接 Apache Iceberg 数据类型参考表 数据类型描述实例方法注意事项BOOLEAN布尔类型,表示真或假true, false用于条件判断,例如 WHERE is_active true。确保逻辑条件的正确性。INTEGER32位有符号整数42, -7可用于计算、聚合…...

职责链模式
职责链模式 责任链(Chain of Responsibility)模式:为了避免请求发送者与多个请求处理者耦合在一起,于是将所有请求的处理者通过前一对象记住其下一个对象的引用而连成一条链;当有请求发生时,可将请求沿着这…...

新品 | Teledyne FLIR IIS 推出Forge 1GigE SWIR 短波红外工业相机系列
近日,51camera的合作伙伴Teledyne FLIR IIS推出了新品Forge 1GigE SWIR 130万像素的红外相机。 Forge 1GigE SWIR系列的首款相机配备宽频带、高灵敏度的Sony SenSWIR™️ 130万像素IMX990 InGaAs传感器。这款先进的传感器采用5um像素捕捉可见光和SWIR光谱ÿ…...

深入MySQL:掌握索引、事务、视图、存储过程与性能优化
在掌握了MySQL的基本操作之后,你可能会遇到更复杂的数据管理和优化需求。本文将介绍一些MySQL的进阶特性,包括索引、事务、视图、存储过程和函数、以及性能优化等内容。通过学习这些高级功能,你可以更高效地管理和优化你的数据库。 索引 索…...

【WSL——Windows 上使用 Linux 环境】
引入 以前在windows上使用linux工具链,一般都要安装虚拟机(VMware/virtualBox)。虚拟机的缺点是,因为是完整的虚拟环境,消耗系统资源比较多。 windows自己开发了WSL功能,实现了虚拟机的功能,但是比虚拟机性…...

Redis:事务
什么是Redis事务 Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。 总结说&…...

策略模式的介绍和具体实现
❤ 作者主页:李奕赫揍小邰的博客 ❀ 个人介绍:大家好,我是李奕赫!( ̄▽ ̄)~* 🍊 记得点赞、收藏、评论⭐️⭐️⭐️ 📣 认真学习!!!🎉🎉 文章目录 策略接口三种…...

MySQL InnoDB MVCC数据结构分析
1、概述 MVCC(Multiversion Concurrency Control)多版本并发控制,通过维护不同的版本号,提供一种很好的并发控制技术,这种技术能够使读写操作不冲突,提升并发性能。 MySQL InnoDB存储引擎,在更…...

MySQL 8 查看 SQL 语句的执行进度
目录 1. 查询各阶段执行进度 (1)开启收集与统计汇总执行阶段信息的功能 (2)确定执行的SQL所属的thread_id (3)查询各阶段的执行进度 2. 查询SQL语句的整体执行进度 1. 查询各阶段执行进度 ࿰…...

OpenStack 部署实践与原理解析 - Ubuntu 22.04 部署 (DevStack)
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言OpenStack 原理详解1. OpenStack 的架构2. OpenStack 的工作原理3. OpenStack 的 API4. 扩展性和模块化 OpenStack 安装方式比较1. DevStack2. Kolla3. OpenSta…...

【软件工程】可行性研究
一、目的 二、任务 三、步骤 四、结果:可行性研究报告 例题 选择题...

乌克兰因安全风险首次禁用Telegram
据BleepingComputer消息,乌克兰国家网络安全协调中心 (NCCC) 以国家安全为由,已下令限制在政府机构、军事单位和关键基础设施内使用 Telegram 消息应用程序。 这一消息通过NCCC的官方 Facebook 账号对外发布,在公告中乌…...

[SDX35]SDX35如何查看GPIO的Base值
SDX35 SDX35介绍 SDX35设备是一种多模调制解调器芯片,支持 4G/5G sub-6 技术。它是一个4nm芯片专为实现卓越的性能和能效而设计。它包括一个 1.9 GHz Cortex-A7 应用处理器。 SDX35主要特性 ■ 3GPP Rel. 17 with 5G Reduced Capability (RedCap) support. Backward compati…...

【Linux学习】【Ubuntu入门】2-1-1 vim编辑器设置
设置TAB键为4字节及显示行号 VIM编辑器默认TAB键为8空格,改为4空格 输入命令sudo vi /etc/vim/vimrc回车后输入密码按键盘下键到最后,按下“a”进入编辑模式,输入set ts4设置为4空格下一行输入set nu显示行号...

全栈开发(一):springBoot3+mysql初始化
1.开发环境准备 1.开发工具 2.jdk下载 官网下载java17 3.java环境变量配置 用户变量: ①.JAVA_HOME ②.path 4.mysql下载 b站随便搜 5.新建项目 6.maven配置 可以下载zip放到目录里 这里是配置好的 repository文件夹:为maven提供下载的文件存放…...

有关若依登录过程前端的对应处理学习
导言 在用C#搞完个后端后想用若依的前端做对接,不过很久没搞过若依了,想趁这个二次开发的过程记录熟悉一下登录的过程 过程 验证,在permission.js的路由守卫,这里在用户发起api请求时会验证用户的请求是否有token,对…...

django使用笔记6--docker部署
django使用笔记--docker部署 多环境配置创建环境变量配置文件静态资源配置dockerfile配置 由于服务器中python版本和依赖与本地开发环境不同,且centOS7中python及依赖安装更新较为麻烦,所以采用docker容器部署 多环境配置 多环境配置类似Spring中的多环…...

高性能、高可靠,MK SD卡让数据存储无忧!
文章目录 SD卡(Secure Digital Memory Card),作为当代数字生活中不可或缺的存储媒介,凭借其卓越的数据传输效率、灵活的热插拔功能以及惊人的存储容量,在多个领域大放异彩。从日常使用的智能手机、平板电脑到追求极致体…...

NetAssist测试TCP和UDP
由于在Windows下经常使用NetAssist.exe这款网络调试工具进行TCP、UDP的服务端、客户端的监听,对于需要编写各种通信协议的TCP服务端、客户端以及UDP通信程序来说是很方便的。下载地址:http://free.cmsoft.cn/download/cmsoft/assistant/netassist5.0.14.…...

mcuboot使用介绍
准备工作 硬件平台选择 确保你的微控制器单元(MCU)是 MCUboot 所支持的类型。查看 MCUboot 的文档或官方支持列表,了解其兼容的 MCU 系列和硬件平台。根据硬件平台的设计,将微控制器与相关的外设(如闪存、通信接口等&a…...

如何在 Linux 终端使用 GET 和 POST 请求
文章目录 1、GET请求基本请求带有请求头带有参数将响应保存成文件 2、POST请求基本请求发送JSON格式的POST请求体使用文件作为POST请求体使用时注意 1、GET请求 基本请求 在Linux中,发送GET请求通常使用 curl 命令,curl 的默认行为就是发送GET请求&…...

主从数据库同步配置详解(MySQL/MariaDB)
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、环境准备与安装配置本地部署MySQLUbuntu 系统:CentOS 系统: MariaDBUbuntu 系统:CentOS 系统: 容器部署MySQ…...

台式机通过笔记本上网
概述: ①将wifi共享给网口 ②网口配置成自协商IP和DNS即可 一、背景 由于台式机只有网口,没得wifi网卡,因此想通过笔记本连wifi,再通过网线将笔记本和台式机连接起来,从而实现台式机通过笔记本的wifi上网,即让笔记本当台式机的…...

golang雪花算法实现64位的ID
推荐学习文档 golang应用级os框架,欢迎stargolang应用级os框架使用案例,欢迎star案例:基于golang开发的一款超有个性的旅游计划app经历golang实战大纲golang优秀开发常用开源库汇总想学习更多golang知识,这里有免费的golang学习笔…...

LeetCode 137. 只出现一次的数字 II
LeetCode 137. 只出现一次的数字 II 给你一个整数数组 nums ,除某个元素仅出现 一次 外,其余每个元素都恰出现 三次 。请你找出并返回那个只出现了一次的元素。 你必须设计并实现线性时间复杂度的算法且使用常数级空间来解决此问题。 示例 1:…...

新书推荐——《深度学习精粹与PyTorch实践》
深度学习绝非不可窥探的黑箱!深入理解其模型和算法的实际运作机制,是驾驭并优化结果的关键。你无需成为数学专家或资深数据科学家,同样能够掌握深度学习系统内部的工作原理。 本书旨在通过深入浅出的方式,为你揭示这些原理,让你在理解和解释…...