Airflow学习笔记

1 概述

  • Airflow是一个以编程方式编写(要用python脚本),安排和监视工作流的平台。使用Airflow将工作流编写任务的有向无环图(DAG)。Airflow计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务。丰富的命令实用程序使在DAG上执行复杂的调度变的轻而易举。丰富的用户界面使查看生产中正在运行的管道,监视进度以及需要时对问题进行故障排除变的容易。
  • 相关概念:
    1. Dynamic:Airflow配置需要实用Python,允许动态生产管道。这允许编写可动态实例化管道的代码。
    2. Extensible:轻松定义自己的运算符,执行程序并扩展库,使其适合于您的环境。
    3. Elegant:Airlfow是精简的,使用功能强大的Jinja模板引擎,将脚本参数化内置于Airflow的核心中。
    4. Scalable:Airflow具有模板块架构,并使用消息队列来安排任意数量的工作任务。

2 安装

  • airflow官网:https://airflow.apache.org

  • airflow需要python3.8的环境,这里安装miniconda来创建python环境

  • 具体安装步骤:

    1. 在conda下创建python3.8环境,命名为airflow

    2. 切换airflow环境,更改pip源

      sudo mkdir ~/.pip
      sudo vim  ~/.pip/pip.conf
      #添加以下内容
      [global]
      index-url = https://pypi.tuna.tsinghua.edu.cn/simple
      [install]
      trusted-host = https://pypi.tuna.tsinghua.edu.cn
      
    3. 安装airflow包

      pip install apache-airflow==2.4.3
      
    4. 初始化aiflow

      airflow db init
      
    5. 启动airflow服务

      airflow webserver -p 8080 -D
      
    6. 启动airflow调度

      airflow scheduler -D
      
    7. 创建账号,创建后会要求输入密码

      airflow users create \--username admin \--firstname F \--lastname F \--role Admin \--email F@F.com
      
  • airflow启停脚本

    #!/bin/bashcase $1 in
    "start"){echo " --------启动 airflow-------"ssh hadoop102 "conda activate airflow;airflow webserver -p 8080 -D;airflow scheduler -D; conda deactivate"
    };;
    "stop"){echo " --------关闭 airflow-------"ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -15 
    };;
    esac
    

