MongoDB change stream实战

什么是 Chang Stream

Change Stream指数据的变化事件流,MongoDB从3.6版本开始提供订阅数据变更的功能。

Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:

Change Stream 的实现原理

Change Stream 是基于 oplog 实现的,提供推送实时增量的推送功能。它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。

被追踪的变更事件主要包括:

  • insert/update/delete:插入、更新、删除;
  • drop:集合被删除;
  • rename:集合被重命名;
  • dropDatabase:数据库被删除;
  • invalidate:drop/rename/dropDatabase 将导致 invalidate 被触发, 并关闭 change stream;

如果只对某些类型的变更事件感兴趣,可以使用使用聚合管道的过滤步骤过滤事件:

var cs = db.user.watch([{$match:{operationType:{$in:["insert","delete"]}}}])

Change Stream会采用 "readConcern:majority"这样的一致性级别,保证写入的变更不会被回滚。因此:

  • 未开启 majority readConcern 的集群无法使用 Change Stream;
  • 当集群无法满足 {w: “majority”} 时,不会触发 Change Stream(例如 PSA 架构 中的 S 因故障宕机)。

MongoShell测试

窗口1

db.user.watch([],{maxAwaitTimeMS:1000000}).pretty()

窗口2

db.user.insert({name:"xxxx"})

变更事件字段说明

Change Stream 故障恢复

假设在一系列写入操作的过程中,订阅 Change Stream 的应用在接收到“写3”之后 于 t0 时刻崩溃,重启后后续的变更怎么办?

想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的 _id 即可。 Change Stream 回调所返回的的数据带有 _id,这个 _id 可以用于断点恢复。例如:

var cs = db.collection.watch([], {resumeAfter: <_id>})

即可从上一条通知中断处继续获取后续的变更通知。

使用场景

  • 跨集群的变更复制——在源集群中订阅 Change Stream,一旦得到任何变更立即写 入目标集群。
  • 微服务联动——当一个微服务变更数据库时,其他微服务得到通知并做出相应的变更。
  • 其他任何需要系统联动的场景。

案例 1.监控

用户需要及时获取变更信息(例如账户相关的表),ChangeStreams 可以提供监控功能,一旦相关的表信息发生变更,就会将变更的消息实时推送出去。

案例 2.分析平台

例如需要基于增量去分析用户的一些行为,可以基于 ChangeStreams 把数据拉出来,推到下游的计算平台, 比如 类似 Flink、Spark 等计算平台等等。

案例 3.数据同步

基于 ChangeStreams,用户可以搭建额外的 MongoDB 集群,这个集群是从原端的 MongoDB 拉取过来的, 那么这个集群可以做一个热备份,假如源端集群发生 网络不通等等之类的变故,备集群就可以接管服务。 还可以做一个冷备份,如用户基于 ChangeStreams 把数据同步到文件,万一源端数据库发生不可服务, 就可以从文件里恢复出完整的 MongoDB 数据库, 继续提供服务。(当然,此处还需要借助定期全量备份来一同完成恢复) 另外数据同步它不仅仅局限于同一地域,可以跨地域,从北京到上海甚至从中国到美国等等。

案例 4.消息推送

假如用户想实时了解公交车的信息,那么公交车的位置每次变动,都实时推送变更的信息给想了解的用 户,用户能够实时收到公交车变更的数据,非常便捷实用。

注意事项

  • Change Stream 依赖于 oplog,因此中断时间不可超过 oplog 回收的最大时间窗;
  • 在执行 update 操作时,如果只更新了部分数据,那么 Change Stream 通知的也是增量部分;
  • 删除数据时通知的仅是删除数据的 _id。

Chang Stream整合Spring Boot

引入依赖

<!--spring data mongodb--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency>

配置yml

spring:data:mongodb:uri: mongodb://fox:fox@192.168.65.174:28017,192.168.65.174:28018,192.168.65.174:28019/test?authSource=admin&replicaSet=rs0

配置 mongo监听器的容器MessageListenerContainer,spring启动时会自动启动监听的任务用于接收changestream

