当前位置: 首页 > news >正文

【spark】spark structrued streaming读写kafka 使用kerberos认证

spark版本:2.4.0
官网
Spark --files使用总结
Spark --files理解

一、编写jar


import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.{Logger, LoggerFactory}import scala.collection.mutable/*** 在集群使用以下脚本发送和接受消息!* kafka-console-producer.sh --bootstrap-server xxx.01.com:6667,xxx.02.com,xxx.03.com:6667 --topic tp-read  --producer-property security.protocol=SASL_PLAINTEXT* kafka-console-consumer.sh --bootstrap-server xxx.01.com:6667,xxx.02.com,xxx.03.com:6667 --topic tp-write --from-beginning  --consumer-property security.protocol=SASL_PLAINTEXT --group tester*/
object SparkKafka {val LOG = LoggerFactory.getLogger(classOf[SparkSession])val bootstrapServers = "xxx.01.com:6667,xxx.02.com,xxx.03.com:6667"val readTopic = "tp-read"val writeTopic = "tp-write"def main(args: Array[String]): Unit = {val SparkKafkaProps: mutable.Map[String, String] = mutable.Map.empty// Kafka’s own configurations can be set via DataStreamReader.option with kafka. prefix, e.g, stream.option("kafka.bootstrap.servers", "host:port").// For possible kafka parameters, see Kafka consumer config docs for parameters related to reading data, and Kafka producer config docs for parameters related to writing data.SparkKafkaProps.put("kafka." + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")SparkKafkaProps.put("kafka." + SaslConfigs.SASL_MECHANISM, "GSSAPI")SparkKafkaProps.put("kafka." + SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")val spark = SparkSession.builder().appName("spark-jar-job").config("spark.sql.streaming.checkpointLocation", "hdfs:///tmp/spark/chkp/test-job")  // 必须设置!.enableHiveSupport().getOrCreate()// no useful! this api is for spark DStreams not for Structured Streaming!!!
//    val sc=spark.sparkContext
//    val ssc=new StreamingContext(sc,Seconds(5))spark.sparkContext.setLogLevel("debug")val read = spark.readStream.format("kafka").options(SparkKafkaProps).option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", readTopic).option("group.id", "sr").option("includeHeaders", "false").load()val write=read.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").outputMode("update").options(SparkKafkaProps).option("kafka.bootstrap.servers", bootstrapServers).option("topic", writeTopic).trigger(Trigger.ProcessingTime("1 second")) // only change in query.start()write.awaitTermination()LOG.warn("job has started!!!")val secConfig = System.getProperty("java.security.auth.login.config")LOG.warn(s"Got `java.security.auth.login.config` as: ${secConfig}")// You can start any number of queries in a single SparkSession.// They will all be running concurrently sharing the cluster resources.// You can use sparkSession.streams() to get the StreamingQueryManager// block until any one of them terminates 任何一个流结束就会停止!// spark.streams.awaitAnyTermination() // 如果定义多流则使用此项。}
}

打jar。提交yarn运行

二、启动任务on yarn:

相对路径使用文件名直接表示,绝对路径使用/xxx/文件名表示
spark 任务启动后会有driver和executor。
client模式下的driver就是本机的黑窗口,cluster模式下driver就是就到某一节点机器。
所有模式下executor都是集群的各个节点机器。

# client模式
# 此模式下driver-java-options内的路径就是本机路径,jaas-abs.conf文件内的keytab路径也是本机路径(绝对路径)
# spark.executor.extraJavaOptions 因为是集群的机器,在--files上传到集群各个机器后需要使用相对路径,jaas-rel.conf文件内的keytab路径也是相对路径
# spark.yarn.keytab 文件会和 --files /xxx/user.keytab 文件在上传至集群其他机器后会在同一个文件夹内造成冲突(yarn.Client: Same name resource file:///xxx/user.keytab added multiple times to distributed cache),其实文件是一样的,user-bak.keytab只是user.keytab的一个副本。
# driver-java-options 和 spark.driver.extraJavaOptions 是一样的,但更推荐在client模式下使用driver-java-options
spark-submit \--class com.xxx.SparkKafka \--verbose \--master yarn \--deploy-mode client \--executor-memory 10G \--total-executor-cores 6 \--conf spark.yarn.principal=user@XXXXX.COM  \--conf spark.yarn.keytab=/xxx/user-bak.keytab \--driver-java-options "-Djava.security.auth.login.config=/xxx/jaas-abs.conf" \--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas-rel.conf" \--files /xxx/jaas-rel.conf,/xxx/user.keytab \
/xxx/spark-kafka-example-0.1.jar# cluster模式
# 此模式下spark.driver.extraJavaOptions和spark.executor.extraJavaOptions 因为driver运行在是集群的机器,不一定是本机。在--files上传到集群各个机器后需要使用相对路径,jaas-rel.conf文件就是--files上传的,其的keytab路径也是相对路径
# spark.yarn.keytab 文件会和 --files /xxx/user.keytab 文件在上传至集群其他机器后会在同一个文件夹内造成冲突(yarn.Client: Same name resource file:///xxx/user.keytab added multiple times to distributed cache),其实文件是一样的,user-bak.keytab只是user.keytab的一个副本。
spark-submit \--class com.xxx.SparkKafka \--verbose \--master yarn \--deploy-mode cluster \--executor-memory 10G \--total-executor-cores 6 \--conf spark.yarn.principal=user@XXXXX.COM  \--conf spark.yarn.keytab=/xxx/user-bak.keytab \--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas-rel.conf" \--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas-rel.conf" \--files /xxx/jaas-rel.conf,/xxx/user.keytab \
/xxx/spark-kafka-example-0.1.jar

三、spark shell运行

spark shell其driver运行在本地


bin/spark-shell \
--conf spark.yarn.principal=user@XXXXX.COM  \
--conf spark.yarn.keytab=/xxx/user.keytab \
--driver-java-options "-Djava.security.auth.login.config=/xxx/jaas-abs.conf" \
--conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas-rel.conf \
--files /xxx/jaas-rel.conf,/xxx/jaas-abs.conf,/xxx/user.keytab \
-Dsun.security.krb5.debug=true \
-Dsun.security.spnego.debug=true \
--verbose
// 测试是否存在user.keytab文件val secConfig=System.getProperty("java.security.auth.login.config") // 是指driver端的。println(secConfig)import java.io.Fileval shortKeyPath=new File("user.keytab")shortKeyPath.exists()val longKeyPath=new File("/xxx/user.keytab")longKeyPath.exists()// 测试kafka-batch: spark.read
import scala.collection.mutable
val props:mutable.Map[String,String]=mutable.Map.empty
props.put("kafka.bootstrap.servers","xxx.01.com:6667,xxx.02.com,xxx.03.com:6667")
props.put("subscribe","tp-read")
props.put("kafka.security.protocol","SASL_PLAINTEXT")
props.put("kafka.sasl.mechanism","GSSAPI")
props.put("kafka.sasl.kerberos.service.name","kafka")
spark.read.format("kafka").options(props.toMap).load().show// 测试kafka-stream: spark.readStream
import scala.collection.mutable
import org.apache.spark.sql.streaming.Triggerval props:mutable.Map[String,String]=mutable.Map.empty
props.put("kafka.bootstrap.servers","xxx.01.com:6667,xxx.02.com,xxx.03.com:6667")
props.put("subscribe","tp-read")
props.put("kafka.security.protocol","SASL_PLAINTEXT")
props.put("kafka.sasl.mechanism","GSSAPI")
props.put("kafka.sasl.kerberos.service.name","kafka")
val rd=spark.readStream.format("kafka").options(props.toMap).load()
rd.writeStream.outputMode("update").format("console").trigger(Trigger.ProcessingTime("3 seconds")).start()  // 可以没有 .awaitAnyTermination()

四、jaas.conf文件内容:

KafkaClient 是kafka客户端使用的,Client 是zookeeper客户端使用的
jaas-abs.conf文件内容

KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truestoreKey=trueuseTicketCache=falsekeyTab="/xxx/user.keytab"principal="user@XXXXX.COM";
};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truestoreKey=truekeyTab="/xxx/user.keytab"principal="user@XXXXX.COM";
}

