大数据-254 离线数仓 - Airflow 任务调度 核心交易调度任务集成

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

  • Airflow 任务调度
  • 核心概念与实际案例测试
  • Py脚本编写

在这里插入图片描述

任务集成部署

Airflow 基本介绍

Apache Airflow 是一个开源的任务调度和工作流管理工具,用于编排复杂的数据处理任务。最初由 Airbnb 开发,于 2016 年捐赠给 Apache 软件基金会。Airflow 的主要特点是以代码方式定义任务及其依赖关系,支持任务的调度和监控,适合处理复杂的大数据任务。

Airflow 的特点

  • 以代码为中心:Airflow 使用 Python 定义 DAG,提供灵活性和可编程性。
  • 扩展性强:用户可以自定义 Operator 和 Hook,集成各种数据源和工具。
  • 强大的 UI 界面:提供可视化界面监控任务状态、查看日志、重试失败任务等。
  • 丰富的调度选项:支持基于时间 (Time-based) 和事件 (Event-based) 的调度。
  • 高可用性:配合 Celery 和 Kubernetes 等执行器,支持分布式架构,适合处理大规模任务。

使用场景

数据管道调度

用于管理数据从源到目标的 ETL 流程。
如每天从数据库中抽取数据、清洗后存入数据仓库。

机器学习工作流管理

调度数据预处理、模型训练和模型部署任务。

数据验证

自动化检查数据的质量和一致性。

定期任务自动化

定时清理日志、归档数据或生成报告。

Airflow核心概念

DAGs

有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序

Operators

Airflow内置了很多Operators

  • BashOperator 执行一个Bash命令
  • PythonOperator 调用任意的Python函数
  • EmailOperator 用于发送邮件
  • HTTPOperator 用于发送HTTP请求
  • SqlOperator 用于执行SQL命令
  • 自定义 Operator

Task

Task:Task是Operator的一个实例

Task Instance

Task Instance:由于Task会被重复调度,每次Tasks的运行就是不同的Task Instance,Task Instance 有自己的状态,包括 success、running、failed、skipped、up_for_rechedule、up_for_retry、queued、no_status等

Task Relationships

Task Relationships:DAGs中的不同Tasks之间可以有依赖关系

核心交易调度任务集成

核心交易分析

之前我们已经写了很多脚本,可以在这里调度

# 加载ODS数据(DataX迁移数据)
/opt/wzk/hive/ods_load_trade.sh
# 加载DIM层数据
/opt/wzk/hive/dim_load_product_cat.sh
/opt/wzk/hive/dim_load_shop_org.sh
/opt/wzk/hive/dim_load_payment.sh
/opt/wzk/hive/dim_load_product_info.sh
# 加载DWD层数据
/opt/wzk/hive/dwd_load_trade_orders.sh
# 加载DWS层数据
/opt/wzk/hive/dws_load_trade_orders.sh
# 加载ADS层数据
/opt/wzk/hive/ads_load_trade_order_analysis.sh

备注:depeds_on_past,设置为True时,上一次调度成功时,才可以触发

编写脚本

vim $AIRFLOW_HOME/dags/trade_test.py

写入的内容如下所示:

