🎯 本文介绍了一种使用Canal监听MySQL Binlog实现数据库与缓存最终一致性的方案。文章首先讲解了如何修改Canal配置以适应订单表和时间段表的变化,然后详细描述了通过责任链模式优化消息处理逻辑的方法,确保能够灵活应对不同数据表的更新需求。最后,展示了如何利用RocketMQ消费Canal消息并通过责任链处理器同步更新缓存,从而保证数据的一致性。此方法有效提升了系统的可扩展性和维护效率。
🏠️ HelloDam/场快订(场馆预定 SaaS 平台)
文章目录
- 前言
- 修改Canal配置
- 修改 Canal 消息消费逻辑
- 识别修改哪个数据表
- 如何实现根据表执行相应操作
- 责任链模式框架
- 常量
- 具体处理器
- MQ 消费者调用责任链
前言
在上一篇文章中,使用令牌限流方式来实现时间段的预定。时间段令牌和时间段库存缓存是分离的,因此需要额外对库存缓存进行更新,如何实现数据库与缓存的数据一致性是一个常见问题。本文使用 Canal 监听 Binlog 实现数据库、缓存的库存最终一致性。为什么使用这种方案?不了解的读者可以先阅读文章:https://zhuanlan.zhihu.com/p/408515044
不了解 MySQL Binlog 开启和 Canal 安装与配置的朋友请先阅读小白手把手教程:https://hellodam.blog.csdn.net/article/details/144483823
修改Canal配置
修改instance.properties
的过滤规则为canal.instance.filter.regex=^(venue-reservation)\\.(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1])|time_period_([0-9]|1[0-5]))$
。现在不仅需要考虑订单表time_period_order
,还要考虑时间段表time_period
,因为现在要保证时间段库存和空闲场号的数据库、缓存一致性
修改 Canal 消息消费逻辑
之前的实现如下
import cn.hutool.core.util.ObjectUtil;
import com.vrs.annotation.Idempotent;
import com.vrs.constant.OrderStatusConstant;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.domain.dto.req.TimePeriodStockRestoreReqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.TimePeriodService;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.CANAL_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC,consumerGroup = RocketMqConstant.CANAL_CONSUMER_GROUP,messageModel = MessageModel.CLUSTERING
)
@RequiredArgsConstructor
public class CanalBinlogCommonListener implements RocketMQListener<CanalBinlogDTO> {private final TimePeriodService timePeriodService;/*** 消费消息的方法* 方法报错就会拒收消息** @param CanalBinlogDTO 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数*/@Idempotent(uniqueKeyPrefix = "canal_binlog_common:",key = "#canalBinlogDTO.getId()+''",scene = IdempotentSceneEnum.MQ,keyTimeout = 3600L)@SneakyThrows@Overridepublic void onMessage(CanalBinlogDTO canalBinlogDTO) {if (canalBinlogDTO.getOld() == null) {return;}Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0);Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0);if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("order_status")) {log.info("[消费者] 消费canal的消息,恢复时间段的库存和空闲场号,时间段ID:{}", alterDataMap.get("time_period_id"));Long userId = Long.parseLong(alterDataMap.get("user_id").toString());Long timePeriodId = Long.parseLong(alterDataMap.get("time_period_id").toString());Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString());Long courtIndex;if (alterDataMap.containsKey("partition_index")) {courtIndex = Long.parseLong(alterDataMap.get("partition_index").toString());} else {courtIndex = Long.parseLong(alterDataMap.get("court_index").toString());}Integer orderStatus = Integer.parseInt(alterDataMap.get("order_status").toString());Integer oldOrderStatus = Integer.parseInt(oldDataMap.get("order_status").toString());if (orderStatus.equals(OrderStatusConstant.CANCEL) && oldOrderStatus.equals(OrderStatusConstant.UN_PAID)) {// 恢复库存timePeriodService.restoreStockAndBookedSlots(TimePeriodStockRestoreReqDTO.builder().userId(userId).courtIndex(courtIndex).timePeriodId(timePeriodId).partitionId(partitionId).build());}}}
}
因为之前只需要处理订单表即可,现在还需要处理时间段表的更改。所以需要做两件事:
- 识别是哪个数据表更新了
- 根据所识别的表,执行相应的业务逻辑
识别修改哪个数据表
首先需要了解 Canal 发送的消息内容格式,从下图可以看到有数据库名,但没有表名。因此为了识别数据表,只能通过表的独有字段名来识别了
如何实现根据表执行相应操作
一种方式是,直接在onMessage
方法中,识别完数据表类型之后,调用相应的方法来处理。这种方式实现简单,但后续如果要处理新的表,需要修改代码,违反了开闭原则。
为了解决这个问题,本文使用责任链模式,即封装多个处理器到责任链上,每个处理器负责识别一个表,并进行相应的业务逻辑。后续使用时就依次调用链上的处理器,如果处理器发现是自己的职责,就执行逻辑,否则直接返回,调用下一个处理器。
责任链模式框架
【抽象处理器】
package com.vrs.chain_of_responsibility;/*** @Author dam* @create 2024/12/11 19:18*/
public interface AbstractChainHandler<T> {/*** 由实现类来实现具体的处理方法*/boolean handle(T param);/*** 名称,用来区分不同的责任链*/String name();/*** 处理器的排序*/int order();
}
handler
:由具体的处理器来实现,用来实现业务逻辑name
:用来标识责任链,返回相同名字的处理器被归到一个责任链中order
:用来给同一责任链的处理器排序
【责任链上下文】
该类用来管理不同的责任链。
- 当Spring启动时,执行
run
方法,通过获取AbstractChainHandler
的实现类来初始化所有责任链,即将处理器按照name
划分到不同的责任链中,后面可以通过容器chainContainer
来获取。最后对同一链上的处理器按照sort
升序排序。 - 当调用
handler
方法时,会根据name
获取责任链,然后依次调用链上的处理器来进行业务处理,若有任意处理器的handle
方法返回 true ,责任链就会中断。如果想要依次执行所有处理器,那所有处理器都返回 false 即可
package com.vrs.chain_of_responsibility;import org.springframework.beans.BeansException;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import java.util.*;/*** @Author dam* @create 2024/12/11 19:20*/
@Component
public class ChainContext<T> implements ApplicationContextAware, CommandLineRunner {/*** 通过 Spring IOC 获取 Bean 实例*/private ApplicationContext applicationContext;/*** key:责任链名称* value:责任链*/private final Map<String, List<AbstractChainHandler>> chainContainer = new HashMap<>();@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}@Overridepublic void run(String... args) {// 从 Spring IOC 容器中获取接口对应的 Spring Bean 集合Map<String, AbstractChainHandler> chainFilterMap = applicationContext.getBeansOfType(AbstractChainHandler.class);chainFilterMap.forEach((beanName, bean) -> {// 判断 name 是否已经存在抽象责任链容器中// 如果已经存在直接向集合新增// 如果不存在,创建对应的集合List<AbstractChainHandler> abstractChainHandlers = chainContainer.getOrDefault(bean.name(), new ArrayList<>());abstractChainHandlers.add(bean);chainContainer.put(bean.name(), abstractChainHandlers);});chainContainer.forEach((mark, unsortedChainHandlers) -> {// 对每个责任链的实现类根据order升序排序Collections.sort(unsortedChainHandlers, ((o1, o2) -> {return Integer.compare(o1.order(), o2.order());}));});}/*** 责任链组件执行** @param name 责任链组件标识* @param requestParam 请求参数*/public void handler(String name, T requestParam) {// 根据 name 从责任链容器中获取对应的责任链List<AbstractChainHandler> abstractChainHandlers = chainContainer.get(name);if (CollectionUtils.isEmpty(abstractChainHandlers)) {throw new RuntimeException(name + "对应的责任链不存在");}// 遍历责任链处理器for (AbstractChainHandler handler : abstractChainHandlers) {if (handler.handle(requestParam)) {// --if-- 如果处理器返回 true,表示已经处理完成,退出责任链return;}}}
}
常量
package com.vrs.constant;/*** Redis缓存Key常量类*/
public class ChainConstant {public static final String RESERVE_CHAIN_NAME = "reserve_chain";public static final String CANAL_CHAIN_NAME = "canal_chain";}
具体处理器
由于修改的要么是订单表,要么是时间段表,因此责任链上面只要有任一处理器成功处理,即返回 true ,就无须调用余下的其他处理器
【时间段库存修改处理器】
当数据库中的库存修改之后,同步修改缓存中的库存
package com.vrs.service.chainHander.canal;import cn.hutool.core.util.ObjectUtil;
import com.vrs.chain_of_responsibility.AbstractChainHandler;
import com.vrs.constant.ChainConstant;
import com.vrs.constant.RedisCacheConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.service.PartitionService;
import com.vrs.service.TimePeriodService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.util.Map;/*** 订单超时关闭处理逻辑** @Author dam* @create 2024/12/11 19:43*/
@Component
@RequiredArgsConstructor
@Slf4j
public class TimePeriodStockChangeHandler implements AbstractChainHandler<CanalBinlogDTO> {private final StringRedisTemplate redisTemplate;private final TimePeriodService timePeriodService;private final PartitionService partitionService;@Overridepublic boolean handle(CanalBinlogDTO canalBinlogDTO) {Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0);Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0);if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("stock")) {// --if-- 如果是修改操作,且修改了stocklog.info("[消费者] 消费canal的消息,时间段库存修改,同步修改缓存的库存,时间段ID:{}", alterDataMap.get("id"));Long timePeriodId = Long.parseLong(alterDataMap.get("id").toString());Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString());Integer stock = Integer.parseInt(alterDataMap.get("stock").toString());Long bookedSlots = Long.parseLong(alterDataMap.get("booked_slots").toString());// 更新库存redisTemplate.opsForValue().set(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_KEY, timePeriodId), stock.toString());// 更新位图timePeriodService.initializeFreeIndexBitmap(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_KEY, timePeriodId),partitionService.getPartitionDOById(partitionId).getNum(),bookedSlots,24 * 3600);return true;}return false;}@Overridepublic String name() {return ChainConstant.CANAL_CHAIN_NAME;}@Overridepublic int order() {return 10;}
}
【订单超时关闭处理器】
package com.vrs.service.chainHander.canal;import cn.hutool.core.util.ObjectUtil;
import com.vrs.chain_of_responsibility.AbstractChainHandler;
import com.vrs.constant.ChainConstant;
import com.vrs.constant.OrderStatusConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.domain.dto.req.TimePeriodStockRestoreReqDTO;
import com.vrs.service.TimePeriodService;
import lombok.extern.slf4j.Slf4j;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;import java.util.Map;/*** 订单超时关闭处理逻辑** @Author dam* @create 2024/12/11 19:43*/
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderCloseHandler implements AbstractChainHandler<CanalBinlogDTO> {private final TimePeriodService timePeriodService;@Overridepublic boolean handle(CanalBinlogDTO canalBinlogDTO) {Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0);Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0);if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("order_status")) {log.info("[消费者] 消费canal的消息,订单超时关闭,恢复时间段的库存和空闲场号,时间段ID:{}", alterDataMap.get("time_period_id"));Long userId = Long.parseLong(alterDataMap.get("user_id").toString());Long timePeriodId = Long.parseLong(alterDataMap.get("time_period_id").toString());Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString());Long courtIndex;if (alterDataMap.containsKey("partition_index")) {courtIndex = Long.parseLong(alterDataMap.get("partition_index").toString());} else {courtIndex = Long.parseLong(alterDataMap.get("court_index").toString());}Integer orderStatus = Integer.parseInt(alterDataMap.get("order_status").toString());Integer oldOrderStatus = Integer.parseInt(oldDataMap.get("order_status").toString());if (orderStatus.equals(OrderStatusConstant.CANCEL) && oldOrderStatus.equals(OrderStatusConstant.UN_PAID)) {// 恢复库存timePeriodService.restoreStockAndBookedSlots(TimePeriodStockRestoreReqDTO.builder().userId(userId).courtIndex(courtIndex).timePeriodId(timePeriodId).partitionId(partitionId).build());}return true;}return false;}@Overridepublic String name() {return ChainConstant.CANAL_CHAIN_NAME;}@Overridepublic int order() {return 0;}
}
MQ 消费者调用责任链
使用非常简单,直接调用chainContext.handler(ChainConstant.
CANAL_CHAIN_NAME
, canalBinlogDTO);
即可
package com.vrs.rocketMq.listener;import com.vrs.annotation.Idempotent;
import com.vrs.chain_of_responsibility.ChainContext;
import com.vrs.constant.ChainConstant;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.enums.IdempotentSceneEnum;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.CANAL_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC,consumerGroup = RocketMqConstant.CANAL_CONSUMER_GROUP,messageModel = MessageModel.CLUSTERING
)
@RequiredArgsConstructor
public class CanalBinlogCommonListener implements RocketMQListener<CanalBinlogDTO> {private final ChainContext chainContext;/*** 消费消息的方法* 方法报错就会拒收消息** @param CanalBinlogDTO 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数*/@Idempotent(uniqueKeyPrefix = "canal_binlog_common:",key = "#canalBinlogDTO.getId()+''",scene = IdempotentSceneEnum.MQ,keyTimeout = 3600L)@SneakyThrows@Overridepublic void onMessage(CanalBinlogDTO canalBinlogDTO) {if (canalBinlogDTO.getOld() == null) {// --if-- 如果不是修改数据,快速退出,因为我们现在的业务逻辑都是识别出数据修改才有下面的操作return;}// 调用责任链来消费 canal 消息chainContext.handler(ChainConstant.CANAL_CHAIN_NAME, canalBinlogDTO);}
}