Docker 中使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch

一、Mysql 的安装和配置

1.使用 docker 安装 mysql,并且映射端口和 root 账号的密码

# 获取镜像
docker pull mysql:8.0.40-debian# 查看镜像是否下载成功
docker images# 运行msyql镜像
docker run -d -p 3388:3306 --name super-mysql -e MYSQL_ROOT_PASSWORD=123456 mysql:8.0.40-debian

2.连接 mysql,检查是否正确安装

        使用 Navicat 连接

使用 linux 命令行连接,172.21.121.208 是我本地映射的ip地址,这里换成对应 ip 即可

3.修改 mysql 的配置文件

# 进入刚刚安装的 mysql 容器
docker exec -it super-mysql /bin/bash# 查找 mysql 的配置文件 my.cnf
find / -name my.cnf 2>/dev/null#显示:
# /var/lib/dpkg/alternatives/my.cnf
# /etc/alternatives/my.cnf
# /etc/mysql/my.cnf# 修改配置文件 my.cnf 
vim /etc/mysql/my.cnf# 开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复# 如果没有vim,则需 先安装vim,执行如下命令
# 更新源
apt update# 安装vim
apt install vim

4.授权 canal 链接 mysql 账号具有作为 mysql slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

运行截图如下

使用刚刚新建的账号: canal 密码: canal 连接 mysql,成功了才往下配置

二、Canal 的安装和配置

1.使用 docker 安装 canal

# 获取镜像
docker pull canal/canal-server:latest# 查看镜像是否下载成功
docker images# 运行msyql镜像
docker run --name super-canal -p 3399:11111 -d canal/canal-server:latest

2.修改 canal 配置,并且重启 canal

# 进入刚刚安装的 canal 容器
docker exec -it super-canal /bin/bash# 查找 canal 的配置文件 instance.properties
find / -name 'instance.properties' 2>/dev/null 
# 显示:
# /home/admin/canal-server/conf/example/instance.properties# 修改配置文件 instance.properties
vi /home/admin/canal-server/conf/example/instance.propertiescanal.instance.master.address=连接mysql的ip:port
canal.instance.dbUsername=连接mysql的账号
canal.instance.dbPassword=连接mysql的密码# 重启 canal 服务
find / -name 'stop.sh' 2>/dev/null # 查找停止 canal 命令
bash /home/admin/canal-server/bin/stop.sh # 停止 canalfind / -name 'startup.sh' 2>/dev/null # 查找重启 canal 命令
bash /home/admin/canal-server/bin/startup.sh # 重启 canal

主要修改连接 msyql 的 ip 和 port,使用的账号和密码,修改配置如下:

三、使用 PHP 测试 Canal 是否监听到了 Mysql 的变化

1.初始化项目

# 创建项目文件夹 canal-elasticsearch,并且进入到项目文件夹# composer 初始化项目
composer init 
# 一直按回车键 Enter# 安装 cannal 依赖
composer require xingwenge/canal_php

2.在 src 目录下新建文件 index.php ,写入一下代码

<?php
require __DIR__.'/../vendor/autoload.php';
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\Fmt;try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);$client->connect("172.21.121.208", 3399);$client->checkValid();$client->subscribe("1001", "example", ".*\\..*");# $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤while (true) {$message = $client->get(100);if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);}}sleep(1);}$client->disConnect();
} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
}

3.命令行中启动脚本 php ./src/index.php

4.在 mysql 中新建表或者新增数据,就会在命令行中打印出来

