当前位置: 首页 > news >正文

Flink 定时加载数据源

一、简介

flink 自定义实时数据源使用流处理比较简单,比如 Kafka、MQ 等,如果使用 MySQL、redis 批处理也比较简单

如果需要定时加载数据作为 flink 数据源使用流处理,比如定时从 mysql 或者 redis 获取一批数据,传入 flink 做处理,如下简单实现

二、pom.xml 文件

注意 flink 好多包从 1.15.0 开始不需要指定 Scala 版本,内部自带
在这里插入图片描述

下面 pom 文件有 flink 两个版本 1.16.0 和 1.12.7(Scala:2.12)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.ye</groupId><artifactId>flink-study</artifactId><version>0.1</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.16.0</flink.version><!--<flink.version>1.12.7</flink.version>--><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
<!--<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!-- Add connector dependencies here. They must be in the default scope (compile). --><!-- Example:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>--><!-- Add logging framework, to produce console output when running in the IDE. --><!-- These dependencies are excluded from the application JAR by default. --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.ye.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>

三、自定义数据源

使用 Timer 定时任务(当然也可以使用线程池 Executors)自定义数据源,每过五秒随机生成一串字符串

public class TimerSinkRich extends RichSourceFunction<String> {private  ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();private boolean flag = true;private Timer timer;private TimerTask timerTask;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);timerTask = new TimerTask() {@Overridepublic void run() {// 可以在这块获取 MySQL、redis 等连接并查询数据Random random = new Random();StringBuilder str = new StringBuilder();for (int i = 0; i < 10; i++) {char ranLowLetter = (char) ((random.nextInt(26) + 97));str.append(ranLowLetter);}queue.add(str.toString());}};timer = new Timer();// 延时和执行周期参数可以通过构造方法传递timer.schedule(timerTask,1000,5000);}@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (flag){if(queue.size()>0){ctx.collect(queue.remove());}}}@Overridepublic void cancel() {if(null!=timer) timer.cancel();if(null!=timerTask) timerTask.cancel();// 撤销任务时,flink 默认 30 s(不同 flink 版本可能不同)尝试关闭数据源,关闭失败 TaskManager 不能释放 slot,最终导致失败if(queue.size()<=0) flag = false;}
}

四、flink 加载数据源并启动

public class TimerSinkStreamJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();executionEnvironment.setParallelism(1);DataStreamSource<String> streamSource = executionEnvironment.addSource(new TimerSinkRich());streamSource.print();executionEnvironment.execute("TimerSinkStreamJob 定时任务打印数据");}
}

本地测试成功

在这里插入图片描述

五、上传 flink 集群

1、flink 1.16.0

启动成功
在这里插入图片描述
撤销任务成功
在这里插入图片描述
solt 也成功释放
在这里插入图片描述

2、flink 1.12.7

启动成功
在这里插入图片描述
撤销任务当然也没问题,同样能正常释放 slot
在这里插入图片描述

当然你也可以不要 open() 方法

public class DiySinkRich extends RichSourceFunction<String> {private TimerTask timerTask;private Timer timer;private boolean flag = true;private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();@Overridepublic void run(SourceFunction.SourceContext<String> ctx) throws Exception {timerTask = new TimerTask() {@Overridepublic void run() {Random random = new Random();StringBuilder str = new StringBuilder();for (int i = 0; i < 10; i++) {char ranLowLetter = (char) ((random.nextInt(26) + 97));str.append(ranLowLetter);}queue.add(str.toString());}};timer = new Timer();timer.schedule(timerTask, 1000, 5000);while (flag) {if (queue.size() > 0) {ctx.collect(queue.remove());}}}@Overridepublic void cancel() {if (timer != null) timer.cancel();if (timerTask != null) timerTask.cancel();if (queue.size() == 0) flag = false;}
}

以上就是 flink 定时加载数据源的简单实例

相关文章:

Flink 定时加载数据源

一、简介 flink 自定义实时数据源使用流处理比较简单&#xff0c;比如 Kafka、MQ 等&#xff0c;如果使用 MySQL、redis 批处理也比较简单 如果需要定时加载数据作为 flink 数据源使用流处理&#xff0c;比如定时从 mysql 或者 redis 获取一批数据&#xff0c;传入 flink 做处…...

ChatGPT、人工智能、人类和一些酒桌闲聊

© 2023 Conmajia Initiated 10th March, 2023 昨天跟某化学家喝酒&#xff0c;期间提到了 ChatGPT。他的评价是&#xff1a;这鬼东西大量输出毫无意义、错漏百出甚至是虚假的信息&#xff0c;“in a confident accent”。例如某次 GPT 针对“描述某某记者”这一问题&#…...

