SaaS 电商设计 (五) 私有化部署-实现 binlog 中间件适配

一、 背景

  具体的中间件私有化背景在上文 SaaS` 电商设计 (二) 私有化部署-缓存中间件适配 已有做相关介绍.这里具体讨论的场景是通过解析mysql binlog 来实现mysql到其他数据源的同步.具体比如:在电商的解决方案业务流中经常有 ES 的使用场景,用以解决一些复杂的查询和搜索商品的支持以及某些数据分析的场景.那就需要做到 mysql 数据库到 ES 的数据同步.在支持 mysqlES 数据同步的过程中,常用的技术方案有这样几种.

二、 设计主体

2.1 N种方案

方案1: 业务代码成功应答后操作目标数据源写入(本文用ES举例)
在这里插入图片描述

如上第一种方案在业务代码操作数据库, 异步执行 ES 数据同步写入.如:完成商品后写入数据,异步线程开启执行写入 ES 索引录入.

方案2:业务代码成功应答后,发送MQ,利用MQ来保证 ES 写入的最终一致
在这里插入图片描述

在第一种的方案中写入 ES 步骤中可能出现ES 写入失败case. 在方案一基础上为了保证可靠性引入 MQ ,保证在ES操作时出现异常抖动能够通过重试来保证数据的最终一致性.在业务代码中实际操作数据库后发送 MQ ,这边消费 MQ 执行 ES 数据同步.如:完成商品写入数据,发送消息 MQ , MQ consumer 消费写入 ES 索引录入.

方案 3.通过binlog 来实现数据库监听,保证数据同步脱离业务代码控制
在这里插入图片描述

  • 在大部分的场景下方案二完全能够满足业务诉求. 这样的一个方案在具体实施过程中存在两个点.

  • 业务开发的同时需要同步关心数据的同步
    在某种意义上来说,数据的同步并不是业务代码需要去关心的.业务代码永远关心的只是自身的逻辑实现,关注的是产品迭代过程中如何保证业务模型的可持续演进和领域资产沉淀.基于这个原则我的理解是需要把数据的同步从业务代码里进行剥离的.

    • 散落在各个业务代码角落的维护成本
      方案二的场景在很长时间的迭代过程中很可能就将出现这样的情况.商品添加进行商品的 ES 数据更新,门店添加进行门店的 ES 数据更新.诸如此类,长期迭代将得到大量的脚本代码,随着开发人员的更替,不断的迭代和开发.最终可能变成一座岌岌可危的高楼,开发人员小心翼翼的在原来的代码上继续裹上自己这版的裹脚布.维护性和成本指数上升.
      基于此我们尝试着借助 binlog 的这样一个工具来完善第二个方案适应更多索引更新,更加复杂的同步场景.首先 binlog 的形式能够通过仅监听数据库的 binlog 的消息来做到不同数据表数据更新的收口,我们可以在消费消息的入口来定义一个处理的接口,通过表名来进行不同表消费逻辑的实现.很简单就可以做到.一石二鸟做到数据处理的收口以及逻辑代码关于数据同步逻辑的抽离.

    方案4:完美终极方案(抽离技术细节的实现,做到binlog解析的接口和数据同步的接口化.)

在这里插入图片描述

对于第三种方式来说的话,接下来引入了第二个讨论的点.

  • 私有化支持
    就是在去做一些 SaaS 场景的私有化时,咱们再去做数据同步的时候不得不依赖 binlog ,那对于 binlog 的解析常见的工具也比较多.常见的开源的 canal ,各大厂里也有相应的工具,东厂的 DRC (前身binlake),福包厂的精卫.基于此在项目中不得不在这些不同的实现之上完成抽象.这样我们就能够在既支持到内部项目的数据监听,也能够完成项目实施私有化的场景部署.
  • 同步目标逻辑的不同支持
    在上文中我们提到的最多也就是关于 ES 数据的同步,那其实在实际的开发场景可能面临的更多,比如在数据库更新后的准实时缓存刷新,数据库写入商品成功后关于商品新建成功的三方消息同步.等等.同样我们在这个基础实现了一个接口,用来方便具体的使用方来进行具体消息处理.完美.

2.2 方案4 coding落地

2.2.1 类图

在这里插入图片描述
核心步骤:

step1:抽象MessageListener 实现 BinlogListener 完成 binlog 中间件解析发送的 MQ msg 得到反序列化的表数据.内含本次选取的反序列化类型.如:是canal 还是 DRC .
step2:抽象 BinlogClientAdapter 完成反序列化和处理msg接口定义.具体可以有 CanalBinlogAdapter,DrcBinlogClientAdapter实现.
step3:抽象BinlogDataHandler 完成具体表具体操作**(insert,delete,update,query)** 接口定义.具体在接入方进行实现MultiCloundBinLogDataHandler,这样在进行注入时得到具体的实现类,进行具体的实现操作.如:CategoryBinlogDataHandler.

2.2.2 核心实现

BinlogHandlerAdapter 完成 binlog client 接口定义.

package com.baixiu.middleware.binlog.adapter;import com.baixiu.middleware.binlog.model.BinlogData;
import com.baixiu.middleware.mq.model.CommonMessage;/*** binlog 适配器接口* 适配中间件list:canal,jingwei,drc等。* function1:完成不同中间件解析能力* function2:完成不同中间件handlerMsg能力* @author baixiu* @date 2023年12月11日*/
public interface BinlogHandlerAdapter {/*** 反序列MQMsg To binlogMsg* @param mqMsg mqMsg* @return*/BinlogData deserializationMQMsg(CommonMessage mqMsg);/*** 反序列MQMsg To binlogMsg* @param mqMsg mqMsg* @return*/void handleBinLogData(BinlogData binLogData) throws Exception;}

CanalBinlogHandlerAdapter 完成 canal 解析

package com.baixiu.middleware.binlog.adapter;import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.baixiu.middleware.binlog.consts.CommonConsts;
import com.baixiu.middleware.binlog.core.AbstractBinlogHandler;
import com.baixiu.middleware.binlog.core.BinlogTableHandlerRouter;
import com.baixiu.middleware.binlog.enums.CommonRowTypeEnum;
import com.baixiu.middleware.binlog.model.BinlogData;
import com.baixiu.middleware.binlog.model.BinlogDataToDiffModel;
import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel;
import com.baixiu.middleware.mq.model.CommonMessage;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** canal binlog handler adapter* 当property配置的clientType=canal时进行注入bean* canal client 用以解析 mq -starter 发送过来的消费消息* @author baixiu* @date 创建时间 2023/12/11 8:39 PM*/
@Slf4j
public class CanalBinlogHandlerAdapter implements BinlogHandlerAdapter{@Autowiredprivate BinlogTableHandlerRouter binlogTableHandlerRouter;@Overridepublic BinlogData deserializationMQMsg(CommonMessage mqMsg) {FlatMessage flatMessage = JSON.parseObject(mqMsg.getText(),FlatMessage.class);BinlogData binLogData=new BinlogData ();if(flatMessage!=null){binLogData.setBinlogDataObject(flatMessage);}return binLogData;}@Overridepublic void handleBinLogData(BinlogData binLogData) throws Exception {if(binLogData==null || binLogData.getBinlogDataObject()==null){return;}FlatMessage flatMessage= (FlatMessage) binLogData.getBinlogDataObject ();List<Map<String, String>> rowDatas = flatMessage.getData();List<Map<String, String>> oldDatas = flatMessage.getOld();String tableName = flatMessage.getTable();AbstractBinlogHandler handler = binlogTableHandlerRouter.ALL_TABLE_HANDLERS.get(tableName);for (int i = 0; i < rowDatas.size(); i++) {Map<String, String> rowData = rowDatas.get(i);Map<String, String> oldData = new HashMap<>(i,0.75f);if (oldDatas != null && oldDatas.size() == rowDatas.size()) {oldData = oldDatas.get(i);}Map<String, String> fieldsMaps = Maps.newHashMapWithExpectedSize(20);BinlogDataToDiffModel binlogDataToDiffModel = transRowDataToAllBinlogData(handler, rowData, oldData, fieldsMaps, flatMessage.getType());switch (binlogDataToDiffModel.getCommonRowTypeEnum()) {case INSERT:log.info("Canal.handleMessage.binlogTransConfigToMap.INSERT.{}", JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));handler.insert(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());break;case UPDATE:log.info("Canal.handleMessage.binlogTransConfigToMap.UPDATE.{}",JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));handler.update(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());break;case DELETE:Map<String, String> delMap = getBeforeColumnsFromBinlogData(handler, oldData);log.info("Canal.handleMessage.binlogTransConfigToMap.DELETE");handler.delete(delMap);break;default:log.info("CanalBinlogClientAdapter.handleMessage.binlogTransConfigToMap.default.{}",JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));break;}}}public static BinlogDataToDiffModel transRowDataToAllBinlogData(AbstractBinlogHandler binlogData, Map<String, String> afterColumns, Map<String, String> beforeColumns, Map<String, String> fieldsMap, String type) {try {String[] updateFields = binlogData.getUpdateFields();String[] keyFields = binlogData.getFields();List<BinlogTableRowDiffModel> changeList = new ArrayList<> ();for (String key : afterColumns.keySet()) {if (keyFields.length == 1 && ArrayUtils.contains(keyFields, CommonConsts.BINLOG_ALL_FIELDS)) {fieldsMap.put(key, afterColumns.get(key));} else if (ArrayUtils.contains(keyFields, key)) {fieldsMap.put(key, afterColumns.get(key));}if (beforeColumns != null && !beforeColumns.isEmpty() && beforeColumns.get(key) != null) {BinlogTableRowDiffModel bean = new BinlogTableRowDiffModel();bean.setField(key);bean.setAfter(afterColumns.get(key));bean.setBefore(beforeColumns.get(key));if (updateFields.length == 1 && ArrayUtils.contains(updateFields,CommonConsts.BINLOG_ALL_FIELDS)) {changeList.add(bean);} else if (ArrayUtils.contains(updateFields, key)) {changeList.add(bean);}}}BinlogDataToDiffModel data = new BinlogDataToDiffModel(changeList, fieldsMap, CommonRowTypeEnum.transType(type));log.info("transRowDataToAllBinlogData.changeList:{}.fieldsMap{}.data{}",JSON.toJSONString(changeList), JSON.toJSONString(fieldsMap), JSON.toJSONString(data));return data;} catch (Exception e) {log.error("handleMessage.transRowDataToAllBinlogData.handleMessage.error.{}", JSON.toJSONString(binlogData), e);}return null;}/*** 删除操作* 不同的表需要从binlogData中获取的信息不同,这里抽取** @return*/private Map<String, String> getBeforeColumnsFromBinlogData(AbstractBinlogHandler binlogData, Map<String, String> beforeColumns) {Map<String, String> keys = new HashMap<>();if (beforeColumns != null && !beforeColumns.isEmpty()) {String[] keyFields = binlogData.getFields();for (String key : beforeColumns.keySet()) {// 找出关心的字段值if (ArrayUtils.contains(keyFields, key)) {keys.put(key, beforeColumns.get(key));}}}return keys;}
}

AbstractBinlogHandler 抽象binloghandler 处理类.

package com.baixiu.middleware.binlog.core;import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel;
import java.util.List;
import java.util.Map;/*** @author baixiu* @date 创建时间 2023/12/12 11:31 AM*/
public interface AbstractBinlogHandler {/*** 需要关心的字段。实现后将仅实现的字段值放置于 fieldValues 中* @return 监控字段*/String[] getFields();/*** 需要关心的变更字段。实现后将仅实现的字段值放置于 changeList 中* @return 更新字段*/String[] getUpdateFields();/*** 新增时触发* @param fieldValues 唯一字段,用于确定一条数据* @param changeList 字段的值发生变化的* @throws Exception 业务exception*/void insert(Map<String, String> fieldValues, List<BinlogTableRowDiffModel> changeList) throws Exception;/*** 数据修改时触发* @param fieldValues 实现了getFields接口里得到的字段里的字段以及字段的值* @param changeList  字段的值发生变化的* @throws Exception 业务exception*/void update(Map<String, String> fieldValues, List<BinlogTableRowDiffModel> changeList) throws Exception;/*** 删除时触发* @param fieldValues 唯一字段,用于确定一条数据* @throws Exception 业务exception*/void delete(Map<String, String> fieldValues) throws Exception;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/218197.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【答案】2023年国赛信息安全管理与评估第三阶段夺旗挑战CTF(网络安全渗透)

【答案】2023年国赛信息安全管理与评估第三阶段夺旗挑战CTF&#xff08;网络安全渗透&#xff09; 全国职业院校技能大赛高职组信息安全管理与评估 &#xff08;赛项&#xff09; 评分标准 第三阶段 夺旗挑战CTF&#xff08;网络安全渗透&#xff09; *竞赛项目赛题* 本文…

【贝叶斯分析】计算机科学专业博士作业二

1 第一题 1.1 题目 已知变量A和B的取值只能为0或1&#xff0c;A⫫&#x1d469;&#xff0c;且&#x1d45d;(&#x1d434;1)0.65&#xff0c;&#x1d45d;(&#x1d435;1)0.77。C的取值与A和B有关&#xff0c;具体关系如下图所表&#xff1a; ABP(C1|A,B)000.1010.99100…

正则表达式:简化模式匹配的利器

正则表达式&#xff1a;简化模式匹配的利器 一、正则表达式简介1.1 正则表达式介绍1.2 正则表达式使用场景 二、正则表达式语法2.1 正则表达式元字符和特性2.2 正则表达式常用匹配 三、正则表达式实战3.1 常见的正则表达式用法3.2 正则表达式的过滤用法3.3 正则表达式的代码用法…

Fiddler中AutoResponder的简单使用

AutoResponder&#xff0c;自动回复器&#xff0c;用于将 HTTP 请求重定向为指定的返回类型。 这个功能有点像是一个代理转发器&#xff0c;可以将某一请求的响应结果替换成指定的资源&#xff0c;可以是某个页面也可以是某个本地文件 1.使用 打开“Fiddler”&#xff0c;点击…

11.jvm第三方工具使用实践

目录 概述GCEasy官网jvm内存占用情况关键性能指标堆内存与元空间优化 MAT安装MAT相关概念说明内存泄漏与内存溢出shallow heap及retained heapoutgoing references与incoming referencesDominator Tree GCViewerArthas下载安装与启动jdk8jdk 11jdk11自定义boot jarjdk17 常用命…

聊聊Java中的常用类String

String、StringBuffer、StringBuilder 的区别 从可变性分析 String不可变。StringBuffer、StringBuilder都继承自AbstractStringBuilder &#xff0c;两者的底层的数组value并没有使用private和final修饰&#xff0c;所以是可变的。 AbstractStringBuilder 源码如下所示 ab…

fckeditor编辑器在Chrome浏览器下编辑时多出空格解决方法

查看专栏目录 Network 灰鸽宝典专栏主要关注服务器的配置&#xff0c;前后端开发环境的配置&#xff0c;编辑器的配置&#xff0c;网络服务的配置&#xff0c;网络命令的应用与配置&#xff0c;windows常见问题的解决等。 文章目录 结尾语网络的梦想 dedecms网站后台采用fckedi…

「构」向云端 - 我与 2023 亚马逊云科技 re:Invent 大会

授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 亚马逊云科技开发者社区, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 2023年亚马逊AWS re:Invent大会宣布一项Amazon Q的创新项目&#x…

nodejs+vue+微信小程序+python+PHP全国天气可视化分析系统-计算机毕业设计推荐

3.2.1前台用户功能 前台用户可分为未注册用户需求和以注册用户需求。 未注册用户的功能如下&#xff1a; 注册账号&#xff1a;用户填写个人信息&#xff0c;并验证手机号码。 浏览天气资讯&#xff1a;用户可以浏览天气资讯信息详情。 已注册用户的功能如下&#xff1a; 登录&…

Ajax详解

目录 服务器与Ajax 1.初识Ajax Asynchronous JavaScript and XML&#xff08;异步的 JavaScript 和 XML&#xff09;。 2.Ajax可以做什么&#xff1f; 3.Ajax基础知识铺垫 4.前端相关的技术点&#xff1a; 5.客户端与服务器 6.客户端 浏览器、app、应用软件 7.服务器…

图论——二分图

图论——二分图 二分图通俗解释 有一个图&#xff0c;将顶点分成两类&#xff0c;边只存在不同类顶点之间&#xff0c;同类顶点之间设有边。称图 G 为二部图&#xff0c;或称二分图&#xff0c;也称欧图。 性质 二分图不含有奇数环图中没有奇数环&#xff0c;一定可以转换为二…

SpringBoot中日志的使用log4j2

SpringBoot中日志的使用log4j2 1、log4j2介绍 Apache Log4j2 是对 Log4j 的升级&#xff0c;它比其前身 Log4j 1.x 提供了重大改进&#xff0c;并提供了 Logback 中可用的许多改 进&#xff0c;同时修复了 Logback 架构中的一些问题&#xff0c;主要有&#xff1a; 异常处理…

Docker Swarm编排:构建简单集群

Docker Swarm 是 Docker 官方提供的容器编排工具&#xff0c;通过它可以轻松构建和管理多个 Docker 容器的集群。本文将深入探讨 Docker Swarm 的基础概念、构建集群的步骤&#xff0c;并提供更为丰富和实际的示例代码&#xff0c;帮助大家全面了解如何使用 Docker Swarm 搭建一…

解决Chrome同一账号在不同设备无法自动同步书签的问题

文章目录 一、问题与原因&#xff1f;2. 解决办法 一、问题与原因&#xff1f; 1.问题 使用谷歌Chrome浏览器比较头疼的问题就是&#xff1a;使用同一个Google账号&#xff0c;办公电脑与家用电脑的数据无法同步。比如&#xff1a;办公电脑中的书签、浏览记录等数据&#xff0…

【深度学习目标检测】六、基于深度学习的路标识别(python,目标检测,yolov8)

YOLOv8是一种物体检测算法&#xff0c;是YOLO系列算法的最新版本。 YOLO&#xff08;You Only Look Once&#xff09;是一种实时物体检测算法&#xff0c;其优势在于快速且准确的检测结果。YOLOv8在之前的版本基础上进行了一系列改进和优化&#xff0c;提高了检测速度和准确性。…

网络互通--三层交换机配置

目录 一、三层交换机的原理 1、概念 2、PC A与不同网段的PC B第一次数据转发过程 3、一次路由&#xff0c;多次转发的概念 4、 三层交换机和路由器的比较 二、利用实验理解交换机 1、建立以下拓扑图​编辑 2、分别配置主机的IP地址&#xff0c;子网掩码、网关等信息 3、…

[每周一更]-(第27期):HTTP压测工具之wrk

[补充完善往期内容] wrk是一款简单的HTTP压测工具,托管在Github上,https://github.com/wg/wrkwrk 的一个很好的特性就是能用很少的线程压出很大的并发量. 原因是它使用了一些操作系统特定的高性能 io 机制, 比如 select, epoll, kqueue 等. 其实它是复用了 redis 的 ae 异步事…

Axure的安装及界面基本功能介绍

目录 一. Axure概述 二. Axure安装 2.1 安装包下载 2.2 安装步骤 三. Axure功能介绍​ 3.1 工具栏介绍 3.1.1 复制&#xff0c;剪切及粘贴 3.1.2 选择模式和连接 3.1.3 插入形状 3.1.4 点&#xff08;编辑控点&#xff09; 3.1.5 置顶和置底 3.1.6 组合和取消组合 …

(1)(1.8) MSP(MultiWii 串行协议)(4.1 版)

文章目录 前言 1 协议概述 2 配置 3 参数说明 前言 ArduPilot 支持 MSP 协议&#xff0c;可通过任何串行端口进行遥测和传感器。这允许 ArduPilot 将其遥测数据发送到 MSP 兼容设备&#xff08;如大疆护目镜&#xff09;&#xff0c;用于屏幕显示&#xff08;OSD&#xff…

2021年数维杯国际大学生数学建模A题新冠肺炎背景下港口资源优化配置策略求解全过程文档及程序

2021年数维杯国际大学生数学建模 A题 新冠肺炎背景下港口资源优化配置策略 原题再现&#xff1a; 2020年初&#xff0c;新型冠状病毒&#xff08;COVID-19&#xff09;在全球迅速蔓延。根据世界卫生组织2021年7月31日的报告&#xff0c;新冠病毒疫情对人类的影响可能比原先预…