基于Canal同步MySQL数据到Elasticsearch

基于Canal同步MySQL数据到Elasticsearch

基于 canal 同步 mysql 的数据到 elasticsearch 中。

1、canal-server

相关软件的安装请参考:《Canal实现数据同步》

1.1 pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>canal-to-elasticsearch</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>canal-to-elasticsearch</name><description>canal to elasticsearch</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version><relativePath/></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.2 SimpleCanalClientExample编写

package com.example.canatest.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;/*** 说明:用于测试canal是否已经连接上了mysql*/
public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.94.186",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {// 获取指定数量的数据Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}

在这里插入图片描述

在这里插入图片描述

注意当后面canal-adapter也连接上canal-server后,程序就监听不到数据变化了。

这个类只是测试,下面不使用。

2、canal-adapter

由于目前canal-adapter没有官方docker镜像,所以拉去一个非官方的。

canal-adapter安装:

搜索镜像

$ docker search canal-adapter

在这里插入图片描述

拉取镜像

$ docker pull slpcat/canal-adapter:v1.1.5

在这里插入图片描述

启动

$ docker run -p 8081:8081 --name canal-adapter -d slpcat/canal-adapter:v1.1.5

在这里插入图片描述

修改配置

$ docker exec -it 89ef714d3a0e /bin/bash
$ cd conf/
$ vi application.yml
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_null
canal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: 0timeout:accessKey:secretKey:consumerProperties:# canal tcp consumer# canal.tcp.server.host需要修改canal.tcp.server.host: 192.168.94.186:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources:defaultDS:# url,username,password需要修改url: jdbc:mysql://192.168.94.186:3306/canal_test?useUnicode=trueusername: canalpassword: canalcanalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger# name需要修改- name: es7# hosts需要修改hosts: 192.168.94.186:9200 # 127.0.0.1:9200 for rest modeproperties:mode: rest# security.auth: test:123456 #  only used for rest mode# cluster.name需要修改cluster.name: my-es
$ cd conf/es7
$ cp -v mytest_user.yml canal_test_collect.yml
# 删除其他多余的
$ rm -rf biz_order.yml customer.yml mytest_user.yml
$ vi dailyhub_collect.yml
dataSourceKey: defaultDS
# 需要修改
destination: example
# 需要修改
groupId: g1
esMapping:# 需要修改_index: canal_test_id: _id_type: _docupsert: true
#  pk: id# 需要修改sql: "
SELECTc.id AS _id,c.user_id AS userId,c.title AS title,c.url AS url,c.note AS note,c.collected AS collected,c.created AS created,c.personal AS personal,u.username AS username,u.avatar AS userAvatar
FROMm_collect c
LEFT JOIN m_user u ON c.user_id = u.id"
#  objFields:
#    _labels: array:;
#   etlCondition: "where c.c_time>={}"commitBatch: 3000

也可以在外面编辑好,通过docker命令传输到docker容器中:

$ docker cp canal_test_collect.yml canal-adapter:/opt/canal-adapter/conf/es7/canal_test_collect.yml
$ docker cp application.yml canal-adapter:/opt/canal-adapter/conf/application.yml

重启容器

$ docker restart 89ef714d3a0e

验证是否启动成功

$ docker logs -f 89ef714d3a0e

在这里插入图片描述

注意对于时间类型,在后端一定要使用LocalDateTime或者LocalDate类型,如果是Date类型,需要自己手动

设置格式。

3、测试

准备测试条件:

1、首先在数据库中生成表和字段

CREATE TABLE `m_user` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`avatar` varchar(255) DEFAULT NULL,`created` date DEFAULT NULL,`lasted` date DEFAULT NULL,`open_id` varchar(255) DEFAULT NULL,`statu` int(11) DEFAULT NULL,`username` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;CREATE TABLE `m_collect` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`collected` date DEFAULT NULL,`created` date DEFAULT NULL,`note` varchar(255) DEFAULT NULL,`personal` int(11) DEFAULT NULL,`title` varchar(255) DEFAULT NULL,`url` varchar(255) DEFAULT NULL,`user_id` bigint(20) DEFAULT NULL,PRIMARY KEY (`id`),KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

在这里插入图片描述

