Flume 拦截器概念及自定义拦截器的运用

文章目录

    • Flume 拦截器
    • 拦截器的作用
    • 拦截器运用
      • 1.创建项目
      • 2.实现拦截器接口
      • 3.编写事件处理逻辑
      • 4.拦截器构建
      • 5.打包与上传
      • 6.编写配置文件
      • 7.测试运行

Flume 拦截器

在 Flume 中,拦截器(Interceptors)是一种可以在事件传输过程中拦截、处理和修改事件的组件。

位于 Source 与 Channel 之间,在写入Channel 之前,拦截器可以对数据进行转换、提取或删除,以满足特定的需求。每个拦截器只处理同一个 Source 接收到的事件,你也可以同时配置多个拦截器,它们会按顺序执行。

拦截器的作用

  • 数据处理和转换: 拦截器可以对事件数据进行处理和转换。例如,可以对原始日志进行解析、过滤、格式化等操作,以便后续处理或存储。

  • 数据增强: 拦截器可以为事件数据添加额外的信息或元数据。例如,可以添加时间戳、主机信息、标签等,以丰富事件数据的内容。

  • 数据过滤: 拦截器可以根据特定条件过滤掉不需要的事件数据,减少数据传输的量或过滤掉无效数据。

  • 监控和日志: 拦截器可以用于监控数据流的运行情况,记录日志信息或统计数据流中的事件数量、处理速率等指标,帮助用户进行性能分析和故障排查。

拦截器运用

1.创建项目

创建一个 Maven 工程项目,引入 Flume 依赖。

在 IDEA 中创建 Maven 项目想必大家都会,这里不再赘述。

根据集群中的 Flume 版本,引入 Flume 依赖,如下所示:

    <dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.10.1</version><scope>provided</scope></dependency></dependencies>

无需将该依赖打包进最后的 JAR 包中,故将其作用域设置为 provided

当一个依赖项的 scope 被设置为 compile 时,它将在编译和运行时都可用,并包含在最终的项目包中。而 provided 范围的依赖项仅在编译和测试阶段需要,运行时不包括。

2.实现拦截器接口

创建测试类 TestInterceptor 实现拦截器 Interceptor,注意,导包时不要导错了。

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.List;public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {return null;}@Overridepublic List<Event> intercept(List<Event> list) {return null;}@Overridepublic void close() {}
}

在上面的代码中,我们实现了 Flume 拦截器接口 Interceptor,并重写了其中的四个方法:

  • initialize() 方法:初始化拦截器操作,读取配置信息、建立连接等。

  • intercept(Event event) 方法:用于拦截单个事件,并对事件进行处理。接收一个事件对象作为输入,并返回一个修改后的事件对象。

  • intercept(List<Event> list) 方法:事件批处理,拦截事件列表,并对事件列表进行处理。

  • close() 方法:关闭拦截器,在这里释放资源、关闭连接等。

3.编写事件处理逻辑

在这里做个简单的事件处理,如果数据中包含字符串 hello 则进行过滤操作,这样我们可以直观感受到拦截器的存在,下面来进行逻辑设计。

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;public class TestInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 获取事件数据String eventData = new String(event.getBody(), StandardCharsets.UTF_8);// 检查事件数据中是否包含指定字符串if (eventData.contains("hello")) {// 如果包含指定字符串,则过滤掉该事件,返回 nullreturn null;}return event;}@Overridepublic List<Event> intercept(List<Event> events) {// 创建一个新的列表,存储处理过后的事件List<Event> interceptedEvents = new ArrayList<>();for (Event event : events) {Event interceptedEvent = intercept(event);if (interceptedEvent != null) {interceptedEvents.add(interceptedEvent);}}return interceptedEvents;}@Overridepublic void close() {}}

intercept(List<Event> events) 方法用于对事件列表进行批量处理。这个方法会遍历传入的事件列表,并对每一个事件调用 intercept(Event event) 方法来进行单独处理。

