目录
缓存同步策略
安装Canal
监听Canal
缓存同步策略
缓存数据同步的常见方式有三种:
1.设置有效期:给缓存设置有效期,到期后自动删除,再次查询时更新
- 优势:简单、方便
- 缺点:时效性差,缓存过期之前可能不一致
- 场景:更新频率较低,时效性要求低的业务
2.同步双写:在修改数据库的同时,直接修改缓存
- 优势:时效性强,缓存与数据库强一致
- 缺点:有代码侵入,耦合度高;
- 场景:对一致性、时效性要求较高的缓存数据
3.异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
- 优势:低耦合,可以同时通知多个缓存服务
- 缺点:时效性一般,可能存在中间不一致状态
- 场景:时效性要求一般,有多个服务需要同步
异步通知可以基于MQ或者Canal来实现:
1)基于MQ的异步通知
商品服务完成对数据的修改后,只需要发送一条消息到MQ中,缓存服务监听MQ消息,然后完成对缓存的更新
依然有少量的代码侵入
2)基于Canal的异步通知
商品服务完成商品修改后,业务直接结束,没有任何代码侵入,Canal监听MySQL变化,当发现变化后,立即通知缓存服务,缓存服务接收到canal通知,更新缓存
代码零侵入
安装Canal
Canal [kə'næl],译意为水道/管道/沟渠,Canal是阿里巴巴旗下的一款开源项目,基于Java开发,基于数据库增量日志解析,提供增量数据订阅&消费
GitHub的地址:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
Canal是基于MySQL的主从同步来实现的,MySQL主从同步的原理如下:
-
MySQL master将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
-
MySQL slave将master的binary log events拷贝到它的中继日志(relay log)
-
MySQL slave重放relay log中事件,将数据变更反映它自己的数据
Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化,再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步
安装步骤:
1.开启MySQL主从
vi /tmp/mysql/conf/my.cnf
添加
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=heima
log-bin=/var/lib/mysql/mysql-bin
:设置binary log文件的存放地址和文件名,叫做mysql-binbinlog-do-db=heima
:指定对哪个database记录binary log events,这里记录heima这个库
设置用户权限
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;
重启mysql容器
docker restart mysql
2.创建网络,将MySQL、Canal、MQ放到同一个Docker网络中
docker network create heima
让mysql加入
docker network connect heima mysql
3.导入canal的镜像
docker load -i canal.tar
4.创建canal容器
docker run -p 11111:11111 --name canal \
-e canal.destinations=heima \
-e canal.instance.master.address=mysql:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=heima\\..* \
--network heima \
-d canal/canal-server:v1.1.5
-p 11111:11111
:这是canal的默认监听端口-e canal.instance.master.address=mysql:3306
:数据库地址和端口,如果不知道mysql容器地址,可以通过docker inspect 容器id
来查看-e canal.instance.dbUsername=canal
:数据库用户名-e canal.instance.dbPassword=canal
:数据库密码-e canal.instance.filter.regex=
:要监听的表名称
表名称监听支持的语法:
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2
监听Canal
Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端
GitHub上的第三方开源的canal-starter客户端。地址:GitHub - NormanGyllenhaal/canal-client: spring boot canal starter 易用的canal 客户端 canal client
引入依赖:
<dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version>
</dependency>
编写配置:
canal:destination: heima # canal的集群名字,要与安装canal时设置的名称一致server: 192.168.150.101:11111 # canal服务地址
通过@Id、@Column、@Transient注解完成Item与数据库表字段的映射:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;import javax.persistence.Column;
import java.util.Date;@Data
@TableName("tb_item")
public class Item {@TableId(type = IdType.AUTO)@Idprivate Long id;//商品id@Column(name = "name")private String name;//商品名称private String title;//商品标题private Long price;//价格(分)private String image;//商品图片private String category;//分类名称private String brand;//品牌名称private String spec;//规格private Integer status;//商品状态 1-正常,2-下架private Date createTime;//创建时间private Date updateTime;//更新时间@TableField(exist = false)@Transientprivate Integer stock;@TableField(exist = false)@Transientprivate Integer sold;
}
编写监听器,监听Canal消息:
import com.github.benmanes.caffeine.cache.Cache;
import com.heima.item.config.RedisHandler;
import com.heima.item.pojo.Item;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {@Autowiredprivate RedisHandler redisHandler;@Autowiredprivate Cache<Long, Item> itemCache;@Overridepublic void insert(Item item) {// 写数据到JVM进程缓存itemCache.put(item.getId(), item);// 写数据到redisredisHandler.saveItem(item);}@Overridepublic void update(Item before, Item after) {// 写数据到JVM进程缓存itemCache.put(after.getId(), after);// 写数据到redisredisHandler.saveItem(after);}@Overridepublic void delete(Item item) {// 删除数据到JVM进程缓存itemCache.invalidate(item.getId());// 删除数据到redisredisHandler.deleteItemById(item.getId());}
}
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import com.heima.item.service.IItemService;
import com.heima.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.util.List;@Component
public class RedisHandler implements InitializingBean {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate IItemService itemService;@Autowiredprivate IItemStockService stockService;private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void afterPropertiesSet() throws Exception {// 初始化缓存// 1.查询商品信息List<Item> itemList = itemService.list();// 2.放入缓存for (Item item : itemList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(item);// 2.2.存入redisredisTemplate.opsForValue().set("item:id:" + item.getId(), json);}// 3.查询商品库存信息List<ItemStock> stockList = stockService.list();// 4.放入缓存for (ItemStock stock : stockList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(stock);// 2.2.存入redisredisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);}}public void saveItem(Item item) {try {String json = MAPPER.writeValueAsString(item);redisTemplate.opsForValue().set("item:id:" + item.getId(), json);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public void deleteItemById(Long id) {redisTemplate.delete("item:id:" + id);}
}