博客须知
- 本篇博客内容来源与黑马点评项目实战篇-16.用户签到-实现签到功能_哔哩哔哩_bilibili,作者对视频内容进行了整合,由于记笔记时图片使用的是本地路径,所以导致博客的图片无法正常显示,如果有图片需求可以下载上方的pdf
- 须知
- 该项目主要是通过Redis实现的,基本上把Redis的使用场景学了个遍
- 封装Redis工具类和生成分布式id需要重点学习一下,这两块的思路很好
- 本篇博客不包含课程中的自研分布式锁的内容,因为我以前跟着周阳的redis课程和谷粒商城自研过两遍了
- Redis版本的消息队列很鸡肋,所以我在这边博客给出了秒杀优化的RabbitMQ的实现版本
- 这个项目其实不大,而且环境黑马都搭建好了,不用走弯路,这项目只花了我两周的时间【抛去实习和比赛的占用时间】,整体还是比较好上手的,推荐学习
- 不足
- 黑马课程都是挺圆滑的,比如解决缓存穿透有存储空值和布隆过滤器等实现方式,黑马在这一块挑了好实现的存储空值的方式,另一种讲都没讲
- Redis版本的消息队列这个真就没必要学了,看看这一块业务逻辑就行了,到时候换别的消息队列实现
短信登录
环境搭建
1.导入sql文件
-
黑马公众号可以获取到sql等资源,或者通过本篇博客上方绑定的sql文件获取】
-
其中的表包括【要求MySQL版本在5.7以上】
- tb_user:用户表、tb_user_info:用户详情表、tb_blog:用户日记表(达人探店日记)、tb_follow:用户关注表
- tb_shop:商户信息表、tb_shop_type:商户类型表
- tb_voucher:优惠券表、tb_voucher_order:优惠券的订单表
-
创建一个hmdp的数据库,然后运行sql文件
.
2.项目架构
.
3.后端项目部署
- ps:在资料中提供了一个项目源码
-
导入后端项目
-
修改配置文件
server:port: 8081 spring:application:name: hmdpdatasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/hmdp?useSSL=false&serverTimezone=UTCusername: rootpassword: 自己的数据库密码redis:host: redis主机所在的ipport: 6379#password: 123321 如果redis服务器有配置密码,这里就要设置lettuce:pool:max-active: 10max-idle: 10min-idle: 1time-between-eviction-runs: 10sjackson:default-property-inclusion: non_null # JSON处理时忽略非空字段 mybatis-plus:type-aliases-package: com.hmdp.entity # 别名扫描包 logging:level:com.hmdp: debug
-
测试是否能正常访问localhost:8081/shop-type/list
.
4.前端部署
-
将资料中的nignx文件夹拷贝到磁盘中不带有中文路径的位置
.
-
运行前端项目:在nginx所在目录下打开一个CMD窗口输入
start nginx.exe
命令.
-
打开chrome浏览器,在空白页面点击鼠标右键,选择检查,即可打开开发者工具【F12也是一样的】
.
-
修改成手机模式.
-
测试前端项目是否启动成功可以访问http://127.0.0.1:8080
- 吐槽一句:这里要把调试工具的停靠位置左右侧,否则放在底部前端样式会压扁
.
基于Session实现登录
业务流程
-
发送验证码:用户在提交手机号后,会校验手机号是否合法
- 如果不合法,则要求用户重新输入手机号
- 如果手机号合法,后台此时生成对应的验证码,同时将验证码进行保存,然后再通过短信的方式将验证码发送给用户
-
短信验证码登录、注册:用户将验证码和手机号进行输入,后台从session中拿到当前验证码,然后和用户输入的验证码进行校验
- 如果不一致,则无法通过校验
- 如果一致,则后台根据手机号查询用户
- 如果用户不存在,则为用户创建账号信息,保存到数据库
- 无论是否存在,都会将用户信息保存到session中,方便后续获得当前登录信息
-
校验登录状态:用户在请求时候,会从cookie中携带者JsessionId到后台,后台通过JsessionId从session中拿到用户信息
- 如果没有session信息,则进行拦截,如果有session信息
- 则将用户信息保存到threadLocal中,并且放行
-
登录功能是和用户有关的,所以以下功能的实现都是围绕UserService实现类完成的
短信验证登录/注册
发送短信验证码
页面流程
业务代码
@Slf4j
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {@Overridepublic Result sendCode(String phone, HttpSession session) {//1.校验手机号if (RegexUtils.isPhoneInvalid(phone)) {//2.不符合就直接返回错误信息return Result.fail("手机号格式错误!");}//3.符合就生成验证码String code = RandomUtil.randomNumbers(6);//4.将验证码保存到sessionsession.setAttribute("code",code);//5.发送验证码,这一块是伪实现,如果要实现发送验证码功能可以去阿里云找短信验证的api接口log.debug("发送成功,验证码为:{}",code);return Result.ok();}
}
测试结果
登录&注册
页面流程
.
封装登录信息
@Data
public class LoginFormDTO {private String phone;private String code;private String password;
}
业务代码
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {//1.校验手机号String phone = loginForm.getPhone();if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式错误!");}//2.校验验证码Object cacheCode = session.getAttribute("code");String code=loginForm.getCode();if(cacheCode==null||!cacheCode.toString().equals(code)){//3.不一致,报错return Result.fail("验证码错误");}//4.一致,根据手机号查询用户User user = query().eq("phone", phone).one();//5.判断用户是否存在if(user==null){//6.不存在就创建一个新用户user=createUserWithPhone(phone);}//7.将用户信息保存到session中session.setAttribute("user",user);return Result.ok();
}
private User createUserWithPhone(String phone) {User user = new User();user.setPhone(phone);user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));save(user);//保存用户return user;
}
登录拦截功能
创建拦截器
public class LoginInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//1.获取session中的用户信息HttpSession session = request.getSession();Object user = session.getAttribute("user");//2.判断用户是否存在if(user==null){//3.不存在就报错response.setStatus(401);return false;}//4.存在就将用户信息保存到ThreadLocalUserHolder.saveUser((UserDTO) user);//5.放行return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {UserHolder.removeUser();}
}
配置拦截器生效
@Configuration
public class MvcConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/user/code","/user/login","/voucher/**",//优惠券查询"/shop/**","/shop-type/**","/upload/**","/blog/hot");}
}
拦截用户信息并登录
@GetMapping("/me")
public Result me(){// 获取当前登录的用户并返回UserDTO user = UserHolder.getUser();return Result.ok(user);
}
报错
- 类型转换异常
java.lang.ClassCastException: class com.hmdp.entity.User cannot be cast to class com.hmdp.dto.UserDTO (com.hmdp.entity.User and com.hmdp.dto.UserDTO are in unnamed module of loader 'app')at com.hmdp.utils.LoginInterceptor.preHandle(LoginInterceptor.java:24) ~[classes/:na]
- 原因:黑马挖坑,前期ThreadLocal工具类中存储的类型是UserDTO而不是User
public class UserDTO {private Long id;private String nickName;private String icon;
}
- 解决方式一:保存到ThreadLocal之前需要新建一个UserDTO对象并且将属性拷贝过去
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//1.获取session中的用户信息HttpSession session = request.getSession();User user = (User)session.getAttribute("user");//2.判断用户是否存在if(user==null){//3.不存在就报错response.setStatus(401);return false;}//4.存在就将用户信息保存到ThreadLocalUserDTO userDTO = new UserDTO();BeanUtils.copyProperties(user,userDTO);//属性拷贝,隐藏隐私字段UserHolder.saveUser(userDTO);//5.放行return true;
}
.
- 方式二【推荐】:在存入session的时候【登录成功时】就指定UserDTO类型,session中保存部分字段,不存储隐私字段
//将用户信息保存到session中,创建UserDTO类型的对象,并且拷贝user同名属性
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
集群的session共享问题
.
-
session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题
-
核心思路分析
-
早期的方案是session拷贝,就是说虽然每个tomcat上都有不同的session,但是每当任意一台服务器的session修改时,都会同步给其他的Tomcat服务器的session,这样的话,就可以实现session的共享了
-
但是这种方案具有两个大问题
- 每台服务器中都有完整的一份session数据,服务器压力过大。
- ression拷贝数据时,可能会出现延迟
-
所以后来采用的方案都是基于redis来完成,把session换成redis,redis数据本身就是共享的,就可以避免session共享的问题了
-
-
session的替代方案应该满足:数据共享、内存存储、key、value结构
基于Redis实现共享session登录
业务流程
数据类型选择
-
保存登录的用户信息,可以使用String结构,以JSON字符串来保存,比较直观
-
Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD,并且
内存占用更少【String类型反而更少,这里不要被黑马误导】.
发送短信验证码
public Result sendCode(String phone, HttpSession session) {//1.校验手机号if (RegexUtils.isPhoneInvalid(phone)) {//2.不符合就直接返回错误信息return Result.fail("手机号格式错误!");}//3.符合就生成验证码String code = RandomUtil.randomNumbers(6);//4.将验证码保存到redis,并且设置过期时间stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);//5.发送验证码,这一块是伪实现,如果要实现发送验证码功能可以去阿里云找短信验证的api接口log.debug("发送成功,验证码为:{}",code);return Result.ok();
}
登录校验
public Result login(LoginFormDTO loginForm, HttpSession session) {//1.校验手机号String phone = loginForm.getPhone();if (RegexUtils.isPhoneInvalid(phone)) {return Result.fail("手机号格式错误!");}//2.从redis中获取验证码并且验证String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);String code=loginForm.getCode();if(cacheCode==null||!cacheCode.equals(code)){//3.不一致,报错return Result.fail("验证码错误");}//4.一致,根据手机号查询用户User user = query().eq("phone", phone).one();//5.判断用户是否存在if(user==null){//6.不存在就创建一个新用户user=createUserWithPhone(phone);}//7.将用户信息保存到session中,创建UserDTO类型的对象,并且拷贝user同名属性//7.1生成登录令牌String token = UUID.randomUUID().toString(true);//7.2将User对象转成Hash存储【我这边还是推荐存储成string,然后设置序列化机制】UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);//隐藏部分字段Map<String, Object> userMap = BeanUtil.beanToMap(userDTO);String tokenKey = LOGIN_USER_KEY + token;stringRedisTemplate.opsForHash().putAll(tokenKey,userMap);//7.3设置令牌有效期stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);//8.返回登录令牌return Result.ok(token);
}
登录状态续期
-
需求:用户session的过期时间不是固定的,如果期间内用户有访问系统,就应该给过期时间续期
-
由于自定义拦截器中没有注入容器,所以无法自动注入redis操作类,只能手动注入
-
可以通过构造方法注入,由于MvcConfig是配置类,由IoC容器管理,所以可以自动注入StringRedisTemplate对象,并将该对象通过构造器注入到自定义拦截器中
public class LoginInterceptor implements HandlerInterceptor {private StringRedisTemplate stringRedisTemplate;public LoginInterceptor(StringRedisTemplate stringRedisTemplate){this.stringRedisTemplate=stringRedisTemplate;}//.....
}
@Configuration
public class MvcConfig implements WebMvcConfigurer {@ResourceStringRedisTemplate stringRedisTemplate;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor(stringRedisTemplate)).excludePathPatterns(//....);}
}
- 登录拦截以及状态续期的业务代码
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//1.获取请求头的tokenString token = request.getHeader("authorization");if(StrUtil.isBlank(token)){//2.不存在就报错response.setStatus(401);return false;}//3.基于token获取到redis中的用户String tokenKey = RedisConstants.LOGIN_USER_KEY + token;Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(tokenKey);//4.判断用户是否存在if(userMap.isEmpty()){response.setStatus(401);return false;}//5.将查询到的hash数据转成UserDTO类型,第三个参数表示是否要忽略转换过程中的错误UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);//6.将用户信息存到ThreadLocalUserHolder.saveUser(userDTO);//7.刷新token有效期stringRedisTemplate.expire(tokenKey,RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);//8.放行return true;
}
调试
-
发送验证码
.
-
登录报错:将userMap类型存储到redis时出现类型转换异常
.
java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.lang.String at com.hmdp.service.impl.UserServiceImpl.login(UserServiceImpl.java:85) ~[classes/:na]
- 原因:使用StringRedisTemplate要求存储的数据类型都为String,因此要确保hash中的每一个值都为String类型
- 这个坑感觉是黑马故意挖的,炫技?
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),CopyOptions.create().setIgnoreNullValue(true)//设置忽略null值.setFieldValueEditor((fieldName,fieldValue)//接收字段名和字段值-> fieldValue.toString()));//将字段值转成string类型
-
登录成功
.
登录拦截器优化
需求分析
- 需求:登录拦截器只针对需要登录的资源进行拦截并且刷新过期时间,实际上在访问所有资源时都应该刷新当前用户的状态
- 实现:额外声明一个拦截器来拦截所有请求,并执行令牌刷新相关逻辑,并且登录拦截器就不用实现刷新逻辑了
令牌刷新拦截器
public class RefreshTokenInterceptor implements HandlerInterceptor {private StringRedisTemplate stringRedisTemplate;public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate){this.stringRedisTemplate=stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//1.获取请求头的tokenString token = request.getHeader("authorization");if(StrUtil.isBlank(token)){//2.不存在token就直接放行到登录拦截器跳转登录页return true;}//3.基于token获取到redis中的用户String tokenKey = RedisConstants.LOGIN_USER_KEY + token;Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(tokenKey);//4.判断用户是否存在if(userMap.isEmpty()){response.setStatus(401);return true;}//5.将查询到的hash数据转成UserDTO类型,第三个参数表示是否要忽略转换过程中的错误UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);//6.将用户信息存到ThreadLocalUserHolder.saveUser(userDTO);//7.刷新token有效期stringRedisTemplate.expire(tokenKey,RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);//8.放行return true;}
}
修改登录拦截器
public class LoginInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//判断是否有用户信息,没有就进行登录拦截if(UserHolder.getUser()==null){response.setStatus(401);return false;}return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {UserHolder.removeUser();}
}
将拦截器注册到容器
- 要保证刷新拦截器先执行,默认情况下拦截器是按照添加顺序依次执行的,也可以设置优先级【order值越小优先级越高】
public void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/user/code","/user/login","/voucher/**",//优惠券查询"/shop/**","/shop-type/**","/upload/**","/blog/hot").order(1);//拦截所有,并且优先级要高于登录拦截器registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);
}
bug修复
-
问题复述:当redis中的token过期时,此时浏览器中的token还在,此时无法跳转到登录拦截器的逻辑
-
原因【个人推断】
- 由于设置了错误状态码,mvc检测到错误状态码就对该请求进行一个处理,处理完成之后没有放行,而是直接返回错误信息给前端,所以无法放行到登录拦截器【下图拦截器链都没有放入登录拦截器】
.
- 此时还是同一个请求,请求头中的token还是没有变化,所以令牌刷新拦截器会一直检测到token存在但是用户不存在,所以重新设置错误状态码,最终一直无法跳转到登录页面
-
解决:将令牌刷新拦截器的错误状态码设置给去掉
if(userMap.isEmpty()){return true;
}
商户查询缓存
业务流程
.
查询商户业务代码
public Result queryShopById(Long id) {//1.从redis查询商铺缓存String key = CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);//2.判断商铺信息是否存在if(StrUtil.isNotBlank(shopJson)){//3.存在就直接返回缓存数据Shop shop = JSONUtil.toBean(shopJson, Shop.class);//转化成商铺实体类return Result.ok(shop);}//4.不存在就根据id去数据库中查Shop shop = getById(id);//5.如果数据库不存在就返回错误信息if(shop==null){return Result.fail("店铺不存在!!!");}//6.存在就写回到redisstringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));return Result.ok(shop);
}
- 写入缓存成功
.
作业:缓存店铺类型
- 需求:缓存店铺类型,对应的请求路径为http://localhost:8080/api/shop-type/list,对应控制层为ShopTypeController
- 业务分析
- 最终需要返回一个店铺类型的列表,所以缓存中可以使用list类型存储
- 缓存查询逻辑就和查询店铺差不多了,照搬然后小改一下
- 查询redis中某个list类型的键的所有数据可以调用
stringRedisTemplate.opsForList().range("typeList", 0, -1)
- 由于stringRedisTemplate中存储的元素类型为String,所以需要把ShopType类型的数据转成JSON格式字符串存入list
- 业务代码
public Result queryTypeList() {//1.从redis查询出所有的商铺类别List<ShopType> typeList = stringRedisTemplate.opsForList().range("typeList", 0, -1).stream().map(type -> {return JSONUtil.toBean(type, ShopType.class);}).collect(Collectors.toList());//2.判断商铺信息是否存在if(typeList!=null&&typeList.size()>0){//3.存在就直接返回缓存数据return Result.ok(typeList);}//4.不存在就根据id去数据库中查typeList = query().orderByAsc("sort").list();//5.如果数据库不存在就返回错误信息if(typeList==null&&typeList.size()==0){return Result.fail("没有定义商铺类别信息!!!");}//6.存在就写回到redisstringRedisTemplate.opsForList().leftPushAll("typeList",typeList.stream().map(shopType -> {return JSONUtil.toJsonStr(shopType);}).collect(Collectors.toList()));return Result.ok(typeList);
}
- 测试结果
.
缓存更新策略
策略对比
- 业务场景
- 低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
- 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存
主动更新策略
需要考虑的问题
-
删除缓存还是更新缓存?
-
更新缓存:每次更新数据库都更新缓存,无效写操作较多
-
删除缓存:更新数据库时让缓存失效,查询时再更新缓存
-
-
如何保证缓存与数据库的操作的同时成功或失败?
- 单体系统,将缓存与数据库操作放在一个事务
- 分布式系统,利用TCC等分布式事务方案
-
先操作缓存还是先操作数据库?
- 二者都存在将旧值写入数据库的可能性,但是先操作数据库的写入旧值概率更低
最佳实践方案
- 低一致性需求:使用Redis自带的内存淘汰机制
- 高一致性需求:主动更新,并以超时剔除作为兜底方案
- 读操作
- 缓存命中则直接返回
- 缓存未命中则查询数据库,并写入缓存,设定超时时间
- 写操作
- 先写数据库,然后再删除缓存
- 要确保数据库与缓存操作的原子性
- 读操作
更新商户数据
@Transactional
public Result update(Shop shop) {Long id = shop.getId();if(id==null){return Result.fail("店铺id不能为空");}//1.更新数据库updateById(shop);//2.删除缓存stringRedisTemplate.delete(CACHE_SHOP_KEY+id);return Result.ok();
}
- 使用postman更新商户信息,测试连接
PUT http://localhost:8081/shop
,测试的json字符串如下
{"area":"大关","openHours":"10:00-22:00","sold":4215,"address":"金华路xxx29号","comments":3035,"avgPrice":80,"score":37,"name":"102茶餐厅","typeId":1,"id":1
}
缓存的生产问题
缓存穿透
基础概念
-
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库
-
常见的解决方案有两种
.
- 缓存空对象
- 优点:实现简单,维护方便
- 缺点:额外的内存消耗【可以加ttl来解决】、可能造成短期的不一致、我的补充:无法解决生成大量不同的key
- 布隆过滤
- 优点:内存占用较少,没有多余key
- 缺点:实现复杂、存在误判可能
- 缓存空对象
修改查询商铺功能–缓存空值
public Result queryShopById(Long id) {//1.从redis查询商铺缓存String key = CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);//2.判断商铺信息是否存在if(StrUtil.isNotBlank(shopJson)){//3.存在就直接返回缓存数据Shop shop = JSONUtil.toBean(shopJson, Shop.class);//转化成商铺实体类return Result.ok(shop);}//==========补充:如果命中空值,说明是无效数据,直接返回不存在,就不需要去数据库查询了=========if("".equals(shopJson)){return Result.fail("店铺不存在!!!");}//4.不存在就根据id去数据库中查Shop shop = getById(id);//5.如果数据库不存在就返回错误信息if(shop==null){//==========补充:数据不存在就缓存空值============stringRedisTemplate.opsForValue().set(key,"");return Result.fail("店铺不存在!!!");}//6.存在就写回到redisstringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);return Result.ok(shop);
}
.
总结
- 缓存穿透产生的原因是什么?
- 用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
- 缓存穿透的解决方案有哪些?
- 缓存null值
- 布隆过滤
- 增强id的复杂度,避免被猜测id规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
缓存雪崩
- 缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力
- 解决方案
- 给不同的Key的TTL添加随机值
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
缓存击穿
基础概念
- 缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击
- 常见的解决方案有两种
- 互斥锁:多个线程争抢锁,只有一个线程完成重建缓存操作
- 逻辑过期:不设置ttl,而是额外存储过期时间字段,查询时如果检测到该数据过期就先返回过期数据,然后由异步线程完成重建
.
解决方案对比
.
解决方案 | 优点 | 缺点 |
---|---|---|
互斥锁 | 没有额外的内存消耗 保证一致性 实现简单 | 线程需要等待,性能受影响 可能有死锁风险 |
逻辑过期 | 线程无需等待,性能较好 | 不保证一致性 有额外内存消耗 实现复杂 |
基于互斥锁解决缓存击穿
业务分析
- 需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题
.
上锁和解锁逻辑
private boolean tryLock(String key){Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);//该工具类会自动拆箱,并且防止空指针异常
}
private void unlock(String key){stringRedisTemplate.delete(key);
}
修改查询商户功能
- ps:ctrl+alt+t可以给选定代码片段加上try-catch逻辑.
public Shop queryWithMutex(Long id) {//............//==========4.实现缓存重建===========Shop shop = null;String lockKey=LOCK_SHOP_KEY+id;try {//4.1获取互斥锁boolean isLock = tryLock(lockKey);//4.2判断是否获取成功if(!isLock){//获取失败就休眠尝试Thread.sleep(50);return queryWithPassThrough(id);}shop = getById(id);Thread.sleep(200);//模拟重建的延时//5.如果数据库不存在就返回错误信息if(shop==null){//数据不存在就缓存空值stringRedisTemplate.opsForValue().set(key,"");return null;}//6.存在就写回到redisstringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {//7.释放互斥锁unlock(lockKey);}return shop;
}
性能测试
.
.
基于逻辑过期解决缓存击穿
业务分析
- 需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题
.
封装包含过期时间的商户信息
@Data
public class RedisData {private LocalDateTime expireTime;private Object data;
}
修改缓存保存方法
public void saveShopRedis(Long id,Long expireSeconds){//1.查询店铺数据Shop shop = getById(id);//2.封装过期时间RedisData redisData = new RedisData();redisData.setData(shop);redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));//3.存入redisstringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}
修改查询商户功能
- redis缓存作为热点商铺的主要存储方式,不从数据库中拿
//基于逻辑过期时间解决缓存击穿
private static final ExecutorService threadPool= Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire(Long id) {//1.从redis查询商铺缓存String key = CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);//2.判断商铺信息是否存在if(StrUtil.isBlank(shopJson)){//3.不存在直接返回nullreturn null;}//4.缓存命中,就需要先把json反序列化位对象RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);//将Object类型数据转换成Shop类型Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);LocalDateTime expireTime = redisData.getExpireTime();//5.判断是否过期,即过期时间是否在当前时间之后,在当前时间之前就没过期if(expireTime.isAfter(LocalDateTime.now())){//5.1未过期就直接返回店铺信息return shop;}//5.2过期就需要缓存重建String lockKey=LOCK_SHOP_KEY+id;boolean isLock = tryLock(lockKey);if(isLock){//获取锁成功就开启异步线程完成缓存重建threadPool.submit(()->{try {this.saveShopRedis(id,20L);} catch (Exception e) {throw new RuntimeException(e);}finally {unlock(lockKey);//释放锁}});}return shop;
}
测试
- 先往缓存中写入一条数据,过期时间为10s
@Resource
ShopServiceImpl shopService;
@Test
void testLogicDelete() throws InterruptedException {shopService.saveShopRedis(1L,10L);
}
- 存入成功,数据在redis永不过期,但是存储了标识过期时间的字段
.
-
修改数据库数据,模拟数据不一致
-
性能压测访问该商户数据
- 重建成功,并且只进行了一次重建操作
!!!缓存工具封装
业务分析
- 基于StringRedisTemplate封装一个缓存工具类,满足下列需求
- TTL过期时间
- 将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
- 根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 逻辑过期时间
- 将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
- 根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
- TTL过期时间
两种缓存存储方式
/***设置过期时间*/
public void set(String key, Object value, Long time, TimeUnit unit){stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
}
/***设置逻辑过期时间*/
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){//封装逻辑过期时间的字段RedisData redisData = new RedisData();redisData.setData(value);redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));//将redisData写入redis,并且不设置过期时间stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
缓存空值解决缓存穿透
- 返回的对象和传入的id类型是不确定的,所以要使用泛型
- 查询数据库时,该工具类无法查询指定的数据库信息,因此要使用函数式编程,将查询数据库的操作交给调用者去实现
public <R,IDType> R queryWithPassThrough(String keyPrefix, IDType id, Class<R> type, Function<IDType,R> dbFallBack
,Long time, TimeUnit unit){//1.从redis查询商铺缓存String key = keyPrefix + id;String json = stringRedisTemplate.opsForValue().get(key);//2.判断商铺信息是否存在if(StrUtil.isNotBlank(json)){//3.存在就直接返回缓存数据return JSONUtil.toBean(json, type);//转化成指定实体类}//如果命中空值,说明是无效数据,直接返回不存在,就不需要去数据库查询了if("".equals(json)){return null;}//4.不存在就根据id去数据库中查R data = dbFallBack.apply(id);//这一块工具类不知道具体要去查哪个数据库,只能交给调用者去处理,所以使用函数式编程//5.如果数据库不存在就返回错误信息if(data==null){//数据不存在就缓存空值this.set(key,"",time,unit);return null;}//6.存在就写回到redisthis.set(key,data,time,unit);return data;
}
- 调用示例
//方式一
Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY,id,Shop.class,shopId->getById(shopId),CACHE_SHOP_TTL,TimeUnit.MINUTES);
//方式二
Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);
逻辑删除解决缓存击穿
- 核心思路和缓存空值的方法大差不差
private static final ExecutorService threadPool= Executors.newFixedThreadPool(10);
public <R,IDType> R queryWithLogicalExpire(String keyPrefix, IDType id, Class<R> type, Function<IDType,R> dbFallBack,Long time, TimeUnit unit) {//1.从redis查询商铺缓存String key = keyPrefix + id;String json = stringRedisTemplate.opsForValue().get(key);//2.判断商铺信息是否存在if(StrUtil.isBlank(json)){//3.不存在直接返回nullreturn null;}//4.缓存命中,就需要先把json反序列化位对象RedisData redisData = JSONUtil.toBean(json, RedisData.class);//将Object类型数据转换成Shop类型R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);LocalDateTime expireTime = redisData.getExpireTime();//5.判断是否过期,即过期时间是否在当前时间之后,在当前时间之前就没过期if(expireTime.isAfter(LocalDateTime.now())){//5.1未过期就直接返回店铺信息return r;}//5.2过期就需要缓存重建String lockKey=LOCK_SHOP_KEY+id;boolean isLock = tryLock(lockKey);if(isLock){//获取锁成功就开启异步线程完成缓存重建threadPool.submit(()->{try {R r1 = dbFallBack.apply(id);//写入redisthis.setWithLogicalExpire(key,r1,time,unit);} catch (Exception e) {throw new RuntimeException(e);}finally {unlock(lockKey);//释放锁}});}return r;//先返回旧数据挡一下,然后由异步线程来修改
}
private boolean tryLock(String key){Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);//该工具类会自动拆箱,并且防止空指针异常
}
private void unlock(String key){stringRedisTemplate.delete(key);
}
-
调用示例
- 先提前预热缓存数据
void testLogicDelete() throws InterruptedException {Shop shop = shopService.getById(1l);cacheClient.setWithLogicalExpire(CACHE_SHOP_KEY+1L,shop,10l, TimeUnit.SECONDS); }
- 查询缓存数据,如果缓存过期就续期
cacheClient.queryWithLogicalExpire(CACHE_SHOP_KEY, id, Shop.class, this::getById, LOCK_SHOP_TTL, TimeUnit.SECONDS);
仅执行一次重建操作
优惠券秒杀
全局唯一性id
引入
-
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题
-
id的规律性太明显
-
受单表数据量的限制,如果分表的话,不同表的自增id会出现重复
-
-
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性
.
-
为了增加ID的安全性,不直接使用Redis自增的数值,而是按位拼接一些其它信息
-
符号位:1bit,永远为0
-
时间戳:31bit,以秒为单位,可以使用69年
-
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
-
.
实现
- 生成一个业务开始时间戳
public static void main(String[] args) {LocalDateTime localDateTime = LocalDateTime.of(2024, 4, 10, 0, 0, 0);System.out.println(localDateTime.toEpochSecond(ZoneOffset.UTC));//1712707200
}
- 生成全局唯一性id
- 先生成时间戳,直接用当前时间和业务开始时间相减即可
- 然后生成序列号,可以通过在redis中对一个键进行自增操作生成不同序列,注意以下细节
- 如果始终对同一个键进行自增操作,可能会导致该序列号超过存储上限
- 因此可以给序列号键加上当天日期,只记录当天的流水是不会超过上限的,而且便于按天数统计信息
- 最后时间戳和序列号进行拼接后返回,业务要求低32位存储序列号,所以可以将时间戳先左移32位,使得低32位空出来,并且用序列号与低32位做逻辑或运算【只要有1就为true】
private static final long BEGIN_TIMESTAMP=1712707200L;//生成的业务开始时间戳
private static final int COUNT_BITS=32;//序列号位数
public long nextId(String keyPrefix) {//1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;//2.生成序列号//2.1获取当前日期【精确到天】,自增长id的键需要拼接上时间【避免超过序列号存储上限且方便统计】String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));//2.2自增长【这里使用基本数据类型方便计算,并且不存在空指针的情况,因为如果key不存在也会自动生成一个值为0的key】long count = stringRedisTemplate.opsForValue().increment("icr" + keyPrefix + ":" + date);//3.拼接后返回【先向左移把32位空出来,然后同count进行或运算,只要有1就为真】return timestamp<<COUNT_BITS | count;
}
测试
- 生成30000个自增长id,共耗时9s【可能是用云服务器,网络交互慢了点】
private ExecutorService es= Executors.newFixedThreadPool(500);
@Resource
RedisIdWorker redisIdWorker;
@Test
void testUniqueId() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(300);//计数器Runnable task=()->{for(int i=0;i<100;i++){long order = redisIdWorker.nextId("order");System.out.println(order);}countDownLatch.countDown();};long begin = System.currentTimeMillis();for(int i=0;i<300;i++){es.submit(task);}countDownLatch.await();long end = System.currentTimeMillis();System.out.println("time="+(end-begin));
}
总结
-
全局唯一ID生成策略
-
UUID
-
Redis自增
-
snowflake算法
-
数据库自增
-
-
Redis自增ID策略
- ID构造是时间戳 + redis计数器【序列号】
- 每天一个key,方便统计订单量且防止超过上限
秒杀下单
业务分析
-
每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购
-
表关系如下
-
tb_voucher:优惠券的基本信息,优惠金额、使用规则等
-
tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息
-
.
-
下单时需要判断两点
-
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
-
库存是否充足,不足则无法下单
-
.
新增代金券
- 使用postman模拟商户发布代金券【这里金额的单位是分】
{"shopId":1,"title":"100元代金券","subTitle":"周一至周五均可使用","rules":"全场通用\\n无需预约\\n可无限叠加\\不兑现、不找零\\n仅限堂食","payValue":8000,"actualValue":10000,"type":1,"stock":100,"beginTime" : "2024-04-11T10:09:17","endTime" : "2024-04-12T23:09:04"
}
.
实现优惠券秒杀的下单功能
public Result seckillVoucher(Long voucherId) {//1.查询优惠券信息SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}//3.判断是否结束if (voucher.getStock()<1) {return Result.fail("秒杀已经结束!");}//4.判断库存是否充足if(voucher.getStock()<1){return Result.fail("库存不足!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherId).update();if(!success){//扣减失败return Result.fail("库存不足!");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();//6.1订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2用户idvoucherOrder.setUserId(UserHolder.getUser().getId());//6.3优惠券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回订单idreturn Result.ok(orderId);
}
压测
- 模拟200个用户同时抢优惠券
.
- 测试前需要先登录,可以在请求头中设置当前用户的token,之后测试就共享这个登录状态
- 出现超卖问题
超卖问题
问题分析
.
-
多个线程同时获取到相同的库存量,误认为库存充足,就会发生超卖问题
-
针对这一问题的常见解决方案就是加锁
-
悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行【例如Synchronized、Lock】
-
乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改
- 如果没有修改则认为是安全的,自己才更新数据
- 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常
-
乐观锁
- 版本号法
- 每次数据被修改,相应的版本号就加一
- 每次执行修改前,先判断当前的版本号和自己之前获取的版本号是否一致,不一致就说明别人修改过了
.
- CAS法(比较与替换)
- 上一个方法可以简化,可以不加版本号字段,修改前通过比较当前的库存量和之前获取的库存量是否相等来判断是否被修改过
.
秒杀业务修改
//5.扣减库存
boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherId)//======扣减库存时需要判断当前库存量和之前获取到的库存量是否相等,从而判断同一张优惠券是否被抢过.eq("stock",voucher.getStock()).update();
- 超卖问题解决,200人只消费了20张优惠券
.
- 但是又出现新的问题–秒杀成功率太低
- 可以对业务进一步优化,不需要当前库存值和之前获取到的库存值完全相等,只要当前的库存大于0就可以抢
- 之前的做法相当于多个人抢一张券,只有一个人能成功,其他人只能重抢
- 现在的做法相当于多个人去池子中抢,只要当前池子还有剩余,就可以购买
boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherId)//======判断当前库存是否大于0就可以决定是否能抢池子中的券了.gt("stock",0).update();
- 不仅解决秒杀成功率低的问题
.
总结
-
超卖这样的线程安全问题的解决方案
-
悲观锁:添加同步锁,让线程串行执行
- 优点:简单粗暴
- 缺点:性能一般【可以将库存分成多段】
-
乐观锁:不加锁,在更新时判断是否有其它线程在修改
-
优点:性能好
-
缺点:存在成功率低的问题
-
-
一人一单【单体】
需求分析
- 需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
.
业务修改
- 在扣减库存之前判断当前用户是否有抢过该优惠券
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){//用户之前购买过了return Result.fail("你之前已经抢过该优惠券了!!");
}
- 无法实现一人一单功能,和抢优惠券是一样的,出现了多线程同时读取到当前用户未消费的状态,所以都认为可以进行下单
.
- 解决方式
- 因为创建订单是插入操作,无法通过乐观锁来实现,因此可以在判断用户购买情况到创建订单完成这一过程加锁
- 可以锁定当前用户id来细化锁的粒度,调用
userId.toString().intern()
是为了每次获取id字符串都从常量池去找,避免同一个用户的不同线程生成不同的id地址 - 快捷键:ctrl+alt+m可以对选定代码段封装成一个方法
@Transactional
public Result createVoucherOrder(Long voucherId) {//==========一人一单Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()){int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count>0){//用户之前购买过了return Result.fail("你之前已经抢过该优惠券了!!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherId)//======判断当前库存是否大于0就可以决定是否能抢池子中的券了.gt("stock",0).update();if(!success){//扣减失败return Result.fail("库存不足!");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();//6.1订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2用户idvoucherOrder.setUserId(UserHolder.getUser().getId());//6.3优惠券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回订单idreturn Result.ok(orderId);}
}
-
由于spring中的事务需要方法调用完成才会提交,在事务提交期间
-
其它线程可以进入该方法,此时事务可能未提交导致订单还未创建到数据库,因此其它线程判断当前用户是没有购买过的
-
所以依然存在线程安全问题,因此要扩大锁定的范围,锁住createVoucherOrder方法
-
Long userId = UserHolder.getUser().getId();
//1.先获取锁
synchronized (userId.toString().intern()){//2.锁定住当前用户的所有线程再进行订单创建一系列操作return createVoucherOrder(voucherId);
}
-
事务失效
-
上述方法调用createVoucherOrder方法实际上是通过this.createVoucherOrder()的方式调用的,事务想要生效,还得利用代理来生效【this指代的是目标对象,不具备代理功能】
-
所以需要获得原始的事务对象, 来调用createVoucherOrder方法,此时才会交给spring aop管理并使事务生效
-
操作步骤
-
先导入aspecj的依赖
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId> </dependency>
-
然后标识暴露代理对象的注解
@EnableAspectJAutoProxy(exposeProxy = true)
-
-
//1.先获取锁
synchronized (userId.toString().intern()){//2.锁定住当前用户的所有线程再进行订单创建一系列操作//3.需要获取当前对象的代理对象,通过代理对象调用目标方法才会使事务生效IVoucherOrderService proxy= (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);
}
- 出现问题【个人问题】:发现每次都扣减10个库存,最后发现是我类上加了
@Transactional
注解,延迟了事务提交的时间
..
完整代码
- seckillVoucher不用加
@Transactional
注解,否则要等到seckillVoucher执行完成才会进行事务提交,可能会有并发安全问题
@Override
public Result seckillVoucher(Long voucherId) {//1.查询优惠券信息SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}//3.判断是否结束if (voucher.getStock() < 1) {return Result.fail("秒杀已经结束!");}//4.判断库存是否充足if (voucher.getStock() < 1) {return Result.fail("库存不足!");}Long userId = UserHolder.getUser().getId();//1.先获取锁synchronized (userId.toString().intern()){//2.锁定住当前用户的所有线程再进行订单创建一系列操作//3.需要获取当前对象的代理对象,通过代理对象调用目标方法才会使事务生效IVoucherOrderService proxy= (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {//==========一人一单Long userId = UserHolder.getUser().getId();int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {//用户之前购买过了return Result.fail("你之前已经抢过该优惠券了!!");}//5.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherId)//======判断当前库存是否大于0就可以决定是否能抢池子中的券了.gt("stock", 0).update();if (!success) {//扣减失败return Result.fail("库存不足!");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();//6.1订单idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2用户idvoucherOrder.setUserId(UserHolder.getUser().getId());//6.3优惠券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回订单idreturn Result.ok(orderId);
}
一人一单【集群】
环境搭建
- 通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行
-
启动两份服务,端口分别为8081和8082
=新版idea果然略有不同=》》》
-
然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡【默认轮询】
- 用户请求会在这两个节点上负载均衡,再次测试下是否存在线程安全问题
upstream backend {server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1; }
-
通过
nginx.exe -s reload
命令重新加载配置文件,这里黑马配置文件有坑【和视频不同步】,如下配置默认还是访问8081proxy_pass http://127.0.0.1:8081; #proxy_pass http://backend;
- 应改为如下,配置的上游服务器才会生效
#proxy_pass http://127.0.0.1:8081; proxy_pass http://backend;
-
同时要将系统多开的nginx给结束掉【感谢弹幕】
-
测试负载均衡生效,访问前端页面,查看两个节点的控制台打印
.
集群下本地锁的问题
问题模拟
- 集群环境下,本地锁无法锁定住多个服务的同一个用户的请求
问题分析
分布式锁【这一块我基本上都跳了,之前跟着周阳和谷粒商城有实现过】
分析
基础概念
-
**分布式锁:**满足分布式系统或集群模式下多进程可见并且互斥的锁
-
分布式锁需要具备的特性
.
- 实现对比
.
Redisson
环境搭建
- 引入依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
- 配置redis连接信息
@Configuration
public class RedisConfig {@Beanpublic RedissonClient redissonClient() {// 配置类Config config = new Config();// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址config.useSingleServer().setAddress("redis://120.26.122.127:6379");// 创建客户return Redisson.create(config);}
}
- 利用redis客户端操作分布式锁,修改秒杀业务加锁逻辑
Long userId = UserHolder.getUser().getId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if (isLock) {//获取失败就返回错误或者重试return Result.fail("不允许重复下单");
}
try {IVoucherOrderService proxy= (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);
}finally {lock.unlock();//释放锁
}
可重入锁原理
简易流程
.
获取锁的lua脚本
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists', key) == 0) then-- 不存在就可以抢锁redis.call('hset', key, threadId, '1'); -- 设置有效期redis.call('expire', key, releaseTime); return 1; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then-- 锁是自己的,重入次数+1redis.call('hincrby', key, threadId, '1'); -- 有效期redis.call('expire', key, releaseTime);return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
释放锁的lua脚本
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) thenreturn nil; -- 如果已经不是自己,则直接返回
end;
-- 是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数是否已经为0
if (count > 0) then
-- 大于0说明不能释放锁,重置有效期然后返回redis.call('EXPIRE', key, releaseTime);return nil;
else -- 等于0说明可以释放锁,直接删除redis.call('DEL', key);return nil;
end;
完整流程
总结
-
可重入:利用hash结构记录线程id和重入次数
-
可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
-
超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间
主从一致性解决
-
不可重入Redis分布式锁
- 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示
- 缺陷:不可重入、无法重试、锁超时失效
-
可重入的Redis分布式锁
- 原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
- 缺陷:redis宕机引起锁失效问题
-
Redisson的multiLock
- 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
- 缺陷:运维成本高、实现复杂
秒杀优化
模拟多用户访问
- 先往redis中存入1000个token,并且将所有token导出到本地的文本文件中【这代码是gpt生成+我小改了一下】
@Test
void testMultUser() {//向redis中存入1000条token信息// 文件路径String filePath = "tokens.txt";// 生成1000个token并存储到Redis和本地文本文件for (long i = 0; i < 1000; i++) {String token = generateToken();UserDTO userDTO =new UserDTO();userDTO.setId(i);userDTO.setNickName("用户"+i);Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),CopyOptions.create().setIgnoreNullValue(true)//设置忽略null值.setFieldValueEditor((fieldName,fieldValue)//接收字段名和字段值-> fieldValue.toString()));//将字段值转成string类型// 存储到RedisstringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY+token,userMap);// 存储到本地文本文件writeToFile(filePath, token);}System.out.println("Token生成完成!");
}
// 生成一个随机token
private static String generateToken() {return UUID.randomUUID().toString();
}
// 将token写入本地文件
private static void writeToFile(String filePath, String token) {try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath, true))) {writer.write(token);writer.newLine();} catch (IOException e) {System.err.println("写入文件时出错:" + e.getMessage());}
}
- 然后在jMeter中导入该文本文件
- 测试【只成功了17单,因为上述业务抢锁失败只返回错误而不重试,同时耗时不短】
优化分析
- 当前业务只用检查用户是否有秒杀资格,如果有就交给消息队列异步处理秒杀商品扣减逻辑
- 检验秒杀资格的完整流程
业务分解
1.保存库存信息
- 新增秒杀优惠券的同时,将优惠券库存信息保存到Redis中【VoucherServiceImpl】
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
2.判断秒杀资格的lua脚本
编写脚本
-
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
-
idea编写lua脚本可以先去导入emmylua插件
-- 1.参数列表
-- 1.1优惠券id
local voucherId = ARGV[1]
-- 1.2用户id
local userId = ARGV[2]-- 2.数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.业务代码
-- 3.1 判断库存是否充足
if tonumber(redis.call('get', stockKey)) <= 0 then-- 库存不足就返回1return 1
end
-- 3.2 判断用户是否下单【即集合中是否存在该用户】
if redis.call('sismember', orderKey, userId) == 1 then-- 存在该用户,说明该用户重复下单return 2
end
-- 3.3 扣减库存
redis.call('incrby',stockKey,-1)
-- 3.4 下单
redis.call('sadd',orderKey,userId)
-- 成功就返回0
return 0
加载lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {SECKILL_SCRIPT =new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);
}
执行lua脚本
//1.执行lua脚本,判断当前用户的购买资格
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());
if (result != 0) {//2.不为0说明没有购买资格return Result.fail(result==1?"库存不足":"不能重复下单");
}
异常
- 原因:执行 Redis 脚本时出现了错误,错误信息指示在脚本的第 15 行中出现**了尝试比较 nil(空值)**和数字的情况
org.springframework.data.redis.RedisSystemException: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: ERR user_script:15: attempt to compare nil with number script: 700addf09ba817fe2a4fcae9acd938fe721c7578, on @user_script:15.
- 解决:进行转换之前先判断是否为空
stockKey == nil or tonumber(redis.call('get', stockKey)) <= 0
压测
- 和之前一样要生成多个用户的token并存入redis中,相比于同步消费快了很多
.
3.成功订单存入阻塞队列
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue<>(1024*1024);
//3.走到这一步说明有购买资格,将订单信息存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
//放入阻塞队列等待异步消费
orderTasks.add(voucherOrder);
return Result.ok(orderId);
4.异步下单
- 开启多线程任务,不断从阻塞队列中获取信息,实现异步下单功能
private static final ExecutorService SECKILL_ORDER_EXECTOR= Executors.newSingleThreadExecutor();
@PostConstruct
private void init(){SECKILL_ORDER_EXECTOR.submit(new VoucerOrderHandle());
}
private class VoucerOrderHandle implements Runnable{@Overridepublic void run() {while (true){//1.获取队列中的订单信息try {VoucherOrder voucherOrder = orderTasks.take();//2.创建订单handleVoucherOrder(voucherOrder);} catch (InterruptedException e) {log.error("处理订单异常:{}",e);}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {//1.多线程下,用户id只能从订单中获取Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();//2.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherId)//======判断当前库存是否大于0就可以决定是否能抢池子中的券了.gt("stock", 0).update();//3.创建订单save(voucherOrder);}
}
- 压测
总结
-
秒杀业务的优化思路
-
先利用Redis完成库存余量、一人一单判断,完成抢单业务
-
再将下单业务放入阻塞队列,利用独立线程异步下单
-
-
基于阻塞队列的异步秒杀存在的问题
-
内存限制问题
-
数据安全问题
-
Redis消息队列【了解,这个绝逼没用】
基础概念
- 消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
.
-
Redis提供了三种不同的方式来实现消息队列
-
list结构:基于List结构模拟消息队列
-
PubSub:基本的点对点消息模型
-
Stream:比较完善的消息队列模型
-
基于List结构模拟消息队列
-
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果
-
队列是入口和出口不在一边,因此可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现
-
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息
-
因此这里应该使用BRPOP或者BLPOP来实现阻塞效果
-
基于List的消息队列优缺点
-
优点
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
-
缺点
- 无法避免消息丢失
- 只支持单消费者
-
基于PubSub的消息队列
- PubSub**(发布订阅)**是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息
- SUBSCRIBE channel [channel] :订阅一个或多个频道
- PUBLISH channel msg :向一个频道发送消息
- PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
.
-
基于PubSub的消息队列优缺点
-
优点:采用发布订阅模型,支持多生产、多消费
-
缺点
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
-
基于Stream的消息队列
- Stream 是 Redis 5.0 引入的一种新数据类型
发送消息
## 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成ID
127.0.0.1:6379> XADD users * name jack age 21
"1644805700523-0"
XREAD读取消息
.
-
非阻塞读取
-
XREAD阻塞方式,读取最新的消息
-
在业务开发中,可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下
.
-
当指定起始ID为$时,代表读取最新的消息,如果处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
-
STREAM类型消息队列的XREAD命令特点
-
消息可回溯
-
一个消息可以被多个消费者读取
-
可以阻塞读取
-
有消息漏读的风险
-
消费者组
特性
-
消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
-
消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息,确保每一个消息都会被消费
-
消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除
创建消费者组
XGROUP CREATE key groupName ID [MKSTREAM]
-
key:队列名称
-
groupName:消费者组名称
-
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
-
MKSTREAM:队列不存在时自动创建队列
-
其它常见命令
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
-
group:消费组名称
-
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
-
count:本次查询的最大数量
-
BLOCK milliseconds:当没有消息时最长等待时间
-
NOACK:无需手动ACK,获取到消息后自动确认
-
STREAMS key:指定队列名称
-
ID:获取消息的起始ID
- “>”:从下一个未消费的消息开始
- 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
基本思路
.
XREADGROUP命令特点
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
总结
List | PubSub | Stream | |
---|---|---|---|
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组 提高消费速度,减少堆积 |
消息确认机制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
整合RabbitMQ【个人补充】
环境搭建
- docker按照RabbitMQ镜像
docker run -d --name rabbitmq \
-p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 \
rabbitmq:management
.
- 访问RabbitMQ服务器【15672端口,账密都是guest】
.
- 导入RabbitMQ的场景启动器
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置连接信息
spring.rabbitmq.host=自己的主机名
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
- 主启动类标注@EnableRabbit开启消息队列的监听功能
需求分解
1.消息队列环境搭建
- 创建direct类型【完全匹配】的交换机以及一个名为seckill.order.queue的消息队列,然后将二者绑定
.之后发往该交换机且路由键为seckill.order的消息都会转发到seckill.order.queue
-
测试
- 往交换机发送路由键为seckill.order的消息
.
- 秒杀消息队列消费消息
.
2.项目整合消息队列
测试监听消息
-
如果要监听消息就必须标识@EnableRabbit才可以开启监听功能,不需要监听的功能可以不标注
-
@RabbitListener【可以标注在类、方法上】标注在业务逻辑的组件上,并且该组件必须要在容器中,才可以监听消息
- 属性queues:声明需要监听的所有队列,数组类型
- 监听消息的方法需要接收消息内容,消息类型可以指定
- Message:原生消息类型,可以获取消息头和消息体
- T:发送的消息类型,就不需要收到转换了
- Channel :当前传输数据的通道
@Component
public class SeckillOrderListener {@RabbitListener(queues = {"seckill.order.queue"})public void recieveMessage(Object message){System.out.println("监听到了"+message);}
}
.
测试发送消息
- RabbitTemplate可以发送消息
- 需要指定发送给的交换机、路由键和消息内容
@Test
void testSendMessage(){rabbitTemplate.convertAndSend("seckill.direct","seckill.order","测试发送消息");
}
.
3.修改秒杀下单业务
- 在认定有抢购资格后,直接向seckill.direct交换机发送消息,内容包含voucherId、userId、orderId
//3.走到这一步说明有购买资格,将订单信息存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
//存入消息队列等待异步消费
rabbitTemplate.convertAndSend("seckill.direct","seckill.order",voucherOrder);
-
测试
- postman模拟下单
.
- RabbitMQ客户端接收消息
.
-
优化:自定义json格式的消息的序列化机制
@Configuration
public class RabbitMQConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}
4.监听秒杀成功订单
- 先开启手动提交,否则业务执行失败,监听到的消息就丢失了
rabbitmq:
host: 120.26.122.127
port: 5672
listener:simple:acknowledge-mode: manual
- 监听seckill.order.queue队列的信息并且创建订单到数据库,当创建完成时手动ack
@Autowired
VoucherOrderServiceImpl voucherOrderService;
@RabbitListener(queues = {"seckill.order.queue"})
public void recieveMessage(Message message, Channel channel, VoucherOrder voucherOrder){try {voucherOrderService.handleVoucherOrder(voucherOrder);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {throw new RuntimeException(e);}
}
-
测试
- 下单成功
- 多用户模拟下单
达人探店
发布探店笔记
业务分析
-
探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个
-
tb_blog:探店笔记表,包含笔记中的标题、文字、图片等
-
tb_blog_comments:其他用户对探店笔记的评价
-
.
业务流程
- 点击首页最下方菜单栏中的+按钮,即可发布探店图文
- 注意:这里文件上传到本地nginx目录的imgs下,所以需要去
SystemConstants
中修改文件上传的位置
public static final String IMAGE_UPLOAD_DIR = "自己nginx安装目录\\html\\hmdp\\imgs\\";
业务调试
- 上传图片和发布笔记的代码黑马已经提供了,所以直接测试就行了
- 上传图片
- 发布博客
实现查看笔记功能
业务流程
- 需求:点击首页的探店笔记,会进入详情页面,实现该页面的查询接口
.
业务代码
@Override
public Result queryBlogById(Long id) {//查询博客消息Blog blog = getById(id);if(blog==null){return Result.fail("用户不存在");}queryBlogUser(blog);return Result.ok(blog);
}
private void queryBlogUser(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());
}
bug修复
-
bug:查看笔记时关联店铺信息不显示
-
原因
- 查询店铺信息时,原本控制层代码如下
public Result queryShopById(@PathVariable("id") Long id) {return Result.ok(shopService.queryShopById(id)); }
- 而封装工具类时,业务方法返回Result对象
public Result queryShopById(Long id) {Shop shop = cacheClient.queryWithLogicalExpire(CACHE_SHOP_KEY, id, Shop.class, this::getById, LOCK_SHOP_TTL, TimeUnit.SECONDS);if (shop==null) {return Result.fail("店铺不存在");}return Result.ok(shop); }
- 从而导致最终控制层中Result携带的数据是一个Result类型的信息
-
解决:控制层直接返回业务层封装的Result数据就行了
public Result queryShopById(@PathVariable("id") Long id) {return shopService.queryShopById(id);
}
.
点赞
业务流程
.
完善点赞功能
- 需求
- 同一个用户只能点赞一次,再次点击则取消点赞
- 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
实现步骤
-
给Blog类中添加一个isLike字段,表示是否被当前用户点赞
@TableField(exist = false) private Boolean isLike;
-
修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1
public Result likeBlog(Long id) {//1.先获取当前的用户信息Long userId = UserHolder.getUser().getId();//2.判断当前用户是否已经给该博客点过赞String key="blog:liked:"+id;Boolean isLiked = stringRedisTemplate.opsForSet().isMember(key, userId.toString());if (BooleanUtil.isFalse(isLiked)) {//3.如果未点过赞,就可以点//3.1数据库点赞数+1boolean isSuccess = update().setSql("liked=liked+1").eq("id", id).update();//3.2记录该用户点赞过该博客的信息if (isSuccess) {stringRedisTemplate.opsForSet().add(key,userId.toString());}}else {//4.如果点赞过,就取消点赞//4.1点赞数-1boolean isSuccess = update().setSql("liked=liked-1").eq("id", id).update();//4.2set中移除该用户的信息stringRedisTemplate.opsForSet().remove(key,userId.toString());}return Result.ok(); }
-
修改根据id查询和分页查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段
private void isBlogLiked(Blog blog) {//1.先获取当前的用户信息Long userId = UserHolder.getUser().getId();//2.判断当前用户是否已经给该博客点过赞String key="blog:liked:"+blog.getId();Boolean isLiked = stringRedisTemplate.opsForSet().isMember(key, userId.toString());blog.setIsLike(BooleanUtil.isTrue(isLiked)); }
bug修复
-
问题重现:如果是游客,那查询博客时会抱空指针异常
-
原因:因为isBlogLiked需要检查当前用户的登录状态,游客就没用登录状态咯
-
解决
-
所有跟查询博客有关的逻辑都要判断用户是否登录,如果用户没登陆就不走isBlogLiked逻辑
//queryBlogById if(UserHolder.getUser()!=null){//判断是否被点过赞isBlogLiked(blog); } //queryHotBlog查询所有 if(UserHolder.getUser()!=null){// 查询所有博客,并且绑定上当前用户的点赞状态records.forEach(blog ->{queryBlogUser(blog);isBlogLiked(blog);}); }else {records.forEach(blog ->{queryBlogUser(blog);}); }
-
如果游客点赞,就跳转登录【这个拦截器已经默认拦截了】
-
点赞排行榜
需求分析
- 需求:按照点赞时间先后排序,返回Top5的用户
List | Set | SortedSet | |
---|---|---|---|
排序方式 | 按添加顺序排序 | 无法排序 | 根据score值排序 |
唯一性 | 不唯一 | 唯一 | 唯一 |
查找方式 | 按索引查找或首尾查找 | 根据元素查找 | 根据元素查找 |
业务修改
- 使用zset类型代替set类型
- likeBlog方法
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
if (score==null) {//3.如果未点过赞,就可以点//3.1数据库点赞数+1boolean isSuccess = update().setSql("liked=liked+1").eq("id", id).update();//3.2记录该用户点赞过该博客的信息if (isSuccess) {stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());}
}else {//4.如果点赞过,就取消点赞//4.1点赞数-1boolean isSuccess = update().setSql("liked=liked-1").eq("id", id).update();//4.2set中移除该用户的信息stringRedisTemplate.opsForZSet().remove(key,userId.toString());
}
- isBlogLiked方法
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
blog.setIsLike(BooleanUtil.isTrue(score!=null));
点赞列表查询
.
- 实现一个控制器处理likes/{id}的请求
@GetMapping("/likes/{id}")
public Result likesBlog(@PathVariable("id") Long id) {//查询博客的点赞列表return blogService.likesBlog(id);
}
- 业务方法
public Result likesBlog(Long id) {//1.查询前五名的点赞用户String key=BLOG_LIKED_KEY+id;Set<String> topFiveUsers = stringRedisTemplate.opsForZSet().range(key, 0, 4);if (topFiveUsers==null||topFiveUsers.isEmpty()) {return Result.ok(Collections.emptyList());}//2.解析出前五名的用户idList<Long> ids = topFiveUsers.stream().map(Long::valueOf).collect(Collectors.toList());//3.根据用户id查询用户List<UserDTO> userDTOS = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOS);
}
问题修复
-
问题描述:后点赞的反而排在前面了.
-
原因:sql中用in查询多个用户id对应的信息,此时查询结果会按照主键进行排序并返回
-
解决:显式指定返回顺序
SELECT id,phone,password,nick_name,icon,create_time,update_time FROM tb_user WHERE id IN (5,1 ) ORDER BY FIELD(id,5,1)
.
-
业务修改【其实这里用xml更方便一点】
//2.解析出前五名的用户id
List<Long> ids = topFiveUsers.stream().map(Long::valueOf).collect(Collectors.toList());
String idStr = StrUtil.join(",", ids);//拼接下面FIELD函数要用的id字符串
//3.根据用户id查询用户
List<UserDTO> userDTOS = userService.query().in("id",ids).last("ORDER BY FIELD (id,"+idStr+")").//指定返回顺序list().stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
.
好友关注
关注和取关
业务流程
需求分析
- 需求:基于该表数据结构,实现两个接口
- 关注和取关接口
- 判断是否关注的接口
- 关注是User之间的关系,是博主与粉丝的关系,数据库中有一张tb_follow表来标示【注意:需要把主键修改为自增长,简化开发】
.
业务实现
@Override
public Result follow(Long followUserId, Boolean isFollowed) {Long userId = UserHolder.getUser().getId();//1.判断是关注还是取关if (isFollowed){//2.关注就新增数据Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);save(follow);}else {//3.取关就删除当前用户的关注对象信息remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId));}return Result.ok();
}
@Override
public Result isFollow(Long followUserId) {Long userId = UserHolder.getUser().getId();Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();return Result.ok(count>0);
}
共同关注
展示博主个人主页
- 点击博主头像,可以进入博主首页
- 展示博主个人主页这一块黑马已经实现了,放在黑马资料中的代码片段里,分别复制到UserController和BlogController即可
共同关注的业务流程
- 需求:利用Redis中恰当的数据结构,实现共同关注功能。在博主个人页面展示出当前用户与博主的共同好友
.
修改关注和取关实现
if (isFollowed){//2.关注就新增数据//...if(save(follow)){//将关注的用户id放入redis的set集合中stringRedisTemplate.opsForSet().add(key,followUserId.toString());}
}else {//3.取关就删除当前用户的关注对象信息if (remove(new QueryWrapper<Follow>().eq("user_id",userId).eq("follow_user_id",followUserId))) {stringRedisTemplate.opsForSet().remove(key,followUserId.toString());}
}
共同关注实现
public Result followCommons(Long followUserId) {//1.获取当前用户Long userId = UserHolder.getUser().getId();String key1="follows:"+userId;//2.求交集String key2="follows:"+followUserId;Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);if(intersect==null||intersect.isEmpty()){//无交集return Result.ok();}//3.解析id集合List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());//4.查询用户List<UserDTO> userDTOS = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOS);
}
.
关注推送
基础概念
定义
- 关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息
.
模式
- Feed流产品有两种常见模式
- Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
- 优点:信息全面,不会有缺失。并且实现也相对简单
- 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
- 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
- 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
- 缺点:如果算法不精准,可能起到反作用
- Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
- 本例中的个人页面,是基于关注的好友来做Feed流,因此采用Timeline的模式。该模式的实现方案有三种
- 拉模式
- 推模式
- 推拉结合
实现
- 拉模式:也叫做读扩散,每次读取时去拉取消息即可,节省内存,但是每次都得拉取,耗时
.
- 推模式:也叫做写扩散,消息会发送给所有关注的人,粉丝直接从自己的收件箱读取即可,但是要发送给所有粉丝不仅耗时还占内存
.
- 推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点,按粉丝数量采取不同模式
.
拉模式 | 推模式 | 推拉结合 | |
---|---|---|---|
写比例 | 低 | 高 | 中 |
读比例 | 高 | 低 | 中 |
用户读取延迟 | 高 | 低 | 低 |
实现难度 | 复杂 | 简单 | 很复杂 |
使用场景 | 很少使用 | 用户量少、没有大V | 过千万的用户量,有大V |
基于推模式实现关注推送
需求分析
-
黑马偷懒的借口还是很圆滑的,直接暴力使用推模式
-
需求
-
修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
-
收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
-
查询收件箱数据时,可以实现分页查询
-
-
Feed流的分页问题
- Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式
- 滚动分页模式:就是记录每次查询的末尾下标,之后从末尾下一个位置继续查
推送功能
public Result saveBlog(Blog blog) {//1.获取登录用户UserDTO user = UserHolder.getUser();blog.setUserId(user.getId());//2.保存探店博客boolean isSuccess = save(blog);if(!isSuccess){return Result.fail("新增笔记失败!!!");}//3.查询该作者的所有粉丝List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();//4.推送给所有粉丝for (Follow follow : follows) {//4.1获取粉丝idLong userId = follow.getUserId();//4.2推送String key="feed:"+userId;stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());}//5.返回idreturn Result.ok(blog.getId());
}
- 推送成功
.
关注推送的分页查询
业务流程
- 需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息
.
业务实现
- 方法参数
- max为上一次查询的最小时间,也是本次查询的最大时间,即本次查询需要展示的首条记录
- offset即要跳过的元素个数
- 返回数据
- minTime是本次查询的最小时间,降序排列后,集合中最后一个元素一定是最小时间,但是可能有多个相同的最小时间
- offset是等于本层查询的最小时间戳的数据总数,从第一次遇到最小时间开始向后递增
public Result queryBlogOfFollow(Long max, Integer offset) {//1.获取当前用户Long userId = UserHolder.getUser().getId();//2.查询收件箱String key = FEED_KEY + userId;Set<ZSetOperations.TypedTuple<String>> typedTuples =stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);if (typedTuples == null || typedTuples.isEmpty()) {return Result.ok();}ArrayList<Long> ids = new ArrayList<>(typedTuples.size());//3.解析数据:blogId,minTime(时间戳),offsetint os=1;long minTime=Long.MAX_VALUE;for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {ids.add(Long.valueOf(typedTuple.getValue()));long time=typedTuple.getScore().longValue();if(time==minTime){os++;}else {minTime=time;os=1;}}//4.根据id查询blogString idStr = StrUtil.join(",", ids);List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD (id," + idStr + ")").list();//指定返回顺序for (Blog blog : blogs) {queryBlogUser(blog);isBlogLiked(blog);}ScrollResult scrollResult = new ScrollResult();scrollResult.setList(blogs);scrollResult.setOffset(os);scrollResult.setMinTime(minTime);return Result.ok(scrollResult);
}
- 测试:每次查询三条记录才会触发滚动刷新效果【感谢弹幕】,所以从redis中获取zset的代码改为
Set<ZSetOperations.TypedTuple<String>> typedTuples =stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 3);
.
附件商户
GEO数据结构
基础概念
-
GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,根据经纬度来检索数据
-
常见的命令有
-
GEOADD:添加一个地理空间信息,包含经度(longitude)、纬度(latitude)、值(member)
-
GEODIST:计算指定的两个点之间的距离并返回
-
GEOHASH:将指定member的坐标转为hash字符串形式并返回
-
GEOPOS:返回指定member的坐标
-
GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.2以后已废弃
-
GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回,范围可以是圆形或矩形。6.2.新功能
-
GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能
-
练习Redis的GEO功能
-
添加下面几条数据
-
北京南站( 116.378248 39.865275 )
-
北京站( 116.42803 39.903738 )
-
北京西站( 116.322287 39.893729 )
geoadd g1 116.378248 39.865275 bjn 116.42803 39.903738 bj 116.322287 39.893729 bjx
.
-
-
计算北京西站到北京站的距离,最后一个参数指定返回的单位
.
-
搜索天安门( 116.397904 39.909005 )附近10km内的所有火车站,并按照距离升序排序【默认】
GEOSEARCH g1 fromlonlat 116.397904 39.909005 byradius 10 km withdist
.
附近商户搜索
业务流程
- 在首页中点击某个频道,即可看到频道下的商户
业务分析
- 按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可
.
导入各个商户地理坐标
- 写入店铺时如果一条条写入效率较低
for (Shop shop : shops) {stringRedisTemplate.opsForGeo().add(key,new Point(shop.getX(),shop.getY()),shop.getId().toString());
}
- add方法可以传入一个可迭代对象,一次性将集合元素写入redis
List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>();
for (Shop shop : shops) {locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(), shop.getY())));
}
stringRedisTemplate.opsForGeo().add(key,locations);
- 完整代码
@Test
void loadShopData() {//1.查询点评信息List<Shop> list = shopService.list();//2.按照店铺类型分组Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));//groupingBy可以根据指定参数进行分组//3.分批写入Redisfor (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {//3.1获取店铺类型idLong typeId = entry.getKey();String key = SHOP_GEO_KEY + typeId;//3.2获取同类相的店铺集合List<Shop> shops = entry.getValue();//3.3写入每个店铺的地理坐标List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>();for (Shop shop : shops) {locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(), shop.getY())));}stringRedisTemplate.opsForGeo().add(key,locations);}
}
.
环境准备
- 黑马的springboot是2.3左右的版本,redis客户端也是老版本,所以需要导入6.2之后版本的redis客户端依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><exclusion><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId></exclusion><exclusion><artifactId>lettuce-core</artifactId><groupId>io.lettuce</groupId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId><version>2.6.2</version>
</dependency>
<dependency><artifactId>lettuce-core</artifactId><groupId>io.lettuce</groupId><version>6.1.6.RELEASE</version>
</dependency>
业务实现
- 控制器方法新增加坐标的请求参数
@GetMapping("/of/type")
public Result queryShopByType(@RequestParam("typeId") Integer typeId,@RequestParam(value = "current", defaultValue = "1") Integer current,@RequestParam(value = "x",required = false)Double x,@RequestParam(value = "y",required = false)Double y
) {return shopService.queryShopByType(typeId,current,x,y);
}
- 业务代码
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {//1.判断是否需要更加坐标查询if(x==null||y==null){Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));return Result.ok(page.getRecords());}//2.计算分页参数int from=(current-1)*SystemConstants.DEFAULT_PAGE_SIZE;int end= current*SystemConstants.DEFAULT_PAGE_SIZE;//3.查询redis,按照距离排序String key=SHOP_GEO_KEY+typeId;GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo().search(key,GeoReference.fromCoordinate(x, y),//指定中心点经纬度new Distance(5000),//指定查询半径,默认单位是米//这里的limit返回0~指定索引的记录,不能指定范围查询,所以这里把0~end所有数据都查出来再去截取RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));//4.解析出店铺id和距离信息if(results==null){return Result.ok();}//4.1截取from~end的部分List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();List<Long> ids=new ArrayList<>(list.size());HashMap<String, Distance> distanceHashMap = new HashMap<>(list.size());list.stream().skip(from).forEach(result->{//4.2获取店铺idString shopId = result.getContent().getName();ids.add(Long.valueOf(shopId));//4.3获取距离Distance distance = result.getDistance();distanceHashMap.put(shopId,distance);});//5.根据id查询店铺String idStr = StrUtil.join(",", ids);List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD (id," + idStr + ")").list();//给每个店铺设置距离信息for (Shop shop : shops) {shop.setDistance(distanceHashMap.get(shop.getId().toString()).getValue());}return Result.ok(shops);
}
- 需要点击距离按钮才会有滚动分页功能
.
bug修复
- 问题描述:当划到底下没有数据时,会报如下sql语句异常
### SQL: SELECT id,name,type_id,images,area,address,x,y,avg_price,sold,comments,score,open_hours,create_time,update_time FROM tb_shop WHERE (id IN ()) ORDER BY FIELD (id,)
### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to
- 原因:因为没有店铺信息时就没有id列表,
"ORDER BY FIELD (id," + idStr + ")"
进行字符串拼接时没有idStr的参数,所以导致(id,)
- 解决:直接判断id列表是否为空就行了
if(ids.size()==0){return Result.ok();
}
- 黑马炫技:在截取fromend的店铺之前,如果from大于等于0end的店铺数,就说明当前这批数据之前都查询过了,没有新数据
if (list.size()<=from) {return Result.ok();
}
用户签到
BitMap用法
引入
-
假如用一张表来存储用户签到信息,其结构如下
.
- 假如有1000万用户,平均每人每年签到次数为10次,则这张表一年的数据量为 1亿条
- 每签到一次需要使用(8 + 8 + 1 + 1 + 3 + 1)共22 字节的内存,一个月则最多需要600多字节
-
按月来统计用户签到信息,签到记录为1,未签到则记录为0
- 把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种结构就称为位图(BitMap)
- Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是 2^32个bit位
.
操作BitMap的命令
- SETBIT:向指定位置(offset)存入一个0或1
- GETBIT :获取指定位置(offset)的bit值
- BITCOUNT :统计BitMap中值为1的bit位的数量
- BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
- BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
- BITOP :将多个BitMap的结果做位运算(与 、或、异或)
- BITPOS :查找bit数组中指定范围内第一个0或1出现的位置
签到功能
需求分析
- 需求:实现签到接口,将当前用户当天签到信息保存到Redis中
说明 | |
---|---|
请求方式 | Post |
请求路径 | /user/sign |
请求参数 | 无 |
返回值 | 无 |
- 因为BitMap底层是基于String数据结构,因此其操作也都封装在字符串相关操作中了
.
业务实现
public Result sign() {//1.获取当前登录的用户Long userId = UserHolder.getUser().getId();//2.获取日期LocalDateTime now = LocalDateTime.now();//3.拼接keyString dateKey = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String userSignKey=USER_SIGN_KEY+userId+dateKey;//4.获取今天是本月的第几天int dayOfMonth = now.getDayOfMonth();//5.记录该用户今天的签到stringRedisTemplate.opsForValue().setBit(userSignKey,dayOfMonth-1,true);return Result.ok();
}
- postman模拟五月一号签到
签到统计
需求分析
- 需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数
说明 | |
---|---|
请求方式 | GET |
请求路径 | /user/sign/count |
请求参数 | 无 |
返回值 | 连续签到天数 |
- 连续签到天数:从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数
BITFIELD key GET u[dayOfMonth] 0
得到本月到今天为止的所有签到数据- BITFIELD返回结果是十进制数,十进制数从后向前遍历每个bit位的操作如下
- 与1做与运算,就能得到最后一个bit位
- 随后右移1位,下一个bit位就成为了最后一个bit位
- 直到遇到第一个0,就找到连续签到的开始位置
业务实现
public Result signCount() {//1.获取当前用户本月截止到今天的所有签到记录Long userId = UserHolder.getUser().getId();LocalDateTime now = LocalDateTime.now();String dateKey = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String userSignKey=USER_SIGN_KEY+userId+dateKey;int dayOfMonth = now.getDayOfMonth();//由于bitField可以同时进行get、set等多种操作,所以返回的结果是一个集合List<Long> bitField = stringRedisTemplate.opsForValue().bitField(userSignKey, BitFieldSubCommands.create().//将0-dayOfMonth的所有比特位以十进制无符号形式返回get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));//2.位域获取到的是十进制数,需要每次用1和当前数字进行与运算得到当前最低位if(bitField==null||bitField.isEmpty()){//没有签到结果return Result.ok(0);}Long num = bitField.get(0);if(num==null||num==0){//num为0说明0-dayOfMonth的所有比特位都是0return Result.ok(0);}int count=0;//3.循环判断每个最低位是否为1while((num&1)==1){count++;num>>>=1;//无符号右移,判断前一天是否签到了}return Result.ok(count);
}
- 为了方便测试,我把系统的日期调成了5.7,并且将5.5-5.7对应的位置为1
UV统计
基础概念
- UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次
- PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量
- UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖
HyperLogLog用法
- Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0
- Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低,但是其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略
- Hyperloglog会去重
.
实现UV统计
环境搭建
- 直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何
@Test
void testHyperLogLog() {// 准备数组,装用户数据 String[] users = new String[1000];int index = 0;for (int i = 1; i <= 1000000; i++) {users[index++] = "user_" + i;// 每1000条发送一次if (i % 1000 == 0) {index = 0;stringRedisTemplate.opsForHyperLogLog().add("hll1", users);}} // 统计数量 Long size = stringRedisTemplate.opsForHyperLogLog().size("hll1");System.out.println("size = " + size);
}
-
测试前内存占用
.
-
向HyperLogLog键插入一百万条数据
-
查看内存占用【总占用反而更少了,可能是有些键过期了】