MinIO分片上传超大文件(纯服务端)

目录

  • 一、MinIO快速搭建
    • 1.1、拉取docker镜像
    • 1.2、启动docker容器
  • 二、分片上传大文件到MinIO
    • 2.1、添加依赖
    • 2.2、实现MinioClient
    • 2.3、实现分片上传
      • 2.3.0、初始化MinioClient
      • 2.3.1、准备分片上传
      • 2.3.2、分片并上传
        • 2.3.2.1、设置分片大小
        • 2.3.2.2、分片
      • 2.3.3、分片合并
  • 三、测试
    • 3.1、完整测试代码
    • 3.2、运行日志和效果

一、MinIO快速搭建

这里简单介绍一下通过docker方式快速搭建MinIO的大体流程。

1.1、拉取docker镜像

首先直接尝试拉取:

docker pull minio/minio

如果拉不到,试图更改docker镜像源:

echo '{"registry-mirrors": ["https://4xxwxhl6.mirror.aliyuncs.com","https://mirror.iscas.ac.cn","https://docker.rainbond.cc","https://docker.nju.edu.cn","https://6kx4zyno.mirror.aliyuncs.com","https://mirror.baidubce.com","https://docker.m.daocloud.io","https://dockerproxy.com"]
}' | sudo tee /etc/docker/daemon.json > /dev/null

接着重启docker服务,使新配置生效:

sudo systemctl restart docker

最后再次拉取即可。

1.2、启动docker容器

首先创建配置和数据目录:

mkdir -p /opt/minio/config
mkdir -p /opt/minio/data

接着启动:

docker run -p 9000:9000 -p 9001:9001 --net=host --name minio -d --restart=always -e "MINIO_ACCESS_KEY=minio" -e "MINIO_SECRET_KEY=minio123" -v /opt/minio/data:/data -v /opt/minio/config:/root/.minio minio/minio server /data --console-address ":9001" -address ":9000"

最后进入MinIO控制台http://192.168.2.195:9001,简单做点存储桶、用户、用户组等配置即可。比如创建新用户名minioUser,密码minioUser123。

二、分片上传大文件到MinIO

2.1、添加依赖

这里需要注意minio 8.3.3必须依赖okhttp的版本不小于4.8.1。

// minio 8.3.3 Must use okhttp >= 4.8.1
implementation 'io.minio:minio:8.3.3'
implementation 'com.squareup.okhttp3:okhttp:4.12.0'

2.2、实现MinioClient

参考S3官方文档https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html#mpu-process,大文件的分片上传,主要分三步实现:

  1. initMultiPartUpload创建一个大文件分片上传任务
  2. uploadMultiPart逐个上传分片
  3. mergeMultipartUpload合并分片

通过继承默认的MinioClient,将一些相关的重要方法暴露出来,以便使用。

