zookeeper+kafka分布式消息队列集群的部署
紧接上期,在ELFK的基础上,添加kafka做数据缓冲
附kafka消息队列
nginx服务器配置filebeat收集日志:192.168.116.40,修改配置将采集到的日志转发给kafka;
kafka集群:192.168.116.10,192.168.116.20,192.168.116.30(生产和消费端口9092);
logstash+kibana:192.168.116.50,修改配置从kafka中消费日志,并输出到kibana前端展示;
elasticsearch群集:192.168.116.60,192.168.116.70,对格式化后的数据进行索引和存储。
1.修改filebeat配置文件filebeat.yml收集日志转发(生产)给kafka
2.修改logstash配置从kafka中消费日志,并输出到kibana前端展示
input {kafka {bootstrap_servers => "192.168.116.10:9092,192.168.116.20:9092,192.168.116.10:9092"topics => "nginx_log"type => "nginx_log"codec => "json"auto_offset_reset => "latest"decorate_events => true}
}filter {grok {match => ["message", "(?<remote_addr>%{IPV4}|%{IPV6})[\s-]+\[(?<logTime>.+)\] \"(?<http_method>.+) (?<url_path>/.*) (?<http_ver>.+)\" (?<rev_code>\d+) \d+ \".*\" \"(?<User_agent>.+)\" \".*\""]}mutate {replace => { "host" => "nginx_server" }}date {match => ["logTime","dd/MMM/yyyy:HH:mm:ss Z"]timezone => "Asia/Shanghai"}
}output {if [source] == "/var/log/nginx/access.log" {elasticsearch {hosts => ["192.168.116.60:9200","192.168.116.70:9200"]index => "nginx_access-%{+YYYY.MM.dd}"}}if [source] == "/var/log/nginx/error.log" {elasticsearch {hosts => ["192.168.116.60:9200","192.168.116.70:9200"]index => "nginx_error-%{+YYYY.MM.dd}"}}stdout {codec => rubydebug}
}
开启logstash,此时访问web测试页面,就可以在kibana对日志收集分析了
访问网站
在kibana中收集和分析