🎯 本文介绍了一个高效处理高并发场馆预订请求的系统设计方案。通过使用Redis缓存和位图技术,系统能够快速管理场地的可用性和预订状态。采用Lua脚本确保操作的原子性,结合责任链模式进行参数校验,并通过事务保证数据一致性。系统还实现了订单生成、延时关闭订单等功能,确保资源的公平分配和高效利用。整体设计旨在提供稳定、高性能的预订接口,满足高并发场景下的用户需求。
文章目录
- 简介
- 思路
- 数据表设计
- 场馆服务
- Controller
- Service
- 时间段预定
- 缓存回滚
- 库存回滚
- Mapper
- 订单服务
- Controller
- Service
- 复合分片
- MQ
简介
在场馆预订平台的设计中,通常会将一个大型场馆细分为多个功能区,例如篮球区、羽毛球区、乒乓球区等。每个功能区内又包含若干个独立的场地或球台,用户可以根据自己的需求选择特定的区域和设施进行预定。当预定时间窗口开启时,用户通过平台预订场地,并指定使用的时间段——比如9:00-10:00、10:00-11:00等一小时的时段。
特别地,在一些热门场馆或者资源有限的情况下(例如学校内,众多学生争相预订为数不多的运动场地),高并发的预订请求可能会同时涌入系统。为了应对这种场景,平台需要具备高效的处理能力,确保即使在高峰期也能提供流畅的用户体验。因此,实现一个稳定且高性能的时间段预订接口至关重要,它不仅能够准确管理场地的可用性,还能公平有效地分配资源给每一位有需要的用户。
思路
采用传统秒杀思路,使用 Redis缓存 存储具体的剩余库存,使用 Redis位图 来存储场号预订情况,时间段数据提前放入缓存中
- 接口限流
- 接口幂等性
- 验证提交参数
- 参数非空判断
- 预订的时间段id是否正确
- 是否已经到达预订时间(增加时间缓冲,提前1秒开放预订,避免时间同步问题)
- 是否已经过了时间段时间
- 同一用户只能抢一次同一时间段
- 执行 lua 脚本
- 库存是否足够
- 分配空闲场号(使用bitmap存储)
- 开启事务
- 数据库扣减所预定时间段的库存、空闲场号信息
- 调用订单远程服务
- 生成订单
- 订单号生成,基因算法生成分片键
- 发送延迟消息,超时未支付关闭订单
- 监听延时消息关掉订单
- Binlog监听订单状态改变,使用RokcetMQ消费消息,恢复数据库和缓存的库存、空闲场号
- 若订单远程服务调用失败,恢复缓存的库存
- 生成订单
- 关闭事务
数据表设计
DROP TABLE IF EXISTS `time_period`;
CREATE TABLE `time_period`( `id` bigint NOT NULL COMMENT 'ID',`create_time` datetime,`update_time` datetime,`is_deleted` tinyint default 0 COMMENT '逻辑删除 0:没删除 1:已删除',`partition_id` bigint NOT NULL COMMENT '场区id',`price` decimal(10,2) NOT NULL COMMENT '该时间段预订使用价格(元)',`stock` int NOT NULL COMMENT '库存',`booked_slots` bigint unsigned NOT NULL DEFAULT 0 COMMENT '已预订的场地(位图表示)',`period_date` date NOT NULL COMMENT '预定日期', `begin_time` time NOT NULL COMMENT '时间段开始时间HH:mm(不用填日期)',`end_time` time NOT NULL COMMENT '时间段结束时间HH:mm(不用填日期)', PRIMARY KEY (`id`) USING BTREE,INDEX `idx_partition_id` (`partition_id`)
);
这里使用位图来存储一个分区场号的预定情况,是如何存储的呢?很简单的理解是,一个字节有8个比特,每个比特存储一个 0 或 1 ,那么一个字节其实可以存储 8 个场是否被预订的状态,0表示空闲,1表示预定。在java中一个bigint可以存储64个场的空闲状态,一个int可以存储32个场的空闲状态,一个tinyint可以存储8个场的空闲状态。
使用这种方式存储有哪些优点?
- 节省存储空间:每个场地只需要1个二进制位进行存储,可以显著减少存储空间占用
- 效率高:位图支持高效的位运算,可以快速查询和更新场地的预订状态
- 适合高并发:位图的位操作是原子性的,适合高并发场景下的预订和释放操作
在数据库中,常用的位运算操作如下:
- 预订场地:假设要预订索引为2的场地。
UPDATE bookings
SET BookedSlots = BookedSlots | (1 << 2)
WHERE BookingID = 1;
- 取消预订:假设要取消预订索引为2的场地。
UPDATE bookings
SET BookedSlots = BookedSlots & ~(1 << 2)
WHERE BookingID = 1;
- 检查场地是否已预订:假设要检查索引为2的场地是否已预订。
SELECT (BookedSlots & (1 << 2)) > 0 AS IsBooked
FROM bookings
WHERE BookingID = 1;
场馆服务
Controller
在这里使用了@Idempotent
注解来实现接口的幂等性,实现方式可以参考文章:https://hellodam.blog.csdn.net/article/details/137435495
/*** 预定时间段*/
@GetMapping("/v1/reserve")
@Idempotent(uniqueKeyPrefix = "vrs-venue:lock_reserve:",// 让用户同时最多只能预定一个时间段,根据用户名来加锁// key = "T(com.vrs.common.context.UserContext).getUsername()",// 让用户同时最多只能预定该时间段一次,但是可以同时预定其他时间段,根据用户名+时间段ID来加锁key = "T(com.vrs.common.context.UserContext).getUsername()+'_'+#timePeriodId",message = "正在执行场馆预定流程,请勿重复预定...",scene = IdempotentSceneEnum.RESTAPI
)
@Operation(summary = "预定时间段")
public Result reserve(@RequestParam("timePeriodId") Long timePeriodId) {OrderDO orderDO = timePeriodService.reserve(timePeriodId);return Results.success(orderDO);
}
Service
时间段预定
首先使用了一个责任链模式并结合来进行参数校验,需要的校验如下:
- 用户预定的时间段是否存在
- 是否已经到达预订时间(增加时间缓冲,提前1秒开放预订,避免时间同步问题)
- 是否已经过了时间段时间
其实这里的责任链模式用的不是特别好,因为检验过程中,数据具有连续性。即在检验用户预定时间段是否为空时,如果不为空,需要把时间段的相关属性带到预定时间校验中,所以需要进行参数的传递。
执行 lua 脚本获取预定场号,这里要做几件事,使用 lua 脚本可以保证几件事情的原子性
- 校验用户是否已经预定过当前时间段
- 如果用户是第一次预定,检验库存是否大于0
- 如果库存大于0,分配场地
-- 定义脚本参数
local stock_key = KEYS[1]
local free_index_bitmap_key = KEYS[2]
-- 用来存储已购买用户的set
local set_name = KEYS[3]-- 用户ID
local user_id = ARGV[1]
-- 过期时间 (秒)
local expire_time = tonumber(ARGV[2])-- 检查用户是否已经购买过
if redis.call("SISMEMBER", set_name, user_id) == 1 then-- 用户已经购买过,返回 -2 表示失败return -2
end-- 如果用户没有购买过,添加用户到set中
redis.call("SADD", set_name, user_id)
-- 设置过期时间
if expire_time > 0 thenredis.call("EXPIRE", set_name, expire_time)
end-- 获取库存
local current_inventory = tonumber(redis.call('GET', stock_key) or 0)-- 尝试消耗库存
if current_inventory < 1 then-- 库存不够了,返回-1,代表分配空场号失败return -1 -- 失败
end-- 查找第一个空闲的场地(位图中第一个为 0 的位)
local free_court_bit = redis.call("BITPOS", free_index_bitmap_key, 0)if not free_court_bit or free_court_bit == -1 then-- 没有空闲的场号return -1 -- 失败
end-- 占用该场地(将对应位设置为 1)
redis.call("SETBIT", free_index_bitmap_key, free_court_bit, 1)-- 更新库存
redis.call('DECRBY', stock_key, 1)-- 返回分配的场地索引(注意:位图的位索引从0开始,如果你需要从1开始,这里加1)
return tonumber(free_court_bit)
场地分配成功之后,扣减数据库库存、更新场地预定情况,并创建订单。由于创建订单需要调用远程服务,需要保证业务一致性。这里使用@Transactional
注解,当创建订单失败时,恢复缓存,并不提交数据库的修改。但是这里的实现仍然存在一个问题:如果说由于网络原因,实际上订单已经创建成功了,但是因为超时访问失败,这里库存却回滚了,咋办?这里提出两种解决方案
方案一:在扣减库存成功之后,发送一个消息到消息队列,通知订单服务创建订单,如果消息发送失败,回滚库存。
【优点】
- 库存扣减和订单创建是异步的,给订单服务发送消息之后,即可返回,预订接口吞吐量更高。
【缺点】
- 使用这种方式,库存扣减和订单创建是异步的,无法在接口调用结束时,直接返回订单信息给前端。在订单创建成功之后,需要使用双向通讯技术(如Websocket)通知前端订单创建完成,并给前端发生订单数据。
方案二:使用延时消息兜底,间隔几秒后,去查询订单是否创建成功,如果创建成功,到缓存中检查一下用户是否真的预订了该时间段,如果没有预定该时间段,说明库存发生了回滚,系统将订单进行删除
【优点】
- 通过延时消息和后续的检查机制,能够确保订单和库存的最终一致性。即使因为网络超时导致库存回滚,系统也能通过后续的检查发现不一致并修复。
【缺点】
- 数据一致性有延迟,如果订单创建成功但库存回滚,用户可能会短暂看到订单创建成功,但后续订单被删除,这可能会让用户疑惑。
- 库存扣减和订单创建是同步的,预订接口吞吐量较低。
在这里先提出这个问题,后续在时间段预定V2中使用方案一进行解决
这里还存在一个问题,假如说lua脚本执行完成,缓存中的库存已经扣减,结果突然服务器宕机了,没有执行后续的数据库库存扣减和创建订单流程,那相当于有一个场被占用了,但实际上无人使用,这个问题也会在时间段预定V2中解决
/*** 传统秒杀架构,使用缓存存储具体的剩余库存,使用 位图 来存储空闲场号* 下单时首先尝试扣减库存和分配空闲场号,如果可以扣减成功,再执行下单等逻辑* 存在问题:* 1、如果缓存扣减成功之后,应用宕机了,没有执行数据库库存扣减和生成订单逻辑,那就会出现缓存、数据库不一致的情况。应用重启之后,需要重新同步缓存和数据库,需要人力管理* 2、使用同步扣减数据库库存和下单,接口吞吐量不够高** @param timePeriodId* @return*/
@Override
// 子方法声明了Transactional,父方法也需要声明,不要会失效
@Transactional(rollbackFor = Throwable.class)
public OrderDO reserve(Long timePeriodId) { 参数校验// 使用责任链模式校验数据是否正确//todo 后面对于不对外开放的场馆,还需要校验用户是否属于该机构用户TimePeriodReserveReqDTO timePeriodReserveReqDTO = new TimePeriodReserveReqDTO(timePeriodId);chainContext.handler(ChainConstant.RESERVE_CHAIN_NAME, timePeriodReserveReqDTO);TimePeriodDO timePeriodDO = timePeriodReserveReqDTO.getTimePeriodDO();Long venueId = timePeriodReserveReqDTO.getVenueId();VenueDO venueDO = timePeriodReserveReqDTO.getVenueDO(); 使用lua脚本获取一个空场地对应的索引,并扣除相应的库存,同时在里面进行用户的查重// 使用 Hutool 的单例管理容器 管理lua脚本的加载,保证其只被加载一次String luaScriptPath = "lua/free_court_index_allocate_by_bitmap.lua";DefaultRedisScript<Long> luaScript = Singleton.get(luaScriptPath, () -> {DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(luaScriptPath)));redisScript.setResultType(Long.class);return redisScript;});Long freeCourtIndex = stringRedisTemplate.execute(luaScript,Lists.newArrayList(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_KEY, timePeriodReserveReqDTO.getTimePeriodId()),String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_KEY, timePeriodReserveReqDTO.getTimePeriodId()),String.format(RedisCacheConstant.VENUE_IS_USER_BOUGHT_TIME_PERIOD_KEY, timePeriodReserveReqDTO.getTimePeriodId())),UserContext.getUserId().toString(),String.valueOf(venueDO.getAdvanceBookingDay() * 86400));if (freeCourtIndex == -2) {// --if-- 用户已经购买过该时间段throw new ClientException(BaseErrorCode.TIME_PERIOD_HAVE_BOUGHT_ERROR);} else if (freeCourtIndex == -1) {// --if-- 没有空闲的场号throw new ServiceException(BaseErrorCode.TIME_PERIOD_SELL_OUT_ERROR);} 修改数据库中时间段的库存和已经选定的场号,并生成订单// todo 为了保证事务原子性,将修改数据库库存操作和创建订单放在了一次,而且是同步执行,如果想要接口吞吐量更高,这里肯定是需要优化成异步的return this.executePreserve(timePeriodDO,freeCourtIndex,venueId);
}/*** 执行下单和数据库库存扣减操作** @param timePeriodDO* @param courtIndex* @param venueId* @return*/
@Override
// 抛出任何异常,回退库存
@Transactional(rollbackFor = Throwable.class)
public OrderDO executePreserve(TimePeriodDO timePeriodDO,Long courtIndex, Long venueId) {// 扣减当前时间段的库存,修改空闲场信息baseMapper.updateStockAndBookedSlots(timePeriodDO.getId(), timePeriodDO.getPartitionId(), courtIndex);// 调用远程服务创建订单OrderGenerateReqDTO orderGenerateReqDTO = OrderGenerateReqDTO.builder().timePeriodId(timePeriodDO.getId()).partitionId(timePeriodDO.getPartitionId()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).courtIndex(courtIndex).userId(UserContext.getUserId()).userName(UserContext.getUsername()).venueId(venueId).payAmount(timePeriodDO.getPrice()).build();Result<OrderDO> result;try {result = orderFeignService.generateOrder(orderGenerateReqDTO);if (result == null || !result.isSuccess()) {// --if-- 订单生成失败,抛出异常,上面的库存扣减也会回退throw new ServiceException(BaseErrorCode.ORDER_GENERATE_ERROR);}} catch (Exception e) {// --if-- 订单生成服务调用失败// 恢复缓存中的信息this.restoreStockAndBookedSlotsCache(timePeriodDO.getId(), UserContext.getUserId(), courtIndex);// todo 如果说由于网络原因,实际上订单已经创建成功了,但是因为超时访问失败,这里库存却回滚了,咋办,如何将订单置为废弃状态// 打印错误堆栈信息e.printStackTrace();// 把错误返回到前端throw new ServiceException(e.getMessage());}return result.getData();
}
缓存回滚
/*** 库存、空闲场号、已购买用户缓存回退*/
@Override
public void restoreStockAndBookedSlotsCache(Long timePeriodId, Long userId, Long courtIndex) { 使用lua脚本获取一个空场地对应的索引,并扣除相应的库存// 使用 Hutool 的单例管理容器 管理lua脚本的加载,保证其只被加载一次String luaScriptPath = "lua/free_court_index_release_by_bitmap.lua";DefaultRedisScript<Long> luaScript = Singleton.get(luaScriptPath, () -> {DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(luaScriptPath)));redisScript.setResultType(Long.class);return redisScript;});Long status = stringRedisTemplate.execute(luaScript,Lists.newArrayList(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_KEY, timePeriodId),String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_KEY, timePeriodId),String.format(RedisCacheConstant.VENUE_IS_USER_BOUGHT_TIME_PERIOD_KEY, timePeriodId)),userId.toString(),courtIndex.toString());if (status == -3) {// --if-- 该场号本身就是空闲的,无需释放库存(说明库存已经被释放过了,这不要抛异常出去,否则库存释放方法会反复失败)} else if (status == -2) {// --if-- 用户没有购买该时间段throw new ServiceException(BaseErrorCode.TIME_PERIOD_HAVE_NOT_BOUGHT_ERROR);} else if (status == -1) {// --if-- 场号不合法throw new ServiceException(BaseErrorCode.TIME_PERIOD_FREE_COURT_INDEX_ERROR);}}
在进行缓存回滚的时候,也需要使用 lua 脚本,保证用户去重表删除、库存回滚、预订场号回滚操作的原子性
-- 定义脚本参数
local stock_key = KEYS[1]
local free_index_bitmap_key = KEYS[2]
-- 用来存储已购买用户的set
local set_name = KEYS[3]-- 用户ID
local user_id = ARGV[1]
-- 场地索引
local court_index = tonumber(ARGV[2])-- 检查用户是否已经购买过
if redis.call("SISMEMBER", set_name, user_id) == 0 then-- 用户没有购买过,返回 -2 表示失败return -2
end-- 检查场地索引是否有效
if not court_index or court_index < 0 then-- 无效的场地索引,返回 -1 表示失败return -1
end-- 检查场号是否本来就是处于空闲状态
local is_free = redis.call("GETBIT", free_index_bitmap_key, court_index)
if is_free == 0 then-- 场号本身就处于空闲状态,所以无需释放库存,返回 -3 表示错误return -3
end-- 释放场号(将对应位设置为 0)
redis.call("SETBIT", free_index_bitmap_key, court_index, 0)-- 更新库存(增加库存)
redis.call('INCRBY', stock_key, 1)-- 移除用户
redis.call("SREM", set_name, user_id)-- 返回成功
return 0 -- 成功
库存回滚
/*** 库存、空闲场号数据库回退*/
@Override
public void restoreStockAndBookedSlotsDatabase(TimePeriodStockRestoreReqDTO timePeriodStockRestoreReqDTO) {// 恢复数据库中的库存baseMapper.restoreStockAndBookedSlots(timePeriodStockRestoreReqDTO.getTimePeriodId(), timePeriodStockRestoreReqDTO.getPartitionId(), timePeriodStockRestoreReqDTO.getCourtIndex());
}
Mapper
在更新位图的时候,需要使用位运算
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.vrs.mapper.TimePeriodMapper"><update id="updateStockAndBookedSlots"><![CDATA[UPDATE time_periodSET booked_slots = booked_slots | (1 << #{partitionIndex}), stock = stock - 1WHERE id = #{timePeriodId} AND stock > 0 AND partition_id = #{partitionId}]]></update><update id="restoreStockAndBookedSlots"><![CDATA[UPDATE time_periodSET booked_slots = booked_slots & ~(1 << #{partitionIndex}), stock = stock + 1WHERE id = #{timePeriodId} AND partition_id = #{partitionId}]]></update></mapper>
订单服务
Controller
/*** 生成订单*/
@PostMapping("/v1/generateOrder")
@Operation(summary = "生成订单")
public Result<OrderDO> generateOrder(@RequestBody OrderGenerateReqDTO orderGenerateReqDTO) {OrderDO orderDO = orderService.generateOrder(orderGenerateReqDTO);return Results.success(orderDO);
}
Service
为了支持大数据量,本文对订单进行了分表。但是订单需要支持两种查询方式。
- 直接根据订单号查询相应的订单
- 用户需要查询自己的订单列表。
传统的分表方式只有一个分片键,要么用订单号分片,要么用用户id进行分片,两种方式都无法满足上述查询需求,会触发读扩散问题,效率严重下降,本文使用复合分片算法来解决这个问题,即在订单后面冗余用户ID的后六位SnowflakeIdUtil.
nextId
() + String.
valueOf
(orderGenerateReqDTO.getUserId() % 1000000)
- 如果是用户查询自己的订单列表,直接使用用户ID的后六位进行分片定位
- 如果根据订单号查询相应的订单,那么使用订单号的后六位进行分片定位
在创建订单之后,发送一个延时消息,如果十分钟用户还没有成功付款,则取消订单,回滚缓存和数据库的库存,避免有人长期占用资源但是不购买。这里为啥分为closeOrder
和secondCloseOrder
,主要是避免订单支付成功回调期间,订单被超时关闭了,详情请看 https://hellodam.blog.csdn.net/article/details/144942881
基于 Canal 监听 MySQL Binlog 日志恢复库存,可以参考 https://hellodam.blog.csdn.net/article/details/144483823
@Override
public OrderDO generateOrder(OrderGenerateReqDTO orderGenerateReqDTO) {OrderDO orderDO = OrderDO.builder()// 订单号使用雪花算法生成分布式ID,然后再拼接用户ID的后面六位.orderSn(SnowflakeIdUtil.nextId() + String.valueOf(orderGenerateReqDTO.getUserId() % 1000000)).orderTime(new Date()).venueId(orderGenerateReqDTO.getVenueId()).partitionId(orderGenerateReqDTO.getPartitionId()).courtIndex(orderGenerateReqDTO.getCourtIndex()).timePeriodId(orderGenerateReqDTO.getTimePeriodId()).periodDate(orderGenerateReqDTO.getPeriodDate()).beginTime(orderGenerateReqDTO.getBeginTime()).endTime(orderGenerateReqDTO.getEndTime()).userId(orderGenerateReqDTO.getUserId()).userName(orderGenerateReqDTO.getUserName()).payAmount(orderGenerateReqDTO.getPayAmount()).orderStatus(OrderStatusConstant.UN_PAID).build();int insert = baseMapper.insert(orderDO);if (insert > 0) {// 发送延时消息来关闭未支付的订单orderDelayCloseProducer.sendMessage(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build());}return orderDO;
}@Override
@Transactional(rollbackFor = Throwable.class)
public void closeOrder(String orderSn) {String orderPayLock = stringRedisTemplate.opsForValue().get(String.format(RedisCacheConstant.ORDER_PAY_LOCK_KEY, orderSn));if ("0".equals(orderPayLock)) {OrderDO orderDO = baseMapper.selectByOrderSn(orderSn);// --if-- 订单已经被锁定,说明订单正处于支付状态,先不要关闭订单,等等再看看是否支付成功了if (orderDO.getOrderStatus().equals(OrderStatusConstant.UN_PAID)) {// --if-- 当前订单还没有支付成功,发一个延时消息,如果等等订单还没有被支付,就关闭订单orderSecondDelayCloseProducer.sendMessage(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build());// 将订单支付状态设置为1,拒绝后面的支付调用stringRedisTemplate.opsForValue().set(String.format(RedisCacheConstant.ORDER_PAY_LOCK_KEY, orderSn), "1", 5, TimeUnit.MINUTES);}} else {// --if-- 订单不在支付中,直接关闭订单secondCloseOrder(orderSn);}
}@Override
public void secondCloseOrder(String orderSn) {OrderDO orderDO = baseMapper.selectByOrderSn(orderSn);if (orderDO.getOrderStatus().equals(OrderStatusConstant.UN_PAID)) {// --if-- 到时间了,订单还没有支付,取消该订单orderDO.setOrderStatus(OrderStatusConstant.CANCEL);// 分片键不能更新orderDO.setVenueId(null);baseMapper.updateByOrderSn(orderDO);if (!isUseBinlog) {// --if-- 如果不启用binlog的话,需要自己手动调用方法来释放库存// 极端情况,如果说远程已经还原了库存,但是因为网络问题,返回了错误,导致订单没有关闭,于是出现了不一致的现象。库存都还原完了,你订单还可以支付Result<OrderDO> result;try {result = timePeriodFeignService.release(TimePeriodStockRestoreReqDTO.builder().timePeriodId(orderDO.getTimePeriodId()).partitionId(orderDO.getPartitionId()).courtIndex(orderDO.getCourtIndex()).userId(orderDO.getUserId()).build());} catch (Exception e) {// --if-- 库存恢复远程接口调用失败throw new ServiceException(BaseErrorCode.REMOTE_ERROR);}if (result == null || !result.isSuccess()) {// 因为使用了Transactional,如果这里出现了异常,订单的关闭修改会回退throw new ServiceException("调用远程服务释放时间段数据库库存失败", BaseErrorCode.SERVICE_ERROR);}} else {// --if-- 如果启用binlog的话,会自动监听数据库的订单关闭,然后恢复缓存中的库存}}
}
复合分片
下面定义了数据分片规则,用于将 time_period_order
表的数据水平分割到多个物理表中。具体来说,它指定了 time_period_order
表的真实数据节点为 ds_0
数据源下的 time_period_order_0
到 time_period_order_15
共16个分片表。为了确定数据应该插入哪个分片表,这里采用了复合分表策略,即根据 user_id
和 order_sn
两个分片键来决定。对于分片算法,配置中引用了名为 order_table_gene_mod
的自定义算法,该算法由 com.vrs.algorithm.OrderTableGeneAlgorithm
类实现,通过计算用户ID或订单号的哈希值,并结合分片数量(本例中为16),来确定数据最终存储的具体分片表。
rules:- !SHARDINGtables:time_period_order:# 真实数据节点,比如数据库源以及数据库在数据库中真实存在的actualDataNodes: ds_0.time_period_order_${0..15}# 分表策略tableStrategy:# 复合分表策略(多个分片键)complex:# 用户 ID 和订单号shardingColumns: user_id,order_sn# 搜索 order_table_complex_mod 下方会有分表算法shardingAlgorithmName: order_table_gene_mod# 分片算法shardingAlgorithms:# 订单分表算法order_table_gene_mod:# 通过加载全限定名类实现分片算法,相当于分片逻辑都在 algorithmClassName 对应的类中type: CLASS_BASEDprops:algorithmClassName: com.vrs.algorithm.OrderTableGeneAlgorithm# 分表数量sharding-count: 16# 复合(多分片键)分表策略strategy: complex
下面的类实现了 Apache ShardingSphere 框架中的 ComplexKeysShardingAlgorithm
接口,用于根据订单号和用户ID这两个分片键来决定数据应该存储在哪个分片表中。该算法首先尝试使用用户ID进行哈希分片,如果用户ID不存在或为空,则退而求其次使用订单号作为分片键。它会根据配置文件中指定的分片数量,计算出分片值(用户ID或订单号)对应的哈希值,并据此确定具体的数据分片表名称,从而实现数据的水平分割。
package com.vrs.algorithm;import cn.hutool.core.collection.CollUtil;
import com.google.common.base.Preconditions;
import lombok.Getter;
import org.apache.shardingsphere.sharding.api.sharding.complex.ComplexKeysShardingAlgorithm;
import org.apache.shardingsphere.sharding.api.sharding.complex.ComplexKeysShardingValue;import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Properties;/*** 订单分表基因算法 - 该类用于实现复杂的分片逻辑,特别是当存在多个分片键时。(这里是订单号和用户ID)* 它会根据提供的分片值来决定数据应该存储在哪一个分片表中。** @Author dam* @create 2024/12/6 9:55*/
@Getter
public class OrderTableGeneAlgorithm implements ComplexKeysShardingAlgorithm {private Properties props;/*** 分片的数量,即总共有多少个分片表*/private int shardingCount;/*** 配置文件中的分片数量键名*/private static final String SHARDING_COUNT_KEY = "sharding-count";/*** 根据提供的分片键和分片值来决定将数据分配到哪个分片表中。** @param collection 可能的分片表集合* @param complexKeysShardingValue 包含分片键及其对应的值* @return 返回包含具体分片表名称的集合*/@Overridepublic Collection<String> doSharding(Collection collection, ComplexKeysShardingValue complexKeysShardingValue) {// 获取分片键与分片值的映射Map<String, Collection<Comparable<?>>> columnNameAndShardingValuesMap = complexKeysShardingValue.getColumnNameAndShardingValuesMap();// 初始化结果集,使用 LinkedHashSet 以保持插入顺序Collection<String> result = new LinkedHashSet<>(collection.size());if (CollUtil.isNotEmpty(columnNameAndShardingValuesMap)) {// --if-- 如果有分片键和值,则开始处理String userId = "user_id";// 获取 'user_id' 对应的分片值集合Collection<Comparable<?>> customerUserIdCollection = columnNameAndShardingValuesMap.get(userId);if (CollUtil.isNotEmpty(customerUserIdCollection)) {// --if-- 'user_id' 存在且不为空,则基于 'user_id' 进行分片// 获取第一个分片值Comparable<?> comparable = customerUserIdCollection.stream().findFirst().get();// 取用户ID的后面六位来进行哈希分片String dbSuffix = String.valueOf(hashShardingValue((Long) comparable % 1000000) % shardingCount);result.add(complexKeysShardingValue.getLogicTableName() + "_" + dbSuffix);} else {// 'user_id' 不存在或为空,尝试使用 'order_sn' 作为分片键String orderSn = "order_sn";Collection<Comparable<?>> orderSnCollection = columnNameAndShardingValuesMap.get(orderSn);Comparable<?> comparable = orderSnCollection.stream().findFirst().get();if (comparable instanceof String) {// --if-- 如果订单号是字符串类型String actualOrderSn = comparable.toString();result.add(complexKeysShardingValue.getLogicTableName() + "_" + hashShardingValue(actualOrderSn.substring(Math.max(actualOrderSn.length() - 6, 0))) % shardingCount);} else {// --if-- 如果订单号是长整型(我们这个系统肯定不是这个)String dbSuffix = String.valueOf(hashShardingValue((Long) comparable % 1000000) % shardingCount);result.add(complexKeysShardingValue.getLogicTableName() + "_" + dbSuffix);}}}// 返回最终确定的分片表名称集合return result;}/*** 初始化方法,在创建分片算法实例时被调用,用来设置分片参数。** @param props 包含分片配置信息的属性对象*/@Overridepublic void init(Properties props) {this.props = props;shardingCount = getShardingCount(props);}/*** 从配置属性中读取分片数量,如果未找到则抛出异常。** @param props 包含分片配置信息的属性对象* @return 分片数量*/private int getShardingCount(final Properties props) {// 检查是否提供了分片数量,如果没有则抛出异常Preconditions.checkArgument(props.containsKey(SHARDING_COUNT_KEY), "分片数量不可以为空");// 解析并返回分片数量return Integer.parseInt(props.getProperty(SHARDING_COUNT_KEY));}/*** 根据给定的分片值计算哈希值,用于确定具体的分片。** @param shardingValue 分片值* @return 哈希后的分片值*/private long hashShardingValue(final Comparable<?> shardingValue) {// 使用分片值的 hashCode 生成一个绝对值的哈希码return Math.abs((long) shardingValue.hashCode());}
}
MQ
【消息生产者】
package com.vrs.rocketMq.producer;import cn.hutool.core.util.StrUtil;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.OrderDelayCloseMqDTO;
import com.vrs.templateMethod.AbstractCommonSendProduceTemplate;
import com.vrs.templateMethod.BaseSendExtendDTO;
import com.vrs.templateMethod.MessageWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.util.UUID;/*** 计算数据准备 生产者** @Author dam* @create 2024/9/20 16:00*/
@Slf4j
@Component
public class OrderDelayCloseProducer extends AbstractCommonSendProduceTemplate<OrderDelayCloseMqDTO> {@Overrideprotected BaseSendExtendDTO buildBaseSendExtendParam(OrderDelayCloseMqDTO messageSendEvent) {return BaseSendExtendDTO.builder().eventName("延时关闭订单").keys(String.valueOf(messageSendEvent.getOrderSn())).topic(RocketMqConstant.ORDER_TOPIC).tag(RocketMqConstant.ORDER_DELAY_CLOSE_TAG).sentTimeout(2000L)
// .delayTime(10 * 1000L)// 延时十分钟,关闭未支付订单.delayTime(3 * 60 * 1000L).build();}@Overrideprotected Message<?> buildMessage(OrderDelayCloseMqDTO messageSendEvent, BaseSendExtendDTO requestParam) {String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys();return MessageBuilder.withPayload(new MessageWrapper(keys, messageSendEvent)).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag()).build();}
}
【消息消费者】
package com.vrs.rocketMq.listener;import com.vrs.annotation.Idempotent;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.OrderDelayCloseMqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.OrderService;
import com.vrs.templateMethod.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.ORDER_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.ORDER_TOPIC,consumerGroup = RocketMqConstant.ORDER_CONSUMER_GROUP + "-" + RocketMqConstant.ORDER_DELAY_CLOSE_TAG,messageModel = MessageModel.CLUSTERING,// 监听tagselectorType = SelectorType.TAG,selectorExpression = RocketMqConstant.ORDER_DELAY_CLOSE_TAG
)
@RequiredArgsConstructor
public class OrderDelayCloseListener implements RocketMQListener<MessageWrapper<OrderDelayCloseMqDTO>> {private final OrderService orderService;/*** 消费消息的方法* 方法报错就会拒收消息** @param messageWrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数*/@Idempotent(uniqueKeyPrefix = "order_delay_close:",key = "#messageWrapper.getMessage().getOrderSn()",scene = IdempotentSceneEnum.MQ,keyTimeout = 3600L)@SneakyThrows@Overridepublic void onMessage(MessageWrapper<OrderDelayCloseMqDTO> messageWrapper) {// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)log.info("[消费者] 关闭订单:{}", messageWrapper.getMessage().getOrderSn());String orderSn = messageWrapper.getMessage().getOrderSn();orderService.closeOrder(orderSn);}
}