基于Kafka的日志采集

目录

前言

架构图

资源列表

基础环境

关闭防护墙

关闭内核安全机制

修改主机名

添加hosts映射

一、部署elasticsearch

修改limit限制

部署elasticsearch

修改配置文件

启动

二、部署filebeat

部署filebeat

添加配置文件

启动

三、部署kibana

部署kibana

修改配置文件

启动

四、部署Kafka

安装java

安装kafka

配置环境变量

创建数据存储目录和日志存储目录

修改zk配置文件

修改Kafka配置文件

启动zk

启动Kafka

测试

五、部署logstash

部署logstash

添加配置文件

启动


前言

        当日志量变得非常大时,传统的日志收集平台可能会遇到性能瓶颈、单点故障或扩展性问题。在这种情况下,引入消息队列(如Kafka)可以显著增强日志收集系统的健壮性、可扩展性和实时性。

以下是当在日志收集平台中加入Kafka时,可以带来的优势和改进:

  1. 缓冲和异步处理
    Kafka作为一个消息队列,可以充当Filebeat(或其他日志收集器)和Logstash(或其他日志处理组件)之间的缓冲层。Filebeat可以将日志数据异步地发送到Kafka,而不需要等待Logstash的即时响应。这样,即使Logstash暂时无法处理数据,Kafka也可以暂时存储数据,直到Logstash恢复处理能力。

  2. 水平扩展
    随着日志量的增长,Kafka可以通过添加更多的节点(brokers)来实现水平扩展。这种扩展方式使得Kafka能够处理更多的并发写入和读取操作,而不会遇到单点故障或性能瓶颈。此外,Kafka的分布式架构还允许数据在多个节点之间进行复制,以提高数据的可靠性和容错性。

  3. 实时数据处理
    Kafka支持实时数据流处理,使得日志数据可以立即被消费和处理。这意味着一旦日志数据被写入Kafka,就可以立即被Logstash(或其他流处理工具)读取和处理,以满足实时分析、监控和告警的需求。

  4. 数据持久化
    Kafka将数据持久化到磁盘上,以确保即使在系统崩溃或重启的情况下,数据也不会丢失。这种持久化机制使得Kafka成为了一个可靠的数据传输和存储平台,特别适用于对日志数据进行长期存储和分析的场景。

  5. 多消费者支持
    Kafka允许多个消费者(如Logstash、其他数据分析工具或应用)从同一个主题(topic)中消费数据。这意味着您可以同时运行多个消费者来处理和分析日志数据,以满足不同的业务需求和数据使用场景。

  6. 可定制性和灵活性
    Kafka提供了丰富的API和工具,使得您可以轻松地定制和扩展日志收集系统。例如,您可以编写自定义的Kafka生产者来收集特定格式的日志数据,或者编写自定义的Kafka消费者来处理和分析日志数据。

  7. 与其他系统的集成
    Kafka是一个广泛使用的消息队列系统,它支持与其他各种系统和工具进行集成。这意味着您可以将Kafka轻松地集成到现有的日志收集、处理、存储和分析系统中,以构建一个更加健壮、可扩展和灵活的日志收集平台。

        综上所述,当日志量变得非常大时,在日志收集平台中加入Kafka可以显著提高系统的性能、可靠性和可扩展性。通过利用Kafka的缓冲、异步处理、水平扩展、实时数据处理、数据持久化、多消费者支持、可定制性和与其他系统的集成能力,您可以构建一个更加健壮、高效和灵活的日志收集系统。

        有需要本次实验软件包的评论区可以找我要,无偿提供。

架构图

资源列表

操作系统配置主机名IP
CentOS7.3.16112C4Ges01192.168.207.131
CentOS7.3.16112C4Gkibana192.168.207.165
CentOS7.3.16112C4Gfilebeat192.168.207.166
CentOS7.3.16112C4Gkafka192.168.207.167
CentOS7.3.16112C4Glogstash192.168.207.168

基础环境

关闭防护墙

systemctl stop firewalld
systemctl disable firewalld

关闭内核安全机制

sed -i "s/.*SELINUX=.*/SELINUX=disabled/g" /etc/selinux/config
reboot

修改主机名

hostnamectl set-hostname es01
hostnamectl set-hostname kibana
hostnamectl set-hostname filebeat
hostnamectl set-hostname kafka
hostnamectl set-hostname logstash

添加hosts映射

cat >> /etc/hosts << EOF
192.168.207.131 es01
192.168.207.165 kibana
192.168.207.166 filebeat
192.168.207.167 kafka
192.168.207.168 logstash
EOF

一、部署elasticsearch

修改limit限制

