技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口

引言

随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。

1111.png


一、MaxFrame产品最佳实践测评

1.1 分布式Pandas处理的最佳实践

1111.png

环境准备

为了实现基于MaxFrame的分布式Pandas处理,首先需要确保环境已经正确配置了MaxCompute服务,并安装了必要的Python库。以下是环境搭建的基本步骤:

# 安装maxcompute-python-sdk
pip install pyodps# 安装其他依赖项如pandas等
pip install pandas numpy##### 数据准备
1.在安装了MaxFrame的Python环境下运行如下脚本,准备测试表和测试数据。
```bash
from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import pandas as pd
import oso = ODPS(# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,# 不建议直接使用AccessKey ID和 AccessKey Secret字符串。os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),project='your-default-project',endpoint='your-end-point',
)data_sets = [{"table_name": "product","table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint","source_type": "records","records" : [[1, 100, 'Nokia', 1000],[2, 200, 'Apple', 5000],[3, 300, 'Samsung', 9000]],
},
{"table_name" : "sales","table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint","source_type": "records","records" : [[1, 1, 100, 101, 2008, 10, 5000],[2, 2, 300, 101, 2009, 7, 4000],[3, 4, 100, 102, 2011, 9, 4000],[4, 5, 200, 102, 2013, 6, 6000],[5, 8, 300, 102, 2015, 10, 9000],[6, 9, 100, 102, 2015, 6, 2000]],"lifecycle": 5
}]def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):for index, data in enumerate(data_sets):table_name = data.get("table_name")table_schema = data.get("table_schema")source_type = data.get("source_type")if not table_name or not table_schema or not source_type:raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.")lifecycle = data.get("lifecycle", 5)table_name += suffixprint(f"Processing {table_name}...")if drop_if_exists:print(f"Deleting {table_name}...")o.delete_table(table_name, if_exists=True)o.create_table(name=table_name, table_schema=table_schema, lifecycle=lifecycle, if_not_exists=True)if source_type == "local_file":file_path = data.get("file")if not file_path:raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.")sep = data.get("sep", ",")pd_df = pd.read_csv(file_path, sep=sep)ODPSDataFrame(pd_df).persist(table_name, drop_table=True)elif source_type == 'records':records = data.get("records")if not records:raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.")with o.get_table(table_name).open_writer() as writer:writer.write(records)else:raise ValueError(f"Unknown data set source_type: {source_type}")print(f"Processed {table_name} Done")prepare_data(o, data_sets, "_maxframe_demo", True)##### 使用MaxFrame进行分布式处理
现在我们来展示如何使用MaxFrame执行分布式操作。以下代码片段展示了如何加载数据到MaxFrame中并执行一些基本的操作,例如过滤和聚合。```python
from odps import ODPS
from odps.df import DataFrame# 初始化ODPS客户端
odps = ODPS('<your-access-id>', '<your-secret-access-key>', '<your-project>', endpoint='<your-endpoint>')# 将本地pandas DataFrame转换为MaxCompute DataFrame
max_df = DataFrame(df)# 执行分布式过滤操作
filtered_df = max_df[max_df['value'] > 0.5]# 执行分布式聚合操作
aggregated_df = filtered_df.groupby('id').agg({'value': 'sum'})# 将结果转换回pandas DataFrame查看
result = aggregated_df.to_pandas()
print(result)
性能评估

为了评估MaxFrame在分布式Pandas处理方面的性能,我们可以通过比较相同任务在单机环境下的执行时间和在MaxFrame上的执行时间来进行对比。通常情况下,对于大规模数据集,MaxFrame能够显著缩短处理时间。

1.2 大语言模型数据处理的最佳实践
数据预处理

在训练大型语言模型时,数据预处理是非常重要的一步。MaxFrame可以帮助加速这一过程,特别是当处理海量文本数据时。下面的例子展示了如何使用MaxFrame清洗和格式化文本数据以供后续训练使用。

# 假设有一个包含文本数据的大表
text_data = odps.get_table('large_text_corpus')# 使用MaxFrame读取表格内容
text_df = DataFrame(text_data)# 对文本进行初步清理(去除HTML标签、特殊字符等)
cleaned_text_df = text_df.map(lambda row: (row.id, clean_html(row.text)), schema='id string, cleaned_text string')# 存储清理后的数据到新表中
cleaned_text_df.persist('cleaned_large_text_corpus')

