用pyspark把kafka主题数据经过etl导入另一个主题中的有关报错
首先看一下我们的示例代码
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
"""
------------------------------------------Description : TODO:SourceFile : etl_stream_kafkaAuthor : zxxDate : 2024/11/14
-------------------------------------------
"""
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'D:/bigdata/03-java/java-8/jdk'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/bigdata/04-Hadoop/hadoop/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe' # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("etl_stream_kafka").config("spark.sql.shuffle.partitions", 2).getOrCreate()# 连接kafkareadDF = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("subscribe", "topicA") \.load()# 使用DSL语句etlDF = readDF.selectExpr("cast(value as STRING)").filter(F.col("value").contains("success"))etlDF.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("topic", "etlTopic") \.option("checkpointLocation", "../../datas/kafka_stream") \.start().awaitTermination()# 关闭spark.stop()
运行发现报错
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):File "D:\bigdata\18-python\pyspark_project\pythonProject1\main\streamingkafka\etl_stream_kafka.py", line 22, in <module>readDF = spark.readStream.format("kafka") \File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\streaming.py", line 482, in loadreturn self._df(self._jreader.load())File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__return_value = get_return_value(File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\utils.py", line 117, in decoraise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
报错 : org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
解决:这个是因为缺少了Kafka和Spark的集成包,前往https://mvnrepository.com/artifact/org.apache.spark
下载对应的jar包即可,比如我是SparkSql写入的Kafka,那么我就需要下载Spark-Sql-Kafka.x.x.x.jar
进入网站(已打包放入文章末尾)

找到对应有关spark 和kafka的模块

找到对应的版本 ,这里我用的kafka是3.0版本,下载的是3.1.2版本

点进去,下载jar包

再次运行会发现仍然报错,这是因为jar包之间的依赖关系,从刚才下载的界面下面再下载有关的jar包





