Canal 是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。使用 Canal 模拟成 MySQL 的 Slave,实时接收 MySQL 的增量数据 binlog,然后通过 RESTful API 将数据写入到 Easysearch 中。
前提条件
- 部署 Easysearch 集群。
- 部署 MySQL 数据库。
- 部署 Gateway,Canal Adapter 不支持使用 HTTPS 协议连接,使用 Gateway 代理 Easysearch 。
- 部署 Console,方便查看 Easysearch 数据。
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
创建 canal 用户,授权 canal 连接 MySQL 具有作为 MySQL slave 的权限。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
操作步骤
在进行数据同步时支持自定义索引 Mapping,但需保证 Mapping 中定义的字段(名称+类型)与 MySQL 中一致。
1. 准备 MySQL 数据源
create database canal;
use canal;
CREATE TABLE `test` (`id` bigint(32) NOT NULL,`name` text NOT NULL,`age` smallint NOT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8;
2. Easysearch 创建索引
PUT test
{"settings" : {"index" : {"number_of_shards" : "1","number_of_replicas" : "1"}},"mappings" : {"properties" : {"id": {"type": "integer"},"name": {"type" : "text"},"age" : {"type" : "integer"}}}
}
3. 安装并启动 Canal-server
下载:https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
修改配置文件
vi conf/example/instance.properties
启动 canal
sh bin/startup.sh
启动成功日志信息,logs/canal/canal.log
关闭 canal
sh bin/stop.sh
4. 安装并启动 Canal-adapter
下载:https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz
修改配置文件:application.yml
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:flatMessage: truesyncBatchSize: 1000retries: -1timeout:accessKey:secretKey:consumerProperties:canal.tcp.server.host: 127.0.0.1:11111canal.tcp.batch.size: 500srcDataSources:defaultDS:url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=trueusername: canalpassword: canalcanalAdapters:groups:- groupId: g1outerAdapters:- name: logger- name: es7properties:security.auth: admin:4ad8f8f792e81cd0a6decluster.name: easysearch
新增 canal-adapter/conf/es7/test.yml,配置索引和表的映射关系。
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:_index: test # es 的索引名称_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配# sql映射sql: " select a.id as _id,a.id,a.name,a.age from test a "etlCondition: "where a.id>={}"commitBatch: 3000 # 提交批大小
启动 canal-adapter
./bin/startup.sh
5. 验证增量数据同步
在 MySQL 数据库中,对 test 表插入两条数据。
insert
test(
id,
name,
age) values(1,'canal_test1',11);
insert
test(
id,
name,
age) values(2,'canal_test2',22);
6. 在 Console 中,执行以下命令查询数据
最后
Canal 同步的是增量数据,不会同步之前的存量数据。要同步存量数据可参考《使用 Logstash 同步 MySQL 到 Easysearch》
关于 Easysearch
INFINI Easysearch 是一个分布式的近实时搜索与分析引擎,核心引擎基于开源的 Apache Lucene。Easysearch 的目标是提供一个轻量级的 Elasticsearch 可替代版本,并继续完善和支持更多的企业级功能。 与 Elasticsearch 相比,Easysearch 更关注在搜索业务场景的优化和继续保持其产品的简洁与易用性。
官网文档:https://www.infinilabs.com/docs/latest/easysearch
下载地址:https://www.infinilabs.com/download