redisson的使用及LUA脚本实现分布式秒杀

1.redisson实现分布式锁(推荐)

redisson官网:Redisson: Easy Redis Java client and Real-Time Data Platform

Redisson是一个基于Redis的Java客户端,它不仅提供了对Redis基本操作的支持,而且是一个功能丰富的分布式协调服务客户端。Redisson致力于简化在分布式环境中的开发工作,它实现了许多分布式服务,如分布式锁、分布式计数器、分布式队列、分布式映射等。 以下是一些Redisson的主要特点: 高级客户端:Redisson提供了与传统Java集合类似的API,比如Map、List、Set、Queue、Deque、Topic、Multimap、SortedSet等,使得开发者能够以几乎无感知的方式使用分布式数据结构。 分布式服务:支持各种分布式服务,如分布式锁、信号量、读写锁、原子整数、计数器、延迟队列、事件发布订阅、任务调度等。 高可用性:通过Redis Sentinel或Redis Cluster支持高可用性,能够在节点故障时自动切换。 客户端负载均衡:支持多节点连接,自动进行客户端负载均衡。 序列化:内置多种序列化方式,包括Jackson、Avro、Gson等,方便数据交换。 非侵入式:Redisson不需要额外的配置或代理层,可以直接集成到现有的Java应用中。 性能优化:通过使用Netty框架,Redisson实现了高效的网络通信,提供了低延迟和高吞吐量。 Lua脚本支持:可以使用Lua脚本在服务器端执行复杂操作,确保操作的原子性。 由于其丰富的功能和易于使用的API,Redisson成为Java开发者在构建分布式系统时的一个流行选择。

使用

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.16.2</version>
</dependency>
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RedissonClientConfig {@Beanpublic RedissonClient redissonClient(){Config config = new Config();
//        如果是集群模式可以使用config.useClusterServers()来设置集群模式config.useSingleServer().setAddress("redis://127.0.0.1:6379");return Redisson.create(config);}
}
/*** <p>* 服务实现类* </p>* create cws* @since 2024-05-18*/
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@ResourceSeckillVoucherMapper seckillVoucherMapper;@ResourceStringRedisTemplate stringRedisTemplate;@ResourceRedissonClient redissonClient;/*** 抢票** @param voucherId* @return*/@Overridepublic Result seckillVoucher(Long voucherId) {
//        1.获取优惠卷信息SeckillVoucher seckillVoucher = seckillVoucherMapper.selectById(voucherId);
// 2.判断秒杀是否开启
//       2.1获取当前时间LocalDateTime now = LocalDateTime.now();if (seckillVoucher.getBeginTime().isAfter(now)) {return Result.fail("秒杀未开启");}
//        3.判断秒杀是否结束if (seckillVoucher.getEndTime().isBefore(now)) {return Result.fail("秒杀已结束");}
//       4. 判断库存是否充足if (seckillVoucher.getStock() < 1) {return Result.fail("库存不足");}Long userId = UserHolder.getUser().getId();//        创建锁对象
//        SimpleRedisLock lock = new SimpleRedisLock("order:"+userId, stringRedisTemplate);RLock lock = redissonClient.getLock("order:" + userId);boolean res = lock.tryLock();
//        获取锁if(!res){return Result.fail("不能重新下单~");}try {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);} finally {
//            释放锁lock.unlock();}}@Transactional(rollbackFor = Exception.class)public  Result createVoucherOrder(Long voucherId) {//       5.判断用于是否已经领取Long userId = UserHolder.getUser().getId();if (query().eq("user_id", userId).eq("voucher_id", voucherId).count() > 0) {return Result.fail("用户已经领取过");}
//        6.扣减库存boolean res = seckillVoucherMapper.updateStock(voucherId);if (!res) {return Result.fail("库存不足");}
//        7.创建订单VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setUserId(UserHolder.getUser().getId());voucherOrder.setVoucherId(voucherId);long orderId = RedisWorker.nextId("order");voucherOrder.setId(orderId);save(voucherOrder);return Result.ok(orderId);}
}

通过源码我们可以发现 lock.tryLock()在无参的情况下,等待时间为-1也就像不等待,等待释放时间30s。

redisson可重入机制(原理)

我从源码分析

//点击lock.tryLock()方法的实现//核心<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}

从上面代码中我们可以发现底层也是使用了LUA脚本。

"if (redis.call('exists', KEYS[1]) == 0) then " "redis.call('hincrby', KEYS[1], ARGV[2], 1); " 
"redis.call('pexpire', KEYS[1], ARGV[1]); " 
"return nil; " 
"end; " 
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " 
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " 
"redis.call('pexpire', KEYS[1], ARGV[1]); " 
"return nil; " 
"end; " 
"return redis.call('pttl', KEYS[1]);"#解释
检查键(锁)是否存在(redis.call('exists', KEYS[1]) == 0):
如果不存在,说明当前没有其他线程持有锁,执行以下操作:
使用hincrby在哈希表中增加一个字段(对应线程ID),值为1,表示获取锁。
使用pexpire设置键的过期时间(根据传入的leaseTime和unit计算得到的毫秒值)。
若键已存在,检查字段(当前线程ID)是否已存在于哈希表中(redis.call('hexists', KEYS[1], ARGV[2]) == 1):
如果存在,说明当前线程已经持有锁,执行以下操作:
使用hincrby增加字段的值(表示重置锁的计数)。
再次使用pexpire更新键的过期时间。
如果以上条件都不满足,说明锁被其他线程持有,返回键的剩余存活时间(return redis.call('pttl', KEYS[1]))。

实现流程图  

主要使用redis的哈希结构实现,每次获取锁value加一,释放一次value减一,当value等于0时删除锁。

2.异步秒杀

将判断秒杀库存及校验一人一单交给redis去做出来,主要就大大降低了对数据库的压力。那么怎么在redis里面处理这两个业务呢?这里我们使用redis里面的string结构存储秒杀的库存,用Set结构去存储下单人员保证唯一性。这两个操作必要满足原子性所以我们这里还是使用LUA脚本去实现。

具体实现流程  

具体实现代码

在添加秒杀优惠卷时我们添加将秒杀库存添加到redis

    @Override@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);
