当前位置: 首页 > news >正文

使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个MySQL数据库的ETL过程。

  • 1. 从源数据库(MySQL和Oracle)实时抽取数据
  • 2. 对数据进行清洗和转换
  • 3. 将转换后的数据写入目标数据库(MySQL)
    请添加图片描述

我们将使用Apache Flink来实现这个流程。Flink具有强大的数据流处理能力,适合处理实时数据同步和转换任务。

环境准备

  • 确保MySQL和Oracle数据库运行**,并创建相应的表。
  • 创建Spring Boot项目,并添加Flink、MySQL JDBC、和Oracle JDBC驱动的依赖。

第一步:创建源和目标数据库表

假设我们有以下三个表:

  • source_mysql_table(MySQL中的源表)
  • source_oracle_table(Oracle中的源表)
  • target_table(目标MySQL表)

MySQL源表

CREATE DATABASE source_mysql_db;
USE source_mysql_db;CREATE TABLE source_mysql_table (id INT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(255) NOT NULL,action VARCHAR(255) NOT NULL,timestamp VARCHAR(255) NOT NULL
);

Oracle源表

CREATE TABLE source_oracle_table (id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY,user_id VARCHAR2(255) NOT NULL,action VARCHAR2(255) NOT NULL,timestamp VARCHAR2(255) NOT NULL,PRIMARY KEY (id)
);

目标MySQL表

CREATE DATABASE target_db;
USE target_db;CREATE TABLE target_table (id INT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(255) NOT NULL,action VARCHAR(255) NOT NULL,timestamp VARCHAR(255) NOT NULL
);

第二步:添加项目依赖

在pom.xml中添加Flink、MySQL和Oracle相关的依赖:

<dependencies><!-- Spring Boot dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Apache Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!-- MySQL JDBC driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version></dependency><!-- Oracle JDBC driver --><dependency><groupId>com.oracle.database.jdbc</groupId><artifactId>ojdbc8</artifactId><version>19.8.0.0</version></dependency>
</dependencies>

第三步:编写Flink ETL任务

创建一个Flink任务类来实现ETL逻辑。

创建一个POJO类表示数据结构

package com.example.flink;public class UserAction {private int id;private String userId;private String action;private String timestamp;// Getters and setterspublic int getId() {return id;}public void setId(int id) {this.id = id;}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public String getAction() {return action;}public void setAction(String action) {this.action = action;}public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}
}

编写Flink任务类

package com.example.flink;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;@Component
public class FlinkETLJob implements CommandLineRunner {@Overridepublic void run(String... args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从MySQL读取数据DataStream<UserAction> mysqlDataStream = env.addSource(new MySQLSource());// 从Oracle读取数据DataStream<UserAction> oracleDataStream = env.addSource(new OracleSource());// 合并两个数据流DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);// 清洗和转换数据DataStream<UserAction> transformedStream = mergedStream.map(new MapFunction<UserAction, UserAction>() {@Overridepublic UserAction map(UserAction value) throws Exception {// 进行清洗和转换value.setAction(value.getAction().toUpperCase());return value;}});// 将数据写入目标MySQL数据库transformedStream.addSink(new MySQLSink());// 执行任务env.execute("Flink ETL Job");}public static class MySQLSource implements SourceFunction<UserAction> {private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";private static final String JDBC_USER = "source_user";private static final String JDBC_PASSWORD = "source_password";private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<UserAction> ctx) throws Exception {try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {while (isRunning) {String sql = "SELECT * FROM source_mysql_table";try (PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()) {while (resultSet.next()) {UserAction userAction = new UserAction();userAction.setId(resultSet.getInt("id"));userAction.setUserId(resultSet.getString("user_id"));userAction.setAction(resultSet.getString("action"));userAction.setTimestamp(resultSet.getString("timestamp"));ctx.collect(userAction);}}Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次}}}@Overridepublic void cancel() {isRunning = false;}}public static class OracleSource implements SourceFunction<UserAction> {private static final String JDBC_URL = "jdbc:oracle:thin:@localhost:1521:orcl";private static final String JDBC_USER = "source_user";private static final String JDBC_PASSWORD = "source_password";private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<UserAction> ctx) throws Exception {try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {while (isRunning) {String sql = "SELECT * FROM source_oracle_table";try (PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()) {while (resultSet.next()) {UserAction userAction = new UserAction();userAction.setId(resultSet.getInt("id"));userAction.setUserId(resultSet.getString("user_id"));userAction.setAction(resultSet.getString("action"));userAction.setTimestamp(resultSet.getString("timestamp"));ctx.collect(userAction);}}Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次}}}@Overridepublic void cancel() {isRunning = false;}}public static class MySQLSink extends RichFlatMapFunction<UserAction, Void> {private static final String JDBC_URL = "jdbc:mysql://localhost:3306/target_db";private static final String JDBC_USER = "target_user";private static final String JDBC_PASSWORD = "target_password";private transient Connection connection;private transient PreparedStatement statement;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);String sql = "INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";statement = connection.prepareStatement(sql);}@Overridepublic void flatMap(UserAction value, Collector<Void> out) throws Exception {statement.setString(1, value.getUserId());statement.setString(2, value.getAction());statement.setString(3, value.getTimestamp());statement.executeUpdate();}@Overridepublic void close() throws Exception {super.close();if (statement != null) {statement.close();}if (connection != null) {connection.close();}}}
}

第四步:配置Spring Boot

在application.properties中添加必要的配置:

# Spring Boot configuration
server.port=8080

第五步:运行和测试

