【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作
ClickHouse官网文档
Flink 读取 ClickHouse 数据两种驱动
- ClickHouse 官方提供Clickhouse JDBC.【建议使用】
- 第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver
ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护
ClickHouse 官方提供Clickhouse JDBC的包名:
com.clickhouse.jdbc.*
有些版本com.clickhouse.jdbc.* 包含了 ru.yandex.clickhouse.ClickHouseDriver.
因此加载包的时候一定要注意导入的包名
引入依赖
<!-- clickhouse jdbc driver --><dependency><groupId>com.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId></dependency>
使用的是 0.3 这个版本,该版本就包含上述3方CH jdbc包
<!-- CH JDBC版本推荐使用 0.3, 0.4的版本是要 JDK 17 --><clickhouse-jdbc.version>0.3.2-patch11</clickhouse-jdbc.version>
自定义Source
测试表映射实体类,该表仅有一个name字段
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CHTestPO {private String name;}
Flink Clickhouse Source
public class ClickHouseSource implements SourceFunction<CHTestPO> {private final String URL;private final String SQL;public ClickHouseSource(String URL, String SQL) {this.URL = URL;this.SQL = SQL;}@Overridepublic void run(SourceContext<CHTestPO> output) throws Exception {// Properties是持久化的属性集 Properties的key和value都是字符串Properties properties = new Properties();ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(URL, properties);// 使用 try-with-resource 方式关闭JDBC连接 无需手动关闭try (ClickHouseConnection conn = clickHouseDataSource.getConnection()) {// clickhouse 通过游标的方式读取数据Statement stmt = conn.createStatement();ResultSet rs = stmt.executeQuery(SQL);while (rs.next()) {String name = rs.getString(1);output.collect(new CHTestPO(name));}}}@Overridepublic void cancel() {}
}
自定义Sink
需额外引入依赖
<!-- Flink-Connector-Jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId></dependency>
Java 对sql语句处理的两个对象
- PreparedStatement对象:能够对预编译之后的sql语句进行处理【SQL 语句预编译:通过
占位符'?'
实现,可以防止sql注入】 - Statement对象:只能对静态的sql语句进行处理
核心代码
/*** 使用 Flink-jdbc-connector + 批量写入 + sql语句的预编译 写入 Clickhouse*/
public class ClickHouseJdbcSink<T> {private final SinkFunction<T> sink;private final static String NA = "null";public ClickHouseJdbcSink(String sql, int batchSize, String url) {sink = JdbcSink.sink(sql,// 对sql语句进行预编译new ClickHouseJdbcStatementBuilder<T>(),// 设置批量插入数据new JdbcExecutionOptions.Builder().withBatchSize(batchSize).build(),// 设置ClickHouse连接配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(url).build());}public SinkFunction<T> getSink() {return this.sink;}/*** 对预编译之后的sql语句进行占位符替换** @param ps: PreparedStatement对象 下标从 1 开始* @param fields: clickhouse表PO对象的属性字段* @param object: clickhouse表PO对象的属性字段所对应的数据类型*/public static void setPreparedStatement(PreparedStatement ps,Field[] fields,Object object) throws IllegalAccessException, SQLException {// 遍历 Field[]for (int i = 1; i <= fields.length; i++) {// 取出每个Field实例Field field = fields[i - 1];// 指示反射的对象在使用时应该取消 Java 语言访问检查field.setAccessible(true);// 通过Field实例的get方法返回指定的对象Object o = field.get(object);if (o == null) {ps.setNull(i, 0);continue;}// 这里统一设为字符型String fieldValue = o.toString();// 变量和常量的比较,通常将常量放前,可以避免空指针if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {// 替换对应位置的占位符ps.setObject(i, fieldValue);} else {ps.setNull(i, 0);}}}}
对sql语句进行预编译
@Slf4j
public class ClickHouseJdbcStatementBuilder<T> implements JdbcStatementBuilder<T> {@Overridepublic void accept(PreparedStatement preparedStatement, T t) throws SQLException {/* *********************** Java通过反射获取类的字段:** 1. getDeclaredFields():获取所有的字段,不会获取父类的字段* 2. getFields(): 只能会public字段,获取包含父类的字段** *********************/Field[] fields = t.getClass().getDeclaredFields();// 将获取到的字段替换sql预编译之后的占位符。try {ClickHouseJdbcSink.setPreparedStatement(preparedStatement, fields, t);} catch (IllegalAccessException e) {log.error("sql 预编译失败", e);e.printStackTrace();}}
}
ClickHouse读写工具类
public class ClickHouseUtil {private static final String URL;static {ParameterTool parameterTool = ParameterUtil.getParameters();URL = parameterTool.get("clickhouse.url");}/*** 读取clickhouse*/public static DataStream<CHTestPO> read(StreamExecutionEnvironment env, String sql) {return env.addSource(new ClickHouseSource(URL, sql));}/*** 批量写入ClickHouse*/public static <T> DataStreamSink<T> batchWrite(DataStream<T> dataStream,String sql,int batchSize) {//生成 SinkFunctionClickHouseJdbcSink<T> clickHouseJdbcSink =new ClickHouseJdbcSink<T>(sql, batchSize, URL);return dataStream.addSink(clickHouseJdbcSink.getSink());}}
测试一下
public class ClickHouseUtilTest {@DisplayName("测试Flink+jdbc+游标读取Clickhouse")@Testvoid testRead() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 从default数据库的user表中读取数据String sql = "select * from default.user";DataStream<CHTestPO> ds = ClickHouseUtil.read(env, sql);// 打印数据流中的元素ds.print("clickhouse");// 执行程序env.execute();}@DisplayName("测试Flink-Connector-jdbc+预编译批量写入Clickhouse")@Testvoid testBatchWrite() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);CHTestPO po = new CHTestPO();po.setName("Lucy");CHTestPO po1 = new CHTestPO();po1.setName("Jack");DataStream<CHTestPO> ds = env.fromCollection(Arrays.asList(po, po1));// 定义将数据写入ClickHouse数据库的SQL语句String sql = "insert into default.user(name) values(?)";// 调用ClickHouseUtil的batchWrite方法将数据流ds中的数据批量写入ClickHouse数据库ClickHouseUtil.batchWrite(ds, sql, 2);// 执行程序env.execute();}
}
此时表中仅一行记录
读取没有问题!
写入没有问题!
相关文章:

【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作
ClickHouse官网文档 Flink 读取 ClickHouse 数据两种驱动 ClickHouse 官方提供Clickhouse JDBC.【建议使用】第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护 ClickHouse 官方提供Clickhouse JDBC…...

linux 查看服务启动时间
文章目录 linux 查看服务启动时间参数解析 linux 查看服务启动时间 [root104 ~]# ps -o lstart -p ps -ef |grep -v grep |grep "zookeeper"|awk {print$2}STARTED Fri Dec 15 16:54:10 2023参数解析 linux 命令中 ps -ef 详解 ps -ef表示查看全格式的进程。 ps …...
[RK-Linux] 移植Linux-5.10到RK3399(六)| 检查GMAC(RTL8211F)配置使能千兆以太网
ROC-RK3399-PC Pro 使用 RTL8211F PHY 芯片作为以太网收发器。 RTL8211F是一种高性能的千兆以太网物理层收发器(PHY),广泛用于台式机、笔记本电脑、网络交换机等设备中。主要特点: 采用低功耗28nm CMOS技术,功耗低。支持千兆速率(10/100/1000Mbps)。支持全双工和半双工…...

博途WinCC专业版C/S架构入门指南
WinCC Professional V16 支持客户机/服务器架构,但目前只支持单个服务器或单对冗余服务器/多个客户机的模式,还不能支持像WinCC V7.5 SP1中的多个服务器/多个客户机的分布式架构。 博途工控人平时在哪里技术交流博途工控人社群 博途工控人平时在哪里技…...

大数据生态圈kafka在物联网中的应用测试
背景 由物联网项目中使用到了Tbox应用管理车辆,在上报数据的过程中,需要将终端产生的数据通过kafka的produce topic customer对数据进行处理后,放置到mysql中。完成数据二进制到json转换工作。 Kafka的使用 查看kafka的topic ./kafka-topi…...

ChatGPT使用:一个发包机器人的提示词
发包机器人: 设想:目前项目组有n条打包线会输出多个包,用户想获取最新的包是比较困难的,难点在于 1. 分支多:trunk,release,outer等,至少有3个分支; 2. 多平台&#x…...

Axure元件库的使用
1.基本元件库 1.1Axure的画布范围 Axure是一个绘制项目原型图的软件,它里面的基本原件有: 1.1元件的呈现范围 首先我们要了解基本元件的作用范围在哪里? 浏览效果: 可以看出当我们的基本元件放在画布区域内是可以完全呈现出来…...

Unity中Shader URP最简Shader框架(整理总结篇)
文章目录 前言一、精简 ShaderGraph 所有冗余代码后的最简 URP Shader二、我们来对比一下 URP Shader 与 BuildInRP Shader 的对应关系 与 区别1、"RenderPipeline""UniversalPipeline"2、面片剔除、深度测试、深度写入、颜色混合 和 BRP 下一致3、必须引入…...

AT32F435飞控之DIATONE MAMBA MK5 F435 Anti-Interference
AT32F435飞控之DIATONE MAMBA MK5 F435 Anti-Interference 1. 源由2. 规格3. 分析3.1 喜欢3.2 不便3.3 建议 4. 总结5. 参考资料 1. 源由 AT32 F435飞控在xFlight开源飞控之AT32F435计划一文中已经大体阐述了一些移植历史。 之前整体上看,就是航模飞控新MCU的移植…...
ntp时间同步配置中 server、pool和peer的区别
在 NTP(Network Time Protocol)的配置中,server、pool 和 peer 是用于指定时间同步关系的关键字,它们在角色和行为上有一些区别。 server: server 关键字用于指定一个或多个 NTP 服务器,这些服务器将提供时…...

JMeter安装RabbitMQ测试插件
整体流程如下:先下载AMQP插件源码,可以通过antivy在本地编译成jar包,再将jar包导入JMeter目录下,重启JMeter生效。 Apache Ant 是一个基于 Java 的构建工具。Ant 可用于自动化构建和部署 Java 应用程序,使开发人员更轻…...

基于ssm日用品网站设计论文
摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本日用品网站就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信息&…...
coco数据集格式的RandomCrop
transforms.py文件的改进 添加 RandomCrop 函数 class RandomCrop(object):"""随机裁剪图像以及bboxes"""def __init__(self, output_size):self.output_size output_sizedef __call__(self, image, target):height, width image.shape[-2:]…...

机器学习-KL散度的直观理解+代码
KL散度 直观理解:KL散度是一种衡量两个分布之间匹配程度的方法。通常在概率和统计中,我们会用更简单的近似分布来代替观察到的数据或复杂的分布,KL散度帮我们衡量在选择近似值时损失了多少信息。 在信息论或概率论中,KL散度&#…...

【教程】制作 iOS 推送证书
目录 证书类型 MAC Key Store 消息推送控制台 制作证书 创建苹果 App ID 使用appuploder制作 .p12文件 创建证书 如需向 iOS 设备推送数据,您首先需要在消息推送控制台上配置 iOS 推送证书。iOS 推送证书用于推送通知,本文将介绍消息推送服务支…...

ToolLLM model 以及LangChain AutoGPT Xagent在调用外部工具Tools的表现对比浅析
文章主要谈及主流ToolLLM 以及高口碑Agent 在调用Tools上的一些对比,框架先上,内容会不断丰富与更新。 第一部分,ToolLLM model 先来说主打Function Call 的大模型们 OpenAI GPT 宇宙第一LLM,它的functionCall都知道࿰…...

【MySQL学习之基础篇】约束
文章目录 1. 概述2. 基础约束3. 外键约束3.1. 介绍3.2. 外键的添加3.3. 外键删除和更新行为 1. 概述 概念: 约束是作用于表中字段上的规则,用于限制存储在表中的数据。 目的: 保证数据库中数据的正确、有效性和完整性。 分类&#x…...

【DataSophon】大数据管理平台DataSophon-1.2.1基本使用
🦄 个人主页——🎐开着拖拉机回家_Linux,大数据运维-CSDN博客 🎐✨🍁 🪁🍁🪁🍁🪁🍁🪁🍁 🪁🍁🪁&am…...

基于redisson实现发布订阅(多服务间用避坑)
前言 今天要分享的是基于Redisson实现信息发布与订阅(以前分享过直接基于redis的实现),如果你是在多服务间基于redisson做信息传递,并且有服务压根就收不到信息,那你一定要看完。 今天其实重点是避坑࿰…...
Java 源码、反码、补码 位运算
文章目录 1. 源码、反码、补码1.1 原码1.2 反码1.3 补码1.4 byte的最大值1.5 byte的最小值 2. 位运算2.1 & 与2.2 | 或2.3 ~ 非2.4 ^ 异或2.5 << 左移 (没有无符号左移)2.6 >> 右移 (有符号右移)2.7 >>>…...
云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?
大家好,欢迎来到《云原生核心技术》系列的第七篇! 在上一篇,我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在,我们就像一个拥有了一块崭新数字土地的农场主,是时…...

微信小程序之bind和catch
这两个呢,都是绑定事件用的,具体使用有些小区别。 官方文档: 事件冒泡处理不同 bind:绑定的事件会向上冒泡,即触发当前组件的事件后,还会继续触发父组件的相同事件。例如,有一个子视图绑定了b…...
Oracle查询表空间大小
1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...

为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?
在建筑行业,项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升,传统的管理模式已经难以满足现代工程的需求。过去,许多企业依赖手工记录、口头沟通和分散的信息管理,导致效率低下、成本失控、风险频发。例如&#…...

DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战
“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...

Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...
Linux离线(zip方式)安装docker
目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1:修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本:CentOS 7 64位 内核版本:3.10.0 相关命令: uname -rcat /etc/os-rele…...