FATE Flow 源码解析 - 日志输出机制

背景介绍

在 之前的文章 中介绍了 FATE 的作业处理流程,在实际的使用过程中,为了查找执行中的异常,需要借助运行生成的日志,但是 FATE-Flow 包含的流程比较复杂,对应的日志也很多,而且分散在不同的文件中,在这篇文章中就对 FATE-Flow 的日志机制进行梳理,帮助大家了解 Python 服务中实现一个更加灵活的日志机制

logging 机制

FATE-Flow 中日志主要基于 Python 原生的 logging 机制实现,也正是依靠强大而灵活的 logging 机制,FATE-Flow 才实现了丰富的日志查询能力,基础的 logging 知识可以自行查看官方文档。

为了了解 FATE-Flow 的日志实现机制,涉及的一个关键概念就是 Handler, Handler 是实际处理日志的模块,在 logging 中被设计为可插拔模式的,可以根据需要进行注册。举一个简单的例子进行介绍:

# 创建一个 logger 对象logger = logging.getLogger(name)# 注册日志处理器 handler1, 将日志输出至 a.log 文件中handler1 = TimedRotatingFileHandler("a.log")
logger.addHandler(handler1)# 注册日志处理器 handler2, 将日志输出至 b.log 文件中handler2 = TimedRotatingFileHandler("b.log")
logger.addHandler(handler2)# 实际输出日志logger.info("test log")

在上面的例子中会基于 logging 机制创建出日志实例 logger ,并注册了两个 Handler,后续输出的日志就会依次被两个 Handler 进行处理,即日志最终会输出至文件 a.logb.log 中,而且在实际运行中可以动态调整 Handler,从而动态调整日志处理方式,因此可以实现极其灵活的日志输出方式

理解了这个机制,就很容易理解 FATE-Flow 如何实现将同样的日志内容按照日志类型输出至 fate_flow_schedule.logfate_flow_audit.log 文件中,同时又按照日志等级输出至 DEBUG.log, INFO.log, WARNING.log, ERROR.log 文件中了。FATE-Flow 就是通过添加不同的 Handler 实现的

FATE-Flow 日志

FATE-Flow 的日志主要存在于 /data/projects/fate/fateflow/logs 目录下,在此目录下主要分为两种类型的日志目录:

  1. fate_flow 目录,其中包含整体的 FATE-Flow 日志;
  2. 独立 job 目录的日志,此时会以 job_id 为单位输出所有相关的日志至特定目录中;
    最终展示的效果如下所示:

请添加图片描述

其中数字部分是以 job 为单位的日志,右侧 fate_flow 就是 FATE-Flow 整体的日志

其中 fate_flow 中主要是如下所示的日志:

  • DEBUG.log
  • ERROR.log
  • INFO.log
  • WARNING.log
  • fate_flow_access.log
  • fate_flow_audit.log
  • fate_flow_database.log
  • fate_flow_detect.log
  • fate_flow_schedule.log
  • fate_flow_stat.log
  • stat.log

注意这部分日志因为量可能可能比较大,因此 FATE-Flow 中是按天进行分片,因此最终的效果如下所示的:

请添加图片描述

而 按照 job_id 为单位的日志主要包含如下所示的日志:

  • fate_flow_audit.log
  • fate_flow_schedule_error.log
  • fate_flow_schedule.log
  • fate_flow_sql.log
  • guest (目录)
  • host (目录)

其中 guest/host 是按照 job 执行中参与的角色为单位输出的日志,后续介绍中再进一步展开

实现原理

下面按照目录分别介绍 FATE-Flow 不同模块的的日志输出相关的实现

fate_flow 目录日志

fate_flow 目录下的日志分为按照类型的日志 fate_flow_schedule.log, fate_flow_audit.log 等,同时也包含按照日志等级的 DEBUG.log, INFO.log, WARNING.log, ERROR.log
事实上,这两种类型的日志中的内容可能有很多相同的内容,只是按照不同维度区分后放在不同的文件中,比如想查询任务调度相关的日志,可以去查询 fate_flow_schedule.log,而想查询执行中警告相关的日志,可以去查询 WARNING.log, 而任务调度中也可能有处于警告级别的日志,此时在两个文件中都可能会存在。

