当前位置: 首页 > news >正文

【Spark分布式内存计算框架——Spark Streaming】7. Kafka集成方式

集成方式

Spark Streaming与Kafka集成,有两套API,原因在于Kafka Consumer API有两套,
文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html。

方式一:Kafka 0.8.x版本

  • 老的Old Kafka Consumer API
  • 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html
  • 老的Old消费者API,有两种方式:
    • 第一种:高级消费API(Consumer High Level API),Receiver接收器接收数据
    • 第二种:简单消费者API(Consumer Simple Level API) ,Direct 直接拉取数据

方式二:Kafka 0.10.x版本

  • 新的 New Kafka Consumer API
  • 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html
  • 核心API:KafkaConsumer、ConsumerRecorder
    在这里插入图片描述

两种方式区别

使用Kafka Old Consumer API集成两种方式,虽然实际生产环境使用Direct方式获取数据,但是在面试的时候常常问到两者区别。
文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html

  • Receiver-based Approach:
    • 基于接收器方式,消费Kafka Topic数据,但是企业中基本上不再使用;
    • Receiver作为常驻的Task运行在Executor等待数据,但是一个Receiver效率低,需要开启多个,再手动合并数据(union),再进行处理,很麻烦;
    • Receiver那台机器挂了,可能会丢失数据,所以需要开启WAL(预写日志)保证数据安全,那么效率又会降低;
    • Receiver方式是通过zookeeper来连接kafka队列,调用Kafka高阶API,offset存储在zookeeper,由Receiver维护;
    • Spark在消费的时候为了保证数据不丢也会在Checkpoint中存一份offset,可能会出现数据不一致;
  • Direct Approach (No Receivers):
    • 直接方式,Streaming中每批次的每个job直接调用Simple Consumer API获取对应Topic数据,此种方式使用最多,面试时被问的最多;
    • Direct方式是直接连接kafka分区来获取数据,从每个分区直接读取数据大大提高并行能力
    • Direct方式调用Kafka低阶API(底层API),offset自己存储和维护,默认由Spark维护在checkpoint中,消除了与zk不一致的情况;
    • 当然也可以自己手动维护,把offset存在MySQL、Redis和Zookeeper中;

上述两种方式区别,如下图所示:
在这里插入图片描述

4.2 Direct 方式集成

使用Kafka Old Consumer API方式集成Streaming,采用Direct方式,调用Old Simple Consumer API,
文档:
http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers

编码实现
Direct方式会定期地从Kafka的topic下对应的partition中查询最新的偏移量,再根据偏移量范围,在每个batch里面处理数据,Spark通过调用kafka简单的消费者API读取一定范围的数据。
在这里插入图片描述
Direct方式没有receiver层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,再根据设定的maxRatePerPartition来处理每个batch。较于Receiver方式的优势:

  • 其一、简化的并行:Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据;
  • 其二、高效:no need for Write Ahead Logs;
  • 其三、精确一次:直接使用simple Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录。

添加相关MAVEN依赖:

<!-- Spark Streaming 集成Kafka 0.8.2.1 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.4.5</version>
</dependency>

提供工具类KafkaUtils专门从Kafka消费数据,其中函数【createDirectStream】消费数据:
在这里插入图片描述
创建Topic及Console控制台发送数据,命令如下:

# 1. 启动Zookeeper 服务
zookeeper-daemon.sh start
# 2. 启动Kafka 服务
kafka-daemon.sh start
# 3. Create Topic
kafka-topics.sh --create --topic wc-topic \
--partitions 3 --replication-factor 1 --zookeeper node1.itcast.cn:2181/kafka200
# List Topics
kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200
# Producer
kafka-console-producer.sh --topic wc-topic --broker-list node1.itcast.cn:9092
# Consumer
kafka-console-consumer.sh --topic wc-topic \
--bootstrap-server node1.itcast.cn:9092 --from-beginning

具体演示代码如下:

