目录
- agent特性
- ChannelSelector
- 描述:
- SinkProcessor
- 描述:
- 串联架构
- 结构图解
- 定义与描述
- 配置示例
- Flume1(监测端node1)
- Flume3(接收端node3)
- 启动方式
- 复制和多路复用
- 结构图解
- 定义描述
- 配置示例
- node1
- node2
- node3
- 启动方式
- 聚合架构
- 结构图解
- 定义描述
- 示例
- node1
- node2
- node3
agent特性
ChannelSelector
ChannelSelector
是Flume中的一个关键组件,负责根据特定逻辑决定Event的流向。
名称 | 类型 | 描述 |
---|---|---|
ReplicatingSelector | ChannelSelector类型 | 将同一个Event复制并发往所有配置的Channel |
MultiplexingSelector | ChannelSelector类型 | 根据预设的规则或条件,将不同的Event分发至不同的Channel |
描述:
ReplicatingSelector
会无条件地将每个Event发送到与其关联的所有Channel中,实现事件复制。MultiplexingSelector
则基于某种规则(如Event中的特定字段、时间戳等)来将Event分发到不同的Channel,实现事件的多路复用。
SinkProcessor
SinkProcessor
是Flume中负责处理Sink中Event的组件,它决定了Event如何被发送和处理。
名称 | 类型 | 描述 |
---|---|---|
DefaultSinkProcessor | SinkProcessor类型 | 对应于单个Sink,直接处理并发送Event至该Sink |
LoadBalancingSinkProcessor | SinkProcessor类型 | 对应于Sink Group,实现负载均衡,将Event分发至多个Sink中处理 |
FailoverSinkProcessor | SinkProcessor类型 | 对应于Sink Group,提供错误恢复功能,当主Sink失败时自动切换至备用Sink |
描述:
DefaultSinkProcessor
是最基础的Sink处理器,直接与单个Sink关联,负责将Event发送至该Sink。LoadBalancingSinkProcessor
用于处理Sink Group,能够智能地将Event分发至多个Sink中,以实现负载均衡,提高处理效率。FailoverSinkProcessor
同样用于处理Sink Group,但它提供了错误恢复机制。当主Sink因故障无法工作时,它会自动将Event发送至备用Sink,以确保数据的连续性和可靠性。
串联架构
结构图解
Avro Sink作为Avro客户端,向Avro服务端发送Avro事件。它允许Flume Agent将数据以Avro格式序列化后,发送到指定的Avro Source或其他Avro客户端。
定义与描述
这种模式是将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。
配置示例
Flume1(监测端node1)
Flume1(node1),监听node1上的44444端口(source),并输出到node3的10086端口上(sink)
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node1
# port,监听的端口
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = avro
# 指定 Avro Sink 发送数据的目标主机名和端口号
a1.sinks.k1.hostname = node3
a1.sinks.k1.port = 10086# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Flume3(接收端node3)
Flume3(node3),监听node3上的10086端口(source)(当然source内容是来自node1的44444端口的变化情况),输出一般的控制台内容
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
# 监听的来自node3上的source,source类型为avro
a1.sources.r1.type = avro
a1.sources.r1.bind = node3
# port,监听的端口
a1.sources.r1.port = 10086# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动方式
先启动node3(flume3),node3的监听是串行的最后一环,从后向前依次启动
理由:
先启动node3的监听(此时node1还未启动),再启动node1,此时可以保证没有任何内容错过
复制和多路复用
结构图解
定义描述
Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。详细可以参考上面的Agent ChannelSelector和SinkProcessor
配置示例
此部分示例会按照如上的结构图进行配置
node1
replicating_channel.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 这个selector是复制类型的。
# 复制selector会将接收到的每个事件复制到所有配置的channel中。
a1.sources.r1.selector.type = replicating# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# avro类型的sink,发送给下一个agent
# sink k1的参数配置
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node2
a1.sinks.k1.port = 10010# sink k2的参数配置
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node3
a1.sinks.k2.port = 10010# channel c1的参数配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# channel c2的参数配置
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
node2
接收node1,并输出到hdfs中,hdfs的参数配置:flume——hdfs
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# avro类型的source,接收来自上一个agent的sink输出
a2.sources.r1.type = avro
# 这个source来自于node2节点的10010端口
a2.sources.r1.bind = node2
a2.sources.r1.port = 10010# 传输至hdfs中
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = /flume2/%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 2
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
node3
接收node1,并输出到日志
a3.sources = r3
a3.sinks = k3
a3.channels = c3# Describe/configure the source
a3.sources.r3.type = avro
a3.sources.r3.bind = node3
a3.sources.r3.port = 10010# Describe the sink
a3.sinks.k3.type = logger# Describe the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
启动方式
先启动node2(flume2)、node3(flume3),在启动node1(flume1)
理由:
同上,请注意,无论何种架构,都应到先启动最末端的接收,再启动发送
聚合架构
结构图解
定义描述
最常见实用的结构模式。
日常web应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume的这种组合方式能很好的解决这一问题,每台服务器部署一个flume采集日志,传送到一个集中收集日志的flume,再由此flume上传到hdfs、hive、hbase等,进行日志分析。
示例
node1
发送端1,输出到node3的10000端口
没什么需要特别注明的地方,关键节点已经在前面描述了,建议直接复制代码,GPT检查
[root@node1 jobs]# vim agg1.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/nginx/logs/access.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node3
a1.sinks.k1.port = 10000# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
node2
发送端2,输出到node3的10000端口
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# source端的netcat是一个数据接收服务
a2.sources.r1.type = netcat
a2.sources.r1.bind = node2
a2.sources.r1.port = 10000# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = node3
a2.sinks.k1.port = 10000# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
node3
最末的接收端,监听10000端口即可,前面两个节点会发送内容到此端口
[root@node3 jobs]# vim agg3.conf
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3# Describe/configure the source
a3.sources.r3.type = avro
a3.sources.r3.bind = node3
a3.sources.r3.port = 10000# Describe the sink
a3.sinks.k3.type = logger# Describe the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3