Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】

Go实现LogAgent:海量日志收集系统【下篇】

0 前置文章

Go实现LogAgent:海量日志收集系统【上篇——LogAgent实现】

前面的章节我们已经完成了日志收集(LogAgent),接下来我们需要将日志写入到kafka中,然后将数据落地到Elasticsearch中。

项目架构图:
在这里插入图片描述
项目逻辑图:
在这里插入图片描述

1 docker搭建Elasticsearcsh、Kibana

如果没有docker环境的,可以在本机安装docker desktop

# 1 创建一个docker网络
docker network create es-net
# 查看本机网络
docker network ls
# 删除一个网络
docker network rm es-net# 2 拉取es、kibana镜像
docker pull elasticsearch:7.17.4
docker pull kibana:7.17.4# 3 创建es容器并挂在数据卷
mkdir -p /Users/xxx/docker-home/es-data/_data
mkdir -p /Users/xxx/docker-home/es-plugins
mkdir -p /Users/xxx/docker-home/es-config
mkdir -p /Users/xxx/docker-home/kibana-configtouch elasticsearch.yml
touch kibana.yml

1.需要保证要挂载的目录有读写权限,包括要挂载的配置文件。如果没有则用chmod 777命令
2.如果要挂载配置文件,则需要提前把配置文件内容写好,不能为空,否则可能会影响es和kibana运行。
3.如果只挂载到配置文件目录,不准备配置文件,会导致创建容器后没有配置文件。报错

elasticsearch.yml:

cluster.name: "docker-cluster"
network.host: 0.0.0.0

kibana.yml:

server.host: "0.0.0.0"
server.shutdownTimeout: "5s"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
monitoring.ui.container.elasticsearch.enabled: true

启动es:

docker run -d \--name es7.17.4 -p 9200:9200 -p 9300:9300 \-e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx128m" \-v /Users/xxx/docker-home/es-data/_data:/usr/share/elasticsearch/data \-v  /Users/xxx/docker-home/es-plugins:/usr/share/elasticsearch/plugins \-v  /Users/xxx/docker-home/es-config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \--privileged \--network es-net \elasticsearch:7.17.4

启动Kibana:

docker run -d \
--name kibana17 \
--network=es-net \
-p 5601:5601 \
-e ELASTICSEARCH_HOSTS=http://es7.17.4:9200 \ 
kibana:7.17.4

-e ELASTICSEARCH_HOSTS=http://es7.17.4:9200 \ 其中,es7.17.4的名称为上面es容器的名称

结果:
在这里插入图片描述

2 golang操作es

执行下面代码在es中添加索引,然后到kibana页面创建索引

package mainimport ("context""fmt""github.com/olivere/elastic/v7"
)type Tweet struct {User    stringMessage string
}func main() {client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL("http://localhost:9200/"))if err != nil {fmt.Println("connect es error", err)return}fmt.Println("conn es succ")tweet := Tweet{User: "haohan", Message: "This is a test"}_, err = client.Index().Index("twitter").Id("1").BodyJson(tweet).Do(context.Background())if err != nil {// Handle errorpanic(err)return}fmt.Println("insert succ")
}
# 执行上面的go代码执行,控制台输出如下表明插入成功
conn es succ
insert succ

然后我们手动到kibana中添加对应的index即可搜索出对应数据

在这里插入图片描述
在这里插入图片描述

3 开发LogTransfer:从kafka中读取数据并写入es

在前面的开发中,我们已经将日志写入到了kafka。接下来我们要做的就是从kafka中消费数据,然后写入到es中。LogTransfer做的就是这个工作。

3.1 项目结构

├─config
│      logTransfer.conf
│
├─es
│      elasticsearch.go
│   
├─logs
│      my.log
│
└─mainkafka.goconfig.golog.gomain.go

在这里插入图片描述

3.2 项目代码

①LogTransfer/main/main.go