参数说明:

ALIBABA_CLOUD_ACCESS_KEY_ID:需将该环境变量设置为具备目标MaxCompute项目中待操作对象相关MaxCompute权限的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。
ALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为AccessKey ID对应的AccessKey Secret。
your-default-project:使用的MaxCompute项目名称。您可以登录MaxCompute控制台,在左侧导航栏选择工作区>项目管理,查看MaxCompute项目名称。
your-end-point:目标MaxCompute项目所在地域的Endpoint,可根据网络连接方式自行选择,例如http://service.cn-chengdu.maxcompute.aliyun.com/api。详情请参见Endpoint。
2.查询sales_maxframe_demo表和product_maxframe_demo表的数据,SQL命令如下。

--查询sales_maxframe_demo表
SELECT * FROM sales_maxframe_demo;--返回
+------------+------------+------------+------------+------------+------------+------------+
| index      | sale_id    | product_id | user_id    | year       | quantity   | price      |
+------------+------------+------------+------------+------------+------------+------------+
| 1          | 1          | 100        | 101        | 2008       | 10         | 5000       |
| 2          | 2          | 300        | 101        | 2009       | 7          | 4000       |
| 3          | 4          | 100        | 102        | 2011       | 9          | 4000       |
| 4          | 5          | 200        | 102        | 2013       | 6          | 6000       |
| 5          | 8          | 300        | 102        | 2015       | 10         | 9000       |
| 6          | 9          | 100        | 102        | 2015       | 6          | 2000       |
+------------+------------+------------+------------+------------+------------+------------+--查询product_maxframe_demo表数据
SELECT * FROM product_maxframe_demo;--返回
+------------+------------+--------------+---------------+
| index      | product_id | product_name | current_price |
+------------+------------+--------------+---------------+
| 1          | 100        | Nokia        | 1000          |
| 2          | 200        | Apple        | 5000          |
| 3          | 300        | Samsung      | 9000          |
+------------+------------+--------------+---------------+

使用MaxFrame进行数据分析

1:使用merge方法连接两张数据表,以获取sales_maxframe_demo表中所有sale_id对应的product_name以及该产品的所有year和price

示例代码:

from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import oso = ODPS(# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,# 不建议直接使用 Access Key ID / Access Key Secret 字符串os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),project='your-default-project',endpoint='your-end-point',
)session = new_session(o)#session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。
print(session.session_id)sales = md.read_odps_table("sales_maxframe_demo", index_col="index")
product = md.read_odps_table("product_maxframe_demo", index_col="product_id")#这里的df并不会立即执行,除非您使用df.execute()来触发。
#这意味着所有的计算都将最终完全在MaxCompute集群完成,避免了中间所不必要的数据传输和阻塞。
df = sales.merge(product, left_on="product_id", right_index=True)
df = df[["product_name", "year", "price"]]print(df.execute().fetch())#保存结果到MaxCompute表中,并销毁Session
md.to_odps_table(df, "result_df", overwrite=True).execute()session.destroy()

返回结果:

index product_name  year  price                   
1            Nokia  2008   5000
2          Samsung  2009   4000
3            Nokia  2011   4000
4            Apple  2013   6000
5          Samsung  2015   9000
6            Nokia  2015   2000

性能对比
在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行运算,可以得到如下耗时对比结果:

1111.png

2:选出每个出售过的产品第一年销售的产品ID、年份、数量和价格
示例代码:

from odps import ODPS
from maxframe.session import new_session
import maxframe.dataframe as md
import oso = ODPS(# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,# 不建议直接使用 Access Key ID / Access Key Secret 字符串os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),project='your-default-project',endpoint='your-end-point',
)session = new_session(o)#session id是一串用于关联MaxFrame task的字符,对于调试和追踪任务状态有重要的作用。
print(session.session_id)# 聚合获取每个产品的第一个年份
min_year_df = md.read_odps_table("sales_maxframe_demo", index_col="index")
min_year_df = min_year_df.groupby('product_id', as_index=False).agg(first_year=('year', 'min'))# join 找到对应的销售记录
sales = md.read_odps_table("sales_maxframe_demo", index_col=['product_id', 'year'])
result_df = md.merge(sales, min_year_df, left_index=True, right_on=['product_id','first_year'],how='inner')#这里的result_df并不会立即执行,除非您使用 result_df.execute()来触发。
#这意味着所有的计算都将最终完全在MaxCompute中集群完成,避免了中间所不必要的数据传输和阻塞。
result_df = result_df[['product_id', 'first_year', 'quantity', 'price']]print(result_df.execute().fetch())#销毁 Session
session.destroy()

返回结果:

    product_id  first_year  quantity  price
100         100        2008        10   5000
300         300        2009         7   4000
200         200        2013         6   6000

性能对比:
在sales表数据量为5000W条(size:1.96 GB),product表数据量为10W条(size:3 MB)的数据样本中进行运算,可以得到如下耗时对比结果:

1111.png

MaxFrame兼容Pandas接口且自动进行分布式处理,在保证强大数据处理能力的同时,可以大幅度提高数据处理规模及计算效率。


二、MaxFrame在公司/工作/学习中的作用

2.1 提升数据分析效率

MaxFrame使得数据科学家和工程师能够在熟悉的Python环境中高效地处理大规模数据集,而无需担心底层基础设施的问题。通过简化复杂的ETL流程,团队可以更快地迭代实验,提高项目开发速度。

2.2 加速AI模型开发周期

对于从事机器学习和深度学习的研究人员来说,MaxFrame提供了强大的工具链来支持从数据收集到模型部署的整个生命周期。它允许用户无缝地在本地和云端之间切换,从而更好地利用计算资源。

2.3 促进跨部门协作

由于MaxFrame与MaxCompute Notebook、镜像管理等功能紧密结合,形成了完整的Python开发生态系统,不同专业背景的团队成员可以在统一平台上合作,共同推进项目的进展。

2.4 支持创新应用探索

最后但同样重要的是,MaxFrame为企业和个人开发者提供了一个理想的平台去尝试新的想法和技术。无论是构建智能推荐系统还是自然语言处理解决方案,MaxFrame都能为用户提供所需的灵活性和支持。


最后附上官方整理出来的常见问题:

问题1:报错invalid type INT for function UDF definition, you need to set odps.sql.type.system.odps2=true; in order to use it
报错原因:在未开启MaxCompute 2.0数据类型版本的情况下,使用MaxCompute 2.0的数据类型,导致作业执行时出现错误。
解决方案:通过Flag开启MaxCompute 2.0数据类型,示例如下:

from maxframe import config
# 在new_session之前添加
config.options.sql.settings = {"odps.sql.type.system.odps2": "true"
}

问题2:报错UDF : No module named 'cloudpickle'
报错原因:缺少依赖的cloudpickle包。
解决方案:引用MaxCompute基础镜像,示例如下:

from maxframe import config
# 在new_session之前添加
config.options.sql.settings = {"odps.session.image": "common",
}

问题3:如何在DataFrame提交(apply)的UDF中实现资源复用?
在部分UDF场景中,可能涉及到某些较多的资源创建或者销毁行为(例如初始化数据库连接、加载模型等),希望在每个UDF被加载时只会执行一次。可以利用Python中函数参数默认值只被初始化一次的特性,实现资源复用。例如,下述UDF中,模型只会被加载一次。

def predict(s, _ctx={}):from ultralytics import YOLO# _ctx 的初始值是一个空 dict,在 Python 执行过程中只会被初始化一次。# 使用模型时,可以先判断 _ctx 中是否存在该模型,不存在则执行加载,然后存入 dict 中。if not _ctx.get("model", None):model = YOLO(os.path.join("./", "yolo11n.pt"))_ctx["model"] = modelmodel = _ctx["model"]# 后续调用模型的相关接口

下面给出了一个需要销毁资源的UDF示例,该示例中使用了一个自定义的类MyConnector负责创建和关闭数据库连接。

