canal 嵌入式部署
- 背景
- 技术选型
- canal
- 原理
- 用途
- 嵌入式代码实现
- 引入pom
- 引入工具pom
- main方法
- 引入
- 常量定义
- install方法
- buildCanal方法
- pull方法
- printSummary
- printEntry2
- 总结
- 谢谢
背景
最近遇到一个需求:需要监听 MySQL 数据库里的数据变动。但由于架构方面的原因,只能采用单体嵌入式的方案,即把监听逻辑嵌入应用中,而不单独部署。
技术选型
我对几种 binlog 监控工具进行了了解,包括:
- mysql-binlog-connector-java [1]
- canal [2]
- open-replicator [3]
canal
本篇博文主讲 canal 的嵌入模式
原理
用途
主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
嵌入式代码实现
引入pom
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.common</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.deployer</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.server</artifactId><version>1.1.4</version></dependency>
引入工具pom
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
main方法
public static void main(String[] args) {EmbeddedCanalListener embeddedCanalListener = new EmbeddedCanalListener();//安装实体embeddedCanalListener.install();//拉取消息embeddedCanalListener.pull();}
引入
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
常量定义
//日志public static final Logger logger = LoggerFactory.getLogger(EmbeddedCanalListener.class);//随意命名protected static final String DESTINATION = "example";//测试sqlprotected static final String DETECTING_SQL = "select 1 from dual;";//MSQL配置protected static final String MYSQL_ADDRESS = "xxxx";protected static final int MYSQL_PORT = xxx;protected static final String USERNAME = "xxx";protected static final String PASSWORD = "xxx";//使用ORACLE 未实现protected static final String ORACLE_ADDRESS = "xx.xx.xx.xx";protected static final int ORACLE_PORT = xxx;protected static final String ORACLE_USERNAME = "xxx";protected static final String ORACLE_PASSWORD = "xxx";/*** 表筛选 , 这里默认全部* 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)* <p>* <p>* 常见例子:* <p>* 1. 所有表:.* or .*\\..* 2. canal schema下所有表: canal\\..* 3. canal下的以canal打头的表:canal\\.canal.* 4. canal schema下的一张表:canal\\.test1* <p>* 5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)*/protected static final String FILTER = ".*";//定义一个serverprivate CanalServerWithEmbedded server;//定义一个clientprivate ClientIdentity clientIdentity = new ClientIdentity(DESTINATION, (short) 1);static {context_format = SEP + "****************************************************" + SEP;context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;context_format += "* Start : [{}] " + SEP;context_format += "* End : [{}] " + SEP;context_format += "****************************************************" + SEP;row_format = SEP+ "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"+ SEP;transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"+ SEP;}
install方法
//获取一个instanceserver = CanalServerWithEmbedded.instance();//设置一个gennertor去生成server.setCanalInstanceGenerator(destination -> {//构建cannal 把上面的参数设置进去Canal canal = buildCanal();//返回一个Managerreturn new CanalInstanceWithManager(canal, FILTER);});//启动server.start();//启动这个实例server.start(DESTINATION);
buildCanal方法
/**
 * Builds the {@link Canal} configuration used by the embedded instance.
 *
 * @return a Canal object named {@code DESTINATION} with its parameters set
 */
private Canal buildCanal() {
    Canal canal = new Canal();
    // The id has no particular meaning; any value works
    canal.setId(12L);
    canal.setName(DESTINATION);
    canal.setDesc("my standalone server test ");

    CanalParameter parameter = new CanalParameter();
    // parameter.setDataDir("./conf");
    // Index mode; memory is chosen for embedded use
    parameter.setIndexMode(IndexMode.MEMORY);
    // Storage buffer size; see the official canal documentation for details
    parameter.setMemoryStorageBufferSize(32 * 1024);

    // MySQL settings: source type, address, default schema, username, password,
    // slaveId (check MySQL's my.cnf), connection charset and buffer sizes
    // (see the official documentation)
    parameter.setSourcingType(SourcingType.MYSQL);
    parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, MYSQL_PORT)));
    parameter.setDefaultDatabaseName("XXXX");
    parameter.setDbUsername(MYSQL_USERNAME);
    parameter.setDbPassword(MYSQL_PASSWORD);

    // A binlog file with a start position, or a start timestamp, can be specified:
    // parameter.setPositions(Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":332L,\"timestamp\":\"1505998863000\"}",
    // "{\"journalName\":\"mysql-bin.000001\",\"position\":332L,\"timestamp\":\"1505998863000\"}"));
    parameter.setSlaveId(2L);
    parameter.setDefaultConnectionTimeoutInSeconds(30);
    // NOTE(review): charset name "GBK" is paired with charset number 33, which
    // looks like a UTF-8 collation id in MySQL - confirm the two agree with the database.
    parameter.setConnectionCharset("GBK");
    parameter.setConnectionCharsetNumber((byte) 33);
    parameter.setReceiveBufferSize(8 * 1024);
    parameter.setSendBufferSize(8 * 1024);

    // Connection detection settings; detection is disabled here, so the
    // interval / retry / SQL values below are not meaningful
    parameter.setDetectingEnable(false);
    parameter.setDetectingIntervalInSeconds(10);
    parameter.setDetectingRetryTimes(3);
    parameter.setDetectingSQL(DETECTING_SQL);
    parameter.setGtidEnable(true);

    canal.setCanalParameter(parameter);
    return canal;
}
pull方法
//定义拉取的大小int batchSize = 5 * 1024;while (running) {try {//订阅当前设定的clientserver.subscribe(clientIdentity);//循环拉取while (running) {Message message = server.getWithoutAck(clientIdentity, batchSize);List<CanalEntry.Entry> entries;//message如果是raw形式的需要去rawEntries去解析if (message.isRaw()) {List<ByteString> rawEntries = message.getRawEntries();entries = new ArrayList<>(rawEntries.size());for (ByteString byteString : rawEntries) {CanalEntry.Entry entry;try {entry = CanalEntry.Entry.parseFrom(byteString);} catch (InvalidProtocolBufferException e) {throw new RuntimeException(e);}entries.add(entry);}} else {//如果不是就直接拉取entries = message.getEntries();}long batchId = message.getId();int size = entries.size();//如果是batchId是负一或者无内容进行睡眠if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {//打印汇总信息printSummary(message, batchId, size);//打印实体信息printEntry2(entries);server.ack(clientIdentity, batchId); // 提交确认}}} finally {//取消订阅//server.unsubscribe(clientIdentity);}}
printSummary
//打印解读时间
//关注log的起始位置终止位置和时间延迟的可以关注这个类 如何取protected void printSummary(Message message, long batchId, int size) {long memsize = 0;for (Entry entry : message.getEntries()) {memsize += entry.getHeader().getEventLength();}String startPosition = null;String endPosition = null;if (!CollectionUtils.isEmpty(message.getEntries())) {startPosition = buildPositionForDump(message.getEntries().get(0));endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));}SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);logger.info(context_format,new Object[]{batchId, size, memsize, format.format(new Date()), startPosition, endPosition});}protected String buildPositionForDump(Entry entry) {long time = entry.getHeader().getExecuteTime();Date date = new Date(time);SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"+ entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {position += " gtid(" + entry.getHeader().getGtid() + ")";}return position;}
printEntry2
//打印实体private void printEntry2(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {//如果需要监控事务的可以在这里进行实现continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}//关注具体内容可以在这里实现EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}//打印具体内容
protected void printColumn(List<Column> columns) {for (Column column : columns) {//如果column 是更新了的字段才打印if (column.getUpdated()) {StringBuilder builder = new StringBuilder();try {if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")|| StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {// get value bytesbuilder.append(column.getName() + " : " + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));} else {builder.append(column.getName() + " : " + column.getValue());}} catch (UnsupportedEncodingException e) {}builder.append(" type=" + column.getMysqlType());if (column.getUpdated()) {builder.append(" update=" + column.getUpdated());}builder.append(SEP);logger.info(builder.toString());}}}
总结
到这里 代码基本完成了, 然后根据自己的业务实现就好了
具体可以参考 canal java 客户端 的官方实现
还有他们的 AdminGuide 里面有详细的案例
推荐一份源码解析
谢谢
[1] mysql-binlog-connector-java
[2] canal
[3] open-replicator