再次运行即可
jar包下载链接
【免费】用pyspark把数据从kafka的一个主题用流处理后再导入kafka的另一个主题的有关报错资源-CSDN文库
相关文章:
用pyspark把kafka主题数据经过etl导入另一个主题中的有关报错
首先看一下我们的示例代码 import os from pyspark.sql import SparkSession import pyspark.sql.functions as F """ ------------------------------------------Description : TODO:SourceFile : etl_stream_kafkaAuthor : zxxDate : 2024/11/…...
Redis的过期删除策略和内存淘汰机制以及如何保证双写的一致性
Redis的过期删除策略和内存淘汰机制以及如何保证双写的一致性 过期删除策略内存淘汰机制怎么保证redis双写的一致性?更新策略先删除缓存后更新数据库先更新数据库后删除缓存如何选择?如何保证先更新数据库后删除缓存的线程安全问题? 过期删除策略 为了…...
异常处理:import cv2时候报错No module named ‘numpy.core.multiarray‘
问题描述 执行一个将视频变成二值视频输出时候,报错。No module named numpy.core.multiarray,因为应安装过了numpy,所以比较不解。试了卸载numpy和重新安装numpy多次操作,也进行了numpy升级的操作,但是都没有用。 解…...
C++手写PCD文件
前言 一般pcd读写只需要调pcl库接口,直接用pcl的结构写就好了 这里是不依赖pcl库的写入方法 主要是开头写一个header 注意字段大小,类型不要写错 结构定义 写入点需要与header中定义一致 这里用的RoboSense的结构写demo 加了个1字节对齐 stru…...
优选算法(双指针)
1.双指针介绍 双指针算法是一种常用的算法思想,特别适用于处理涉及阵列、链表或字符串等线性数据结构的问题。通过操作两个一个指针来进行导航或操作数据结构,双指针可以最大程度优化解决方案的效率。提高效率并减少空间复杂度。 在Java中使用双指针的核…...
【保姆级】Mac上IDEA卡顿优化
保姆级操作,跟着操作即可~~~ 优化内存 在你的应用程序中,找到你的idea 按住control键+单击 然后点击“显示包内容” </...
python实战案例----使用 PyQt5 构建简单的 HTTP 接口测试工具
python实战案例----使用 PyQt5 构建简单的 HTTP 接口测试工具 文章目录 python实战案例----使用 PyQt5 构建简单的 HTTP 接口测试工具项目背景技术栈用户界面核心功能实现结果展示完整代码总结 在现代软件开发中,测试接口的有效性与响应情况变得尤为重要。本文将指导…...
pytest 接口串联场景
在编写接口测试时,如果有多个接口需要串联在一起调用,并且这些接口共同构成了一个业务场景,通常可以使用以下几种方法来组织代码,使其更具可读性和维护性。以下是一些规范的建议: 1. 使用 pytest 的 fixture 来管理接…...
Springboot项目搭建(2)-用户详细信息查询
1. 提要信息 1.1 java四类八种 在Java中,四类指的是Java中的基本数据类型和引用数据类型: 基本数据类型:Java提供了八种基本数据类型,包括整数型、浮点型、字符型和布尔型。引用数据类型:指向对象的引用,…...
Stable Diffusion的加噪和去噪详解
SD模型原理: Stable Diffusion概要讲解Stable diffusion详细讲解Stable Diffusion的加噪和去噪详解Diffusion ModelStable Diffusion核心网络结构——VAEStable Diffusion核心网络结构——CLIP Text EncoderStable Diffusion核心网络结构——U-NetStable Diffusion中…...
解决 Gradle 报错:`Plugin with id ‘maven‘ not found` 在 SDK 开发中的问题
在 SDK 开发过程中,使用 Gradle 构建和发布 SDK 是常见的任务。在将 SDK 发布为 AAR 或 JAR 包时,你可能会使用 apply plugin: maven 来发布到本地或远程的 Maven 仓库。但是,随着 Gradle 版本的更新,特别是从 Gradle 7 版本开始&…...
EMD-KPCA-Transformer多变量回归预测!分解+降维+预测!多重创新!直接写核心!
EMD-KPCA-Transformer多变量回归预测!分解降维预测!多重创新!直接写核心! 目录 EMD-KPCA-Transformer多变量回归预测!分解降维预测!多重创新!直接写核心!效果一览基本介绍程序设计参…...
前端 px、rpx、em、rem、vh、vw计量单位的区别
目录 一、px 二、rpx 三、em 四、rem 五、vh和vw 六、rpx 和 px之间的区别 七、px 与 rem 的区别 一、px px(像素): 1、相对单位,代表屏幕上的一个基本单位,逻辑像素。 2、不会根据屏幕尺寸或分辨率自动调整大…...
OceanBase数据库产品与工具介绍
OceanBase:蚂蚁集团自主研发的分布式关系数据库 1、什么是 OceanBase? OceanBase 是由蚂蚁集团完全自主研发的企业级分布式关系数据库,始创于 2010 年。它具有以下核心特点: 数据强一致性:在分布式架构下确保数据强…...
学习threejs,对模型多个动画切换展示
👨⚕️ 主页: gis分享者 👨⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨⚕️ 收录于专栏:threejs gis工程师 文章目录 一、🍀前言1.1 ☘️THREE.AnimationMixer 动画…...
【Bug合集】——Java大小写引起传参失败,获取值为null的解决方案
阿华代码,不是逆风,就是我疯 你们的点赞收藏是我前进最大的动力!! 希望本文内容能够帮助到你!! 目录 一:本文面向的人群 二:错误场景引入 三:正确场景引入 四…...
Python爬虫:如何从1688阿里巴巴获取公司信息
在当今的数字化时代,数据已成为企业决策和市场分析的重要资产。对于市场研究人员和企业分析师来说,能够快速获取和分析大量数据至关重要。阿里巴巴的1688.com作为中国最大的B2B电子商务平台之一,拥有海量的企业档案和产品信息。本文将介绍如何…...
单片机学习笔记 2. LED灯闪烁
更多单片机学习笔记:单片机学习笔记 1. 点亮一个LED灯 目录 0、实现的功能 1、Keil工程 2、代码实现 0、实现的功能 LED灯闪烁 1、Keil工程 闪烁原理:需要进行软件延时达到人眼能分辨出来的效果。常用的延时方法有软件延时和定时器延时。此次先进行软…...
折叠光腔衰荡高反射率测量技术的matlab模拟理论分析
折叠光腔衰荡高反射率测量技术的matlab模拟理论分析 1. 前言2. 光腔模型3. 光腔衰荡过程4. 衰荡时间与反射率的关系5. 测量步骤①. 光腔调节:②. 光腔衰荡测量:③. 计算衰荡时间常数:④. 反射率计算: 6. 实际应用中的调整7. 技术优…...
ubuntu 16.04 中 VS2019 跨平台开发环境配置
su 是 “switch user” 的缩写,表示从当前用户切换到另一个用户。 sudo 是 “superuser do” 的缩写,意为“以超级用户身份执行”。 apt 是 “Advanced Package Tool” 的缩写,Ubuntu中用于软件包管理的命令行工具。 1、为 root 用户设置密码…...
微信小程序之bind和catch
这两个呢,都是绑定事件用的,具体使用有些小区别。 官方文档: 事件冒泡处理不同 bind:绑定的事件会向上冒泡,即触发当前组件的事件后,还会继续触发父组件的相同事件。例如,有一个子视图绑定了b…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
python/java环境配置
环境变量放一起 python: 1.首先下载Python Python下载地址:Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个,然后自定义,全选 可以把前4个选上 3.环境配置 1)搜高级系统设置 2…...
LeetCode - 394. 字符串解码
题目 394. 字符串解码 - 力扣(LeetCode) 思路 使用两个栈:一个存储重复次数,一个存储字符串 遍历输入字符串: 数字处理:遇到数字时,累积计算重复次数左括号处理:保存当前状态&a…...
工程地质软件市场:发展现状、趋势与策略建议
一、引言 在工程建设领域,准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具,正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
css的定位(position)详解:相对定位 绝对定位 固定定位
在 CSS 中,元素的定位通过 position 属性控制,共有 5 种定位模式:static(静态定位)、relative(相对定位)、absolute(绝对定位)、fixed(固定定位)和…...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...
Linux nano命令的基本使用
参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时,显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...