jaas-rel.conf文件内容
keyTab="user.keytab"keyTab="./user.keytab"完全一样。

KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truestoreKey=trueuseTicketCache=falsekeyTab="user.keytab"principal="user@XXXXX.COM";
};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truestoreKey=truekeyTab="user.keytab"principal="user@XXXXX.COM";
}

五、其他

如下:报错大多是因为kafka没连接上,也多因为kerberos认证没通过。请仔细检查jaas-*.conf先关的内容。kafka使用jaas.conf配置kerberos。

[Kafka Offset Reader] clients.NetworkClient: [Consumer clientId=consumer-2, groupId=spark-kafka-relation-589f753a-51b2-498d-971e-e513b067ca87-driver-0] Bootstrap broker xxx.com:6667 (id: -3 rack: null) disconnected
2024-10-24 13:17:34,787 WARN  [Kafka Offset Reader] clients.NetworkClient: [Consumer clientId=consumer-2, groupId=spark-kafka-relation-589f753a-51b2-498d-971e-e513b067ca87-driver-0] Bootstrap broker broker xxx.com:6667 (id: -1 rack: null) disconnected

相关文章:

【spark】spark structrued streaming读写kafka 使用kerberos认证

spark版本:2.4.0 官网 Spark --files使用总结 Spark --files理解 一、编写jar import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.SaslConfigs import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.T…...

