当前位置: 首页 > news >正文

PySPARK带多组参数和标签的SparkSQL批量数据导出到S3的程序

设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。
代码如下:

import json
from pyspark.sql import SparkSessiondef load_config(config_path):with open(config_path, 'r') as f:return json.load(f)def main(config_path, base_s3_path):# 初始化SparkSession,配置S3和Excel支持spark = SparkSession.builder \.appName("DataExportJob") \.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1") \.getOrCreate()# 配置S3访问(根据实际环境配置)spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")config = load_config(config_path)for template in config['templates']:label = template['label']sql_template = template['sql_template']parameters_list = template['parameters']for params in parameters_list:# 验证参数数量是否匹配placeholders = sql_template.count('{')if len(params) != placeholders:raise ValueError(f"参数数量不匹配,模板需要{placeholders}个参数,但当前参数为{len(params)}个")# 替换SQL中的占位符formatted_sql = sql_template.format(*params)df = spark.sql(formatted_sql)# 生成文件名参数部分param_str = "_".join(params)base_filename = f"{label}_{param_str}"# 定义输出路径output_paths = {'parquet': f"{base_s3_path}/parquet/{base_filename}",'csv': f"{base_s3_path}/csv/{base_filename}",'excel': f"{base_s3_path}/excel/{base_filename}.xlsx"}# 写入Parquetdf.write.mode('overwrite').parquet(output_paths['parquet'])# 写入CSV(自动生成header)df.write.mode('overwrite') \.option("header", "true") \.csv(output_paths['csv'])# 写入Excel(使用spark-excel包)df.write.format("com.crealytics.spark.excel") \.option("header", "true") \.option("inferSchema", "true") \.mode("overwrite") \.save(output_paths['excel'])spark.stop()if __name__ == "__main__":import argparseparser = argparse.ArgumentParser()parser.add_argument('--config', type=str, required=True, help='Path to config JSON file')parser.add_argument('--s3-path', type=str, required=True, help='Base S3 path (e.g., s3a://your-bucket/data)')args = parser.parse_args()main(args.config, args.s3_path)

配置文件示例(config.json)

{"templates": [{"label": "sales_report","sql_template": "SELECT * FROM sales WHERE date = '{0}' AND region = '{1}'","parameters": [["202301", "north"],["202302", "south"]]},{"label": "user_activity","sql_template": "SELECT user_id, COUNT(*) AS cnt FROM activity WHERE day = '{0}' GROUP BY user_id","parameters": [["2023-01-01"],["2023-01-02"]]}]
}

使用说明

  1. 依赖管理

    • 确保Spark集群已安装Hadoop AWS和Spark Excel依赖:
      spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 your_script.py
      
  2. S3配置

    • 替换代码中的YOUR_ACCESS_KEYYOUR_SECRET_KEY为实际AWS凭证
    • 根据S3兼容存储调整endpoint(如使用MinIO需特殊配置)
  3. 执行命令

    spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 \
    data_export.py --config config.json --s3-path s3a://your-bucket/exports
    

输出结构

s3a://your-bucket/exports
├── parquet
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
├── csv
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
└── excel├── sales_report_202301_north.xlsx├── sales_report_202302_south.xlsx└── user_activity_2023-01-01.xlsx

相关文章:

PySPARK带多组参数和标签的SparkSQL批量数据导出到S3的程序

设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已…...

蓝桥杯备考:模拟算法之字符串展开

P1098 [NOIP 2007 提高组] 字符串的展开 - 洛谷 | 计算机科学教育新生态 #include <iostream> #include <cctype> #include <algorithm> using namespace std; int p1,p2,p3; string s,ret; void add(char left,char right) {string tmp;for(char ch left1;…...

使用LLaMA-Factory对AI进行认知的微调

使用LLaMA-Factory对AI进行认知的微调 引言1. 安装LLaMA-Factory1.1. 克隆仓库1.2. 创建虚拟环境1.3. 安装LLaMA-Factory1.4. 验证 2. 准备数据2.1. 创建数据集2.2. 更新数据集信息 3. 启动LLaMA-Factory4. 进行微调4.1. 设置模型4.2. 预览数据集4.3. 设置学习率等参数4.4. 预览…...

@Nullable 注解

文章目录 解释 Nullable 注解注解的组成部分&#xff1a;如何使用 Nullable 注解a. 标注方法返回值&#xff1a;b. 标注方法参数&#xff1a;c. 标注字段&#xff1a; 结合其他工具与 Nonnull 配合使用总结 Nullable 注解在 Java 中的使用场景通常与 Nullability&#xff08;空…...

Arduino大师练成手册 -- 控制 AS608 指纹识别模块

