Apache Airflow (十四) :Airflow分布式集群搭建及测试

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. 节点规划

2. airflow集群搭建步骤

3. 初始化Airflow

4. 创建管理员用户信息

​​​​​​​5. 配置Scheduler HA

​​​​​​​6. 启动Airflow集群

​​​​​​​7. 访问Airflow 集群WebUI

8. 测试Airflow HA


1. 节点规划

节点IP

节点名称

节点角色

运行服务

192.168.179.4

node1

Master1

webserver,scheduler

192.168.179.5

node2

Master2

websever,scheduler

192.168.179.6

node3

Worker1

worker

192.168.179.7

node4

Worker2

worker

2. airflow集群搭建步骤

1) 在所有节点安装python3.7

参照单节点安装Airflow中安装anconda及python3.7。

2) 在所有节点上安装airflow

  • 每台节点安装airflow需要的系统依赖
yum -y install mysql-devel gcc gcc-devel python-devel gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib 
  • 每台节点配置airflow环境变量
vim /etc/profileexport AIRFLOW_HOME=/root/airflow#使配置的环境变量生效source /etc/profile
  • 每台节点切换airflow环境,安装airflow,指定版本为2.1.3
(python37)   conda activate python37(python37) pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

默认Airflow安装在$ANCONDA_HOME/envs/python37/lib/python3.7/site-packages/airflow目录下。配置了AIRFLOW_HOME,Airflow安装后文件存储目录在AIRFLOW_HOME目录下。可以每台节点查看安装Airflow版本信息:

(python37)  airflow version2.1.3
  • 在Mysql中创建对应的库并设置参数

aiflow使用的Metadata database我们这里使用mysql,在node2节点的mysql中创建airflow使用的库及表信息。

CREATE DATABASE airflow CHARACTER SET utf8;create user 'airflow'@'%' identified by '123456';grant all privileges on airflow.* to 'airflow'@'%';flush privileges;

在mysql安装节点node2上修改”/etc/my.cnf”,在[mysqld]下添加如下内容:

[mysqld]explicit_defaults_for_timestamp=1

以上修改完成“my.cnf”值后,重启Mysql即可,重启之后,可以查询对应的参数是否生效:

#重启mysql[root@node2 ~]# service mysqld restart#重新登录mysql查询mysql> show variables like 'explicit_defaults_for_timestamp';

  • 每台节点配置Airflow airflow.cfg文件

修改AIRFLOW_HOME/airflow.cfg文件,确保所有机器使用同一份配置文件,在node1节点上配置airflow.cfg,配置如下:

[core]dags_folder = /root/airflow/dags#修改时区default_timezone = Asia/Shanghai#配置Executor类型,集群建议配置CeleryExecutorexecutor = CeleryExecutor# 配置数据库sql_alchemy_conn=mysql+mysqldb://airflow:123456@node2:3306/airflow?use_unicode=true&charset=utf8[webserver]#设置时区default_ui_timezone = Asia/Shanghai[celery]#配置Celery broker使用的消息队列broker_url = redis://node4:6379/0#配置Celery broker任务完成后状态更新使用库result_backend = db+mysql://root:123456@node2:3306/airflow

将node1节点配置好的airflow.cfg发送到node2、node3、node4节点上

3. 初始化Airflow

1) 每台节点安装需要的python依赖包

初始化Airflow数据库时需要使用到连接mysql的包,执行如下命令来安装mysql对应的python包。

​
(python37) #  pip install mysqlclient -i Simple Index​

2) 在node1上初始化Airflow 数据库

(python37) [root@node1 airflow]# airflow db init

初始化之后在MySQL airflow库下会生成对应的表。

4. 创建管理员用户信息

在node1节点上执行如下命令,创建操作Airflow的用户信息:

airflow users create \--username airflow \--firstname airflow \--lastname airflow \--role Admin \--email xx@qq.com

执行完成之后,设置密码为“123456”并确认,完成Airflow管理员信息创建。

​​​​​​​5. 配置Scheduler HA

1) 下载failover组件

登录https://github.com/teamclairvoyant/airflow-scheduler-failover-controller下载 airflow-scheduler-failover-controller 第三方组件,将下载好的zip包上传到node1 “/software”目录下。

