Flume最简单使用

文章目录

    • 一、简介
      • 1、定义
      • 2、基础架构
    • 二、快速入门
      • 1、解压Flume
      • 2、案例一:监控端口号
      • 3、案例二:将空目录下文件
    • 三、Flume进阶
      • 1、Flume事务
      • 2、Flume Agent内部原理
      • 3、案例一:监控日志
      • 4、案例二:多路复用和拦截器适应
        • 4.1 原理
        • 4.2 代码编写
        • 4.3 编写配置文件
        • 4.4 测试
      • 5、案例三:聚合
    • 四、Flume数据流监控
      • 1、Ganglia简介
      • 2、部署规划
      • 3、修改配置文件:`hadoop101`
      • 4、修改配置文件`3台`
      • 5、启动服务
      • 3、测试


一、简介

1、定义

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。

Flume基于流式架构,灵活简单。

  1. Flume官网地址:http://flume.apache.org/
  2. 文档查看地址:http://flume.apache.org/FlumeUserGuide.html
  3. 下载地址:http://archive.apache.org/dist/flume/

在这里插入图片描述

2、基础架构

Flume组成架构如下图所示:

在这里插入图片描述

  • Agent

Agent:Flume的部署单元,本质是一个JVM进程,Agent内部是以事件的形式将数据从源头送至目的。

组成:Agent主要有3个部分组成,Source、Channel、Sink。

  • Source

Source:是负责接收数据到Flume Agent的组件。
特点:Source组件可以处理各种类型、各种格式的日志数据,
Source组件类型:

  1. avro:本质是RPC框架,支持跨语言、跨平台的数据传输,avro Source在flume中多用于Agent的连接。
  2. netcat:本质是Linux下的端口类工具,netcat Source在Flume中用于采集端口传输的数据。
  3. exec:支持执行命令的,并将命令执行后的标准输出作为数据采集,多用于采集一个可追加文件。
  4. spooling directory:支持对一个目录进行监听,采集目录中一个或多个新生成的文件数据。
  5. taildir:支持对多个目录进行监听,采集一个或多个目录下的一个或多个可追加文件,支持断点续传。
  6. 除此之外还有:thrift、jms、sequence generator、syslog、http、自定义Source。
  • Sink

Sink:是负责发送数据到外部系统的Flume Agent的组件。
特点:Sink组件不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量的、事务的写入到存储或索引系统、或者被发送到另一个Flume Agent。
Sink组件类型:

  1. logger:logger Sink组件则是将数据写到成Flume框架的运行日志中,配合运行参数-Dflume.root.logger=INFO,console可以将Flume运行日志(其中就包含了采集的数据)输出到控制台,多用于测试环境。
  2. hdfs:hdfs Sink组件是负责将数据传输到HDFS分布式文件系统中。
  3. avro:avro Sink组件配合avro Source组件可以实现Agent的连接。
  4. file:file Sink组件是将采集到的数据直接输出到本地文件系统中,即linux的磁盘上。
  5. 除此之外还有:thrift、ipc、HBase、solr、自定义Sink。
  • Channel

**Channel:**是负责暂存数据的,是位于Source和Sink组件之间的缓冲区。
特点:

  1. 由于Channel组件的存在,使得Source和Sink组件可以运作在不同的速率上。
  2. Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。

Flume自带两种Channel:

  1. Memory Channel:基于内存的队列存储事件,适用于对数据安全性要求不高的场景。 快,不安全
  2. File Channel:基于磁盘存储事件,宕机数据不丢失,适用于对数据安全敏感度高的场景。 慢,安全
  • Event

Event: agent中的事件,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。
特点: Event由Header和Body两部分组成,

  1. Header:用来存放该event的一些属性,为K-V结构。
  2. Body:用来存放该条数据,形式为字节数组。

二、快速入门

1、解压Flume

wget https://gitcode.net/weixin_44624117/software/-/raw/master/software/Linux/Flume/apache-flume-1.9.0-bin.tar.gz

解压文件

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/

修改文件目录名称

mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume-1.9.0

将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

rm /opt/module/flume-1.9.0/lib/guava-11.0.2.jar

2、案例一:监控端口号

使用Flume监听一个端口,收集该端口数据,并打印到控制台。

在这里插入图片描述

安装netcat工具(发送Http请求)

sudo yum install -y nc

判断44444端口是否被占用