要在 Arduino 上控制 AS608 指纹识别模块&#xff0c;你可以按照以下步骤进行&#xff1a; 硬件连接 连接指纹模块&#xff1a;将 AS608 指纹模块与 Arduino 连接。通常&#xff0c;AS608 使用 UART 接口进行通信。你需要将 AS608 的 TX、RX、VCC 和 GND 引脚分别连接到 Ardu…...

Mask R-CNN与YOLOv8的区别

Mask R-CNN与YOLOv8虽然都是深度学习在计算机视觉领域的应用&#xff0c;但它们属于不同类型的视觉框架&#xff0c;各有特点和优势。 以下是关于 Mask R-CNN 和 YOLOv8 的详细对比分析&#xff0c;涵盖核心原理、性能差异、应用场景和选择建议&#xff1a; 1. 核心原理与功能…...

在Ubuntu上使用Docker部署DeepSeek

在Ubuntu上使用Docker部署DeepSeek&#xff0c;并确保其可以访问公网网址进行对话&#xff0c;可以按照以下步骤进行&#xff1a; 一、安装Docker 更新Ubuntu的软件包索引&#xff1a; sudo apt-get update安装必要的软件包&#xff0c;这些软件包允许apt通过HTTPS使用存储库…...

MySQL的覆盖索引

MySQL的覆盖索引 前言 当一个索引包含了查询所需的全部字段时&#xff0c;就可以提高查询效率&#xff0c;这样的索引又被称之为覆盖索引。 以MySQL常见的三种存储引擎为例&#xff1a;InnoDB、MyISAM、Memory&#xff0c;对于覆盖索引提高查询效率的方式均不同&#xff0c;…...

【Numpy核心编程攻略:Python数据处理、分析详解与科学计算】2.12 连续数组:为什么contiguous这么重要?

2.12 连续数组&#xff1a;为什么contiguous这么重要&#xff1f; 目录 #mermaid-svg-wxhozKbHdFIldAkj {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-wxhozKbHdFIldAkj .error-icon{fill:#552222;}#mermaid-svg-…...

在React中使用redux

一、首先安装两个插件 1.Redux Toolkit 2.react-redux 第一步&#xff1a;创建模块counterStore 第二步&#xff1a;在store的入口文件进行子模块的导入组合 第三步&#xff1a;在index.js中进行store的全局注入 第四步&#xff1a;在组件中进行使用 第五步&#xff1a;在组件中…...

lstm预测

import numpy as np import pandas as pd import tensorflow as tf import math import matplotlib.pyplot as plt from sklearn.preprocessing import MinMaxScaler from keras.layers import LSTM,Activation,Dense,Dropout# 时间序列数据转换为监督学习的格式 def creatXY(d…...

《 C++ 点滴漫谈: 二十五 》空指针,隐秘而危险的杀手:程序崩溃的真凶就在你眼前!

摘要 本博客全面解析了 C 中指针与空值的相关知识&#xff0c;从基础概念到现代 C 的改进展开&#xff0c;涵盖了空指针的定义、表示方式、使用场景以及常见注意事项。同时&#xff0c;深入探讨了 nullptr 的引入及智能指针在提升代码安全性和简化内存管理方面的优势。通过实际…...

【AI】探索自然语言处理(NLP):从基础到前沿技术及代码实践

Hi &#xff01; 云边有个稻草人-CSDN博客 必须有为成功付出代价的决心&#xff0c;然后想办法付出这个代价。 目录 引言 1. 什么是自然语言处理&#xff08;NLP&#xff09;&#xff1f; 2. NLP的基础技术 2.1 词袋模型&#xff08;Bag-of-Words&#xff0c;BoW&#xff…...

2025年Android开发趋势全景解读

文章目录 一、界面开发&#xff1a;从"手写代码"到"智能拼装"1.1 Jetpack Compose实战进化1.2 淘汰XML布局的三大信号 二、AI融合开发&#xff1a;无需炼丹的普惠智能2.1 设备端AI三大杀手级应用2.2 成本对比&#xff1a;设备端VS云端AI 三、跨平台演进&am…...

C#面试常考随笔11:Dictionary<K, V>、Hashtable的内部实现原理是什么?效率如何?

Dictionary<K, V> 底层数据结构&#xff1a;使用哈希表&#xff08;Hash Table&#xff09;&#xff0c;由一个数组和链表&#xff08;或在.NET Core 2.1 及之后版本中&#xff0c;当链表长度达到一定阈值时转换为红黑树&#xff09;组成。数组中的每个元素称为一个桶&a…...

Linux防火墙基础

一、Linux防火墙的状态机制 1.iptables是可以配置有状态的防火墙&#xff0c;其有状态的特点是能够指定并记住发送或者接收信息包所建立的连接状态&#xff0c;其一共有四种状态&#xff0c;分别为established invalid new related。 established:该信息包已建立连接&#x…...