//       添加秒杀库存到RedisstringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());}

修改业务  

  @ResourceRedissonClient redissonClient;private static final DefaultRedisScript<Long> un_lock;//    加载脚本static {un_lock = new DefaultRedisScript<>();un_lock.setLocation(new ClassPathResource("secJi.lua"));un_lock.setResultType(Long.class);}/*** 优化*/@Overridepublic Result seckillVoucher(Long voucherId) {String userId = UserHolder.getUser().getId().toString();//       lua代码所需的keyList<String> keyList = new ArrayList<>();keyList.add(SECKILL_STOCK_KEY+voucherId.toString());keyList.add(SECKILL_ORDER_KEY+voucherId.toString());
//       1.执行LAU脚本Long result = stringRedisTemplate.execute(un_lock, keyList, userId);int res = result.intValue();if(res!=0){return Result.fail(res==1?"库存不足":"重复下单");}
//       TODO 将订单交给阻塞队列//       订单号long order = RedisWorker.nextId("order");return  Result.ok(order);}

LUA脚本

-- 获取库存 KEYS[1] 也就是seckill:stock:xx
local stock = redis.call('get', KEYS[1])
-- 判断set中是否存在用户   KEYS[2] 订单id:seckill:order:xx    ARGV[1] 为用户
local userId = redis.call('sismember', KEYS[2], ARGV[1])
--判断库存是否大于0
if tonumber(stock) <= 0 then--库存不足返回1return 1
end
--判断用户是否重复下单
if userId == 1 then--用户重复下单返回2return 2
end
--库存减一
redis.call('decr', KEYS[1])
--将用户添加到set中
redis.call('sadd', KEYS[2], ARGV[1])
return 0

 