cat > /etc/security/limits.d/es.conf << EOF
* soft nproc 655360
* hard nproc 655360
* soft nofile 655360
* hard nofile 655360
EOF
​
cat >> /etc/sysctl.conf << EOF
vm.max_map_count=655360
EOF
sysctl -p

部署elasticsearch

mkdir -p /data/elasticsearch
tar zxvf elasticsearch-7.14.0-linux-x86_64.tar.gz -C /data/elasticsearch

修改配置文件

mkdir /data/elasticsearch/{data,logs}[root@es01 elasticsearch-7.14.0]# grep -v "^#" /data/elasticsearch/elasticsearch-7.14.0/config/elasticsearch.yml
cluster.name: my-application
node.name: es01
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["es01"]

启动

useradd es 
chown -R es:es /data/
su - es
/data/elasticsearch/elasticsearch-7.14.0/bin/elasticsearch -d

二、部署filebeat

部署filebeat

mkdir -p /data/filebeat
tar zxvf filebeat-7.14.0-linux-x86_64.tar.gz -C /data/filebeat/

添加配置文件

这里提供了两份filebeat配置文件的参考

[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/messages         ###要监控的日志文件
setup.template.settings:index.number_of_shards: 3
output.kafka:#version:0.10.2             ### 根据不同 CKafka 实例开源版本配置hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口topic: 'topic_test1'       ###topic实例名partition.round_robin:reachable_only: falserequired_acks: 1compression: nonemax_message_bytes: 10000000[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/httpd/access_log         ###要监控的日志文件fields:kafka_topic: httpd_access
- type: logenabled: truepaths:- /var/log/httpd/error_log         ###要监控的日志文件fields:kafka_topic: httpd_error
setup.template.settings:index.number_of_shards: 3
output.kafka:#version:0.10.2             ### 根据不同 CKafka 实例开源版本配置hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口topic: '%{[fields.kafka_topic]}'       ###topic实例名partition.round_robin:reachable_only: falserequired_acks: 1compression: nonemax_message_bytes: 10000000

启动

/data/filebeat/filebeat-7.14.0-linux-x86_64/filebeat -e -c filebeat.yml

三、部署kibana

部署kibana

mkdir -p /data/kibana
tar zxvf kibana-7.14.0-linux-x86_64.tar.gz -C /data/kibana/

修改配置文件

grep -v "^#" /data/kibana/kibana-7.14.0-linux-x86_64/config/kibana.yml  | grep -v "^$"
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.207.131:9200"]
kibana.index: ".kibana"

启动

useradd kibana
chown -R kibana:kibana /data 
su - kibana
/data/kibana/kibana-7.14.0-linux-x86_64/bin/kibana

四、部署Kafka

安装java

# 安装java环境
yum -y install java-1.8.0-openjdk

安装kafka

tar zxvf kafka_2.12-3.0.0.tgz
mv kafka_2.12-3.0.0 /usr/local/kafka

配置环境变量

# 配置环境变量
cat > /etc/profile.d/zookeeper.sh << 'EOF'
export ZOOKEEPER_HOME=/usr/local/kafka
export PATH=$ZOOKEEPER_HOME/bin:$PATH
EOFcat > /etc/profile.d/kafka.sh << 'EOF'
export KAFKA_HOME=/usr/local/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOFsource /etc/profile

创建数据存储目录和日志存储目录

mkdir -p /usr/local/kafka/zookeeper
mkdir -p /usr/local/kafka/log/zookeeper
mkdir -p /usr/local/kafka/log/kafka# 创建zk需要的myid文件
echo 0 > /usr/local/kafka/zookeeper/myid

修改zk配置文件

# 注意Kafka安装目录下的config目录里
server.properties             #是Kafka的配置文件
zookeeper.properties          #是zookeeper的配置文件
cat >> /usr/local/kafka/config/zookeeper.properties << EOF
dataLogDir=/usr/local/kafka/log/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.0=192.168.207.167:2888:3888
EOFsed -i "s/dataDir\=\/tmp\/zookeeper/dataDir\=\/usr\/local\/kafka\/zookeeper/g" /usr/local/kafka/config/zookeeper.properties

修改Kafka配置文件

# /usr/local/kafka/config/server.properties修改listeners=PLAINTEXT://192.168.207.167:9092
advertised.listeners=PLAINTEXT://192.168.207.167:9092
log.dirs=/usr/local/kafka/log/kafka
delete.topic.enable=true
zookeeper.connect=192.168.207.167:2181

启动zk

/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties

启动Kafka

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

测试

# 创建一个topic
[root@kafka kafka]# bin/kafka-topics.sh --create --bootstrap-server 192.168.207.167:9092 --replication-factor 1   --partitions 1 --topic Hello-Kafka
Created topic Hello-Kafka.# 往topic里面输入消息
[root@kafka kafka]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.207.167:9092 --topic Hello-Kafka# 从topic里面消费消息
[root@kafka ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka --from-beginning# 查看topic列表
[root@kafka kafka]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.207.167:9092 --list
Hello-Kafka# 删除topic
[root@kafka kafka]# bin/kafka-topics.sh --delete --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka

五、部署logstash

部署logstash

mkdir -p /data/logstash
tar zxvf logstash-7.14.0-linux-x86_64.tar.gz -C /data/logstash/

添加配置文件

mkdir /data/logstash/logstash-7.14.0/conf.dcat > /data/logstash/logstash-7.14.0/conf.d/system.conf << 'EOF'
input { kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"topic_test1" type =>"topic_test1"codec =>"json" } 
}
output { if [type] == "topic_test1" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"kafka-system-%{+YYYY.MM.dd}" } }
}EOF
cat > /data/logstash/logstash-7.14.0/conf.d/httpd.conf << 'EOF'
input { kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"httpd_access" type =>"httpd_access"codec =>"json" } kafka{ bootstrap_servers =>"192.168.207.167:9092" topics =>"httpd_error" type =>"httpd_error"codec =>"json" }
}
output { if [type] == "httpd_access" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"httpd-access-%{+YYYY.MM.dd}" } }if [type] == "httpd_error" {elasticsearch { hosts => ["192.168.207.131:9200"] index =>"httpd-error-%{+YYYY.MM.dd}" } }
}EOF

