实战:MySQL数据同步神器之Canal

1.概叙

场景一:数据增量实时同步

项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数据查询。

那么这个时候问题来了,怎么把MySQL中增量数据同步到ES?

场景二:缓存一致性问题

Java web应用性能分析之【高并发之缓存-多级缓存】_java多级缓存-CSDN博客

在前面文章中有提到这个场景,如何保证redis、EhCache、MySQL数据一致?

我们都知道作为数据库写操作,是不通过缓存的。假设商品服务实例 1 将 1 号商品价格调整为 80 元,这会衍生一个新问题:如何主动向应用程序推送数据变更的消息来保证它们也能同步更新缓存呢?

针对上面两种场景,可以看看canal的解决方案。

什么是Canal?

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。项目起源于阿里巴巴内部对于跨机房数据同步的需求,通过解析MySQL的二进制日志(Binary Log),Canal能够捕获并推送数据库的变更事件,满足了诸如数据库镜像、实时备份、索引实时维护等多种业务场景的需求。

GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

Home · alibaba/canal Wiki · GitHub

支持范围

Canal当前支持MySQL数据库的多个版本,包括但不限于5.1.x、5.5.x、5.6.x、5.7.x及8.0.x,同时也兼容阿里云RDS等云数据库服务,为用户提供了广泛的数据库兼容性保障。

部分支持MySQL体系数据库:Mariadb 10.x、PolarDB-X

主要特性

高性能与低延迟:Canal 1.1.x版本进行了深度优化,性能提升高达150%。
Prometheus监控:原生集成Prometheus监控,便于系统健康状况的跟踪。
消息系统集成:直接支持Kafka、RocketMQ消息投递,便于与大数据平台对接。
云数据库支持:无缝对接阿里云RDS,解决了自动主备切换及离线Binlog解析问题。
Docker部署:提供Docker镜像,简化部署流程。
WebUI管理:Canal-Admin工程引入WebUI,实现动态配置、任务管理与日志查看等功能。

2.Canal原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志(binary log), 日志中的记录叫做二进制日志事件(binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映到它自己的数据

Canal工作原理

Canal巧妙地模拟了MySQL主从复制的机制。具体而言:

  1. 伪装为MySQL Slave:canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  2. 获取Binary Log:MySQL Master接收到请求后,开始推送Binary Log给Canal。
  3. 解析日志事件:Canal解析接收到的Binary Log,将数据变更信息转换为易于处理的结构化数据。canal 解析 binary log 对象(原始为 byte 流),转换为json格式。
  4. 数据同步:Canal 客户端通过 TCP 协议或 MQ 形式监听 Canal 服务端,同步数据到 ES、KFK、HBase、RocketMQ、Pulsar等。

优点: 可以完全和业务代码解耦,增量日志订阅。

缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。

Canal总体架构

对应这些软件包:

  1. deployer包:即canal server,负责从master库同步binlog,将数据通过tcp的方式同步给Adapter适配器,经过Adapter适配同步给目标库;通过MQ将数据同步给canal client或目标库,canal client也可以再同步给目标库。
    1. server包官方介绍:https://github.com/alibaba/canal/wiki/DevGuide
  2. adapter包:给目标库同步数据。目前支持:Hbase、RDB、ES。
    1. adapter包官方介绍:https://github.com/alibaba/canal/wiki/ClientAdapter
  3. admin包:canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide
    1. admin包官方介绍:https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
  4. canal.client:消费server数据
    1. Canal设计了client-server架构,支持多种语言客户端通过protobuf 3.0协议与之交互,官方及社区提供了以下客户端:
    2. Java客户端:ClientExample
    3. C#客户端:CanalSharp
    4. Go客户端:canal-go
    5. Python客户端:canal-python
    6. PHP客户端:canal-php
    7. Rust客户端:canal-rs
    8. Node.js客户端:canal-nodejs
  5. 除了基础功能,Canal还支持丰富的进阶特性和周边生态工具,如:

    Canal-Admin:提供Web界面管理Canal实例,实现配置、监控和运维的可视化操作。
    canal2sql:一个工具项目,能根据Binlog生成SQL,便于数据迁移或备份。
    Otter:Canal的消费端开源项目,用于数据同步与数据集成。

Canal高可用架构

整个 HA 机制的控制主要是依赖了zookeeper的两个特性:watcher、EPHEMERAL节点。canal的 HA 机制实现分为两部分,canal server 和 canal client分别有对应的实现。

canal server实现流程如下:

  • 1、canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动)。
  • 2、创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态。
  • 3、一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤1的操作,重新选出一个 canal server 启动instance。
  • 4、canal client 每次进行connect时,会首先向 zookeeper 询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。

