MongoDB change stream 详解

文章目录

    • 什么是 Chang Streams
    • 实现原理
    • 故障恢复
    • 使用场景
    • Spring Boot整合Chang Stream

什么是 Chang Streams

Change Stream指数据的变化事件流,MongoDB从3.6版本开始提供订阅数据变更的功能。

Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:

在这里插入图片描述



实现原理

**Change Stream 是基于 oplog 实现的,提供推送实时增量的推送功能。**它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。

被追踪的变更事件主要包括:

  • insert/update/delete:插入、更新、删除;
  • drop:集合被删除;
  • rename:集合被重命名;
  • dropDatabase:数据库被删除;
  • invalidate:drop/rename/dropDatabase 将导致 invalidate 被触发, 并关闭 change stream;

从MongoDB 6.0开始,change stream支持DDL事件的更改通知,如createIndexes和dropIndexes事件。

在这里插入图片描述



如果只对某些类型的变更事件感兴趣,可以使用使用聚合管道的过滤步骤过滤事件:

var cs = db.user.watch([{$match:{operationType:{$in:["insert","delete"]}}
}])

db.watch()语法: https://www.mongodb.com/docs/manual/reference/method/db.watch/#example

Change Stream会采用 "readConcern:majority"这样的一致性级别,保证写入的变更不会被回滚。因此:

  • 未开启 majority readConcern 的集群无法使用 Change Stream;
  • 当集群无法满足 {w: “majority”} 时,不会触发 Change Stream(例如 PSA 架构 中的 S 因故障宕机)。



MongoShell测试

# 窗口1执行 watch()
db.user.watch([],{maxAwaitTimeMS:1000000})# 窗口2进行一条新增操作
db.user.insert({name:"xxxx"})

在这里插入图片描述



在这里插入图片描述



# mongodb 6 的版本这里就只是简单的打印一条日志
rs0 [direct: primary] test> db.emp.watch([], {maxAwaitTimeMS:1000000})
ChangeStreamCursor on emp



故障恢复

假设在一系列写入操作的过程中,订阅 Change Stream 的应用在接收到“写3”之后 于 t0 时刻崩溃,重启后后续的变更怎么办?

在这里插入图片描述

想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的 _id 即可。 Change Stream 回调所返回的的数据带有 _id,这个 _id 可以用于断点恢复。例如:

var cs = db.collection.watch([], {resumeAfter: <_id>}) 

即可从上一条通知中断处继续获取后续的变更通知。



使用场景

  • 监控

    用户需要及时获取变更信息(例如账户相关的表),ChangeStreams 可以提供监控功能,一旦相关的表信息发生变更,就会将变更的消息实时推送出去。

  • 分析平台

    例如需要基于增量去分析用户的一些行为,可以基于 ChangeStreams 把数据拉出来,推到下游的计算平台, 比如 类似 Flink、Spark 等计算平台等等。

  • 数据同步

    基于 ChangeStreams,用户可以搭建额外的 MongoDB 集群,这个集群是从原端的 MongoDB 拉取过来的, 那么这个集群可以做一个热备份,假如源端集群发生 网络不通等等之类的变故,备集群就可以接管服务。 还可以做一个冷备份,如用户基于 ChangeStreams 把数据同步到文件,万一源端数据库发生不可服务, 就可以从文件里恢复出完整的 MongoDB 数据库, 继续提供服务。(当然,此处还需要借助定期全量备份来一同完成恢复) 另外数据同步它不仅仅局限于同一地域,可以跨地域,从北京到上海甚至从中国到美国等等。

  • 消息推送

    假如用户想实时了解公交车的信息,那么公交车的位置每次变动,都实时推送变更的信息给想了解的用户,用户能够实时收到公交车变更的数据,非常便捷实用。



注意事项

  • Change Stream 依赖于 oplog,因此中断时间不可超过 oplog 回收的最大时间窗;
  • 在执行 update 操作时,如果只更新了部分数据,那么 Change Stream 通知的也是增量部分;
  • 删除数据时通知的仅是删除数据的 _id。



Spring Boot整合Chang Stream

引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>



配置yml

spring:data:mongodb:uri: mongodb://hushang:123456@192.168.75.100:28017,192.168.75.100:28018,192.168.75.100:28019/test?authSource=admin&replicaSet=rs0



配置mongo监听器,用于接收数据库的变更信息

