尚硅谷Flume(仅有基础)

q

1 概述

1.1 定义

        Flume 是Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。 

        Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。

1.2 架构

1.2.1 Agent 

        Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。 
        Agent主要有3个部分组成,Source、Channel、Sink。 

1.2.2 Source   

        Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。 

1.2.3 Sink 

        Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 

Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。 

1.2.4 Channel 

        Channel是位于Source 和Sink 之间的缓冲区。因此,Channel允许 Source和Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink的读取操作。 
        Flume自带两种Channel:Memory Channel和 File Channel。 

        Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。 

        File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。 

1.2.5 Event 

        传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。

        Event由Header 和Body 两部分组成,Header用来存放该 event的一些属性,为K-V 结构,Body用来存放该条数据,形式为字节数组。

2 Flume基本操作

2.1 安装部署

http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-tar.gz

直接解压

将lib文件夹下的guava-11.0.2.jar删除以兼容 Hadoop 3.1.3

2.2 案例

2.2.1 监控端口数据官方案例

Flume 1.9.0 User Guide — Apache Flume

使用 Flume监听一个端口,收集该端口数据,并打印到控制台

(1)安装netcat工具 
[atguigu@hadoop102 software]$ sudo yum install -y nc 
(2)判断4444端口是否被占用 
[atguigu@hadoop102 flume-telnet]$ sudo netstat -nlp | grep 4444
(3)创建Flume Agent配置文件flume-netcat-logger.conf 
(4)在flume目录下创建 job文件夹并进入job文件夹。 
[atguigu@hadoop102 flume]$ mkdir job 
[atguigu@hadoop102 flume]$ cd job/ 
(5)在job文件夹下创建 Flume Agent配置文件flume-netcat-logger.conf。 
[atguigu@hadoop102 job]$ vim flume-netcat-logger.conf 
(6)在flume-netcat-logger.conf文件中添加如下内容

# name the components on this agent 
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source 
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4444# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

原神启动  

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
or
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -flume.root.logger=INFO,console --conf/-c:表示配置文件存储在 conf/目录 --name/-n:表示给 agent 起名为 a1 --conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf文件。 -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logge参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error。 

另一台nc启动

 nc localhost 4444

然后发消息

2023-10-25 14:03:53,633 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 30   

2.2.2 实时监控单个追加文件

实时监控 Hive 日志,并上传到HDFS中 

开启hadoop集群

start-all.sh

开启hive

/export/servers/hive/bin/hive --service metastore & nohup /export/servers/hive/bin/hive

vim flume-file-hdfs.conf

# Name the components on this agent 
a2.sources = r2
a2.sinks = k2
a2.channels = c2# Describe/configure the source 
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /export/server/hive/logs/hive.log(要监控拉取的文件)# Describe the sink 
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop1:8020/flume/%Y%m%d/%H(这里的端口要和hadoop配置里hdfs的一样!!!!!!!!!)
#上传文件的前缀 
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹 
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹 
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位 
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳 
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event 才flush 到HDFS一次 
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0# Use a channel which buffers events in memory 
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel 
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

  启动

bin/flume-ng agent -n a2 -c conf -f job/flume-file-hdfs.conf

ctrl+Z退出

在HDFS上查看文件。

2.2.3 实时监控目录下多个新文件 

使用Flume监听整个目录的文件,并上传至 HDFS 

         在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED结尾;被监控文件夹每 500毫秒扫描一次文件变动。 

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.confa3.sources = r3
a3.sinks = k3
a3.channels = c3# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /export/server/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传 
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)# Describe the sink 
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop1:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀:
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹 
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹 
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event 才flush 到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩 
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件 
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0# Use a channel which buffers events in memory 
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel 
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

在/opt/module/flume 目录下创建upload文件夹 

向 upload文件夹中添加文件 

2.2.4 实时监控目录下的多个追加文件 

        Exec source适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

