Flume 快速入门【概述、安装、拦截器】

文章目录

    • 什么是 Flume?
    • Flume 组成
    • Flume 安装
    • Flume 配置任务文件
      • 应用示例
      • 启动 Flume 采集任务
    • Flume 拦截器
      • 编写 Flume 拦截器
      • 拦截器应用

什么是 Flume?

Flume 是一个开源的数据采集工具,最初由 Apache 软件基金会开发和维护。它的主要目的是帮助用户将大规模数据从各种数据源(如日志文件、网络数据源、消息队列等)采集、传输和加载到数据存储系统(如 Hadoop HDFS、Apache HBase、Apache Hive 等)。

Flume 旨在处理大规模数据流,以便进行数据分析和处理。

Flume 组成

Flume (配置)主要由以下 4 个部分组成:

1. 数据源(Source): Flume 可以从多种数据源收集数据,例如日志文件、网络流、消息队列等。

2. 通道(Channel): 采集的数据被存储在通道中,等待传输到目标数据存储系统。Flume 支持多种不同类型的通道,如内存通道、文件通道和 Kafka 通道。

3. 拦截器(Interceptor): 拦截器允许用户对采集的数据进行预处理和转换,以满足特定需求。

4. 接收器(Sink): 接收器将数据传输到目标数据存储系统,如 Hadoop HDFS、HBase、Kafka 等。

Flume 通过灵活的配置,允许用户根据其数据采集需求来定义数据流的整个流程,包括数据源、通道、拦截器和接收器。

这使得 Flume 成为处理大规模数据采集和传输任务的强有力工具,构建数据管道,将分散的数据整合到中心存储或处理系统中,用于实时或者离线数据分析和报告。

Flume 安装

官方安装包下载地址:http://archive.apache.org/dist/flume

本篇博客使用的版本为:Flume-1.10.1

1. 解压

tar -zxvf apache-flume-1.10.1-bin.tar.gz -C /opt/

2. 配置环境变量

vim /etc/profile

文件末尾添加:

#FLUME_HOME
export FLUME_HOME=/opt/flume-1.10.1
export PATH=$PATH:$FLUME_HOME/bin

刷新环境变量:source /etc/profile

其实到这里,Flume 算是安装完了,但是为了后期使用方便,这里再调整一下配置参数。

修改日志存储与输出:

cd $FLUME_HOMEvim conf/log4j2.xml

在该文件中修改日志文件的存储目录(正文第 3 行)

    <Property name="LOG_DIR">/opt/flume-1.10.1/logs</Property>

在该文件中添加日志控制台输出方式(正文末尾)

      <AppenderRef ref="Console" />

默认只有 LogFile 日志文件的输出方式。


修改堆内存大小:

cd $FLUME_HOMEvim conf/flume-env.sh

如果是本地学习或者测试环境建议调小一点:

export JAVA_OPTS="-Xms512m -Xmx2048m -Dcom.sun.management.jmxremote"

我这里调整最小为 512MB,最大 2048MB,也可以将最大和最小调整为一样的,避免进行内存交换。

Flume 配置任务文件

Flume 最主要的内容就是配置任务文件了,在文章开头提到过,主要由四部分组成:

  • 数据源(Source)

  • 通道(Channel)

  • 拦截器(Interceptor)

  • 接收器(Sink)

我们可以根据需求,进入 Flume 的官方网站,查阅各项参数如何进行配置,按照要求配置即可。

配置查阅网站:Flume 1.10.1 User Guide

在这里插入图片描述

其中给出了一个模板文件,内容如下所示:

# example.conf: A single-node Flume configuration# Name the components on this agent  声明变量名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source  配置数据源
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink  配置接收器(存储源)
a1.sinks.k1.type = logger# Use a channel which buffers events in memory 配置管道参数
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel  组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

根据该模板文件就可以来快速构建一个数据采集的配置文件啦。

应用示例

将 Maxwell 发送到 Kafka 消息队列中的数据采集到 HDFS 上。

