Logstash Filter Plugins
As data travels from source to store, Logstash filters parse each event, identify named fields to build structure, and transform them into a common format for more powerful analysis and business value.
Logstash transforms and parses data dynamically, regardless of format or complexity.
Common Filter plugins:
- Grok: derive structured fields from unstructured data
- Geoip: look up the geographic coordinates corresponding to an IP address
- Useragent: extract the operating system and device type from a request
- Together they simplify overall processing, independent of data source, format, or schema
Official links
https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
https://www.elastic.co/guide/en/logstash/7.6/filter-plugins.html
Grok Plugin
About Grok
Grok is a filter plugin that helps you describe the structure of your log formats. It ships with more than 200 grok patterns for abstractions such as IPv6 addresses, UNIX paths, and month names.
To match log lines against a format, production environments often need to parse unstructured data into structured JSON.
For example, given the following line:
2016-09-19T18:19:00 [8.8.8.8:prd] DEBUG this is an example log message
The Grok plugin builds on regular expressions: its built-in pattern aliases can represent and match the log line above, as follows:
%{TIMESTAMP_ISO8601:timestamp} \[%{IPV4:ip}:%{WORD:environment}\] %{LOGLEVEL:log_level} %{GREEDYDATA:message}
which produces the following structure:
{
  "timestamp": "2016-09-19T18:19:00",
  "ip": "8.8.8.8",
  "environment": "prd",
  "log_level": "DEBUG",
  "message": "this is an example log message"
}
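Under the hood a grok pattern compiles down to a regular expression with named captures. As a rough sketch (these are hand-written approximations, not the actual patterns Logstash ships), the pattern above is roughly equivalent to:

```python
import re

# Hand-rolled approximations of TIMESTAMP_ISO8601, IPV4, WORD,
# LOGLEVEL and GREEDYDATA, with grok's field names as group names.
pattern = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})\s+"
    r"\[(?P<ip>\d{1,3}(?:\.\d{1,3}){3}):(?P<environment>\w+)\]\s+"
    r"(?P<log_level>\w+)\s+(?P<message>.*)"
)

line = "2016-09-19T18:19:00 [8.8.8.8:prd] DEBUG this is an example log message"
event = pattern.match(line).groupdict()
print(event)  # same key/value pairs as the JSON above
```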
References
https://www.elastic.co/cn/blog/do-you-grok-grok
http://grokdebug.herokuapp.com/
http://grokdebug.herokuapp.com/discover?#
Example: Nginx access log
#cat /var/log/nginx/access.log
10.0.0.100 - - [03/Aug/2022:16:34:17 +0800] "GET / HTTP/1.1" 200 612 "-" "curl/7.68.0"
Example: use Kibana's Grok Debugger to generate the matching built-in grok pattern for an nginx log line automatically
58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] "GET /wpcontent/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1" 200 330 "http://www.wangxiaochun.com/?p=117" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" "-"
The generated pattern, which converts the line above into JSON:
%{COMBINEDAPACHELOG}
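%{COMBINEDAPACHELOG} itself expands to a large named-capture regex. The sketch below is a simplified Python approximation (illustrative only; the real pattern is more permissive) that shows the fields it extracts from the line above:

```python
import re

# Simplified stand-in for the COMBINEDAPACHELOG grok pattern.
COMBINED = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\S+) (?P<request>\S+) HTTP/(?P<httpversion>[^"]+)" '
    r'(?P<response>\d{3}) (?P<bytes>\d+|-) '
    r'"(?P<referrer>[^"]*)" "(?P<agent>[^"]*)"'
)

line = ('58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] '
        '"GET /wpcontent/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1" '
        '200 330 "http://www.wangxiaochun.com/?p=117" '
        '"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" "-"')

event = COMBINED.match(line).groupdict()
print(event["clientip"], event["response"], event["request"])
```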
Example: use a grok pattern to format Nginx logs as JSON
[root@logstash ~]#vim /etc/logstash/conf.d/http_grok_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # Parse the nginx log line into structured JSON fields
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }   # parse the message field with the built-in pattern
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_stdout.conf -r
[root@logstash ~]#curl -XPOST -d'58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] "GET /wpcontent/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1" 200 330 "http://www.wangxiaochun.com/?p=117" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" "-"' 10.0.0.180:6666
Example: convert the nginx access log to JSON directly
[root@ubuntu2004 ~]#cat /etc/logstash/conf.d/nginx_grok_stdout.conf
input {
  file {
    path => "/var/log/nginx/access.log"
    type => "nginx-accesslog"
    start_position => "beginning"
    stat_interval => "3"
  }
}
filter {
  # Parse the nginx log line into structured JSON fields
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Geoip Plugin
The geoip filter enriches events with geographic information derived from an IP address, such as latitude/longitude, country, and city name, to simplify geographic analysis.
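Conceptually the filter is a lookup keyed by IP. The sketch below is a toy stand-in: the real plugin queries a bundled MaxMind GeoLite2 database, while GEO_DB and geoip_enrich here are hypothetical names populated with values taken from the sample output later in this section, used only to show the shape of the enrichment added under the target field:

```python
# Hypothetical, hand-made lookup table (not the real GeoLite2 database).
GEO_DB = {
    "58.250.250.21": {
        "geo": {
            "city_name": "Shenzhen",
            "country_name": "China",
            "location": {"lat": 22.5559, "lon": 114.0577},
        },
    },
}

def geoip_enrich(event, source_ip_field="clientip", target="geoip"):
    """Attach geo data for event[source_ip_field] under event[target]."""
    ip = event.get(source_ip_field)
    hit = GEO_DB.get(ip)
    if hit:
        event[target] = dict(hit, ip=ip)
    return event

event = geoip_enrich({"clientip": "58.250.250.21"})
```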
Filebeat configuration example:
[root@kibana ~]#cat /etc/filebeat/logstash-filebeat.yml
filebeat.inputs:
- type: log
  enabled: true                  # enable this input
  paths:
    - /var/log/nginx/access.log  # log file to collect
  #json.keys_under_root: true    # default false: lines are treated as plain text and the whole log goes into the message field; true stores them as JSON
  #json.overwrite_keys: true     # true: keys defined in the JSON log replace the default message field (optional)
  tags: ["nginx-access"]
output.logstash:
  hosts: ["10.0.0.180:5044"]     # Logstash server address and port

[root@kibana ~]#cat /var/log/nginx/access.log
58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] "GET /wpcontent/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1" 200 330 "http://www.wangxiaochun.com/?p=117" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" "-"
Logstash configuration example:
[root@logstash ~]#vim /etc/logstash/conf.d/beats_geoip_stdout.conf
input {
  beats {
    port => 5044
    #codec => "json"
  }
}
filter {
  # Parse the nginx log line into structured JSON fields
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # Look up geo information based on the client IP extracted above
  geoip {
    #source => "clientip"            # 7.X: field holding the source IP
    source => "[source][address]"    # 8.X: the field name changed
    target => "geoip"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Sample output
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/beats_geoip_stdout.conf -r
{
  "user_agent" => { "original" => "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" },
  "message" => "58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] \"GET /wpcontent/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1\" 200 330 \"http://www.wangxiaochun.com/?p=117\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\" \"-\"",
  "geoip" => {
    "geo" => {
      "city_name" => "Shenzhen",
      "region_name" => "Guangdong",
      "continent_code" => "AS",
      "location" => { "lat" => 22.5559, "lon" => 114.0577 },
      "country_iso_code" => "CN",
      "region_iso_code" => "CN-GD",
      "country_name" => "China",
      "timezone" => "Asia/Shanghai"
    },
    "ip" => "58.250.250.21"
  },
  "input" => { "type" => "log" },
  "@timestamp" => 2025-01-03T08:14:38.824Z,
  "source" => { "address" => "58.250.250.21" },
  "@version" => "1",
  "url" => { "original" => "/wpcontent/plugins/akismet/_inc/form.js?ver=4.1.3" },
  "timestamp" => "14/Jul/2020:15:07:27 +0800",
  "http" => {
    "request" => { "method" => "GET", "referrer" => "http://www.wangxiaochun.com/?p=117" },
    "version" => "1.1",
    "response" => { "body" => { "bytes" => 330 }, "status_code" => 200 }
  },
  "tags" => [
    [0] "nginx-access",
    [1] "beats_input_codec_plain_applied"
  ],
  "event" => { "original" => "58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] \"GET /wpcontent/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1\" 200 330 \"http://www.wangxiaochun.com/?p=117\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\" \"-\"" },
  "host" => { "name" => "kibana" },
  "ecs" => { "version" => "8.0.0" },
  "log" => { "offset" => 623, "file" => { "path" => "/var/log/nginx/access.log" } },
  "agent" => {
    "name" => "kibana",
    "id" => "a3acb99e-b483-4367-a2df-535d8a39a0fa",
    "version" => "8.8.2",
    "ephemeral_id" => "5d8aad32-46e7-4500-8fa5-d18dd314f8d2",
    "type" => "filebeat"
  }
}
Date Plugin
The date filter parses a date string from a specified source field in the event and writes it to a target field.
It typically replaces @timestamp (which by default is the time the event was written into Logstash, not the time recorded in the log itself), or any other field you specify.
match #array; the source field name followed by the date format(s) to parse
target #string; the target field name, defaults to @timestamp
timezone #string; the time zone to use
Official documentation
https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html
Time zone format reference
http://joda-time.sourceforge.net/timezones.html
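What the date filter does to the nginx timestamp can be reproduced with the Python standard library, assuming the "dd/MMM/yyyy:HH:mm:ss Z" format used in the examples below. Note how the +0800 offset is normalized away when the value is stored in UTC, which is why @timestamp ends up as 07:07:27 in the sample output:

```python
from datetime import datetime, timezone

# Parse "dd/MMM/yyyy:HH:mm:ss Z" (strptime equivalent: %d/%b/%Y:%H:%M:%S %z)
ts = datetime.strptime("14/Jul/2020:15:07:27 +0800", "%d/%b/%Y:%H:%M:%S %z")

# Normalize to UTC, which is how @timestamp is stored internally.
utc = ts.astimezone(timezone.utc)
print(utc.isoformat())  # 2020-07-14T07:07:27+00:00
```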
Example: generate a new field access_time from the source field timestamp
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_date_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # Parse the nginx log line into structured JSON fields
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # Parse the date in the source field timestamp, e.g.: 14/Jul/2020:15:07:27 +0800
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    #target => "access_time"    # write the time into a new access_time field; the source field is kept
    target => "@timestamp"      # overwrite the existing @timestamp field
    timezone => "Asia/Shanghai"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Sample output
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_date_stdout.conf -r
{
  "@timestamp" => 2020-07-14T07:07:27.000Z,
  "message" => "58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] \"GET /wpcontent/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1\" 200 330 \"http://www.wangxiaochun.com/?p=117\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\" \"-\"",
  "url" => { "domain" => "10.0.0.180", "path" => "/", "original" => "/wpcontent/plugins/akismet/_inc/form.js?ver=4.1.3", "port" => 6666 },
  "event" => { "original" => "58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] \"GET /wpcontent/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1\" 200 330 \"http://www.wangxiaochun.com/?p=117\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\" \"-\"" },
  "user_agent" => {
    "original" => [
      [0] "curl/7.81.0",
      [1] "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"
    ]
  },
  "host" => { "ip" => "10.0.0.180" },
  "http" => {
    "version" => [
      [0] "HTTP/1.1",
      [1] "1.1"
    ],
    "method" => "POST",
    "request" => { "body" => { "bytes" => "274" }, "method" => "GET", "referrer" => "http://www.wangxiaochun.com/?p=117", "mime_type" => "application/x-www-form-urlencoded" },
    "response" => { "body" => { "bytes" => 330 }, "status_code" => 200 }
  },
  "source" => { "address" => "58.250.250.21" },
  "timestamp" => "14/Jul/2020:15:07:27 +0800",
  "@version" => "1"
}
Example: convert a UNIX timestamp into the specified format
date {
  match => ["timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss"]
  target => "@timestamp"
  timezone => "Asia/Shanghai"
}
Useragent Plugin
The useragent filter parses the user-agent field of a request into browser, device, operating system, and related details for later analysis.
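The real plugin matches against a large user-agent pattern database. The tiny parser below is a hypothetical stand-in (parse_ua is not a Logstash or library function) that only handles the iPad user-agent from the sample output, just to illustrate the kind of fields (device, os) the filter produces:

```python
import re

def parse_ua(ua):
    """Hypothetical minimal UA parser: handles only the iPad case."""
    device = "iPad" if "iPad" in ua else "Other"
    m = re.search(r"OS (\d+)_(\d+)", ua)  # e.g. "OS 16_6" -> 16.6
    os_version = f"{m.group(1)}.{m.group(2)}" if m else None
    return {"device": {"name": device},
            "os": {"name": "iOS" if device == "iPad" else "Other",
                   "version": os_version}}

ua = ("Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 "
      "(KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1")
info = parse_ua(ua)
```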
Example:
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_useragent_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # Parse the nginx log line into structured JSON fields
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # Parse the date, e.g.: 10/Dec/2020:10:40:10 +0800
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "@timestamp"      # overwrite the existing @timestamp field
    #target => "access_time"    # write the time into a new access_time field; the source field is kept
    timezone => "Asia/Shanghai"
  }
  # Extract and parse the user-agent
  useragent {
    #source => "agent"                   # 7.X: field to read the data from
    source => "message"                  # 8.X: field to read the data from
    #source => "[user_agent][original]"  # 8.X alternative
    target => "useragent"                # name of the new hash field holding os, device, etc.
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Sample output
[root@logstash]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_useragent_stdout.conf -r
{
  "user_agent" => {
    "original" => [
      [0] "curl/7.81.0",
      [1] "Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1"
    ]
  },
  "message" => "10.0.0.1 - - [03/Jan/2025:16:58:13 +0800] \"GET / HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1\"",
  "useragent" => {
    "name" => "Mobile Safari",
    "device" => { "name" => "iPad" },
    "version" => "16.6",
    "os" => { "name" => "iOS", "version" => "16.6", "full" => "iOS 16.6" }
  },
  "url" => { "domain" => "10.0.0.180", "path" => "/", "original" => "/", "port" => 6666 },
  "source" => { "address" => "10.0.0.1" },
  "http" => {
    "version" => [
      [0] "HTTP/1.1",
      [1] "1.1"
    ],
    "method" => "POST",
    "response" => { "status_code" => 304, "body" => { "bytes" => 0 } },
    "request" => { "method" => "GET", "mime_type" => "application/x-www-form-urlencoded", "body" => { "bytes" => "197" } }
  },
  "@version" => "1",
  "@timestamp" => 2025-01-03T08:58:13.000Z,
  "event" => { "original" => "10.0.0.1 - - [03/Jan/2025:16:58:13 +0800] \"GET / HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1\"" },
  "host" => { "ip" => "10.0.0.180" },
  "timestamp" => "03/Jan/2025:16:58:13 +0800"
}
Mutate Plugin
Official links:
https://www.elastic.co/guide/en/logstash/master/plugins-filters-mutate.html
https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-mutate.html
The mutate filter performs field operations such as type conversion, deletion, replacement, and updating, using the following options:
remove_field #delete fields
split #split a string, similar to extracting columns with awk
add_field #add fields
convert #type conversion; supported types: integer, integer_eu, float, float_eu, string, boolean
gsub #string substitution
rename #rename a field
lowercase #convert a string to lowercase
remove_field: delete fields
Example:
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_mutate_remove_field_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # Parse the nginx log line into structured JSON fields
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # Parse the date, e.g.: 10/Dec/2020:10:40:10 +0800
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "@timestamp"
    #target => "access_time"
    timezone => "Asia/Shanghai"
  }
  # mutate: delete the listed fields
  mutate {
    #remove_field => ["headers","message", "agent"]   # 7.X
    remove_field => ["timestamp","message", "http"]   # 8.X
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Sample output
[root@logstash]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_mutate_remove_field_stdout.conf -r
{
  "event" => { "original" => "10.0.0.1 - - [03/Jan/2025:16:58:13 +0800] \"GET / HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1\"" },
  "url" => { "domain" => "10.0.0.180", "path" => "/", "original" => "/", "port" => 6666 },
  "@timestamp" => 2025-01-03T08:58:13.000Z,
  "user_agent" => {
    "original" => [
      [0] "curl/7.81.0",
      [1] "Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1"
    ]
  },
  "host" => { "ip" => "10.0.0.180" },
  "source" => { "address" => "10.0.0.1" },
  "@version" => "1"
}
split: string splitting
mutate's split splits a string on a specified separator; the resulting pieces become the elements of a new array field.
Sample input: 1000|提交订单|2020-01-08 09:10:21
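In plain Python terms the operation is just str.split: the string field is replaced by a list, which later examples index as [message][0], [message][1], and so on:

```python
# The message field before the split is a single delimited string.
event = {"message": "1000|提交订单|2020-01-08 09:10:21"}

# After mutate's split, the field holds the list of pieces.
event["message"] = event["message"].split("|")
print(event["message"])  # ['1000', '提交订单', '2020-01-08 09:10:21']
```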
Example: split a string into columns
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_mutate_split_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # mutate split operation
  mutate {
    split => { "message" => "|" }   # split the message field on | into the elements of a message array
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Sample output
# start
[root@logstash]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_mutate_split_stdout.conf
{
  "message" => [
    [0] "1000",
    [1] "提交订单",
    [2] "2020-01-08 09:10:21"
  ],
  "event" => { "original" => "1000|提交订单|2020-01-08 09:10:21" },
  "user_agent" => { "original" => "curl/7.81.0" },
  "url" => { "domain" => "10.0.0.180", "path" => "/", "port" => 6666 },
  "@version" => "1",
  "host" => { "ip" => "10.0.0.180" },
  "@timestamp" => 2025-01-03T09:14:03.422624536Z,
  "http" => {
    "version" => "HTTP/1.1",
    "method" => "POST",
    "request" => { "mime_type" => "application/x-www-form-urlencoded", "body" => { "bytes" => "37" } }
  }
}
[root@logstash]#curl -XPOST -d '1000|提交订单|2020-01-08 09:10:21' 10.0.0.180:6666/
add_field: add fields
Add new fields derived from a source field; the source field is kept after the new fields are added.
Example:
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_mutate_add_field_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # mutate split operation
  mutate {
    # split on the separator
    split => { "message" => "|" }
    # Add fields; e.g. user_id takes element 0 of the message array
    add_field => {
      "user_id" => "%{[message][0]}"
      "action" => "%{[message][1]}"
      "time" => "%{[message][2]}"
    }
    # Add a field to use as the index name
    #add_field => {"[@metadata][target_index]" => "app-%{+YYYY.MM.dd}"}
    # Delete unneeded fields
    remove_field => ["headers","message"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Sample output
# start
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_mutate_add_field_stdout.conf
{
  "url" => { "domain" => "10.0.0.180", "path" => "/", "port" => 6666 },
  "user_id" => "1000",
  "@version" => "1",
  "http" => {
    "request" => { "body" => { "bytes" => "37" }, "mime_type" => "application/x-www-form-urlencoded" },
    "version" => "HTTP/1.1",
    "method" => "POST"
  },
  "user_agent" => { "original" => "curl/7.81.0" },
  "event" => { "original" => "1000|提交订单|2020-01-08 09:10:21" },
  "@timestamp" => 2025-01-03T09:21:45.406866933Z,
  "time" => "2020-01-08 09:10:21",
  "action" => "提交订单",
  "host" => { "ip" => "10.0.0.180" }
}
# Submit a log line with curl to produce the output above
[root@ubuntu2004 ~]#curl -XPOST -d '1000|提交订单|2020-01-08 09:10:21' 10.0.0.180:6666/
convert: type conversion
mutate's convert performs data type conversion; it supports integer, float, string, and other types.
Example:
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_mutate_convert_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # mutate split operation
  mutate {
    # split on the separator
    split => { "message" => "|" }
    # Add fields
    add_field => {
      "user_id" => "%{[message][0]}"
      "action" => "%{[message][1]}"
      "time" => "%{[message][2]}"
    }
    # Delete unneeded fields
    remove_field => ["headers","message"]
    # Convert the newly added fields
    convert => {
      "user_id" => "integer"
      "action" => "string"
      "time" => "string"
    }
    #convert => ["execute_time","float"]   # this array form is also supported
    #convert => ["time","string"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_mutate_convert_stdout.conf -r
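mutate's convert behaves like a per-field type cast. The helper below is a hypothetical sketch of that mapping (convert here is our own function, not a library call; the boolean rule mirrors the truthy strings the filter accepts, and the integer_eu/float_eu variants, which also accept European digit grouping, are omitted):

```python
def convert(value, target_type):
    """Cast value to the named Logstash convert type (sketch)."""
    casts = {
        "integer": int,
        "float": float,
        "string": str,
        # Strings like "true"/"t"/"yes"/"y"/"1" become True.
        "boolean": lambda v: str(v).lower() in ("true", "t", "yes", "y", "1"),
    }
    return casts[target_type](value)

print(convert("1000", "integer"), convert("true", "boolean"))  # 1000 True
```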
gsub: substitution
gsub performs string substitution.
filter {
  mutate {
    gsub => ["message", "\n", " "]   # replace newlines in the message field with spaces
  }
}
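Since gsub is a regex-based substitution, the same transformation in Python is a re.sub call:

```python
import re

# Collapse newlines in the message field to spaces, as gsub does above.
event = {"message": "line one\nline two\nline three"}
event["message"] = re.sub(r"\n", " ", event["message"])
print(event["message"])  # line one line two line three
```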
Conditionals
The filter block supports if conditionals.
Filebeat example:
#vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  tags: ["access"]
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]
output.logstash:
  hosts: ["10.0.0.104:5044","10.0.0.105:5044"]
  #loadbalance: true   # load balancing
  #worker: 2           # number of hosts * workers; enable multiple workers
Logstash configuration
#vim /etc/logstash/conf.d/filebeat_logstash_es.conf
input {
  beats {
    port => 5044
  }
}
filter {
  if "access" in [tags][0] {
    mutate {
      add_field => { "target_index" => "access-%{+YYYY.MM.dd}" }
    }
  } else if "error" in [tags][0] {
    mutate {
      add_field => { "target_index" => "error-%{+YYYY.MM.dd}" }
    }
  } else if "system" in [tags][0] {
    mutate {
      add_field => { "target_index" => "system-%{+YYYY.MM.dd}" }
    }
  }
}
output {
  elasticsearch {
    hosts => ["10.0.0.181:9200","10.0.0.182:9200","10.0.0.183:9200"]   # usually the data node addresses
    index => "%{[target_index]}"    # use the value of the target_index field as the index name
    template_overwrite => true      # overwrite the index template
  }
}
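The if/else-if chain above amounts to routing each event to an index name based on its first tag. A small Python sketch of the same decision (target_index is a hypothetical helper, not part of Logstash):

```python
def target_index(tags, day):
    """Pick an index name from the first tag, like the filter conditionals."""
    for prefix in ("access", "error", "system"):
        if prefix in tags[0]:  # substring test, as "x" in [tags][0] is in Logstash
            return f"{prefix}-{day}"
    return None  # no matching tag: no target_index field is added

print(target_index(["access"], "2025.01.03"))  # access-2025.01.03
```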
Example:
#vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
    project: test-access
    env: test
output.logstash:
  hosts: ["10.0.0.104:5044","10.0.0.105:5044"]

#vim /etc/logstash/conf.d/filebeat_logstash_es.conf
input {
  beats {
    port => 5044
  }
  file {
    path => "/tmp/wang.log"
    type => wanglog                # custom type, usable in conditionals
    start_position => "beginning"
    stat_interval => "3"
  }
}
output {
  if [fields][env] == "test" {
    elasticsearch {
      hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
      index => "test-nginx-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "wanglog" {
    stdout {
      codec => rubydebug
    }
  }
}