Mongodb 开启oplog,java监听oplog并写入关系型数据库

开启Oplog

windows mongodb bin目录下找到配置文件/bin/mongod.cfg,配置如下:

replication:replSetName: localoplogSizeMB: 1024

在这里插入图片描述
双击mongo.exe
在这里插入图片描述
在这里插入图片描述
执行

rs.initiate({_id: "local", members: [{_id: 0, host: "localhost:27017"}]})

若出现如下情况则成功

{"ok" : 1,"operationTime" : Timestamp(1627503341, 1),"$clusterTime" : {"clusterTime" : Timestamp(1627503341, 1),"signature" : {"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),"keyId" : NumberLong(0)}}
}

监听Oplog日志

pom

 	<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.10</version><relativePath/></parent><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>3.12.7</version></dependency><dependency><groupId>com.vividsolutions</groupId><artifactId>jts</artifactId><version>1.13</version></dependency><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-spatial</artifactId><version>5.3.0.Beta1</version></dependency><dependency><groupId>org.hibernate</groupId><artifactId>hibernate-java8</artifactId><version>5.3.0.Beta1</version></dependency><dependency><groupId>com.bedatadriven</groupId><artifactId>jackson-datatype-jts</artifactId><version>2.3</version></dependency><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><scope>runtime</scope></dependency>

配置

spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://localhost:5432/databaseName?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&currentSchema=public
spring.datasource.username=postgres
spring.datasource.password=123456
spring.jpa.database=postgresql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.spatial.dialect.postgis.PostgisDialect
server.port=10050
spring.data.mongodb.uri=mongodb://admin:123456@localhost:27017/?authSource=admin
spring.data.mongodb.database=databseName

代码

  import com.mongodb.CursorType;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.util.JSON;
import lombok.extern.slf4j.Slf4j;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.Query;@Slf4j
@Component
public class OplogListener implements ApplicationListener<ContextRefreshedEvent> {@Resourceprivate MongoTemplate mongoTemplate;@Resourceprivate EntityManager entityManager;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {MongoDatabase db = mongoTemplate.getMongoDatabaseFactory().getMongoDatabase("local");MongoCollection<Document> oplog = db.getCollection("oplog.rs");BsonTimestamp startTS = getStartTimestamp();BsonTimestamp endTS = getEndTimestamp();Bson filter = Filters.and(Filters.gt("ts", startTS));MongoCursor<Document> cursor = oplog.find(filter).cursorType(CursorType.TailableAwait).iterator();while (true) {if (cursor.hasNext()) {Document doc = cursor.next();String operation = doc.getString("op");if (!"n".equals(operation)) {String namespace = doc.getString("ns");String[] nsParts = StringUtils.split(namespace, ".");String collectionName = nsParts[1];String databaseName = nsParts[0];Document object = (Document) doc.get("o");log.info("同步数据:databse-{}  collention-{}  data-{}", databaseName, collectionName, object);if ("i".equals(operation)) {insert((Document) doc.get("o"), databaseName, collectionName);} else if ("u".equals(operation)) {update((Document) doc.get("o"), (Document) doc.get("o2"), databaseName, collectionName);} else if ("d".equals(operation)) {delete((Document) doc.get("o"), databaseName, collectionName);}}}}}private BsonTimestamp getStartTimestamp() {long currentSeconds = System.currentTimeMillis() / 1000;return new BsonTimestamp((int) currentSeconds, 1);}private BsonTimestamp getEndTimestamp() {return new BsonTimestamp(0, 1);}private void insert(Document object, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);Query query = entityManager.createNativeQuery("INSERT INTO " + collectionName + " (json) VALUES (:json)");query.setParameter("json", json);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}private void update(Document object, Document update, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);String updateJson = JSON.serialize(update);Query query = entityManager.createNativeQuery("UPDATE " + collectionName + " SET json = :json WHERE json = :updateJson");query.setParameter("json", json);query.setParameter("updateJson", updateJson);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}private void delete(Document object, String databaseName, String collectionName) {entityManager.getTransaction().begin();try {String json = JSON.serialize(object);Query query = entityManager.createNativeQuery("DELETE FROM " + collectionName + " WHERE json = :json");query.setParameter("json", json);query.executeUpdate();entityManager.getTransaction().commit();} catch (Exception e) {entityManager.getTransaction().rollback();throw new RuntimeException(e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/210946.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android : 篮球记分器app _简单应用

示例图&#xff1a; 1.导包 在build.gradle 中 加入 // 使用androidx版本库implementation androidx.lifecycle:lifecycle-extensions:2.1.0-alpha03 2. 开启dataBinding android{...// 步骤1.开启data bindingdataBinding {enabled true}...} 3.写个类继承 ViewModel pac…

消息中间件之间的区别

一.单机吞吐量 ActiveMQ&#xff1a;万级&#xff0c;吞吐量比RocketMQ和Kafka要低了一个数量级 RabbitMQ&#xff1a;万级&#xff0c;吞吐量比RocketMQ和Kafka要低了一个数量级 RocketMQ&#xff1a;10万级&#xff0c;RocketMQ也是可以支撑高吞吐的一种MQ Kafka&#xff…

Linux服务器部署XXL-JOB

参考文档及下载地址&#xff1a;分布式任务调度平台XXL-JOB 1 从git拉取XXL-JOB代码 我们的大部分变动&#xff0c;是发生在xxl-job-admin&#xff0c;最终将这个模块打包成jar包部署在linux服务器上。 2 执行数据库脚本 doc\db\tables_xxl_job.sql 3 修改pom文件&#xff0c…

Siemens-NXUG二次开发-打开与关闭prt文件[Python UF][20231206]

Siemens-NXUG二次开发-打开与关闭prt文件[Python UF][20231206] 1.python uf函数1.1 NXOpen.UF.Part.Open1.2 NXOpen.UF.Part.LoadStatus1.3 NXOpen.UF.Part.Close1.4 NXOpen.UF.Part.AskUnits 2.示例代码3.运行结果3.1 内部模式3.2 外部模式 1.python uf函数 1.1 NXOpen.UF.P…

CUDA简介——Grid和Block内Thread索引

1. 引言 前序博客&#xff1a; CUDA简介——基本概念CUDA简介——编程模式CUDA简介——For循环并行化 Thread Index&#xff1a; 每个Thread都有其thread index。 在Kernel中&#xff0c;可通过内置的threadIdx变量来获取其thread index。threadIdx为三维的&#xff0c;有相…

一. 初识数据结构和算法

数据结构与算法是一个达到高级程序员的敲门砖。当你脱离了语言的应用层面&#xff0c;去思考他的设计层面时&#xff0c;你就依旧已经开始初识数据结构与算法了 数据结构 什么是数据结构 对于数据结构的定义官方并没有统一的解释&#xff0c;在各个百科以及算法的书中&#xf…

MicroPython标准库

MicroPython标准库 arraybinascii(二进制/ASCII转换)builtins – 内置函数和异常cmath – 复数的数学函数collections – 集合和容器类型errno – 系统错误代码gc – 控制垃圾收集器hashlib – 散列算法heapq – 堆队列算法io – 输入/输出流json – JSON 编码和解码math – 数…

《微信小程序开发从入门到实战》学习四十一

4.3 云开发文件存储 文件存储功能支持将任意数量和格式的文件&#xff08;如图片和视频&#xff09;保存在云端&#xff0c;支持 以文件夹的形式将文件归类。 在云开发控制台中&#xff0c;可以对云端保存的文件进行管理。 也可以通过文件存储API对文件进行上传、删除、移动…

SA与NSA网络架构的区别

SA与NSA网络架构的区别 1. 三大运营商网络制式&#xff1a;2. 5G组网方式及业务特性3. NSA-3系列4. NSA—4系列5. NSA-7系列6. 5G SA网络架构7. 运营商策略 1. 三大运营商网络制式&#xff1a; 联通&#xff1a;3G(WCDMA)\4G(FDD-LTE/TD-LTE)\5G(SA/NSA)移动&#xff1a;2G(GS…

如何购买华为云服务器

华为云是华为推出的云计算服务平台&#xff0c;旨在为企业和个人提供全面的云端解决方案。它提供了包括计算、存储、数据库、人工智能、大数据、安全等多种云服务&#xff0c;覆盖了基础设施、平台和软件级别的需求。华为云致力于构建安全可信赖的云计算基础设施&#xff0c;以…

数据库Delete的多种用法

数据库的Delete操作是用来删除数据库中的数据记录的&#xff0c;它是数据库操作中的一种重要操作&#xff0c;能够帮助用户删除不需要的数据&#xff0c;以便保持数据库的整洁和高效。在使用Delete操作时&#xff0c;需要注意确保操作的准确性和安全性&#xff0c;以免误删重要…

AI助力智慧农业,基于YOLOv3开发构建农田场景下的庄稼作物、田间杂草智能检测识别系统

智慧农业随着数字化信息化浪潮的演变有了新的定义&#xff0c;在前面的系列博文中&#xff0c;我们从一些现实世界里面的所见所想所感进行了很多对应的实践&#xff0c;感兴趣的话可以自行移步阅读即可&#xff1a; 《自建数据集&#xff0c;基于YOLOv7开发构建农田场景下杂草…

数电笔记之寄存器

数电 1 数字电路基础 1.2 二进制数据表达 1.2.1 二进制简介 1.2.2 用二进制表达文字 常用的中文字符集&#xff1a;GBK&#xff0c;UTF8 1.2.3 用二进制表达图片 图片像素化&#xff0c;像素数字化 1.2.4 用二进制表达声音 1.2.5 用二进制表达视频 1.3 数字电路 1.3…

Python与PHP:编写大型爬虫的适用性比较

目录 一、引言 二、Python编写爬虫的优势 1、强大的数据处理能力 2、丰富的网络库和框架 3、良好的可读性和易维护性 4、社区支持和生态系统 三、PHP编写爬虫的优势 1、简单易学 2、广泛的应用领域 3、高效的性能 4、灵活的请求处理方式 四、大型爬虫的编写实例&am…

JavaScript <md5加密的两种不同输出结果分析>--案例(二点一)

前言: 问题是这样的,在浏览器中看到这段代码 然后在控制台进行输出.得到: 紧接着,就在,js文件里面进行转译: 可是,得到的结果是: 这是问题!!! 正题: 为什么相同的js代码,在 .js 文件中的输出与 Chrome 控制台中的输出不一样? 环境差异&#xff1a;不同的JavaScript环境&…

Linux使用root用户安装完MySQL软件后,配置MySQL这个普通用户登录

在 Linux 系统中&#xff0c;当您使用 root 用户安装 MySQL 后&#xff0c;系统会自动创建一个名为 mysql 的系统用户。这个 mysql 用户主要用于管理 MySQL 服务的运行&#xff0c;通常是没有登录系统的权限的。如果您希望使这个 mysql 用户能够登录到系统&#xff0c;您需要设…

Android 12.0 Folder文件夹全屏后文件夹图标列表居中时拖拽app到桌面的优化

1.概述 在12.0的系统rom产品开发中,在Launcher3中在目前的产品需求开发中,对于Launcher3中的文件夹Folder的布局UI 进行了定制化的需求要求把Folder修改为全屏,然后在中间显示文件夹图标的列表,这时候如果Folder是全屏的话,如果拖拽文件夹列表中的app图标,只有拖拽 到屏…

【数据结构实验】树(一)构建二叉查找树(BST)

文章目录 1. 引言2. 二叉查找树3. 实验内容3.1 实验题目&#xff08;一&#xff09;输入要求&#xff08;二&#xff09;输出要求 3.2 算法实现1. 数据结构2. 全局变量3. 中序遍历函数InOrder4. 二叉查找树的构建函数T5. 主函数 3.3 代码整合 4. 实验结果 1. 引言 二叉查找树&a…

ntopng如何将漏洞扫描与流量监控相结合,以提高网络安全性

来源&#xff1a;艾特保IT 虹科干货 | ntopng如何将漏洞扫描与流量监控相结合&#xff0c;以提高网络安全性 欢迎关注虹科&#xff0c;为您提供最新资讯&#xff01; ntopng为人所知的“身份”是被动流量监控。然而&#xff0c;如今的ntopng6.0也进化出主动监控功能来&#xf…