我们可以先类型相关的日志是如何存入的,这边以 fate_flow_schedule.log 为例进行解释:
此日志使用的是 fate_flow/utils/log_utils.py 中的 schedule_logger() 方法进行日志输出,对应输出至 fate_flow 目录相关的代码如下:

def schedule_logger(job_id=None, delete=False):if not job_id:return getLogger("fate_flow_schedule")

此时调用的是 fate_arch/common/log.py 中的 getLogger() 方法去获取对应日志实例 logger, 最终会调用 LoggerFactory.init_logger() 初始化日志实例,对应的代码如下所示:

def init_logger(class_name):# 初始化日志实例 logger,此时对应的 class_name 为 fate_flow_schedulelogger = LoggerFactory.new_logger(class_name)# 创建 logger 对应的 Handler,此 Handler 负责将日志输出至对应文件中handler = LoggerFactory.get_handler(class_name)# 将 handler 添加至 logger 中logger.addHandler(handler)LoggerFactory.logger_dict[class_name] = logger, handler# 额外配置新 handler,按照等级输出至 `DEBUG.log`, `INFO.log`, `WARNING.log`, `ERROR.log` 就是在这边实现LoggerFactory.assemble_global_handler(logger)return logger, handler

可以看看 handler 是如何输出至文件中的,对应于 get_handler() 方法,具体实现如下:

def get_handler(class_name, level=None, log_dir=None, log_type=None, job_id=None):# 日志对应的文件log_file = os.path.join(LoggerFactory.LOG_DIR, "{}.log".format(class_name))# 最终使用 TimedRotatingFileHandler 实现按天的日志搜集os.makedirs(os.path.dirname(log_file), exist_ok=True)if LoggerFactory.log_share:handler = ROpenHandler(log_file,when='D',interval=1,backupCount=14,delay=True)else:handler = TimedRotatingFileHandler(log_file,when='D',interval=1,backupCount=14,delay=True)return handler

可以看到最终就是根据 classname 确定对应的文件路径,并最终创建 TimedRotatingFileHandler,此 Handler 会将日志输出至特定文件中,并按天进行日志轮转,最终实现上面图片中看到的按天分片的日志。

而按照日志等级的输出至文件中的实现是在 LoggerFactory.assemble_global_handler() 中实现的,起始就是依次创建各个日志层级的 Handler,并添加至 logger 上,对应的实现如下所示:

def assemble_global_handler(logger):if LoggerFactory.LOG_DIR:# 对所有日志层级,依次进行注册for level in LoggerFactory.levels:if level >= LoggerFactory.LEVEL:# 获得的 level_logger_name 分别是 DEBUG, DEBUG, WARNING, ERROR,最终用于输出的文件命名level_logger_name = logging._levelToName[level]# 创建特定日志层级的 Handler,添加至 logger 上logger.addHandler(LoggerFactory.get_global_handler(level_logger_name, level))

而调用的 LoggerFactory.get_global_handler() 事实上也是调用 get_handler() 方法创建 handler,而指定了 level 的情况下 handler 中会额外使用 handler.level = level 仅输出特定日志层级的日志。从而实现了将日志按照层级输出

而其他类型的日志原理都是一样,在 python/fate_flow/settings.py 中初始化了对应类型的日志实例 logger,业务中直接使用,具体代码如下所示:

stat_logger = getLogger("fate_flow_stat")
detect_logger = getLogger("fate_flow_detect")
access_logger = getLogger("fate_flow_access")
database_logger = getLogger("fate_flow_database")
job 日志

job 对应的日志也包含两种,一种是特定的几种类型的日志,比如 fate_flow_audit.log, fate_flow_sql.log, fate_flow_schedule.log,这部分的实现比较简单,具体是在 fate_flow/utils/log_utils.py 中,其中每个类型会包含一个特定的方法。

比如 schedule_logger() 方法用于获得对应日志实例 logger 输出日志至 fate_flow_schedule.log,而 sql_logger() 方法用于获得对应的日志实例 logger 输出日志至 fate_flow_sql.log,这些方法底层都是调用同样的 get_job_logger() 生成对应的日志实例的。具体实现如下所示:

def get_job_logger(job_id, log_type):# 主要输出至 fate_flow 目录 和 job_id 相关的目录,包含 job_id 时会以 job_id 为单位分开输出job_log_dir = get_fate_flow_directory('logs', job_id)# 包含 job_id 会输出至 job_id 目录,audit 应该是请求调用跟踪日志,同时出现在两个 job_id 目录以及 fate_flow 目录if log_type == 'audit':log_dirs = [job_log_dir, fate_flow_log_dir]else:log_dirs = [job_log_dir]# 创建对应的目录os.makedirs(job_log_dir, exist_ok=True)os.makedirs(fate_flow_log_dir, exist_ok=True)# 生成对应的 logger,并对应匹配创建对应的 handler,从而实现日志输出至特定目录logger = LoggerFactory.new_logger(f"{job_id}_{log_type}")for job_log_dir in log_dirs:# 创建对应的 handler,将日志输出特定文件中,此时文件根据 log_type 命名,此时就是 fate_flow_schedulehandler = LoggerFactory.get_handler(class_name=None, level=LoggerFactory.LEVEL,log_dir=job_log_dir, log_type=log_type, job_id=job_id)error_handler = LoggerFactory.get_handler(class_name=None, level=logging.ERROR, log_dir=job_log_dir, log_type=log_type, job_id=job_id)logger.addHandler(handler)logger.addHandler(error_handler)return logger

可以看到实现原理也是类似的,就是创建对应日志实例 logger,并生成对应的 handler,从而输出至特定文件中

job 中日志还有一部分的日志对应一个独立的目录 guest/host ,这部分是按照 job 训练中参与的角色为单位输出的日志,这些日志是如何被写入文件中的呢?

对应的代码在 fate_flow/controller/task_controller.py 中的 start_task() 方法,可以看到 task 实际执行的代码如下所示:

backend_engine = build_engine(run_parameters.computing_engine)# task 执行中对应的日志输出至 get_job_log_directory() 指定的 logs/{job_id}/{role}/{party_id}/{component_name} 对应的目录run_info = backend_engine.run(task=task,run_parameters=run_parameters,run_parameters_path=run_parameters_path,config_dir=config_dir,log_dir=job_utils.get_job_log_directory(job_id, role, party_id, component_name),cwd_dir=job_utils.get_job_directory(job_id, role, party_id, component_name),user_name=kwargs.get("user_id"))

由于不同的执行引擎代码不同,我们以 eggroll 为例去查看对应的 run() 执行情况,此方法调用 WorkerManager.start_task_worker() 方法,并将日志目录通过 log_dir 传入,代码如下所示:

   def start_task_worker(cls, worker_name, task: Task, task_parameters: RunParameters = None,executable: list = None, extra_env: dict = None, **kwargs):# 根据 task 执行情况确定最终日志输出的目录 log_dir, 最终使用 logs/{job_id}/{role}/{party_id}/{component_name} 为日志目录worker_id, config_dir, log_dir = cls.get_process_dirs(worker_name=worker_name,job_id=task.f_job_id,role=task.f_role,party_id=task.f_party_id,task=task)# 实际执行的代码对应的文件from fate_flow.worker.task_executor import TaskExecutormodule_file_path = sys.modules[TaskExecutor.__module__].__file__# 实际执行进程,使用 python 执行对应的代码env = cls.get_env(task.f_job_id, task.f_provider_info)if executable:process_cmd = executableelse:process_cmd = [env.get("PYTHON_ENV") or sys.executable or "python3"]# 执行命令行,通过 log_dir 指定了对应的日志目录common_cmd = [module_file_path,"--job_id", task.f_job_id,"--component_name", task.f_component_name,"--task_id", task.f_task_id,"--task_version", task.f_task_version,"--role", task.f_role,"--party_id", task.f_party_id,"--config", config_path,'--result', result_path,"--log_dir", log_dir,"--parent_log_dir", os.path.dirname(log_dir),"--worker_id", worker_id,"--run_ip", RuntimeConfig.JOB_SERVER_HOST,"--run_port", RuntimeConfig.HTTP_PORT,"--job_server", f"{RuntimeConfig.JOB_SERVER_HOST}:{RuntimeConfig.HTTP_PORT}","--session_id", session_id,"--federation_session_id", federation_session_id]process_cmd.extend(common_cmd)# 启动新进程执行对应的命令p = process_utils.run_subprocess(job_id=task.f_job_id, config_dir=config_dir, process_cmd=process_cmd,added_env=env, log_dir=log_dir, cwd_dir=config_dir, process_name=worker_name.value,process_id=worker_id)return {"run_pid": p.pid, "worker_id": worker_id, "cmd": process_cmd}