在node1节点安装unzip,并解压failover组件:

(python37) [root@node1 software]# yum -y install unzip(python37) [root@node1 software]# unzip ./airflow-scheduler-failover-controller-master.zip

2) 使用pip进行安装failover需要的依赖包

需要在node1节点上安装failover需要的依赖包。

(python37) [root@node1 software]# cd /software/airflow-scheduler-failover-controller-master(python37) [root@node1 airflow-scheduler-failover-controller-master]# pip install -e .

3) node1节点初始化failover

(python37) [root@node1 ~]# scheduler_failover_controller initAdding Scheduler Failover configs to Airflow config file...Finished adding Scheduler Failover configs to Airflow config file.Finished Initializing Configurations to allow Scheduler Failover Controller to run. Please update the airflow.cfg with your desired configurations.

注意:初始化airflow时,会向airflow.cfg配置中追加配置,因此需要先安装 airflow 并初始化。

4) 修改airflow.cfg

首先修改node1节点的AIRFLOW_HOME/airflow.cfg

[scheduler_failover]# 配置airflow Master节点,这里配置为node1,node2,两节点需要免密scheduler_nodes_in_cluster = node1,node2#在1088行,特别注意,需要去掉一个分号,不然后期自动重启Scheduler不能正常启动airflow_scheduler_start_command = export AIRFLOW_HOME=/root/airflow;nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &

配置完成后,可以通过以下命令进行验证Airflow Master节点:

(python37) [root@node1 airflow]# scheduler_failover_controller test_connectionTesting Connection for host 'node1'(True, ['Connection Succeeded', ''])Testing Connection for host 'node2'(True, ['Connection Succeeded\n'])

将node1节点配置好的airflow.cfg同步发送到node2、node3、node4节点上:

(python37) [root@node1 ~]# cd /root/airflow/(python37) [root@node1 airflow]# scp airflow.cfg node2:`pwd`(python37) [root@node1 airflow]# scp airflow.cfg node3:`pwd`(python37) [root@node1 airflow]# scp airflow.cfg node4:`pwd`

​​​​​​​6. 启动Airflow集群

1) 在所有节点安装启动Airflow依赖的python包

(python37) [root@node1 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3(python37) [root@node2 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3(python37) [root@node3 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3(python37) [root@node4 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

2) 在Master1节点(node1)启动相应进程

#默认后台启动可以使用-D ,这里使用-D有时不能正常启动Airflow对应进程airflow webserverairflow scheduler

3) 在Master2节点(node2)启动相应进程

airflow webserver

4) 在Worker1(node3)、Worker2(node4)节点启动Worker

在node3、node4节点启动Worker:

(python37) [root@node3 ~]# airflow celery worker(python37) [root@node4 ~]# airflow celery worker

5) 在node1启动Scheduler HA

(python37) [root@node1 airflow]# nohup scheduler_failover_controller start > /root/airflow/logs/scheduler_failover/scheduler_failover_run.log &

​​​​​​​

至此,Airflow高可用集群搭建完成。

​​​​​​​7. 访问Airflow 集群WebUI

浏览器输入node1:8080,查看Airflow WebUI:

8. 测试Airflow HA

1) 准备shell脚本

Airflow集群所有节点{AIRFLOW_HOME}目录下创建dags目录,准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bashdt=$1echo "==== execute first shell ===="echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bashdt=$1echo "==== execute second shell ===="echo "---- second : time is ${dt}"

2) 编写airflow python 配置

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_sh',default_args=default_args,schedule_interval=timedelta(minutes=1)
)first=BashOperator(task_id='first',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag = dag
)second=BashOperator(task_id='second',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag=dag
)first >> second

将以上内容写入execute_shell.py文件,上传到所有Airflow节点{AIRFLOW_HOME}/dags目录下。

3) 重启Airflow,进入Airflow WebUI查看对应的调度

重启Airflow之前首先在node1节点关闭webserver ,Scheduler进程,在node2节点关闭webserver ,Scheduler进程,在node3,node4节点上关闭worker进程。

如果各个进程是后台启动,查看后台进程方式:

(python37) [root@node1 dags]# ps aux |grep webserver(python37) [root@node1 dags]# ps aux |grep scheduler(python37) [root@node2 dags]# ps aux |grep webserver(python37) [root@node2 dags]# ps aux |grep scheduler(python37) [root@node3 ~]# ps aux|grep "celery worker"(python37) [root@node4 ~]# ps aux|grep "celery worker"找到对应的启动命令对应的进程号,进行kill。

重启后进入Airflow WebUI查看任务:

点击“success”任务后,可以看到脚本执行成功日志:

​​​​​​​4) 测试Airflow HA

当我们把node1节点的websever关闭后,可以直接通过node2节点访问airflow webui:

在node1节点上,查找“scheduler”进程并kill,测试scheduler HA 是否生效:

(python37) [root@node1 ~]# ps aux|grep schedulerroot      23744  0.9  3.3 326940 63028 pts/2    S    00:08   0:02 airflow scheduler -- DagFileProcessorManager#kill 掉scheduler进程(python37) [root@node1 ~]# kill -9 23744

访问webserver webui

在node1节点查看scheduler_failover_controller进程日志中有启动schudler动作,注意:这里是先从node1启动,启动不起来再从其他Master 节点启动Schduler。


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/207266.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OBS Studio 30.0 正式发布:支持 WebRTC