a3.sources = r3 
a3.sinks = k3 
a3.channels = c3 # Describe/configure the source 
a3.sources.r3.type = TAILDIR 
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json 
a3.sources.r3.filegroups = f1 f2 
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.* 
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.* # Describe the sink 
a3.sinks.k3.type = hdfs 
a3.sinks.k3.hdfs.path = 
hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H 
#上传文件的前缀 
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹 
a3.sinks.k3.hdfs.round = true 
#多少时间单位创建一个新的文件夹 
a3.sinks.k3.hdfs.roundValue = 1 
#重新定义时间单位 
a3.sinks.k3.hdfs.roundUnit = hour 
#是否使用本地时间戳 
a3.sinks.k3.hdfs.useLocalTimeStamp = true 
#积攒多少个Event 才flush 到HDFS一次 
a3.sinks.k3.hdfs.batchSize = 100 
#设置文件类型,可支持压缩 
a3.sinks.k3.hdfs.fileType = DataStream 
#多久生成一个新的文件 
a3.sinks.k3.hdfs.rollInterval = 60 
#设置每个文件的滚动大小大概是128M 
a3.sinks.k3.hdfs.rollSize = 134217700 
#文件的滚动与Event数量无关 
a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory 
a3.channels.c3.type = memory 
a3.channels.c3.capacity = 1000 
a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel 
a3.sources.r3.channels = c3 
a3.sinks.k3.channel = c3

Taildir 说明: 
  Taildir Source维护了一个 json格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File的格式如下

3 Flume 高级

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/172310.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

国际腾讯云直播推流配置教程!

云直播的服务本质是一个广播的过程,类似于电视台的直播节目通过有线电视网发送给千家万户。为了完成这个过程,云直播需要有采集和推流设备(类似摄像头)、云直播服务(类似电视台的有线电视网)和播放设备&…

开源Linux社区Armbian开发指南

1. 什么是armbian Armbian是一个基于Debian或Ubuntu的开源操作系统,专门针对嵌入式ARM平台进行优化和定制。Armbian可以运行在多种不同的嵌入式设备上,例如树莓派、ArmSoM、香蕉派等等。Armbian针对不同的嵌入式平台,提供了相应的硬件支持&a…

PYTHON+CH341 3线SPI驱动UC1601 LCD实现汉字显示

前言 参考大佬用CH341驱动OLED,链接如下:GitHub - jimjiang2/ch341dll_wrap_typical_app: A ch341dll Wrap is for using in Python 32bits windows to access I2C SPI and MDIO (by GPIO), and Demo with display PC sreen on OLED by i2c or SPI . 本文主要实现了…

分享一款基于 AI 的 Chrome 插件

最近使用大模型比较多,公司虽然提供了免费的 ChatGPT 但是需要跳转特定页面才能访问,比较麻烦,于是就想到是否可以开发一款类似于有道词典一样的 Chrome 插件,可以在任意页面使用,虽然市面上也有类似的插件&#xff0c…

分布式消息队列:RabbitMQ(1)

目录 一:中间件 二:分布式消息队列 2.1:是消息队列 2.1.1:消息队列的优势 2.1.1.1:异步处理化 2.1.1.2:削峰填谷 2.2:分布式消息队列 2.2.1:分布式消息队列的优势 2.2.1.1:数据的持久化 2.2.1.2:可扩展性 2.2.1.3:应用解耦 2.2.1.4:发送订阅 2.2.2:分布式消息队列…

生成树协议:监控 STP 端口和交换机

什么是生成树协议 生成树协议 (STP) 用于网络交换机,以防止循环和广播风暴。在局域网 (LAN) 中,两条或多条冗余路径可以连接到同一网段。当交换机或网桥从所有可用端口传输帧时,这些帧开始在网…

1818_ChibiOS的计数信号量

全部学习汇总: GreyZhang/g_ChibiOS: I found a new RTOS called ChibiOS and it seems interesting! (github.com) 之前见过计数信号量,也是在FreeRTOS中看到的。也看到过这样的功能在驱动设计中的应用,但是当时没有理解这个使用的方式。 1.…

【STM32】标准库的引入

一、为什么要会有标志外设库 1、传统单片机软件开发方式 (1)芯片厂商提供数据手册、示例代码、开发环境 (2)单片机软件工程师面向产品功能,查阅数据手册,参考官方示例代码进行开发 (3)硬件操作的方式是用C语言对寄存器进行读写以操作硬件 (4)主要工作量…

