Flink日志采集-ELK可视化实现

一、各组件版本

组件版本
Flink1.16.1
kafka2.0.0
Logstash6.5.4
Elasticseach6.3.1
Kibana6.3.1

  针对按照⽇志⽂件⼤⼩滚动⽣成⽂件的⽅式,可能因为某个错误的问题,需要看好多个⽇志⽂件,还有Flink on Yarn模式提交Flink任务,在任务执行完毕或者任务报错后container会被回收从而导致日志丢失,为了方便排查问题可以把⽇志⽂件通过KafkaAppender写⼊到kafka中,然后通过ELK等进⾏⽇志搜索甚⾄是分析告警。

二、Flink配置将日志写入Kafka

2.1 flink-conf.yaml增加下面两行配置信息

env.java.opts.taskmanager: -DyarnContainerId=$CONTAINER_ID
env.java.opts.jobmanager: -DyarnContainerId=$CONTAINER_ID

2.2 log4j.properties配置案例如下

##################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
##################################################################
# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30# This affects logging for both user code and Flink
#rootLogger.appenderRef.file.ref = MainAppender
rootLogger.level = INFO
rootLogger.appenderRef.kafka.ref = Kafka
rootLogger.appenderRef.file.ref = RollingFileAppender# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO# Log all infos in the given file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 500MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10#appender.main.name = MainAppender
#appender.main.type = RollingFile
#appender.main.append = true
#appender.main.fileName = ${sys:log.file}
#appender.main.filePattern = ${sys:log.file}.%i
#appender.main.layout.type = PatternLayout
#appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
#appender.main.policies.type = Policies
#appender.main.policies.size.type = SizeBasedTriggeringPolicy
#appender.main.policies.size.size = 100MB
#appender.main.policies.startup.type = OnStartupTriggeringPolicy
#appender.main.strategy.type = DefaultRolloverStrategy
#appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}# kafka
appender.kafka.type = Kafka
appender.kafka.name = Kafka
appender.kafka.syncSend = true
appender.kafka.ignoreExceptions = false
appender.kafka.topic = flink_logs
appender.kafka.property.type = Property
appender.kafka.property.name = bootstrap.servers
appender.kafka.property.value = xxx1:9092,xxx2:9092,xxx3:9092
appender.kafka.layout.type = JSONLayout
apender.kafka.layout.value = net.logstash.log4j.JSONEventLayoutV1
appender.kafka.layout.compact = true
appender.kafka.layout.complete = false# Suppress the irrelevant (wrong) warnings from the Netty channel handler
#logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF#通过 flink on yarn 模式还可以添加⾃定义字段
# 日志路径
appender.kafka.layout.additionalField1.type = KeyValuePair
appender.kafka.layout.additionalField1.key = logdir
appender.kafka.layout.additionalField1.value = ${sys:log.file}
# flink-job-name
appender.kafka.layout.additionalField2.type = KeyValuePair
appender.kafka.layout.additionalField2.key = flinkJobName
appender.kafka.layout.additionalField2.value = ${sys:flinkJobName}
# 提交到yarn的containerId
appender.kafka.layout.additionalField3.type = KeyValuePair
appender.kafka.layout.additionalField3.key = yarnContainerId
appender.kafka.layout.additionalField3.value = ${sys:yarnContainerId}

  上⾯的 appender.kafka.layout.type 可以使⽤ JSONLayout ,也可以⾃定义。

  ⾃定义需要将上⾯的appender.kafka.layout.type 和 appender.kafka.layout.value 修改成如下:

appender.kafka.layout.type = PatternLayout
appender.kafka.layout.pattern ={"log_level":"%p","log_timestamp":"%d{ISO8601}","log_thread":"%t","log_file":"%F","l
og_line":"%L","log_message":"'%m'","log_path":"%X{log_path}","job_name":"${sys:flink
_job_name}"}%n

2.3 基于Flink on yarn模式提交任务前期准备

2.3.1 需要根据kafka的版本在flink/lib⽬录下放⼊kafka-clients的jar包

在这里插入图片描述

2.3.2 kafka处于启动状态

2.3.3 Flink Standalone集群

# 根据kafka的版本放⼊kafka-clients
kafka-clients-3.1.0.jar
# jackson对应的jar包
jackson-annotations-2.13.3.jar
jackson-core-2.13.3.jar
jackson-databind-2.13.3.jar

2.4 Flink on yarn任务提交案例

/root/software/flink-1.16.1/bin/flink run-application \
-t yarn-application \
-D yarn.application.name=TopSpeedWindowing \
-D parallelism.default=3 \
-D jobmanager.memory.process.size=2g \
-D taskmanager.memory.process.size=2g \
-D env.java.opts="-DflinkJobName=TopSpeedWindowing" \
/root/software/flink-1.16.1/examples/streaming/TopSpeedWindowing.jar

【注意】启动脚本需要加入这个参数,日志才能采集到任务名称(-D env.java.opts="-DflinkJobName=xxx")

  消费flink_logs案例

{instant: {epochSecond: 1698723428,nanoOfSecond: 544000000,},thread: 'flink-akka.actor.default-dispatcher-17',level: 'INFO',loggerName: 'org.apache.flink.runtime.rpc.akka.AkkaRpcService',message: 'Stopped Akka RPC service.',endOfBatch: false,loggerFqcn: 'org.apache.logging.slf4j.Log4jLogger',threadId: 68,threadPriority: 5,logdir: '/yarn/container-logs/application_1697779774806_0046/container_1697779774806_0046_01_000002/taskmanager.log',flinkJobName: 'flink-log-collect-test',yarnContainerId: 'container_1697779774806_0046_01_000002',
}

  ⽇志写⼊Kafka之后可以通过Logstash接⼊elasticsearch,然后通过kibana进⾏查询或搜索

三、LogStash部署

  部署过程略,网上都有

  需要注意Logstash内部kafka-clients和Kafka版本兼容问题,需要根据Kafka版本选择合适的Logstash版本

  将以下内容写⼊config/logstash-sample.conf ⽂件中

input {kafka {bootstrap_servers => ["xxx1:9092,xxx2:9092,xxx3:9092"] group_id => "logstash-group"topics => ["flink_logs"] consumer_threads => 3 type => "flink-logs" codec => "json"auto_offset_reset => "latest"}
}output {elasticsearch {hosts => ["xxx:9200"] index => "flink-log-%{+YYYY-MM-dd}"}
}

  Logstash启动:

logstash-6.5.4/bin/logstash -f logstash-6.5.4/config/logstash-sample.conf 2>&1 >logstash-6.5.4/logs/logstash.log &

四、Elasticsearch部署

  部署过程略,网上都有

  注意需要用root用户以外的用户启动Elasticsearch

  启动脚本:

Su elasticsearchlogtestelasticsearch-6.3.1/bin/elasticsearch

在这里插入图片描述

  Windows访问ES客户端推荐使用ElasticHD,本地运行后可以直连ES
在这里插入图片描述

五、Kibana部署

  部署过程略,网上都有

  启动脚本:

  kibana-6.3.1-linux-x86_64/bin/kibana

5.1 配置规则

在这里插入图片描述
在这里插入图片描述

5.2 日志分析

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/180945.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vSLAM中IMU预积分的作用--以惯性导航的角度分析

作为一个学过一点惯导的工程师,在初次接触视觉slam方向时,最感兴趣的就是IMU预积分了。但为什么要用这个预积分,在看了很多材料和书后,还是感觉模模糊糊,云里雾里。 在接触了vSLAM的更多内容后,站在历史研究…

如何避免 JavaScript 中的内存泄漏?

一、什么是内存泄漏? JavaScript 就是所谓的垃圾回收语言之一,垃圾回收语言通过定期检查哪些先前分配的内存仍然可以从应用程序的其他部分“访问”来帮助开发人员管理内存。垃圾回收语言中泄漏的主要原因是不需要的引用。如果你的 JavaScript 应用程序经…

java中:cmd界面输入javac后提示:找不到或无法加载主类,怎么解决

找不到或无法加载主类 检查环境变量cmd下用 java命令运行文件,提示找不到主类待续、更新中 检查环境变量 CLASSPATH 少写.;安装jdk过程有两部,一步为安装jdk文件夹,全部一致; 另一步为安装jre文件夹与jdk文件夹不一致(或者文件夹安装位置, 一路全部默认)path中将java变量移到顶…

js调整table表格上下相邻元素顺序

有时候我们会遇到要通过箭头控制table表格上下顺序的需求,如下: 点击向下就将该元素下移一位,下面的一位元素就移上来,点击向上就将该元素上移一位,上面的一位元素就移下来,也就是相邻元素互换位置顺序: <el-table :data="targetTable" border style=&quo…

HTTP 协议请求头 If-Match、If-None-Match 和 ETag

概述 在 HTTP 协议中&#xff0c;请求头 If-Match、If-None-Match、If-Modified-Since、If-Unmodified-Since、If-Range 主要是为了解决浏览器缓存数据而定义的请求头标准&#xff0c;按照协议规范正确的判断和使用这几个请求头&#xff0c;可以更精准的处理浏览器缓存&#x…

Springboot3整合Mybatis-plus3.5.3报错

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a; 报错以及Bug ✨特色专栏&#xff1a; …

研发效能DevOps: Git安装

目录 一、理论 1.Git 2.Git 工具 二、实验 1.Git安装 2.配置Git 3. VS Code加载Git 一、理论 1.Git &#xff08;1&#xff09;简介 Git 是一个分布式版本控制及源代码管理工具;Git 可以为你的项目保存若干快照&#xff0c;以此来对整个项目进行版本管理。 Git 是一个…

ASTM F963-23美国玩具安全新标准发布

新标准发布 2023年10月13日&#xff0c;美国材料与试验协会&#xff08;ASTM&#xff09;发布了新版玩具安全标准ASTM F963-23。 主要更新内容 与ASTM F963-17相比&#xff0c;此次更新包括&#xff1a;单独描述了基材重金属元素的豁免情况&#xff0c;更新了邻苯二甲酸酯的管控…

Java与Redis的集成以及Redis中的项目应用

一、Java连接Redis Redis与MySQL都是数据库&#xff0c;java操作redis其实跟操作mysql的过程是一样的。 1.1 导入依赖 打开IDEA&#xff0c;进入Java项目&#xff0c;导入pom依赖&#xff0c;代码如下&#xff1a; <dependency><groupId>redis.clients</gro…

MySQL笔记--Ubuntu安装MySQL并基于C++测试API

目录 1--安装MySQL 2--MySQL连接 3--代码案例 1--安装MySQL # 安装MySQL-Server sudo apt install mysql-server# 设置系统启动时自动开启 sudo systemctl start mysql # sudo systemctl enable mysql# 检查MySQL运行状态 sudo systemctl status mysql# 进入MySQL终端 sudo…

视频剪辑技巧:批量合并视频,高效省时,添加背景音乐提升品质

随着社交媒体的兴起&#xff0c;视频制作越来越受到人们的关注。掌握一些视频剪辑技巧&#xff0c;可以让我们轻松地制作出令人惊艳的视频。本文将介绍一种高效、省时的视频剪辑技巧&#xff0c;帮助您批量合并视频、添加背景音乐&#xff0c;并提升视频品质。现在一起来看看云…

hadoop配置文件自检查(解决常见报错问题,超级详细!)

本篇文章主要的内容就是检查配置文件&#xff0c;还有一些常见的报错问题解决方法&#xff0c;希望能够帮助到大家。 一、以下是大家可能会遇到的常见问题&#xff1a; 1.是否遗漏了前置准备的相关操作配置&#xff1f; 2.是否遗的将文件夹(Hadoop安装文件夹&#xff0c;/dat…

社区牛奶智能售货机为你带来便利与实惠

社区牛奶智能售货机为你带来便利与实惠 低成本&#xff1a;社区牛奶智能货机的最大优势在于成本低廉&#xff0c;租金和人工开支都很少。大部分时间&#xff0c;货柜都是由无人操作来完成销售任务。 购买便利&#xff1a;社区居民只需通过手机扫码支付&#xff0c;支付后即可自…

2023年03月 Python(三级)真题解析#中国电子学会#全国青少年软件编程等级考试

Python等级考试&#xff08;1~6级&#xff09;全部真题・点这里 一、单选题&#xff08;共25题&#xff0c;每题2分&#xff0c;共50分&#xff09; 第1题 十进制数111转换成二进制数是&#xff1f;&#xff08; &#xff09; A: 111 B: 1111011 C: 101111 D: 1101111 答案…

flink的安装与使用(ubuntu)

组件版本 虚拟机&#xff1a;ubuntu-20.04.6-live-server-amd64.iso flink&#xff1a;flink-1.18.0-bin-scala_2.12.tgz jdk&#xff1a;jdk-8u291-linux-x64.tar flink 下载 1、官网&#xff1a;https://flink.apache.org/downloads/ 2、清华镜像&#xff1a;https://mirr…

【Linux进行时】磁盘文件结构

磁盘 上篇文章&#xff0c;我们提及文件是存放在磁盘当中&#xff0c;本篇文件我们来了解一下磁盘的结构&#xff01;&#xff01;&#xff01; 磁盘的概念&#xff1a; ❓什么是磁盘&#xff1f; &#x1f4a1;磁盘&#xff08;disk&#xff09;是指利用磁记录技术存储数据…

图解系列--理解L3交换机的性能与功能

04.01 何为 L3 交换机 L3交换机是一种在L2 交换机的基础上增加了路由选择功能的网络硬件&#xff0c;能够通过基于ASIC 和 FPGA 的硬件处理高速实现网络功能和转发分组。L2 是指 OSI 参考模型中的L2, 也就是数据链路层。L2 交换机能够基于该层主要编址的 MAC 地址&#xff0c;…

【Linux】:重定向和用户缓冲区

重定向和用户缓冲区 一.输出重定向1.现象2.系统调用接口 二.缓冲区1.引子2.刷新 三.回答引例 文件描述符对应匹配规则&#xff1a;从0下标开始&#xff0c;寻找最小的没有被使用的数组位置&#xff0c;它就是新的文件描述符(fd)。 一.输出重定向 1.现象 在这里我们向1号文件内…

[C++进阶篇]STL中vector的使用

一、vector的介绍 1.vector的介绍 vector是表示可变大小数组的序列容器。vector也采用的连续存储空间来存储元素&#xff0c;就是可以采用下标对vector的元素进行访问&#xff0c;和数组一样。它的大小是可以动态改变的。 2.重要的接口组成 二、 vector迭代器的使用 2.1 ve…

Spring Boot 整合SpringSecurity和JWT和Redis实现统一鉴权认证

&#x1f4d1;前言 本文主要讲了Spring Security文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x1f304;每日一句&#xff1a;努力…