由于本人目前在公司中主要负责的业务之一就是文件图片的上传和下载系统,所以为了提升自我能力,我在业余的时间对分布式文件系统Minio源码进行了学习,希望对之后的工作和学习有所帮助。下面对Minio进行介绍。
使用场景:
用于互联网海量非结构化数据的存储需求。
- 电商网站:海量电商图片
- 视频网站:海量视频资源
- 网盘:海量文件
- 社交网站:海量社交图片
为什么不使用数据库来存储海量的图片等非结构化数据呢,而是使用分布式文件系统呢?
从成本上来说,数据库的存储成本,比如MySQL或者PostgreSQL,每个字段都要存储,对大文件来说,存储效率低,而且备份和恢复可能麻烦。另外,数据库的索引机制不适合大块数据,读取速度可能慢。
分布式文件系统比如HDFS、Ceph或者对象存储如AWS S3,它们设计用来处理大量非结构化数据,提供高吞吐量,数据分片存储,容易扩展。还有CDN加速下载,这对图片这类需要快速访问的数据很有利。另外,分布式文件系统具有高吞吐I/O的优点,可以针对大规模数据的并行读写优化,适合批量处理场景。
Minio介绍
MinIO 是一个开源的高性能分布式对象存储系统,专为云原生和大规模非结构化数据存储而设计。它兼容 Amazon S3 API,支持海量数据的存储与管理,适用于图片、视频、日志文件、备份数据等场景。另外,Minio使用go语言开发的,go语言天然就有跨平台特性和性能高的特点.
Minio的核心特性
- 高性能
MinIO 在标准硬件上可实现高达 183 GB/s 的读取速度和 171 GB/s 的写入速度,适合作为主存储层处理 Spark、TensorFlow 等高性能计算任务28。
- 兼容性
完全兼容 Amazon S3 API,支持无缝集成现有 S3 生态工具(如 AWS CLI、SDK),并支持 S3 Select 功能,可直接查询存储的数据18。
- 可扩展性
采用分布式架构,支持通过添加节点线性扩展存储容量和吞吐量,同时支持纠删码技术(Reed-Solomon Code),允许在丢失半数存储节点的情况下恢复数据34。
- 安全性
提供数据加密(传输加密和静态加密)、访问控制(IAM 策略、RBAC)及身份验证(JWT、OAuth2)功能,确保数据安全13。
- 轻量易用
单二进制文件部署,支持多种操作系统(Windows、Linux、macOS)和容器化平台(Docker、Kubernetes),并提供可视化控制台管理
与HDFS相比
HDFS 的局限:架构复杂、扩展性差、小文件性能差,逐渐被云原生方案取代。
MinIO 的崛起:凭借轻量、S3 兼容性和纠删码技术,成为现代分布式存储的首选。
混合架构趋势:部分企业将冷数据存至 HDFS,热数据迁移至 MinIO,兼顾成本与性能
企业通过冷热分层存储实现成本与性能的最优解:
- HDFS可以无缝衔接Hdoop生态,它可以保证保障冷数据低成本存储:借助 EC 和压缩技术,在 Hadoop 生态内完成批量分析。
- MinIO 赋能热数据高性能访问:通过云原生架构和 S3 协议兼容性,支撑实时业务需求。
此策略尤其适合需同时处理历史数据分析与实时交互的场景(如金融风控、物联网平台),是传统大数据架构向云原生演进的关键过渡方案。
SpringBoot使用minio实现文件上传下载
本代码以minio的单服务器模式举例,对于单服务器模式,如果上传大文件(>500M),为了提升稳定性和效率,文件的分片合并需要由我们自己来完成。
为什么单机模式下minio没有提供分片呢?
如果是分布式部署minio,minio会将数据分片,提升存储扩展性和并行处理能力,通过将数据分散到多个节点或磁盘,实现负载均衡和容错。而单机模式中,所有数据存储在单一服务器或本地磁盘上,无需跨节点协作,因此分片的分布式优势(如并行读写、冗余恢复)失去意义。
一、客户端分片生成(dataShard 的获取)
File file = new File("/path/to/large-file.zip");
long chunkSize = 5 * 1024 * 1024; // 5MB
long totalSize = file.length();
int totalShards = (int) Math.ceil((double) totalSize / chunkSize);List<byte[]> dataShard = new ArrayList<>();
try (InputStream is = new FileInputStream(file)) {for (int i = 0; i < totalShards; i++) {int bytesRead;byte[] buffer = new byte[(int) chunkSize];bytesRead = is.read(buffer);// 去除末尾空字节(最后一片可能不足 chunkSize)byte[] trimmedBuffer = Arrays.copyOf(buffer, bytesRead);dataShard.add(trimmedBuffer);}
}
二、分片上传流程
//1、初始化上传任务
// 创建上传 ID
String uploadId = minioClient.initMultipartUpload("my-bucket", "large-file.zip");
//2、并发上传分片
List<Part> parts = new ArrayList<>();
for (int i = 0; i < dataShard.size(); i++) {byte[] chunk = dataShard.get(i);int partNumber = i + 1; // 分片编号从 1 开始// 上传分片并获取 ETagString etag = minioClient.uploadPart("my-bucket", "large-file.zip", uploadId, partNumber,new ByteArrayInputStream(chunk), chunk.length);parts.add(new Part(partNumber, etag));
}
//3、合并分片
minioClient.completeMultipartUpload("my-bucket", "large-file.zip", uploadId, parts
);
分片唯一标识
- ETag 校验:每个分片上传后,MinIO 返回分片的唯一标识 ETag(即 MD5 哈希值)。
- 客户端缓存:需本地记录分片顺序和 ETag,用于断点续传和合并校验。
三、分片下载与组装
//1、直接下载完整文件
InputStream stream = minioClient.getObject(GetObjectArgs.builder().bucket("my-bucket").object("large-file.zip").build()
);
//2、断点续传下载
// 从第 5MB 开始下载剩余部分
GetObjectArgs args = GetObjectArgs.builder().bucket("my-bucket").object("large-file.zip").offset(5 * 1024 * 1024L).build();
InputStream stream = minioClient.getObject(args);
//3、客户端手动组装
#列出所有分片
ListPartsResponse partsResponse = minioClient.listParts("my-bucket", "large-file.zip", uploadId
);
List<Part> parts = partsResponse.parts();
#按顺序下载分片
List<byte[]> downloadedShards = new ArrayList<>();
for (Part part : parts) {InputStream stream = minioClient.getPart("my-bucket", "large-file.zip", uploadId, part.partNumber());downloadedShards.add(IOUtils.toByteArray(stream));
}
#本地合成分片
try (FileOutputStream fos = new FileOutputStream("merged-file.zip")) {for (byte[] shard : downloadedShards) {fos.write(shard);}
}
Minio上传下载文件的核心流程和代码执行路径
一、上传文件流程(PUT请求)
1、请求入口(HTTP路由)
- 触发代码
internal/api/router.go
- 路由
GET /{bucket}/{object...}
映射到objectHandlers.GetObjectHandler
- 路由
关键函数
// internal/api/object-handlers.go
func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request) {// 处理上传逻辑
}
2、认证与权限校验
- 触发代码:触发代码:
internal/auth/ 和中间件
- 请求会通过中间件链(如 internal/handler/auth-handler.go)进行身份验证(IAM、STS)和桶权限校验(Bucket Policy)。
- 校验客户端是否有 s3:PutObject 权限。
关键函数
// internal/handler/auth-handler.go
func AuthMiddleware(next http.Handler) http.Handler {// 校验请求签名和权限
}
3、数据分片读取与解码
- 触发代码:
internal/erasure/encode.go
-
如果是分布式模式(Erasure Coding),对象会被分片为 N+M 块(数据块 + 校验块)。
-
使用 Reed-Solomon 算法进行编码(依赖库 klauspost/reedsolomon)。
-
关键函数
// internal/erasure/encode.go
func (e *Erasure) Encode(ctx context.Context, data []byte) ([][]byte, error) {// 数据分片和纠删码编码
}
4、数据持久化到磁盘
- 触发代码:
internal/storage/posix.go
- 将分片后的数据块写入磁盘(每个分片存储为单独的文件)。
- 路径格式:
<disk>/<bucket>/<object>/part.<shard-id>
。
关键函数
// internal/storage/posix.go
func (s *posix) WriteFile(ctx context.Context, volume, path string, buf []byte) error {// 写入分片文件
}
5、元数据更新
- 触发代码:
internal/metacache/
- 更新对象的元数据(如 xl.meta 文件),记录分片位置、哈希值、时间戳等。
- 元数据存储在
.minio.sys/buckets/<bucket>/<object>/xl.meta
。
关键代码
// internal/metacache/metacache.go
func (m *metaCache) UpdateMetadata(oi ObjectInfo) error {// 更新元数据
}
6、响应客户端
- 触发代码:
internal/api/object-handlers.go
- 返回 HTTP 200 OK 或错误码(如 403 Forbidden 或 500 Internal Server Error)。
二、下载文件(GET 请求)流程
1. 请求入口(HTTP 路由)
- 触发代码:internal/api/router.go
- 路由 GET /{bucket}/{object…} 映射到 objectHandlers.GetObjectHandler。
关键函数:
- 路由 GET /{bucket}/{object…} 映射到 objectHandlers.GetObjectHandler。
// internal/api/object-handlers.go
func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Request) {// 处理下载逻辑
}
2. 认证与权限校验
- 触发代码:同上传流程,校验
s3:GetObject
权限。
3. 数据分片读取与解码
- 触发代码:
internal/erasure/decode.go
- 从多个磁盘读取分片数据(至少需要 N 个分片才能解码)。
- 如果某些分片损坏或丢失,通过纠删码算法恢复数据。
关键函数:
// internal/erasure/decode.go
func (e *Erasure) Decode(ctx context.Context, shards [][]byte) ([]byte, error) {// 解码并合并数据
}
4. Bitrot 校验(可选)
- 触发代码:
internal/hashicorp/bitrot.go
- 校验数据块的哈希值,检测磁盘静默错误(Bitrot)。
- 若数据损坏,触发自动修复(通过后台 heal 服务)。
关键函数:
// internal/hashicorp/bitrot.go
func VerifyShard(algorithm BitrotAlgorithm, checksum []byte, data []byte) bool {// 校验数据完整性
}
5、数据流式返回
- 触发代码:
internal/http/response-writer.go
- 将解码后的数据通过 HTTP 分块传输(Chunked Encoding)流式返回。
关键逻辑
// internal/http/response-writer.go
func (w *responseWriter) Write(p []byte) (int, error) {// 流式写入响应体
}
三、关键设计细节
1. 并发控制
- 上传:通过
internal/sync/
中的互斥锁(Mutex)或分布式锁(Dsync)保证并发写入的一致性。 - 下载:无锁设计,允许多个客户端并发读取。
2. 错误处理
- 上传失败:若部分分片写入失败,客户端会收到错误码,未完成的分片会被垃圾回收(
internal/background/gc.go
)。 - 下载失败:若无法读取足够分片(如节点宕机),返回 404 Not Found 或 503 Service Unavailable。
3. 分布式模式下的流程
- 节点选择:使用一致性哈希算法选择存储节点。
- 数据修复:后台服务
internal/background/heal.go
自动检测并修复丢失的分片。
源码分析
在Minio中发起上传请求时,核心流程是数据的分片和编码、分布式写入部分。
一、分片与编码阶段
1、纠删码分片逻辑
MinIO 使用 Reed-Solomon 算法将原始数据切分为 数据块(Data Shards) 和 校验块(Parity Shards)。例如,若配置为 4+2 纠删码,一个 100MB 的文件会被切分为 4 个 25MB 的数据块,并生成 2 个 25MB 的校验块。
代码实现:核心逻辑位于 cmd/erasure-code.go
的 Erasure.Encode()
方法,通过 splitData 方法分片,再调用纠删码编码器生成校验块。
//数据通过 EncodeData() 方法被分片为数据块和校验块,调用 Reed-Solomon 算法完成编码
func (e *Encoder) EncodeData(data []byte) ([][]byte, error) {shards, err := e.Split(data) // 数据分片if err != nil {return nil, err}return e.Encode(shards) // 生成校验块
}
2、哈希定位存储节点
分片后的每个块通过 一致性哈希算法 确定目标节点和磁盘。例如,使用 hashKey() 函数(位于 cmd/hasher.go)对对象键(Key)进行哈希计算,再通过取模运算分配到具体磁盘。
//每个分片通过一致性哈希算法(hashKey())确定存储位置,源码位于 cmd/hasher.go
func hashKey(key string, totalDisks int) int {hash := fnv.New32a()hash.Write([]byte(key))return int(hash.Sum32()) % totalDisks
}
示例:若集群有 6 个节点,哈希值对 6 取模后,确定数据块应写入的节点索引。
并行写入阶段
二、多线程并行传输
1、客户端或服务端通过 并发协程(Goroutine) 将分片后的数据块和校验块同时发送到目标节点。例如,4 个数据块和 2 个校验块会被 6 个独立的线程并行传输。
性能优化:通过 Go 语言的并发模型,充分利用网络带宽和磁盘 I/O,避免单线程瓶颈。
//使用 Go 的 errgroup 包启动多个协程,每个协程负责一个分片的传输,并行写入不同节点
g := new(errgroup.Group)
for _, shard := range shards {g.Go(func() error {return node.Write(shard) // 并行写入分片})
}
if err := g.Wait(); err != nil {// 错误处理
}
2、分布式存储协议
节点间通过 HTTP/HTTPS 协议 进行通信,每个分片作为独立对象写入目标节点的磁盘。写入时遵循以下步骤:
- 临时目录写入:分片先被写入节点的临时目录(如 tmp),完成校验后通过原子操作重命名为正式存储路径。
- 元数据一致性:每个分片的元数据(如哈希值、分片索引)同步记录到所有相关节点,确保后续读取时能快速定位。
分片先写入临时目录(如 .minio/tmp),通过文件系统级 rename 操作原子提交,避免写入过程中断导致数据不一致
3、冗余与一致性校验
- 冗余写入:每个分片根据纠删码规则分布在多个节点,例如 4+2 配置下,每个数据块至少存在于 2 个节点(通过哈希副本策略)。
- 校验机制:写入完成后,MinIO 会验证所有分片的哈希值(如 SHA-256),确保数据完整性。若某节点写入失败,立即触发重试或切换备用节点。
总结
MinIO 的并行写入通过 分片编码、一致性哈希定位、协程并发传输 实现,其源码设计特点包括:
- 高效并发:利用 Go 协程和 errgroup 实现无锁并行;
- 原子性保障:临时文件 + rename 操作确保写入完整性;
- 智能容错:动态重试、负载均衡与自动修复机制。
这种设计使其在分布式场景下能充分发挥硬件性能,同时保障数据可靠性。
个人实现
我对要MinIO 并行写入的核心逻辑(分片、哈希定位、多节点并发写入),使用Java进行了简单实现。以下是关键实现步骤及代码示例
一、核心架构设计
1. 模块划分
// 模块职责说明
public class MinIOParallelWriter {// 分片编码器(模拟纠删码)private ErasureEncoder encoder; // 哈希定位器private NodeHasher nodeHasher;// 线程池管理并发写入private ExecutorService executor;
}
二、分片与编码实现
1. 数据分片(模拟纠删码)
public class ErasureEncoder {// 数据分片(示例:4+2 配置)public List<byte[]> splitAndEncode(byte[] data) {int dataShards = 4, parityShards = 2;List<byte[]> shards = new ArrayList<>();// 模拟分片(实际需实现 Reed-Solomon 算法)for (int i = 0; i < dataShards + parityShards; i++) {shards.add(Arrays.copyOfRange(data, i * shardSize, (i+1)*shardSize));}return shards;}
}
三、并发写入实现
- 哈希定位存储节点
public class NodeHasher {// 一致性哈希计算(类似 MinIO 的 hashKey 逻辑)public int hashKey(String objectKey, int nodeCount) {return objectKey.hashCode() % nodeCount;}
}
- 多线程写入控制
public class MinIOParallelWriter {public void writeParallel(byte[] data, String objectKey) {List<byte[]> shards = encoder.splitAndEncode(data);List<CompletableFuture<Void>> futures = new ArrayList<>();for (int i = 0; i < shards.size(); i++) {int nodeIndex = nodeHasher.hashKey(objectKey + i, nodeCount);futures.add(CompletableFuture.runAsync(() -> {uploadShardToNode(shards.get(i), nodeIndex, objectKey);}, executor));}CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();}private void uploadShardToNode(byte[] shard, int nodeIndex, String objectKey) {// 调用 MinIO SDK 写入特定节点(需自定义节点路由)minioClient.putObject(PutObjectArgs.builder().bucket("bucket").object(objectKey + "_shard_" + nodeIndex).stream(new ByteArrayInputStream(shard), shard.length, -1).build());}
}
关键点:
- 使用 CompletableFuture 实现非阻塞并发
- 通过 objectKey + “shard” + nodeIndex 模拟分片存储路径
四、优化方向
1. 动态节点发现
// 结合服务发现机制(如 Consul)
public class DynamicNodeManager {public List<String> getActiveNodes() {// 从注册中心获取在线节点}
}
- 原子性写入保障
// 写入临时路径后原子重命名
private void atomicCommit(String tempPath, String finalPath) {Path source = Paths.get(tempPath);Files.move(source, source.resolveSibling(finalPath), StandardCopyOption.ATOMIC_MOVE);
}
- 故障重试机制
// 指数退避重试(参考 Resilience4j)
RetryConfig config = RetryConfig.custom().maxAttempts(3).waitDuration(Duration.ofMillis(500)).build();
Retry retry = Retry.of("uploadRetry", config);