当前位置: 首页 > news >正文

Flink+Kafka消费

引入jar

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.8.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.8.0</version>
</dependency>
<!-- flink整合kafka_2.11 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.0</version>
</dependency>

二、处理逻辑

//2、定义环境 => Env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(9);
env.enableCheckpointing(1000);FlinkKafkaConsumer<String> consumer = this.getConsumer();//调用下面的方法获取数据源
consumer.setStartFromLatest();//消费最新数据//2、绑定数据源=> resource
DataStream<String> stream = env.addSource(consumer);//3、批量读取的方法=>
stream.timeWindowAll(Time.milliseconds(500)) //timeWindowAll:时间滚动窗口,滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠.apply(new ReadKafkaFlinkWindowFunction())//使用自己定义的apply来收集.addSink(new KafkaBatchSink());//批量的sink方法
env.execute();

2、定义消费者,并且将消费者consumer转成FlinkKafkaConsumer

public FlinkKafkaConsumer<String> getConsumer(){//定义消费者信息Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.131.147:9092");properties.put("group.id", "flink-consumer-kafka-group");properties.put("auto.offset.reset", "latest");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), properties);return consumer;
}

3、收集数据ReadKafkaFlinkWindowFunction的实现类

4、KafkaBatchSink的实现逻辑

总结

分布式处理引擎Flink使用至少一个【job】调度和至少一个【task】实现分布式处理

有界:就是指flink【消费指定范围内】的数据。例如我定义某个作业间隔时间为0.5秒,则flink已0.5秒为界,进行数据处理。有界数据用在离线数据的处理场景较多

无界:就是指flink始终【监听数据源】里的数据,获取到就处理。无界数据往往用在【实时数据】处理下的场景较多。

我这里结合我们项目的场景来给各位说一下该选那种处理。我们的场景为:

1:尽量支持最多的数据落地
2:数据必须要准确。所以我们最终了有界处理,将flink的界限设置为0.5秒,0.5秒内收集的所有数据整体使用一个算
子消费。保证数据的准确和消费高效性。

1、一定要有抛出异常的机制

我们都知道抛出异常会终止消费,但是为什么要抛出异常呢?这注意是因为如果用户不抛出异常的话,flink会认为当前的数据时正常消费的,这就造成了我们的kafka数据误消费

2、关于并行度parallelism

并行度的配置都是setParallelism,对于env和stream来说,stream的优先级比env高

3、关于checkpoint

我们如果定义程序运行在SPring Boot时,一定要配置检查点这个是flink实现容错的核心配置!

4、关于并行度

我们在设置并行度的时候,将里边的数字设置为多少,最终就会有多少个线程来执行任务。
所以大家一定要清楚对于数据准确性高的数据来说,宁愿牺牲多线程带来的效率提升也要只设置一个线程来执行消费。
可能大家没有注意,如果你不设置flink的并行度为1时。它是以的是系统的线程数来作为并行度!这样顺序是会乱的。

5、saveBatch很好

