当前位置: 首页 > news >正文

flink学习(6)——自定义source和kafka

概述

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

——Rich 字样代表富有,在编程中,富有代表可以调用的方法很多,功能很全的意思。

 基础案例

package com.bigdata.day02;//1、SourceFunction
// public class ZidingyiSource implements SourceFunction<Student> {
//2、RichSourceFunction
// public class ZidingyiSource extends RichSourceFunction<Student> {
//3、ParallelSourceFunction
//public class ZidingyiSource implements ParallelSourceFunction<Student> {
//4、RichParallelSourceFunction
//public class ZidingyiSource extends RichParallelSourceFunction<Student> {
// 推荐的
public class ZidingyiSource extends RichParallelSourceFunction<Student> {// ctrl + oprivate final Random random = new Random();private boolean flag = true;// 现在不用@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("实现一些资源的开启");}// 现在不用@Overridepublic void close() throws Exception {System.out.println("实现一些资源的关闭");}@Overridepublic void run(SourceContext<Student> sourceContext) throws Exception {while (flag){String stu_id = UUID.randomUUID().toString();String stu_name = "Student_"+stu_id;int stu_age = random.nextInt(8)+10;long stu_timestamp = System.currentTimeMillis();Student student = new Student(stu_id,stu_name,stu_age,stu_timestamp);sourceContext.collect(student);Thread.sleep(1000);}}// 具体什么时候 会调用还不知道@Overridepublic void cancel() {flag = false;System.out.println("停止运行");}
}//调用
public class ZiDingYi {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// add + new DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());int parallelism = studentDataStreamSource.getParallelism();System.out.println(parallelism);// print之前与之后的并行度是不同的studentDataStreamSource.print().setParallelism(1);env.execute();}
}

cancel+open+close的调用时机

