当前位置: 首页 > news >正文

【flink】之如何消费kafka数据?

为了编写一个使用Apache Flink来读取Apache Kafka消息的示例,我们需要确保我们的环境已经安装了Flink和Kafka,并且它们都能正常运行。此外,我们还需要在项目中引入相应的依赖库。以下是一个详细的步骤指南,包括依赖添加、代码编写和执行说明。

 1.环境准备

确保你已经安装了Apache Kafka和Apache Flink,并且Kafka正在运行。Kafka的默认端口是9092,而Zookeeper(Kafka依赖的服务)的默认端口是2181

2.Maven项目设置

创建一个新的Maven项目,并在pom.xml中添加以下依赖:

<dependencies>  <!-- Flink dependencies -->  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-streaming-java_2.12</artifactId>  <version>1.13.2</version>  </dependency>  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-kafka_2.12</artifactId>  <version>1.13.2</version>  </dependency>  <!-- Kafka client dependency -->  <dependency>  <groupId>org.apache.kafka</groupId>  <artifactId>kafka-clients</artifactId>  <version>2.8.0</version>  </dependency>  <!-- Logging -->  <dependency>  <groupId>org.slf4j</groupId>  <artifactId>slf4j-log4j12</artifactId>  <version>1.7.30</version>  </dependency>  
</dependencies>

注意:请根据你使用的Scala或Java版本以及Flink和Kafka的版本调整上述依赖。

3.编写Flink Kafka Consumer代码