package com.szh.minio;import com.google.common.collect.Multimap;
import io.minio.*;
import io.minio.errors.*;
import io.minio.messages.Part;import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;public class CustomMinioClient extends MinioClient {/*** 继承父类*/public CustomMinioClient(MinioClient client) {super(client);}/*** 初始化分片上传即获取uploadId*/public String initMultiPartUpload(String bucket, String region, String object, Multimap<String, String> headers, Multimap<String, String> extraQueryParams) throws IOException, InvalidKeyException, NoSuchAlgorithmException, InsufficientDataException, ServerException, InternalException, XmlParserException, InvalidResponseException, ErrorResponseException {CreateMultipartUploadResponse response = this.createMultipartUpload(bucket, region, object, headers, extraQueryParams);return response.result().uploadId();}/*** 上传单个分片*/public UploadPartResponse uploadMultiPart(String bucket, String region, String object, Object data,long length,String uploadId,int partNumber,Multimap<String, String> headers,Multimap<String, String> extraQueryParams) throws IOException, InvalidKeyException, NoSuchAlgorithmException, InsufficientDataException, ServerException, InternalException, XmlParserException, InvalidResponseException, ErrorResponseException {return this.uploadPart(bucket, region, object, data, length, uploadId, partNumber, headers, extraQueryParams);}/*** 合并分片*/public ObjectWriteResponse mergeMultipartUpload(String bucketName, String region, String objectName, String uploadId, Part[] parts, Multimap<String, String> extraHeaders, Multimap<String, String> extraQueryParams) throws IOException, NoSuchAlgorithmException, InsufficientDataException, ServerException, InternalException, XmlParserException, InvalidResponseException, ErrorResponseException, ServerException, InvalidKeyException {return this.completeMultipartUpload(bucketName, region, objectName, uploadId, parts, extraHeaders, extraQueryParams);}public void cancelMultipartUpload(String bucketName, String region, String objectName, String uploadId, Multimap<String, String> extraHeaders, Multimap<String, String> extraQueryParams) throws ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException, IOException, InvalidKeyException, XmlParserException, InvalidResponseException, InternalException {this.abortMultipartUpload(bucketName, region, objectName, uploadId, extraHeaders, extraQueryParams);}/*** 查询当前上传后的分片信息*/public ListPartsResponse listMultipart(String bucketName, String region, String objectName, Integer maxParts, Integer partNumberMarker, String uploadId, Multimap<String, String> extraHeaders, Multimap<String, String> extraQueryParams) throws NoSuchAlgorithmException, InsufficientDataException, IOException, InvalidKeyException, ServerException, XmlParserException, ErrorResponseException, InternalException, InvalidResponseException {return this.listParts(bucketName, region, objectName, maxParts, partNumberMarker, uploadId, extraHeaders, extraQueryParams);}
}

2.3、实现分片上传

2.3.0、初始化MinioClient

连接到minio,并确保存储桶的存在。

static CustomMinioClient minioClient = new CustomMinioClient(MinioClient.builder().endpoint("http://192.168.2.195:9000").credentials("minioUser", "minioUser123").build());
// 测试桶
static String bucketName = "test";
static {try {boolean found = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());if (!found) {minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());}} catch (Exception e) {throw new RuntimeException(e);}
}

2.3.1、准备分片上传

创建一个大文件分片上传任务。

String contentType = "application/octet-stream";
HashMultimap<String, String> headers = HashMultimap.create();
headers.put("Content-Type", contentType);
String uploadId = minioClient.initMultiPartUpload(bucketName, null, file.getName(), headers, null);
System.out.println("uploadId: " + uploadId);

2.3.2、分片并上传

本文是使用纯服务端进行分片和上传,而实际项目中更推荐由后端首先调用minio的接口getPresignedObjectUrl,逐个生成每个分片的签名后的上传url,然后前端直接以此上传到minio,即可省去后端服务的网络IO开销。

📢 后者方案请见:MinIO分片上传超大文件(非纯服务端)

2.3.2.1、设置分片大小

一方面,需要注意单个分片大小最小5MB,如果每个分片设置小于5MB,则minio或S3底层在合并时报错:code = EntityTooSmall, message = Your proposed upload is smaller than the minimum allowed object size

另一方面,在调整分片大小时,需要注意minio或S3底层允许的分片范围[1,10000]

2.3.2.2、分片

一方面,为了保证分片的效率,借助线程池的并发,以及RandomAccessFile的文件随机访问能力,更快地完成分片的流程。当然,可控制并发数和分片大小以防止并发分片中的OOM。

另一方面,考虑到分片全部完成之后,还有最后的合并操作,所以借助CountDownLatch来确保所有分片上传之后,再去执行合并。

2.3.3、分片合并

合并所有已上传的分片。

Part[] parts = new Part[(int) chunkCount];
// 查询上传后的分片数据。S3最大允许10000,且从1开始
ListPartsResponse partResult = minioClient.listMultipart(bucketName, null, file.getName(), 10000, 0, uploadId, null, null);
int partNumber = 1;
for (Part part : partResult.result().partList()) {parts[partNumber - 1] = new Part(partNumber, part.etag());partNumber++;
}
ObjectWriteResponse objectWriteResponse = minioClient.mergeMultipartUpload(bucketName, null, file.getName(), uploadId, parts, null, null);