为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态。

依赖:

  • JDK1.8
  • MySQL:用于canal-admin存储配置和节点等相关数据
  • Zookeeper

3.Canal实战

准备环境

名称版本
MySQL5.7
elasticsearch7.17.9
canal1.1.7
jdk1.8
  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

    下载 canal

本次操作只需要下载admin、deployer两个包。

下载后解压即可用,当然,要运行起来的话,需要配置jdk1.8,这里就不再多说。

创建admin的数据库:conf/canal_manager.sql

启动admin前,先完成建库和见表(包括admin的默认登录账号密码  admin/123456),否则启动报错。主要是如下六张表。

修改配置文件

修改server的配置文件:conf/canal.properties 和conf/example/instance.properties

切记数据库和配置文件中的密码要一致。

修改admin的配置文件:conf/application.yml

分别启动admin和server

启动server:bin/startup.bat

启动admin:bin/startup.bat

登录admin:http://192.168.1.4:8089/#/login?redirect=%2Fdashboard

默认账号密码:admin/123456

配置java客户端

引入依赖

        <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>

客户端代码

package com.zxx.study.base.canal;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.SneakyThrows;import java.net.InetSocketAddress;
import java.util.List;/*** @author zhouxx* @create 2024-08-01 21:59*/
public class CanalClient  {@SneakyThrowspublic static void main(String[] args) {try {// 创建canal客户端,单链接模式CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.4",11111), "example", "", "");// 创建连接canalConnector.connect();while (true) {// 订阅数据库// canalConnector.subscribe("mall");// 获取数据Message message = canalConnector.get(100);// 获取Entry集合List<CanalEntry.Entry> entries = message.getEntries();// 判断集合是否为空,如果为空,则等待一会继续拉取数据if (entries.size() <= 0) {
//                System.out.println("当次抓取没有数据,休息一会。。。。。。");Thread.sleep(1000);} else {// 遍历entries,单条解析for (CanalEntry.Entry entry : entries) {//1.获取表名String tableName = entry.getHeader().getTableName();//2.获取类型CanalEntry.EntryType entryType = entry.getEntryType();//3.获取序列化后的数据ByteString storeValue = entry.getStoreValue();//4.判断当前entryType类型是否为ROWDATAif (CanalEntry.EntryType.ROWDATA.equals(entryType)) {//5.反序列化数据CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);//6.获取当前事件的操作类型CanalEntry.EventType eventType = rowChange.getEventType();//7.获取数据集List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();//8.遍历rowDataList,并打印数据集for (CanalEntry.RowData rowData : rowDataList) {JSONObject beforeData = new JSONObject();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();for (CanalEntry.Column column : beforeColumnsList) {beforeData.put(column.getName(), column.getValue());}JSONObject afterData = new JSONObject();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {afterData.put(column.getName(), column.getValue());}//数据打印System.out.println("Table:" + tableName +",EventType:" + eventType +",Before:" + beforeData +",After:" + afterData);/*** Table:test_user,EventType:UPDATE,Before:{"name":"1111221","id":"1"},After:{"name":"11tom","id":"1"}* Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx","id":"17"}* Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx1","id":"18"}* */}}}}}}catch (Exception e){e.printStackTrace();}}}

运行效果:在数据库里面忝删改查,均可以在客户端中打印出来

cancel框架同步mysql数据到kafka

参考:cancel框架同步mysql数据到kafka_mysql cancel-CSDN博客

利用canal进行MySQL到ES的数据实时同步

参考:利用canal进行MySQL到ES的数据实时同步_canal es-CSDN博客

结合上图,至此,场景一和场景二中的问题,均可以解决。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/392297.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

六、8 TIM编码器计数和测速代码

&#xff08;1&#xff09;所用函数 &#xff08;2&#xff09; 1&#xff09; 上拉输入和下拉输入选择&#xff1a;与外部模块保持一致 若外部模块空闲默认输出高电平&#xff0c;就选择上拉输入&#xff0c;默认输入高电平&#xff1b;若外部模块空闲默认输出低电平&#x…

U-Net++原理与实现(含Pytorch和TensorFlow源码)

U-Net原理与实现 引言1. U-Net简介1.1 编码器&#xff08;Encoder&#xff09;1.2 解码器&#xff08;Decoder&#xff09;1.3 跳跃连接&#xff08;Skip Connections&#xff09; 2. U-Net详解2.1 密集跳跃连接2.2 嵌套和多尺度特征融合2.3 参数效率和性能2.4 Pytorch代码2.5 …

【C++ STL】vector

文章目录 vector1. vector的接口1.1 默认成员函数1.2 容量操作1.3 访问操作1.4 修改操作1.5 vector与常见的数据结构的对比 2. vector的模拟实现2.1 类的定义2.2 默认成员函数迭代器的分类 2.3 容量接口memcpy 浅拷贝问题内存增长机制reserve和resize的区别 2.4 修改接口迭代器…

老照片修复软件分享3款!码住一些实用的方法!

在数字时代&#xff0c;老照片不仅是时间的印记&#xff0c;更是我们珍贵的记忆载体。然而&#xff0c;随着时间的流逝&#xff0c;这些照片往往会变得模糊、褪色甚至破损。幸运的是&#xff0c;现代科技的发展为我们提供了多种老照片修复软件&#xff0c;让我们能够轻松恢复这…

Flux:Midjourney的新图像模型挑战者

--->更多内容&#xff0c;请移步“鲁班秘笈”&#xff01;&#xff01;<--- Black Forest Labs是一家由前Stability.ai开发人员创立的AI初创公司&#xff0c;旨在为图像和视频创建尖端的生成式 AI 模型。这家初创公司声称&#xff0c;其第一个模型系列Flux.1为文本到图像…

现代前端架构介绍(第二部分):如何将功能架构分为三层

远离JavaScript疲劳和框架大战&#xff0c;了解真正重要的东西 在这个系列的前一部分 《App是如何由不同的构建块构成的》中&#xff0c;我们揭示了现代Web应用是由不同的构建块组成的&#xff0c;每个构建块都承担着特定的角色&#xff0c;如核心、功能等。在这篇文章中&#…

重塑汽车制造未来:3D插图技术大师,零误差高效驱动新时代

在当今快速革新的汽车制造领域&#xff0c;高效、精准的产品设计与制造流程已成为众多车企破浪前行的核心引擎。但随着市场竞争的日益激烈&#xff0c;在产品设计与制造中&#xff0c;传统二维CAD设计的局限性越发明显——设计周期长、沟通成本高、错误频发及资源利用低效等问题…

联想M7615DNA打印机复印证件太黑的解决方法及个人建议

打印机在使用过程中&#xff0c;可能会出现复印的文字或图片太黑的问题&#xff0c;这会影响到打印或复印的效果。下面我们来了解一下这种情况的原因和解决方法&#xff1b;以下所述操作仅供大家参考&#xff0c;如有不足请大家提出宝贵意见&#xff1b; 证件包括&#xff1a;…

【MySQL】索引——索引的实现、B+ vs B、聚簇索引 VS 非聚簇索引、索引操作、创建索引、查询索引、删除索引

文章目录 MySQL5. 索引的实现5.1 B vs B5.2 聚簇索引 VS 非聚簇索引 6. 索引操作6.1 创建主键索引6.2 创建唯一索引6.3 创建普通索引6.4 创建全文索引6.5 查询索引6.6 删除索引 MySQL 5. 索引的实现 因为MySQL和磁盘交互的基本单位为Page&#xff08;页&#xff09;。 MySQL 中…

C# 串口通信(通过serialPort控件发送及接收数据)

连接串口 界面设计打开串口发送数据通过文件发送发送数据 接收数据 首先可以在 工具箱中搜索serialport&#xff0c;将控件拖到你的Winfrom窗口。 界面设计 打开串口 private void Connect_Click(object sender, EventArgs e){serialPort1.PortName comboBox2.Text;//端口名s…

CAS单点登录

1.相同顶级域名的单点登录SSO 相同顶级域名的单点登录:SSO:SINGLE SIGN ON 单点登录可以通过基于用户会话的共享&#xff1b;分为两种&#xff0c;第一种&#xff1a;相同顶级域名&#xff1b; 原理是分布式会话完成的&#xff1b;关键是顶级域名的cookie值是可以共享的 比如…

【C#】ThreadPool的使用

1.Thread的使用 Thread的使用参考&#xff1a;【C#】Thread的使用 2.ThreadPool的使用 .NET Framework 和 .NET Core 提供了 System.Threading.ThreadPool 类来帮助开发者以一种高效的方式管理线程。ThreadPool 是一个线程池&#xff0c;它能够根据需要动态地分配和回收线程…

【Kubernetes】Deployment 的清理策略

Deployment 的清理策略 在 Deployment 中配置 spec.revisionHistoryLimit 字段&#xff0c;可以指定其 清理策略。该字段用于指定 Deployment 保留旧 ReplicaSet 的个数&#xff0c;即更新 Pod 前的版本个数。该字段的默认值是 10。 创建 revisionhistory-demo.yaml 文件&…

上升探索WebKit的奥秘:打造高效、兼容的现代网页应用

嘿&#xff0c;朋友们&#xff01;想象一下&#xff0c;你正在浏览一个超级炫酷的网站&#xff0c;页面加载飞快&#xff0c;布局完美适应你的设备&#xff0c;动画流畅得就像你在看一场好莱坞大片。这一切的背后&#xff0c;有一个神秘的英雄——WebKit。今天&#xff0c;我们…

学习笔记--算法(双指针)3

快乐数 . - 力扣&#xff08;LeetCode&#xff09; 题目 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」 定义为&#xff1a; 对于一个正整数&#xff0c;每一次将该数替换为它每个位置上的数字的平方和。然后重复这个过程直到这个数变为 1&#xff0c;也可能是 无…

如何在IDEA上使用JDBC编程【保姆级教程】

目录 前言 什么是JDBC编程 本质 使用JDBC编程的优势 JDBC流程 如何在IEDA上使用JDBC JDBC编程 1.创建并初始化数据源 2.与数据库服务器建立连接 3.创建PreparedStatement对象编写sql语句 4.执行SQL语句并处理结果集 executeUpdate executeQuery 5.释放资源 前言 在…

二叉树中的深搜

目录 二叉树中的深搜&#xff1a; 一、计算布尔二叉树的值 1.题目链接&#xff1a;2331. 计算布尔二叉树的值 2.题目描述&#xff1a; 3.解法&#xff08;递归&#xff09; &#x1f352;算法思路&#xff1a; &#x1f352;算法流程&#xff1a; &#x1f352;算法代码…

C# Unity 面向对象补全计划 泛型

本文仅作学习笔记与交流&#xff0c;不作任何商业用途&#xff0c;作者能力有限&#xff0c;如有不足还请斧正 1.什么是泛型 泛型&#xff08;Generics&#xff09;是C#中的一个强大特性&#xff0c;允许你编写可以适用于多种数据类型的可重用代码&#xff0c;而不需要重复编写…

CSP-J复赛-模拟题4

1.区间覆盖问题&#xff1a; 题目描述 给定一个长度为n的序列1,2,...,a1​,a2​,...,an​。你可以对该序列执行区间覆盖操作&#xff0c;即将区间[l,r]中的数字,1,...,al​,al1​,...,ar​全部修改成同一个数字。 现在有T次操作&#xff0c;每次操作由l,r,p,k四个值组成&am…

GD32 SPI 通信协议

1.0 SPI 简介 SPI是一种串行通信接口&#xff0c;相对于IIC而言SPI需要的信号线的个数多一点&#xff0c;时钟的信号是主机产生的。 MOSI&#xff1a;主机发送&#xff0c;从机接收 MISO&#xff1a;主机接收&#xff0c;从机发送 CS&#xff1a;表示的是片选信号 都是单向…