基于canal监听MySQL binlog实现数据增量同步

一、背景

业务反馈客服消息列表查询速度慢,有时候甚至要差不多20秒,急需优化提升速度。

二、方案

引入

首先,体验系统,发现查询慢的正是消息列表查询接口。

接着去看代码的设计,流程比较长,但从代码逻辑上设计没有问题什么大问题。

接着拿到查库的主SQL,发现连接的表比较多,然后在测试库看索引,索引缺了一些。加上索引,然后确定确实走了预期的索引之后就给正式库加上了索引。速度确实有了较大提升。能够进入十秒内,但因为数据量大,还是需要4~5秒左右。

不过,业务对这个已经比较满意了,但还是提出能否速度更快。优化完索引之后,可以试试上缓存,但细看了一下数据,变化频率不低,不太适合缓存。

接着就想到了本文的主角-----Elastic Search。支持高性能检索,倒排索引的机制,也更适合大数据量的场景。于是就以测试库来尝试,做引入ES的探索。

选型

ES和数据库作为不同的系统,要查询效果一致,首先就要考虑数据的同步问题。

首先对于数据库已有的数据,做全量同步。这个是没有异议的。

但对于增量数据如何处理就引出了几种常用的方案。

方案一

数据库更新的时候,直接同步更新ES。这种方式实现简单,数据同步也及时,但每处更新数据库都要加上更新ES的操作,在后续开发中,很容易会遗漏。(某些时候可能需要用SQL改数据库数据,这种操作无法更新ES)

方案二

数据库更新的时候,就将更新的操作发送到MQ,让MQ异步去更新ES。这种方案相比第一种复用性更高,可维护性更强一些,但还是有缺漏的风险。(某些时候可能需要用SQL改数据库数据,这种操作无法更新ES)

方案三

监听数据库binlog日志,只要有变更记录的操作,就同步更新ES。这种方案实现起来较复杂,但基本写完一套之后,后续基本不需要再变动。

综合考虑下来,我觉得第三种方案是最好的。在之前的学习中,我尝试了给项目引入Easy ES框架来实现ES的引入。

springboot整合easy-es实现数据的增删改查_easy es-CSDN博客

接下来就差数据的全量同步和增量同步了。全量同步可以直接通过工具来实现,而增量同步就需要工具结合代码来实现了。本文就是打算用阿里的canal来实现监听MySQL数据库的binlog日志。

三、原理

MySQL的分布式是基于主从架构实现的。一般情况下是一主多从,其中一个数据库作为主节点,其他数据库作为从节点,主从节点之间通过订阅binlog的方式实现数据同步。

canal的数据增量同步底层就是利用MySQL的主从同步机制实现的。将canal伪装成master的一个slave节点,向master节点发起dump协议,master节点在接收到dump协议之后,就会将binlog日志推送给canal,canal拿到binlog日志之后执行相应的操作从而实现数据同步。

四、配置流程

官方教程

4.1、配置MySQL

查看源MySQL的binlog配置,确保MySQL开启了binlog日志。

SHOW VARIABLES LIKE "%bin%"

日志的格式为ROW

查看源MySQL的server_id

准备好一个拥有slave权限的MySQL账号。

4.2、配置canal

1.下载canalicon-default.png?t=N7T8https://github.com/alibaba/canal/releases

2.解压,配置canal,修改文件conf/example/instance.properties

配置canal的server_id,注意要和上面查看的源MySQL的不一样。

配置源MySQL的ip+端口

配置源MySQL的账号密码

3.启动项目

在bin目录下找到对应系统的启动文件,双击启动。

/bin/startup.bat(window)

/bin/startup.sh(linux)

查看日志文件,检查是否启动成功。logs/canal/canal.log和logs/example/example.log

服务启动成功

4.3、canal集成到springboot

添加依赖

		<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>

直接用官方的测试代码验证(不需要更改,如果canal安装在本地的话)

package org.jeecg.modules.admin.assignImClassTeacher;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}/*** 打印canal server解析binlog获得的实体类信息*/private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等RowChange rowChage;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型EventType eventType = rowChage.getEventType();//打印Header信息System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));//判断是否是DDL语句if (rowChage.getIsDdl()) {System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for (RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());//如果是更新的语句} else {//变更前的数据System.out.println("------->; before");printColumn(rowData.getBeforeColumnsList());//变更后的数据System.out.println("------->; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}}

