今日待办
- 使用watermill框架替代当前的base_runner框架
a. 参考官方提供的sarama kafka Pub/Sub(https://github.com/ThreeDotsLabs/watermill-kafka/)实现kafka-go(https://github.com/segmentio/kafka-go)的Pub/Sub(sarama需要cgo,会导致一些额外的镜像依赖)
b. 参考 https://watermill.io/docs/messages-router/ 实现不同 topic(不同事件)走不同的 Handler 处理逻辑,相同的处理逻辑则可以使用 Middleware(https://watermill.io/docs/middlewares/)
- 调研 Hyperscan 的 Go 绑定(https://github.com/flier/gohs/)的使用方法和 benchmark,扩充 profile 事件处理 handler,添加一些正则处理任务(这个会给一些示例)
Watermill
what?
- 高效处理消息流,事件驱动程序
- 用于 event sourcing, RPC over messages, sagas
- pub/sub
why?
- 微服务模式,异步通信
- 减少复杂性
core?
- Publisher
- Subscriber
- Message
官方示例
结合对 kafka 的 Pub/Sub 的实现和对 router、middleware 的使用,尝试替换掉 base_runner
// Package watermillx
// @Author xzx 2023/8/11 18:53:00
package watermillximport ("fmt""github.com/Shopify/sarama""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""log""profile/internal/config""profile/internal/schema""time"
)// 测试字段
type WatermillContext struct {Status intStageErr errorEvent schema.EventAppID string // API 上报FetchScenario string // API 上报
}var logger = watermill.NewStdLogger(false, false)func (ctx *WatermillContext) Init() {saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()// equivalent of auto.offset.reset: earliestsaramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldestsubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{Brokers: []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler: kafka.DefaultMarshaler{},OverwriteSaramaConfig: saramaSubscriberConfig,ConsumerGroup: "test_consumer_group",},logger,)if err != nil {panic(err)}router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {panic(err)}// SignalsHandler will gracefully shutdown Router when SIGTERM is received.// You can also close the router by just calling `r.Close()`.router.AddPlugin(plugin.SignalsHandler)// Router level middleware are executed for every message sent to the routerrouter.AddMiddleware(// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messagesmiddleware.CorrelationID,// The handler function is retried if it returns an error.// After MaxRetries, the message is Nacked and it's up to the PubSub to resend it.middleware.Retry{MaxRetries: 3,InitialInterval: time.Millisecond * 100,Logger: logger,}.Middleware,// Recoverer handles panics from handlers.// In this case, it passes them as errors to the Retry middleware.middleware.Recoverer,)// AddHandler returns a handler which can be used to add handler level middleware// or to stop handler.// just for debug, we are printing all messages received on `incoming_messages_topic`router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,printMessages,)// Now that all handlers are registered, we're running the Router.// Run is blocking while the router is running.ctx := context.Background()if err := router.Run(ctx); err != nil {panic(err)}
}func printMessages(msg *message.Message) error {fmt.Printf("\n> Received message: %s\n> %s\n> metadata: %v\n\n",msg.UUID, string(msg.Payload), msg.Metadata,)return nil
}
思考:
ConsumerDispatchHandler 在每次消费消息时都会执行:注册完 4 个 stageHandler 后再执行 Run,去异步调用 BaseContext 的 Handler。
考虑在此处进行订阅者(Subscriber)的初始化,以及 Middleware、Handler 的注册。
// Package watermillx
// @Author xzx 2023/8/11 18:53:00
package watermillximport ("context""fmt""github.com/Shopify/sarama""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin""log""profile/internal/config""time"
)var logger = watermill.NewStdLogger(true, false)func Init() {saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()// equivalent of auto.offset.reset: earliestsaramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldestsubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{Brokers: []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler: kafka.DefaultMarshaler{},OverwriteSaramaConfig: saramaSubscriberConfig,ConsumerGroup: config.Profile.GetString("kafka.group"),},logger,)if err != nil {panic(err)}router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {panic(err)}// SignalsHandler will gracefully shutdown Router when SIGTERM is received.// You can also close the router by just calling `r.Close()`.router.AddPlugin(plugin.SignalsHandler)// Router level middleware are executed for every message sent to the routerrouter.AddMiddleware(// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messagesmiddleware.CorrelationID,// The handler function is retried if it returns an error.// After MaxRetries, the message is Nacked and it's up to the PubSub to resend it.middleware.Retry{MaxRetries: 3,InitialInterval: time.Millisecond * 100,Logger: logger,}.Middleware,// Recoverer handles panics from handlers.// In this case, it passes them as errors to the Retry middleware.middleware.Recoverer,)// AddHandler returns a handler which can be used to add handler level middleware// or to stop handler.// just for debug, we are printing all messages received on `incoming_messages_topic`handler := router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,printMessages,)// Handler level middleware is only executed for a specific handler// Such middleware can be added the same way the router level oneshandler.AddMiddleware(func(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) 
{log.Println("executing handler specific middleware for ", message.UUID)return h(message)}})// Now that all handlers are registered, we're running the Router.// Run is blocking while the router is running.ctx := context.Background()if err := router.Run(ctx); err != nil {panic(err)}
}func printMessages(msg *message.Message) error {fmt.Printf("\n> Received message: %s\n> %s\n> metadata: %v\n\n",msg.UUID, string(msg.Payload), msg.Metadata,)return nil
}
可以正常消费到消息,接下来就可以自定义 Handler 来对接 base_runner 的功能
测试如下:
// Package watermillx
// @Author xzx 2023/8/11 18:53:00
package watermillximport ("context""encoding/json""github.com/Shopify/sarama""github.com/ThreeDotsLabs/watermill""github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka""github.com/ThreeDotsLabs/watermill/message""github.com/ThreeDotsLabs/watermill/message/router/middleware""github.com/ThreeDotsLabs/watermill/message/router/plugin"kafkago "github.com/segmentio/kafka-go""go.uber.org/zap""profile/internal/config""profile/internal/connector""profile/internal/log""profile/internal/schema""profile/internal/schema/performance""profile/internal/state""time"
)// ProfileContext
// @Description:
// @Author xzx 2023-08-11 22:21:41
type ProfileContext struct {// Properties that can be called by inherited subclassesStatus intCtx context.ContextEvent schema.EventAppID string // API 上报FetchScenario string // API 上报
}//
// NewProfileContext
// @Description returns a new ProfileContext whose Ctx is ctx; every other field
// starts at its zero value
// @Author xzx 2023-08-11 22:49:00
// @Param ctx the context stored in the Ctx field
// @Return *ProfileContext
//
func NewProfileContext(ctx context.Context) *ProfileContext {
	pc := new(ProfileContext)
	pc.Ctx = ctx
	return pc
}

