kafka系统的生成,自顶向下
1. kafaka发送消息
- 1.1 是最初始外部调用kafaka的地方
- 1.6 是最初调用kafaka的函数。中间是对kafaka的构建
1.1 向Kafka发送一条发布视频的message
- 在videoHandler的发布视频逻辑中,向Kafka发送一条发布视频的mq,之后就解耦,先返回状态告知发布成功,不再等待具体执行
// 通过MQ异步处理视频的上传操作, 包括上传到OSS,截帧, 保存到MySQL, 更新rediszap.L().Info("上传视频发送到消息队列", zap.String("videoPath", videoPath))kafka.VideoMQInstance.Produce(&kafka.VideoMessage{VideoPath: videoPath,VideoFileName: videoFileName,UserID: uint(request.GetUserId()),Title: request.GetTitle(),})return &video.PublishVideoResponse{StatusCode: common.CodeSuccess,StatusMsg: common.MapErrMsg(common.CodeSuccess),}, nil
1.2. 构造MQ结构体,核心包括Topic,GroupId,Producer,Consumer
- 上面的VideoMQInstance 是*VideoMQ的类型,实际上就是包括Topic,GroupId,Producer,Consumer这几个成员的结构体
1.3 对MQ结构体进行初始化
- 对上面这个结构体VideoMQInstance中的几个成员进行初始化
func InitVideoKafka() {VideoMQInstance = &VideoMQ{MQ{Topic: "videos",GroupId: "video_group",},}// 创建 Video 业务的生产者和消费者实例VideoMQInstance.Producer = kafkaManager.NewProducer(VideoMQInstance.Topic)VideoMQInstance.Consumer = kafkaManager.NewConsumer(VideoMQInstance.Topic, VideoMQInstance.GroupId)go VideoMQInstance.Consume()
}
Topic、GroupId都很简单,赋一个string的字符串就好了,关键在Producer和Consumer需要一步步创建
1.4 Producer和Consumer的创建流程
先看代码:
type Manager struct {Brokers []string
}var kafkaManager *Managerfunc (m *Manager) NewProducer(topic string) *kafka.Writer {return &kafka.Writer{Addr: kafka.TCP(m.Brokers...),Topic: topic,Balancer: &kafka.Hash{}, // 使用Hash算法按照key将消息均匀分布到不同的partition上WriteTimeout: 1 * time.Second,RequiredAcks: kafka.RequireAll, // 需要确保Leader和所有Follower都写入成功才可以发送下一条消息, 确保消息成功写入, 不丢失AllowAutoTopicCreation: true, // Topic不存在时自动创建。生产环境中一般设为false,由运维管理员创建Topic并配置partition数目}
}func (m *Manager) NewConsumer(topic, groupId string) *kafka.Reader {// TODO reader 优雅关闭return kafka.NewReader(kafka.ReaderConfig{Brokers: m.Brokers,Topic: topic,GroupID: groupId,// CommitInterval: 1 * time.Second, // 不配置此项, 默认每次读取都会自动提交offsetStartOffset: kafka.FirstOffset, //当一个特定的partition没有commited offset时(比如第一次读一个partition,之前没有commit过),通过StartOffset指定从第一个还是最后一个位置开始消费。StartOffset的取值要么是FirstOffset要么是LastOffset,LastOffset表示Consumer启动之前生成的老数据不管了。仅当指定了GroupID时,StartOffset才生效})
}
可以看到,Producer实际上就是kafka.Writer,consumer实际上就是kafka.Reader,其中writer肯定需要绑定Topic,而reader肯定需要Topic和GroupId,去消费这些消息。
1.5 创建Kafaka的manager
- 发现上述创建Producer和Consumer的代码都是Manager的成员方法,Manager是什么呢?
- 是Manager的成员方法说明肯定是使用Manager这个结构体去创建Producer和Consumer,而Manager核心包含的就是Brokers(存的是broker的url地址)
type Manager struct {Brokers []string
}var kafkaManager *Managertype MQ struct {Topic stringGroupId stringProducer *kafka.WriterConsumer *kafka.Reader
}func Init(appConfig *config.AppConfig) (err error) {var conf *config.KafkaConfigif appConfig.Mode == config.LocalMode {conf = appConfig.Local.KafkaConfig} else {conf = appConfig.Remote.KafkaConfig}brokerUrl := conf.Address + ":" + strconv.Itoa(conf.Port)// 初始化 Kafka Managerbrokers := []string{brokerUrl}kafkaManager = NewKafkaManager(brokers)//InitMessageKafka()//InitCommentKafka()//InitVideoKafka()return nil
}func NewKafkaManager(brokers []string) *Manager {return &Manager{Brokers: brokers,}
}
1.6 VideoMQ 它有个成员方法是Produce(和最早的1.1调用对应)
// Produce 发布将本地视频上传到OSS的消息
func (m *VideoMQ) Produce(message *VideoMessage) {err := kafkaManager.ProduceMessage(m.Producer, message)if err != nil {log.Println("kafka发送添加视频的消息失败:", err)return}
}
Produce其中又调用了ProduceMessage方法,方法具体内容如下,就是将通过producer将要发送的消息序列化后发送出去
// ProduceMessage 向 Kafka 写入消息的公共函数, 由于不同业务的消息格式不同, 所以使用 interface{} 代替
func (m *Manager) ProduceMessage(producer *kafka.Writer, message interface{}) error {messageBytes, err := json.Marshal(message)if err != nil {return err}return producer.WriteMessages(context.Background(), kafka.Message{Value: messageBytes,})
}
2. kafka消费消息
2.1 开启消费goroutine
kafka消费消息的代码之前在initMQ的时候就已经开启一个goroutine开始消费,只要有消息对应上topic就可以消费
func InitVideoKafka() {VideoMQInstance = &VideoMQ{MQ{Topic: "videos",GroupId: "video_group",},}// 创建 Video 业务的生产者和消费者实例VideoMQInstance.Producer = kafkaManager.NewProducer(VideoMQInstance.Topic)VideoMQInstance.Consumer = kafkaManager.NewConsumer(VideoMQInstance.Topic, VideoMQInstance.GroupId)go VideoMQInstance.Consume()
}
2.2 消费的具体逻辑举例:执行一个上传视频到oss的函数
步骤:
- Consumer.ReadMessage 先拿到序列化的消息msg,并反序列化为最初的结构体
- 现在拿到了msg,利用里面的内容,开启goroutine执行相关函数
- 开启一个goroutine:比如拿到msg中: video的url,和name。那现在就可以调用oss的函数,将指定url地址中name为name的视频上传到oss。上传完成之后,还可以将最开始传来的msg(包含video的消息)的内容上传到mysql
- 再开启一个goroutine:将视频上传到redis
- 再开启一个goroutine:删除用户哈希字段
- 再开启一个goroutine:将视频id加入到布隆过滤器中
上面开的那么多goroutine都是互相不影响的,没有先后执行的需要,因此可以分别开启
// Consume 消费将本地视频上传到OSS的消息
func (m *VideoMQ) Consume() {for {msg, err := m.Consumer.ReadMessage(context.Background())if err != nil {log.Fatal("[VideoMQ]从消息队列中读取消息失败:", err)}videoMsg := new(VideoMessage)err = json.Unmarshal(msg.Value, videoMsg)if err != nil {log.Println("[VideoMQ]解析消息失败:", err)return}go func() {defer func() {os.Remove(videoMsg.VideoPath)}()zap.L().Info("开始处理视频消息", zap.Any("videoMsg", videoMsg))// 视频存储到ossif err = common.UploadToOSS(videoMsg.VideoPath, videoMsg.VideoFileName); err != nil {zap.L().Error("上传视频到OSS失败", zap.Error(err))return}// 利用oss功能获取封面图imgName, err := common.GetVideoCover(videoMsg.VideoFileName)if err != nil {zap.L().Error("图片截帧失败", zap.Error(err))return}// 视频信息存储到MySQLvideo := model.Video{AuthorId: videoMsg.UserID,VideoUrl: videoMsg.VideoFileName,CoverUrl: imgName,Title: videoMsg.Title,CreatedAt: time.Now().Unix(),}mysql.InsertVideo(&video)var wg sync.WaitGroupwg.Add(3)go func() {defer wg.Done()redis.AddVideo(&video)}()go func() {defer wg.Done()// cache asideredis.DelUserHashField(videoMsg.UserID, redis.WorkCountField)}()go func() {defer wg.Done()// 添加到布隆过滤器common.AddToWorkCountBloom(fmt.Sprintf("%d", videoMsg.UserID))}()wg.Wait()zap.L().Info("视频消息处理成功", zap.Any("videoMsg", videoMsg))}()}
}