package mainimport ("github.com/astaxie/beego/logs"
)func main() {// 初始化配置err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")if err != nil {panic(err)return}logs.Debug("初始化配置成功")//初始化日志模块err = initLogger(logConfig.LogPath, logConfig.LogLevel)if err != nil {panic(err)return}logs.Debug("初始化日志模块成功")// 初始化Kafkaerr = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)if err != nil {logs.Error("初始化Kafka失败, err:", err)return}logs.Debug("初始化Kafka成功")
}

②LogTransfer/main/log.go

package mainimport ("encoding/json""fmt""github.com/astaxie/beego/logs"
)func convertLogLevel(level string) int {switch level {case "debug":return logs.LevelDebugcase "warn":return logs.LevelWarncase "info":return logs.LevelInfocase "trace":return logs.LevelTrace}return logs.LevelDebug
}func initLogger(logPath string, logLevel string) (err error) {config := make(map[string]interface{})config["filename"] = logPathconfig["level"] = convertLogLevel(logLevel)configStr, err := json.Marshal(config)if err != nil {fmt.Println("初始化日志, 序列化失败:", err)return}_ = logs.SetLogger(logs.AdapterFile, string(configStr))return
}

③LogTransfer/main/kafka.go

package mainimport ("github.com/IBM/sarama""github.com/astaxie/beego/logs""strings"
)type KafkaClient struct {client sarama.Consumeraddr   stringtopic  string
}var (kafkaClient *KafkaClient
)func InitKafka(addr string, topic string) (err error) {kafkaClient = &KafkaClient{}consumer, err := sarama.NewConsumer(strings.Split(addr, ","), nil)if err != nil {logs.Error("启动Kafka消费者错误: %s", err)return nil}kafkaClient.client = consumerkafkaClient.addr = addrkafkaClient.topic = topicreturn
}

④LogTransfer/main/config.go

package mainimport ("fmt""github.com/astaxie/beego/config"
)type LogConfig struct {KafkaAddr  stringKafkaTopic stringEsAddr     stringLogPath    stringLogLevel   string
}var (logConfig *LogConfig
)func InitConfig(confType string, filename string) (err error) {conf, err := config.NewConfig(confType, filename)if err != nil {fmt.Printf("初始化配置文件出错:%v\n", err)return}// 导入配置信息logConfig = &LogConfig{}// 日志级别logConfig.LogLevel = conf.String("logs::log_level")if len(logConfig.LogLevel) == 0 {logConfig.LogLevel = "debug"}// 日志输出路径logConfig.LogPath = conf.String("logs::log_path")if len(logConfig.LogPath) == 0 {logConfig.LogPath = "/Users/xxx/GolandProjects/LogCollect/LogTransfer/logs/log_transfer.log"}// KafkalogConfig.KafkaAddr = conf.String("kafka::server_addr")if len(logConfig.KafkaAddr) == 0 {err = fmt.Errorf("初识化Kafka addr失败")return}logConfig.KafkaTopic = conf.String("kafka::topic")if len(logConfig.KafkaAddr) == 0 {err = fmt.Errorf("初识化Kafka topic失败")return}// EslogConfig.EsAddr = conf.String("elasticsearch::addr")if len(logConfig.EsAddr) == 0 {err = fmt.Errorf("初识化Es addr失败")return}return
}

④LogTransfer/config/log_transfer.conf

[logs]
log_level = debug
log_path = "/Users/xxx/GolandProjects/LogCollect/LogTransfer/logs/log_transfer.log"[kafka]
server_addr = localhost:9092
topic = nginx_log[elasticsearch]
addr = http://localhost:9200/

⑤LogTransfer/es/es.go

package mainimport ("context""fmt""github.com/olivere/elastic/v7"
)type Tweet struct {User    stringMessage string
}func main() {client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL("http://localhost:9200/"))if err != nil {fmt.Println("connect es error", err)return}fmt.Println("conn es succ")tweet := Tweet{User: "haohan", Message: "This is a test"}_, err = client.Index().Index("twitter").Id("1").BodyJson(tweet).Do(context.Background())if err != nil {// Handle errorpanic(err)return}fmt.Println("insert succ")
}