运行main方法

在更新后,每个字段都有一个update字段,如果值为true代表这个字段更新了,为false代表没更新。

拿到这些涉及数据库变更的事件之后,就可以根据需要去做数据的增量同步了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/315107.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【新知实验室 - TRTC 实践】音视频互动 Demo、即时通信 IM 服务搭建

一、TRTC 初识 TRTC 是什么 TRTC&#xff08;Tencent RTC&#xff09;腾讯实时音视频&#xff0c;源自于 QQ 音视频团队&#xff0c;是基于 QQ 音视频多年来的音视频技术积累&#xff0c;位于腾讯云的 RTC 云服务。TRTC 支持腾讯会议、企业微信直播、微信视频号、腾讯云课堂、…

clickhouse安装部署

虚拟机&#xff1a;virtualbox7.0 操作系统&#xff1a;ubuntu server 22.04.3 虚拟机硬件&#xff1a;cpu 1&#xff0c;内存 2G&#xff0c; 硬盘 100G 采用默认安装 参照 https://clickhouse.com/docs/en/install#quick-install 安装部署 对于Debian、Ubuntu&#xff0c…

【漏洞复现】艺创科技智能营销路由器后台命令执行漏洞

漏洞描述&#xff1a; 成都艺创科技有限公司是一家专注于新型网络设备研发、生产、销售和服务的企业&#xff0c;在大数据和云时代&#xff0c;致力于为企业提供能够提升业绩的新型网络设备。 智能营销路由器存在后台命令执行漏洞&#xff0c;攻击者可利用漏洞获取路由器控制…

用NuGet安装 Oracle ODP.NET

oracle官网原文&#xff1a;Using NuGet to Install and Configure Oracle Data Provider for .NET Using NuGet to Install and Configure Oracle Data Provider for .NET In this section, you will install ODP.NET NuGet packages from nuget.org. Select View > Solut…

马赛克,克星,又火一个,懒人包!

在AI技术日新月异的今天&#xff0c;各种边界不断被突破&#xff0c;今天我要给大家带来的&#xff0c;是一款名为InstructIR的革命性AI工具&#xff0c;只需一句话&#xff0c;即可实现高质量图像修改&#xff01;这不仅仅是一个普通的图像处理工具&#xff0c;而是一种革命性…

【经验分享】MySQL集群部署一:主从模式

目录 前言一、基本介绍1.1、概念1.2、执行流程 二、部署2.1、通用配置2.2、主节点配置2.3、从节点配置2.4、主从测试2.5、谈一谈主节点历史数据同步问题 前言 MySQL的部署模式常见的包括以下几种&#xff1a; 独立服务器部署主从复制部署高可用性集群&#xff08;HA&#xff…

2024 年最好的免费数据恢复软件,您可以尝试的几个数据恢复软件

由于系统崩溃而丢失数据可能会给用户带来麻烦。我们将重要的宝贵数据和个人数据保存在我们的 PC、笔记本电脑和其他数字设备上。您可能会因分区丢失、意外删除文件和文件夹、格式化硬盘驱动器而丢失数据。数据丢失是不幸的&#xff0c;如果您不小心从系统中删除了文件或数据&am…

远程连接docker,实现本地发布版本到服务器

最近在学jenkins的时候&#xff0c;发现涉及到了docker的远程发布调用。后续应该还要自己搭建一个docker的本地仓库。 简单描述一下具体是如何实现的&#xff1a; 1、将docker的服务器开启2375端口&#xff08;注意&#xff0c;这里的开启是将端口直接暴露出去&#xff0c;不用…

spring高级篇(四)

1、DispatcherServlet DispatcherServlet 是 Spring MVC 中的一个关键组件&#xff0c;用于处理 Web 请求并将其分发给相应的处理器&#xff08;Controller&#xff09;进行处理。它是一个 Servlet&#xff0c;作为前端控制器&#xff08;Front Controller&#xff09;的核心&a…

基于深度学习神经网络的AI图片上色DDcolor系统源码

