场馆预定平台高并发时间段预定实现V1

🎯 本文介绍了一个高效处理高并发场馆预订请求的系统设计方案。通过使用Redis缓存和位图技术,系统能够快速管理场地的可用性和预订状态。采用Lua脚本确保操作的原子性,结合责任链模式进行参数校验,并通过事务保证数据一致性。系统还实现了订单生成、延时关闭订单等功能,确保资源的公平分配和高效利用。整体设计旨在提供稳定、高性能的预订接口,满足高并发场景下的用户需求。

文章目录

  • 简介
  • 思路
  • 数据表设计
  • 场馆服务
    • Controller
    • Service
      • 时间段预定
      • 缓存回滚
      • 库存回滚
    • Mapper
  • 订单服务
    • Controller
    • Service
    • 复合分片
    • MQ

简介

在场馆预订平台的设计中,通常会将一个大型场馆细分为多个功能区,例如篮球区、羽毛球区、乒乓球区等。每个功能区内又包含若干个独立的场地或球台,用户可以根据自己的需求选择特定的区域和设施进行预定。当预定时间窗口开启时,用户通过平台预订场地,并指定使用的时间段——比如9:00-10:00、10:00-11:00等一小时的时段。

特别地,在一些热门场馆或者资源有限的情况下(例如学校内,众多学生争相预订为数不多的运动场地),高并发的预订请求可能会同时涌入系统。为了应对这种场景,平台需要具备高效的处理能力,确保即使在高峰期也能提供流畅的用户体验。因此,实现一个稳定且高性能的时间段预订接口至关重要,它不仅能够准确管理场地的可用性,还能公平有效地分配资源给每一位有需要的用户。

思路

采用传统秒杀思路,使用 Redis缓存 存储具体的剩余库存,使用 Redis位图 来存储场号预订情况,时间段数据提前放入缓存中

  • 接口限流
  • 接口幂等性
  • 验证提交参数
    • 参数非空判断
    • 预订的时间段id是否正确
    • 是否已经到达预订时间(增加时间缓冲,提前1秒开放预订,避免时间同步问题)
    • 是否已经过了时间段时间
    • 同一用户只能抢一次同一时间段
  • 执行 lua 脚本
    • 库存是否足够
    • 分配空闲场号(使用bitmap存储)
  • 开启事务
    • 数据库扣减所预定时间段的库存、空闲场号信息
    • 调用订单远程服务
      • 生成订单
        • 订单号生成,基因算法生成分片键
      • 发送延迟消息,超时未支付关闭订单
        • 监听延时消息关掉订单
        • Binlog监听订单状态改变,使用RokcetMQ消费消息,恢复数据库和缓存的库存、空闲场号
      • 若订单远程服务调用失败,恢复缓存的库存
  • 关闭事务

数据表设计

DROP TABLE IF EXISTS `time_period`;
CREATE TABLE `time_period`( `id` bigint NOT NULL COMMENT 'ID',`create_time` datetime,`update_time` datetime,`is_deleted` tinyint default 0 COMMENT '逻辑删除 0:没删除 1:已删除',`partition_id` bigint NOT NULL COMMENT '场区id',`price` decimal(10,2) NOT NULL COMMENT '该时间段预订使用价格(元)',`stock` int NOT NULL COMMENT '库存',`booked_slots` bigint unsigned NOT NULL DEFAULT 0 COMMENT '已预订的场地(位图表示)',`period_date` date NOT NULL COMMENT '预定日期', `begin_time` time NOT NULL COMMENT '时间段开始时间HH:mm(不用填日期)',`end_time` time NOT NULL COMMENT '时间段结束时间HH:mm(不用填日期)', PRIMARY KEY (`id`) USING BTREE,INDEX `idx_partition_id` (`partition_id`)
);

这里使用位图来存储一个分区场号的预定情况,是如何存储的呢?很简单的理解是,一个字节有8个比特,每个比特存储一个 0 或 1 ,那么一个字节其实可以存储 8 个场是否被预订的状态,0表示空闲,1表示预定。在java中一个bigint可以存储64个场的空闲状态,一个int可以存储32个场的空闲状态,一个tinyint可以存储8个场的空闲状态。

使用这种方式存储有哪些优点?

  • 节省存储空间:每个场地只需要1个二进制位进行存储,可以显著减少存储空间占用
  • 效率高:位图支持高效的位运算,可以快速查询和更新场地的预订状态
  • 适合高并发:位图的位操作是原子性的,适合高并发场景下的预订和释放操作

在数据库中,常用的位运算操作如下:

  • 预订场地:假设要预订索引为2的场地。
UPDATE bookings 
SET BookedSlots = BookedSlots | (1 << 2) 
WHERE BookingID = 1;
  • 取消预订:假设要取消预订索引为2的场地。
UPDATE bookings 
SET BookedSlots = BookedSlots & ~(1 << 2) 
WHERE BookingID = 1;
  • 检查场地是否已预订:假设要检查索引为2的场地是否已预订。
SELECT (BookedSlots & (1 << 2)) > 0 AS IsBooked 
FROM bookings 
WHERE BookingID = 1;

场馆服务

Controller

在这里使用了@Idempotent注解来实现接口的幂等性,实现方式可以参考文章:https://hellodam.blog.csdn.net/article/details/137435495