2、然后在elasticsearch中生成索引

# 创建索引并添加映射字段
PUT /canal_test
{"mappings": {"properties": {"collected": {"type": "date","format": "date_optional_time||epoch_millis"},"created": {"type": "date","format": "date_optional_time||epoch_millis"},"note": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},"personal": {"type": "integer"},"title": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart"},"url": {"type": "text"},"userAvatar": {"type": "text"},"userId": {"type": "long"},"username": {"type": "keyword"}}}
}

在这里插入图片描述

3、插入数据

INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload../../images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05', '2022-01-06', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', 'MarkerHub');

在这里插入图片描述

4、查看数据

GET /canal_test/_search

5、遇到的问题

如果看到canal-adapter一直出现这种异常,说明启动顺序不对,启动顺序应该是:mysqlescanal

adapar

2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/171541.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++多线程

目录 一、进程与线程 二、多线程的实现 2.1 C中创建多线程的方法 2.2 join() 、 detach() 和 joinable() 2.2.1 join() 2.2.2 detach() 2.2.3 joinable() 2.3 this_thread 三、同步机制&#xff08;同步原语&#xff09; 3.1 同步与互斥 3.2 互斥锁&#xff08;mu…

面向对象(类/继承/封装/多态)详解

简介: 面向对象编程&#xff08;Object-Oriented Programming&#xff0c;OOP&#xff09;是一种广泛应用于软件开发的编程范式。它基于一系列核心概念&#xff0c;包括类、继承、封装和多态。在这篇详细的解释中&#xff0c;我们将探讨这些概念&#xff0c;并说明它们如何在P…

【QT开发(17)】2023-QT 5.14.2实现Android开发

1、简介 搭建Qt For Android开发环境需要安装的软件有&#xff1a; JAVA SDK &#xff08;jdk 有apt install 安装&#xff09; Android SDK Android NDKQT官网的介绍&#xff1a; Different Qt versions depend on different NDK versions, as listed below: Qt versionNDK…

Hbase基本使用,读写原理,性能优化学习

文章目录 HBase简介HBase定义HBase数据模型**HBase** **逻辑结构****HBase** **物理存储结构****HBase** **基本架构** HBase 入门**HBase** **安装部署****HBase** 配置文件**HBase** 启动停止**HBase** **访问页面****HBase** **高可用****HBase Shell****HBase API**HBaseCo…

Java关于实例对象调用静态变量和静态方法问题

直接去看原文 原文链接:Java关于实例对象调用静态变量和静态方法问题_java对象可以调用static方法吗_骑个小蜗牛的博客-CSDN博客 --------------------------------------------------------------------------------------------------------------------------------- 实例…

ES6初步了解迭代器

迭代器是什么&#xff1f; 迭代器(iterator)是一种接口&#xff0c;为各种不同的数据结构提供统一的访问机制。任何数据结构只要部署 iterator 接口&#xff0c;就可以完成遍历操作 ES6创造了一种新的遍历方法for…of循环&#xff0c;iterator 接口主要供 for…of 使用 原生中具…

NIO和BIO编程

一、网络通信编程基本常识 1、什么是Socket&#xff1f; Socket是应用层与TCP/IP协议族通信的中间软件抽象层&#xff0c;它是一组接口&#xff0c;一般由操作系统提供。 2、短连接 短连接是指socket建立连接之后传输数据确定接收完后关闭连接 3、长连接 长连接是指建立so…

Go RESTful API 接口开发

文章目录 什么是 RESTful APIGo 流行 Web 框架-GinGo HelloWorldGin 路由和控制器Gin 处理请求参数生成 HTTP 请求响应Gin 的学习内容实战用 Gin 框架开发 RESTful APIOAuth 2.0接口了解用 Go 开发 OAuth2.0 接口示例 编程有一个准则——Don‘t Repeat Yourself&#xff08;不要…

vue源码分析(三)——new Vue 的过程(详解data定义值后如何获取的过程)

文章目录 零、准备工作1.创建vue2项目2.修改main.js 一、import Vue from vue引入的vue是哪里来的&#xff08;看导入node_modules包&#xff09;1&#xff1a; 通过node_modules包的package.json文件2&#xff1a; 通过配置中的main入口文件进入开发环境的源码&#xff08;1&a…

