一、配置前后端项目的初始环境
前端:
对前端项目在cmd中进行start nginx.exe,端口号为8080
后端:
配置mysql数据库的url 和 redis 的url 和 导入数据库数据
二、登录校验
基于Session的实现登录(不推荐)
(1)发送验证流程:
- 用户发送验证码 -
- 校验手机号 -
- 符合则生成验证码(不符合就提示用户所输入的手机号错误)-
- 保存验证码到session-发送验证码 -
(2)短信验证流程:
- 将提交的手机号验证码与校验验证码进行对比 -
- 两个验证码一致(验证码不一致,验证码错误)-
- 查询用户信息,若存在则进行登录(若不存在进行注册,创建新的用户)-
(3)校验登录状态的流程:
-
请求并携带 cookie -
-
从session中获取用户 -
-
判断用户是否存在(存在保存用户信息到ThreadLocal,不存在就进行拦截)-
(4)实操
UserController:
/*** 发送手机验证码*/@PostMapping("code")public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {//发送短信验证码并保存验证码return userService.sendCode(phone,session);}
UserServiceImpl:
@Overridepublic Result login(LoginFormDTO loginForm, HttpSession session) {// 1.判断手机号是否符合规范if(RegexUtils.isPhoneInvalid(loginForm.getPhone())){return Result.fail("手机号格式不正确");}// 2.手机号码符合规范,获取发送验证码和session中的验证码进行比较String code = loginForm.getCode();Object cacheCode = session.getAttribute("code");if(code == null || !cacheCode.toString().equals(code) ){return Result.fail("验证码不正确");}// 3.验证码正确,查询数据库User user = userMapper.selectOne(new LambdaQueryWrapper<User>().eq(User::getPhone, loginForm.getPhone()));// 4.判断用户是否存在,如果不存在则创建用户if(user == null){user = createWithUser(loginForm.getPhone());}// 5.将用户信息保存到session中session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));return Result.ok();}public User createWithUser(String phone){User user = new User();user.setPhone(phone);user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomNumbers(6) );userMapper.insert(user);return user;}
拦截器
LoginInterceptor:
public class LoginInterceptor extends HandlerInterceptorAdapter {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1.获取sessionHttpSession session = request.getSession();// 2.获取session中的user对象Object user = session.getAttribute("user");// 3.判断user是否为空if(user == null){response.setStatus(401);return false;}// 4.把user保存到UserHolder中UserHolder.saveUser((UserDTO) user);return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {UserHolder.removeUser();}
}
MvcConfig:
@Configuration
public class MvcConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/shop/**","/vouchar/**","/shop-type/**","upload/**","/blog/hot","/user/login","/user/code");}
}
基于Redis实现共享Session登录(推荐)
使用session进行登录校验,会出现session的共享问题,多台Tomcat并不共享session存储空间,当请求切换到不同的tomcat服务导致数据丢失的问题
(1)校验登录状态流程:
- 请求并携带token -
- 从Redis中 以token为key获取用户信息 -
- 判断用户是否存在(若存在,保存用户到ThreadLocal;若不存在则进行拦截)
(2)短信验证流程:
- 提交手机号码获取验证码 -
- 校验验证码(如果验证码不一致,则验证码错误)
- 查询用户是否存在(如果不存在就创建新用户)
- 保存用户到Redis中
(3)实操:
UserController:
@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic Result sendCode(String phone, HttpSession session) {// 1.判断手机号是否符合规范if(RegexUtils.isPhoneInvalid(phone)){return Result.fail("手机号格式不正确");}// 2.手机号码符合规范,生成验证码String code = RandomUtil.randomNumbers(6);// 3.将验证码保存到redis中stringRedisTemplate.opsForValue().set(RedisConstants.LOGIN_CODE_KEY+phone, code,RedisConstants.LOGIN_CODE_TTL, TimeUnit.MINUTES);// TODO 4.发送验证码return Result.ok();}@Overridepublic Result login(LoginFormDTO loginForm, HttpSession session) {// 1.判断手机号是否符合规范if(RegexUtils.isPhoneInvalid(loginForm.getPhone())){return Result.fail("手机号格式不正确");}// 2.手机号码符合规范,获取发送验证码和redis中的验证码进行比较String code = loginForm.getCode();String cacheCode = stringRedisTemplate.opsForValue().get(RedisConstants.LOGIN_CODE_KEY+loginForm.getPhone());if(code == null || !Objects.equals(cacheCode, code)){return Result.fail("验证码不正确");}// 3.验证码正确,查询数据库User user = userMapper.selectOne(new LambdaQueryWrapper<User>().eq(User::getPhone, loginForm.getPhone()));// 4.判断用户是否存在,如果不存在则创建用户if(user == null){user = createWithUser(loginForm.getPhone());}// 5.生成token作为登录令牌,这里就先直接生成,可以使用JWT的方式生成String token = UUID.randomUUID().toString();UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);Map<String,Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));// 6.将用户信息保存到redis中String tokenKey = RedisConstants.LOGIN_USER_KEY+token;stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);stringRedisTemplate.expire(tokenKey, RedisConstants.LOGIN_USER_TTL, TimeUnit.SECONDS);// 7.返回tokenreturn Result.ok(token);}public User createWithUser(String phone){User user = new User();user.setPhone(phone);user.setNickName(SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomNumbers(6) );userMapper.insert(user);return user;}
MvcConfig:
@Configuration
public class MvcConfig implements WebMvcConfigurer {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor(stringRedisTemplate)).excludePathPatterns("/shop/**","/vouchar/**","/shop-type/**","upload/**","/blog/hot","/user/login","/user/code");}
}
LoginInteceptor:
public class LoginInterceptor extends HandlerInterceptorAdapter {private StringRedisTemplate stringRedisTemplate;public LoginInterceptor(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1.获取请求头中的tokenString token = request.getHeader("authorization");if(StrUtil.isBlank(token)) {response.setStatus(401);return false;}// 2.判断token是否有效String tokenKey = RedisConstants.LOGIN_USER_KEY + token;Map<Object,Object> userMap =stringRedisTemplate.opsForHash().entries(tokenKey);if(userMap.isEmpty()){response.setStatus(401);return false;}// 3.把token中的user信息反序列化UserDTO user = BeanUtil.fillBeanWithMap(userMap, new UserDTO(),false);// 4.把user保存到UserHolder中UserHolder.saveUser(user);// 5.设置token的过期时间stringRedisTemplate.expire(tokenKey, RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {UserHolder.removeUser();}
}
解决登录状态刷新问题(登录拦截器优化)
使用双拦截器的方式
RefreshTokenInterceptor:
public class RefreshTokenInterceptor extends HandlerInterceptorAdapter {private StringRedisTemplate stringRedisTemplate;public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1.获取请求头中的tokenString token = request.getHeader("authorization");if(StrUtil.isBlank(token)) {return true;}// 2.判断token是否有效String tokenKey = RedisConstants.LOGIN_USER_KEY + token;Map<Object,Object> userMap =stringRedisTemplate.opsForHash().entries(tokenKey);if(userMap.isEmpty()){return true;}// 3.把token中的user信息反序列化UserDTO user = BeanUtil.fillBeanWithMap(userMap, new UserDTO(),false);// 4.把user保存到UserHolder中UserHolder.saveUser(user);// 5.设置token的过期时间stringRedisTemplate.expire(tokenKey, RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {UserHolder.removeUser();}
}
LoginInterceptor:
public class LoginInterceptor extends HandlerInterceptorAdapter {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 通过ThreadLocal获取用户信息进行判断if(UserHolder.getUser() == null){response.setStatus(401);return false;}return true;}}
MvcConfig:
@Configuration
public class MvcConfig implements WebMvcConfigurer {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/shop/**","/vouchar/**","/shop-type/**","upload/**","/blog/hot","/user/login","/user/code").order(1);registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**") //默认拦截所有请求.order(0); //order值越小,优先级越高}
}
三、添加缓存
查询商户缓存:
@Overridepublic Result queryById(Long id) {String key = RedisConstants.CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);if(StrUtil.isNotBlank(shopJson)) {// 1.缓存中存在,直接返回Shop shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 2.缓存中不存在,查询数据库Shop shop = shopMapper.selectById(id);if(shop == null){return Result.fail("查询不到该店铺");}// 3.如果该店铺存在,将查询到的数据放入缓存stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL);// 4.返回结果return Result.ok(shop);}
四、缓存更新:
内存淘汰 | 超时剔除 | 主动更新 | |
说明 | 不用自己维护,利用Redis的内存淘汰机制,当内存不足的时自动淘汰部分数据,下次查询时更新缓存 | 给缓存设置TTL时间,到期后删除缓存,下次查询时更新缓存 | 编写业务逻辑,在修改数据库的同时,更新缓存 |
一致性 | 差 | 一般 | 好 |
维护成本 | 无 | 低 | 高 |
根据业务场景选择:
低一致性需求:使用内存淘汰机制,例如店铺的类型查询
高一致性需求:主动更新,以超时剔除作为兜底方案,例如店铺详细查询
主动更新策略
读操作:
- 缓存命中就返回
- 缓存未命中就查询数据库,写入缓存并设置超时时间,然后返回
写操作:
- 先写数据库,然后再删除缓存(写数据库的过程相对比较慢)
- 确保数据库与缓存操作的原子性
更新商户:
下面是单体架构,如果对于分布式系统,需要通过MQ的方式传送给其他系统进行删除缓存的操作
@Transactional(rollbackFor = Exception.class)@Overridepublic Result update(Shop shop) {Long id = shop.getId();if(id == null){return Result.fail("更新失败,shopId不能为空");}// 1.更新数据库shopMapper.updateById(shop);// 2.删除缓存stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + id);return Result.ok();}
五、缓存穿透
推荐使用缓存空对象的方式
查询商户(设置缓存空对象)
@Overridepublic Result queryById(Long id) {String key = RedisConstants.CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);if(StrUtil.isNotBlank(shopJson)) {// 1.缓存中存在,直接返回Shop shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 存在shopJson且不为null,则说明shopJson为空缓存if(shopJson != null){return Result.fail("查询不到该店铺");}// 2.缓存中不存在,查询数据库Shop shop = shopMapper.selectById(id);if(shop == null){// 3.如果数据库中不存在,设置缓存空对象stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);return Result.fail("查询不到该店铺");}// 3.如果该店铺存在,将查询到的数据放入缓存stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL,TimeUnit.MINUTES);// 4.返回结果return Result.ok(shop);}
六、缓存雪崩
七、缓存击穿
基于互斥锁 实现商户查询:
@Overridepublic Result queryById(Long id) {// 1.互斥锁解决缓存击穿Shop shop = queryWithPassMutex(id);// 4.返回结果return Result.ok(shop);}public Shop queryWithPassMutex(Long id){String key = RedisConstants.CACHE_SHOP_KEY + id;String shopJson = stringRedisTemplate.opsForValue().get(key);Shop shop = null;if(StrUtil.isNotBlank(shopJson)) {// 1.缓存中存在,直接返回shop = JSONUtil.toBean(shopJson, Shop.class);return shop;}// 2.存在shopJson且不为null,则说明shopJson为空缓存if(shopJson != null){return null;}// 3.尝试加锁String lockKey = RedisConstants.LOCK_SHOP_KEY + id;try {// 3.1 尝试加锁boolean isLock = tryLock(lockKey);if(!isLock){// 3.2 获取锁失败,休眠后重试Thread.sleep(50);return queryWithPassMutex(id);}// 4.缓存中不存在,查询数据库shop = shopMapper.selectById(id);if(shop == null){// 5.如果数据库中不存在,设置缓存空对象stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);return null;}// 6.如果该店铺存在,将查询到的数据放入缓存stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL,TimeUnit.MINUTES);}catch (InterruptedException e){throw new RuntimeException(e);}finally {// 7.释放锁unlock(lockKey);}return shop;}public boolean tryLock(String key){Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key,"1",10,TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);}public void unlock(String key){stringRedisTemplate.delete(key);}
基于逻辑过期 实现商户查询:
使用工具类RedisData:
@Data
public class RedisData {private LocalDateTime expireTime;private Object data;
}
编写代码:
public boolean tryLock(String key){// 使用setIfAbsent方法尝试设置一个键值对,如果键不存在则设置成功并返回true// 同时设置该键的超时时间为10秒,以防止死锁Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key,"1",10,TimeUnit.SECONDS);// 将Boolean对象转换为基本boolean类型并返回结果return BooleanUtil.isTrue(flag);
}public void unlock(String key){// 删除指定的Redis键,用于释放锁stringRedisTemplate.delete(key);
}// 创建一个固定大小的线程池,用于缓存重建任务
public static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);public Shop queryWithLogicalExpire(Long id){// 构建Redis缓存的键String key = RedisConstants.CACHE_SHOP_KEY + id;// 从Redis获取缓存数据String shopJson = stringRedisTemplate.opsForValue().get(key);// 如果缓存为空,则返回nullif(StrUtil.isNotBlank(shopJson)) {return null;}// 将json字符串反序列化为RedisData对象RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);LocalDateTime expireTime = redisData.getExpireTime();// 判断缓存是否过期if(expireTime.isAfter(LocalDateTime.now())){// 如果未过期,直接返回Shop对象return shop;}// 如果已过期,获取锁的键String lockKey = RedisConstants.LOCK_SHOP_KEY + id;// 尝试获取锁boolean isLock = tryLock(lockKey);if(isLock) {// 如果获取锁成功,异步执行缓存重建任务CACHE_REBUILD_EXECUTOR.submit(() -> {try {// 重建缓存this.saveShop2Redis(id, RedisConstants.CACHE_SHOP_TTL);}catch (Exception e){// 异常处理e.printStackTrace();}finally {// 释放锁unlock(lockKey);}});}// 返回可能过期的Shop对象return shop;
}public void saveShop2Redis(Long id, Long expireSeconds){// 从数据库中查询Shop对象Shop shop = shopMapper.selectById(id);// 创建RedisData对象用于封装Shop对象和过期时间RedisData redisData = new RedisData();redisData.setData(shop);redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));// 构建Redis缓存的键String key = RedisConstants.CACHE_SHOP_KEY + id;// 将RedisData对象序列化为JSON字符串并保存到Redis中,同时设置过期时间stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData), expireSeconds, TimeUnit.SECONDS);
}
八、 优惠卷秒杀
全局唯一ID
当用户抢购时,会产生订单并保存tb_voucher_order这表中,而订单表如果使用数据库自增ID会产生的问题:
- id的规律太明显
- 受单表数据量的限制
生成时间戳+序列号的生成唯一ID的工具类
@Component
public class RedisIdWorker {private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}//初始时间戳,值为2024年1月1日0时0分0秒的秒数:LocalDateTime.of(2024,1,1,0,0,0).toEpochSecond(ZoneOffset.UTC);public static final long BEGIN_TIMESTAMP = 1704067200L;//序列号的位数public static final int COUNT_BITS = 32;public Long nextId(String keyPrefix){//1.生成时间戳Long nowSecond = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);Long timestamp = nowSecond - BEGIN_TIMESTAMP;//2.生成序列号String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));long count = stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);// 将时间戳左移序列号位数,然后与序列号进行位或运算,生成最终的IDreturn timestamp << COUNT_BITS | count;}}
添加优惠卷
tb_seckill_voucher在添加优惠卷时需要考虑关联tb_voucher的id
同时使用@Transactional保证数据的一致性
@Override@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);}
实现秒杀下单
@Override@Transactional(rollbackFor = Exception.class)public Result seckillVoucher(Long voucherId) {SeckillVoucher seckillVoucher = seckillVoucherMapper.selectById(voucherId);if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已结束");}if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("秒杀还未开始");}if(seckillVoucher.getStock() < 1){return Result.fail("库存不足");}//减库存seckillVoucherMapper.update(null,new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).set(SeckillVoucher::getStock, seckillVoucher.getStock() - 1));//生成订单VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setVoucherId(voucherId);UserDTO userDTO = UserHolder.getUser();voucherOrder.setUserId(userDTO.getId());voucherOrderMapper.insert(voucherOrder);return Result.ok(orderId);}
库存超卖问题
如果线程A在扣减库存之前,有线程B执行完查询库存的操作,那么线程B获取的库存是线程A扣减库存之前的数据,这会导致超卖的问题发生。
解决超卖问题 乐观锁的思路:
方式一:
在每次卖出商品后,通过递增版本号的方式;进行扣除库存前使用where进行判断该库存是否等于当前的版本是否为查询商品时所获取的版本,如果不一致说明其他线程进行了扣库存的操作,所以本次操作无效。如果版本号一致,说明没有进行修改过,此时库存数据还是查询时的库存,可以进行减库存的操作。
方式二:
因为每次卖出商品,库存都会变化,那么我们只需要在减库存的同时进行判断现在数据库中的库存为查询时的库存;
如果where stock = queryStock 说明没有进行修改过,此时库存数据还是查询时的库存,可以进行减库存的操作。如果stock 和 queryStock 不一致说明其他线程进行了扣库存的操作,所以本次操作无效。
实践:
减扣库存的同时使用 where判断stock > 0
//获取查询的库存Integer queryStock = seckillVoucher.getStock();//减库存seckillVoucher.setStock(seckillVoucher.getStock() - 1);int seckillChange = seckillVoucherMapper.update(seckillVoucher,new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).eq(SeckillVoucher::getStock, queryStock) //乐观锁方案,通过CAS更新库存.set(SeckillVoucher::getStock, seckillVoucher.getStock() - 1).gt(SeckillVoucher::getStock, 0));if(seckillChange < 1){return Result.fail("库存不足");}
一人一单
启动类上加上注解:
@EnableAspectJAutoProxy(exposeProxy = true)
添加依赖:
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>
集群下的并发问题
通过右击点击Copy Configuration或者Ctrl+D
在 VM options中添加 -Dserver.port=8082
在nginx 文件中的conf/nginx.conf中配置
# 设置工作进程的数量,这里设置为1
worker_processes 1;events {# 每个工作进程可以处理的最大连接数worker_connections 1024;
}http {# 包含文件类型映射表include mime.types;# 如果请求的文件没有指定类型,则默认使用application/jsondefault_type application/json;# 开启sendfile功能,可以在传输文件时提高效率sendfile on;# 保持连接的超时时间keepalive_timeout 65;server {# 监听的端口号listen 8080;# 服务器名称server_name localhost;# 匹配所有请求,指定前端项目所在的位置location / {# 前端项目根目录root html/hmdp;# 默认首页文件index index.html index.htm;}# 定义错误页面,当出现500, 502, 503, 504错误时,返回50x.htmlerror_page 500 502 503 504 /50x.html;location = /50x.html {# 错误页面的根目录root html;}# 匹配以/api开头的请求,进行反向代理location /api { # 默认响应类型为application/jsondefault_type application/json;# keep-alive超时时间keepalive_timeout 30s; # 每个keep-alive连接的最大请求数keepalive_requests 1000; # 使用HTTP/1.1版本,支持keep-aliveproxy_http_version 1.1; # 重写请求,去除/api前缀rewrite /api(/.*) $1 break; # 传递请求头到上游服务器proxy_pass_request_headers on;# 当出现错误或超时时,尝试下一个上游服务器proxy_next_upstream error timeout; # 反向代理到本地的8081端口proxy_pass http://127.0.0.1:8081;# 注释掉的配置,可以使用upstream定义的后端服务器# proxy_pass http://backend;}}# 定义后端服务器组upstream backend {# 后端服务器配置,包括最大失败次数、失败超时时间、权重等server 127.0.0.1:8081 max_fails=5 fail_timeout=10s weight=1;# 可以添加更多后端服务器# server 127.0.0.1:8082 max_fails=5 fail_timeout=10s weight=1;}
}
每个JVM内部都有其锁的监视器 ,这可能导致并行问题
sychronized只能保证单个JVM内部多个线程的锁,没法直接对JVM集群下的锁进行互斥
分布式锁使在集群和分布式系统下多个进程可见并互斥的锁,使用一个锁监视器监视所有的JVM
分布式锁的实现
分布式锁的核心实现多进程之间的互斥,常用的三种方法:
MySQL | Redis | Zookeeper | |
互斥 | 利用mysql本身的互斥锁机制 | 利用setnx这样的互斥命令 | 利用节点的唯一和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用锁的超时时间,到期释放 | 临时节点,断开连接自动释放 |
基于Redis的分布式锁
简单的Redis分布式锁实现:
ILock:
public interface ILock {/**** @param timeoutSec 设置超时时间* @return true 获取锁, false 获取锁失败*/boolean tryLock(long timeoutSec);/*** 释放锁*/void unlock();
}
SimpleRedisLock:
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";@Overridepublic boolean tryLock(long timeoutSec) {long threadId = Thread.currentThread().getId();Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId+"",timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(flag);}@Overridepublic void unlock() {stringRedisTemplate.delete(KEY_PREFIX+name);}
}
VoucherOrderServiceImpl 中的 seckillVoucher 方法
@Override@Transactional(rollbackFor = Exception.class)public Result seckillVoucher(Long voucherId) {SeckillVoucher seckillVoucher = seckillVoucherMapper.selectById(voucherId);if(seckillVoucher.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已结束");}if(seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("秒杀还未开始");}if(seckillVoucher.getStock() < 1){return Result.fail("库存不足");}Long userId = UserHolder.getUser().getId();SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);boolean isLock = lock.tryLock(1000);if(!isLock){return Result.fail("不允许重复下单");}try{IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId, seckillVoucher);}finally {lock.unlock();}}
解决分布式锁误删问题
新的并发问题:当释放锁的时候,如果不进行判断当前获取的锁的标识是否为自己,那么很有可能当时释放的锁是其他线程的锁。所以需要再释放锁的时候进行判断锁的标识是否是自己的。
改进之前简单的分布式锁实现:
- 在获取锁的时存入线程标识(UUID)
- 在释放锁的时先获取锁中的线程标识,判断是否与当前的标识是否一致(如果一致则释放锁,不一致就不释放锁)
修改SimpleRedisLock中的代码:
通过设置UUID设置 ID_PREFIX 作为线程的唯一标识
释放锁时通过对比当前的threadId是否与获取的id的值一致,一致才允许释放锁
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString();@Overridepublic boolean tryLock(long timeoutSec) {String threadId = ID_PREFIX + Thread.currentThread().getId();Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId,timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(flag);}@Overridepublic void unlock() {String threadId = ID_PREFIX + Thread.currentThread().getId();String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX+name);if(threadId.equals(id)){stringRedisTemplate.delete(KEY_PREFIX+name);}}
}
分布式锁原子性问题
即时添加了释放锁的判断,但是因为判断锁和释放锁这两个动作没有进行原子性设置,很可能判断锁后,因为线程的阻塞导致超时释放锁,然后再进行释放锁操作,造成了误删。
(所以,要设置判断锁和释放锁的原子性)
使用Lua脚本,确保多条命令执行时的原子性。Lua编程语言的基本语法参考网站:
Lua 教程 | 菜鸟教程 (runoob.com)https://www.runoob.com/lua/lua-tutorial.html
unlock.lua:
if(redis.call('get',KEYS[1]) == ARGV[1]) thenreturn redis.call('del',KEYS[1])
end
return 0
SimpleRedisLock:
private static final String KEY_PREFIX = "lock:";private static final String ID_PREFIX = UUID.randomUUID().toString();// 定义一个静态常量 UNLOCK_SCRIPT,类型为 DefaultRedisScript,预期返回值类型为 Longprivate static final DefaultRedisScript<Long> UNLOCK_SCRIPT;// 静态初始化块,用于初始化静态常量 UNLOCK_SCRIPTstatic{UNLOCK_SCRIPT = new DefaultRedisScript<>();// 设置 Lua 脚本的路径,这里脚本是从类路径下的资源文件 "unlock.lua" 中加载UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));// 指定脚本执行后的返回值类型为 Long.classUNLOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(long timeoutSec) {String threadId = ID_PREFIX + Thread.currentThread().getId();Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId,timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(flag);}@Overridepublic void unlock(){// 使用 StringRedisTemplate 执行 Lua 脚本 UNLOCK_SCRIPTstringRedisTemplate.execute(// 传递预定义的 Lua 脚本对象 UNLOCK_SCRIPTUNLOCK_SCRIPT,// 传递一个包含锁键的列表,这里锁键由前缀和锁的名称组成Collections.singletonList(KEY_PREFIX + name),// 传递一个参数,该参数是线程 ID 前缀加上当前线程的 IDID_PREFIX + Thread.currentThread().getId());}
基于分布式锁的优化Redisson
基于Redis的setnx实现的分布式锁存在的问题:
- 不可重入:同一个线程无法多次获取同一把锁
- 不可重试:获取锁只尝试一次就返回,没有重试机制
- 超时释放:锁超时释放虽然可以避免出现死锁,但是业务执行耗时过长,也会导致锁释放,存在安全隐患
- 主从一致性:如果Redis提供了主从集群,当主节点宕机时,如果从节点并同步主节点中的锁数据,会出现锁实现
Redisson快速入门
引入Redisson依赖:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.33.0</version></dependency>
创建RedissonConfig:
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456");return Redisson.create(config);}}
实操:
RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if(!isLock){return Result.fail("不允许重复下单");}try{IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId, seckillVoucher);}finally {lock.unlock();}
处理主从一致性使用Redisson中的mutiLock:
多创建两个Redis文件,修改Redis文件中的redis.windows.conf中的port(端口)和requirepass(密码)
打开cmd窗口输入:(运行redis.windows.conf配置的内容,默认密码123456)
redis-server.exe redis.windows.conf
RedissonConfig:
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456");return Redisson.create(config);}@Beanpublic RedissonClient redissonClient2() {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456");return Redisson.create(config);}@Beanpublic RedissonClient redissonClient3() {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456");return Redisson.create(config);}}
实操:
@Resourceprivate RedissonClient redissonClient;@Resourceprivate RedissonClient redissonClient2;@Resourceprivate RedissonClient redissonClient3;
RLock lock1 = redissonClient.getLock("lock:order:" + userId);RLock lock2 = redissonClient2.getLock("lock:order:" + userId);RLock lock3 = redissonClient3.getLock("lock:order:" + userId);RLock lock = redissonClient.getMultiLock(lock1, lock2, lock3);boolean isLock = lock.tryLock();if(!isLock){return Result.fail("不允许重复下单");}try{IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId, seckillVoucher);}finally {lock.unlock();}
Redisson的原理总结:
- 不可重入Redis分布式锁:
原理:利用setnx的互斥性,利用ex避免死锁,释放锁时进行判断线程标示
缺点:不可重入,无法重试,锁超时失效
- 可重入的Redis分布式锁:
原理:利用hash结构,记录线程标示和重入次数,利用watchDog延续锁,利用信号控制锁等待重试
缺点:Redis宕机导致锁失效问题
- Redisson的mutiLock:
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
缺点:运维成本高,实现复杂
秒杀优化
(1)在新增秒杀优惠卷的时候,将优惠卷信息保存到Redis中
@Override@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);//保存秒杀库到Redis中stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY+voucher.getId(), voucher.getStock().toString());}
(2)编写lua脚本,创建seckill.lua文件:内容如下,用于判断用户是否抢购成功
--优惠卷
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]-- 库存Key
local stockKey = 'seckill:stock:' .. voucherId-- 订单Key
local orderKey = 'seckill:order:' .. voucherId-- 脚本业务
if(tonumber(redis.call('get',stockKey)) <=0 ) then-- 库存不足,返回1return 1
end
-- 判断用户是否已经下单 orderKey中的userId是否已经存在
if(tonumber(redis.call('sismember',orderKey,userId)) == 1 ) thenreturn 2
end
-- 库存-1
redis.call('incrby',stockKey,-1)
-- 记录用户已下单
redis.call('sadd',orderKey,userId)return 0
(3)ServiceImpl中:
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);private static final ExecutorService SEKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){SEKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{@Overridepublic void run(){while(true){try{//从阻塞队列中获取订单信息VoucherOrder voucherOrder = orderTasks.take();//处理订单handlerVoucherOrder(voucherOrder);}catch (Exception e){e.printStackTrace();}}}}//代理对象,因为子线程无法代理父类IVoucherOrderService proxy;public void handlerVoucherOrder(VoucherOrder voucherOrder){Long userId = UserHolder.getUser().getId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();log.info("是否获取到锁:{}", isLock);if(!isLock){log.error("不允许重复下单");return;}try{proxy.createVoucherOrder(voucherOrder);}finally {lock.unlock();}}@Override@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {Long voucherId = voucherOrder.getVoucherId();int seckillCount = voucherOrderMapper.selectCount(new LambdaUpdateWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, UserHolder.getUser().getId()).eq(VoucherOrder::getVoucherId, voucherId));if(seckillCount > 0){log.error("不允许重复下单");return;}SeckillVoucher seckillVoucher = seckillVoucherMapper.selectById(voucherId);//获取查询的库存Integer queryStock = seckillVoucher.getStock();//减库存seckillVoucher.setStock(seckillVoucher.getStock() - 1);int seckillChange = seckillVoucherMapper.update(seckillVoucher,new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).eq(SeckillVoucher::getStock, queryStock) //乐观锁方案,通过CAS更新库存.set(SeckillVoucher::getStock, seckillVoucher.getStock() - 1).gt(SeckillVoucher::getStock, 0));if(seckillChange < 1){log.error("库存不足");return;}voucherOrderMapper.insert(voucherOrder);}private static final DefaultRedisScript<Long> SEKILL_SCRIPT;static {SEKILL_SCRIPT = new DefaultRedisScript<>();SEKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SEKILL_SCRIPT.setResultType(Long.class);}@Override@Transactional(rollbackFor = Exception.class)public Result seckillVoucher(Long voucherId) {//执行lua脚本Long userId = UserHolder.getUser().getId();Long result = stringRedisTemplate.execute(SEKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());int r = result.intValue();if(r!=0){return Result.fail(r==1?"库存不足":"不能重复下单");}VoucherOrder voucherOrder = new VoucherOrder();//将下单的信息保存到阻塞队列中long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);//将订单信息保存到阻塞队列中orderTasks.add(voucherOrder);proxy = (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);}
Redis消息队列
Redis提供了三种不同的方式来实现消息队列:
- list结构:基于List结构模拟和消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
基于List结构模拟消息队列
队列是入口和出口不在一边,我们可以利用LPUSH结合RPOP、或者RPUSH结合LPOP来实现
注意队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样阻塞并等待消息、因此应使用BRPOP或者BLPOP来实现阻塞效果。
优点:
- 利用Redis存储,不受JVM内存上限
- 基于Redis的持久化机制,数据安全性的保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
基于PubSub结构模拟消息队列
PubSub(发布订阅):消费者可以订阅一个或多个channel,生产者可以向对应的channel发送消息后,所有订阅者都可以收到相关消息
- SUBSCRIBE channel [channel]:订阅一个或多个频道
- PUBLISH channel msg:向一个频道发送消息
- PSUBSCRIBE pattern [pattern]:订阅与pattern格式匹配的所有频道
优点:
- 采用发布订阅的模型、支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列(需要安装Redis5.0版本或以上)
Stream类型消息队列的XREAD的特点:
优点:
- 消息可回溯
- 一个消息可以被多个消费者读取·
- 可以阻塞读取
缺点:
- 有消息漏读的风险
Stream消费者组:(Redis版本5.0)
Stream类型消息队列的XREADGROUP命令:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读风险
- 有消息确认机制,保证消息至少被消费一次
总结 Redis消息队列
List | PubSub | Stream | |
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限内存空间,可以利用多消费者加快处理 | 不支持 | 支持 |
消息确认机制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
实操:基于Redis消息队里的Stream结构作为消息队列,实现异步秒杀下单
注意:提高Redis版本到5.0
需求:
- (1)创建一个Stream的消息队列,名为stream.orders
- (2)修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消费,内容包含voucherId、userId、orderId
- (3)项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
seckill.lua:
--优惠卷
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]-- 库存Key
local stockKey = 'seckill:stock:' .. voucherId-- 订单Key
local orderKey = 'seckill:order:' .. voucherId-- 脚本业务
if(tonumber(redis.call('get',stockKey)) <=0 ) then-- 库存不足,返回1return 1
end
-- 判断用户是否已经下单 orderKey中的userId是否已经存在
if(tonumber(redis.call('sismember',orderKey,userId)) == 1 ) thenreturn 2
end
-- 库存-1
redis.call('incrby',stockKey,-1)
-- 记录用户已下单
redis.call('sadd',orderKey,userId)
-- 发送消息到队列中 XADD stream.orders * k1 v1 k2 v2
--(下面的key值请参考实体类所定义的,因为这个lua脚本是创建订单信息,所以oderId对应实体的id)
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0
代码:
@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate RedissonClient redissonClient;@Resourceprivate StringRedisTemplate stringRedisTemplate;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 2.判断订单信息是否为空if (list == null || list.isEmpty()) {// 如果为null,说明没有消息,继续下一次循环continue;}// 解析数据MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);handlePendingList();}}}private void handlePendingList() {while (true) {try {// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders", ReadOffset.from("0")));// 2.判断订单信息是否为空if (list == null || list.isEmpty()) {// 如果为null,说明没有异常消息,结束循环break;}// 解析数据MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);}}}}private void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();// 创建锁对象RLock redisLock = redissonClient.getLock("lock:order:" + userId);// 尝试获取锁boolean isLock = redisLock.tryLock();// 判断if (!isLock) {// 获取锁失败,直接返回失败或者重试log.error("不允许重复下单!");return;}try {// 5.1.查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判断是否存在if (count > 0) {// 用户已经购买过了log.error("不允许重复下单!");return;}// 6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣减失败log.error("库存不足!");return;}// 7.创建订单save(voucherOrder);} finally {// 释放锁redisLock.unlock();}}@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");// 1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));int r = result.intValue();// 2.判断结果是否为0if (r != 0) {// 2.1.不为0 ,代表没有购买资格return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}// 3.返回订单idreturn Result.ok(orderId);}
九、达人探店
查询探店
@Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(blog -> {this.queryBlogUser(blog);this.isBlogLiked(blog);});return Result.ok(records);}@Overridepublic Result queryBlogById(Long id) {// 1.查询blogBlog blog = getById(id);if (blog == null) {return Result.fail("笔记不存在!");}// 2.查询blog有关的用户queryBlogUser(blog);// 3.查询blog是否被点赞isBlogLiked(blog);return Result.ok(blog);}private void queryBlogUser(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}private void isBlogLiked(Blog blog) {// 1.获取登录用户UserDTO user = UserHolder.getUser();if (user == null) {// 用户未登录,无需查询是否点赞return;}Long userId = user.getId();// 2.判断当前登录用户是否已经点赞String key = "blog:liked:" + blog.getId();Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());blog.setIsLike(score != null);}
点赞功能
@Overridepublic Result likeBlog(Long id) {Long userId = UserHolder.getUser().getId();Boolean isMember = stringRedisTemplate.opsForSet().isMember(RedisConstants.BLOG_LIKED_KEY + id, userId.toString());if (BooleanUtil.isFalse(isMember)) {int isSuccess = blogMapper.update(null,new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked+1"));if (isSuccess > 0) {stringRedisTemplate.opsForSet().add(RedisConstants.BLOG_LIKED_KEY + id, userId.toString());}}else{int isSuccess = blogMapper.update(null,new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked-1"));if(isSuccess > 0){stringRedisTemplate.opsForSet().remove(RedisConstants.BLOG_LIKED_KEY + id, userId.toString());}}return Result.ok();}
点赞排名
好友关注
@Overridepublic Result follow(Long followUserId, Boolean isFollow) {Long userId = UserHolder.getUser().getId();if(isFollow){Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);int isSuccess = followMapper.insert(follow);if(isSuccess > 0){// 关注成功stringRedisTemplate.opsForSet().add(RedisConstants.FOLLOW_KEY+userId,followUserId.toString());}}else{int isSuccess = followMapper.delete(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));if (isSuccess>0){stringRedisTemplate.opsForSet().remove(RedisConstants.FOLLOW_KEY+userId,followUserId.toString());}}return Result.ok();}@Overridepublic Result isFollow(Long followUserId) {Long userId = UserHolder.getUser().getId();// 查询当前用户是否关注了 followUserIdint count = followMapper.selectCount(new LambdaUpdateWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));return count > 0 ? Result.ok(true) : Result.ok(false);}
共同关注
@Overridepublic Result followCommons(Long id) {Long userId = UserHolder.getUser().getId();// 查询当前用户的共同关注Set<String> intersect = stringRedisTemplate.opsForSet().intersect(RedisConstants.FOLLOW_KEY+userId, RedisConstants.FOLLOW_KEY+id);if(StringUtils.isEmpty(intersect)){return Result.ok(Collections.emptyList());}List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());List<UserDTO> users = userMapper.selectBatchIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(users);}
关注推送
使用Feed流,对用户所查看的信息进行分析,推送用户喜欢的内容
Feed流产品有两种常用的模式:
Timeline:不做内容的赛选,简单的按照发布内容的时间排序,常用于好友或关注。例如朋友圈
- 优点:信息全面,不会缺失,实现相对简单
- 缺点:信息用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽违规的、用户不感兴趣的内容。推送用户感兴趣的信息来吸引用户
- 优点:推送用户感兴趣的内容,增加用户粘性
- 缺点:算法不精准,可能起反作用
个人的界面是基于关注好友做Feed流,使用Timeline模式,该模式实现的方案有三种:
- 拉模式
- 推模式
- 推拉模式
拉模式 | 推模式 | 推拉结合 | |
写比例 | 低 | 高 | 中 |
读比例 | 高 | 低 | 中 |
用户读取延迟 | 高 | 低 | 低 |
实现难度 | 复杂 | 简单 | 很复杂 |
使用场景 | 很少使用 | 用户量少,没有大V | 过千万的用户量,有大V |
@Overridepublic Result saveBlog(Blog blog) {blog.setUserId(UserHolder.getUser().getId());int isSuccess = blogMapper.insert(blog);if(isSuccess <= 0){return Result.fail("新增笔记失败");}List<Follow> follows = followMapper.selectList(new LambdaQueryWrapper<Follow>().eq(Follow::getFollowUserId, UserHolder.getUser().getId()));for(Follow follow : follows) {Long userId = follow.getUserId();String key = RedisConstants.FEED_KEY + userId;stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());}return Result.ok(blog.getId());}
实现滚动分页查询
请求参数:
- lastId(上一次查询的最小时间戳)
- offset:偏移量
返回值:
- List<Blog>:小于指定时间戳的笔记集合
- minTime:本次查询的推送的最小时间戳
- offset:偏移量
定义返回结果的实体类ScrollResult:
@Data
public class ScrollResult {private List<?> list;private Long minTime;private Integer offset;
}
@Overridepublic Result queryBlogOfFollow(Long max, Integer offset) {// 1.获取当前用户Long userId = UserHolder.getUser().getId();// 2.查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset countString key = FEED_KEY + userId;Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);// 3.非空判断if (typedTuples == null || typedTuples.isEmpty()) {return Result.ok();}// 4.解析数据:blogId、minTime(时间戳)、offsetList<Long> ids = new ArrayList<>(typedTuples.size());long minTime = 0; // 2int os = 1; // 2for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { // 5 4 4 2 2// 4.1.获取idids.add(Long.valueOf(tuple.getValue()));// 4.2.获取分数(时间戳)long time = tuple.getScore().longValue();if(time == minTime){os++;}else{minTime = time;os = 1;}}// 5.根据id查询blogString idStr = StrUtil.join(",", ids);List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();for (Blog blog : blogs) {// 5.1.查询blog有关的用户queryBlogUser(blog);// 5.2.查询blog是否被点赞isBlogLiked(blog);}// 6.封装并返回ScrollResult r = new ScrollResult();r.setList(blogs);r.setOffset(os);r.setMinTime(minTime);return Result.ok(r);}
十、GEO(Geolocation)
(1)练习题
GEOADD:(添加坐标)
GEOADD g1 116.378248 39.865275 bjn 116.42003 39.903738 bjz 116.32287 39.893729 bjx
GEODIST:(查看两个坐标之间的距离)
最后是返回的单位,默认为m,第二行指定返回的单位为km
GEODIST g1 bjx bjzGEODIST g1 bjx bjz km
GEOPOS:(查看坐标成员)
GEOPOS g1 bjz
GEOSERACH:(查看坐标范围内容的坐标成员)
GEOSEARCH g1 FROMLONLAT 116.397904 39.909005 BYRADIUS 10 km WITHDIST
GEOSEARCH
:执行地理位置搜索。g1
:指定的键(key),其中存储了地理位置信息。FROMLONLAT 116.397904 39.909005
:搜索的中心点,这里是经纬度坐标(116.397904, 39.909005)。BYRADIUS 10 km
:指定搜索的半径为10公里。WITHDIST
:返回结果中包含成员与中心点之间的距离。
(2)附近商户搜索
请求参数:、
- typeId:商户类型
- current:页码,滚动查询
- x:经度
- y:纬度
返回值:
-
List<Shop>:符合要求的商户信息
按照商户的类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中
@Testvoid loadShopData() {// 1.查询店铺信息List<Shop> list = shopService.list();// 2.把店铺分组,按照typeId分组,typeId一致的放到一个集合Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));// 3.分批完成写入Redisfor (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {// 3.1.获取类型idLong typeId = entry.getKey();String key = SHOP_GEO_KEY + typeId;// 3.2.获取同类型的店铺的集合List<Shop> value = entry.getValue();List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());// 3.3.写入redis GEOADD key 经度 纬度 memberfor (Shop shop : value) {// stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString());locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(), shop.getY())));}stringRedisTemplate.opsForGeo().add(key, locations);}}
SpringDataRedis的2.3.9版本并不支持Redis 6.2 提供的GEOSEARCH命令,因此我们需要提示版本,修改自己的POM文件:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><exclusion><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId></exclusion><exclusion><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-redis</artifactId><version>3.3.2</version></dependency><dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>6.2.3.RELEASE</version></dependency>
代码:
@Overridepublic Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {// 1.判断是否需要根据坐标查询if (x == null || y == null) {// 不需要坐标查询,按数据库查询Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));// 返回数据return Result.ok(page.getRecords());}// 2.计算分页参数int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;int end = current * SystemConstants.DEFAULT_PAGE_SIZE;// 3.查询redis、按照距离排序、分页。结果:shopId、distanceString key = SHOP_GEO_KEY + typeId;GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDISTANCE.search(key,GeoReference.fromCoordinate(x, y),new Distance(5000),RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));// 4.解析出idif (results == null) {return Result.ok(Collections.emptyList());}List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();if (list.size() <= from) {// 没有下一页了,结束return Result.ok(Collections.emptyList());}// 4.1.截取 from ~ end的部分List<Long> ids = new ArrayList<>(list.size());Map<String, Distance> distanceMap = new HashMap<>(list.size());list.stream().skip(from).forEach(result -> {// 4.2.获取店铺idString shopIdStr = result.getContent().getName();ids.add(Long.valueOf(shopIdStr));// 4.3.获取距离Distance distance = result.getDistance();distanceMap.put(shopIdStr, distance);});// 5.根据id查询ShopString idStr = StrUtil.join(",", ids);List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();for (Shop shop : shops) {shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());}// 6.返回return Result.ok(shops);}
十一、BitMap实现用户签到
实现签到功能:
使用1标识签到,0标识未签到
代码:
@Overridepublic Result sign() {// 1.获取当前登录用户Long userId = UserHolder.getUser().getId();// 2.获取日期LocalDateTime now = LocalDateTime.now();// 3.拼接keyString keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = USER_SIGN_KEY + userId + keySuffix;// 4.获取今天是本月的第几天int dayOfMonth = now.getDayOfMonth();// 5.写入Redis SETBIT key offset 1stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);return Result.ok();}
统计连续签到功能:
从最后一次签到开始向前统计,知道遇到第一次未签到为止,计算总的签到次数,就是连续的签到天数。
获取本月到今天的所有签到数据:
BITFIELD key GET u[datOfMonth] 0
u 代表无符号整数,dayOfMonth是该月的天数
从后向前遍历每个bit位,分别与1进行运算,就能知道最后一个Bit位
代码:
@Overridepublic Result signCount() {// 1.获取当前登录用户Long userId = UserHolder.getUser().getId();// 2.获取日期LocalDateTime now = LocalDateTime.now();// 3.拼接keyString keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = USER_SIGN_KEY + userId + keySuffix;// 4.获取今天是本月的第几天int dayOfMonth = now.getDayOfMonth();// 5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0List<Long> result = stringRedisTemplate.opsForValue().bitField(key,BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));if (result == null || result.isEmpty()) {// 没有任何签到结果return Result.ok(0);}Long num = result.get(0);if (num == null || num == 0) {return Result.ok(0);}// 6.循环遍历int count = 0;while (true) {// 6.1.让这个数字与1做与运算,得到数字的最后一个bit位 // 判断这个bit位是否为0if ((num & 1) == 0) {// 如果为0,说明未签到,结束break;}else {// 如果不为0,说明已签到,计数器+1count++;}// 把数字右移一位,抛弃最后一个bit位,继续下一个bit位num >>>= 1;}return Result.ok(count);}