当前位置: 首页 > news >正文

【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作

ClickHouse官网文档

Flink 读取 ClickHouse 数据两种驱动

  1. ClickHouse 官方提供Clickhouse JDBC.【建议使用
  2. 第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver

ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护

ClickHouse 官方提供Clickhouse JDBC的包名:com.clickhouse.jdbc.*

有些版本com.clickhouse.jdbc.* 包含了 ru.yandex.clickhouse.ClickHouseDriver.

因此加载包的时候一定要注意导入的包名

引入依赖

        <!-- clickhouse jdbc driver --><dependency><groupId>com.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId></dependency>

使用的是 0.3 这个版本,该版本就包含上述3方CH jdbc包

     <!-- CH JDBC版本推荐使用 0.3, 0.4的版本是要 JDK 17 --><clickhouse-jdbc.version>0.3.2-patch11</clickhouse-jdbc.version>

自定义Source

测试表映射实体类,该表仅有一个name字段

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CHTestPO {private String name;}

Flink Clickhouse Source

public class ClickHouseSource implements SourceFunction<CHTestPO> {private final String URL;private final String SQL;public ClickHouseSource(String URL, String SQL) {this.URL = URL;this.SQL = SQL;}@Overridepublic void run(SourceContext<CHTestPO> output) throws Exception {//  Properties是持久化的属性集 Properties的key和value都是字符串Properties properties = new Properties();ClickHouseDataSource clickHouseDataSource = new ClickHouseDataSource(URL, properties);// 使用 try-with-resource 方式关闭JDBC连接 无需手动关闭try (ClickHouseConnection conn = clickHouseDataSource.getConnection()) {// clickhouse 通过游标的方式读取数据Statement stmt = conn.createStatement();ResultSet rs = stmt.executeQuery(SQL);while (rs.next()) {String name = rs.getString(1);output.collect(new CHTestPO(name));}}}@Overridepublic void cancel() {}
}

自定义Sink

需额外引入依赖

        <!-- Flink-Connector-Jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId></dependency>

Java 对sql语句处理的两个对象