  • 启动MySQL和Oracle数据库:确保你的源和目标数据库已经运行,并且创建了相应的数据库和表。
  • 启动Spring Boot应用:启动Spring Boot应用程序,会自动运行Flink ETL任务。
  • 测试Flink ETL任务:插入一些数据到源数据库的表中,验证数据是否同步到目标数据库的表中。

总结

通过上述步骤,你可以在Spring Boot项目中集成Flink并实现实时数据同步和ETL流程。这个示例展示了如何从MySQL和Oracle源数据库实时抽取数据,进行数据清洗和转换,并将结果加载到目标MySQL数据库中。根据你的具体需求,你可以扩展和修改这个示例,处理更复杂的数据转换和加载逻辑。

相关文章:

使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

使用Apache Flink实现实时数据同步与清洗&#xff1a;MySQL和Oracle到目标MySQL的ETL流程 实现数据同步的ETL&#xff08;抽取、转换、加载&#xff09;过程通常涉及从源系统&#xff08;如数据库、消息队列或文件&#xff09;中抽取数据&#xff0c;进行必要的转换&#xff0…...

postman教程-22-Newman结合Jenkins执行自动化测试

上一小节我们学习了Postman Newman运行集合生成测试报告的方法&#xff0c;本小节我们讲解一下Postman Newman结合Jenkins执行自动化测试的方法。 在软件开发过程中&#xff0c;持续集成&#xff08;CI&#xff09;是一种实践&#xff0c;旨在通过自动化的测试和构建过程来频繁…...

uniapp实现tabBar功能常见的方法

在 UniApp 中实现 Tab 功能通常涉及到使用 <navigator> 组件结合 tabBar 配置&#xff0c;或者通过自定义的视图切换逻辑来实现。以下是两种常见的实现方式&#xff1a; 1. 使用 tabBar 配置 UniApp 支持在 pages.json 文件中配置 tabBar&#xff0c;以在应用的底部或顶…...

智慧在线医疗在线诊疗APP患者端+医生端音视频诊疗并开处方

智慧在线医疗&#xff1a;音视频诊疗新纪元 &#x1f310; 智慧医疗新篇章 随着科技的飞速发展&#xff0c;智慧医疗正逐步走进我们的生活。特别是在线医疗&#xff0c;凭借其便捷、高效的特点&#xff0c;已成为许多患者的首选。而其中的“智慧在线医疗患者端医生端音视频诊疗…...

攻防平台搭建与简易渗透工具箱编写

知识点&#xff1a;攻防平台搭建&#xff0c;虚拟机的网络模式详解&#xff0c;安全脚本编写 虚拟机的网络模式&#xff1a; 虚拟机&#xff08;VM&#xff09;的网络模式决定了虚拟机与宿主机以及外部网络之间的连接方式。不同的虚拟化平台&#xff08;如VMware, VirtualBox,…...

SQL EXISTS 关键字的使用与理解

SQL EXISTS 关键字的使用与理解 SQL&#xff08;Structured Query Language&#xff09;是一种用于管理关系数据库管理系统&#xff08;RDBMS&#xff09;的标准编程语言。在SQL中&#xff0c;EXISTS关键字是一个逻辑运算符&#xff0c;用于检查子查询中是否存在至少一行数据。…...

开源低代码平台,JeecgBoot v3.7.0 里程碑版本发布

项目介绍 JeecgBoot是一款企业级的低代码平台&#xff01;前后端分离架构 SpringBoot2.x&#xff0c;SpringCloud&#xff0c;Ant Design&Vue3&#xff0c;Mybatis-plus&#xff0c;Shiro&#xff0c;JWT 支持微服务。强大的代码生成器让前后端代码一键生成! JeecgBoot引领…...

名侦探李先生第一话:谁是真正的凶手(只出现一次的数字相关题解(力扣)+位操作符回忆)

引子&#xff1a;我们在之前的案子中破解过基础的单身狗问题&#xff0c;那面对更有挑战的案子&#xff0c;且看李先生如何破局&#xff0c;那下凶手&#xff01; 复习&#xff1a; 1&#xff0c;位操作符&#xff1a; 正整数原&#xff0c;反&#xff0c;补码都相同 首位是…...

【PA交易】BackTrader(一): 如何使用实时tick数据和蜡烛图

背景和需求 整合Tick数据是PA交易的回测与实盘基本需求。多数交易回测框架往往缺乏对大规模Tick数据直接而全面的支持。Tick数据因其体量庞大&#xff08;例如&#xff0c;某棕榈油主力合约四年间的数据达8GB&#xff09;为结合价格趋势与PA分析带来挑战&#xff0c;凸显了实时…...

HTML(16)——边距问题

清楚默认样式 很多标签都有默认的样式&#xff0c;往往我们不需要这些样式&#xff0c;就需要清楚默认样式 写法&#xff1a; 用通配符选择器&#xff0c;选择所有标签&#xff0c;清除所有内外边距选中所有的选择器清楚 *{ margin:0; padding:0; } 盒子模型——元素溢出 作…...

【Godot4自学手册】第四十二节实现拖拽进行物品交换和数量叠加

这一节我们主要学习背包系统中的物品拖拽后&#xff0c;物品放到新的位置&#xff0c;或交换物品位置&#xff0c;如果两个物品属于同一物品则数量相加。具体效果如下&#xff1a; 一、修改item.tscn场景 给item.tscn场景的根节点Item添加Label子节点&#xff0c;命名为Numv…...

存储系统概述

目录 层次结构 存储器的分类 存储器的编址和端模式 存储器端模式 存储器的技术指标 1. 存储容量 示例&#xff1a; 2. 访问速度 访问速度的表现形式&#xff1a; 示例&#xff1a; 3. 功耗 示例&#xff1a; 4. 可靠性 可靠性指标&#xff1a; 示例&#xff1a;…...

Trilium windows上修改笔记目录,创建多个笔记空间方法

一开始使用trilium会非常的不舒服&#xff0c;不像是obsidian可以创建多个笔记空间&#xff0c;指定多个笔记目录。这里摸索到了解决方案 修改目录的方法一 ——修改系统环境变量 打开控制面板-系统-高级系统设置 新增如上条目 修改目录的方法二——直接写bat脚本运行 新建位…...

<Rust><iced>在iced中显示gif动态图片的一种方法

前言 本文是在rust的GUI库iced中在窗口显示动态图片GIF格式图片的一种方法。 环境配置 系统&#xff1a;window 平台&#xff1a;visual studio code 语言&#xff1a;rust 库&#xff1a;iced、image 概述 在iced中&#xff0c;提供了image部件&#xff0c;从理论上说&…...

【Unity设计模式】状态编程模式

前言 最近在学习Unity游戏设计模式&#xff0c;看到两本比较适合入门的书&#xff0c;一本是unity官方的 《Level up your programming with game programming patterns》 ,另一本是 《游戏编程模式》 这两本书介绍了大部分会使用到的设计模式&#xff0c;因此很值得学习 本…...

圆的面积并三角形面积并

三角形面积并 #include<iostream> #include<cstring> #include<algorithm> #include<cmath> #include<vector> using namespace std; const int maxn 110; #define x first #define y second typedef pair<double, double> PDD; const d…...

Spring Data JPA介绍与CRUD实战演练

文章目录 一、Spring Data JPA 简介二、Spring Data JPA 与 MyBatis Plus 比较设计哲学和抽象层次SQL 控制学习曲线和技术要求性能与优化综合考虑 三、SpringDataJpa实战演练1. 创建user表2. 搭建Spring Boot开发环境3. pom.xml文件内容4. application.yml文件内容5. Applicati…...

Python网络爬虫实战6—下一页,模拟用户点击,切换窗口

【前期提要】感兴趣的可以看看往期文章哈~ Python网络爬虫5-实战网页爬取 Python网络爬虫4-实战爬取pdf Pyhon网络爬虫3-模拟用户点击 Python网络爬虫实战2-下载url下的pdf Python网络爬虫基础1 1.需求背景 针对长虹美菱电器说明书网页形式&#xff0c;编写爬虫代码&#xff…...

Notepad++插件 Hex-Edit

Nptepad有个Hex文件查看器&#xff0c;苦于每次打开文件需要手动开插件显示Hex&#xff0c;配置一下插件便可实现打开即调用 关联多个二进制文件&#xff0c;一打开就使用插件的方法&#xff0c;原来是使用空格分割&#xff01;&#xff01;&#xff01;...

Matlab要这样批量读取txt数据!科研效率UpUp第10期

假如我们有多组txt格式的数据&#xff1a; 其数据格式是这样的&#xff1a; 想要批量读取这些数据&#xff0c;并把他们画在一张图上&#xff0c;该怎么操作呢&#xff1f; ​之前有分享load函数的版本&#xff0c;本期进一步分享适用性更强的readtable函数的实现方法​。 首…...

buuctf----firmware

- -一定不能再ubutu22进行,我是在18(血泪教训) binwalk安装 buuctf firmware(binwalk和firmware-mod-kit的使用)_buu firmware-CSDN博客 参考博客 指令 sudo apt-get update sudo apt-get install python3-dev python3-setuptools python3-pip zlib1g-dev libmagic-dev pi…...

ssl证书90天过期?保姆级教程——使用acme.sh实现证书的自动续期

腾讯云相关文档相关参考-有的点不准确 前言 最近https到期了&#xff0c;想着手动更新一下https证书&#xff0c;结果发现证书现在的有效期只有90天&#xff0c;于是想找到一个自动更新证书的工具&#xff0c;发现了acme.sh&#xff0c;但是网上的文章质量参差不齐&#xff0…...

由于bug造成truncate table卡住问题

客户反应truncate table卡主&#xff0c;检查awr发现多个truncate在awr报告期内一直没执行完&#xff0c;如下&#xff1a; 检查ash&#xff0c;truncate table表的等待事件都是“enq: RO - fast object reuse”和“local write wait” 查找“enq: RO - fast object reuse”&am…...

Charles抓包工具系列文章(二)-- Repeat 回放http请求

一、什么是http请求回放 当我们对客户端进行抓包&#xff0c;经常会想要重试http请求&#xff0c;或者改写原有部分进行重新请求&#xff0c;都需要用到回放http请求。 还有一种场景是压力测试&#xff0c;对一个请求进行重复请求多少次&#xff0c;并加上适当的并发度。 这里…...

jemeter基本使用

后端关验签&#xff0c;设置请求头编码和token 配置编码和token...

【Golang】Steam 创意工坊 Mod 文件夹批量重命名

本文将介绍一个使用Go语言编写的脚本&#xff0c;其主要功能是解析XML文件并基于解析结果重命名文件夹。这个脚本适用于需要对文件夹进行批量重命名&#xff0c;并且重命名规则依赖于XML文件内容的情况。 脚本功能概述 Steam创意工坊下载的Mod文件夹批量重命名为id名称 运行前…...

求职刷题力扣DAY33--贪心算法part04

DAY 33 贪心算法part04 1. 452. 用最少数量的箭引爆气球 有一些球形气球贴在一堵用 XY 平面表示的墙面上。墙面上的气球记录在整数数组 points &#xff0c;其中points[i] [xstart, xend] 表示水平直径在 xstart 和 xend之间的气球。你不知道气球的确切 y 坐标。 一支弓箭可…...

aws的eks(k8s)ingress+elb部署实践

eks&#xff08;k8s&#xff09;版本1.29 ingress 版本1.10.0 负载均衡elb 1. 创建Ingress-Nginx服务 部署项目地址【点我跳转】推荐自定义部署 可绑定acm证书什么的自己属性 这里就是aws上面Certificate Manager产品上面创建证书 导入 创建都行 对应集群版本推荐阵列GitH…...

大数据面试题之YARN

目录 1、介绍下YARN 2、YARN有几个模块 3、YARN工作机制 4、YARN有什么优势&#xff0c;能解决什么问题? 5、YARN容错机制 6、YARN高可用 7、YARN调度器 8、YARN中Container是如何启动的? 9、YARN的改进之处&#xff0c;Hadoop3.x相对于Hadoop 2.x? 10、YARN监控 1…...

最小生成树模板(prim,heap-prim,kruskal)

prim 出圈法&#xff0c;时间复杂度 O ( n 2 ) O(n^2) O(n2) #include<iostream> #include<vector> using namespace std; #define MAX_N 5000 #define inf 100000000 struct edge{int v,w; }; vector<edge>e[MAX_N5]; int d[MAX_N5],vis[MAX_N5]; int n,m…...