【脚本】B站视频AB复读

控制台输入如下代码,回车 const video document.getElementsByTagName("video")[0];//获取bpx-player-control-bottom-center容器,更改其布局方式const div document.getElementsByClassName("bpx-player-control-bottom-center")[0];div.sty…...

leetcode - 257. 二叉树的所有路径

257. 二叉树的所有路径 题目 解决 做法一:深度优先搜索 回溯 深度优先搜索(Depth-First Search, DFS)是一种用于遍历或搜索树或图的算法。这种搜索方式会尽可能深地探索每个分支,直到无法继续深入为止,然后回溯到上…...

python 相关

python 1. pip 安装某个版本范围的软件 pip install “elasticsearch>6,<7” pip install elasticsearchX.Y.Z 2. pip 查看包版本 pip show pandas 3. pip 下载whl包 https://tendcode.com/subject/article/pip-offline-download/ (更多平台与架构)pip downl…...

静态分析2:控制流分析(构建CFG)

参考&#xff1a;南京大学《软件分析》课程2 1、控制流分析 控制流分析实际上指的是构建控制流图&#xff08;Control Flow Graph&#xff0c;CFG&#xff09;CFG是静态分析的基础数据结构CFG的节点可以是单个指令、基本块&#xff08;Basic Block&#xff0c;BB&#xff09;…...

Linux 应用领域

目录 服务器领域 桌面环境 软件开发 数据分析与科学计算 嵌入式系统 虚拟化和云计算 人工智能与机器学习 物联网&#xff08;IoT&#xff09; 网络安全 服务器领域 Linux在服务器领域的应用是其最为广泛和成熟的领域之一。由于其开源、稳定、高效和安全的特性&#xf…...

FPM383C指纹模块超详解 附驱动

0. 本人使用环境介绍 0.1 硬件环境 ESP32-C3FPM383C指纹模块一根破旧的usb数据线 0.2 软件环境 Clion2024.2.2ESP-IDF5.3.1Clion插件ESP-IDF 1. 硬件接口说明 1.1 UART UART 缺省波特率为 57.6Kbps&#xff0c;数据格式&#xff1a;8 位数据位&#xff0c;2 位停止位&am…...