import java.util.Date
import kafka.serializer.StringDecoder
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 集成Kafka,采用Direct式读取数据,对每批次(时间为1秒)数据进行词频统计,将统计结果输出到控制台
*/
object StreamingKafkaDirect {
def main(args: Array[String]): Unit = {
// 1、构建流式上下文实例对象StreamingContext,用于读取流式的数据和调度Batch Job执行
val ssc: StreamingContext = {
// a. 创建SparkConf实例对象,设置Application相关信息
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[3]")
//TODO: 设置每秒钟读取Kafka中Topic最大数据量
.set("spark.streaming.kafka.maxRatePerPartition", "10000")
// b. 创建StreamingContext实例,传递Batch Interval(时间间隔:划分流式数据)
val context: StreamingContext = new StreamingContext(sparkConf, Seconds(5))
// c. 返回上下文对象
context
}
// TODO: 2、从流式数据源读取数据
/*
def createDirectStream[
K: ClassTag, // Topic中Key数据类型
V: ClassTag,
KD <: Decoder[K]: ClassTag, // 表示Topic中Key数据解码(从文件中读取,反序列化)
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)]
*/
// 表示从Kafka Topic读取数据时相关参数设置
val kafkaParams: Map[String, String] = Map(
"bootstrap.servers" -> "node1.itcast.cn:9092",
// 表示从Topic的各个分区的哪个偏移量开始消费数据,设置为最大的偏移量开始消费数据
"auto.offset.reset" -> "largest"
)
// 从哪些Topic中读取数据
val topics: Set[String] = Set("wc-topic")
// 采用Direct方式从Kafka 的Topic中读取数据
val kafkaDStream: DStream[(String, String)] = KafkaUtils
.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, //
kafkaParams, //
topics
)
// 3、对接收每批次流式数据,进行词频统计WordCount
val resultDStream: DStream[(String, Int)] = kafkaDStream.transform(rdd => {
rdd
// 获取从Kafka读取数据的Message
.map(tuple => tuple._2)
// 过滤“脏数据”
.filter(line => null != line && line.trim.length > 0)
// 分割为单词
.flatMap(line => line.trim.split("\\s+"))
// 转换为二元组
.mapPartitions{iter => iter.map(word => (word, 1))}
// 聚合统计
.reduceByKey((a, b) => a + b)
})
// 4、将分析每批次结果数据输出,此处答应控制台
resultDStream.foreachRDD{ (rdd, time) =>
// 使用lang3包下FastDateFormat日期格式类,属于线程安全的
val batchTime = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
.format(new Date(time.milliseconds))
println("-------------------------------------------")
println(s"Time: $batchTime")
println("-------------------------------------------")
// TODO: 先判断RDD是否有数据,有数据在输出哦
if(!rdd.isEmpty()){
rdd
// 对于结果RDD输出,需要考虑降低分区数目
.coalesce(1)
// 对分区数据操作
.foreachPartition{iter =>iter.foreach(item => println(item))}
}
}
// 5、对于流式应用来说,需要启动应用,正常情况下启动以后一直运行,直到程序异常终止或者人为干涉
ssc.start() // 启动接收器Receivers,作为Long Running Task(线程) 运行在Executor
ssc.awaitTermination()
// 结束Streaming应用执行
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}

从WEB UI监控页面可以看出如下信息:
在这里插入图片描述

相关文章:

【Spark分布式内存计算框架——Spark Streaming】7. Kafka集成方式

集成方式 Spark Streaming与Kafka集成&#xff0c;有两套API&#xff0c;原因在于Kafka Consumer API有两套&#xff0c; 文档&#xff1a;http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html。 方式一&#xff1a;Kafka 0.8.x版本 老的Old Kafka Consum…...

如何引入elementUI

elementUI的引入完整引入按需引入完整引入 在 main.js 中写入以下内容&#xff1a; import Vue from ‘vue’; import ElementUI from ‘element-ui’; import ‘element-ui/lib/theme-chalk/index.css’; import App from ‘./App.vue’; Vue.use(ElementUI); new Vue({ el: ‘…...

vue3+rust个人博客建站日记4-Vditor搞定MarkDown

即然是个人博客&#xff0c;那么绝对不能丢给自己一个大大的输入框敷衍了事。如果真是这样&#xff0c;现在就可以宣布项目到此结束了。如今没人享受用输入框写博客。作为一个有追求的程序员&#xff0c;作品就要紧跟潮流。 后来&#xff0c;Markdown 的崛起逐步改变了大家的排…...

