全局唯一ID
全局唯一ID生成策略:
- UUID
- Redis自增
- snowflake算法
- 数据库自增
Redis自增ID策略: - 每天一个key,方便统计订单量
- ID构造是 时间戳 + 计数器
@Component
public class RedisIdWorker {// 2024的第一时刻private static final long BEGIN_TIMESTAMP = 1704067200L;private static final int COUNT_BITS = 32;private final StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix){// 1. 获取当前时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timeStamp = nowSecond - BEGIN_TIMESTAMP;// 2. 获取序列号// 2.1 获取当天日期,精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));// 2.2 自增Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);// 3. 拼接成IDreturn timeStamp << COUNT_BITS | count;}}
超卖问题
在处理大量请求时,可能会出现超卖问题。
可以通过加锁解决。
这里采用的是乐观锁。
一人一单
该任务需要每名用户只能抢到一张优惠券。
同时还要考虑到后端部署在多个服务器上可能会出现的异常,此时需要使用分布式锁进行解决。
这里的分布式锁基于Redis实现。
基于Redis的分布式锁实现思路
基于Redis的分布式锁实现思路:
- 利用set nx ex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用set nx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
使用Redis优化秒杀
这里将库存判断与一人一单的校验使用Redis完成。
具体流程为:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
这里的阻塞队列是基于Stream的消息队列
STREAM类型消息队列的XREADGROUP命令特点: - 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
@Override@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 保存优惠券至RedisstringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY +voucher.getId(), voucher.getStock().toString());}
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
-- 1.参数列表
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
-- 2. 数据key
local stockKey = "seckill:stock:" .. voucherId
local orderKey = "seckill:order:" .. voucherId-- 3. 判断库存是否充足
if tonumber(redis.call('get', stockKey) )< 1 then-- 库存不足return 1
end-- 4. 判断用户是否已经抢购过
if redis.call('sismember', orderKey, userId) == 1 then-- 已经抢购过return 2
end-- 5. 减库存
redis.call('incrby', stockKey, -1)
-- 6. 记录用户抢购信息
redis.call('sadd', orderKey, userId)redis.call('xadd', "stream.orders", "*", "userId", userId, "voucherId", voucherId,"id", orderId)
return 0
异步下单
LUA脚本
-- 1.参数列表
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
-- 2. 数据key
local stockKey = "seckill:stock:" .. voucherId
local orderKey = "seckill:order:" .. voucherId-- 3. 判断库存是否充足
if tonumber(redis.call('get', stockKey) )< 1 then-- 库存不足return 1
end-- 4. 判断用户是否已经抢购过
if redis.call('sismember', orderKey, userId) == 1 then-- 已经抢购过return 2
end-- 5. 减库存
redis.call('incrby', stockKey, -1)
-- 6. 记录用户抢购信息
redis.call('sadd', orderKey, userId)redis.call('xadd', "stream.orders", "*", "userId", userId, "voucherId", voucherId,"id", orderId)
return 0
从消息队列取出,处理代码
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private IVoucherOrderService proxy;// 静态代码块加载Lua脚本private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}// private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);// 线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct// 完成类的construct即执行下面的函数public void init() {// 交给线程池做SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}String queueName = "stream.orders";// 消费线程private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 4.1从消息队列中取出订单 xreadgroupnngroup g1 c1 count 1 block 200 streams streams.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 判断是否成功// 没有if (list == null || list.isEmpty()) {continue;}// 有// 4.2创建订单,解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);// ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("订单处理失败", e);handlePendingList();}}}}private void handlePendingList() {while (true) {try {// 4.1从消息队列中取出订单 xreadgroupnngroup g1 c1 count 1 block 200 streams streams.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 判断是否成功// 没有if (list == null || list.isEmpty()) {// pendingList 没有消息break;}// 有// 4.2创建订单,解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);// ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("订单处理失败", e);try {Thread.sleep(200);} catch (InterruptedException ex) {throw new RuntimeException(ex);}}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {// 因为为子线程,userId只能从数据中取Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("order:" + userId);boolean isLock = lock.tryLock();if (!isLock) {log.error("重复抢购");return ;}try {proxy.createVoucherOrder(voucherOrder);return;} finally {lock.unlock();}}
创建订单
@Transactionalpublic Result createVoucherOrder(VoucherOrder voucherOrder) {// 4.一人一单Long userId = voucherOrder.getUserId();// 4.1 查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {return Result.fail("每人限购一张");}// 4.是// 4.1扣减库存,基于乐观锁boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();if (!success) {return Result.fail("库存不足");}//4.2创建订单save(voucherOrder);//4.3返回订单idreturn Result.ok(voucherOrder.getId());}
@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();// 0. 生成订单idlong orderId = redisIdWorker.nextId("order");// 1. 执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));// 2. 结果是否为0int r = result.intValue();// 2.1 不为0if (r!=0) {return Result.fail(r == 1 ? "库存不足" : "不能重复抢购");}// 注解底层基于aop实现,需要获得代理对象,进行执行proxy = (IVoucherOrderService)AopContext.currentProxy();// 3. 返回结果订单idreturn Result.ok(orderId);}