go-高效处理应用程序数据

一、背景

大型的应用程序为了后期的排障、运营等,会将一些请求、日志、性能指标等数据保存到存储系统中。为了满足这些需求,我们需要进行数据采集,将数据高效的传输到存储系统

二、问题

  • 采集服务仅仅针对某个需求开发,需要修改业务代码逻辑,会给业务带来比较大的负担,并且耦合度太高
  • 数据采集导致已有的服务请求延时变高
  • 采集性能差,需要较长的时间才能采集完一批数据
  • 服务关闭时会丢失数据

三、解决方案

  • 针对问题1,我们可以将数据采集从业务服务中解耦出来,专门创建一个数据采集服务。业务程序只需要将数据传输到指定的中间件,至于数据的处理、采样、过滤、传出等逻辑都在采集服务中完成。
  • 针对问题2,将数据的导出由同步改为异步,异步开启多个协程消费通道中的数据,这样对程序的性能几乎微乎其微
  • 针对问题3,设置好采集最长间隔、批量采集大小以及使用高性能的数据中间件作为中转,比如redis、kafka
  • 针对问题4,为导出器和采集服务增加关闭监听,程序关闭时将数据清空完再退出

四、采集服务实现

4.1 架构设计

在这里插入图片描述

4.2 storage

负责从数据中间件中拿到数据,接口定义如下:

type AnalyticsStorage interface {Init(config interface{}) errorGetName() stringConnect() boolGetAndDeleteSet(string) []interface{}
}

redis的storage实现