sudo netstat -nlp | grep 44444

在Flume目录下,创建job任务

cd /opt/module/flume-1.9.0
mkdir -p job/simpleCase
cd /opt/module/flume-1.9.0/job/simpleCase

添加配置文件

注:配置文件来源于官方手册http://flume.apache.org/FlumeUserGuide.html

vim flume-1-netcat-logger.con
#Name the components on this agent
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = logger # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

解释:

#Name the components on this agent
a1.sources = r1                                      # 为a1的Source组件命名为r1,多个组件用空格间隔
a1.sinks = k1                                        # 为a1的Sink组件命名为k1,多个组件用空格间隔
a1.channels = c1                                    # 为a1的Channel组件命名为c1,多个组件用空格间隔# Describe/configure the source
a1.sources.r1.type = netcat                      # 配置r1的类型
a1.sources.r1.bind = localhost                  # 配置r1的绑定地址(注意localhost和hadoop102的区别)
a1.sources.r1.port = 44444                       # 配置r1的监听端口# Describe the sink
a1.sinks.k1.type = logger                        # 配置k1的类型为logger,输出到控制台# Use a channel which buffers events in memory
a1.channels.c1.type = memory                    # 配置c1的类型为memory
a1.channels.c1.capacity = 1000                 # 配置c1的容量为1000个事件
a1.channels.c1.transactionCapacity = 100     # 配置c1的事务容量为100个事件# Bind the source and sink to the channel
a1.sources.r1.channels = c1                    # 配置r1的channel属性,指定r1连接到那个channel
a1.sinks.k1.channel = c1                        # 配置k1的channel属性,指定k1连接到那个channel

运行Flume监听端口

#	方式一:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console
#	方式二:bin/flume-ng agent -c conf/ -n a1 -f job/simpleCase/flume-1-netcat-logger.conf -Dflume.root.logger=INFO,console

参数说明:

  • --conf/-c:表示配置文件存储在conf/目录
  • --name/-n:表示给agent起名为a1
  • --conf-file/-f:指定读取的配置文件是在job/simpleCase文件夹下的flume-1-1netcat-logger.conf文件。
  • -Dflume.root.logger=INFO,console :-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。

**测试:**在Hadoop101上启动nc,并发送请求

nc localhost 44444
hello
world
hello world

在这里插入图片描述

3、案例二:将空目录下文件

Source选择:

  • Exec source:适用于监控一个实时追加的文件,不能实现断点续传;
  • Spooldir Source:适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;
  • Taildir Source:适合用于监听多个实时追加的文件,并且能够实现断点续传。

案例需求:

  • 使用Flume监听整个目录的实时追加文件,并上传至HDFS。

在这里插入图片描述

创建配置文件

cd /opt/module/flume-1.9.0/job/simpleCase
vim flume-2-taildir-hdfs.conf

配置文件

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = TAILDIR
a2.sources.r1.positionFile = /opt/module/flume-1.9.0/tail_dir.json
a2.sources.r1.filegroups = f1 f2
a2.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/datas/tailCase/files/.*file.*
a2.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/datas/tailCase/logs/.*log.*# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop101:8020/flume/tailDir/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = tail-
# 是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
# 设置文件类型,(可选择设置支持压缩的CompressedStream或者不支持压缩的DataStream) 
a2.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
# 设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

启动监控

cd /opt/module/flume-1.9.0
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/simpleCase/flume-2-taildir-hdfs.conf

测试

新建受监控目录

mkdir -p datas/tailCase/files
mkdir -p datas/tailCase/logs

/opt/module/flume/datas/目录下创建tailCase/files文件夹向files文件夹下文件追加内容

测试/opt/module/flume-1.9.0/datas/tailCase/files/.*file.*

#	当前目录下会上传file的文件
cd /opt/module/flume-1.9.0/datas/tailCase/filestouch file1.txt
echo I am file1 >> file1.txt
touch log1.txt
echo I am log1 >> log1.txt

测试: /opt/module/flume-1.9.0/datas/tailCase/logs/.*log.*

#	当前目录下,会上传 log的文件
cd /opt/module/flume-1.9.0/datas/tailCase/logs
touch file2.txt
echo I am file2 >> file2.txt
touch log2.txt
echo I am log2 >> log2.txt

文件上传到HDFS上

在这里插入图片描述

断点续传监控

