腾讯mini项目-【指标监控服务重构】2023-08-16

今日已办

v1

验证 StageHandler 在处理消息时是否为单例,【错误尝试】

type StageHandler struct {
}func (s StageHandler) Middleware1(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 1")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Middleware2(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 2")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Middleware3(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {log.Logger.Info("StageHandler Middleware 3")fmt.Printf("%p\n", &s)return h(msg)}
}func (s StageHandler) Handler1(msg *message.Message) error {log.Logger.Info("StageHandler Handler 1")fmt.Printf("%p\n", &s)return nil
}

image-20230816161140984

v2

  • 定义不同 Handler
type CrashHandler struct {Topic string
}func (s CrashHandler) Handler1(msg *message.Message) error {log.Logger.Info(s.Topic + ": CrashHandler Handler 1 start")fmt.Printf("%p\n", &s)time.Sleep(1 * time.Second)log.Logger.Info(s.Topic + ": CrashHandler Handler 1 end")return nil
}type LagHandler struct {Topic string
}func (s LagHandler) Handler1(msg *message.Message) error {log.Logger.Info(s.Topic + ": LagHandler Handler 1 start")fmt.Printf("%p\n", &s)time.Sleep(1 * time.Second)log.Logger.Info(s.Topic + ": LagHandler Handler 1 end")return nil
}
  • 添加到router中
	for _, topic := range topics {var category stringvar handlerFunc message.NoPublishHandlerFuncif strings.Contains(topic, performance.CategoryCrash) {category = performance.CategoryCrashhandlerFunc = CrashHandler{Topic: category}.Handler1} else if strings.Contains(topic, performance.CategoryLag) {category = performance.CategoryLaghandlerFunc = LagHandler{Topic: category}.Handler1} else {continue}handler := router.AddNoPublisherHandler(topic+"test-handler", topic, subscriber, handlerFunc)}

image-20230816171659632

  • 结论
    • handler 实例会不断创建
    • 不同的 handler 可以并行处理不同主题的消息
    • 相同的 handler 在处理该主题的消息时是顺序的

官方文档: Message Router (watermill.io)

订阅者可以一次消费一条消息,也可以并行消费多条消息

  • Single stream of messages 是最简单的方法,这意味着在调用 msg.Ack() 之前,订阅者将不会收到任何新消息
  • Multiple message streams 仅部分订阅者支持。通过一次订阅多个主题分区,可以并行消费多条消息,甚至是之前未确认的消息(例如,Kafka 订阅者就是这样工作的) Router 通过运行并发 HandlerFuncs(每个分区一个)来处理此模型

v3

存在并发安全问题

  1. 公用一个上下文
  2. 频繁的修改上下文中的字段值
  3. 不同Handler和MiddleWare存在并发

解决思路

  • 将一次消息处理会使用到的数据集合定义为一个结构体
type ContextData struct {Status intEvent  schema.EventAppID         string // API 上报FetchScenario string // API 上报
}
  • 使用message的Context来传递这个数据

image-20230816222038683

  • 移除掉 ProfileCtx 的相关设计
  • 使用watermillzap.Logger来替换本身的 LoggerAdapter,更加直观且与原项目适配image-20230816225840032

完整代码

profile/internal/watermill/consumer/consumer_context.go

// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumerimport ("context"kc "github.com/Kevinello/kafka-client""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""github.com/garsue/watermillzap""github.com/qiulin/watermill-kafkago/pkg/kafkago""go.uber.org/zap""profile/internal/config""profile/internal/connector""profile/internal/log""profile/internal/schema/performance""strings""time"
)// Consume
// @Description
// @Author xzx 2023-08-16 22:52:52
func Consume() {logger := watermillzap.NewLogger(log.Logger)publisher, subscriber := newPubSub(logger)router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Fatal("creates a new Router with given configuration error", zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.Retry{MaxRetries:      3,InitialInterval: time.Millisecond * 100,Logger:          logger,}.Middleware,middleware.Recoverer,)getTopics := kc.GetTopicReMatch(strings.Split(config.Profile.GetString("kafka.topicRE"), ","))topics, err := getTopics(config.Profile.GetString("kafka.bootstrap"))if err != nil {log.Logger.Fatal("get topics failed", zap.Error(err))return}for _, topic := range topics {var category stringvar handlerFunc message.HandlerFuncif strings.Contains(topic, performance.CategoryCrash) {category = performance.CategoryCrashhandlerFunc = CrashWriteKafka} else if strings.Contains(topic, performance.CategoryLag) {category = performance.CategoryLaghandlerFunc = LagWriteKafka} else {continue}router.AddHandler(category, topic, subscriber, connector.GetTopic(category), publisher, handlerFunc).AddMiddleware(UnpackKafkaMessage,InitPerformanceEvent,AnalyzeEvent)}if err = router.Run(context.Background()); err != nil {log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))}
}// newPubSub
// @Description
// @Author xzx 2023-08-16 22:52:45
// @Param logger
// @Return message.Publisher
// @Return message.Subscriber
func newPubSub(logger watermill.LoggerAdapter) (message.Publisher, message.Subscriber) {marshaler := kafkago.DefaultMarshaler{}publisher := kafkago.NewPublisher(kafkago.PublisherConfig{Brokers:     []string{config.Profile.GetString("kafka.bootstrap")},Async:       false,Marshaler:   marshaler,OTELEnabled: false,Ipv4Only:    true,Timeout:     100 * time.Second,}, logger)subscriber, err := kafkago.NewSubscriber(kafkago.SubscriberConfig{Brokers:       []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler:   marshaler,ConsumerGroup: config.Profile.GetString("kafka.group"),OTELEnabled:   false,}, logger)if err != nil {log.Logger.Fatal("Unable to create subscriber", zap.Error(err))}return publisher, subscriber
}

profile/internal/watermill/consumer/consumer_stage.go

// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumerimport ("context""encoding/json""github.com/ThreeDotsLabs/watermill/message""go.uber.org/zap""profile/internal/connector""profile/internal/log""profile/internal/schema""profile/internal/schema/performance""profile/internal/state"
)type ContextData struct {Status intEvent  schema.EventAppID         string // API 上报FetchScenario string // API 上报
}// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {var data ContextData// 反序列化,存入通用结构体if contextErr := json.Unmarshal(msg.Payload, &data.Event); contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}log.Logger.Info("[1-UnpackKafkaItem] unpack kafka item success", zap.Any("event", data.Event))msg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)event, contextErr := performance.EventFactory(data.Event.Category, data.Event.Dimensions, data.Event.Values)if contextErr != nil {data.Status = state.StatusEventFactoryErrorreturn nil, contextErr}log.Logger.Info("[2-InitPerformanceEvent] Consume performance event success", zap.Any("event", data.Event))data.Event.ProfileData = eventmsg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// AnalyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)contextErr := data.Event.ProfileData.Analyze()if contextErr != nil {data.Status = state.StatusAnalyzeErrorreturn nil, contextErr}log.Logger.Info("[3-AnalyzeEvent] analyze event success", zap.Any("event", data.Event))// clear dimensions and valuesdata.Event.Dimensions = nildata.Event.Values = nilmsg.SetContext(context.WithValue(msg.Context(), "data", data))return h(msg)}
}// CrashWriteKafka
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func CrashWriteKafka(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}msg = message.NewMessage(data.Event.ID, toWriteBytes)log.Logger.Info("[4-CrashWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))return message.Messages{msg}, nil
}func LagWriteKafka(msg *message.Message) ([]*message.Message, error) {data := msg.Context().Value("data").(ContextData)toWriteBytes, contextErr := json.Marshal(data.Event)if contextErr != nil {data.Status = state.StatusUnmarshalErrorreturn nil, contextErr}msg = message.NewMessage(data.Event.ID, toWriteBytes)log.Logger.Info("[4-LagWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))return message.Messages{msg}, nil
}

测试

上报PERF_LAG Event可以并发处理 2 条消息,不必等待上一条消息处理完

image-20230816222712201

image-20230816222820505

多次测试发现是由于两条消息走了不同的 Handler

image-20230816230158777

暂未修复,明明是同一主题的两条消息却都走了两条不同的链路,而且 publisher 最后写回的主题也是写到了不同的主题上,并且上报另一个类型的事件,即另一个主题的消息却无法触发消费者消费!

暂定先写死两个主题名称测试是否正常

明日待办

  1. 开会讨论项目规划和任务分工
  2. 继续完成需求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/134687.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mac 本地运行 http-proxy-middleware ,请求超时

const http require(http)"/customer": {target: "http://10.10.111.192:8080/",// target: "http://user.jinfu.baohan.com/",changeOrigin: true, // 是否启用跨域// 解决mac 代理超时问题headers: {Connection: "keep-alive"},// …

脚本:python绘制七夕爱心

文章目录 效果脚本Reference 效果 脚本 import random from math import sin, cos, pi, log from tkinter import *CANVAS_WIDTH 640 # 画布的宽 CANVAS_HEIGHT 640 # 画布的高 CANVAS_CENTER_X CANVAS_WIDTH / 2 # 画布中心的X轴坐标 CANVAS_CENTER_Y CANVAS_HEIGHT /…

WebGL 根据模型矩阵的逆转置矩阵计算运动物体的光照效果

目录 前言 坐标变换引起法向量变化 变化规律: 魔法矩阵:逆转置矩阵 逆转置矩阵的用法总结 Matrix4对象的 setInverseOf 、transpose 方法规范(以完成逆转置矩阵) 示例代码(LightedTranslatedRotatedCube.js&am…

前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— JS基础(二)

人生是旷野&#xff0c;不是轨道。 思维导图 一、运算符 1.1 赋值运算符 1.2 一元运算符 1.3 比较运算符 1.4 逻辑运算符 逻辑与&#xff0c;一假则假 逻辑或&#xff0c;一真则真 <!DOCTYPE html> <html lang"en"><head><meta charset&quo…

win10远程桌面控制Ubuntu服务器 - 内网穿透实现公网远程

文章目录 前言视频教程1. ubuntu安装XRDP2.局域网测试连接3. Ubuntu安装cpolar内网穿透4.cpolar公网地址测试访问5.固定域名公网地址 转载自cpolar极点云文章&#xff1a;树莓派使用Nginx 搭建轻量级网站远程访问 前言 XRDP是一种开源工具&#xff0c;它允许用户通过Windows R…

【力扣周赛】第 362 场周赛(⭐差分匹配状态压缩DP矩阵快速幂优化DPKMP)

文章目录 竞赛链接Q1&#xff1a;2848. 与车相交的点解法1——排序后枚举解法2——差分数组⭐差分数组相关题目列表&#x1f4d5;1094. 拼车1109. 航班预订统计2381. 字母移位 II2406. 将区间分为最少组数解法1——排序贪心优先队列解法2——差分数组 2772. 使数组中的所有元素…

Java 高频疑难问题系列一

​​​​​​​ 目录 ​编辑​​​​​​​ 1.零长度 2.redis的有序集的排序 3.Unsafe类 4.带资源的try语句 5.Spring如何实现计划任务 6.Java中普通代码块,构造代码块,静态代码块执行顺序 7.MyBatis缓存机制 8.Redis Java 2种类型操作转换 9.CAS底层原理和问题 1…

无涯教程-JavaScript - AGGREGATE函数

描述 返回列表或数据库中的聚合。 AGGREGATE函数可以将不同的聚合函数应用于列表或数据库,并且可以选择忽略隐藏的行和错误值。 AGGREGATE函数具有两种不同的格式- 参考格式数组格式 参考格式 语法 AGGREGATE (function_num, options, ref1, [ref2] …)争论 Argument描述…

【学习笔记】Java 一对一培训(2.1)Java基础语法

【学习笔记】Java 一对一培训&#xff08;2.1&#xff09;Java基础语法 关键词&#xff1a;Java、Spring Boot、Idea、数据库、一对一、培训、教学本文主要内容含Java简介、Java基础语法、Java对象和类、Java基本数据类型、Java变量类型、Java修饰符计划2小时完成&#xff0c;…

机器学习:PCA(Principal Component Analysis主成分)降维

参考&#xff1a;PCA降维原理 操作步骤与优缺点_TranSad的博客-CSDN博客 PCA降维算法_偶尔努力翻身的咸鱼的博客-CSDN博客 需要提前了解的数学知识&#xff1a; 一、PCA的主要思想 PCA&#xff0c;即主成分分析方法&#xff0c;是一种使用最广泛的数据降维算法。PCA的主要思想…

【计算机视觉】Vision and Language Pre-Trained Models算法介绍合集(一)

文章目录 一、ALIGN二、Contrastive Language-Image Pre-training&#xff08;CLIP&#xff09;三、Learning Cross-Modality Encoder Representations from Transformers&#xff08;LXMERT&#xff09;四、BLIP: Bootstrapping Language-Image Pre-training五、Vision-and-La…

带你进入桌面数控机床金工实训室

桌面型数控车床实训室 你知道中国哪所大学金工实训室拥有多的小型数控机床吗&#xff1f;答案是安徽工程大学。其国际工程师学院里面建了一栋新楼&#xff0c;专门分配了4个独立的房间作为实训室&#xff0c;占地300平方米&#xff0c;分别配置了小型数控车床&#xff0c;小型…

ES6中新增加的Symbol数据类型及其使用场景

聚沙成塔每天进步一点点 ⭐ 专栏简介在这里插入图片描述 ⭐ ES6中的Symbol数据类型⭐ 对象属性名称⭐ 防止属性冲突⭐ 内置Symbols⭐ 迭代器和生成器⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航…

chatgpt综述和报告

ChatGPT究竟强在哪&#xff1f;复旦大学邱锡鹏教授《大型语言模型的能力分析与应用》_哔哩哔哩_bilibili2022年底&#xff0c;美国OpenA1公司发布了ChatGPT&#xff0c;一个可以与人类对话交互的千亿规模参数的大型语言模型。它可以根据用户输入的指令完成各种语言相关的任务&a…

电商API的应用价值:淘宝1688京东API接口系列

API接口是一种软件应用程序&#xff0c;它充当两个不同软件应用程序之间的中介。它帮助不同的应用程序相互通信&#xff0c;共享数据&#xff0c;从而使用户能够完成不同的任务。API接口的用途非常广泛&#xff0c;下面是一些常见的用途&#xff1a; 数据共享&#xff1a;API接…

ChatGPT在职业规划中的智能助手

随着科技的不断发展&#xff0c;人工智能&#xff08;AI&#xff09;正逐渐成为我们日常生活的一部分。ChatGPT作为一种智能语言模型&#xff0c;可以在职业规划中充当智能助手的角色。本文将探讨ChatGPT在职业规划中的应用&#xff0c;以及它如何成为未来工作的智能伙伴。 首先…

2020-2023中国高等级自动驾驶产业发展趋势研究-中国高等级自动驾驶发展近况

1.2 中国高等级自动驾驶发展近况 通过对中国高等级自动驾驶行业的观察和分析&#xff0c;亿欧汽车认为&#xff0c;除技术解决方案提供商外&#xff0c;如今的车企、政府、资本同样在产业链中扮演重要角色。此外&#xff0c;车路协同技术的发展也为高等级自动驾驶的发展提供了更…

EagleSDR USB HAT FT600

给EagleSDR做了个USB 3.0的子卡&#xff0c;采用FT600方案&#xff0c;实物如下&#xff1a; 用FT600DataStreamerDemoApp测试&#xff0c;速度如下&#xff1a; 由于FT600是16bit的接口&#xff0c;如果用FT601的32bit接口&#xff0c;性能应该还会有大幅提升。 测试代码很简…

数据库基础

文章目录 1. 什么是数据库2. 服务器&#xff0c;数据库&#xff0c;表关系3. MySQL架构4. SQL分类 1. 什么是数据库 存储数据用文件就可以了&#xff0c;为什么还要弄个数据库? 文件保存数据有以下几个缺点&#xff1a; 文件的安全性问题 文件不利于数据查询和管理 文件不利于…

水仙花数(熟悉Python后再写)

CSDN问答社区的一个提问&#xff0c;勾起我当时写代码的烦困。 (本笔记适合熟悉一门编程语言的 coder 翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣经”教程《 python 完全自学教程》&#xff0c;不仅仅是…