指针仪表读数YOLOV8NANO

指针仪表读数YOLOV8 NANO 采用YOLOV8 NANO训练,标记,然后判断角度,得出角度,可以通过角度,换算成数据

Table-GPT:让大语言模型理解表格数据

llm对文本指令非常有用,但是如果我们尝试向模型提供某种文本格式的表格数据和该表格上的问题,LLM更有可能产生不准确的响应。 在这篇文章中,我们将介绍微软发表的一篇研究论文,“Table-GPT: Table- tuning GPT for Diverse Table…

Linux系统之watch命令的基本使用

Linux系统之watch命令的基本使用 一、watch命令介绍二、watch命令的使用帮助2.1 watch命令的help帮助2.2 watch命令的语法解释 三、watch命令的基本使用3.1 使用默认的2秒时间间隔执行ls命令3.2 每隔10秒执行一次ps命令3.3 每隔1秒输出一次磁盘使用情况3.4 高亮显示grep命令的输…

Springboot 使用JavaMailSender发送邮件 + Excel附件

目录 1.生成Excel表格 1.依赖设置 2.代码: 2.邮件发送 1.邮件发送功能实现-带附件 2.踩过的坑 1.附件名中文乱码问题 3.参考文章: 需求描述:项目审批完毕后,需要发送邮件通知相关人员,并且要附带数据库表生成的…

京东平台数据分析:2023年9月京东空气净化器行业品牌销售排行榜

鲸参谋监测的京东平台9月份空气净化器市场销售数据已出炉! 9月份,空气净化器的销售同比上年增长。根据鲸参谋平台的数据显示,今年9月,京东平台空气净化器的销量将近15万,同比增长约1%;销售额将近2亿元&…

C++多态(超级详细版)

目录 一、什么是多态 二、多态的定义及实现 1.多态构成条件 2.虚函数的重写和协变 虚函数重写的两个例外: 2.1协变 2.2析构函数的重写 (析构函数名统一处理成destructor) 3.重载、覆盖(重写)、隐藏(重定义)的对比 4.final 和 overr…

【计算机网络笔记】DNS报文格式

DNS 提供域名到主机IP地址的映射  域名服务的三大要素:  域(Domain)和域名(Domain name): 域指由地 理位置或业务类型而联系在一起的一组计算机构 成。  主机:由域名来标识。域名是由字符和(或&a…

如何在linux服务器上安装Anaconda与pytorch,以及pytorch卸载

如何在linux服务器上安装Anaconda与pytorch,以及pytorch卸载 1,安装anaconda1.1 下载anaconda安装包1.2 安装anaconda1.3 设计环境变量1.4 安装完成验证 2 Anaconda安装pytorch2.1 创建虚拟环境2.2 查看现存环境2.3 激活环境2.4 选择合适的pytorch版本下…

Python:实现日历到excel文档

背景 日历是一种常见的工具,用于记录事件和显示日期。在编程中,可以使用Python编码来制作日历。 Python提供了一些内置的模块和函数,使得制作日历变得更加简单。 在本文,我们将探讨如何使用Python制作日历,并将日历输出到excel文档中。 效果展示 实现 在代码中会用到cale…

FFmpeg5.1.3编译动态库踩坑之旅(基于Linux虚拟机)

准备工作 环境准备 1.Windows安装Oracle VM VirtualBox 7.0.10,安装ubuntu-22.04.3。 坑一:无法往虚拟机里拖放复制文件,解决办法:登录Ubuntu虚拟机时切换到xorg方式登录,参考地址:Ubuntu Desktop 22.04…

软考系统架构之案例篇(架构设计相关概念)

案例篇-架构设计相关概念 1. 架构风格的概念2. 五大架构风格有哪些3. MVC架构含义4. 云计算架构5. 云原生架构设计原则6. ESB的主要功能包括7. 质量属性的含义及其设计策略8. EJB中的 Bean 分三种类型9. 风险点、敏感点、权衡点的含义10. REST 的5个原则 其它相关推荐&#xff…