第一步&#xff1a;DDcolor介绍 DDColor 是最新的 SOTA 图像上色算法&#xff0c;能够对输入的黑白图像生成自然生动的彩色结果&#xff0c;使用 UNet 结构的骨干网络和图像解码器分别实现图像特征提取和特征图上采样&#xff0c;并利用 Transformer 结构的颜色解码器完成基于视…

以生命健康为中心的物联网旅居养老运营平台

随着科技的飞速发展和人口老龄化的日益加剧&#xff0c;养老问题逐渐成为社会关注的焦点。传统的养老模式已经难以满足现代老年人的多元化需求&#xff0c;因此&#xff0c;构建一个以生命健康为中心的物联网旅居养老运营平台显得尤为重要。 以生命健康为中心的物联网旅居养老运…

未来已来:解锁AGI的无限潜能与挑战

未来已来&#xff1a;解锁AGI的无限潜能与挑战 引言 假设你有一天醒来&#xff0c;发现你的智能手机不仅提醒你今天的日程&#xff0c;还把你昨晚做的那个奇怪的梦解释了一番&#xff0c;并建议你可能需要减少咖啡摄入量——这不是科幻电影的情节&#xff0c;而是人工通用智能…

本地认证的密码去哪了?怎么保证安全的?

1. windows登录的明文密码&#xff0c;存储过程是怎么样的&#xff1f;密文存在哪个文件下?该文件是否可以打开&#xff0c;并且查看到密文&#xff1f; 系统将输入的明文密码通过hash算法转为哈希值&#xff0c;且输入的值会在内存中立即删除无法查看。 然后将密文存放在C:…

Vue3+Vite开发的项目进行加密打包

本文主要介绍Vue3+Vite开发的项目如何进行加密打包。 目录 一、vite简介二、混淆工具三、使用方法1. 安装插件:2. 配置插件:3. 运行构建:4. 自定义混淆选项:5. 排除文件:下面是Vue 3+Vite开发的项目进行加密打包的方法。 一、vite简介 Vite 是一个由 Evan You 创造的现代…

MultiHeadAttention在Tensorflow中的实现原理

前言 通过这篇文章&#xff0c;你可以学习到Tensorflow实现MultiHeadAttention的底层原理。 一、MultiHeadAttention的本质内涵 1.Self_Atention机制 MultiHeadAttention是Self_Atention的多头堆嵌&#xff0c;有必要对Self_Atention机制进行一次深入浅出的理解&#xff0c;这…

AJAX——案例

1.商品分类 需求&#xff1a;尽可能同时展示所有商品分类到页面上 步骤&#xff1a; 获取所有的一级分类数据遍历id&#xff0c;创建获取二级分类请求合并所有二级分类Promise对象等待同时成功后&#xff0c;渲染页面 index.html代码 <!DOCTYPE html> <html lang&qu…

ssh 文件传输:你应该掌握的几种命令行工具

这篇文章主要分享一下我使用过的 ssh 传输文件的进阶路程&#xff0c;从 scp -> lrzsz -> trzsz&#xff0c;希望能给你带来一些帮助&#xff5e; scp scp 命令可以用于在 linux 系统之间复制文件&#xff0c;具体的语法可以参考下图 其实使用起来也还比较方便&#x…

【Docker】Docker 实践(三):使用 Dockerfile 文件构建镜像

Docker 实践&#xff08;三&#xff09;&#xff1a;使用 Dockerfile 文件构建镜像 1.使用 Dockerfile 文件构建镜像2.Dockerfile 文件详解 1.使用 Dockerfile 文件构建镜像 Dockerfile 是一个文本文件&#xff0c;其中包含了一条条的指令&#xff0c;每一条指令都用于构建镜像…

智慧码头港口:施工作业安全生产AI视频监管与风险预警平台方案

一、建设思路 随着全球贸易的快速发展&#xff0c;港口作为连接海洋与内陆的关键节点&#xff0c;其运营效率和安全性越来越受到人们的关注。为了提升港口的运营效率和安全性&#xff0c;智慧港口视频智能监控系统的建设显得尤为重要。 1&#xff09;系统架构设计 系统应该采…

针对icon报错

针对上篇文章生成图标链接中图标报错 C# winfrom应用程序添加图标-CSDN博客 问题&#xff1a;参数“picture”必须是可用作Icon的参数 原因&#xff1a;生成的ico图标类型不匹配 解决方法&#xff1a; 更改导出的ico类型