结果

LogTransfer的运行日志在LogTransfer/logs/log_transfer.log中

logs/log_transfer.log:

2023/09/02 19:55:29.037 [D]  初始化日志模块成功
2023/09/02 19:55:29.074 [D]  初始化Kafka成功

在这里插入图片描述

4 完成LogTransfer:将日志入库到es并通过kibana展示

前面我们将LogTransfer的配置初始化成功了,下面我们将从Kafka中消费数据,然后将日志入库到es,最后通过kibana展示。

在这里插入图片描述

4.1 将日志保存到es

在LogTransfer/main/main.go中添加初始化InitEs函数

①main.go中添加InitEs函数

LogTransfer/main/main.go:

package mainimport ("github.com/astaxie/beego/logs""logtransfer.com/es"
)func main() {// 初始化配置err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")if err != nil {panic(err)return}logs.Debug("初始化配置成功")//初始化日志模块err = initLogger(logConfig.LogPath, logConfig.LogLevel)if err != nil {panic(err)return}logs.Debug("初始化日志模块成功")// 初始化Kafkaerr = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)if err != nil {logs.Error("初始化Kafka失败, err:", err)return}logs.Debug("初始化Kafka成功")// 初始化Eserr = es.InitEs(logConfig.EsAddr)if err != nil {logs.Error("初始化Elasticsearch失败, err:", err)return}logs.Debug("初始化Es成功")}

运行LogTransfer下的main.go可以发现log_transfer.log中输出的日志信息
在这里插入图片描述

②LogTransfer/es/es.go

package esimport ("fmt""github.com/olivere/elastic/v7"
)type Tweet struct {User    stringMessage string
}var (esClient *elastic.Client
)func InitEs(addr string) (err error) {client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(addr))if err != nil {fmt.Println("connect es error", err)return nil}esClient = clientreturn
}

运行LogTransfer/main下的main函数

  • 可以从logs/log_transfer.log中看到打印初始化es、kafka等成功

③添加run.go:消费kafka中的数据

在main函数中添加run函数, 用于运行kafka消费数据到Es