三、测试

3.1、完整测试代码

package com.szh.minio;import com.google.common.collect.HashMultimap;
import io.minio.*;
import io.minio.messages.Part;
import lombok.Getter;
import lombok.Setter;import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Setter
@Getter
public class MinioMain {
static CustomMinioClient minioClient = new CustomMinioClient(MinioClient.builder().endpoint("http://192.168.2.195:9000").credentials("minioUser", "minioUser123").build());// 测试桶static String bucketName = "test";static {try {boolean found = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());if (!found) {minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());}} catch (Exception e) {throw new RuntimeException(e);}}// 需要被分片上传的大文件static String filePath = "C:\\tmp\\psi_result.csv";static File file = new File(filePath);// 单个分片大小5MB,如果每个分片设置小于5MB,则minio或S3底层在合并时报错:// code = EntityTooSmall, message = Your proposed upload is smaller than the minimum allowed object size.static final long CHUNK_SIZE = 5 * 1024 * 1024;// 当前分片号,minio或S3底层允许的分片范围[1,10000]// https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html#mpu-processprivate int chunkIndex;// 用于得知所有分片都传输成功后的时刻,进而进行合并private static CountDownLatch countDownLatch;public static void main(String[] args) throws Exception {// 第一步:准备分片上传String contentType = "application/octet-stream";HashMultimap<String, String> headers = HashMultimap.create();headers.put("Content-Type", contentType);String uploadId = minioClient.initMultiPartUpload(bucketName, null, file.getName(), headers, null);System.out.println("uploadId: " + uploadId);// 第二步:分片并上传// ps:实际项目中可由后端先getPresignedObjectUrl逐个生成每个分片的签名后的上传url,前端直接以此上传到minio,即可省去后端服务的网络开销long totalLength = file.length();System.out.println("totalLength: " + totalLength + " Byte");// 计算分片数量long chunkCount = (totalLength + CHUNK_SIZE - 1) / CHUNK_SIZE;System.out.println("chunkCount: " + chunkCount);countDownLatch = new CountDownLatch((int) chunkCount);// 5个核心线程并发上传分片ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);for (long i = 0; i < chunkCount; i++) {long position = i * CHUNK_SIZE;int bytesRead = (int) Math.min(CHUNK_SIZE, totalLength - position);MinioMain minioMain = new MinioMain();// S3分片号从1开始minioMain.setChunkIndex((int) i + 1);fixedThreadPool.submit(new Runnable() {@Overridepublic void run() {try {// 上传分片minioMain.processChunk(filePath, position, bytesRead, uploadId);} catch (Exception e) {throw new RuntimeException(e);}}});}countDownLatch.await();fixedThreadPool.shutdownNow();// 第三步:合并分片System.out.println("ready to merge <" + file.getName() + " - " + uploadId + " - " + bucketName + ">");Part[] parts = new Part[(int) chunkCount];// 查询上传后的分片数据。S3最大允许10000,且从1开始ListPartsResponse partResult = minioClient.listMultipart(bucketName, null, file.getName(), 10000, 0, uploadId, null, null);int partNumber = 1;for (Part part : partResult.result().partList()) {parts[partNumber - 1] = new Part(partNumber, part.etag());partNumber++;}ObjectWriteResponse objectWriteResponse = minioClient.mergeMultipartUpload(bucketName, null, file.getName(), uploadId, parts, null, null);System.out.println("mergeMultipartUpload resp etag: " + objectWriteResponse.etag());StatObjectResponse statObjectResponse = minioClient.statObject(StatObjectArgs.builder().bucket(bucketName).object(file.getName()).build());System.out.println("etag: " + statObjectResponse.etag() + " size: " + statObjectResponse.size() + " lastModified: " + statObjectResponse.lastModified());}private void processChunk(String filePath, long position, int bytesRead, String uploadId) {// 可控制并发数和分片大小以防止OOMbyte[] buffer = new byte[bytesRead];RandomAccessFile raf = null;try {int chunkIndex = this.getChunkIndex();raf = new RandomAccessFile(filePath, "r");// 定位到指定位置raf.seek(position);// 读取bytesRead字节长度作为分片raf.readFully(buffer);String contentType = "application/octet-stream";HashMultimap<String, String> headers = HashMultimap.create();headers.put("Content-Type", contentType);UploadPartResponse uploadPartResponse = minioClient.uploadMultiPart(bucketName, null, file.getName(),buffer, bytesRead,uploadId, chunkIndex, headers, null);System.out.println("chunk[" + chunkIndex + "] buffer size: [" + buffer.length + " Byte] upload etag: [" + uploadPartResponse.etag() + "]");} catch (Exception e) {e.printStackTrace();} finally {if (raf != null) {try {raf.close();} catch (IOException e) {e.printStackTrace();}}countDownLatch.countDown();}}
}