完整代码  

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {// 注入SeckillVoucherMapper@ResourceSeckillVoucherMapper seckillVoucherMapper;// 注入StringRedisTemplate用于Redis字符串操作@ResourceStringRedisTemplate stringRedisTemplate;// 注入RedissonClient用于Redis分布式锁@ResourceRedissonClient redissonClient;// 定义Redis脚本,用于秒杀解锁操作private static final DefaultRedisScript<Long> un_lock;static {un_lock = new DefaultRedisScript<>();un_lock.setLocation(new ClassPathResource("secJi.lua"));un_lock.setResultType(Long.class);}// 创建一个定长的阻塞队列,用于存储代金券订单任务private BlockingQueue<VoucherOrder> orderTaskQueue = new ArrayBlockingQueue<>(1024 * 1024);// 创建单线程执行器,用于处理代金券订单private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();// 子线程中无法直接获取Service实例,因此在主线程中获取代理对象private IVoucherOrderService proxy;/*** 初始化方法,启动订单处理线程。*/@PostConstructvoid init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}// 秒杀代金券订单的处理线程private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 从订单任务队列中获取一个代金券订单VoucherOrder voucherOrder = orderTaskQueue.take();// 处理订单HandleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理代金券订单错误: {}", e);}}}}/*** 处理代金券订单,包括创建订单和释放分布式锁。* @param voucherOrder 代金券订单对象*/private void HandleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();// 创建Redis分布式锁RLock lock = redissonClient.getLock("order:" + userId);boolean res = lock.tryLock();if (!res) {log.error("不能重新下单");}try {// 调用创建代金券订单方法proxy.createVoucherOrder(voucherOrder);} finally {// 释放分布式锁lock.unlock();}}/*** 用户参与秒杀代金券活动。* * @param voucherId 代金券ID* @return 返回秒杀结果,成功返回订单ID,失败返回错误信息*/@Overridepublic Result seckillVoucher(Long voucherId) {String userId = UserHolder.getUser().getId().toString();// 准备Lua脚本所需的keyList<String> keyList = new ArrayList<>();keyList.add(SECKILL_STOCK_KEY + voucherId.toString());keyList.add(SECKILL_ORDER_KEY + voucherId.toString());// 执行Lua脚本进行库存检查和下单操作Long result = stringRedisTemplate.execute(un_lock, keyList, userId);int res = result.intValue();if (res != 0) {return Result.fail(res == 1 ? "库存不足" : "重复下单");}// 构建代金券订单并提交到阻塞队列VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setUserId(UserHolder.getUser().getId());voucherOrder.setVoucherId(voucherId);long orderId = RedisWorker.nextId("order");voucherOrder.setId(orderId);orderTaskQueue.add(voucherOrder);proxy = (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);}/*** 创建代金券订单,包括核对用户是否已领取、扣减库存和创建订单操作。* * @param voucherOrder 代金券订单对象*/@Overridepublic void createVoucherOrder(VoucherOrder voucherOrder) {// 核对用户是否已领取相同代金券Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();if (query().eq("user_id", userId).eq("voucher_id", voucherId).count() > 0) {log.error("用户已经领取过");}// 扣减库存boolean res = seckillVoucherMapper.updateStock(voucherId);if (!res) {log.error("库存不足");}// 创建订单save(voucherOrder);}
}

这样实现虽然确保了性能方面的提升,但是在极端情况下例如jvm宕机了那么这里的阻塞队列的数据将会丢失,导致mysql与redis的数据原子性出现了异常。我们下列使用消息队列去解决该问题。  

3.redis消息队列

基于List结构模拟的消息队列  

BRPOP与LPUSH  

 

优缺点:

优点:

  • 李勇Redis存储,不受限于JVM内存上限

  • 基于Redis的持久化机制,数据库安全性有保证

  • 可以满足消息有序性

缺点:

  • 无法避免消丢失

  • 只支持消费者

 基于Stream的消息队列

Stream的基本概念:

  • 消息与ID:Stream中的基本单位是消息(message),每条消息都有一个唯一标识符(ID),这个ID是一个递增的整数或是一个由数字和字母组成的字符串,确保了消息的顺序性。消息ID可以用来定位和范围读取消息。

  • 字段(field)和值(value):每个消息可以包含多个键值对形式的字段和值,这使得消息内容更加丰富和结构化。

  • 消费者组(Consumer Group):Stream支持消费者组的概念,一个消息可以被多个消费者组订阅,每个消费者组内的消费者可以独立地消费消息,实现了消息的广播和分发。消费者组内支持消息的确认机制,确保消息不会丢失。

  • 持久性和可靠性:Stream中的消息默认是持久化的,即使Redis服务器重启,消息也不会丢失。同时,通过消费者确认机制可以实现消息的可靠处理。

  • 读取偏移量:消费者可以通过指定消息ID或偏移量来读取消息,支持范围读取、读取新消息或未确认消息等多种模式。

  • 消息限流:Stream可以通过XPENDING命令查看消费者组的状态,包括已处理和未处理的消息数量,从而实现流量控制和监控。