/*** 预定时间段*/
@GetMapping("/v1/reserve")
@Idempotent(uniqueKeyPrefix = "vrs-venue:lock_reserve:",// 让用户同时最多只能预定一个时间段,根据用户名来加锁// key = "T(com.vrs.common.context.UserContext).getUsername()",// 让用户同时最多只能预定该时间段一次,但是可以同时预定其他时间段,根据用户名+时间段ID来加锁key = "T(com.vrs.common.context.UserContext).getUsername()+'_'+#timePeriodId",message = "正在执行场馆预定流程,请勿重复预定...",scene = IdempotentSceneEnum.RESTAPI
)
@Operation(summary = "预定时间段")
public Result reserve(@RequestParam("timePeriodId") Long timePeriodId) {OrderDO orderDO = timePeriodService.reserve(timePeriodId);return Results.success(orderDO);
}

Service

时间段预定

首先使用了一个责任链模式并结合来进行参数校验,需要的校验如下:

  • 用户预定的时间段是否存在
  • 是否已经到达预订时间(增加时间缓冲,提前1秒开放预订,避免时间同步问题)
  • 是否已经过了时间段时间

其实这里的责任链模式用的不是特别好,因为检验过程中,数据具有连续性。即在检验用户预定时间段是否为空时,如果不为空,需要把时间段的相关属性带到预定时间校验中,所以需要进行参数的传递。

执行 lua 脚本获取预定场号,这里要做几件事,使用 lua 脚本可以保证几件事情的原子性

  • 校验用户是否已经预定过当前时间段
  • 如果用户是第一次预定,检验库存是否大于0
  • 如果库存大于0,分配场地
-- 定义脚本参数
local stock_key = KEYS[1]
local free_index_bitmap_key = KEYS[2]
-- 用来存储已购买用户的set
local set_name = KEYS[3]-- 用户ID
local user_id = ARGV[1]
-- 过期时间 (秒)
local expire_time = tonumber(ARGV[2])-- 检查用户是否已经购买过
if redis.call("SISMEMBER", set_name, user_id) == 1 then-- 用户已经购买过,返回 -2 表示失败return -2
end-- 如果用户没有购买过,添加用户到set中
redis.call("SADD", set_name, user_id)
-- 设置过期时间
if expire_time > 0 thenredis.call("EXPIRE", set_name, expire_time)
end-- 获取库存
local current_inventory = tonumber(redis.call('GET', stock_key) or 0)-- 尝试消耗库存
if current_inventory < 1 then-- 库存不够了,返回-1,代表分配空场号失败return -1 -- 失败
end-- 查找第一个空闲的场地(位图中第一个为 0 的位)
local free_court_bit = redis.call("BITPOS", free_index_bitmap_key, 0)if not free_court_bit or free_court_bit == -1 then-- 没有空闲的场号return -1 -- 失败
end-- 占用该场地(将对应位设置为 1)
redis.call("SETBIT", free_index_bitmap_key, free_court_bit, 1)-- 更新库存
redis.call('DECRBY', stock_key, 1)-- 返回分配的场地索引(注意:位图的位索引从0开始,如果你需要从1开始,这里加1)
return tonumber(free_court_bit)

场地分配成功之后,扣减数据库库存、更新场地预定情况,并创建订单。由于创建订单需要调用远程服务,需要保证业务一致性。这里使用@Transactional注解,当创建订单失败时,恢复缓存,并不提交数据库的修改。但是这里的实现仍然存在一个问题:如果说由于网络原因,实际上订单已经创建成功了,但是因为超时访问失败,这里库存却回滚了,咋办?这里提出两种解决方案

方案一:在扣减库存成功之后,发送一个消息到消息队列,通知订单服务创建订单,如果消息发送失败,回滚库存。

【优点】

  • 库存扣减和订单创建是异步的,给订单服务发送消息之后,即可返回,预订接口吞吐量更高。

【缺点】

  • 使用这种方式,库存扣减和订单创建是异步的,无法在接口调用结束时,直接返回订单信息给前端。在订单创建成功之后,需要使用双向通讯技术(如Websocket)通知前端订单创建完成,并给前端发生订单数据。

方案二:使用延时消息兜底,间隔几秒后,去查询订单是否创建成功,如果创建成功,到缓存中检查一下用户是否真的预订了该时间段,如果没有预定该时间段,说明库存发生了回滚,系统将订单进行删除

【优点】

  • 通过延时消息和后续的检查机制,能够确保订单和库存的最终一致性。即使因为网络超时导致库存回滚,系统也能通过后续的检查发现不一致并修复。

【缺点】

  • 数据一致性有延迟,如果订单创建成功但库存回滚,用户可能会短暂看到订单创建成功,但后续订单被删除,这可能会让用户疑惑。
  • 库存扣减和订单创建是同步的,预订接口吞吐量较低。

在这里先提出这个问题,后续在时间段预定V2中使用方案一进行解决

这里还存在一个问题,假如说lua脚本执行完成,缓存中的库存已经扣减,结果突然服务器宕机了,没有执行后续的数据库库存扣减和创建订单流程,那相当于有一个场被占用了,但实际上无人使用,这个问题也会在时间段预定V2中解决

