什么是Feed流?
顾名思义就是投喂流 传统的信息查找方式用户需要手动去搜寻 Feed流就是不再是用户自己找 而是服务端主动投喂他喜欢/想看到的信息 考虑以下场景:
张三关注了李四
王五关注了李四
当李四发了动态时 它的粉丝们在我的关注列表里就能看到自己关注的人发的动态且最新发布的在最上面
以上场景需求捕捉:
1.发布动态后要通知给所有粉丝
2.粉丝读取时要能看到所有关注的人最新动态
3.内容是按发布时间倒序的
如何满足呢?
Redis的sortedSet数据结构
从需求来看 用户发布动态后要立马将动态主动推送到每个粉丝的收件箱里 值不能重复 还要排序 所以sortedset的分数结构就能很好支持
表结构设计:
关注表:
CREATE TABLE `tb_follow` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',`user_id` bigint unsigned NOT NULL COMMENT '用户自己',`follow_user_id` bigint unsigned NOT NULL COMMENT '被用户关注的人',`status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '1关注0未关注',`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`updated_at` timestamp NULL DEFAULT NULL,PRIMARY KEY (`id`) USING BTREE,KEY `idx_user_id_follow_user_id` (`user_id`,`follow_user_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPACT
用到的redis命令:
写入用户收件箱:
ZADD key score member [score] [member]
分页查询用户收件箱:
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
想必看到查询的参数列表里有limit offset count这种字眼 就知道分页咋弄了
比如用户2关注的博主发了5篇动态 用户2的收件箱被推送了这5篇:
member就是动态的id Score就是时间戳 假设查第一页查3条 那么查出来就是29,28,27
ZREVRANGEBYSCORE subscribe:blog:2 1843092073692 0 LIMIT 0 3
命令讲解:
max就是最大分数: 这里用当前时间就是最新的: 1843092073692
min就是最小分数 给0就行
LIMIT 0 3就是从0个偏移量开始查3个 这里和mysql分页意思一样
ZREVRANGEBYSCORE是倒序查 所以根据时间戳分数就是29, 28, 27
发布推流和分页查询代码实现
发布推流
/*** 发布博客并广播到所有订阅者的收件箱** @param blog* @return*/@Overridepublic long publish(Blog blog) {Long userId = UserHolder.getUser().getId();blog.setUserId(userId);long inserted = blogMapper.insertBlog(blog);if (inserted < 1) {throw new BusinessException("发布失败");}// 主动推到当前用户的订阅者的收件箱// select user_id from tb_follow where follow_user_id = ? and status = 1List<Long> followerIds = followService.findFollowerById(userId);for (Long followerId : followerIds) {stringRedisTemplate.opsForZSet().add(SUBSCRIBE_BLOG + followerId, blog.getId().toString(), System.currentTimeMillis());}return blog.getId();}
这里主要就是做了发布动态 然后把当前用户的粉丝都查出来 将动态id推送到每个粉丝的收件箱
用户分页查询关注的博主动态
接口参数两个:
1.时间戳 第一次传取当前时间 以后都用接口返回的
2.偏移量 第一次不用传 以后都用接口返回的
Controller层:
/*** 拉取用户关注的人的动态** @param lastTimestamp* @param offset* @return*/
@GetMapping("/of/follow")
public Result pullSubscribeBlogs(@RequestParam("lastId") Long lastTimestamp, @RequestParam(required = false, defaultValue = "0") Integer offset) {Long userId = UserHolder.getUser().getId();return Result.ok(blogService.pullSubscribeBlogs(userId, offset, lastTimestamp));
}
service代码:
@Override
public ScrollResult<Blog> pullSubscribeBlogs(Long userId, Integer offset, Long lastTimestamp) {ScrollResult scrollResult = new ScrollResult();String key = SystemConstants.SUBSCRIBE_BLOG + userId;final int PAGE_SIZE = 5; // 一页查多少个// 获取降序的订阅消息 最新的在上面Set<ZSetOperations.TypedTuple<String>> subscribeBlogs = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, lastTimestamp, offset, PAGE_SIZE);// 没有则查到头了if (Objects.isNull(subscribeBlogs) || subscribeBlogs.isEmpty()) {scrollResult.setList(Collections.emptyList());return scrollResult;}long minTimestamp = 0; // 记录最小时间戳int cursor = 1; // 相同的最小的时间戳存在的帖子的个数,用来跳过相同分数避免重复数据查询 默认为1是因为查出来的最后一条就是下一次查的第一条 所以要告诉下一次要偏移1个去掉重复的List<String> ids = new ArrayList<>(subscribeBlogs.size()); // 指定初始化长度 避免长度超出默认值触发扩容for (ZSetOperations.TypedTuple<String> subscribeBlog : subscribeBlogs) {String id = subscribeBlog.getValue();ids.add(id); // 将id添加到集合用于后续查找动态详情// 判断时间戳是否有重合 有则记录数+1long time = subscribeBlog.getScore().longValue();if (time == minTimestamp) {cursor += 1;} else {cursor = 1;minTimestamp = time; // 更新最小时间戳}}int newOffset = cursor > 1 ? cursor + offset : cursor; // 如果cursor>1则有时间戳重合的部分 要加上重复的部分 下一次查询才能跳过重复的List<Blog> blogs = blogMapper.findByIds(ids); // 根据ids查出所有博客和用户信息for (Blog blog : blogs) { // 查询用户是否点赞和点赞数String likeKey = BLOG_LIKED + blog.getId();Set<String> likedUserIds = stringRedisTemplate.opsForZSet().reverseRange(likeKey, 0, -1);blog.setIsLike(likedUserIds.contains(userId.toString()));blog.setLiked(likedUserIds.size());}scrollResult.setList(blogs);scrollResult.setOffset(newOffset);scrollResult.setMinTime(minTimestamp);return scrollResult;
}
这里的难点就是要理解分页边界 每次分页前端会把最后一个时间戳带过来 所以查出来的第一条是上一条 所以cursor=1做为偏移量过滤掉上一条 考虑第一次查询为 5 4 3 那么第二次就会把3查出来 加上偏移量1就会从2开始查就不会有重复3了 然而当时间戳相同时 即同一时间有多条记录 我们要记录相同时间的条数 下次查时用偏移量跳过这些