【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作
ClickHouse官网文档
Flink 读取 ClickHouse 数据两种驱动
- ClickHouse 官方提供Clickhouse JDBC.【建议使用】
- 第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver
ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护
ClickHouse 官方提供Clickhouse JDBC的包名:
com.clickhouse.jdbc.*有些版本com.clickhouse.jdbc.* 包含了 ru.yandex.clickhouse.ClickHouseDriver.
因此加载包的时候一定要注意导入的包名
引入依赖
<!-- clickhouse jdbc driver --><dependency><groupId>com.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId></dependency>
使用的是 0.3 这个版本,该版本就包含上述3方CH jdbc包
<!-- CH JDBC版本推荐使用 0.3, 0.4的版本是要 JDK 17 --><clickhouse-jdbc.version>0.3.2-patch11</clickhouse-jdbc.version>
自定义Source
测试表映射实体类,该表仅有一个name字段
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CHTestPO {private String name;}
Flink Clickhouse Source
public class ClickHouseSource implements SourceFunction<CHTestPO> {private final String URL;private final String SQL;public ClickHouseSource(String URL, String SQL) {this.URL = URL;this.SQL = SQL;}@Overridepublic void run(SourceContext<CHTestPO> output) throws Exception {// Properties是持久化的属性集 Properties的key和value都是字符串Properties properties = new Properties();ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(URL, properties);// 使用 try-with-resource 方式关闭JDBC连接 无需手动关闭try (ClickHouseConnection conn = clickHouseDataSource.getConnection()) {// clickhouse 通过游标的方式读取数据Statement stmt = conn.createStatement();ResultSet rs = stmt.executeQuery(SQL);while (rs.next()) {String name = rs.getString(1);output.collect(new CHTestPO(name));}}}@Overridepublic void cancel() {}
}
自定义Sink
需额外引入依赖
<!-- Flink-Connector-Jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId></dependency>
Java 对sql语句处理的两个对象
- PreparedStatement对象:能够对预编译之后的sql语句进行处理【SQL 语句预编译:通过
占位符'?'实现,可以防止sql注入】 - Statement对象:只能对静态的sql语句进行处理
核心代码
/*** 使用 Flink-jdbc-connector + 批量写入 + sql语句的预编译 写入 Clickhouse*/
public class ClickHouseJdbcSink<T> {private final SinkFunction<T> sink;private final static String NA = "null";public ClickHouseJdbcSink(String sql, int batchSize, String url) {sink = JdbcSink.sink(sql,// 对sql语句进行预编译new ClickHouseJdbcStatementBuilder<T>(),// 设置批量插入数据new JdbcExecutionOptions.Builder().withBatchSize(batchSize).build(),// 设置ClickHouse连接配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(url).build());}public SinkFunction<T> getSink() {return this.sink;}/*** 对预编译之后的sql语句进行占位符替换** @param ps: PreparedStatement对象 下标从 1 开始* @param fields: clickhouse表PO对象的属性字段* @param object: clickhouse表PO对象的属性字段所对应的数据类型*/public static void setPreparedStatement(PreparedStatement ps,Field[] fields,Object object) throws IllegalAccessException, SQLException {// 遍历 Field[]for (int i = 1; i <= fields.length; i++) {// 取出每个Field实例Field field = fields[i - 1];// 指示反射的对象在使用时应该取消 Java 语言访问检查field.setAccessible(true);// 通过Field实例的get方法返回指定的对象Object o = field.get(object);if (o == null) {ps.setNull(i, 0);continue;}// 这里统一设为字符型String fieldValue = o.toString();// 变量和常量的比较,通常将常量放前,可以避免空指针if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {// 替换对应位置的占位符ps.setObject(i, fieldValue);} else {ps.setNull(i, 0);}}}}
对sql语句进行预编译
@Slf4j
public class ClickHouseJdbcStatementBuilder<T> implements JdbcStatementBuilder<T> {@Overridepublic void accept(PreparedStatement preparedStatement, T t) throws SQLException {/* *********************** Java通过反射获取类的字段:** 1. getDeclaredFields():获取所有的字段,不会获取父类的字段* 2. getFields(): 只能会public字段,获取包含父类的字段** *********************/Field[] fields = t.getClass().getDeclaredFields();// 将获取到的字段替换sql预编译之后的占位符。try {ClickHouseJdbcSink.setPreparedStatement(preparedStatement, fields, t);} catch (IllegalAccessException e) {log.error("sql 预编译失败", e);e.printStackTrace();}}
}
ClickHouse读写工具类

public class ClickHouseUtil {private static final String URL;static {ParameterTool parameterTool = ParameterUtil.getParameters();URL = parameterTool.get("clickhouse.url");}/*** 读取clickhouse*/public static DataStream<CHTestPO> read(StreamExecutionEnvironment env, String sql) {return env.addSource(new ClickHouseSource(URL, sql));}/*** 批量写入ClickHouse*/public static <T> DataStreamSink<T> batchWrite(DataStream<T> dataStream,String sql,int batchSize) {//生成 SinkFunctionClickHouseJdbcSink<T> clickHouseJdbcSink =new ClickHouseJdbcSink<T>(sql, batchSize, URL);return dataStream.addSink(clickHouseJdbcSink.getSink());}}
测试一下
public class ClickHouseUtilTest {@DisplayName("测试Flink+jdbc+游标读取Clickhouse")@Testvoid testRead() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 从default数据库的user表中读取数据String sql = "select * from default.user";DataStream<CHTestPO> ds = ClickHouseUtil.read(env, sql);// 打印数据流中的元素ds.print("clickhouse");// 执行程序env.execute();}@DisplayName("测试Flink-Connector-jdbc+预编译批量写入Clickhouse")@Testvoid testBatchWrite() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);CHTestPO po = new CHTestPO();po.setName("Lucy");CHTestPO po1 = new CHTestPO();po1.setName("Jack");DataStream<CHTestPO> ds = env.fromCollection(Arrays.asList(po, po1));// 定义将数据写入ClickHouse数据库的SQL语句String sql = "insert into default.user(name) values(?)";// 调用ClickHouseUtil的batchWrite方法将数据流ds中的数据批量写入ClickHouse数据库ClickHouseUtil.batchWrite(ds, sql, 2);// 执行程序env.execute();}
}
此时表中仅一行记录

读取没有问题!

写入没有问题!

相关文章:
【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作
ClickHouse官网文档 Flink 读取 ClickHouse 数据两种驱动 ClickHouse 官方提供Clickhouse JDBC.【建议使用】第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护 ClickHouse 官方提供Clickhouse JDBC…...
linux 查看服务启动时间
文章目录 linux 查看服务启动时间参数解析 linux 查看服务启动时间 [root104 ~]# ps -o lstart -p ps -ef |grep -v grep |grep "zookeeper"|awk {print$2}STARTED Fri Dec 15 16:54:10 2023参数解析 linux 命令中 ps -ef 详解 ps -ef表示查看全格式的进程。 ps …...
[RK-Linux] 移植Linux-5.10到RK3399(六)| 检查GMAC(RTL8211F)配置使能千兆以太网
ROC-RK3399-PC Pro 使用 RTL8211F PHY 芯片作为以太网收发器。 RTL8211F是一种高性能的千兆以太网物理层收发器(PHY),广泛用于台式机、笔记本电脑、网络交换机等设备中。主要特点: 采用低功耗28nm CMOS技术,功耗低。支持千兆速率(10/100/1000Mbps)。支持全双工和半双工…...
博途WinCC专业版C/S架构入门指南
WinCC Professional V16 支持客户机/服务器架构,但目前只支持单个服务器或单对冗余服务器/多个客户机的模式,还不能支持像WinCC V7.5 SP1中的多个服务器/多个客户机的分布式架构。 博途工控人平时在哪里技术交流博途工控人社群 博途工控人平时在哪里技…...
大数据生态圈kafka在物联网中的应用测试
背景 由物联网项目中使用到了Tbox应用管理车辆,在上报数据的过程中,需要将终端产生的数据通过kafka的produce topic customer对数据进行处理后,放置到mysql中。完成数据二进制到json转换工作。 Kafka的使用 查看kafka的topic ./kafka-topi…...
ChatGPT使用:一个发包机器人的提示词
发包机器人: 设想:目前项目组有n条打包线会输出多个包,用户想获取最新的包是比较困难的,难点在于 1. 分支多:trunk,release,outer等,至少有3个分支; 2. 多平台&#x…...
Axure元件库的使用
1.基本元件库 1.1Axure的画布范围 Axure是一个绘制项目原型图的软件,它里面的基本原件有: 1.1元件的呈现范围 首先我们要了解基本元件的作用范围在哪里? 浏览效果: 可以看出当我们的基本元件放在画布区域内是可以完全呈现出来…...
Unity中Shader URP最简Shader框架(整理总结篇)
文章目录 前言一、精简 ShaderGraph 所有冗余代码后的最简 URP Shader二、我们来对比一下 URP Shader 与 BuildInRP Shader 的对应关系 与 区别1、"RenderPipeline""UniversalPipeline"2、面片剔除、深度测试、深度写入、颜色混合 和 BRP 下一致3、必须引入…...
AT32F435飞控之DIATONE MAMBA MK5 F435 Anti-Interference
AT32F435飞控之DIATONE MAMBA MK5 F435 Anti-Interference 1. 源由2. 规格3. 分析3.1 喜欢3.2 不便3.3 建议 4. 总结5. 参考资料 1. 源由 AT32 F435飞控在xFlight开源飞控之AT32F435计划一文中已经大体阐述了一些移植历史。 之前整体上看,就是航模飞控新MCU的移植…...
ntp时间同步配置中 server、pool和peer的区别
在 NTP(Network Time Protocol)的配置中,server、pool 和 peer 是用于指定时间同步关系的关键字,它们在角色和行为上有一些区别。 server: server 关键字用于指定一个或多个 NTP 服务器,这些服务器将提供时…...
JMeter安装RabbitMQ测试插件
整体流程如下:先下载AMQP插件源码,可以通过antivy在本地编译成jar包,再将jar包导入JMeter目录下,重启JMeter生效。 Apache Ant 是一个基于 Java 的构建工具。Ant 可用于自动化构建和部署 Java 应用程序,使开发人员更轻…...
基于ssm日用品网站设计论文
摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本日用品网站就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信息&…...
coco数据集格式的RandomCrop
transforms.py文件的改进 添加 RandomCrop 函数 class RandomCrop(object):"""随机裁剪图像以及bboxes"""def __init__(self, output_size):self.output_size output_sizedef __call__(self, image, target):height, width image.shape[-2:]…...
机器学习-KL散度的直观理解+代码
KL散度 直观理解:KL散度是一种衡量两个分布之间匹配程度的方法。通常在概率和统计中,我们会用更简单的近似分布来代替观察到的数据或复杂的分布,KL散度帮我们衡量在选择近似值时损失了多少信息。 在信息论或概率论中,KL散度&#…...
【教程】制作 iOS 推送证书
目录 证书类型 MAC Key Store 消息推送控制台 制作证书 创建苹果 App ID 使用appuploder制作 .p12文件 创建证书 如需向 iOS 设备推送数据,您首先需要在消息推送控制台上配置 iOS 推送证书。iOS 推送证书用于推送通知,本文将介绍消息推送服务支…...
ToolLLM model 以及LangChain AutoGPT Xagent在调用外部工具Tools的表现对比浅析
文章主要谈及主流ToolLLM 以及高口碑Agent 在调用Tools上的一些对比,框架先上,内容会不断丰富与更新。 第一部分,ToolLLM model 先来说主打Function Call 的大模型们 OpenAI GPT 宇宙第一LLM,它的functionCall都知道࿰…...
【MySQL学习之基础篇】约束
文章目录 1. 概述2. 基础约束3. 外键约束3.1. 介绍3.2. 外键的添加3.3. 外键删除和更新行为 1. 概述 概念: 约束是作用于表中字段上的规则,用于限制存储在表中的数据。 目的: 保证数据库中数据的正确、有效性和完整性。 分类&#x…...
【DataSophon】大数据管理平台DataSophon-1.2.1基本使用
🦄 个人主页——🎐开着拖拉机回家_Linux,大数据运维-CSDN博客 🎐✨🍁 🪁🍁🪁🍁🪁🍁🪁🍁 🪁🍁🪁&am…...
基于redisson实现发布订阅(多服务间用避坑)
前言 今天要分享的是基于Redisson实现信息发布与订阅(以前分享过直接基于redis的实现),如果你是在多服务间基于redisson做信息传递,并且有服务压根就收不到信息,那你一定要看完。 今天其实重点是避坑࿰…...
Java 源码、反码、补码 位运算
文章目录 1. 源码、反码、补码1.1 原码1.2 反码1.3 补码1.4 byte的最大值1.5 byte的最小值 2. 位运算2.1 & 与2.2 | 或2.3 ~ 非2.4 ^ 异或2.5 << 左移 (没有无符号左移)2.6 >> 右移 (有符号右移)2.7 >>>…...
突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合
强化学习(Reinforcement Learning, RL)是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程,然后使用强化学习的Actor-Critic机制(中文译作“知行互动”机制),逐步迭代求解…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
【单片机期末】单片机系统设计
主要内容:系统状态机,系统时基,系统需求分析,系统构建,系统状态流图 一、题目要求 二、绘制系统状态流图 题目:根据上述描述绘制系统状态流图,注明状态转移条件及方向。 三、利用定时器产生时…...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...
宇树科技,改名了!
提到国内具身智能和机器人领域的代表企业,那宇树科技(Unitree)必须名列其榜。 最近,宇树科技的一项新变动消息在业界引发了不少关注和讨论,即: 宇树向其合作伙伴发布了一封公司名称变更函称,因…...
(一)单例模式
一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...
