当前位置: 首页 > news >正文

Flink——进行数据转换时,报:Recovery is suppressed by NoRestartBackoffTimeStrategy

热词统计案例:

用flink中的窗口函数(apply)读取kafka中数据,并对热词进行统计。

apply:全量聚合函数,指在窗口触发的时候才会对窗口内的所有数据进行一次计算(等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求)。

代码演示:

kafka发送消息端: 

package com.bigdata.Day04;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class Demo01_windows_kafka发消息 {public static void main(String[] args) throws Exception {// Properties 它是map的一种Properties properties = new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 创建了一个消息生产者对象KafkaProducer kafkaProducer = new KafkaProducer<>(properties);String[] arr = {"联通换猫","遥遥领先","恒大歌舞团","恒大足球队","郑州烂尾楼"};Random random = new Random();for (int i = 0; i < 500; i++) {ProducerRecord record = new ProducerRecord<>("topic1",arr[random.nextInt(arr.length)]);// 调用这个里面的send方法kafkaProducer.send(record);Thread.sleep(50);}kafkaProducer.close();}
}

kafka接受消息端: 

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-数据处理转换DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(tuple2 -> tuple2.f0);keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分组key    {"俄乌战争",[1,1,1,1,1]}TimeWindow window, // 窗口对象Iterable<Tuple2<String, Integer>> input, // 分组key在窗口的所有数据Collector<String> out  // 用于输出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具类String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-执行env.execute();}
}

当执行kafka接收消息端时,会报如下错误: 

 错误原因:在对kafka中数据进行KeyBy分组处理时,使用了lambda表达式

 

解决方法:

在使用KeyBy时,将函数的各种参数类型都写清楚,修改后的代码如下:

package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id", "g2");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// transformation-数据处理转换DataStream<Tuple2<String,Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String,Integer> map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 分组key    {"俄乌战争",[1,1,1,1,1]}TimeWindow window, // 窗口对象Iterable<Tuple2<String, Integer>> input, // 分组key在窗口的所有数据Collector<String> out  // 用于输出) throws Exception {long start = window.getStart();long end = window.getEnd();// lang3 包下的工具类String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");int sum = 0;for(Tuple2<String,Integer> tuple2: input){sum += tuple2.f1;}out.collect(key +"," + startStr +","+endStr +",sum="+sum);}}).print();//5. execute-执行env.execute();}
}

相关文章:

Flink——进行数据转换时,报:Recovery is suppressed by NoRestartBackoffTimeStrategy

热词统计案例&#xff1a; 用flink中的窗口函数&#xff08;apply&#xff09;读取kafka中数据&#xff0c;并对热词进行统计。 apply:全量聚合函数&#xff0c;指在窗口触发的时候才会对窗口内的所有数据进行一次计算&#xff08;等窗口的数据到齐&#xff0c;才开始进行聚合…...

技能之发布自己的依赖到npm上

目录 开始 解决 步骤一&#xff1a; 步骤二&#xff1a; 步骤三&#xff1a; 运用 一直以为自己的项目在github上有了&#xff08;之传了github&#xff09;就可以进行npm install下载&#xff0c;有没有和我一样萌萌的同学。没事&#xff0c;萌萌乎乎的不犯罪。 偶然的机…...

COMSOL工作站:配置指南与性能优化

COMSOL Multiphysics 求解的问题类型相当广泛&#xff0c;提供了仿真单一物理场以及灵活耦合多个物理场的功能&#xff0c;供工程师和科研人员来精确分析各个工程领域的设备、工艺和流程。 软件内置的#模型开发器#包含完整的建模工作流程&#xff0c;可实现从几何建模、材料参数…...

Qt导出Excel图表

目的 就是利用Qt导出Excel图表,如果直接画Excel 图表&#xff0c;比较麻烦些&#xff0c;代码写得也复杂了&#xff1b;而直接利用Excel模块就简单了&#xff0c;图表在模块当中已经是现成的了&#xff0c;Qt程序只更改数据就可以了&#xff0c;这篇文章就是记录一下利用模块上…...

分布式协同 - 分布式系统的特性与互斥问题

文章目录 导图概述分布式系统的特性与挑战分布式互斥算法的目标分布式互斥算法集中互斥算法集中互斥算法示意图集中互斥算法流程 基于许可的互斥算法Lamport 算法示意图Lamport 流程 令牌环互斥算法令牌环互斥算法示意图 1. 集中互斥算法&#xff08;Centralized Mutual Exclus…...

windows安装itop

本文介绍 win10 安装 itop 安装WAMP集成环境前 先安装visual c 安装itop前需要安装WAMP集成环境(windowsApacheMysqlPHP) 所需文件百度云盘 通过网盘分享的文件&#xff1a;itop.zip 链接: https://pan.baidu.com/s/1D5HrKdbyEaYBZ8_IebDQxQ 提取码: m9fh 步骤一&#xff1…...

LAMP环境的部署

一、软件安装介绍 在Linux系统中安装软件有rpm安装、yum安装、源码安装等方法&#xff0c;在这里主要给大家介绍 yum 安装&#xff0c;这是一种最简单方便的一种安装方法。 YUM&#xff08;Yellow dog Upadate Modifie&#xff09;是改进版的 RPM 管理器&#xff0c;很好地解…...

Go语言压缩文件处理

目录 Go 语言压缩文件处理1. 压缩文件&#xff1a;Zip函数2. 解压文件&#xff1a;UnZip 函数3. 小结 Go 语言压缩文件处理 在现代的应用开发中&#xff0c;处理压缩文件&#xff08;如 .zip 格式&#xff09;是常见的需求。Go 语言提供了内置的 archive/zip 包来处理 .zip 文…...

