黑马点评项目学习笔记
文章目录
- 黑马点评项目学习笔记
- 前言
- 项目搭建
- 导入数据库
- 初始化项目
- 启动项目
- 启动前端项目
- 启动后端项目
- 基于Session实现短信验证码登录
- 短信验证码登录
- 配置登录拦截器
- 数据脱敏
- Session集群共享问题
- 基于Redis实现短信验证码登录
- 短信验证登录
- 配置登录拦截器
- 店铺数据查询
- 根据 id 查询商铺缓存
- 查询店铺类型
- 数据一致性问题
- 缓存主动更新策略的实现
- 缓存穿透的解决方案
- 缓存雪崩的解决方案
- 缓存击穿的解决方案
- 基于互斥锁解决缓存击穿
- 基于逻辑过期解决缓存击穿
- 缓存工具类的封装
- 小结
- 优惠券秒杀
- 自增ID存在的问题
- 分布式ID的实现
- 优惠券秒杀接口实现
- 单体下一人多单超卖问题
- 乐观锁解决一人多单超卖问题
- 单体下的一人一单超卖问题
- 悲观锁解决超卖问题
- 集群下的一人一单超卖问题
- 分布式锁
- 分布式锁解决超卖问题
- 分布式锁优化
- 分布式锁优化1
- 分布式锁优化2
- Redisson
- Redisson实现分布式锁
- 可重入锁的原理
- 测试锁的可重入性
- 源码流程解析
- 测试主从节点锁的一致性
- 编码实现可重入锁
- 秒杀优化
- 异步秒杀优化
- 分析
- 实现
- 测试
- 消息队列优化
- 分析
- 实现
- 测试
- 达人探店
- 发布探店笔记
- 查看探店笔记
- Set实现点赞功能
- SortedSet实现点赞排行榜
- 好友关注
- 关注和取关
- Set实现共同关注
- Feed流关注推送
- 分析
- 实现
- 附近商铺搜索
- GEO数据结构
- 附近商户搜索
- 用户签到和连续签到统计
- BitMap基本使用
- 签到功能
- 签到统计
- UV统计
- HyperLogLog用法
- 实现UV统计
- Bug记录
- 结语
前言
当前笔记是第二遍学习Redis实战篇所写的,第一次学习要在1月份前了,当时刚学完SpringBoot练手项目瑞吉外卖,然后放松之时刷到B站UP主 鱼皮 的视频,他推荐大家如果学完SpringBoot后可以尝试着学习这门Redis课程。我也有幸知道了B站还有一门这么牛X的课程,于是当时一股脑连着肝了两周把这么这门课程看完了。当时是第一次接触到Redis,被这么课程而深深打动了,不愧被广大B友称之为B站最有含金量的一门课程,里面不仅大量运用了Lambda表达式、还有大量高并发相关知识点,感触最大的还是对于各种Redis相关知识的应用,比如如何解决数据不一致问题,如何解决缓存穿透、缓存击穿、缓存雪崩(这个只给出了解决思路,没有具体的解决方案),以及分布式下秒杀常见问题及其解决方案,如何使用 Jmeter 进行压测,还有Redis中各类数据结构的应用场景,如何应用等等。
当时学起来还是有一点吃力的,对于一些代码为什么这么写,优点不明所以,多线程下锁的使用,如何确保事务不失效,常用工具类Hutol和Utils的使用,然后我就去学习了一些 JUC 相关知识,后面也就是6月份,我开始打算重新学习一遍,争取把每一行代码的用意以及为什么要怎么写要搞懂,同时也算是为面试做准备,因为我打算在简历上项目中大量运用Redis,于是再次历时一周重新学习了一遍,这一遍,我把每一个版本的迭代都使用git打上了tag,这样我就能够随时看到我想要的了,同时重新调整了一下项目,按照阿里巴巴开发规约让整个项目显得更加规范。这一次我也没有像之前跟着老师敲,而是自己敲一遍,不懂的直接看PPT、看老师提供的代码,对于一些模糊的知识点我就直接看视频复习一遍
相关推荐:
黑马程序员Redis入门到实战教程,深度透析redis底层原理+redis分布式锁+企业解决方案+黑马点评实战项目****
Redis基础篇
Lua快速入门笔记
自动化完成1000个用户的登录并获取token并生成tokens.txt文件
hm-dianping: Redis练手项目:黑马点评 (gitee.com):本仓库共计打了26个tag,你可以随时查看某一时段的版本,如果觉得对你有帮助,欢迎start
最后,在此也感谢黑马、感谢鱼皮、感谢B站发弹幕提醒我的人、感谢互联网上那些写下相关文章的人,是你们让我遇到了这么课程,是你们让我能够解决中途遇到的bug,是你们让我最终成功完成这篇文章,文章共计 15万字符,如果觉得本文对你有所帮助,欢迎三连(点赞👍+收藏⭐+关注💖),如果文中存在错误或者描述不当的地方,恳请告知博主,博主将不胜感激,如果本文存在侵权,也恳请及时告知,博主将立刻修改
项目搭建
导入数据库
备注:SQL文件在项目的Resource目录下
初始化项目
-
Step1:导入项目
略……
-
Step2:配置好IDEA环境(这一步很重要)
这一步主要是检查 JDK、Maven、编码三个方面
1)JDK保障是1.8
2)项目编码保障是UTF-8
3)Maven版本不低于3.4
4)项目路径不要出现中文
-
Step3:加载Maven依赖,直接刷新Maven
温馨提示:建议配置阿里云镜像,不会的可以参考这篇文章 一文带你快速上手项目开发神器Maven_知识汲取者的博客-CSDN博客
-
Step4:更改配置
将application.yml文件中的账号密码换成自己的,比如MySQL的账号密码,Redis的地址和密码。
关于Redis的基础使用可以参考我都这篇文章:Redis基础篇_知识汲取者的博客-CSDN博客
启动项目
启动前端项目
-
启动前端项目
解压前端项目,然后进入Dos命令窗口
输入
start nginx.exe
启动NginxNginx是后台启动,我们直接打开浏览器访问
http://localhost:8080/
,如果看到以下页面则代表前端项目启动成功
启动后端项目
-
启动后端项目
方式一:
方式二:
然后访问打开浏览器访问
http://localhost:8081/shop/1
,如果能够看到以下页面就说明后端项目启动成功了注意:首先要保障数据库中的 shop 表中存在 id 为1的这条数据
基于Session实现短信验证码登录
短信验证码登录
/*** 发送验证码*/@Overridepublic Result sendCode(String phone, HttpSession session) {// 1、判断手机号是否合法if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式不正确");}// 2、手机号合法,生成验证码,并保存到Session中String code = RandomUtil.randomNumbers(6);session.setAttribute(SystemConstants.VERIFY_CODE, code);// 3、发送验证码log.info("验证码:{}", code);return Result.ok();}/*** 用户登录*/@Overridepublic Result login(LoginFormDTO loginForm, HttpSession session) {String phone = loginForm.getPhone();String code = loginForm.getCode();// 1、判断手机号是否合法if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式不正确");}// 2、判断验证码是否正确String sessionCode = (String) session.getAttribute(LOGIN_CODE);if (code == null || !code.equals(sessionCode)) {return Result.fail("验证码不正确");}// 3、判断手机号是否是已存在的用户User user = this.getOne(new LambdaQueryWrapper<User>().eq(User::getPassword, phone));if (Objects.isNull(user)) {// 用户不存在,需要注册user = createUserWithPhone(phone);}// 4、保存用户信息到Session中,便于后面逻辑的判断(比如登录判断、随时取用户信息,减少对数据库的查询)session.setAttribute(LOGIN_USER, user);return Result.ok();}/*** 根据手机号创建用户*/private User createUserWithPhone(String phone) {User user = new User();user.setPhone(phone);user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));this.save(user);return user;}
配置登录拦截器
登录拦截器:
public class LoginInterceptor implements HandlerInterceptor {/*** 前置拦截器,用于判断用户是否登录*/@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {HttpSession session = request.getSession();// 1、判断用户是否存在User user = (User) session.getAttribute(LOGIN_USER);if (Objects.isNull(user)){// 用户不存在,直接拦截response.setStatus(HttpStatus.HTTP_UNAUTHORIZED);return false;}// 2、用户存在,则将用户信息保存到ThreadLocal中,方便后续逻辑处理// 比如:方便获取和使用用户信息,session获取用户信息是具有侵入性的ThreadLocalUtls.saveUser(user);return HandlerInterceptor.super.preHandle(request, response, handler);}
}
配置完拦截器后,还需要将我们自定义的拦截器添加到SpringMVC的拦截器列表中,才能生效:
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {// 添加登录拦截器registry.addInterceptor(new LoginInterceptor())// 设置放行请求.excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/upload/**","/voucher/**");}
}
数据脱敏
- 封装UserDTO,返回给前端的
Entity
数据使用BeanUtil
工具类转成DTO - 存储到ThreadLocal中的数据也进行数据托名
@Data
public class UserDTO {private Long id;private String nickName;private String icon;
}
Session集群共享问题
-
什么是Session集群共享问题?
在分布式集群环境中,会话(Session)共享是一个常见的挑战。默认情况下,Web 应用程序的会话是保存在单个服务器上的,当请求不经过该服务器时,会话信息无法被访问。
-
Session集群共享问题造成哪些问题?
- 服务器之间无法实现会话状态的共享。比如:在当前这个服务器上用户已经完成了登录,Session中存储了用户的信息,能够判断用户已登录,但是在另一个服务器的Session中没有用户信息,无法调用显示没有登录的服务器上的服务
-
如何解决Session集群共享问题?
-
方案一:Session拷贝(不推荐)
Tomcat提供了Session拷贝功能,通过配置Tomcat可以实现Session的拷贝,但是这会增加服务器的额外内存开销,同时会带来数据一致性问题
-
方案二:Redis缓存(推荐)
Redis缓存具有Session存储一样的特点,基于内存、存储结构可以是key-value结构、数据共享
-
-
Redis缓存相较于传统Session存储的优点:
- 高性能和可伸缩性:Redis 是一个内存数据库,具有快速的读写能力。相比于传统的 Session 存储方式,将会话数据存储在 Redis 中可以大大提高读写速度和处理能力。此外,Redis 还支持集群和分片技术,可以实现水平扩展,处理大规模的并发请求。
- 可靠性和持久性:Redis 提供了持久化机制,可以将内存中的数据定期或异步地写入磁盘,以保证数据的持久性。这样即使发生服务器崩溃或重启,会话数据也可以被恢复。
- 丰富的数据结构:Redis 不仅仅是一个键值存储数据库,它还支持多种数据结构,如字符串、列表、哈希、集合和有序集合等。这些数据结构的灵活性使得可以更方便地存储和操作复杂的会话数据。
- 分布式缓存功能:Redis 作为一个高效的缓存解决方案,可以用于缓存会话数据,减轻后端服务器的负载。与传统的 Session 存储方式相比,使用 Redis 缓存会话数据可以大幅提高系统的性能和可扩展性。
- 可用性和可部署性:Redis 是一个强大而成熟的开源工具,有丰富的社区支持和活跃的开发者社区。它可以轻松地与各种编程语言和框架集成,并且可以在多个操作系统上运行。
PS:但是Redis费钱,而且增加了系统的复杂度
基于Redis实现短信验证码登录
从前面的分析来看,显然Redis是要优于Session的,但是Redis中有很多数据结构,我们应该选择哪种数据结构来存储用户信息才能够更优呢?可能大多数同学都会想到使用 String 类型的数据据结构,但是这里我推荐使用 Hash结构!
- Hash 结构与 String 结构类型的比较:
- String 数据结构是以 JSON 字符串的形式保存,更加直观,操作也更加简单,但是 JSON 结构会有很多非必须的内存开销,比如双引号、大括号,内存占用比 Hash 更高
- Hash 数据结构是以 Hash 表的形式保存,可以对单个字段进行CRUD,更加灵活
- Redis替代Session需要考虑的问题:
- 选择合适的数据结构,了解 Hash 比 String 的区别
- 选择合适的key,为key设置一个业务前缀,方便区分和分组,为key拼接一个UUID,避免key冲突防止数据覆盖
- 选择合适的存储粒度,对于验证码这类数据,一般设置TTL为3min即可,防止大量缓存数据的堆积,而对于用户信息这类数据可以稍微设置长一点,比如30min,防止频繁对Redis进行IO操作
短信验证登录
/*** 发送验证码** @param phone* @param session* @return*/@Overridepublic Result sendCode(String phone, HttpSession session) {// 1、判断手机号是否合法if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式不正确");}// 2、手机号合法,生成验证码,并保存到Redis中String code = RandomUtil.randomNumbers(6);stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code,RedisConstants.LOGIN_CODE_TTL, TimeUnit.MINUTES);// 3、发送验证码log.info("验证码:{}", code);return Result.ok();}/*** 用户登录** @param loginForm* @param session* @return*/@Overridepublic Result login(LoginFormDTO loginForm, HttpSession session) {String phone = loginForm.getPhone();String code = loginForm.getCode();// 1、判断手机号是否合法if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式不正确");}// 2、判断验证码是否正确String redisCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);if (code == null || !code.equals(redisCode)) {return Result.fail("验证码不正确");}// 3、判断手机号是否是已存在的用户User user = this.getOne(new LambdaQueryWrapper<User>().eq(User::getPhone, phone));if (Objects.isNull(user)) {// 用户不存在,需要注册user = createUserWithPhone(phone);}// 4、保存用户信息到Redis中,便于后面逻辑的判断(比如登录判断、随时取用户信息,减少对数据库的查询)UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);// 将对象中字段全部转成string类型,StringRedisTemplate只能存字符串类型的数据Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));String token = UUID.randomUUID().toString(true);String tokenKey = LOGIN_USER_KEY + token;stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);return Result.ok(token);}/*** 根据手机号创建用户并保存** @param phone* @return*/private User createUserWithPhone(String phone) {User user = new User();user.setPhone(phone);user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));this.save(user);return user;}
配置登录拦截器
单独配置一个拦截器用户刷新Redis中的token:在基于Session实现短信验证码登录时,我们只配置了一个拦截器,这里需要另外再配置一个拦截器专门用户刷新存入Redis中的 token,因为我们现在改用Redis了,为了防止用户在操作网站时突然由于Redis中的 token 过期,导致直接退出网站,严重影响用户体验。那为什么不把刷新的操作放到一个拦截器中呢,因为之前的那个拦截器只是用来拦截一些需要进行登录校验的请求,对于哪些不需要登录校验的请求是不会走拦截器的,刷新操作显然是要针对所有请求比较合理,所以单独创建一个拦截器拦截一切请求,刷新Redis中的Key
登录拦截器:
public class LoginInterceptor implements HandlerInterceptor {/*** 前置拦截器,用于判断用户是否登录*/@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 判断当前用户是否已登录if (ThreadLocalUtls.getUser() == null){// 当前用户未登录,直接拦截response.setStatus(HttpStatus.HTTP_UNAUTHORIZED);return false;}// 用户存在,直接放行return true;}
}
刷新token的拦截器:
public class RefreshTokenInterceptor implements HandlerInterceptor {// new出来的对象是无法直接注入IOC容器的(LoginInterceptor是直接new出来的)// 所以这里需要再配置类中注入,然后通过构造器传入到当前类中private StringRedisTemplate stringRedisTemplate;public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1、获取token,并判断token是否存在String token = request.getHeader("authorization");if (StrUtil.isBlank(token)){// token不存在,说明当前用户未登录,不需要刷新直接放行return true;}// 2、判断用户是否存在String tokenKey = LOGIN_USER_KEY + token;Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(tokenKey);if (userMap.isEmpty()){// 用户不存在,说明当前用户未登录,不需要刷新直接放行return true;}// 3、用户存在,则将用户信息保存到ThreadLocal中,方便后续逻辑处理,比如:方便获取和使用用户信息,Redis获取用户信息是具有侵入性的UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);ThreadLocalUtls.saveUser(BeanUtil.copyProperties(userMap, UserDTO.class));// 4、刷新token有效期stringRedisTemplate.expire(token, LOGIN_USER_TTL, TimeUnit.MINUTES);return true;}
}
将自定义的拦截器添加到SpringMVC的拦截器表中,使其生效:
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {// new出来的对象是无法直接注入IOC容器的(LoginInterceptor是直接new出来的)// 所以这里需要再配置类中注入,然后通过构造器传入到当前类中@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void addInterceptors(InterceptorRegistry registry) {// 添加登录拦截器registry.addInterceptor(new LoginInterceptor())// 设置放行请求.excludePathPatterns("/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/upload/**","/voucher/**").order(1); // 优先级默认都是0,值越大优先级越低// 添加刷新token的拦截器registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);}
}
店铺数据查询
-
什么是缓存?
缓存就是数据交换的缓冲区(称作Cache [ kæʃ ] ),是存贮数据的临时地方,一般读写性能较高。
根据 id 查询商铺缓存
/*** 根据id查询商铺数据** @param id* @return*/@Overridepublic Result queryById(Long id) {String key = CACHE_SHOP_KEY + id;// 1、从Redis中查询店铺数据String shopJson = stringRedisTemplate.opsForValue().get(key);Shop shop = null;// 2、判断缓存是否命中if (StrUtil.isNotBlank(shopJson)) {// 2.1 缓存命中,直接返回店铺数据shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 2.2 缓存未命中,从数据库中查询店铺数据shop = this.getById(id);// 4、判断数据库是否存在店铺数据if (Objects.isNull(shop)) {// 4.1 数据库中不存在,返回失败信息return Result.fail("店铺不存在");}// 4.2 数据库中存在,写入Redis,并返回店铺数据stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));return Result.ok(shop);}
对于店铺的详细数据,这种数据变化比较大,店家可能会随时修改店铺的相关信息(比如宣传语,店铺名等),所以对于这类变动较为频繁的数据,我们是直接存入Redis中,并且设置合适的有效期(后面还会进行优化,确保Redis和MySQL的数据一致性,以及解决缓存常见的三大问题)
查询店铺类型
对于店铺类型数据,一般变动会比较小,所以这里我们直接将店铺类型的数据持久化存储到Redis中
/*** 查询店铺的类型** @return*/@Overridepublic Result queryTypeList() {// 1、从Redis中查询店铺类型String key = CACHE_SHOP_TYPE_KEY + UUID.randomUUID().toString(true);String shopTypeJSON = stringRedisTemplate.opsForValue().get(key);List<ShopType> typeList = null;// 2、判断缓存是否命中if (StrUtil.isNotBlank(shopTypeJSON)) {// 2.1 缓存命中,直接返回缓存数据typeList = JSONUtil.toList(shopTypeJSON, ShopType.class);return Result.ok(typeList);}// 2.1 缓存未命中,查询数据库typeList = this.list(new LambdaQueryWrapper<ShopType>().orderByAsc(ShopType::getSort));// 3、判断数据库中是否存在该数据if (Objects.isNull(typeList)) {// 3.1 数据库中不存在该数据,返回失败信息return Result.fail("店铺类型不存在");}// 3.2 店铺数据存在,写入Redis,并返回查询的数据stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(typeList),CACHE_SHOP_TYPE_TTL, TimeUnit.MINUTES);return Result.ok(typeList);}
数据一致性问题
缓存的使用降低了后端负载,提高了读写的效率,降低了响应的时间,这么看来缓存是不是就一本万利呢?答案是否定的!并不是说缓存有这么多优点项目中就可以无脑使用缓存了,我们还需要考虑缓存带来的问题,比如:缓存的添加提高了系统的维护成本,同时也带来了数据一致性问题……总的来讲,系统使用引入缓存需要经过前期的测试、预算,判断引入缓存后带来的价值是否会超过引入缓存带来的代价
那么我们该如何解决数据一致性问题呢?首先我们需要明确数据一致性问题的主要原因是什么,从主要原因入手才是解决问题的关键!数据一致性的根本原因是 缓存和数据库中的数据不同步,那么我们该如何让 缓存 和 数据库 中的数据尽可能的即时同步?这就需要选择一个比较好的缓存更新策略了
-
常见的缓存更新策略:
-
内存淘汰(全自动)。利用Redis的内存淘汰机制实现缓存更新,Redis的内存淘汰机制是当Redis发现内存不足时,会根据一定的策略自动淘汰部分数据
Redis中常见的淘汰策略:
- noeviction(默认):当达到内存限制并且客户端尝试执行写入操作时,Redis 会返回错误信息,拒绝新数据的写入,保证数据完整性和一致性
- allkeys-lru:从所有的键中选择最近最少使用(Least Recently Used,LRU)的数据进行淘汰。即优先淘汰最长时间未被访问的数据
- allkeys-random:从所有的键中随机选择数据进行淘汰
- volatile-lru:从设置了过期时间的键中选择最近最少使用的数据进行淘汰
- volatile-random:从设置了过期时间的键中随机选择数据进行淘汰
- volatile-ttl:从设置了过期时间的键中选择剩余生存时间(Time To Live,TTL)最短的数据进行淘汰
-
超时剔除(半自动)。手动给缓存数据添加TTL,到期后Redis自动删除缓存
-
主动更新(手动)。手动编码实现缓存更新,在修改数据库的同时更新缓存
-
双写方案(Cache Aside Pattern):人工编码方式,缓存调用者在更新完数据库后再去更新缓存。使用困难,灵活度高。
1)读取(Read):当需要读取数据时,首先检查缓存是否存在该数据。如果缓存中存在,直接返回缓存中的数据。如果缓存中不存在,则从底层数据存储(如数据库)中获取数据,并将数据存储到缓存中,以便以后的读取操作可以更快地访问该数据。
2)写入(Write):当进行数据写入操作时,首先更新底层数据存储中的数据。然后,根据具体情况,可以选择直接更新缓存中的数据(使缓存与底层数据存储保持同步),或者是简单地将缓存中与修改数据相关的条目标记为无效状态(缓存失效),以便下一次读取时重新加载最新数据
使用双写方案需要考虑以下几个问题:
是使用更新缓存模式还是使用删除缓存模式?
-
更新缓存模式:每次更新数据库都更新缓存,无效写操作较多(不推荐使用)
假如我们执行上百次更新数据库操作,那么就要执行上百次写入缓存的操作,而在这期间并没有查询请求,那么这上百次写入缓存的操作就显得没有什么意义
-
删除缓存模式:更新数据时更新数据库并删除缓存,查询时更新缓存,无效写操作较少(推荐使用)
选择使用删除缓存模式,那么是先操作缓存还是先操作数据库?
-
先操作缓存:先删缓存,再更新数据库(不推荐使用,详细原因看P38)
当线程1删除缓存到更新数据库之间的时间段,会有其它线程进来查询数据,由于没有加锁,且前面的线程将缓存删除了,这就导致请求会直接打到数据库上,给数据库带来巨大压力。这个事件发生的概率很大,因为缓存的读写速度块,而数据库的读写较慢。
这种方式的不足之处:存在缓存击穿问题,且概率较大
-
先操作数据库:先更新数据库,再删缓存(推荐使用,详细原因看P38)
当线程1在查询缓存且未命中,此时线程1查询数据,查询完准备写入缓存时,由于没有加锁线程2乘虚而入,线程2在这期间对数据库进行了更新,此时线程1将旧数据返回了,出现了脏读,这个事件发生的概率很低,因为先是需要满足缓存未命中,且在写入缓存的那段事件内有一个线程进行更新操作,缓存的查询很快,这段空隙时间很小,所以出现脏读现象的概率也很低
这种方式的不足之处:存在脏读现象,但概率较小
选择先更新数据库,再删除缓存。那么如何保证缓存与数据库的操作的原子性(同时成功或失败)?
- 对于单体系统1,将缓存与数据库操作放在同一个事务中(当前项目就是一个单体项目,所以选择这种方式)
- 对于分布式系统2,利用TCC(Try-Confirm-Cancel)等分布式事务方案
-
-
-
读写穿透方案(Read/Write Through Pattern):将读取和写入操作首先在缓存中执行,然后再传播到数据存储
1)读取穿透(Read Through):当进行读取请求时,首先检查缓存。如果所请求的数据在缓存中找到,直接返回数据。如果缓存中没有找到数据,则将请求转发给数据存储以获取数据。获取到的数据随后存储在缓存中,然后返回给调用者。
2)写入穿透(Write Through):当进行写入请求时,首先将数据写入缓存。缓存立即将写操作传播到数据存储,确保缓存和数据存储之间的数据保持一致。这样保证了后续的读取请求从缓存中返回更新后的数据。
-
写回方案(Write Behind Caching Pattern):调用者只操作缓存,其他线程去异步处理数据库,实现最终一致
1)读取(Read):先检查缓存中是否存在数据,如果不存在,则从底层数据存储中获取数据,并将数据存储到缓存中。
2)写入(Write):先更新底层数据存储,然后将待写入的数据放入一个缓存队列中。在适当的时机,通过批量操作或异步处理,将缓存队列中的数据写入底层数据存储
-
-
-
主动更新策略中三种方案的比较:
- 双写方案 和 读写穿透方案 在写入数据时都会直接更新缓存,以保持缓存和底层数据存储的一致性。而 写回方案 延迟了缓存的更新操作,将数据先放入缓存队列,然后再进行批量或异步写入。
- 读写穿透方案 和 写回方案 相比,写回方案 具有更高的写入性能,因为它通过批量和异步操作减少了频繁的写入操作。但是 写回方案 带来了数据一致性的考虑,需要确保缓存和底层数据存储在某个时间点上保持一致,而 读写穿透方案 将数据库和缓存整合为一个服务,由服务来维护缓存与数据库的一致性,调用者无需关心数据一致性问题,降低了系统的可维护性,但是实现困难
-
主动更新策略中三种方案的应用场景:
- 双写方案 较适用于读多写少的场景,数据的一致性由应用程序主动管理
- 读写穿透方案 适用于数据实时性要求较高、对一致性要求严格的场景
- 写回方案 适用于追求写入性能的场景,对数据的实时性要求相对较低、可靠性也相对低
-
更新策略的应用场景:
- 对于低一致性需求,可以使用内存淘汰机制。例如店铺类型数据的查询缓存
- 对于高一致性需求,可以采用主动更新策略,并以超时剔除作为兜底方案。例如店铺详情数据查询的缓存
缓存主动更新策略的实现
上一节,我们了解了数据一致性问题,并了解了如何解决数据一致性问题的几种常见策略,最终经过我们的讨论得出采用缓存主动更新来解决数据一致性问题,是相较于其它两种方案更好的选择,同时也选择使用双写方案的删除缓存模式来减少线程安全问题发生的概率,采用TTL过期+内存淘汰机制作为兜底方案,同时将缓存和数据库的操作放到同一个事务来保障操作的原子性,现在就让我们来通过下面的案例进行实现
在启动类添加@EnableTransactionManagement
注解开启事务,
然后使用缓存主动更新策略(采用删除缓存模式,并且先操作数据库再操作缓存,同时添加事务保证数据库操作和缓存操作的原子性)解决数据一致性问题:
/*** 根据id查询商铺数据(查询时,重建缓存)** @param id* @return*/@Overridepublic Result queryById(Long id) {String key = CACHE_SHOP_KEY + id;// 1、从Redis中查询店铺数据String shopJson = stringRedisTemplate.opsForValue().get(key);Shop shop = null;// 2、判断缓存是否命中if (StrUtil.isNotBlank(shopJson)) {// 2.1 缓存命中,直接返回店铺数据shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 2.2 缓存未命中,从数据库中查询店铺数据shop = this.getById(id);// 4、判断数据库是否存在店铺数据if (Objects.isNull(shop)) {// 4.1 数据库中不存在,返回失败信息return Result.fail("店铺不存在");}// 4.2 数据库中存在,重建缓存,并返回店铺数据stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);return Result.ok(shop);}/*** 更新商铺数据(更新时,更新数据库,删除缓存)** @param shop* @return*/@Transactional@Overridepublic Result updateShop(Shop shop) {// 参数校验, 略// 1、更新数据库中的店铺数据boolean f = this.updateById(shop);if (!f){// 缓存更新失败,抛出异常,事务回滚throw new RuntimeException("数据库更新失败");}// 2、删除缓存f = stringRedisTemplate.delete(CACHE_SHOP_KEY + shop.getId());if (!f){// 缓存删除失败,抛出异常,事务回滚throw new RuntimeException("缓存删除失败");}return Result.ok();}
缓存穿透的解决方案
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
PS:本篇文章主要是记录实现代码,关于缓存穿透详情见这篇文章:Redis缓存常见问题以及解决方案
-
常见解决缓存穿透的解决方案:
-
缓存空对象
-
优点:实现简单,维护方便
-
缺点:额外的内存消耗,可能造成短期的不一致
-
-
布隆过滤
- 优点:内存占用较少,没有多余key
- 缺点:实现复杂,存在误判可能(有穿透的风险),无法删除数据
上面两种方式都是被动的解决缓存穿透方案,此外我们还可以采用主动的方案预防缓存穿透,比如:增强id的复杂度避免被猜测id规律、做好数据的基础格式校验、加强用户权限校验、做好热点参数的限流
-
这里使用方案一(缓存空对象)解决缓存穿透问题:
/*** 根据id查询商铺数据** @param id* @return*/@Overridepublic Result queryById(Long id) {String key = CACHE_SHOP_KEY + id;// 1、从Redis中查询店铺数据String shopJson = stringRedisTemplate.opsForValue().get(key);Shop shop = null;// 2、判断缓存是否命中if (StrUtil.isNotBlank(shopJson)) {// 2.1 缓存命中,直接返回店铺数据shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 2.2 缓存未命中,判断缓存中查询的数据是否是空字符串(isNotBlank把null和空字符串给排除了)if (Objects.nonNull(shopJson)){// 2.2.1 当前数据是空字符串(说明该数据是之前缓存的空对象),直接返回失败信息return Result.fail("店铺不存在");}// 2.2.2 当前数据是null,则从数据库中查询店铺数据shop = this.getById(id);// 4、判断数据库是否存在店铺数据if (Objects.isNull(shop)) {// 4.1 数据库中不存在,缓存空对象(解决缓存穿透),返回失败信息stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.SECONDS);return Result.fail("店铺不存在");}// 4.2 数据库中存在,重建缓存,并返回店铺数据stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);return Result.ok(shop);}
第一次查询数据库中和缓存中不存在的数据,请求经过了数据库,但是缓存了空字符串:
第二次查询(短期内),发先请求未经过数据库,通过Debug的方式可以发现请求直接再 2.2.1 处返回了
缓存雪崩的解决方案
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
PS:本篇文章主要是记录实现代码,关于缓存雪崩详情见这篇文章:Redis缓存常见问题以及解决方案
- 缓存雪崩的常见解决方案:
- 给不同的Key的TTL添加随机值
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略,比如快速失败机制,让请求尽可能打不到数据库上
- 给业务添加多级缓存
缓存击穿的解决方案
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
PS:本篇文章主要是记录实现代码,关于缓存击穿详情见这篇文章:Redis缓存常见问题以及解决方案
-
缓存击穿的常见解决方案:
- 互斥锁(时间换空间)
- 优点:内存占用小,一致性高,实现简单
- 缺点:性能较低,容易出现死锁
- 逻辑过期(空间换时间)
- 优点:性能高
- 缺点:内存占用较大,容易出现脏读
两者相比较,互斥锁更加易于实现,但是容易发生死锁,且锁导致并行变成串行,导致系统性能下降,逻辑过期实现起来相较复杂,且需要耗费额外的内存,但是通过开启子线程重建缓存,使原来的同步阻塞变成异步,提高系统的响应速度,但是容易出现脏读
- 互斥锁(时间换空间)
基于互斥锁解决缓存击穿
/*** 根据id查询商铺数据** @param id* @return*/@Overridepublic Result queryById(Long id) {String key = CACHE_SHOP_KEY + id;// 1、从Redis中查询店铺数据,并判断缓存是否命中Result result = getShopFromCache(key);if (Objects.nonNull(result)) {// 缓存命中,直接返回return result;}try {// 2、缓存未命中,需要重建缓存,判断能否能够获取互斥锁String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);if (!isLock) {// 2.1 获取锁失败,已有线程在重建缓存,则休眠重试Thread.sleep(50);return queryById(id);}// 2.2 获取锁成功,判断缓存是否重建,防止堆积的线程全部请求数据库(所以说双检是很有必要的)result = getShopFromCache(key);if (Objects.nonNull(result)) {// 缓存命中,直接返回return result;}// 3、从数据库中查询店铺数据,并判断数据库是否存在店铺数据Shop shop = this.getById(id);if (Objects.isNull(shop)) {// 数据库中不存在,缓存空对象(解决缓存穿透),返回失败信息stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.SECONDS);return Result.fail("店铺不存在");}// 4、数据库中存在,重建缓存,响应数据stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);return Result.ok(shop);}catch (Exception e){throw new RuntimeException("发生异常");} finally {// 5、释放锁(释放锁一定要记得放在finally中,防止死锁)unlock(key);}}/*** 从缓存中获取店铺数据* @param key* @return*/private Result getShopFromCache(String key) {String shopJson = stringRedisTemplate.opsForValue().get(key);// 判断缓存是否命中if (StrUtil.isNotBlank(shopJson)) {// 缓存数据有值,说明缓存命中了,直接返回店铺数据Shop shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 判断缓存中查询的数据是否是空字符串(isNotBlank把 null 和 空字符串 给排除了)if (Objects.nonNull(shopJson)) {// 当前数据是空字符串,说明缓存也命中了(该数据是之前缓存的空对象),直接返回失败信息return Result.fail("店铺不存在");}// 缓存未命中(缓存数据既没有值,又不是空字符串)return null;}/*** 获取锁** @param key* @return*/private boolean tryLock(String key) {Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);// 拆箱要判空,防止NPEreturn BooleanUtil.isTrue(flag);}/*** 释放锁** @param key*/private void unlock(String key) {stringRedisTemplate.delete(key);}
备注:
-
这里使用Redis中的
setnx
指令实现互斥锁,只有当值不存在时才能进行set
操作 -
锁的有效期更具体业务有关,需要灵活变动,一般锁的有效期是业务处理时长10~20倍
-
线程获取锁后,还需要查询缓存(也就是所谓的双检),这样才能够真正有效保障缓存不被击穿
测试
通过 Jmeter 进行压测,在5秒内发送2000个请求(qps高达400,也就是每秒钟发送400个请求),最终吞吐量是331.3(也就是每秒钟处理134.8个请求),系统性能还是比较好的
基于逻辑过期解决缓存击穿
所谓的逻辑过期,类似于逻辑删除,并不是真正意义上的过期,而是新增一个字段,用来标记key的过期时间,这样能能够避免key过期而被自动删除,这样数据就永不过期了,从根本上解决因为热点key过期导致的缓存击穿。一般搞活动时,比如抢优惠券,秒杀等场景,请求量比较大就可以使用逻辑过期,等活动一过就手动删除逻辑过期的数据
创建一个逻辑过期数据类:
@Data
public class RedisData {/*** 过期时间*/private LocalDateTime expireTime;/*** 缓存数据*/private Object data;
}
ShopService钟的代码:
/*** 缓存重建线程池*/public static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);/*** 根据id查询商铺数据** @param id* @return*/@Overridepublic Result queryById(Long id) {String key = CACHE_SHOP_KEY + id;// 1、从Redis中查询店铺数据,并判断缓存是否命中String shopJson = stringRedisTemplate.opsForValue().get(key);if (StrUtil.isBlank(shopJson)) {// 1.1 缓存未命中,直接返回失败信息return Result.fail("店铺数据不存在");}// 1.2 缓存命中,将JSON字符串反序列化未对象,并判断缓存数据是否逻辑过期RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);// 这里需要先转成JSONObject再转成反序列化,否则可能无法正确映射Shop的字段JSONObject data = (JSONObject) redisData.getData();Shop shop = JSONUtil.toBean(data, Shop.class);LocalDateTime expireTime = redisData.getExpireTime();if (expireTime.isAfter(LocalDateTime.now())) {// 当前缓存数据未过期,直接返回return Result.ok(shop);}// 2、缓存数据已过期,获取互斥锁,并且重建缓存String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);if (isLock) {// 获取锁成功,开启一个子线程去重建缓存CACHE_REBUILD_EXECUTOR.submit(() -> {try {this.saveShopToCache(id, CACHE_SHOP_LOGICAL_TTL);} finally {unlock(lockKey);}});}// 3、获取锁失败,再次查询缓存,判断缓存是否重建(这里双检是有必要的)shopJson = stringRedisTemplate.opsForValue().get(key);if (StrUtil.isBlank(shopJson)) {// 3.1 缓存未命中,直接返回失败信息return Result.fail("店铺数据不存在");}// 3.2 缓存命中,将JSON字符串反序列化未对象,并判断缓存数据是否逻辑过期redisData = JSONUtil.toBean(shopJson, RedisData.class);// 这里需要先转成JSONObject再转成反序列化,否则可能无法正确映射Shop的字段data = (JSONObject) redisData.getData();shop = JSONUtil.toBean(data, Shop.class);expireTime = redisData.getExpireTime();if (expireTime.isAfter(LocalDateTime.now())) {// 当前缓存数据未过期,直接返回return Result.ok(shop);}// 4、返回过期数据return Result.ok(shop);}/*** 将数据保存到缓存中** @param id 商铺id* @param expireSeconds 逻辑过期时间*/public void saveShopToCache(Long id, Long expireSeconds) {// 从数据库中查询店铺数据Shop shop = this.getById(id);// 封装逻辑过期数据RedisData redisData = new RedisData();redisData.setData(shop);redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));// 将逻辑过期数据存入Redis中stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));}/*** 获取锁** @param key* @return*/private boolean tryLock(String key) {Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);// 拆箱要判空,防止NPEreturn BooleanUtil.isTrue(flag);}/*** 释放锁** @param key*/private void unlock(String key) {stringRedisTemplate.delete(key);}
注意:逻辑过期一定要先进行数据预热,将我们热点数据加载到缓存中
数据预热可以编写一个CommandLineRunner
继承类,然后SpringBoot程序启动时就执行启动的run
方法,实现数据预热,或者像下面一样编写一个测试累,实现数据预热
/*** 预热店铺数据*/@Testpublic void testSaveShopToCache(){shopService.saveShopToCache(1L, RedisConstants.CACHE_SHOP_LOGICAL_TTL);}
备注:
- 逻辑过期时间根据具体业务而定,逻辑过期过长,会造成缓存数据的堆积,浪费内存,过短造成频繁缓存重建,降低性能,所以设置逻辑过期时间时需要实际测试和评估不同参数下的性能和资源消耗情况,可以通过观察系统的表现,在业务需求和性能要求之间找到一个平衡点
测试
先数据预热,由于预热数据的逻辑过期的时间我设置是 10s 钟,所以等待10s 后,在数据库中修改id为1的店铺数据,使用 Jmeter 进行压力测试(可以延迟一下重建缓存的子线程,比如延迟个1s钟,让效果更加明显),此时我们 11s 去查询,发现还是旧数据,等待 12s 左右再去测试,可以发现缓存数据发生了更新(因为此时缓存已经重建好了),所以这种逻辑过期的缺点很明显,容易出现脏读现象
可以发现5s钟发送2000个请求(qps是400),吞吐量是333,发先逻辑过期并没有我们想象中性能要比互斥锁方案高多少,,这不是测试的问题,而是Redis的问题,因为Redis的IO操作太快了,Redis重建缓存的几乎耗费的时间占比很少,所以说实验是检验真理的唯一标准,但是对于重建缓存耗时特别大的操作,比如批量重建大量的缓存数据,这个时候我觉得采用逻辑过期的耗时可能会比互斥锁快很多(具体我就没有测了🤣)
缓存工具类的封装
PS:方法1与方法3对应,负责常见的普通的缓存,用于解决缓存穿透;方法2与方法4对应,负责热点缓存,用于解决缓存击穿
使用工具类后,ShopServiceImpl的代码:
可以看到现在ShopServiceImpl中的代码就显得十分清爽干净了😄
/*** 根据id查询商铺数据** @param id* @return*/@Overridepublic Result queryById(Long id) {// 调用解决缓存穿透的方法
// Shop shop = cacheClient.handleCachePenetration(CACHE_SHOP_KEY, id, Shop.class,
// this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);
// if (Objects.isNull(shop)){
// return Result.fail("店铺不存在");
// }// 调用解决缓存击穿的方法Shop shop = cacheClient.handleCacheBreakdown(CACHE_SHOP_KEY, id, Shop.class,this::getById, CACHE_SHOP_TTL, TimeUnit.SECONDS);if (Objects.isNull(shop)) {return Result.fail("店铺不存在");}return Result.ok(shop);}
工具类:
PS:如果看不太懂的,可以参考前面的代码,比对着来看,我注释都没有改过
@Component
@Slf4j
public class CacheClient {private final StringRedisTemplate stringRedisTemplate;public CacheClient(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}/*** 将数据加入Redis,并设置有效期** @param key* @param value* @param timeout* @param unit*/public void set(String key, Object value, Long timeout, TimeUnit unit) {stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), timeout, unit);}/*** 将数据加入Redis,并设置逻辑过期时间** @param key* @param value* @param timeout* @param unit*/public void setWithLogicalExpire(String key, Object value, Long timeout, TimeUnit unit) {RedisData redisData = new RedisData();redisData.setData(value);// unit.toSeconds()是为了确保计时单位是秒redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(timeout)));stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), timeout, unit);}/*** 根据id查询数据(处理缓存穿透)** @param keyPrefix key前缀* @param id 查询id* @param type 查询的数据类型* @param dbFallback 根据id查询数据的函数* @param timeout 有效期* @param unit 有效期的时间单位* @param <T>* @param <ID>* @return*/public <T, ID> T handleCachePenetration(String keyPrefix, ID id, Class<T> type,Function<ID, T> dbFallback, Long timeout, TimeUnit unit) {String key = keyPrefix + id;// 1、从Redis中查询店铺数据String jsonStr = stringRedisTemplate.opsForValue().get(key);T t = null;// 2、判断缓存是否命中if (StrUtil.isNotBlank(jsonStr)) {// 2.1 缓存命中,直接返回店铺数据t = JSONUtil.toBean(jsonStr, type);return t;}// 2.2 缓存未命中,判断缓存中查询的数据是否是空字符串(isNotBlank把null和空字符串给排除了)if (Objects.nonNull(jsonStr)) {// 2.2.1 当前数据是空字符串(说明该数据是之前缓存的空对象),直接返回失败信息return null;}// 2.2.2 当前数据是null,则从数据库中查询店铺数据t = dbFallback.apply(id);// 4、判断数据库是否存在店铺数据if (Objects.isNull(t)) {// 4.1 数据库中不存在,缓存空对象(解决缓存穿透),返回失败信息this.set(key, "", CACHE_NULL_TTL, TimeUnit.SECONDS);return null;}// 4.2 数据库中存在,重建缓存,并返回店铺数据this.set(key, t, timeout, unit);return t;}/*** 缓存重建线程池*/public static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);/*** 根据id查询数据(处理缓存击穿)** @param keyPrefix key前缀* @param id 查询id* @param type 查询的数据类型* @param dbFallback 根据id查询数据的函数* @param timeout 有效期* @param unit 有效期的时间单位* @param <T>* @param <ID>* @return*/public <T, ID> T handleCacheBreakdown(String keyPrefix, ID id, Class<T> type,Function<ID, T> dbFallback, Long timeout, TimeUnit unit) {String key = keyPrefix + id;// 1、从Redis中查询店铺数据,并判断缓存是否命中String jsonStr = stringRedisTemplate.opsForValue().get(key);if (StrUtil.isBlank(jsonStr)) {// 1.1 缓存未命中,直接返回失败信息return null;}// 1.2 缓存命中,将JSON字符串反序列化未对象,并判断缓存数据是否逻辑过期RedisData redisData = JSONUtil.toBean(jsonStr, RedisData.class);// 这里需要先转成JSONObject再转成反序列化,否则可能无法正确映射Shop的字段JSONObject data = (JSONObject) redisData.getData();T t = JSONUtil.toBean(data, type);LocalDateTime expireTime = redisData.getExpireTime();if (expireTime.isAfter(LocalDateTime.now())) {// 当前缓存数据未过期,直接返回return t;}// 2、缓存数据已过期,获取互斥锁,并且重建缓存String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);if (isLock) {// 获取锁成功,开启一个子线程去重建缓存CACHE_REBUILD_EXECUTOR.submit(() -> {try {// 查询数据库T t1 = dbFallback.apply(id);// 将查询到的数据保存到Redisthis.setWithLogicalExpire(key, t1, timeout, unit);} finally {unlock(lockKey);}});}// 3、获取锁失败,再次查询缓存,判断缓存是否重建(这里双检是有必要的)jsonStr = stringRedisTemplate.opsForValue().get(key);if (StrUtil.isBlank(jsonStr)) {// 3.1 缓存未命中,直接返回失败信息return null;}// 3.2 缓存命中,将JSON字符串反序列化未对象,并判断缓存数据是否逻辑过期redisData = JSONUtil.toBean(jsonStr, RedisData.class);// 这里需要先转成JSONObject再转成反序列化,否则可能无法正确映射Shop的字段data = (JSONObject) redisData.getData();t = JSONUtil.toBean(data, type);expireTime = redisData.getExpireTime();if (expireTime.isAfter(LocalDateTime.now())) {// 当前缓存数据未过期,直接返回return t;}// 4、返回过期数据return t;}/*** 获取锁** @param key* @return*/private boolean tryLock(String key) {Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);// 拆箱要判空,防止NPEreturn BooleanUtil.isTrue(flag);}/*** 释放锁** @param key*/private void unlock(String key) {stringRedisTemplate.delete(key);}
}
小结
为了解决数据一致性问题,我们可以选择适当的缓存更新策略:
以缓存主动更新(双写方案+删除缓存模式+先操作数据库后操作缓存+事务)为主,超时剔除为辅
- 查询时,先查询缓存,缓存命中直接返回,缓存未命中查询数据库并重建缓存,返回查询结果
- 更新时,先修改数据删除缓存,使用事务保证缓存和数据操作两者的原子性
除了会遇到数据一致性问题意外,我们还会遇到缓存穿透、缓存雪崩、缓存击穿等问题
- 对于缓存穿透,我们采用了**缓存空对象**解决
- 对于缓存雪崩,我们分别演示了互斥锁(
setnx
实现方式)和逻辑过期两种方式解决
最后我们通过抽取出一个工具类,并且利用泛型编写几个通用方法,形成最终的形式
优惠券秒杀
自增ID存在的问题
当用户抢购时,就会生成订单并保存到tb_voucher_order
这张表中,而订单表如果使用数据库自增ID就存在一些问题:
-
id的规律性太明显,容易出现信息的泄露,被不怀好意的人伪造请求
-
受单表数据量的限制,MySQL中表能够存储的数据有限,会出现分库分表的情况,id不能够一直自增
当ID规律过于明显时,存在以下一些缺点:
- 安全性问题:如果ID规律太明显,可能会使系统容易受到恶意攻击,例如暴力破解等。攻击者可以通过分析ID规律来推断出其他用户的ID,从而进行未授权的访问或操纵。
- 隐私泄露风险:如果ID规律太明显,可能导致用户的个人信息或敏感数据被曝光。攻击者可以根据规律推测出其他用户的ID,并通过这些ID获取到相应的数据,进而侵犯用户的隐私。
- 数据可预测性:当ID规律太明显时,使用这些规律的攻击者可以很轻易地猜测出其他实体(如订单、交易等)的ID。这可能破坏系统的数据安全性和防伪能力。
- 扩展性受限:如果ID规律太明显,可能会对系统的扩展性造成一定影响。当系统需要处理大量并发操作时,如果ID规律过于明显,可能导致多个操作同时对同一资源进行竞争,从而增加冲突和性能瓶颈。
- 维护困难:当ID规律太明显时,系统可能需要额外的资源和机制来保持规律的更新和变化,以确保安全性和数据完整性。这会增加系统的复杂度,并给维护带来挑战。
在MySQL中,表最多可以存储的记录数取决于多个因素,包括数据库版本、操作系统和硬件配置等。下面是一些常见的限制:
- 行数限制:在MySQL 5.7及之前的版本中,InnoDB和XtraDB存储引擎的行数限制为最大约为64亿( 2 32 − 1 2^{32}-1 232−1),即 4 , 294 , 967 , 295 4,294,967,295 4,294,967,295行。而在MySQL 8.0及以后的版本中,它们的行数限制可达到理论上的最大值,大约是1844万亿( 2 64 − 1 2^{64}-1 264−1)行。
- 数据库文件大小限制:每个InnoDB表的存储大小受到所使用文件系统的限制。对于InnoDB表,默认情况下,数据库文件的大小限制取决于操作系统和文件系统,通常在几TB或更高。但是,这也可能受到特定的操作系统和文件系统的限制。
- 硬件资源限制:实际上,表的记录数还受到可用硬件资源,如磁盘空间、内存和处理能力的限制。当数据库文件较大时,磁盘空间变得关键,而在执行查询时,内存和处理能力可影响读写性能。
业界流传是500万行。超过500万行就要考虑分表分库了。阿里巴巴《Java 开发手册》提出单表行数超过 500 万行或者单表容量超过 2GB,就需要考虑分库分表了
那么该如何解决呢?我们需要使用分布式ID(也可以叫全局唯一ID),分布式ID满足以下特点:
- 全局唯一性:分布式ID保证在整个分布式系统中唯一性,不会出现重复的标识符。这对于区分和追踪系统中的不同实体非常重要。
- 高可用性:分布式ID生成器通常被设计为高可用的组件,可以通过水平扩展、冗余备份或集群部署来确保服务的可用性。即使某个节点或组件发生故障,仍然能够正常生成唯一的ID标识符。
- 安全性:分布式ID生成器通常是独立于应用程序和业务逻辑的。它们被设计为一个单独的组件或服务,可以被各种应用程序和服务所共享和使用,使得各个应用程序之间的ID生成过程互不干扰。
- 高性能:分布式ID生成器通常要求在很短的时间内生成唯一的标识符。为了实现低延迟,设计者通常采用高效的算法和数据结构,以及优化的网络通信和存储策略。
- 递增性:分布式ID通常可以被设计成可按时间顺序排序,以便更容易对生成的ID进行索引、检索或排序操作。这对于一些场景,如日志记录和事件溯源等,非常重要。
分布式ID的实现
分布式ID的实现方式:
- UUID
- Redis自增
- 数据库自增
- snowflake算法(雪花算法)
这里我们使用自定义的方式实现:时间戳+序列号+数据库自增
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息,比如时间戳、UUID、业务关键词
- 符号位:1bit,永远为0(表示正数)
- 时间戳:31bit,以秒为单位,可以使用69年( 2 31 / 3600 / 24 / 365 ≈ 69 2^{31}/3600/24/365≈69 231/3600/24/365≈69)
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
分布式ID生成器:
@Component
public class RedisIdWorker {@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200;/*** 序列化位数*/private static final int COUNT_BITS = 32;/*** 生成分布式ID* @param keyPrefix* @return*/public long nextId(String keyPrefix){// 1、生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2、生成序列号// 以当天的时间戳为key,防止一直自增下去导致超时,这样每天的极限都是 2^{31}String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));Long count = stringRedisTemplate.opsForValue().increment(ID_PREFIX + keyPrefix + ":" + date);// 3、拼接并返回return timestamp << COUNT_BITS | count;}public static void main(String[] args) {LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);long second = time.toEpochSecond(ZoneOffset.UTC);System.out.println("second = " + second);}
}
测试类:
@SpringBootTest
public class RedisIdWorkerTest {@Resourceprivate RedisIdWorker redisIdWorker;private ExecutorService es = Executors.newFixedThreadPool(500);/*** 测试分布式ID生成器的性能,以及可用性*/@Testpublic void testNextId() throws InterruptedException {// 使用CountDownLatch让线程同步等待CountDownLatch latch = new CountDownLatch(300);// 创建线程任务Runnable task = () -> {for (int i = 0; i < 100; i++) {long id = redisIdWorker.nextId("order");System.out.println("id = " + id);}// 等待次数-1latch.countDown();};long begin = System.currentTimeMillis();// 创建300个线程,每个线程创建100个id,总计生成3w个idfor (int i = 0; i < 300; i++) {es.submit(task);}// 线程阻塞,直到计数器归0时才全部唤醒所有线程latch.await();long end = System.currentTimeMillis();System.out.println("生成3w个id共耗时" + (end - begin) + "ms");}
}
优惠券秒杀接口实现
前提:需要先有优惠券,打开数据库然后添加秒杀券(普通券抢购功能前端未实现)
业务流程:
/*** 抢购秒杀券** @param voucherId* @return*/@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {// 1、查询秒杀券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2、判断秒杀券是否合法if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 秒杀券的开始时间在当前时间之后return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 秒杀券的结束时间在当前时间之前return Result.fail("秒杀已结束");}if (voucher.getStock() < 1) {return Result.fail("秒杀券已抢空");}// 5、秒杀券合法,则秒杀券抢购成功,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).setSql("stock = stock -1"));if (!flag){throw new RuntimeException("秒杀券扣减失败");}// 6、秒杀成功,创建对应的订单,并保存到数据库VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);voucherOrder.setId(orderId);voucherOrder.setUserId(ThreadLocalUtls.getUser().getId());voucherOrder.setVoucherId(voucherOrder.getId());flag = this.save(voucherOrder);if (!flag){throw new RuntimeException("创建秒杀券订单失败");}// 返回订单idreturn Result.ok(orderId);}
单体下一人多单超卖问题
上一节我们通过分布式ID+事务成功完成了优惠券秒杀功能,并且在测试后发现逻辑跑通了,看上去已经成功的解决了秒杀优惠券功能。但是前面我们只是正常的测试,那如果换到高并发的场景下能否成功解决?现在就让我们使用 Jmeter 来进行压力测试看看吧!
经过测试,可以发现在qps400时,异常率高达95.80%,也就是说2000个请求只有84个请求正常响应,其它的1916个请求失败,这是很糟糕的,如果发生在公司的系统(特别是大公司)那将是不可估量的损失!经过多次压测,库存居然还惊现了负值!
为什么会产生超卖问题呢?
线程1查询库存,发现库存充足,创建订单,然后准备对库存进行扣减,但此时线程2和线程3也进行查询,同样发现库存充足,然后线程1执行完扣减操作后,库存变为了0,线程2和线程3同样完成了库存扣减操作,最终导致库存变成了负数!这就是超卖问题的完整流程
那么我们该如何有效防止超卖问题的发生呢,以下提供几种常见的解决方案
-
超卖问题的常见解决方案:
- 悲观锁,认为线程安全问题一定会发生,因此操作数据库之前都需要先获取锁,确保线程串行执行。常见的悲观锁有:
synchronized
、lock
- 乐观锁,认为线程安全问题不一定发生,因此不加锁,只会在更新数据库的时候去判断有没有其它线程对数据进行修改,如果没有修改则认为是安全的,直接更新数据库中的数据即可,如果修改了则说明不安全,直接抛异常或者等待重试。常见的实现方式有:版本号法、CAS操作、乐观锁算法
- 悲观锁,认为线程安全问题一定会发生,因此操作数据库之前都需要先获取锁,确保线程串行执行。常见的悲观锁有:
-
悲观锁和乐观锁的比较
- 悲观锁比乐观锁的性能低:悲观锁需要先加锁再操作,而乐观锁不需要加锁,所以乐观锁通常具有更好的性能。
- 悲观锁比乐观锁的冲突处理能力低:悲观锁在冲突发生时直接阻塞其他线程,乐观锁则是在提交阶段检查冲突并进行重试。
- 悲观锁比乐观锁的并发度低:悲观锁存在锁粒度较大的问题,可能会限制并发性能;而乐观锁可以实现较高的并发度。
- 应用场景:两者都是互斥锁,悲观锁适合写入操作较多、冲突频繁的场景;乐观锁适合读取操作较多、冲突较少的场景。
拓展:CAS
CAS(Compare and Swap)是一种并发编程中常用的原子操作,用于解决多线程环境下的数据竞争问题。它是乐观锁算法的一种实现方式。
CAS操作包含三个参数:内存地址V、旧的预期值A和新的值B。CAS的执行过程如下:
- 比较(Compare):将内存地址V中的值与预期值A进行比较。
- 判断(Judgment):如果相等,则说明当前值和预期值相等,表示没有发生其他线程的修改。
- 交换(Swap):使用新的值B来更新内存地址V中的值。
CAS操作是一个原子操作,意味着在执行过程中不会被其他线程中断,保证了线程安全性。如果CAS操作失败(即当前值与预期值不相等),通常会进行重试,直到CAS操作成功为止。
CAS操作适用于精细粒度的并发控制,可以避免使用传统的加锁机制带来的性能开销和线程阻塞。然而,CAS操作也存在一些限制和注意事项:
- ABA问题:CAS操作无法感知到对象值从A变为B又变回A的情况,可能会导致数据不一致。为了解决ABA问题,可以引入版本号或标记位等机制。
- 自旋开销:当CAS操作失败时,需要不断地进行重试,会占用CPU资源。如果重试次数过多或者线程争用激烈,可能会引起性能问题。
- 并发性限制:如果多个线程同时对同一内存地址进行CAS操作,只有一个线程的CAS操作会成功,其他线程需要重试或放弃操作。
在Java中,提供了相关的CAS操作支持,如AtomicInteger
、AtomicLong
、AtomicReference
等类,可以实现基于CAS操作的线程安全操作。
乐观锁解决一人多单超卖问题
-
实现方式一:版本号法
首先我们要为 tb_seckill_voucher 表新增一个版本号字段 version ,线程1查询完库存,在进行库存扣减操作的同时将版本号+1,线程2在查询库存时,同时查询出当前的版本号,发现库存充足,也准备执行库存扣减操作,但是需要判断当前的版本号是否是之前查询时的版本号,结果发现版本号发生了改变,这就说明数据库中的数据已经发生了修改,需要进行重试(或者直接抛异常中断)
-
实现方式二:CAS法
CAS法类似与版本号法,但是不需要另外在添加一个 version 字段,而是直接使用库存替代版本号,线程1查询完库存后进行库存扣减操作,线程2在查询库存时,发现库存充足,也准备执行库存扣减操作,但是需要判断当前的库存是否是之前查询时的库存,结果发现库存数量发生了改变,这就说明数据库中的数据已经发生了修改,需要进行重试(或者直接抛异常中断)
综上所述,使用CAS法要更加好,能够避免额外的内存开销,下面就让我们用代码来实现吧(●ˇ∀ˇ●)
// 5、秒杀券合法,则秒杀券抢购成功,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).eq(SeckillVoucher::getStock, voucher.getStock()).setSql("stock = stock -1"));
可以看到实现起来也是相当简单,只需要添加一个eq(SeckillVoucher::getStock, voucher.getStock())
即可,现在我们再来使用 Jmeter测试
qps200,异常90%,吞吐量1652.9
这又是什么原因呢?这就是乐观锁的弊端,我们只要发现数据修改就直接终止操作了,我们只需要修改一下判断条件,即只要库存大于0就可以进行修改,而不是库存数据修改我们就终止操作
// 5、秒杀券合法,则秒杀券抢购成功,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0).setSql("stock = stock -1"));
qps200,异常50%,吞吐量498.8
单体下的一人一单超卖问题
/*** 抢购秒杀券** @param voucherId* @return*/@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {// 1、查询秒杀券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2、判断秒杀券是否合法if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 秒杀券的开始时间在当前时间之后return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 秒杀券的结束时间在当前时间之前return Result.fail("秒杀已结束");}if (voucher.getStock() < 1) {return Result.fail("秒杀券已抢空");}// 3、判断当前用户是否是第一单int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, ThreadLocalUtls.getUser().getId()));if (count >= 1) {// 当前用户不是第一单return Result.fail("用户已购买");}// 4、用户是第一单,可以下单,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0).setSql("stock = stock -1"));if (!flag) {throw new RuntimeException("秒杀券扣减失败");}// 5、创建对应的订单,并保存到数据库VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);voucherOrder.setId(orderId);voucherOrder.setUserId(ThreadLocalUtls.getUser().getId());voucherOrder.setVoucherId(voucherOrder.getId());flag = this.save(voucherOrder);if (!flag) {throw new RuntimeException("创建秒杀券订单失败");}// 6、返回订单idreturn Result.ok(orderId);}
}
现在是不是可以成功完成了一人一单的要求呢?让我们来使用 Jmeter 测一下吧
通过测试,发现并没有达到我们想象中的目标,一个人只能购买一次,但是发现一个用户居然能够购买8次。这说明还是存在超卖问题!
问题原因:出现这个问题的原因和前面库存为负数数的情况是一样的,线程1查询当前用户是否有订单,当前用户没有订单准备下单,此时线程2也查询当前用户是否有订单,由于线程1还没有完成下单操作,线程2同样发现当前用户未下单,也准备下单,这样明明一个用户只能下一单,结果下了两单,也就出现了超卖问题
解决方案:一般这种超卖问题可以使用下面两种常见的解决方案
- 悲观锁
- 乐观锁
悲观锁解决超卖问题
乐观锁需要判断数据是否修改,而当前是判断当前是否存在,所以无法像解决库存超卖一样使用CAS机制,但是可以使用版本号法,但是版本号法需要新增一个字段,所以这里为了方便,就直接演示使用悲观锁解决超卖问题
代码实现:
/*** 抢购秒杀券** @param voucherId* @return*/@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {// 1、查询秒杀券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2、判断秒杀券是否合法if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 秒杀券的开始时间在当前时间之后return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 秒杀券的结束时间在当前时间之前return Result.fail("秒杀已结束");}if (voucher.getStock() < 1) {return Result.fail("秒杀券已抢空");}// 3、创建订单Long userId = ThreadLocalUtls.getUser().getId();synchronized (userId.toString().intern()) {// 创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(userId, voucherId);}}/*** 创建订单** @param userId* @param voucherId* @return*/@Transactionalpublic Result createVoucherOrder(Long userId, Long voucherId) {
// synchronized (userId.toString().intern()) {// 1、判断当前用户是否是第一单int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId));if (count >= 1) {// 当前用户不是第一单return Result.fail("用户已购买");}// 2、用户是第一单,可以下单,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0).setSql("stock = stock -1"));if (!flag) {throw new RuntimeException("秒杀券扣减失败");}// 3、创建对应的订单,并保存到数据库VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);voucherOrder.setId(orderId);voucherOrder.setUserId(ThreadLocalUtls.getUser().getId());voucherOrder.setVoucherId(voucherOrder.getId());flag = this.save(voucherOrder);if (!flag) {throw new RuntimeException("创建秒杀券订单失败");}// 4、返回订单idreturn Result.ok(orderId);
// }}
实现细节:
-
锁的范围尽量小。
synchronized
尽量锁代码块,而不是方法,锁的范围越大性能越低 -
锁的对象一定要是一个不变的值。我们不能直接锁
Long
类型的 userId,每请求一次都会创建一个新的 userId 对象,synchronized 要锁不变的值,所以我们要将 Long 类型的 userId 通过 toString()方法转成String
类型的 userId,toString()
方法底层(可以点击去看源码)是直接 new 一个新的String对象,显然还是在变,所以我们要使用intern()
方法从常量池中寻找与当前 字符串值一致的字符串对象,这就能够保障一个用户 发送多次请求,每次请求的 userId 都是不变的,从而能够完成锁的效果(并行变串行) -
我们要锁住整个事务,而不是锁住事务内部的代码。如果我们锁住事务内部的代码会导致其它线程能够进入事务,当我们事务还未提交,锁一旦释放,仍然会存在超卖问题
-
Spring的
@Transactional
注解要想事务生效,必须使用动态代理。Service中一个方法中调用另一个方法,另一个方法使用了事务,此时会导致@Transactional
失效,所以我们需要创建一个代理对象,使用代理对象来调用方法。这篇文章对@Transactional注解事务失效的常见场景进行了一个总结:【Java面试篇】Spring中@Transactional注解事务失效的常见场景
让代理对象生效的步骤:
①引入AOP依赖,动态代理是AOP的常见实现之一
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>
②暴露动态代理对象,默认是关闭的
@EnableAspectJAutoProxy(exposeProxy = true)
集群下的一人一单超卖问题
1)搭建集群并实现负载均衡
首先,在IDEA中启动两个SpringBoot程序,一个端口号是8081,另一个端口是8082:
然后在Nginx中配置负载均衡:
这里遇到bug2
2)准备两个接口,用于模拟集群下的多用户重复下单
在发送请求之前,现在锁的内部打一个断点
3)使用接口发送请求
由于Nginx负载均衡(轮询策略)的原因,请求1会被8081接收,请求2会被8082接收
可以看到,两者都进入了锁的内部,这个synchronized
锁形同虚设,这是由于synchronized
是本地锁,只能提供线程级别的同步,每个JVM中都有一把synchronized锁,不能跨 JVM 进行上锁,当一个线程进入被 synchronized
关键字修饰的方法或代码块时,它会尝试获取对象的内置锁(也称为监视器锁)。如果该锁没有被其他线程占用,则当前线程获得锁,可以继续执行代码;否则,当前线程将进入阻塞状态,直到获取到锁为止。而现在我们是创建了两个节点,也就意味着有两个JVM,所以synchronized
会失效!
从而出现超卖问题:
分布式锁
- 分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁
前面sychronized
锁失效的原因是由于每一个JVM都有一个独立的锁监视器,用于监视当前JVM中的sychronized
锁,所以无法保障多个集群下只有一个线程访问一个代码块。所以我们直接将使用一个分布锁,在整个系统的全局中设置一个锁监视器,从而保障不同节点的JVM都能够识别,从而实现集群下只允许一个线程访问一个代码块
-
分布式锁的特点:
- 多线程可见。
- 互斥。分布式锁必须能够确保在任何时刻只有一个节点能够获得锁,其他节点需要等待。
- 高可用。分布式锁应该具备高可用性,即使在网络分区或节点故障的情况下,仍然能够正常工作。(容错性)当持有锁的节点发生故障或宕机时,系统需要能够自动释放该锁,以确保其他节点能够继续获取锁。
- 高性能。分布式锁需要具备良好的性能,尽可能减少对共享资源的访问等待时间,以及减少锁竞争带来的开销。
- 安全性。(可重入性)如果一个节点已经获得了锁,那么它可以继续请求获取该锁而不会造成死锁。(锁超时机制)为了避免某个节点因故障或其他原因无限期持有锁而影响系统正常运行,分布式锁通常应该设置超时机制,确保锁的自动释放。
……
-
分布式锁的常见实现方式:
- 基于关系数据库:可以利用数据库的事务特性和唯一索引来实现分布式锁。通过向数据库插入一条具有唯一约束的记录作为锁,其他进程在获取锁时会受到数据库的并发控制机制限制。
- 基于缓存(如Redis):使用分布式缓存服务(如Redis)提供的原子操作来实现分布式锁。通过将锁信息存储在缓存中,其他进程可以通过检查缓存中的锁状态来判断是否可以获取锁。
- 基于ZooKeeper:ZooKeeper是一个分布式协调服务,可以用于实现分布式锁。通过创建临时有序节点,每个请求都会尝试创建一个唯一的节点,并检查自己是否是最小节点,如果是,则表示获取到了锁。
- 基于分布式算法:还可以利用一些分布式算法来实现分布式锁,例如Chubby、DLM(Distributed Lock Manager)等。这些算法通过在分布式系统中协调进程之间的通信和状态变化,实现分布式锁的功能。
-
setnx
指令的特点:setnx只能设置key不存在的值,值不存在设置成功,返回 1 ;值存在设置失败,返回 0 -
获取锁:
-
方式一:
# 添加锁 setnx [key] [value] # 为锁设置过期时间,超时释放,避免死锁 expire [key] [time]
-
方式二:这种方式更加推荐,因为将上面两个指令变成一个指令,从而保障指令的原子性3
# 添加锁 set [key] [value] ex [time] nx
-
-
释放锁:
# 释放锁(除了使用del手动释放,还可超时释放) del [key]
分布式锁解决超卖问题
由于本项目是专门学习Redis的,所以我们这里将会使用Redis的
setnx
指令实现分布式锁解决超卖问题
1)创建分布式锁:
public class SimpleRedisLock implements Lock {/*** RedisTemplate*/private StringRedisTemplate stringRedisTemplate;/*** 锁的名称*/private String name;public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}/*** 获取锁** @param timeoutSec 超时时间* @return*/@Overridepublic boolean tryLock(long timeoutSec) {String id = Thread.currentThread().getId() + "";// SET lock:name id EX timeoutSec NXBoolean result = stringRedisTemplate.opsForValue().setIfAbsent("lock:" + name, id, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(result);}/*** 释放锁*/@Overridepublic void unlock() {stringRedisTemplate.delete("lock:" + name);}
}
2)使用分布式锁。改造前面VoucherOrderServiceImpl中的代码,将之前使用sychronized
锁的地方,改成我们自己实现的分布式锁:
// 3、创建订单(使用分布式锁)Long userId = ThreadLocalUtls.getUser().getId();SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);boolean isLock = lock.tryLock(1200);if (!isLock) {// 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)return Result.fail("一人只能下一单");}try {// 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(userId, voucherId);} finally {lock.unlock();}
PS:完整代码参考前面
同样,重启程序,然后在postman中使用同一个用户的token发送两次请求,可以发现只有有一个用户获取锁成功
实现细节:
try...finally...
确保发生异常时锁能够释放,注意这给地方不要使用catch
,A事务方法内部调用B事务方法,A事务方法不能够直接catch,否则会导致事务失效,详情可以参考这篇文章:【Java面试篇】Spring中@Transactional注解事务失效的常见场景
分布式锁优化
分布式锁优化1
本次优化主要解决了锁超时释放出现的超卖问题
上一节,我们实现了一个简单的分布式锁,但是会存在一个问题:当线程1获取锁后,由于业务阻塞,线程1的锁超时释放了,这时候线程2趁虚而入拿到了锁,然后此时线程1业务完成了,然后把线程2刚刚获取的锁给释放了,这时候线程3又趁虚而入拿到了锁,这就导致又出现了超卖问题!(但是这个在小项目(并发数不高)中出现的概率比较低,在大型项目(并发数高)情况下是有一定概率的)
备注:我们可以把锁的有效期降低一点,这样就能够测试上面哪种情况了(●’◡’●)
如何解决呢?我们为分布式锁添加一个线程标识,在释放锁时判断当前锁是否是自己的锁,是自己的就直接释放,不是自己的就不释放锁,从而解决多个线程同时获得锁的情况导致出现超卖
我们只需要修改一下锁的实现,业务代码不需要修改:
package com.hmdp.utils.lock.impl;import cn.hutool.core.lang.UUID;
import com.hmdp.utils.lock.Lock;
import org.springframework.data.redis.core.StringRedisTemplate;import java.util.concurrent.TimeUnit;/*** @author ghp* @title* @description*/
public class SimpleRedisLock implements Lock {/*** RedisTemplate*/private StringRedisTemplate stringRedisTemplate;/*** 锁的名称*/private String name;/*** key前缀*/public static final String KEY_PREFIX = "lock:";/*** ID前缀*/public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}/*** 获取锁** @param timeoutSec 超时时间* @return*/@Overridepublic boolean tryLock(long timeoutSec) {String threadId = ID_PREFIX + Thread.currentThread().getId() + "";// SET lock:name id EX timeoutSec NXBoolean result = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(result);}/*** 释放锁*/@Overridepublic void unlock() {// 判断 锁的线程标识 是否与 当前线程一致String currentThreadFlag = ID_PREFIX + Thread.currentThread().getId();String redisThreadFlag = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);if (currentThreadFlag != null || currentThreadFlag.equals(redisThreadFlag)) {// 一致,说明当前的锁就是当前线程的锁,可以直接释放stringRedisTemplate.delete(KEY_PREFIX + name);}// 不一致,不能释放}
}
分布式锁优化2
本次优化主要解决了释放锁时的原子性问题。说到底也是锁超时释放的问题
在上一节中,我们通过给锁添加一个线程标识,并且在释放锁时添加一个判断,从而防止锁超时释放产生的超卖问题,一定程度上解决了超卖问题,但是仍有可能发生超卖问题(出现超卖概率更低了):当线程1获取锁,执行完业务然后并且判断完当前锁是自己的锁时,但就在此时发生了阻塞,结果锁被超时释放了,线程2立马就趁虚而入了,获得锁执行业务,但就在此时线程1阻塞完成,由于已经判断过锁,已经确定锁是自己的锁了,于是直接就删除了锁,结果删的是线程2的锁,这就又导致线程3趁虚而入了,从而继续发生超卖问题
备注:我们可以在判断删除锁的那行代码上打一个断点,然后user1发送一个请求,获取锁,手动把锁删了,模拟锁超时释放,然后使用user2发送一个请求,成功获取锁,从而模拟上诉过程,检验超卖问题
PS:虽然这个情况发生的概率较低,但是根据墨菲定律,我们最好不要抱有侥幸心理,不然最终我们会在这个细微的问题上付诸沉重的代价!你可能还会想,判断锁和释放锁在同一个方法中,并且两者之间没有别的代码,为什么会发生阻塞呢?JVM的垃圾回收机制会导致短暂的阻塞(我个人感觉这种情况发生的概率真的不高,但是我也没有实际接触过真正的大型高并发项目,所以具体也只能靠揣摩)
那么我们该如何保障 判断锁 和 释放锁 这连段代码的原子性呢?答案是使用Lua脚本,关于Lua脚本相关知识可以参考这篇文章:Lua快速入门笔记客
那么Lua脚本是如何确保原子性的呢?Redis使用(支持)相同的Lua解释器,来运行所有的命令。Redis还保证脚本以原子方式执行:在执行脚本时,不会执行其他脚本或Redis命令。这个语义类似于MULTI
(开启事务)/EXEC
(触发事务,一并执行事务中的所有命令)。从所有其他客户端的角度来看,脚本的效果要么仍然不可见,要么已经完成。
注意:虽然Redis在单个Lua脚本的执行期间会暂停其他脚本和Redis命令,以确保脚本的执行是原子的,但如果Lua脚本本身出错,那么无法完全保证原子性。也就是说Lua脚本中的Redis指令出错,会发生回滚以确保原子性,但Lua脚本本身出错就无法保障原子性
代码实现
释放锁的业务流程是这样的:
-
获取锁中的线程标示
-
判断是否与指定的标示(当前线程标示)一致
-
如果一致则释放锁(删除)
-
如果不一致则什么都不做
注意:在IDEA中编写Lua脚本,需要先下载一个Lua脚本插件**Tarantool-EmmyLua
**
编写Lua脚本:
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by ghp.
--- DateTime: 2023/7/13 16:19
---
-- 比较缓存中的线程标识与当前线程标识是否一致
if (redis.call('get', KEYS[1]) == ARGV[1]) then-- 一致,直接删除return redis.call('del', KEYS[1])
end
-- 不一致,返回0
return 0
编写Java代码,使用Lua实现释放锁:
package com.hmdp.utils.lock.impl;import cn.hutool.core.lang.UUID;
import com.hmdp.utils.lock.Lock;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Collections;
import java.util.concurrent.TimeUnit;/*** @author ghp* @title* @description*/
public class SimpleRedisLock implements Lock {/*** RedisTemplate*/private StringRedisTemplate stringRedisTemplate;/*** 锁的名称*/private String name;/*** key前缀*/private static final String KEY_PREFIX = "lock:";/*** ID前缀*/private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}/*** 获取锁** @param timeoutSec 超时时间* @return*/@Overridepublic boolean tryLock(long timeoutSec) {String threadId = ID_PREFIX + Thread.currentThread().getId() + "";// SET lock:name id EX timeoutSec NXBoolean result = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(result);}/*** 加载Lua脚本*/private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}/*** 释放锁*/@Overridepublic void unlock() {// 执行lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}
}
业务代码不需要变动,锁的使用方法也不需要改变,因为我们之改动了锁的实现,使用方式没有改变
现在我们的分布式锁满足了:
- 多线程可见,将锁放到Redis中,所有的JVM都可以同时看到
- 互斥,
set ex nx
指令互斥 - 高可用,层层优化,即使是特别极端的情况下照样可以防止超卖
- 高性能,Redis的IO速度很快,Lua脚本的性能也很快
- 安全性,这个不用多说了,通过给锁夹线程标识+Lua封装Redis指令充分保障了线程安全,不那么容易出现并发安全问题,同时采用超时释放避免死锁
Redisson
经过优化1和优化2,我们实现的分布式锁已经达到生产可用级别了,但是还不够完善,比如:
-
分布式锁不可重入:不可重入是指同一线程不能重复获取同一把锁。比如,方法A中调用方法B,方法A需要获取分布式锁,方法B同样需要获取分布式锁,线程1进入方法A获取了一次锁,进入方法B又获取一次锁,由于锁不可重入,所以就会导致死锁
-
分布式锁不可重试:获取锁只尝试一次就返回false,没有重试机制,这会导致数据丢失,比如线程1获取锁,然后要将数据写入数据库,但是当前的锁被线程2占用了,线程1直接就结束了而不去重试,这就导致数据发生了丢失
-
分布式锁超时释放:超市释放机机制虽然一定程度避免了死锁发生的概率,但是如果业务执行耗时过长,期间锁就释放了,这样存在安全隐患。锁的有效期过短,容易出现业务没执行完就被释放,锁的有效期过长,容易出现死锁,所以这是一个大难题!
我们可以设置一个较短的有效期,但是加上一个 心跳机制 和 自动续期:在锁被获取后,可以使用心跳机制并自动续期锁的持有时间。通过定期发送心跳请求,显示地告知其他线程或系统锁还在使用中,同时更新锁的过期时间。如果某个线程持有锁的时间超过了预设的有效时间,其他线程可以尝试重新获取锁。
-
主从一致性问题:如果Redis提供了主从集群,主从同步存在延迟,线程1获取了锁
我们如果想要更进一步优化分布式锁,当然是可以的,但是没必要,除非是迫不得已,我们完全可以直接使用已经造好的轮子,比如:Redisson。Redssion是一个十分成熟的Redis框架,功能也很多,比如:分布式锁和同步器、分布式对象、分布式集合、分布式服务,各种Redis实现分布式的解决方案。简而言之Redisson就是一个使用Redis解决分布式问题的方案的集合,当然它不仅仅是解决分布式相关问题,还包含其它的一些问题。
所以说分布式锁的究极优化就是使用别人造好的轮子🤣
Redisson实现分布式锁
1)引入Redisson依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
2)配置Redisson客户端
@Configuration
public class RedissonConfig {@Value("${spring.redis.host}")private String host;@Value("${spring.redis.port}")private String port;@Value("${spring.redis.password}")private String password;/*** 创建Redisson配置对象,然后交给IOC管理** @return*/@Beanpublic RedissonClient redissonClient() {// 获取Redisson配置对象Config config = new Config();// 添加redis地址,这里添加的是单节点地址,也可以通过 config.userClusterServers()添加集群地址config.useSingleServer().setAddress("redis://" + this.host + ":" + this.port).setPassword(this.password);// 获取RedisClient对象,并交给IOC进行管理return Redisson.create(config);}
}
温馨提示:此外还有一种引入方式,可以引入 redission 的 starter 依赖,然后在yml文件中配置Redisson,但是不推荐这种方式,因为他会替换掉 Spring官方 提供的这套对 Redisson 的配置
3)我们只需要修改一下使用锁的地方,其它的业务代码都不需要该
// 3、创建订单(使用分布式锁)Long userId = ThreadLocalUtls.getUser().getId();RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);boolean isLock = lock.tryLock();
PS:完整代码参考前面
tryLock
方法介绍-
tryLock()
:它会使用默认的超时时间和等待机制。具体的超时时间是由 Redisson 配置文件或者自定义配置决定的。 -
tryLock(long time, TimeUnit unit)
:它会在指定的时间内尝试获取锁(等待time后重试),如果获取成功则返回 true,表示获取到了锁;如果在指定时间内(Redisson内部默认指定的)未能获取到锁,则返回 false。 -
tryLock(long waitTime, long leaseTime, TimeUnit unit)
:指定等待时间为watiTime,如果超过 leaseTime 后还没有获取锁就直接返回失败
-
总的来讲自上而下,tryLock的灵活性逐渐提高,无参tryLock时,waitTime
的默认值是-1,代表不等待,leaseTime
的默认值是30,unit
默认值是 seconds ,也就是锁超过30秒还没有释放就自动释放
可重入锁的原理
现在我们的分布式锁就具有了可重入性,现在让我们来看一下Redissson可重入锁的原理吧(●ˇ∀ˇ●)
Redisson内部释放锁,并不是直接执行del
命令将锁给删除,而是将锁以hash
数据结构的形式存储在Redis中,每次获取锁,都将value
的值-1,每次释放锁,都将value的值+1,只有锁的value值归0时才会真正的释放锁,从而确保锁的可重入性
测试锁的可重入性
@SpringBootTest
@Slf4j
public class RedissonLockTest {@Resourceprivate RedissonClient redissonClient;private RLock lock;/*** 方法1获取一次锁*/@Testvoid method1() {boolean isLock = false;// 创建锁对象lock = redissonClient.getLock("lock");try {isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败,1");return;}log.info("获取锁成功,1");method2();} finally {if (isLock) {log.info("释放锁,1");lock.unlock();}}}/*** 方法二再获取一次锁*/void method2() {boolean isLock = false;try {isLock = lock.tryLock();if (!isLock) {log.error("获取锁失败, 2");return;}log.info("获取锁成功,2");} finally {if (isLock) {log.info("释放锁,2");lock.unlock();}}}
}
源码流程解析
详情见P67
Redisson分布式锁原理:
-
如何解决可重入问题:利用hash结构记录线程id和重入次数。
-
如何解决可重试问题:利用信号量和
PubSub
功能实现等待、唤醒,获取锁失败的重试机制。 -
如何解决超时续约问题:利用
watchDog
,每隔一段时间(releaseTime / 3),重置超时时间。 -
如何解决主从一致性问题:利用Redisson的
multiLock
,多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功缺陷:运维成本高、实现复杂
源码流程示意图(这里展示的是无参构造器的源码执行流程,我们也可以看有构造器的源码执行流程):
获取剩余有效期
Redisson底层通过利用Lua脚本确保 判断锁是否存在、添加锁的有效期、添加线程标识这些的操作全部封装到了一个Lua脚本(确保了锁的原子性和可重入性)
有参构造器的源码执行流程:
底层通过信号量+发布订阅模式实现可重试机制(解决了锁的可重试性)
如何确保锁超时释放问题?线程1获取锁,由于业务阻塞导致锁超时释放了该如何解决呢?这就需要使用看门狗机制了,确保我们的业务是执行完释放,而不是阻塞释放
那么如何取消任务呢,不可能这个任务一直调用,让锁永不过期呢?
测试主从节点锁的一致性
1)注入三个RedissonClient
@Beanpublic RedissonClient redissonClient() {// 获取Redisson配置对象Config config = new Config();// 添加redis地址,这里添加的是单节点地址,也可以通过 config.userClusterServers()添加集群地址config.useSingleServer().setAddress("redis://url").setPassword(this.password);// 获取RedisClient对象,并交给IOC进行管理return Redisson.create(config);}@Beanpublic RedissonClient2 redissonClient() {// 获取Redisson配置对象Config config = new Config();// 添加redis地址,这里添加的是单节点地址,也可以通过 config.userClusterServers()添加集群地址config.useSingleServer().setAddress("redis://url").setPassword(this.password);// 获取RedisClient对象,并交给IOC进行管理return Redisson.create(config);}@Beanpublic RedissonClient3 redissonClient() {// 获取Redisson配置对象Config config = new Config();// 添加redis地址,这里添加的是单节点地址,也可以通过 config.userClusterServers()添加集群地址config.useSingleServer().setAddress("redis://url").setPassword(this.password);// 获取RedisClient对象,并交给IOC进行管理return Redisson.create(config);}
2)编写测试类:
@BeforeEachvoid setUp(){RLock lock1 = redissonClient.getLock("order");RLock lock2 = redissonClient2.getLock("order");RLock lock3 = redissonClient3.getLock("order");// 创建联锁redissonClient.getMultiLock(lock1, lock2, lock3);}
获取锁和释放锁代码都是一样的,执行代码,可以在节点1、节点2、节点3中查看到锁
编码实现可重入锁
上一节,我们使用Redisson实现分布式锁,并且了解了Redssion可重入锁的原理,现在让我们手动来实现一下吧🤭
可以看到,可重入锁需要进行一系列的逻辑判断,这些逻辑代码我们最好将它们全都封装到一个 Lua脚本 中,以确保操作的原子性,从而确保线程安全(Redisson底层也是这么干的)
1)编写获取锁的Lua脚本
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by ghp.
--- DateTime: 2023/2/14 16:11
---
-- 获取锁的key,即: KEY_PREFIX + name
local key = KEYS[1];
-- 获取当前线程的标识, 即: ID_PREFIX + Thread.currentThread().getId()
local threadId = ARGV[1];
-- 锁的有效期
local releaseTime = ARGV[2];-- 判断缓存中是否存在锁
if (redis.call('EXISTS', key) == 0) then-- 不存在,获取锁redis.call('HSET', key, threadId, '1');-- 设置锁的有效期redis.call('EXPIRE', key, releaseTime);return 1; -- 返回1表示锁获取成功
end-- 缓存中已存在锁,判断threadId是否说自己的
if (redis.call('HEXISTS', key, threadId) == 1) then-- 是自己的锁,获取锁然后重入次数+1redis.call('HINCRBY', key, threadId, '1');-- 设置有效期redis.call('EXPIRE', key, releaseTime);return 1; -- 返回1表示锁获取成功
end-- 锁不是自己的,直接返回0,表示锁获取失败
return 0;
2)编写释放锁的Lua脚本
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by ghp.
--- DateTime: 2023/2/14 16:11
---
-- 获取锁的key,即: KEY_PREFIX + name
local key = KEYS[1];
-- 获取当前线程的标识, 即: ID_PREFIX + Thread.currentThread().getId()
local threadId = ARGV[1];
-- 锁的有效期
local releaseTime = ARGV[2];-- 判断当前线程的锁是否还在缓存中
if (redis.call('HEXISTS', key, threadId) == 0) then-- 缓存中没找到自己的锁,说明锁已过期,则直接返回空return nil; -- 返回nil,表示啥也不干
end
-- 缓存中找到了自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);-- 进一步判断是否需要释放锁
if (count > 0) then-- 重入次数大于0,说明不能释放锁,且刷新锁的有效期redis.call('EXPIRE', key, releaseTime);return nil;
else-- 重入次数等于0,说明可以释放锁redis.call('DEL', key);return nil;
end
3)编写可重入锁:
public class ReentrantLock implements Lock {/*** RedisTemplate*/private StringRedisTemplate stringRedisTemplate;/*** 锁的名称*/private String name;/*** key前缀*/private static final String KEY_PREFIX = "lock:";/*** ID前缀*/private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";/*** 锁的有效期*/public long timeoutSec;public ReentrantLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}/*** 加载获取锁的Lua脚本*/private static final DefaultRedisScript<Long> TRYLOCK_SCRIPT;static {TRYLOCK_SCRIPT = new DefaultRedisScript<>();TRYLOCK_SCRIPT.setLocation(new ClassPathResource("lua/re-trylock.lua"));TRYLOCK_SCRIPT.setResultType(Long.class);}/*** 获取锁** @param timeoutSec 超时时间* @return*/@Overridepublic boolean tryLock(long timeoutSec) {this.timeoutSec = timeoutSec;// 执行lua脚本Long result = stringRedisTemplate.execute(TRYLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId(),Long.toString(timeoutSec));return result != null && result.equals(1L);}/*** 加载释放锁的Lua脚本*/private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/re-unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}/*** 释放锁*/@Overridepublic void unlock() {// 执行lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId(),Long.toString(this.timeoutSec));}
}
4)编写测试类
@SpringBootTest
@Slf4j
public class ReentrantLockTest {@Resourceprivate StringRedisTemplate stringRedisTemplate;private ReentrantLock lock;/*** 方法1获取一次锁*/@Testvoid method1() {boolean isLock = false;// 创建锁对象lock = new ReentrantLock(stringRedisTemplate, "order:" + 1);try {isLock = lock.tryLock(1200);if (!isLock) {log.error("获取锁失败,1");return;}log.info("获取锁成功,1");method2();} finally {if (isLock) {log.info("释放锁,1");lock.unlock();}}}/*** 方法二再获取一次锁*/void method2() {boolean isLock = false;try {isLock = lock.tryLock(1200);if (!isLock) {log.error("获取锁失败, 2");return;}log.info("获取锁成功,2");} finally {if (isLock) {log.info("释放锁,2");lock.unlock();}}}
}
在VoucherOrderServiceImpl中修改锁的实现方式,将之前的Redssion分布式锁改成我们自己写的可重入锁
// 3、创建订单(使用分布式锁)Long userId = ThreadLocalUtls.getUser().getId();ReentrantLock lock = new ReentrantLock(stringRedisTemplate, "order:" + userId);boolean isLock = lock.tryLock(1200);if (!isLock) {// 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)return Result.fail("一人只能下一单");}try {// 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(userId, voucherId);} finally {lock.unlock();}
PS:完整代码参考前面
秒杀优化
最开始我们的遇到自增ID问题,我们通过实现分布式ID解决了问题;后面我们在单体系统下遇到了一人多单超卖问题,我们通过乐观锁解决了;我们对业务进行了变更,将一人多单变成了一人一单,结果在高并发场景下同一用户发送相同请求仍然出现了超卖问题,我们通过悲观锁解决了;由于用户量的激增,我们将单体系统升级成了集群,结果由于锁只能在一个JVM中可见导致又出现了,在高并发场景下同一用户发送下单请求出现超卖问题,我们通过实现分布式锁成功解决集群下的超卖问题;由于我们最开始实现的分布式锁比较简单,会出现超时释放导致超卖问题,我们通过给锁添加线程标识成功解决了;但是释放锁时,判断锁是否是当前线程 和 删除锁两个操作不是原子性的,可能导致超卖问题,我们通过将两个操作封装到一个Lua脚本成功解决了;为了解决锁的不可重入性,我们通过将锁以hash结构的形式存储,每次释放锁都value-1,获取锁value+1,从而实现锁的可重入性,并且将释放锁和获取锁的操作封装到Lua脚本中以确保原子性。最最后,我们发现可以直接使用现有比较成熟的方案Redisson来解决上诉出现的所有问题🤣,什么不可重试、不可重入、超市释放、原子性等问题Redisson都提供相对应的解决方法(。^▽^)
所以现在锁的优化基本上已经到了极致,我们现在就要对**性能**和稳定性进行进一步的优化😄
异步秒杀优化
- 同步(Synchronous)是指程序按照顺序依次执行,每一步操作完成后再进行下一步。在同步模式下,当一个任务开始执行时,程序会一直等待该任务完成后才会继续执行下一个任务。
- 异步(Asynchronous)是指程序在执行任务时,不需要等待当前任务完成,而是在任务执行的同时继续执行其他任务。在异步模式下,任务的执行顺序是不确定的,程序通过回调、事件通知等方式来获取任务执行的结果。
显然异步的性能是要高于同步的,但是会牺牲掉一定的数据一致性,所以也不是无脑用异步,要根据具体业务进行分析,这里的下单是可以使用异步的,因为下单操作比较耗时,后端操作步骤多,可以进行拆分
分析
之前秒杀业务的流程:
1秒钟1000个用户同时发起请求:
可以看到 qps1000、平均值1921、异常率37.2%、吞吐量315.3,对比优化后
可以看到这个流程是同步执行的,同步是比较耗费时间的,我们直接将同步变成异步,从而大幅提高秒杀业务的性能,具体如何做呢?我们可以将一部分的工作交给Redis,并且不能直接去调用Redis,而是通过开启一个独立的子线程去异步执行,从而大大提高效率
实现
细节
- 库存判断放到Redis中,我们应该使用哪一种数据结构存储订单的库存呢?可以直接使用
String
类型的数据结构,Redis的IO操作是单线程的,所以能够充分保障线程安全。 - 一人一单的判断也是由Redis完成的,所以我们需要在Redis中存储订单信息,而订单是唯一的,所以我们可以使用
Set
类型的数据结构 - lua脚本中,接收的参数都是String类型的,String类型的数据无法进行比较,我们需要利用
tonumber
函数将String转成Number stringRedisTemplate.execute
这个方法,第二个参数是应该List集合,标识传入Lua脚本中的的 key,如果我们没有传key,那么直接使用Collections.emptyList()
,而不是直接使用null
,是因为在 stringRedisTemplate.execute 方法内部可能对参数进行了处理,如果传递 null 可能引发NPE异常- 异步线程无法从ThreadLocal中获取userId,我们需要从voucherOrder中获取userId
AopContext.currentProxy()
底层也是利用ThreadLocal获取的,所以异步线程中也无法使用。解决方案有两种,第一种是将代理对象和订单一起放入阻塞队列中,第二种是将代理对象的作用域提升,变成一个成员变量(我采用了第二种方式)
1)编写Lua脚本:
---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by ghp.
--- DateTime: 2023/7/15 15:22
--- Description 判断库存是否充足 && 判断用户是否已下单
---
-- 优惠券id
local voucherId = ARGV[1];
-- 用户id
local userId = ARGV[2];-- 库存的key
local stockKey = 'seckill:stock:' .. voucherId;
-- 订单key
local orderKey = 'seckill:order:' .. voucherId;-- 判断库存是否充足 get stockKey > 0 ?
local stock = redis.call('GET', stockKey);
if (tonumber(stock) <= 0) then-- 库存不足,返回1return 1;
end-- 库存充足,判断用户是否已经下过单 SISMEMBER orderKey userId
if (redis.call('SISMEMBER', orderKey, userId) == 1) then-- 用户已下单,返回2return 2;
end-- 库存充足,没有下过单,扣库存、下单
redis.call('INCRBY', stockKey, -1);
redis.call('SADD', orderKey, userId);
-- 返回0,标识下单成功
return 0;
2)在VoucherOrderServiceImpl中编写Java代码:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;/*** 当前类初始化完毕就立马执行该方法*/@PostConstructprivate void init() {// 执行线程任务SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}/*** 存储订单的阻塞队列*/private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);/*** 线程池*/private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();/*** 线程任务: 不断从阻塞队列中获取订单*/private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {// 从阻塞队列中获取订单信息,并创建订单try {VoucherOrder voucherOrder = orderTasks.take();handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常", e);}}}}/*** 创建订单** @param voucherOrder*/private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);boolean isLock = lock.tryLock();if (!isLock) {// 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)log.error("一人只能下一单");return;}try {// 创建订单(使用代理对象调用,是为了确保事务生效)proxy.createVoucherOrder(voucherOrder);} finally {lock.unlock();}}/*** 加载 判断秒杀券库存是否充足 并且 判断用户是否已下单 的Lua脚本*/private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}/*** VoucherOrderServiceImpl类的代理对象* 将代理对象的作用域进行提升,方面子线程取用*/private IVoucherOrderService proxy;/*** 抢购秒杀券** @param voucherId* @return*/@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {// 1、执行Lua脚本,判断用户是否具有秒杀资格Long result = null;try {result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),ThreadLocalUtls.getUser().getId().toString());} catch (Exception e) {log.error("Lua脚本执行失败");throw new RuntimeException(e);}if (result != null && !result.equals(0L)) {// result为1表示库存不足,result为2表示用户已下单int r = result.intValue();return Result.fail(r == 2 ? "不能重复下单" : "库存不足");}// 2、result为0,用户具有秒杀资格,将订单保存到阻塞队列中,实现异步下单long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);// 创建订单VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(orderId);voucherOrder.setUserId(ThreadLocalUtls.getUser().getId());voucherOrder.setVoucherId(voucherId);// 将订单保存到阻塞队列中orderTasks.add(voucherOrder);// 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();this.proxy = proxy;return Result.ok();}/*** 创建订单** @param voucherOrder* @return*/@Transactional@Overridepublic void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();// 1、判断当前用户是否是第一单int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId));if (count >= 1) {// 当前用户不是第一单log.error("当前用户不是第一单");return;}// 2、用户是第一单,可以下单,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0).setSql("stock = stock -1"));if (!flag) {throw new RuntimeException("秒杀券扣减失败");}// 3、将订单保存到数据库flag = this.save(voucherOrder);if (!flag) {throw new RuntimeException("创建秒杀券订单失败");}}
}
测试
测试一:测试同一用户在高并发场景下发送大量请求,是否发生超卖问题,或者其它线程安全问题
qps200,吞吐量1242,异常率96%,可以发现成功请求数量为 200 ∗ 4 200*4%=8 200∗4,居然有八个请求同时成功!那是不是又发生了超卖问题w(゚Д゚)w
相较于前面同步逻辑,我们添加了异步机制之后,qps由原来的
我们看看数据库和缓存中库存的情况:
可以看到虽然请求有8次成功,但是库存只减少了1,并且数据库和缓存中订单也只创建了一个。那么为什么会出现问题呢?原因很简单,请求成功不以为这下单成功。对于大量请求,SpringBoot无法同时处理大量请求,Jemter的请求只具有短期有效,SpringBoot在处理某一个请求时,其它请求由于超时直接就返回失败了,我们也可以看到哪些成功请求具有一定的规律性,基本上是每隔固定的失败请求就会有一个成功请求,只有第一个成功请求,返回是 success ,后面虽然请求成功了,但是返回的是 不能重复下单 的提示信息
测试二:测试不同用户在高并发场景下发送大量请求,是否发生超卖问题,或者其它线程安全问题
PS:关于Jmeter的Tokens文件获取,可以参考这篇文章自动化完成1000个用户的登录并获取token并生成tokens.txt文件
生成1000个用户的token文件后,直接导入到 Jmeter,然后进行压测,相当于1000个用户同时对系统进行请求
可以看到 qps1000、吞吐量455.6、异常率0%,可以看出当前系统的这一个秒杀功能已经达到了企业可用级别了😄
对比优化前,平均值由1921降低至930,吞吐量由315.3提高至445.6,异常率由37.2%降低至0%
PS:这个性能看起来比较差,这是因为我当前电脑是轻薄本,本身性能不太好,并且同时开启了两个虚拟机、QQ、微信、两个浏览器、一个Jmeter、Navicat、一个IDEA、一个ApiPost、FinalShell、RESP、有道翻译等等(老师的吞吐量直接1300😫)快去测测你的并发性能吧 (●’◡’●)
再去看看数据库和Redis中是否发生了超卖问题:
消息队列优化
分析
前面我们使用 Java 自带的阻塞队列 BlockingQueue 实现消息队列,这种方式存在以下几个严重的弊端:
- 信息可靠性没有保障,BlockingQueue 的消息是存储在内存中的,无法进行持久化,一旦程序宕机或者发生异常,会直接导致消息丢失
- 消息容量有限,BlockingQueue 的容量有限,无法进行有效扩容,一旦达到最大容量限制,就会抛出OOM异常
所以这里我们可以选择采用其它成熟的的(和之前分布式锁一样)MQ,比如:RabbitMQ、RocketMQ、Kafka等,但是本项目是为了学习Redis而设计的,所以这里我们将要学习如何使用Redis实现一个相对可靠的消息队列(自己实现的肯定没法和别人成熟的产品相比)
PS:关于消息队列可以参考这篇文章RabbitMQ入门学习笔记
那么我们该如何实现呢?首先我们需要了解MQ的特点(这里不再赘述了,MQ详情看上面那篇文章),根据MQ的特点选取相对应的数据结构,Redis中能够实现MQ效果的主要由以下三种方式:
-
list
结构:基于List结构模拟消息队列(BRPOP+BLPOP
实现阻塞队列)-
生产消息:
BRPUSH key value [value ...]
将一个或多个元素推入到指定列表的头部。如果列表不存在,BRPUSH命令会自动创建一个新的列表 -
消费消息:
BRPOP key [key ...] timeout
从指定的一个或多个列表中弹出最后一个元素。如果 list 列表为空,BRPOP命令会导致客户端阻塞,直到有数据可用或超过指定的超时时间
优点:不会内存超限、可以持久化、消息有序性;缺点:无法避免数据丢失、只支持单消费者
-
-
pubsub
:发布订阅模式,基本的点对点消息模型(redis2.0引入)-
生产消息:
# 用于向指定频道发布一条消息 PUBLISH channel message
-
消费消息:
# 订阅一个或多个频道 SUBSCRIBE channel [channel] # 用于取消订阅一个或多个频道 UNSUBSCRIBE [channel [channel ...]] # 用于订阅一个或多个符合给定模式的频道,接收消息 PSUBSCRIBE pattern [pattern ...] # 用于取消订阅一个或多个符合给定模式的频道 PUNSUBSCRIBE [pattern [pattern ...]]
优点:支持多生产、多消费者;缺点:不支持持久化、无法比避免数据丢失,消息堆积有上限(消费者会缓存消息),超出会丢失消息
-
-
stream
:比较完善的消息队列模型(redis5.0引入,我的是redis6.0😄)stream是一种数据类型,专门为消息队列设计的,相较于前面两种方式能够更加完美实现一个消息队列
-
生产消息:用于向指定的Stream流中添加一个消息
XADD key *|ID value [value ...]# 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成ID 127.0.0.1:6379> XADD users * name jack age 21 "1644805700523-0"
key就是消息队列,key不存(*)在会自动创建(默认),ID是消息表示,value是消息的内容
-
消费消息:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID ID# 读取XREAD中的第一条消息 XREAD COUNT 1 STREAMS users 0 # 阻塞1秒钟后从XREAD中读取的最新消息 XREAD COUNT 1 BLOCK 1000 STREAMS users $
注意:当我们指定起始ID为
$
时代表读取最后一条消息(读取最新的消息)ID为0
时代表读最开始的一条消息(读取最旧的消息),如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
优点:消息可回溯、一个消息可以被多个消费者消费、可以阻塞读取;缺点:有消息漏读的风险
-
上面我们介绍的消费方式都是单消费方式,容易发生消息堆积导致消息丢失,所以我们需要改用消费者组的模式
-
消费者组(Consumer Group):将多个消息划分到一个组中,监听同一队列
-
消费者组的特点:
- 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
- 消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
- 消息确认:消费者获取消息后,消息处于
pending
(待处理)状态,并存入一个pending-list
。当处理完成后需要通过XACK
来确认消息,标记消息为已处理,才会从pending-list移除。
# 创建消费者组
XGROUP CREATE key groupName ID
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
# 删除消费者组中指定消费者
XGROUP DELCONSUMER key groupName consumerName
# 从消费者组中读取消息
XREADGROUP GROUP
stream
类型消息队列的XREADGROUP
命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
实现
1)创建队列。在Redis CLI 中执行下面指令
# 创建队列(消费者组模式)
XGROUP CREATE stream.orders g1 0 MKSTREAM
我的建议是直接创建一个任务类,在Java代码中实现创建队列
/*** 当前类初始化完毕就立马执行该方法*/@PostConstructprivate void init() {// 创建消息队列DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>();mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua"));mqScript.setResultType(Long.class);Long result = null;try {result = stringRedisTemplate.execute(mqScript,Collections.emptyList(),QUEUE_NAME,GROUP_NAME);} catch (Exception e) {log.error("队列创建失败", e);return;}int r = result.intValue();String info = r == 1 ? "队列创建成功" : "队列已存在";log.debug(info);// 执行线程任务SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}
备注:当时我还想着使用CommandLineRunner
来初始化队列,但是发现CommandLineRunner
的初始化时机在@PostConstruct
之后,这就导致队列还未初始化,就已经执行线程任务获取队列中的消息,从而报错,所以干脆直接放到执行线程之前
2)在VoucherOrderServiceImpl中编写Java代码:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;/*** 当前类初始化完毕就立马执行该方法*/@PostConstructprivate void init() {// 执行线程任务SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}/*** 线程池*/private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();/*** 队列名*/private static final String queueName = "stream.orders";/*** 线程任务: 不断从消息队列中获取订单*/private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1、从消息队列中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order >List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 2、判断消息获取是否成功if (messageList == null || messageList.isEmpty()) {// 2.1 消息获取失败,说明没有消息,进入下一次循环获取消息continue;}// 3、消息获取成功,可以下单// 将消息转成VoucherOrder对象MapRecord<String, Object, Object> record = messageList.get(0);Map<Object, Object> messageMap = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);// 4、ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);// 处理异常消息handlePendingList();}}}}private void handlePendingList() {while (true) {try {// 1、从pendingList中获取订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order 0List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),StreamOffset.create(queueName, ReadOffset.from("0")));// 2、判断pendingList中是否有效性if (messageList == null || messageList.isEmpty()) {// 2.1 pendingList中没有消息,直接结束循环break;}// 3、pendingList中有消息// 将消息转成VoucherOrder对象MapRecord<String, Object, Object> record = messageList.get(0);Map<Object, Object> messageMap = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);// 4、ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);// 这里不用调自己,直接就进入下一次循环,再从pendingList中取,这里只需要休眠一下,防止获取消息太频繁try {Thread.sleep(20);} catch (InterruptedException ex) {log.error("线程休眠异常", ex);}}}}/*** 创建订单** @param voucherOrder*/private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);boolean isLock = lock.tryLock();if (!isLock) {// 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)log.error("一人只能下一单");return;}try {// 创建订单(使用代理对象调用,是为了确保事务生效)proxy.createVoucherOrder(voucherOrder);} finally {lock.unlock();}}/*** 加载 判断秒杀券库存是否充足 并且 判断用户是否已下单 的Lua脚本*/private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}/*** VoucherOrderServiceImpl类的代理对象* 将代理对象的作用域进行提升,方面子线程取用*/private IVoucherOrderService proxy;/*** 抢购秒杀券** @param voucherId* @return*/@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = ThreadLocalUtls.getUser().getId();long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);// 1、执行Lua脚本,判断用户是否具有秒杀资格Long result = null;try {result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId));} catch (Exception e) {log.error("Lua脚本执行失败");throw new RuntimeException(e);}if (result != null && !result.equals(0L)) {// result为1表示库存不足,result为2表示用户已下单int r = result.intValue();return Result.fail(r == 2 ? "不能重复下单" : "库存不足");}// 2、result为0,下单成功,直接返回ok// 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();this.proxy = proxy;return Result.ok();}/*** 创建订单** @param voucherOrder* @return*/@Transactional@Overridepublic void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();// 1、判断当前用户是否是第一单int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId));if (count >= 1) {// 当前用户不是第一单log.error("当前用户不是第一单");return;}// 2、用户是第一单,可以下单,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0).setSql("stock = stock -1"));if (!flag) {throw new RuntimeException("秒杀券扣减失败");}// 3、将订单保存到数据库flag = this.save(voucherOrder);if (!flag) {throw new RuntimeException("创建秒杀券订单失败");}}
}
测试
和前面的测试一样,测试1000个人在一秒内同时下单,观察是否发生超卖,并且记录系统性能
可以看到数据库和缓存都没有发生超卖问题(●ˇ∀ˇ●)
qps1000、平均值889、异常率0%、吞吐量474.2,可以看到stream
实现的消息队列并没有比Java的BlockingQueue
实现的消息队列高多少性能,但是stream实现的信息队列比BlockingQueue实现的消息队列的可靠性和灵活性要高一大截
期间遇到bug3、bug4
达人探店
发布探店笔记
这个直接已经实现了,不用作任何修改即可使用,并且没用使用到Redis
温馨提示:记得修改图片上传地址,建议直接放到你的Ngixn下的imgs目录
查看探店笔记
这个功能和发布博客功能一样,也是一个非重点功能,没有使用到Redis
在编写查询博客详情接口的同时,重构了一下查询热点博客的接口
/*** 根据id查询博客** @param id* @return*/@Overridepublic Result queryBlogById(Long id) {// 查询博客信息Blog blog = this.getById(id);if (Objects.isNull(blog)){return Result.fail("笔记不存在");}// 查询blog相关的用户信息queryUserByBlog(blog);return Result.ok(blog);}/*** 查询热门博客** @param current* @return*/@Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = this.query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(this::queryUserByBlog);return Result.ok(records);}/*** 查询博客相关用户信息* @param blog*/private void queryUserByBlog(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}
Set实现点赞功能
现在存在一个问题,一个用户可以无限点赞,这显然是不合理的,所以我们需要对点赞功能进行一个优化,实现一人只能点赞一次。
对于点赞这种高频变化的数据,如果我们使用MySQL是十分不理智的,因为MySQL慢、并且并发请求MySQL会影响其它重要业务,容易影响整个系统的性能,继而降低了用户体验。那么如何我们要使用Redis,那么我们又该选择哪种数据结构才更加合理呢?
这里我推荐使用Set
,因为Set类型的数据结构具有
- 不重复,符合业务的特点,一个用户只能点赞一次
- 高性能,Set集合内部实现了高效的数据结构(Hash表)
- 灵活性,Set集合可以实现一对多,一个用户可以点赞多个博客,符合实际的业务逻辑
当然也可以选择使用Hash
(Hash占用空间比Set更小),如果想要点赞排序也可以选用Sorted Set
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {@Resourceprivate IUserService userService;@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 根据id查询博客** @param id* @return*/@Overridepublic Result queryBlogById(Long id) {// 查询博客信息Blog blog = this.getById(id);if (Objects.isNull(blog)) {return Result.fail("笔记不存在");}// 查询blog相关的用户信息queryUserByBlog(blog);// 判断当前用户是否点赞该博客isBlogLiked(blog);return Result.ok(blog);}/*** 判断当前用户是否点赞该博客*/private void isBlogLiked(Blog blog) {Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + blog.getId();Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());blog.setIsLike(BooleanUtil.isTrue(isMember));}/*** 查询热门博客** @param current* @return*/@Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = this.query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(blog -> {this.queryUserByBlog(blog);this.isBlogLiked(blog);});return Result.ok(records);}/*** 点赞** @param id* @return*/@Overridepublic Result likeBlog(Long id) {// 判断用户是否点赞Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + blog.getId();// sismember key valueBoolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());boolean result;if (BooleanUtil.isFalse(isMember)) {// 用户未点赞,点赞数+1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked + 1"));if (result) {// 数据库更新成功,更新缓存 sadd key valuestringRedisTemplate.opsForSet().add(key, userId.toString());}} else {// 用户已点赞,点赞数-1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked - 1"));if (result) {// 数据更新成功,更新缓存 srem key valuestringRedisTemplate.opsForSet().remove(key, userId.toString());}}return Result.ok();}/*** 查询博客相关用户信息** @param blog*/private void queryUserByBlog(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}
}
SortedSet实现点赞排行榜
在平常我们所使用的软件中(比如微信、QQ、抖音)的点赞功能都会默认按照时间顺序对点赞的用户进行一个排序,后点赞的用户会排在最前面,而Set
是无需的,无法满足这个需求,虽然 List
有序,但是不唯一,查找效率也比较低,所以也不推荐使用,此时我们就可以选择使用SortedSet
这个数据结构,它完美的满足了我们所有的需求:唯一、有序、查找效率高。
相较于Set集合,SortedList有以下不同之处:
- 对于Set集合我们可以使用
isMember
方法判断用户是否存在,对于SortedList我们可以使用ZSCORE
方法判断用户是否存在 - Set集合没有提供范围查询,无法获排行榜前几名的数据,SortedList可以使用
ZRANGE
方法实现范围查询
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {@Resourceprivate IUserService userService;@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 根据id查询博客** @param id* @return*/@Overridepublic Result queryBlogById(Long id) {// 查询博客信息Blog blog = this.getById(id);if (Objects.isNull(blog)) {return Result.fail("笔记不存在");}// 查询blog相关的用户信息queryUserByBlog(blog);// 判断当前用户是否点赞该博客isBlogLiked(blog);return Result.ok(blog);}/*** 判断当前用户是否点赞该博客*/private void isBlogLiked(Blog blog) {UserDTO user = ThreadLocalUtls.getUser();if (Objects.isNull(user)){// 当前用户未登录,无需查询点赞return;}Long userId = user.getId();String key = BLOG_LIKED_KEY + blog.getId();Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());blog.setIsLike(Objects.nonNull(score));}/*** 查询热门博客** @param current* @return*/@Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = this.query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(blog -> {this.queryUserByBlog(blog);this.isBlogLiked(blog);});return Result.ok(records);}/*** 点赞** @param id* @return*/@Overridepublic Result likeBlog(Long id) {// 1、判断用户是否点赞Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + id;// zscore key valueDouble score = stringRedisTemplate.opsForZSet().score(key, userId.toString());boolean result;if (score == null) {// 1.1 用户未点赞,点赞数+1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked + 1"));if (result) {// 数据库更新成功,更新缓存 zadd key value scorestringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());}} else {// 1.2 用户已点赞,点赞数-1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked - 1"));if (result) {// 数据更新成功,更新缓存 zrem key valuestringRedisTemplate.opsForZSet().remove(key, userId.toString());}}return Result.ok();}/*** 查询所有点赞博客的用户** @param id* @return*/@Overridepublic Result queryBlogLikes(Long id) {// 查询Top5的点赞用户 zrange key 0 4Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + id;Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);if (top5 == null || top5.isEmpty()) {return Result.ok(Collections.emptyList());}List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());List<UserDTO> userDTOList = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOList);}/*** 查询博客相关用户信息** @param blog*/private void queryUserByBlog(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}
}
如果我们的需求是,先点赞的排在前面,后点赞的排在后面该如何实现?这就需要涉及到MySQL的一些相关知识了,在MySQL中如果我们使用in
进行条件查询,我们的查询默认是数据库顺序查询,数据库中的记录默认都是按照ID自增的,所以查出来的结果默认是按照ID自增排序的
解决方法:
select id, phone,password,nick_name,icon,create_time,update_time
from tb_user
where id in(1, 5)
order by field(id, 5, 1)
注意:根据某一个字段进行排序,oder by字段的id顺序即为查询的顺序
/*** 查询所有点赞博客的用户** @param id* @return*/@Overridepublic Result queryBlogLikes(Long id) {// 查询Top5的点赞用户 zrange key 0 4Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + id;Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);if (top5 == null || top5.isEmpty()) {return Result.ok(Collections.emptyList());}List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());String idStr = StrUtil.join(",", ids);// 根据id降序排序 select * from tb_user where id in(1,5) order by field(id, 1, 5)List<UserDTO> userDTOList = userService.list(new LambdaQueryWrapper<User>().in(User::getId, ids).last("order by field (id," + idStr + ")")).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOList);}
好友关注
关注和取关
@RestController
@RequestMapping("/follow")
public class FollowController {@Resourceprivate IFollowService followService;/*** 关注用户* @param followUserId 关注用户的id* @param isFollow 是否已关注* @return*/@PutMapping("/{id}/{isFollow}")public Result follow(@PathVariable("id") Long followUserId, @PathVariable Boolean isFollow){return followService.follow(followUserId, isFollow);}/*** 是否关注用户* @param followUserId 关注用户的id* @return*/@GetMapping("/or/not/{id}")public Result isFollow(@PathVariable("id") Long followUserId){return followService.isFollow(followUserId);}
}
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {/*** 关注用户** @param followUserId 关注用户的id* @param isFollow 是否已关注* @return*/@Overridepublic Result follow(Long followUserId, Boolean isFollow) {Long userId = ThreadLocalUtls.getUser().getId();if (isFollow) {// 用户为关注,则关注Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);this.save(follow);} else {// 用户已关注,删除关注信息this.remove(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));}return Result.ok();}/*** 是否关注用户** @param followUserId 关注用户的id* @return*/@Overridepublic Result isFollow(Long followUserId) {Long userId = ThreadLocalUtls.getUser().getId();int count = this.count(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));return Result.ok(count > 0);}
}
Set实现共同关注
我们想要查询出两个用户的共同关注对象,这就需要使用求交集,对于求交集,我们可以使用Set集合
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate IUserService userService;/*** 关注用户** @param followUserId 关注用户的id* @param isFollow 是否已关注* @return*/@Overridepublic Result follow(Long followUserId, Boolean isFollow) {Long userId = ThreadLocalUtls.getUser().getId();String key = FOLLOW_KEY + userId;if (isFollow) {// 用户为关注,则关注Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);boolean isSuccess = this.save(follow);if (isSuccess) {// 用户关注信息保存成功,把关注的用户id放入Redis的Set集合中,stringRedisTemplate.opsForSet().add(key, followUserId.toString());}} else {// 用户已关注,删除关注信息boolean isSuccess = this.remove(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));if (isSuccess) {stringRedisTemplate.opsForSet().remove(key, followUserId.toString());}}return Result.ok();}/*** 是否关注用户** @param followUserId 关注用户的id* @return*/@Overridepublic Result isFollow(Long followUserId) {Long userId = ThreadLocalUtls.getUser().getId();int count = this.count(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));return Result.ok(count > 0);}/*** 查询共同关注** @param id* @return*/@Overridepublic Result followCommons(Long id) {Long userId = ThreadLocalUtls.getUser().getId();String key1 = FOLLOW_KEY + userId;String key2 = FOLLOW_KEY + id;// 查询当前用户与目标用户的共同关注对象Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);if (Objects.isNull(intersect) || intersect.isEmpty()) {return Result.ok(Collections.emptyList());}List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());// 查询共同关注的用户信息List<UserDTO> userDTOList = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOList);}
}
遇到bug5
Feed流关注推送
分析
-
什么是Feed流?
关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。Feed流是一种基于用户个性化需求和兴趣的信息流推送方式,常见于社交媒体、新闻应用、音乐应用等互联网平台。Feed流通过算法和用户行为数据分析,动态地将用户感兴趣的内容以流式方式呈现在用户的界面上。
-
Feed流产品有两种常见模式:
-
时间排序(
Timeline
):不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈-
优点:信息全面,不会有缺失。并且实现也相对简单
-
缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
-
-
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
-
优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
-
缺点:如果算法不精准,可能起到反作用
-
本例中的个人页面,是基于关注的好友来做Feed流,因此采用
Timeline
的模式。该模式的实现方案有三种:-
拉模式:也叫做读扩散。在拉模式中,终端用户或应用程序主动发送请求来获取最新的数据流。它是一种按需获取数据的方式,用户可以在需要时发出请求来获取新数据。在Feed流中,数据提供方将数据发布到实时数据源中,而终端用户或应用程序通过订阅或请求来获取新数据。
优点:节约空间,可以减少不必要的数据传输,只需要获取自己感兴趣的数据,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清楚。
缺点:延迟较高,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。
-
推模式:也叫做写扩散。在推模式中,数据提供方主动将最新的数据推送给终端用户或应用程序。数据提供方会实时地将数据推送到终端用户或应用程序,而无需等待请求。
优点:数据延迟低,不用临时拉取
缺点:内存耗费大,假设一个大V写信息,很多人关注他, 就会写很多份数据到粉丝那边去
-
推拉结合:也叫做读写混合,兼具推和拉两种模式的优点。在推拉结合模式中,数据提供方会主动将最新的数据推送给终端用户或应用程序,同时也支持用户通过拉取的方式来获取数据。这样可以实现实时的数据更新,并且用户也具有按需获取数据的能力。推拉模式是一个折中的方案,站在发件人这一段,如果是个普通的人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力,如果是大V,那么他是直接将数据先写入到一份到发件箱里边去,然后再直接写一份到活跃粉丝收件箱里边去,现在站在收件人这端来看,如果是活跃粉丝,那么大V和普通的人发的都会直接写入到自己收件箱里边来,而如果是普通的粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息
-
实现
1)基于模式实现关注推送功能
当前项目用户量比较小,所以这里我们选择使用推模式,延迟低、内存占比也没那么大
由于我们需要实现分页查询功能,这里我们可以选择 list
或者 SortedSet
,而不能使用Set
,因为Set
是无需的, list
是有索引的,SortedSet
是有序的,那么我们该如何选择呢?
如果我们选择 list
会存在索引漂移现象(这个在Vue中也存在),从而导致读取重复数据,所以我们不能选择使用 list
我们可以选择使用滚动分页,我们使用SortedSet
,如果使用排名和使用角标是一样的,但是SortedSet
可以按照Score
排序(Score默认按照时间戳生成,所以是固定的),每次我们可以选择比之前Score较小的,这样就能够实现滚动排序,从而防止出现问题
代码实现:在BlogServiceImpl中修改原有的保存探店笔记的方法:
/*** 保存探店笔记** @param blog* @return*/@Overridepublic Result saveBlog(Blog blog) {Long userId = ThreadLocalUtls.getUser().getId();blog.setUserId(userId);// 保存探店笔记boolean isSuccess = this.save(blog);if (!isSuccess){return Result.fail("笔记保存失败");}// 查询笔记作者的所有粉丝List<Follow> follows = followService.list(new LambdaQueryWrapper<Follow>().eq(Follow::getFollowUserId, userId));// 将笔记推送给所有的粉丝for (Follow follow : follows) {// 获取粉丝的idLong id = follow.getUserId();// 推送笔记String key = FEED_KEY + id;stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());}return Result.ok(blog.getId());}
2)实现关注推送页面的分页查询
代码实现:
/*** 关注推送页面的笔记分页** @param max* @param offset* @return*/@Overridepublic Result queryBlogOfFollow(Long max, Integer offset) {// 1、查询收件箱Long userId = ThreadLocalUtls.getUser().getId();String key = FEED_KEY + userId;// ZREVRANGEBYSCORE key Max Min LIMIT offset countSet<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);// 2、判断收件箱中是否有数据if (typedTuples == null || typedTuples.isEmpty()) {return Result.ok();}// 3、收件箱中有数据,则解析数据: blogId、minTime(时间戳)、offsetList<Long> ids = new ArrayList<>(typedTuples.size());long minTime = 0; // 记录当前最小值int os = 1; // 偏移量offset,用来计数for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { // 5 4 4 2 2// 获取idids.add(Long.valueOf(tuple.getValue()));// 获取分数(时间戳)long time = tuple.getScore().longValue();if (time == minTime) {// 当前时间等于最小时间,偏移量+1os++;} else {// 当前时间不等于最小时间,重置minTime = time;os = 1;}}// 4、根据id查询blog(使用in查询的数据是默认按照id升序排序的,这里需要使用我们自己指定的顺序排序)String idStr = StrUtil.join(",", ids);List<Blog> blogs = this.list(new LambdaQueryWrapper<Blog>().in(Blog::getId, ids).last("ORDER BY FIELD(id," + idStr + ")"));// 设置blog相关的用户数据,是否被点赞等属性值for (Blog blog : blogs) {// 查询blog有关的用户queryUserByBlog(blog);// 查询blog是否被点赞isBlogLiked(blog);}// 5、封装并返回ScrollResult scrollResult = new ScrollResult();scrollResult.setList(blogs);scrollResult.setOffset(os);scrollResult.setMinTime(minTime);return Result.ok(scrollResult);}
附近商铺搜索
GEO数据结构
GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:
-
GEOADD
:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member) -
GEODIST
:计算指定的两个点之间的距离并返回 -
GEOHASH
:将指定member的坐标转为hash字符串形式并返回 -
GEOPOS
:返回指定member的坐标 -
GEORADIUS
:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.2以后已废弃 -
GEOSEARCH
:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能 -
GEOSEARCHSTORE
:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能
练习
# 添加坐标数据
GEOADD g1 116.378248 39.865275 bjnz 116.42803 39.903738 bjz 116.322287 39.893729 bjxz
# 计算北京西站到北京站的距离
GEODIST g1 bjnz bjxz km
# 搜索天安门附近10km内的所有火车站,并按照距离升序排序
GEOSEARCH g1 FROMLONLAT 116.397904 39.909005 BYRADIUS 10 km WITHDIST
备注:
- 一定要登录Redis,如果有密码一定要输入密码登录,否则添加数据会报错
unauthenticated multibulk length
GEODIST
计算距离,默认的单位是米
附近商户搜索
细节:
- GEO存储经度(longitude)和维度(latitude)还有值(member),为了节约内存,我们在memboer中值存储店铺id
- 由于前端传来一个type参数,但是GEO没有type数据,所以我们按照商铺类型进行分组,类型相同的商户分为一组,以typeId作为key同时存入一个GEO集合中
1)数据预热。将店铺数据按照 typeId 批量存入Redis
/*** 预热店铺数据,按照typeId进行分组,用于实现附近商户搜索功能*/@Testpublic void loadShopListToCache() {// 1、获取店铺数据List<Shop> shopList = shopService.list();// 2、根据 typeId 进行分类
// Map<Long, List<Shop>> shopMap = new HashMap<>();
// for (Shop shop : shopList) {
// Long shopId = shop.getId();
// if (shopMap.containsKey(shopId)){
// // 已存在,添加到已有的集合中
// shopMap.get(shopId).add(shop);
// }else{
// // 不存在,直接添加
// shopMap.put(shopId, Arrays.asList(shop));
// }
// }// 使用 Lambda 表达式,更加优雅(优雅永不过时)Map<Long, List<Shop>> shopMap = shopList.stream().collect(Collectors.groupingBy(Shop::getTypeId));// 3、将分好类的店铺数据写入redisfor (Map.Entry<Long, List<Shop>> shopMapEntry : shopMap.entrySet()) {// 3.1 获取 typeIdLong typeId = shopMapEntry.getKey();List<Shop> values = shopMapEntry.getValue();// 3.2 将同类型的店铺的写入同一个GEO ( GEOADD key 经度 维度 member )String key = SHOP_GEO_KEY + typeId;// 方式一:单个写入(这种方式,一个请求一个请求的发送,十分耗费资源,我们可以进行批量操作)
// for (Shop shop : values) {
// stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()),
// shop.getId().toString());
// }// 方式二:批量写入List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>();for (Shop shop : values) {locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(), shop.getY())));}stringRedisTemplate.opsForGeo().add(key, locations);}}
2)在ShopServiceImpl中编写查询代码
@Overridepublic Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {// 1.判断是否需要根据坐标查询if (x == null || y == null) {// 不需要坐标查询,按数据库查询Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));// 返回数据return Result.ok(page.getRecords());}// 2.计算分页参数int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;int end = current * SystemConstants.DEFAULT_PAGE_SIZE;// 3.查询redis、按照距离排序、分页。结果:shopId、distanceString key = SHOP_GEO_KEY + typeId;GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDISTANCE.search(key,GeoReference.fromCoordinate(x, y),new Distance(5000),RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));// 4.解析出idif (results == null) {return Result.ok(Collections.emptyList());}List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();if (list.size() <= from) {// 没有下一页了,结束return Result.ok(Collections.emptyList());}// 4.1.截取 from ~ end的部分List<Long> ids = new ArrayList<>(list.size());Map<String, Distance> distanceMap = new HashMap<>(list.size());list.stream().skip(from).forEach(result -> {// 4.2.获取店铺idString shopIdStr = result.getContent().getName();ids.add(Long.valueOf(shopIdStr));// 4.3.获取距离Distance distance = result.getDistance();distanceMap.put(shopIdStr, distance);});// 5.根据id查询ShopString idStr = StrUtil.join(",", ids);List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();for (Shop shop : shops) {shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());}// 6.返回return Result.ok(shops);}
注意:还需要再Controller层中新增两个参数,经度x和纬度y
备注:前端数据是写死的,无论是哪一个用户,发送的地理坐标都是一样的,数据库中的店铺数据的坐标也是也是假的
期间遇到bug6
用户签到和连续签到统计
BitMap基本使用
Redis中是利用string类型数据结构实现BitMap**,**因此最大上限是512M,转换为bit则是 2 32 2^{32} 232个bit位。
BitMap的操作命令有:
-
SETBIT
:向指定位置(offset)存入一个0或1 -
GETBIT
:获取指定位置(offset)的bit值 -
BITCOUNT
:统计BitMap中值为1的bit位的数量 -
BITFIELD
:操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值 -
BITFIELD_RO
:获取BitMap中bit数组,并以十进制形式返回 -
BITOP
:将多个BitMap的结果做位运算(与 、或、异或) -
BITPOS
:查找bit数组中指定范围内第一个0或1出现的位置
示例一:总共7天,前三天(星期一到星期三)签到,中间两天不签到(星期四和星期五),后面三天又全都签到
SETBIT key offset value
示例二:查看数据
# 读取所有的bit位
BITFIELD key
# 查找第一个数出现的位置
BITPOS key value
BITPOS bm1 1 # 返回的就是0,11100111 offset位0的位置就是1
BITPOS bm1 0 # 返回的就是0,11100111 offset位0的位置就是1
# 读取指定位数的bit位
BITFIELD key GET type offset
# 获取的数据是3
BITFIELD bm1 get u2 0
备注:u
表示无符号,i
表示有符号,这里读取到3,是因为u2表示获取两个bit位,0表示从0开始计数,前面我们存入的数据是11100111,从0开始计数,往后数两个bit位 就是 11 ,11代表的数字就是3,如果是 BITFIELD bm1 get u3 0 对应的就是111,代表的数据就是7,BITFIELD bm1 get u5 0对应的数字也是7(11100)
签到功能
注意:Redis客户端(RESP),必须使用2020以前版本,或者2022.2之后的版本,2021不支持二进制数据的展示
/*** 用户签到** @return*/@Overridepublic Result sign() {// 获取当前登录用户Long userId = ThreadLocalUtls.getUser().getId();// 获取日期LocalDateTime now = LocalDateTime.now();// 拼接keyString keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = USER_SIGN_KEY + userId + keySuffix;// 获取今天是本月的第几天int dayOfMonth = now.getDayOfMonth();// 写入Redis SETBIT key offset 1stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);return Result.ok();}
由于这个功能并没有前端实现,所以这里采用接口测试的方法:
签到统计
-
问题1:什么叫做连续签到天数?
从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。
-
问题2:如何得到本月到今天为止的所有签到数据?
BITFIELD key GET u[dayOfMonth] 0
-
问题3:如何从后向前遍历每个bit位?
与 1 做与运算,就能得到最后一个bit位。随后右移1位,下一个bit位就成为了最后一个bit位。
/*** 记录连续签到的天数** @return*/@Overridepublic Result signCount() {// 1、获取签到记录// 获取当前登录用户Long userId = ThreadLocalUtls.getUser().getId();// 获取日期LocalDateTime now = LocalDateTime.now();// 拼接keyString keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = USER_SIGN_KEY + userId + keySuffix;// 获取今天是本月的第几天int dayOfMonth = now.getDayOfMonth();// 获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0List<Long> result = stringRedisTemplate.opsForValue().bitField(key,BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));// 2、判断签到记录是否存在if (result == null || result.isEmpty()) {// 没有任何签到结果return Result.ok(0);}// 3、获取本月的签到数(List<Long>是因为BitFieldSubCommands是一个子命令,可能存在多个返回结果,这里我们知识使用了Get,// 可以明确只有一个返回结果,即为本月的签到数,所以这里就可以直接通过get(0)来获取)Long num = result.get(0);if (num == null || num == 0) {// 二次判断签到结果是否存在,让代码更加健壮return Result.ok(0);}// 4、循环遍历,获取连续签到的天数(从当前天起始)int count = 0;while (true) {// 让这个数字与1做与运算,得到数字的最后一个bit位,并且判断这个bit位是否为0if ((num & 1) == 0) {// 如果为0,说明未签到,结束break;} else {// 如果不为0,说明已签到,计数器+1count++;}// 把数字右移一位,抛弃最后一个bit位,继续下一个bit位num >>>= 1;}return Result.ok(count);}
由于这个功能并没有前端实现,所以这里采用接口测试的方法,此时由于只有一天起到,所以这里只会显示连续签到1天
我们可以通过SETBIT
命令进行补签,然后再来统计一下签到天数
# 1号签到
SETBIT sign:2:202307 0 1
# 2号签到
SETBIT sign:2:202307 0 1
# 3号签到
SETBIT sign:2:202307 0 1
# 21号签到
SETBIT sign:2:202307 20 1
# 20号签到
SETBIT sign:2:202307 19 1
现在我们当前是22号,当前连续签到是3天
UV统计
首先我们搞懂两个概念:
-
UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
-
PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖。
HyperLogLog用法
Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0
Redis中的HLL
是基于string
结构实现的,单个HLL的内存永远小于16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于**0.81%**的误差。不过对于UV统计来说,这完全可以忽略。
-
HyperLogLog常用指令:
-
PFADD key element [element...]
:添加指定元素到 HyperLogLog 中 -
PFCOUNT key [key ...]
:返回给定 HyperLogLog 的基数估算值 -
PFMERGE destkey sourcekey [sourcekey ...]
:将多个 HyperLogLog 合并为一个 HyperLogLog
-
-
HyperLogLog的作用:做海量数据的统计工作
-
HyperLogLog的优缺点:
-
优点:内存占用极低、性能非常好
-
缺点:有一定的误差
-
实现UV统计
由于当前系统并没有足够的用户数据量,所以这里我们需要模拟实现UV统计(●’◡’●)
1)我们先来测试一下UV统计的内存占用情况
内存查看前,我们先查看当前Redis占用内存情况
# 查看Redis的内存
info memory
可以看到记录存储前:2.07MB( 2.07 ∗ 2 10 ∗ 2 10 = 2174664 B y t e 2.07*2^{10}*2^{10}=2174664Byte 2.07∗210∗210=2174664Byte)
2)存储模拟用户数据
/*** 测试 HyperLogLog 实现 UV 统计的误差*/@Testpublic void testHyperLogLog() {String[] values = new String[1000];// 批量保存100w条用户记录,每一批1个记录int j = 0;for (int i = 0; i < 1000000; i++) {j = i % 1000;values[j] = "user_" + i;if (j == 999) {// 发送到RedisstringRedisTemplate.opsForHyperLogLog().add("hl2", values);}}// 统计数量Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");System.out.println("count = " + count);}
使用info memory
查看记录存储后内存:2.09MB(2174664Byte),可以看到,存储100w条用户记录,但是内存至多占用了0.02MB(恐怖如斯😱,太TM牛了),再来看看误差率 ( 1000000 − 997593 ) / 1000000 ≈ 0.002407 % (1000000-997593)/1000000≈0.002407\% (1000000−997593)/1000000≈0.002407%,远远低于官方说的 0.81%
Bug记录
-
bug:
问题背景:
问题原因:
问题解决:
-
bug1:测试类运行失败
问题背景:在【基于逻辑过期解决缓存击穿】时,需要提前预热数据,我使用测试类进行Shop数据预热,但是最终NPE
问题原因:详情见下方文章链接
问题解决:
- 方案一:提高SpringBoot版本(不推荐)
- 方案二:在测试类上添加
@RunWith(SpringJunit4ClassRunner.class)
注解
参考文章:SpringBoot程序测试时出现NullPointerException(空指针异常)- 知识汲取者的博客-CSDN博客
-
bug2:Nginx负载均衡失败
问题背景:在搭建集群环境,测试一人一单超卖问题时,配置完Nginx的负载均衡后,发现Nginx一直无法实现负载均衡的效果
问题原因:ngxin配置没有生效,修改了nginx的配置后,一定要记得重启nginx
问题解决:杀死所有8080端口的进程,然后执行
nginx.exe -s reload
,或者直接执行nginx.exe -s reload
netstat -ano | findstr "8080" taskkill /pid 进程id -f
-
bug3:包不存在
问题背景:在消息队列优化的时候,突然发现 redisson 这个jar包找不到了,真的很疑惑,明明我根本就没有动过这个依赖啊??
问题原因:IDEA的锅
问题解决:
"Delegate IDE build/run actions to Maven"是一种配置选项,通常在集成开发环境(IDE)中可用。它的作用是将IDE中的构建和运行操作委托给Maven进行处理。将IDE中的构建和运行操作委托给Maven可以提供更稳定、可靠和一致的构建过程,并使得项目的构建和依赖管理更加高效和可控
参考文章: Java——程序包不存在【三种解决方法】
-
bug4:SpringBoot启动后循环报错
问题背景:在对消息队列进行优化时,然后启动SpringBoot程序后没有报错,但是过一段时间就会循环报错
java.lang.IllegalStateException: LettuceConnectionFactory was destroyed and cannot be used anymore
问题原因:这个异常信息表明在使用一个已被销毁的LettuceConnectionFactory实例。LettuceConnectionFactory是Spring Data Redis对Lettuce客户端库的一个封装,用于创建和管理与Redis服务器的连接。
这个异常通常在以下情况下发生:
- 情况一:在已经销毁的LettuceConnectionFactory实例上尝试进行Redis操作。
- 情况二:在多线程环境下共享同一个LettuceConnectionFactory实例,并在另一个线程销毁了该实例。(我属于这种情况)
我属于第二种情况,我开启了两个SpringBoot程序,但是两个SpringBoot共用了一个LettuceConnectionFactory实例,同时我开启了热部署,每次我修改代码后,导致销毁和使用交叉进行,一个线程销毁了LettuceConnectionFactory,另一个线程立马去取,从而引发线程安全问题
问题解决:
-
方案一:使用ThreadLocal:将LettuceConnectionFactory实例放入ThreadLocal中,确保每个线程都使用自己的连接工厂实例。这样可以避免同时访问同一个连接工厂,减少线程间的竞争条件。
-
方案二:使用连接池:使用连接池管理连接并提供线程安全的访问。Lettuce支持连接池(如LettucePool),可以配置连接池的大小来满足并发需求。
-
方案三:使用连接工厂池:如果需要多个LettuceConnectionFactory实例,可以使用连接工厂池。可以使用Apache Commons Pool等开源库来管理连接工厂的池化对象,并确保池中连接工厂的线程安全访问。(推荐)
-
方案四:同步访问:如果必须在多个线程中共享同一个LettuceConnectionFactory实例,那么需要在访问该实例时添加适当的同步机制,如使用synchronized同步块。这将确保只有一个线程能够使用连接工厂,从而避免线程间的竞争条件。
-
方案五:使用连接工厂的连接:确保在操作完Redis后,将连接归还到连接工厂或连接池中。这样可以避免在一个线程中完成操作后,另一个线程从已关闭的连接工厂获取连接并尝试使用它。
-
方案六:关闭热部署(我采用的方法)
-
方案七:重启项目,但是不关闭热部署,重启项目还是会报错
-
方案八:让当前项目处于单体状态,关闭一个Tomcat,但是需要配置一下Nginx,不让Nginx处于负载均衡状态
参考文章:
- java.lang.IllegalStateException: LettuceConnectionFactory was destroyed and cannot be used anymore
- ChatGPT
-
bug5:查询共同评论接口无法获取id
问题背景:在实现共同关注功能时,发现发送请求时,发现无法获取用户id
问题原因:漏写了一个接口,我们需要通过这个接口查询出用户信息,然后将用户信息存入user中
我一开始还以为是前端出错了,没注意到这个请求居然是404了
问题解决:在UserController中添加这个接口,返回用户信息,用户信息一定要包括用户id
-
bug6</:Redis查询报错
ERR unknown command 'GEORADIUS'
问题背景:在实现附近商户搜索功能时,利用
GEORADIUS
命令查询Redis报错问题原因:Redis版本太低了,
GEORADIUS
命令是Redis6.2提供的。我的SpringBoot版本是2.3,2.3对应的SpringDataRedis是2.3.9,该版本不支持Redis6.2提供的GEORADIUS
指令问题解决:
- 方案一:使用低版本Redis的查询指令(可以自行百度)
- 方案二:将Redis换成高版本的(采用)
-
bug7:SpringBoot启动失败
问题背景:在实现了签到统计之后,为了方便接口测试,我不打算使用postman了,而是直接集成Knife4j直接再浏览器上进行接口测试,这样做的好处是能够避免编写接口,减少一定的工作量,使用Knife4j页显得更加简单,结果导入Knife4j之后报错
org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed
,这个原有是由于一个错误导致Redis连接断开,一开始我被这个错误迷惑了双眼,一顿Debug发现还是没有发现错误,于是就打算先将报错的地方注释起来,然后再次启动,结果有报错Failed to start bean 'documentationPluginsBootstrapper'; nested exception is java.lang.NullPointerException
,这个错误一看就明白了,Knife4j的问题🤣。之所以前面循环报错,就是由于我要读取 stream.orders 中的数据,结果Knife4j报错导致Redis中断,我开辟了一个新线程去读取消息队列中的消息,导致循环报错问题原因:这次报错是由于没有注意到Knife4j与SpringBoot的版本兼容性问题,Knife4j3.0.x版本不在兼容SpringBoot2.6之前的版本,我当前项目的SpringBoot版本是2.4,所以就报错了
问题解决:
-
方案一:改动版本。要么提高SpringBoot的版本(成本较高,不推荐),要么降低Knife4j的版本(推荐)
-
方案二:在配置文件中添加如下配置(采用)
spring:mvc:pathmatch:matching-strategy: ant_path_matcher # 支持swagger3
参考资料:
- Failed to start bean ‘documentationPluginsBootstrapper‘; nested exception is java.lang.NullPointerEx
配置成功之后,启动没有报错了,但是访问 \doc.html 出现401,401表示没有访问权限,这么一看就明白了,是拦截器在作怪┭┮﹏┭┮,需要将Knife4j的资源添加到白名单,加入一个Knife4j真不容易啊
// knife4j接口文档请求"/doc.html","/webjars/**","/swagger-resources","/v2/api-docs"
配置成功后,访问
http://localhost:8081/doc.html
即可 -
结语
详情见:黑马点评项目总结.xmind
参考资料:
黑马程序员Redis入门到实战教程
GPT3.5
[单体架构系统与分布式系统区别对比](https://blog.csdn.net/weixin_45393094/article/details/104632343#:~:text=整个系统的所有功能单元%2C整体部署到同一个进程 (所有代码可以打包成1个或多个文件),我们可以称之为”单体架构”,(Monolithic Architecture) %3B)
Redis使用Lua脚本时为什么能保证原子性?
redisson.tryLock()的参数的理解
单体系统:整个系统的所有功能单元都由一个单一的进程控制,整个系统都部署再一台服务器上 ↩︎
分布式系统:整个系统的所有功能由多个进程控制,整个系统部署在多态服务器上 ↩︎
原子性:原子性是指 一个操作是不可中断的,要么全部执行成功要么全部执行失败,有着"同生共死"的感觉 ↩︎