而实际使用 log_dir 是在 fate_flow/worker/base_worker.py 中完成的,此时会调用 LoggerFactory.set_directory() 指定日志输出的父目录,后续即可将任务相关的日志输出至指定的目录中了

总结

这边介绍了 FATE-Flow 相关的日志存储情况,相对大部分服务而言,FATE-Flow 中的日志存储都属于比较复杂的了,但是借助 logging 中动态的 Handler 更新机制,可以比较方便地实现不同模块独立的日志文件输出路径,甚至可以实现部分日志输出至日志文件,部分日志输出至终端,有兴趣的也可以进一步了解下

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/378703.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

转移C盘中的conda环境(包括.condarc文件修改,environment.txt文件修改,conda报错)

conda环境一般是默认安装到C盘的,若建立多个虚拟环境,时间长了,容易让本不富裕的C盘更加雪上加霜,下面给出将conda环境从C盘转移到D盘的方法。 目录 电脑软硬件转移方法查看当前conda目录转移操作第一步:.condarc文件修…

走进NoSql

一、引入 1.1什么是NoSql NoSQL(Not Only SQL)是一组非关系型数据库(或称为非SQL数据库)的统称,它们提供了与传统的关系型数据库不同的数据存储和检索方式。NoSQL数据库通常用于处理大量的、分布式的、非结构化或半结…

美式键盘 QWERTY 布局的来历

注:机翻,未校对。 The QWERTY Keyboard Is Tech’s Biggest Unsolved Mystery QWERTY 键盘是科技界最大的未解之谜 It’s on your computer keyboard and your smartphone screen: QWERTY, the first six letters of the top row of the standard keybo…

数据湖表格式 Hudi/Iceberg/DeltaLake/Paimon TPCDS 性能对比(Spark 引擎)

当前,业界流行的集中数据湖表格式 Hudi/Iceberg/DeltaLake,和最近出现并且在国内比较火的 Paimon。我们现在看到的很多是针对流处理场景的读写性能测试,那么本篇文章我们将回归到大数据最基础的场景,对海量数据的批处理查询。本文…

dp or 数学问题

看一下数据量&#xff0c;只有一千&#xff0c;说明这个不是数学问题 #include<bits/stdc.h> using namespace std;#define int long long const int mo 100000007; int n, s, a, b; const int N 1005;// 2 -3 // 1 3 5 2 -1 // 1 -2 -5 -3 -1 int dp[N][N]; int fun…

泛微Ecology8明细表对主表赋值

文章目录 [toc]1.需求及效果1.1 需求1.2 效果2.思路与实现3.结语 1.需求及效果 1.1 需求 在明细表中的项目经理&#xff0c;可以将值赋值给主表中的项目经理来作为审批人员 1.2 效果 在申请人保存或者提交后将明细表中的人名赋值给主表中对应的值2.思路与实现 在通过js测…

生成树(STP)协议

一、生成树的技术背景 1、交换机单线路上链,存在单点故障,上行线路及设备都不具备冗余性,一旦链路或上行设备发生故障,网络将面临断网。 总结:以下网络不够健壮,不具备冗余性。 2、因此引入如下网络拓扑结构: 上述冗余拓扑能够解决单点故障问题,但同时冗拓扑也带来了…

zookeeper基础知识学习

官网&#xff1a;Apache ZooKeeper 下载地址&#xff1a;Index of /dist/zookeeper/zookeeper-3.5.7Index of /dist/zookeeperIndex of /dist/zookeeper/zookeeper-3.5.7 ZK配置参数说明&#xff1a; 1、tickTime2000&#xff1a;通讯心跳时间&#xff0c;zookeeper服务器与客…

连锁直营店小程序赋能多店如何管理

如商超便利店卖货线下场景&#xff0c;也有不少品牌以同城多店和多地开店经营为主&#xff0c;获取店铺周围客户和散流&#xff0c;如今线上重要性凸显&#xff0c;品牌电商发展是经营的重要方式之一&#xff0c;也是完善同城和外地客户随时便捷消费的方式之一。 多个门店管理…

Python | Leetcode Python题解之第238题除自身以外数组的乘积