import org.apache.flink.api.common.functions.MapFunction;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;  import java.util.Properties;  public class FlinkKafkaConsumerDemo {  public static void main(String[] args) throws Exception {  // 设置执行环境  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // Kafka消费者属性  Properties props = new Properties();  props.put("bootstrap.servers", "localhost:9092");  props.put("group.id", "test-group");  props.put("enable.auto.commit", "true");  props.put("auto.commit.interval.ms", "1000");  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // 创建Kafka消费者  FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(  "input-topic", // Kafka topic  new SimpleStringSchema(), // 反序列化器  props);  // 添加数据源  DataStream<String> stream = env.addSource(myConsumer);  // 数据处理  stream.map(new MapFunction<String, String>() {  @Override  public String map(String value) throws Exception {  return "Received: " + value;  }  }).print();  // 执行流程序  env.execute("Flink Kafka Consumer Example");  }  // 简单的字符串反序列化器  public static final class SimpleStringSchema implements DeserializationSchema<String> {  @Override  public String deserialize(byte[] message) throws IOException {  return new String(message, "UTF-8");  }  @Override  public boolean isEndOfStream(String nextElement) {  return false;  }  @Override  public TypeInformation<String> getProducedType() {  return BasicTypeInfo.STRING_TYPE_INFO;  }  }  
}

4.执行程序

  1. 确保Kafka正在运行,并且有一个名为input-topic的topic(如果没有,你需要先创建它)。
  2. 编译并运行你的Maven项目

相关文章:

【flink】之如何消费kafka数据?

为了编写一个使用Apache Flink来读取Apache Kafka消息的示例&#xff0c;我们需要确保我们的环境已经安装了Flink和Kafka&#xff0c;并且它们都能正常运行。此外&#xff0c;我们还需要在项目中引入相应的依赖库。以下是一个详细的步骤指南&#xff0c;包括依赖添加、代码编写…...

科研绘图系列:R语言山脊图(Ridgeline Chart)

介绍 山脊图(Ridge Chart)是一种用于展示数据分布和比较不同类别或组之间差异的数据可视化技术。它通常用于展示多个维度或变量之间的关系,以及它们在不同组中的分布情况。山脊图的特点: 多变量展示:山脊图可以同时展示多个变量的分布情况,允许用户比较不同变量之间的关…...

Boost搜索引擎:如何建立 用户搜索内容 与 网页文件内容 之间的关系

如果想使“用户搜索内容”和“网页文件内容”之间产生联系&#xff0c;就应该将“用户搜索内容”和“网页文件”分为很小的单元 &#xff08;这个单元就是关键词&#xff09;&#xff0c;寻找用户搜索单元是否出现在这个文档之中&#xff0c;如果出现就证明这个网页文件和用户搜…...

【QT】QT 窗口(菜单栏、工具栏、状态栏、浮动窗口、对话框)

Qt 窗口是通过 QMainWindow类来实现的。 QMainWindow 是一个为用户提供主窗口程序的类&#xff0c;继承自 QWidget 类&#xff0c;并且提供了⼀个预定义的布局。QMainWindow 包含一个菜单栏&#xff08;Menu Bar&#xff09;、多个工具栏&#xff08;Tool Bars&#xff09;、…...

Golang | Leetcode Golang题解之第283题移动零

题目&#xff1a; 题解&#xff1a; func moveZeroes(nums []int) {left, right, n : 0, 0, len(nums)for right < n {if nums[right] ! 0 {nums[left], nums[right] nums[right], nums[left]left}right} }...

ubuntu22.04 安装 NVIDIA 驱动以及CUDA

目录 1、事前问题解决 2、安装 nvidia 驱动 3、卸载 nvidia 驱动方法 4、安装 CUDA 5、安装 Anaconda 6、安装 PyTorch 1、事前问题解决 在安装完ubuntu之后&#xff0c;如果进入ubuntu出现黑屏情况&#xff0c;一般就是nvidia驱动与linux自带的不兼容&#xff0c;可以通…...

数据结构·AVL树

1. AVL树的概念 二叉搜索树虽可以缩短查找的效率&#xff0c;但如果存数据时接近有序&#xff0c;二叉搜索将退化为单支树&#xff0c;此时查找元素效率相当于在顺序表中查找&#xff0c;效率低下。因此两位俄罗斯数学家 G.M.Adelson-Velskii 和E.M.Landis 在1962年发明了一种解…...

记一次Mycat分库分表实践

接了个活,又搞分库分表。 一、分库分表 在系统的研发过程中,随着数据量的不断增长,单库单表已无法满足数据的存储需求,此时就需要对数据库进行分库分表操作。 分库分表是随着业务的不断发展,单库单表无法承载整体的数据存储时,采取的一种将整体数据分散存储到不同服务…...

数据分析:微生物数据的荟萃分析框架

介绍 Meta-analysis of fecal metagenomes reveals global microbial signatures that are specific for colorectal cancer提供了一种荟萃分析的框架&#xff0c;它主要基于常用的Wilcoxon rank-sum test和Blocked Wilcoxon rank-sum test 方法计算显著性&#xff0c;再使用分…...

Django—admin后台管理

Django官网 https://www.djangoproject.com/ 如果已经有了Django跳过这步 安装Django&#xff1a; 如果你还没有安装Django&#xff0c;可以通过Python的包管理器pip来安装&#xff1a; pip install django 创建项目&#xff1a; 使用Django创建一个新的项目&#xff1a; …...

数字图像处理中的常用特殊矩阵及MATLAB应用

一、前言 Matlab的名称来源于“矩阵实验室&#xff08;Matrix Laboratory&#xff09;”&#xff0c;其对矩阵的操作具有先天性的优势&#xff08;特别是相对于C语言的数组来说&#xff09;。在数字图像处理中&#xff0c;为了提高编程效率&#xff0c;我们可以使用多种方式来创…...

vue侦听器(Watch)精彩案例剖析一

目录 watch介绍 监视普通数据类型 监视对象类型 watch介绍 在 Vue 中,watch主要用于监视数据的变化,并执行相应操作。一旦被监视的属性发生变化,回调函数将自动被触发。当在 Vue 中使用watch来响应数据变化时,首先要清楚,watch本质上是一个对象,且必须以对象的…...

HTTP 协议浅析

HTTP&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;是应用层最重要的协议之一。它定义了客户端和服务器之间的数据传输方式&#xff0c;并成为万维网&#xff08;World Wide Web&#xff09;的基石。本文将深入解析 HTTP 协议的基础知识、工作…...

VsCode | 让空文件夹始终展开不折叠

文章目录 1 问题引入2 解决办法3 效果展示 1 问题引入 可能很多小伙伴更新VsCode或者下载新版本时候 &#xff0c;创建的文件 会出现xxx文件夹/xxx文件夹&#xff0c;看着很不舒服&#xff0c;所以该如何展开所有空文件夹呢&#xff1f; 2 解决办法 找到VsCode的设置 &…...

Centos7_Minimal安装Cannot find a valid baseurl for repo: base/7/x86_6

问题 运行yum报此问题 就是没网 解决方法 修改网络信息配置文件&#xff0c;打开配置文件&#xff0c;输入命令&#xff1a; vi /etc/sysconfig/network-scripts/ifcfg-网卡名字把ONBOOTno&#xff0c;改为ONBOOTyes 重启网卡 /etc/init.d/network restart 网路通了...

Spark_Oracle_II_Spark高效处理Oracle时间数据:通过JDBC桥接大数据与数据库的分析之旅

接前文背景&#xff0c; 当需要从关系型数据库&#xff08;如Oracle&#xff09;中读取数据时&#xff0c;Spark提供了JDBC连接功能&#xff0c;允许我们轻松地将数据从Oracle等数据库导入到Spark DataFrame中。然而&#xff0c;在处理时间字段时&#xff0c;可能会遇到一些挑战…...

力扣 459重复的子字符串

思路&#xff1a; KMP算法的核心是求next数组 next数组代表的是当前字符串最大前后缀的长度 而求重复的子字符串就是求字符串的最大前缀与最大后缀之间的子字符串 如果这个子字符串是字符串长度的约数&#xff0c;则true /** lc appleetcode.cn id459 langcpp** [459] 重复…...

MyBatis XML配置文件

目录 一、引入依赖 二、配置数据库的连接信息 三、实现持久层代码 3.1 添加mapper接口 3.2 添加UserInfoXMLMapper.xml 3.3 增删改查操作 3.3.1 增(insert) 3.3.2 删(delete) 3.3.3 改(update) 3.3.4 查(select) 本篇内容仍然衔接上篇内容&#xff0c;使用的代码及案…...

读写RDS或RData等不同格式的文件,包括CSV和TXT、Excel的常见文件格式,和SPSS、SAS、Stata、Minitab等统计软件的数据文件

R语言是数据分析和科学计算的强大工具,其丰富的函数和包使得处理各种数据格式变得相对简单。在本文中,我们将详细介绍如何使用R语言的函数命令读取和写入不同格式的文件,包括RDS或RData格式文件、常见的文本文件(如CSV和TXT)、Excel文件,和和SPSS、SAS、Stata、Minitab等…...

Android 支持的媒体格式,(二)视频支持格式

视频支持格式&#xff1a; 格式编码器解码器具体说明文件类型 容器格式H.263是是对 H.263 的支持在 Android 7.0 及更高版本中并非必需• 3GPP (.3gp) • MPEG-4 (.mp4) • Matroska (.mkv)H.264 AVC Baseline Profile (BP)Android 3.0 及以上版本是 • 3GPP (.3gp) • MPEG-4…...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

挑战杯推荐项目

“人工智能”创意赛 - 智能艺术创作助手&#xff1a;借助大模型技术&#xff0c;开发能根据用户输入的主题、风格等要求&#xff0c;生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用&#xff0c;帮助艺术家和创意爱好者激发创意、提高创作效率。 ​ - 个性化梦境…...

DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径

目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法&#xff1a;netstat [选项] 功能&#xff1a;查看网络状态 常用选项&#xff1a; n 拒绝显示别名&#…...

基于Docker Compose部署Java微服务项目

一. 创建根项目 根项目&#xff08;父项目&#xff09;主要用于依赖管理 一些需要注意的点&#xff1a; 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件&#xff0c;否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...

【JavaSE】绘图与事件入门学习笔记

-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角&#xff0c;以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向&#xff0c;距离坐标原点x个像素;第二个是y坐标&#xff0c;表示当前位置为垂直方向&#xff0c;距离坐标原点y个像素。 坐标体系-像素 …...

Android第十三次面试总结(四大 组件基础)

Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成&#xff0c;用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机&#xff1a; ​onCreate()​​ ​调用时机​&#xff1a;Activity 首次创建时调用。​…...

毫米波雷达基础理论(3D+4D)

3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文&#xff1a; 一文入门汽车毫米波雷达基本原理 &#xff1a;https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...

通过 Ansible 在 Windows 2022 上安装 IIS Web 服务器

拓扑结构 这是一个用于通过 Ansible 部署 IIS Web 服务器的实验室拓扑。 前提条件&#xff1a; 在被管理的节点上安装WinRm 准备一张自签名的证书 开放防火墙入站tcp 5985 5986端口 准备自签名证书 PS C:\Users\azureuser> $cert New-SelfSignedCertificate -DnsName &…...

【HarmonyOS 5】鸿蒙中Stage模型与FA模型详解

一、前言 在HarmonyOS 5的应用开发模型中&#xff0c;featureAbility是旧版FA模型&#xff08;Feature Ability&#xff09;的用法&#xff0c;Stage模型已采用全新的应用架构&#xff0c;推荐使用组件化的上下文获取方式&#xff0c;而非依赖featureAbility。 FA大概是API7之…...