class MyConnector:def __init__(self):# 在 __init__ 中创建数据库连接self.conn = create_connection()def __del__(self):# 在 __del__ 中关闭数据库连接try:self.conn.close()except:passdef process(s, connector=MyConnector()):# 直接调用 connector 内的数据库连接,无需在 UDF 内部再次执行连接创建和关闭connector.conn.execute("xxxxx")
通过对MaxFrame产品的深入体验和评测,我们可以看到它不仅是一个强大的分布式计算框架,而且是连接大数据和AI领域的桥梁。无论是在提升数据分析效率、加速AI模型开发周期,还是促进跨部门协作方面,MaxFrame都展现出了巨大的潜力。未来,随着更多功能的不断加入和完善,相信MaxFrame将继续引领云计算时代的创新发展潮流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/18634.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

优选算法《位运算》

在本篇当中我们将会复习之前在C语言阶段学习的各种位运算&#xff0c;并且在复习当中将再补充一些在算法题当中没有进行总结的位运算的使用方法&#xff0c;再总结完常见的位运算使用方法之和接下来还是和之前的算法篇章一样通过几道算法题来对这些位运算的方法技巧进行巩固。在…

复旦大学:公共数据开放利用层报告(2024)

摘 要: 数据利用是公共数据开放的成效展现环节。 中国公共数据开放评估中利用层的指标体系包括利用促进、 利用多样性、 成果数量、 成果质量、成果价值 5 个一级指标。 其中, 省域评估指标体系更关注省级统筹与省市协同, 而城市评估指标体系更强调成果产出与价值释放。 根据该…

CAS单点登录(第7版)24.高可用性

如有疑问&#xff0c;请看视频&#xff1a;CAS单点登录&#xff08;第7版&#xff09; 高可用性 概述 高可用性指南 &#xff08;HA/Clustering&#xff09; 高度可用的 CAS 部署是一种提供弹性以响应各种故障模式的部署&#xff0c;以便 CAS 在发生故障时继续提供 SSO 服务…

Vue极简插件安装

1. 打开Google浏览器&#xff0c;输入网址极简插件官网_Chrome插件下载_Chrome浏览器应用商店极简插件是一个优质Chrome插件下载商店&#xff0c;收录热门好用的Chrome插件扩展&#xff0c;国内最方便的Chrome插件下载网站。一键下载谷歌扩展插件&#xff0c;无套路下载插件。h…

代码随想录刷题攻略---动态规划---子序列问题1---子序列

子序列&#xff08;不连续&#xff09;和子序列&#xff08;连续&#xff09;的问题 例题1: 最长递增子序列 给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。 子序列是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的…

3-初始化项目

在文件UIStaticHelper配置路径 YIUI自动化工具 在Tools->YIUI自动化工具即可看到面板。有6个功能&#xff0c;如下所示。 在运行的过程中&#xff0c;用绑定代替反射是因为手机运行放射是开销比较大的&#xff0c;所以用绑定代替反射&#xff0c;在发布前UI如果有改动&…

算法与数据结构(多数元素)

题目 思路 方法一&#xff1a;哈希表 因为要求出现次数最多的元素&#xff0c;所以我们可以使用哈希映射存储每个元素及其出现的次数。每次记录出现的次数若比最大次数大&#xff0c;则替换。 方法二&#xff1a;摩尔算法 摩尔的核心算法就是对抗&#xff0c;因为存在次数多…

124. 二叉树中的最大路径和

【题目】&#xff1a;124. 二叉树中的最大路径和 这题有两个关键点&#xff1a; 更新res&#xff1a;res max(l r root->val, res)&#xff0c;左子树最大链长 右子树最大链长 根节点的值其实也可以当成一条链子树root的最大链长&#xff1a;max(max(l, r) root->…

CF 144A.Arrival of the General(Java实现)

题目分析 一个n个身高数据&#xff0c;问最高的到最前面&#xff0c;最矮的到最后面的最短交换次数 思路分析 首先&#xff0c;如果数据有重复项&#xff0c;例如示例二中&#xff0c;最矮的数据就是最后一个出现的数据位置&#xff0c;最高的数据就是最先出现的数据位置&…

SpringBoot整合Email 邮件发送详解