注意,如果只有 intercept(Event event) 方法被重写了,而没有实现 intercept(List<Event> events) 批处理方法,那么在处理事件时会以单个事件的方式进行处理。

在不需要进行初始化和释放资源的情况下,我们可以选择不重写 initializeclose 方法。

4.拦截器构建

在编写完事件处理逻辑后,我们还需要对拦截器进行构建。

在 Flume 中,拦截器的创建和配置通常是通过 Builder 模式来完成的。

在程序中,我们可以定义一个静态内部类 Builder,实现 Interceptor.Builder 接口来对拦截器进行构建,如下所示:

    public static class Builder implements Interceptor.Builder {@Overridepublic void configure(Context context) {// 配置操作,可留空}@Overridepublic Interceptor build() {// 返回构建的拦截器类}}

在我们这个案例中,完整的代码如下所示:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;public class TestInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 获取事件数据String eventData = new String(event.getBody(), StandardCharsets.UTF_8);// 检查事件数据中是否包含指定字符串if (eventData.contains("hello")) {// 如果包含指定字符串,则过滤掉该事件,返回 nullreturn null;}return event;}@Overridepublic List<Event> intercept(List<Event> events) {List<Event> interceptedEvents = new ArrayList<>();for (Event event : events) {Event interceptedEvent = intercept(event);if (interceptedEvent != null) {interceptedEvents.add(interceptedEvent);}}return interceptedEvents;}@Overridepublic void close() {}// 拦截器构建public static class Builder implements Interceptor.Builder {@Overridepublic void configure(Context context) {}@Overridepublic Interceptor build() {return new TestInterceptor();}}}

5.打包与上传

将写好的项目进行打包,并上传到集群中,进行测试。

注意,需要将打包好的拦截器包放在 Flume 安装目录下的 lib 文件夹中。

在这里插入图片描述

6.编写配置文件

这里为了验证拦截器的作用,通过一个 Flume 采集案例来进行体现。

如果你不知道如何编写配置文件,可以看我写的这篇文章 —— Flume 配置文件编写技巧(包会的,抄就完了)

这个配置案例是将发送到 HTTP 源中的数据采集到 HDFS 上,将本地文件作为缓冲通道,该配置文件命名为 httpToHDFS.conf

# 声明
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Source 源配置
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.bind = localhost# 拦截器配置
# 拦截器定义
a1.sources.r1.interceptors = i1
# 拦截器全类名
a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder# Sink 处理/存储配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip# Channel 通道配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/data# 组装/绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

拦截器全类名配置那里需要注意,格式为 拦截器的全类名 + $Builder

在 IDEA 中获取全类名的方式:右击需要引用的类,依次选择【File——>Copy Path/Reference…——>Copy Reference】即可复制。

你可以根据你的需要对该配置文件进行修改。

7.测试运行

因为我们是将数据采集到 HDFS 上,所以需要先启动 Hadoop,然后再进行操作。

# 运行 Flume
cd $FLUME_HOME# 注意引用路径,需要修改成你自己的
./bin/flume-ng agent -n a1 -c conf/ -f job/httpToHDFS.conf -Dflume.root.logger=INFO,console

Flume 启动完成后,如下所示:

在这里插入图片描述

我们通过其它窗口,使用 curl 命令向 HTTP 源发送两条模拟数据:

curl -X POST -d '[{"body":"hello body"}]'  http://localhost:5140curl -X POST -d '[{"body":"HELLO FLUME"}]'  http://localhost:5140

在这里插入图片描述

数据发送完成后,Flume 会采集到该数据,并存储到 HDFS 上。

在这里插入图片描述

通过命令,查看 HDFS 中存储的内容,验证拦截器是否生效:

hdfs dfs -text /flume/events/2024-04-04/1630/00/ev* 

结果如下所示:

在这里插入图片描述

可以看到,我们在上面分别发送了两条数据 hello bodyHELLO FLUME,但最终 HDFS 中只存储了一条数据。

这是因为我们设置的拦截器生效了,它对数据中包含 hello 字符串的事件进行了过滤,故只存储了一条数据。

Flume 拦截器就是起到这样的效果,对数据进行处理、转换、删除等操作,是不是很简单呀。(同学,包会的呀)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/300310.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring定义Bean对象笔记(二)

前言&#xff1a;上一篇记录了通过XML文件来定义Bean对象&#xff0c;这一篇将记录通过注解和配置类的方式来定义Bean对象。 核心注解&#xff1a; 定义对象&#xff1a;Component,Service,Repository,Controller 依赖注入&#xff1a; 按类型&#xff1a;Autowired 按名称&am…

【JavaScript】作用域 ③ ( JavaScript 作用域链 | 作用域链变量查找机制 )

文章目录 一、JavaScript 作用域链1、作用域2、作用域链3、作用域链变量查找机制 二、代码示例 - 作用域链 一、JavaScript 作用域链 1、作用域 在 JavaScript 中 , 任何代码都有 作用域 , 全局作用域 : 在 <script> 标签中 或者 js 脚本中 定义的变量 属于 全局作用域 …

Vue3【进阶】

简介 https://cn.vuejs.org/guide/introduction.html 创建vue3工程 【基于 vue-cli创建】 基本和vue-cli的过程类似&#xff0c;只是选择的时候用vue3创建 【基于vite创建】【推荐】 【官网】https://vitejs.cn/ 【可以先去学一下webpack】 步骤 【https://cn.vitejs.…

kubernetes集群添加到jumpserver堡垒机里管理

第一步、在kubernetes集群中获取一个永久的token。 jumpserver堡垒机用api的来管理kubernetes&#xff0c;所以需要kubernetes的token&#xff0c;这个token还需要是一个永久的token&#xff0c;版本变更&#xff1a;Kubernetes 1.24基于安全方面的考虑&#xff08;特性门控Le…

LeetCode-热题100:118. 杨辉三角

题目描述 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: numRows 5 输出: [[1],[1,1],[1,2,1],[1,3,3,1],[1,4,6,4,1]] 示例 2: 输入: numRows 1 输出: [[1]]…

代码随想录第32天|455.分发饼干 376. 摆动序列

理论基础 贪心算法核心&#xff1a;选择每一阶段的局部最优&#xff0c;从而达到全局最优。 455.分发饼干 455. 分发饼干 - 力扣&#xff08;LeetCode&#xff09;代码随想录 (programmercarl.com)455. 分发饼干 - 力扣&#xff08;LeetCode&#xff09; 贪心算法理论基础&am…

【AI绘画/作图】风景背景类关键词模板参考

因为ds官网被墙,所以翻了IDE的源码整理了下stablestudio里的官方模板&#xff0c;顺便每个模板生成了一份…不知道怎么写关键词的可以参考 Stunning sunset over a futuristic city, with towering skyscrapers and flying vehicles, golden hour lighting and dramatic cloud…

C语言高效的网络爬虫:实现对新闻网站的全面爬取

1. 背景 搜狐是一个拥有丰富新闻内容的网站&#xff0c;我们希望能够通过网络爬虫系统&#xff0c;将其各类新闻内容进行全面地获取和分析。为了实现这一目标&#xff0c;我们将采用C语言编写网络爬虫程序&#xff0c;通过该程序实现对 news.sohu.com 的自动化访问和数据提取。…

行业型软文怎么写,媒介盒子分享

行业型软文即指面对行业内人群的软文,此类文章的目的通常是为了扩大行业影响力,奠定行业品牌地位。企业的行业地位将直接影响到其核心竞争力,甚至会影响到最终用户的选择。今天媒介盒子就和大家聊聊行业型软文怎么写。 一、了解受众需求&#xff1a; 首先&#xff0c;深入研究…

【C++第三阶段】string容器

以下内容仅为当前认识&#xff0c;可能有不足之处&#xff0c;欢迎讨论&#xff01; 文章目录 string容器基本概念构造函数赋值操作拼接操作字符串查找和替换字符串比较字符串存取字符串插入和删除字符串子串 string容器 基本概念 本质&#x1f449;string是C风格的字符串&…

快速获取文件夹及其子文件夹下的所有文件名

1、在文件夹中新建文本文档&#xff0c;命名为“命令.txt” 2、输入以下内容 tree /F > 文件名.txt dir *.* /B > 文件名.txt 其中文件名和文件格式可以是任意的&#xff0c;tree命令可生成文件及其子文件夹下所有文件的名称&#xff0c;dir命令只生成当前目…

网络基础三——初识IP协议

网络基础三 ​ 数据通过应用层、传输层将数据传输到了网络层&#xff1b; ​ 传输层协议&#xff0c;如&#xff1a;TCP协议提供可靠性策略或者高效性策略&#xff0c;UDP提供实时性策略&#xff0c;保证向下层交付的数据是符合要求的的&#xff1b;而网络层&#xff0c;如&a…

LeetCode 40. 组合总和 II

解题思路 cand[]{1,2,3,4} target4; 相关代码 class Solution {List<List<Integer>> res;List<Integer> path;boolean st[];public List<List<Integer>> combinationSum2(int[] candidates, int target) {path new ArrayList<>();res …

c++前言

目录 1. 什么是 C 2. C 发展史 3. C 的重要性 4. 如何学习 C 5. 关于本门课程 1. 什么是C C语言是结构化和模块化的语言&#xff0c;适合处理较小规模的程序。对于复杂的问题&#xff0c;规模较大的 程序&#xff0c;需要高度的抽象和建模时&#xff0c; C 语言则不合适…

Acwing.1388 游戏(区间DP对抗思想)

题目 玩家一和玩家二共同玩一个小游戏。 给定一个包含 N个正整数的序列。 由玩家一开始&#xff0c;双方交替行动。 每次行动可以在数列的两端之中任选一个数字将其取走&#xff0c;并给自己增加相应数字的分数。&#xff08;双初始分都是 0分&#xff09; 当所有数字都被…

备战蓝桥杯---线段树应用2

来几个不那么模板的题&#xff1a; 对于删除&#xff0c;我们只要给那个元素附上不可能的值即可&#xff0c;关键问题是怎么处理序号变化的问题。 事实上&#xff0c;当我们知道每一个区间有几个元素&#xff0c;我们就可以确定出它的位置&#xff0c;因此我们可以再维护一下前…

优秀企业都在用的企微知识库,再不搭建就晚了!

每个团队都在寻找让工作效率提升的方法。如果你想知道哪些团队能够高效地完成任务&#xff0c;而另一些却步履维艰&#xff0c;那么答案可能就是“企业微信知识库”。见过很多团队都在使用它&#xff0c;而且效果非常显著。如果你还没有搭建属于自己的企微知识库&#xff0c;可…

C++ 【原型模式】

简单介绍 原型模式是一种创建型设计模式 | 它使你能够复制已有对象&#xff0c;客户端不需要知道要复制的对象是哪个类的实例&#xff0c;只需通过原型工厂获取该对象的副本。 以后需要更改具体的类或添加新的原型类&#xff0c;客户端代码无需改变&#xff0c;只需修改原型工…

各类系统业务功能架构图整理

一、前言 很多软件系统一直经久不衰&#xff0c;主要这些系统都是一些生产工作经营不可或缺的系统。比如财务系统&#xff0c;商城系统&#xff0c;支付系统&#xff0c;供应链系统&#xff0c;人力资源管理系统&#xff0c;ERP系统等等。这些系统不管大公司还是小公司往往都需…

简约轻量-失信录系统源码

失信录系统-最新骗子收录查询系统源码 首页查询&#xff1a; 举报收录页&#xff1a; 后台管理页&#xff1a; 失信录系统 V1.0.0 更新内容&#xff1a; 1.用户查询,举报功能 2.界面独立开发 3.拥有后台管理功能 4.xss,sql安全过滤 5.平台用户查询 6.用户中心&#xff08;待完…