关掉flume采集程序,对logs/和files/下文件追加,再开启flume采集程序,验证flume的断点续传。

Taildir Source维护了一个json格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File的格式如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/datas/tailCase/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/datas/tailCase/logs/log2.txt"}

注:Linux中储存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode号码来识别不同的文件,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件。

三、Flume进阶

1、Flume事务

在这里插入图片描述

在Flume中一共有两个事务

  • Put事务:在Source组件和Channel组件之间,保证Source组件到Channel组件之间数据传递的可靠性。
  • take事务:在Channel组件和Sink组件之间,保证channel组件到Sink组件之间数据传输的可靠性。

Put事务流程

  1. source组件采集外部数据到agent内部,并且将数据包装为事件。
  2. source组件开始将事件传输到Channel组件中。
  3. 首先,会开启事务,在事务内部,通过doPut方法将一批数据放入到putlist中存储。
  4. 之后,调用doCommit方法,把putList中的所有Event放到Channel中,成功之后就清空putList。

失败重试机制

  • putList在像channel中发送数据前会先检查channel中的容量是否放得下,放不下一个都不会放,调用doRollback
  • 调用doRollback方法后,doRollback方法会进行两步操作:
    • 将putList清空。
    • 抛出ChannelException异常。
  • source组件会捕捉到doRollback抛出的异常后,source就将刚才的一批数据重新采集,然后就开启一个新的事务。
  • 数据批的大小取决于Source组件的配置参数batch size的值。
  • putList的大小取决于Channel组件的配置参数transactionCapacity的值(capacity参数是指Channel的容量)。

Take事务流程

  1. Sink组件不断的轮询Channel,当其中有新的事件到达时,开启take事务。
  2. take事务开启后,会调用doTake方法将Channel组件中的Event剪切到takeList中。
  3. 当takeList中存放了batch size数量的Event之后,就会调用doCommit方法。
  4. doCommit方法中,首先会将数据写出到外部系统,成功后就会清空takeList。
  5. 当事务失败时,就会调用doRollback方法来进行回滚,就是将takeList中的数据原封不动的还给channel。

2、Flume Agent内部原理

在这里插入图片描述

组件名称概述组件包含类型特点
ChannelSelector选出Event将要发到那个channelReplication Channel Selector复制,默认选项
Multiplexing Channel Seletctor多路复用
SinkProcessor通过配置不同类型的SinkProcess实现不同的功能DefaultSinkProcessor单个Sink,默认
LoadBalancingSinkProcessor负载均衡
FailoverSinkProcessor故障转移

执行流程

  1. Source组件采集外部数据到agent内部,并包装为Event
  2. 然后,将事件发送到ChannelProcessor中,
    • 通过拦截器链中每个拦截器的拦截过滤,符合要求的Event会返回到ChannelProcessor中。
    • 在通过ChannelSelector,根据不同的选择器来决定Event去往哪个Channel,然后返回到ChannelProcessor。
  3. 开启Put事务,将批量的Event发送到Channel中
  4. 更具SinkProcessor组件配置的类型不同,实现相应的功能(负载均衡或故障转移),最终都会且同一时刻只能有一个Sink去拉取数据。
  5. Sink组件不断的轮询Channel,当有新的Event到达Channel时,向外部系统写出。

3、案例一:监控日志

需求:

  • 使用Flume-1监控文件变动。
  • Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。
  • 同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem。

在这里插入图片描述

模拟日志文件:在/opt/module/flume/datas/目录下创建模拟日志文件realtime.log

mkdir -p /opt/module/flume-1.9.0/datas
touch /opt/module/flume-1.9.0/datas/realtime.log

复制的配置文件:在/opt/module/flume/job目录下创建enterprise/copy文件夹

