go zero 缓存实践:分页列表
在实际开发中,分页列表 是一个非常常见的需求,尤其在面对大量数据时,通过分页可以有效减轻服务器和数据库的压力,提高用户体验。本篇文章将通过go zero 和 Redis 的结合,提供一个高效、灵活的分页列表实现方案,涵盖 基本分页逻辑、Redis 缓存结合 和 常见优化方法。
一、需求分析和实现方案
1.需求
在一个社交媒体平台中,每个用户可以发布多篇文章,当用户浏览文章时,需要分页加载他们的内容。考虑以下场景:
- 按 发布时间 和 点赞数 排序。
- 数据需要 支持分页,并在高并发情况下保持高性能。
- 结合 Redis 缓存 提升效率,减少数据库查询压力。
- 防止重复数据 或分页游标不一致问题。
2. 分页实现方案
分页通常分为两种实现方式:
- 基于偏移量(Offset-based Pagination): 使用 SQL 的
LIMIT
和OFFSET
实现,适合小型数据集。 - 基于游标(Cursor-based Pagination): 通过某个字段(如
id
或publish_time
)来标记分页起点,更适合大型数据集和高并发场景。
在本文中,我们主要讨论 游标分页 的实现。
完整的分页步骤总结:
- 参数校验:确保用户输入的参数有效,并设置合理的默认值。
- **排序字段设置 **:根据排序方式选择排序字段,确定游标的意义。
- **缓存查询 **:尝试从缓存中获取数据,优先使用缓存提升性能。
- **数据库查询 **:当缓存未命中时,从数据库查询数据,确保数据一致性。
- **数据排序 **:根据排序字段对数据进行排序,确保结果符合业务逻辑。
- **边界处理 **:防止分页数据重复,同时正确处理最后一页标记。
- **缓存更新 **:异步更新缓存,提升后续查询效率。
- **结果返回 **:封装分页数据、游标以及是否为最后一页的信息。
二、 项目设计
1.数据表设计
article
表:
CREATE TABLE `article` (`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键ID',`title` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '标题',`content` TEXT NOT NULL COMMENT '内容',`author_id` BIGINT UNSIGNED NOT NULL DEFAULT '0' COMMENT '作者ID',`like_num` INT NOT NULL DEFAULT '0' COMMENT '点赞数',`publish_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '发布时间',PRIMARY KEY (`id`),INDEX `idx_author_publish_time` (`author_id`, `publish_time`)
);
2.分页接口需求
-
请求参数:
userId
:用户 ID。cursor
:上一页的最后一个游标值(如publish_time
)。pageSize
:每页的记录数量。sortType
:排序方式(0 按发布时间排序,1 按点赞数排序)。
-
返回结果:
isEnd
:是否为最后一页。cursor
:下一页的游标值。articles
:当前页的文章列表。articleId
: 最后一个文章ID
article.proto
文件:
syntax = "proto3";package pb;
option go_package="./pb";service Article {rpc Articles(ArticlesRequest) returns (ArticlesResponse);
}message ArticlesRequest {int64 userId = 1;int64 cursor = 2;int64 pageSize = 3;int64 sortType = 4;}message ArticleItem {int64 Id = 1;string title = 2;string content = 3;string description = 4;string cover = 5;int64 commentCount = 6;int64 likeCount = 7;int64 publishTime = 8;int64 authorId = 9;
}message ArticlesResponse {repeated ArticleItem articles = 1;bool isEnd = 2;int64 cursor = 3;int64 articleId = 4;
}
三、项目实现
为了进一步提高性能,可以使用 Redis 存储文章列表的分页缓存。这里使用 Redis 的有序集合(ZSET
),根据 publish_time
或 like_num
排序。
1.自定义常量
const (SortPublishTime = iotaSortLikeCount
)
const (articlesExpire = 3600 * 24 * 2
)
const (DefaultPageSize = 20DefaultLimit = 200DefaultSortLikeCursor = 1 << 30
)
2.通过用户ID查询文章
func (m *customArticleModel) ArticlesByUserId(ctx context.Context, userId, likeNum int64, pubTime, sortField string, limit int) ([]*Article, error) {//var anyField anyvar sql stringif sortField == "like_num" {//anyField = likeNum//sql = fmt.Sprintf("select "+articleRows+" from "+m.table+" where user_id=? and like_num < ? order by %s desc limit ?", sortField)sql = fmt.Sprintf("select %s from %s where `author_id`=? and like_num < %d order by %s desc limit ?", articleRows, m.table, likeNum, sortField)} else {//anyField = pubTimesql = fmt.Sprintf("select %s from %s where `author_id`=? and publish_time < '%s' order by %s desc limit ?", articleRows, m.table, pubTime, sortField)}var articles []*Articleerr := m.QueryRowsNoCacheCtx(ctx, &articles, sql, userId, limit)if err != nil {return nil, err}return articles, nil
}
3.从缓存查询数据
// 查数据 先查缓存, 如果存在则续期
func (l *ArticlesLogic) cacheArticles(ctx context.Context, userId, cursor, pageSize, sortType int64) ([]int64, error) {key := fmt.Sprintf("biz#articles#%d#%d", userId, sortType)err := l.extendCacheExpiration(ctx, key)if err != nil {return nil, err}return l.fetchArticlesFromCache(ctx, key, cursor, pageSize)}// 缓存续期函数
func (l *ArticlesLogic) extendCacheExpiration(ctx context.Context, key string) error {exists, err := l.svcCtx.Rds.ExistsCtx(ctx, key)if err != nil || !exists {return err}return l.svcCtx.Rds.ExpireCtx(ctx, key, articlesExpire+rand.Intn(60))
}// 从缓存中获取文章 ID
func (l *ArticlesLogic) fetchArticlesFromCache(ctx context.Context, key string, cursor int64, pageSize int64) ([]int64, error) {paris, err := l.svcCtx.Rds.ZrevrangebyscoreWithScoresAndLimitCtx(ctx, key, 0, cursor, 0, int(pageSize))if err != nil {return nil, err}var ids []int64for _, pair := range paris {id, err := strconv.ParseInt(pair.Key, 10, 64)if err != nil {return nil, err}ids = append(ids, id)}return ids, nil
}
4.从数据库中查询文章信息
如果缓存未命中,使用MapReduce从数据库中查询,go zero会自动写入缓存
// 缓存没有去数据库
func (l *ArticlesLogic) articleByIds(ctx context.Context, articleIds []int64) ([]*model.Article, error) {articles, err := mr.MapReduce[int64, *model.Article, []*model.Article](func(source chan<- int64) {for _, aid := range articleIds {source <- aid}}, func(id int64, writer mr.Writer[*model.Article], cancel func(error)) {p, err := l.svcCtx.ArticleModel.FindOne(ctx, id)if err != nil {cancel(err)return}writer.Write(p)}, func(pipe <-chan *model.Article, writer mr.Writer[[]*model.Article], cancel func(error)) {var articles []*model.Articlefor article := range pipe {articles = append(articles, article)}writer.Write(articles)})if err != nil {return nil, err}return articles, nil
}
5.数据添加到有序集合
如果从数据库中查询到信息,把它加入到redis的有序集合中
func (l *ArticlesLogic) addCacheArticles(ctx context.Context, articles []*model.Article, userId int64, sortType int32) error {if len(articles) == 0 {return nil}key := fmt.Sprintf("biz#articles#%d#%d", userId, sortType)for _, article := range articles {var score int64if sortType == SortLikeCount {score = article.LikeNum} else if sortType == SortPublishTime && article.Id != 0 {score = article.PublishTime.Local().Unix()}if score < 0 {score = 0}_, err := l.svcCtx.Rds.ZaddCtx(ctx, key, score, strconv.Itoa(int(article.Id)))if err != nil {return err}}return l.svcCtx.Rds.ExpireCtx(ctx, key, articlesExpire)
}
6.游标分页主逻辑
func (l *ArticlesLogic) Articles(in *pb.ArticlesRequest) (*pb.ArticlesResponse, error) {// todo: add your logic here and delete this line//输入校验 检查 SortType/UserId 是否有效if in.SortType != SortPublishTime && in.SortType != SortLikeCount {in.SortType = SortPublishTime}if in.UserId < 0 {return nil, errors.New("用户ID不合法")}//设置默认的 PageSize 和 Cursor。if in.PageSize == 0 {in.PageSize = DefaultPageSize}if in.Cursor == 0 {if in.SortType == SortPublishTime {in.Cursor = time.Now().Unix()} else {in.Cursor = DefaultSortLikeCursor}}var sortField stringvar sortLikeNum int64var sortPublishTime string//根据排序类型确定排序字段if in.SortType == SortLikeCount {sortField = "like_num"sortLikeNum = in.Cursor} else {sortField = "publish_time"//2023-12-01 12:00:00//sortPublishTime = "CURRENT_TIMESTAMP"sortPublishTime = time.Unix(in.Cursor, 0).Format("2006-01-02 15:04:05")}var isCache = falsevar isEnd boolvar curPage []*pb.ArticleItemvar articles []*model.Articlevar err errorvar lastId, cursor int64// 先查缓存 ,缓存不要做错误处理, 不影响正常流程//尝试通过缓存获取文章ID集合articleIds, _ := l.cacheArticles(l.ctx, in.UserId, in.Cursor, in.PageSize, int64(in.SortType))if len(articleIds) > 0 {fmt.Println("缓存命中")//如果缓存中有数据,标记 isCache 为 trueisCache = true//若缓存返回的最后一个 ID 为 -1,表示数据已经到达末尾,设置 isEnd = trueif articleIds[len(articleIds)-1] == -1 {isEnd = true}fmt.Println("articleIds:", articleIds)//根据缓存的文章 ID 查询具体的文章内容。articles, err = l.articleByIds(l.ctx, articleIds)if err != nil {return nil, err}// 通过sortFiled对articles进行排序var cmpFunc func(a, b *model.Article) intif sortField == "like_num" {cmpFunc = func(a, b *model.Article) int {return cmp.Compare(b.LikeNum, a.LikeNum)}} else {cmpFunc = func(a, b *model.Article) int {return cmp.Compare(b.PublishTime.Unix(), a.PublishTime.Unix())}}slices.SortFunc(articles, cmpFunc)// 数据封装与分页//遍历排序后的文章数据,将其封装为 pb.ArticleItem 并追加到 curPagefor _, article := range articles {curPage = append(curPage, &pb.ArticleItem{Id: int64(article.Id),Title: article.Title,Content: article.Content,LikeCount: article.LikeNum,AuthorId: int64(article.AuthorId),CommentCount: article.CommentNum,PublishTime: article.PublishTime.Unix(),})}} else {//使用 SingleFlight 防止并发查询,确保同一用户的多次查询只会执行一次数据库操作。//如果缓存未命中,则查询数据库获取文章列表。articlesT, _ := l.svcCtx.SingleFlightGroup.Do(fmt.Sprintf("ArticlesByUserId:%d:%d", in.UserId, in.SortType),func() (interface{}, error) {//最大查询200条return l.svcCtx.ArticleModel.ArticlesByUserId(l.ctx, in.UserId, sortLikeNum, sortPublishTime, sortField, 200)})if articlesT == nil {return &pb.ArticlesResponse{}, nil}//将查询结果转换为 []*model.Article 类型//从数据库查询结果中获取文章数据articles = articlesT.([]*model.Article)//第一页var firstPageArticles []*model.Article//如果文章数量超过了 PageSize,只取前 PageSize 个文章。if len(articles) > int(in.PageSize) {//设置第一页的文章数据firstPageArticles = articles[:int(in.PageSize)]} else {firstPageArticles = articlesisEnd = true}//把第一页的数据,存储到当前页数据for _, article := range firstPageArticles {curPage = append(curPage, &pb.ArticleItem{Id: int64(article.Id),Title: article.Title,Content: article.Content,LikeCount: article.LikeNum,AuthorId: int64(article.AuthorId),CommentCount: article.CommentNum,PublishTime: article.PublishTime.Unix(),})}}if len(curPage) > 0 {//获取当前页的最后一个数据的 ID 和 CursorpageLast := curPage[len(curPage)-1]lastId = pageLast.Id//根据上一页最后一个数据的cursor设置下一页的Cursorif in.SortType == SortLikeCount {cursor = pageLast.LikeCount} else {cursor = pageLast.PublishTime}// 确保 Cursor 不为负数if cursor < 0 {cursor = 0}//判断是否有重复的文章for k, article := range curPage {if in.SortType == SortPublishTime {if article.PublishTime == in.Cursor && article.Id == in.ArticleId {curPage = curPage[k:] // 从下一个开始break}} else {if article.LikeCount == in.Cursor && article.Id == in.ArticleId {curPage = curPage[k:] // 从下一个开始break}}}}//fmt.Println("isCache:", isCache)if !isCache {fmt.Println("补偿数据")threading.GoSafe(func() {if len(articles) < DefaultLimit && len(articles) > 0 {articles = append(articles, &model.Article{Id: -1})}err = l.addCacheArticles(context.Background(), articles, in.UserId, in.SortType)if err != nil {logx.Errorf("addCacheArticles error: %v", err)}})}return &pb.ArticlesResponse{IsEnd: isEnd,Cursor: cursor,ArticleId: lastId,Articles: curPage,}, nil}