从零搭建:Canal实时数据管道打通MySQL与Elasticsearch

Canal实时同步Mysql BinlogElasticsearch


文章目录

  • Canal实时同步Mysql **Binlog**至**Elasticsearch**
      • 一. 环境准备
        • 1.环境检查
          • 检查`Mysql`是否开启`BinLog`
          • 开启Mysql Binlog
          • Java环境检查
        • 2.新建测试库和表
        • 3.新建Es索引
      • 二.**部署 Canal Server**
        • **2.1 解压安装包**
        • **2.2 配置 Canal Server**
        • **2.3 启动 Canal Server**
      • **三. 部署 Canal Adapter(同步到 Elasticsearch)**
        • **3.1 配置 Adapter**
        • **3.2 配置数据映射**
        • **3.3 启动 Adapter**
      • **4. 验证同步**
        • **4.1 插入测试数据到 MySQL**
        • **4.2 查询 Elasticsearch**

一. 环境准备

  • 操作系统:Linux(Ubuntu 20.04)
  • Java 环境:JDK 8+(建议 OpenJDK 11)
  • MySQL:已启用 Binlog(ROW 模式),并创建 Canal 用户
  • Elasticsearch:已部署(版本 7.x 或 8.x)
  • Canal 二进制包:从 Canal Release 下载 canal.deployer-1.1.8.tar.gzcanal.adapter-1.1.8.tar.gz
1.环境检查
  • 检查Mysql是否开启BinLog
#root账号执行
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

输出如下证明已经打开:

image-20250211103029832

创建 Canal 用户并授权:

#创建用户
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'Password@123';
# 给新创建账户赋予从库权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';# 刷新权限
FLUSH PRIVILEGES;

如果没打开BinLog可以通过如下方法打开:

  • 开启Mysql Binlog

修改my.cnf文件,加入如下内容:

log_bin=mysql-bin
binlog_format=ROW
binlog_expire_logs_seconds=172800
expire_logs_days=2

log_bin:启用二进制日志,日志文件会以 mysql-bin 为前缀,并依次生成日志文件(例如:mysql-bin.000001mysql-bin.000002 等)。

binlog_format:设置使用的二进制日志格式,在 MySQL 8.0 版本中,binlog_format 的默认值已经变为 ROW。所以,即使你在配置文件中没有明确设置 binlog_format,MySQL 会默认使用 ROW 作为二进制日志格式。在较早的 MySQL 版本中默认值是 STATEMENT

binlog_expire_logs_seconds=172800expire_logs_days=2:这些配置设置了二进制日志的过期时间(默认情况下,MySQL 会保留二进制日志,直到它们过期或达到日志文件数的限制)。在这种情况下,日志会在 2 天后过期。

配置好后重启Mysql:

systemctl restart mysqld.service
  • Java环境检查