import datetime
from datetime import timedelta, date
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago# 定义 DAG 的缺省参数
default_args = {'owner': 'airflow','depends_on_past': False,'start_date': datetime.datetime(2020, 6, 20),'email': ['airflow@example.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),
}# 定义 DAG
coretradedag = DAG('coretrade',default_args=default_args,description='Core trade analyze',schedule_interval='30 0 * * *',  # 每天00:30运行
)# 获取昨天的日期
today = date.today()
oneday = timedelta(days=1)
yesterday = (today - oneday).strftime("%Y-%m-%d")# 定义任务
odstask = BashOperator(task_id='ods_load_data',depends_on_past=False,bash_command=f'sh /opt/wzk/hive/ods_load_trade.sh {yesterday}',dag=coretradedag,
)dimtask1 = BashOperator(task_id='dimtask_product_cat',depends_on_past=False,bash_command=f'sh /opt/wzk/hive/dim_load_product_cat.sh {yesterday}',dag=coretradedag,
)dimtask2 = BashOperator(task_id='dimtask_shop_org',depends_on_past=False,bash_command=f'sh /opt/wzk/hive/dim_load_shop_org.sh {yesterday}',dag=coretradedag,
)dimtask3 = BashOperator(task_id='dimtask_payment',depends_on_past=False,bash_command=f'sh /opt/wzk/hive/dim_load_payment.sh {yesterday}',dag=coretradedag,
)dimtask4 = BashOperator(task_id='dimtask_product_info',depends_on_past=False,bash_command=f'sh /opt/wzk/hive/dim_load_product_info.sh {yesterday}',dag=coretradedag,
)dwdtask = BashOperator(task_id='dwd_load_data',depends_on_past=False,bash_command=f'sh /opt/wzk/hive/dwd_load_trade_orders.sh {yesterday}',dag=coretradedag,
)dwstask = BashOperator(task_id='dws_load_data',depends_on_past=False,bash_command=f'sh /opt/wzk/hive/dws_load_trade_orders.sh {yesterday}',dag=coretradedag,
)adstask = BashOperator(task_id='ads_load_data',depends_on_past=False,bash_command=f'sh /opt/wzk/hive/ads_load_trade_order_analysis.sh {yesterday}',dag=coretradedag,
)# 定义任务依赖关系
odstask >> dimtask1
odstask >> dimtask2
odstask >> dimtask3
odstask >> dimtask4
odstask >> dwdtask
dimtask1 >> dwstask
dimtask2 >> dwstask
dimtask3 >> dwstask
dimtask4 >> dwstask
dwdtask >> dwstask
dwstask >> adstask

执行的结果如下图所示:
在这里插入图片描述

查看结果

检查脚本的问题:

# 执行命令检查脚本是否有错误。如果命令行没有报错,就表示没问题
python $AIRFLOW_HOME/dags/trade_test.py

执行结果如下图所示:
在这里插入图片描述
列出所有的内容

airflow dags list

运行结果如下图所示:
在这里插入图片描述
列出树形结构:

airflow tasks list coretrade --tree

执行结果如下图所示:

(airflow_env) [root@h122 airflow]# airflow tasks list coretrade --tree
<Task(BashOperator): ods_load_data><Task(BashOperator): dimtask_payment><Task(BashOperator): dws_load_data><Task(BashOperator): ads_load_data><Task(BashOperator): dimtask_product_cat><Task(BashOperator): dws_load_data><Task(BashOperator): ads_load_data><Task(BashOperator): dimtask_product_info><Task(BashOperator): dws_load_data><Task(BashOperator): ads_load_data><Task(BashOperator): dimtask_shop_org><Task(BashOperator): dws_load_data><Task(BashOperator): ads_load_data><Task(BashOperator): dwd_load_data><Task(BashOperator): dws_load_data><Task(BashOperator): ads_load_data>
(airflow_env) [root@h122 airflow]# 

对应的截图如下:
在这里插入图片描述
关系图如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/492666.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

昇思25天学习打卡营第33天|共赴算力时代

文章目录 一、平台简介二、深度学习模型2.1 处理数据集2.2 模型训练2.3 加载模型 三、共赴算力时代 一、平台简介 昇思大模型平台&#xff0c;就像是AI学习者和开发者的超级基地&#xff0c;这里不仅提供丰富的项目、模型和大模型体验&#xff0c;还有一大堆经典数据集任你挑。…

Docker 镜像加速和配置的分享 云服务器搭建beef-xss

前言 最近很多的docker镜像加速都鸡鸡了 找点资源是越来越不容易了 什么事docker 因为我是个业余的人 我简单的说 docker就是比如我们的软件商店的 下载 docker镜像&#xff08;之前就是我们在服务器上搭建网站 和环境的很费力费时 之后就有了这个 镜像 &#xff1a;这…

浅谈怎样系统的准备前端面试

前言 创业梦碎&#xff0c;回归现实&#xff0c;7 月底毅然裸辞&#xff0c;苦战两个月&#xff0c;拿到了美团和字节跳动的 offer&#xff0c;这算是从业以来第一次真正意义的面试&#xff0c;遇到蛮多问题&#xff0c;比如一开始具体的面试过程我都不懂&#xff0c;基本一直是…

告别机器人味:如何让ChatGPT写出有灵魂的内容

目录 ChatGPT的一些AI味道小问题 1.提供编辑指南 2.提供样本 3.思维链大纲 4.融入自己的想法 5.去除重复增加多样性 6.删除废话 ChatGPT的一些AI味道小问题 大多数宝子们再使用ChatGPT进行写作时&#xff0c;发现我们的老朋友ChatGPT在各类写作上还有点“机器人味”太重…

【长城杯】Web题 hello_web 解题思路

查看源代码发现路径提示 访问…/tips.php显示无用页面&#xff0c;怀疑…/被过滤&#xff0c;采用…/./形式&#xff0c;看到phpinfo()页面 注意到disable_functions&#xff0c;禁用了很多函数 访问hackme.php,看到页面源码 发现eval函数&#xff0c;包含base64 解密获得php代…

Windows部署Docker及PostgreSQL数据库相关操作

一、Windows安装Docker 1.wsl安装 以管理员身份启动命令行&#xff0c;运行&#xff1a;wsl --install&#xff1b; 安装结束后&#xff0c;重启电脑&#xff0c;以管理员身份启动命令行&#xff0c;运行&#xff1a;wsl --install -d Ubuntu&#xff1b; 中间需要输入用户名…

HTML零基础入门教学

目录 一. HTML语言 二. HTML结构 三. HTML文件基本结构 四. 准备开发环境 五. 快速生成代码框架 六. HTML常见标签 6.1 注释标签 6.2 标题标签&#xff1a;h1-h6 6.3 段落标签&#xff1a;p 6.4 换行标签&#xff1a;br 6.5 格式化标签 6.6 图片标签&a…

Springboot应用开发:工具类整理

目录 一、编写目的 二、映射工具类 2.1 依赖 2.2 代码 三、日期格式 3.1 依赖 3.2 代码 四、加密 4.1 代码 五、Http请求 5.1 依赖 5.2 代码 六、金额 6.1 代码 七、二维码 7.1 依赖 7.2 代码 八、坐标转换 8.1 代码 九、树结构 9.1 代码 9.1.1 节点 9.1…

libaom 源码分析:熵编码模块介绍

AV1 熵编码原理介绍 关于AV1 熵编码原理介绍可以参考:AV1 编码标准熵编码技术概述libaom 熵编码相关源码介绍 函数流程图 核心函数介绍 av1_pack_bitstream 函数:该函数负责将编码后的数据打包成符合 AV1 标准的比特流格式;包括写入序列头 OBU 的函数 av1_write_obu_header…

一个开源的自托管虚拟浏览器项目,支持在安全、私密的环境中使用浏览器

大家好&#xff0c;今天给大家分享一个开源的自托管虚拟浏览器项目Neko&#xff0c;旨在利用 WebRTC 技术在 Docker 容器中运行虚拟浏览器&#xff0c;为用户提供安全、私密且多功能的浏览体验。 项目介绍 Neko利用 WebRTC 技术在 Docker 容器中运行虚拟浏览器&#xff0c;提供…

【已解决】启动此实时调试器时未使用必需的安全权限。要调试该进程,必须以管理员身份运行此实时调试器。是否调试该进程?

【已解决】启动此实时调试器时未使用必需的安全权限。要调试该进程&#xff0c;必须以管理员身份运行此实时调试器。是否调试该进程? 目录一、前言二、具体原因三、解决方法 目录 报错截图 一、前言 进行应用程序开发时&#xff0c;需要对w3wp进行附加调试等场景&#xff…

Docker--Docker Registry(镜像仓库)

什么是Docker Registry&#xff1f; 镜像仓库&#xff08;Docker Registry&#xff09;是Docker生态系统中用于存储、管理和分发Docker镜像的关键组件。 镜像仓库主要负责存储Docker镜像&#xff0c;这些镜像包含了应用程序及其相关的依赖项和配置&#xff0c;是构建和运行Doc…

如何用细节提升用户体验?

前端给用户反馈是提升用户体验的重要部分&#xff0c;根据场景选择不同的方式可以有效地提升产品的易用性和用户满意度。以下是常见的方法&#xff1a; 1. 视觉反馈 用户执行了某些操作后&#xff0c;需要即时确认操作结果。例如&#xff1a;按钮点击、数据提交、页面加载等。…

flutter 使用dio 请求go语言后台数据接口展示瀑布流图片

添加依赖 dependencies:flutter:sdk: flutterdio: ^5.0.0 # 请检查最新版本flutter_staggered_grid_view: ^0.4.0 添加网络权限 <uses-permission android:name"android.permission.INTERNET" /> go后端代码 图片存放目录 // main.go package mainimport (&q…

感知机与逻辑回归的异同点

1. 共同点 (1) 应用场景 都用于二分类问题。都假设数据是线性可分或近似线性可分的。 (2) 决策边界 两者都通过寻找一个超平面来区分数据。决策函数是线性的&#xff0c;形式为&#xff1a; (3) 输入特征 都可以处理连续和离散特征。都可以通过添加非线性变换扩展到非线…

实操给桌面机器人加上超拟人音色

前面我们讲了怎么用CSK6大模型开发板做一个桌面机器人充当AI语音助理&#xff0c;近期上线超拟人方案&#xff0c;不仅大模型语音最快可以1秒内回复&#xff0c;还可以让我们的桌面机器人使用超拟人音色、具备声纹识别等能力&#xff0c;本文以csk6大模型开发板为例实操怎么把超…

docker(wsl)命令 帮助文档

WSL wsl使用教程 wsl -l -v 列出所有已安装的 Linux 发行版 wsl -t Ubuntu-22.04 --shutdown 关闭所有正在运行的WSL发行版。如果你只想关闭特定的发行版 wsl -d Ubuntu-22.04 登录到Ubuntu环境 wsl --list --running 查看正在wsl中运行的linux发行版 wsl --unregister (系统名…

CNN、RNN、LSTM和Transformer之间的区别和联系

文章目录 CNN、RNN、LSTM和Transformer之间的区别和联系前言CNN&#xff08;卷积神经网络&#xff09;RNN&#xff08;循环神经网络&#xff09;LSTM&#xff08;长短期记忆网络&#xff09;Transformer四者之间的联系与区别Yolo算法简介Yolo和CNN的关系YOLO各版本 CNN、RNN、L…

无人机航测VS传统测绘

无人机航测系统的优点 机动灵活&#xff0c;作业周期短&#xff1a; 无人机航测系统能够迅速响应测绘需求&#xff0c;不受地形和交通限制&#xff0c;可以灵活调整航线&#xff0c;作业周期短。 无人机体积小&#xff0c;噪音小&#xff0c;可以垂直起降、悬停、侧飞、倒飞…

小红书飞书素材库 | AI改写 | 无水印下载 | 多维表格 | 采集同步 | 影刀RPA

小红书飞书素材库 | AI改写 | 无水印下载 | 多维表格 | 采集同步 | 影刀RPA 模板准备 进入【小红书】素材采集库_荷逸模板&#xff0c;点击使用模板 创建文档应用 在开发者后台 - 飞书开放平台创建 企业自建应用 (需要账号有相应的权限, 如果没有权限向管理员申请) 获取 Ap…