milvus insert数据在s3的存储

insert数据在s3的存储

对segment进行flush操作,会将数据持久化至s3对象存储。

相关核心代码位置:

ibNode.flushManager.flushBufferData()

主要代码在flushBufferData()函数。

代码位置:internal\datanode\flush_manager.go

// flushBufferData notifies flush manager insert buffer data.
// This method will be retired on errors. Final errors will be propagated upstream and logged.
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, dropped bool, pos *msgpb.MsgPosition) (*storage.PrimaryKeyStats, error) {field2Insert := make(map[UniqueID]*datapb.Binlog)field2Stats := make(map[UniqueID]*datapb.Binlog)kvs := make(map[string][]byte)tr := timerecord.NewTimeRecorder("flushDuration")// get segment infocollID, partID, meta, err := m.getSegmentMeta(segmentID, pos)if err != nil {return nil, err}inCodec := storage.NewInsertCodecWithSchema(meta)// build bin log blobbinLogBlobs, fieldMemorySize, err := m.serializeBinLog(segmentID, partID, data, inCodec)if err != nil {return nil, err}// build stats log blobpkStatsBlob, stats, err := m.serializePkStatsLog(segmentID, flushed, data, inCodec)if err != nil {return nil, err}// allocate// alloc for stats log if have new stats log and not flushingvar logidx int64allocNum := uint32(len(binLogBlobs) + boolToInt(!flushed && pkStatsBlob != nil))if allocNum != 0 {logidx, _, err = m.Alloc(allocNum)if err != nil {return nil, err}}// binlogsfor _, blob := range binLogBlobs {fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64)if err != nil {log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))return nil, err}k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx)// [rootPath]/[insert_log]/keykey := path.Join(m.ChunkManager.RootPath(), common.SegmentInsertLogPath, k)kvs[key] = blob.Value[:]field2Insert[fieldID] = &datapb.Binlog{EntriesNum:    data.size,TimestampFrom: data.tsFrom,TimestampTo:   data.tsTo,LogPath:       key,LogSize:       int64(fieldMemorySize[fieldID]),}logidx += 1}// pk stats binlogif pkStatsBlob != nil {fieldID, err := strconv.ParseInt(pkStatsBlob.GetKey(), 10, 64)if err != nil {log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err))return nil, err}// use storage.FlushedStatsLogIdx as logidx if flushed// else use last idx we allocatedvar key stringif flushed {k := metautil.JoinIDPath(collID, partID, segmentID, fieldID)key = path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k, storage.CompoundStatsType.LogIdx())} else {k := metautil.JoinIDPath(collID, partID, segmentID, fieldID, logidx)key = path.Join(m.ChunkManager.RootPath(), common.SegmentStatslogPath, k)}kvs[key] = pkStatsBlob.Valuefield2Stats[fieldID] = &datapb.Binlog{EntriesNum:    0,TimestampFrom: 0, // TODOTimestampTo:   0, // TODO,LogPath:       key,LogSize:       int64(len(pkStatsBlob.Value)),}}m.handleInsertTask(segmentID, &flushBufferInsertTask{ChunkManager: m.ChunkManager,data:         kvs,}, field2Insert, field2Stats, flushed, dropped, pos)metrics.DataNodeEncodeBufferLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(tr.ElapseSpan().Milliseconds()))return stats, nil
}

BufferData数据结构

// BufferData buffers insert data, monitoring buffer size and limit
// size and limit both indicate numOfRows
type BufferData struct {buffer   *InsertDatasize     int64limit    int64tsFrom   TimestamptsTo     TimestampstartPos *msgpb.MsgPositionendPos   *msgpb.MsgPosition
}// InsertData example row_schema: {float_field, int_field, float_vector_field, string_field}
// Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>}
//
// system filed id:
// 0: unique row id
// 1: timestamp
// 100: first user field id
// 101: second user field id
// 102: ...
type InsertData struct {// TODO, data should be zero copy by passing data directly to event reader or change Data to map[FieldID]FieldDataArrayData  map[FieldID]FieldData // field id to field dataInfos []BlobInfo
}// TODO: fill it
// info for each blob
type BlobInfo struct {Length int
}