KDZD-JC软化击穿试验仪

一、概 述 KDZD-JC智能软化击穿试验仪是根据GB/T4074.6-2008和idtIEC60851-6:2004标准而设计的一种新型漆包圆线检测仪器。主要适用于固体绝缘材料&#xff08;如&#xff1a;塑料、橡胶、层压材料、薄膜、树脂、云母、陶瓷、玻璃、绝缘漆等绝缘材料及绝缘件&#xff09;在工…...

【数据结构】单链表的C语言实现--万字详解介绍

​ ​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;数据结构 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录1.链表1.1 链表的概念…...

电子科技大学软件工程期末复习笔记(七):测试策略

目录 前言 重点一览 V模型 回归测试 单元测试 集成测试 重要概念 自顶向下的集成方法 自底向上的集成方法 SMOKE方法 系统测试 验收测试 α测试 β测试 本章小结 前言 本复习笔记基于王玉林老师的课堂PPT与复习大纲&#xff0c;供自己期末复习与学弟学妹参考用…...

逆向-还原代码之除法 (Interl 64)

除法和32位差不多&#xff0c;毕竟背后的数学公式是一样的。区别只是32位的乘法需要两个寄存器来存放大数相乘的结果&#xff0c;而64位的不需要&#xff0c;一个寄存器就能存下。所以在64位的环境下&#xff0c;多了右移32位这条指令&#xff0c;其他指令一样。 //code #incl…...

Python WebDriver自动化测试

Webdriver Selenium 是 ThroughtWorks 一个强大的基于浏览器的开源自动化测试工具&#xff0c;它通常用来编写 Web 应用的自动化测试。 Selenium 2&#xff0c;又名 WebDriver&#xff0c;它的主要新功能是集成了 Selenium 1.0 以及 WebDriver​&#xff08;WebDriver 曾经是…...

2023年微信小程序获取手机号授权登录注册详细教程,包含服务端教程

前言 小程序中有很多地方都会用到用户的手机号&#xff0c;比如登陆注册&#xff0c;填写收货地址等等。有了这个组件可以快速获取微信绑定手机号码&#xff0c;无须用户填写。网上大多数教程还是往年的&#xff0c;而微信官方的api已做了修改。本篇文章将使用最新的方法获取手…...

YOLOv8模型学习笔记

在前面的章节中博主学习了YOLOv5的相关知识&#xff0c;从YOLOv5的数据增强处理到模型设计&#xff0c;从正负样本匹配策略到LOSS设计&#xff0c;今天博主学习的是YOLOv8&#xff0c;同为ultralytics公司的产品&#xff0c;两者无论是思想层面还是具体的设计方面都有着异曲同工…...

Java SE知识点1

一、continue、break、和return的区别是什么? 在循环结构中,当循环条件不满足或者循环次数达到要求时,循环会正常结束。但是,有时候可能需要 在循环的过程中,当发生了某种条件之后 ,提前终止循环,这就需要用到下面几个关键词: 1. continue :指跳出当前的这一次循环,…...

华为OD机试模拟题 用 C++ 实现 - 端口合并(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 最多获得的短信条数(2023.Q1)) 文章目录 最近更新的博客使用说明端口合并题目输入输出示例一输入输出说明示例二输入输出说明示例三输入输出说明...

C++ Primer Plus 第6版 读书笔记(3) 第3章 处理数据

目录 3.1 简单变量 3.1.1 变量名 *位与字节 3.1.4 无符号类型 3.1.7 C如何确定常量的类型 C是在 C 语言基础上开发的一种集面向对象编程、泛型编程和过程化编程于一体的编程语言&#xff0c;是C语言的超集。本书是根据2003年的ISO/ANSI C标准编写的&#xff0c;通过大量短…...

ArrayList源码解读

参数 //默认初始容量private static final int DEFAULT_CAPACITY 10;//空数组(用于空实例)private static final Object[] EMPTY_ELEMENTDATA {};//用于默认大小空实例的共享空数组private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA {};//保存数据的数组tra…...

python实战应用讲解-【语法高级篇】时间与日期(附python示例代码)

目录 保持时间、计划任务和启动程序 time 模块 time.time() 函数 time.sleep() 函数 Python3 日期和时间...

