Flume学习笔记(3)—— Flume 自定义组件

前置知识:

Flume学习笔记(1)—— Flume入门-CSDN博客

Flume学习笔记(2)—— Flume进阶-CSDN博客

Flume 自定义组件

自定义 Interceptor

需求分析:使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统

需要使用Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值

实现流程:

代码

导入依赖:

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency>
</dependencies>

自定义拦截器的代码:

package com.why.interceptor;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TypeInterceptor implements Interceptor {//存放事件的集合private List<Event> addHeaderEvents;@Overridepublic void initialize() {//初始化集合addHeaderEvents = new ArrayList<>();}//单个事件拦截@Overridepublic Event intercept(Event event) {//获取头信息Map<String, String> headers = event.getHeaders();//获取body信息String body = new String(event.getBody());//根据数据中是否包含”why“来分组if(body.contains("why")){headers.put("type","first");}else {headers.put("type","second");}return event;}//批量事件拦截@Overridepublic List<Event> intercept(List<Event> events) {//清空集合addHeaderEvents.clear();//遍历eventsfor(Event event : events){//给每一个事件添加头信息addHeaderEvents.add(intercept(event));}return addHeaderEvents;}@Overridepublic void close() {}//构建生成器public static class TypeBuilder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new TypeInterceptor();}@Overridepublic void configure(Context context) {}}}

将代码打包放入flume安装路径下的lib文件夹中

配置文件

在job文件夹下创建group4目录,添加配置文件;

为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.why.interceptor.TypeInterceptor$TypeBuilder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1
a1.sources.r1.selector.mapping.second = c2# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

hadoop103:配置一个 avro source 和一个 logger sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

hadoop104:配置一个 avro source 和一个 logger sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

执行指令

hadoop102:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume-interceptor-flume.conf

hadoop103:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1-flume-logger.conf -Dflume.root.logger=INFO,console

hadoop104:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2-flume-logger.conf -Dflume.root.logger=INFO,console

然后hadoop102通过nc连接44444端口,发送数据:

在hadoop103和104上分别接收到:

自定义 Source

官方提供的文档:Flume 1.11.0 Developer Guide — Apache Flume

给出的示例代码如下:

public class MySource extends AbstractSource implements Configurable, PollableSource {private String myProp;@Overridepublic void configure(Context context) {String myProp = context.getString("myProp", "defaultValue");// Process the myProp value (e.g. validation, convert to another type, ...)// Store myProp for later retrieval by process() methodthis.myProp = myProp;}@Overridepublic void start() {// Initialize the connection to the external client}@Overridepublic void stop () {// Disconnect from external client and do any additional cleanup// (e.g. releasing resources or nulling-out field values) ..}@Overridepublic Status process() throws EventDeliveryException {Status status = null;try {// This try clause includes whatever Channel/Event operations you want to do// Receive new dataEvent e = getSomeData();// Store the Event into this Source's associated Channel(s)getChannelProcessor().processEvent(e);status = Status.READY;} catch (Throwable t) {// Log exception, handle individual exceptions as neededstatus = Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}} finally {txn.close();}return status;}
}

需要继承AbstractSource,实现Configurable, PollableSource

实战需求分析

使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置

代码

package com.why.source;import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;import java.util.HashMap;
import java.util.concurrent.ConcurrentMap;public class MySource extends AbstractSource implements PollableSource, Configurable {//定义配置文件将来要读取的字段private Long delay;private String field;//获取数据封装成 event 并写入 channel,这个方法将被循环调用@Overridepublic Status process() throws EventDeliveryException {try {//事件头信息HashMap<String,String> headerMap = new HashMap<>();//创建事件SimpleEvent event = new SimpleEvent();//循环封装事件for (int i = 0; i < 5; i++) {//设置头信息event.setHeaders(headerMap);//设置事件内容event.setBody((field + i).getBytes());//将事件写入ChannelgetChannelProcessor().processEvent(event);Thread.sleep(delay);}}catch (InterruptedException e) {throw new RuntimeException(e);}return Status.READY;}//backoff 步长@Overridepublic long getBackOffSleepIncrement() {return 0;}//backoff 最长时间@Overridepublic long getMaxBackOffSleepInterval() {return 0;}//初始化 context(读取配置文件内容)@Overridepublic void configure(Context context) {delay = context.getLong("delay");field = context.getString("field","Hello");}}

打包放到flume安装路径下的lib文件夹中;

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = com.why.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = why# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行指令

hadoop102上:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group5/mysource.conf -Dflume.root.logger=INFO,console

结果如下:

自定义 Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件

官方文档:Flume 1.11.0 Developer Guide — Apache Flume

接口实例:

public class MySink extends AbstractSink implements Configurable {private String myProp;@Overridepublic void configure(Context context) {String myProp = context.getString("myProp", "defaultValue");// Process the myProp value (e.g. validation)// Store myProp for later retrieval by process() methodthis.myProp = myProp;}@Overridepublic void start() {// Initialize the connection to the external repository (e.g. HDFS) that// this Sink will forward Events to ..}@Overridepublic void stop () {// Disconnect from the external respository and do any// additional cleanup (e.g. releasing resources or nulling-out// field values) ..}@Overridepublic Status process() throws EventDeliveryException {Status status = null;// Start transactionChannel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();try {// This try clause includes whatever Channel operations you want to doEvent event = ch.take();// Send the Event to the external repository.// storeSomeData(e);txn.commit();status = Status.READY;} catch (Throwable t) {txn.rollback();// Log exception, handle individual exceptions as neededstatus = Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}}return status;}
}

自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口

实战需求分析

使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置

代码

package com.why.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {//创建 Logger 对象private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);//前后缀private String prefix;private String suffix;@Overridepublic Status process() throws EventDeliveryException {//声明返回值状态信息Status status;//获取当前 Sink 绑定的 ChannelChannel ch = getChannel();//获取事务Transaction txn = ch.getTransaction();//声明事件Event event;//开启事务txn.begin();//读取 Channel 中的事件,直到读取到事件结束循环while (true) {event = ch.take();if (event != null) {break;}}try {//处理事件(打印)LOG.info(prefix + new String(event.getBody()) + suffix);//事务提交txn.commit();status = Status.READY;} catch (Exception e) {//遇到异常,事务回滚txn.rollback();status = Status.BACKOFF;} finally {//关闭事务txn.close();}return status;}@Overridepublic void configure(Context context) {prefix = context.getString("prefix", "hello");suffix = context.getString("suffix");}
}

打包放到flume安装路径下的lib文件夹中;

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = com.why.sink.MySink
a1.sinks.k1.prefix = why:
a1.sinks.k1.suffix = :why# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行指令

hadoop102上:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group6/mysink.conf -Dflume.root.logger=INFO,console

结果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/198938.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

交易机器人-微信群通知

微信公众号:大数据高性能计算 1 背景 背景是基于人工去做交易本身无法做到24小时无时无刻的交易,主要是虚拟币本身它是24小时交易,人无法做到24小时盯盘,其次就是如果你希望通过配置更加复杂的规则甚至需要爬取最新的信息走模型进行量化交易的时候,就需要自己去做一些量化…

Linux 环境搭建

✨个人主页&#xff1a; Anmia.&#x1f389;所属专栏&#xff1a; C Language &#x1f383;操作环境&#xff1a; Visual Studio 2019 版本 本章概要 1. 认识 Linux, 了解 Linux 的相关背景 2. 学会如何使用云服务器 3. 掌握使用远程终端工具 xshell 登陆 Linux 服务器 1. Li…

语义检索系统【全】:基于milvus语义检索系统指令全流程-快速部署版

搜索推荐系统专栏简介:搜索推荐全流程讲解(召回粗排精排重排混排)、系统架构、常见问题、算法项目实战总结、技术细节以及项目实战(含码源) 专栏详细介绍:搜索推荐系统专栏简介:搜索推荐全流程讲解(召回粗排精排重排混排)、系统架构、常见问题、算法项目实战总结、技术…

学生邮箱白嫖/免费安装JetBrains全家桶(IDEA/pycharm等) —— 保姆级教程

&#x1f9f8;欢迎来到dream_ready的博客&#xff0c;&#x1f4dc;相信您对博主首页也很感兴趣o (ˉ▽ˉ&#xff1b;) 博主首页&#xff0c;更多redis、java等优质好文以及各种保姆级教程等您挖掘&#xff01; 目录 前言 JetBrains全家桶介绍 申请过程&#xff1a; 获取学…

C++类与对象(2)—构造函数析构函数

目录 一、类的6个默认成员函数 二、构造函数 1、定义 2、特征 三、析构函数 1、定义 2、特征 四、默认生成构造&析构 1、定义 2、内置类型 3、自定义类型 4、声明处给默认值 5、总结 下一篇 一、类的6个默认成员函数 如果一个类中没有定义任何成员&#xff0…

Idea2023 Springboot web项目正常启动,页面展示404解决办法

Idea2023 Springboot web项目正常启动,页面展示404解决办法 问题&#xff1a; 项目启动成功&#xff0c;但是访问网页&#xff0c;提示一直提示重定向次数过多&#xff0c;404 解决方法 在IDEA的Run/Debug Configurations窗口下当前的Application模块的Working directory中添…

python 对图像进行聚类分析

import cv2 import numpy as np from sklearn.cluster import KMeans import time# 中文路径读取 def cv_imread(filePath, cv2_falgcv2.COLOR_BGR2RGB): cv_img cv2.imdecode(np.fromfile(filePath, dtypenp.uint8), cv2_falg) return cv_img# 自定义装饰器计算时间 def…

vue中使用echarts渐变柱状图 Cannot read properties of undefined (reading ‘graphic‘)解决方法

在使用渐变颜色时报错&#xff0c;Cannot read properties of undefined (reading ‘graphic’) echarts也下载了&#xff0c;引入了&#xff0c;就是报错&#xff0c;用不了new charts&#xff0c; 结果换了一个版本号就可以了&#xff0c;本来用的"echarts": "…

7.docker运行redis容器

1.准备redis的配置文件 从上一篇运行MySQL容器我们知道&#xff0c;需要给容器挂载数据卷&#xff0c;来持久化数据和配置&#xff0c;相应的redis也不例外。这里我们以redis6.0.8为例来实际说明下。 1.1 查找redis的配置文件redis.conf 下面这个网址有各种版本的配置文件供…

02-1解析xpath

我是在edge浏览器中安装的xpath&#xff0c;需要安装的朋友可以参考下面这篇博客最新版edge浏览器中安装xpath插件 一、xpathd的使用 安装lxml pip install lxml ‐i https://pypi.douban.com/simple导入lxml.etree from lxml import etreeetree.parse() 解析本地文件 htm…

linux镜像的下载,系统下载(个人使用)

文章目录 一、系统之家二、国内镜像源三、Centos官网四、安装成功截图五、镜像类型的区别参考文档 一、系统之家 系统之家官网 二、国内镜像源 下载镜像地址&#xff1a; 1、官网地址&#xff1a;https://www.centos.org/ 2、阿里镜像站&#xff1a;https://mirrors.aliyu…

java基础练习缺少项目?看这篇文章就够了(上)!

公众号&#xff1a;全干开发 。 专注分享简洁但高质量的动图技术文章&#xff01; 项目概述 本教程适合刚学习完java基础语法的同学&#xff0c;涉及if语句、循环语句、类的封装、集合等基础概念&#xff0c;使用大量gif图帮助读者演示代码操作、效果等&#xff0c;是一个非常…

基于单片机16路抢答器仿真系统

**单片机设计介绍&#xff0c; 基于单片机16路抢答器仿真系统 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的16路抢答器仿真系统是一种用于模拟和实现抢答竞赛的系统。该系统由硬件和软件两部分组成。 硬件方面&am…

CICD 持续集成与持续交付——gitlab

部署 虚拟机最小需求&#xff1a;4G内存 4核cpu 下载&#xff1a;https://mirrors.tuna.tsinghua.edu.cn/gitlab-ce/yum/el7/ 安装依赖性 [rootcicd1 ~]# yum install -y curl policycoreutils-python openssh-server perl[rootcicd1 ~]# yum install -y gitlab-ce-15.9.3-ce.0…

Idea 2023.2.5配置(插件、Maven等)

IDEA2023.2.5配置 一. 插件Alibaba Java Coding Guidelines plugin supportMaven HelperMyBatisXSonarLintTranslationVuesion Theme 二. 自定义创建live template&#xff0c;快速写代码三. 修改全局配置3.1 Maven配置3.1.1 安装MavenStep1. 下载Step2. 安装Step3. 创建系统环…

Linux常见命令手册

目录 文件命令 文件和目录命令 文件的权限命令 文件搜索命令 进程命令 查看进程命令 关闭进程命令 用户和群组命令 网络命令 firewall-cmd 网络应用命令 高级网络命令 网络测试命令 网络安全命令 网络配置命令 软件管理命令 系统信息命令 vi编辑器 关机命令…

【uniapp/uview1.x】u-upload 在 v-for 中的使用时, before-upload 如何传参

引入&#xff1a; 是这样一种情况&#xff0c;在接口获取数据之后&#xff0c;是一个数组列表&#xff0c;循环展示后&#xff0c;需要在每条数据中都要有图片上传&#xff0c;互不干扰。 分析&#xff1a; uview 官网中有说明&#xff0c;before-upload 是不加括号的&#xff…

.Net6 部署到IIS示例

基于FastEndpoints.Net6 框架部署到IIS 环境下载与安装IIS启用与配置访问网站 环境下载与安装 首先下载环境安装程序&#xff0c;如下图所示,根据系统位数选择x86或者x64进行下载安装,网址&#xff1a;Download .NET 6.0。 IIS启用与配置 启用IIS服务 打开控制面板&#xff…

FPC焊点剥离失效分析

一、案例背景 FPC在后续组装过程中&#xff0c;连接器发生脱落。在对同批次的样品进行推力测试后&#xff0c;发现连接器推力有偏小的现象。据此进行失效分析&#xff0c;明确FPC连接器脱落原因。 #1、#2样品连接器脱落连接器脱落&#xff1b;#3样品连接器未脱落&#xff1b;…

邮箱设置第三方登录授权码获取

以QQ邮箱为例 QQ邮箱设置——账户 开启POP3/SMTP服务——完成验证后获得授权码&#xff0c;保存授权码