腾讯mini项目-【指标监控服务重构】2023-08-11

今日待办

  1. 使用watermill框架替代当前的base_runner框架
    a. 参考官方提供的sarama kafka Pub/Sub(https://github.com/ThreeDotsLabs/watermill-kafka/)实现kafka-go(https://github.com/segmentio/kafka-go)的Pub/Sub(sarama需要cgo,会导致一些额外的镜像依赖)
    b. 参考https://watermill.io/docs/messages-router/实现不同topic(不同事件)走不同的Handler处理逻辑,相同处理逻辑则可以使用MiddleWare(https://watermill.io/docs/middlewares/)
  2. 调研HyperScan golang implement(https://github.com/flier/gohs/)的使用和benchmark,扩充profile事件处理handler,添加一些正则处理任务(这个会给一些示例)

Watermill

what?

  • 高效处理消息流,事件驱动程序
  • 用于 event soucing, rpc over messages, sagas
  • pub/sub

why?

  • 微服务模式,异步通信
  • 减少复杂性

core?

  • Publisher
  • Subscriber
  • Message

官方示例

结合对 kafkapub/sub 的实现和对 routermiddleware 的使用尝试替换掉 baserunner

// Package watermillx
// @Author xzx 2023/8/11 18:53:00
package watermillximport ("fmt""github.com/Shopify/sarama""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""log""profile/internal/config""profile/internal/schema""time"
)// 测试字段
type WatermillContext struct {Status        intStageErr      errorEvent         schema.EventAppID         string // API 上报FetchScenario string // API 上报
}var logger = watermill.NewStdLogger(false, false)func (ctx *WatermillContext) Init() {saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()// equivalent of auto.offset.reset: earliestsaramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldestsubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{Brokers:               []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler:           kafka.DefaultMarshaler{},OverwriteSaramaConfig: saramaSubscriberConfig,ConsumerGroup:         "test_consumer_group",},logger,)if err != nil {panic(err)}router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {panic(err)}// SignalsHandler will gracefully shutdown Router when SIGTERM is received.// You can also close the router by just calling `r.Close()`.router.AddPlugin(plugin.SignalsHandler)// Router level middleware are executed for every message sent to the routerrouter.AddMiddleware(// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messagesmiddleware.CorrelationID,// The handler function is retried if it returns an error.// After MaxRetries, the message is Nacked and it's up to the PubSub to resend it.middleware.Retry{MaxRetries:      3,InitialInterval: time.Millisecond * 100,Logger:          logger,}.Middleware,// Recoverer handles panics from handlers.// In this case, it passes them as errors to the Retry middleware.middleware.Recoverer,)// AddHandler returns a handler which can be used to add handler level middleware// or to stop handler.// just for debug, we are printing all messages received on `incoming_messages_topic`router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,printMessages,)// Now that all handlers are registered, we're running the Router.// Run is blocking while the router is running.ctx := context.Background()if err := router.Run(ctx); err != nil {panic(err)}
}func printMessages(msg *message.Message) error {fmt.Printf("\n> Received message: %s\n> %s\n> metadata: %v\n\n",msg.UUID, string(msg.Payload), msg.Metadata,)return nil
}

image-20230811193329885

image-20230811193006417

思考:

ConsumerDispatchHandler 每次消费消息都会执行,注册完 4 个 stageHandler 后再执行Run去异步调用 BastContextHandler

考虑在此处进行订阅者的初始化、MiddleWareHandler 的注册

image-20230811195245920

// Package watermillx
// @Author xzx 2023/8/11 18:53:00
package watermillximport ("context""fmt""github.com/Shopify/sarama""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""log""profile/internal/config""time"
)var logger = watermill.NewStdLogger(true, false)func Init() {saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()// equivalent of auto.offset.reset: earliestsaramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldestsubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{Brokers:               []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler:           kafka.DefaultMarshaler{},OverwriteSaramaConfig: saramaSubscriberConfig,ConsumerGroup:         config.Profile.GetString("kafka.group"),},logger,)if err != nil {panic(err)}router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {panic(err)}// SignalsHandler will gracefully shutdown Router when SIGTERM is received.// You can also close the router by just calling `r.Close()`.router.AddPlugin(plugin.SignalsHandler)// Router level middleware are executed for every message sent to the routerrouter.AddMiddleware(// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messagesmiddleware.CorrelationID,// The handler function is retried if it returns an error.// After MaxRetries, the message is Nacked and it's up to the PubSub to resend it.middleware.Retry{MaxRetries:      3,InitialInterval: time.Millisecond * 100,Logger:          logger,}.Middleware,// Recoverer handles panics from handlers.// In this case, it passes them as errors to the Retry middleware.middleware.Recoverer,)// AddHandler returns a handler which can be used to add handler level middleware// or to stop handler.// just for debug, we are printing all messages received on `incoming_messages_topic`handler := router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,printMessages,)// Handler level middleware is only executed for a specific handler// Such middleware can be added the same way the router level oneshandler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {log.Println("executing handler specific middleware for ", message.UUID)return h(message)}})// Now that all handlers are registered, we're running the Router.// Run is blocking while the router is running.ctx := context.Background()if err := router.Run(ctx); err != nil {panic(err)}
}func printMessages(msg *message.Message) error {fmt.Printf("\n> Received message: %s\n> %s\n> metadata: %v\n\n",msg.UUID, string(msg.Payload), msg.Metadata,)return nil
}

可以正常消费到消息,接着就可以自定义 Handler 来对接到 Baserunner 的功能

image-20230811200725567

测试如下:

// Package watermillx
// @Author xzx 2023/8/11 18:53:00
package watermillximport ("context""encoding/json""github.com/Shopify/sarama""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin"kafkago "github.com/segmentio/kafka-go""go.uber.org/zap""profile/internal/config""profile/internal/connector""profile/internal/log""profile/internal/schema""profile/internal/schema/performance""profile/internal/state""time"
)// ProfileContext
// @Description:
// @Author xzx 2023-08-11 22:21:41
type ProfileContext struct {// Properties that can be called by inherited subclassesStatus intCtx    context.ContextEvent         schema.EventAppID         string // API 上报FetchScenario string // API 上报
}//
// NewProfileContext
// @Description
// @Author xzx 2023-08-11 22:49:00
// @Param ctx
// @Return *ProfileContext
//
func NewProfileContext(ctx context.Context) *ProfileContext {return &ProfileContext{Ctx: ctx,}
}// Init
// @Description 初始化
// @Author xzx 2023-08-11 22:22:01
func (profileCtx *ProfileContext) Init() {logger := watermill.NewStdLogger(true, false)saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()// equivalent of auto.offset.reset: earliestsaramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldestsubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{Brokers:               []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler:           kafka.DefaultMarshaler{},OverwriteSaramaConfig: saramaSubscriberConfig,ConsumerGroup:         config.Profile.GetString("kafka.group"),},logger,)if err != nil {log.Logger.Error("creates a new Kafka Subscriber error", zap.Error(err))panic(err)}router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Error("creates a new Router with given configuration error", zap.Error(err))panic(err)}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.CorrelationID,middleware.Retry{MaxRetries:      3,InitialInterval: time.Millisecond * 100,Logger:          logger,}.Middleware,middleware.Recoverer,)router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,profileCtx.UnpackKafkaMessage,)/*router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,profileCtx.InitPerformanceEvent,)router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,profileCtx.AnalyzeEvent,)router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,profileCtx.WriteKafka,)*/if err = router.Run(context.Background()); err != nil {log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))panic(err)}
}// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-11 22:29:21
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) UnpackKafkaMessage(msg *message.Message) (contextErr error) {// 反序列化,存入通用结构体if contextErr = json.Unmarshal(msg.Payload, &profileCtx.Event); contextErr != nil {profileCtx.Status = state.StatusUnmarshalErrorreturn}log.Logger.Info("[UnpackKafkaItem] unpack kafka item success", zap.Any("event", profileCtx.Event))return
}// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-11 22:30:36
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) InitPerformanceEvent(msg *message.Message) (contextErr error) {event, contextErr := performance.EventFactory(profileCtx.Event.Category, profileCtx.Event.Dimensions, profileCtx.Event.Values)if contextErr != nil {profileCtx.Status = state.StatusEventFactoryErrorreturn}log.Logger.Info("[InitPerformanceEvent] init performance event success", zap.Any("event", event))profileCtx.Event.ProfileData = eventreturn
}// AnalyzeEvent
// @Description
// @Author xzx 2023-08-11 22:30:44
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) AnalyzeEvent(msg *message.Message) (contextErr error) {contextErr = profileCtx.Event.ProfileData.Analyze()if contextErr != nil {profileCtx.Status = state.StatusAnalyzeErrorreturn}// clear dimensions and valuesprofileCtx.Event.Dimensions = nilprofileCtx.Event.Values = nilreturn
}// WriteKafka
// @Description
// @Author xzx 2023-08-11 22:30:47
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) WriteKafka(msg *message.Message) (contextErr error) {toWriteBytes, contextErr := json.Marshal(profileCtx.Event)if contextErr != nil {profileCtx.Status = state.StatusUnmarshalErrorreturn}topic := connector.GetTopic(profileCtx.Event.Category)contextErr = connector.GetProducer().WriteMessages(profileCtx.Ctx, kafkago.Message{Topic: topic,Key:   []byte(profileCtx.Event.ID),Value: toWriteBytes,})if contextErr != nil {profileCtx.Status = state.StatusWriteKafkaErrorreturn}log.Logger.Info("[WriteKafka] write kafka success", zap.String("topic", topic), zap.String("id", profileCtx.Event.ID), zap.String("msg", string(toWriteBytes)))return
}

