go-es模块统计日志中接口被刷数和ip访问来源
以下是使用go的web框架gin作为后端,展示的统计页面
背景
上面的数据来自elk日志统计。因为elk通过kibana进行展示,但是kibana有一定学习成本且不太能满足定制化的需求,所以考虑用编程的方式对数据进行处理 首先是接口统计,kibana的页面只会在 字段uri 的 top500 进行百分比统计,展示前5条数据,统计不够充分 其次是网关日志,ip来源的采集字段是通过x_forward_for,这记录了各级的代理来源ip。并不能直接对用户的ip进行数据聚合的统计 举例,这里面 “223.104.195.51,192.168.29.135” ,这种数据我需要拿到223.104.195.51,因为这才是用户的ip。所以需要进行编程的处理
环境
https://www.elastic.co/downloads/past-releases/elasticsearch-7-9-3
go 1.17 ,gin 1.6.3,go-elasticsearch 7.9.0
# go1.17下载地址
https://go.dev/dl/
# 模块下载
go env -w GOPROXY=https://goproxy.cn,direct
go mod init go-ops # 本项目的go mod 名字
go get github.com/elastic/go-elasticsearch/v7@v7.9.0
go get github.com/gin-gonic/gin@v1.6.3
# layui下载
http://layui.dotnetcms.cn/res/static/download/layui/layui-v2.6.8.zip?v=1
# layui框架代码
http://layui.dotnetcms.cn/web/demo/admin.html
# layui数据表格
http://layui.dotnetcms.cn/web/demo/table.html
# echarts下载(需魔法)
https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js
# echarts直方图
https://echarts.apache.org/handbook/zh/get-started/
# echarts 饼图
https://echarts.apache.org/handbook/zh/how-to/chart-types/pie/basic-pie/
# gin静态文件服务(导入js、css、图片用的)
https://learnku.com/docs/gin-gonic/1.7/examples-serving-static-files/11402
# gin模板引擎(前后端不分离,后端数据渲染前端)
https://learnku.com/docs/gin-gonic/1.7/examples-html-rendering/11363
# gin绑定Uri(动态获取二级路由)
https://learnku.com/docs/gin-gonic/1.7/examples-bind-uri/11391
go-elasticsearch 模块
顾名思义此模块作用是充当es的客户端往es索引中读取数据,其原理和kibana上的dev tools一样,都是对es的restful api调用
# 以下是go-elasticsearch 的增删查改文档
https://www.elastic.co/guide/en/elasticsearch/client/go-api/current/getting-started-go.html
eql
实现统计数据分析的核心就是eql(es查询语言),通过go-elasticsearch模块进行eql的发送,再接收es返回的回复体 以下是使用go-es发送eql后,es的回复体的struct源码 可以看到type Response struct 中,我们想要的json数据在Body中,但是注意Body的类型为 io.ReadCloser , 因此是需要用go的io模块进行获取json数据 这边解决读取问题的代码如下。该函数接收es响应体,并返回未序列化的byte切片
func getResponseBody ( result * esapi. Response, context * gin. Context) [ ] byte { var bodyBytes [ ] byte bodyBytes, err := io. ReadAll ( result. Body) if err != nil { panic ( err) } return bodyBytes
}
es客户端建立连接
https://www.elastic.co/guide/en/elasticsearch/client/go-api/current/connecting.html
这边给的是https连接 + 用户名密码认证 + 不受信任证书的解决方法
package esinitimport ( "github.com/elastic/go-elasticsearch/v7" "io/ioutil" "log"
) var EsClient * elasticsearch. Clientfunc init ( ) { EsClient = newEsClient ( )
} func newEsClient ( ) * elasticsearch. Client { cert, certErr := ioutil. ReadFile ( "esinit/es.crt" ) if certErr != nil { log. Println ( certErr) } EsClient, error := elasticsearch. NewClient ( elasticsearch. Config{ Username: "你的用户名" , Password: "你的密码" , Addresses: [ ] string { "https://es-cluster1:9200" , "https://es-cluster2:9200" , "https://es-cluster3:9200" , } , CACert: cert, } ) if error != nil { panic ( error ) } return EsClient
}
实现统计的三段eql
这里eql使用 fmt.Sprintf() 方法进行参数传递 第一段eql是统计PV 这里注意的是,我们在东八区,所以统计pv从16:00开始。这里根据@timestamp字段进行“aggs”聚合统计
func getPvResponse ( startYear int , startMonth int , startDay int , endYear, endMonth int , endDay int ) * esapi. Response { query := fmt. Sprintf ( `
{"query": {"bool": {"must": [{"range": {"@timestamp": {"gte": "%d-%02d-%02dT16:00:00","lte": "%d-%02d-%02dT16:00:00"}}}]}},"aggs": {"log_count": {"value_count": {"field": "@timestamp"}}}
}
` , startYear, startMonth, startDay, endYear, endMonth, endDay) result, _ := esinit. EsClient. Search ( esinit. EsClient. Search. WithIndex ( "k8s-istio-ingress*" ) , esinit. EsClient. Search. WithBody ( strings. NewReader ( query) ) , ) return result
}
第二段是对微服务(java)的接口(uri字段)的聚合统计 这里用的 sortUri 是gin的绑定uri功能(动态获取二级路由的名字) 这里返回前1天10000条es文档的uri字段数据
func getSortResponse ( context * gin. Context) * esapi. Response { if err := context. ShouldBindUri ( & sortUri) ; err != nil { context. JSON ( 400 , gin. H{ "msg" : err} ) } query := fmt. Sprintf ( `{"_source": ["uri"],"query": {"bool": {"filter": [{"range": {"@timestamp": {"gte": "now-1d/d","lte": "now/d"}}}]}},"size": 10000}` ) result, _ := esinit. EsClient. Search ( esinit. EsClient. Search. WithIndex ( sortUri. Name+ "*" ) , esinit. EsClient. Search. WithBody ( strings. NewReader ( query) ) , ) return result
}
第三段是对istio前一小时的ip请求统计,返回2000条记录
func getIstioDataResponse ( ) * esapi. Response { query := `
{"_source": ["x_forwarded_for","@timestamp","path","user_agent_a","response_code","method","upstream_cluster"],"query": {"bool": {"filter": [{"range": {"@timestamp": {"gte": "now-1h/h","lte": "now/d"}}}]}},"size": 2000
}
` result, _ := esinit. EsClient. Search ( esinit. EsClient. Search. WithIndex ( "k8s-istio-ingress*" ) , esinit. EsClient. Search. WithBody ( strings. NewReader ( query) ) , ) return result
}
上面的eql函数,会return 一个 []byte切片,可以进行 json.Unmarshal 或其他struct转json的模块进行处理,就能够得到数据。然后便可进行渲染