什么是 Chang Stream
Change Stream指数据的变化事件流,MongoDB从3.6版本开始提供订阅数据变更的功能。
Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:
Change Stream 的实现原理
Change Stream 是基于 oplog 实现的,提供推送实时增量的推送功能。它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。
被追踪的变更事件主要包括:
- insert/update/delete:插入、更新、删除;
- drop:集合被删除;
- rename:集合被重命名;
- dropDatabase:数据库被删除;
- invalidate:drop/rename/dropDatabase 将导致 invalidate 被触发, 并关闭 change stream;
如果只对某些类型的变更事件感兴趣,可以使用使用聚合管道的过滤步骤过滤事件:
var cs = db.user.watch([{$match:{operationType:{$in:["insert","delete"]}}}])
Change Stream会采用 "readConcern:majority"这样的一致性级别,保证写入的变更不会被回滚。因此:
- 未开启 majority readConcern 的集群无法使用 Change Stream;
- 当集群无法满足 {w: “majority”} 时,不会触发 Change Stream(例如 PSA 架构 中的 S 因故障宕机)。
MongoShell测试
窗口1
db.user.watch([],{maxAwaitTimeMS:1000000}).pretty()
窗口2
db.user.insert({name:"xxxx"})
变更事件字段说明
Change Stream 故障恢复
假设在一系列写入操作的过程中,订阅 Change Stream 的应用在接收到“写3”之后 于 t0 时刻崩溃,重启后后续的变更怎么办?
想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的 _id 即可。 Change Stream 回调所返回的的数据带有 _id,这个 _id 可以用于断点恢复。例如:
var cs = db.collection.watch([], {resumeAfter: <_id>})
即可从上一条通知中断处继续获取后续的变更通知。
使用场景
- 跨集群的变更复制——在源集群中订阅 Change Stream,一旦得到任何变更立即写 入目标集群。
- 微服务联动——当一个微服务变更数据库时,其他微服务得到通知并做出相应的变更。
- 其他任何需要系统联动的场景。
案例 1.监控
用户需要及时获取变更信息(例如账户相关的表),ChangeStreams 可以提供监控功能,一旦相关的表信息发生变更,就会将变更的消息实时推送出去。
案例 2.分析平台
例如需要基于增量去分析用户的一些行为,可以基于 ChangeStreams 把数据拉出来,推到下游的计算平台, 比如 类似 Flink、Spark 等计算平台等等。
案例 3.数据同步
基于 ChangeStreams,用户可以搭建额外的 MongoDB 集群,这个集群是从原端的 MongoDB 拉取过来的, 那么这个集群可以做一个热备份,假如源端集群发生 网络不通等等之类的变故,备集群就可以接管服务。 还可以做一个冷备份,如用户基于 ChangeStreams 把数据同步到文件,万一源端数据库发生不可服务, 就可以从文件里恢复出完整的 MongoDB 数据库, 继续提供服务。(当然,此处还需要借助定期全量备份来一同完成恢复) 另外数据同步它不仅仅局限于同一地域,可以跨地域,从北京到上海甚至从中国到美国等等。
案例 4.消息推送
假如用户想实时了解公交车的信息,那么公交车的位置每次变动,都实时推送变更的信息给想了解的用 户,用户能够实时收到公交车变更的数据,非常便捷实用。
注意事项
- Change Stream 依赖于 oplog,因此中断时间不可超过 oplog 回收的最大时间窗;
- 在执行 update 操作时,如果只更新了部分数据,那么 Change Stream 通知的也是增量部分;
- 删除数据时通知的仅是删除数据的 _id。
Chang Stream整合Spring Boot
引入依赖
<!--spring data mongodb--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency>
配置yml
spring:data:mongodb:uri: mongodb://fox:fox@192.168.65.174:28017,192.168.65.174:28018,192.168.65.174:28019/test?authSource=admin&replicaSet=rs0
配置 mongo监听器的容器MessageListenerContainer,spring启动时会自动启动监听的任务用于接收changestream
@Configurationpublic class MongodbConfig {@BeanMessageListenerContainer messageListenerContainer(MongoTemplate template, DocumentMessageListener documentMessageListener) {Executor executor = Executors.newFixedThreadPool(5);MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, executor) {@Overridepublic boolean isAutoStartup() {return true;}};ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(documentMessageListener).collection("user") //需要监听的集合名//过滤需要监听的操作类型,可以根据需求指定过滤条件.filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert", "update", "delete"))))//不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息.fullDocumentLookup(FullDocument.UPDATE_LOOKUP).build();messageListenerContainer.register(request, Document.class);return messageListenerContainer;}}
配置mongo监听器,用于接收数据库的变更信息
@Componentpublic class DocumentMessageListener<S, T> implements MessageListener<S, T> {@Overridepublic void onMessage(Message<S, T> message) {System.out.println(String.format("Received Message in collection %s.\n\trawsource: %s\n\tconverted: %s",message.getProperties().getCollectionName(), message.getRaw(), message.getBody()));}}
测试
mongo shell插入一条文档
控制台输出