echo $JAVA_HOME
image-20250211111637904
2.新建测试库和表
 CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;CREATE TABLE `test_user` (`id` bigint unsigned NOT NULL AUTO_INCREMENT,`name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '姓名',`sex` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '性别',`tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '电话',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
3.新建Es索引
curl -X PUT "http://<your es IP>:9200/test_user" -H 'Content-Type: application/json' -u <es账号>:<es 密码> -d'
{"mappings": {"properties": {"id": {"type": "long"},"title": {"type": "text"},"sex": {"type": "text"},"tel": {"type": "text"}}}
}
'

二.部署 Canal Server

2.1 解压安装包
# 创建目录
mkdir -p /opt/canal/server /opt/canal/adapter# 解压 Server
tar -zxvf canal.deployer-1.1.8.tar.gz -C /opt/canal/server# 解压 Adapter
tar -zxvf canal.adapter-1.1.8.tar.gz -C /opt/canal/adapter
2.2 配置 Canal Server

修改配置文件 /opt/canal/server/conf/canal.properties

# tcp bind ip
canal.ip =127.0.0.1
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112# 目标实例名称(默认 example)
canal.destinations = example# 持久化模式(默认内存,可选 H2/MySQL)
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml

这里主要修改canal.ip其他保持默认即可。

修改实例配置 /opt/canal/server/conf/example/instance.properties

#被同步的mysql地址,填写自己的IP地址
canal.instance.master.address=127.0.0.1:3306
#第一步创建的数据库从库权限账号/密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Password@123
#数据库连接编码 
canal.instance.connectionCharset = UTF-8 
#Binlog 过滤规则(监控所有库表)
canal.instance.filter.regex=.*\\..*
#指定了 Canal 消费者(比如 MQ 客户端)读取和写入消息的目标主题,保持默认即可
canal.mq.topic=example
2.3 启动 Canal Server
cd /opt/canal/server/bin
./startup.sh# 查看日志
tail -f /opt/canal/server/logs/canal/canal.log
tail -f /opt/canal/server/logs/example/example.log

image-20250211153418697

image-20250211153400835

image-20250211153538656

可以看到日志没有明显报错,且进程已经启动,则表示Canal Server已经启动成功。

image-20250211153842261

三. 部署 Canal Adapter(同步到 Elasticsearch)

3.1 配置 Adapter

修改配置文件 /opt/canal/adapter/conf/application.yml

server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:mode: tcp # 客户端的模式,可选tcp kafka rocketMQflatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效zookeeperHosts:    # 对应集群模式下的zk地址syncBatchSize: 1000 # 每次同步的批数量retries: 0 # 重试次数, -1为无限重试timeout: # 同步超时时间, 单位毫秒accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111 #配置canal-server的地址canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources: # 源数据库配置defaultDScanal是测试数据库url: jdbc:mysql://<yourIP>:3306/canal?useUnicode=true&useSSL=true #数据库连接,canal是测试用的数据库username: root #数据库账号password: Pass@1234 #数据库密码canalAdapters: # 适配器列表- instance: example # canal实例名,和上述Server的配置一样groups: # 分组列表- groupId: g1 # 分组id, 如果是MQ模式将用到该值outerAdapters:- name: logger # 日志打印适配器- name: es8 # ES同步适配器根据自己的es版本来hosts: <your IP>:9200 # ES连接地址properties:mode: rest # 模式可选transport端口(9300) 或者 rest端口(9200)security.auth: elastic:123456 #  连接es的用户和密码,仅rest模式使用cluster.name: elasticsearch # ES集群名称

如何获取es集群名称,命令输出的cluster_name就是上面需要配置的集群名字:

curl -u elastic:<esPass> -X GET "http://<es IP>:9200/_cluster/health?pretty"

image-20250211170653195

3.2 配置数据映射

创建 Elasticsearch 映射文件 /opt/canal/adapter/conf/es8/mytest_user.yml

dataSourceKey: defaultDS # 源数据源的key, 对应上面application配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:_index: test_user # es 的索引名称_id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配sql: "SELECTtb.id AS _id,tb.name,tb.sex,tb.telFROMtest_user us"        # sql映射etlCondition: "where p.id>={}"   #etl的条件参数commitBatch: 3000   # 提交批大小
3.3 启动 Adapter
cd /opt/canal/adapter/bin
./startup.sh#查看日志
tail -f /opt/canal/adapter/logs/adapter/adapter.log

会输出很多数据库变更的日志:

image-20250211171145018

image-20250211171208031

4. 验证同步

4.1 插入测试数据到 MySQL
#执行sql
INSERT INTO test_user (name, sex, tel) VALUES ('Paco', '男', '123456789');

image-20250211171534121

image-20250211171547780

4.2 查询 Elasticsearch
curl -u elastic:<esPass> -X GET "http://<esIP>:9200/test_user/_search?pretty"

也可以在工具上查看,这边是Eage插件:

image-20250211171753695

image-20250211171808091

至此,即可验证可同步成功。我们可以修改数据测试,看是否能同步。

image-20250211171849802

image-20250211172502715

然后我们测试修改Es的数据:

image-20250211172542248

image-20250211172555087

可以发现数据库并没有变,至此Canal单向实时同步Mysql BinlogElasticsearch就配置完成了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/17654.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

五、k8s:容忍 存储卷

容忍&#xff1a; 即使节点上有污点&#xff0c;依然可以部署pod。 tolerations: operator: "Exists" 不指定key&#xff0c;表示容忍所有的污点 cordon和drain cordon: 直接标记节点为不可用&#xff0c;pod不能部署到该节点。新建的pod不会再部署到该节点&#…

Springboot_实战

项目开发 lombok使用 自动为实体类提供get、set、toString方法 引入依赖 实体类上添加注解 统一响应结果 注意要写get、set方法&#xff1b;下面是错误的&#xff0c;因此要加上Data注解 一个注册的接口的示例 Controller层 Service层 Mapper层 参数校验 但是同样存在一…

稀土抑烟剂——为纺织品安全加持,保护您的每一寸触感

一、稀土抑烟剂的基本概念 稀土抑烟剂是基于稀土元素&#xff08;如稀土氧化物和稀土金属化合物&#xff09;研发的一类新型阻燃材料。它能够有效提高纺织品的阻燃性&#xff0c;抑制火灾发生时产生的烟雾和有害气体&#xff0c;减少火灾对人体的危害。稀土抑烟剂具有更强的稳…

本地部署SafeLine详细指南:抵御网络攻击构建更安全的网站环境

文章目录 前言1.关于SafeLine2.安装Docker3.本地部署SafeLine4.使用SafeLine5.cpolar内网穿透工具安装6.创建远程连接公网地址7.固定Uptime Kuma公网地址前言 各位建站小能手们,无论是想搭建个人博客、企业官网还是各种应用平台来推广自己的内容或产品,在这个数字时代都已经…

Chrome 浏览器可以读写本地文件了,虽说是实验api,但是基本86+已经支持了

目前该API只能在https域名上使用&#xff01;&#xff01;&#xff01; 实现逻辑&#xff1a; 1.唤醒浏览器选择文件夹、文件权限 document.getElementById(button).addEventListener(click, async () > {getFile()});async function getFile () {// 打开文件选择器 记…

华宇TAS应用中间件与因朵科技多款产品完成兼容互认证

在数字化浪潮澎湃向前的当下&#xff0c;信息技术的深度融合与协同发展成为推动各行业创新变革的关键力量。近日&#xff0c;华宇TAS应用中间件携手河北因朵科技有限公司&#xff0c;完成了多项核心产品的兼容互认证。 此次兼容性测试的良好表现&#xff0c;为双方的进一步深入…

数字IC秋招知识点—1

数字IC秋招准备知识点—1 时序逻辑与组合逻辑 1. 定义与核心原理 组合逻辑&#xff1a; 输入决定输出&#xff1a;当前的输出仅由输入决定&#xff0c;无记忆功能。无反馈回路示例&#xff1a;基本逻辑单元&#xff0c;加法器&#xff0c;多路选择器MUX&#xff0c;译码器&am…

【webview Android】视频获取首帧为封面

文章目录 需求分析获得首帧其他方法 需求分析 客户端中h5上传视频&#xff0c;视频封面默认首帧。 遇到问题&#xff1a;原生的video现象如下 IOS会在加载好后显示首帧&#xff08;没加载好显示黑屏&#xff0c;符合预期&#xff09;Android加载好后默认封面为一个奇怪的占位…

大脑网络与智力:基于图神经网络的静息态fMRI数据分析方法|文献速递-医学影像人工智能进展

Title 题目 Brain networks and intelligence: A graph neural network based approach toresting state fMRI data 大脑网络与智力&#xff1a;基于图神经网络的静息态fMRI数据分析方法 01 文献速递介绍 智力是一个复杂的构念&#xff0c;包含了多种认知过程。研究人员通…

DeepSeek如何重塑我的编程学习:计算机新生的AI实践

目录 &#x1f680;前言&#x1f31f;邂逅DeepSeek&#xff1a;从困惑到惊喜&#x1f4af;初学编程的困境&#x1f4af;DeepSeek的优势 &#x1f58a;️DeepSeek在编程学习中的运用&#x1f4af;注释&#x1f4af;算法逐步分析&#x1f4af;调试帮助&#x1f4af;跨语言迁移学习…

信息收集-Web应用JS架构URL提取数据匹配Fuzz接口WebPack分析自动化

知识点&#xff1a; 1、信息收集-Web应用-JS提取分析-人工&插件&项目 2、信息收集-Web应用-JS提取分析-URL&配置&逻辑 FUZZ测试 ffuf https://github.com/ffuf/ffuf 匹配插件 Hae https://github.com/gh0stkey/HaE JS提取 JSFinder https://github.com/Threez…

安科瑞光储充一体化微电网系统的设计与优化研究-安科瑞 蒋静

摘要&#xff1a;双碳能源技术是一种绿色、可持续的能源发展方向&#xff0c;光储充一体系统作为其中的重要组成部分&#xff0c;具有将光能转化为电能并进行储存和供电的功能。文章对光储充一体系统的设计与性能进行分析&#xff0c;以期为双碳能源技术的推广和应用提供技术支…

【BUG】Ubuntu|有nvcc,没有nvidia-smi指令,找不到nvidia-driver安装包

很奇怪&#xff0c;本来能使用的&#xff0c;放个假回来就用不了了。 排查了以下所有步骤最终解决。 我的Ubuntu版本&#xff1a;Ubuntu22 nvcc -v&#xff1a;有。如果没有的话你需要安装“sudo apt-get install nvidia-cuda-toolkit”&#xff0c;其他问题请去别的博客查。…

spring-ai快速集成deepseek大模型

一、Spring AI简介&#xff1a;Spring AI致力于简化AI项目的开发与部署流程&#xff0c;使Java开发者能够更高效地将AI技术集成到业务系统中。它提供了开箱即用的工具和接口&#xff0c;方便集成和管理各种AI模型。 目前,Spring AI 支持的 AI 平台包括&#xff1a;OpenAI (Chat…

进阶——第十六届蓝桥杯嵌入式熟练度练习(开发板输出占空比和频率)

定义变量 uint16_t PA6_frq,PA7_frq; uint16_t PA6_duty10,PA7_duty20; 开启定时器PWM HAL_TIM_PWM_Start(&htim16,TIM_CHANNEL_1);HAL_TIM_PWM_Start(&htim17,TIM_CHANNEL_1); 给变量赋值 PA6_frq80000000/8000/TIM16->ARR1;PA7_frq80000000/4000/TIM17->AR…

TCNE 网络安全

一.概况 CTF&#xff08;Capture The Flag&#xff09;在网络安全领域中指的是网络技术人员之间进行技术竞技的一种比赛形式&#xff0c;它起源于1996年的DEFCON全球黑客大会&#xff0c;以代替之前黑客们通过互相发起真实攻击进行技术比拼的方式&#xff0c;现已成为全球范围网…

信呼OA办公系统sql注入漏洞分析

漏洞描述 信呼OA办公系统uploadAction存在SQL注入漏洞&#xff0c;攻击者可利用该漏洞获取数据库敏感信息。 环境搭建 源码下载地址&#xff1a;https://github.com/rainrocka/xinhu 下载后解压到本地网站根目录下&#xff0c;配置好数据库&#xff0c;然后安装即可 默认密…

springboot如何将lib和jar分离

遇到一个问题&#xff0c;就是每次maven package或者maven install后target中的jar很大&#xff0c;少的50几MB&#xff0c;大的100多兆 优化前&#xff1a; 优化后&#xff1a; 优化前 优化后压缩率77.2MB4.65MB93% 具体方案&#xff1a; pom.xml中 <build><…

ThreadLocal源码分析

文章目录 1.核心数据结构 ThreadLocalMap1.静态内部类 Entry2.真正存储数据的是table数组 2.ThreadLocal.set()方法源码详解1.set2.getMap3.ThreadLocalMap.set4.createMap5.rehash6.resize 3.ThreadLocalMap.get()详解1.get2.ThreadLocalMap.getEntry3.getEntryAfterMiss 4.Th…

VUE环境搭建

node.js安装 node npm – node Package Management 安装完成后&#xff0c;需要设置&#xff1a; npm config set prefix "D:\nodejs"注意&#xff1a;“D:\nodejs” 此处为自己安装的node.js路径。管理员身份运行 切换镜像源 npm config set registry https://r…