但是我建议你先封装一下或者改为批量的保存。可能大家都知道或者说都用过mybatis plus的saveBatch,它能将一个列表的inseert封装为一条sql(insert into a values(a1),(a2),(a3),但是我们一条sql的长度过长的话会存在性能问题。建议在批量处理的时候每隔1000条记录saveBatch一次

为什么flink消费kafka比官方的listener都要快

1、并行度和分区处理: Flink 具有高度的并行度支持

可以为每个 Kafka 分区创建独立的消费者实例,以便并行地处理多个分区。这使得 Flink能够更有效地利用资源,并提高整体的消费速度。相比之下,一些官方 Kafka Consumer 实现可能没有明确的并行度配置或并行处理策略。

2、事件时间处理

Flink 强调【事件时间】处理,支持按照事件的实际发生时间进行【有序处理】。这对于一些需要处理时间相关业务逻辑的应用来说很重要。Flink 可以轻松处理乱序事件,并确保事件按照正确的顺序进行处理。官方 Kafka Consumer 也提供了类似的功能,但 Flink在这方面的设计更加深入和专注。

状态管理

Flink 提供了强大的状态管理机制,对于需要在处理过程中保持状态的任务,这一点非常重要。在消费 Kafka消息时,可能需要追踪某些状态,例如记录已处理的偏移量。Flink 的状态管理可以更好地支持这种场景,而官方 Kafka Consumer可能没有提供类似的状态管理机制。

异步处理模型:

Flink 使用异步处理模型,这使得在处理一条消息时,可以同时进行其他处理而无需等待。这有助于提高整体的处理效率。

相关文章:

Flink+Kafka消费

引入jar <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.8.0</version> </dependency> <dependency><groupId>org.apache.flink</groupId><artifactI…...

Seconds_Behind_Master越来越大,主从同步延迟

问题现象 发现从库mysql_slave的参数Seconds_Behind_Master越来越大。已排除主从服务器时间不一致&#xff1b;那么主要就判断两点&#xff1a;是io thread慢还是 sql thread慢&#xff1f;先观察show slave status\G 。 判断3个参数&#xff08;参数后面的值是默认空闲时候的…...

除法求值[中等]

一、题目 给你一个变量对数组equations和一个实数值数组values作为已知条件&#xff0c;其中equations[i] [Ai, Bi]和values[i]共同表示等式Ai / Bi values[i]。每个Ai或Bi是一个表示单个变量的字符串。另有一些以数组queries表示的问题&#xff0c;其中queries[j] [Cj, Dj…...

新时代商业市场:AR技术的挑战与机遇并存

随着科技的不断发展&#xff0c;增强现实&#xff08;AR&#xff09;技术逐渐成为当今社会的一个重要组成部分。AR技术能够将虚拟世界与现实世界相结合&#xff0c;为人们提供更加丰富、多样化的体验。在新时代的社会商业市场中&#xff0c;AR技术也正逐渐被应用于各种商业活动…...

RHEL8中ansible的使用

编写ansible.cfg和清单文件ansible的基本用法 本章实验三台RHEL8系统&#xff08;rhel801&#xff0c;rhel802&#xff0c;rhel803&#xff09;&#xff0c;其中rhel801是ansible主机 这里要确保ansible主机能够解析所有被管理的机器&#xff0c;这里通过配置/etc/hosts来实现…...

【1.6计算机组成与体系结构】存储系统

目录 1.层次化存储结构2.Cache2.1 Cache的介绍2.2 局部性原理2.3 Cache应用 1.层次化存储结构 由 ⬆ CPU&#xff1a;寄存器。 快 ⬆ Cache&#xff1a;按内容存取(相联存储器)。 到 ⬆内存&#xff08;主存&#xff09;&#xff1a;DRAM。 慢 ⬆ 外存&#xff08;辅存&#…...

TCP/UDP 协议

目录 一.TCP协议 1.介绍 2.报文格式 ​编辑 确认号 控制位 窗口大小 3.TCP特性 二.TCP协议的三次握手 1.tcp 三次握手的过程 三.四次挥手 2.有限状态机 四.tcp协议和udp协议的区别 五.udp协议 UDP特性 六.telnet协议 一.TCP协议 1.介绍 TCP&#xff08;Transm…...

如何正确理解和使用 Golang 中 nil ?

目录 指针中的 nil 切片中的 nil map 中的 nil 通道中的 nil 函数中的 nil 接口中的 nil 避免 nil 相关问题的最佳实践 小结 在 Golang 中&#xff0c;nil 是一个预定义的标识符&#xff0c;在不同的上下文环境中有不同的含义&#xff0c;但通常表示“无”、“空”或“…...

IDEA新建jdk8 spring boot项目

今天新建spring boot项目发现JDK版本最低可选17。 但是目前用的最多的还是JDK8啊。 解决办法 Server URL中设置&#xff1a; https://start.aliyun.com/设置完成后&#xff0c;又可以愉快的用jdk8创建项目了。 参考 https://blog.csdn.net/imbzz/article/details/13469117…...

Qt/C++音视频开发59-使用mdk-sdk组件/原qtav作者力作/性能凶残/超级跨平台

一、前言 最近一个月一直在研究mdk-sdk音视频组件&#xff0c;这个组件是原qtav作者的最新力作&#xff0c;提供了各种各样的示例demo&#xff0c;不仅限于支持C&#xff0c;其他各种比如java/flutter/web/android等全部支持&#xff0c;性能上也是杠杠的&#xff0c;目前大概…...

智安网络|企业网络安全工具对比:云桌面与堡垒机,哪个更适合您的需求

随着云计算技术的快速发展&#xff0c;越来越多的企业开始采用云计算解决方案来提高效率和灵活性。在云计算环境下&#xff0c;云桌面和堡垒机被广泛应用于企业网络安全和办公环境中。尽管它们都有助于提升企业的安全和效率&#xff0c;但云桌面和堡垒机在功能和应用方面存在着…...

Git忽略已经提交的文件

原理类似于 Android修改submodule的lib包名...

MVVM和MVC以及MVP的原理以及它们的区别

MVVM、MVC 和 MVP 都是前端架构模式&#xff0c;它们各自有不同的原理和特点。 MVC&#xff08;Model-View-Controller&#xff09; 原理&#xff1a;MVC 将应用程序分为三个部分&#xff1a;模型&#xff08;Model&#xff09;、视图&#xff08;View&#xff09;和控制器&a…...

WeChatMsg: 导出微信聊天记录 | 开源日报 No.108

Mozilla-Ocho/llamafile Stars: 3.5k License: NOASSERTION llamafile 是一个开源项目&#xff0c;旨在通过将 lama.cpp 与 Cosmopolitan Libc 结合成一个框架&#xff0c;将 LLM (Large Language Models) 的复杂性折叠到单个文件可执行程序中&#xff0c;并使其能够在大多数…...

Python学习之复习MySQL-Day3(DQL)

目录 文章声明⭐⭐⭐让我们开始今天的学习吧&#xff01;DQL简介基本查询查询多个/全部字段设置别名去除重复记录 条件查询条件查询介绍实例演示 聚合函数什么是聚合函数&#xff1f;常见的聚合函数实例演示 分组查询分组查询语法where 和 having 的区别实例演示 排序查询语法实…...

AI超级个体:ChatGPT与AIGC实战指南

目录 前言 一、ChatGPT在日常工作中的应用场景 1. 客户服务与支持 2. 内部沟通与协作 3. 创新与问题解决 二、巧用ChatGPT提升工作效率 1. 自动化工作流程 2. 信息整合与共享 3. 提高决策效率 三、巧用ChatGPT创造价值 1. 优化产品和服务 2. 提高员工满意度和留任率…...

SpringBoot集成websocket(5)|(使用OkHttpClient实现websocket以及详细介绍)

SpringBoot集成websocket&#xff08;5&#xff09;|&#xff08;使用OkHttpClient实现websocket以及详细介绍&#xff09; 文章目录 SpringBoot集成websocket&#xff08;5&#xff09;|&#xff08;使用OkHttpClient实现websocket以及详细介绍&#xff09;[TOC] 前言一、初始…...

Kafka-Kafka基本原理与集群快速搭建(实践)

Kafka单机搭建 下载Kafka Apache Download Mirrors 解压 tar -zxvf kafka_2.12-3.4.0.tgz -C /usr/local/src/software/kafkakafka内部bin目录下有个内置的zookeeper(用于单机) 启动zookeeper&#xff08;在后台启动&#xff09; nohup bin/zookeeper-server-start.sh conf…...

Elasticsearch 进阶(索引、类型、字段、分片、副本、集群等详细说明)-06

笔记来源&#xff1a;Elasticsearch Elasticsearch进阶 进阶-核心概念 索引Index 一个索引就是一个拥有几分相似特征的文档的集合。比如说&#xff0c;你可以有一个客户数据的索引&#xff0c;另一个产品目录的索引&#xff0c;还有一个订单数据的索引。一个索引由一个名字…...

hive的分区表和分桶表详解

分区表 Hive中的分区就是把一张大表的数据按照业务需要分散的存储到多个目录&#xff0c;每个目录就称为该表的一个分区。在查询时通过where子句中的表达式选择查询所需要的分区&#xff0c;这样的查询效率会提高很多。 静态分区表基本语法 创建分区表 create table dept_p…...

verilog语法进阶-分布式ram

概述: FPGA的LUT查找表是用RAM设计的&#xff0c;所以LUT可以当成ram来使用&#xff0c;也并不是所有的LUT都可以当成ram来使用&#xff0c;sliceM的ram可以当成分布式ram来使用&#xff0c;而sliceL的ram只能当成rom来使用&#xff0c;也就是只能读&#xff0c;不能写&#x…...

HarmonyOS使用HTTP访问网络

HTTP数据请求 1 概述 日常生活中我们使用应用程序看新闻、发送消息等&#xff0c;都需要连接到互联网&#xff0c;从服务端获取数据。例如&#xff0c;新闻应用可以从新闻服务器中获取最新的热点新闻&#xff0c;从而给用户打造更加丰富、更加实用的体验。 那么要实现这样一种…...

GZ015 机器人系统集成应用技术样题1-学生赛

2023年全国职业院校技能大赛 高职组“机器人系统集成应用技术”赛项 竞赛任务书&#xff08;学生赛&#xff09; 样题1 选手须知&#xff1a; 本任务书共 25页&#xff0c;如出现任务书缺页、字迹不清等问题&#xff0c;请及时向裁判示意&#xff0c;并进行任务书的更换。参赛队…...

计算机毕业设计 基于SpringBoot的日常办公用品直售推荐系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…...

uniapp:使用fixed定位,iOS平台的安全区域问题解决

manifest.json > 添加节点 "safearea": { //iOS平台的安全区域"background": "#1C1E22","backgroundDark": "#1C1E22", // HX 3.1.19支持"bottom": {"offset": "auto"} },已解决&#xff…...

三层交换机原理与配置

文章目录 三层交换机原理与配置一、三层交换技术概述二、传统的 MLS三、基于CEF 的MLS1、转发信息库&#xff08;FIB&#xff09;2、邻接关系表3、工作原理&#xff1a; 四、三层交换机的配置1、三层交换机配置命令2、三层交换机配置步骤 三层交换机原理与配置 一、三层交换技…...

Linux-----5、文件系统

# 文件系统 # 终端的基本操作 ㈠ 打开多个终端 ㈡ 快速清屏 新建标签&#xff1a;command T 新建窗口&#xff1a;command N 关闭标签&#xff1a;command Q 关闭窗口&#xff1a;command W 放大&#xff1a;command 缩小&#xff1a;command - 清屏&#xff…...

电脑自动关机怎么设置?

电脑自动关机怎么设置&#xff1f;如果你是一名上班族&#xff0c;工作忙起来很多事情都会忘记做&#xff0c;有时候忙到很晚后紧急下班&#xff0c;就会忘记给电脑关机&#xff0c;电脑如果经常不关机&#xff0c;那么电脑就会超负荷的运转&#xff0c;大家都知道电脑的寿命是…...

MS5602视频 8 位数模转换器,可替代TLC5602

MS5602 是低功率、超高速视频数模转换器。 MS5602 以 DC 至 20MHz 的采样速率&#xff0c;将数字信号转换成模拟信号。由于高速工作 的特性&#xff0c; MS5602 适合于数字电视、电脑视频处理及雷达信号处 理等数字视频应用。 MS5602 工作在 -40C 至 85C 的温度范围内 …...

Logistic Regression——逻辑回归

1. 为什么需要逻辑回归 在前面学习的线性回归中&#xff0c;我们的预测值都是任意的连续值&#xff0c;例如预测房价。除此之外&#xff0c;还有一个常见的问题就是分类问题&#xff0c;而逻辑回归是一个解决分类问题的模型&#xff0c;其预测值是离散的。 分类问题又包括…...

国贸行业 网站建设/百度seo排名优化系统

/********************************************************************* Redmine 数据库连接错误* 说明&#xff1a;* OpenMediaVault上的Redmine出现连接错误&#xff0c;目前不知道是我自己不小心* 把mysql的密码修改了&#xff0c;还是因为被攻击…...

做网站经费/线上引流的八种推广方式

苹果开发的M1处理器在性能方面已超越了Intel的i7&#xff0c;这代表着ARM阵营在PC处理器市场的重大突破&#xff0c;然而M1处理器能有如此成就却要感谢早已在1990年代末消失的DEC公司。DEC公司是上世纪最伟大的计算机公司之一&#xff0c;它是一家历史最悠久、规模庞大的计算机…...

南宁网络推广培训机构/seo内容优化是什么

综合性实验报告课程名称&#xff1a; 《C语言程序设计》实验题目&#xff1a; 学生成绩管理系统的设计与实现 姓 名&#xff1a; 学 号&#xff1a; 系 别&#xff1a; 专业班级&#xff1a; 指导教师&#xff1a;实验日期&#xff1a; 2012年6月1日—6月20日一、实验目的和要求…...

做问卷兼职有哪些网站/优化标题关键词技巧

学习目标&#xff1a; 1. 了解什么是IDE&#xff1f;为什么使用IDE&#xff1f;常用的PythonIDE有哪些&#xff1f; 2. 了解并安装Pycharm集成开发环境,运行上节课的Python程序。 学习重难点&#xff1a; 可以使用Pycharm运行Python程序。 学习内容&#xff1a; 1.什么是IDE&am…...

常州网站公司网站/关键词搜索排名软件

介绍 Base64编码是一种“防君子不防小人”的编码方式。广泛应用于MIME协议&#xff0c;作为电子邮件的传输编码&#xff0c;生成的编码可逆&#xff0c;后一两位可能有“”&#xff0c;生成的编码都是ascii字符。 优点&#xff1a;速度快&#xff0c;ascii字符&#xff0c;肉眼…...

对于网站建设的调查问卷/考研培训班哪个机构比较好

有时在A页面点击按钮弹出一个form表单&#xff0c;在填完表单后提交成功后&#xff0c;需要关闭表单页并将表单中的某些值反应在A页面上&#xff0c;这时就需要异步提交表单。其实也挺简单&#xff0c;只是需要把表单数据序列化。$("#form1").submit(function (){ va…...