目录
1. 优化思路
总结之前实现的秒杀过程
下单流程
2. 使用Redis完成秒杀资格判断和库存
0. Redis中数据类型的选用
1.将优惠券信息保存到Redis中
2.基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
3. 开启新协程,处理数据库的数据
4.可以使用消息队列
3. Redis消息队列
基于List实现消息队列
基于 Pub-Sub 模式
其在处理消息积压时,为什么会丢数据?
基于Stream的消息队列
发送消息:XADD
读取消息:XREAD
基于Stream的消息队列-消费者组
1. 创建消费者组
编辑
2. 从消费者组读取消息
使用stream消息队列来消费订单
1. 创建stream类型的消息队列
2. 往队列中添加订单消息
3. 获取消息
项目地址:https://github.com/liwook/PublicReview
1. 优化思路
总结之前实现的秒杀过程
- 最开始我们的遇到自增ID问题,我们通过实现分布式ID解决了问题;
- 之后在单体系统下遇到了一人多单超卖问题,我们通过乐观锁解决,判断库存是否大于0,大于0才返回成功;
- 我们对业务进行了变更,将一人多单变成了一人一单,结果在高并发场景下同一用户发送相同请求仍然出现了超卖问题,我们通过悲观锁解决了,在查看库存时候就添加锁,其他线程的同一个用户就会被堵住;
- 由于用户量的激增,我们将单体系统升级成了集群,结果由于锁只能在一个进程中可见导致又出现了同一用户发送下单请求出现超卖问题。我们通过实现分布式锁成功解决集群下的超卖问题。
- 最后,我们直接使用现有成熟的方案redsync来解决上诉出现的所有问题。什么不可重试、不可重入、超时释放、原子性等问题,redsync都提供相对应的解决方法。
上面的都是我们为了解决程序可用所做的。现在程序执行不会出现超卖问题的。但是其性能可能不太好。
下单流程
查询优惠卷 – 判断秒杀库存是否足够 –加分布式锁 –查询订单 – 校验是否是一人一单 – 扣减库存 – 创建订单
在这7步操作中,加粗黑体的操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致我们的程序执行的比较慢,所以我们需要异步程序执行。
我们可以将一部分的工作交给Redis,并且通过开启一个独立的子线程去异步执行数据库的一些操作,从而大大提高效率。
2. 使用Redis完成秒杀资格判断和库存
把 判断秒杀库存 和 检验一人一单 交由Redis处理。Redis中判断能抢购,就返回成功,之后再开启一个线程去处理一些MySql操作。
- 当用户下单之后,判断库存是否充足只需要在redis中根据key找对应的value是否大于0即可。
- 如果不充足,则直接结束;如果充足,继续在redis中判断用户是否可以下单。
0. Redis中数据类型的选用
- 对于库存值,我们可以直接使用 String 类型。
- 一人一单的判断,一个优惠卷是可以让多个不同的用户购买的,即一个优惠卷可以对应多个用户,就是说可以用个集合,key是voucherId,而集合元素就是userId。而需要可以快速判断该集合中是否有某userId,那可以使用 Set类型,key是优惠卷id,value是userId。
所以:Redis中,库存使用String 类型,判断一人一单使用Set类型。
检验一人一单的过程:
- 如果set集合中没有这条数据,说明他可以下单;
- 如果set集合中没有这条记录,则将userId存入到redis中。
该过程是两个操作,而且我们还需要判断库存,所以需要使用lua来保证整个过程的原子性。
基于以上的分析,可得出需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、该用户是否已购买过,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
但是在go语言中,开启一个协程是很方便的,消耗的资源也少,所以当前,处理完redis中的数据,直接开启一个新协程去处理数据库中的数据。
1.将优惠券信息保存到Redis中
const (SeckillStockKey = "seckill:voucher:"
)// 添加秒杀券
func addSeckillVoucher(voucher Voucher) error {.....................q := query.Use(db.DBEngine)//使用事务err = q.Transaction(func(tx *query.Query) error {..........................})if err != nil {return err}//往redis中添加秒杀卷的库存return db.RedisClient.Set(context.Background(), SeckillStockKey+strconv.Itoa(int(v.ID)), voucher.Stock, 0).Err()
}
2.基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
在internal/shopservice添加lau.go文件。
const AdjustSeckill = `
-- 秒杀优化需求二:基于Lua脚本,判断秒杀库存、一人一单,决定用户是否有购买资格
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = KEYS[1]
-- 1.2.用户id
local userId = KEYS[2]-- 2.数据key
-- 2.1.库存key ..lua的字符串拼接
local stockKey = 'seckill:stock:' .. voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey tonumber()将字符串转换为数字
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2.库存不足,返回1return 1
end
-- 3.2.判断用户是否下单 SISMEMBER:判断set集合中是否存在某个元素
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.3.存在,说明是重复下单,返回2return 2
end
-- 3.4.扣库存 incrby stockKey -1redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户) sadd orderKey userIdredis.call('sadd', orderKey, userId)return 0
`
修改seckillVoucher函数。注意:返回的结果res的类型是interface{},需要转换。
// post /voucher/seckill
func SeckillVoucher(c *gin.Context) {var req seckillResquesterr := c.BindJSON(&req)if err != nil {slog.Error("SeckillVoucher, bind json bad", "err", err)code.WriteResponse(c, code.ErrBind, nil)return}script := redis.NewScript(AdjustSeckill)res, err := script.Run(context.Background(), db.RedisClient, []string{strconv.Itoa(req.VoucherId), strconv.Itoa(req.UserId)}).Result()if err != nil {slog.Error("run script bad", "err", err)code.WriteResponse(c, code.ErrDatabase, nil)return}//需要注意,res的类型是interface{},需要转换。if res.(int64) != 0 {// if res != 0 {var e stringif res == 1 {e = "stock not enough"} else {e = "order already exist"}code.WriteResponse(c, code.ErrDatabase, e)}orderId := NextId("order")//TODO 开启新协程,异步更新数据库go createVoucherOrder(req, orderId)// 3.返回订单idcode.WriteResponse(c, code.ErrSuccess, gin.H{"orderId": orderId})
}
测试效果符合预期,而且花费的时间也大大减少了。开启400个协程来模拟,结果只需要51ms,比之前的2,3s快了很多。
3. 开启新协程,处理数据库的数据
协程函数是之前的CreateVoucherOrder函数。
这里有个问题是:我们不知道是否成更新了MySql中的数据。这个是一定要处理的。看后续。
// post /voucher/seckill
func SeckillVoucher(c *gin.Context) {............orderId := NextId("order")//开启新协程,异步更新数据库go createNewOrder(req, orderId)// 3.返回订单idcode.WriteResponse(c, code.ErrSuccess, gin.H{"orderId": orderId})
}//该函数就是在原来的createVoucherOrder函数上进行修改的
func createNewOrder(req seckillResquest, orderId int64) {order := model.TbVoucherOrder{ID: orderId,VoucherID: uint64(req.VoucherId),UserID: uint64(req.UserId),}//处理两张表(订单表,秒杀卷表),使用事务q := query.Use(db.DBEngine)q.Transaction(func(tx *query.Query) error {//3.合法,库存数量减1//使用update,要是没有该条数据,不会返回gorm.ErrRecordNotFound或者有错误的。info, err := tx.TbSeckillVoucher.Where(tx.TbSeckillVoucher.VoucherID.Eq(uint64(req.VoucherId)), tx.TbSeckillVoucher.Stock.Gt(0)).UpdateSimple(tx.TbSeckillVoucher.Stock.Add(-1))if err != nil {return err}if info.RowsAffected == 0 {return fmt.Errorf("affected rows is 0")}//4.成功,创建对应的订单,并保存到数据中// err = tx.TbVoucherOrder.Create(&order)err = tx.TbVoucherOrder.Select(tx.TbVoucherOrder.ID, tx.TbVoucherOrder.VoucherID, tx.TbVoucherOrder.UserID).Create(&order)if err != nil {return err}return nil})
}
那么每来一个抢单,就需要开启一个新协程。会有几个问题:
- 资源管理:如果并发量非常大,频繁地开启新协程可能会消耗过多的系统资源,如内存等。如果没有合理的协程数量控制机制,可能会导致系统资源耗尽。
- 错误处理复杂:在新协程中处理 MySQL 写入时,错误处理会变得比较复杂。例如,如果写入 MySQL 失败,可能需要一种机制来通知相关模块或者进行重试操作,并且需要确保这些操作不会影响到主业务流程。
- 数据安全问题,要是程序某些原因宕机了,而没有保存到。
那么可以使用消息队列。
4.可以使用消息队列
消息队列即是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
使用队列的好处在于 解耦:所谓解耦在我们秒杀中就是:我们下单之后,利用redis去进行校验下单条件,再通过队列把消息发送出去,然后再启动一个线程去消费这个消息,完成解耦,同时也加快我们的响应速度。 但是这也会导致一个问题,就是我们添加了一个中间件,就要保证该中间件的高可用等等需求。
3. Redis消息队列
Redis中可用作消息队列的有3种:List数据结构,基于PubSub,Stream数据结构。
基于List实现消息队列
Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
- 生产消息:BRPUSH key value [value ...] 将一个或多个元素推入到指定列表的头部。如果列表不存在,BRPUSH命令会自动创建一个新的列表
- 消费消息:BRPOP key [key ...] timeout 从指定的一个或多个列表中弹出最后一个元素。如果 list 列表为空,BRPOP命令会导致客户端阻塞,直到有数据可用或超过指定的超时时间。
优点:
- 利用 Redis 存储,不受go程序内存上限。消费者可以积压消息,不会因为客户端的消息过多而被强行断开。
- 基于 Redis 的持久化机制,数据安全有保证
- 可以满足消息有序性
缺点:
- 不支持重复消费:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据
- 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了(因为从 List 中 POP 一条消息出来后,这条消息就会立即从链表中删除了。也就是说,无论消费者是否处理成功,这条消息都没办法再次消费了)
基于 Pub-Sub 模式
这是Redis2.0版本提供的,其提供了以下命令来完成发布、订阅的操作:
# 用于向指定频道发布一条消息
PUBLISH channel message # 订阅一个或多个频道
SUBSCRIBE channel [channel]
# 用于取消订阅一个或多个频道
UNSUBSCRIBE [channel [channel ...]]
# 用于订阅一个或多个符合给定模式的频道,接收消息
PSUBSCRIBE pattern [pattern ...]
# 用于取消订阅一个或多个符合给定模式的频道
PUNSUBSCRIBE [pattern [pattern ...]]
消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
那么,在程序中消费者就订阅某个channel,生产者在处理好redis后,把订单等信息往channel中发布,那消费者就可以收到从而进行处理。
优点:
- 支持多生产、多消费者
缺点:
- Pub/Sub 在实现时非常简单,它既不基于任何数据类型,也不进行任何数据存储。所以无法持久化保存消息(不会写入到 RDB 和 AOF 中),如果 Redis 服务器宕机或重启,那么所有的消息将会丢失;
- 发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后就不能消费之前的历史消息。因为下线期间生产者发布的消息会因找不到消费者而被丢弃。
- 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失。
其在处理消息积压时,为什么会丢数据?
- 采用 list 当作队列,消息积压时会导致链表很长,Redis 内存持续增长,直到消费者取出数据,list 属于 拉 模型,数据可一直积压在内存中。
- 而 Pub/Sub 属于 推 模型,消费者订阅队列时,Redis 会在 Server 上为其分配一个缓冲区,生产者发布消息时先写入该缓冲区,消费者从缓冲区读取消息。
- 但缓冲区有上限,若消费者拉取消息很慢,会造成消息积压,缓冲区内存持续增长。若超过缓冲区配置上限,Redis 会强制把消费者踢下线,导致消费失败和数据丢失。
基于Stream的消息队列
在 Redis 5.0 Stream 没出来之前,消息队列的实现方式都有着各自的缺陷,例如:
- 发布订阅模式 PubSub,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷;
- 列表实现消息队列的方式不能重复消费,一个消息消费完就会被删除;
- 有序集合消息队列的实现方式不能存储相同 value 的消息,并且不能阻塞读取消息。
并且以上三种方式在实现消息队列时,只能存储单 value 值,也就是如果你要存储一个对象的情况下,必须先序列化成 JSON 字符串,在读取之后还要反序列化成对象才行,这也给用户的使用带来的不便。
基于以上问题,Redis 5.0 便推出了 Stream 类型,用于完美地实现消息队列,它借鉴了 Kafka 的设计思路,它支持消息的持久化和消息轨迹的消费,支持 ack 确认消息的模式,让消息队列更加的稳定和可靠。
发送消息:XADD
其结果是返回该消息的唯一id。这里的示例就是发送了一个对象,Json格式。
读取消息:XREAD
XREAD阻塞方式,读取最新的消息:
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果。
注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。
STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
而steam有消息者组模式,可以解决上述的一些缺陷。
基于Stream的消息队列-消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
消费者组的特点:
- 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
- 消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息。确保每一个消息都会被消费。
- 消息确认:消费者获取消息后,那该消息就处于pending(待处理)状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。
相关语法:
// 创建消费者组
XGROUP CREATE key groupName ID[MKSTREAM]// 删除指定的消费者组
XGROUP DESTORY key groupName// 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername// 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername// 从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]ex:XPENDING mystream group55 - + 10 // - + 表示all// 确认消息
XACK key group id [id ...]// 获取pending-list的消息
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]//- IDLE:确认时间//- start&end:确认消息的起始和末尾// - count:确认数量
1. 创建消费者组
2. 从消费者组读取消息
使用stream消息队列来消费订单
需求:
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
1. 创建stream类型的消息队列
第一步,我们直接在命令行行执行即可。
2. 往队列中添加订单消息
之前我们判断是否有抢购资格时候是用Lua脚本,所以我们也可以把 往对列中添加消息 这操作添加到Lua脚本中。所以这次Lua脚本中需要添加orderid。
#在对应位置添加-- 1.3.订单id
local orderId = KEYS[3]-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId;-- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
那seckillVoucher函数中的Run代码就需要传递三个参数。
// post /voucher/seckill
func SeckillVoucher(c *gin.Context) {var req seckillResquesterr := c.BindJSON(&req)if err != nil {...................return}orderId := NextId("order")script := redis.NewScript(AdjustSeckill)// res, err := script.Run(context.Background(), db.RedisClient, []string{strconv.Itoa(req.VoucherId), strconv.Itoa(req.UserId)}).Result()res, err := script.Run(context.Background(), db.RedisClient, []string{strconv.Itoa(req.VoucherId), strconv.Itoa(req.UserId), strconv.Itoa(int(orderId))}).Result()if err != nil {slog.Error("run script bad", "err", err)code.WriteResponse(c, code.ErrDatabase, nil)return}................// 开启新协程,异步更新数据库// go createVoucherOrder(req, orderId)// 3.返回订单idcode.WriteResponse(c, code.ErrSuccess, gin.H{"orderId": orderId})
}
3. 获取消息
往队列中发送消息后,就需要获取消息。在internal目录中创建mq目录,在该目录创建stream.go。
const (streamName = "stream.orders"streamGroupName = "group1"
)func StartStream() {// 创建消费组(如果不存在)err := db.RedisClient.XGroupCreateMkStream(context.Background(), streamName, streamGroupName, "0").Err()if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {slog.Error("Failed to create consumer group", "err", err)panic(err)}// 从消费组中读取消息for i := 1; i <= 5; i++ {name := "consumer" + fmt.Sprint(i)go startStream(name)}
}//在main函数中使用
//main.go
func main() {mq.StartStream() //开启redis的stream队列获取消息r := router.NewRouter().............
}
创建消费者后,然后开启获取消息。
- 可以开多个协程去获取,每一个协程就代表一个消费者。
- 使用go-redis的.XReadGroup函数获取消息。消费成功后,使用XAck函数来标记消息为已处理。
- 若其中创建订单错误就再尝试一次,要是再失败,就发送邮件进行人工处理,或者发送到死信队列。
func startStream(name string) {// 从消费组中读取消息for {msgs, err := db.RedisClient.XReadGroup(context.Background(), &redis.XReadGroupArgs{Group: streamGroupName,Consumer: name,Streams: []string{streamName, ">"}, //streamName:这是要读取的 Redis Stream 的名称。表示从这个特定的 Stream 中读取消息。//">":这个特殊的标识符在 Redis Stream 中用于表示从 Stream 的末尾开始读取,即只读取尚未被任何消费者处理的新消息。Count: 1,Block: 0,}).Result()if err != nil {slog.Error("Failed to read messages from stream", "err", err)continue}//处理信息if len(msgs) == 0 {continue}msg := msgs[0].Messages[0]fmt.Printf("Received message: %v", msg.Values)voucherId := msg.Values["voucherId"].(string)userId := msg.Values["userId"].(string)orderId := msg.Values["id"].(string)voucherIdInt, _ := strconv.Atoi(voucherId)userIdInt, _ := strconv.Atoi(userId)orderIdInt, _ := strconv.Atoi(orderId)err = createOrder(voucherIdInt, userIdInt, orderIdInt)if err != nil {slog.Error("Failed to create voucher order", "err", err)//再次尝试err = createOrder(voucherIdInt, userIdInt, orderIdInt)if err != nil {// 发送邮件让人工处理。或者发送到死信队列body := "voucherId:" + voucherId + ", userId:" + userId + ", orderId:" + orderIdmail.SendMail(*config.MailOption, body)}}//确认消息,发送ackdb.RedisClient.XAck(context.Background(), streamName, streamGroupName, msgs[0].Messages[0].ID)}
}
创建订单的函数createOrder。
//stream.go
func createOrder(voucherId int, userId int, orderId int) error {order := model.TbVoucherOrder{ID: int64(orderId),VoucherID: uint64(voucherId),UserID: uint64(userId),}//处理两张表(订单表,秒杀卷表),使用事务q := query.Use(db.DBEngine)return q.Transaction(func(tx *query.Query) error {//3.合法,库存数量减1//使用update,要是没有该条数据,不会返回gorm.ErrRecordNotFound或者有错误的。info, err := tx.TbSeckillVoucher.Where(tx.TbSeckillVoucher.VoucherID.Eq(uint64(voucherId)), tx.TbSeckillVoucher.Stock.Gt(0)).UpdateSimple(tx.TbSeckillVoucher.Stock.Add(-1))if err != nil {return err}if info.RowsAffected == 0 {return fmt.Errorf("affected rows is 0")}//4.成功,创建对应的订单,并保存到数据中return tx.TbVoucherOrder.Select(tx.TbVoucherOrder.ID, tx.TbVoucherOrder.UserID, tx.TbVoucherOrder.VoucherID).Create(&order)})
}
发送邮件相关的代码
在pkg目录创建mail目录,创建mail.go文件,编写关于邮件的代码。之后在config.go文件添加相应的变量,并解析配置文件的关于mail的参数。
type MailSetting struct {Host stringPort intUsername stringPassword stringTo []string
}func SendMail(mailConn MailSetting, body string, attachment ...string) error {m := gomail.NewMessage()// m.SetHeader("From", m.FormatAddress(mailConn.User, mailConn.Alias)) //设置邮件别名m.SetHeader("From", mailConn.Username) //设置邮件别名m.SetHeader("To", mailConn.To...) //发送给多个用户m.SetHeader("Subject", "需要人工处理的订单") //设置邮件主题m.SetBody("text/plain", body) //设置邮件正文if len(attachment) > 0 {for _, v := range attachment {m.Attach(v) // 附件文件,可以是文件,照片,视频等等}}d := gomail.NewDialer(mailConn.Host, mailConn.Port, mailConn.Username, mailConn.Password)// 关闭SSL协议认证d.TLSConfig = &tls.Config{InsecureSkipVerify: true}return d.DialAndSend(m)
}