rocylinux9.4安装prometheus监控

一.上传软件包 具体的软件包如下&#xff0c;其中kubernetes-mixin是下载的监控kubernetes的一些监控规则、dashbaordd等。 二.Prometheus配置 1.promethes软件安装 #解压上传后的软件包 [rootlocalhost ] cd /opt [rootlocalhost opt]# tar xf prometheus-2.35.3.linux-amd…...

屏幕分辨率|尺寸|颜色深度指纹

一、前端通过window.screen接口获取屏幕分辨率 尺寸 颜色深度&#xff0c;横屏竖屏信息。 二、window.screen c接口实现&#xff1a; 1、third_party\blink\renderer\core\frame\screen.idl // https://drafts.csswg.org/cssom-view/#the-screen-interface[ExposedWindow ] …...

docker-elasticsearch-kibana-logstash

一、安装 Elasticsearch 尝试直接拉取 Elasticsearch 镜像&#xff1a; 执行 docker pull docker.elastic.co/elasticsearch/elasticsearch&#xff0c;拉取失败&#xff0c;错误提示为 “Error response from daemon: manifest for docker.elastic.co/elasticsearch/elasticse…...

C#设计模式——抽象工厂模式(重点)

文章目录 项目地址一、抽象工厂模式1.1 特性1.2 使用反射获取特性标记的类1.3 完整代码 项目地址 教程作者&#xff1a;教程地址&#xff1a; 代码仓库地址&#xff1a; 所用到的框架和插件&#xff1a; dbt airflow一、抽象工厂模式 工厂方法模式依然存在一个问题就是&…...

全新AI模型家族登场:完全可复现的开源语言模型OLMo 2

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…...

用Matlab和SIMULINK实现DPCM仿真和双边带调幅系统仿真

1、使用SIMULINK或Matlab实现DPCM仿真 1.1 DPCM原理 差分脉冲编码调制&#xff0c;简称DPCM&#xff0c;主要用于将模拟信号转换为数字信号&#xff0c;同时减少数据的冗余度以实现数据压缩。在DPCM中&#xff0c;信号的每个抽样值不是独立编码的&#xff0c;而是通过预测前一…...

RabbitMQ的交换机总结

1.direct交换机 2.fanout交换机...

Android so库的编译

在没弄明白so库编译的关系前,直接看网上博主的博文,常常会觉得云里雾里的,为什么一会儿通过Android工程cmake编译,一会儿又通过NDK命令去编译。两者编译的so库有什么区别? android版第三方库编译总体思路: 对于新手小白来说搞明白上面的总体思路图很有必…...

2024年底-Arch linux或转为0BSD许可证!

原文&#xff1a;https://archlinux.org/news/providing-a-license-for-package-sources/ 解读&#xff1a;Arch Linux社区通过RFC 40达成共识&#xff0c;决定将所有软件包源代码更改为0BSD许可证。 0BSD许可证是什么&#xff1f;&#xff1a;这是一个非常自由的开源许可证&a…...

深入解析音视频流媒体SIP协议交互过程

一、引言 在音视频流媒体传输过程中&#xff0c;SIP&#xff08;Session Initiation Protocol&#xff09;协议发挥着举足轻重的作用。本文将详细全面地介绍音视频流媒体传输中的SIP协议&#xff0c;包括其基本概念、交互过程、关键信令以及应用场景 二、SIP协议基本概念 1.…...

linux安装mysql8.0.40

一、下载MySQL安装包 1.查看glibc版本 rpm -qa | grep glibc 2.到mysql官网下载安装包 ​ 二、解压安装 1.上传压缩包纸/usr/local 目录下&#xff0c;解压&#xff1a; tar -xvf mysql-8.0.40-linux-glibc2.17-x86_64.tar.xz 2.重命名&#xff1a; mv mysql-8.0.40-linux-…...

Java基础之控制语句:开启编程逻辑之门

一、Java控制语句概述 Java 中的控制语句主要分为选择结构、循环结构和跳转语句三大类&#xff0c;它们在程序中起着至关重要的作用&#xff0c;能够决定程序的执行流程。 选择结构用于根据不同的条件执行不同的代码路径&#xff0c;主要包括 if 语句和 switch 语句。if 语句有…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

C++初阶-list的底层

目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

从WWDC看苹果产品发展的规律

WWDC 是苹果公司一年一度面向全球开发者的盛会&#xff0c;其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具&#xff0c;对过去十年 WWDC 主题演讲内容进行了系统化分析&#xff0c;形成了这份…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)

2025年能源电力系统与流体力学国际会议&#xff08;EPSFD 2025&#xff09;将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会&#xff0c;EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容

基于 ​UniApp + WebSocket​实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配​微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…...

中医有效性探讨

文章目录 西医是如何发展到以生物化学为药理基础的现代医学&#xff1f;传统医学奠基期&#xff08;远古 - 17 世纪&#xff09;近代医学转型期&#xff08;17 世纪 - 19 世纪末&#xff09;​现代医学成熟期&#xff08;20世纪至今&#xff09; 中医的源远流长和一脉相承远古至…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)

前言&#xff1a; 最近在做行为检测相关的模型&#xff0c;用的是时空图卷积网络&#xff08;STGCN&#xff09;&#xff0c;但原有kinetic-400数据集数据质量较低&#xff0c;需要进行细粒度的标注&#xff0c;同时粗略搜了下已有开源工具基本都集中于图像分割这块&#xff0c…...

免费数学几何作图web平台

光锐软件免费数学工具&#xff0c;maths,数学制图&#xff0c;数学作图&#xff0c;几何作图&#xff0c;几何&#xff0c;AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...

python爬虫——气象数据爬取

一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用&#xff1a; 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests&#xff1a;发送 …...