/*** 传统秒杀架构,使用缓存存储具体的剩余库存,使用 位图 来存储空闲场号* 下单时首先尝试扣减库存和分配空闲场号,如果可以扣减成功,再执行下单等逻辑* 存在问题:* 1、如果缓存扣减成功之后,应用宕机了,没有执行数据库库存扣减和生成订单逻辑,那就会出现缓存、数据库不一致的情况。应用重启之后,需要重新同步缓存和数据库,需要人力管理* 2、使用同步扣减数据库库存和下单,接口吞吐量不够高** @param timePeriodId* @return*/
@Override
// 子方法声明了Transactional,父方法也需要声明,不要会失效
@Transactional(rollbackFor = Throwable.class)
public OrderDO reserve(Long timePeriodId) { 参数校验// 使用责任链模式校验数据是否正确//todo 后面对于不对外开放的场馆,还需要校验用户是否属于该机构用户TimePeriodReserveReqDTO timePeriodReserveReqDTO = new TimePeriodReserveReqDTO(timePeriodId);chainContext.handler(ChainConstant.RESERVE_CHAIN_NAME, timePeriodReserveReqDTO);TimePeriodDO timePeriodDO = timePeriodReserveReqDTO.getTimePeriodDO();Long venueId = timePeriodReserveReqDTO.getVenueId();VenueDO venueDO = timePeriodReserveReqDTO.getVenueDO(); 使用lua脚本获取一个空场地对应的索引,并扣除相应的库存,同时在里面进行用户的查重// 使用 Hutool 的单例管理容器 管理lua脚本的加载,保证其只被加载一次String luaScriptPath = "lua/free_court_index_allocate_by_bitmap.lua";DefaultRedisScript<Long> luaScript = Singleton.get(luaScriptPath, () -> {DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(luaScriptPath)));redisScript.setResultType(Long.class);return redisScript;});Long freeCourtIndex = stringRedisTemplate.execute(luaScript,Lists.newArrayList(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_KEY, timePeriodReserveReqDTO.getTimePeriodId()),String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_KEY, timePeriodReserveReqDTO.getTimePeriodId()),String.format(RedisCacheConstant.VENUE_IS_USER_BOUGHT_TIME_PERIOD_KEY, timePeriodReserveReqDTO.getTimePeriodId())),UserContext.getUserId().toString(),String.valueOf(venueDO.getAdvanceBookingDay() * 86400));if (freeCourtIndex == -2) {// --if-- 用户已经购买过该时间段throw new ClientException(BaseErrorCode.TIME_PERIOD_HAVE_BOUGHT_ERROR);} else if (freeCourtIndex == -1) {// --if-- 没有空闲的场号throw new ServiceException(BaseErrorCode.TIME_PERIOD_SELL_OUT_ERROR);} 修改数据库中时间段的库存和已经选定的场号,并生成订单// todo 为了保证事务原子性,将修改数据库库存操作和创建订单放在了一次,而且是同步执行,如果想要接口吞吐量更高,这里肯定是需要优化成异步的return this.executePreserve(timePeriodDO,freeCourtIndex,venueId);
}/*** 执行下单和数据库库存扣减操作** @param timePeriodDO* @param courtIndex* @param venueId* @return*/
@Override
// 抛出任何异常,回退库存
@Transactional(rollbackFor = Throwable.class)
public OrderDO executePreserve(TimePeriodDO timePeriodDO,Long courtIndex, Long venueId) {// 扣减当前时间段的库存,修改空闲场信息baseMapper.updateStockAndBookedSlots(timePeriodDO.getId(), timePeriodDO.getPartitionId(), courtIndex);// 调用远程服务创建订单OrderGenerateReqDTO orderGenerateReqDTO = OrderGenerateReqDTO.builder().timePeriodId(timePeriodDO.getId()).partitionId(timePeriodDO.getPartitionId()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).courtIndex(courtIndex).userId(UserContext.getUserId()).userName(UserContext.getUsername()).venueId(venueId).payAmount(timePeriodDO.getPrice()).build();Result<OrderDO> result;try {result = orderFeignService.generateOrder(orderGenerateReqDTO);if (result == null || !result.isSuccess()) {// --if-- 订单生成失败,抛出异常,上面的库存扣减也会回退throw new ServiceException(BaseErrorCode.ORDER_GENERATE_ERROR);}} catch (Exception e) {// --if-- 订单生成服务调用失败// 恢复缓存中的信息this.restoreStockAndBookedSlotsCache(timePeriodDO.getId(), UserContext.getUserId(), courtIndex);// todo 如果说由于网络原因,实际上订单已经创建成功了,但是因为超时访问失败,这里库存却回滚了,咋办,如何将订单置为废弃状态// 打印错误堆栈信息e.printStackTrace();// 把错误返回到前端throw new ServiceException(e.getMessage());}return result.getData();
}

缓存回滚

