当前位置: 首页 > news >正文

CDH6.3.2 的pyspark读取excel表格数据写入hive中的问题汇总

需求:内网通过Excel文件将数据同步到外网的CDH服务器中,将CDH中的文件数据写入hive中。

CDH版本为:6.3.2
spark版本为:2.4
python版本:2.7.5
操作系统:CentOS Linux 7
集群方式:yarn-cluster

一、在linux中将excel文件转换成CSV文件,然后上传到hdfs中。
为何要先转csv呢?主要原因是pyspark直接读取excel的话,涉及到版本的冲突问题。commons-collections-3.2.2.jar 在CDH6.3.2中的版本是3.2.2.但是pyspark直接读取excel要求collections4以上的版本,虽然也尝试将4以上的版本下载放进去,但是也没效果,因为时间成本的问题,所以没有做过多的尝试了,直接转为csv后再读吧。
spark引用第三方包

1.1 转csv的python代码(python脚本)

#-*- coding:utf-8 -*-
import pandas as pd
import os, xlrd ,sysdef xlsx_to_csv_pd(fn):path1="/home/lzl/datax/"+fn+".xlsx"path2="/home/lzl/datax/"+fn+".csv"data_xls = pd.read_excel(path1, index_col=0)data_xls.to_csv(path2, encoding='utf-8')if __name__ == '__main__':fn=sys.argv[1]print(fn)try:xlsx_to_csv_pd(fn)print("转成成功!")except Exception as e:print("转成失败!")

1.2 数据中台上的代码(shell脚本):

#!/bin/bash
#@description:这是一句描述
#@author: admin(admin)
#@email: 
#@date: 2023-09-26 14:44:3# 文件名称
fn="项目投运计划"# xlsx转换成csv格式
ssh root@cdh02 " cd /home/lzl/shell; python xlsx2csv.py $fn" # 将文件上传到hfds上
ssh root@cdh02 "cd /home/lzl/datax; hdfs dfs -put $fn.csv /origin_data/sgd/excel/"
echo "上传成功~!"# 删除csv文件
ssh root@cdh02 "cd /home/lzl/datax; rm -rf $fn.csv"
echo "删除成功~!"

二、pyspark写入hive中
2.1 写入过程中遇到的问题点
2.1.1 每列的前后空格、以及存在换行符等问题。采取的措施是:循环列,采用trim函数、regexp_replace函数处理。

# 循环对每列去掉前后空格,以及删除换行符
import pyspark.sql.functions as F
from pyspark.sql.functions import col, regexp_replacefor name in df.columns:df = df.withColumn(name, F.trim(df[name]))df = df.withColumn(name, regexp_replace(col(name), "\n", ""))

2.1.2 个别字段存在科学计数法,需要用cast转换

from pyspark.sql.types import *# 取消销售订单号的科学记数法
col="销售订单号"
df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))

去掉换行符另一种方法:换行符问题也可以参照这个

2.2 数据中台代码(pyspark)

