当前位置: 首页 > news >正文

Flink 中 JDBC Connector 使用详解

1. 背景

在实时计算或离线任务中,往往需要与关系型数据库交互,例如 MySQL、PostgreSQL 等。Apache Flink 提供了 JDBC Connector,可以方便地将流式数据写入或读取数据库。

本文将介绍 Flink JDBC Connector 的基础用法、配置方法以及注意事项,帮助开发者更好地集成数据库操作。


2. JDBC Connector 的基础概念

JDBC Connector 是 Flink 官方提供的一个用于连接关系型数据库的工具包,支持:

  • Source:从数据库读取数据。
  • Sink:将数据写入数据库。

使用 JDBC Connector 可以实现对数据库的实时写入,也可以用作批量操作的工具。


3. Maven 依赖

在项目中添加 Flink JDBC 依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.17.0</version> <!-- 根据实际使用的 Flink 版本调整 -->
</dependency>

如果使用 MySQL 数据库,还需添加 MySQL 驱动:

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version> <!-- MySQL 驱动版本 -->
</dependency>

4. JDBC Connector 的使用

4.1 写入数据库(Sink)

以下是一个将流式数据写入 MySQL 的示例:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcSink;public class JdbcSinkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模拟输入数据env.fromElements(Tuple2.of(1, "Alice"),Tuple2.of(2, "Bob"),Tuple2.of(3, "Charlie")).addSink(JdbcSink.sink("INSERT INTO users (id, name) VALUES (?, ?)", // SQL 语句(ps, t) -> {ps.setInt(1, t.f0);  // 设置第一个参数为 IDps.setString(2, t.f1);  // 设置第二个参数为 Name},JdbcSink.DefaultJdbcExecutionOptions.builder().withBatchSize(100) // 批量写入大小.build(),() -> JdbcSink.defaultJdbcConnectionProvider("jdbc:mysql://localhost:3306/testdb", // 数据库 URL"root",  // 用户名"password" // 密码)));env.execute("Flink JDBC Sink Example");}
}
关键点解析
  1. SQL 语句:支持动态参数 ? 占位符,适合批量插入。
  2. 参数绑定:通过 Lambda 表达式绑定输入数据与 SQL 参数。
  3. 批量写入:通过 JdbcExecutionOptions 配置批量写入策略。

4.2 从数据库读取数据(Source)

以下是一个从 MySQL 读取数据并打印的示例:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;public class JdbcSourceExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<Integer, String>> sourceStream = env.createInput(JdbcInputFormat.buildJdbcInputFormat().setDrivername("com.mysql.cj.jdbc.Driver") // JDBC 驱动.setDBUrl("jdbc:mysql://localhost:3306/testdb") // 数据库 URL.setUsername("root") // 用户名.setPassword("password") // 密码.setQuery("SELECT id, name FROM users") // SQL 查询.setRowTypeInfo(Types.TUPLE(Types.INT, Types.STRING)) // 结果类型.finish());sourceStream.print();env.execute("Flink JDBC Source Example");}
}
关键点解析
  1. SQL 查询:需要提供完整的查询语句。
  2. 结果类型:通过 RowTypeInfo 显式定义数据库返回的数据结构。

5. JDBC Connector 的配置选项

5.1 批量写入配置

通过 JdbcExecutionOptions 可调整写入策略:

  • withBatchSize(int):设置批量写入大小(默认为 500)。
  • withBatchIntervalMs(long):设置批量写入的时间间隔。
  • withMaxRetries(int):设置写入失败后的最大重试次数。

5.2 数据库连接池

Flink JDBC Connector 默认使用单个连接执行操作。对于高并发需求,可以结合 HikariCP 等连接池框架优化性能。


6. 注意事项

  1. 事务支持

    • 默认情况下,JDBC Sink 使用批量提交,未显式开启事务。如果需要事务一致性,可以通过 JDBC 驱动自行管理事务。
  2. 数据库性能瓶颈

    • 数据库可能成为瓶颈,建议使用批量写入和合适的索引优化性能。
    • 高写入场景可考虑切换到 Kafka、HBase 等专为实时写入设计的存储系统。
  3. 错误处理

    • 可通过 withMaxRetries 设置重试次数。
    • 对于未能成功写入的数据,可考虑使用侧输出流保存以供后续处理。
  4. 分布式读取

    • 默认情况下,Flink JDBC Source 在单线程上运行,性能可能有限。可以使用分片或其他工具提升读取性能。

7. 总结

Flink JDBC Connector 是一个简单而高效的工具,适用于实时计算场景下与关系型数据库的交互。无论是数据写入还是读取,都可以通过简单配置快速实现。但对于高并发和大规模数据场景,需要根据业务需求调整策略。

相关文章:

Flink 中 JDBC Connector 使用详解

1. 背景 在实时计算或离线任务中&#xff0c;往往需要与关系型数据库交互&#xff0c;例如 MySQL、PostgreSQL 等。Apache Flink 提供了 JDBC Connector&#xff0c;可以方便地将流式数据写入或读取数据库。 本文将介绍 Flink JDBC Connector 的基础用法、配置方法以及注意事…...

【Linux打怪升级记 | 报错02】-bash: 警告:setlocale: LC_TIME: 无法改变区域选项 (zh_CN.UTF-8)

&#x1f5fa;️博客地图 &#x1f4cd;1、报错发现 &#x1f4cd;2、原因分析 &#x1f4cd;3、解决办法 &#x1f4cd;4、测试结果 1、报错发现 装好了CentOS操作系统&#xff0c;使用ssh远程登陆CentOS&#xff0c;出现如下告警信息&#xff1a; bash: 警告:setlocale…...

未来已来?AI技术革新改变我们的生活

在21世纪的今天&#xff0c;人工智能&#xff08;AI&#xff09;不再是一个遥远的概念&#xff0c;而是逐渐渗透到我们生活的方方面面。从智能家居到自动驾驶汽车&#xff0c;从个性化推荐系统到医疗诊断辅助&#xff0c;AI技术正在以惊人的速度发展&#xff0c;并深刻地影响着…...

【Linux】进程的生命之旅——诞生、消逝与守候(fork/exit/wait)

&#x1f3ac; 个人主页&#xff1a;谁在夜里看海. &#x1f4d6; 个人专栏&#xff1a;《C系列》《Linux系列》《算法系列》 ⛰️ 一念既出&#xff0c;万山无阻 目录 &#x1f4d6;一、进程创建 1.fork函数 &#x1f4da;高层封装特性 &#x1f4da;fork返回值 2.写时拷…...

使用vcpkg自动链接tinyxml2时莫名链接其他库(例如boost)

使用vcpkg自动链接tinyxml2时莫名链接其他库&#xff08;例如boost&#xff09; vcpkg的自动链接功能非常方便&#xff0c;但在某些情况下会出现过度链接的问题。 链接错误症状 以tinyxml2为例&#xff0c;程序中调用tinyxml2的函数后&#xff0c;若vcpkg中同时存在opencv和…...

【去毛刺】OpenCV图像处理基础:腐蚀与膨胀操作入门

在数字图像处理中&#xff0c;形态学操作是一种常用的技术&#xff0c;用于提取图像中的特定形状或特征。其中&#xff0c;腐蚀&#xff08;Erosion&#xff09;和膨胀&#xff08;Dilation&#xff09;是两种基本的形态学运算。本文将通过一个简单的例子来演示如何使用Python中…...

道可云人工智能元宇宙每日资讯|第三届京西地区发展论坛成功召开

道可云元宇宙每日简报&#xff08;2024年11月27日&#xff09;讯&#xff0c;今日元宇宙新鲜事有&#xff1a; 工信部等十二部门印发《5G规模化应用“扬帆”行动升级方案》 11月25日&#xff0c;工业和信息化部等十二部门印发《5G规模化应用“扬帆”行动升级方案》。《方案》…...

若依框架部署在网站一个子目录下(/admin)问题(

部署在子目录下首先修改vue.config.js文件&#xff1a; 问题一&#xff1a;登陆之后跳转到了404页面问题&#xff0c;解决办法如下&#xff1a; src/router/index.js 把404页面直接变成了首页&#xff08;大佬有啥优雅的解决办法求告知&#xff09; 问题二&#xff1a;退出登录…...

【ue5】UE5运行时下载视频/UE5 runtime download video(MP4)

插件还是老朋友。 节点的content type要打对。 &#xff08;参照表&#xff1a;MIME 类型&#xff08;MIME Type&#xff09;完整对照表 - 免费在线工具&#xff09; 结果展示&#xff1a;...

对比C++,Rust在内存安全上做的努力

简介 近年来&#xff0c;越来越多的组织表示&#xff0c;如果新项目在技术选型时需要使用系统级开发语言&#xff0c;那么不要选择使用C/C这种内存不安全的系统语言&#xff0c;推荐使用内存安全的Rust作为替代。 谷歌也声称&#xff0c;Android 的安全漏洞&#xff0c;从 20…...

如何利用 Qt 的模块化架构组织大型项目

目录 1. 大型项目的架构设计 1.1 分层架构 1.2 事件驱动与异步架构 2. 模块划分与职责分离 2.1 功能模块划分 2.2 模块之间的依赖管理 3. 跨平台开发与模块复用 在大型软件项目中&#xff0c;随着代码量的增加和功能的扩展&#xff0c;项目的复杂度会显著提升。没有良好…...

探索Python词云库WordCloud的奥秘

文章目录 探索Python词云库WordCloud的奥秘1. 背景介绍&#xff1a;为何选择WordCloud&#xff1f;2. WordCloud库简介3. 安装WordCloud库4. 简单函数使用方法5. 应用场景示例6. 常见Bug及解决方案7. 总结 探索Python词云库WordCloud的奥秘 1. 背景介绍&#xff1a;为何选择Wo…...

MySQL根据idb文件恢复数据

首先得有对应表的idb文件以及建表语句 1.首先在新数据库建表 CREATE TABLE sys_menu (id bigint(20) NOT NULL,parent_id bigint(20) NULL DEFAULT NULL,name varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,type int(11) NULL DEFAULT …...

hadoop-mapreduce词频统计

一、Map Reduce主要阶段 二、词频统计示例 0.MapReduce 词频统计(Word Count)示例图 1. Input 阶段&#xff08;输入阶段&#xff09; 输入数据是一段文本&#xff0c;如下&#xff1a; Hadoop is a big data framework. Hadoop can store vast data. Hadoop processes big …...

精心修炼Java并发编程(JUC)-volatile与synchronized关键字

volatile volatile 是 JVM 提供的 最轻量级的同步机制&#xff0c;中文意思是不稳定的&#xff0c;易变的&#xff0c;用 volatile 修饰变量是为了保证变量在多线程中的可见性&#xff0c;它表达的含义是&#xff1a;告诉编译器&#xff0c;对这个变量的读写&#xff0c;需要基…...

【ROS2】ROS2 与 ROS1 编码方式对比(Python实现)

目录 一、初始化和关闭节点二、发布者三、订阅者四、服务端五、客户端六、参数管理七、日志记录八、生命周期管理 ROS2 在 Python 编程中引入了一些新的概念和 API&#xff0c;这些变化使得代码更加模块化和易于维护。特别是 rclpy 库提供了更丰富的功能和更好的错误处理机制&a…...

ElasticSearch的下载和基本使用(通过apifox)

1.概述 一个开源的高扩展的分布式全文检索引擎&#xff0c;近乎实时的存储&#xff0c;检索数据 2.安装路径 Elasticsearch 7.8.0 | Elastic 安装后启动elasticsearch-7.8.0\bin里的elasticsearch.bat文件&#xff0c; 启动后就可以访问本地的es库http://localhost:9200/ …...

城市轨道交通运营控制指挥中心设计方案

为某城市轨道交通运营控制指挥中心(OCC)的设计提供方案时,我们需要考虑到多个方面的需求,包括系统架构、设备选择、功能实现、数据流与监控、通信管理等。以下是一个综合性的设计方案,涉及系统硬件和软件的选择、布局规划、安全性等方面,以确保指挥中心的高效运作、实时监…...

多目标优化算法:多目标河马优化算法(MOHOA)求解ZDT1、ZDT2、ZDT3、ZDT4、ZDT6,提供完整MATLAB代码

一、河马优化算法 河马优化算法&#xff08;Hippopotamus optimization algorithm&#xff0c;HO&#xff09;由Amiri等人于2024年提出的一种模拟自然界中河马觅食行为的新型群体智能优化算法。该算法由Mohammad Hussein Amiri等人于2024年2月发表在Nature旗下子刊《Scientifi…...

线程与进程的个人理解

进程&#xff08;Process&#xff09;&#xff1a; 一个程序在执行时&#xff0c;操作系统为其分配的资源&#xff08;如内存、CPU 时间等&#xff09;构成了一个进程。每个进程都有自己的独立的地址空间、堆栈和局部变量&#xff0c;它们之间不共享内存&#xff08;除非通过特…...

vscode的项目给gitlab上传

目录 一.创建gitlab帐号 二.在gitlab创建项目仓库 三.Windows电脑安装Git 四.vscode项目git上传 一.创建gitlab帐号 二.在gitlab创建项目仓库 图来自:Git-Gitlab中如何创建项目、创建Repository、以及如何删除项目_gitlab新建项目-CSDN博客&#xff09; 三.Windows电脑安…...

企业微信定位打卡

废话少说&#xff1a;定位修改软件链接奉上 一、定位打卡原理 GPS定位&#xff1a;企业微信可以利用手机的GPS功能进行定位&#xff0c;这是一种基于卫星的定位技术&#xff0c;能够提供相对精确的位置信息&#xff0c;通常精确度在20米以内。这种方式耗电较大&#xff0c;且在…...

libaom 源码分析:码率控制介绍

码率控制 命令行码率控制选项:可以看到码率控制包括丢帧、resize、超分、码控模式、目标码率、目标上限下限(类似 x264、x265 中的 VBV)、码控偏置、GOP 码率等。Rate Control Options:--drop-frame=<arg> Temporal resampling threshold (buf %)--resize-mo…...

RK3568平台开发系列讲解(DMA篇)DMA engine使用

🚀返回专栏总目录 文章目录 一、申请DMA channel二、配置DMA channel的参数三、获取传输描述(tx descriptor)四、启动传输沉淀、分享、成长,让自己和他人都能有所收获!😄 📢DMA子系统下有一个帮助测试的测试驱动(drivers/dma/dmatest.c), 从这个测试驱动入手我们了解…...

C++中的函数对象

C 中函数对象的定义和特点 定义&#xff1a;函数对象&#xff08;Function Object&#xff09;也叫仿函数&#xff08;Functor&#xff09;&#xff0c;是一个类&#xff0c;这个类重载了函数调用运算符()。当创建这个类的对象后&#xff0c;可以像使用函数一样使用这个对象&am…...

Linux指标之平均负载(The Average load of Linux Metrics)

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:Linux运维老纪的首页…...

盛最多水的容器

本节将数组与坐标轴共同组成一个容器,通过改变容器的两个端点使容器装的水最多,容器两个端点不断移动可以通过左右指针算法解决. 问题描述: 给定两个非负整数k1,k2...km每个数代表坐标中的一个点(i,ki).在坐标内绘制m条垂线,垂直线i的两个端点分别为(i,k1)和(i,0)找出其中的两…...

光伏功率预测!Transformer-LSTM、Transformer、CNN-LSTM、LSTM、CNN五模型时序预测

目录 预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Transformer-LSTM、Transformer、CNN-LSTM、LSTM、CNN五模型多变量时序光伏功率预测 (Matlab2023b 多输入单输出) 1.程序已经调试好&#xff0c;替换数据集后&#xff0c;仅运行一个main即可运行&#xff0c;数据格式…...

java全栈day10--后端Web基础(基础知识)

引言&#xff1a;只要能通过浏览器访问的网站全是B/S架构&#xff0c;其中最常用的服务器就是Tomcat 在浏览器与服务器交互的时候采用的协议是HTTP协议 一、Tomcat服务器 1.1介绍 官网地址&#xff1a;Apache Tomcat - Welcome! 1.2基本使用(网上有安装教程&#xff0c;建议…...

使用爬虫时,如何确保数据的准确性?

在数字化时代&#xff0c;数据的准确性对于决策和分析至关重要。本文将探讨如何在使用Python爬虫时确保数据的准确性&#xff0c;并提供代码示例。 1. 数据清洗 数据清洗是确保数据准确性的首要步骤。在爬取数据后&#xff0c;需要对数据进行清洗&#xff0c;去除重复、无效和…...

黄岛网站建设哪家好/百度网页排名怎么提升

public class HomeController : Controller{// GET: Homepublic ActionResult Index() //控制器名Home下默认的一个方法{return View();//返回视图}[HttpPost]//表单提交是post请求&#xff0c;定义该方法调用的类型位只能post请求public ActionResult Update(){ViewBag.test …...

wordpress pdf杂志/网络视频营销平台

系统&#xff1a;Centos7.4 安装pyenv是为了更好的管理python的版本。 在进行安装操作之前&#xff0c;首先使用普通用户test,进行操作&#xff0c;如下&#xff1a; #安装之前先安装依赖的库 [testlocalhost ~]$ sudo yum -y install git gcc* make patch gdbm-devel openssl-…...

可以做直播卖产品的网站/360指数官网

题目大意&#xff1a;给出nnn个不同的动物&#xff0c;它们围成一个圈&#xff0c;t[i]t[i]t[i]表示第iii个动物的种类&#xff0c;你要给这些动物染色&#xff0c;使得没有相邻的不同种类的动物被染成了相同的颜色&#xff0c;同一种类的动物可以被染成不同颜色。请输出最少需…...

简单网站系统/百度推广账户登录

PHP 字符串变量一个字符串(string)就是由一系列的字符组成&#xff0c;其中每个字符等同于一个字节。字符串变量用于存储并处理文本。PHP 中的字符串变量字符串变量用于包含有字符的值。在创建字符串之后&#xff0c;我们就可以对它进行操作了。您可以直接在函数中使用字符串&a…...

南海网站建设/苏州seo网站公司

NginxLuaOpenResty安装1&#xff0c;安装依赖yum install libreadline-dev libncurses5-dev libpcre3-dev libssl-dev perl2,下载ngx_openresty-xxx.tar.gz并解压(ngx_openresty-xxx/bundle目录里存放着nginx核心和很多第三方模块&#xff0c;比如有我们需要的Lua和LuaJIT。)wg…...

南京网站公安备案/天津seo推广

从JDk1.2版本开始&#xff0c;把对象的引用分为四种级别&#xff0c;从而使程序能更加灵活的控制对象的生命周期。这四种级别由高到低依次为&#xff1a;强引用、软引用、弱引用、虚引用。1、强引用(StrongReference)如果一个对象具有强引用&#xff0c;那就类似于必不可少的生…...