Kafka实战案例

kafka系统的生成,自顶向下

1. kafaka发送消息

  • 1.1 是最初始外部调用kafaka的地方
  • 1.6 是最初调用kafaka的函数。中间是对kafaka的构建

1.1 向Kafka发送一条发布视频的message

  • 在videoHandler的发布视频逻辑中,向Kafka发送一条发布视频的mq,之后就解耦,先返回状态告知发布成功,不再等待具体执行
	// 通过MQ异步处理视频的上传操作, 包括上传到OSS,截帧, 保存到MySQL, 更新rediszap.L().Info("上传视频发送到消息队列", zap.String("videoPath", videoPath))kafka.VideoMQInstance.Produce(&kafka.VideoMessage{VideoPath:     videoPath,VideoFileName: videoFileName,UserID:        uint(request.GetUserId()),Title:         request.GetTitle(),})return &video.PublishVideoResponse{StatusCode: common.CodeSuccess,StatusMsg:  common.MapErrMsg(common.CodeSuccess),}, nil

1.2. 构造MQ结构体,核心包括Topic,GroupId,Producer,Consumer

  • 上面的VideoMQInstance 是*VideoMQ的类型,实际上就是包括Topic,GroupId,Producer,Consumer这几个成员的结构体
    在这里插入图片描述

1.3 对MQ结构体进行初始化

  • 对上面这个结构体VideoMQInstance中的几个成员进行初始化
func InitVideoKafka() {VideoMQInstance = &VideoMQ{MQ{Topic:   "videos",GroupId: "video_group",},}// 创建 Video 业务的生产者和消费者实例VideoMQInstance.Producer = kafkaManager.NewProducer(VideoMQInstance.Topic)VideoMQInstance.Consumer = kafkaManager.NewConsumer(VideoMQInstance.Topic, VideoMQInstance.GroupId)go VideoMQInstance.Consume()
}

Topic、GroupId都很简单,赋一个string的字符串就好了,关键在Producer和Consumer需要一步步创建

1.4 Producer和Consumer的创建流程

先看代码:

type Manager struct {Brokers []string
}var kafkaManager *Managerfunc (m *Manager) NewProducer(topic string) *kafka.Writer {return &kafka.Writer{Addr:                   kafka.TCP(m.Brokers...),Topic:                  topic,Balancer:               &kafka.Hash{}, // 使用Hash算法按照key将消息均匀分布到不同的partition上WriteTimeout:           1 * time.Second,RequiredAcks:           kafka.RequireAll, // 需要确保Leader和所有Follower都写入成功才可以发送下一条消息, 确保消息成功写入, 不丢失AllowAutoTopicCreation: true,             // Topic不存在时自动创建。生产环境中一般设为false,由运维管理员创建Topic并配置partition数目}
}func (m *Manager) NewConsumer(topic, groupId string) *kafka.Reader {// TODO reader 优雅关闭return kafka.NewReader(kafka.ReaderConfig{Brokers: m.Brokers,Topic:   topic,GroupID: groupId,// CommitInterval: 1 * time.Second, // 不配置此项, 默认每次读取都会自动提交offsetStartOffset: kafka.FirstOffset, //当一个特定的partition没有commited offset时(比如第一次读一个partition,之前没有commit过),通过StartOffset指定从第一个还是最后一个位置开始消费。StartOffset的取值要么是FirstOffset要么是LastOffset,LastOffset表示Consumer启动之前生成的老数据不管了。仅当指定了GroupID时,StartOffset才生效})
}

可以看到,Producer实际上就是kafka.Writer,consumer实际上就是kafka.Reader,其中writer肯定需要绑定Topic,而reader肯定需要Topic和GroupId,去消费这些消息。

1.5 创建Kafaka的manager

  • 发现上述创建Producer和Consumer的代码都是Manager的成员方法,Manager是什么呢?
  • 是Manager的成员方法说明肯定是使用Manager这个结构体去创建Producer和Consumer,而Manager核心包含的就是Brokers(存的是broker的url地址)
type Manager struct {Brokers []string
}var kafkaManager *Managertype MQ struct {Topic    stringGroupId  stringProducer *kafka.WriterConsumer *kafka.Reader
}func Init(appConfig *config.AppConfig) (err error) {var conf *config.KafkaConfigif appConfig.Mode == config.LocalMode {conf = appConfig.Local.KafkaConfig} else {conf = appConfig.Remote.KafkaConfig}brokerUrl := conf.Address + ":" + strconv.Itoa(conf.Port)// 初始化 Kafka Managerbrokers := []string{brokerUrl}kafkaManager = NewKafkaManager(brokers)//InitMessageKafka()//InitCommentKafka()//InitVideoKafka()return nil
}func NewKafkaManager(brokers []string) *Manager {return &Manager{Brokers: brokers,}
}

1.6 VideoMQ 它有个成员方法是Produce(和最早的1.1调用对应)

// Produce 发布将本地视频上传到OSS的消息
func (m *VideoMQ) Produce(message *VideoMessage) {err := kafkaManager.ProduceMessage(m.Producer, message)if err != nil {log.Println("kafka发送添加视频的消息失败:", err)return}
}

Produce其中又调用了ProduceMessage方法,方法具体内容如下,就是将通过producer将要发送的消息序列化后发送出去

// ProduceMessage 向 Kafka 写入消息的公共函数, 由于不同业务的消息格式不同, 所以使用 interface{} 代替
func (m *Manager) ProduceMessage(producer *kafka.Writer, message interface{}) error {messageBytes, err := json.Marshal(message)if err != nil {return err}return producer.WriteMessages(context.Background(), kafka.Message{Value: messageBytes,})
}

2. kafka消费消息

2.1 开启消费goroutine

kafka消费消息的代码之前在initMQ的时候就已经开启一个goroutine开始消费,只要有消息对应上topic就可以消费

func InitVideoKafka() {VideoMQInstance = &VideoMQ{MQ{Topic:   "videos",GroupId: "video_group",},}// 创建 Video 业务的生产者和消费者实例VideoMQInstance.Producer = kafkaManager.NewProducer(VideoMQInstance.Topic)VideoMQInstance.Consumer = kafkaManager.NewConsumer(VideoMQInstance.Topic, VideoMQInstance.GroupId)go VideoMQInstance.Consume()
}

2.2 消费的具体逻辑举例:执行一个上传视频到oss的函数

步骤:

  1. Consumer.ReadMessage 先拿到序列化的消息msg,并反序列化为最初的结构体
  2. 现在拿到了msg,利用里面的内容,开启goroutine执行相关函数
  3. 开启一个goroutine:比如拿到msg中: video的url,和name。那现在就可以调用oss的函数,将指定url地址中name为name的视频上传到oss。上传完成之后,还可以将最开始传来的msg(包含video的消息)的内容上传到mysql
  4. 再开启一个goroutine:将视频上传到redis
  5. 再开启一个goroutine:删除用户哈希字段
  6. 再开启一个goroutine:将视频id加入到布隆过滤器中
    上面开的那么多goroutine都是互相不影响的,没有先后执行的需要,因此可以分别开启
// Consume 消费将本地视频上传到OSS的消息
func (m *VideoMQ) Consume() {for {msg, err := m.Consumer.ReadMessage(context.Background())if err != nil {log.Fatal("[VideoMQ]从消息队列中读取消息失败:", err)}videoMsg := new(VideoMessage)err = json.Unmarshal(msg.Value, videoMsg)if err != nil {log.Println("[VideoMQ]解析消息失败:", err)return}go func() {defer func() {os.Remove(videoMsg.VideoPath)}()zap.L().Info("开始处理视频消息", zap.Any("videoMsg", videoMsg))// 视频存储到ossif err = common.UploadToOSS(videoMsg.VideoPath, videoMsg.VideoFileName); err != nil {zap.L().Error("上传视频到OSS失败", zap.Error(err))return}// 利用oss功能获取封面图imgName, err := common.GetVideoCover(videoMsg.VideoFileName)if err != nil {zap.L().Error("图片截帧失败", zap.Error(err))return}// 视频信息存储到MySQLvideo := model.Video{AuthorId:  videoMsg.UserID,VideoUrl:  videoMsg.VideoFileName,CoverUrl:  imgName,Title:     videoMsg.Title,CreatedAt: time.Now().Unix(),}mysql.InsertVideo(&video)var wg sync.WaitGroupwg.Add(3)go func() {defer wg.Done()redis.AddVideo(&video)}()go func() {defer wg.Done()// cache asideredis.DelUserHashField(videoMsg.UserID, redis.WorkCountField)}()go func() {defer wg.Done()// 添加到布隆过滤器common.AddToWorkCountBloom(fmt.Sprintf("%d", videoMsg.UserID))}()wg.Wait()zap.L().Info("视频消息处理成功", zap.Any("videoMsg", videoMsg))}()}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/151307.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【轻松玩转MacOS】常用软件篇

引言 在本篇文章中,我将介绍如何安装和使用一些常用的软件,如Safari浏览器、邮件、日历、地图等。让我们一起来看看吧! 一、Safari浏览器 Safari是MacOS自带的浏览器,具有简洁、快速、安全的特点。 以下是一些Safari浏览器的使…

Zabbix自定义脚本监控MySQL数据库

一、MySQL数据库配置 1.1 创建Mysql数据库用户 [rootmysql ~]# mysql -uroot -p create user zabbix127.0.0.1 identified by 123456; flush privileges; 1.2 添加用户密码到mysql client的配置文件中 [rootmysql ~]# vim /etc/my.cnf.d/client.cnf [client] host127.0.0.1 u…

vue模版语法-{{}}/v-text/v-html/v-once

一、{{}}双括号:用于文本渲染 1、 {{变量名}}:data中返回对象的变量名 2、{{js表达式}}:可以直接进行js表达式处理 3、注意:双大括号中不要写等式书写 二、v-text 指令,用于文本渲染 1、为了解决双大括号渲染数据出现闪烁问题 三、v-cloak …

Qt中的基础数据类型

1.基础类型 因为Qt是一个C++ 框架, 因此C++中所有的语法和数据类型在Qt中都是被支持的, 但是Qt中也定义了一些属于自己的数据类型, 下边给大家介绍一下这些基础的数类型 QT基本数据类型定义在#include <QtGlobal> 中,QT基本数据类型有: 类型名称注释备注qint8signed ch…

盒子模型的基础

盒子模型 边框&#xff08;border&#xff09; border可以设置元素的边框&#xff0c;边框分成三部分&#xff0c;边框的&#xff08;粗细&#xff09;边框的样式&#xff0c;边框的颜色 <style>div {width: 100px;height: 100px;border-width: 200;border-style: 边框…

【面试HOT100】哈希双指针滑动窗口

系列综述&#xff1a; &#x1f49e;目的&#xff1a;本系列是个人整理为了秋招面试的&#xff0c;整理期间苛求每个知识点&#xff0c;平衡理解简易度与深入程度。 &#x1f970;来源&#xff1a;材料主要源于LeetCodeHot100进行的&#xff0c;每个知识点的修正和深入主要参考…

openGauss学习笔记-92 openGauss 数据库管理-内存优化表MOT管理-内存表特性-使用MOT-MOT使用MOT SQL覆盖和限制

文章目录 openGauss学习笔记-92 openGauss 数据库管理-内存优化表MOT管理-内存表特性-使用MOT-MOT使用MOT SQL覆盖和限制92.1 不支持的特性92.2 MOT限制92.3 不支持的DDL操作92.4 不支持的数据类型92.5 不支持的索引DDL和索引92.6 不支持的DML92.7 不支持的JIT功能&#xff08;…

自然语言处理的分类

动动发财的小手&#xff0c;点个赞吧&#xff01; 简介 作为理解、生成和处理自然语言文本的有效方法&#xff0c;自然语言处理&#xff08;NLP&#xff09;的研究近年来呈现出快速传播和广泛采用。鉴于 NLP 的快速发展&#xff0c;获得该领域的概述并对其进行维护是很困难的。…

代码随想录算法训练营第四十五天 | 1049. 最后一块石头的重量 II、494. 目标和、474.一和零

1049. 最后一块石头的重量 II 视频讲解&#xff1a;动态规划之背包问题&#xff0c;这个背包最多能装多少&#xff1f;LeetCode&#xff1a;1049.最后一块石头的重量II_哔哩哔哩_bilibili 代码随想录 &#xff08;1&#xff09;代码 494. 目标和 视频讲解&#xff1a;动态规划…

计算机竞赛 深度学习疫情社交安全距离检测算法 - python opencv cnn

文章目录 0 前言1 课题背景2 实现效果3 相关技术3.1 YOLOV43.2 基于 DeepSort 算法的行人跟踪 4 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习疫情社交安全距离检测算法 ** 该项目较为新颖&#xff0c;适合作为竞赛…

剑指offer——JZ34 二叉树中和为某一值的路径(二) 解题思路与具体代码【C++】

一、题目描述与要求 二叉树中和为某一值的路径(二)_牛客题霸_牛客网 (nowcoder.com) 题目描述 输入一颗二叉树的根节点root和一个整数expectNumber&#xff0c;找出二叉树中结点值的和为expectNumber的所有路径。 1.该题路径定义为从树的根结点开始往下一直到叶子结点所经过…

Youtube视频下载工具分享-油管视频,音乐,字幕下载方法汇总

YouTube视频下载方法简介 互联网上存在很多 YouTube 下载工具&#xff0c;但我们经常会发现自己收藏的工具没过多久就会失效&#xff0c;我们为大家整理的这几种方法&#xff0c;是存在时间较久并且亲测可用的。后续如果这些工具失效或者有更好的工具&#xff0c;我们也会分享…

c++day2

1.XMIND 2. 自己封装一个矩形类(Rect)&#xff0c;拥有私有属性:宽度(width)、高度(height)&#xff0c;定义公有成员函数: 初始化函数:void init(int w, int h) 更改宽度的函数:set_w(int w) 更改高度的函数:set_h(int h) 输出该矩形的周长和面积函数:void show() #include &…

基于SSM的固定资产管理系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

网站强制跳转至国家反诈中心该怎么办?怎么处理?如何解封?

在互联网环境中&#xff0c;网站安全是非常重要的。然而&#xff0c;在实际操作过程中&#xff0c;不少网站可能因内容问题、技术安全漏洞等原因被迫下线甚至跳转至国家反诈骗中心网址。面对这一严峻问题&#xff0c;我们如何有效解决&#xff0c;让网站恢复运行并解除强制跳转…

点餐小程序实战教程06-首页开发

用户注册功能开发好了之后&#xff0c;我们就要开发小程序&#xff0c;首先我们是规划小程序的功能模块&#xff0c;我们一共是四个模块&#xff0c;分别是首页、订单、消息和我的。 首页我们主要是点餐的功能&#xff0c;可以选择菜品&#xff0c;加入到购物车&#xff0c;然…

【C++】stack/queue/deque

目录 一、stack 1.1 stack的接口 1.2 关于使用stack的例题 1.2.1 最小栈 1.2.2 栈的压入、弹出序列 1.2.4 逆波兰表达式求值 1.3 stack的模拟实现 二、queue 2.1 queue的接口 2.2 queue的模拟实现 三、deque 3.1 deque底层实现原理 3.1.1 头插实现原理 3.1.2 尾插…

Cocos Creator3.8 项目实战(五)背景无限滚屏效果如何实现

在游戏中&#xff0c;我们经常会实现背景无限滚动的效果。那这些效果是怎么实现的呢&#xff1f; 原理很简单&#xff0c;就是使用多张背景图&#xff0c;每张图&#xff0c;每一帧都同时移动&#xff0c;当图移出屏幕外时&#xff0c;将其位置设置到下一张图的初始位置&#x…

加速attention计算的工业标准:flash attention 1和2算法的原理及实现

transformers目前大火&#xff0c;但是对于长序列来说&#xff0c;计算很慢&#xff0c;而且很耗费显存。对于transformer中的self attention计算来说&#xff0c;在时间复杂度上&#xff0c;对于每个位置&#xff0c;模型需要计算它与所有其他位置的相关性&#xff0c;这样的计…

10.8c++作业

#include <iostream>using namespace std; class Rect {int width; //宽int height; //高 public://初始化函数void init(int w,int h){widthw;heighth;}//更改宽度void set_w(int w){widthw;}//更改高度void set_h(int h){heighth;}//输出矩形周长和面积void show(){co…