当前位置: 首页 > news >正文

Flink Kafka获取数据写入到MongoDB中 样例

简述

Apache Flink 是一个流处理和批处理的开源框架,它允许从各种数据源(如 Kafka)读取数据,处理数据,然后将数据写入到不同的目标系统(如 MongoDB)。以下是一个简化的流程,描述如何使用 Flink 从 Kafka 读取数据并保存到 MongoDB:

1、环境准备

  • 安装并配置 Apache Flink。
  • 安装并配置 Apache Kafka。
  • 安装并配置 MongoDB。
  • 创建一个 Kafka 主题,并发送一些测试数据。
  • 确保 Flink 可以连接到 Kafka 和 MongoDB。

部署参考:
1、flink:Flink 部署执行模式
2、kafka:Flink mongo & Kafka
3、mongoDb:mongo副本集本地部署

2. 添加依赖

在Flink 项目中,需要添加 Kafka 和 MongoDB 的连接器依赖。对于 Maven 项目,可以在 pom.xml 文件中添加相应的依赖。
对于 Kafka,需要添加 Flink Kafka Connector 的依赖。
对于 MongoDB,需要添加 Flink MongoDB Sink 的依赖。

3. 编写 Flink 作业

* 创建一个 Flink 作业,使用 Flink 的 `FlinkKafkaConsumer` 从 Kafka 主题中读取数据。  
* 对读取的数据进行必要的转换或处理。  
* 使用 MongoDB 的 Java 驱动程序或第三方库将处理后的数据写入 MongoDB。

4. 运行 Flink 作业

使用 Flink 的命令行工具或 IDE 运行 Flink 作业。确保 Kafka 和 MongoDB 正在运行,并且 Flink 可以访问它们。

参考:Flink 命令行提交、展示和取消作业

5. 监控和调试

使用 Flink 的 Web UI 或其他监控工具来监控作业。如果出现问题,检查日志并进行调试。

6. 优化和扩展

根据需求和数据量,优化 Flink 作业的性能和可扩展性。这可能包括调整并行度、增加资源、优化数据处理逻辑等。

代码