# -*- coding:utf-8
# coding=UTF-8# 引入sys,方便输出到控制台时不是乱码
import  sys   
reload(sys)
sys.setdefaultencoding( "utf-8" )# 引入模块
from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, SQLContext 
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import *# 设定资源大小
conf=SparkConf()\.set("spark.jars.packages","com.crealytics:spark-excel_2.11:0.11.1")\.set("spark.sql.shuffle.partitions", "4")\.set("spark.sql.execution.arrow.enabled", "true")\.set("spark.driver.maxResultSize","6G")\.set('spark.driver.memory','6G')\.set('spark.executor.memory','6G')# 建立SparkSession
spark = SparkSession \.builder\.config(conf=conf)\.master("local[*]")\.appName("dataFrameApply") \.enableHiveSupport() \.getOrCreate()# 读取cvs文件
# 文件名称和文件位置
fp= r"/origin_data/sgd/excel/项目投运计划.csv"
df = spark.read \.option("header", "true") \.option("inferSchema", "true") \.option("multiLine", "true") \.option("delimiter", ",") \.format("csv") \.load(fp)# 查看数据类型
# df.printSchema()# 循环对每列去掉前后空格,以及删除换行符
for name in df.columns:df = df.withColumn(name, F.trim(df[name]))df = df.withColumn(name, regexp_replace(col(name), "\n", ""))# 取消销售订单号的科学记数法
col="销售订单号"
df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))df.show(25,truncate = False) # 查看数据,允许输出25行# 设置日志级别 (这两个没用)
sc = spark.sparkContext
sc.setLogLevel("ERROR")# 写入hive中
spark.sql("use sgd_dev")  # 指定数据库# 创建临时表格 ,注意建表时不能用'/'和''空格分隔,否则会影响2023/9/4和2023-07-31 00:00:00这样的数据
spark.sql("""
CREATE TABLE IF NOT EXISTS ods_sgd_project_operating_plan_info_tmp (project_no                string         ,sale_order_no             string         ,customer_name             string         ,unoperating_amt           decimal(19,2)  , expected_operating_time   string         ,operating_amt             decimal(19,2)  ,  operating_progress_track  string         ,is_Supplied               string         ,operating_submit_time     string         ,Signing_contract_time     string         ,remake                    string  )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'    
""")# 注册临时表
df.createOrReplaceTempView("hdfs_df")
# spark.sql("select * from hdfs_df limit 5").show() #查看前5行数据# 将数据插入hive临时表中
spark.sql("""insert overwrite table ods_sgd_project_operating_plan_info_tmp select * from hdfs_df
""")# 将数据导入正式环境的hive中
spark.sql("""insert overwrite table ods_sgd_project_operating_plan_info select * from ods_sgd_project_operating_plan_info_tmp
""")# 查看导入后的数据
spark.sql("select * from ods_sgd_project_operating_plan_info limit 20").show(20,truncate = False)# 删除注册的临时表
spark.sql("""drop table hdfs_df
""")# 删除临时表
spark.sql("""drop table ods_sgd_project_operating_plan_info_tmp
""")

关于spark的更多知识,可以参看Spark SQL总结

相关文章:

CDH6.3.2 的pyspark读取excel表格数据写入hive中的问题汇总

需求:内网通过Excel文件将数据同步到外网的CDH服务器中,将CDH中的文件数据写入hive中。 CDH版本为:6.3.2 spark版本为:2.4 python版本:2.7.5 操作系统:CentOS Linux 7 集群方式:yarn-cluster …...

2120 -- 预警系统题解

Description OiersOiers 国的预警系统是一棵树,树中有 �n 个结点,编号 1∼�1∼n,树中每条边的长度均为 11。预警系统中只有一个预警信号发射站,就是树的根结点 11 号结点,其它 �−1…...

C++入门-day01

一、认识C C融合了三种不同的编程方式 C代表的过程性语言在C基础上添加的类、结构体puls代表的面向对象语言C模板支持泛型编程 C完全兼容C的特性 Tips:侯捷老师提倡的Modren C是指C11、C14、C17和C20这些新标准所引入的一系列新特性和改进。在我们练习的时候也应当去…...

Android开源 Skeleton 骨架屏 V1.3.0

目录 一、简介 二、效果图 三、引用 Skeleton 添加jitpack 仓库 添加依赖: 四、新增 “块”骨架屏 1、bind方法更改和变化: 2、load方法更改和变化: 五、关于上一个版本 一、简介 骨架屏的作用是在网络请求较慢时,提供基础占位&…...

网络资料搬运(2)

(1) Ubuntu 22.04: 为 Ubuntu22.04 系统添加中文输入法 linux解压gz文件的命令 Ubuntu20.04出现Unit ssh.service could not be found 详解使用SSH远程连接Ubuntu服务器系统 Configuring networks(配置网络) (2) Python && OpenCV: …...

SEO搜索引擎

利用搜索引擎的规则提高网站在有关搜索引擎内的自然排名,吸引更多的用户访问网站,提高网站的访问量,提高网站的销售能力和宣传能力,从而提升网站的品牌效应 搜索引擎优化的技术手段 黑帽SEO 通过欺骗技术和滥用搜索算法来推销毫不…...

