MySQL数据实时同步至Elasticsearch的高效方案:Java实现+源码解析,一文搞定!

引言:为什么需要实时同步?

MySQL擅长事务处理,而Elasticsearch(ES)则专注于搜索与分析。将MySQL数据实时同步到ES,可以充分发挥两者的优势,例如:

  • 构建高性能搜索服务

  • 实时数据分析与大屏展示

  • 提升复杂查询效率

传统方案(如定时全量同步)存在延迟高、资源浪费等问题。本文将基于MySQL Binlog监听实现毫秒级实时同步,并提供完整Java代码及深度源码解析。

一、技术选型与核心原理

1.1 核心组件
  • MySQL Binlog:MySQL的二进制日志,记录所有数据变更事件(增删改)。

  • Canal/OpenReplicator:解析Binlog的工具(本文使用轻量级mysql-binlog-connector-java)。

  • Elasticsearch High Level REST Client:ES官方Java客户端,用于数据写入。

1.2 架构流程图
MySQL Server → Binlog → Java监听程序 → 数据转换 → Elasticsearch

二、环境准备与配置

2.1 MySQL开启Binlog
# 修改my.cnf(Linux)或my.ini(Windows)
[mysqld]
server_id=1
log_bin=mysql-bin
binlog_format=ROW  # 必须为ROW模式
2.2 创建ES索引
PUT /user
{"mappings": {"properties": {"id": {"type": "integer"},"name": {"type": "text"},"email": {"type": "keyword"},"create_time": {"type": "date"}}}
}

三、Java代码实现