java将list转为逗号隔开字符串,将逗号连接的字符串转成字符数组,​将逗号分隔的字符串转换为List​(Java逗号分隔-字符串与数组相互转换)

一、通过testList.stream().collect(Collectors.joining(",")) &#xff0c;通过流转换&#xff0c;将list转为逗号隔开字符串 List<String> testList new ArrayList<>(); testList.add("test1"); testList.add("test2"); testList…

如何查找特定基因集合免疫基因集 炎症基因集

温故而知新&#xff0c;再次看下Msigdb数据库。它更新了很多内容。给我们提供了一个查询基因集的地方。 关注微信&#xff1a;生信小博士 比如纤维化基因集&#xff1a; 打开网址&#xff1a;https://www.gsea-msigdb.org/gsea/msigdb/index.jsp 2.点击search 3.比如我对纤维…

基于Python Django 的微博舆论、微博情感分析可视化系统(V2.0)

文章目录 1 简介2 意义3 技术栈Django 4 效果图微博首页情感分析关键词分析热门评论舆情预测 5 推荐阅读 1 简介 基于Python的微博舆论分析&#xff0c;微博情感分析可视化系统&#xff0c;项目后端分爬虫模块、数据分析模块、数据存储模块、业务逻辑模块组成。 Python基于微博…

如何在Windows和Linux系统上监听文件夹的变动?

文章目录 如何在Windows和Linux系统上监听文件夹的变动&#xff1f;读写文件文件系统的操作缓冲和流文件改变事件 如何在Windows和Linux系统上监听文件夹的变动&#xff1f; libuv库实现了监听整个文件夹的修改。本文详细介绍libuv库文件读写和监听的的实现方法。libuv库开发了…

浏览器事件循环 (event loop)

进程与线程 进程 进程的概念 进程是操作系统中的一个程序或者一个程序的一次执行过程&#xff0c;是一个动态的概念&#xff0c;是程序在执行过程中分配和管理资源的基本单位&#xff0c;是操作系统结构的基础。 简单的来说&#xff0c;就是一个程序运行开辟的一块内存空间&a…

【计算机网络笔记】Cookie技术

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…

Fourier分析导论——第1章——Fourier分析的起源(E.M. Stein R. Shakarchi)

第 1 章 Fourier分析的起源 (The Genesis of Fourier Analysis) Regarding the researches of dAlembert and Euler could one not add that if they knew this expansion, they made but a very imperfect use of it. They were both persuaded that an arbitrary and d…

如何使用手机蓝牙设备作为电脑的解锁工具像动态锁那样,蓝牙接近了电脑,电脑自动解锁无需输入开机密码

环境&#xff1a; Win10 专业版 远程解锁 蓝牙解锁小程序 问题描述&#xff1a; 如何使用手机蓝牙设备作为电脑的解锁工具像动态锁那样&#xff0c;蓝牙接近了电脑&#xff0c;电脑自动解锁无需输入开机密码 手机不需要拿出来&#xff0c;在口袋里就可以自动解锁&#xff…

calcite 校验层总结

1、校验的作用 1&#xff09;完善语义信息 例如在SQL语句中&#xff0c;如果碰到select * 这样的指令&#xff0c;在SQL的语义当中&#xff0c;“*” 指的是取出对应数据源中所有字段的信息&#xff0c;因此就需要根据元数据信息来展开。 2&#xff09;结合元数据信息来纠偏…

Java游戏修炼手册:2023 最新学习线路图

前言 有没有一种令人兴奋的学习方法&#xff1f;当然有&#xff01;绝对有&#xff01;而且我要告诉你&#xff0c;学习的快乐可以媲美游戏的刺激。 小学时代&#xff0c;我曾深陷于一款名为"八百万勇士的梦"的游戏。每当放学&#xff0c;我总是迫不及待地打开电脑&a…

JSON(详解)

目录 什么是JSON&#xff1f; 哪里会用到JSON&#xff1f; JSON的特点 JSON的优点 JSON的缺点 JSON和cJSON的关系 什么是JSON&#xff1f; JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式。它以易于阅读和编写的文本格式来存储和表示…