题目&#xff1a; 题解&#xff1a; class Solution:def productExceptSelf(self, nums: List[int]) -> List[int]:length len(nums)# L 和 R 分别表示左右两侧的乘积列表L, R, answer [0]*length, [0]*length, [0]*length# L[i] 为索引 i 左侧所有元素的乘积# 对于索引为…

STM32智能交通监测系统教程

目录 引言环境准备智能交通监测系统基础代码实现&#xff1a;实现智能交通监测系统 4.1 数据采集模块 4.2 数据处理与控制模块 4.3 通信与网络系统实现 4.4 用户界面与数据可视化应用场景&#xff1a;交通监测与管理问题解决方案与优化收尾与总结 1. 引言 智能交通监测系统通…

MyBatis源码中的设计模式1

1. 建造者模式的应用 建造者模式属于创建类模式&#xff0c;通过一步一步地创建一个复杂的对象&#xff0c;能够将部件与其组装过程分开。用户只需指定复杂对象的类型&#xff0c;就可以得到该对象&#xff0c;而不需要了解其内部的具体构造细节。《Effective Java》中也提到&…

OpenCV教程04:结合pillow在图片上显示中文文字

1.如果添加的内容是纯英文文字&#xff0c;直接使用cv2.putText 函数操作即可。但它不支持自定义字体文件&#xff0c;仅限于这些内置的字体样式。如果你需要更复杂的字体支持&#xff0c;可能需要使用其他库&#xff0c;如 Python Imaging Library (PIL) 或 Pillow。可用的字体…

Docker-Nvidia(NVIDIA Container Toolkit)

安装NVIDIA Container Toolkit工具&#xff0c;支持docker使用GPU 目录 1.NVIDIA Container Toolkit 安装1.1 nvidia-docker安装1.2 验证1.2.1 验证安装1.2.2 额外补充 1.NVIDIA Container Toolkit 安装 1.1 nvidia-docker安装 NVIDIA/nvidia-docker Installing the NVIDIA …

【BUG】已解决:java.lang.IllegalStateException: Duplicate key

已解决&#xff1a;java.lang.IllegalStateException: Duplicate key 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&#xff0c;211科班出身&#xff0c;就职于医疗科技公司&#xff0c;热衷分享知识&#xff0c;武汉城市…

线程控制

对线程的控制思路和进程相似&#xff0c;创建、等待、终止&#xff0c;只需要调用接口就行。但是在Linux下没有线程的概念&#xff0c;因为Linux的设计者认为&#xff0c;线程是一种轻量级的进程&#xff0c;毕竟创建线程只需要创建PCB。因此Linux中使用多线程必须使用第三方pt…

聊一聊前后端权限控制 RBAC(完整流程)

介绍 RBAC&#xff08;Role-Based Access Control&#xff09;模型也就是基于角色的权限控制。 权限会分配到角色中&#xff0c;角色再分配给用户&#xff0c;这样用户就根据角色有了不同的权限。 当然&#xff0c;你可以说把权限直接挂载到用户上&#xff0c;这样不是更直接…

前端工程化10-webpack静态的模块化打包工具之各种loader处理器

9.1、案例编写 我们创建一个component.js 通过JavaScript创建了一个元素&#xff0c;并且希望给它设置一些样式&#xff1b; 我们自己写的css,要把他加入到Webpack的图结构当中&#xff0c;这样才能被webpack检测到进行打包&#xff0c; style.css–>div_cn.js–>main…

代码随想录二刷复习(二分法)

二分法模板&#xff1a; 1&#xff1a;左闭右闭区间写法 第一种写法&#xff0c;我们定义 target 是在一个在左闭右闭的区间里&#xff0c;也就是[left, right] &#xff08;这个很重要非常重要&#xff09;。 区间的定义这就决定了二分法的代码应该如何写&#xff0c;因为定…

泛微e-cology WorkflowServiceXml SQL注入漏洞(POC)

漏洞描述&#xff1a; 泛微 e-cology 是泛微公司开发的协同管理应用平台。泛微 e-cology v10.64.1的/services/接口默认对内网暴露&#xff0c;用于服务调用&#xff0c;未经身份认证的攻击者可向 /services/WorkflowServiceXml 接口发送恶意的SOAP请求进行SQL注入&#xff0c;…