本文图片来自于8.flume实时监控文件hdfs sink使用演示_哔哩哔哩_bilibili
Apache Flume 的启动过程及其配置文件和脚本
在官网下载的Flume的压缩包中,.lib文件有大量的jar包,按道理说只有.lib文件就可以运行Flume程序了。只不过需要java -jar命令还要加非常多的参数,这时.conf配置文件夹和.bin启动脚本命令夹就有用了:.bin中的命令本质上就是对于java -jar命令一系列参数的封装,给.bin命令传递参数,它会进一步变成含有参数的java-jar命令,进而启动jar包中的进程。然后一些参数的设置会在.conf中,和.bin配合执行,.bin脚本会读配置文件中的信息。
Flume 启动过程
依赖库(.lib 文件夹):Flume 分发包中的
lib
目录包含了 Flume 运行所需的所有 Java 库(.jar
文件)。这些库包括 Flume 自身的代码、用于处理不同数据源和目的地(如 Kafka、HDFS)的插件,以及其他必要的支持库(如日志库、网络通信库等)。配置文件(.conf 文件夹):Flume 的配置文件通常存放在
conf
目录中。这些.conf
配置文件包含了定义 Flume Agent 的各种属性和参数,如使用的 Source、Channel 和 Sink 类型及其具体配置。这些配置决定了 Flume 如何收集、传输和存储数据。启动脚本(.bin 文件夹):
bin
目录包含了用于启动 Flume 服务的脚本。这些脚本简化了启动过程,因为手动通过java -jar
命令启动 Flume 不仅需要指定 Flume 应用程序的主.jar
文件,还需要包含lib
目录下所有依赖库的路径,以及必要的运行参数和 JVM 选项。
启动脚本的作用
启动脚本(例如,flume-ng
脚本)的主要作用是:
- 设置类路径(Classpath):脚本会自动将
lib
目录下的所有.jar
文件添加到 Java 类路径中,确保 Flume 运行时可以找到所有必要的类和库。- 读取配置文件:脚本允许用户通过命令行参数指定配置文件(位于
conf
目录)。脚本将确保 Flume 启动时使用正确的配置。- 简化命令行参数:用户只需要提供简单的命令行参数(如要使用的配置文件名和 Flume Agent 的名称),而无需手动指定所有
.jar
文件和其他复杂的 JVM 参数。- 定制 JVM 选项:脚本还可以包含默认的或自定义的 JVM 启动选项,如内存设置,这对于优化 Flume 的性能和行为非常重要。
实际运行
实际上,当运行 Flume 的启动脚本时(如 bin/flume-ng agent --conf conf --conf-file example.conf --name a1
),脚本会解析这些参数,并构建一个完整的 java
命令,其中包括所有必要的类路径设置、JVM 选项和 Flume 的主类。这个命令最终会启动一个 JVM 实例,其中运行 Flume Agent,根据指定的 .conf
文件中的配置来执行数据收集和传输任务。
注意事项
这老师讲的太好了6.flume查看源码的基础方法_哔哩哔哩_bilibili,一些注意点:
- Flume的配置文件是很重要的,基本上就是在配置文件上配置信息,然后直接启动Flume即可。
- 一个sink只能接收一个channel,但是一个Source可以连接多个channel
- 监控本地文件,在启动Flume之前一定确保文件存在,否则报错无法正常启动。
断点续传
Apache Flume 提供了一种机制,允许在处理文件时实现断点续传功能,这在使用如 Spooling Directory Source
来监控并读取本地文件时尤为重要。这种机制确保了即使 Flume 实例发生故障或重启,也能从上次停止的位置继续读取文件,避免了数据重复或丢失。
实现断点续传的关键:Position File
Flume 通过维护一个偏移量文件(position file)来实现断点续传。这个偏移量文件记录了 Flume 读取每个文件的当前位置,如下图所示。
-
文件偏移量(pos):表示 Flume 在文件中的读取位置。每次 Flume 读取一部分数据后,它会更新这个偏移量,以反映下一次读取应该开始的位置。
-
位置文件(Position File):是一个特殊的文件,Flume 用它来存储每个正在被监控和读取的文件的偏移量。通过这种方式,即使 Flume 服务重启,它也能通过读取位置文件来确定上次读取停止的位置,并从那里继续读取数据。
positionFile原本是默认创建新目录的,为了方便可以修改配置文件直接更换positionFile的位置,这个文件中记录了监控的文件,每一个都包含绝对目录和inode文件唯一标识,pos是核心表示字节偏移量,根据这个偏移量实现断点续传的位置搜索,所以如果将文件改名,或者使用vim编辑(vim是先将原本文件删除然后再新建一个文件),Flume都会重新读取文件的所有内容。此时断点续传所依赖的positionfile中的文件inode会不一样,(如果是改名的话inode和file路径都会不一样)表示新的文件。
工作流程
启动:Flume 启动时,
Spooling Directory Source
会检查其位置文件,以确定每个文件的读取位置。读取和更新:当 Flume 从一个文件中读取数据时,它会实时更新该文件在位置文件中对应的偏移量。这确保了 Flume 始终知道下一次应从哪里开始读取。有点向Kafka消费者的消费位置。
故障和重启:如果 Flume 发生故障或被重启,
Spooling Directory Source
会再次读取位置文件,并根据文件中的偏移量信息继续读取数据,从而实现断点续传。
优势和局限
数据一致性:这种机制最大的优势是提高了数据处理的一致性和可靠性,确保数据不会因为系统故障而丢失或重复处理。
局限:值得注意的是,这种机制依赖于位置文件的准确性和完整性。如果位置文件被损坏或丢失,Flume 可能无法正确地继续之前的读取过程,可能会导致数据重复或遗漏。因此,保证位置文件的安全和完整性是实现断点续传机制的重要条件。
事务
Apache Flume 在处理数据流时采用了事务机制来保证数据的可靠性。在 Flume 的架构中,数据从 Source 流向 Channel,再从 Channel 流向 Sink。为了保证数据不会因为处理过程中的失败而丢失,Flume 对这个流程中的数据传输采用了事务控制。具体来说,Flume 中有两种基本的事务类型:put
事务和 take
事务。理解这两种事务的工作方式对于掌握 Flume 的数据一致性和可靠性保证机制至关重要。
Put 事务
-
定义:
put
事务是指 Source 将数据放入(put)Channel 的过程。在这个过程中,数据的传输被包裹在一个事务中。 -
事务提交失败与数据回滚:当 Source 尝试将数据放入 Channel 时,如果 Channel 已满(即没有足够的空间接收更多的数据),则
put
事务会提交失败。在这种情况下,事务会被回滚,这意味着数据不会被放入 Channel,并且 Source 需要重新尝试这个操作。这里的“回滚”指的是事务的回滚,而不是数据本身被回滚或删除。换句话说,事务的回滚意味着尝试放入数据的操作被取消,数据保持在 Source 端,等待下一次尝试。 -
重试机制:在实践中,Source 会不断重试
put
事务,直到 Channel 有足够的空间接收新数据为止。这个过程确保了即使在短暂的系统压力或资源限制下,数据也不会丢失,只是其传输可能会延迟。
具体而言,Flume 会根据 Source 的配置决定重试机制:
- 持久化 Source:如果 Source 是持久化的(比如监控文件系统的 Source),则数据不会丢失,因为它仍然保存在源系统中(如文件中)。Flume 会在稍后重试
put
操作,这通常涉及等待直到 Channel 有足够空间可用。- 非持久化 Source:如果 Source 不是持久化的(比如接收内存中数据流的 Source),则在
put
事务持续失败的情况下,数据可能会丢失,因为没有其他地方存储这些数据直到 Channel 可用。
Take 事务
-
定义:
take
事务是指 Sink 从 Channel 中取出(take)数据的过程。同样,这个过程也被包裹在一个事务中,以确保数据的可靠传输。 -
事务确认与回滚:如果在尝试处理(take)数据时成功,Sink 将提交事务,确认数据被成功处理。如果处理过程中遇到问题(比如目的地无法写入数据),则事务会被回滚,这意味着数据会保留在 Channel 中,Sink 需要重新尝试这个操作。
在 Flume 中,“数据回滚”实际上是指“事务回滚”,意味着由于某些原因(如资源限制),尝试的数据传输操作(无论是 put 还是 take)被取消,数据保持在原位置不变,等待重新操作。这种事务机制确保了 Flume 能够在面对系统故障或资源瓶颈时,通过重试保证数据不丢失,最终可靠地完成数据传输任务。
事务中的事件
在 Apache Flume 中,一个事务可以包含多个事件(Event),其数量由配置中的参数决定。对于如何配置一个事务中可以处理的事件数量,这主要取决于使用的 Channel 类型以及相关的配置参数。每种 Channel 类型(如 Memory Channel、File Channel 等)都有自己的特定参数,用于控制事务的行为,包括一个事务中可以包含的事件数量。
Memory Channel
对于 Memory Channel,有两个主要的配置参数影响一个事务中可以处理的事件数量:
capacity
:这个参数设置了 Memory Channel 总的容量,即它可以存储的最大事件数。transactionCapacity
:这个参数设置了每个事务最多可以包含的事件数量。这直接影响了每次put
或take
操作可以处理的最大事件数。
例如,如果你设置 transactionCapacity
为 100,那么每个 put
或 take
事务最多处理 100 个事件。
File Channel
对于 File Channel,关键的配置参数包括:
capacity
:定义了 File Channel 可以持有的最大事件数。transactionCapacity
:和 Memory Channel 类似,这个参数定义了每个事务可以处理的最大事件数。
配置示例
对于 Memory Channel,配置可能如下所示:
agent.channels.memChannel.type = memory agent.channels.memChannel.capacity = 10000
agent.channels.memChannel.transactionCapacity = 100
这意味着 Memory Channel 的总容量为 10000 个事件,而每个事务最多可以处理 100 个事件。
默认参数
对于 Memory Channel 和 File Channel,如果没有明确指定 transactionCapacity
,它们的默认值可能因 Flume 的版本和具体实现而异,因此查阅当前使用版本的 Flume 文档是确定这些默认值的最佳方式。
Flume Agent 内部原理
ChannelSelector 概述
Flume 的 ChannelSelector
是用于决定一个事件 (Event
) 应该被发送到哪一个或哪些 Channel
的组件。
ChannelSelector 的类型
-
ReplicatingSelector:这种类型的
ChannelSelector
会将接收到的每个事件复制并发送到所有配置的Channel
上。这主要用于数据的冗余存储,当你希望同一份数据被送到多个目的地进行处理或备份时非常有用。 -
MultiplexingSelector:
MultiplexingSelector
允许基于事件的某些属性(通常是事件头部的键值对)来决定事件应该被发送到哪个Channel
。这种选择器支持更复杂的路由策略,比如基于事件类型或来源将数据发送到不同的处理管道。
Channel 是分布式管道集群吗?
不完全是。在 Flume 中,Channel
更像是连接 Source
(数据来源)和 Sink
(数据目的地)的内部队列或缓冲区。每个 Channel
都独立存在,负责暂存数据直到数据被 Sink
处理。尽管 Channel
可以在不同的 Flume Agent 上分布运行,但每个 Channel
通常是指单个实例中的一个组件,而不是一个分布式系统中的多个组件集群。
ChannelSelector之一:MultiplexingSelector
在多个channel的情况下,由于一个sink只能连接一个channel,所以使用MultiplexingSelector本质上就是根据事件的性质来为不同的sink(即不同的目的地)发送事件。
将
MultiplexingSelector
描述为一种“多路复用”确实可能引起一些混淆,尤其是在通常理解的网络或通信协议中的多路复用概念上。在那种情况下,“多路复用”通常指的是在单一通信通道上同时传输多个信号或数据流的技术,以提高效率或带宽利用率。
在 Apache Flume 的上下文中,MultiplexingSelector
的使用确实更接近于一个“分类选择器”或“路由器”。它的主要作用是基于事件的属性(例如,事件头部的键值对)来决定事件应该发送到哪个或哪些 Channel
,从而实现对事件流的逻辑分割或路由到不同的处理流程。这种机制允许不同类型的事件被处理和存储在不同的目的地。
多路复用的命名
-
逻辑上的多路复用:虽然 Flume 中的多路复用不增加单个
Channel
的带宽或处理能力,它通过允许单个Source
向多个Channel
分发事件,实现了逻辑上的多路复用。这样,基于事件特性的不同处理逻辑可以并行运行,而无需为每种事件类型配置独立的Source
。 -
效率提高的另一层意义:虽然直观上看似乎没有提高传输效率,但通过允许更细粒度的数据流管理和处理,
MultiplexingSelector
实际上提高了整个数据处理流程的效率和灵活性。它使得数据可以根据需求被送往最适合处理该数据的Sink
,从而优化了资源的使用,例如,可以避免将所有数据都通过一个高成本的处理流程。
实践意义
在实践中,MultiplexingSelector
允许开发人员和系统设计者基于数据内容或类型实现复杂的数据路由策略,例如:
- 将特定类型的日志发送到专门的分析系统。
- 根据事件严重性级别将事件分发到不同的存储或处理队列。
- 实现数据分流,将部分数据送往实时处理系统,而将其他数据归档存储。
作者总结
这种逻辑上的多路复用,其实就是在同一个Source源数据路径中,实现不同性质或者不同处理逻辑的事件同时传输,这种复用相当于将Selector和多个Channel结合形成多种逻辑上的Source源数据路径,但其实只有一个Source源数据路径。实现了数据按需分发,极大地增强了大数据处理和日志收集系统的能力。
其实这个MultiplexingSelector是最能体现Event事件中Header的作用的组件了,因为有了Header,就可以通过Selector选择最合适的Sink,这样的话Sink对于事件的一致性处理逻辑会减少很多,也在变相地优化了资源的使用!
实际应用场景
考虑一个实际的应用场景:一个系统产生了多种类型的日志,包括系统日志、应用日志和访问日志。这些日志被发送到同一个 Flume
Source
,通过在事件的Header
中标识日志类型,MultiplexingSelector
可以将它们路由到专门处理这三种日志的不同Sinks
—— 例如,系统日志被存储于 HDFS 以便长期分析,应用日志被发送到一个实时监控系统,而访问日志则被归档到外部存储以供未来审计。
通过这样的设计,不仅优化了资源的使用,也确保了数据处理流程的清晰和高效,同时减轻了后续处理环节的负担,因为每个 Sink
都明确知道自己将处理哪种类型的数据,不需要再做额外的数据分类处理。这种基于 Header
的智能路由策略是 Flume 架构灵活性和强大功能的一个重要体现。
本地数据即时同步到HDFS就用Flume
使用 Apache Flume 同步本地数据到 HDFS 是一个常见且有效的用例。Flume 提供了灵活的配置选项来适应不同的数据流场景,包括文件滚动策略,这对于避免在 HDFS 上产生大量小文件非常关键。HDFS 中的小文件问题是因为每个文件在 NameNode 上都会占用内存来维护文件系统的命名空间和块映射信息,过多的小文件会消耗大量的 NameNode 内存资源,影响其性能。
配置参数调整
为了解决小文件问题,可以调整以下几个与文件滚动相关的 Flume 配置参数:
-
rollSize
:这个参数设置了触发滚动(创建新文件)之前,一个文件可以达到的最大大小(以字节为单位,通常在HDFS中设置为128MB的字节数量)。增加这个值可以减少生成的文件数量,但也可能导致数据在写入 HDFS 之前在 Flume 中停留更长时间。 -
rollCount
:这个参数设置在触发滚动之前,一个文件中可以包含的最大事件(Event
)数。调整这个值也可以帮助控制文件数量和大小。 -
rollInterval
:这个参数设置了滚动的时间间隔(以秒为单位)。设置为0禁用基于时间的滚动,可能有助于减少由于时间间隔到达而生成的小文件数量,尤其是在数据流量较低时。
示例配置
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://namenode/path/to/data
agent.sinks.hdfsSink.hdfs.filePrefix = events-
agent.sinks.hdfsSink.hdfs.rollSize = 134217728 # 128MB
agent.sinks.hdfsSink.hdfs.rollCount = 0
agent.sinks.hdfsSink.hdfs.rollInterval = 0
在这个例子中,hdfsSink
配置为当文件大小达到128MB时进行滚动,禁用了基于事件数量和时间间隔的滚动,这有助于减少小文件的生成。但是在实际开发过程中,基本上只需要设置事件间隔的滚动,通常为3600s即1h。而不基于事件数量,将其设置为0。原因如下:
在处理大数据流时,控制文件的大小或写入间隔比控制每个文件的事件数量通常更有意义。同时,在许多情况下,数据源的事件产生率是变化的,基于时间(rollInterval
)的滚动提供了一种简单的机制来周期性地生成文件,而不需要根据数据流的变化动态调整事件计数阈值(rollCount
)。这简化了配置管理,尤其是在数据量大且变化不定的生产环境中。
注意事项
- 数据延迟:增加文件大小或事件计数的阈值可能会导致数据在达到滚动条件之前在内存中保留更长时间,这可能会稍微增加数据到达 HDFS 的延迟。
- 资源管理:确保 Flume Agent 有足够的资源(如内存)来处理较大的滚动文件大小。
通过合理配置这些参数,可以有效地减轻 HDFS 小文件问题,同时保持数据同步的及时性和准确性。不过,最佳的配置参数取决于具体的使用场景,包括数据产生的速率、预期的延迟要求以及 HDFS 集群的容量,可能需要一些实验和调整来找到最优的平衡点。