@Configurationpublic class MongodbConfig {@BeanMessageListenerContainer messageListenerContainer(MongoTemplate template, DocumentMessageListener documentMessageListener) {Executor executor = Executors.newFixedThreadPool(5);MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, executor) {@Overridepublic boolean isAutoStartup() {return true;}};ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(documentMessageListener).collection("user")  //需要监听的集合名//过滤需要监听的操作类型,可以根据需求指定过滤条件.filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert", "update", "delete"))))//不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息.fullDocumentLookup(FullDocument.UPDATE_LOOKUP).build();messageListenerContainer.register(request, Document.class);return messageListenerContainer;}}

配置mongo监听器,用于接收数据库的变更信息

@Componentpublic class DocumentMessageListener<S, T> implements MessageListener<S, T> {@Overridepublic void onMessage(Message<S, T> message) {System.out.println(String.format("Received Message in collection %s.\n\trawsource: %s\n\tconverted: %s",message.getProperties().getCollectionName(), message.getRaw(), message.getBody()));}}

测试

mongo shell插入一条文档

控制台输出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/485940.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux其二设置端口号,静态ip以及命令

目录 1、VI编辑器 【linux版本的文本文件】 2&#xff09; 补充的vi编辑器的其他内容(了解) 2、ln 连接的意思 link的缩写 3、文件的查看 【重点】 4、压缩与解压&#xff08;重点&#xff09; 5、find 查找命令 6、which & whereis 作用是一样的&#xff0c;表示某…

MetaGPT 安装

1. 创建环境 conda create -n metagpt python3.10 && conda activate metagpt2. 可编辑方式安装 git clone --depth 1 https://github.com/geekan/MetaGPT.git cd MetaGPT pip install -e .3. 配置 metagpt --init-config运行命令&#xff0c;在C盘位置C:\Users\325…

WEB开发: Node.js路由之由浅入深(一) - 全栈工程师入门

作为一个使用Node.js多年的开发者&#xff0c;我已经习惯于用Node.js写一些web应用来为工作服务&#xff0c;因为实现快速、部署简单、自定义强。今天我们一起来学习一个全栈工程师必备技能&#xff1a;web路由。&#xff08;观看此文的前提是默认你已经装好nonde.js了&#xf…

【后端面试总结】Redis字符串实现原理

字符串是我们平时接触频率最高的一个基础类型&#xff0c;但就是这么一个平平无奇的基本类型&#xff0c;在Redis里面也是经历了各种各样的优化&#xff0c;来优化它对内存的占用&#xff0c;了解这部分内容&#xff0c;与其说是“学习Redis”&#xff0c;不如说是“向Redis学习…

GitToolBox插件:让IntelliJ IDEA的Git操作如虎添翼

GitToolBox插件介绍 GitToolBox是一款针对IntelliJ IDEA的插件&#xff0c;旨在增强IDE内置的Git功能&#xff0c;使Git操作更加便捷和高效。无论是单独开发者还是团队中的一员&#xff0c;这个插件都能帮助更好地管理代码和协作流程。 功能特点 分支管理&#xff1a;GitToolBo…

Vulhub:Shiro[漏洞复现]

目录 CVE-2010-3863(Shiro未授权) 使用浏览器访问靶场主页面 使用Yakit进行抓包 使用ffuf对靶机8080端口进行根路径FUZZ CVE-2016-4437(Shiro-550) 使用浏览器访问靶场主页面 使用Yakit进行抓包 使用Yakit反连中自带的Yso-Java Hack进行漏洞利用 首先运行脚本生成一个…

Netty 框架——TCP 粘包和拆包

Netty 框架——TCP 粘包和拆包 1. 产生的原因 在 TCP 协议中&#xff0c;发送端为了提高网络传输的效率&#xff0c;通常会使用优化算法&#xff0c;如 Nagle 算法&#xff0c;将多个小的数据包合并成一个较大的数据块一起发送。这是因为频繁的小数据包传输可能会导致效率低下…

SQL靶场第九关攻略

我们的第九关需要用到时间盲注 使用条件&#xff1a;完全没有变化的页面 我们在了解一下时间盲注和布尔盲注的区别&#xff0c;时间盲注比布尔盲注多了一个if判断加上sleep()函数的运用 if(a,b,c) if判断句&#xff0c;a为条件&#xff0c;b、c为执行语句&#xff1b;如果a为…

STM32一keil5更换芯片后报错问题的解决。

目录 一、STM32型号认识二、报错问题三、常用的启动配置文件四、问题解决 一、STM32型号认识 二、报错问题 当我们在原来工程下修改芯片时&#xff0c;原本可以编译通过的代码突然很多报错。如下所示&#xff0c;这是因为我们的启动文件配置错误。对于不同型号的芯片其flash容量…

STM32 自学笔记

摘抄于大学期间记录在QQ空间的一篇自学笔记&#xff0c;当前清理空间&#xff0c;本来想直接删除掉的&#xff0c;但是感觉有些舍不得&#xff0c;因此先搬移过来。 RAM vs ROM vs FLASH 2013-09-05记录&#xff0c;ROM和RAM指的都是半导体存储器&#xff0c;ROM是Read Only …

深入解析 HTML Input 元素:构建交互性表单的核心

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

MBox20边缘计算网关:氢能车间数据采集的智慧引擎

氢能作为未来能源体系的重要组成部分&#xff0c;其安全、高效、环保的特性备受瞩目。在氢能车间的日常运营中&#xff0c;数据采集是确保生产流程优化、设备稳定运行及能效提升的关键环节。然而&#xff0c;面对氢能车间复杂多变的生产环境和海量数据&#xff0c;如何实现高效…

敏捷开发之路

1. 引言 最近有个企业软件开发项目&#xff0c;用户要求采用敏捷开发的方法实施项目。以前也参加过敏捷方法的培训&#xff0c;结合最近找的敏捷开发材料&#xff0c;形成了下面的敏捷实施过程内容。 以下采用了QAD量化敏捷开发方法&#xff0c;关于此方法详细参考内容见最后…

threejs相机辅助对象cameraHelper

为指定相机创建一个辅助对象&#xff0c;显示这个相机的视锥。 想要在场景里面显示相机的视锥&#xff0c;需要创建两个相机。 举个例子&#xff0c;场景中有个相机A&#xff0c;想要显示相机A的视锥&#xff0c;那么需要一个相机B&#xff0c;把B放在A的后面&#xff0c;两个…

Milvus向量数据库03-搜索理论

Milvus向量数据库03-搜索理论 1-ANN搜索 通过 k-最近邻&#xff08;kNN&#xff09;搜索可以找到一个查询向量的 k 个最近向量。kNN 算法将查询向量与向量空间中的每个向量进行比较&#xff0c;直到出现 k 个完全匹配的结果。尽管 kNN 搜索可以确保准确性&#xff0c;但十分耗…

解决git did not exit cleanly (exit code 128)问题

解决 git did not exit cleanly &#xff08;exit code 128&#xff09;问题 1、错误描述2、解决方法2.1 方法一2.2 方法二 1、错误描述 使用TortoiseGit进行操作时&#xff0c;总是提示下述错误。 2、解决方法 2.1 方法一 打开 TortoiseGit -> Settings 点击 Network&…

唇形同步视频生成工具:Wav2Lip

一、模型介绍 今天介绍一个唇形同步的工具-Wav2Lip&#xff1b;Wav2Lip是一种用于生成唇形同步&#xff08;lip-sync&#xff09;视频的深度学习算法&#xff0c;它能够根据输入的音频流自动为给定的人脸视频添加准确的口型动作。 &#xff08;Paper&#xff09; Wav2Lip模型…

ubuntu下Qt5自动编译配置QtMqtt环境(10)

文章目录 [toc]1、概述2、下载QtMqtt源码3、编译4、验证5、参考6、视频 更多精彩内容&#x1f449;内容导航 &#x1f448;&#x1f449;Qt网络编程 &#x1f448; 1、概述 Qt默认是不包含mqtt库的&#xff0c;如果需要使用到mqtt库就只能自己编译配置&#xff1b; 网络所有的…

verilog编程规范

verilog编程规范 文章目录 verilog编程规范前言一、代码划分二、verilog编码ABCDEFG 前言 高内聚&#xff0c;低耦合&#xff0c;干净清爽的代码 一、代码划分 高内聚&#xff1a; 一个功能一个模块干净的接口提取公共的代码 低耦合&#xff1a; 模块之间低耦合尽量用少量…

使用VScode 和 Keil搭建STM32的开发环境

目录 概述 1 Keil工具 1.1 Keil工具介绍 1.2 Keil 下载 1.3 安装Keil 1.4 Keil软件测试 2 VSCode软件 2.1 VSCode介绍 2.2 VSCode下载 2.3 安装VSCode 3 搭建STM32集成开发环境 3.1 安装Keil插件 3.2 参数配置 3.3 测试 4 配置头文件路径 4.1 参数配置 4.2 测…