当前位置: 首页 > news >正文

Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. BashOperator 调度Shell命令案例

2. BashOperator 调度Shell脚本案例


Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator

BaseOperator中常用参数如下:

task_id(str) : 唯一task_id标记owner(str):任务的所有者,建议使用linux用户名email(str or list[str]):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。email_on_retry(bool):当任务重试时是否发送电子邮件email_on_failure(bool):当任务执行失败时是否发送电子邮件retries(int):在任务失败之前应该重试的次数retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。dag(airflow.models.DAG):指定的dag。execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。

BashOperator主要执行bash脚本或命令,BashOperator参数如下:

bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)

1. BashOperator 调度Shell命令案例

from datetime import datetime, timedeltafrom airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'email':'kettle_test1@163.com', #pwd:kettle123456'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_cmd',default_args=default_args,schedule_interval=timedelta(minutes=1)
)t1=BashOperator(task_id='print_date',bash_command='date',dag = dag
)t2=BashOperator(task_id='print_helloworld',bash_command='echo "hello world!"',dag=dag
)t3=BashOperator(task_id='tempplated',bash_command="""{% for i in range(5) %}echo "{{ ds }}"echo "{{ params.name}}"echo "{{ params.age}}"{% endfor %}""",params={'name':'wangwu','age':10},dag=dag
)t1 >> t2 >> t3

注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。

在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5

此外,配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:

2. BashOperator 调度Shell脚本案例

准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bashdt=$1echo "==== execute first shell ===="echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bashdt=$1echo "==== execute second shell ===="echo "---- second : time is ${dt}"

编写airflow python 配置:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_sh',default_args=default_args,schedule_interval=timedelta(minutes=1)
)first=BashOperator(task_id='first',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag = dag
)second=BashOperator(task_id='second',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag=dag
)first >> second

执行结果:

特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:


相关文章:

Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹…...

IJ中配置TortoiseSVN插件:

文章目录 一、报错情况:二、配置TortoiseSVN插件: 一、报错情况: 由于公司电脑加密,TortoiseSVN菜单没有提交和更新按钮,所以需要使用IJ的SVN进行代码相关操作 二、配置TortoiseSVN插件: 需要设置一个svn.…...

个人实现在线支付,一种另类的在线支付解决方案

Hi, I’m Shendi 个人实现在线支付,一种另类的在线支付解决方案 个人实现在线支付的方式 对于在线支付,最多的是接入微信与支付宝。但都需要营业执照,不适用于个人。 当然,可以去办理一个个体工商户,但对我这种小额收…...

浅谈智能安全配电装置应用在银行配电系统中

【摘要】银行是国家重点安全保护部分,关系到社会资金的稳定,也是消防重点单位。消防安全是银行工作的重要组成部分。在银行配电系统中应用智能安全配电装置,可以提高银行的智能控制水平,有效预防电气火灾。 【关键词】银行&#…...

macOS下如何使用Flask进行开发

👨🏻‍💻 热爱摄影的程序员 👨🏻‍🎨 喜欢编码的设计师 🧕🏻 擅长设计的剪辑师 🧑🏻‍🏫 一位高冷无情的编码爱好者 大家好,我是全栈工…...

记一次服务器配置文件获取OSS

一、漏洞原因 由于网站登录口未做双因子校验,导致可以通过暴力破解获取管理员账号,成功进入系统;未对上传的格式和内容进行校验,可以任意文件上传获取服务器权限;由于服务器上配置信息,可以进一步获取数据库权限和OSS管理权限。二、漏洞成果 弱口令获取网站的管理员权限通…...

合众汽车选用风河Wind River Linux系统

导读合众新能源汽车股份有限公司近日选择了Wind River Linux 用于开发合众智能安全汽车平台。 合众智能安全汽车平台(Hozon Automo-tive Intelligent Security Vehicle Plat-form)是一个面向高性能服务网关及车辆控制调度的硬件与软件框架,将于2024年中开始投入量产…...

PTA平台-2023年软件设计综合实践_5(指针及引用)

第一题 6-1 调和平均 - C/C 指针及引用 函数hmean()用于计算整数x和y的调和平均数,结果应保存在指针r所指向的浮点数对象中。当xy等于0时,函数返回0表示无法计算,否则返回1。数学上,两个数x和y的调和平均数 z 2xy/(xy) 。 直接…...

智慧卫生间

智慧卫生间 获取ApiKey/SecretKey获取Access_token获取卫生间实时数据返回说明 获取ApiKey/SecretKey ApiKey/SecretKey采用 线下获取的方式,手动分配。 获取Access_token 向授权服务地址http://xxxxxx:12345/token(示意)发送post请求,并在data中带上…...

Cadence virtuoso drc lvs pex 无法输入

问题描述:在PEX中的PEX options中 Ground node name 无法输入内容。 在save runset的时候也出现无法输入名称的情况 解决办法: copy一个.bashrc文件到自己的工作目录下 打开.bashrc文件 在.bashrc中加一行代码:unset XMODIFIERS 在终端sour…...

反序列化漏洞(2), 分析调用链, 编写POC

反序列化漏洞(2), 反序列化调用链分析 一, 编写php漏洞脚本 http://192.168.112.200/security/unserial/ustest.php <?php class Tiger{public $string;protected $var;public function __toString(){return $this->string;}public function boss($value){eval($valu…...

Pytorch reshape用法