FieldData是一个接口,有13种实现:

  • FloatVectorFieldData
  • Float16VectorFieldData
  • BinaryVectorFieldData
  • ArrayFieldData
  • BoolFieldData
  • DoubleFieldData
  • FloatFieldData
  • Int16FieldData
  • Int32FieldData
  • Int64FieldData
  • Int8FieldData
  • JSONFieldData
  • stringFieldData

BufferData数据示例

在这里插入图片描述

前面的数字01234是FieldID,value是列数据。

这里可以发现collection只有3列数据,这里有5列,多了FieldID为0和1的列。

FieldID为0的是行id。FieldID为1的是时间戳。

在这里插入图片描述

在这里插入图片描述

序列化

前面的BufferData的数据不会直接存储进s3,而是先序列化后再存储到s3。

inCodec := storage.NewInsertCodecWithSchema(meta)
// build bin log blob
binLogBlobs, fieldMemorySize, err := m.serializeBinLog(segmentID, partID, data, inCodec)

inCodec是InsertCodec类型,InsertCodec提供了数据的序列化(Serialize函数)和反序列化(Deserialize函数)。

binLogBlobs是[]*Blob。

// Blob is a pack of key&value
type Blob struct {Key    stringValue  []byteSize   int64RowNum int64
}

在这里插入图片描述

Value为byte数组。

kvs

kvs := make(map[string][]byte)

遍历binLogBlobs数组,填充kvs。

向量数据在s3的存储路径:

分为insert_log和stats_log。stats_log存储的是主键状态。

files/insert_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/1(flushed)
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}(not flushed)

logidx每次+1。

在这里插入图片描述

kvs的key为s3的路径,values为数据,按列写入s3。

例:

files/stats_log/447847714614087578/447847714614087579/447847714614287586/100为路径,

447847714613886995为文件名。

flushBufferInsertTask

m.handleInsertTask(segmentID, &flushBufferInsertTask{ChunkManager: m.ChunkManager,data:         kvs,
}, field2Insert, field2Stats, flushed, dropped, pos)

flushInsertData

遍历数组写入s3。

// flushInsertData implements flushInsertTask
func (t *flushBufferInsertTask) flushInsertData() error {ctx, cancel := context.WithCancel(context.Background())defer cancel()if t.ChunkManager != nil && len(t.data) > 0 {tr := timerecord.NewTimeRecorder("insertData")group, ctx := errgroup.WithContext(ctx)for key, data := range t.data {key := keydata := datagroup.Go(func() error {return t.Write(ctx, key, data)})}err := group.Wait()metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))if err == nil {for _, d := range t.data {metrics.DataNodeFlushedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Add(float64(len(d)))}}return err}return nil
}

t.Write

t.Write()的实现为storage.MinioChunkManager

// Write writes the data to minio storage.
func (mcm *MinioChunkManager) Write(ctx context.Context, filePath string, content []byte) error {_, err := mcm.putMinioObject(ctx, mcm.bucketName, filePath, bytes.NewReader(content), int64(len(content)), minio.PutObjectOptions{})if err != nil {log.Warn("failed to put object", zap.String("bucket", mcm.bucketName), zap.String("path", filePath), zap.Error(err))return err}metrics.PersistentDataKvSize.WithLabelValues(metrics.DataPutLabel).Observe(float64(len(content)))return nil
}

总结

insert按列进行数据序列化后分别写入s3,一个列一个文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/260363.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Docker】Docker存储卷

文章目录 一、什么是存储卷二、为什么需要存储卷三、存储卷分类四、管理卷Volume创建卷方式一&#xff1a;Volume 命令操作方式二&#xff1a;-v 或者--mount 指定方式三&#xff1a;Dockerfile 匿名卷 操作案例Docker 命令创建管理卷Docker -v 创建管理卷Docker mount 创建管理…