3.2、运行日志和效果

运行日志如下:

uploadId: MzFiMWRmZjctMDg0Yy00YzMyLTk5NTYtMjRkZGZiMDZlYjJhLmUwZmFkNzFiLWEwZTctNDU1Yi04ZWFjLWFhODQyZjBiMmIyOXgxNzI3MzQwMjUzMTA2Njc5MTEz
totalLength: 3576974860 Byte
chunkCount: 683
chunk[1] buffer size: [5242880 Byte] upload etag: [97096e510d1dcda56646608345de08ea]
chunk[3] buffer size: [5242880 Byte] upload etag: [d8102f80f10eb79f600cdf2d378ae8fe]
chunk[4] buffer size: [5242880 Byte] upload etag: [b74f9b8fa2025580b4fc00449c66e271]
chunk[5] buffer size: [5242880 Byte] upload etag: [e77603ee49cc3f7d229f124ecd9a3f38]
chunk[2] buffer size: [5242880 Byte] upload etag: [b148b311ccd2b3fcd4777d56a8758c3d]
chunk[6] buffer size: [5242880 Byte] upload etag: [94abe5a7a2117b612d9805029398cfd9]
chunk[7] buffer size: [5242880 Byte] upload etag: [433b52aed0d1b1486df07a2259932a83]
chunk[8] buffer size: [5242880 Byte] upload etag: [2c242bd205f9b3c4546454fe2d0abef4]
...
chunk[679] buffer size: [5242880 Byte] upload etag: [8492b0573cc74ec55cb6d2a86aee0f69]
chunk[678] buffer size: [5242880 Byte] upload etag: [4aa5c01b4f7aea95952ec62d71ee9996]
chunk[681] buffer size: [5242880 Byte] upload etag: [ac0b739044bfd2644fc8da97fc03a1a9]
chunk[680] buffer size: [5242880 Byte] upload etag: [d95ee210ac774b3ca26e091941c66e20]
chunk[682] buffer size: [5242880 Byte] upload etag: [75e78df64c1fad0839ba8a1583cd93ec]
chunk[683] buffer size: [1330700 Byte] upload etag: [2f30c8d65e23d266c7f10f051854bc6a]
ready to merge <psi_result.csv - MzFiMWRmZjctMDg0Yy00YzMyLTk5NTYtMjRkZGZiMDZlYjJhLmUwZmFkNzFiLWEwZTctNDU1Yi04ZWFjLWFhODQyZjBiMmIyOXgxNzI3MzQwMjUzMTA2Njc5MTEz - test>
mergeMultipartUpload resp etag: "ff6ebd330b3cb224ade84463dd14df82-683"
etag: ff6ebd330b3cb224ade84463dd14df82-683 size: 3576974860 lastModified: 2024-09-26T09:09Z

上传后的控制台:
MinioConsole

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/444088.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vscode+Pycharm+Vue.js+WEUI+django火锅(三)理解Vue

新创建的Vue项目里面很多文件&#xff0c;对于新手&#xff0c;老老实实做一下了解。 1.框架逻辑 框架的逻辑都是相通的&#xff0c;花点时间理一下就清晰了。 2.文件目录及文件 创建好的vue项目下&#xff0c;主要的文件和文件夹要先认识一下&#xff0c;并与框架逻辑对应起…