/*** 库存、空闲场号、已购买用户缓存回退*/
@Override
public void restoreStockAndBookedSlotsCache(Long timePeriodId, Long userId, Long courtIndex) { 使用lua脚本获取一个空场地对应的索引,并扣除相应的库存// 使用 Hutool 的单例管理容器 管理lua脚本的加载,保证其只被加载一次String luaScriptPath = "lua/free_court_index_release_by_bitmap.lua";DefaultRedisScript<Long> luaScript = Singleton.get(luaScriptPath, () -> {DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(luaScriptPath)));redisScript.setResultType(Long.class);return redisScript;});Long status = stringRedisTemplate.execute(luaScript,Lists.newArrayList(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_KEY, timePeriodId),String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_KEY, timePeriodId),String.format(RedisCacheConstant.VENUE_IS_USER_BOUGHT_TIME_PERIOD_KEY, timePeriodId)),userId.toString(),courtIndex.toString());if (status == -3) {// --if-- 该场号本身就是空闲的,无需释放库存(说明库存已经被释放过了,这不要抛异常出去,否则库存释放方法会反复失败)} else if (status == -2) {// --if-- 用户没有购买该时间段throw new ServiceException(BaseErrorCode.TIME_PERIOD_HAVE_NOT_BOUGHT_ERROR);} else if (status == -1) {// --if-- 场号不合法throw new ServiceException(BaseErrorCode.TIME_PERIOD_FREE_COURT_INDEX_ERROR);}}

在进行缓存回滚的时候,也需要使用 lua 脚本,保证用户去重表删除、库存回滚、预订场号回滚操作的原子性

-- 定义脚本参数
local stock_key = KEYS[1]
local free_index_bitmap_key = KEYS[2]
-- 用来存储已购买用户的set
local set_name = KEYS[3]-- 用户ID
local user_id = ARGV[1]
-- 场地索引
local court_index = tonumber(ARGV[2])-- 检查用户是否已经购买过
if redis.call("SISMEMBER", set_name, user_id) == 0 then-- 用户没有购买过,返回 -2 表示失败return -2
end-- 检查场地索引是否有效
if not court_index or court_index < 0 then-- 无效的场地索引,返回 -1 表示失败return -1
end-- 检查场号是否本来就是处于空闲状态
local is_free = redis.call("GETBIT", free_index_bitmap_key, court_index)
if is_free == 0 then-- 场号本身就处于空闲状态,所以无需释放库存,返回 -3 表示错误return -3
end-- 释放场号(将对应位设置为 0)
redis.call("SETBIT", free_index_bitmap_key, court_index, 0)-- 更新库存(增加库存)
redis.call('INCRBY', stock_key, 1)-- 移除用户
redis.call("SREM", set_name, user_id)-- 返回成功
return 0 -- 成功

库存回滚

/*** 库存、空闲场号数据库回退*/
@Override
public void restoreStockAndBookedSlotsDatabase(TimePeriodStockRestoreReqDTO timePeriodStockRestoreReqDTO) {// 恢复数据库中的库存baseMapper.restoreStockAndBookedSlots(timePeriodStockRestoreReqDTO.getTimePeriodId(), timePeriodStockRestoreReqDTO.getPartitionId(), timePeriodStockRestoreReqDTO.getCourtIndex());
}

Mapper

在更新位图的时候,需要使用位运算

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.vrs.mapper.TimePeriodMapper"><update id="updateStockAndBookedSlots"><![CDATA[UPDATE time_periodSET booked_slots = booked_slots | (1 << #{partitionIndex}), stock = stock - 1WHERE id = #{timePeriodId} AND stock > 0 AND partition_id = #{partitionId}]]></update><update id="restoreStockAndBookedSlots"><![CDATA[UPDATE time_periodSET booked_slots = booked_slots & ~(1 << #{partitionIndex}), stock = stock + 1WHERE id = #{timePeriodId} AND partition_id = #{partitionId}]]></update></mapper>

订单服务

Controller

/*** 生成订单*/
@PostMapping("/v1/generateOrder")
@Operation(summary = "生成订单")
public Result<OrderDO> generateOrder(@RequestBody OrderGenerateReqDTO orderGenerateReqDTO) {OrderDO orderDO = orderService.generateOrder(orderGenerateReqDTO);return Results.success(orderDO);
}

Service

为了支持大数据量,本文对订单进行了分表。但是订单需要支持两种查询方式。

  • 直接根据订单号查询相应的订单
  • 用户需要查询自己的订单列表。

传统的分表方式只有一个分片键,要么用订单号分片,要么用用户id进行分片,两种方式都无法满足上述查询需求,会触发读扩散问题,效率严重下降,本文使用复合分片算法来解决这个问题,即在订单后面冗余用户ID的后六位SnowflakeIdUtil.nextId() + String.valueOf(orderGenerateReqDTO.getUserId() % 1000000)

  • 如果是用户查询自己的订单列表,直接使用用户ID的后六位进行分片定位
  • 如果根据订单号查询相应的订单,那么使用订单号的后六位进行分片定位

在创建订单之后,发送一个延时消息,如果十分钟用户还没有成功付款,则取消订单,回滚缓存和数据库的库存,避免有人长期占用资源但是不购买。这里为啥分为closeOrdersecondCloseOrder,主要是避免订单支付成功回调期间,订单被超时关闭了,详情请看 https://hellodam.blog.csdn.net/article/details/144942881

基于 Canal 监听 MySQL Binlog 日志恢复库存,可以参考 https://hellodam.blog.csdn.net/article/details/144483823

@Override
public OrderDO generateOrder(OrderGenerateReqDTO orderGenerateReqDTO) {OrderDO orderDO = OrderDO.builder()// 订单号使用雪花算法生成分布式ID,然后再拼接用户ID的后面六位.orderSn(SnowflakeIdUtil.nextId() + String.valueOf(orderGenerateReqDTO.getUserId() % 1000000)).orderTime(new Date()).venueId(orderGenerateReqDTO.getVenueId()).partitionId(orderGenerateReqDTO.getPartitionId()).courtIndex(orderGenerateReqDTO.getCourtIndex()).timePeriodId(orderGenerateReqDTO.getTimePeriodId()).periodDate(orderGenerateReqDTO.getPeriodDate()).beginTime(orderGenerateReqDTO.getBeginTime()).endTime(orderGenerateReqDTO.getEndTime()).userId(orderGenerateReqDTO.getUserId()).userName(orderGenerateReqDTO.getUserName()).payAmount(orderGenerateReqDTO.getPayAmount()).orderStatus(OrderStatusConstant.UN_PAID).build();int insert = baseMapper.insert(orderDO);if (insert > 0) {// 发送延时消息来关闭未支付的订单orderDelayCloseProducer.sendMessage(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build());}return orderDO;
}@Override
@Transactional(rollbackFor = Throwable.class)
public void closeOrder(String orderSn) {String orderPayLock = stringRedisTemplate.opsForValue().get(String.format(RedisCacheConstant.ORDER_PAY_LOCK_KEY, orderSn));if ("0".equals(orderPayLock)) {OrderDO orderDO = baseMapper.selectByOrderSn(orderSn);// --if-- 订单已经被锁定,说明订单正处于支付状态,先不要关闭订单,等等再看看是否支付成功了if (orderDO.getOrderStatus().equals(OrderStatusConstant.UN_PAID)) {// --if-- 当前订单还没有支付成功,发一个延时消息,如果等等订单还没有被支付,就关闭订单orderSecondDelayCloseProducer.sendMessage(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build());// 将订单支付状态设置为1,拒绝后面的支付调用stringRedisTemplate.opsForValue().set(String.format(RedisCacheConstant.ORDER_PAY_LOCK_KEY, orderSn), "1", 5, TimeUnit.MINUTES);}} else {// --if-- 订单不在支付中,直接关闭订单secondCloseOrder(orderSn);}
}@Override
public void secondCloseOrder(String orderSn) {OrderDO orderDO = baseMapper.selectByOrderSn(orderSn);if (orderDO.getOrderStatus().equals(OrderStatusConstant.UN_PAID)) {// --if-- 到时间了,订单还没有支付,取消该订单orderDO.setOrderStatus(OrderStatusConstant.CANCEL);// 分片键不能更新orderDO.setVenueId(null);baseMapper.updateByOrderSn(orderDO);if (!isUseBinlog) {// --if-- 如果不启用binlog的话,需要自己手动调用方法来释放库存// 极端情况,如果说远程已经还原了库存,但是因为网络问题,返回了错误,导致订单没有关闭,于是出现了不一致的现象。库存都还原完了,你订单还可以支付Result<OrderDO> result;try {result = timePeriodFeignService.release(TimePeriodStockRestoreReqDTO.builder().timePeriodId(orderDO.getTimePeriodId()).partitionId(orderDO.getPartitionId()).courtIndex(orderDO.getCourtIndex()).userId(orderDO.getUserId()).build());} catch (Exception e) {// --if-- 库存恢复远程接口调用失败throw new ServiceException(BaseErrorCode.REMOTE_ERROR);}if (result == null || !result.isSuccess()) {// 因为使用了Transactional,如果这里出现了异常,订单的关闭修改会回退throw new ServiceException("调用远程服务释放时间段数据库库存失败", BaseErrorCode.SERVICE_ERROR);}} else {// --if-- 如果启用binlog的话,会自动监听数据库的订单关闭,然后恢复缓存中的库存}}
}

复合分片

下面定义了数据分片规则,用于将 time_period_order 表的数据水平分割到多个物理表中。具体来说,它指定了 time_period_order 表的真实数据节点为 ds_0 数据源下的 time_period_order_0time_period_order_15 共16个分片表。为了确定数据应该插入哪个分片表,这里采用了复合分表策略,即根据 user_idorder_sn 两个分片键来决定。对于分片算法,配置中引用了名为 order_table_gene_mod 的自定义算法,该算法由 com.vrs.algorithm.OrderTableGeneAlgorithm 类实现,通过计算用户ID或订单号的哈希值,并结合分片数量(本例中为16),来确定数据最终存储的具体分片表。

rules:- !SHARDINGtables:time_period_order:# 真实数据节点,比如数据库源以及数据库在数据库中真实存在的actualDataNodes: ds_0.time_period_order_${0..15}# 分表策略tableStrategy:# 复合分表策略(多个分片键)complex:# 用户 ID 和订单号shardingColumns: user_id,order_sn# 搜索 order_table_complex_mod 下方会有分表算法shardingAlgorithmName: order_table_gene_mod# 分片算法shardingAlgorithms:# 订单分表算法order_table_gene_mod:# 通过加载全限定名类实现分片算法,相当于分片逻辑都在 algorithmClassName 对应的类中type: CLASS_BASEDprops:algorithmClassName: com.vrs.algorithm.OrderTableGeneAlgorithm# 分表数量sharding-count: 16# 复合(多分片键)分表策略strategy: complex

下面的类实现了 Apache ShardingSphere 框架中的 ComplexKeysShardingAlgorithm 接口,用于根据订单号和用户ID这两个分片键来决定数据应该存储在哪个分片表中。该算法首先尝试使用用户ID进行哈希分片,如果用户ID不存在或为空,则退而求其次使用订单号作为分片键。它会根据配置文件中指定的分片数量,计算出分片值(用户ID或订单号)对应的哈希值,并据此确定具体的数据分片表名称,从而实现数据的水平分割。

package com.vrs.algorithm;import cn.hutool.core.collection.CollUtil;
import com.google.common.base.Preconditions;
import lombok.Getter;
import org.apache.shardingsphere.sharding.api.sharding.complex.ComplexKeysShardingAlgorithm;
import org.apache.shardingsphere.sharding.api.sharding.complex.ComplexKeysShardingValue;import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Properties;/*** 订单分表基因算法 - 该类用于实现复杂的分片逻辑,特别是当存在多个分片键时。(这里是订单号和用户ID)* 它会根据提供的分片值来决定数据应该存储在哪一个分片表中。** @Author dam* @create 2024/12/6 9:55*/
@Getter
public class OrderTableGeneAlgorithm implements ComplexKeysShardingAlgorithm {private Properties props;/*** 分片的数量,即总共有多少个分片表*/private int shardingCount;/*** 配置文件中的分片数量键名*/private static final String SHARDING_COUNT_KEY = "sharding-count";/*** 根据提供的分片键和分片值来决定将数据分配到哪个分片表中。** @param collection               可能的分片表集合* @param complexKeysShardingValue 包含分片键及其对应的值* @return 返回包含具体分片表名称的集合*/@Overridepublic Collection<String> doSharding(Collection collection, ComplexKeysShardingValue complexKeysShardingValue) {// 获取分片键与分片值的映射Map<String, Collection<Comparable<?>>> columnNameAndShardingValuesMap = complexKeysShardingValue.getColumnNameAndShardingValuesMap();// 初始化结果集,使用 LinkedHashSet 以保持插入顺序Collection<String> result = new LinkedHashSet<>(collection.size());if (CollUtil.isNotEmpty(columnNameAndShardingValuesMap)) {// --if-- 如果有分片键和值,则开始处理String userId = "user_id";// 获取 'user_id' 对应的分片值集合Collection<Comparable<?>> customerUserIdCollection = columnNameAndShardingValuesMap.get(userId);if (CollUtil.isNotEmpty(customerUserIdCollection)) {// --if-- 'user_id' 存在且不为空,则基于 'user_id' 进行分片// 获取第一个分片值Comparable<?> comparable = customerUserIdCollection.stream().findFirst().get();// 取用户ID的后面六位来进行哈希分片String dbSuffix = String.valueOf(hashShardingValue((Long) comparable % 1000000) % shardingCount);result.add(complexKeysShardingValue.getLogicTableName() + "_" + dbSuffix);} else {//  'user_id' 不存在或为空,尝试使用 'order_sn' 作为分片键String orderSn = "order_sn";Collection<Comparable<?>> orderSnCollection = columnNameAndShardingValuesMap.get(orderSn);Comparable<?> comparable = orderSnCollection.stream().findFirst().get();if (comparable instanceof String) {// --if-- 如果订单号是字符串类型String actualOrderSn = comparable.toString();result.add(complexKeysShardingValue.getLogicTableName() + "_" + hashShardingValue(actualOrderSn.substring(Math.max(actualOrderSn.length() - 6, 0))) % shardingCount);} else {// --if-- 如果订单号是长整型(我们这个系统肯定不是这个)String dbSuffix = String.valueOf(hashShardingValue((Long) comparable % 1000000) % shardingCount);result.add(complexKeysShardingValue.getLogicTableName() + "_" + dbSuffix);}}}// 返回最终确定的分片表名称集合return result;}/*** 初始化方法,在创建分片算法实例时被调用,用来设置分片参数。** @param props 包含分片配置信息的属性对象*/@Overridepublic void init(Properties props) {this.props = props;shardingCount = getShardingCount(props);}/*** 从配置属性中读取分片数量,如果未找到则抛出异常。** @param props 包含分片配置信息的属性对象* @return 分片数量*/private int getShardingCount(final Properties props) {// 检查是否提供了分片数量,如果没有则抛出异常Preconditions.checkArgument(props.containsKey(SHARDING_COUNT_KEY), "分片数量不可以为空");// 解析并返回分片数量return Integer.parseInt(props.getProperty(SHARDING_COUNT_KEY));}/*** 根据给定的分片值计算哈希值,用于确定具体的分片。** @param shardingValue 分片值* @return 哈希后的分片值*/private long hashShardingValue(final Comparable<?> shardingValue) {// 使用分片值的 hashCode 生成一个绝对值的哈希码return Math.abs((long) shardingValue.hashCode());}
}

MQ

【消息生产者】

package com.vrs.rocketMq.producer;import cn.hutool.core.util.StrUtil;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.OrderDelayCloseMqDTO;
import com.vrs.templateMethod.AbstractCommonSendProduceTemplate;
import com.vrs.templateMethod.BaseSendExtendDTO;
import com.vrs.templateMethod.MessageWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.util.UUID;/*** 计算数据准备 生产者** @Author dam* @create 2024/9/20 16:00*/
@Slf4j
@Component
public class OrderDelayCloseProducer extends AbstractCommonSendProduceTemplate<OrderDelayCloseMqDTO> {@Overrideprotected BaseSendExtendDTO buildBaseSendExtendParam(OrderDelayCloseMqDTO messageSendEvent) {return BaseSendExtendDTO.builder().eventName("延时关闭订单").keys(String.valueOf(messageSendEvent.getOrderSn())).topic(RocketMqConstant.ORDER_TOPIC).tag(RocketMqConstant.ORDER_DELAY_CLOSE_TAG).sentTimeout(2000L)
//                .delayTime(10 * 1000L)// 延时十分钟,关闭未支付订单.delayTime(3 * 60 * 1000L).build();}@Overrideprotected Message<?> buildMessage(OrderDelayCloseMqDTO messageSendEvent, BaseSendExtendDTO requestParam) {String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys();return MessageBuilder.withPayload(new MessageWrapper(keys, messageSendEvent)).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag()).build();}
}

【消息消费者】

package com.vrs.rocketMq.listener;import com.vrs.annotation.Idempotent;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.OrderDelayCloseMqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.OrderService;
import com.vrs.templateMethod.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.ORDER_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.ORDER_TOPIC,consumerGroup = RocketMqConstant.ORDER_CONSUMER_GROUP + "-" + RocketMqConstant.ORDER_DELAY_CLOSE_TAG,messageModel = MessageModel.CLUSTERING,// 监听tagselectorType = SelectorType.TAG,selectorExpression = RocketMqConstant.ORDER_DELAY_CLOSE_TAG
)
@RequiredArgsConstructor
public class OrderDelayCloseListener implements RocketMQListener<MessageWrapper<OrderDelayCloseMqDTO>> {private final OrderService orderService;/*** 消费消息的方法* 方法报错就会拒收消息** @param messageWrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数*/@Idempotent(uniqueKeyPrefix = "order_delay_close:",key = "#messageWrapper.getMessage().getOrderSn()",scene = IdempotentSceneEnum.MQ,keyTimeout = 3600L)@SneakyThrows@Overridepublic void onMessage(MessageWrapper<OrderDelayCloseMqDTO> messageWrapper) {// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)log.info("[消费者] 关闭订单:{}", messageWrapper.getMessage().getOrderSn());String orderSn = messageWrapper.getMessage().getOrderSn();orderService.closeOrder(orderSn);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/502809.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring MVC实战指南:构建高效Web应用的架构与技巧(三)

响应数据和结果视图(7种) 返回值分类 创建web.xml&#xff08;spring、过滤器解决乱码、配置控制器dispatcherServlet、加载springmvc.xml文件、配置启动加载&#xff09;创建springmvc.xml文件 <!--配置了内容&#xff0c;启动Tomcat服务器的时候&#xff0c;就会被加载--…

使用LINUX的dd命令制作自己的img镜像

为了避免重复安装同一镜像&#xff0c;配置环境&#xff0c;首先我准备一个正常使用的完整系统。 使用Gparted软件先将母盘&#xff08;如U盘&#xff0c;TF卡&#xff09;分区调整为只有数据的大小。如&#xff1a;60G的TF卡&#xff0c;只用了3.5G&#xff0c;将未使用的空间…

doris:基于 Arrow Flight SQL 的高速数据传输链路

Doris 基于 Arrow Flight SQL 协议实现了高速数据链路&#xff0c;支持多种语言使用 SQL 从 Doris 高速读取大批量数据。 用途​ 从 Doris 加载大批量数据到其他组件&#xff0c;如 Python/Java/Spark/Flink&#xff0c;可以使用基于 Arrow Flight SQL 的 ADBC/JDBC 替代过去…

Gitee图形界面上传(详细步骤)

目录 1.软件安装 2.安装顺序 3.创建仓库 4.克隆远程仓库到本地电脑 提交代码的三板斧 1.软件安装 Git - Downloads (git-scm.com) Download – TortoiseGit – Windows Shell Interface to Git 2.安装顺序 1. 首先安装git-2.33.1-64-bit.exe&#xff0c;顺序不能搞错2. …

用公网服务代理到本地电脑笔记

参考&#xff1a; 利用frp 穿透到内网的http/https网站&#xff0c;实现对外开放&#xff08;这篇博客有点老&#xff0c;需要改动&#xff0c;不能照抄&#xff09;&#xff1a;https://www.cnblogs.com/hahaha111122222/p/8509150.html frp内网穿透(windows和服务器)&#xf…

(leetcode算法题)384. 打乱数组 398. 随机数索引

问题转化&#xff1a; 题目要求将nums中的数字出现的次序随机打乱 转化成&#xff1a;对于 0 号位置来说&#xff0c;nums[i], ..., nums[n - 1] 可以等概率的出现 ... && ... && 对于 n - 1号位置来说&#xff0c;nums[i], ..., nums[n - 1] 可以等概率的出…

Redis - 5 ( 18000 字 Redis 入门级教程 )

一&#xff1a; 补充知识 1.1 渐进式遍历 Redis 使用 scan 命令以渐进式方式遍历键&#xff0c;避免了直接使用 keys 命令可能引发的阻塞问题。scan 的时间复杂度为 O(1)&#xff0c;但需要多次执行才能完成对所有键的遍历&#xff0c;整个过程分步进行&#xff0c;有效减少阻…

22408操作系统期末速成/复习(考研0基础上手)

第一部分:计算题&#xff1a; 考察范围&#xff1a;&#xff08;标红的是重点考&#xff09; 第一章&#xff1a;CPU利用率&#xff1a; 第二章&#xff1a; 进程调度算法&#xff08;需要注意不同调度算法的优先级和题目中给出的是否可以抢占【分为可抢占和不可抢占&#xff…

AI在电子制造中的应用:预测质量控制

一、 电子制造中存在的质量问题 电子制造过程中&#xff0c;由于生产工艺复杂、材料种类繁多、生产环境要求高等因素&#xff0c;可能会出现各种质量问题。 常见质量问题如下&#xff1a; 1. 空焊 原因&#xff1a;锡膏活性较弱、钢网开孔不佳、铜铂间距过大或大铜贴小元件、…

如何通过API实现淘宝商品评论数据抓取?item_review获取淘宝商品评论

前几天一个好朋友要我帮忙抓一下淘宝商品的评论数据&#xff0c;获取淘宝评论数据可以帮忙商家们做好市场调研&#xff0c;对自己的产品进行升级&#xff0c;从而更好地获取市场。我将详细爬取方法封装成API&#xff0c;以供方便调用。 item_review-获得淘宝商品评论 响应示例…

springboot550乐乐农产品销售系统(论文+源码)_kaic

摘 要 如今社会上各行各业&#xff0c;都喜欢用自己行业的专属软件工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。新技术的产生&#xff0c;往往能解决一些老技术的弊端问题。因为传统乐乐农产品销售系统信息管理难度大&#xff0c;容错率低&…

百度贴吧的ip属地什么意思?怎么看ip属地

在数字化时代&#xff0c;IP地址不仅是网络设备的唯一标识符&#xff0c;更承载着用户的网络身份与位置信息。百度贴吧作为广受欢迎的社交平台&#xff0c;也遵循相关规定&#xff0c;在用户个人主页等位置展示账号IP属地信息。那么&#xff0c;百度贴吧的IP属地究竟意味着什么…

[读书日志]从零开始学习Chisel 第一篇:书籍介绍,Scala与Chisel概述,Scala安装运行(敏捷硬件开发语言Chisel与数字系统设计)

简介&#xff1a;从20世纪90年代开始&#xff0c;利用硬件描述语言和综合技术设计实现复杂数字系统的方法已经在集成电路设计领域得到普及。随着集成电路集成度的不断提高&#xff0c;传统硬件描述语言和设计方法的开发效率低下的问题越来越明显。近年来逐渐崭露头角的敏捷化设…

element-plus大版本一样,但是小版本不一样导致页面出bug

npm 的版本 node的版本 npm的源这些都一样&#xff0c;但是效果不一样 发现是element的包版本不一样导致的 2.9.1与2.8.1的源是不一样的&#xff0c;导致页面出bug;

【网络协议】开放式最短路径优先协议OSPF详解(一)

OSPF 是为取代 RIP 而开发的一种无类别的链路状态路由协议&#xff0c;它通过使用区域划分以实现更好的可扩展性。 文章目录 链路状态路由协议OSPF 的工作原理OSPF 数据包类型Dijkstra算法、管理距离与度量值OSPF的管理距离OSPF的度量值 链路状态路由协议的优势拓扑结构路由器O…

《数据结构》期末考试测试题【中】

《数据结构》期末考试测试题【中】 21.循环队列队空的判断条件为&#xff1f;22. 单链表的存储密度比1&#xff1f;23.单链表的那些操作的效率受链表长度的影响&#xff1f;24.顺序表中某元素的地址为&#xff1f;25.m叉树第K层的结点数为&#xff1f;26. 在双向循环链表某节点…

华为数通考试模拟真题(附带答案解析)题库领取

【多选题】 管理员想要更新华为路由器的VRP版本&#xff0c;则正确的方法有? A管理员把路由器配置为FTP服务器&#xff0c;通过FTP来传输VRP软件 B:管理员把路由器置为FTP客户端&#xff0c;通过FTP来传输VRP软件 C:管理员把路由器配置为TFTP客户端&#xff0c;通过TFTP来传…

Linux:操作系统不朽的传说

操作系统是计算机的灵魂&#xff0c;它掌控着计算机的硬件和软件资源&#xff0c;为用户和应用程序提供了一个稳定、高效、安全的运行环境。 在众多操作系统中&#xff0c;Linux 的地位举足轻重。它被广泛应用于服务器、云计算、物联网、嵌入式设备等领域。Linux 的成功离不开…

前端(API)学习笔记(CLASS 4):进阶

1、日期对象 日期对象&#xff1a;用来表示事件的对象 作用&#xff1a;可以得到当前系统时间 1、实例化 在代码中发现了new关键字&#xff0c;一般将这个操作称为实例化 创建一个时间对象并获取时间 获得当前时间 const datenew Date() 使用日志查看&#xff0c;得到的…

【USRP】教程:在Macos M1(Apple芯片)上安装UHD驱动(最正确的安装方法)

Apple芯片 前言安装Homebrew安装uhd安装gnuradio使用b200mini安装好的路径下载固件后续启动频谱仪功能启动 gnu radio关于博主 前言 请参考本文进行安装&#xff0c;好多人买了Apple芯片的电脑&#xff0c;这种情况下&#xff0c;可以使用UHD吗&#xff1f;答案是肯定的&#…