# --------- 声明变量名称 ---------
a1.sources = r1
a1.sinks = k1
a1.channels = c1# --------- 配置数据源 ---------
# 指定数据源类型
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 指定连接地址
a1.sources.r1.kafka.bootstrap.servers = hadoop120:9092,hadoop121:9092,hadoop122:9092
# 指定消费者组别,防止多个消费者之间引发数据冲突
a1.sources.r1.kafka.consumer.group.id = flume1
# 指定主题名称,这里需要和 MaxWell 指定发送的主题保持一致,否则会采集不到数据
a1.sources.r1.kafka.topics = topic_db# --------- 配置接收器 ---------
# 指定存储源类型
a1.sinks.k1.type = hdfs
# 动态规划 HDFS 写入路径
a1.sinks.k1.hdfs.path = /test/%{tableName}_inc/%Y/%m/%d/# 当以下值中的其中一个满足时,触发滚动操作,将数据写入到新的文件中(避免小文件过多)
# 根据运行时间判定(s),测试环境调小,开发环境30m-1h
a1.sinks.k1.hdfs.rollInterval = 10 
# 根据数据量大小判定(B),128 MB
a1.sinks.k1.hdfs.rollSize = 134217728
# 根据文件的条数判断,为 0 时表示不依据该参数
a1.sinks.k1.hdfs.rollCount = 0# 压缩文件
# 指定文件类型为压缩流
a1.sinks.k1.hdfs.fileType = CompressedStream
# 指定数据压缩格式
a1.sinks.k1.hdfs.codeC = gzip# --------- 配置通道 ---------
# 通道类型
a1.channels.c1.type = file
# 检查点存储路径
a1.channels.c1.checkpointDir = /opt/module/flume-1.10.1/file-channel/checkpoint1
# 用于存储日志文件的目录,多个路径用逗号分隔
a1.channels.c1.dataDirs = /opt/module/flume-1.10.1/file-channel/data1
# 指定允许等待的时间
a1.channels.c1.keep-alive = 6# --------- 组装 ---------
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动 Flume 采集任务

cd $FLUME_HOME./bin/flume-ng agent -c conf/ -f job_file -n a1

参数解析:

  • ./bin/flume-ngflume-ng 是 Flume 的执行脚本,它用于启动 Flume 的 agent 实例。
  1. agent:这告诉 Flume-ng 启动一个代理实例,也就是一个数据采集和传输任务的执行单元。

  2. -c conf/:指定 Flume 配置文件的目录。

  3. -f job_file:指定 Flume 任务配置文件的参数,其中配置文件包含了数据源、通道、接收器以及数据处理的详细信息。

  4. -n a1:指定代理实例的名称,与配置文件中的对应。

Flume 拦截器

当我们在配置文件中定义了动态参数时,例如上方示例中接收器的配置语句:

a1.sinks.k1.hdfs.path = /test/%{tableName}_inc/%Y/%m/%d/

我们设想的是将表名称和年月日进行动态规划,但在未设置拦截器时,这些动态参数值都会被默认为空,如果是系统预定义的参数则为系统设定值。

如下所示:


其中 tableName 是自定义的值,Flume 系统并没有对其进行预定义,所以为空,但 %Y %m %d 这三个值系统默认为当前的日期值,所以不为空。

如果想将上述值设定为希望出现的值,此时便引出了拦截器的概念。通过对拦截器的配置,将采集的数据进行预处理和转换,以满足特定需求。

编写 Flume 拦截器

在 IDEA 中编写拦截器代码,然后打包上传,使用依赖如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.work</groupId><artifactId>intercepted</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><!-- JSON 解析包--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><!-- flume 包,不打包该 Jar 包--><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.10.1</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

注意,JDK 版本与平台保持一致。

拦截器实现,用于设定表头与写入日期:

package com.work.flume.interceptor;import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;/*** @author Moon_coder* @version 1.0* @date 2023/10/29 17:32*/
public class TableNameAndTimestamp implements Interceptor {/*** 初始化方法*/@Overridepublic void initialize() {}/**** 处理单条数据* @param event* @return*/@Overridepublic Event intercept(Event event) {try{// 1.获取头数据Map<String, String> headers = event.getHeaders();// 2.获取数据内容,将字节数据转换为字符串String log = new String(event.getBody(), StandardCharsets.UTF_8);// 3.将字符串转换为 JSON 对象JSONObject jsonObject = JSONObject.parseObject(log);// 4.获取表名String table = jsonObject.getString("table");// 5.获取时间,我的数据是经 Maxwell 采集的,Maxwell 中的数据是 10 位时间戳,不含毫秒,将数据存入 HDFS 时 *1000String ts = jsonObject.getString("ts") + "000";// 6.更新头数据信息headers.put("tableName",table);headers.put("timestamp",ts);}catch (JSONException e){// 如果不是 JSON 数据,则将该数据定义为脏数据return null;}return event;}/**** 批量数据处理方法* @param events* @return*/@Overridepublic List<Event> intercept(List<Event> events) {// 批量处理 event 同时实现过滤功能events.removeIf(next -> intercept(next) == null);return events;}/*** 关闭方法*/@Overridepublic void close() {}// TODO 返回拦截器类public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TableNameAndTimestamp();}@Overridepublic void configure(Context context) {}}}

类名或 Jar 包名称都没有特别要求,自定义即可。

注意: 当我们在往头信息里面放东西时,需要与键名一一对应。

