Apache Airflow (四) :Airflow 调度shell命令

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


上文说到使用Airflow进行任务调度大体步骤如下:

  1. 创建python文件,根据实际需要,使用不同的Operator
  2. 在python文件不同的Operator中传入具体参数,定义一系列task
  3. 在python文件中定义Task之间的关系,形成DAG
  4. 将python文件上传执行,调度DAG,每个task会形成一个Instance
  5. 使用命令行或者WEBUI进行查看和管理

以上python文件就是Airflow python脚本,使用代码方式指定DAG的结构。

下面我们以调度执行shell命令为例,来讲解Airflow使用。

1. 首先我们需要创建一个python文件,导入需要的类库

# 导入 DAG 对象,后面需要实例化DAG对象
from airflow import DAG# 导入BashOperator Operators,我们需要利用这个对象去执行流程
from airflow.operators.bash import BashOperator

注意:以上代码可以在开发工具中创建,但是需要在使用的python3.7环境中导入安装Airflow包。

D:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

2. 实例化DAG

from datetime import datetime, timedelta# default_args中定义一些参数,在实例化DAG时可以使用,使用python dic 格式定义
default_args = {'owner': 'airflow', # 拥有者名称'start_date': datetime(2021, 9, 4),  # 第一次开始执行的时间,为 UTC 时间'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5),  # 失败重试间隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必须完全由字母、数字、下划线组成default_args = default_args, #外部定义的 dic 格式的参数schedule_interval = timedelta(days=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)

注意:

  • 实例化DAG有三种方式

第一种方式:

with DAG("my_dag_name") as dag:op=XXOperator(task_id="task")

第二种方式(以上采用这种方式):

my_dag = DAG("my_dag_name")
op = XXOperator(task_id="task", dag=my_dag)

第三种方式:

@dag(start_date=days_ago(2))
def generate_dag():op = XXOperator(task_id="task")
dag = generate_dag()
  • baseoperator基础参数说明:

可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator查看baseopartor中更多参数。

  • DAG参数说明

可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html 查看DAG参数说明,也可以直接在开发工具点击DAG进入源码看下对应参数有哪些。

3. 定义Task

当实例化Operator时会生成Task任务,从一个Operator中实例化出来对象的过程被称为一个构造方法,每个构造方法中都有“task_id”充当任务的唯一标识符。

下面我们定义三个Operator,也就是三个Task,每个task_id 不能重复。

# operator 支持多种类型, 这里使用 BashOperator
first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)middle = BashOperator(task_id='middle',bash_command='echo "run middle task"',dag=dag
)last = BashOperator(task_id='last',bash_command='echo "run last task"',dag=dag,retries=3
)

注意:

  • 每个operator中可以传入对应的参数,覆盖DAG默认的参数,例如:last task中“retries”=3 就替代了默认的1。任务参数的优先规则如下:①.显示传递的参数 ②.default_args字典中存在的值③.operator的默认值(如果存在)。
  • BashOperator使用方式参照:http://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html#howto-operator-bashoperator

4. 设置task依赖关系

#使用 set_upstream、set_downstream 设置依赖关系,不能出现环形链路,否则报错
# middle.set_upstream(first) # middle会在first执行完成之后执行
# last.set_upstream(middle) # last 会在 middle执行完成之后执行#也可以使用位移符来设置依赖关系
first >> middle >>last # first 首先执行,middle次之,last最后
# first >> [middle,last] # first首先执行,middle ,last并行执行

注意:当执行脚本时,如果在DAG中找到一条环形链路(例如:A->B->C-A)会引发异常。更多DAG task依赖关系可参照官网:http://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#task-dependencies

5. 上传python配置脚本

到目前为止,python配置如下:

# 导入 DAG 对象,后面需要实例化DAG对象
from airflow import DAG# 导入BashOperator Operators,我们需要利用这个对象去执行流程
from airflow.example_dags.example_bash_operator import dagfrom airflow.operators.bash import BashOperatorfrom datetime import datetime, timedelta# default_args中定义一些参数,在实例化DAG时可以使用,使用python dic 格式定义
default_args = {'owner': 'airflow', # 拥有者名称'start_date': datetime(2021, 9, 4),  # 第一次开始执行的时间,为 UTC 时间'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5),  # 失败重试间隔
}dag = DAG(dag_id = 'myairflow_execute_bash', #DAG id ,必须完全由字母、数字、下划线组成default_args = default_args, #外部定义的 dic 格式的参数schedule_interval = timedelta(days=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)# operator 支持多种类型, 这里使用 BashOperator
first = BashOperator(task_id='first',bash_command='echo "run first task"',dag=dag
)middle = BashOperator(task_id='middle',bash_command='echo "run middle task"',dag=dag
)last = BashOperator(task_id='last',bash_command='echo "run last task"',dag=dag,retries=3
)#使用 set_upstream、set_downstream 设置依赖关系,不能出现环形链路,否则报错
# middle.set_upstream(first) # middle会在first执行完成之后执行
# last.set_upstream(middle) # last 会在 middle执行完成之后执行#也可以使用位移符来设置依赖关系
first >> middle >>last # first 首先执行,middle次之,last最后
# first >> [middle,last] # first首先执行,middle ,last并行执行

将以上python配置文件上传到$AIRFLOW_HOME/dags目录下,默认$AIRFLOW_HOME为安装节点的“/root/airflow”目录,当前目录下的dags目录需要手动创建。

6. 重启Airflow

“ps aux|grep webserver”和“ps aux|grep scheduler”找到对应的airflow进程杀掉,重新启动Airflow。重启之后,可以在airflow webui看到对应的DAG ID ”myairflow_execute_bash”。

7. 执行airflow

按照如下步骤执行DAG,首先打开工作流,然后“Trigger DAG”执行,随后可以看到任务执行成功。

查看task执行日志:


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/187438.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Delphi 12 重返雅典 (RAD Studio 12)

RAD Studio 12 的新功能: 以最新的平台版本为目标! RAD Studio 12 提供对 iOS 17(仅适用于 Delphi)、Android 14 和 macOS Sonoma 的官方支持。RAD Studio 12 还支持 Ubuntu 22 LTS 和 Windows Server 2022。 Delphi 源代码的多…

<蓝桥杯软件赛>零基础备赛20周--第5周--杂题-2

报名明年4月蓝桥杯软件赛的同学们,如果你是大一零基础,目前懵懂中,不知该怎么办,可以看看本博客系列:备赛20周合集 20周的完整安排请点击:20周计划 每周发1个博客,共20周(读者可以按…

一题三解(暴力、二分查找算法、单指针):鸡蛋掉落

涉及知识点 暴力、二分查找算法、单指针 题目 给你 k 枚相同的鸡蛋&#xff0c;并可以使用一栋从第 1 层到第 n 层共有 n 层楼的建筑。 已知存在楼层 f &#xff0c;满足 0 < f < n &#xff0c;任何从 高于 f 的楼层落下的鸡蛋都会碎&#xff0c;从 f 楼层或比它低的…

3DMAX汽车绑定动画模拟插件MadCar疯狂汽车使用教程

3DMAX汽车绑定动画模拟插件MadCar疯狂的汽车&#xff0c;用于通过模拟控制来快速装配轮式车辆及其动画。这个新版本允许装配任何数量的车轮的车辆&#xff0c;以及包括摩托车在内的任何相互布置。还支持任意数量的拖车。 每个车轮和悬架都有简化的行为设置以及微调&#xff0c…

【微服务专题】手写模拟SpringBoot

目录 前言阅读对象阅读导航前置知识笔记正文一、工程项目准备1.1 新建项目1.1 pom.xml1.2 业务模拟 二、模拟SpringBoot启动&#xff1a;好戏开场2.1 启动配置类2.1.1 shen-base-springboot新增2.1.2 shen-example客户端新增启动类 三、run方法的实现3.1 步骤一&#xff1a;启动…

RAW图像处理软件Capture One 23 Enterprise mac中文版功能特点

Capture One 23 Enterprise mac是一款专业的图像处理软件&#xff0c;旨在为企业用户提供高效、快速和灵活的工作流程。 Capture One 23 Enterprise mac软件的特点和功能 强大的图像编辑工具&#xff1a;Capture One 23 Enterprise提供了一系列强大的图像编辑工具&#xff0c;…

TensorFlow学习笔记--(1)张量的随机生成

张量的生成 如何判断一个张量的维数&#xff1a;看张量的中括号有几层 0 1 2 &#xff1a;零维数列 [2 4 6] : 一维向量 [ [1 2 3] [4 5 6] ] : 二维数组 两行三列 第一行数据为 1 2 3 第二行数据为 4 5 6 以此类推 n维张量有n层中括号 tf.zeros(%指定一个张量的维数%) 生成一…

Django如何创建表关系,Django的请求声明周期流程图

【1】表与表之间的关系 一对一 左表的一条记录对应右表的一条记录&#xff0c;反之亦然 多对一 左表的一条记录对应右表的多条记录&#xff0c;反之不成立 多对多 左表的一条记录对应右表的多表记录&#xff0c;反之成立 【2】django中创建表关系 class Book(models.Model):t…

canvas 曲线图 双数值轴 山峰图

下面的代码本人亲自撰写&#xff0c;原生不易啊。 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>D…

CSS3 用户界面、图片、按钮

一、CSS3用户界面&#xff1a; 在CSS3中&#xff0c;增加了一些新的用户界面特性来调整元素尺寸、框尺寸和外边框。CSS3用户界面属性&#xff1a;resize、box-sizing、outline-offset。 1、resize&#xff1a; resize属性指定一个元素是否应该由用户去调整大小。 <style…

Azure 机器学习 - 有关为 Azure 机器学习配置 Kubernetes 群集的参考

目录 受支持的 Kubernetes 版本和区域建议的资源计划ARO 或 OCP 群集的先决条件禁用安全增强型 Linux (SELinux)ARO 和 OCP 的特权设置 收集的日志详细信息Azure 机器学习作业与自定义数据存储连接支持的 Azure 机器学习排斥和容许最佳实践 通过 HTTP 或 HTTPS 将其他入口控制器…

DAY50 309.最佳买卖股票时机含冷冻期 + 714.买卖股票的最佳时机含手续费

309.最佳买卖股票时机含冷冻期 题目要求&#xff1a;给定一个整数数组&#xff0c;其中第 i 个元素代表了第 i 天的股票价格 。 设计一个算法计算出最大利润。在满足以下约束条件下&#xff0c;你可以尽可能地完成更多的交易&#xff08;多次买卖一支股票&#xff09;: 你不…

vue2+elementui使用MessageBox 弹框$msgbox自定义VNode内容:实现radio

虽说实现下面的效果&#xff0c;用el-dialog很轻松就能搞定。但是这种简单的交互&#xff0c;我更喜欢使用MessageBox。 话不多说&#xff0c;直接上代码~ <el-button type"primary" size"mini" click"handleApply()" >处理申请</el-b…

【Git】Git图形化工具SSH协议IDEA集成Git的使用讲解

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《Git》。&#x1f3af;&#x1f3af; &#x1f449…

git命令之遭遇 ignore罕见问题解决

我先来讲讲背景 我的一些文件在ignore了&#xff0c;不会被提交到远程仓库&#xff0c;这时候我的远程仓库中是没有这几个文件的&#xff0c;这时候我如果使用 git reset 的话这时候除了那几个 ignore 的文件以外都被更新的&#xff0c;但是如果我不需要这几个被 ignore 的文件…

蓝桥杯之模拟与枚举day1

Question1卡片(C/CA组第一题) 这个是一道简单的模拟枚举题目&#xff0c;只要把对应每次的i的各个位都提取出来&#xff0c;然后对应的卡片数目减去1即可。属于打卡题目。注意for循环的特殊使用即可 #include <iostream> using namespace std; bool solve(int a[],int n…

NSS [鹏城杯 2022]压缩包

NSS [鹏城杯 2022]压缩包 考点&#xff1a;条件竞争/逻辑漏洞&#xff08;解压失败不删除已经解压文件&#xff09; 参考&#xff1a;回忆phpcms头像上传漏洞以及后续影响 | 离别歌 (leavesongs.com) 源码有点小多 <?php highlight_file(__FILE__);function removedir($…

大模型+人形机器人,用AI唤起钢筋铁骨

《经济参考报》11月8日刊发文章《多方布局人形机器人赛道,智能应用前景广》。文章称&#xff0c;工信部日前印发的《人形机器人创新发展指导意见》&#xff0c;按照谋划三年、展望五年的时间安排&#xff0c;对人形机器人创新发展作了战略部署。 从开发基于人工智能大模型的人…

CCLink转Modbus TCP网关_MODBUS报文配置

兴达易控CCLink转Modbus TCP网关是一种功能强大的设备&#xff0c;可实现两个不同通信协议之间的无缝对接。它能够将CCLink协议转换为Modbus TCP协议&#xff0c;并通过报文配置实现灵活的通信设置。兴达易控CCLink转Modbus TCP网关可以轻松实现CCLink和Modbus TCP之间的数据转…

汇编-EQU伪指令(数值替换)

EQU伪指令将一个符号名称与一个整数表达式或一个任意文本相关联&#xff0c; 它有3种格式 在第一种格式中&#xff0c; expression必须是一个有效的整数表达式。在第二种格式中&#xff0c; symbol是一个已存在的符号名称&#xff0c; 已经用或EQU定义过。在第三种格式中&…