mkdir -p /opt/module/flume-1.9.0/job/enterprise/copy
  • Source:flume-1-exec-avro.conf
  • Sink:``flume-2-avro-hdfsflume-3-avro-file`

配置文件1::flume-1-exec-avro.conf

vim /opt/module/flume-1.9.0/job/enterprise/copy/flume-1-exec-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel,其实默认就是replicating
a1.sources.r1.selector.type = replicating# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/datas/realtime.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop101
a1.sinks.k1.port = 4141a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop101
a1.sinks.k2.port = 4142# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

配置文件2:flume-2-avro-hdfs.conf

vim /opt/module/flume-1.9.0/job/enterprise/copy/flume-2-avro-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
# source端的avro是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop101
a2.sources.r1.port = 4141# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop101:8020/flume/copy/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = copy-
# 是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
# 设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

配置文件3:flume-3-avro-file.conf

vim /opt/module/flume-1.9.0/job/enterprise/copy/flume-3-avro-file.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop101
a3.sources.r1.port = 4142# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/datas/copy_result# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
mkdir /opt/module/flume-1.9.0/datas/copy_result
cd /opt/module/flume-1.9.0
bin/flume-ng agent -c conf/ -n a3 -f /opt/module/flume-1.9.0/job/enterprise/copy/flume-3-avro-file.conf
bin/flume-ng agent -c conf/ -n a2 -f /opt/module/flume-1.9.0/job/enterprise/copy/flume-2-avro-hdfs.conf
bin/flume-ng agent -c conf/ -n a1 -f /opt/module/flume-1.9.0/job/enterprise/copy/flume-1-exec-avro.conf
echo 2021-10-41 09-10-32 >> /opt/module/flume-1.9.0/datas/realtime.log

4、案例二:多路复用和拦截器适应

4.1 原理

需求:

使用flume采集服务器端口日志数据,需要按照日志类型的不同,将不同种类的日志发往不同分析系统。

原理

  • 背景:在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。
    此时会用到Flumechannel selecter中的Multiplexing结构。
  • Multiplexing的原理是:根据event中Header的某个key的值,将不同的event发送到不同的Channel中。
  • 自定义Interceptor:实现为不同类型的event的Header中的key赋予不同的值。
  • 总结:在该案例中,我们以端口数据模拟日志,以数字和字母模拟不同类型的日志,我们需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统(Channel)。

在这里插入图片描述

4.2 代码编写

Maven配置

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.lydms</groupId><artifactId>first-flume</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>first-flume</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>6</source><target>6</target></configuration></plugin></plugins></build>
</project>

接口:

package com.lydms.flume.interceptor;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.List;public class CustomInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 1. 从事件中获取数据byte[] body = event.getBody();// 2. 判断数据开头的字符是字母还是数据if (body[0] >= 'a' && body[0] <= 'z') {event.getHeaders().put("type", "letter");         // 是字母就在事件头部设置type类型为letter} else if (body[0] >= '0' && body[0] <= '9') {event.getHeaders().put("type", "number");         // 是数字就在事件头部设置type类型为number}// 3. 返回事件return event;}// 对批量事件进行拦截@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublic void close() {}// 拦截器对象的构造对象public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new CustomInterceptor();}@Overridepublic void configure(Context context) {}}
}

将项目打包,并导入到flume的lib目录下(/opt/module/flume-1.9.0/lib)。

4.3 编写配置文件

Hadoop101:添加配置文件

mkdir -p /opt/module/flume-1.9.0/job/custom/multi
vim /opt/module/flume-1.9.0/job/custom/multi/flume-1-netcat-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
#	Java文件目录
a1.sources.r1.interceptors.i1.type = com.lydms.flume.interceptor.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4242# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Hadoop102:添加配置文件

mkdir -p /opt/module/flume-1.9.0/job/custom/multi
vim /opt/module/flume-1.9.0/job/custom/multi/flume-2-avro-logger.conf
# agent
a2.sources=r1
a2.sinks = k1
a2.channels = c1# source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141# sink
a2.sinks.k1.type = logger# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# bind
a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1

Hadoop103:添加配置文件

mkdir -p /opt/module/flume-1.9.0/job/custom/multi
vim /opt/module/flume-1.9.0/job/custom/multi/flume-3-avro-logger.conf
# agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1# source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop103
a3.sources.r1.port = 4242# sink
a3.sinks.k1.type = logger# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# bind
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1
4.4 测试

启动项目

cd /opt/module/flume-1.9.0
bin/flume-ng agent -c conf/ -n a3 -f /opt/module/flume-1.9.0/job/custom/multi/flume-3-avro-logger.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a2 -f /opt/module/flume-1.9.0/job/custom/multi/flume-2-avro-logger.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f /opt/module/flume-1.9.0/job/custom/multi/flume-1-netcat-avro.conf -Dflume.root.logger=INFO,console

测试:

nc localhost 44444
hello
world
1231231
41341

在这里插入图片描述

5、案例三:聚合

案例:

  • hadoop102:flume-1监控文件/opt/module/flume-1.9.0/datas/.*file*.
  • hadoop103:flume-2监控某一个端口的数据流。
  • hadoop104:flume-3,接收flume-1flume-2的数据,flume-3将最终数据打印到控制台。

在这里插入图片描述

Hadoop101:配置文件flume-1-exec-avro.conf

mkdir /opt/module/flume-1.9.0/job/enterprise/juhe
vim /opt/module/flume-1.9.0/job/enterprise/juhe/flume-1-exec-avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/datas/realtime.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Hadoop102 :配置文件flume-2-netcat-avro.conf

mkdir -p /opt/module/flume-1.9.0/job/enterprise/juhe
vim /opt/module/flume-1.9.0/job/enterprise/juhe/flume-2-netcat-avro.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop103
a2.sinks.k1.port = 4141# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Hadoop103:配置文件flume-1-exec-avro.conf

mkdir -p /opt/module/flume-1.9.0/job/enterprise/juhe
vim /opt/module/flume-1.9.0/job/enterprise/juhe/flume-3-avro-logger.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop103
a3.sources.r1.port = 4141# Describe the sink
a3.sinks.k1.type = logger# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

测试:

#	Hadoop103
/opt/module/flume-1.9.0/bin/flume-ng agent –c conf/ -n a3 -f /opt/module/flume-1.9.0/job/enterprise/juhe/flume-3-avro-logger.conf -Dflume.root.logger=INFO,console
#	Hadoop102
/opt/module/flume-1.9.0/bin/flume-ng agent –c conf/ -n a2 -f /opt/module/flume-1.9.0/job/enterprise/juhe/flume-2-netcat-avro.conf
#	Hadoop101
/opt/module/flume-1.9.0/bin/flume-ng agent –c conf/ -n a1 -f /opt/module/flume-1.9.0/job/enterprise/juhe/flume-1-exec-avro.conf

Hadoop101:向/opt/module/flume/datas/目录下的realtime.log追加内容

echo 'Hello Worlld Hadoop101' > /opt/module/flume-1.9.0/datas/realtime.log

Hadoop102:向44444端口发送数据

nc hadoop102 44444
hello world

Hadoop103:查看数据

在这里插入图片描述

四、Flume数据流监控

1、Ganglia简介

Ganglia由gmond、gmetad和gweb三部分组成。

  • gmond(Ganglia Monitoring Daemon):
    是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。
    使用gmond,你可以很容易收集很多系统指标数据,如CPU、内存、磁盘、网络和活跃进程的数据等。
  • gmetad(Ganglia Meta Daemon):
    整合所有信息,并将其以RRD格式存储至磁盘的服务。
  • gweb(Ganglia Web)Ganglia可视化工具:
    gweb是一种利用浏览器显示gmetad所存储数据的PHP前端。
    在Web界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。

2、部署规划

gwebgmetadgmod
Hadoop101turetruetrue
Hadoop102true
Hadoop103true

安装步骤

#	Hadoop101
sudo yum -y install epel-release
sudo yum -y install ganglia-gmetad
sudo yum -y install ganglia-web
sudo yum -y install ganglia-gmond#	Hadoop102
sudo yum -y install epel-release
sudo yum -y install ganglia-gmond#	Hadoop103
sudo yum -y install epel-release
sudo yum -y install ganglia-gmond

3、修改配置文件:hadoop101

修改配置:Hadoop101

  • 修改配置文件/etc/httpd/conf.d/ganglia.conf
sudo vim /etc/httpd/conf.d/ganglia.conf
#	修改内容(2种配置。二选一)
Require ip 192.168.1.1          
#	Require all granted  

在这里插入图片描述

修改配置文件/etc/ganglia/gmetad.conf

sudo vim /etc/ganglia/gmetad.conf
#	修改内容
data_source "my cluster" hadoop101

在这里插入图片描述

修改配置文件/etc/selinux/config

sudo vim /etc/selinux/config
#	修改内容
SELINUX=disabled
SELINUXTYPE=targeted

4、修改配置文件3台

修改配置:Hadoop101、Hadoop102、Hadoop103

修改配置文件/etc/sudganglia/gmond.conf

sudo vim /etc/ganglia/gmond.conf 
#	修改内容==================
# 数据发送给hadoop101
host = hadoop101# 接收来自任意连接的数据
bind = 0.0.0.0

在这里插入图片描述

5、启动服务

修改文件权限

chown ganglia:ganglia /var/lib/ganglia
sudo chmod -R 777 /var/lib/ganglia

启动脚本(Hadoop101)

sudo systemctl start gmond
sudo systemctl start httpd
sudo systemctl start gmetad

连接地址:http://hadoop101/ganglia

当不能查看到页面时,修改配置文件,再重启。

Require ip 192.168.1.1  

在这里插入图片描述

3、测试

在这里插入图片描述
EventPutAttemptCountsource尝试写入Channel的事件总量
在这里插入图片描述 EventPutSuccessCount成功写入channel且提交的事件总量img StartTimechannel停止时间
img EventTakeAttemptCountsource尝试写入Channel的事件总量img EventTakeSuccessCount成功写入channel且提交的事件总量img StopTimechannel停止时间
img ChannelSize目前Channel中事件的总数量img ChannelFillPercentagechannel占用百分比img ChannelCapacitychannel的容量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/139749.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux 操作技巧

目录 一、shell-命令解释器 二、Linux中的特殊符号 三、命令历史--history 一、shell-命令解释器 shell——壳&#xff0c;命令解释器&#xff0c;负责解析用户输入的命令 ——内置命令&#xff08;shell内置&#xff09; ——外置命令&#xff0c;在文件系统的某个目录下&…

【学习草稿】背包问题

一、01背包问题 图解详细解析 &#xff08;转载&#xff09; https://blog.csdn.net/qq_37767455/article/details/99086678 &#xff1a;Vi表示第 i 个物品的价值&#xff0c;Wi表示第 i 个物品的体积&#xff0c;定义V(i,j)&#xff1a;当前背包容量 j&#xff0c;前 i 个物…

2010-2017年WIND分省政府性债务余额面板数据

2010-2017年WIND分省政府性债务余额面板数据 1、时间&#xff1a;2010-2017年 2、指标&#xff1a;债务余额 3、范围&#xff1a;30个省 4、来源&#xff1a;wind 5、指标解释&#xff1a;地方政府债务分为一般债务和专项债务。 一般债务对应的是一般公共预算&#xff0c…

方案:浅析利用AI智能识别与视频监控技术打造智慧水产养殖监管系统

一、方案背景 针对目前水产养殖集约、高产、高效、生态、安全的发展需求&#xff0c;基于智能传感、智慧物联网、人工智能、视频监控等技术打造智慧水产系统&#xff0c;成为当前行业的发展趋势。传统的人工观察水产养殖方式较为单一&#xff0c;难以及时发现人员非法入侵、偷…

跨域问题解决方案(三种)

Same Origin Policy同源策略&#xff08;SOP&#xff09; 具有相同的Origin&#xff0c;也即是拥有相同的协议、主机地址以及端口。一旦这三项数据中有一项不同&#xff0c;那么该资源就将被认为是从不同的Origin得来的&#xff0c;进而不被允许访问。 Cross-origin resource…

SpringBean的生命周期

SpringBean的生命周期 SperingBean的生命周期是从Bean实例化之后&#xff0c;即通过反射创建出对象之后&#xff0c;到Bean成为一个完整对象&#xff0c;最终存储到单例池中&#xff0c;这个过程被称为Spring Bean的生命周期。Spring Bean的生命周期大体上分为三个阶段 Bean的…

Win7开启触摸键盘方法

在Win7系统中&#xff0c;自带有触摸屏幕键盘&#xff0c;能够在屏幕上显示虚拟键盘&#xff0c;让用户可以用指针设备或触屏等进行输入操作&#xff0c;那么Win7系统怎么开启触摸键盘呢&#xff1f;想知道的小伙伴可以跟着我一起来学习一下。 1、首先打开Win7系统的开始菜单&a…

小程序中如何查看会员的访问记录

​在小程序中&#xff0c;我们可以通过如下方式来查看会员的访问记录。下面是具体的操作流程&#xff1a; 1. 找到指定的会员卡。在管理员后台->会员管理处&#xff0c;找到需要查看访客记录的会员卡。也支持对会员卡按卡号、手机号和等级进行搜索。 2. 查看会员卡详情。点…

Smart UI Web 16.0.1 WebComponents htmlelements Crack

Javascript Web 组件库 Smart UI Web 组件库是您构建令人惊叹的 Web 应用程序所需的唯一套件。它包含 70 多个快速且专业设计的 UI 组件&#xff0c;可在单个包中实现美观且始终现代的 Web 应用程序。 具有高级功能的即用型Javascript 组件。只需几行代码即可使用数据网格、甘特…

解决编译中遇到的问题:Please port gnulib freadahead.c to your platform

今天在编译旧版的gzip-1.7时遇到了一个错误&#xff1a; error: #error "Please port gnulib freadahead.c to your platform! Look at the definition of fflush, fread, ungetc on your system, then report this to bug-gnulib." 在网上搜了一下解决方法&#xf…

400电话申请流程详解,助您快速办理联通、移动、电信400电话

导语&#xff1a;随着企业业务的发展&#xff0c;越来越多的企业开始关注400电话的申请与办理。本文将为您详细介绍联通、移动、电信400电话的申请流程&#xff0c;帮助您快速办理400电话&#xff0c;提升企业形象和客户服务质量。 一、联通400电话申请流程 咨询与选择&#x…

nginx知识点详解:反向代理+负载均衡+动静分离+高可用集群

一、nginx基本概念 1. nginx是什么&#xff0c;做什么事情&#xff1f; Nginx是一个高性能的HTTP和反向代理服务器&#xff0c;特点是占有内存少&#xff0c;并发能力强。Nginx转为性能优化而开发&#xff0c;能经受高负载考验。支持热部署&#xff0c;启动容易&#xff0c;运…

Avl树(有详细图解)

目录 介绍 引入 概念 特点 模拟实现 思路 插入 旋转 左旋 无子树 有子树 右旋 无子树 有子树 左右旋 引入(也就是有子树版本的抽象图解) 解决方法(也就是左右旋) 总结 无子树(也就是curright的位置就是newnode) 有子树 模型高度解释 旋转 更新三个…

资料分析笔记

统计术语 现期&#xff1a;现在的时间 基期&#xff1a;之前的时间 现期量 基期量 增长量&#xff08;有正负&#xff09; 增长率 【增幅、增速、r】&#xff08;有正负&#xff09; 同比&#xff1a;例&#xff1a;2014年5月 和 2013年5月 环比&#xff1a;例&#xff1a;20…

[Linux入门]---git命令行的基本使用

文章目录 1.git使用gitee仓库创建git使用测试ignore文件 1.git使用 git是一款对文件进行版本控制的软件&#xff0c;gitee、github是基于git软件搭建的网站&#xff0c;是可以对代码进行托管的平台&#xff1b;github是国外的网站&#xff0c;访问慢&#xff0c;不稳定&#xf…

基于微信小程序的高校宿舍信息管理系统设计与实现(源码+lw+部署文档+讲解等)

前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f447;&#x1f3fb;…

【C++入门指南】C如何过渡到C++?祖师爷究竟对C++做了什么?

【C入门指南】C如何过渡到C&#xff1f;祖师爷究竟对C做了什么&#xff1f; 前言一、命名空间1.1 命名空间的定义1.2 命名空间使用 二、C输入、输出2.1 std命名空间的使用惯例 三、缺省参数3.1 缺省参数的定义3.2 缺省参数分类 四、函数重载4.1 函数重载概念4.2 C支持函数重载的…

逻辑漏洞挖掘之XSS漏洞原理分析及实战演练 | 京东物流技术团队

一、前言 2月份的1.2亿条用户地址信息泄露再次给各大公司敲响了警钟&#xff0c;数据安全的重要性愈加凸显&#xff0c;这也更加坚定了我们推行安全测试常态化的决心。随着测试组安全测试常态化的推进&#xff0c;有更多的同事对逻辑漏洞产生了兴趣&#xff0c;本系列文章旨在…

C++QT day11

绘制时钟 widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPaintEvent>//绘制事件类 #include <QDebug>//信息调试类 #include <QPainter>//画家类 #include <QTimer>//定时器类 #include <QTime> #include &…

039_小驰私房菜_Camera perfermance debug

全网最具价值的Android Camera开发学习系列资料~ 作者:8年Android Camera开发,从Camera app一直做到Hal和驱动~ 欢迎订阅,相信能扩展你的知识面,提升个人能力~ 一、抓取trace 1. adb shell "echo vendor.debug.trace.perf=1 >> /system/build.prop" 2. …