文章目录 SpringBoot整合Email 邮件发送详解 一、引言二、邮件发送需要的配置 1、获取客户端授权码 163邮箱授权码获取QQ邮箱授权码获取 2、SpringBoot配置SMTP服务 SpringBoot整合Email 邮件发送详解 一、引言二、环境准备与配置 1、依赖配置2、配置文件设置 163邮箱配置示例…

生成式人工智能:技术革命与应用图景

(这文章有些地方看不懂很正常&#xff0c;因为有太多生词&#xff0c;需要对 计算机/人工智能 研究至深的人才能看懂&#xff0c;遇到不会的地方用浏览器搜索或跳过&#xff09; 引言 2023年被称我们为"生成式AI元年"&#xff0c;以GPT-4、DALL-E 3、Stable Diffusi…

android studio下载安装汉化-Flutter安装

1、下载android studio官方地址&#xff1a;&#xff08;这个网址可能直接打不开&#xff0c;需要VPN&#xff09; https://developer.android.com/studio?hlzh-cn mac版本分为X86和arm版本&#xff0c;电脑显示芯片是Inter的就是x86的&#xff0c;显示m1和m2的就是arm的 …

【Apache Paimon】-- 作为一名小白,如何系统地学习 Apache paimon?

目录 一、整体规划 1. 了解基本概念与背景 2. 学习资料的选择 3. 学习路径与规划 4. 学习建议 5. 注意事项 6. 参考学习资料 二、详细计划 阶段 1&#xff1a;了解基础&#xff08;1-2 周&#xff09; 阶段 2&#xff1a;深入掌握核心功能&#xff08;3-4 周&#xf…

Matlab离线安装硬件支持包的方法

想安装支持树莓派的包&#xff0c;但是发现通过matlab安装需要续订维护服务 可以通过离线的方式安装。 1. 下载SupportSoftwareDownloader Support Software Downloader - MATLAB & Simulink 登录账号 选择对应的版本 2. 选择要安装的包 3.将下载的包copy到安装目录下 …

WPF的MVVMLight框架

在NuGet中引入该库&#xff1a; MVVMLight框架中的命令模式的使用&#xff1a; <StackPanel><TextBox Text"{Binding Name}"/><TextBox Text"{Binding Title}"/><Button Content"点我" Command"{Binding ShowCommand…

ShenNiusModularity项目源码学习(8:数据库操作)

ShenNiusModularity项目使用SqlSugar操作数据库。在ShenNius.Repository项目中定义了ServiceCollectionExtensions.AddSqlsugarSetup函数注册SqlSugar服务&#xff0c;并在ShenNius.Admin.API项目的ShenniusAdminApiModule.OnConfigureServices函数中调用&#xff0c;SqlSugar所…

【含开题报告+文档+源码】基于Web的房地产销售网站的设计与实现

开题报告 该系统是一个全面的房地产销售平台&#xff0c;旨在提供用户一个一站式的购房体验。系统具备注册登录功能&#xff0c;允许用户注册账户并登录系统进行操作。在系统中&#xff0c;用户可以获取最新的房产资讯&#xff0c;包括房地产市场的动态、政策法规以及楼盘介绍…

啥是目标文件?目标文件里面有什么?

从c文件到二进制可执行文件主要经过预处理、编译、汇编和链接的过程&#xff0c;而在这些过程中参与到的文件类型主要有源文件、头文件、目标文件、静态库、动态库和可执行文件。下面表格对这些文件的生成过程和涉及到的步骤名称做个总结。 过程涉及文件编译source header -&…

Day45(补)【软考】2022年下半年软考软件设计师综合知识真题-计算机软件知识1

文章目录 2022年下半年软考软件设计师综合知识真题第1章 计算机系统基础知识(12/38)计算机软件知识1-6/6哲学概念及收敛思维&#xff1a;是Python程序语言中&#xff0c;处理异常的结构集合&#xff0c;和这个集合之外的结构的区分&#xff0c;考Python集合之外的结构 哲学概念…

Spring MVC Streaming and SSE Request Processing SSE可以实现chatgpt一次请求分批次响应的效果

1. Introduction This simple tutorial demonstrates the use of several asynchronous and streaming objects in Spring MVC 5.x.x. Specifically, we’ll review three key classes: ResponseBodyEmitterSseEmitterStreamingResponseBody Also, we’ll discuss how to i…