WebRTC开源库内部调用abort函数引发程序发生闪退问题的排查

目录 1、初始问题描述 2、使用Process Explorer工具查看到处理音视频业务的rtcmpdll.dll模块没有加载起来 3、使用Dependency Walker工具查看到rtcmpdll.dll依赖的库有问题 4、更新库之后Debug程序启动时就发生异常&#xff0c;程序闪退 5、VS调试时看不到有效的函数调用堆…...

Golang并发编程

Golang并发编程 文章目录Golang并发编程1. 协程2. channel2.1 channel的创建2.2 使用waitGroup实现同步3. 并发编程3.1 并发编程之runtime包3.2 mutex互斥锁3.3 channel遍历3.3.1 for if遍历3.3.2 for range3.4 select switch3.5 Timer3.5.1 time.NewTimer()3.5.2 Stop、reset…...

windows+Anaconda环境下安装BERT成功安装方法及问题汇总

前言 在WindowsAnaconda环境下安装BERT&#xff0c;遇到各种问题&#xff0c;几经磨难&#xff0c;最终成功。接下来&#xff0c;先介绍成功的安装方法&#xff0c;再附上遇到的问题汇总 成功的安装方法 1、创建虚拟环境 注意&#xff1a;必须加上python3.7.12以创建环境&a…...

git - 简易指南

git - 简易指南 创建新仓库 创建新文件夹&#xff0c;打开&#xff0c;然后执行 git init 以创建新的 git 仓库。 检出仓库 执行如下命令以创建一个本地仓库的克隆版本&#xff1a; git clone /path/to/repository 如果是远端服务器上的仓库&#xff0c;你的命令会是这个样…...

[论文笔记]Transformer-XL: Attentive Language Models Beyond a Fixed-Length Context

引言 我们知道Transformer很好用&#xff0c;但它设定的最长长度是512。像一篇文章超过512个token是很容易的&#xff0c;那么我们在处理这种长文本的情况下也想利用Transformer的强大表达能力需要怎么做呢&#xff1f; 本文就带来一种处理长文本的Transformer变种——Transf…...

华为OD机试题 - 找目标字符串(JavaScript)| 机考必刷

更多题库,搜索引擎搜 梦想橡皮擦华为OD 👑👑👑 更多华为OD题库,搜 梦想橡皮擦 华为OD 👑👑👑 更多华为机考题库,搜 梦想橡皮擦华为OD 👑👑👑 华为OD机试题 最近更新的博客使用说明本篇题解:找目标字符串题目输入输出示例一输入输出说明Code解题思路版权说…...

C++面向对象编程之六:重载操作符(<<,>>,+,+=,==,!=,=)

重载操作符C允许我们重新定义操作符&#xff08;例如&#xff1a;&#xff0c;-&#xff0c;*&#xff0c;/&#xff09;等&#xff0c;使其对于我们自定义的类类型对象&#xff0c;也能像内置数据类型&#xff08;例如&#xff1a;int&#xff0c;float&#xff0c;double&…...

JS_wangEditor富文本编辑器

官网&#xff1a;https://www.wangeditor.com/ 引入 CSS 定义样式 <link href"https://unpkg.com/wangeditor/editorlatest/dist/css/style.css" rel"stylesheet"> <style>#editor—wrapper {border: 1px solid #ccc;z-index: 100; /* 按需定…...

Django实践-06导出excel/pdf/echarts

文章目录Django实践-06导出excel/pdf/echartsDjango实践-06导出excel/pdf/echarts导出excel安装依赖库修改views.py添加excel导出函数修改urls.py添加excel/运行测试导出pdf安装依赖库修改views.py添加pdf导出函数修改urls.py添加pdf/生成前端统计图表修改views.py添加get_teac…...

java并发入门(一)共享模型—Synchronized、Wait/Notify、pack/unpack