package com.bigdata.day02;import java.util.Objects;/*
* 1、这几个方法都会按照并行度调用多次 调度的次数 按照studentDataStreamSource的并行度
*
*/public class ZiDingYi {public static void main(String[] args) throws Exception {// 在上面案例的基础上实现StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());// 此时就只会调用一次了studentDataStreamSource.setParallelism(1);// 此时打印也会有多个并行度(8个cpu)studentDataStreamSource.print();// 异步调用 此时会调用open方法JobExecutionResult execute = env.execute();JobClient flink_job = env.executeAsync("Flink Job");Thread.sleep(3000);// 此时会调用 cancel 和 close flink_job.cancel();}
}

 kafkaSource

package com.bigdata.day02;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception{//envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// properties Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");// consumerFlinkKafkaConsumer<String> consumer= new FlinkKafkaConsumer<String>("yhedu",new SimpleStringSchema(),properties);// sourceDataStreamSource<String> dataStreamSource = env.addSource(consumer);dataStreamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {return s.contains("success");}}).print();env.execute();}
}

相关文章:

flink学习(6)——自定义source和kafka

概述 SourceFunction:非并行数据源(并行度只能1) --接口 RichSourceFunction:多功能非并行数据源(并行度只能1) --类 ParallelSourceFunction:并行数据源(并行度能够>1) --接口 RichParallelSourceFunction:多功能并行数据源(并行度能够>1) --类 【建议使用的】 ——…...

开发常见问题及解决

1.DBeaver 报Public Key Retrieval is not allowed 在使用DBeaver连接数据库时出现“Public Key Retrieval is not allowed”错误&#xff0c;主要是因为数据库连接配置的安全策略导致的。以下是详细的解释和解决方法&#xff1a; 错误原因 这个错误通常出现在连接MySQL数据…...

python excel接口自动化测试框架!

今天采用Excel继续写一个接口自动化测试框架。 设计流程图 这张图是我的excel接口测试框架的一些设计思路。 首先读取excel文件&#xff0c;得到测试信息&#xff0c;然后通过封装的requests方法&#xff0c;用unittest进行测试。 其中&#xff0c;接口关联的参数通过正则进…...

mybatis:You have an error in your SQL syntax;

完整报错You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near false, false, false, false, false, false, false, false, false, false, false, at line 1 SQL: INSERT INTO user …...

使用 Maven 开发 IntelliJ IDEA 插件

使用 Maven 开发 IntelliJ IDEA 插件的完整流程 1. 创建 Maven 项目 1.1 使用 IntelliJ 创建 Maven 项目 打开 IntelliJ IDEA&#xff0c;点击 File > New > Project。选择 Maven&#xff0c;填写项目名称和 GroupId&#xff0c;例如&#xff1a; GroupId: com.exampl…...

Windows修复SSL/TLS协议信息泄露漏洞(CVE-2016-2183) --亲测

漏洞说明&#xff1a; 打开链接&#xff1a;https://docs.microsoft.com/zh-cn/troubleshoot/windows-server/windows-security/restrict-cryptographic-algorithms-protocols-schannel 可以看到&#xff1a; 找到&#xff1a;应通过配置密码套件顺序来控制 TLS/SSL 密码 我们…...

uniapp生命周期:应用生命周期和页面生命周期

文章目录 1.应用的生命周期2.页面的生命周期 1.应用的生命周期 生命周期的概念&#xff1a;一个对象从创建、运行、销毁的整个过程被称为生命周期 生命周期函数&#xff1a;在生命周期中每个阶段会伴随着每一个函数的出发&#xff0c;这些函数被称为生命周期函数 所有页面都…...

基于SSM的婴幼儿用品商城系统+LW示例参考

1.项目介绍 功能模块&#xff1a;管理员&#xff08;产品管理、产品分类、会员管理、订单管理、秒杀活动、文章管理、数据统计等&#xff09;、普通用户&#xff08;登录注册、个人中心、购物车、我的收藏、各类信息查看等&#xff09;技术选型&#xff1a;SSM&#xff0c;jsp…...

【工具变量】城市供应链创新试点数据(2007-2023年)

一、测算方式&#xff1a;参考C刊《经济管理》沈坤荣和乔刚老师&#xff08;2024&#xff09;的做法&#xff0c;使用“供应链创新与应用试点”的政策虚拟变量&#xff08;TreatPost&#xff09;表征。若样本城市为试点城市&#xff0c;则赋值为 1&#xff0c;否则为 0&#xf…...

【carla生成车辆时遇到的问题】carla显示的坐标和carlaworld中提取的坐标y值相反

项目需要重新运行了一下generate_car.py的脚本&#xff0c;发现死活生成不了&#xff0c;研究了半天&#xff0c;发现脚本里面生成车辆的坐标值y和carla_ros_bridge_with_example_ego_vehicle.launch脚本打开的驾驶操控界面里面的y值正好是相反数! y1-y2 因为&#xff0c;我运行…...

Jira使用笔记二 ScriptRunner 验证问题创建角色

背景 最近在对公司Jira工作流改造&#xff0c;收到这么一个要求&#xff1a;某些问题类型只有某些角色可以创建。本来是想通过Jira内建的权限控制来处理的。结果点到权限页面&#xff0c;心都凉透了。 好吧&#xff0c;那只能上脚本了。最终使用ScriptRunner的Simple scripte…...

Java线程的使用

Java中的线程是用来实现多任务并发执行的机制。在Java中&#xff0c;主要有两种方式来创建和使用线程&#xff1a;实现Runnable接口和继承Thread类。 实现Runnable接口&#xff1a; 创建一个类&#xff0c;实现Runnable接口&#xff0c;并重写run()方法。在run()方法中定义线程…...

自动化测试工具Ranorex Studio(四十三)-RANOREXPATH编辑器5

代码示例 下面的代码示例将讲解如何使用Ranorex API来编写代码模块&#xff0c;或者是使用用户代码来扩展录制的模块。 在代码中使用对象库 使用对象库等待UI元素 建立Adapter来访问更多的属性和方法 为对象库元素建立一组Adapter 使用Validate类 强制一个测试用例失败 设置aut…...

超高流量多级缓存架构设计!

文章内容已经收录在《面试进阶之路》&#xff0c;从原理出发&#xff0c;直击面试难点&#xff0c;实现更高维度的降维打击&#xff01; 文章目录 电商-多级缓存架构设计多级缓存架构介绍多级缓存请求流程负载均衡算法的选择轮询负载均衡一致性哈希负载均衡算法选择 应用层 Ngi…...

数据结构(Java)—— ArrayList

1.线性表 线性表&#xff08; linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构&#xff0c;常见的线性表&#xff1a;顺序表、链表、栈、队列... 线性表在逻辑上是线性结构&#xff0c;也就说是连续的一条直线。但是在…...

实习冲刺第三十三天

102.二叉树的层序遍历 给你二叉树的根节点 root &#xff0c;返回其节点值的 层序遍历 。 &#xff08;即逐层地&#xff0c;从左到右访问所有节点&#xff09;。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;[[3],[9,20],[15,7]]示例…...

Uniapp开发下拉刷新功能onPullDownRefresh/onReachBottom

文章目录 1.onPullDownRefresh2.onReachBottom 1.onPullDownRefresh 在 js 中定义 onPullDownRefresh 处理函数&#xff08;和onLoad等生命周期函数同级&#xff09;&#xff0c;监听该页面用户下拉刷新事件。 需要在 pages.json 里&#xff0c;找到的当前页面的pages节点&am…...

什么是 C++ 中的函数对象?函数对象与普通函数有什么区别?如何定义和使用函数对象?

1) 什么是 C 中的函数对象&#xff1f;它有什么特点&#xff1f; 在 C 中&#xff0c;函数对象&#xff08;也称为仿函数或 functor&#xff09;是一种重载了 operator() 的对象。这意味着这些对象可以像函数一样被调用。函数对象通常用于需要传递行为&#xff08;即代码&…...