# 创建数据表
CREATE TABLE `user` (`id` int unsigned NOT NULL AUTO_INCREMENT,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',`age` int unsigned NOT NULL DEFAULT '0',`created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=18 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

注意:如果新增数据表或者其它情况,导致 canal 突然连接不上 mysql 了,建议停止 canal,且删除当前的 canal 容器,重新使用 docker 安装 canal,这样解决起来比较迅速(坏笑)

四、ElasticSearch 的安装和配置

1.使用 docker 安装 elasticsearch

# 获取镜像
docker pull registry.cn-hangzhou.aliyuncs.com/xka/es:7.11.2-210328-1# 查看镜像是否下载成功
docker images# 创建数据文件夹
mkdir /mnt/d/Work/Code/docker/elasticsearch/data# 运行 elasticsearch 镜像
docker run --name super-elasticsearch -d \-p 3320:9200 -p 3330:9300 \-e "ES_JAVA_OPTS=-Xms4g -Xmx16g" \-e "discovery.type=single-node" \-v /mnt/d/Work/Code/docker/elasticsearch/data:/usr/share/elasticsearch/data \registry.cn-hangzhou.aliyuncs.com/xka/es:7.11.2-210328-1

2.测试 elasticsearch 是否安装成功, 在浏览器地址栏输入:127.0.0.1:3320,出现如下画面表示安装成功了

五、使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch

1.在 php 中使用 composer 安装 elasticsearch 依赖包

# 使用 composer 安装 elasticsearch 依赖包
composer require elasticsearch/elasticsearch

2.使用 php 在 elasticsearch 中创建 mapping,在 src 目录下创建文件 creatElasticSearchMapping.php

<?php
require 'vendor/autoload.php';use Elasticsearch\ClientBuilder;
use Elasticsearch\Common\Exceptions\BadRequest400Exception;// 创建 Elasticsearch 客户端
$client = ClientBuilder::create()->setHosts(['localhost:3320']) // 设置 Elasticsearch 主机和端口->build();// 索引名称
$indexName = 'canal_user_index';// 索引设置和映射
$params = ['index' => $indexName,'body'  => ['settings' => ['number_of_shards' => 1, // 分片数量'number_of_replicas' => 0 // 副本数量],'mappings' => ['properties' => ['name' => ['type' => 'text'],'age' => ['type' => 'integer'],'email' => ['type' => 'keyword'],'created_at' => ['type' => 'date', 'format' => 'yyyy-MM-dd HH:mm:ss'],'updated_at' => ['type' => 'date', 'format' => 'yyyy-MM-dd HH:mm:ss'],]]]
];try {// 创建索引$response = $client->indices()->create($params);echo "索引 '$indexName' 创建成功:\n";print_r($response);
} catch (BadRequest400Exception $e) {// 如果索引已经存在if (strpos($e->getMessage(), 'index_already_exists_exception') !== false) {echo "索引 '$indexName' 已经存在。\n";} else {echo "创建索引时发生错误: " . $e->getMessage() . "\n";}
} catch (\Exception $e) {echo "创建索引时发生错误: " . $e->getMessage() . "\n";
}

3.检查elasticsearch中的mapping是否创建成功

curl -XGET 'http://localhost:3320/canal_user_index/_mapping'

提示如图

4.在 php 中通过 canal 获取 msyql 的数据变化,且更新到 elasticsearch 中,数据的增删改代码都在下面了,在 src 目录下创建文件 canalToElasticSearch.php

<?php
require __DIR__.'/../vendor/autoload.php';
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use Com\Alibaba\Otter\Canal\Protocol\RowData;
use Elasticsearch\ClientBuilder;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;$clientES = ClientBuilder::create()->setHosts(['localhost:3320'])->setSSLVerification(false) // 禁用 SSL 验证(仅用于开发环境)->setRetries(3) // 设置重试次数->build();// 索引名称
$indexName = 'canal_user_index';try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);$client->connect("172.21.121.208", 3399);$client->checkValid();$client->subscribe("1001", "example", ".*\\..*");echo "script start success!";while (true) {$message = $client->get(100);if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);$rowChange = new RowChange();$rowChange->mergeFromString($entry->getStoreValue());$evenType = $rowChange->getEventType();$header = $entry->getHeader();/** @var RowData $rowData */foreach ($rowChange->getRowDatas() as $rowData) {switch ($evenType) {/** 删除数据 */case EventType::DELETE:if ($rowData->getAfterColumns()) {foreach ($rowData->getAfterColumns() as $column) {if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($id) && $clientES->exists(['index' => $indexName, 'id' => $id])) {$response = $clientES->delete(['index' => $indexName, 'type' => '_doc', 'id' => $id]);}}break;/** 新增数据 */case EventType::INSERT:$insertData = [];if ($rowData->getAfterColumns()) {foreach ($rowData->getAfterColumns() as $column) {$insertData = array_merge($insertData, [$column->getName() => $column->getValue()]);if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($insertData)) {$params = ['index' => $indexName, 'body' => $insertData];if (!empty($id)) $params['id'] = $id;$response = $clientES->index($params);}}break;default:/** 更新数据 */if ($rowData->getAfterColumns()) {$updateData = [];foreach ($rowData->getAfterColumns() as $column) {$updateData = array_merge($updateData, [$column->getName() => $column->getValue()]);if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($id) && !empty($updateData)) {$params = ['index' => $indexName,'id' => $id,'body' => ['doc' => $updateData],];if ($clientES->exists(['index' => $indexName, 'id' => $id])) {$response = $clientES->update($params);} else {$updateData['id'] = $id;$params['body'] = $updateData;$response = $clientES->index($params);}}}break;}}}}sleep(1);}
} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
}

