【flink】之集成mybatis对mysql进行读写
背景:
在现代大数据应用中,数据的高效处理和存储是核心需求之一。Flink作为一款强大的流处理框架,能够处理大规模的实时数据流,提供丰富的数据处理功能,如窗口操作、连接操作、聚合操作等。而MyBatis则是一款优秀的持久层框架,能够简化数据库操作,提高开发效率。将这两者结合使用,可以实现高效的数据处理和存储。
介绍:
MyBatis简介
MyBatis是一款基于Java的持久层框架,它可以使用XML配置文件或注解来定义数据库操作。MyBatis提供了简单的API来执行SQL语句,以及更高级的API来处理复杂的数据库操作。其核心是SQL映射,可以将关系型数据库的表映射到Java对象中,从而实现对数据库的操作。此外,MyBatis还提供了一些高级功能,如动态SQL、缓存等,以提高开发效率和性能。
Flink简介
Flink是一款流处理框架,可以处理大规模的实时数据流。Flink支持各种数据源和数据接收器,如Kafka、HDFS、TCP等。Flink的核心是流计算模型,可以实现对数据流的有状态计算,从而实现对实时数据的处理。Flink提供了丰富的数据处理功能,如窗口操作、连接操作、聚合操作等,以满足不同的应用需求。
目的:
Flink集成MyBatis的目的
Flink集成MyBatis的主要目的是将MyBatis作为Flink的数据源,通过Flink处理实时数据流,实现高效的数据处理和存储。使用MyBatis定义数据库操作,以实现高效的数据存储和查询;使用Flink处理实时数据流,以实现高效的数据处理和分析。
准备:
添加依赖
<!--添加spring依赖--><dependency><groupId>org.springframework</groupId><artifactId>spring-jdbc</artifactId><version>5.2.2.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId><version>5.2.2.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-aspects</artifactId><version>5.2.2.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>5.2.2.RELEASE</version></dependency><!--添加mybatis相关依赖--><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId><version>3.5.4</version></dependency><dependency><groupId>org.mybatis</groupId><artifactId>mybatis-spring</artifactId><version>2.0.7</version></dependency><!--添加连接池和mysql驱动依赖--><dependency><groupId>com.zaxxer</groupId><artifactId>HikariCP</artifactId><version>3.4.5</version><exclusions><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><!-- 加上这个才能辨认到*.yml文件 如果配置文件不使用yaml,则不需要引用此依赖--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-yaml</artifactId><version>2.17.2</version></dependency>
代码示例:
配置文件设置
config.properties文件配置
local.url=jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=ROUND&allowMultiQueries=true&zeroDateTimeBehavior=convertToNull
local.username=root
local.password=
local.maximumPoolSize=10
或者配置yml文件,(二选其一)如下:
local:url: jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=ROUND&allowMultiQueries=true&zeroDateTimeBehavior=convertToNullusername: rootpassword:maximumPoolSize: 5
配置文件加载
package com.iterge.flink.utils;import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;import java.io.IOException;
import java.util.Properties;
import java.util.Set;/*** @author iterge* @version 1.0* @date 2024/10/18 14:34* @description spring环境初始化*/public class SpringEnv {private static volatile boolean inited = false;//配置文件地址private static final String applicationLocation = "/application.yml";public static void init() {if (!inited) {System.out.println("...........................spring init start ...........................");//加载配置文件AnnotationConfigApplicationContext springContext = new AnnotationConfigApplicationContext();springContext.scan("com.iterge.flink");springContext.refresh();System.out.println("...........................spring init end ...........................");System.out.println("...........................config init start ...........................");//loadProperties();loadYamlProperties();System.out.println("...........................config init start ...........................");inited = true;}}/*** 加载配置文件*/private static void loadProperties() {try {Resource resource = new ClassPathResource(applicationLocation);Properties properties = PropertiesLoaderUtils.loadProperties(resource);Set<String> keys = properties.stringPropertyNames();for (String key : keys) {System.setProperty(key, properties.getProperty(key));}} catch (IOException e) {throw new RuntimeException(e.getMessage());}}/*** 加载yml文件*/private static void loadYamlProperties() {try {Resource resource = new ClassPathResource(applicationLocation);YamlPropertiesFactoryBean yamlPropertiesFactoryBean = new YamlPropertiesFactoryBean();yamlPropertiesFactoryBean.setResources(resource);Properties properties = yamlPropertiesFactoryBean.getObject();assert properties != null;Set<String> keys = properties.stringPropertyNames();for (String key : keys) {System.setProperty(key, properties.getProperty(key));}}catch (Exception e){throw new RuntimeException(e.getMessage());}}
}
数据源配置&加载
package com.iterge.flink.datasource;import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;import javax.sql.DataSource;/*** @author iterge* @version 1.0* @date 2024/10/12 15:33* @description 本地数据源加载配置*/@Configuration
@Lazy
@MapperScan(basePackages = "com.iterge.flink.mapper",sqlSessionFactoryRef = "localDataSourceSqlSessionFactory",lazyInitialization = "true")
public class LocalDatasourceConfig {@Value("${local.url}")private String url;@Value("${local.username}")private String user;@Value("${local.password}")private String password;@Value("${local.maximumPoolSize:10}")private Integer maxPoolSize;@Bean("localDataSource")public DataSource localDataSource() {return DataSourceHelper.createDataSource(url, user, password, "localDataSource", 5, maxPoolSize);}@Bean("localDataSourceSqlSessionFactory")public SqlSessionFactory localDataSourceSqlSessionFactory(@Qualifier("localDataSource") DataSource dataSource) throws Exception {SqlSessionFactoryBean bean = new SqlSessionFactoryBean();bean.setDataSource(dataSource);// mapper的xml形式文件位置必须要配置,不然将报错:no statement (这种错误也可能是mapper的xml中,namespace与项目的路径不一致导致)bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));return bean.getObject();}
}
package com.iterge.flink.datasource;import com.zaxxer.hikari.HikariDataSource;/*** @author iterge* @version 1.0* @date 2024/10/12 15:44* @description 数据源创建工具*/
public class DataSourceHelper {public static HikariDataSource createDataSource(String jdbcUrl,String user,String password,String poolName,Integer minIdle,Integer maxPoolSize) {HikariDataSource dataSource = new HikariDataSource();dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");dataSource.setJdbcUrl(jdbcUrl);dataSource.setUsername(user);dataSource.setPassword(password);dataSource.setIdleTimeout(120000);dataSource.setMinimumIdle(minIdle);dataSource.setMaximumPoolSize(maxPoolSize);dataSource.setMaxLifetime(600000);dataSource.setRegisterMbeans(false);dataSource.setConnectionTimeout(2000);dataSource.setPoolName(poolName);return dataSource;}}
创建实体类
package com.iterge.flink.entity;import lombok.Data;/*** @author iterge* @date 2024/10/12 16:00:50*/@Data
public class User {private Integer id;private String name;
}
创建mapper
package com.iterge.flink.mapper;import com.iterge.flink.entity.User;
import org.apache.ibatis.annotations.Mapper;/*** @author iterge* @version 1.0* @date 2024/10/12 15:59* @description 用户对象dao*/@Mapper
public interface UserMapper {int insertOne(User user);}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.iterge.flink.mapper.UserMapper"><insert id="insertOne" keyProperty="id" useGeneratedKeys="true" parameterType="com.iterge.flink.entity.User">insert into t_user(name) values(#{name})</insert></mapper>
上下文获取工具
package com.iterge.flink.utils;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** @author iterge* @version 1.0* @date 2024/10/12 16:20* @description 上下文文获取工具*/@Slf4j
@Component
public class ContextUtil implements ApplicationContextAware {private static ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext context) throws BeansException {ContextUtil.applicationContext = context;}public static ApplicationContext getContext() {return applicationContext;}public static Object getBean(String name) {if (getContext() == null) {log.error("spring context can not be found");return null;}if (StringUtils.isBlank(name)) {log.error("bean name can not be null");return false;}return getContext().getBean(name);}
}
创建flink任务
package com.iterge.flink.job;import com.iterge.flink.entity.User;
import com.iterge.flink.mapper.UserMapper;
import com.iterge.flink.utils.ContextUtil;
import com.iterge.flink.utils.SpringEnv;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;/**** @author FlinkMybatisDemo* @date 2024/10/12 11:17* @version 1.0* @description 整合mybatis*
*/@Slf4j
public class FlinkMybatisDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("it.erge.test.topic").setGroupId("it.erge.test.topic.1").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");SingleOutputStreamOperator<String> process = stringDataStreamSource.process(new ProcessFunction<String, String>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);SpringEnv.init();}@Overridepublic void processElement(String s, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {log.info("message={}",s);User u = new User();u.setName(s);UserMapper mapper = ContextUtil.getContext().getBean(UserMapper.class);mapper.insertOne(u);collector.collect(s);}});process.print();env.execute("mybatis-demo");}
}
代码地址:
GitCode - 全球开发者的开源社区,开源代码托管平台
相关文章:
【flink】之集成mybatis对mysql进行读写
背景: 在现代大数据应用中,数据的高效处理和存储是核心需求之一。Flink作为一款强大的流处理框架,能够处理大规模的实时数据流,提供丰富的数据处理功能,如窗口操作、连接操作、聚合操作等。而MyBatis则是一款优秀的持…...
Java设计模式—观察者模式详解
引言 模式角色 UML图 示例代码 应用场景 优点 缺点 结论 引言 观察者模式(Observer Pattern)是一种行为设计模式,它定义了对象之间的一对多依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都会得到通知…...
【Cri-Dockerd】安装cri-dockerd
cri-dockerd的作用: 在k8s1.24之前。k8s会通过dockershim来调用docker进行容器运行时containerd,并且会自动安装dockershim,但是从1.24版本之前k8s为了降低容器运行时的调用的复杂度和效率,直接调用containerd了,并且…...
GCC及GDB的使用
参考视频及博客 https://www.bilibili.com/video/BV1EK411g7Li/?spm_id_from333.999.0.0&vd_sourceb3723521e243814388688d813c9d475f https://www.bilibili.com/video/BV1ei4y1V758/?buvidXU932919AEC08339E30CE57D39A2BABF6A44F&from_spmidsearch.search-result.0…...
大数据新视界 -- 大数据大厂之大数据重塑影视娱乐产业的未来(4 - 3)
💖💖💖亲爱的朋友们,热烈欢迎你们来到 青云交的博客!能与你们在此邂逅,我满心欢喜,深感无比荣幸。在这个瞬息万变的时代,我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…...
数据结构——基础知识补充
1.队列 1.普通队列 queue.Queue 是 Python 标准库 queue 模块中的一个类,适用于多线程环境。它实现了线程安全的 FIFO(先进先出)队列。 2.双端队列 双端队列(Deque,Double-Ended Queue)是一种具有队列和…...
只有.git文件夹时如何恢复项目
有时候误删文件但由于.git是隐藏文件夹而幸存,或者项目太大,单单甩给你一个.git文件夹让你自己恢复整个项目,该怎么办呢? 不用担心,只要进行以下步骤,即可把原项目重新搭建起来: 创建一个文件…...
anchor、anchor box、bounding box之间关系
最近学YOLO接触到这些概念,一下子有点蒙,简单总结一下。 anchor和anchor box Anchor:表示一组预定义的尺寸比例,用来代表常见物体的宽高比。可以把它看成是一个模板或规格,定义了物体框的“形状”和“比例”ÿ…...
代码随想录算法训练营第三十天 | 452.用最少数量的箭引爆气球 435.无重叠区间 763.划分字母区间
LeetCode 452.用最少数量的箭引爆气球: 文章链接 题目链接:452.用最少数量的箭引爆气球 思路: 气球的区间有重叠部分,只要弓箭从重叠部分射出来,那么就能减少所使用的弓箭数 **局部最优:**只要有重叠部分…...
海亮科技亮相第84届中国教装展 尽显生于校园 长于校园教育基因
10月25日,第84届中国教育装备展示会(以下简称“教装展”)在昆明滇池国际会展中心开幕。作为国内教育装备领域规模最大、影响最广的专业展会,本届教装展以“数字赋能教育,创新引领未来”为主题,为教育领域新…...
C语言数据结构学习:栈
C语言 数据结构学习 汇总入口: C语言数据结构学习:[汇总] 1. 栈 栈,实际上是一种特殊的线性表。这里使用的是链表栈,链表栈的博客:C语言数据结构学习:单链表 2. 栈的特点 只能在一端进行存取操作&#x…...
如何快速分析音频中的各种频率成分
从视频中提取音频 from moviepy.editor import VideoFileClip# Load the video file and extract audio video_path "/mnt/data/WeChat_20241026235630.mp4" video_clip VideoFileClip(video_path)# Extract audio and save as a temporary file for further anal…...
MongoDB 6.0 主从复制配置
以下是 MongoDB 6.0 版本配置主从的详细安装步骤: 1. 安装 MongoDB:可以从官网下载 MongoDB 6.0 的安装包并进行安装,或者使用相应的包管理工具进行安装。 2. 配置主节点:在主节点的 MongoDB 配置文件(默认路径为 …...
NPU 神经网络处理单元
Ⅰ 什么是 NPU? 当前正处于神经网络和机器学习处理需求爆发的初期。传统的 CPU(中央处理器)/GPU(图形处理器)可以执行类似任务,但专门为神经网络优化的 NPU(神经处理单元)比 CPU/GP…...
安宝特分享 | AR技术引领:跨国工业远程协作创新模式
在当今高度互联的工业环境中,跨国合作与沟通变得日益重要。然而,语言障碍常常成为高效协作的绊脚石。安宝特AR眼镜凭借其强大的多语言自动翻译和播报功能,正在改变这一局面,让远程协作变得更加顺畅。 01 多语言翻译优势 安宝特A…...
Vulkan 开发(五):Vulkan 逻辑设备
图片来自《Vulkan 应用开发指南》 Vulkan 开发系列文章: 1. 开篇,Vulkan 概述 2. Vulkan 实例 3. Vulkan 物理设备 4. Vulkan 设备队列 在 Vulkan 中,逻辑设备(Logical Device)是与物理设备(Physical D…...
Kafka 解决消息丢失、乱序与重复消费
一、引言 在分布式系统中,Apache Kafka 作为一种高吞吐量的分布式发布订阅消息系统,被广泛应用于日志收集、流式处理、消息队列等场景。然而,在实际使用过程中,可能会遇到消息丢失、乱序、重复消费等问题,这些问题可能…...
计算机专业毕业生面试工具推荐:白瓜面试
随着毕业季的临近,计算机专业的毕业生们即将步入职场,面试成为了他们必须面对的挑战。在这个过程中,选择合适的面试工具可以大大提高求职成功率。今天,我要向大家推荐一款专为计算机专业毕业生设计的面试工具——白瓜面试。 为什…...
数字IC开发:布局布线
数字IC开发:布局布线 前端经过DFT,综合后输出网表文件给后端,由后端通过布局布线,将网表转换为GDSII文件;网表文件只包含单元器件及其连接等信息,GDS文件则包含其物理位置,具体的走线࿱…...
高空作业未系安全带监测系统 安全带穿戴识别预警系统
在各类高空作业场景中,安全带是保障作业人员生命安全的关键防线。然而,由于人为疏忽或其他原因,作业人员未正确系挂安全带的情况时有发生,这给高空作业带来了巨大的安全隐患。为有效解决这一问题,高空作业未系安全带监…...
k8s的配置和存储(ConfigMap、Secret、Hostpath、EmptyDir以及NFS的服务使用)
ConfigMap 简介 在 Kubernetes 中,ConfigMap 是一种用于存储非敏感信息的 Kubernetes 对象。它用于存储配置数据,如键值对、整个配置文件或 JSON 数据等。ConfigMap 通常用于容器镜像中的配置文件、命令行参数和环境变量等。 ConfigMap 可以通过三种方…...
JS轮播图实现自动轮播、悬浮停止轮播、点击切换,下方指示器与图片联动效果
代码: <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><s…...
使用 Kafka 和 MinIO 实现人工智能数据工作流
MinIO Enterprise Object Store 是用于创建和执行复杂数据工作流的基础组件。此事件驱动功能的核心是使用 Kafka 的 MinIO 存储桶通知。MinIO Enterprise Object Store 为所有 HTTP 请求(如 PUT、POST、COPY、DELETE、GET、HEAD 和 CompleteMultipartUpload…...
力扣题86~90
题86(中等): python代码 # Definition for singly-linked list. # class ListNode: # def __init__(self, val0, nextNone): # self.val val # self.next next class Solution:def partition(self, head: Optional[Li…...
【JavaEE】【多线程】定时器
目录 一、定时器简介1.1 Timer类1.2 使用案例 二、实现简易定时器2.1 MyTimerTask类2.2 实现schedule方法2.3 构造方法2.4 总代码2.5 测试 一、定时器简介 定时器:就相当于一个闹钟,当我们定的时间到了,那么就执行一些逻辑。 1.1 Timer类 …...
CI/CD 的原理
一、CI/CD 的概念 CI/CD是一种软件开发流程,旨在通过自动化和持续的集成、测试和交付实现高质量的软件产品。 CI(Continuous Integration)持续集成 目前主流的开发方式是协同开发,即多位开发人员同事处理同意应用不同模块或功能。 如果企业在同一时间将…...
进一步认识ICMP协议
在日常工作中,我们经常需要判断网络是否连通,相信大家使用较多的命令就是 ping啦。ping命令是基于 ICMP 协议来实现的,那么什么是 ICMP 协议呢?ping命令又是如何基于 ICMP 实现的呢? 今天这篇文章,我们就来…...
NUUO网络视频录像机upload.php任意文件上传漏洞复现
文章目录 免责声明漏洞描述搜索语法漏洞复现nuclei修复建议 免责声明 本文章仅供学习与交流,请勿用于非法用途,均由使用者本人负责,文章作者不为此承担任何责任 漏洞描述 NUUO网络视频录像机(Network Video Recorder࿰…...
WebGL 3D基础
1. 归一化函数 对一个向量进行归一化处理,即调整向量的模长(长度)为1,同时保持其方向不变。 // 归一化函数 function normalized(arr) {let sum 0;for (let i 0; i < arr.length; i) {sum arr[i] * arr[i];}const middle …...
Docker 部署MongoDb
1. 编写docker-compose.conf 文件 version: 3 services:mongo:image: mongo:latest # 指定 MongoDB 版本,确保 > 3.6container_name: mongo-replicarestart: alwayscommand: ["mongod", "--replSet", "rs0", "--oplogSize&…...
敦煌做网站 条件/已备案域名30元
React Hooks+Laravel 前端博客实战 阐述用create-next-app快速创建项目建立博客首页按需加载 Ant Design配置文件 blog\package.json阐述 我们先完成博客的前端界面的制作,主要完成的功能就是用户的访问,文章列表和文章详情页面。 因为Blog的前台需要SEO操作,所以我们一定…...
汽车技术支持 武汉网站建设/域名关键词排名查询
前言这是作者修改后的Linux下的小米随身WiFi驱动,支持Ap模式,开源,同时支持360随身WiFi二代和小度WiFiQQ群(随身WiFi_Linux_Ap)389615079 注意:1.这个是Lin前言这是作者修改后的Linux下的小米随身WiFi驱动,支持Ap模式&…...
wordpress登陆美化/seo查询
我是卢松松,点点上面的头像,欢迎关注我哦! 2月9日,我们刚上班。很多公司也有开工发红包的习惯,虽然我们只是个小公司,但我们也发。昨天给到岗的同事每人发了800元红包,未到岗的500元。 我们定…...
网站默认首页/网络广告策划书范文
volatile关键字区分C程序员和嵌入式系统程序员的最基本的问题:嵌入式系统程序员经常同硬件、中断、RTOS等等打交道,所有这些都要求使用volatile变量。不懂得volatile内容将会带来灾难。 volatile的作用是作为指令关键字,确保本条指令不会因编译器的优化而…...
做的不错的网站/百度网站首页网址
你关注ArcGIS Online吗?下面的看点你觉得怎么样? 看点2:ArcGIS Online功能增强 大会列举了Online最受喜爱的十大功能点,以及其它新的功能增强。如支持更多类型文件下载,支持更多服务类型的底图,更加灵活的…...
中国建设信用卡积分兑换网站/站长查询域名
最近做图像分类,数据集的整理真的好费时间,找到一个代码,非常好使,推荐给大家。实现的功能是,将一个文件夹下的每个子文件夹的一半数据分割出来并保存,生成相应的train.txt,test.txt。自己弄数据集…...