// Init
// @Description 初始化
// @Author xzx 2023-08-11 22:22:01
func (profileCtx *ProfileContext) Init() {logger := watermill.NewStdLogger(true, false)saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()// equivalent of auto.offset.reset: earliestsaramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldestsubscriber, err := kafka.NewSubscriber(kafka.SubscriberConfig{Brokers: []string{config.Profile.GetString("kafka.bootstrap")},Unmarshaler: kafka.DefaultMarshaler{},OverwriteSaramaConfig: saramaSubscriberConfig,ConsumerGroup: config.Profile.GetString("kafka.group"),},logger,)if err != nil {log.Logger.Error("creates a new Kafka Subscriber error", zap.Error(err))panic(err)}router, err := message.NewRouter(message.RouterConfig{}, logger)if err != nil {log.Logger.Error("creates a new Router with given configuration error", zap.Error(err))panic(err)}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.CorrelationID,middleware.Retry{MaxRetries: 3,InitialInterval: time.Millisecond * 100,Logger: logger,}.Middleware,middleware.Recoverer,)router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,profileCtx.UnpackKafkaMessage,)/*router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,profileCtx.InitPerformanceEvent,)router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,profileCtx.AnalyzeEvent,)router.AddNoPublisherHandler("print_incoming_messages","to_analyzer__0.PERF_CRASH",subscriber,profileCtx.WriteKafka,)*/if err = router.Run(context.Background()); err != nil {log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))panic(err)}
}// UnpackKafkaMessage
// @Description decodes the JSON payload of msg into profileCtx.Event
// @Author xzx 2023-08-11 22:29:21
// @Param msg the watermill message whose Payload holds the JSON-encoded event
// @Return contextErr the json.Unmarshal error, if any; Status is then set to
// state.StatusUnmarshalError
func (profileCtx *ProfileContext) UnpackKafkaMessage(msg *message.Message) (contextErr error) {
	// Decode into a fresh value and then replace the stored event. The same
	// ProfileContext serves every message, and json.Unmarshal into the
	// existing profileCtx.Event would merge with the previous message: keys
	// missing from this payload would keep their old values, and any map-typed
	// fields would keep entries from the previous message.
	var event schema.Event
	contextErr = json.Unmarshal(msg.Payload, &event)
	profileCtx.Event = event
	if contextErr != nil {
		profileCtx.Status = state.StatusUnmarshalError
		return contextErr
	}
	log.Logger.Info("[UnpackKafkaItem] unpack kafka item success", zap.Any("event", profileCtx.Event))
	return nil
}

// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-11 22:30:36
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) InitPerformanceEvent(msg *message.Message) (contextErr error) {event, contextErr := performance.EventFactory(profileCtx.Event.Category, profileCtx.Event.Dimensions, profileCtx.Event.Values)if contextErr != nil {profileCtx.Status = state.StatusEventFactoryErrorreturn}log.Logger.Info("[InitPerformanceEvent] init performance event success", zap.Any("event", event))profileCtx.Event.ProfileData = eventreturn
}// AnalyzeEvent
// @Description
// @Author xzx 2023-08-11 22:30:44
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) AnalyzeEvent(msg *message.Message) (contextErr error) {contextErr = profileCtx.Event.ProfileData.Analyze()if contextErr != nil {profileCtx.Status = state.StatusAnalyzeErrorreturn}// clear dimensions and valuesprofileCtx.Event.Dimensions = nilprofileCtx.Event.Values = nilreturn
}// WriteKafka
// @Description
// @Author xzx 2023-08-11 22:30:47
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) WriteKafka(msg *message.Message) (contextErr error) {toWriteBytes, contextErr := json.Marshal(profileCtx.Event)if contextErr != nil {profileCtx.Status = state.StatusUnmarshalErrorreturn}topic := connector.GetTopic(profileCtx.Event.Category)contextErr = connector.GetProducer().WriteMessages(profileCtx.Ctx, kafkago.Message{Topic: topic,Key: []byte(profileCtx.Event.ID),Value: toWriteBytes,})if contextErr != nil {profileCtx.Status = state.StatusWriteKafkaErrorreturn}log.Logger.Info("[WriteKafka] write kafka success", zap.String("topic", topic), zap.String("id", profileCtx.Event.ID), zap.String("msg", string(toWriteBytes)))return
}
可以正常执行 Handler 的逻辑
明日待办
- 为一个主题添加多个 Handler