5.执行php脚本,监听数据变化

我这里在 mysql 中添加了一些数据,都同步到 elasticsearch 里面了

msyql 中的截图:

elasticsearch 中的截图

看到这里,辛苦了。

感觉自己今天又又又变得比昨天更强了

参考文档链接:

1.Mysql 数据库 主从数据库 (主从)(主主)-CSDN博客

2.https://github.com/xingwenge/canal-php

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/489707.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

搭建springmvc项目

什么是springmvc MVC它是一种设计理念。把程序按照指定的结构来划分: Model模型 View视图 Controller控制层 springmvc框架是spring框架的一个分支。它是按照mvc架构思想设计的一款框架。 springmvc的主要作用: 接收浏览器的请求数据&#xff0c;对数据进行处理&#xff0c;…

学习笔记069——Java集合框架

文章目录 集合1、List 接口2、Set 接口3、Map3.1、Map 常用实现类 集合 需要创建多个对象&#xff0c;但是数量和类型不确定。 集合是 Java 提供的一种类型&#xff0c;功能和数组类似&#xff0c;但是长度和数据类型都是动态。 集合框架&#xff08;包括很多类和接口&#…

【Linux】基础IO(内存文件)

目录 一、预备知识二、复习常见C语言的文件接口2.1 文件接口的说明2.1.1 fopen函数2.1.2 fputs函数2.1.3 fclose函数 2.2 文件接口的使用 三、认识操作文件的系统调用3.1 系统调用的说明3.1.1 open函数3.1.1.1 Linux中常用的传参方法 3.1.2 write函数3.1.3 close函数 3.2 系统调…

基础开发工具-编辑器vim

vim操作键盘图 下图是比较基础的vim操作键盘图 &#xff08;IDE例子&#xff09; vi/vim的区别简单点来说&#xff0c;它们都是多模式编辑器&#xff0c;不同的是vim是vi的升级版本&#xff0c;它不仅兼容vi的所有指令&#xff0c;⽽且还有⼀些新的特性在⾥⾯。例如语法加亮&a…

RT-DETR融合[CVPR2024]Starnet中的star block取模块

RT-DETR使用教程&#xff1a; RT-DETR使用教程 RT-DETR改进汇总贴&#xff1a;RT-DETR更新汇总贴 《Rewrite the Stars》 一、 模块介绍 论文链接&#xff1a;https://arxiv.org/abs/2403.19967 代码链接&#xff1a;https://github.com/ma-xu/Rewrite-the-Stars/tree/main 论…

使用webrtc-streamer查看实时监控

摄像头配置&#xff08;海康摄像头为例&#xff09; 摄像头视频编码应改成H264格式 webrtc-streamer下载 webrtc-streamer下载地址 下载后解压出来双击运行&#xff0c;端口默认8000 VUE2项目引入文件 在项目静态文件“public”中需引入两个js文件“webrtcstreamer.js”与“…

04面向对象篇(D4_OOT(D1_OOT - 面向对象测试))

目录 一、 面向对象影响测试 1. 封装性影响测试 2. 继承性影响测试 3. 多态性影响测试 二、 面向对象测试模型 三、 面向对象分析测试 1. 对象测试 2. 结构测试 3. 主题测试 4. 属性和实例关联测试 5. 服务和消息关联测试 四、面向对象设计测试 1. 对认定类测试 …

每天40分玩转Django:简介和环境搭建

Django简介和环境搭建 一、课程概述 学习项目具体内容预计用时Django概念Django框架介绍、MVC/MTV模式、Django特点60分钟环境搭建Python安装、pip配置、Django安装、IDE选择45分钟创建项目项目结构、基本配置、运行测试75分钟实战练习创建个人博客项目框架60分钟 二、Djang…

【CTF-Web】文件上传漏洞学习笔记(ctfshow题目)

文件上传 文章目录 文件上传 What is Upload-File&#xff1f;Upload-File In CTF Web151 考点&#xff1a;前端校验解题&#xff1a; Web152 考点&#xff1a;后端校验要严密解题&#xff1a; Web153 考点&#xff1a;后端校验 配置文件介绍解题&#xff1a; Web154 考点&am…

