canal 嵌入式部署 监听binlog

canal 嵌入式部署

  • 背景
    • 技术选型
    • canal
    • 原理
    • 用途
    • 嵌入式代码实现
      • 引入pom
      • 引入工具pom
      • main方法
      • 引入
      • 常量定义
      • install方法
      • buildCanal方法
      • pull方法
      • printSummary
      • printEntry2
    • 总结
    • 谢谢

背景

最近发现一个需求,需要监听mysql 数据库里的数据变动, 但由于架构方面的原因, 只能做成单体嵌入式的方向,嵌入进应用中,不用单独部署

技术选型

我对监控binlog 监控工具进行了了解,包括

  1. mysql-binlog-connector-java1
  2. canal2
  3. open-replicator3

canal

本篇博文主讲cannal 的嵌入模式

原理

在这里插入图片描述

用途

要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

嵌入式代码实现

引入pom

         <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.common</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.deployer</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.server</artifactId><version>1.1.4</version></dependency>

引入工具pom

        <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency>

main方法

public static void main(String[] args) {EmbeddedCanalListener embeddedCanalListener = new EmbeddedCanalListener();//安装实体embeddedCanalListener.install();//拉取消息embeddedCanalListener.pull();}

引入

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

常量定义

//日志public static final Logger logger = LoggerFactory.getLogger(EmbeddedCanalListener.class);//随意命名protected static final String DESTINATION = "example";//测试sqlprotected static final String DETECTING_SQL = "select 1 from dual;";//MSQL配置protected static final String MYSQL_ADDRESS = "xxxx";protected static final int MYSQL_PORT = xxx;protected static final String USERNAME = "xxx";protected static final String PASSWORD = "xxx";//使用ORACLE 未实现protected static final String ORACLE_ADDRESS = "xx.xx.xx.xx";protected static final int ORACLE_PORT = xxx;protected static final String ORACLE_USERNAME = "xxx";protected static final String ORACLE_PASSWORD = "xxx";/*** 表筛选 , 这里默认全部* 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)* <p>* <p>* 常见例子:* <p>* 1.  所有表:.*   or  .*\\..* 2.  canal schema下所有表: canal\\..* 3.  canal下的以canal打头的表:canal\\.canal.* 4.  canal schema下的一张表:canal\\.test1* <p>* 5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)*/protected static final String FILTER = ".*";//定义一个serverprivate CanalServerWithEmbedded server;//定义一个clientprivate ClientIdentity clientIdentity = new ClientIdentity(DESTINATION, (short) 1);static {context_format = SEP + "****************************************************" + SEP;context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;context_format += "* Start : [{}] " + SEP;context_format += "* End : [{}] " + SEP;context_format += "****************************************************" + SEP;row_format = SEP+ "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"+ SEP;transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"+ SEP;}

install方法

        //获取一个instanceserver = CanalServerWithEmbedded.instance();//设置一个gennertor去生成server.setCanalInstanceGenerator(destination -> {//构建cannal 把上面的参数设置进去Canal canal = buildCanal();//返回一个Managerreturn new CanalInstanceWithManager(canal, FILTER);});//启动server.start();//启动这个实例server.start(DESTINATION);

buildCanal方法

		Canal canal = new Canal();//ID无意义 随便设置canal.setId(12L);canal.setName(DESTINATION);canal.setDesc("my standalone server test ");CanalParameter parameter = new CanalParameter();//parameter.setDataDir("./conf");//索引的模式, 嵌入式选择内存parameter.setIndexMode(IndexMode.MEMORY);//存储buffsize 具体看canal 官方的介绍parameter.setMemoryStorageBufferSize(32 * 1024);//设置Mysql的配置 包括模式,地址,默认scheme,用户名,密码,slaveId(查看mysql的My.conf),链接编码格式,缓存设置(看官方介绍)parameter.setSourcingType(SourcingType.MYSQL);parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, MYSQL_PORT)));parameter.setDefaultDatabaseName("XXXX");parameter.setDbUsername(MYSQL_USERNAME);parameter.setDbPassword(MYSQL_PASSWORD);//可以指定binlog 和起始位置 或者其实timestamp
//        parameter.setPositions(Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":332L,\"timestamp\":\"1505998863000\"}",
//                "{\"journalName\":\"mysql-bin.000001\",\"position\":332L,\"timestamp\":\"1505998863000\"}"));parameter.setSlaveId(2L);parameter.setDefaultConnectionTimeoutInSeconds(30);parameter.setConnectionCharset("GBK");parameter.setConnectionCharsetNumber((byte) 33);parameter.setReceiveBufferSize(8 * 1024);parameter.setSendBufferSize(8 * 1024);//测试链接设置 ,这里是false 无意义parameter.setDetectingEnable(false);parameter.setDetectingIntervalInSeconds(10);parameter.setDetectingRetryTimes(3);parameter.setDetectingSQL(DETECTING_SQL);parameter.setGtidEnable(true);canal.setCanalParameter(parameter);return canal;

pull方法

		//定义拉取的大小int batchSize = 5 * 1024;while (running) {try {//订阅当前设定的clientserver.subscribe(clientIdentity);//循环拉取while (running) {Message message = server.getWithoutAck(clientIdentity, batchSize);List<CanalEntry.Entry> entries;//message如果是raw形式的需要去rawEntries去解析if (message.isRaw()) {List<ByteString> rawEntries = message.getRawEntries();entries = new ArrayList<>(rawEntries.size());for (ByteString byteString : rawEntries) {CanalEntry.Entry entry;try {entry = CanalEntry.Entry.parseFrom(byteString);} catch (InvalidProtocolBufferException e) {throw new RuntimeException(e);}entries.add(entry);}} else {//如果不是就直接拉取entries = message.getEntries();}long batchId = message.getId();int size = entries.size();//如果是batchId是负一或者无内容进行睡眠if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {//打印汇总信息printSummary(message, batchId, size);//打印实体信息printEntry2(entries);server.ack(clientIdentity, batchId); // 提交确认}}} finally {//取消订阅//server.unsubscribe(clientIdentity);}}

printSummary

//打印解读时间
//关注log的起始位置终止位置和时间延迟的可以关注这个类 如何取protected void printSummary(Message message, long batchId, int size) {long memsize = 0;for (Entry entry : message.getEntries()) {memsize += entry.getHeader().getEventLength();}String startPosition = null;String endPosition = null;if (!CollectionUtils.isEmpty(message.getEntries())) {startPosition = buildPositionForDump(message.getEntries().get(0));endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));}SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);logger.info(context_format,new Object[]{batchId, size, memsize, format.format(new Date()), startPosition, endPosition});}protected String buildPositionForDump(Entry entry) {long time = entry.getHeader().getExecuteTime();Date date = new Date(time);SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"+ entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {position += " gtid(" + entry.getHeader().getGtid() + ")";}return position;}

printEntry2

//打印实体private void printEntry2(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) 		      {//如果需要监控事务的可以在这里进行实现continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}//关注具体内容可以在这里实现EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}//打印具体内容
protected void printColumn(List<Column> columns) {for (Column column : columns) {//如果column 是更新了的字段才打印if (column.getUpdated()) {StringBuilder builder = new StringBuilder();try {if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")|| StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {// get value bytesbuilder.append(column.getName() + " : " + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));} else {builder.append(column.getName() + " : " + column.getValue());}} catch (UnsupportedEncodingException e) {}builder.append("    type=" + column.getMysqlType());if (column.getUpdated()) {builder.append("    update=" + column.getUpdated());}builder.append(SEP);logger.info(builder.toString());}}}

总结

到这里 代码基本完成了, 然后根据自己的业务实现就好了
具体可以参考 canal java 客户端 的官方实现
还有他们的 AdminGuide 里面有详细的案例
推荐一份源码解析

谢谢


  1. mysql-binlog-connector-java ↩︎

  2. canal ↩︎

  3. open-replicator ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/83051.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何优化测试用例?

在我们日常测试工作中&#xff0c;编写测试用例往往花费较多时间&#xff0c;而且设计的测试用例冗杂和不完整&#xff0c;从而造成用例执行检查不完整&#xff0c;效率低下&#xff0c;不能及时发现项目缺陷。 因此提高测试用例编写和执行效率迫在眉睫&#xff0c;一般来说&am…

python之prettytable库的使用

文章目录 一 什么是prettytable二 prettytable的简单使用1. 添加表头2. 添加行3. 添加列4. 设置对齐方式4. 设置输出表格样式5. 自定义边框样式6. 其它功能 三 prettytable在实际中的使用 一 什么是prettytable prettytable是Python的一个第三方工具库&#xff0c;用于创建漂亮…

Endnote 具体期刊格式检索和下载方法——以nature期刊参考文献格式检索和下载为例

Endnote 具体期刊格式检索和下载方法——以nature期刊参考文献格式检索和下载为例 在外文文章写作时候&#xff0c;有时为了提高写作效率&#xff0c;会用到Endnote文献引用功能。然而&#xff0c;有时可能没有现成的参考文献格式&#xff0c;此时&#xff0c;比较快捷的方式&…

【云原生】kubernetes控制器deployment的使用

目录 ​编辑 1 Controller 控制器 1.1 什么是 Controller 1.2 常见的 Controller 控制器 1.3 Controller 如何管理 Pod 2 Deployment 2.1 创建 deployment 2.2 查看 deployment 2.3 扩缩 deployment 2.4 回滚 deployment 2.5 删除 deployment 1 Controller 控制器 …

【设计模式】观察者模式

什么是观察者模式&#xff1f; 观察者模式&#xff08;又被称为发布-订阅&#xff08;Publish/Subscribe&#xff09;模式&#xff0c;属于行为型模式的一种&#xff0c;它定义了一种一对多的依赖关系&#xff0c;让多个观察者对象同时监听某一个主题对象。这个主题对象在状态…

【Git】标签管理与Git Flow模型

目录 一、操作标签 二、推送标签 三、删除标签 四、Git Flow模型分支设计 一、操作标签 git tag # 查看有哪些标签 git tag [name] # 给最近一次commit打标签 git tag [name] [commitID] #给指定的commit打标签 git tag -a [name] -m desc # 打标签并添加描述 二、推送标…

Titanic细节记录一

目录 chunker header index_col names Series与DataFrame的区别 df.columns del和drop的区别 reset_index loc与iloc的区别 不同的排序方式 sort_values sort_index DataFrame相加 describe函数查看数据基本信息 查看多个列的数据时使用列表 处理缺失值的几种思路 …

Java密码学

密码学 1.1 密码学基本概念 密码在我们的生活中有着重要的作用&#xff0c;那么密码究竟来自何方&#xff0c;为何会产生呢&#xff1f; 密码学是网络安全、信息安全、区块链等产品的基础&#xff0c;常见的非对称加密、对称加密、散列函数等&#xff0c;都属于密码学范畴。…

【ARM Cache 系列文章 9 -- ARM big.LITTLE技术】

文章目录 big.LITTLE 技术背景big.LITTLE 技术详解big.LITTLE 硬件要求 big.LITTLE 软件模型CPU MigrationGlobal Task SchedulingGlobal Task Scheduling比CPU Migration的优势 转自&#xff1a;https://zhuanlan.zhihu.com/p/630981648 如有侵权&#xff0c;请联系删除 big.L…

MySQL的查询方法

单表查询 素材&#xff1a; 表名&#xff1a;worker-- 表中字段均为中文&#xff0c;比如 部门号 工资 职工号 参加工作 要求&#xff1a; 1、显示所有职工的基本信息。 2、查询所有职工所属部门的部门号&#xff0c;不显示重复的部门号。 3、求出所有职工的人数。 4、…

php后端实现调用高德地图进行POI搜索

对于当前位置或者选定省市位置进行查询 接口实现 /*** 查询地址* ApiTitle (查询地址)* ApiSummary (查询地址)* ApiMethod (POST)* ApiRoute (/api/demo/address)* ApiParams (name"dart", type"integer", requiredtrue, description"省…

TCP/IP四层模型对比OSI七层网络模型的区别是啥?数据传输过程原来是这样的

一、TCP/IP四层模型对比OSI七层模型 它们两个定义的一些功能和协议都是差不多的。TCP/IP四层协议模型比我们的七层少了三层&#xff0c;把我们的数据链路层和物理层放在一层里面了&#xff0c;叫做数据链路层&#xff08;网络接口层&#xff09;&#xff0c;对应网络协议也没有…

openGauss学习笔记-35 openGauss 高级数据管理-ALTER TABLE语句

文章目录 openGauss学习笔记-35 openGauss 高级数据管理-ALTER TABLE语句35.1 语法格式35.2 参数说明35.3 示例 openGauss学习笔记-35 openGauss 高级数据管理-ALTER TABLE语句 修改表&#xff0c;包括修改表的定义、重命名表、重命名表中指定的列、重命名表的约束、设置表的所…

基于应用值迭代的马尔可夫决策过程(MDP)的策略的机器人研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

如何在 Spring Boot 中集成日志框架 SLF4J、Log4j

文章目录 具体步骤附录 笔者的操作环境&#xff1a; Spring Cloud Alibaba&#xff1a;2022.0.0.0-RC2 Spring Cloud&#xff1a;2022.0.0 Spring Boot&#xff1a;3.0.2 Nacos 2.2.3 Maven 3.8.3 JDK 17.0.7 IntelliJ IDEA 2022.3.1 (Ultimate Edition) 具体步骤 因为 …

Three.js光源

目录 Three.js入门 Three.js光源 Three.js阴影 Three.js纹理贴图 本文我们将研究three.js中的灯光类型和JavaScript中的光源&#xff0c;探索不同的光源设置。我们的目标是全面理解光源设置和类型&#xff0c;比如环境光、半球光、矩形光、方向光、点光源和聚光灯。我们将…

智能安防监控:基于Java+SpringBoot实现人脸识别搜索

目录 引言背景介绍目的和重要性 人脸识别技术的基本原理图像采集和预处理特征提取与表示人脸匹配算法 人脸识别搜索的应用领域公告安全和监控社交网络和照片管理 参考实现步骤数据收集与预处理人脸特征提取查询处理 引言 背景介绍 结合人脸识别技术&#xff0c;在工厂、学校、…

Java后台生成ECharts图片

前言 通过echarts的jar包&#xff0c;Java后台生成一张图片&#xff0c;并把图片插入到word中。关于word插图片的代码在下一章。 需要用到的工具PhantomJS,Echarts-convert.js,jquery.js,echarts.js。 1.PhantomJS 介绍 PhantomJS是一个不需要浏览器的富客户端。 官方介绍&…

gitlab-Runner搭建

root wget https://packages.gitlab.com/runner/gitlab-runner/packages/fedora/29/gitlab-runner-12.6.0-1.x86_64.rpm/download.rpm rpm -ivh download.rpm ---- 安装 rpm -Uvh download.rpm -----更新升级 然后运行&#xff1a; gitlab-runner register --url https://git…

泊松损坏图像的快速尺度间小波去噪研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…