应用场景:

  • 实时日志处理:作为日志收集和处理管道,支持高并发的日志记录和分析。

  • 实时数据流处理:在金融交易、物联网(IoT)等场景中,处理连续的数据流。

  • 消息队列和事件驱动架构:构建高度可扩展和解耦的微服务系统。

  • 用户活动追踪:记录和分析用户行为,如点击流分析。

  • 缓存更新通知:作为数据库更新的通知机制,实现数据同步。

  • Stream类型通过其灵活的设计和强大的功能集,成为了现代分布式系统中数据传输和处理的重要组件。

发送消息指令

#生成者
Windows:0>XADD s1 * k1 v1
"1716112531587-0"
Windows:0>XLEN s1  #查看长度
"1"#消费者
Windows:0>XREAD COUNT 1 STREAMS s1 0
1) 1) "s1"2) 1) 1) "1716112531587-0"2) 1) "k1"2) "v1"#消费者阻塞等待  BLOCK
Windows:0>XREAD COUNT 1 BLOCK 0 STREAMS s1 $  
1) 1) "s1"2) 1) 1) "1716112915859-0"2) 1) "k1"2) "v1"#COUNT 1 表示每次读取消息的最大数量#BLOCK 0 开启阻塞 0为永久阻塞# STREAMS s1  对应的队列消息# $ 表示从最新消息开始读取

消费者组  

Redis Stream的消费者组(Consumer Groups)是实现消息处理的关键特性。它们允许将Stream中的消息分配给一组消费者,而不是单个消费者。消费者组允许消息的并发处理,同时也提供了消息的可靠传递和幂等性。 以下是一些关于消费者组的关键点:

  • 创建消费者组:使用XGROUP CREATE命令可以创建一个新的消费者组,指定Stream的名称、消费者组的名称以及起始ID。起始ID通常是$表示从Stream的最新消息开始,或者是一个特定的ID表示从历史消息开始。

  • 消息分配:当消息被写入Stream时,它们被分配给消费者组。每个消费者组内部,消息被轮询分配给组内的消费者。默认情况下,每个消费者只看到其他消费者未消费的消息,这样可以避免消息被多个消费者重复处理。

  • 消息确认:消费者使用XREADGROUP或XACK命令来读取和确认消息。当消费者确认消息时,该消息被视为已处理并可以从Stream的主数据结构中移除(除非配置了NOACK选项)。未确认的消息保留在pending entries list(PEL)中,等待确认。

  • 消费者状态:XPENDING命令用于查询消费者组中未确认的消息,包括它们的ID、消费者名和等待时间。

  • 幂等性:通过消费者组,消息只被一个消费者处理一次,即使消费者崩溃并重新连接,未确认的消息也不会被重新分配,除非使用XREADGROUP的COUNT参数或IDLE时间设置来重新分配。

  • 消费者心跳:消费者通过PING命令发送心跳来保持其活跃状态,防止未确认的消息被重新分配。

  • 消费者组清理:使用XGROUP DELCONSUMER可以删除消费者组中的消费者,而XGROUP SETID可以将消费者组的读取位置重置到某个ID,用于处理消息丢失或需要重新处理的情况。

  • 消费者组的设计使得Redis Stream能够支持复杂的消息处理场景,如消息的可靠传递、回溯处理以及在多个消费者之间公平地分配工作负载。

创建消费者组  