uniappp配置导航栏自定义按钮(解决首次加载图标失败问题)

1.引入iconfont的图标&#xff0c;只保留这两个文件 2.App.vue引入到全局中 import "./static/fonts/iconfont.css"3.pages.json中配置text为图标对应的unicode {"path": "pages/invite/invite","style": {"h5": {"…

基于Android的生活记录app的设计与实现

博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业六年&#xff0c;熟悉各种主流语言&#xff0c;精通java、python、php、爬虫、web开发&#xff0c;已经做了多年的设计程序开发&#xff0c;开发过上千套设计程序&#xff0c;没有什么华丽的语言&#xff0c;只有实…

Photoshop提示错误弹窗dll缺失是什么原因?要怎么解决?

Photoshop提示错误弹窗“DLL缺失”&#xff1a;原因分析与解决方案 在创意设计与图像处理领域&#xff0c;Photoshop无疑是众多专业人士和爱好者的首选工具。然而&#xff0c;在使用Photoshop的过程中&#xff0c;有时会遇到一些令人头疼的问题&#xff0c;比如突然弹出的错误…

软考:工作后再考的性价比分析

引言 在当今的就业市场中&#xff0c;软考&#xff08;软件设计师、系统分析师等资格考试&#xff09;是否值得在校学生花费时间和精力去准备&#xff1f;本文将从多个角度深入分析软考在不同阶段的性价比&#xff0c;帮助大家做出明智的选择。 一、软考的价值与局限性 1.1 …

Ensembl数据库下载参考基因组(常见模式植物)bioinfomatics 工具37

拟南芥参考基因组_拟南芥数据库-CSDN博客 1 Ensembl数据库网址 http://plants.ensembl.org/index.html #官网 如拟南芥等 那么问题来了&#xff0c;基因组fa文件和gff文件在哪里&#xff1f; 2 参考案例 拟南芥基因组fa在这里 注释gff文件在这里

H.323音视频协议

概述 H.323是国际电信联盟&#xff08;ITU&#xff09;的一个标准协议栈&#xff0c;该协议栈是一个有机的整体&#xff0c;根据功能可以将其分为四类协议&#xff0c;也就是说该协议从系统的总体框架&#xff08;H.323&#xff09;、视频编解码&#xff08;H.263&#xff09;、…

mp4影像和m4a音频无损合成视频方法

第一步&#xff1a;复制高清视频地址 url 第二步:打开网址粘贴复制的视频url视频下载 第三步&#xff1a;下载-影像.mp4和-音频.m4a 第四步&#xff1a;合并视频&#xff1b; 使用ffmpeg进行无损合成&#xff08;如果没有安装ffmpeg请自行下载安装下载 FFmpeg (p2hp.com)&…

LLM之RAG实战(五十)| FastAPI:构建基于LLM的WEB接口界面

FastAPI是WEB UI接口&#xff0c;随着LLM的蓬勃发展&#xff0c;FastAPI的生态也迎来了新的机遇。本文将围绕FastAPI、OpenAI的API以及FastCRUD&#xff0c;来创建一个个性化的电子邮件写作助手&#xff0c;以展示如何结合这些技术来构建强大的应用程序。 下面我们开始分步骤操…

40 list类 模拟实现

目录 一、list类简介 &#xff08;一&#xff09;概念 &#xff08;二&#xff09;list与string和vector的区别 二、list类使用 &#xff08;一&#xff09;构造函数 &#xff08;二&#xff09;迭代器 &#xff08;三&#xff09;list capacity &#xff08;四&#x…

迎接全新的 Kotlin 支持 – K2 模式:基本信息

K2 模式有什么作用&#xff1f; K2 模式是 IntelliJ IDEA 中 Kotlin 支持的新实现&#xff0c;它可以提高 IDE 的稳定性&#xff0c;同时也会为支持未来 Kotlin 语言功能奠定基础。 K2 模式与 Kotlin K2 编译器有什么区别&#xff1f; K2 编译器负责编译 Kotlin 语言 2.0 或…

openGauss开源数据库实战二十四

文章目录 任务二十四 openGaussss WAL管理和归档管理任务目标实施步骤一、WAL管理1.不能修改的WAL参数2.可以修改的WAL参数 二、配置openGauss工作在归档模式1.查看当前的归档设置2.停止openGauss数据库3.创建归档日志的保存目录4.修改启动参数文件5.重新启动openGauss数据库6.…