  1. PreparedStatement对象:能够对预编译之后的sql语句进行处理【SQL 语句预编译:通过占位符'?'实现,可以防止sql注入】
  2. Statement对象:只能对静态的sql语句进行处理

核心代码

/*** 使用 Flink-jdbc-connector + 批量写入 + sql语句的预编译 写入 Clickhouse*/
public class ClickHouseJdbcSink<T> {private final SinkFunction<T> sink;private final static String NA = "null";public ClickHouseJdbcSink(String sql, int batchSize, String url) {sink = JdbcSink.sink(sql,// 对sql语句进行预编译new ClickHouseJdbcStatementBuilder<T>(),// 设置批量插入数据new JdbcExecutionOptions.Builder().withBatchSize(batchSize).build(),// 设置ClickHouse连接配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(url).build());}public SinkFunction<T> getSink() {return this.sink;}/*** 对预编译之后的sql语句进行占位符替换** @param ps:     PreparedStatement对象 下标从 1 开始* @param fields: clickhouse表PO对象的属性字段* @param object: clickhouse表PO对象的属性字段所对应的数据类型*/public static void setPreparedStatement(PreparedStatement ps,Field[] fields,Object object) throws IllegalAccessException, SQLException {// 遍历 Field[]for (int i = 1; i <= fields.length; i++) {// 取出每个Field实例Field field = fields[i - 1];// 指示反射的对象在使用时应该取消 Java 语言访问检查field.setAccessible(true);// 通过Field实例的get方法返回指定的对象Object o = field.get(object);if (o == null) {ps.setNull(i, 0);continue;}// 这里统一设为字符型String fieldValue = o.toString();// 变量和常量的比较,通常将常量放前,可以避免空指针if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {// 替换对应位置的占位符ps.setObject(i, fieldValue);} else {ps.setNull(i, 0);}}}}

对sql语句进行预编译

@Slf4j
public class ClickHouseJdbcStatementBuilder<T> implements JdbcStatementBuilder<T> {@Overridepublic void accept(PreparedStatement preparedStatement, T t) throws SQLException {/* *********************** Java通过反射获取类的字段:** 1. getDeclaredFields():获取所有的字段,不会获取父类的字段* 2. getFields(): 只能会public字段,获取包含父类的字段** *********************/Field[] fields = t.getClass().getDeclaredFields();// 将获取到的字段替换sql预编译之后的占位符。try {ClickHouseJdbcSink.setPreparedStatement(preparedStatement, fields, t);} catch (IllegalAccessException e) {log.error("sql 预编译失败", e);e.printStackTrace();}}
}

ClickHouse读写工具类

image-20231209233006017

public class ClickHouseUtil {private static final String URL;static {ParameterTool parameterTool = ParameterUtil.getParameters();URL = parameterTool.get("clickhouse.url");}/*** 读取clickhouse*/public static DataStream<CHTestPO> read(StreamExecutionEnvironment env, String sql) {return env.addSource(new ClickHouseSource(URL, sql));}/*** 批量写入ClickHouse*/public static <T> DataStreamSink<T> batchWrite(DataStream<T> dataStream,String sql,int batchSize) {//生成 SinkFunctionClickHouseJdbcSink<T> clickHouseJdbcSink =new ClickHouseJdbcSink<T>(sql, batchSize, URL);return dataStream.addSink(clickHouseJdbcSink.getSink());}}

测试一下

public class ClickHouseUtilTest {@DisplayName("测试Flink+jdbc+游标读取Clickhouse")@Testvoid testRead() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 从default数据库的user表中读取数据String sql = "select * from default.user";DataStream<CHTestPO> ds = ClickHouseUtil.read(env, sql);// 打印数据流中的元素ds.print("clickhouse");// 执行程序env.execute();}@DisplayName("测试Flink-Connector-jdbc+预编译批量写入Clickhouse")@Testvoid testBatchWrite() throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);CHTestPO po = new CHTestPO();po.setName("Lucy");CHTestPO po1 = new CHTestPO();po1.setName("Jack");DataStream<CHTestPO> ds = env.fromCollection(Arrays.asList(po, po1));// 定义将数据写入ClickHouse数据库的SQL语句String sql = "insert into default.user(name) values(?)";// 调用ClickHouseUtil的batchWrite方法将数据流ds中的数据批量写入ClickHouse数据库ClickHouseUtil.batchWrite(ds, sql, 2);// 执行程序env.execute();}
}

此时表中仅一行记录

image-20231209232619959

读取没有问题!

image-20231209232741522

写入没有问题!

image-20231209232902469

相关文章:

【自定义Source、Sink】Flink自定义Source、Sink对ClickHouse进行读和批量写操作

ClickHouse官网文档 Flink 读取 ClickHouse 数据两种驱动 ClickHouse 官方提供Clickhouse JDBC.【建议使用】第3方提供的Clickhouse JDBC. ru.yandex.clickhouse.ClickHouseDriver ru.yandex.clickhouse.ClickHouseDriver.现在是没有维护 ClickHouse 官方提供Clickhouse JDBC…...

linux 查看服务启动时间

文章目录 linux 查看服务启动时间参数解析 linux 查看服务启动时间 [root104 ~]# ps -o lstart -p ps -ef |grep -v grep |grep "zookeeper"|awk {print$2}STARTED Fri Dec 15 16:54:10 2023参数解析 linux 命令中 ps -ef 详解 ps -ef表示查看全格式的进程。 ps …...

[RK-Linux] 移植Linux-5.10到RK3399(六)| 检查GMAC(RTL8211F)配置使能千兆以太网

ROC-RK3399-PC Pro 使用 RTL8211F PHY 芯片作为以太网收发器。 RTL8211F是一种高性能的千兆以太网物理层收发器(PHY),广泛用于台式机、笔记本电脑、网络交换机等设备中。主要特点: 采用低功耗28nm CMOS技术,功耗低。支持千兆速率(10/100/1000Mbps)。支持全双工和半双工…...

博途WinCC专业版C/S架构入门指南

WinCC Professional V16 支持客户机/服务器架构&#xff0c;但目前只支持单个服务器或单对冗余服务器/多个客户机的模式&#xff0c;还不能支持像WinCC V7.5 SP1中的多个服务器/多个客户机的分布式架构。 博途工控人平时在哪里技术交流博途工控人社群 博途工控人平时在哪里技…...

大数据生态圈kafka在物联网中的应用测试

背景 由物联网项目中使用到了Tbox应用管理车辆&#xff0c;在上报数据的过程中&#xff0c;需要将终端产生的数据通过kafka的produce topic customer对数据进行处理后&#xff0c;放置到mysql中。完成数据二进制到json转换工作。 Kafka的使用 查看kafka的topic ./kafka-topi…...

ChatGPT使用:一个发包机器人的提示词

发包机器人&#xff1a; 设想&#xff1a;目前项目组有n条打包线会输出多个包&#xff0c;用户想获取最新的包是比较困难的&#xff0c;难点在于 1. 分支多&#xff1a;trunk&#xff0c;release&#xff0c;outer等&#xff0c;至少有3个分支&#xff1b; 2. 多平台&#x…...

Axure元件库的使用

1.基本元件库 1.1Axure的画布范围 Axure是一个绘制项目原型图的软件&#xff0c;它里面的基本原件有&#xff1a; 1.1元件的呈现范围 首先我们要了解基本元件的作用范围在哪里&#xff1f; 浏览效果&#xff1a; 可以看出当我们的基本元件放在画布区域内是可以完全呈现出来…...

Unity中Shader URP最简Shader框架(整理总结篇)

文章目录 前言一、精简 ShaderGraph 所有冗余代码后的最简 URP Shader二、我们来对比一下 URP Shader 与 BuildInRP Shader 的对应关系 与 区别1、"RenderPipeline""UniversalPipeline"2、面片剔除、深度测试、深度写入、颜色混合 和 BRP 下一致3、必须引入…...

AT32F435飞控之DIATONE MAMBA MK5 F435 Anti-Interference

AT32F435飞控之DIATONE MAMBA MK5 F435 Anti-Interference 1. 源由2. 规格3. 分析3.1 喜欢3.2 不便3.3 建议 4. 总结5. 参考资料 1. 源由 AT32 F435飞控在xFlight开源飞控之AT32F435计划一文中已经大体阐述了一些移植历史。 之前整体上看&#xff0c;就是航模飞控新MCU的移植…...

ntp时间同步配置中 server、pool和peer的区别

在 NTP&#xff08;Network Time Protocol&#xff09;的配置中&#xff0c;server、pool 和 peer 是用于指定时间同步关系的关键字&#xff0c;它们在角色和行为上有一些区别。 server&#xff1a; server 关键字用于指定一个或多个 NTP 服务器&#xff0c;这些服务器将提供时…...

JMeter安装RabbitMQ测试插件

整体流程如下&#xff1a;先下载AMQP插件源码&#xff0c;可以通过antivy在本地编译成jar包&#xff0c;再将jar包导入JMeter目录下&#xff0c;重启JMeter生效。 Apache Ant 是一个基于 Java 的构建工具。Ant 可用于自动化构建和部署 Java 应用程序&#xff0c;使开发人员更轻…...

基于ssm日用品网站设计论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本日用品网站就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息&…...

coco数据集格式的RandomCrop

transforms.py文件的改进 添加 RandomCrop 函数 class RandomCrop(object):"""随机裁剪图像以及bboxes"""def __init__(self, output_size):self.output_size output_sizedef __call__(self, image, target):height, width image.shape[-2:]…...

机器学习-KL散度的直观理解+代码

KL散度 直观理解&#xff1a;KL散度是一种衡量两个分布之间匹配程度的方法。通常在概率和统计中&#xff0c;我们会用更简单的近似分布来代替观察到的数据或复杂的分布&#xff0c;KL散度帮我们衡量在选择近似值时损失了多少信息。 在信息论或概率论中&#xff0c;KL散度&#…...

【教程】制作 iOS 推送证书

​ 目录 证书类型 MAC Key Store 消息推送控制台 制作证书 创建苹果 App ID 使用appuploder制作 .p12文件 创建证书 如需向 iOS 设备推送数据&#xff0c;您首先需要在消息推送控制台上配置 iOS 推送证书。iOS 推送证书用于推送通知&#xff0c;本文将介绍消息推送服务支…...

ToolLLM model 以及LangChain AutoGPT Xagent在调用外部工具Tools的表现对比浅析

文章主要谈及主流ToolLLM 以及高口碑Agent 在调用Tools上的一些对比&#xff0c;框架先上&#xff0c;内容会不断丰富与更新。 第一部分&#xff0c;ToolLLM model 先来说主打Function Call 的大模型们 OpenAI GPT 宇宙第一LLM&#xff0c;它的functionCall都知道&#xff0…...

【MySQL学习之基础篇】约束

文章目录 1. 概述2. 基础约束3. 外键约束3.1. 介绍3.2. 外键的添加3.3. 外键删除和更新行为 1. 概述 概念&#xff1a; 约束是作用于表中字段上的规则&#xff0c;用于限制存储在表中的数据。     目的&#xff1a; 保证数据库中数据的正确、有效性和完整性。 分类&#x…...

【DataSophon】大数据管理平台DataSophon-1.2.1基本使用

&#x1f984; 个人主页——&#x1f390;开着拖拉机回家_Linux,大数据运维-CSDN博客 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&am…...

基于redisson实现发布订阅(多服务间用避坑)

前言 今天要分享的是基于Redisson实现信息发布与订阅&#xff08;以前分享过直接基于redis的实现&#xff09;&#xff0c;如果你是在多服务间基于redisson做信息传递&#xff0c;并且有服务压根就收不到信息&#xff0c;那你一定要看完。 今天其实重点是避坑&#xff0…...

Java 源码、反码、补码 位运算

文章目录 1. 源码、反码、补码1.1 原码1.2 反码1.3 补码1.4 byte的最大值1.5 byte的最小值 2. 位运算2.1 & 与2.2 | 或2.3 ~ 非2.4 ^ 异或2.5 << 左移 &#xff08;没有无符号左移&#xff09;2.6 >> 右移 &#xff08;有符号右移&#xff09;2.7 >>>…...

时序分解 | Matlab实现NGO-ICEEMDAN基于北方苍鹰算法优化ICEEMDAN时间序列信号分解

时序分解 | Matlab实现NGO-ICEEMDAN基于北方苍鹰算法优化ICEEMDAN时间序列信号分解 目录 时序分解 | Matlab实现NGO-ICEEMDAN基于北方苍鹰算法优化ICEEMDAN时间序列信号分解效果一览基本介绍程序设计参考资料 效果一览 基本介绍 Matlab实现NGO-ICEEMDAN基于北方苍鹰算法优化ICE…...

Linux Conda 安装 Jupyter

在Linux服务器Conda环境上安装Jupyter过程中遇到了无数的报错&#xff0c;特此记录。 目录 步骤一&#xff1a;安装Anaconda3 步骤二&#xff1a;配置Conda源 步骤三&#xff1a;安装Jupyter 安装报错&#xff1a;simplejson.errors.JSONDecodeError 安装报错&#xff1a;…...

金融众筹系统源码:适合创业孵化机构 附带完整的搭建教程

互联网技术的发展&#xff0c;金融众筹作为一种新型的融资方式&#xff0c;逐渐成为创业孵化机构的重要手段。为了满足这一需求&#xff0c;金融众筹系统源码就由此而生&#xff0c;并附带了完整的搭建教程。 以下是部分代码示例&#xff1a; 系统特色功能一览&#xff1a; 1.…...

OpenCV imencode 函数详解与应用示例

OpenCV imencode 函数详解与应用示例 介绍imencode 函数的基本信息示例代码应用场景 介绍 OpenCV是一个强大的计算机视觉库&#xff0c;提供了许多图像处理和分析的工具。imencode函数是其中之一&#xff0c;用于将图像编码为指定格式的字节流。这个函数对于图像的存储、传输和…...

持续集成交付CICD:Jenkins使用CD流水线下载Nexus制品

目录 一、实验 1.Jenkins使用CD流水线下载Nexus制品 一、实验 1.Jenkins使用CD流水线下载Nexus制品 &#xff08;1&#xff09;Jenkins新建CD流水线 &#xff08;2&#xff09;新建视图 &#xff08;3&#xff09;查看视图 &#xff08;4&#xff09;添加字符参数 &#xf…...

【C++】输入输出流 ⑩ ( 文件流 | 文件流打开方式参数 | 文件指针 | 组合打开方式 | 文件打开失败 )

文章目录 一、文件流打开方式参数1、文件流打开方式参数2、文件指针3、组合打开方式4、文件打开失败 一、文件流打开方式参数 1、文件流打开方式参数 文件流打开方式参数 : ios::in : 以只读方式打开文件 ;ios::out : 以只写方式打开文件 , 默认打开方式 , 如果文件已存在则清…...

React中的setState执行机制

我这里今天下雨了&#xff0c;温度一下从昨天的22度降到今天的6度&#xff0c;家里和学校已经下了几天雪了&#xff0c;还是想去玩一下的&#xff0c;哈哈&#xff0c;只能在图片里看到了。 一. setState是什么 它是React组件中用于更新状态的方法。它是类组件中的方法&#x…...

LabVIEW实时建模检测癌细胞的异常

LabVIEW实时建模检测癌细胞的异常 癌症是全球健康的主要挑战之一&#xff0c;每年导致许多人死亡。世界卫生组织指出&#xff0c;不健康的生活方式和日益严重的环境污染是癌症发生的主要原因之一。癌症的发生通常与基因突变有关&#xff0c;这些突变导致细胞失去正常的增长和分…...

Python卡尔曼滤波器OpenCV跟踪和预测物体的轨迹

模拟简单物体二维运动和预测位置 预测数学式 想象一下你正坐在一辆汽车里&#xff0c;在雾中行驶。 你几乎看不到路&#xff0c;但你有一个 GPS 系统可以告诉你你的速度和位置。 问题是&#xff0c;这个 GPS 并不完美&#xff1b; 它有时会产生噪音或不准确的读数。 您如何知…...

LeetCode Hot100 25.K个一组翻转链表

题目&#xff1a; 给你链表的头节点 head &#xff0c;每 k 个节点一组进行翻转&#xff0c;请你返回修改后的链表。 k 是一个正整数&#xff0c;它的值小于或等于链表的长度。如果节点总数不是 k 的整数倍&#xff0c;那么请将最后剩余的节点保持原有顺序。 你不能只是单纯…...

wordpress系统和插件下载地址/北京网站优化合作

MySQL事物隔离级别事物基本要素概念说明事物隔离级别读未提交读提交可重复读串行化事物基本要素 原子性&#xff1a;事务开始后所有操作&#xff0c;要么全部做完&#xff0c;要么全部不做&#xff0c;不可能停滞在中间环节。事务执行过程中出错&#xff0c;会回滚到事务开始前…...

网站建设php教程视频/seo优化软件

实现的效果如下&#xff0c;根据“体重分类”列来控制所在行的颜色。 步骤如下&#xff1a; 1、选中表格内容&#xff1b; 2、条件格式-新建规则-使用公式确定要设置格式的单元格 3、公式如下&#xff0c;实现效果&#xff0c;如果为“超重”&#xff0c;则所在行显示为红色 …...

机械厂做网站到底有没有效果/磁力搜索器 磁力猫在线

在设置中搜索python.linting.pylintPath如下 如果使用的是anaconda&#xff0c;在anaconda文件夹中寻找pylint应用程序的安装位置&#xff0c;例如&#xff1a;E:\anaconda\new\pkgs\pylint-2.2.2-py37_0\Scripts 如果没有安装anaconda&#xff0c;先检查是否安装了pylint&am…...

武汉值得去的互联网公司/南宁seo排名收费

https://sourceforge.net/projects/cloc/files/cloc/v1.64/...

关于美丽乡村建设的活动和网站/电商代运营收费标准

在项目中搜索文件随着项目的深入&#xff0c;工程中的各种组、类文件、资源文件的数量越来越庞大。这时在项目中快速定位目标文件变得越来越困难。Xcode充分认识到这一点&#xff0c;并提供给开发者四个方便、快捷的功能&#xff0c;以辅助开发者快速查找、定位目标文件或代码段…...

视频网站是如何做的/会员制营销方案

2019独角兽企业重金招聘Python工程师标准>>> 1.ListView滑动方向判断 代码很简单&#xff0c;给mListView监听onScrollListener事件&#xff0c;然后在onScroll进行判断 //listView中第一项的索引private int mListViewFirstItem 0;//listView中第一项的在屏幕中的…...