Qt u盘自动升级软件

Qt u盘自动升级软件 Chapter1 Qt u盘自动升级软件u盘自动升级软件思路&#xff1a;step1. 获取U盘 判断U盘名字是否正确&#xff0c; 升级文件是否存在。step2. 升级step3. 升级界面 Chapter2 Qt 嵌入式设备应用程序&#xff0c;通过U盘升级的一种思路Chapter3 在开发板上运行的…...

【Conda 和 虚拟环境详细指南】

Conda 和 虚拟环境的详细指南 什么是 Conda&#xff1f; Conda 是一个开源的包管理和环境管理系统&#xff0c;支持多种编程语言&#xff08;如Python、R等&#xff09;&#xff0c;最初由Continuum Analytics开发。 主要功能&#xff1a; 包管理&#xff1a;安装、更新、删…...

Python递归函数深度解析:从原理到实战

Python递归函数深度解析&#xff1a;从原理到实战 递归是计算机科学中重要的编程范式&#xff0c;也是算法设计的核心思想之一。本文将通过20实战案例&#xff0c;带你深入理解Python递归函数的精髓&#xff0c;掌握递归算法的实现技巧。 一、递归函数核心原理 1.1 递归三要…...

OpenGL学习笔记(五):Textures 纹理

文章目录 纹理坐标纹理环绕方式纹理过滤——处理纹理分辨率低的情况多级渐远纹理Mipmap——处理纹理分辨率高的情况加载与创建纹理 &#xff08; <stb_image.h> &#xff09;生成纹理应用纹理纹理单元练习1练习2练习3练习4 通过上一篇着色部分的学习&#xff0c;我们可以…...

Python ROS2【机器人中间件框架】 简介

销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...

【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)

本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...

热烈祝贺埃文科技正式加入可信数据空间发展联盟

2025年4月29日&#xff0c;在福州举办的第八届数字中国建设峰会“可信数据空间分论坛”上&#xff0c;可信数据空间发展联盟正式宣告成立。国家数据局党组书记、局长刘烈宏出席并致辞&#xff0c;强调该联盟是推进全国一体化数据市场建设的关键抓手。 郑州埃文科技有限公司&am…...

32位寻址与64位寻址

32位寻址与64位寻址 32位寻址是什么&#xff1f; 32位寻址是指计算机的CPU、内存或总线系统使用32位二进制数来标识和访问内存中的存储单元&#xff08;地址&#xff09;&#xff0c;其核心含义与能力如下&#xff1a; 1. 核心定义 地址位宽&#xff1a;CPU或内存控制器用32位…...

js 设置3秒后执行

如何在JavaScript中延迟3秒执行操作 在JavaScript中&#xff0c;要设置一个操作在指定延迟后&#xff08;例如3秒&#xff09;执行&#xff0c;可以使用 setTimeout 函数。setTimeout 是JavaScript的核心计时器方法&#xff0c;它接受两个参数&#xff1a; 要执行的函数&…...

在Zenodo下载文件 用到googlecolab googledrive

方法&#xff1a;Figshare/Zenodo上的数据/文件下载不下来&#xff1f;尝试利用Google Colab &#xff1a;https://zhuanlan.zhihu.com/p/1898503078782674027 参考&#xff1a; 通过Colab&谷歌云下载Figshare数据&#xff0c;超级实用&#xff01;&#xff01;&#xff0…...

自定义线程池1.2

自定义线程池 1.2 1. 简介 上次我们实现了 1.1 版本&#xff0c;将线程池中的线程数量交给使用者决定&#xff0c;并且将线程的创建延迟到任务提交的时候&#xff0c;在本文中我们将对这个版本进行如下的优化&#xff1a; 在新建线程时交给线程一个任务。让线程在某种情况下…...

【系统架构设计师-2025上半年真题】综合知识-参考答案及部分详解(回忆版)

更多内容请见: 备考系统架构设计师-专栏介绍和目录 文章目录 【第1题】【第2题】【第3题】【第4题】【第5题】【第6题】【第7题】【第8题】【第9题】【第10题】【第11题】【第12题】【第13题】【第14题】【第15题】【第16题】【第17题】【第18题】【第19题】【第20~21题】【第…...

无头浏览器技术:Python爬虫如何精准模拟搜索点击

1. 无头浏览器技术概述 1.1 什么是无头浏览器&#xff1f; 无头浏览器是一种没有图形用户界面&#xff08;GUI&#xff09;的浏览器&#xff0c;它通过程序控制浏览器内核&#xff08;如Chromium、Firefox&#xff09;执行页面加载、JavaScript渲染、表单提交等操作。由于不渲…...