导读OBS Studio 30.0 已正式发布。此版本移除了对 Ubuntu 20.04、Qt 5 和 FFmpeg 4.4 之前版本的支持。 OBS Studio 30.0 已正式发布。此版本移除了对 Ubuntu 20.04、Qt 5 和 FFmpeg 4.4 之前版本的支持。 主要变化包括: 支持 WebRTC(详情查看 OBS Stu…

整体迁移SVN仓库到新的windows服务器

一、背景 公司原有的SVN服务器年代比较久远经常出现重启情况,需要把SVN仓库重新迁移到新的服务器上,在网上也搜到过拷贝Repositories文件直接在新服务器覆盖的迁移方案,但考虑到原有的操作系统和现有的操作系统版本不一致,SVN版本…

python的制图

测试数据示例: day report_user_cnt report_user_cnt_2 label 2023-10-01 3 3 欺诈 2023-10-02 2 4 欺诈 2023-10-03 6 5 欺诈 2023-10-04 2 1 正常 2023-10-05 4 3 正常 2023-10-06 4 4 正常 2023-10-07 2 6 正常 2023-10-08 3 7 正常 2023-10-09 3 12 正常 2023-…

代码随想录刷题题Day2

刷题的第二天,希望自己能够不断坚持下去,迎来蜕变。😀😀😀 刷题语言:C / Python Day2 任务 977.有序数组的平方 209.长度最小的子数组 59.螺旋矩阵 II 1 有序数组的平方(重点:双指针…

华为电视盒子 EC6108V9C 刷机成linux系统

场景: 提示:这里简述项目相关背景: 家里装宽带的时候会自带电视盒子,但是由于某些原因电视盒子没有用,于是就只能摆在那里吃土,闲来无事,搞一下 问题描述 提示:这里描述项目中遇到…

使用yolov7进行多图像视频识别

1.yolov7你可以让你简单的部署,比起前几代来说特别简单 #下面是我转换老友记的测试视频,可以看到几乎可以准确预测 2.步骤 1.在github官网下载代码 https://github.com/WongKinYiu/yolov7 2.点击下载权重文件放到项目中 3.安装依赖,我的python版本是3.6的 pip install -r requ…

无mac电脑生成uniapp云打包私钥证书的攻略

uniapp顾名思义是一个跨平台的开发工具,大部分uniapp的开发者,其实并没有mac电脑来开发,但是生成ios的证书,官网的教程却是需要mac电脑的,那么有没有办法无需mac电脑即可生成uniapp云打包的私钥证书呢? 下…

【计算机网络】14、DHCP

文章目录 一、概述1.1 好处 二、概念2.1 分配 IP2.2 控制租赁时间2.3 DHCP 的其他网络功能2.4 IP地址范围和用户类别2.5 安全 三、DHCP 消息3.1 DHCP discover message3.2 DHCP offers a message 如果没有 DHCP,IT管理者必须手动选出可用的 ip,这太耗时了…

和鲸科技与国科环宇建立战略合作伙伴关系,以软硬件一体化解决方案促进科技创新

近日,在国科环宇土星云算力服务器产品发布会暨合作伙伴年度会上,和鲸科技与国科环宇正式完成战略伙伴签约仪式,宣布达成战略合作伙伴关系。未来,双方将深化合作,充分发挥在产品和市场方面的互补优势,为企事…

什么是工业物联网(IOT)?这样的IOT平台你需要吗?——青创智通

物联网(IOT)是指在互联网上为传输和共享数据而嵌入传感器和软件的互联设备的广泛性网络。这允许将从物理对象收集的信息(数据)存储在专用服务器或云中。通过分析这些积累的信息,通过提供最优的设备控制和方法,可以实现一个更安全、更方便的社会。在智能家…

【NodeJS】 API Key 实现 短信验证码功能

这里使用的平台是 短信宝整体来讲还是挺麻烦的平台必须企业才行,个人是无法使用该平台的 平台必须完成 身份信息认证 和 企业认证 这里就需要 “营业执照”了 ,没有 “营业执照” 的朋友还是后退一步吧 后端我用的是NodeJS ,使用第三方库 ro…

【办公软件】电脑开机密码忘记了如何重置?

这个案例是家人的电脑,已经使用多年,又是有小孩操作过的,所以电脑密码根本不记得是什么了?那难道这台电脑就废了吗?需要重新装机吗?那里面的资料不是没有了? 为了解决以上问题,一般…

【云原生系列】Kubernetes知识点

目录 概念 基础架构 单master节点 多master节点 组件 Master节点核心组件 其他组件 请求发送流程 插件 核心资源 调度资源 Pod 创建pod组件间调用流程 pod生命周期: 初始化容器 镜像拉取策略 重启策略 钩子函数 探针 探针的实现方式 DownwardAP…

Git的介绍和下载安装

Git的介绍和下载安装 概述 Git是一个分布式版本控制工具, 通常用来管理项目中的源代码文件(Java类、xml文件、html页面等)进行管理,在软件开发过程中被广泛使用 Git可以记录文件修改的历史记录并形成备份从而实现代码回溯, 版本切换, 多人协作, 远程备份的功能Git具有廉价的…

FPGA falsh相关知识总结

1.存储容量是128M/8 Mb16MB 2.有256个sector扇区*每个扇区64KB16MB 3.一页256Byte 4.页编程地址0256 5:在调试SPI时序的时候一定注意,miso和mosi两个管脚只要没发送数据就一定要悬空(处于高组态),不然指令会通过两…

年终好价节入手什么数码合适?盘点23年度值得入手的数码好物

各位科技控和数码迷!时间过得飞快,一眨眼2023年就要过去了,说到年底,当然少不了年终好价节啦!这时候正是更新你的数码装备、升级生活品质的绝佳时机。别管你是不是科技控,工作狂还是生活追求者,…

JavaWeb服务器详解和后端分层解耦

JavaWeb HTTP协议请求数据格式响应数据格式协议解析 Web服务器请求响应请求参数的接收响应 分层解耦IOC&DI入门IOC详解 HTTP协议 超文本传输协议,规定了浏览器和服务器之间数据传输的规则 特点: 基于TCP协议:面向连接,安全 …

npm WARN npm npm does not support Node.js v13.9.0

Microsoft Windows [版本 10.0.19045.2965] (c) Microsoft Corporation。保留所有权利。C:\Users\Administrator>node -v v13.9.0C:\Users\Administrator>npm -v npm WARN npm npm does not support Node.js v13.9.0 npm WARN npm You should probably upgrade to a newe…

正则表达式回溯陷阱

一、匹配场景 判断一个句子是不是正规英文句子 text "I am a student" 一个正常的英文句子如上,英文单词 空格隔开 英文单词 多个英文字符 [a-zA-Z] 空格用 \s 表示 那么一个句子就是单词 空格(一个或者多个,最后那个单词…

力扣:1419. 数青蛙

题目&#xff1a; 代码&#xff1a; class Solution { public:int minNumberOfFrogs(string croakOfFrogs){string s "croak";int ns.size();//首先创建一个哈希表来标明每个元素出现的次数&#xff01;vector<int>hash(n); //不用真的创建一个hash表用一个数…