ELK安装、部署、调试(六) logstash的安装和配置

1.介绍

Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。

管道(Logstash Pipeline)是Logstash中独立的运行单元,每个管道都包含两个必须的元素输入(input)和输出(output),和一个可选的元素过滤器(filter),事件处理管道负责协调它们的执行。 输入和输出支持编解码器,使您可以在数据进入或退出管道时对其进行编码或解码,而不必使用单独的过滤器。如:json、multiline等

inputs(输入阶段):

会生成事件。包括:file、kafka、beats等

filters(过滤器阶段):

可以将过滤器和条件语句结合使用对事件进行处理。包括:grok、mutate等

outputs(输出阶段):

将事件数据发送到特定的目的地,完成了所以输出处理,改事件就完成了执行。如:elasticsearch、file等

Codecs(解码器):

基本上是流过滤器,作为输入和输出的一部分进行操作,可以轻松地将消息的传输与序列化过程分开。

logstash为开源的数据处理pipeline管道,能接受多个源的数据,然后进行转换,再发送到指定目的地,logstash通过插件的机
制实现各种功能插件的下载地址为https://github.com/logstash-plugins
logstash 主要用于接收数据,解析过滤并转化,输出数据三个部分
对应的插件为input插件,filter插件,output插件

ingpu插件介绍,主要接收数据,支持多少数据源。
支持file : 读取一个文件,类似于tail命令,一行一行的实施读取。
支持syslog:监听系统的514端口的syslog messages,使用RFC3164格式进行解析
redis: 可以从Redis服务器读取数据,此时redis类似于一个消息缓存组件
kafka: 从kafka集群读取数据,kafka和logstash架构一般用于数据量较大的场景,kafka用于数据的换从和存储
filebeat: filebeat为一个文本的日志收集器,占系统资源小


filter插件介绍,主要进行数据的过滤,解析,格式化
grok:是logstash最重要的插件,可以解析任意数据,支持正则表达式,并提供很多内置的规则及模板
mutate 提供丰富的技术类型数据处理能力。包括类型转换,字符串处理,字段处理等
date,转化日志记录中的时间字符串格式
Geoip:更具ip地址提供对应的地域信息。包括国,省,经纬度等,对于可视化地图和区域统计非常有用。

output插件,输入日志到目的地
elasticsearch :发动到es中
file:发动数据到文件中
redis :发送日志到redis
kafka:发送日志到kafka

2.下载 

https://www.elastic.co/cn/downloads/logstash
百度云elk上有

使用向导的连接
https://www.elastic.co/guide/en/logstash/7.9/installing-logstash.html#package-repositories

3.安装

logstashtar  zxvf logstash-7.9.3.tar.gz -C /usr/local/cd /usr/local/ mv logstash-7.9.3 logstashlogstash/bin/logstash为启动文件

4.启动

cd /usr/local/logstash/bin
./logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'

后台运行

nohup logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}' &


input 使用input插件
stdin{} 采用标准的终端输入,如键盘输入

output 使用output插件
stdout 为标准输出
codec 插件,定义输出格式
rubydebug 一般输出为jison格式的测试

5.测试

5.1 测试1:采用标准的输入和输出