PointNet++论文复现

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…...

【VUE】el-table表格内输入框或者其他控件规则校验实现

1、封装组件 1、规则校验一般基于form表单实现&#xff0c;因此需要给具体控件套一层form表单 新建组件input-required.vue&#xff0c;内容如下 <template><div><el-form ref"formRef" :model"form" :rules"formRules" label-…...

django开发中html继承模板样式

存在问题&#xff1a; django开发中&#xff0c;不同页面样式相同&#xff0c;如何共用一套母版&#xff0c;避免每个页面都重复写样式&#xff1b; 解决方案&#xff1a; 添加一个母版&#xff0c;如“layout.html”&#xff0c;在需要继承的位置添加{% block content %}{% e…...

MT6769/MTK6769核心板规格参数_联发科安卓主板开发板方案

MT6769安卓核心板具有集成的蓝牙、FM、WLAN和GPS模块&#xff0c;是一个高度集成的基带平台&#xff0c;结合了调制解调器和应用处理子系统&#xff0c;以支持LTE/LTE-A和C2K智能手机应用。 该芯片集成了两个工作频率高达2.0GHz的ARMCortex-A75内核、六个工作频率高达1.70GHz的…...

鸿蒙进阶篇-状态管理之@Provide与@Consume

大家好&#xff0c;这里是鸿蒙开天组&#xff0c;今天我们来学习一下状态管理中的Provide与Consume。 一、概述 嘿&#xff01;大家还记得这张图吗&#xff1f;不记得也要记得哦&#xff0c;因为这张图里的东西&#xff0c;既是高频必考面试题&#xff0c;也是实际开发中&…...

java集合及源码

目录 一.集合框架概述 1.1集合和数组 数组 集合 1.2Java集合框架体系 常用 二. Collection中的常用方法 添加 判断 删除 其它 集合与数组的相互转换 三Iterator(迭代器)接口 3.0源码 3.1作用及格式 3.2原理 3.3注意 3.4获取迭代器(Iterator)对象 3.5. 实现…...

GraphRAG访问模式和知识图谱建模

GraphRAG访问模式和知识图谱建模 GraphRAG访问模式和知识图谱建模什么是GraphRAG了解文本分块检索模式图谱建模相关概念图结构 GraphRAG访问模式和知识图谱建模 graphrag.com是一个开源项目&#xff0c;收集了围绕GraphRAG的相关资源&#xff0c;目前正在快速收集大家的投稿。深…...

TCP/IP协议攻击与防范

