Canal+Kafka实现MySQL与Redis数据同步(二)

Canal+Kafka实现MySQL与Redis数据同步(二)

创建MQ消费者进行同步

在application.yml配置文件加上kafka的配置信息:

spring:kafka:# Kafka服务地址bootstrap-servers: 127.0.0.1:9092consumer:# 指定一个默认的组名group-id: consumer-group1#序列化反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringDeserializervalue-serializer: org.apache.kafka.common.serialization.StringDeserializer# 批量抓取batch-size: 65536# 缓存容量buffer-memory: 524288

根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:

public class CanalBean {//数据private List<TbCommodityInfo> data;//数据库名称private String database;private long es;//递增,从1开始private int id;//是否是DDL语句private boolean isDdl;//表结构的字段类型private MysqlType mysqlType;//UPDATE语句,旧数据private String old;//主键名称private List<String> pkNames;//sql语句private String sql;private SqlType sqlType;//表名private String table;private long ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等private String type;//getter、setter方法
}
public class MysqlType {private String id;private String commodity_name;private String commodity_price;private String number;private String description;//getter、setter方法
}
public class SqlType {private int id;private int commodity_name;private int commodity_price;private int number;private int description;
}

最后就可以创建一个消费者CanalConsumer进行消费:

@Component
public class CanalConsumer {//日志记录private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);//redis操作工具类@Resourceprivate RedisClient redisClient;//监听的队列名称为:canaltopic@KafkaListener(topics = "canaltopic")public void receive(ConsumerRecord<?, ?> consumer) {String value = (String) consumer.value();log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);//转换为javaBeanCanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);//获取是否是DDL语句boolean isDdl = canalBean.getIsDdl();//获取类型String type = canalBean.getType();//不是DDL语句if (!isDdl) {List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();//过期时间long TIME_OUT = 600L;if ("INSERT".equals(type)) {//新增语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//新增到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else if ("UPDATE".equals(type)) {//更新语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//更新到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else {//删除语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//从redis中删除redisClient.deleteKey(id);}}}}
}

测试MySQL与Redis同步

mysql对应的表结构如下:

CREATE TABLE `tb_commodity_info` (`id` varchar(32) NOT NULL,`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',`number` int(10) DEFAULT '0' COMMENT '商品数量',`description` varchar(2048) DEFAULT '' COMMENT '商品描述',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

首先在MySQL创建表。然后启动项目,接着新增一条数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉包', '3.99', '3', '大叉包,老喜欢');

tb_commodity_info表查到新增的数据:

img

Redis也查到了对应的数据,证明同步成功!

img

如果更新呢?试一下Update语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='便宜的青菜包' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';

img

img

没有问题!

总结

canal的缺点:

  1. canal只能同步增量数据。
  2. 不是实时同步,是准实时同步。
  3. 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
  4. MQ顺序性问题。
    网的回答,大家参考一下
    img

尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/198403.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

08.智慧商城——购物车布局、全选反选、功能实现

01. 购物车 - 静态布局 基本结构 <template><div class"cart"><van-nav-bar title"购物车" fixed /><!-- 购物车开头 --><div class"cart-title"><span class"all">共<i>4</i>件商品…

STM32存储左右互搏 SPI总线FATS文件读写FLASH W25QXX

STM32存储左右互搏 SPI总线FATS文件读写FLASH W25QXX FLASH是常用的一种非易失存储单元&#xff0c;W25QXX系列Flash有不同容量的型号&#xff0c;如W25Q64的容量为64Mbit&#xff0c;也就是8MByte。这里介绍STM32CUBEIDE开发平台HAL库实现FATS文件操作W25Q各型号FLASH的例程。…

好莱坞罢工事件!再次警醒人类重视AI监管,人工智能矛盾一触即发!

原创 | 文 BFT机器人 关注国外新闻的应该都知道&#xff0c;最近焦点新闻是好莱坞史上最大规模的一场罢工运动。这场维持118天的罢工运动&#xff0c;终于在11月9号早上12点在好莱坞宣布结束。这场罢工运动虽是演员工会和代表资方的影视制片人联盟的茅盾&#xff0c;但直接引发…

求二叉树的高度(可运行)

输入二叉树为&#xff1a;ABD##E##C##。 运行环境&#xff1a;main.cpp 运行结果&#xff1a;3 #include "bits/stdc.h" using namespace std; typedef struct BiTNode{char data;struct BiTNode *lchild,*rchild;int tag; }BiTNode,*BiTree;void createTree(BiTre…

【数据结构】详解链表结构

目录 引言一、链表的介绍二、链表的几种分类三、不带头单链表的一些常用接口3.1 动态申请一个节点3.2 尾插数据3.3 头插数据3.4 尾删数据3.5 头删数据3.6 查找数据3.7 pos位置后插入数据3.8 删除pos位置数据3.9 释放空间 四、带头双向链表的常见接口4.1创建头节点&#xff08;初…

有趣的按钮分享

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 广告打完&#xff0c;我们进入正题&#xff0c;先看效果&#xff1a; 废话不多&#xff0c;上源码&#xff1a; <button class&quo…

【LeetCode刷题日志】232.用栈实现队列

&#x1f388;个人主页&#xff1a;库库的里昂 &#x1f390;C/C领域新星创作者 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏✨收录专栏&#xff1a;LeetCode 刷题日志&#x1f91d;希望作者的文章能对你有所帮助&#xff0c;有不足的地方请在评论区留言指正&#xff0c;…

竞赛 题目:基于机器视觉opencv的手势检测 手势识别 算法 - 深度学习 卷积神经网络 opencv python

文章目录 1 简介2 传统机器视觉的手势检测2.1 轮廓检测法2.2 算法结果2.3 整体代码实现2.3.1 算法流程 3 深度学习方法做手势识别3.1 经典的卷积神经网络3.2 YOLO系列3.3 SSD3.4 实现步骤3.4.1 数据集3.4.2 图像预处理3.4.3 构建卷积神经网络结构3.4.4 实验训练过程及结果 3.5 …

负载均衡简介

负载均衡 负载均衡&#xff08;Load Balance&#xff0c;简称 LB&#xff09;是高并发、高可用系统必不可少的关键组件&#xff0c;目标是 尽力将网络流量平均分发到多个服务器上&#xff0c;以提高系统整体的响应速度和可用性。 负载均衡的分类和OSI模型息息相关&#xff0c…

【广州华锐互动】VR可视化政务服务为公众提供更直观、形象的政策解读

虚拟现实&#xff08;VR&#xff09;技术正在逐渐应用于政务服务领域&#xff0c;为公众提供更加便捷、高效和个性化的服务体验。通过VR眼镜、手机等设备&#xff0c;公众可以在虚拟环境中参观政务服务中心&#xff0c;并根据自己的需求选择不同的办事窗口或事项进行咨询和办理…

03 前后端数据交互【小白入门SpringBoot + Vue3】

项目笔记&#xff0c;教学视频来源于B站青戈 https://www.bilibili.com/video/BV1H14y1S7YV 前两个笔记。是把前端页面大致做出来&#xff0c;接下来&#xff0c;把后端项目搞一下。 后端项目&#xff0c;使用IDEA软件、jdk1.8、springboot2.x 。基本上用的是稳定版。 还有My…

【Linux】vscode远程连接ubuntu失败

VSCode远程连接ubuntu服务器 这部分网上有很多&#xff0c;都烂大街了&#xff0c;自己搜吧。给个参考连接&#xff1a;VSCode远程连接ubuntu服务器 注意&#xff0c;这里我提前设置了免密登录。至于怎么设置远程免密登录&#xff0c;可以看其它帖子&#xff0c;比如这个。 …

51单片机应用

目录 ​编辑 1. C51的数据类型 1.1 C51中的基本数据类型 1.2 特殊功能寄存器类型 2. C51的变量 2.1 存储种类 1. C51的数据类型 C51是一种基于8051架构的单片机&#xff0c;它支持以下基本数据类型&#xff1a; 位&#xff08;Bit&#xff09;&#xff1a;可以表…

WSL 2 更改默认安装的 Linux 发行版

目录 什么是 WSL 2&#xff1f;更改默认安装的 Linux 发行版更改发行版的 WSL 版本 什么是 WSL 2&#xff1f; WSL 2 是适用于 Linux 的 Windows 子系统体系结构的一个新版本&#xff0c;它支持适用于 Linux 的 Windows 子系统在 Windows 上运行 ELF64 Linux 二进制文件。 它的…

单元测试实战(四)MyBatis-Plus 的测试

为鼓励单元测试&#xff0c;特分门别类示例各种组件的测试代码并进行解说&#xff0c;供开发人员参考。 本文中的测试均基于JUnit5。 单元测试实战&#xff08;一&#xff09;Controller 的测试 单元测试实战&#xff08;二&#xff09;Service 的测试 单元测试实战&am…

Flutter 使用 device_info_plus 遇到的问题

问题&#xff1a;引用device_info_plus 插件出现了异常&#xff0c;不知道为啥打开项目的时候就不能用了。 解决&#xff1a;改了版本解决 Target of URI doesnt exist: package:device_info_plus/device_info_plus.dart. (Documentation) Try creating the file reference…

react antd下拉选择框选项内容换行

下拉框选项字太多&#xff0c;默认样式是超出就省略号&#xff0c;需求要换行全展示&#xff0c;选完在选择框里还是要省略的 .less: .aaaDropdown {:global {.ant-select-dropdown-menu-item {white-space: pre-line !important;word-break: break-all !important;}} } html…

uniapp 手动调用form表单submit事件

背景&#xff1a; UI把提交的按钮弄成了图片&#xff0c;之前的button不能用了。 <button form-type"submit">搜索</button> 实现&#xff1a; html&#xff1a; 通过 this.$refs.fd 获取到form的vue对象。手动调用里面的_onSubmit()方法。 methods:…

STM32CubeMX学习笔记-CAN接口使用

STM32CubeMX学习笔记-CAN接口使用 CAN总线传输协议1.CAN 总线传输特点2.位时序和波特率3.帧的种类4.标准格式数据帧和遥控帧从STM32F407参考手册中可以看出主要特性如下CAN模块基本控制函数CAN模块消息发送CAN模块消息接收标识符筛选发送中断的事件源和回调函数 CubeMX项目设置…

OpenAI 地震!首席执行官被解雇,背后的原因是?

11月17日&#xff0c;ChatGPT的制造商OpenAI表示&#xff0c;经过审查后发现联合创始人兼首席执行官 Sam Altman与董事会“沟通时并不一贯坦诚”&#xff0c;因此公司已经决定解雇他。这家人工智能&#xff08;AI&#xff09;公司在一份声明中表示&#xff1a;“董事会不再相信…