package mainimport ("github.com/Shopify/sarama""github.com/astaxie/beego/logs"
)func run() (err error) {partitionList, err := kafkaClient.Client.Partitions(kafkaClient.Topic)if err != nil {logs.Error("Failed to get the list of partitions: ", err)return}for partition := range partitionList {pc, errRet := kafkaClient.Client.ConsumePartition(kafkaClient.Topic, int32(partition), sarama.OffsetNewest)if errRet != nil {err = errRetlogs.Error("Failed to start consumer for partition %d: %s\n", partition, err)return}defer pc.AsyncClose()kafkaClient.wg.Add(1)go func(pc sarama.PartitionConsumer) {for msg := range pc.Messages() {logs.Debug("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))err = es.SendToES(kafkaClient.topic, msg.Value)if err != nil {logs.Warn("send to es failed, err:%v", err)}}kafkaClient.wg.Done()}(pc)}kafkaClient.wg.Wait()return
}

④main.go中添加SendToES函数

package mainimport ("github.com/astaxie/beego/logs""logtransfer.com/es"
)func main() {// 初始化配置err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")if err != nil {panic(err)return}logs.Debug("初始化配置成功")//初始化日志模块err = initLogger(logConfig.LogPath, logConfig.LogLevel)if err != nil {panic(err)return}logs.Debug("初始化日志模块成功")// 初始化Kafkaerr = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)if err != nil {logs.Error("初始化Kafka失败, err:", err)return}logs.Debug("初始化Kafka成功")// 初始化Eserr = es.InitEs(logConfig.EsAddr)if err != nil {logs.Error("初始化Elasticsearch失败, err:", err)return}logs.Debug("初始化Es成功")// 运行err = run()if err != nil {logs.Error("运行错误, err:", err)return}select {}
}

5 联调

5.1 运行LogAgent:采集数据并存储到kafka

# 用于向docker中的etcd写入对应key
docker exec etcd1 etcdctl put /backend/logagent/config/192.168.0.103 "[{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/mysql_log.log\",\"topic\":\"mysql_log\"},{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/nginx_log.log\",\"topic\":\"nginx_log\"}]"

通过上面的命令,用于向etcd中写入对应key,etcd的watcher监视到后会对应更新配置

在这里插入图片描述

查看LogAgent的运行日志:
在这里插入图片描述

5.2 运行LogTransfer:消费kafka数据并存到es

选中LogTransfer下main文件夹下的所有go文件,鼠标右击运行,查看控制台输出

在这里插入图片描述
查看LogTransfer的运行日志:
在这里插入图片描述

5.3 在kibana创建index并查看

Management - Stack Management - Kibana - Index Patterns ,根据kafka中的topic创建对应的索引。以nginx_log为例:

在这里插入图片描述
回到overview,根据nginx_log这个index搜索信息:
在这里插入图片描述

可以看到成功读取到日志信息,至此该项目已开发完成

参考文章:https://blog.csdn.net/qq_43442524/article/details/105072952

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/118623.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【SpringSecurity】十二、集成JWT搭配Redis实现退出登录

文章目录 1、登出的实现思路2、集成Redis3、认证成功处理器4、退出成功处理器5、修改token校验过滤器6、调试 1、登出的实现思路 这是目前的token实现图: 因为JWT的无状态,服务端无法在使用过程中主动废止某个 token,或者更改 token 的权限…

【算法刷题-双指针篇】

目录 1.leetcode-27. 移除元素2.leetcode-344. 反转字符串3.leetcode-剑指 Offer 05. 替换空格4.leetcode-206. 反转链表5.leetcode-19. 删除链表的倒数第 N 个结点6.leetcode-面试题 02.07. 链表相交7.leetcode-142. 环形链表 II8.leetcode-15. 三数之和9.leetcode-18. 四数之…

python unitest自动化框架

以下举一个最简单的unitest实例,包含备注,自己拉取代码运行一次就知道原理了 import unittest import osclass TestSample(unittest.TestCase):classmethoddef setUpClass(cls) -> None:print(整个测试类只执行一次)def setUp(self) -> None:prin…

【python零基础入门学习】python基础篇之判断与for循环(二)

本站以分享各种运维经验和运维所需要的技能为主 《python》:python零基础入门学习 《shell》:shell学习 《terraform》持续更新中:terraform_Aws学习零基础入门到最佳实战 《k8》暂未更新 《docker学习》暂未更新 《ceph学习》ceph日常问题解…

flutter plugins插件【二】【FlutterAssetsGenerator】

2、FlutterAssetsGenerator 介绍地址:https://juejin.cn/post/6898542896274735117 配置assets目录 ​ 插件会从pubspec.yaml文件下读取assets目录,因此要使用本插件,你需要在pubspec.yaml下配置资源目录 flutter:# The following line ens…

Navicat连接数据库报2003错误解决办法

是防火墙还没有开启 查看防火墙管理的端口 设置3306防火墙开启,重载防火墙 连接成功

2024年java面试--多线程(2)

系列文章目录 2024年java面试(一)–spring篇2024年java面试(二)–spring篇2024年java面试(三)–spring篇2024年java面试(四)–spring篇2024年java面试–集合篇2024年java面试–redi…

CSS中如何实现弹性盒子布局(Flexbox)的换行和排序功能?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 换行(Flexbox Wrapping)⭐ 示例:实现换行⭐ 排序(Flexbox Ordering)⭐ 示例:实现排序⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得…

【uniapp 配置启动页面隐私弹窗】

为什么需要配置 原因 根据工业和信息化部关于开展APP侵害用户权益专项整治要求,App提交到应用市场必须满足以下条件: 1.应用启动运行时需弹出隐私政策协议,说明应用采集用户数据 2.应用不能强制要求用户授予权限,即不能“不给权…

iPhone 14 Plus与iPhone 14 Pro:你应该买哪一款

又到了iPhone季,这意味着你可能会在几种不同的机型之间左右为难,无法决定买哪一款。更令人困惑的是,苹果推出的iPhone变体——iPhone 14 Plus,只比老款iPhone 14 Pro低100美元。 有这么多选择,你可能想知道哪款iPhone最适合你。你应该买一部大屏幕的iPhone 14 Plus并节省…

C语言每日一练--------Day(11)

本专栏为c语言练习专栏,适合刚刚学完c语言的初学者。本专栏每天会不定时更新,通过每天练习,进一步对c语言的重难点知识进行更深入的学习。 今日练习题关键字:找到数组中消失的数字 哈希表 💓博主csdn个人主页&#xff…

leetcode原题: 最小值、最大数字

题目1:最小值 给定两个整数数组a和b,计算具有最小差绝对值的一对数值(每个数组中取一个值),并返回该对数值的差 示例: 输入:{1, 3, 15, 11, 2}, {23, 127, 235, 19, 8} 输出:3&…

网络编程 http 相关基础概念

文章目录 表单是什么http请求是什么http请求的结构和说明关于http方法 GET和POST区别http常见状态码http响应http 请求是无状态的含义html是什么 (前端内容,了解即可)html 常见标签 (前端内容,了解即可)关于…

项目总结知识点记录-文件上传下载(三)

(1)文件上传 代码: RequestMapping(value "doUpload", method RequestMethod.POST)public String doUpload(ModelAttribute BookHelper bookHelper, Model model, HttpSession session) throws IllegalStateException, IOExcepti…

XSS的分析

目录 1、XSS的原理 2、XSS的攻击类型 2.1 反射型XSS 2.2 存储型XSS 2.3 DOM-based 型 2.4 基于字符集的 XSS 2.5 基于 Flash 的跨站 XSS 2.6 未经验证的跳转 XSS 3、复现 3.1 反射性 3.2 DOM-based型 1、XSS的原理 XSS的原理是恶意攻击者往 Web 页面里插入恶意可执行…

Android中级——消息机制

消息机制 概念ThreadLocalMessageQueueLooperHandlerrunOnUiThread() 概念 MessageQueue:采用单链表的方法存储消息列表Looper:查询MessageQueue是否有新消息,有则处理,无则等待ThreadLocal:用于Handler获取当前线程的…

TypeScript配置-- 1. 新手处理TS文件红色波浪线的几种方式

Typescript 规范化了JS的项目开发,但是对一些项目的一些新手来说,确实是不怎么优好,譬如我:将我之前珍藏的封装JS代码,拿进了配置了tsconfig.json的vue3项目,在vscode下,出现了满屏的红色 &…

UML四大关系

文章目录 引言UML的定义和作用UML四大关系的重要性和应用场景关联关系继承关系聚合关系组合关系 UML四大关系的进一步讨论UML四大关系的实际应用软件开发中的应用其他领域的应用 总结 引言 在软件开发中,统一建模语言(Unified Modeling Language&#x…

如何在Spring Boot应用中使用Nacos实现动态更新数据源

🌷🍁 博主猫头虎 带您 Go to New World.✨🍁 🦄 博客首页——猫头虎的博客🎐 🐳《面试题大全专栏》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺 &a…

STM32G0 定时器PWM DMA输出驱动WS2812配置 LL库

通过DMA方式输出PWM模拟LED数据信号 优点:不消耗CPU资源 缺点:占用内存较大 STM32CUBEMX配置 定时器配置 定时器通道:TIM3 CH2 分频:0 重装值:79,芯片主频64Mhz,因此PWM输出频率&#xff1a…