java中容易被忽视的toString()方法

之前一直认为toString就是将数据转换成字符类型&#xff0c;直到最近写出了一个bug才对toString有了新的认识 不同数据类型&#xff0c;toString() 有不同的操作 定义一个student类&#xff0c;包含姓名 String类型、性别 String类型、年龄 int 类型、分数列表 String类型的li…

适合tiktok运营的云手机需要满足什么条件?

TikTok作为一款全球热门的社交媒体平台&#xff0c;具有无限的市场潜力。然而&#xff0c;卖家在运营过程中常常会面临到视频0播、账号被降权、限流等问题&#xff0c;甚至可能因为多人同时使用一个IP而导致封号的风险。为了规避这些问题&#xff0c;越来越多的卖家将目光投向了…

论UI的糟糕设计:以百度网盘为例

上面这一排鼠标一经过就会弹出来&#xff08;不是点才弹出来&#xff09;&#xff0c;然后挡住你的各种操作&#xff0c; 弹出来时你就必须等它消失&#xff0c;卡一下才能操作。 在用户顺畅地操作内容时&#xff0c;经常就卡一下、卡一下、卡一下…… 1、比如鼠标从下到上&am…

《基于CEEMDAN一小波包自适应阈值混凝土声发射信号降噪研究》算法思路笔记