若依框架篇-若依集成 X-File-Storage 框架(实现图片上传阿里云 OSS 服务器)、EasyExcel 框架(实现 Excel 数据批量导入功能)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 实现使用 Excel 文件批量导入 1.1 导入功能的前端具体实现 1.2 导入功能的后端具体实现 1.3 使用 EasyExcel 框架实现 Excel 读、写功能 1.4 将 Easy Excel 集成到…...

.rmallox勒索病毒肆虐:如何有效防范与应对

引言 在当今这个数字化时代&#xff0c;网络安全已成为一个不可忽视的重要议题。随着信息技术的飞速发展&#xff0c;网络空间的安全威胁也日益复杂多变。病毒、木马、勒索软件等恶意程序层出不穷&#xff0c;比如.rmallox勒索病毒。它们利用先进的技术手段&#xff0c;如代码…...

人工智能能否影响未来生活:一场深刻的社会与技术变革

随着人工智能技术的不断发展&#xff0c;我们已经目睹了它在各行各业掀起的巨大变革浪潮。从医疗行业的病例诊断、药物研发&#xff0c;到企业运营的数据分析、智能决策&#xff0c;再到日常生活中的智能语音助手、自动驾驶汽车、智能家居&#xff0c;人工智能正以前所未有的速…...

cmu 15-445学习笔记-3 存储引擎

03 Database Storage-Part Ⅰ 数据库存储上半部分 数据库分层划分结构图&#xff1a; Disk Manager&#xff1a;存储引擎&#xff0c;管理磁盘上的文件Bufferpool Manager&#xff1a;管理内存的缓存池Access Methods&#xff1a;访问方法Operator Execution&#xff1a;执行…...

[linux]和windows间传输命令scp 执行WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!错误解决

[linux]和windows间传输命令scp 执行WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!错误解决. 现象&#xff1a; 原因&#xff1a; 接收方服务器系统做了某些更改&#xff0c;导致登录时会报错。主要因为接收方服务器对登录过它的主机都会把该主机登录标识证书记录下来&a…...

C++ | Leetcode C++题解之第518题零钱兑换II

题目&#xff1a; 题解&#xff1a; class Solution { public:int change(int amount, vector<int>& coins) {vector<int> dp(amount 1), valid(amount 1);dp[0] 1;valid[0] 1;for (int& coin : coins) {for (int i coin; i < amount; i) {valid[…...

高并发-负载均衡

负载均衡在微服务架构中是一个重要的组成部分&#xff0c;旨在优化资源利用、提高服务可用性和确保系统的高可扩展性。以下是对微服务中的负载均衡的详细介绍&#xff0c;包括其原理、类型、实现方式以及相关的技术。 一、负载均衡的原理 负载均衡的基本原理是将进入系统的请…...

Docker 常用命令全解析:提升对雷池社区版的使用经验

Docker 常用命令解析 Docker 是一个开源的容器化平台&#xff0c;允许开发者将应用及其依赖打包到一个可移植的容器中。以下是一些常用的 Docker 命令及其解析&#xff0c;帮助您更好地使用 Docker。 1. Docker 基础命令 查看 Docker 版本 docker --version查看 Docker 运行…...

基于 Postman 和 Elasticsearch 测试乐观锁的操作流程

鱼说&#xff0c;你看不到我眼中的泪&#xff0c;因为我在水中。水说&#xff0c;我能感觉到你的泪&#xff0c;因为你在我心中。 -村上春树 在分布式系统中&#xff0c;多个并发操作对同一资源的修改可能导致数据不一致。为了解决这种问题&#xff0c;Elasticsearch 提供了乐观…...

如何从PPT中导出600dpi的高清图

