CDH6.3.2 的pyspark读取excel表格数据写入hive中的问题汇总
需求:内网通过Excel文件将数据同步到外网的CDH服务器中,将CDH中的文件数据写入hive中。
CDH版本为:6.3.2
spark版本为:2.4
python版本:2.7.5
操作系统:CentOS Linux 7
集群方式:yarn-cluster
一、在linux中将excel文件转换成CSV文件,然后上传到hdfs中。
为何要先转csv呢?主要原因是pyspark直接读取excel的话,涉及到版本的冲突问题。commons-collections-3.2.2.jar 在CDH6.3.2中的版本是3.2.2.但是pyspark直接读取excel要求collections4以上的版本,虽然也尝试将4以上的版本下载放进去,但是也没效果,因为时间成本的问题,所以没有做过多的尝试了,直接转为csv后再读吧。
spark引用第三方包
1.1 转csv的python代码(python脚本)
#-*- coding:utf-8 -*-
import pandas as pd
import os, xlrd ,sysdef xlsx_to_csv_pd(fn):path1="/home/lzl/datax/"+fn+".xlsx"path2="/home/lzl/datax/"+fn+".csv"data_xls = pd.read_excel(path1, index_col=0)data_xls.to_csv(path2, encoding='utf-8')if __name__ == '__main__':fn=sys.argv[1]print(fn)try:xlsx_to_csv_pd(fn)print("转成成功!")except Exception as e:print("转成失败!")
1.2 数据中台上的代码(shell脚本):
#!/bin/bash
#@description:这是一句描述
#@author: admin(admin)
#@email:
#@date: 2023-09-26 14:44:3# 文件名称
fn="项目投运计划"# xlsx转换成csv格式
ssh root@cdh02 " cd /home/lzl/shell; python xlsx2csv.py $fn" # 将文件上传到hfds上
ssh root@cdh02 "cd /home/lzl/datax; hdfs dfs -put $fn.csv /origin_data/sgd/excel/"
echo "上传成功~!"# 删除csv文件
ssh root@cdh02 "cd /home/lzl/datax; rm -rf $fn.csv"
echo "删除成功~!"
二、pyspark写入hive中
2.1 写入过程中遇到的问题点
2.1.1 每列的前后空格、以及存在换行符等问题。采取的措施是:循环列,采用trim函数、regexp_replace函数处理。
# 循环对每列去掉前后空格,以及删除换行符
import pyspark.sql.functions as F
from pyspark.sql.functions import col, regexp_replacefor name in df.columns:df = df.withColumn(name, F.trim(df[name]))df = df.withColumn(name, regexp_replace(col(name), "\n", ""))
2.1.2 个别字段存在科学计数法,需要用cast转换
from pyspark.sql.types import *# 取消销售订单号的科学记数法
col="销售订单号"
df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))
去掉换行符另一种方法:换行符问题也可以参照这个
2.2 数据中台代码(pyspark)
# -*- coding:utf-8
# coding=UTF-8# 引入sys,方便输出到控制台时不是乱码
import sys
reload(sys)
sys.setdefaultencoding( "utf-8" )# 引入模块
from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, SQLContext
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import *# 设定资源大小
conf=SparkConf()\.set("spark.jars.packages","com.crealytics:spark-excel_2.11:0.11.1")\.set("spark.sql.shuffle.partitions", "4")\.set("spark.sql.execution.arrow.enabled", "true")\.set("spark.driver.maxResultSize","6G")\.set('spark.driver.memory','6G')\.set('spark.executor.memory','6G')# 建立SparkSession
spark = SparkSession \.builder\.config(conf=conf)\.master("local[*]")\.appName("dataFrameApply") \.enableHiveSupport() \.getOrCreate()# 读取cvs文件
# 文件名称和文件位置
fp= r"/origin_data/sgd/excel/项目投运计划.csv"
df = spark.read \.option("header", "true") \.option("inferSchema", "true") \.option("multiLine", "true") \.option("delimiter", ",") \.format("csv") \.load(fp)# 查看数据类型
# df.printSchema()# 循环对每列去掉前后空格,以及删除换行符
for name in df.columns:df = df.withColumn(name, F.trim(df[name]))df = df.withColumn(name, regexp_replace(col(name), "\n", ""))# 取消销售订单号的科学记数法
col="销售订单号"
df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))df.show(25,truncate = False) # 查看数据,允许输出25行# 设置日志级别 (这两个没用)
sc = spark.sparkContext
sc.setLogLevel("ERROR")# 写入hive中
spark.sql("use sgd_dev") # 指定数据库# 创建临时表格 ,注意建表时不能用'/'和''空格分隔,否则会影响2023/9/4和2023-07-31 00:00:00这样的数据
spark.sql("""
CREATE TABLE IF NOT EXISTS ods_sgd_project_operating_plan_info_tmp (project_no string ,sale_order_no string ,customer_name string ,unoperating_amt decimal(19,2) , expected_operating_time string ,operating_amt decimal(19,2) , operating_progress_track string ,is_Supplied string ,operating_submit_time string ,Signing_contract_time string ,remake string )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
""")# 注册临时表
df.createOrReplaceTempView("hdfs_df")
# spark.sql("select * from hdfs_df limit 5").show() #查看前5行数据# 将数据插入hive临时表中
spark.sql("""insert overwrite table ods_sgd_project_operating_plan_info_tmp select * from hdfs_df
""")# 将数据导入正式环境的hive中
spark.sql("""insert overwrite table ods_sgd_project_operating_plan_info select * from ods_sgd_project_operating_plan_info_tmp
""")# 查看导入后的数据
spark.sql("select * from ods_sgd_project_operating_plan_info limit 20").show(20,truncate = False)# 删除注册的临时表
spark.sql("""drop table hdfs_df
""")# 删除临时表
spark.sql("""drop table ods_sgd_project_operating_plan_info_tmp
""")
关于spark的更多知识,可以参看Spark SQL总结
相关文章:
CDH6.3.2 的pyspark读取excel表格数据写入hive中的问题汇总
需求:内网通过Excel文件将数据同步到外网的CDH服务器中,将CDH中的文件数据写入hive中。 CDH版本为:6.3.2 spark版本为:2.4 python版本:2.7.5 操作系统:CentOS Linux 7 集群方式:yarn-cluster …...
2120 -- 预警系统题解
Description OiersOiers 国的预警系统是一棵树,树中有 �n 个结点,编号 1∼�1∼n,树中每条边的长度均为 11。预警系统中只有一个预警信号发射站,就是树的根结点 11 号结点,其它 �−1…...
C++入门-day01
一、认识C C融合了三种不同的编程方式 C代表的过程性语言在C基础上添加的类、结构体puls代表的面向对象语言C模板支持泛型编程 C完全兼容C的特性 Tips:侯捷老师提倡的Modren C是指C11、C14、C17和C20这些新标准所引入的一系列新特性和改进。在我们练习的时候也应当去…...
Android开源 Skeleton 骨架屏 V1.3.0
目录 一、简介 二、效果图 三、引用 Skeleton 添加jitpack 仓库 添加依赖: 四、新增 “块”骨架屏 1、bind方法更改和变化: 2、load方法更改和变化: 五、关于上一个版本 一、简介 骨架屏的作用是在网络请求较慢时,提供基础占位&…...
网络资料搬运(2)
(1) Ubuntu 22.04: 为 Ubuntu22.04 系统添加中文输入法 linux解压gz文件的命令 Ubuntu20.04出现Unit ssh.service could not be found 详解使用SSH远程连接Ubuntu服务器系统 Configuring networks(配置网络) (2) Python && OpenCV: …...
SEO搜索引擎
利用搜索引擎的规则提高网站在有关搜索引擎内的自然排名,吸引更多的用户访问网站,提高网站的访问量,提高网站的销售能力和宣传能力,从而提升网站的品牌效应 搜索引擎优化的技术手段 黑帽SEO 通过欺骗技术和滥用搜索算法来推销毫不…...
动态规划-状态机(188. 买卖股票的最佳时机 IV)
状态分类: f[i,j,0]考虑前i只股票,进行了j笔交易,目前未持有股票 所能获得最大利润 f[i,j,1]考虑前i只股票,进行了j笔交易,目前持有股票 所能获得最大利润 状态转移: f[i][j][0] Math.max(f[i-1][j][0],f[…...
银行业务队列简单模拟(队列应用)
设某银行有A、B两个业务窗口,且处理业务的速度不一样,其中A窗口处理速度是B窗口的2倍 —— 即当A窗口每处理完2个顾客时,B窗口处理完1个顾客。给定到达银行的顾客序列,请按业务完成的顺序输出顾客序列。假定不考虑顾客先后到达的时…...
2023/8/8 下午10:42:04 objectarx
2023/8/8 下午10:42:04 objectarx 2023/8/8 下午10:42:16 ObjectARX(AutoCAD Runtime Extension)是用于开发和自定义AutoCAD软件的编程接口。ObjectARX允许开发者使用C++、.NET等编程语言来创建插件、扩展功能和定制化AutoCAD的行为。 通过ObjectARX,开发者可以访问Auto…...
Day-06 基于 Docker安装 Nginx 镜像
1.去官方公有仓库查询nginx镜像 docker search nginx 2.拉取该镜像 docker pull nginx 3. 启动镜像,使用nginx服务,代理本机8080端口(测试是不是好使) docker run -d -p 8080:80 --name nginx-8080 nginx docker ps curl 127.0.0.1:8080...
linux入门---信号的保存和捕捉
目录标题 信号的一些概念信号的保存pending表block表handler表 信号的捕捉内核态和用户态信号的捕捉 信号的一些概念 1.进程会收到各种各样的信号,那么程序对该信号进行实际处理的动作叫做信号的递达。 2.我们之前说过当进程收到信号的时候可能并不会立即处理这个信…...
5.外部中断
中断初始化配置步骤: IO口初始化配置 开启中断总允许EA 打开某个IO口的中断允许 打开IO口的某一位的中断允许 配置该位的中断触发方式 中断函数: #pragma vector PxINT_VECTOR __interrupt void 函数名(void){}#pragma vector PxINT_VECTOR __int…...
Mydb数据库问题
1、请简要介绍一下这个基于 Java 的简易数据库管理系统。它的主要功能是什么? TM(Transaction Manager):事务管理器,用于维护事务的状态,并提供接口供其他模块查询某个事务的状态。DM(Data Man…...
部署并应用ByteTrack实现目标跟踪
尽管YOLOv8已经集成了ByteTrack算法,但在这里我还是想利用ByteTrack官网的代码,自己实现目标跟踪。 要想应用ByteTrack算法,首先就要从ByteTrack官网上下载并安装。虽然官网上介绍得很简单,只需要区区6行代码,但对于国…...
MacOS怎么配置JDK环境变量
1 输入命令看是否配置了JDk 的环境变量:echo $JAVA_HOME 要是什么也没输出 证明是没配置 2 输入命令编辑 sudo vim ~/.bash_profile 然后按 i ,进入编辑模式,粘贴下面的代码,注意:JAVA_HOME后面路径需要改成自己的版…...
Spring Boot 开发16个实用的技巧
当涉及到使用Spring Boot开发应用程序时,以下是16个实用的技巧: 1. **使用Spring Initializr**:Spring Initializr是一个快速创建Spring Boot项目的工具,可以帮助您选择项目依赖和生成项目骨架。 2. **自动配置**:Sp…...
《机器学习实战》学习记录-ch2
PS: 个人笔记,建议不看 原书资料:https://github.com/ageron/handson-ml2 2.1数据获取 import pandas as pd data pd.read_csv(r"C:\Users\cyan\Desktop\AI\ML\handson-ml2\datasets\housing\housing.csv")data.head() data.info()<clas…...
lv7 嵌入式开发-网络编程开发 07 TCP服务器实现
目录 1 函数介绍 1.1 socket函数 与 通信域 1.2 bind函数 与 通信结构体 1.3 listen函数 与 accept函数 2 TCP服务端代码实现 3 TCP客户端代码实现 4 代码优化 5 练习 1 函数介绍 其中read、write、close在IO中已经介绍过,只需了解socket、bind、listen、acc…...
mysql技术文档--阿里巴巴java准则《Mysql数据库建表规约》--结合阿丹理解尝试解读--国庆开卷
阿丹: 国庆快乐呀大家! 在项目开始前一个好的设计、一个健康的表关系,不仅会让开发变的有趣舒服,也会在后期的维护和升级迭代中让系统不断的成长。那么今天就认识和解读一下阿里的准则!! 建表规约 表达是…...
Qt+openCV学习笔记(十六)Qt6.6.0rc+openCV4.8.1+emsdk3.1.37编译静态库
前言: 有段时间没来写文章了,趁编译库的空闲,再写一篇记录文档 WebAssembly的发展逐渐成熟,即便不了解相关技术,web前端也在不经意中使用了相关技术的库,本篇文档记录下如何编译WebAssembly版本的openCV&…...
【Axure高保真原型】引导弹窗
今天和大家中分享引导弹窗的原型模板,载入页面后,会显示引导弹窗,适用于引导用户使用页面,点击完成后,会显示下一个引导弹窗,直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...
.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...
NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
SpringTask-03.入门案例
一.入门案例 启动类: package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...
DingDing机器人群消息推送
文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人,点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置,详见说明文档 成功后,记录Webhook 2 API文档说明 点击设置说明 查看自…...
怎么让Comfyui导出的图像不包含工作流信息,
为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐) 在 save_images 方法中,删除或注释掉所有与 metadata …...
Web后端基础(基础知识)
BS架构:Browser/Server,浏览器/服务器架构模式。客户端只需要浏览器,应用程序的逻辑和数据都存储在服务端。 优点:维护方便缺点:体验一般 CS架构:Client/Server,客户端/服务器架构模式。需要单独…...
在树莓派上添加音频输入设备的几种方法
在树莓派上添加音频输入设备可以通过以下步骤完成,具体方法取决于设备类型(如USB麦克风、3.5mm接口麦克风或HDMI音频输入)。以下是详细指南: 1. 连接音频输入设备 USB麦克风/声卡:直接插入树莓派的USB接口。3.5mm麦克…...
相关类相关的可视化图像总结
目录 一、散点图 二、气泡图 三、相关图 四、热力图 五、二维密度图 六、多模态二维密度图 七、雷达图 八、桑基图 九、总结 一、散点图 特点 通过点的位置展示两个连续变量之间的关系,可直观判断线性相关、非线性相关或无相关关系,点的分布密…...