动态规划-状态机(188. 买卖股票的最佳时机 IV)

状态分类: f[i,j,0]考虑前i只股票,进行了j笔交易,目前未持有股票 所能获得最大利润 f[i,j,1]考虑前i只股票,进行了j笔交易,目前持有股票 所能获得最大利润 状态转移: f[i][j][0] Math.max(f[i-1][j][0],f[…...

银行业务队列简单模拟(队列应用)

设某银行有A、B两个业务窗口,且处理业务的速度不一样,其中A窗口处理速度是B窗口的2倍 —— 即当A窗口每处理完2个顾客时,B窗口处理完1个顾客。给定到达银行的顾客序列,请按业务完成的顺序输出顾客序列。假定不考虑顾客先后到达的时…...

2023/8/8 下午10:42:04 objectarx

2023/8/8 下午10:42:04 objectarx 2023/8/8 下午10:42:16 ObjectARX(AutoCAD Runtime Extension)是用于开发和自定义AutoCAD软件的编程接口。ObjectARX允许开发者使用C++、.NET等编程语言来创建插件、扩展功能和定制化AutoCAD的行为。 通过ObjectARX,开发者可以访问Auto…...

Day-06 基于 Docker安装 Nginx 镜像

1.去官方公有仓库查询nginx镜像 docker search nginx 2.拉取该镜像 docker pull nginx 3. 启动镜像,使用nginx服务,代理本机8080端口(测试是不是好使) docker run -d -p 8080:80 --name nginx-8080 nginx docker ps curl 127.0.0.1:8080...

linux入门---信号的保存和捕捉

目录标题 信号的一些概念信号的保存pending表block表handler表 信号的捕捉内核态和用户态信号的捕捉 信号的一些概念 1.进程会收到各种各样的信号,那么程序对该信号进行实际处理的动作叫做信号的递达。 2.我们之前说过当进程收到信号的时候可能并不会立即处理这个信…...

5.外部中断

中断初始化配置步骤: IO口初始化配置 开启中断总允许EA 打开某个IO口的中断允许 打开IO口的某一位的中断允许 配置该位的中断触发方式 中断函数: #pragma vector PxINT_VECTOR __interrupt void 函数名(void){}#pragma vector PxINT_VECTOR __int…...

Mydb数据库问题

1、请简要介绍一下这个基于 Java 的简易数据库管理系统。它的主要功能是什么? TM(Transaction Manager):事务管理器,用于维护事务的状态,并提供接口供其他模块查询某个事务的状态。DM(Data Man…...

部署并应用ByteTrack实现目标跟踪

尽管YOLOv8已经集成了ByteTrack算法,但在这里我还是想利用ByteTrack官网的代码,自己实现目标跟踪。 要想应用ByteTrack算法,首先就要从ByteTrack官网上下载并安装。虽然官网上介绍得很简单,只需要区区6行代码,但对于国…...

MacOS怎么配置JDK环境变量

1 输入命令看是否配置了JDk 的环境变量:echo $JAVA_HOME 要是什么也没输出 证明是没配置 2 输入命令编辑 sudo vim ~/.bash_profile 然后按 i ,进入编辑模式,粘贴下面的代码,注意:JAVA_HOME后面路径需要改成自己的版…...

Spring Boot 开发16个实用的技巧

当涉及到使用Spring Boot开发应用程序时,以下是16个实用的技巧: 1. **使用Spring Initializr**:Spring Initializr是一个快速创建Spring Boot项目的工具,可以帮助您选择项目依赖和生成项目骨架。 2. **自动配置**:Sp…...

《机器学习实战》学习记录-ch2

PS: 个人笔记&#xff0c;建议不看 原书资料&#xff1a;https://github.com/ageron/handson-ml2 2.1数据获取 import pandas as pd data pd.read_csv(r"C:\Users\cyan\Desktop\AI\ML\handson-ml2\datasets\housing\housing.csv")data.head() data.info()<clas…...

lv7 嵌入式开发-网络编程开发 07 TCP服务器实现

目录 1 函数介绍 1.1 socket函数 与 通信域 1.2 bind函数 与 通信结构体 1.3 listen函数 与 accept函数 2 TCP服务端代码实现 3 TCP客户端代码实现 4 代码优化 5 练习 1 函数介绍 其中read、write、close在IO中已经介绍过&#xff0c;只需了解socket、bind、listen、acc…...

mysql技术文档--阿里巴巴java准则《Mysql数据库建表规约》--结合阿丹理解尝试解读--国庆开卷

阿丹&#xff1a; 国庆快乐呀大家&#xff01; 在项目开始前一个好的设计、一个健康的表关系&#xff0c;不仅会让开发变的有趣舒服&#xff0c;也会在后期的维护和升级迭代中让系统不断的成长。那么今天就认识和解读一下阿里的准则&#xff01;&#xff01; 建表规约 表达是…...

Qt+openCV学习笔记(十六)Qt6.6.0rc+openCV4.8.1+emsdk3.1.37编译静态库

前言&#xff1a; 有段时间没来写文章了&#xff0c;趁编译库的空闲&#xff0c;再写一篇记录文档 WebAssembly的发展逐渐成熟&#xff0c;即便不了解相关技术&#xff0c;web前端也在不经意中使用了相关技术的库&#xff0c;本篇文档记录下如何编译WebAssembly版本的openCV&…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

python/java环境配置

环境变量放一起 python&#xff1a; 1.首先下载Python Python下载地址&#xff1a;Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个&#xff0c;然后自定义&#xff0c;全选 可以把前4个选上 3.环境配置 1&#xff09;搜高级系统设置 2…...

Frozen-Flask :将 Flask 应用“冻结”为静态文件

Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是&#xff1a;将一个 Flask Web 应用生成成纯静态 HTML 文件&#xff0c;从而可以部署到静态网站托管服务上&#xff0c;如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

C# SqlSugar:依赖注入与仓储模式实践

C# SqlSugar&#xff1a;依赖注入与仓储模式实践 在 C# 的应用开发中&#xff0c;数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护&#xff0c;许多开发者会选择成熟的 ORM&#xff08;对象关系映射&#xff09;框架&#xff0c;SqlSugar 就是其中备受…...

STM32---外部32.768K晶振(LSE)无法起振问题

晶振是否起振主要就检查两个1、晶振与MCU是否兼容&#xff1b;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容&#xff08;CL&#xff09;与匹配电容&#xff08;CL1、CL2&#xff09;的关系 2. 如何选择 CL1 和 CL…...

在树莓派上添加音频输入设备的几种方法

在树莓派上添加音频输入设备可以通过以下步骤完成&#xff0c;具体方法取决于设备类型&#xff08;如USB麦克风、3.5mm接口麦克风或HDMI音频输入&#xff09;。以下是详细指南&#xff1a; 1. 连接音频输入设备 USB麦克风/声卡&#xff1a;直接插入树莓派的USB接口。3.5mm麦克…...

【C++】纯虚函数类外可以写实现吗?

1. 答案 先说答案&#xff0c;可以。 2.代码测试 .h头文件 #include <iostream> #include <string>// 抽象基类 class AbstractBase { public:AbstractBase() default;virtual ~AbstractBase() default; // 默认析构函数public:virtual int PureVirtualFunct…...

[论文阅读]TrustRAG: Enhancing Robustness and Trustworthiness in RAG

TrustRAG: Enhancing Robustness and Trustworthiness in RAG [2501.00879] TrustRAG: Enhancing Robustness and Trustworthiness in Retrieval-Augmented Generation 代码&#xff1a;HuichiZhou/TrustRAG: Code for "TrustRAG: Enhancing Robustness and Trustworthin…...

Java后端检查空条件查询

通过抛出运行异常&#xff1a;throw new RuntimeException("请输入查询条件&#xff01;");BranchWarehouseServiceImpl.java // 查询试剂交易&#xff08;入库/出库&#xff09;记录Overridepublic List<BranchWarehouseTransactions> queryForReagent(Branch…...