root@localhost bin]# ./logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'

Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-08-29T10:22:35,442][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.9.3",
"jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 25.242-b08 on 1.8.0_242
-b08 +indy +jit [linux-x86_64]"}
[2023-08-29T10:22:36,612][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because
modules or command line options are specified
[2023-08-29T10:22:40,097][INFO ][org.reflections.Reflections] Reflections took 77 ms to scan 1 urls, producing
22 keys and 45 values
[2023-08-29T10:22:42,937][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main",
"pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500,
"pipeline.sources"=>["config string"], :thread=>"#<Thread:0xdd19934 run>"}
[2023-08-29T10:22:44,380][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time
{"seconds"=>1.41}
[2023-08-29T10:22:44,466][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2023-08-29T10:22:44,590][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>
[:main], :non_running_pipelines=>[]}
[2023-08-29T10:22:45,030][INFO ][logstash.agent           ] Successfully started Logstash API endpoint
{:port=>9600}
hello             #我用键盘输入的,下面的信息是logstash的输出。
{"message" => "hello","@version" => "1","host" => "localhost.localdomain","@timestamp" => 2023-08-29T02:22:53.965Z
}


5.2 测试2:使用配置文件 +标准输入输出

参见logstash-1.conf
内容如下

[root@localhost logstash]# cat logstash-1.conf
input { stdin { }
}output {stdout { codec => rubydebug }
}


[root@localhost logstash]#
执行
 ./logstash -f ../logstash-1.conf

[root@localhost bin]# ./logstash -f ../logstash-1.conf
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-08-29T10:30:50,169][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.9.3",
"jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 25.242-b08 on 1.8.0_242
-b08 +indy +jit [linux-x86_64]"}
[2023-08-29T10:30:51,542][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because
modules or command line options are specified
[2023-08-29T10:30:55,077][INFO ][org.reflections.Reflections] Reflections took 82 ms to scan 1 urls, producing
22 keys and 45 values
[2023-08-29T10:30:58,205][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main",
"pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500,
"pipeline.sources"=>["/usr/local/logstash/logstash-1.conf"], :thread=>"#<Thread:0x7b58200e run>"}
[2023-08-29T10:30:59,713][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time
{"seconds"=>1.48}
[2023-08-29T10:30:59,829][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2023-08-29T10:31:00,060][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>
[:main], :non_running_pipelines=>[]}
[2023-08-29T10:31:00,664][INFO ][logstash.agent           ] Successfully started Logstash API endpoint
{:port=>9600}

5.3 测试三:配置文件+file输入 +标准的屏幕输出

vi logstash-2.conf

input {file {path => "/var/log/messages"}
}
output {stdout {codec=>rubydebug}
}

[root@localhost logstash]# vi logstash-2.conf

[root@localhost logstash]# ./bin/logstash -f logstash-2.conf
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-08-29T10:36:25,171][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.9.3",
"jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 25.242-b08 on 1.8.0_242
-b08 +indy +jit [linux-x86_64]"}[2023-08-29T10:36:37,490][INFO ][logstash.agent           ] Successfully started Logstash API endpoint
{:port=>9600}
{"@timestamp" => 2023-08-29T02:37:14.162Z,"path" => "/var/log/messages","host" => "localhost.localdomain","message" => "Aug 29 10:37:13 localhost systemd-logind: Removed session 992.","@version" => "1"
}
{"@timestamp" => 2023-08-29T02:37:14.231Z,"path" => "/var/log/messages","host" => "localhost.localdomain","message" => "Aug 29 10:37:13 localhost systemd-logind: Removed session 993.","@version" => "1"
}

5.4 测试4:配置文件+文件输入+kafka输出

[root@localhost logstash]# cat logstash-3.conf

input {file {path => "/var/log/messages"}
}
output {kafka {bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"topic_id => "osmessages"}
}
[root@localhost logstash]# ./bin/logstash -f logstash-3.conf

到10.10.10.71上用kafka消费

./kafka-console-consumer.sh  --bootstrap-server 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic
osmessages

有信息输出,

2023-08-29T02:49:42.383Z localhost.localdomain Aug 29 10:49:41 localhost systemd-logind: New session 996 of user
root.
2023-08-29T02:49:42.300Z localhost.localdomain Aug 29 10:49:41 localhost systemd-logind: New session 997 of user
root.
2023-08-29T02:49:42.380Z localhost.localdomain Aug 29 10:49:41 localhost systemd: Started Session 997 of user
root.
2023-08-29T02:49:42.384Z localhost.localdomain Aug 29 10:49:41 localhost systemd: Started Session 996 of user
root.
2023-08-29T02:49:43.465Z localhost.localdomain Aug 29 10:49:42 localhost dbus[745]: [system] Activating service
name='org.freedesktop.problems' (using servicehelper)
2023-08-29T02:49:43.467Z localhost.localdomain Aug 29 10:49:42 localhost dbus[745]: [system] Successfully
activated service 'org.freedesktop.problems'
2023-08-29T02:50:01.531Z localhost.localdomain Aug 29 10:50:01 localhost systemd: Started Session 998 of user
root.


5.5 测试5:配置文件+filebeat端口输入+标准输出

服务器产生日志(filebeat)---》logstash服务器

[root@localhost logstash]# cat logstash-4.conf
input {beats {port => 5044}
}
output {stdout {codec => rubydebug}
}
[root@localhost logstash]# ./bin/logstash -f logstash-4.conf

启动后会在本机启动一个5044端口,此段欧在配置文件中可
以任意配置。不要和系统已启动的端口冲突即可

[root@localhost logstash]# netstat -anp | grep 5044
tcp6       0      0 :::5044                 :::*                    LISTEN      10545/java
[root@localhost logstash]#

配合测试我们在filebeat服务器上修改配置文件

# ------------------------------ Logstash Output -------------------------------
output.logstash:hosts: ["10.10.10.74:5044"]      

   

##filebeat使用logstash作为输出,

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications 
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/ce

修改filebeat配置后,重启filebeat

nohup /usr/local/filebeat/filebeat -e -c /usr/local/filebeat/filebeat-to-logstash.yml &

经过测试在logstash下看到了filebeat的日志内容。


5.6测试6   配置文件+filebeat端口输入+输出到kafka

场景
服务器产生日志(filebeat)--->  logstash服务器---->kafka服务器

[root@localhost logstash]# cat logstash-5.conf
input {beats {port => 5044}
}
output {kafka {bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"topic_id => "osmessages"}
}

filebeat服务器:
filebeat.yml配置同5

# ------------------------------ Logstash Output -------------------------------
output.logstash:hosts: ["10.10.10.74:5045"]   


logstash服务器:
配置

[root@localhost logstash]# cat logstash-5.conf
input {beats {port => 5044}
}
output {kafka {
#  codec ==> json                 #打开注释将会使用json格式传输,使用json感觉更乱bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"topic_id => "osmessages"}
}
./bin/logstash -f logstash-5.conf

kafka服务器:

./kafka-console-consumer.sh  --bootstrap-server 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic
osmessages


消费了一下日志,可以看到filebeat服务器同步过来的信息。


5.7 测试7 配置文件+kafka读取当输入+输出到es


服务器产生日志(filebeat)--->  kafka服务器__抽取数据___> logstash服务器---->ESlogstash的配置
[root@localhost logstash]# cat f-kafka-logs-es.conf

input {kafka {bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"topics => ["osmessages"]}
}
output {elasticsearch {hosts => ["10.10.10.65:9200,10.10.10.66:9200,10.10.10.67:9200"]index => "osmessageslog-%{+YYYY-MM-dd}"}
}


./bin/logstash -f f-kafka-logs-es.conf
nohup /usr/local/logstash/bin logstash -f /usr/local/logstash/f-kafka-logs-es.conf


filebeat.yml配置

# ------------------------------ KAFKA Output -------------------------------
output.kafka:eanbled: truehosts: ["10.10.10.71:9092","10.10.10.72:9092","10.10.10.73:9092"]version: "2.0.1"topic: '%{[fields][log_topic]}'partition.round_robin:reachable_only: trueworker: 2required_acks: 1compression: gzipmax_message_bytes: 10000000


 nohup /usr/local/filebeat/filebeat -e -c /usr/local/filebeat/filebeat.yml &
完成到kafka的输出

完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/113502.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023年高教社杯 国赛数学建模思路 - 案例:感知机原理剖析及实现

文章目录 1 感知机的直观理解2 感知机的数学角度3 代码实现 4 建模资料 # 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 感知机的直观理解 感知机应该属于机器学习算法中最简单的一种算法&#xff0c;其…

LeetCode239.滑动窗口最大值

看到这道题我就有印象&#xff0c; 我在剑指offer里面做过这道题&#xff0c;我记得当时用的是优先队列&#xff0c;然后我脑子里一下子就有了想法&#xff0c;拿优先队列作为窗口&#xff0c;每往右移动一步&#xff0c;把左边的数remove掉&#xff0c;把右边的数add进来&…

Mysql-索引查询相关

一、单表查询 1.1 二级索引为null 不论是普通的二级索引&#xff0c;还是唯一二级索引&#xff0c;它们的索引列对包含 NULL 值的数量并不限制&#xff0c;所以我们采用key IS NULL 这种形式的搜索条件最多只能使用 ref 的访问方法&#xff0c;而不是 const 的访问方法 1.2 c…

MySQL函数和约束

MySQL常见函数 字符串常见函数 # concat : 字符串拼接 select concat(Hello , MySQL); # lower : 全部转小写 SELECT LOWER(Hello); # upper : 全部转大写 SELECT UPPER(hello); # lpad : 左填充 SELECT LPAD(hello,10,0); # rpad : 右填充 SELECT RPAD(hello,10,0); # trim…

关于一个git的更新使用流程

1.第一步使用git bash 使用git bash命令来进行操作&#xff08;当然我是个人比较喜欢用这种方法的&#xff09; 2. 第二步&#xff1a;连接 3.第三步&#xff1a;进入 4.第四步&#xff1a;查看分支 5.第五步&#xff1a;切换分支 将本地文件更新后之后进行提交 6.第六步&am…

【VUE】数字动态变化到目标值-vue-count-to

vue-count-to是一个Vue组件&#xff0c;用于实现数字动画效果。它可以用于显示从一个数字到另一个数字的过渡动画。 插件名&#xff1a;vue-count-to 官方仓库地址&#xff1a;GitHub - PanJiaChen/vue-countTo: Its a vue component that will count to a target number at a…

Leetcode.100 相同的树

给你两棵二叉树的根节点 p 和 q &#xff0c;编写一个函数来检验这两棵树是否相同。 如果两个树在结构上相同&#xff0c;并且节点具有相同的值&#xff0c;则认为它们是相同的。 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 代码如下&#xff1a;…

前端(十六)——微信小程序语音转文字,文字转语音功能的实现

&#x1f60a;博主&#xff1a;小猫娃来啦 &#x1f60a;文章核心&#xff1a;微信小程序语音转文字&#xff0c;文字转语音功能的实现 文章目录 资源下载链接最关键的问题控制台报错30003语音转文字文字转语音效果图应用场景作用和优势实现思路 资源下载链接 CSDN资源下载&am…

【QT】信号和槽(15)

前面的内容说了很多不同的控件如何使用&#xff0c;今天来看下QT的核心&#xff0c;信号与槽&#xff08;Signals and slots&#xff09;&#xff01; 简单理解一下&#xff0c;就是我们的信号与槽连接上了之后&#xff0c;发射一个信号给到槽&#xff0c;槽函数接收到了这个信…

uniapp 实现切换tab锚点定位到指定位置

1.主要使用uniapp scroll-view 组件的scroll-into-view属性实现功能 2.代码如下 <scroll-view:scroll-into-view"intoView"><u-tabsclass"tabs-list"change"tabChange":list"tabList"></u-tabs><view id"1&…

Java实现根据按图搜索商品数据,按图搜索获取1688商品详情数据,1688拍立淘接口,1688API接口封装方法

要通过按图搜索1688的API获取商品详情跨境属性数据&#xff0c;您可以使用1688开放平台提供的接口来实现。以下是一种使用Java编程语言实现的示例&#xff0c;展示如何通过1688开放平台API获取商品详情属性数据接口&#xff1a; 首先&#xff0c;确保您已注册成为1688开放平台…

探索Java集合框架—数据结构、ArrayList集合

一、背景介绍 Java集合的使用相信大家都已经非常得心应手&#xff0c;但是我们怎么做到知其然&#xff0c;更知其所以然这种出神入化的境界呢&#xff1f;我们揭开集合框架底层神秘面纱来一探究竟 目录 一、背景介绍 二、思路&方案 数据结构是什么&#xff1f; 数据结…

linux————ELK(日志收集系统集群)

目录 一、为什么要使用ELK 二、ELK作用 二、组件 一、elasticsearch 特点 二、logstash 工作过程 INPUT&#xff08;输入&#xff09; FILETER(过滤) OUTPUTS&#xff08;输出&#xff09; 三、kibana 三、架构类型 ELK ELKK ELFK ELFKK EFK 四、构建ELk集群…

Apache的简单介绍(LAMP架构+搭建Discuz论坛)

文章目录 1.Apache概述1.1什么是apache1.2 apache的功能及特性1.2.1功能1.2.2特性 1.3 MPM 工作模式1.3.1 prefork模式1.3.2 worker模式1.3.3 event模式 2.LAMP概述2.1 LAMP的组成2.2 LAMP各组件的主要作用2.3 LAMP的工作过程2.4CGI和FastCGI 3.搭建Discuz论坛所需4.编译安装Ap…

Mysql中explain执行计划信息中字段详解

Mysql中explain执行计划信息中字段详解 1. 获取执行计划2. 字段含义2.1 id2.2 select_type2.3 table2.4 partitions2.5 type2.6 possible_keys2.7 key2.8 ley_len2.9 ref2.10 rows2.11 extra 1. 获取执行计划 explain select * from t5; --或 desc select * from t5;2. 字段含…

滑动窗口实例1(长度最小的子数组)

题目&#xff1a; 给定一个含有 n 个正整数的数组和一个正整数 target 。 找出该数组中满足其和 ≥ target 的长度最小的 连续子数组 [numsl, numsl1, ..., numsr-1, numsr] &#xff0c;并返回其长度。如果不存在符合条件的子数组&#xff0c;返回 0 。 示例 1&#xff1a; …

zookeeper 3.8.1安装和入门使用

1、zookeeper环境搭建&#xff08;Windows单机版&#xff09; 1.1、 前提 必须安装jdk 1.8&#xff0c;配置jdk环境变量&#xff0c;步骤略 1.2、安装zookeeper 地址&#xff1a;https://zookeeper.apache.org/ 1.2.1、选择releases版本 1.2.2、下载安装包并解压 1.2.3、配…

前端文件、图片直传OOS、分片上传、el-upload上传(vue+elementUI)

前言&#xff1a;基于天翼云的面相对象存储(Object-Oriented Storage&#xff0c;OOS),实现小文件的直接上传&#xff0c;大文件的分片上传。 开发文档地址&#xff1a;网址 上传之前的相关操作&#xff1a;注册账户&#xff0c;创建 AccessKeyId 和 AccessSecretKey之后&…

企业怎么优化固定资产管理

在优化固定资产管理的过程中&#xff0c;不仅要关注硬件设备和设施的维护&#xff0c;还要重视软件系统和数据管理。一些可能的方法&#xff1a;  需要建立一套完整的资产管理系统。这个系统应该包括资产的采购、登记、使用、维修、报废等各个环节的管理流程。通过这个系统&a…