计算机网络803-(4)网络层

目录 1.虚电路服务 虚电路是逻辑连接 2.数据报服务 3.虚电路服务与数据报服务的对比 二.虚拟互连网络-IP网 1.网络通信问题 2.中间设备 3.网络互连使用路由器 三.分类的 IP 地址 1. IP 地址及其表示方法 2.IP 地址的编址方法 3.分类 IP 地址 &#xff08;1&#x…

使用 Go 和 Gin 框架构建简单的用户和物品管理 Web 服务

使用 Go 和 Gin 框架构建简单的用户和物品管理 Web 服务 在本项目中&#xff0c;我们使用 Go 语言和 Gin 框架构建了一个简单的 Web 服务&#xff0c;能够管理用户和物品的信息。该服务实现了两个主要接口&#xff1a;根据用户 ID 获取用户名称&#xff0c;以及根据物品 ID 获…

蓝桥杯【物联网】零基础到国奖之路:十七. 扩展模块之单路ADC和NE555

蓝桥杯【物联网】零基础到国奖之路:十七. 扩展模块之单路ADC和NE555 第一节 硬件解读第二节 CubeMx配置第三节 代码1&#xff0c;脉冲部分代码2&#xff0c;ADC部分代码![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/57531a4ee76d46daa227ae0a52993191.png) 第一节 …

EasyExcel读入数字类型数据时出现小数位丢失精度问题

这里写自定义目录标题 问题现象解决方案 问题现象 目前使用easyExcel读取导入文档时发现文档中的小数值4076204076.65会被读取为4076204076.6500001 尝试去查看了excel解压后的文件&#xff0c;发现这条数据在xml里存储的值就是4076204076.6500001&#xff0c;即是excel存储小…

利用 Python 爬虫采集 1688商品详情

1688是中国的一个大型B2B电子商务平台&#xff0c;主要用于批发和采购各种商品。对于需要从1688上获取商品详情数据、工程数据或店铺数据的用户来说&#xff0c;可以采用以下几种常见的方法&#xff1a; 官方API接口&#xff1a;如果1688提供了官方的API接口&#xff0c;那么可…

喜讯!迈威通信TSN产品通过“时间敏感网络(TSN)产业链名录计划”评测,各项指标名列前茅

TSN技术&#xff0c;作为推动企业网络化与智能化转型的关键力量&#xff0c;已成为工业网络迈向下一代演进的共识方向&#xff0c;正加速重构工业网络的技术架构与产业生态。为响应这一趋势&#xff0c;工业互联网产业联盟携手中国信息通信研究院及50余家产学研用单位&#xff…

使用Google开源工具gperftools进行堆内存占用分析

背景&#xff1a;项目中有多卡训练的需求&#xff0c;多进程时每个进程都需要编译&#xff0c;占用内存过大&#xff0c;需要找出内存占用多的点并尝试优化。 目标程序是python的多进程程序&#xff0c;torch_xla多卡训练&#xff0c;程序包含python及c库&#xff0c;尝试过其他…

精益生产现场管理和改善:从知识到实操的落地

在制造业的广阔天地中&#xff0c;精益生产作为一种追求浪费最小化、效率最大化的生产管理模式&#xff0c;已成为众多企业转型升级的关键路径。本文&#xff0c;深圳天行健企业管理咨询公司将从精益生产现场管理和改善的理论知识出发&#xff0c;深入探讨其从理念导入到实操落…

【重学 MySQL】四十七、表的操作技巧——修改、重命名、删除与清空

【重学 MySQL】四十七、表的操作技巧——修改、重命名、删除与清空 修改表添加字段语法示例注意事项 删除字段语法示例 修改字段使用 MODIFY COLUMN语法示例 使用 CHANGE COLUMN语法示例 重命名表语法示例 删除表语法示例 清空表使用 TRUNCATE TABLE使用 DELETE FROM对比 TRUNC…

pytest框架之fixture测试夹具详解