可以正常执行 Handler 的逻辑

image-20230811224842411

明日待办

  • 为一个主题添加多个 Handler

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/134272.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nginx配置指南

nginx.conf配置 找到Nginx的安装目录下的nginx.conf文件,该文件负责Nginx的基础功能配置。 配置文件概述 Nginx的主配置文件(conf/nginx.conf)按以下结构组织: 配置块功能描述全局块与Nginx运行相关的全局设置events块与网络连接有关的设置http块代理…

windows11安装安卓程序的坑

首先,百度一下,网上大把教程,比如: 【2023最新版】Windows11家庭版:安卓子系统(WSA)安装及使用教程【全网最详细】_QomolangmaH的博客-CSDN博客 写的就比较详细了,仅供参考。 一些…

C【动态内存管理】

1. 为什么存在动态内存分配 int val 20;//在栈空间上开辟四个字节 char arr[10] {0};//在栈空间上开辟10个字节的连续空间 2. 动态内存函数的介绍 2.1 malloc&#xff1a;stdlib.h void* malloc (size_t size); int* p (int*)malloc(40); #include <stdlib.h> #incl…

9.3.5网络原理(应用层HTTP/HTTPS)

一.HTTP: 1. HTTP是超文本传输协议,除了传输字符串,还可以传输图片,字体,视频,音频. 2. 3.HTTP协议报文格式:a.首行,b.请求头(header),c.空行(相当于一个分隔符,分隔了header和body),d.正文(body). 4. 5.URL:唯一资源描述符(长度不限制). a. b.注意:查询字符串(query stri…

MediaPipe+OpenCV 实现实时手势识别(附Python源码)

MediaPipe官网&#xff1a;https://developers.google.com/mediapipe MediaPipe仓库&#xff1a;https://github.com/google/mediapipe 一、MediaPipe介绍 MediaPipe 是一个由 Google 开发的开源跨平台机器学习框架&#xff0c;用于构建视觉和感知应用程序。它提供了一系列预训…

【SPI读取外部Flash】使用逻辑分析仪来读取FLASH Device ID

实验设备&#xff1a;25块钱的 逻辑分析仪 和 野火F429开发板 注意点&#xff0c;这个逻辑分析仪最大只能检测24M的波形&#xff0c;而SPI是在外部通道2&#xff0c;所以我们对系统时钟的分频&#xff0c;也就是给到通道2的时钟速度要在24M内&#xff0c;不然检测到的数据是有…

RFID车辆自动化称重管理

应用背景 随着物流和交通管理的发展&#xff0c;车辆称重成为了不可忽视的环节&#xff0c;传统的车辆称重管理方式存在诸多问题&#xff0c;如人工操作繁琐、数据准确性低、容易出现作弊等&#xff0c;为了提高车辆称重管理的效率和准确性&#xff0c;RFID技术被引入到车辆称…

Vue ——09、路由模式,404和路由勾子

路由嵌套&#xff0c;参数传递及重定向 一、路由模式&#xff08;有#号&#xff0c;跟没#号&#xff09;二、404三、路由勾子四、在钩子函数中使用异步请求————————创作不易&#xff0c;如觉不错&#xff0c;随手点赞&#xff0c;关注&#xff0c;收藏(*&#xffe3;︶…

Windows开机密码破解

Windows11以及Windows10(21H2)以上版本 先开机&#xff0c;不进行任何操作&#xff0c;静静的等待登录界面 按住Shift重启 进入“选择一个选项”界面&#xff0c;点击疑难解答 点击高级选项 点击命令提示符 输入两行命令 copy C:\windows\system32\uti1man.exe C: \Window…

面相面试知识--Lottery项目

面相面试知识–Lottery项目 1.设计模式 为什么需要设计模式&#xff1f; &#xff08;设计模式是什么&#xff1f;优点有哪些&#xff1f;&#xff09; 设计模式是一套经过验证的有效的软件开发指导思想/解决方案&#xff1b;提高代码的可重用性和可维护性&#xff1b;提高团…

【python之经验模态分解EMD实现】PyEMD库的安装和导入EMD, Visualisation问题解决方法[完整可运行]

现有的导入问题 目前网上的办法&#xff0c;直接导入&#xff1a;from PyEMD import EMD, Visualisation 是有问题的 可能会出现 在 ‘init.py | init.py’ 中找不到引用 ‘Visualisation’ 的报错。 原因似乎是现在导入的命令改了&#xff0c;这是一个坑&#xff0c;解决的…

算法简述-串和串的匹配、排序、深度/广度优先搜索、动态规划、分治、贪心、回溯、分支限界

目录 算法简述 基本 典型算法列举 串和串的匹配 排序 深度/广度优先搜索 动态规划 分治 贪心 回溯 分支限界 算法简述 基本 咳咳嗯…算法嘛&#xff0c;咱也不是 CS 科班学生&#xff0c;咱就说&#xff0c;算法是对已经建模后的问题的解决的具体途径和方法&#x…

Linux 多线程( 进程VS线程 | 线程控制 )

文章目录 Linux进程 VS 线程进程的多个线程共享 进程和线程的关系线程创建 pthread_create获取线程ID pthread_self线程等待 pthread_join终止线程进程分离线程ID及进程地址空间布局 Linux进程 VS 线程 进程是资源分配的基本单位。线程是OS调度的基本单位。 线程共享进程数据…

医院如何实现安全又稳定的跨网文件数据交换呢?

随着医疗信息化的发展&#xff0c;医院之间需要频繁地进行文件数据交换&#xff0c;以实现诊疗、科研、管理等方面的协同和共享。然而&#xff0c;由于医院网络环境的复杂性和敏感性&#xff0c;跨网文件数据交换面临着安全性和稳定性的双重挑战。如何在保证文件数据不被泄露、…

神经网络常用模型与应用

上手AI的一个捷径就是了解和使用各种网络模型&#xff0c;结合实际场景去打造自己的应用。神经网络模型是人类的共同财富。 神经网络 神经网络可以分为三种主要类型&#xff1a;前馈神经网络、反馈神经网络和图神经网络。 前馈神经⽹络&#xff08;feedforward neural netwo…

Unity SteamVR 开发教程:用摇杆/触摸板控制人物持续移动(2.x 以上版本)

文章目录 &#x1f4d5;教程说明&#x1f4d5;场景搭建&#x1f4d5;创建移动的动作&#x1f4d5;移动脚本⭐移动⭐实时调整 CharacterController 的高度 &#x1f4d5;取消手部和 CharacterController 的碰撞 持续移动是 VR 开发中的一个常用功能。一般是用户推动手柄摇杆&…

elasticsearch8-坐标查询和复合查询

个人名片&#xff1a; 博主&#xff1a;酒徒ᝰ. 个人简介&#xff1a;沉醉在酒中&#xff0c;借着一股酒劲&#xff0c;去拼搏一个未来。 本篇励志&#xff1a;三人行&#xff0c;必有我师焉。 本项目基于B站黑马程序员Java《SpringCloud微服务技术栈》&#xff0c;SpringCloud…

【计算机网络】75 张图详解:网络设备、网络地址规划、静态路由(万字长文)

75 张图详解&#xff1a;网络设备、网络地址规划、静态路由 1.网络设备1.1 交换机1.2 路由器 2.网络地址规划2.1 IP 地址2.2 分类地址2.3 子网掩码2.4 无类地址2.5 子网划分2.5.1 示例一2.5.2 示例二 2.6 超网合并 3.静态路由3.1 路由表3.2 直连路由3.3 静态路由3.4 默认路由3.…

图文文案音视频素材库流量主小程序开发

适用于全行业的资源素材运营变现小程序&#xff0c;支持文档、图片、文件、图文、音视频、网盘等多种资源形式&#xff0c;多种功能组合运营变现的小程序。 适用领域&#xff1a; 公司/微商素材、学习/考研/论文资料分享、PPT模板/背景图/壁纸/头像、知识付费、抖音素材等等…

代码随想录算法训练营第三十五天| 860.柠檬水找零 406.根据身高重建队列 452. 用最少数量的箭引爆气球

860.柠檬水找零 本题看上好像挺难&#xff0c;其实挺简单的&#xff0c;大家先尝试自己做一做。 代码随想录 public boolean lemonadeChange(int[] bills) {int five 0;int ten 0;for (int i 0; i < bills.length; i) {if (bills[i] 5) {five;} else if (bills[i] 10)…