一、共享模型—管程 1、共享存在的问题 1.1 共享变量案例 package com.yyds.juc.monitor;import lombok.extern.slf4j.Slf4j;Slf4j(topic "c.MTest1") public class MTest1 {static int counter 0;public static void main(String[] args) throws InterruptedEx…...

Ast2500增加用户自定义功能

备注&#xff1a;这里使用的AMI的开发环境MegaRAC进行AST2500软件开发&#xff0c;并非openlinux版本。1、添加上电后自动执行的任务在PDKAccess.c中列出了系统启动过程中的所有任务&#xff0c;若需要添加功能&#xff0c;在相应的任务中添加自定义线程。一般在两个任务里面添…...

用Python暴力求解德·梅齐里亚克的砝码问题

文章目录固定个数的砝码可称量重量砝码的组合方法40镑砝码的组合问 一个商人有一个40磅的砝码&#xff0c;由于跌落在地而碎成4块。后来&#xff0c;称得每块碎片的重量都是整磅数&#xff0c;而且可以用这4 块来称从1 至40 磅之间的任意整数磅的重物。问这4 块砝码片各重多少&…...

离散Hopfield神经网络的分类——高校科研能力评价

离散Hopfield网络离散Hopfield网络是一种经典的神经网络模型&#xff0c;它的基本原理是利用离散化的神经元和离散化的权值矩阵来实现模式识别和模式恢复的功能。它最初由美国物理学家John Hopfield在1982年提出&#xff0c;是一种单层的全连接神经网络&#xff0c;被广泛应用于…...

Retrofit核心源码分析(三)- Call逻辑分析和扩展机制

在前面的两篇文章中&#xff0c;我们已经对 Retrofit 的注解解析、动态代理、网络请求和响应处理机制有了一定的了解。在这篇文章中&#xff0c;我们将深入分析 Retrofit 的 Call 逻辑&#xff0c;并介绍 Retrofit 的扩展机制。 一、Call 逻辑分析 Call 是 Retrofit 中最基本…...

源码分析spring如和对@Component注解进行BeanDefinition注册的

Spring ioc主要职责为依赖进行处理&#xff08;依赖注入、依赖查找&#xff09;、容器以及托管的(java bean、资源配置、事件)资源声明周期管理&#xff1b;在ioc容器启动对元信息进行读取&#xff08;比如xml bean注解等&#xff09;、事件管理、国际化等处理&#xff1b;首先…...

C语言--字符串函数1

目录前言strlenstrlen的模拟实现strcpystrcatstrcat的模拟实现strcmpstrcmp的模拟实现strncpystrncatstrncmpstrstrstrchr和strrchrstrstr的模拟实现前言 本章我们将重点介绍处理字符和字符串的库函数的使用和注意事项。 strlen 我们先来看一个我们最熟悉的求字符串长度的库…...

Webstorm使用、nginx启动、FinalShell使用

文章目录 主题设置FinalShellFinalShell nginx 启动历史命令Nginx页面发布配置Webstorm的一些常用快捷键代码生成字体大小修改Webstorm - gitCode 代码拉取webstorm 汉化webstorm导致CPU占用率高方法一 【忽略node_modules】方法二 【设置 - 代码编辑 - 快速预览文档 - 关闭】主…...

源码分析Spring @Configuration注解如何巧夺天空,偷梁换柱。

前言 回想起五年前的一次面试&#xff0c;面试官问Configuration注解和Component注解有什么区别&#xff1f;记得当时的回答是&#xff1a; 相同点&#xff1a;Configuration注解继承于Component注解&#xff0c;都可以用来通过ClassPathBeanDefinitionScanner装载Spring bean…...

vector的使用及模拟实现

目录 一.vector的介绍及使用 1.vector的介绍 2.vector的使用 1.vector的定义 2.vector iterator的使用 3. vector 空间增长问题 4.vector 增删查改 3.vector 迭代器失效问题&#xff08;重点&#xff09; 1. 会引起其底层空间改变的操作 2.指定位置元素的删除操作--erase 3. Li…...

“华为杯”研究生数学建模竞赛2007年-【华为杯】A题:基于自助法和核密度估计的膳食暴露评估模型(附获奖论文)

赛题描述 我国是一个拥有13亿人口的发展中国家,每天都在消费大量的各种食品,这批食品是由成千上万的食品加工厂、不可计数的小作坊、几亿农民生产出来的,并且经过较多的中间环节和长途运输后才为广大群众所消费,加之近年来我国经济发展迅速而环境治理没有能够完全跟上,以…...

刷题(第三周)

目录 [CISCN2021 Quals]upload [羊城杯 2020]EasySer [网鼎杯 2020 青龙组]notes [SWPU2019]Web4 [Black Watch 入群题]Web [HFCTF2020]BabyUpload [CISCN2021 Quals]upload 打开界面以后&#xff0c;发现直接给出了源码 <?php if (!isset($_GET["ctf"]))…...

新C++(14):移动语义与右值引用

当你在学习语言的时候&#xff0c;是否经常听到过一种说法,""左边的叫做左值&#xff0c;""右边的叫做右值。这句话对吗&#xff1f;从某种意义上来说&#xff0c;这句话只是说对了一部分。---前言一、什么是左右值?通常认为:左值是一个表示数据的表达式(…...

TCP相关概念

目录 一.滑动窗口 1.1概念 1.2滑动窗口存在的意义 1.3 滑动窗口的大小变化 1.4丢包问题 二.拥塞控制 三.延迟应答 四.捎带应答 五.面向字节流 六.粘包问题 七.TIME_WAIT状态 八.listen第2个参数 九.TCP总结 一.滑动窗口 1.1概念 概念&#xff1a;双方在进行通信时&a…...

MySQL锁篇

MySQL锁篇 一、一条update语句 我们的故事继续发展&#xff0c;我们还是使用t这个表&#xff1a; CREATE TABLE t (id INT PRIMARY KEY,c VARCHAR(100) ) EngineInnoDB CHARSETutf8;现在表里的数据就是这样的&#xff1a; mysql> SELECT * FROM t; —------- | id | c | —…...

SWF (Simple Workflow Service)简介

Amazon Simple Workflow Service (Amazon SWF) 提供了给应用程序异步、分布式处理的流程工具。 SWF可以用在媒体处理、网站应用程序后端、商业流程、数据分析和一系列定义好的任务上。 举个例子&#xff0c;下图表明了一个电商网站的工作流程&#xff0c;其中涉及了程序执行的…...

java(Class 常用方法 获取Class对象六种方式 动态和静态加载 类加载流程)

ClassClass常用方法获取Class对象六种方式哪些类型有Class对象动态和静态加载类加载流程加载阶段连接阶段连接阶段-验证连接阶段-准备连接阶段-解析初始化阶段获取类结构信息Class常用方法 第一步&#xff1a;创建一个实体类 public class Car {public String brand "宝…...

【数据结构】线性表和顺序表

Yan-英杰的主页 悟已往之不谏 知来者之可追 目录 1.线性表 2.顺序表 2.1 静态顺序表 2.2 动态顺序表 2.3移除元素 1.线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构&#xff0c;常见的线…...

Ubuntu数据库安装(mysql)

##1.下载mysql-apt-config_0.8.22-1_all.deb并且安装 wget https://dev.mysql.com/get/mysql-apt-config_0.8.22-1_all.deb sudo dpkg -i mysql-apt-config_0.8.22-1_all.deb##2.更新apt-updata sudo apt update##3.如果出现如下图情况执行以下命令 [外链图片转存失败,源站可…...

wordpress get_most_viewed/合肥网站建设公司

目录 一、死锁 二、死锁举例 三、避免死锁 一、死锁 死锁&#xff1a; 线程等待某个资源&#xff0c;导致线程被无限期地阻塞 二、死锁举例 例如&#xff1a;不适当地运用“synchronized”关键词。如果线程A锁住strA &#xff0c;线程B锁住strB&#xff0c;线程A就没办法…...

监管网站建设情况/做营销型网站哪家好

本文列举了用于人脸识别的免费经典数据集&#xff0c;并给出各数据来源及特点。 1.FERET人脸数据库 -http://www.nist.gov/itl/iad/ig/colorferet.cfm 由FERET项目创建,包含1万多张多姿态和光照的人脸图像,是人脸识别领域应用最广泛的人脸数据库之一.其中的多数人是西方人,每个…...

重庆邮电大学官网网站/seo知识分享

安装文档&#xff1a;http://docs.jumpserver.org/zh/docs/step_by_step.html 1、Jumpserver 是一款由Python编写开源的跳板机(堡垒机)系统&#xff0c;实现了跳板机应有的功能。基于ssh协议来管理&#xff0c;客户端无需安装agent。 特点&#xff1a; 完全开源&#xff0c;GPL…...

手机端网站开发页/什么软件可以排名次

请用程序实现 输入一段仅由英文字母、空格组成的文本&#xff0c;并通过split()方法统计这段文本中的单词数量&#xff0c;并将统计结果输出。 def word_len(text):return len([i for i in text.split( ) if i]) def main():text str(input("请输入字符串&#xff1a;&q…...

自己做的手机网站怎么测试/在线代理浏览网站免费

达梦7的试用 与SQLSERVER的简单技术对比 达梦数据库公司推出了他们的数据库服务管理平台&#xff0c;可以在该平台使用达梦数据库而无须安装达梦7数据库 地址&#xff1a;http://online.dameng.com/ 说实话&#xff0c;第一眼看到还是感到很高大上的&#xff0c;毕竟ORACLE、MY…...

个人soho要怎么做企业网站/网站推广优化公司

运行--cmd--regsvr32 dll的绝对路径名&#xff08;路径实例:c:\dll\xxx.dll) 转载于:https://www.cnblogs.com/systemnet123/archive/2013/06/07/3123542.html...