前言 大家下午好呀&#xff0c;今天呢来和大家唠唠pytest中的fixtures夹具的详解&#xff0c;废话就不多说了咱们直接进入主题哈。 一、fixture的优势 ​ pytest框架的fixture测试夹具就相当于unittest框架的setup、teardown&#xff0c;但相对之下它的功能更加强大和灵活。 …

宠物空气净化器该怎么选?希喂,小米、安德迈这三款好用吗?

不得不说&#xff0c;虽然现在购物网站的活动不少&#xff0c;可力度都好弱啊&#xff01;我想买宠物空气净化器很久了&#xff0c;觉得有点贵&#xff0c;一直没舍得入手。价格一直没变化&#xff0c;平台小活动根本没什么优惠&#xff0c;只能寄希望于双十一了&#xff0c;准…

【docker】要将容器中的 livox_to_pointcloud2 文件夹复制到宿主机上

复制文件夹 使用 docker cp 命令从容器复制文件夹到宿主机&#xff1a; docker cp <container_id_or_name>:/ws_livox/src/livox_to_pointcloud2 /path/to/host/folder sudo docker cp dandong_orin_docker:/ws_livox/src/livox_to_pointcloud2 /home

WPS的JS宏实现删除某级标题下的所有内容

想要删除Word文档中&#xff0c;包含特定描述的标题下所有内容&#xff08;包含各级子标题以及正文描述&#xff09;。 例如下图中&#xff0c;想删除1.2.1.19.1业务场景下所有内容&#xff1a; 简单版&#xff1a; 删除光标停留位置的大纲级别下所有的内容。实现的JS代码如下…

【YOLO学习】YOLOv2详解

文章目录 1. 概述2. Better2.1 Batch Normalization&#xff08;批归一化&#xff09;2.2 High Resolution Classifier&#xff08;高分辨率分类器&#xff09;2.3 Convolutional With Anchor Boxes&#xff08;带有Anchor Boxes的卷积&#xff09;2.4 Dimension Clusters&…

光伏开发:一充一放和两充两放是什么意思?

一充一放 一充一放是指储能设备在一次充电过程中充满电&#xff0c;并在一次放电过程中将电能全部释放。这种模式的原理相对简单&#xff0c;充电时电能转化为化学能或其他形式的能量储存&#xff0c;放电时则将这些能量转化回电能供应给负载。一充一放模式适用于对储能设备充…

2024年9月国产数据库大事记-墨天轮

本文为墨天轮社区整理的2024年9月国产数据库大事件和重要产品发布消息。 目录 2024年9月国产数据库大事记 TOP102024年9月国产数据库大事记&#xff08;时间线&#xff09;产品/版本发布兼容认证代表厂商大事记厂商活动相关资料 2024年9月国产数据库大事记 TOP10 2024年9月国…

51单片机的无线通信智能车库门【proteus仿真+程序+报告+原理图+演示视频】

1、主要功能 该系统由AT89C51/STC89C52单片机LCD1602显示模块红外传感器光照传感器时钟模块步进电机蓝牙按键、LED、蜂鸣器等模块构成。适用于智能车库自动门、无线控制车库门等相似项目。 可实现功能: 1、LCD1602实时显示北京时间和自动/手动模式&#xff0c;以及验证是否成…

揭秘HCIE证书:职场神话or锦上添花?深度剖析!

HCIE&#xff1a;职场赛道上的加速器 在职场这条充满挑战与机遇的赛道上&#xff0c;每个人都渴望找到那个能让自己加速前行的助推器。 HCIE证书&#xff0c;作为IT领域的顶级认证&#xff0c;无疑成为了许多人心目中的理想选择。它不仅是华为对网络专家专业能力的认可&#…

Biomamba求职| 国奖+4篇一作SCI

转眼间我也要参加秋招啦&#xff0c;认真的求职帖&#xff0c;各位老师/老板欢迎联系~其它需要求职的小伙伴也欢迎把简历发给我们&#xff0c;大家一起找工作。 一、基本信息 姓名&#xff1a;Biomamba 性别&#xff1a;男 出厂年份&#xff1a;1998 籍贯&#xff1a;浙江…