import ("crypto/tls""strconv""time"redis "github.com/go-redis/redis/v7""github.com/marmotedu/errors""github.com/mitchellh/mapstructure"genericoptions "github.com/marmotedu/iam/internal/pkg/options""github.com/marmotedu/iam/pkg/log"
)// ------------------- REDIS CLUSTER STORAGE MANAGER -------------------------------// RedisKeyPrefix defines prefix for iam analytics key.
const (RedisKeyPrefix      = "analytics-"defaultRedisAddress = "127.0.0.1:6379"
)var redisClusterSingleton redis.UniversalClient// RedisClusterStorageManager is a storage manager that uses the redis database.
type RedisClusterStorageManager struct {db        redis.UniversalClientKeyPrefix stringHashKeys  boolConfig    genericoptions.RedisOptions
}// NewRedisClusterPool returns a redis cluster client.
func NewRedisClusterPool(forceReconnect bool, config genericoptions.RedisOptions) redis.UniversalClient {if !forceReconnect {if redisClusterSingleton != nil {log.Debug("Redis pool already INITIALIZED")return redisClusterSingleton}} else {if redisClusterSingleton != nil {redisClusterSingleton.Close()}}log.Debug("Creating new Redis connection pool")maxActive := 500if config.MaxActive > 0 {maxActive = config.MaxActive}timeout := 5 * time.Secondif config.Timeout > 0 {timeout = time.Duration(config.Timeout) * time.Second}var tlsConfig *tls.Configif config.UseSSL {tlsConfig = &tls.Config{InsecureSkipVerify: config.SSLInsecureSkipVerify,}}var client redis.UniversalClientopts := &RedisOpts{MasterName:   config.MasterName,Addrs:        getRedisAddrs(config),DB:           config.Database,Password:     config.Password,PoolSize:     maxActive,IdleTimeout:  240 * time.Second,ReadTimeout:  timeout,WriteTimeout: timeout,DialTimeout:  timeout,TLSConfig:    tlsConfig,}if opts.MasterName != "" {log.Info("--> [REDIS] Creating sentinel-backed failover client")client = redis.NewFailoverClient(opts.failover())} else if config.EnableCluster {log.Info("--> [REDIS] Creating cluster client")client = redis.NewClusterClient(opts.cluster())} else {log.Info("--> [REDIS] Creating single-node client")client = redis.NewClient(opts.simple())}redisClusterSingleton = clientreturn client
}func getRedisAddrs(config genericoptions.RedisOptions) (addrs []string) {if len(config.Addrs) != 0 {addrs = config.Addrs}if len(addrs) == 0 && config.Port != 0 {addr := config.Host + ":" + strconv.Itoa(config.Port)addrs = append(addrs, addr)}return addrs
}// RedisOpts is the overridden type of redis.UniversalOptions. simple() and cluster() functions are not public
// in redis library. Therefore, they are redefined in here to use in creation of new redis cluster logic.
// We don't want to use redis.NewUniversalClient() logic.
type RedisOpts redis.UniversalOptionsfunc (o *RedisOpts) cluster() *redis.ClusterOptions {if len(o.Addrs) == 0 {o.Addrs = []string{defaultRedisAddress}}return &redis.ClusterOptions{Addrs:     o.Addrs,OnConnect: o.OnConnect,Password: o.Password,MaxRedirects:   o.MaxRedirects,ReadOnly:       o.ReadOnly,RouteByLatency: o.RouteByLatency,RouteRandomly:  o.RouteRandomly,MaxRetries:      o.MaxRetries,MinRetryBackoff: o.MinRetryBackoff,MaxRetryBackoff: o.MaxRetryBackoff,DialTimeout:        o.DialTimeout,ReadTimeout:        o.ReadTimeout,WriteTimeout:       o.WriteTimeout,PoolSize:           o.PoolSize,MinIdleConns:       o.MinIdleConns,MaxConnAge:         o.MaxConnAge,PoolTimeout:        o.PoolTimeout,IdleTimeout:        o.IdleTimeout,IdleCheckFrequency: o.IdleCheckFrequency,TLSConfig: o.TLSConfig,}
}func (o *RedisOpts) simple() *redis.Options {addr := defaultRedisAddressif len(o.Addrs) > 0 {addr = o.Addrs[0]}return &redis.Options{Addr:      addr,OnConnect: o.OnConnect,DB:       o.DB,Password: o.Password,MaxRetries:      o.MaxRetries,MinRetryBackoff: o.MinRetryBackoff,MaxRetryBackoff: o.MaxRetryBackoff,DialTimeout:  o.DialTimeout,ReadTimeout:  o.ReadTimeout,WriteTimeout: o.WriteTimeout,PoolSize:           o.PoolSize,MinIdleConns:       o.MinIdleConns,MaxConnAge:         o.MaxConnAge,PoolTimeout:        o.PoolTimeout,IdleTimeout:        o.IdleTimeout,IdleCheckFrequency: o.IdleCheckFrequency,TLSConfig: o.TLSConfig,}
}func (o *RedisOpts) failover() *redis.FailoverOptions {if len(o.Addrs) == 0 {o.Addrs = []string{"127.0.0.1:26379"}}return &redis.FailoverOptions{SentinelAddrs: o.Addrs,MasterName:    o.MasterName,OnConnect:     o.OnConnect,DB:       o.DB,Password: o.Password,MaxRetries:      o.MaxRetries,MinRetryBackoff: o.MinRetryBackoff,MaxRetryBackoff: o.MaxRetryBackoff,DialTimeout:  o.DialTimeout,ReadTimeout:  o.ReadTimeout,WriteTimeout: o.WriteTimeout,PoolSize:           o.PoolSize,MinIdleConns:       o.MinIdleConns,MaxConnAge:         o.MaxConnAge,PoolTimeout:        o.PoolTimeout,IdleTimeout:        o.IdleTimeout,IdleCheckFrequency: o.IdleCheckFrequency,TLSConfig: o.TLSConfig,}
}// GetName returns the redis cluster storage manager name.
func (r *RedisClusterStorageManager) GetName() string {return "redis"
}// Init initialize the redis cluster storage manager.
func (r *RedisClusterStorageManager) Init(config interface{}) error {r.Config = genericoptions.RedisOptions{}err := mapstructure.Decode(config, &r.Config)if err != nil {log.Fatalf("Failed to decode configuration: %s", err.Error())}r.KeyPrefix = RedisKeyPrefixreturn nil
}// Connect will establish a connection to the r.db.
func (r *RedisClusterStorageManager) Connect() bool {if r.db == nil {log.Debug("Connecting to redis cluster")r.db = NewRedisClusterPool(false, r.Config)return true}log.Debug("Storage Engine already initialized...")// Reset it just in caser.db = redisClusterSingletonreturn true
}func (r *RedisClusterStorageManager) hashKey(in string) string {return in
}func (r *RedisClusterStorageManager) fixKey(keyName string) string {setKeyName := r.KeyPrefix + r.hashKey(keyName)log.Debugf("Input key was: %s", setKeyName)return setKeyName
}// GetAndDeleteSet get and delete key from redis.
func (r *RedisClusterStorageManager) GetAndDeleteSet(keyName string) []interface{} {log.Debugf("Getting raw key set: %s", keyName)if r.db == nil {log.Warn("Connection dropped, connecting..")r.Connect()return r.GetAndDeleteSet(keyName)}log.Debugf("keyName is: %s", keyName)fixedKey := r.fixKey(keyName)log.Debugf("Fixed keyname is: %s", fixedKey)var lrange *redis.StringSliceCmd_, err := r.db.TxPipelined(func(pipe redis.Pipeliner) error {lrange = pipe.LRange(fixedKey, 0, -1)pipe.Del(fixedKey)return nil})if err != nil {log.Errorf("Multi command failed: %s", err)r.Connect()}vals := lrange.Val()result := make([]interface{}, len(vals))for i, v := range vals {result[i] = v}log.Debugf("Unpacked vals: %d", len(result))return result
}// SetKey will create (or update) a key value in the store.
func (r *RedisClusterStorageManager) SetKey(keyName, session string, timeout int64) error {log.Debugf("[STORE] SET Raw key is: %s", keyName)log.Debugf("[STORE] Setting key: %s", r.fixKey(keyName))r.ensureConnection()err := r.db.Set(r.fixKey(keyName), session, 0).Err()if timeout > 0 {if expErr := r.SetExp(keyName, timeout); expErr != nil {return expErr}}if err != nil {log.Errorf("Error trying to set value: %s", err.Error())return errors.Wrap(err, "failed to set key")}return nil
}// SetExp is used to set the expiry of a key.
func (r *RedisClusterStorageManager) SetExp(keyName string, timeout int64) error {err := r.db.Expire(r.fixKey(keyName), time.Duration(timeout)*time.Second).Err()if err != nil {log.Errorf("Could not EXPIRE key: %s", err.Error())}return errors.Wrap(err, "failed to set expire time for key")
}func (r *RedisClusterStorageManager) ensureConnection() {if r.db != nil {// already connectedreturn}log.Info("Connection dropped, reconnecting...")for {r.Connect()if r.db != nil {// reconnection workedreturn}log.Info("Reconnecting again...")}
}

4.3 pump

我们首先会针对某种业务创建对应的数据结构,比如

type AnalyticsRecord struct {TimeStamp  int64     `json:"timestamp"`Username   string    `json:"username"`Effect     string    `json:"effect"`Conclusion string    `json:"conclusion"`Request    string    `json:"request"`Policies   string    `json:"policies"`Deciders   string    `json:"deciders"`ExpireAt   time.Time `json:"expireAt"   bson:"expireAt"`
}

pump负责将数据导出到指定的数据存储系统,比如promethus、mongo、ES等,它的接口定义如下:

type Pump interface {GetName() stringNew() PumpInit(interface{}) errorWriteData(context.Context, []interface{}) errorSetTimeout(timeout int)GetTimeout() int
}

4.4 exporter

exporter依赖storage和pump,每个exporter负责一种业务,一般对应一种数据结构

type AnalyticsRecord struct {TimeStamp  int64     `json:"timestamp"`Username   string    `json:"username"`Effect     string    `json:"effect"`Conclusion string    `json:"conclusion"`Request    string    `json:"request"`Policies   string    `json:"policies"`Deciders   string    `json:"deciders"`ExpireAt   time.Time `json:"expireAt"   bson:"expireAt"`
}

exporter从storage中取出数据转成对应的数据结构,并进行过滤和去掉冗余字段内容。最后将数据通过pump导出到数据系统中,exporter大概如下:

type AnalyticsExporter struct {storage storage.Storagepump    pump.Pumpfilter      []filter.Filters  //对数据进行过滤timeout               intOmitDetailedRecording bool  //将冗余字段置为空
}func (e *AnalyticsExporter) Export() {//1.从storage中拉取数据//2. 转换为对应的数据结构//3. 过滤数据//4. 置空冗余字段//5. 导出数据
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/376780.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Profibus协议转Profinet协议网关模块连接智能电表通讯案例

一、背景 在工业自动化领域,Profibus协议和Profinet协议是两种常见的工业通讯协议,而连接智能电表需要用到这两种协议之间的网关模块。本文将通过一个实际案例,详细介绍如何使用Profibus转Profinet模块(XD-PNPBM20)实…

Kubernetes 为pod指定DNS

在k8s里面,默认创建pod会给pod默认分配一个默认的dns,这个dns是哪来的呢?可不可以改成其他的dns呢? 先进入到pod里面来,可以看到这里面默认设置的DNS服务器,这个服务器地址为10.96.0.10。这个地址是k8s自动…

[web]-图片上传、文件包含-图片上传

题目内容提示:上传图片试试吧,注意统一时区问题 打开页面如图,源码没有过滤,随便输入,进入上传目录 根据链接可以看到是文件包含,可以利用编码读取源码,这里只列出有用页面的编码(?…

一文入门【NestJs】Providers

Nest学习系列 ✈️一文入门【NestJS】 ✈️一文入门【NestJs】Controllers 控制器 🚩 前言 在NestJS的世界里,理解“Providers”是构建健壮、可维护的后端服务的关键。NestJS,作为Node.js的一个现代框架,采用了Angular的一些核…

科普文:微服务技术栈梳理

概叙 如上两图所示,微服务架构下,需要的组件很多,上面中也并未列全。下面将梳理一下国内微服务架构下,用到的技术栈,仅供参考。 科普文:12种常见的软件架构-CSDN博客 没有最好的架构,只有最适…

基于springboot+vue+uniapp的机电公司管理信息系统

开发语言:Java框架:springbootuniappJDK版本:JDK1.8服务器:tomcat7数据库:mysql 5.7(一定要5.7版本)数据库工具:Navicat11开发软件:eclipse/myeclipse/ideaMaven包&#…

【C++】 List 基本使用

C List 基本使用 基本概念 list 是一个序列容器,它内部维护了一个双向链表结构。与 vector 或 deque 等基于数组的容器不同,list 在插入和删除元素时不需要移动大量数据,因此在这些操作上具有较高的效率。然而,访问列表中的特定…

MAC通过SSH连接VirtualBox中的虚拟机

1、虚拟机网络连接方式使用桥接方式-桥接网卡 2、重启虚拟机,查看虚拟机ip地址是否跟Mac宿主机在同一网段 3、SSH工具(推荐Tabby)输入IP、用户名和密码就能连接虚拟机了

通过Bugly上报的日志查找崩溃闪退原因

第一步,解析堆栈信息 在bugly上收集到的信息是这样的 0x000000010542e46c 0x0000000104db4000 6792300 OS应用发生崩溃时,系统会生成一份崩溃日志,这份日志中包含了崩溃时的堆栈信息,但这些堆栈信息并非直接指向源代码&#x…

[ACM独立出版]2024年虚拟现实、图像和信号处理国际学术会议(ICVISP 2024)

最新消息ICVISP 2024-已通过ACM出版申请投稿免费参会,口头汇报或海报展示(可获得相应证明证书) ————————————————————————————————————————— [ACM独立出版]2024年虚拟现实、图像和信号处理国际学术会议(ICVI…

ArduPilot开源飞控之AP_Mount_Topotek

ArduPilot开源飞控之AP_Mount_Topotek 1. 源由2. 框架设计3. 重要函数3.1 动态过程3.1.1 AP_Mount_Topotek::update3.1.2 AP_Mount_Backend::calculate_poi 3.2 基础能力3.2.1 AP_Mount_Topotek::healthy3.2.2 AP_Mount_Topotek::has_pan_control 3.3 设备功能3.3.1 AP_Mount_T…

(十一) Docker compose 部署 Mysql 和 其它容器

文章目录 1、前言1.1、部署 MySQL 容器的 3 种类型1.2、M2芯片类型问题 2、具体实现2.1、单独部署 mysql 供宿主机访问2.1.1、文件夹结构2.1.2、docker-compose.yml 内容2.1.3、运行 2.2、单独部署 mysql 容器供其它容器访问(以 apollo 为例)2.2.1、文件…

Vue1-Vue核心

目录 Vue简介 官网 介绍与描述 Vue的特点 与其它 JS 框架的关联 Vue周边库 初识Vue Vue模板语法 数据绑定 el与data的两种写法 MVVM模型 数据代理 回顾Object.defineProperty方法 何为数据代理 Vue中的数据代理 数据代理图示 事件处理 事件的基本使用 事件修…

[Python学习篇] Python包管理工具pip

目录 什么是pip pip主要功能 配置pip 安装pip 升级pip 卸载pip 查看pip是否安装成功 pip帮助信息 设置国内镜像源 使用pip 安装包 安装一个包 安装指定版本的包 安装大于或小于某个版本的包 requirements.txt文件的使用 管理当前环境中的包及其版本 批量安装包…

【java】力扣 合并k个升序链表

文章目录 题目链接题目描述思路代码 题目链接 23.合并k个升序链表 题目描述 给你一个链表数组,每个链表都已经按升序排列。 请你将所有链表合并到一个升序链表中,返回合并后的链表 思路 我在这个题里面用到了PriorityQueue(优先队列) 的知识 Prio…

Qt文件下载工具

在Qt中实现文件下载功能,通常可以通过多种方式来完成,包括使用 QNetworkAccessManager 和 QNetworkReply 类,或者使用更高级别的 QHttpMultiPart 类。以下是两种常见的实现方法: 方法1:使用 QNetworkAccessManager 和…

LangChain框架详解

LangChain框架详解 LangChain是一个基于语言模型开发应用程序的强大框架,旨在帮助开发人员简化与大模型交互、数据检索以及将不同功能模块串联起来以完成复杂任务的过程。它提供了一套丰富的工具、组件和接口,使开发人员能够轻松构建上下文感知和具备逻…

SwiftUI 截图(snapshot)视频画面的极简方法

功能需求 在 万物皆可截图:SwiftUI 中任意视图(包括List和ScrollView)截图的通用实现 这篇博文中,我们实现了在 SwiftUI 中截图几乎任何视图的功能,不幸的是它对视频截图却无能为力。不过别着急,我们还有妙招。 在上面的演示图片中,我们在 SwiftUI 中可以随心所欲的截图…

机器人相关工科专业课程体系

机器人相关工科专业课程体系 前言传统工科专业机械工程自动化/控制工程计算机科学与技术 新兴工科专业智能制造人工智能机器人工程 总结Reference: 前言 机器人工程专业是一个多领域交叉的前沿学科,涉及自然科学、工程技术、社会科学、人文科学等相关学科的理论、方…

jmeter-beanshell学习9-放弃beanshell

写这篇时候道心不稳了,前面写了好几篇benashell元件,突然发现应该放弃。想回去改前面的文章,看了看无从下手,反正已经这样了,我淋了雨,那就希望别人也没有伞吧,哈哈哈哈,放在第九篇送…