package com.hs.learn.changestream;import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Component;/*** @Description: change stream 监听器* @Author 胡尚* @Date: 2024/8/1 14:34*/
@Component
@Slf4j
public class DocumentMessageListener<S,T> implements MessageListener<S, T> {@Overridepublic void onMessage(Message<S, T> message) {// TODO 在监听器方法中完成自己的定制化需求log.info("Received Message in collection: {}", message.getProperties().getCollectionName());log.info("trawsource: {}", message.getRaw());log.info("tconverted: {}", message.getBody());}
}



配置 mongo监听器的容器MessageListenerContainer,spring启动时会自动启动监听的任务用于接收changestream

package com.hs.learn.config;import com.hs.learn.changestream.DocumentMessageListener;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;import java.util.concurrent.*;/*** @Description: mongodb的配置类* @Author 胡尚* @Date: 2024/7/31 14:52*/
@Configuration
public class MongodbConfig {/*** change stream的配置** @param template                引入mongodb的依赖后,内置的bean对象* @param documentMessageListener 自定义的messageListener* @return*/@Beanpublic MessageListenerContainer messageListenerContainer(MongoTemplate template, DocumentMessageListener documentMessageListener) {// 创建一个线程池Executor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(5));// 创建一个MessageListenerContainer对象MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, executor) {@Overridepublic boolean isAutoStartup() {return true;}};ChangeStreamRequest<Document> changeStreamRequest = ChangeStreamRequest.builder(documentMessageListener)// 需要监听的集合名.collection("emp")//过滤需要监听的操作类型,可以根据需求指定过滤条件.filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert", "update", "delete"))))//不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息.fullDocumentLookup(FullDocument.UPDATE_LOOKUP).build();messageListenerContainer.register(changeStreamRequest, Document.class);return messageListenerContainer;}
}



测试

mongo shell插入一条文档

rs0 [direct: primary] test> db.emp.insertOne({name: "hushang", age: 24})
{acknowledged: true,insertedId: ObjectId("66ab31bec9f5b6d436cdb9d5")
}

控制台输出:

Received Message in collection: emptrawsource: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "8266AB31BE000000012B022C0100296E5A1004ED8D395B97A348039DC133ABDBC3800F46645F6964006466AB31BEC9F5B6D436CDB9D50004"}, namespace=test.emp, destinationNamespace=null, fullDocument=Document{{_id=66ab31bec9f5b6d436cdb9d5, name=hushang, age=24}}, documentKey={"_id": {"$oid": "66ab31bec9f5b6d436cdb9d5"}}, clusterTime=Timestamp{value=7398061504999718913, seconds=1722495422, inc=1}, updateDescription=null, txnNumber=null, lsid=null}tconverted: Document{{_id=66ab31bec9f5b6d436cdb9d5, name=hushang, age=24}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/389226.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

职场进阶还是智商税?一文看六西格玛绿带培训的真面目

随着企业对精细化管理需求的日益增长&#xff0c;六西格玛绿带培训逐渐成为职场人士争相追逐的热门课程。它不仅能够帮助学员掌握先进的质量管理工具&#xff0c;还能培养逻辑思维、数据分析能力以及团队合作精神&#xff0c;这些都是现代职场不可或缺的软实力。 职场助力or智商…

ElasticSearch优化实战:打造高性能搜索引擎的秘籍

在当今这个大数据时代&#xff0c;信息的海量增长对搜索技术提出了前所未有的挑战。用户不仅需要快速准确地从数以亿计的数据中找到所需信息&#xff0c;还希望搜索引擎能够提供个性化和智能化的搜索体验。ElasticSearch作为市场上领先的搜索引擎&#xff0c;因其强大的全文搜索…

Spring框架和Maven项目搭建

Spring Spring框架是一个用于构建企业级应用程序的开源Java框架。它提供了一个全面的编程和配置模型&#xff0c;用于开发现代化的Java应用程序。 Spring从早期的大量XML配置逐渐演变为采用注解和自动配置的方式&#xff0c;显著减少了配置的工作量。同时&#xff0c;Maven的…

mysql更改密码后,若依 后端启动不了解决方案

我原先的mysql 密码是 数字字符串 我想改成000 纯数字 改完之后&#xff0c;连接的数据库的代码 也更改后 &#xff0c;后端启动不了 因为原先 密码数字字符串 不需要用引号" " 括起来 我改成纯数字 需要用 " " 括起来 如下图 然后就可以运行成功了

软件测试基础1--功能测试

1、什么是软件测试&#xff1f; 软件是控制计算机硬件运行的工具。 软件测试&#xff1a;使用技术手段验证软件是否满足使用需求&#xff0c;为了发现软件功能和需求不相符合的地方&#xff0c;或者寻找实际输出和预期输出之间的差异。 软件测试的目的&#xff1a;减少软件缺陷…

C#用Socket实现TCP客户端

1、TCP客户端实现代码 using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks;namespace PtLib.TcpClient {public delegate void Tcp…

CSS 的工作原理

我们已经学习了CSS的基础知识,它的用途以及如何编写简单的样式表。在本课中,我们将了解浏览器如何获取 CSS 和 HTML 并将其转换为网页。 先决条件:已安装基本软件,了解处理文件的基本知识以及 HTML 基础知识(学习 HTML 简介。目的:要了解浏览器如何解析 CSS 和 HTML 的基…

总线①I2C

很久以前就听说总线这个词了&#xff0c;一直不懂&#xff0c;所以觉得很牛叉。。。这次有机会学习&#xff0c;就干脆一起看看吧。 1 环境介绍 说实话&#xff0c;计算机的学习最好还是有个环境&#xff0c;裸学真的要难一些。这次搭的环境总的来说还是用之前的树莓派Pico搭配…

标题:组合式API:优化Vue代码结构的艺术

摘要&#xff1a; 在Vue 3中&#xff0c;引入了组合式API&#xff0c;它提供了一种新的方式来组织组件逻辑。虽然组合式API带来了更高的灵活性和可维护性&#xff0c;但开发者也面临着代码组织和可读性的挑战。本文将探讨如何有效地利用组合式API&#xff0c;优化Vue代码结构&a…

gpio的使用,---->使用sysfs 控制gpio(第二节)

目的&#xff1a; 在 linux 文件系统上使用 sysfs 来控制 &#xff0c;gpio的高低的变化。 逻辑&#xff1b;我只在 内核中是能 &#xff47;&#xff50;&#xff49;&#xff4f; 的&#xff50;&#xff49;&#xff4e;&#xff43;&#xff54;&#xff52;&#xff4…

FPGA开发——状态机的使用

一、概述 我们在使用FPGA进行开发的过程当中&#xff0c;实现一个东西用得最多的实现方法就是状态机的实现方法&#xff0c;用一句话总结就是万物皆可状态机&#xff0c;这和我们在学习Linux时常说的在Linux中万物都是文件差不多&#xff0c;这里就主要就是突出状态机的应用范…

使用模版完成不同数据类型的数组的选择排序

目录 6.模版(167-263) 6.1函数模板 6.1.1函数模版注意事项 6.1.2函数模版案例--选择排序 1. 比较排序的基本概念 2. 决策树 3. 决策树的深度 4. 结论 5.选择排序示例: 6.模版(167-263) (项目先跳过) 模板不能直接使用,它只是一个框架. 模板不是万能的. 6.1函数模板…

JNPF全新V5.0版本!重磅升级——APP篇

尊敬的JNPF用户们&#xff1a; 我们非常高兴地宣布&#xff0c;经过团队数月的辛勤努力和不断的技术创新&#xff0c;JNPF快速开发平台终于迎来了里程碑式的全新升级——V5.0版本&#xff01;这一版本的更新发布&#xff0c;不仅代表着我们技术实力的进一步提升&#xff0c;是…

Office Tool Plus部署、激活

1、下载安装&#xff0c;安装图片红色数字操作步骤 2、安装完成&#xff0c;激活&#xff0c;点击新手教程 找到相关教程 复制链接&#xff0c;在Office Tool Plus激活

Prometheus 监控 Nginx

作者&#xff1a;琉璃 一、Nginx_exporter安装 下载链接&#xff1a; https://github.com/discordianfish/nginx_exporter 下载nginx_exporter的docker镜像。 ocker pull fish/nginx-exporter先run一下&#xff0c;执行之后&#xff0c;会hold住&#xff0c;先不要关闭窗口…

THS6011容器版docker使用说明(by why+lqw)

THS6011容器版有分x86和arrch64两种安装包&#xff0c;主要是针对ths节点&#xff0c;本身并没有控制台的安装包&#xff0c;请根据自己的系统的cpu架构进行选择&#xff0c;本次使用的是x86的安装包作为演示。 下图是arrch64的镜像&#xff08;PDMP-4980&#xff09;&#xf…

Codeforces Round 962 (Div. 3)

前言 势必要拿下的一场比赛&#xff0c;最后结果也算如愿。 Standings&#xff1a;300 重新回到蓝名了&#xff0c;也完成了之前 “ 早日在比赛切掉 6 题 ” 的期望。 题目链接&#xff1a;Dashboard - Codeforces Round 962 (Div. 3) - Codeforces A. Legs 第一次在第一分钟就…

Segment Anything Model 2:使用Ultralytics框架进行SAM2图像分割

Segment Anything Model 2&#xff1a;使用Ultralytics框架进行SAM2图像分割 前言相关介绍前提条件实验环境安装环境项目地址LinuxWindows 使用Ultralytics框架进行SAM2图像分割参考文献 前言 由于本人水平有限&#xff0c;难免出现错漏&#xff0c;敬请批评改正。更多精彩内容…

Vue进阶之Vue无代码可视化项目(九)

Vue无代码可视化项目—补充内容 背景介绍、方案设计Canvas Table创建一个新的vue项目普通表格的效果Canvas上手Canvas画表格-画基本表格CanvasTable处理事件系统CanvasTable表格滚动Vue组件封装思想拖拽组件 —smooth-dndDndDemo1.vueDndContainer.jsCanvasTable封装CanvasTabl…