![1]杨智中,林军志,汪魁等.基于CEEMDAN-小波包自适应阈值混凝土声发射信号降噪研究[J].振动与冲击,2023,42(03):139-149.DOI:10.13465/j.cnki.jvs.2023.03.016.](https://img-blog.csdnimg.cn/direct/9814ff64cc474cd3aa06ecaea60f2f75.png) 首先对周期循环荷载作用下混凝土试…

【RPG Maker MV 仿新仙剑 战斗场景UI (二)】

RPG Maker MV 仿新仙剑 战斗场景UI 二 战斗指令菜单原仙剑战斗指令图RMMV战斗指令对应代码战斗指令菜单代码效果 战斗指令菜单 原仙剑战斗指令菜单是使用方向键控制&#xff0c;同时按照使用情况正好对应四个指令和四个方向&#xff0c;同时没有选中的菜单用黑色透明图片覆盖&…

App启动优化笔记 1

app大致的启动流程。有Launcher进程,system_server进程,zygote进程,APP进程。 Launcher进程:启动activity来启动应用 system_server进程:(ams是其中的一个binder):发送一个socket消息给Zygote。 zygote进程:收到消息后,fork新的进程,---》app进程启动 APP进程:…

国际语言代码 Language Code 对照表速查

前言 语言代码是英国教育社会学家伯恩斯坦的术语。指在一定的语言集团中&#xff0c;特定的人群在特定的社会环境下使用的特定的言语。分为限定代码&#xff08;restricted code&#xff09;和精制代码&#xff08;elaborated code&#xff09;。语言代码是由字母或数字组成的…

STM32引脚重定义问题

最近在搞资源管理&#xff0c;发现有些引脚不能用 比如这个PE引脚。我想用他输出PWM&#xff0c;但是不能用&#xff0c;我也重定义了&#xff0c;还是不能用。回去翻看了技术手册。 RCC_APB2PeriphClockCmd(RCC_APB2Periph_AFIO, ENABLE); //重映射引脚功能&#xff0c;需…

MB-106UP——进口抛光树脂的技术优势

超纯水的制备和稳定性一直是相关领域极为重视的&#xff0c;那么超纯水中常会用到的抛光树脂技术&#xff0c;进口和国产对比起来究竟谁更甚一筹呢&#xff1f;接下来为大家分享的技术就是超纯水制备中常会用到的进口品牌&#xff1a;美国Tulsimer杜笙树脂中抛光树脂MB-106UP的…

CORS就是跨域吗?

首先&#xff0c;跨域的域是什么&#xff1f; 跨域的英文是&#xff1a;Cross-Origin。 Origin 中文含义为&#xff1a;起源&#xff0c;源头&#xff0c;出生地。 在跨域中&#xff0c;"域"指的是一个 Web 资源&#xff08;比如网页、脚本、图片等&#xff09;的…

压缩感知(Compressed Sensing,CS)的基础知识

压缩感知&#xff08;Compressed Sensing&#xff0c;CS&#xff09;是一种用于信号处理的技术&#xff0c;旨在以少于奈奎斯特采样定理所要求的样本频率来重构信号。该技术利用信号的稀疏性&#xff0c;即信号可以用较少的非零系数表示。压缩感知在图像获取中的应用使得在采集…

Kubernetes概述

目录 1.K8S 是什么 2.为什么要用 K8S Kubernetes 主要功能如下&#xff1a; 3.Kubernetes 集群架构与组件 Master 组件 Kube-apiserver Kube-controller-manager Kube-scheduler 配置存储中心 etcd Node 组件 Kubelet Kube-Proxy docker 或 rocket 4.Kubernete…

css2背景

css2背景 一.背景颜色二.背景图片三.背景平铺四.背景图片位置五.背景图像固定六.复合型写法七.背景颜色半透明八.总结 一.背景颜色 默认是transparent(透明&#xff09; 二.背景图片 默认是none 三.背景平铺 默认是background-repeat(平铺&#xff09; 四.背景图片位置…

Vue中$root的使用方法

查看本专栏目录 关于作者 还是大剑师兰特&#xff1a;曾是美国某知名大学计算机专业研究生&#xff0c;现为航空航海领域高级前端工程师&#xff1b;CSDN知名博主&#xff0c;GIS领域优质创作者&#xff0c;深耕openlayers、leaflet、mapbox、cesium&#xff0c;canvas&#x…

力扣题目训练(17)

2024年2月10日力扣题目训练 2024年2月10日力扣题目训练551. 学生出勤记录 I557. 反转字符串中的单词 III559. N 叉树的最大深度241. 为运算表达式设计优先级260. 只出现一次的数字 III126. 单词接龙 II 2024年2月10日力扣题目训练 2024年2月10日第十七天编程训练&#xff0c;今…

ai数字仿真辩论主持人提升用户体验

Ai虚拟主持人是元宇宙和AI人工智能技术在播音主持行业的重要应用&#xff0c;AI虚拟主持人能极大提升新闻资讯内容的精准度&#xff0c;改变单一的播报形式。 首先&#xff0c;AI虚拟主持人极大地提升了节目的制作效率和灵活性。传统主持人需要花费大量时间进行彩排和录制&…

照片去除多余人物的方法分享之三分钟教你怎么去除

在拍摄照片时&#xff0c;有时候会遇到照片中有多余的人物&#xff0c;这会影响照片的美观度和主题表达。去除照片中多余的人物&#xff0c;需要采用一些技巧和方法。本文将介绍几种常用的去除照片中多余人物的方法。 一、使用水印云软件去除多余人物 水印云是一款功能强大的图…

ChatGPT的大致原理

国外有个博主写了一篇博文&#xff0c;名字叫TChatGPT: Explained to KidsQ」&#xff0c; 直译过来就是&#xff0c;给小孩子解释什么是ChatGPT。 因为现实是很多的小孩子已经可以用父母的手机版ChatGPT玩了 &#xff0c;ChatGPT几乎可以算得上无所不知&#xff0c;起码给小孩…

linux ext3/ext4文件系统(part2 jbd2)

概述 jbd2&#xff08;journal block device 2&#xff09;是为块存储设计的 wal 机制&#xff0c;它为要写设备的buffer绑定了一个journal_head&#xff0c;这个journal_head与一个transaction绑定&#xff0c;随着事务状态的转移&#xff08;运行&#xff0c;生成日志&#…