#创建消费组
Windows:0>XGROUP CREATE s1 l1 0  #s1表示队列名称  l1表示消费组名称 0 表示队列中的第一个消息
"OK"#消费者
Windows:0>XREADGROUP GROUP l1 c1 COUNT 1 BLOCK 200 STREAMS s1 > 
# l1 表示消费组名称  c1消费者名称(不写默认分配)   COUNT 1读取多少条  BLOCK 200等待多少时间(毫秒)  STREAMS s1指定队列名称
1) 1) "s1"2) 1) 1) "1716112531587-0"2) 1) "k1"2) "v1"
Windows:0>XREADGROUP GROUP l1 c2 COUNT 1 BLOCK 200 STREAMS s1 > 
1) 1) "s1"2) 1) 1) "1716112773920-0"2) 1) "k2"2) "v2"
#使用XACK指令确认的消息
Windows:0>XACK s1 l1 1716112531587-0 1716112773920-0  #组名
"2"#查看pendingList里面为确认消息
XPENDING  s1 l1 - + 10   #s1队列名称 l1表示消费组名称  -+表示读取全部   读取10条  Windows:0>XREADGROUP GROUP l1 c2 COUNT 1 BLOCK 200 STREAMS s1 > 
1) 1) "s1"2) 1) 1) "1716112915859-0"2) 1) "k1"2) "v1"
#上面那条消息未必确认,所以可以使用XPENDING查询出来
Windows:0>XPENDING s1 l1 - + 10
1) 1) "1716112915859-0"2) "c2"3) "1674"4) "1"

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯

  • 可以多消费者争抢消息,加快消费速度

  • 可以阻塞读取

  • 没有消息漏读的风险

  • 有消息确认机制,保证消息至少被消费一次

 4.使用Stream结构实现队列结合业务秒杀

import static com.hmdp.utils.RedisConstants.SECKILL_ORDER_KEY;
import static com.hmdp.utils.RedisConstants.SECKILL_STOCK_KEY;/*** <p>* 服务实现类* </p>** @since 2021-12-22*/
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@ResourceSeckillVoucherMapper seckillVoucherMapper;@ResourceStringRedisTemplate stringRedisTemplate;@ResourceRedissonClient redissonClient;private static final DefaultRedisScript<Long> script;//    加载脚本static {script = new DefaultRedisScript<>();script.setLocation(new ClassPathResource("seckill.lua"));script.setResultType(Long.class);}//    创建线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//    在子线程中是无法获取嗲了对象的,所以我们在主线程中获取private IVoucherOrderService proxy;@PostConstructvoid init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {
//                    1.获取消息队列中的订单消息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >  List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));
//                    2.判断是否获取成功if (list == null || list.isEmpty()) {//  3.如果获取失败,说明没有消息,继续下一次循环continue;}
//                    解析队列中的数据MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> body = entries.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(body, new VoucherOrder(), true);//  创建订单HandleVoucherOrder(voucherOrder);
//                   确认消息stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", entries.getId());} catch (Exception e) {log.error("队列子线程错误{}", e);handlePendingList();}}}private void handlePendingList() {while (true) {try {List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders", ReadOffset.from("0")));if (list == null || list.isEmpty()) {//  如果获取失败,说明没有消息,继续下一次循环break;}MapRecord<String, Object, Object> entries = list.get(0);Map<Object, Object> body = entries.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(body, new VoucherOrder(), true);//  创建订单HandleVoucherOrder(voucherOrder);//  确认消息stringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", entries.getId());} catch (Exception e) {log.error("处理pendingList失败", e);try {Thread.sleep(20);} catch (InterruptedException interruptedException) {interruptedException.printStackTrace();}}}}}private void HandleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();//        创建锁对象RLock lock = redissonClient.getLock("order:" + userId);boolean res = lock.tryLock();
//        获取锁if (!res) {log.error("不能重新下单");}try {proxy.createVoucherOrder(voucherOrder);} finally {
//            释放锁lock.unlock();}}@Overridepublic Result seckillVoucher(Long voucherId) {String userId = UserHolder.getUser().getId().toString();long orderId = RedisWorker.nextId("order");//       1.执行LAU脚本Long result = stringRedisTemplate.execute(script, Collections.emptyList(),voucherId.toString(), userId, String.valueOf(orderId));int res = result.intValue();if (res != 0) {return Result.fail(res == 1 ? "库存不足" : "重复下单");}proxy = (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);}@Override@Transactional(rollbackFor = Exception.class)public void createVoucherOrder(VoucherOrder voucherOrder) {//       1.判断用于是否已经领取Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();if (query().eq("user_id", userId).eq("voucher_id", voucherId).count() > 0) {log.error("用户已经领取过");}
//        2.扣减库存boolean res = seckillVoucherMapper.updateStock(voucherId);if (!res) {log.error("库存不足");}
//        3.创建订单save(voucherOrder);}
}