这里-1是指未设定行数&#xff0c;程序自动计算&#xff0c;所以这里-1表示任一正整数 example reshape(-1, 1) 表示&#xff08;任意行&#xff0c;1列&#xff09;&#xff0c;4行4列变为16行1列reshape(1, -1) 表示&#xff08;1行&#xff0c;任意列&#xff09;&#xf…...

Latex 辅助写作工具

语法修改 https://app.grammarly.com/润色 文心一言、ChatGPTlatex 编辑公式 https://www.latexlive.comlatex 编辑表格 https://www.tablesgenerator.comlatex 图片转公式 https://www.tablesgenerator.com...

frp新版本frp_0.52.3设置

服务端 frps.toml cp /root/frp/frpc /usr/bin #bindPort 7000 bindPort 7000# 如果指定了“oidc”&#xff0c;将使用 OIDC 设置颁发 OIDC&#xff08;开放 ID 连接&#xff09;令牌。默认情况下&#xff0c;此值为“令牌”。auth.method “token” auth.method "…...

100G.的DDoS高防够用吗?

很多人以为100G的DDoS防御已经足够了&#xff0c;但殊不知DDoS攻击大小也是需要分行业类型的&#xff0c;比如游戏、金融、影视、电商甚至ZF或者行业龙头等等行业类型&#xff0c;都是大型DDoS攻击的重灾区&#xff0c;别说100G防御&#xff0c;就算300G防御服务器也不一定够用…...

【django+vue】项目搭建、解决跨域访问

笔记为自我总结整理的学习笔记&#xff0c;若有错误欢迎指出哟~ 【djangovue】项目搭建、解决跨域访问 djangovue介绍vue环境准备vue框架搭建1.创建vue项目2.配置vue项目3.进入项目目录4.运行项目5.项目文件讲解6.vue的扩展库或者插件 django环境准备django框架搭建1.使用conda…...

【数据库】数据库连接池导致系统吞吐量上不去-复盘

在实际的开发中&#xff0c;我们会使用数据库连接池&#xff0c;但是如果不能很好的理解其中的含义&#xff0c;那么就可以出现生产事故。 HikariPool-1 - Connection is not available, request timed out after 30001ms.当系统的调用量上去&#xff0c;就出现大量这样的连接…...

华纳云:租用的服务器连接超时怎么办?

服务器连接超时可能由多种原因引起&#xff0c;解决问题的方法取决于具体的情况。以下是一些常见的原因和相应的解决方法&#xff1a; 网络问题&#xff1a; 检查本地网络&#xff1a; 确保本地网络连接正常&#xff0c;尝试访问其他网站或服务&#xff0c;检查是否存在网络问题…...

基于MS16F3211芯片的触摸控制灯的状态变化和亮度控制(11.17,PWM)

紧接上文&#xff0c;基本的控制逻辑并不难写&#xff0c;难的是是、如何输出自己想要频率的PWM波在对应的端口 阅读文档定时器与PWM相关的寄存器&#xff0c;因为之前玩的STM32&#xff0c;所以看起来还是有点困难&#xff0c;准备边看边记录。 如果想要实现在长按时改变PWM…...

编译buildroot出错,这个怎么解决呢,感谢

编译buildroot出错,这个怎么解决呢,感谢 发表于 2019-5-22 20:24:25 浏览:8025 | 回复:5 打印 只看该作者 [复制链接]楼主 g++: internal compiler error: 已杀死 (program cc1plus) Please submit a full bug report, with preprocessed source if appro…...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具

文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

C++ 基础特性深度解析

目录 引言 一、命名空间&#xff08;namespace&#xff09; C 中的命名空间​ 与 C 语言的对比​ 二、缺省参数​ C 中的缺省参数​ 与 C 语言的对比​ 三、引用&#xff08;reference&#xff09;​ C 中的引用​ 与 C 语言的对比​ 四、inline&#xff08;内联函数…...

Mobile ALOHA全身模仿学习

一、题目 Mobile ALOHA&#xff1a;通过低成本全身远程操作学习双手移动操作 传统模仿学习&#xff08;Imitation Learning&#xff09;缺点&#xff1a;聚焦与桌面操作&#xff0c;缺乏通用任务所需的移动性和灵活性 本论文优点&#xff1a;&#xff08;1&#xff09;在ALOHA…...

GruntJS-前端自动化任务运行器从入门到实战

Grunt 完全指南&#xff1a;从入门到实战 一、Grunt 是什么&#xff1f; Grunt是一个基于 Node.js 的前端自动化任务运行器&#xff0c;主要用于自动化执行项目开发中重复性高的任务&#xff0c;例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

接口自动化测试:HttpRunner基础

相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具&#xff0c;支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议&#xff0c;涵盖接口测试、性能测试、数字体验监测等测试类型…...

脑机新手指南(七):OpenBCI_GUI:从环境搭建到数据可视化(上)

一、OpenBCI_GUI 项目概述 &#xff08;一&#xff09;项目背景与目标 OpenBCI 是一个开源的脑电信号采集硬件平台&#xff0c;其配套的 OpenBCI_GUI 则是专为该硬件设计的图形化界面工具。对于研究人员、开发者和学生而言&#xff0c;首次接触 OpenBCI 设备时&#xff0c;往…...

Ubuntu Cursor升级成v1.0

0. 当前版本低 使用当前 Cursor v0.50时 GitHub Copilot Chat 打不开&#xff0c;快捷键也不好用&#xff0c;当看到 Cursor 升级后&#xff0c;还是蛮高兴的 1. 下载 Cursor 下载地址&#xff1a;https://www.cursor.com/cn/downloads 点击下载 Linux (x64) &#xff0c;…...