2.1 修改默认元数据库

  • 修改元数据库为mysql,默认使用的是本地的sqlite,效果不好。(P8)

    1. 在mysql中创建库

      mysql> CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
      
    2. 安装python连接mysql依赖

      pip install mysql-connector-python
      
    3. 修改airflow的配置文件

      vim ~/airflow/airflow.cfg[database]
      # The SqlAlchemy connection string to the metadata database.
      # SqlAlchemy supports many different database engines.
      # More information here:
      # http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
      #sql_alchemy_conn = sqlite:home/atguigu/airflow/airflow.db
      sql_alchemy_conn = mysql+mysqlconnector://root:root@hadoop102:3306/airflow_db
      
    4. 关闭airflow,初始化后重启

      (airflow) [atguigu@hadoop102 ~]$ af.sh stop
      (airflow) [atguigu@hadoop102 airflow]$ airflow db init
      (airflow) [atguigu@hadoop102 ~]$ af.sh start
      

      若初始化报错1067 - Invalid default value for ‘update_at’,原因:字段 ‘update_at’ 为 timestamp类型,取值范围是:1970-01-01 00:00:00 到 2037-12-31 23:59:59(UTC +8 北京时间从1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。推荐修改mysql存储时间戳格式:

      mysql> set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'
      

      重启MySQL会造成参数失效,推荐将参数写入到配置文件/etc/my.cnf中并重启,然后再一次初始化

      sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTIONsudo systemctl restart mysqld
      
    5. 重新创建账号登录

      airflow users create \--username admin \--firstname F \--lastname F \--role Admin \--email F@F.com
      

2.2 修改默认执行器

  • 官网不推荐在开发中使用顺序执行器SequentialExecutor,类似单进单出的队列,会造成任务调度阻塞。

  • 修改airflow的配置文件,可以使用官方推荐的几种执行器,也可以自定义。这里我们选择本地执行器即可。

    [core]
    # The executor class that airflow should use. Choices include
    # ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
    # ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
    # full import path to the class when using a custom executor.
    executor = LocalExecutor
    

3 使用

  • 默认的存放airflow调度脚本的目录在~/airflow/dags,我们只需要在这个目录下编写调度脚本即可。

3.1 添加DAG

  1. 在dags目录创建python脚本,对一些关键信息进行修改

    #!/usr/bin/python
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedeltadefault_args = {# 用户,任写'owner': 'test_owner', # 是否开启任务依赖'depends_on_past': True, # 邮箱,任写'email': ['403627000@qq.com'],# 启动时间,任写'start_date':datetime(2022,11,28),# 出错是否发邮件报警'email_on_failure': False,# 重试是否发邮件报警'email_on_retry': False,# 重试次数'retries': 1,# 重试时间间隔'retry_delay': timedelta(minutes=5),
    }
    # 声明任务图,传递参数,设置调度时间间隔
    dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))# 创建单个任务
    t1 = BashOperator(# 任务idtask_id='dwd',# 任务命令bash_command='echo 1',# 重试次数retries=3,# 把任务添加进图中dag=dag)t2 = BashOperator(task_id='dws',bash_command='echo 2',retries=3,dag=dag)t3 = BashOperator(task_id='ads',bash_command='echo 3',retries=3,dag=dag)# 设置任务依赖
    t2.set_upstream(t1)
    t3.set_upstream(t2)
    
  2. 等待一段时间,刷新任务列表,可以在web页面查看,也可以通过以下命令查看

    airflow dags list
    
  3. 在web页面上开启任务调度脚本,点进去能到查看DAG,以及DAG中每个任务的执行情况,执行日志等
    在这里插入图片描述

3.2 删除DAG

  1. 在页面上点击删除按钮删除DAG任务,会连同任务的执行记录一块删除
  2. 删除底层的python脚本,否则过段时间会自动重新加载这个任务

3.3 修改DAG

  • airflow支持动态修改,直接修改python脚本即可

3.4 查看DAG

  • 相关命令

    # 查看所有任务
    (airflow) [atguigu@hadoop102 airflow]$ airflow list_dags 
    # 查看单个任务
    (airflow) [atguigu@hadoop102 airflow]$ airflow tasks list 任务名 --tree
    

3.5 配置邮件服务器

  1. 保证邮箱已开SMTP服务

  2. airflow配置文件,用stmps服务对应587端口

    (airflow) [atguigu@hadoop102 airflow]$ vim ~/airflow/airflow.cfg  
    smtp_host = smtp.qq.com
    smtp_starttls = True
    smtp_ssl = False
    smtp_user = 403627000@qq.com
    # smtp_user =
    smtp_password = qluxdbuhgrhgbigi
    # smtp_password =
    smtp_port = 587
    smtp_mail_from = 403627000@qq.com
    
  3. 重启airflow

  4. python脚本也需要有所更改

    #!/usr/bin/python
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedeltadefault_args = {# 用户,任写'owner': 'test_owner', # 是否开启任务依赖'depends_on_past': True, # 邮箱,任写'email': ['403627000@qq.com'],# 启动时间,任写'start_date':datetime(2022,11,28),# 出错是否发邮件报警'email_on_failure': False,# 重试是否发邮件报警'email_on_retry': False,# 重试次数'retries': 1,# 重试时间间隔'retry_delay': timedelta(minutes=5),
    }
    # 声明任务图,传递参数,设置调度时间间隔
    dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))# 创建单个任务
    t1 = BashOperator(# 任务idtask_id='dwd',# 任务命令bash_command='echo 1',# 重试次数retries=3,# 把任务添加进图中dag=dag)t2 = BashOperator(task_id='dws',bash_command='echo 2',retries=3,dag=dag)t3 = BashOperator(task_id='ads',bash_command='echo 3',retries=3,dag=dag)email=EmailOperator(task_id="email",to="yaohm163@163.com ",subject="test-subject",html_content="<h1>test-content</h1>",cc="403627000@qq.com ",dag=dag)# 设置任务依赖
    t2.set_upstream(t1)
    t3.set_upstream(t2)
    email.set_upstream(t3)
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/402469.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

kaggle中访问本地上传的图片(找到图片地址)

由于代码中需要使用自己上传一个图片&#xff0c;对图片进行操作&#xff0c;尝试了很多种办法终于摸索出来了,希望可以帮助到大家 首先&#xff0c;在kaggle中左侧导航栏中找到datasets->New Dataset->Browse Files 创建成功后就可以看到数据集的详细信息 返回到代码中…

31集-33集【求助】AIGC返回的对话内容文字转语音失败-《MCU嵌入式AI开发笔记》

31集【求助】AIGC返回的对话内容文字转语音失败-《MCU嵌入式AI开发笔记》 问题描述 ESP32 C3开发板把AIGC大模型返回的对话文字转语音的时候出现错误。 我们先看一下附件Log&#xff0c; 梳理一下程序流程 按键&#xff0c;收到event Event received&#xff0c; cmd:1, da…

学习分享:如何利用AI创作高质量的文章【请按需收藏】

成长路上不孤单&#x1f60a;【14后小学生一枚&#xff0c;C爱好者&#xff0c;持续分享所学&#xff0c;如有需要欢迎收藏转发&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;】 关于【如何利用AI创作高质量的文章】 AI给现代这…

pnpm【实用教程】2024最新版

pnpm 简介 pnpm 全称 performant npm&#xff0c;即高性能的 npm&#xff0c;由 npm/yarn 衍生而来&#xff0c;解决了 npm/yarn 内部潜在的 bug&#xff0c;极大的优化了性能&#xff0c;扩展了使用场景&#xff0c;被誉为 最先进的包管理工具 安装 pnpm npm i -g pnpm使用 pn…

Linux git安装与部署

目录 git安装 1、下载与安装 2、配置git账号信息 创建本地仓库 1、创建本地代码库文件夹 2、创建项目代码本地仓库文件夹 3、进入到projCode目录下&#xff0c;创建git本地仓库 4、创建过滤文件.gitignore 5、添加.gitignore到git暂存区 6、提交.gitignore 7、将项目…

数学建模笔记(1):插值法

1.插值法的用途 在对数据进行处理的时候&#xff0c;我们往往会碰到由于数据量比较小的情况&#xff0c;这样的情况不利对数据进行分析。插值法就是是针对这种情况&#xff0c;模拟产生和原来数据相近的数据来为数据分析提供完整可靠的数据。 总结&#xff1a;插值法是一种自己…

游戏安全入门-扫雷分析远程线程注入

前言 无论学习什么&#xff0c;首先&#xff0c;我们应该有个目标&#xff0c;那么入门windows游戏安全&#xff0c;脑海中浮现出来的一个游戏 – 扫雷&#xff0c;一款家喻户晓的游戏&#xff0c;虽然已经被大家分析的不能再透了&#xff0c;但是我觉得自己去分析一下还是极好…

PHPStorm 环境配置与应用详解

​ 大家好&#xff0c;我是程序员小羊&#xff01; 前言&#xff1a; PHPStorm 是 JetBrains 出品的一款专业 PHP 集成开发环境&#xff08;IDE&#xff09;&#xff0c;凭借其智能的代码补全、调试功能、深度框架支持和前端开发工具&#xff0c;为用户提供了丰富的功能和工具…

Systools Outlook PST Recovery Outlook PST邮箱邮件数据修复工具下载

可正常激活使用&#xff0c;非常强大好用的PST邮箱邮件数据文件修复工具 下载地址(资源制作整理不易&#xff0c;下载使用需付费&#xff0c;不能接受请勿浪费时间下载) 链接&#xff1a;https://pan.baidu.com/s/1bfkVNrgdaVS2MkTnW19Zqw?pwdu2sj 提取码&#xff1a;u2sj

Linux进程间通信学习记录(无名管道)

0.Linux进程间通信的方式 &#xff08;1&#xff09;.从UNIX继承过来的通信方式 无名管道&#xff08;pipe&#xff09; 有名管道&#xff08;fifo&#xff09; 信号&#xff08;signal&#xff09; &#xff08;2&#xff09;.System V IPC 共享内存 消息队列 信号灯集 &am…

Python环境安装及PIP安装(Mac OS版)

官网 https://www.python.org/downloads/ 安装python python-3.12.1-macos11.pkg下载后&#xff0c;安装一直下一步即可 验证是否安装成功&#xff0c;执行python3命令和pip3命令 配置环境变量 获取python3安装位置并配置在.bash_profile #查看python路径 which python3#…

centos8以上系统安装docker环境

由于docker官方更新了相关镜像路由&#xff0c;导致国内用户无法正常手段安装使用docker&#xff0c;本人推荐使用下面操作进行安装。 1.docker-ce安装 # 添加docker-ce仓库&#xff0c;本次使用的是阿里云的仓库 dnf config-manager --add-repo https://mirrors.aliyun.com/do…

CoCoOp(论文解读):Conditional Prompt Learning for Vision-Language Models

摘要 随着预训练的视觉语言模型&#xff08;如 CLIP&#xff09;的兴起&#xff0c;研究使这些模型适应下游数据集的方法变得至关重要。最近CoOp方法将NLP领域中的提示学习引入到视觉领域中&#xff0c;来调整预训练的视觉语言模型。具体来说&#xff0c;CoOp 将提示中的上下文…

【C语言初阶】C语言指针全攻略:解锁C语言深层奥秘的钥匙

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;C语言 “ 登神长阶 ” &#x1f921;往期回顾&#x1f921;&#xff1a;C语言操作符 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀指针 &#x1f4d2;1. 指针和指…

前端各种文本文件预览 文本编辑excel预览编辑 pdf预览word预览 excel下载pdf下载word下载

前端各种文本文件预览 文本编辑excel预览编辑 pdf预览word预览 excel下载pdf下载word下载 各种文本文件预览&#xff08;pdf, xlsx, docx, cpp, java, sql, py, vue, html, js, json, css, xml, rust, md, txt, log, fa, fasta, tsv, csv 等各种文本文件&#xff09; 其中 除p…

C 408—《数据结构》算法题基础篇—数组(通俗易懂)

目录 Δ前言 一、数组的合并 0.题目&#xff1a; 1.算法设计思想&#xff1a; 2.C语言描述&#xff1a; 3.算法的时间和空间复杂度 : 二、数组元素的倒置 0.题目 : 1.算法设计思想 : 2.C语言描述 : 3.算法的时间和空间复杂度 : 三、数组中特定值元素的删除 0.题目 : …

SpringBoot3 + Flowable7 工作流引擎使用笔记

目录 Flowable 简介流程设计器安装使用 SpringBoot 3 整合表结构流程部署启动流程流程审批流程挂起和激活任务分配固定分配表达式分配值表达式方法表达式 监听器分配 流程变量运行时变量历史变量 身份服务候选人拾取任务归还任务指派给别人候选人组创建用户创建用户组用户关联用…

VueUse 基于 Vue 3 Composition API 的高质量 Hooks 库

VueUse 是什么? VueUse 是基于 Vue 3 Composition API 的高质量 Hooks 库。例如获取滚动的距离 VueUse 官网:VueUse | VueUse VueUse 什么使用? 1、通过npm安装 VueUse npm i @vueuse/core 2、搜索需要使用的函数,例如搜索 useScroll 滚动 3、使用useScroll 滚动函数 …

C语言传递指针给函数

C 语言允许您传递指针给函数&#xff0c;只需要简单地声明函数参数为指针类型即可。 下面的实例中&#xff0c;我们传递一个无符号的 long 型指针给函数&#xff0c;并在函数内改变这个值 实例1&#xff1a;获取系统的时间值 能接受指针作为参数的函数&#xff0c;也能接受数…

为什么Pandas是最流行的Python数据分析库?

本文将从Python生态、Pandas历史背景、Pandas核心语法、Pandas学习资源四个方面去聊一聊Pandas&#xff0c;期望能带给大家一点启发。 一、Python生态里的Pandas 五月份TIOBE编程语言排行榜&#xff0c;Python追上Java又回到第二的位置。Python如此受欢迎一方面得益于它崇尚简…