package com.wfg.flink.connector.kafka;import com.mongodb.client.model.InsertOneModel;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;import static com.wfg.flink.connector.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.connector.constants.Constants.TEST_TOPIC_PV;/*** @author wfg*/
public class KafkaToWriteMongo {public static void main(String[] args) throws Exception {// 1. 设置 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(KAFKA_BROKERS).setTopics(TEST_TOPIC_PV).setGroupId("my-test-topic-pv").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> rs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 创建RollingFileSinkMongoSink<String> sink = MongoSink.<String>builder().setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin").setDatabase("sjzz").setCollection("TestMongoPv").setMaxRetries(3)
//                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setSerializationSchema((input, context) -> {System.out.println(input);return new InsertOneModel<>(BsonDocument.parse(input));}).build();rs.sinkTo(sink);// 6. 执行 Flink 作业env.execute("Kafka Flink Job");}
}

相关文章:

Flink Kafka获取数据写入到MongoDB中 样例

简述 Apache Flink 是一个流处理和批处理的开源框架&#xff0c;它允许从各种数据源&#xff08;如 Kafka&#xff09;读取数据&#xff0c;处理数据&#xff0c;然后将数据写入到不同的目标系统&#xff08;如 MongoDB&#xff09;。以下是一个简化的流程&#xff0c;描述如何…...

Android Jetpack Compose入门教程(二)

一、列表和动画 列表和动画在应用内随处可见。在本课中&#xff0c;您将学习如何利用 Compose 轻松创建列表并添加有趣的动画效果。 1、创建消息列表 只包含一条消息的聊天略显孤单&#xff0c;因此我们将更改对话&#xff0c;使其包含多条消息。您需要创建一个可显示多条消…...

如何避免接口重复请求(axios推荐使用AbortController)

前言&#xff1a; 我们日常开发中&#xff0c;经常会遇到点击一个按钮或者进行搜索时&#xff0c;请求接口的需求。 如果我们不做优化&#xff0c;连续点击按钮或者进行搜索&#xff0c;接口会重复请求。 以axios为例&#xff0c;我们一般以以下几种方法为主&#xff1a; 1…...

算法设计与分析:网络流求解棒球赛淘汰问题C++

目录 一、实验目的 二、问题描述 三、实验要求 四、算法思想 1、明显的:win[i]+remain[i][j]<> 2、不明显的:最大流 3、操作 3.1 先读入相关信息(邻接矩阵**k),进行一遍“明显的”判断。 3.2 对剩下的“不明显的”的每个球队构建流网络(邻接表vector< ve…...

Linux Ubuntu 24.04 C语言gcc编译过程详解

下面是Hello World程序源代码文件hello.c的内容&#xff0c;我们将以它为例来说明源文件到可执行文件的形成过程&#xff0c;主要分4步&#xff1a;预处理、汇编、机器码、链接。 #include <stdio.h> int main () {printf ( "hello, world \n " );return 0; }…...

Python自动化办公篇—pandas操作Excel:读取+查看+选择+清洗+排序+筛选+函数+写入

目录 专栏导读库的介绍库的安装1、读取数据2、查看数据3、选择数据4、数据清洗5、数据排序6、数据筛选7、数据操作8、数据写入总结 专栏导读 文章名称链接Python自动化办公—pyautogui图像定位\点击功能,实现自动截取当前屏幕并检索点击(可制作为游戏点击脚本)点我进行跳转Pyt…...

数据库大作业——音乐平台数据库管理系统

W...Y的主页&#x1f60a; 代码仓库分享&#x1f495; 《数据库系统》课程设计 &#xff1a;流行音乐管理平台数据库系统&#xff08;本数据库大作业使用软件sql server、dreamweaver、power designer&#xff09; 目录 系统需求设计 数据库概念结构设计 实体分析 属性分…...

【DBA早下班系列】—— 并行SQL/慢SQL 问题该如何高效收集诊断信息

1. 前言 OceanBase论坛问答区或者提交工单支持的时候大部分时间都浪费在了诊断信息的获取交互上&#xff0c;今天我就其中大家比较头疼的SQL问题&#xff0c;给大家讲解一下如何一键收集并行SQL/慢SQL所需要的诊断信息&#xff0c;减少沟通成本&#xff0c;让大家早下班。 2. …...

用python实现多文件多文本替换功能

用python实现多文件多文本替换功能 今天修改单位项目代码时由于改变了一个数据结构名称&#xff0c;结果有几十个文件都要修改&#xff0c;一个个改实在太麻烦&#xff0c;又没有搜到比较靠谱的工具软件&#xff0c;于是干脆用python手撸了一个小工具&#xff0c;发现python在…...

【DevOps】深入探索Ubuntu操作系统:全面了解

引言 在开源软件的世界里&#xff0c;Ubuntu是一个闪耀的明星。它不仅是一个操作系统&#xff0c;更是一种社区精神、一种共享和协作的文化。Ubuntu操作系统基于强大的Linux内核&#xff0c;由世界各地的开发者共同维护和改进。在这篇博文中&#xff0c;我们将深入探索Ubuntu操…...

【Linux】—MySQL安装

文章目录 前言一、下载官方MySQL包二、下载完成后&#xff0c;通过xftp6上传到Linux服务器上三、解压MySQL安装包四、在安装目录下执行rpm安装&#xff0c;请按顺序依次执行。五、配置MySQL六、启动MySQL数据库七、退出&#xff0c;重新登录数据库 前言 本文主要介绍在Linux环境…...

【vue】form表单提交validate验证不进valid原因

目录 1. 原因 1. 原因 1.<el-form>是否写了ref“form”。2.是否有其它标签写了ref“form”。3.<el-form>中要写成:model&#xff0c;不能使用v-model。4.自定义的validate要各个路径均能返回callback()。 const validatePass (rule, value, callback) > {if (…...

如何用 Google Chrome 浏览器浏览经过 XSLT 渲染的 XML 文件

对于经过XSLT渲染的XML文件&#xff0c;本来&#xff0c;可以直接用 IE (Internet Explorer) 打开&#xff0c;就能看到渲染之后的样子&#xff0c;很方便。但是后来&#xff0c;微软把 IE 换成了 Microsoft Edge&#xff0c;按理说这是比 IE 更先进的浏览器&#xff0c;可是偏…...

Python学习笔记12:进阶篇(二),类的继承与组合

类的继承 我们在编写一系列的类的时候&#xff0c;会发现这些类很相似&#xff0c;但是又有各自的特点和行为。在编写这些类的时候&#xff0c;我们可以把相同的部分抽象成一个基类&#xff0c;然后根据其他不同的特点和行为&#xff0c;抽象出子类&#xff0c;继承这个基类。…...

npm install cnpm -g 报错4048

npm install cnpm -g 报错4048 设置淘宝镜像&#xff1a; 报错如下&#xff1a; 其他博主提供的方法都尝试了&#xff0c;比如管理员权限打开终端&#xff0c;删除.npmrc文件&#xff0c;清除缓存npm cache clean -f等都试了无效&#xff0c;最后怀疑是npm和cnpm版本不对应&…...

本地快速部署 SuperSonic

本地快速部署 SuperSonic 0. 引言1. 本地快速部署 supersonic2. 访问 supersonic3. 支持的数据库4. github 地址 0. 引言 SuperSonic融合Chat BI&#xff08;powered by LLM&#xff09;和Headless BI&#xff08;powered by 语义层&#xff09;打造新一代的BI平台。这种融合确…...

如何给vue开发的网站做seo?

最近公司有个需求&#xff0c;需要给公司的官网sqlynx做seo&#xff0c;但因为各种历史原因吧&#xff0c;原来的网站是用vue开发的。没办法&#xff0c;只能尝试尽量做一些seo&#xff0c;让网站能更好一些。 目录 1. 服务器端渲染&#xff08;SSR&#xff09; 2. 预渲染&am…...

算法训练营第六十天(延长12天添加图论) | LeetCode 647 回文子串、LeetCode 516 最长回文子序列

LeetCode 67 回文子串 思路很简单&#xff0c;每一个dp[i]等于dp[i-1]加上当前字符向前直到0各个长度字符串回文串个数即可 代码如下&#xff1a; class Solution {public boolean isValid(String s) {int l 0, r s.length() - 1;while (l < r) {if (s.charAt(l) ! s.ch…...

TikTok账号养号的流程分享

对于很多刚开始运营TikTok的新手小白来说&#xff0c;都会有一个同样的疑问&#xff0c;那就是&#xff1a;TikTok到底需不需要养号&#xff1f;这里明确告诉大家是需要养号的&#xff0c;今天就把我自己实操过的养号经验和策略总结出来&#xff0c;分享给大家。 一、什么是Ti…...

C++初学者指南第一步---6.枚举和枚举类

C初学者指南第一步—6.枚举和枚举类 文章目录 C初学者指南第一步---6.枚举和枚举类1.作用域的枚举(enum class类型&#xff09;&#xff08;C11&#xff09;2.无作用域的枚举(enum类型)3.枚举类的基础类型4.自定义枚举类映射5.和基础类型的互相转换 1.作用域的枚举(enum class类…...

【js判断机型】

var isIOS /(iPhone|iPad|iPod)/i.test(navigator.userAgent) var isiPad navigator.userAgent.match(/(iPad)/) || (navigator.platform ‘MacIntel’ && navigator.maxTouchPoints > 1) 上面这个不行的话&#xff0c;再试下这个 var isiPad (navigator.userAg…...

google chrome浏览器安装crx插件Jam

先上一张图&#xff1a; Jam是bug报告生成插件 1、在地址栏中输入chrome://extensions/&#xff0c;然后回车。 2、将下载好的crx插件&#xff0c;直接拖到里面就可以完成安装工作了。 3、测试了一下jam插件&#xff0c;发现直接没有响应。 4、点击【移除】直接可以删除插件…...

【Java面试】二十、JVM篇(上):JVM结构

文章目录 1、JVM2、程序计数器3、堆4、栈4.1 垃圾回收是否涉及栈内存4.2 栈内存分配越大越好吗4.3 方法内的局部变量是否线程安全吗4.4 栈内存溢出的情况4.5 堆和栈的区别是什么 5、方法区5.1 常量池5.2 运行时常量池 6、直接内存 1、JVM Java源码编译成class字节码后&#xf…...

【Python教程】压缩PDF文件大小

压缩 PDF 文件能有效减小文件大小并提高文件传输的效率&#xff0c;同时还能节省计算机存储空间。除了使用一些专业工具对PDF文件进行压缩&#xff0c;我们还可以通过 Python 来执行该操作&#xff0c;实现自动化、批量处理PDF文件。 本文将分享一个简单有效的使用 Python 压缩…...

UE4中性能优化和检测工具

UE4中性能优化和检测工具合集 简述CPUUnreal InsightUnreal ProfilerSimpleperfAndroid StudioPerfettoXCode TimeprofilerBest Practice GPUAdreno GPUMali GPUAndroid GPU Inspector (AGI) 内存堆内存分析Android StudioLoliProfilerUE5 Memory InsightsUnity Mono 内存Memre…...

大型ERP设计-业务与功能指引:外币折算与辅助账套

外币折算与辅助账套 前言&#xff1a;在对ORACLE和SAP的核心模块功能全面解读的基础上&#xff0c;给出大型ERP设计的建议-业务与功能指引&#xff0c;企业选型、开发大型ERP软件的公司和ERP顾问可以参考。模块包括财务、计划与制造、供应链、项目及设备(MRO)&#xff0c;初步预…...

重学java 73.设计模式

本想送你一本沉思录&#xff0c;可该迷途知返的人是我 —— 24.6.18 设计模式 设计模式(Design pattern)&#xff0c;是一套被反复使用、经过分类编目的、代码设计经验的总结&#xff0c;使用设计模式是为了可重用代码、保证代码可靠性、程序的重用性,稳定性。 1995 年&#x…...

线代的学习(矩阵)

1.矩阵的乘法 矩阵实现满足&#xff1a;内标相等 矩阵相乘之后的结果&#xff1a;前行后列 需要注意&#xff1a;1.矩阵的乘法不具有交换律&#xff1a;AB!BA 2.矩阵的乘法满足分配律&#xff1a;A(BC) AB AC 抽象逆矩阵求逆矩阵 方法1.凑定义法、 方法2.长除法 数字型矩阵…...

【Java基础5】JDK、JRE和JVM的区别与联系

JDK、JRE和JVM的区别与联系 Java是一种广泛使用的编程语言&#xff0c;它的跨平台特性得益于Java虚拟机&#xff08;JVM&#xff09;。然而&#xff0c;在Java的世界里&#xff0c;JDK、JRE和JVM这三个术语常常让人感到困惑。本文将阐述它们各自的功能&#xff0c;以及它们是如…...

2024年先进机械电子、电气工程与自动化国际学术会议(ICAMEEA 2024)

2024年先进机械电子、电气工程与自动化国际学术会议(ICAMEEA 2024) 2024 International Conference on Advanced Mechatronic, Electrical Engineering and Automation 会议地点&#xff1a;杭州&#xff0c;中国 网址&#xff1a;www.icameea.com 邮箱: icameeasub-conf.c…...

网络销售话术和技巧/外链seo招聘

JDialog 2011-08-29 15:16:33| 分类&#xff1a; JFC|字号 订阅 JDialog 用途 JDialog是创建对话框窗口的主要类&#xff0c;可以使用此类创建自定义的对话框。该类继承了AWT的Dialoglei&#xff0c;支持Swing体系结构的高级GUI属性。与JFrame类似&#xff0c;只不过JDialog…...

源码论坛网站/环球军事网最新军事新闻最新消息

1. 查看当前使用的内核版本 uname -a 2.在终端下察看已经安装的旧的内核&#xff1a; ctrlaltt——>进入终端——>输入命令&#xff1a; dpkg --get-selections|grep linux 可以看到都是一些内核启动文件&#xff0c;很明显有些是我们不需要的&#xff08;建议先卸载比较…...

泰安做网站公司哪家好/厦门网络推广外包多少钱

事情是这样滴&#xff01;一个小伙伴在这两天提出一个问题如下&#xff1a;考虑到数字推理是浙江省考每年的必考题&#xff0c;图形题在去年的浙江省考中考查了四题。而图形题相较于分数数列、递推数列、多级数列等常见纯数字数列来说&#xff0c;在没有掌握一些常见技巧的前提…...

怎么样免费给网站做优化/线上广告投放方式

额&#xff0c;得说声抱歉&#xff0c;时隔半年回顾的时候&#xff0c;发现之前的确没有做很多的考量&#xff0c;这篇博客内容欠佳。计算器还有有一些功能没有完成&#xff0c;也有许多bug&#xff0c;不过我做了改进&#xff0c;完善了许多bug&#xff0c;详细的&#xff0c;…...

安阳做网站多少钱/最新新闻事件今天疫情

本文只是针对此思维导图的流程介绍及内容介绍&#xff0c;不再在文章中用文字一一列举&#xff0c;仅用图片展示&#xff0c;如需本文的思维导图原版Xmind文件请私聊博主或在本账号下的资源中心自行下载。 思维导图分组总览 SQL基础 1.1 SQL基础-数据库 1.2 SQL基础-SQL约束 …...

门户网站 建设 通知/社群营销策略有哪些

笔记&#xff1a; 原理就是&#xff1a;宽高为零&#xff0c;单独设置border的宽度&#xff0c;然后上右下左&#xff0c;根据需要选择三角形的方向&#xff0c;比如选向上箭头&#xff0c;其他右、下、左设置为透明&#xff1b; 然后放在下边&#xff0c;用margin-top&#xf…...