D. Moscow Gorillas(双指针 + 区间分析)

Problem - D - Codeforces 在冬天&#xff0c;莫斯科动物园的居民非常无聊&#xff0c;尤其是大猩猩。你决定娱乐他们&#xff0c;带了一个长度为n的排列p到动物园。长度为n的排列是由n个从1到n的不同整数以任意顺序组成的数组。例如&#xff0c;[2,3,1,5,4]是一个排列&#xf…...

华为OD机试题,用 Java 解【相同数字的积木游戏 1】问题

最近更新的博客 华为OD机试题,用 Java 解【停车场车辆统计】问题华为OD机试题,用 Java 解【字符串变换最小字符串】问题华为OD机试题,用 Java 解【计算最大乘积】问题华为OD机试题,用 Java 解【DNA 序列】问题华为OD机试 - 组成最大数(Java) | 机试题算法思路 【2023】使…...

Python实现GWO智能灰狼优化算法优化BP神经网络分类模型(BP神经网络分类算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。1.项目背景灰狼优化算法(GWO)&#xff0c;由澳大利亚格里菲斯大学学者 Mirjalili 等人于2014年提出来的一种群智能优…...

无线蓝牙耳机哪个牌子好?2023质量好的无线蓝牙耳机推荐

近几年&#xff0c;随着蓝牙技术的不断进步&#xff0c;使用蓝牙耳机的人也越来越多。蓝牙耳机的出现&#xff0c;不仅能让我们摆脱线带来的约束&#xff0c;还能提升我们学习和工作的效率。最近看到很多人问&#xff0c;无线蓝牙耳机哪个牌子好&#xff1f;下面&#xff0c;我…...

Qt之QTableView自定义排序/过滤(QSortFilterProxyModel实现,含源码+注释)

一、效果示例图 1.1 自定义表格排序示例图 本文过滤条件为行索引取余2等于0时返回true&#xff0c;且从下图中可以看到&#xff0c;奇偶行是各自挨在一起的。 1.2 自定义表格过滤示例图 下图添加两列条件&#xff08;当前数据大于当前列条件才返回true&#xff0c;且多个列…...

电商(强一致性系统)的场景设计

领域拆分&#xff1a;如何合理地拆分系统&#xff1f; 一般来说&#xff0c;强一致性的系统都会牵扯到“锁争抢”等技术点&#xff0c;有较大的性能瓶颈&#xff0c;而电商时常做秒杀活动&#xff0c;这对系统的要求更高。业内在对电商系统做改造时&#xff0c;通常会从三个方面…...

算法与数据结构(一)

一、时间复杂度 一个操作如果和样本的数据量没有关系&#xff0c;每次都是固定时间内完成的操作&#xff0c;叫做常数操作。 时间复杂度为一个算法流程中&#xff0c;常数操作数量的一个指标。常用O(读作big O)来表示。具体来说&#xff0c;这个算法流程中&#xff0c;发生了多…...

【Python】元组如何创建?

嗨害大家好鸭&#xff01;我是小熊猫~ Python 元组 Python 的元组与列表类似&#xff0c; 不同之处在于元组的元素不能修改。 元组使用小括号&#xff0c;列表使用方括号。 元组创建很简单&#xff0c;只需要在括号中添加元素&#xff0c; 并使用逗号隔开即可。 如下实例…...

qt操作文件以及字符串转换

//从文件加载英文属性与中文属性对照表QFile file(":/propertyname.txt");if (file.open(QFile::ReadOnly)) {//QTextStream方法读取速度至少快百分之30#if 0while(!file.atEnd()) {QString line file.readLine();appendName(line);}#elseQTextStream in(&file)…...

数组中只出现一次的两个数字(异或法思路)

题目简介 一个数组中只有2个数字只有一个&#xff0c;其他数字都有两个。找出这两个数字。a, b 用HashMap记录就不说了。 这里记录一下用异或的方式解决。 由于异或特性为自己异或自己为0。a^a 0;所以可以异或数组中的所有数字得出 a^b 的结果&#xff0c;其他相同的都消掉…...

python支持的操作系统有哪些

支持python开发环境的系统有Linux、OSX和windows&#xff0c;以及所有主要的操作系统中。 Linux&#xff0c;Linux系统是为编程而设计的&#xff0c;因此在大多数Linux计算机中&#xff0c;都默认安装了Python。编写和维护Linux的人认为会使用这种系统进行编程。要在Linux中运…...

S3C2440开发环境搭建

拿出了之前的S3C2440开发板&#xff0c;然后把移植uboot、移植内核、制作根文件系统、设备树编写驱动等几项再做一遍&#xff0c;这篇文章先记录下环境搭建过程&#xff0c;以及先把现成的uboot、内核、根文件系统下载进去&#xff0c;看看开发板还能不能用&#xff0c;先熟悉一…...

软件测试之测试用例

测试用例 1. 测试用例定义 测试用例又叫做test case&#xff0c;是为某个特殊目标而编制的一组测试输入、执行条件以及预期结果,以便测试某个程序路径或核实是否满足某个特定需求。 2. 编写测试用例的原因 2.1 理清思路&#xff0c;避免遗漏 如果测试的项目大而复杂&#…...

null和undefined的区别有哪些?

null和undefined的区别有哪些&#xff1f;相同点不同点undefinednull总结相同点 1.null和undefined都是js的基本数据类型 2.undefined和null都是假值&#xff08;falsy&#xff09;,都能作为条件进行判断&#xff0c;所以在绝大多数情况下两者在使用上没有区别 if(undefined)…...

【强烈建议收藏:计算机网络面试专题:HTTP协议、HTTP请求报文和响应报文、HTTP请求报文常用字段、HTTP请求方法、HTTP响应码】

一.知识回顾 之前我们一起学习了HTTP1.0、HTTP1.1、HTTP2.0协议之前的区别、以及URL地址栏中输入网址到页面展示的全过程&&DNS域名解析的过程、HTTP协议基本概念以及通信过程、HTTPS基本概念、SSL加密原理、通信过程、中间人攻击问题、HTTP协议和HTTPS协议区别。接下来…...

手机如何做微商城网站设计/百度seo白皮书

1.准备六个面 div做出六个面&#xff0c;在同一个父级容器下 父级容器的transform-style:preserve-3d 六个div设置成绝对定位&#xff0c;重叠在一起 2.父元素做简单的变换 父元素用transform:rotate旋转 3.每个子页面做3D变换 每个子页面则用rotate旋转&#xff0c;和t…...

网站被篡改怎样做/互联网营销策划是做什么的

Codeforces Round #618 (Div. 2) A做的还是不快 卡了一下B 果断开了C 然后回头发现B怎么过的人这么多 就想到了 最后抄板子过了D 一度以为能直接到蓝 熬了两个小时想看颜色变化 顶不死睡了起床发现只涨了100 而且也没有以前有过的涨很多分的congratulation A 题意&#xff1a…...

公司网站维护如何做分录/外链交易平台

Paxos算法在分布式领域具有非常重要的地位。但是Paxos算法有两个比较明显的缺点&#xff1a;1.难以理解 2.工程实现更难。 网上有很多讲解Paxos算法的文章&#xff0c;但是质量参差不齐。看了很多关于Paxos的资料后发现&#xff0c;学习Paxos最好的资料是论文《Paxos Made Sim…...

成都物流网站建设/重庆seo1

异 常&#xff1a;★★★★异常&#xff1a;就是不正常。程序在运行时出现的不正常情况。其实就是程序中出现的问题。这个问题按照面向对象思想进行描述&#xff0c;并封装成了对象。因为问题的产生有产生的原因、有问题的名称、有问题的描述等多个属性信息存在。当出现多属性信…...

查询网站日流量/产品推广方案模板

1.选择器 2.样式属性 3.布局 4.浏览器原理 以妓会友&#xff0c;评赞走起 转载于:https://www.cnblogs.com/lgyong/p/8617044.html...

java python 做网站/江苏seo和网络推广

3. 运行数据库部署工具 Windows中启动部署工具的三种方式&#xff1a; 选择『开始』|『程序』|『KingbaseES V8 』|『Client Tools』|『数据库部署工具』&#xff0c;启动数据库部署工具。 在Windows资源管理器中进入到数据库安装目录下$InstallDir/DeployTools&#xff0c;双…...