一、TCP/IP协议攻击介绍 1.1 Internet的结构​ LAN&#xff1a;局域网 WAN&#xff1a;广域网 WLAN&#xff1a;无线局域网 私有IP地址与公有IP地址&#xff1f; 私有地址&#xff1a;A类&#xff1a;10.0.0.0~10.255.255.255 B类&#xff1a;172.16.0.0~172.31.255.255…...

Java基于 SpringBoot+Vue的口腔管理平台(附源码+lw+部署)

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…...

11.26深度学习_神经网络-数据处理

一、深度学习概述 1. 什么是深度学习 ​ 人工智能、机器学习和深度学习之间的关系&#xff1a; ​ 机器学习是实现人工智能的一种途径&#xff0c;深度学习是机器学习的子集&#xff0c;区别如下&#xff1a; ​ 传统机器学习算法依赖人工设计特征、提取特征&#xff0c;而深…...

【人工智能】Python常用库-TensorFlow常用方法教程

TensorFlow 是一个广泛应用的开源深度学习框架&#xff0c;支持多种机器学习任务&#xff0c;如深度学习、神经网络、强化学习等。以下是 TensorFlow 的详细教程&#xff0c;涵盖基础使用方法和示例代码。 1. 安装与导入 安装 TensorFlow&#xff1a; pip install tensorflow…...

微信小程序按字母顺序渲染城市 功能实现详细讲解

在微信小程序功能搭建中&#xff0c;按字母渲染城市会用到多个ES6的方法&#xff0c;如reduce&#xff0c;map&#xff0c;Object.entries()&#xff0c;Object.keys() &#xff0c;需要组合熟练掌握&#xff0c;才能优雅的处理数据完成渲染。 目录 一、数据分析 二、数据处理 …...

苏州建网站/如何引流推广产品

为发挥区块链在产业变革中的重要作用&#xff0c;促进区块链和经济社会深度融合&#xff0c;加快推动区块链技术应用和产业发展&#xff0c;工信部、中央网信办发布《关于加快推动区块链技术应用和产业发展的指导意见》&#xff08;以下简称《指导意见》&#xff09;&#xff0…...

建了网站怎么装饰/营销比较好的知名公司有哪些

因为是从官方版本库做的镜像&#xff0c;所以有些权限直接从官方同步到了本地。 今天&#xff0c;有同事执行git push操作&#xff0c;报错&#xff1a; 根据网上搜索的内容&#xff0c;在gerrit.config中[auth]中添加如下内容&#xff1a; [auth]type HTTPcontributorAgreeme…...

做网站深圳/防疫优化措施

iPhone系统中的Objective-C的内存管理机制是比较灵活的&#xff0c;即可以拿来像C/C一样用&#xff0c;也可以加个AutoreleasePool让它升级为半自动化的内存管理语言。当然&#xff0c;也不能拿JAVA虚拟机中的全自动化GC来比? 引用计数是实例对象的内存回收唯一参考 引用计数(…...

公司网站开发/怎么制作个人网站

1.避免在索引列上使用NOT和&#xff01;&#xff0c;索引只能告诉我们什么存在与表中&#xff0c;不能告诉我们什么不存在表中 2.索引列上用>替代> 3.oracle采用自下而上的顺序解析where子句&#xff0c;因此表之间的连接必须放在其他where条件之前&#xff0c;那些可以过…...

要执行请求的操作_wordpress需要访问您网页服务器的权限/100%上热门文案

【数据分析】—数据预处理数据预处理数据变换数据规范化最小-最大规范化z-score规范化小数定标规范化小结数据预处理 数据变换 数据变换的目的是将数据转换成适合分析建模的形式 前提条件&#xff1a;尽量不改变原始数据的规律数据规范化 最小-最大规范化z-score规范化小数定…...

十款app软件下载入口/快速优化官网

1. translate translate要比replace要高效&#xff0c;translate支持替换多 使用translate之前必须要创建一个转换表。要创建转换表&#xff0c;可对字符串类型str调用方法maketrans。 table str.maketrans(cs, kz) # 然后执行转换 this is an incredible test.translate(tabl…...