启动

/data/logstash/logstash-7.14.0/bin/logstash -f /data/logstash/logstash-7.14.0/conf.d/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/331984.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Google Find My Device:科技守护,安心无忧

在数字化的时代&#xff0c;我们的生活与各种智能设备紧密相连。而 Google Find My Device 便是一款为我们提供安心保障的实用工具。 一、Find My Decice Netword的定义 谷歌的Find My Device Netword旨在通过利用Android设备的众包网络的力量&#xff0c;帮助用户安全的定位所…

深入编程逻辑:从分支到循环的奥秘

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、编程逻辑的基石&#xff1a;分支与循环 分支逻辑详解 代码案例&#xff1a;判断整数是…

从 0 开始本地部署大语言模型

1、准备 ● Ollama&#xff1a;ollama.com ● Docker&#xff1a;https://docs.openwebui.com/ 2、下载 Ollama 进入 Ollama 官网&#xff0c;点击 Download 。 下载完成后&#xff0c;双击安装&#xff0c;什么都不需要勾选&#xff0c;直接下一步即可。安装完成&#xf…

【Qt】Qt组件设置背景图片

1. 方法1&#xff08;paintEvent方式&#xff09; 使用paintEvent()实现 1. .h文件中添加虚函数 protected:void paintEvent(QPaintEvent *event) override;添加虚函数方法&#xff1a; 选中父类&#xff0c;点击鼠标右键点击重构点击 Insert Virtual Funtion of Base Class…

【CCIE | 网络模拟器】部署 EVE-NG

目录 1. 环境准备2. 下载 EVE-NG 镜像3. 安装 EVE-NG 虚拟机3.1 创建 eve-ng 虚拟机3.2 选择存储3.3 定义虚拟机计算资源&#xff08;1&#xff09;开启CPU虚拟化功能&#xff08;2&#xff09;精简置备磁盘 3.4 检查虚拟机设置 4. 安装系统4.1 选择系统语言4.2 选择系统键盘类…

专业渗透测试 Phpsploit-Framework(PSF)框架软件小白入门教程(十二)

本系列课程&#xff0c;将重点讲解Phpsploit-Framework框架软件的基础使用&#xff01; 本文章仅提供学习&#xff0c;切勿将其用于不法手段&#xff01; 接上一篇文章内容&#xff0c;讲述如何进行Phpsploit-Framework软件的基础使用和二次开发。 我们&#xff0c;继续讲一…

爽!AI手绘变插画,接单赚爆了!

我最近发现一款名叫Hyper-SD15-Scribble的AI项目&#xff0c;可以实现一键手绘变插画的功能&#xff0c;而且它搭载了字节出品的超快速生成图片的AI大模型Hyper-SD15&#xff0c;可以实现几乎实时生成图片&#xff0c;有了它&#xff0c;拿去接一些手绘商单分分钟出图&#xff…

数据结构(四)串

2024年5月26日一稿(王道P127) 定义和实现

Ant Design Vue中 a-table 嵌套子表格