3.1 Maven依赖
<dependency><groupId>com.github.shyiko</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.25.4</version>
</dependency>
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.17.3</version>
</dependency>
3.2 核心代码(Binlog监听与同步)
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;public class MySQL2ESSyncer {private static final String ES_INDEX = "user";public static void main(String[] args) throws Exception {// 初始化ES客户端RestHighLevelClient esClient = ESClientFactory.createClient();// 配置Binlog监听BinaryLogClient client = new BinaryLogClient("localhost", 3306, "root", "password");client.setServerId(1001); // 唯一ID,避免冲突client.registerEventListener(event -> {EventData data = event.getData();if (data instanceof WriteRowsEventData) {// 处理插入事件handleWriteEvent((WriteRowsEventData) data, esClient);} else if (data instanceof UpdateRowsEventData) {// 处理更新事件handleUpdateEvent((UpdateRowsEventData) data, esClient);} else if (data instanceof DeleteRowsEventData) {// 处理删除事件handleDeleteEvent((DeleteRowsEventData) data, esClient);}});client.connect(); // 启动监听}private static void handleWriteEvent(WriteRowsEventData eventData, RestHighLevelClient esClient) {eventData.getRows().forEach(row -> {// 假设表结构为:id, name, email, create_timeString json = String.format("{\"id\":%d,\"name\":\"%s\",\"email\":\"%s\",\"create_time\":\"%s\"}",row[0], row[1], row[2], row[3]);IndexRequest request = new IndexRequest(ES_INDEX).id(row[0].toString()).source(json, XContentType.JSON);esClient.index(request, RequestOptions.DEFAULT);});}// 更新和删除处理类似,代码略(完整源码见文末链接)
}

四、源码深度解析

4.1 Binlog监听流程
  • BinaryLogClient:核心类,负责连接MySQL并监听Binlog。

  • 事件类型判断:根据WriteRowsEventDataUpdateRowsEventDataDeleteRowsEventData区分增、改、删操作。

4.2 数据转换关键点
  • Row数据解析:从事件中提取变更的行的具体值,需与表结构顺序对应。

  • ES文档ID:建议使用MySQL主键,确保更新/删除操作能精准定位文档。

4.3 异常处理与优化
  • 重试机制:ES写入失败时,可加入重试队列。

  • 批量提交:攒批写入ES提升性能(需权衡实时性)。

  • 事务一致性:确保Binlog位置持久化,避免数据丢失。

五、方案优缺点对比

方案实时性复杂度资源消耗
定时全量同步低(分钟级)
基于触发器高(需改表)
Binlog监听

六、总结与扩展

本文实现了基于Binlog的MySQL到ES的实时同步,具备以下优势:

  • 实时性:毫秒级延迟,满足大部分业务场景。

  • 无侵入:无需修改MySQL表结构。

  • 可扩展:可轻松适配其他数据源(如PostgreSQL)。

扩展方向

  • 使用Kafka作为中间层,解耦生产与消费。

  • 增加监控报警,保障数据一致性。

  • 支持DDL变更自动同步(如表结构修改)。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/30377.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

驱动 AI 边缘计算新时代!高性能 i.MX 95 应用平台引领未来

智慧浪潮崛起&#xff1a;AI与边缘计算的时代 正悄然深植于我们的日常生活之中&#xff0c;无论是火热的 ChatGPT 与 DeepSeek 语言模型&#xff0c;亦或是 Meta 智能眼镜&#xff0c;AI 技术已经无形地影响着我们的生活。这股变革浪潮并未停歇&#xff0c;而是进一步催生了更高…

vue3 vite项目安装eslint

npm install eslint -D 安装eslint库 npx eslint --init 初始化配置&#xff0c;按项目实际情况选 自动生成eslint.config.js&#xff0c;可以添加自定义rules 安装ESLint插件 此时打开vue文件就会标红有问题的位置 安装prettier npm install prettier eslint-config-pr…

【RocketMQ】二、架构与核心概念

文章目录 1、发布订阅模型2、角色3、工作流程4、RocketMQ的架构4.1 RocketMQ4.x版本4.2 RocketMQ5.0版本 1、发布订阅模型 几乎所有主流MQ产品&#xff0c;都是发布订阅模型&#xff08;Pub/Sub模型&#xff09;&#xff0c;是生产者和消费者进行基于主题Topic的消息传送 在这…

vue3 遇到babel问题(exports is not defined) 解决方案

由于我在引用ant-design-vue插件&#xff0c;于是产生了下图的问题。 1.问题分析 Babel 是一个 JavaScript 编译器&#xff0c;主要用于&#xff1a;将 ES6 代码转译为 ES5 代码&#xff0c;以兼容旧版浏览器。处理模块化语法&#xff08;如 import/export&#xff09;。 2.解…

【笔记】STM32L4系列使用RT-Thread Studio电源管理组件(PM框架)实现低功耗

硬件平台&#xff1a;STM32L431RCT6 RT-Thread版本&#xff1a;4.1.0 目录 一.新建工程 二.配置工程 ​编辑 三.移植pm驱动 四.配置cubeMX 五.修改驱动文件&#xff0c;干掉报错 六.增加用户低功耗逻辑 1.设置唤醒方式 2.设置睡眠时以及唤醒后动作 ​编辑 3.增加测试命…

数据结构篇——串(String)

一、引入 在计算机中的处理的数据内容大致可分为以整形、浮点型等的数值处理和字符、字符串等的非数值处理。 今天我们主要学习的就是字符串数据。本章主要围绕“串的定义、串的类型、串的结构及其运算”来进行串介绍与学习。 二、串的定义 2.1、串的基本定义 串&#xff08;s…

STM32F4 UDP组播通信:填一填ST官方HAL库的坑

先说写作本文的原因&#xff0c;由于开项目开发中需要用到UDP组播接收的功能&#xff0c;但是ST官方没有提供合适的参考&#xff0c;使用STM32CubeMX生成的代码也是不能直接使用的&#xff0c;而我在网上找了一大圈&#xff0c;也没有一个能够直接解决的方案&#xff0c;deepse…

考研数一非数竞赛复习之Stolz定理求解数列极限

在非数类大学生数学竞赛中&#xff0c;Stolz定理作为一种强大的工具&#xff0c;经常被用来解决和式数列极限的问题&#xff0c;也被誉为离散版的’洛必达’方法&#xff0c;它提供了一种简洁而有效的方法&#xff0c;使得原本复杂繁琐的极限计算过程变得直观明了。本文&#x…

MWC 2025 | 紫光展锐与中国联通联合发布5G eSIM 平板

2025 年 3 月 3 日至 6 日&#xff0c;在全球移动通信行业的年度盛会 —— 世界移动通信大会&#xff08;MWC 2025&#xff09;上&#xff0c;紫光展锐联合中国联通重磅发布了支持eSIM的5G平板VN300E。 该产品采用紫光展锐T9100高性能5G SoC芯片平台&#xff0c;内置8 TOPS算力…

MySQL进阶-关联查询优化

采用左外连接 下面开始 EXPLAIN 分析 EXPLAIN SELECT SQL_NO_CACHE * FROM type LEFT JOIN book ON type.card book.card; 结论&#xff1a;type 有All ,代表着全表扫描&#xff0c;效率较差 添加索引优化 ALTER TABLE book ADD INDEX Y ( card); #【被驱动表】&#xff0…

大模型gpt结合drawio绘制流程图

draw下载地址 根据不同操作系统选择不同的安装 截图给gpt 并让他生成drawio格式的&#xff0c;选上推理 在本地将生成的内容保存为xml格式 使用drawio打开 保存的xml文件 只能说效果一般。

2025-03-08 学习记录--C/C++-C 语言 判断一个数是否是完全平方数

C 语言 判断一个数是否是完全平方数 使用 sqrt 函数计算平方根&#xff0c;然后判断平方根的整数部分是否与原数相等。 #include <stdio.h> #include <math.h>int isPerfectSquare(int num) {if (num < 0) {return 0; // 负数不是完全平方数}int sqrtNum (int)…

Java高频面试之集合-07

hello啊&#xff0c;各位观众姥爷们&#xff01;&#xff01;&#xff01;本baby今天来报道了&#xff01;哈哈哈哈哈嗝&#x1f436; 面试官&#xff1a;ArrayList 和 Vector 的区别是什么&#xff1f; ArrayList 与 Vector 的区别详解 ArrayList 和 Vector 都是 Java 中基于…

《原型链的故事:JavaScript 对象模型的秘密》

原型链&#xff08;Prototype Chain&#xff09; 是 JavaScript 中实现继承的核心机制。每个对象都有一个内部属性 [[Prototype]]&#xff08;可以通过 __proto__ 访问&#xff09;&#xff0c;指向其原型对象。每个对象都有一个原型&#xff0c; 原型本身也是一个对象&#xf…

第11章 web应用程序安全(网络安全防御实战--蓝军武器库)

网络安全防御实战--蓝军武器库是2020年出版的&#xff0c;已经过去3年时间了&#xff0c;最近利用闲暇时间&#xff0c;抓紧吸收&#xff0c;总的来说&#xff0c;第11章开始学习利用web应用程序安全&#xff0c;主要讲信息收集、dns以及burpsuite&#xff0c;现在的资产测绘也…

【redis】全局命令set、get、keys

生产环境 未来在工作中会涉及到的几个环境&#xff1a; 办公环境&#xff08;入职后&#xff0c;公司给你发个电脑&#xff09;开发环境 有的时候&#xff0c;开发环境和办公环境是一个&#xff08;一般做前端和做客户端&#xff09;有的时候&#xff0c;开发环境是一个单独的…

Paper Reading | AI 数据库融合经典论文回顾

人工智能&#xff08;AI&#xff09;和数据库&#xff08;DB&#xff09;在过去的50年里得到了广泛的研究&#xff0c;随着数据库近年来的不断发展&#xff0c;数据库开始与人工智能结合&#xff0c;数据库和人工智能&#xff08;AI&#xff09;可以相互促进。一方面&#xff0…

Linux上位机开发(开篇)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 传统的上位机开发&#xff0c;一般都是默认pc软件开发。既然是pc软件&#xff0c;一般来说都是基于windows平台开发。开放的框架&#xff0c;无非是…

最长递增子序列--蓝桥oj3046拍照

题目链接 arr[] 1700 1701 1702 1703 1704 1705 dp1[] 1 2 3 4 5 6 dp2[] 6 5 4 3 2 1 sum[]dp1[]dp2[] sum[] 7 7 7 7 7 7 7是最大的倒叙和正序的…

upload-labs文件上传

第一关 上传一个1.jpg的文件&#xff0c;在里面写好一句webshell 保留一个数据包&#xff0c;将其中截获的1.jpg改为1.php后重新发送 可以看到&#xff0c;已经成功上传 第二关 写一个webshell如图&#xff0c;为2.php 第二关在过滤tpye的属性&#xff0c;在上传2.php后使用b…