Step1. 修改PPT注册表 具体过程&#xff0c;参见如下链接&#xff1a;修改ppt注册表&#xff0c;导出高分辨率图片 Step2. 打开PPT&#xff0c;找到自己想要保存的图&#xff0c;选中图像&#xff0c;查看图像尺寸并记录 Step3. 重新新建一个PPT&#xff0c;并根据记录的图片…...

day01-ElasticStack+Kibana

ElasticStack-数据库 #官网https://www.elastic.co/cn/ #下载7.17版环境准备 主机名IP系统版本VMware版本elk110.0.0.91Ubuntu 22.04.417.5.1elk210.0.0.92Ubuntu 22.04.417.5.1elk310.0.0.93Ubuntu 22.04.417.5.1 单机部署ES 1.下载ES软件包&#xff0c;放到/usr/local下 […...

HTML 约束验证

HTML5引入了表单相关的一些新机制&#xff1a;它为<input>元素和约束验证增加了一些新的语义类型&#xff0c;使得客户端检查表单内容变得容易。基本上&#xff0c;通过设置一些新的属性&#xff0c;常用的约束条件可以无需 JavaScript 代码而检测到&#xff1b;对于更复…...

vue3项目开发一些必备的内容,该安装安装,该创建创建

重新整理了一下项目开发必备的一些操作&#xff0c;以后直接复制黏贴运行&#xff0c;随着项目开发&#xff0c;后期会陆续补充常用插件或组件等 如果你是还没有安装过的新人&#xff0c;建议从《通过安装Element UI/Plus来学习vue之如何创建项目、搭建vue脚手架、npm下载、封装…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

LeetCode - 394. 字符串解码

题目 394. 字符串解码 - 力扣&#xff08;LeetCode&#xff09; 思路 使用两个栈&#xff1a;一个存储重复次数&#xff0c;一个存储字符串 遍历输入字符串&#xff1a; 数字处理&#xff1a;遇到数字时&#xff0c;累积计算重复次数左括号处理&#xff1a;保存当前状态&a…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

Mobile ALOHA全身模仿学习

一、题目 Mobile ALOHA&#xff1a;通过低成本全身远程操作学习双手移动操作 传统模仿学习&#xff08;Imitation Learning&#xff09;缺点&#xff1a;聚焦与桌面操作&#xff0c;缺乏通用任务所需的移动性和灵活性 本论文优点&#xff1a;&#xff08;1&#xff09;在ALOHA…...

HDFS分布式存储 zookeeper

hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架&#xff0c;允许使用简单的变成模型跨计算机对大型集群进行分布式处理&#xff08;1.海量的数据存储 2.海量数据的计算&#xff09;Hadoop核心组件 hdfs&#xff08;分布式文件存储系统&#xff09;&a…...

08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险

C#入门系列【类的基本概念】&#xff1a;开启编程世界的奇妙冒险 嘿&#xff0c;各位编程小白探险家&#xff01;欢迎来到 C# 的奇幻大陆&#xff01;今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类&#xff01;别害怕&#xff0c;跟着我&#xff0c;保准让你轻松搞…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

Linux部署私有文件管理系统MinIO

最近需要用到一个文件管理服务&#xff0c;但是又不想花钱&#xff0c;所以就想着自己搭建一个&#xff0c;刚好我们用的一个开源框架已经集成了MinIO&#xff0c;所以就选了这个 我这边对文件服务性能要求不是太高&#xff0c;单机版就可以 安装非常简单&#xff0c;几个命令就…...

云安全与网络安全:核心区别与协同作用解析

在数字化转型的浪潮中&#xff0c;云安全与网络安全作为信息安全的两大支柱&#xff0c;常被混淆但本质不同。本文将从概念、责任分工、技术手段、威胁类型等维度深入解析两者的差异&#xff0c;并探讨它们的协同作用。 一、核心区别 定义与范围 网络安全&#xff1a;聚焦于保…...