---
--- Generated by Luanalysis
--- Created by cws.
--- DateTime: 2024/5/19 15:17
---
-- 优惠卷id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId= ARGV[3]local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. userId-- 判断库存是否充足
if(tonumber(redis.call('get',stockKey)) <= 0) thenreturn 1
end-- 判断用户是否重复下单
if(redis.call('sismember',orderKey,userId) == 1) thenreturn 2
end
-- 扣减库存
redis.call('incrby',stockKey,-1)
-- 添加到已售订单集合
redis.call('sadd',orderKey,userId)
-- 发送消息到队列中
redis.call('xadd','stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

如何启动时报错:io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'stream.orders' or consumer group 'g1' in XREADGROUP with GROUP option

"ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically."

在redis客户端执行:

Windows:0>XGROUP CREATE stream.orders g1 0 MKSTREAM
"OK"

 这里主要是使用了redis的消息队列,其实还可以使用RabbitMQ、RocketMQ、kafka等去整合实现,后面会更新企业级秒杀业务~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/332072.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

知识分享:隔多久查询一次网贷大数据信用报告比较好?

随着互联网金融的快速发展&#xff0c;越来越多的人开始接触和使用网络贷款。而在这个过程中&#xff0c;网贷大数据信用报告成为了评估借款人信用状况的重要依据。那么&#xff0c;隔多久查询一次网贷大数据信用报告比较好呢?接下来随小易大数据平台小编去看看吧。 首先&…

Pandas 多层索引中的索引和切片操作你学会了吗

1. Series的索引操作 对于Series来说&#xff0c;直接中括号[]与使用 .loc() 完全一样 显式索引 # 导包import numpy as npimport pandas as pd data np.random.randint(0,100,size6) index [ ["1班","1班","1班","2班","…

C++——list的实现以及源码

前言&#xff1a; 最近学习了clist的实现&#xff0c;这让我对迭代器的理解又上升了一个新的高度&#xff0c;注意&#xff1a;代码里的list是放在一个叫zgw的命名空间里边&#xff0c;但是在实现list的代码中没有加namespace&#xff0c;这里给个注意&#xff0c;以后复习时能…

颜色值进制转换

颜色值进制转换 专业的和非专业程序员在编程时都碰到过颜色值的表达式。特别是在编制网页和设计界面时&#xff0c;都要选择颜色。各语言的颜色值表达式就两种&#xff0c;十六进制的颜色值hex$和十进制的RGB格式。现成的调色板颜色表也是这两种格式。写代码时会遇到写颜色值码…

Linux-组管理和权限管理

1 Liunx组的基本介绍&#xff1a; 在Linux中的每个用户必须属于一个组&#xff0c;不能独立于组外。在Linux中每个文件都有所有者、所在组、其他组的概念 所有者所在组其它组改变用户所在的组 2 文件/目录的所有者 一般文件的创建者&#xff0c;谁创建了该文件&#xff0c;就…

垃圾回收机制及算法

文章目录 概要对象存活判断引用计数算法可达性分析算法对象是否存活各种引用 垃圾收集算法分代收集理论复制算法标记清除算法标记-整理算法 概要 垃圾收集&#xff08;Garbage Collection&#xff0c; 下文简称GC&#xff09;&#xff0c;其优缺点如下&#xff1a; 优点&#…

Shell

Linux中shell是Linux内核的一个外层保护工具&#xff0c;负责用户与内核互交。是一直命令行解析器&#xff0c;是指一直应用程序&#xff0c;且提供一个界面 还是一种编程语言. 查看当前系统的Shell 查看有哪些shell&#xff0c;用cat /etc/shells 查看当前系统默认的shell&…

二十八篇:嵌入式系统实战指南:案例研究与未来挑战

嵌入式系统实战指南&#xff1a;案例研究与未来挑战 1. 引言 1.1 嵌入式系统的重要性及其应用广度 在当今快速发展的技术领域中&#xff0c;嵌入式系统扮演着至关重要的角色。这些系统是专门设计的计算机硬件和软件的组合&#xff0c;旨在执行特定任务&#xff0c;如控制、监…

牛马真的沉默了,入职第一天就干活

入职第一天就干活的&#xff0c;就问还有谁&#xff0c;搬来一台N手电脑&#xff0c;第一分钟开机&#xff0c;第二分钟派活&#xff0c;第三分钟干活&#xff0c;巴适。。。。。。 打开代码发现问题不断 读取配置文件居然读取两个配置文件&#xff0c;一个读一点&#xff0c;…

Leetcode算法题笔记(3)

目录 矩阵101. 生命游戏解法一解法二 栈102. 移掉 K 位数字解法一 103. 去除重复字母解法一 矩阵 101. 生命游戏 根据 百度百科 &#xff0c; 生命游戏 &#xff0c;简称为 生命 &#xff0c;是英国数学家约翰何顿康威在 1970 年发明的细胞自动机。 给定一个包含 m n 个格子…

计算机网络——TCP 协议的三次握手 / 四次挥手

简述 TCP / UDP 协议都是传输层的协议。 UDP 是面向无连接的协议&#xff0c;就是说发送端不在乎消息数据是否传输到接收端了&#xff0c;所以会出现数据丢失的情况&#xff0c;所以可靠性也不高。 TCP 是面向连接的、可靠的、基于字节流的传输层协议。所谓面向连接的&#…

机器学习算法手撕(一):KD树

import math import matplotlib.pyplot as pltclass Node:def __init__(self, data, leftNone, rightNone):self.data dataself.left leftself.right right# 创建KDTree类 class KDTree:def __init__(self, k):self.k kdef create_tree(self,dataset,depth):if not dataset…

SpringBoot使用rsa-encrypt-body-spring-boot实现接口加解密

废话不多说&#xff0c;直接上代码 引入依赖 <dependency><groupId>cn.shuibo</groupId><artifactId>rsa-encrypt-body-spring-boot</artifactId><version>1.0.1.RELEASE</version> </dependency>配置文件 rsa:encrypt:# 是…

电表远传抄表是什么?

1.电表远传抄表&#xff1a;简述 电表远传抄表&#xff0c;又称为远程控制自动抄表系统&#xff0c;是电力行业的智能化技术运用&#xff0c;它通过无线或通信网络技术&#xff0c;完成对电表数据信息的远程收集解决。此项技术不仅提升了抄水表高效率&#xff0c;降低了人工偏…

Java订餐系统源码 springboot点菜系统源码

Java订餐系统源码 springboot点菜系统源码 源码下载地址&#xff1a;https://download.csdn.net/download/xiaohua1992/89341358 功能介绍&#xff1a; 前台登录&#xff1a;前台登录&#xff1a; ①首页&#xff1a;菜品信息推荐、菜品信息展示、查看更多 ②菜品信息&…

【剑指offer】2.2编程语言(p22-p25)——面试题1:string赋值运算函数

本节博客是对阅读剑指offer后的笔记归纳总结&#xff0c;有需要借鉴即可。 目录 1.p21-p25内容概要2.询问语法概念常考&#xff1a;CPP关键字理解举例&#xff1a;sizeof空类 3.分析代码举例&#xff1a;类中拷贝构造的无限递归问题 4.写代码常考点&#xff1a;类内成员函数、迭…

keycloakAsana SSO对接配置

说明&#xff1a;Keycloak与Asana单点登录对接&#xff0c;Keycloak做IDP&#xff0c;Asana做SP&#xff1b; 一、环境信息 操作系统&#xff1a;ubuntu keycloak&#xff1a;21.1.2 Asana enterprise&#xff1b;更新apt软件包索引&#xff1a; sudo apt update检查是否已安…

数组-最接近给出数字的三数之和

题目描述 解题思路 这里使用三层for循环&#xff0c;暴力解法穷举所有三个数和的可能性&#xff0c;注意三层循环里的索引不要重复。 代码实现 import java.util.*;public class Solution {/*** 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返…

Introduction of Internet 计算机网络概述

计算机网络的概念 计算机网络的定义&#xff1a; 多台独立的计算机通过通信线路实现资源共享的计算机系统 计算机网络的组成 资源子网&#xff1a;提供共享的软件资源和硬件资源 通信子网&#xff1a;提供信息交换的网络结点和通信线路 计算机网络类型 按照拓扑排序 星型…

Transformer详解(1)-结构解读

Transormer块主要由四个部分组成&#xff0c;注意力层、位置感知前馈神经网络、残差连接和层归一化。 1、注意力层(Multi-Head Attention) 使用多头注意力机制整合上下文语义&#xff0c;它使得序列中任意两个单词之间的依赖关系可以直接被建模而不基于传统的循环结构&#…