需求&#xff1a;在父表格中嵌套子表格&#xff0c;当点击展开某一行时&#xff0c;有展开的关闭当前展开行。使用a-table中的expandedRowKeys 属性和expand 方法。链接&#xff1a;Ant Design Vue 一、属性说明&#xff1a; expandedRowKeys&#xff1a;这个主要是控制展开某行…

modbus开源库libmodbus的C语言使用记录(实现简单的modbus主机/丛机程序,解决libmodbus库安装出现的问题)

libmodbus简介 libmodbus 是一个开源的、跨平台的C库,用于实现Modbus通讯协议。它支持Modbus RTU(RS-232/485)和Modbus TCP协议,可以使开发者方便地在项目中集成Modbus通讯功能。libmodbus的设计目标是简单、灵活和高效,适用于各种大小的嵌入式和桌面应用。 编译运行测试…

树莓派学习笔记——树莓派的三种GPIO编码方式

1、板载编码&#xff08;Board pin numbering&#xff09;: 板载编码是树莓派上的一种GPIO引脚编号方式&#xff0c;它指的是按照引脚在树莓派主板上的物理位置来编号。这种方式对于初学者来说可能比较直观&#xff0c;因为它允许你直接根据引脚在板上的位置来编程。 2、BCM编…

C++第十九弹---string模拟实现(下)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】 目录 1、修改操作 2、迭代器操作 3、字符串操作 4、非成员函数重载操作 总结 1、修改操作 1、string& operator (const char* s); //尾部插入…

Vue移动端登录页面

使用的是vant组件&#xff0c;引用和使用组件请去官网学习&#xff0c;链接↓vant组件官网 <div class"center"><!-- 背景图片 --><div class"background"><imgsrc"/assets/background.jpg"width"100%"heigh…

2024年5月大语言模型论文推荐:模型优化、缩放到推理、基准测试和增强性能

前一篇文章总结了关于计算机视觉方面的论文&#xff0c;这篇文章将要总结了2024年5月发表的一些最重要的大语言模型的论文。这些论文涵盖了塑造下一代语言模型的各种主题&#xff0c;从模型优化和缩放到推理、基准测试和增强性能。 大型语言模型(llm)发展迅速&#xff0c;跟上…

MySQL 8.4.0 LTS 变更解析:I_S 表、权限、关键字和客户端

↑ 关注“少安事务所”公众号&#xff0c;欢迎⭐收藏&#xff0c;不错过精彩内容~ MySQL 8.4.0 LTS 已经发布 &#xff0c;作为发版模型变更后的第一个长期支持版本&#xff0c;注定要承担未来生产环境的重任&#xff0c;那么这个版本都有哪些新特性、变更&#xff0c;接下来少…

Spark-RDD-常用算子(方法)详解

Spark概述 Spark-RDD概述 Spark RDD 提供了丰富的方法来对数据进行转换和操作。 对 RDD&#xff08;Resilient Distributed Dataset&#xff09;的操作可以分为两大类&#xff1a;转换算子&#xff08;Transformations&#xff09;和行动算子&#xff08;Actions&#xff09;…

在aspNetCore中 使用System.Text.Json的定制功能, 将定制化的json返回给前端

C# 默认大写, 而大部分的前端默认小写, 这时候可以如此配置: builder.Services.AddControllers().AddJsonOptions((opt) > {opt.JsonSerializerOptions.PropertyNamingPolicy System.Text.Json.JsonNamingPolicy.CamelCase;opt.JsonSerializerOptions.WriteIndented true…

lspci 显示当前设备的PCI总线信息

lspci 显示当前设备的PCI总线信息 lspci 显示当前设备的PCI总线信息显示当前主机的所有PCI总线信息&#xff1a;以数字方式显示PCI厂商和设备代码同时显示数字方式还有设备代码信息以树状结构显示PCI设备的层次关系&#xff1a;更多信息 lspci 显示当前设备的PCI总线信息 lspc…

Python中别再用 ‘+‘ 拼接字符串了!

大家好&#xff0c;在 Python 编程中&#xff0c;我们常常需要对字符串进行拼接。你可能会自然地想到用 操作符将字符串连接起来&#xff0c;毕竟这看起来简单明了。 在 Python 中&#xff0c;字符串是不可变的数据类型&#xff0c;这意味着一旦字符串被创建&#xff0c;它就…

2024年【高压电工】新版试题及高压电工找解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 高压电工新版试题是安全生产模拟考试一点通生成的&#xff0c;高压电工证模拟考试题库是根据高压电工最新版教材汇编出高压电工仿真模拟考试。2024年【高压电工】新版试题及高压电工找解析 1、【单选题】 110KV及以下…