Spark-Streaming+Kafka+mysql实战示例
文章目录
- 前言
- 一、简介
- 1. Spark-Streaming简介
- 2. Kafka简介
- 二、实战演练
- 1. MySQL数据库部分
- 2. 导入依赖
- 3. 编写实体类代码
- 4. 编写kafka主题管理代码
- 5. 编写kafka生产者代码
- 6. 编写Spark-Streaming代码
- 7. 查看数据库
- 8. 代码下载
- 总结
前言
本文将介绍一个使用Spark Streaming和Kafka进行实时数据处理的示例。通过该示例,您将了解到如何使用Spark Streaming和Kafka处理实时数据流,以及如何将处理后的数据保存到MySQL数据库中。示例涵盖了从环境搭建到代码实现的全过程,帮助您快速上手实时数据处理的开发。
zookeeper安装教程:zookeeper安装与配置:使用shell脚本在centos上进行zookeeper自动化下载安装配置(集群搭建版)
kafka安装教程:Kafka安装与配置-shell脚本一键安装配置(集群版)
一、简介
1. Spark-Streaming简介
Spark Streaming是Apache Spark的一个组件,用于实时流数据处理。它提供了高级别的API,可以使用类似于批处理的方式处理实时数据流。Spark Streaming可以与各种消息队列系统集成,包括Kafka、RabbitMQ等。
2. Kafka简介
Kafka是一个分布式流处理平台,具有高吞吐量、可扩展性和可靠性。它提供了一种可持久化、分布式、分区的日志服务,用于处理实时数据流。Kafka使用发布-订阅模型,消息被发布到一个或多个主题,然后由订阅该主题的消费者进行消费。
二、实战演练
开始之前先启动zookeeper集群和kafka集群。
1. MySQL数据库部分
这部分代码用于创建MySQL数据库和数据表,以及将从Kafka获取的数据保存到数据库中。
create database kafkademo;
创建数据表:
CREATE TABLE kafka_tb
(`txid` varchar(255) PRIMARY KEY,`version` varchar(255),`connector` varchar(255),`name` varchar(255),`ts_ms` varchar(255),`snapshot` varchar(255),`db` varchar(255),`sequence` varchar(255),`schema` varchar(255),`table` varchar(255),`lsn` varchar(255),`xmin` varchar(255)
);
2. 导入依赖
这部分代码是Maven的依赖配置,用于引入所需的Spark、Kafka和MySQL相关的库。
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version><scope>compile</scope>
</dependency>
3. 编写实体类代码
这部分代码定义了一个Java类EntityMessage,用于将从Kafka获取的JSON数据转换为Java对象。
package com.zcs;import lombok.Data;import java.io.Serializable;/*** @author zcs2312* @date 2023/12/12 20:49:47* @product_name IntelliJ IDEA* @project_name spark-kafka*/
@Data
public class EntityMessage implements Serializable {private String op;private String ts_ms;private String transaction;private DataItem dataItem;@Datapublic static class DataItem {private String version;private String connector;private String name;private String ts_ms;private String snapshot;private String db;private String[] sequence;private String schema;private String table;private String txId;private String lsn;private String xmin;}
}
4. 编写kafka主题管理代码
这部分代码用于创建、删除和修改Kafka主题的一些操作。
package com.zcs;import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;import java.util.*;
import java.util.concurrent.ExecutionException;/*** @author zcs2312* @date 2023/12/12 20:51:34* @product_name IntelliJ IDEA* @project_name spark-kafka*/
public class KafkaTopicManager {相关文章:
Spark-Streaming+Kafka+mysql实战示例
文章目录 前言一、简介1. Spark-Streaming简介2. Kafka简介二、实战演练1. MySQL数据库部分2. 导入依赖3. 编写实体类代码4. 编写kafka主题管理代码5. 编写kafka生产者代码6. 编写Spark-Streaming代码7. 查看数据库8. 代码下载总结前言 本文将介绍一个使用Spark Streaming和Ka…...
C++改写为C
stm使用中,经常能见到CPP的示例,这些是给arduino,esp32用的,stm32 也支持cpp但是你就想用c怎么办呢,比如我在新手的时候:: 这个双冒号就难住了英雄好汉 比如这是个cpp的 如果类不多的情况下 改写…...
抖去推--短视频剪辑、矩阵无人直播saas营销工具一站式开发
抖去推是一款短视频剪辑和矩阵无人直播SAAS营销工具一站式开发平台。它提供了以下功能和特点: 1. 短视频剪辑:抖去推提供了一系列的剪辑工具,包括自动剪辑、特效制作、配音配乐等,可以帮助用户轻松制作出高质量的短视频。 2. 矩阵…...
HBase 详细图文介绍
目录 一、HBase 定义 二、HBase 数据模型 2.1 HBase 逻辑结构 2.2 HBase 物理存储结构 2.3 数据模型 2.3.1 Name Space 2.3.2 Table 2.3.3 Row 2.3.4 Column 2.3.5 Time Stamp 2.3.6 Cell 三、HBase 基本架构 架构角色 3.1 Master 3.2 Region Server 3.3 Zo…...
Hanlp自然语言处理如何再Spring Boot中使用
一、HanLP HanLP (Hankcs NLP) 是一个自然语言处理工具包,具有功能强大、性能高效、易于使用的特点。HanLP 主要支持中文文本处理,包括分词、词性标注、命名实体识别、依存句法分析、关键词提取、文本分类、情感分析等多种功能。 HanLP 可以在 Java、Py…...
MySQL 是什么?
MySQL官方网站(http://www.mysql.com/)提供关于MySQL软件的最新信息。 MySQL是一个数据库管理系统。 数据库是一种结构化的数据集合。它可以是从简单的购物清单到图片库,再到企业网络中的大量信息等任何形式。要添加、访问和处理存储在计算…...
yarn link使用(npm link)
使用场景 前端开发中,两个项目相互依赖时,使用yarn link(npm link)链接 例如:A项目依赖于本司自己的UI库B,当我们修改了UI库B中的某些代码时,需本地验证后再发布到私服,此时A项目与UI项目B通过yarn link连…...
Docker容器讲解
Docker是一个开源的容器化平台,可以用来在轻量级容器中打包、部署和运行应用程序。Docker的基本概念包括容器、镜像、仓库和服务。 容器是一个独立运行的应用程序包,包括应用程序及其依赖项、运行时环境和配置等。容器相互隔离,可以在不同的…...
three.js模拟太阳系
地球的旋转轨迹目前设置为了圆形,效果: <template><div><el-container><el-main><div class"box-card-left"><div id"threejs" style"border: 1px solid red"></div><div c…...
WPF仿网易云搭建笔记(1):项目搭建
文章目录 前言项目地址动态样式组合样式批量样式覆盖Prism新建UserControler修改Material Design 笔刷收放列表可以滚动的StackPanel列表点击展开或折叠 实现效果 前言 今天接着继续细化代码,把整体框架写出来 项目地址 WPF仿网易云 Gitee仓库 动态样式 【WPF】C#…...
DDOS 攻击是什么?有哪些常见的DDOS攻击?
DDOS简介 DDOS又称为分布式拒绝服务,全称是Distributed Denial of Service。DDOS本是利用合理的请求造成资源过载,导致服务不可用,从而造成服务器拒绝正常流量服务。就如酒店里的房间是有固定的数量的,比如一个酒店有50个房间&am…...
未来应用从何而来:认知力延伸、边界突破、回归云与产业
文 | 智能相对论 作者 | 沈浪 或许,谁也没想到未来应用来的如此之快,现如今传统应用从开发到体验,已经进入了一个前所未有的颠覆性改革阶段。 不久前,美国人工智能公司OpenAI举办开发者大会。在现场,公司创始人Sam …...
vue零基础
vue 与其他框架的对比 框架设计模式数据绑定灵活度文件模式复杂性学习曲线生态VueMVVM双向灵活单文件小缓完善ReactMVC单向较灵活all in js大陡丰富AngularMVC双向固定多文件较大较陡(Typescript)独立 更多对比细节:vue 官网:ht…...
html中一个div中平均一行分配四个盒子,可展开与收起所有的盒子
html中一个div中平均一行分配四个盒子,可展开与收起所有的盒子 1.截图显示部分 2.代码展示部分 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"wid…...
Python虚拟环境指南:告别依赖地狱
一、背景 在SAAS(软件即服务)平台中,用户使用自行定制的Python脚本已经成为司空见惯的做法,然而,由于不同用户对Python三方库的需求各不相同,而底层服务器一般只安装了一个Python版本。举例来说࿰…...
【Jeecg Boot 3 - 第二天】第2节 前后端docker部署云服务器
更新完成,点击下面章节进入 一、后端部署 1.1、后端 docker-compose 部署 JEECGBOOT 1.2、jar 包和 lib 依赖分离,部署包缩小100倍 二、前端部署 2.1、nginx 部署 JEECGBOOT VUE3 2.2、开启Nginx压缩,解决前端访问慢问题...
2020年第九届数学建模国际赛小美赛A题自由泳解题全过程文档及程序
2020年第九届数学建模国际赛小美赛 A题 自由泳 原题再现: 在所有常见的游泳泳姿中,哪一种最快?哪个冲程推力最大?在自由泳项目中,游泳者可以选择他们的泳姿,他们通常选择前面的爬行。然而,游泳…...
双端队列和优先级队列
文章目录 前言dequedeque底层设计迭代器设计 priority仿函数数组中的第k个最大元素优先级队列模拟实现pushpop调整仿函数存储自定义类型 前言 今天要介绍比较特殊的结构,双端队列。 还有一个适配器,优先级队列。 deque 栈的默认容器用了一个deque的东西…...
c#读取CSV文件跟Excel导入成DataTble
1.读取CSV文件 /// <summary>/// 读取CSV文件/// </summary>/// <param name"fileName">文件路径</param>public static DataTable ReadCSV(string fileName){DataTable dt new DataTable();FileStream fs new FileStream(fileName, FileM…...
Python编程技巧 – 单字符函数
Python编程技巧 – 单字符函数 Python Programming Skills – Single Character Function By JacksonML 0. 前言 Python有其内建(built-in)的一系列函数,其中,有两个函数为长度为一的字符设计。这样的函数是单字符函数,尽管它们操作的对象…...
ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...
听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...
Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...
scikit-learn机器学习
# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...
代码规范和架构【立芯理论一】(2025.06.08)
1、代码规范的目标 代码简洁精炼、美观,可持续性好高效率高复用,可移植性好高内聚,低耦合没有冗余规范性,代码有规可循,可以看出自己当时的思考过程特殊排版,特殊语法,特殊指令,必须…...
Linux部署私有文件管理系统MinIO
最近需要用到一个文件管理服务,但是又不想花钱,所以就想着自己搭建一个,刚好我们用的一个开源框架已经集成了MinIO,所以就选了这个 我这边对文件服务性能要求不是太高,单机版就可以 安装非常简单,几个命令就…...
pycharm 设置环境出错
pycharm 设置环境出错 pycharm 新建项目,设置虚拟环境,出错 pycharm 出错 Cannot open Local Failed to start [powershell.exe, -NoExit, -ExecutionPolicy, Bypass, -File, C:\Program Files\JetBrains\PyCharm 2024.1.3\plugins\terminal\shell-int…...
面试高频问题
文章目录 🚀 消息队列核心技术揭秘:从入门到秒杀面试官1️⃣ Kafka为何能"吞云吐雾"?性能背后的秘密1.1 顺序写入与零拷贝:性能的双引擎1.2 分区并行:数据的"八车道高速公路"1.3 页缓存与批量处理…...
OCR MLLM Evaluation
为什么需要评测体系?——背景与矛盾 能干的事: 看清楚发票、身份证上的字(准确率>90%),速度飞快(眨眼间完成)。干不了的事: 碰到复杂表格(合并单元…...