            // 6.更新头数据信息headers.put("tableName",table);headers.put("timestamp",ts);

如果是自定义的值,名称与 Flume 配置文件设定的必须对应:


如果是系统预定义的值,则需要在官方网站中查询其对应的键名。例如这里出现的 %Y %m %d 这三个值,在接收器的参数定义那里即可查询到(HDFS Sink¶),如下所示:

在这里插入图片描述

这里官方给出了提示,说 对于所有与时间相关的转义序列,事件的标头中必须存在一个关键字为 “timestamp” 的标头,所以在拦截器中对头信息的时间进行操作时,对应的键名为 timestamp

拦截器应用

将打包好的拦截器 Jar 包上传到 Flume 中的 lib 目录下,然后在 Flume 任务配置文件中添加拦截器配置,如下所示:

# --------- 拦截器 ---------
# 拦截器名称
a1.sources.r1.interceptors = i1
# 编写的拦截器全类名 + $Builder 标识符
a1.sources.r1.interceptors.i1.type = com.work.flume.interceptor.TableNameAndTimestamp$Builder

再次执行上方的示例任务,可以看到配置完拦截器后,头信息已经达到了我们预期的结果。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/177681.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

项目知识点总结-住房图片信息添加-Excel导出

&#xff08;1&#xff09;住房信息添加 Controller&#xff1a; RequestMapping("/add")public String add(Home home, Model model) throws IOException{String sqlPath null;//定义文件保存的本地路径String localPath"D:\\AnZhuang\\Java项目\\选题\\Xin-…

SQLServer数据库透明加密 安当加密

安当TDE透明加密组件是一种用于数据保护的解决方案&#xff0c;它对数据进行加密&#xff0c;以防止未经授权的访问和数据泄露。 以下是安当TDE透明加密组件的主要功能介绍&#xff1a; 数据保护&#xff1a;安当TDE透明加密组件可以对数据库中的敏感数据进行加密&#xff0c;…

HNU-计算机网络-实验1-应用协议与数据包分析实验(Wireshark)

计算机网络 课程基础实验一 应用协议与数据包分析实验(Wireshark) 计科210X 甘晴void 202108010XXX 一、实验目的&#xff1a; 通过本实验&#xff0c;熟练掌握Wireshark的操作和使用&#xff0c;学习对HTTP协议进行分析。 二、实验内容 2.1 HTTP 协议简介 HTTP 是超文本…

每日自动化提交git

目前这个功能&#xff0c;有个前提&#xff1a; 这个git代码仓库&#xff0c;是一个人负责&#xff0c;所以不存在冲突问题 我这个仓库地址下载后的本地路径是&#xff1a;D:\Projects\Tasks 然后我在另外一个地方新建了一个bat文件&#xff1a; bat文件所在目录为&#xff1a…

【PyTorch 卷积】实战自定义的图片归类

前言 卷积神经网络是一类包含卷积计算且具有深度结构的前馈神经网络&#xff0c;是深度学习的代表算法之一&#xff0c;它通过卷积层、池化层、全连接层等结构&#xff0c;可以有效地处理如时间序列和图片数据等。关于卷积的概念网络上也比较多&#xff0c;这里就不一一描述了。…

c++之类和对象

首先我们要理解cin,cout只能自动识别内置类型&#xff0c;原因就是因为cin,cout里面的函数重载。 那么如果我想输入非内置类型&#xff0c;就要进行运算符重载。 但是会发生如下的情况。 友元函数可以访问对象的私有。 运算符重载的总结 成员初始化既可以用函数体内初始化也可…

docker部署minio并使用springboot连接

需求&#xff1a;工作中&#xff0c;在微信小程序播放时&#xff0c;返回文件流并不能有效的使用&#xff0c;前端需要一个可以访问的地址&#xff0c;springboot默认是有资源拦截器的&#xff0c;但是不适合生产环境的使用 可以提供使用的有例如fastdfs或者minio&#xff0c;这…

【Linux】安装配置解决CentosMobaXterm的使用及Linux常用命令以及命令模式

目录 Centos的介绍 centos安装配置&MobaXterm 创建 安装 ​编辑 配置 ​编辑 MobaXterm使用 Linux常用命令&模式 常用命令 vi或vim编辑器 三种模式 命令模式 编辑模式 末行模式 拍照备份 Centos的介绍 CentOS&#xff08;Community Enterprise Op…

软件测试面试,一定要准备的7个高频面试题(附答案,建议收藏)

问题1&#xff1a;请自我介绍下&#xff1f; 核⼼要素&#xff1a;个⼈技能优势⼯作背景经验亮点参考回答&#xff1a; 第一种&#xff1a;基本信息离职理由 ⾯试官您好&#xff0c;我叫张三&#xff0c;来⾃番茄市&#xff0c;在软件测试⾏业有 3 年的⼯作经验。做过 Web/APP…

无需编程技术,快速搭建个人网站

如果你想拥有一个属于自己的个人网站&#xff0c;但又没有任何编程经验&#xff0c;别担心&#xff0c;我们今天将为你介绍一个简单的方法&#xff0c;让你轻松搭建网站&#xff0c;无需任何编程知识。让我们一起来看看吧&#xff01; 在乔拓云建站工具中&#xff0c;自带了许多…

图纸管理制度《八》设计图纸管理制度

第一章 总则 第1条 目的。为做好设计图纸的管理工作&#xff0c;使其收发及时、手续齐全、废图绝迹、不遗失、无差错&#xff0c;特制定本办法。 第2条 适用范围。本办法适用于企业所有工程项目的图纸管理工作。 第3条 相关部门及人员职责 (1) 工程技术部负责图纸管理的监督…

0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)

大纲 滑动&#xff08;Sliding&#xff09;和滚动&#xff08;Tumbling&#xff09;的区别样例窗口为2&#xff0c;滑动距离为1窗口为3&#xff0c;滑动距离为1窗口为3&#xff0c;滑动距离为2窗口为3&#xff0c;滑动距离为3 完整代码参考资料 在 《0基础学习PyFlink——个数…

Explaining and harnessing adversarial examples

Explaining and harnessing adversarial examples----《解释和利用对抗样本》 背景&#xff1a; 早期的研究工作认为神经网络容易受到对抗样本误导是由于其非线性特征和过拟合。 创新点&#xff1a; 该论文作者认为神经网络易受对抗性扰动影响的主要原因是它的线性本质&#xf…

stm32 模拟I2C

目录 简介 I2C 物理层 协议层 ①②&#xff1a;起始信号和结束信号 ③ 应答和非应答信号 ④数据有效性 ⑤数据传输 ⑥空闲状态 简介 I2C 物理层 一个 I2C 总线两条线组成&#xff0c;一个双向串行数据线SDA用来表示数据&#xff0c;一个串行时钟线SCL用于数据收发同步…

5个最流行的文本生成纹理AI工具

在线工具推荐&#xff1a; Three.js AI纹理开发包 - YOLO合成数据生成器 - GLTF/GLB在线编辑 - 3D模型格式在线转换 - 3D场景编辑器 拥抱文本生成纹理AI模型改变游戏规则的力量&#xff0c;人工智能驱动的创新彻底改变了游戏开发中的资产创建。 这些出色的工具可将书面描述转换…

《Generic Dynamic Graph Convolutional Network for traffic flow forecasting》阅读笔记

论文标题 《Generic Dynamic Graph Convolutional Network for traffic flow forecasting》 干什么活&#xff1a;交通流预测&#xff08;traffic flow forecasting &#xff09;方法&#xff1a;动态图卷积网络&#xff08;Dynamic Graph Convolutional Network&#xff09;…

Ubuntu 使用 nginx 搭建 https 文件服务器

Ubuntu 使用 nginx 搭建 https 文件服务器 搭建步骤安装 nginx生成证书修改 config重启 nginx 搭建步骤 安装 nginx生成证书修改 config重启 nginx 安装 nginx apt 安装&#xff1a; sudo apt-get install nginx生成证书 使用 openssl 生成证书&#xff1a; 到对应的路径…

【Mybatis-Plus】常见的@table类注解

目录 引入Mybatis-Plus依赖 TableName 当实体类的类名在转成小写后和数据库表名相同时 当实体类的类名在转成小写后和数据库表名不相同时 Tableld TableField 当数据库字段名与实体类成员不一致 成员变量名以is开头&#xff0c;且是布尔值 ​编辑 成员变量名与数据库关…

IDEA中application.properties文件中文乱码

现象&#xff1a; 原因&#xff1a; 项目编码格式与IDEA编码格式不一致导致的 解决办法&#xff1a; 在File->Settings->Editor->File Encodings选项中&#xff0c;将Global Encoding,Project Encoding,Default encoding for properties files这三个选项置为一致&a…